Learning to Write an NER Model
Preparing the Data
We use the CoNLL-2003 dataset, which consists of Reuters news stories.
from datasets import load_dataset
raw_datasets = load_dataset("conll2003")
Once it is loaded you can take a look at the dataset with raw_datasets, as well as raw_datasets['train'][0], raw_datasets['train'].features, and so on.
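For instance, the label names can be read off the ner_tags feature. A minimal sketch (the variable name label_names is my own choice and is reused below):
ner_feature = raw_datasets['train'].features['ner_tags']
label_names = ner_feature.feature.names
print(label_names)
# For CoNLL-2003 this is ['O', 'B-PER', 'I-PER', 'B-ORG', 'I-ORG', 'B-LOC', 'I-LOC', 'B-MISC', 'I-MISC']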
Decoding the Labels
words = raw_datasets['train'][0]['tokens']
labels = raw_datasets['train'][0]['ner_tags']
line1 = ""
line2 = ""
for word, label in zip(words, labels):
    full_label = label_names[label]
    max_length = max(len(word), len(full_label))
    line1 += word + " " * (max_length - len(word) + 1)
    line2 += full_label + " " * (max_length - len(full_label) + 1)
print(line1)
print(line2)
It can also be handier to turn the words and labels above into a DataFrame to get a feel for the data, as sketched below.
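A minimal sketch of that, assuming pandas is available (the column names are my own choice):
import pandas as pd
df = pd.DataFrame({'token': words, 'label': [label_names[l] for l in labels]})
print(df)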
Processing the Data
from transformers import AutoTokenizer
model_checkpoint = 'bert-base-cased'
tokenizer = AutoTokenizer.from_pretrained(model_checkpoint)
One thing to note when calling the tokenizer: the 'tokens' column in the dataset is already split into words, so pass is_split_into_words=True.
inputs = tokenizer(raw_datasets['train'][0]['tokens'],is_split_into_words=True)
The object returned by the tokenizer has a tokens() method that returns the tokens after subword splitting, and a word_ids() method that gives, for each token, the index of the word it came from.
In this way the tokens are turned into a dict of input_ids, token_type_ids and attention_mask.
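To see both views side by side, something like this works (output elided; the exact subword splits depend on the tokenizer):
print(inputs.tokens())    # subword tokens, including the added [CLS] and [SEP]
print(inputs.word_ids())  # index of the source word for each token, None for special tokens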
This raises a problem: after tokenization some words are split into two or more tokens, so the original labels no longer line up one-to-one with the tokenized output, and we need a function to realign them.
A few things to keep in mind for this function. Entity names (people, places, organizations) in the original data can span several words, and after tokenization a single word can itself be split into several subwords. The first word of an entity carries a B-XXX label, but after tokenization only its first subword should keep B-; every following subword gets I-. (In the CoNLL-2003 label list, each B-XXX sits at an odd index and the matching I-XXX is the next index, which is why the code below simply adds 1.) The newly added [CLS] and [SEP] tokens get the label -100 so that they are ignored in the loss later.
def align_labels_with_tokens(labels, word_ids):
    new_labels = []
    current_word_id = None
    for word_id in word_ids:
        if word_id != current_word_id:  # start of a new word
            current_word_id = word_id
            new_labels.append(labels[word_id] if word_id is not None else -100)
        elif word_id is None:
            new_labels.append(-100)  # handles the leading [CLS], since current_word_id starts as None
        else:  # the nth (n >= 2) subword of the same word
            if labels[word_id] % 2 == 1:  # subword of a word labeled B-XXX
                new_labels.append(labels[word_id] + 1)  # B-XXX -> I-XXX (I- is the next index)
            else:  # subword of a word labeled I-XXX (or O)
                new_labels.append(labels[word_id])
    return new_labels
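A quick sanity check on the first example, reusing the inputs from above (a sketch; the exact output depends on the tokenizer's subword splits):
labels = raw_datasets['train'][0]['ner_tags']
word_ids = inputs.word_ids()
print(labels)
print(align_labels_with_tokens(labels, word_ids))
# The aligned list is longer: -100 for [CLS]/[SEP], plus extra I- labels for split words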
Next comes the function used for batched processing:
def tokenize_and_align_labels(examples):
    tokenized_examples = tokenizer(examples['tokens'], is_split_into_words=True)
    all_labels = examples['ner_tags']
    new_labels = []
    for i, labels in enumerate(all_labels):
        word_ids = tokenized_examples.word_ids(i)
        new_labels.append(align_labels_with_tokens(labels, word_ids))
    tokenized_examples['labels'] = new_labels
    return tokenized_examples
Then apply the function just written to the whole dataset:
tokenized_datasets = raw_datasets.map(tokenize_and_align_labels, batched=True, remove_columns=raw_datasets['train'].column_names)
Fine-Tuning the Model with the Trainer API
First comes data collation. This time we cannot use DataCollatorWithPadding, because it only pads input_ids, attention_mask and token_type_ids, and our data also has labels that need padding; we use DataCollatorForTokenClassification instead.
from transformers import DataCollatorForTokenClassification
data_collator = DataCollatorForTokenClassification(tokenizer=tokenizer)
The collator is initialized with a tokenizer because it needs the padding values for input_ids, token_type_ids and so on; labels are padded with the collator's own default of -100. There are other reasons as well, but this is an easy way to remember that a tokenizer has to be passed in.
When calling data_collator later, note that it expects a List[Dict]; you cannot feed it a dataset slice directly, since slicing yields a Dict of Lists, as the sketch below shows.
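A minimal sketch of building a small batch by hand the right way:
batch = data_collator([tokenized_datasets['train'][i] for i in range(2)])
print(batch['labels'])  # labels are padded with -100, so padded positions are ignored in the loss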
With the data taken care of, the next step is choosing the evaluation metric.
import evaluate
metric = evaluate.load("seqeval")
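seqeval works on lists of label-name strings rather than integer ids. A tiny example with made-up labels, just to show the expected input format:
example_refs = [['B-PER', 'I-PER', 'O', 'B-LOC']]
example_preds = [['B-PER', 'O', 'O', 'B-LOC']]
print(metric.compute(predictions=example_preds, references=example_refs))
# Returns per-entity scores plus overall precision/recall/f1/accuracy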
import numpy as np

def compute_metrics(eval_preds):
    logits, labels = eval_preds
    predictions = np.argmax(logits, -1)
    true_labels = [[label_names[l] for l in label if l != -100] for label in labels]
    pred_labels = [
        [label_names[p] for p, l in zip(prediction, label) if l != -100]
        for prediction, label in zip(predictions, labels)
    ]
    all_metrics = metric.compute(predictions=pred_labels, references=true_labels)
    return {
        "precision": all_metrics["overall_precision"],
        "recall": all_metrics["overall_recall"],
        "f1": all_metrics["overall_f1"],
        "accuracy": all_metrics["overall_accuracy"],
    }
Defining the Model
id2label = {i:label for i,label in enumerate(label_names)}
label2id = {v:k for k,v in id2label.items()}
When initializing the model, passing id2label and label2id replaces the original classification head with one that matches the current task. Passing only num_labels also works, but then you have to write your own mapping for the model's outputs afterwards.
from transformers import AutoModelForTokenClassification
model = AutoModelForTokenClassification.from_pretrained(model_checkpoint, id2label=id2label, label2id=label2id)
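A quick sanity check (CoNLL-2003 has 9 labels, so this should print 9):
print(model.config.num_labels)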
Defining the TrainingArguments
from transformers import TrainingArguments
args = TrainingArguments(
    "bert-finetuned-ner",
    evaluation_strategy="epoch",
    save_strategy="epoch",
    learning_rate=2e-5,
    num_train_epochs=3,
    weight_decay=0.01,
)
from transformers import Trainer
trainer = Trainer(
    model=model,
    args=args,
    train_dataset=tokenized_datasets['train'],
    eval_dataset=tokenized_datasets['validation'],
    data_collator=data_collator,
    compute_metrics=compute_metrics,
    processing_class=tokenizer,
)
trainer.train()
Preparing for Training
First, build the DataLoaders:
from torch.utils.data import DataLoader
train_dataloader = DataLoader(
    tokenized_datasets['train'],
    shuffle=True,
    collate_fn=data_collator,
    batch_size=8,
)
eval_dataloader = DataLoader(
    tokenized_datasets['validation'],
    collate_fn=data_collator,
    batch_size=8,
)
Instantiate the model:
model = AutoModelForTokenClassification.from_pretrained(
    model_checkpoint,
    id2label=id2label,
    label2id=label2id,
)
Choose an optimizer:
from torch.optim import AdamW
optimizer = AdamW(model.parameters(),lr=2e-5)
Pass everything to accelerator.prepare():
from accelerate import Accelerator
accelerator = Accelerator()
model, optimizer, train_dataloader, eval_dataloader = accelerator.prepare(
    model, optimizer, train_dataloader, eval_dataloader
)
Use the length of train_dataloader to compute the number of training steps (compute it after accelerator.prepare(), since preparing the dataloader can change its length):
from transformers import get_scheduler
num_train_epochs = 3
num_update_steps_per_epoch = len(train_dataloader)
num_training_steps = num_train_epochs * num_update_steps_per_epoch
lr_scheduler = get_scheduler(
    'linear',
    optimizer=optimizer,
    num_warmup_steps=0,
    num_training_steps=num_training_steps,
)
Pushing to the Hugging Face Hub
from huggingface_hub import Repository, get_full_repo_name
model_name = "bert-finetuned-ner-accelerate"
repo_name = get_full_repo_name(model_name)
output_dir = "bert-finetuned-ner-accelerate"
repo = Repository(output_dir, clone_from=repo_name)
A post-processing function (it takes predictions and labels and converts them to lists of label strings):
def postprocess(predictions, labels):
    predictions = predictions.detach().cpu().clone().numpy()
    labels = labels.detach().cpu().clone().numpy()
    # Remove ignored index (special tokens) and convert to labels
    true_labels = [[label_names[l] for l in label if l != -100] for label in labels]
    true_predictions = [
        [label_names[p] for (p, l) in zip(prediction, label) if l != -100]
        for prediction, label in zip(predictions, labels)
    ]
    return true_labels, true_predictions
The complete code for the training loop:
from tqdm.auto import tqdm
import torch

progress_bar = tqdm(range(num_training_steps))

for epoch in range(num_train_epochs):
    # Training
    model.train()
    for batch in train_dataloader:
        outputs = model(**batch)
        loss = outputs.loss
        accelerator.backward(loss)
        optimizer.step()
        lr_scheduler.step()
        optimizer.zero_grad()
        progress_bar.update(1)

    # Evaluation
    model.eval()
    for batch in eval_dataloader:
        with torch.no_grad():
            outputs = model(**batch)
        predictions = outputs.logits.argmax(dim=-1)
        labels = batch['labels']
        # Pad predictions and labels so they can be gathered across processes
        predictions = accelerator.pad_across_processes(predictions, dim=1, pad_index=-100)
        labels = accelerator.pad_across_processes(labels, dim=1, pad_index=-100)
        predictions_gathered = accelerator.gather(predictions)
        labels_gathered = accelerator.gather(labels)
        true_predictions, true_labels = postprocess(predictions_gathered, labels_gathered)
        metric.add_batch(predictions=true_predictions, references=true_labels)

    results = metric.compute()
    print(
        f"epoch {epoch}:",
        {
            key: results[f"overall_{key}"]
            for key in ["precision", "recall", "f1", "accuracy"]
        },
    )

    # Save and upload
    accelerator.wait_for_everyone()
    unwrapped_model = accelerator.unwrap_model(model)
    unwrapped_model.save_pretrained(output_dir, save_function=accelerator.save)
    if accelerator.is_main_process:
        tokenizer.save_pretrained(output_dir)
        repo.push_to_hub(
            commit_message=f"Training in progress epoch {epoch}", blocking=False
        )
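Once the model has been pushed, it can be tried out with a token-classification pipeline. A minimal sketch, assuming the repo id below is replaced with your own namespace (the example sentence is arbitrary):
from transformers import pipeline
ner_pipeline = pipeline(
    "token-classification",
    model="your-username/bert-finetuned-ner-accelerate",  # hypothetical repo id
    aggregation_strategy="simple",
)
print(ner_pipeline("My name is Sylvain and I work at Hugging Face in Brooklyn."))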