huggingface transformersでMultitask Text classification
はじめに
研究で2つのText classificationを予測するようなマルチタスク予測をする機会があったのですが、ネット上に参考にする記事が少なかったので、まとめました。
詳細は以下のrepositoryにまとめているので、参照してください。
HuggingfaceのTransformersの利用は初心者なので、間違いや質問があれば気軽に指摘していただければと思います。
使用モジュール
Transformers
言わずとしれたHuggingface によるNLP(自然言語処理)のライブラリです。pytorchやTensorflowに対応しており、Exampleもたくさんあるため、初心者にはとっつきやすいものです。
今回は、Pytorchを用いて実装しました。githubのexample/pytorch/text-classification/run-glue.pyを参考に実装しています。
Datasets
Huggingfaceが提供する、様々なデータソースからデータセットを読み込むことができるツールであり、ローカルにあるファイルの他に、hubにあるデータも引っ張ってきて使うことができます。今回はCSVファイルを読み込んで使っています。
ここがわかりやすいかも
タスク概要
今回は、とある文章から以下の2つのラベルを予測するマルチタスク学習を行いました。
- task1:ポジティブ,普通,ネガティブ
- task2:文章カテゴリ
そのため、以下のようなcsvファイルを作成します。
columnはlabel,sentence,labelsの3つとします。labelsの列は空ですが、これがないとうまく動かなかったので追加しています。
label,sentence,labels
0,今日は温かいですね。
2,えーー。じゃあパスで!
label,sentence,labels
3,今日は温かいですね。
4,えーー。じゃあパスで!
train.csv valid.csvに分割
以下のようなコマンドにより学習データと検証データに分割します。
head -n 60000 task1.csv > train.csv
tail -n 20000 task1.csv > valid.csv
datasets作成
先程作成したCSVデータを読み込んで、Datasets形式にするモジュールを作成します。
タスクデータクラス
まずタスクを見分けるために、TaskのDataclassを定義します。
@dataclass
class Task:
id: int
name: str
type: str
num_labels: int
tokenize
与えられたデータをトークナイズします。
"sentence"をTokenizerを用いてtokenに分割してidに変換します。
また、training data とevaluate dataの別々にDatasets形式にします。
def tokenize_seq_classification_dataset(
tokenizer, raw_datasets, task_id, task_name, data_args, training_args
):
# Padding strategy
if data_args.pad_to_max_length:
padding = "max_length"
else:
# We will pad later, dynamically at batch creation, to the max sequence length in each batch
padding = False
# padding strategy
if data_args.max_seq_length > tokenizer.model_max_length:
logger.warning(
f"The max_seq_length passed ({data_args.max_seq_length}) is larger than the maximum length for the"
f"model ({tokenizer.model_max_length}). Using max_seq_length={tokenizer.model_max_length}."
)
max_seq_length = min(data_args.max_seq_length, tokenizer.model_max_length)
def tokenize_text(examples):
result = tokenizer(
examples["sentence"],
padding=padding,
max_length=max_seq_length,
truncation=True,
)
examples["labels"] = examples.pop("label")
result["task_ids"] = [task_id] * len(examples["labels"])
return result
# トークナイズとパディング
def tokenize_and_pad_text(examples):
result = tokenize_text(examples)
examples["labels"] = [
[l] + [-100] * (max_seq_length - 1) for l in examples["labels"]
]
return result
with training_args.main_process_first(desc="dataset map pre-processing"):
train_dataset = raw_datasets["train"].map(
tokenize_and_pad_text,
batched=True,
load_from_cache_file=not data_args.overwrite_cache,
desc="Running tokenizer on dataset",
)
validation_dataset = raw_datasets["validation"].map(
tokenize_text,
batched=True,
load_from_cache_file=not data_args.overwrite_cache,
desc="Running tokenizer on dataset",
)
return train_dataset, validation_dataset
csvの読み込み
train_path,eval_pathからcsvを読み込む関数を作ります。
def load_seq_classification_dataset(
task_id,
task_name,
tokenizer,
data_args,
training_args,
model_args,
train_path,
eval_path,
num_labels,
):
data_files = {
"train": train_path,
"validation": eval_path,
}
raw_datasets = load_dataset(
"csv", data_files=data_files, cache_dir=model_args.cache_dir
)
train_dataset, validation_dataset = tokenize_seq_classification_dataset(
tokenizer,
raw_datasets,
task_id,
task_name,
data_args,
training_args,
)
task_info = Task(
id=task_id,
name=task_name,
num_labels=num_labels,
type="seq_classification",
)
return (train_dataset, validation_dataset, task_info)
データのマージ
上記で作成した2つのタスクのデータを結合していきます。
validation datasets は学習が終わった際の評価でタスクごとのaccuracyを知りたいので、リストにしておきます。
def load_datasets(tokenizer, data_args, training_args, model_args):
# task1 negaposi
(
train_dataset_task1,
validation_dataset_task1,
task1_info,
) = load_seq_classification_dataset(
0,
"negaposi",
tokenizer,
data_args,
training_args,
model_args,
data_args.train_file_task1,
data_args.validation_file_task1,
3,
)
# task2 category
(
train_dataset_task2,
validation_dataset_task2,
task2_info,
) = load_seq_classification_dataset(
1,
"category",
tokenizer,
data_args,
training_args,
model_args,
data_args.train_file_task2,
data_args.validation_file_task2,
5,
)
# merge train datasets
train_dataset_df = train_dataset_task1.to_pandas().append(
train_dataset_task2.to_pandas()
)
train_dataset = datasets.Dataset.from_pandas(train_dataset_df)
train_dataset.shuffle(seed=123)
# Append validation datasets
validation_dataset = [validation_dataset_task1, validation_dataset_task2]
dataset = datasets.DatasetDict(
{"train": train_dataset, "validation": validation_dataset}
)
tasks = [task1_info, task2_info]
return tasks, dataset
custom model作成
データセットが作り終えたので、次にカスタムモデルを作成していきます。マルチタスクなので、以下のような概念図となります。
結果はsoftmaxなので、argmax()をかけて、最大値があるindexを返して予測ラベルを取得します。この予測ラベルと正解ラベルとの差によってlossを計算して、それぞれの和を全体のlossにします。
Test classification head
事前学習モデルからEncodeされた特徴ベクトルを元に、予測ラベルを生成して、lossの計算をします。
今回は、特徴ベクトルにdropoutを与えて、Linear(全結合層)によって、labelを予測しました。
class ClassificationHead(torch.nn.Module):
def __init__(self, hidden_size: int, num_labels: int, dropout_p=0.1):
super().__init__()
self.num_labels = num_labels
self.dropout = torch.nn.Dropout(dropout_p)
self.classifier = torch.nn.Linear(hidden_size, num_labels)
self._init_weights()
def forward(self, pooled_output, labels=None, **kwargs):
pooled_output = self.dropout(pooled_output)
logits = self.classifier(pooled_output)
# calculate loss
loss = None
if labels is not None:
if labels.dim() != 1:
# remove padding
labels = labels[:, 0]
loss_fct = torch.nn.CrossEntropyLoss()
loss = loss_fct(logits.view(-1, self.num_labels), labels.long().view(-1))
return (logits, loss)
def _init_weights(self):
self.classifier.weight.data.normal_(mean=0.0, std=0.02)
if self.classifier.bias is not None:
self.classifier.bias.data.zero_()
Multi task Model
Pre finetuningモデルからEncodeした特徴ベクトルを2つのTest classification headに入力していきます。
今回は、BERTを使っているので、CLS Tokenの値をを特徴ベクトル(pooler_output)としています。また、学習に用いたDatasetsはTask1,Task2をごちゃまぜにしているので、task_id_filter = task_ids == unique_task_id
によってタスクごとにText Classification headにデータを入力するようにしています。
class MultiTaskModel(torch.nn.Module):
def __init__(self, encoder_name: str, tasks: List[Task]):
super().__init__()
self.encoder = AutoModel.from_pretrained(encoder_name)
self.output_heads = torch.nn.ModuleDict()
self.output_heads[str(tasks[0].id)] = ClassificationHead(
hidden_size=self.encoder.config.hidden_size, num_labels=tasks[0].num_labels
)
self.output_heads[str(tasks[1].id)] = ClassificationHead(
hidden_size=self.encoder.config.hidden_size, num_labels=tasks[1].num_labels
)
def forward(
self,
input_ids: torch.LongTensor = None,
attention_mask: torch.FloatTensor = None,
token_type_ids: torch.LongTensor = None,
position_id: torch.LongTensor = None,
head_mask: torch.FloatTensor = None,
inputs_embeds: torch.FloatTensor = None,
output_attentions: torch.FloatTensor = None,
labels=None,
task_ids=None,
**kwargs,
):
outputs = self.encoder(
input_ids=input_ids,
attention_mask=attention_mask,
token_type_ids=token_type_ids,
position_ids=position_id,
head_mask=head_mask,
inputs_embeds=inputs_embeds,
# encoder_hidden_state=encoder_hidden_state,
output_attentions=output_attentions,
)
pooler_output = outputs["pooler_output"]
# unique_task_ids_list = torch.unique(task_ids).tolist()
unique_task_ids_list = [0, 1]
loss_list = []
logits = []
for unique_task_id in unique_task_ids_list:
task_id_filter = task_ids == unique_task_id
(task_logits, task_loss) = self.output_heads[str(unique_task_id)].forward(
pooler_output[task_id_filter],
labels=None if labels is None else labels[task_id_filter],
)
# logit
if task_logits == []:
logits.append(torch.tensor(0).to(device))
else:
logits.append(task_logits)
# loss
if torch.any(torch.isnan(task_loss)):
loss_list.append(torch.tensor(0).to(device))
else:
loss_list.append(task_loss)
task_logits = []
count1 = 0
count2 = 0
pad = torch.tensor([-100.0, -100.0]).to(device)
for id in task_ids:
if id == 0:
temp = logits[0][count1]
l = torch.cat((temp, pad), 0)
task_logits.append(l)
count1 += 1
elif id == 1:
task_logits.append(logits[1][count2])
count2 += 1
else:
logger.error("例外発生")
sys.exit(1)
loss = torch.stack(loss_list)
task_logits = torch.stack(task_logits)
outputs = (loss.sum(), task_logits, task_ids)
return outputs
compute metrics
評価の際に使うcompute_metrics関数を定義していきます。パラメータは以下のようになっています。
- p.predictions : custom Modelの
output[1:]
の値になります。つまり、loss以外です。 - p.label_ids : 正解ラベルのリスト
accuracy は今回は正解:1.0,不正解:0.0として、すべての平均値としています。
その他にもload_metric("accuracy")
を使ってツールを呼び出して計算することもできます。
def compute_metrics(p: EvalPrediction):
# output
preds = p.predictions[0]
task_ids = p.predictions[1]
unique_task_ids_list = [1, 0]
# true label
label = p.label_ids.astype(int)
# metric
accuracy = []
for unique_task_id in unique_task_ids_list:
task_id_filter = task_ids == unique_task_id
p = preds[task_id_filter]
l = label[task_id_filter]
if len(p) != len(l):
sys.exit(1)
p = np.argmax(p, axis=1)
if len(p) != 0:
result = (p == l).astype(np.float32).mean().item()
accuracy.append(result)
# result
return {
"accuracy_task1": accuracy[0],
"accuracy_task2": accuracy[1],
}
すべてを結合
最終的に、データ作成とカスタムモデルを結合して学習していきます。
def main():
# See all possible arguments in src/transformers/training_args.py
# or by passing the --help flag to this script.
# We now keep distinct sets of args, for a cleaner separation of concerns.
parser = HfArgumentParser(
(ModelArguments, DataTrainingArguments, TrainingArguments)
)
if len(sys.argv) == 2 and sys.argv[1].endswith(".json"):
# If we pass only one argument to the script and it's the path to a json file,
# let's parse it to get our arguments.
model_args, data_args, training_args = parser.parse_json_file(
json_file=os.path.abspath(sys.argv[1])
)
else:
(
model_args,
data_args,
training_args,
) = parser.parse_args_into_dataclasses()
# Setup logging
logging.basicConfig(
format="%(asctime)s - %(levelname)s - %(name)s - %(message)s",
datefmt="%m/%d/%Y %H:%M:%S",
handlers=[logging.StreamHandler(sys.stdout)],
)
log_level = logging.DEBUG
logger.setLevel(log_level)
datasets.utils.logging.set_verbosity(log_level)
transformers.utils.logging.set_verbosity(log_level)
transformers.utils.logging.enable_default_handler()
transformers.utils.logging.enable_explicit_format()
# Log on each process the small summary:
logger.warning(
f"Process rank: {training_args.local_rank}, device: {training_args.device}, n_gpu: {training_args.n_gpu}"
+ f"distributed training: {bool(training_args.local_rank != -1)}, 16-bits training: {training_args.fp16}"
)
logger.info(f"Training/evaluation parameters {training_args}")
# Detecting last checkpoint.
last_checkpoint = None
if (
os.path.isdir(training_args.output_dir)
and training_args.do_train
and not training_args.overwrite_output_dir
):
last_checkpoint = get_last_checkpoint(training_args.output_dir)
if last_checkpoint is None and len(os.listdir(training_args.output_dir)) > 0:
raise ValueError(
f"Output directory ({training_args.output_dir}) already exists and is not empty. "
"Use --overwrite_output_dir to overcome."
)
elif (
last_checkpoint is not None and training_args.resume_from_checkpoint is None
):
logger.info(
f"Checkpoint detected, resuming training at {last_checkpoint}. To avoid this behavior, change "
"the `--output_dir` or add `--overwrite_output_dir` to train from scratch."
)
# Set seed before initializing model.
set_seed(training_args.seed)
tokenizer = AutoTokenizer.from_pretrained(
model_args.model_name_or_path,
cache_dir=model_args.cache_dir,
use_fast=model_args.use_fast_tokenizer,
revision=model_args.model_revision,
use_auth_token=True if model_args.use_auth_token else None,
)
# datasetsの作成
tasks, raw_datasets = load_datasets(tokenizer, data_args, training_args, model_args)
# モデルの定義
model = MultiTaskModel(model_args.model_name_or_path, tasks)
# train dataの取得
if training_args.do_train:
if "train" not in raw_datasets:
raise ValueError("--do_train requires a train dataset")
train_dataset = raw_datasets["train"]
if data_args.max_train_samples is not None:
train_dataset = train_dataset.select(range(data_args.max_train_samples))
# validation dataの取得
if training_args.do_eval:
if (
"validation" not in raw_datasets
and "validation_matched" not in raw_datasets
):
raise ValueError("--do_eval requires a validation dataset")
eval_dataset = raw_datasets["validation"]
if data_args.max_eval_samples is not None:
new_ds = []
for ds in eval_dataset:
new_ds.append(ds.select(range(data_args.max_eval_samples)))
eval_dataset = new_ds
# Log a few random samples from the training set:
if training_args.do_train:
for index in random.sample(range(len(train_dataset)), 2):
logger.info(
f"Sample {index}/{len(train_dataset)} of the training set: {train_dataset[index]}."
)
eval_datasets_df = eval_dataset[0].to_pandas().append(eval_dataset[1].to_pandas())
eval_datasets = datasets.Dataset.from_pandas(eval_datasets_df)
eval_datasets.shuffle(seed=123)
# Initialize our Trainer
trainer = Trainer(
model=model,
args=training_args,
train_dataset=train_dataset if training_args.do_train else None,
eval_dataset=eval_datasets if training_args.do_eval else None,
compute_metrics=compute_metrics,
tokenizer=tokenizer,
callbacks=[EarlyStoppingCallback(early_stopping_patience=5)],
)
# Training
if training_args.do_train:
checkpoint = None
if training_args.resume_from_checkpoint is not None:
checkpoint = training_args.resume_from_checkpoint
elif last_checkpoint is not None:
checkpoint = last_checkpoint
train_result = trainer.train(resume_from_checkpoint=checkpoint)
metrics = train_result.metrics
max_train_samples = (
data_args.max_train_samples
if data_args.max_train_samples is not None
else len(train_dataset)
)
metrics["train_samples"] = min(max_train_samples, len(train_dataset))
trainer.save_model() # Saves the tokenizer too for easy upload
trainer.log_metrics("train", metrics)
trainer.save_metrics("train", metrics)
trainer.save_state()
# Evaluation
if training_args.do_eval:
# タスクごとに評価
for eval_d, task in zip(eval_dataset, tasks):
logger.info(f"*** Evaluate of {task.name} ***")
metrics = trainer.evaluate(eval_dataset=eval_d)
max_eval_samples = (
data_args.max_eval_samples
if data_args.max_eval_samples is not None
else len(eval_d)
)
metrics["eval_samples"] = min(max_eval_samples, len(eval_d))
trainer.log_metrics("eval", metrics)
trainer.save_metrics("eval", metrics)
kwargs = {
"finetuned_from": model_args.model_name_or_path,
"tasks": "text-classification",
}
if training_args.push_to_hub:
trainer.push_to_hub(**kwargs)
else:
trainer.create_model_card(**kwargs)
def _mp_fn(index):
# For xla_spawn (TPUs)
main()
if __name__ == "__main__":
main()
まとめ
結構端折ったところもありますが、以上のコードによりうまく学習できました。
今後はタスクが全く別物(Ex. テキスト分類と応答生成)のマルチタスク学習や入力が2つのマルチモーダル学習をやってみたいなと思います。
また、 KaggleのNLPではTransformerがめっちゃ使われているそうなので、コンペに参加してもっとツヨツヨになりたいですね。
Discussion