🐈
japanese-stablelmを低コストEC2でaccelerateとpeft使ってQlora並列学習させてみるコード
概要
stabilityaiさんからでた日本語7BモデルのQLora学習コードメモ
環境
AWS EC2 g4dn.12xlarge
Deep Learning AMI GPU PyTorch 2.0.0 (Amazon Linux 2) 20230524
Lora学習コード
今回は4bitで学習させたがコード自体は4/8bit-lora/Full-FTに対応
deepspeedにも対応しているのでメモリが少ないGPUでも学習は回せる(qloraには対応していない)
学習管理がしやすいようにメトリクスはmlflowに保存&ハイパーパラメーター系はyamlに記述
GPT-NeoX-Trainer クラス
importとかとか
GPT_NeoX_Trainer.py
# GPT_NeoX_Trainer.py — module-level imports and environment setup.
# Fixed: `json` was imported twice in the original; imports are grouped
# stdlib / third-party per convention.

# Standard library
import gc
import json
import os
import pprint
import subprocess
from os.path import join, dirname
from typing import Any, Dict, List, Optional, Union

# Third-party
import hydra
import mlflow
import psutil
import torch
from accelerate import Accelerator
from accelerate.utils import LoggerType
from accelerate.tracking import GeneralTracker, on_main_process
from datasets import load_dataset, Dataset
from dotenv import load_dotenv
from langchain import PromptTemplate
from lion_pytorch import Lion
from omegaconf import OmegaConf
from peft import LoraConfig, TaskType, get_peft_model, PeftModel, prepare_model_for_kbit_training
from random_word import RandomWords
from torch.utils.data import DataLoader
from tqdm import tqdm
from transformers import AutoModelForCausalLM, AutoTokenizer, get_linear_schedule_with_warmup, set_seed

# Load environment variables: first from the process working directory,
# then explicitly from the .env file next to this module (python-dotenv
# does not overwrite keys that are already set).
load_dotenv(verbose=True)
dotenv_path = join(dirname(__file__), '.env')
load_dotenv(dotenv_path)

# Random-word generator used to build unique, human-readable run names.
r = RandomWords()
accelerate用のtrackerクラス
GPT_NeoX_Trainer.py
class Custom_Mlflow_Tracker(GeneralTracker):
    """Accelerate tracker that records params and metrics to an MLflow server.

    Passed to ``Accelerator(log_with=...)``; every MLflow call is limited to
    the main process via ``@on_main_process`` so multi-GPU runs do not create
    duplicate runs or metrics.
    """

    name = "mlflow"
    requires_logging_directory = False

    @on_main_process
    def __init__(
        self,
        tracking_uri: str = "http://0.0.0.0:5000/",
        experiment_name: str = None,
        artifact_dir: Optional[Union[str, os.PathLike]] = None,
        tags: Optional[Union[Dict[str, Any], str]] = None,
        nested_run: Optional[bool] = False,
        run_name: Optional[str] = None,
        description: Optional[str] = None,
    ):
        # Tags may arrive serialized as a JSON string (e.g. from an env var).
        if isinstance(tags, str):
            tags = json.loads(tags)
        mlflow.set_tracking_uri(tracking_uri)

        # Reuse an existing experiment with this name, otherwise create one.
        exps = mlflow.search_experiments(filter_string=f"name = '{experiment_name}'")
        if len(exps) > 0:
            if len(exps) > 1:
                # Fixed: the original called an undefined name `logger` here,
                # which raised NameError instead of warning.
                import warnings
                warnings.warn("Multiple experiments with the same name found. Using first one.")
            experiment_id = exps[0].experiment_id
        else:
            # Only pass artifact_location when the caller supplied one, so
            # MLflow falls back to its server-side default otherwise.
            if artifact_dir is None:
                experiment_id = mlflow.create_experiment(
                    name=experiment_name,
                    tags=tags,
                )
            else:
                experiment_id = mlflow.create_experiment(
                    name=experiment_name,
                    artifact_location=artifact_dir,
                    tags=tags,
                )

        self.experiment_name = experiment_name
        self.active_run = mlflow.start_run(
            experiment_id=experiment_id,
            run_name=run_name,
            nested=nested_run,
            tags=tags,
            description=description,
        )

    @property
    def tracker(self):
        # The underlying mlflow.ActiveRun handle Accelerate expects.
        return self.active_run

    @on_main_process
    def store_init_configuration(self, values: dict):
        """Log hyperparameters, dropping values too long for MLflow params."""
        for name, value in list(values.items()):
            # internally, all values are converted to str in MLflow
            if len(str(value)) > mlflow.utils.validation.MAX_PARAM_VAL_LENGTH:
                del values[name]
        values_list = list(values.items())
        # MLflow cannot log more than 100 values in one go, so we have to split it
        for i in range(0, len(values_list), mlflow.utils.validation.MAX_PARAMS_TAGS_PER_BATCH):
            mlflow.log_params(dict(values_list[i : i + mlflow.utils.validation.MAX_PARAMS_TAGS_PER_BATCH]))

    @on_main_process
    def log(self, values: dict, step: Optional[int]):
        """Log numeric entries of *values* as metrics; others are skipped."""
        metrics = {}
        for k, v in values.items():
            if isinstance(v, (int, float)):
                metrics[k] = v
        mlflow.log_metrics(metrics, step=step)

    @on_main_process
    def run_id(self):
        # NOTE(review): reads like an accessor but is a method; kept as a
        # method to preserve the original call-site interface.
        return self.active_run.info.run_id

    @on_main_process
    def finish(self):
        # Close the active MLflow run at end of training.
        mlflow.end_run()
メインの学習用クラス
GPT_NeoX_Trainer.py
class Accelerate_Trainer:
    """LoRA / QLoRA / full fine-tuning trainer for GPT-NeoX style causal LMs.

    Hyperparameters come from an OmegaConf/Hydra config (see
    train_config.yaml), metrics go to MLflow via ``Custom_Mlflow_Tracker``,
    and multi-GPU execution is handled by Hugging Face Accelerate.
    """

    def __init__(self, train_config):
        # create run name: the local main process writes one random word to a
        # scratch file and every rank reads it back so all processes agree on
        # the same run-name suffix.
        self.accelerator = Accelerator()
        if self.accelerator.is_local_main_process:
            with open("r_word.txt", mode='w') as f:
                f.write(r.get_random_word())
        # Busy-wait until the main process has created the file.
        # NOTE(review): this can race with a partially written file on a slow
        # filesystem; a wait_for_everyone() barrier would be more robust.
        while not os.path.exists("r_word.txt"):
            pass
        with open("r_word.txt") as f:
            self.r_word = f.read()

        # load config file and collect the hyperparameters to track in MLflow
        self.train_config = train_config
        track_config = {
            "lr": self.train_config.parameter.lr,
            "optimizer": self.train_config.parameter.optimizer,
            "num_epochs": self.train_config.parameter.num_epochs,
            "batch_size": self.train_config.parameter.batch_size,
            "cutoff_len": self.train_config.parameter.cutoff_len,
            "gradient_accumulation_steps": self.train_config.parameter.gradient_accumulation_steps,
        }
        with open(self.train_config.template, 'r') as file:
            self.prompt_template = json.load(file)

        # lora hyperparams
        if self.train_config.lora.use:
            self.peft_config = LoraConfig(
                r=int(self.train_config.lora.r),
                lora_alpha=int(self.train_config.lora.alpha),
                # GPT-NeoX fuses q/k/v into one projection module.
                target_modules=['query_key_value'],
                lora_dropout=float(self.train_config.lora.drop_out),
                bias="none",
                task_type=TaskType.CAUSAL_LM,
            )
            track_config.update({
                "lora_r": self.train_config.lora.r,
                "lora_alpha": self.train_config.lora.alpha,
                "lora_drop_out": self.train_config.lora.drop_out,
            })
            # e.g. Lora_3e-04_8_16_1_5_<word>
            self.run_name = "Lora_{:.0e}_{}_{}_{}_{}_{}".format(
                float(self.train_config.parameter.lr),
                int(self.train_config.lora.r),
                int(self.train_config.lora.alpha),
                int(self.train_config.parameter.gradient_accumulation_steps),
                int(float(self.train_config.lora.drop_out) * 100),
                self.r_word
            )
        else:
            track_config.update({
                "drop_out": self.train_config.parameter.drop_out,
                "weight_decay": self.train_config.parameter.weight_decay,
            })
            # e.g. FFT_3e-04_1_0_<word>
            self.run_name = "FFT_{:.0e}_{}_{}_{}".format(
                float(self.train_config.parameter.lr),
                int(self.train_config.parameter.gradient_accumulation_steps),
                int(float(self.train_config.parameter.drop_out) * 100),
                self.r_word
            )

        # load tokenizer
        self.tokenizer = AutoTokenizer.from_pretrained(self.train_config.model, use_fast=False)

        # load accelerater: rebuild the Accelerator with gradient accumulation
        # and the MLflow tracker attached (replaces the bootstrap instance).
        self.tracker = Custom_Mlflow_Tracker(
            tracking_uri=os.environ.get("TRACKING_URI"),
            experiment_name=self.train_config.tracking.experiment_name,
            #artifact_dir = None,
            tags=OmegaConf.to_object(self.train_config.tracking.tags),
            run_name=self.run_name,
            description=self.train_config.tracking.description,
        )
        self.accelerator = Accelerator(
            gradient_accumulation_steps=self.train_config.parameter.gradient_accumulation_steps,
            log_with=self.tracker
        )
        self.accelerator.init_trackers("project", track_config)
        if self.accelerator.is_local_main_process:
            os.remove('r_word.txt')

    def print_config(self):
        """Pretty-print the resolved training config on the main process."""
        if self.accelerator.is_local_main_process:
            self.accelerator.print("\n####################### train config #######################\n")
            pprint.pprint(OmegaConf.to_container(self.train_config))
            self.accelerator.print("\n##############################################################\n")

    def get_gpu_info(self, nvidia_smi_path='nvidia-smi', keys=('memory.total', 'memory.used', 'utilization.gpu'), no_units=True):
        """Query nvidia-smi; return one dict per visible GPU with *keys*."""
        nu_opt = '' if not no_units else ',nounits'
        cmd = '%s --query-gpu=%s --format=csv,noheader%s' % (nvidia_smi_path, ','.join(keys), nu_opt)
        output = subprocess.check_output(cmd, shell=True)
        lines = output.decode().split('\n')
        lines = [line.strip() for line in lines if line.strip() != '']
        return [{k: v for k, v in zip(keys, line.split(', '))} for line in lines]

    def tokenize(self, prompt, return_tensors="pt"):
        """Tokenize *prompt* padded/truncated to cutoff_len, with labels set
        to a copy of input_ids (standard causal-LM objective)."""
        # there's probably a way to do this with the tokenizer settings
        # but again, gotta move fast
        result = self.tokenizer(
            prompt,
            truncation=True,
            max_length=self.train_config.parameter.cutoff_len,
            padding="max_length",
            return_tensors=return_tensors
        )
        if return_tensors == "pt":
            result["labels"] = result["input_ids"].clone()
        else:
            result["labels"] = result["input_ids"].copy()
        return result

    def collate_fn(self, examples):
        # Pad each batch to cutoff_len and return PyTorch tensors.
        return self.tokenizer.pad(examples, max_length=self.train_config.parameter.cutoff_len, padding="max_length", return_tensors="pt")

    # dataset preprocces
    def generate_and_tokenize_prompt(self, data_point):
        """Render a batched dataset slice through the configured prompt
        template ("text" / "summarize" / "open-QA") and tokenize it."""
        train_prompt_template = PromptTemplate(
            input_variables=self.prompt_template[self.train_config.prompt_type]["train"]["input_variables"],
            template=self.prompt_template[self.train_config.prompt_type]["train"]["template"],
        )
        train_prompt = []
        if self.train_config.prompt_type == "text":
            for i in range(len(data_point["text"])):
                train_prompt.append(train_prompt_template.format(text=data_point["text"][i]))
        elif self.train_config.prompt_type == "summarize":
            for i in range(len(data_point["text"])):
                train_prompt.append(train_prompt_template.format(text=data_point["text"][i], summarize=data_point["summarize"][i]))
        elif self.train_config.prompt_type == "open-QA":
            for i in range(len(data_point["instruction"])):
                train_prompt.append(train_prompt_template.format(input=data_point["instruction"][i], output=data_point["output"][i]))
        return self.tokenize(train_prompt)

    def tokenize_dataset(self, train_dataset):
        """Shuffle, template, and tokenize *train_dataset*; build the
        DataLoader used by train(). Returns the processed dataset."""
        if self.accelerator.is_local_main_process:
            self.accelerator.print(train_dataset)
        # Tokenize on the main process first so the cache is shared.
        with self.accelerator.main_process_first():
            processed_train_dataset = train_dataset.shuffle().map(
                self.generate_and_tokenize_prompt,
                batched=True,
                batch_size=self.train_config.parameter.batch_size,
                num_proc=1,
                remove_columns=train_dataset.column_names,
                load_from_cache_file=False,
                desc="Running tokenizer on train_dataset",
            )
        self.accelerator.wait_for_everyone()
        self.train_dataloader = DataLoader(processed_train_dataset, shuffle=True, collate_fn=self.collate_fn, batch_size=self.train_config.parameter.batch_size, pin_memory=True)
        return processed_train_dataset

    def train(self):
        """Run the training loop; save a checkpoint after every epoch under
        output_dir/<run_name>/checkpoint-<epoch>."""
        # creating model: 8-bit / 4-bit (QLoRA) or bf16 full precision
        if self.train_config.parameter.kbit == 8:
            model = AutoModelForCausalLM.from_pretrained(
                self.train_config.model,
                load_in_8bit=True,
                trust_remote_code=True
            )
            model = prepare_model_for_kbit_training(model, use_gradient_checkpointing=True)
            model.config.use_cache = False
        elif self.train_config.parameter.kbit == 4:
            model = AutoModelForCausalLM.from_pretrained(
                self.train_config.model,
                load_in_4bit=True,
                trust_remote_code=True
            )
            model = prepare_model_for_kbit_training(model, use_gradient_checkpointing=True)
            model.config.use_cache = False
        else:
            model = AutoModelForCausalLM.from_pretrained(
                self.train_config.model,
                torch_dtype=torch.bfloat16,
                trust_remote_code=True
            )
        if self.train_config.lora.use:
            model = get_peft_model(model, self.peft_config)
            model.print_trainable_parameters()

        # optimizer
        if self.train_config.parameter.optimizer == "adamw":
            optimizer = torch.optim.AdamW(model.parameters(), lr=self.train_config.parameter.lr, weight_decay=self.train_config.parameter.weight_decay)
        elif self.train_config.parameter.optimizer == "lion":
            optimizer = Lion(model.parameters(), weight_decay=self.train_config.parameter.weight_decay, lr=self.train_config.parameter.lr, use_triton=True)

        # lr scheduler: linear decay, no warmup
        lr_scheduler = get_linear_schedule_with_warmup(
            optimizer=optimizer,
            num_warmup_steps=0,
            num_training_steps=(len(self.train_dataloader) * self.train_config.parameter.num_epochs),
        )
        model, self.train_dataloader, optimizer, lr_scheduler = self.accelerator.prepare(
            model, self.train_dataloader, optimizer, lr_scheduler
        )
        self.accelerator.print(model)

        # check deepspeed zero
        is_ds_zero_3 = False
        if getattr(self.accelerator.state, "deepspeed_plugin", None):
            is_ds_zero_3 = self.accelerator.state.deepspeed_plugin.zero_stage == 3

        # train procces
        total_step = 0
        for epoch in range(1, self.train_config.parameter.num_epochs + 1):
            model.train()
            total_loss = 0
            step_loss = 0
            pbar = tqdm(self.train_dataloader)
            pbar.set_description(f'[Epoch {epoch}/{self.train_config.parameter.num_epochs}]')
            for step, batch in enumerate(pbar):
                with self.accelerator.accumulate(model):
                    outputs = model(**batch)
                    loss = outputs.loss
                    # gather() collects the per-rank losses for logging.
                    total_loss += self.accelerator.gather(loss.detach().float())
                    step_loss = self.accelerator.gather(loss.detach().float())
                    self.accelerator.backward(loss)
                    optimizer.step()
                    lr_scheduler.step()
                    optimizer.zero_grad()
                if self.accelerator.is_local_main_process:
                    step_loss_avg = torch.mean(input=step_loss)
                    step_ppl_avg = torch.exp(step_loss_avg)
                    gpu_utilize = 0
                    memory_total = 0
                    memory_used = 0
                    gpu_infos = self.get_gpu_info()
                    for gpu_info in gpu_infos:
                        gpu_utilize += int(gpu_info["utilization.gpu"])
                        memory_total += int(gpu_info["memory.total"])
                        memory_used += int(gpu_info["memory.used"])
                    self.accelerator.log({
                        "loss": step_loss_avg.item(),
                        "ppl": step_ppl_avg.item(),
                        # Fixed: the original divided by len(gpu_info) — the
                        # key count of the LAST per-GPU dict (always 3) — not
                        # by the number of GPUs. Average over all GPUs.
                        "gpu_utilize": gpu_utilize / len(gpu_infos),
                        "memory_utilize": (memory_used / memory_total) * 100,
                        "memory_used": memory_used / 1000
                    }, step=total_step)
                total_step += 1

            train_epoch_loss = torch.mean(input=total_loss) / len(self.train_dataloader)
            train_ppl = torch.exp(train_epoch_loss)
            self.accelerator.print(f"{epoch=}: {train_ppl=} {train_epoch_loss=}")

            # save checkpoint (every epoch, under output_dir/<run_name>)
            self.accelerator.wait_for_everyone()
            unwrapped_model = self.accelerator.unwrap_model(model)
            unwrapped_model.save_pretrained(self.train_config.output_dir + "/" + self.run_name + "/checkpoint-{}".format(epoch), is_main_process=self.accelerator.is_main_process, save_function=self.accelerator.save, state_dict=self.accelerator.get_state_dict(model))
            # Tokenizer only needs saving for full fine-tuning checkpoints.
            if self.accelerator.is_local_main_process and not self.train_config.lora.use:
                self.tokenizer.save_pretrained(self.train_config.output_dir + "/" + self.run_name + "/checkpoint-{}".format(epoch))

        self.accelerator.end_training()
学習コード
train.py
# train.py — CLI entry point: load the dataset, build the trainer, train.
from GPT_NeoX_Trainer import Accelerate_Trainer
from datasets import load_dataset, Dataset, load_from_disk
import hydra


@hydra.main(config_name="train_config")
def main(cfg):
    """Run the full pipeline described by train_config.yaml."""
    dataset = load_from_disk(cfg.dataset)
    trainer = Accelerate_Trainer(cfg)
    trainer.print_config()
    trainer.tokenize_dataset(dataset)
    trainer.train()


if __name__ == "__main__":
    main()
prompt_template.json
{
"text": {
"train": {
"input_variables": [
"text"
],
"template": "{text}"
},
"inf": {
"input_variables": [
"text"
],
"template": "{text}"
}
},
"summarize": {
"train": {
"input_variables": [
"text",
"summarize"
],
"template": "ユーザー:以下の文章を要約してください\n###文章:{text}\nシステム:###要約:{summarize}"
},
"inf": {
"input_variables": [
"text"
],
"template": "ユーザー:以下の文章を要約してください\n###文章:{text}\nシステム:###要約:"
}
},
"open-QA": {
"train": {
"input_variables": [
"input",
"output"
],
"template": "ユーザー:{input}\nシステム:{output}"
},
"inf": {
  "input_variables": [
    "input"
  ],
  "template": "ユーザー:{input}\n"
}
}
}
train-config
train_config.yaml
# train_config.yaml — paths and hyperparameters consumed by GPT_NeoX_Trainer.
dataset: dataset_dir        # HF dataset directory (saved with save_to_disk)
model: model_dir            # base model path or hub id
output_dir: output_dir      # checkpoints land in output_dir/<run_name>/checkpoint-<epoch>
template: template_dir      # path to prompt_template.json
parameter:
  lr: 3e-4
  optimizer : adamw         # "adamw" or "lion"
  weight_decay: 1e-2
  num_epochs: 10
  batch_size: 2
  cutoff_len: 256           # max token length (pad/truncate target)
  gradient_accumulation_steps: 1
  drop_out: 0               # only used (in run name/tracking) for full fine-tuning
  kbit: 4                   # 4 or 8 -> QLoRA quantized load; anything else -> bf16
lora:
  use: true                 # false = full fine-tuning
  r: 8
  alpha: 16
  drop_out: 0.05
prompt_type: open-QA        # one of "text", "summarize", "open-QA"
tracking:
  experiment_name: stablelm-qlora
  description: stablelm
学習コード実行
$ mlflow server --host 0.0.0.0
$ accelerate launch train.py
Discussion