🐈

Code for parallel QLoRA training of japanese-stablelm on a low-cost EC2 instance using accelerate and peft

Published 2023/08/11

Overview

Notes on QLoRA training code for the Japanese 7B model released by Stability AI.

Environment

AWS EC2 g4dn.12xlarge
Deep Learning AMI GPU PyTorch 2.0.0 (Amazon Linux 2) 20230524
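
The Deep Learning AMI already provides PyTorch 2.0; the remaining libraries imported by the code below still have to be installed. A rough, unpinned sketch of the installs (package names such as Random-Word and lion-pytorch are my best guesses for the PyPI names behind the imports; pin versions to match your environment):

$ pip install transformers accelerate peft bitsandbytes datasets mlflow \
    hydra-core omegaconf langchain python-dotenv Random-Word lion-pytorch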

LoRA training code

Training here was done in 4-bit, but the code itself supports 4/8-bit LoRA and full fine-tuning.
DeepSpeed is also supported, so training can still run on GPUs with less memory (QLoRA is not supported under DeepSpeed, though).
To make runs easier to manage, metrics are logged to mlflow and hyperparameters are kept in a YAML file.

GPT-NeoX-Trainer class

Imports and setup

GPT_NeoX_Trainer.py
import gc
import os
import json
import psutil
import subprocess
import torch
import pprint
import hydra
import mlflow
from omegaconf import OmegaConf
from accelerate import Accelerator
from accelerate.logging import get_logger
from accelerate.tracking import GeneralTracker, on_main_process
from typing import Any, Dict, List, Optional, Union
from datasets import load_dataset, Dataset
from torch.utils.data import DataLoader
from tqdm import tqdm
from transformers import AutoModelForCausalLM, AutoTokenizer, get_linear_schedule_with_warmup, set_seed
from peft import LoraConfig, TaskType, get_peft_model, PeftModel, prepare_model_for_kbit_training
from langchain import PromptTemplate
from os.path import join, dirname
from dotenv import load_dotenv
from random_word import RandomWords
from lion_pytorch import Lion

load_dotenv(verbose=True)

dotenv_path = join(dirname(__file__), '.env')
load_dotenv(dotenv_path)

r = RandomWords()

# module-level logger used by the mlflow tracker's warning below
logger = get_logger(__name__)
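
load_dotenv picks up a .env file next to the script, and the tracker setup later reads TRACKING_URI from it. A minimal example (host and port are placeholders for wherever the mlflow server runs):

.env
TRACKING_URI=http://0.0.0.0:5000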

Tracker class for accelerate

GPT_NeoX_Trainer.py
class Custom_Mlflow_Tracker(GeneralTracker):
    name = "mlflow"
    requires_logging_directory = False

    @on_main_process
    def __init__(
        self,
        tracking_uri: str = "http://0.0.0.0:5000/",
        experiment_name: str = None,
        artifact_dir: Optional[Union[str, os.PathLike]] = None,
        tags: Optional[Union[Dict[str, Any], str]] = None,
        nested_run: Optional[bool] = False,
        run_name: Optional[str] = None,
        description: Optional[str] = None,
    ):

        if isinstance(tags, str):
            tags = json.loads(tags)

        mlflow.set_tracking_uri(tracking_uri)
        exps = mlflow.search_experiments(filter_string=f"name = '{experiment_name}'")
        if len(exps) > 0:
            if len(exps) > 1:
                logger.warning("Multiple experiments with the same name found. Using first one.")
            experiment_id = exps[0].experiment_id
        else:
            if artifact_dir is None:
                experiment_id = mlflow.create_experiment(
                    name=experiment_name,
                    tags=tags,
                )
            else:
                experiment_id = mlflow.create_experiment(
                    name=experiment_name,
                    artifact_location=artifact_dir,
                    tags=tags,
                )

        self.experiment_name = experiment_name
        self.active_run = mlflow.start_run(
            experiment_id=experiment_id,
            run_name=run_name,
            nested=nested_run,
            tags=tags,
            description=description,
        )


    @property
    def tracker(self):
        return self.active_run

    @on_main_process
    def store_init_configuration(self, values: dict):
    
        for name, value in list(values.items()):
            # internally, all values are converted to str in MLflow
            if len(str(value)) > mlflow.utils.validation.MAX_PARAM_VAL_LENGTH:
                del values[name]

        values_list = list(values.items())

        # MLflow cannot log more than 100 values in one go, so we have to split it
        for i in range(0, len(values_list), mlflow.utils.validation.MAX_PARAMS_TAGS_PER_BATCH):
            mlflow.log_params(dict(values_list[i : i + mlflow.utils.validation.MAX_PARAMS_TAGS_PER_BATCH]))


    @on_main_process
    def log(self, values: dict, step: Optional[int]):
        metrics = {}
        for k, v in values.items():
            if isinstance(v, (int, float)):
                metrics[k] = v
        mlflow.log_metrics(metrics, step=step)
        
    
    @on_main_process
    def run_id(self):
        return self.active_run.info.run_id

    @on_main_process
    def finish(self):
        mlflow.end_run()
    

Main training class

GPT_NeoX_Trainer.py
class Accelerate_Trainer:

    def __init__(self,train_config):

        # create run name
        self.accelerator = Accelerator()
        if self.accelerator.is_local_main_process:
            with open("r_word.txt", mode='w') as f:
                f.write(r.get_random_word())
        while not os.path.exists("r_word.txt"):
            pass
        with open("r_word.txt") as f:
            self.r_word = f.read()

        # load config file
        self.train_config = train_config
        track_config = {
            "lr" : self.train_config.parameter.lr,
            "optimizer":self.train_config.parameter.optimizer,
            "num_epochs" : self.train_config.parameter.num_epochs,
            "batch_size" : self.train_config.parameter.batch_size,
            "cutoff_len" : self.train_config.parameter.cutoff_len,
            "gradient_accumulation_steps" : self.train_config.parameter.gradient_accumulation_steps,
        }

        with open(self.train_config.template, 'r') as file:
            self.prompt_template = json.load(file)

        # lora hyperparams
        if self.train_config.lora.use:
            self.peft_config = LoraConfig(
                r = int(self.train_config.lora.r),
                lora_alpha = int(self.train_config.lora.alpha),
                target_modules = ['query_key_value'],
                lora_dropout = float(self.train_config.lora.drop_out),
                bias="none",
                task_type=TaskType.CAUSAL_LM,
            )
            track_config.update({
                "lora_r" : self.train_config.lora.r,
                "lora_alpha" : self.train_config.lora.alpha,
                "lora_drop_out" : self.train_config.lora.drop_out,
            })
            self.run_name = "Lora_{:.0e}_{}_{}_{}_{}_{}".format(
                float(self.train_config.parameter.lr),
                int(self.train_config.lora.r),
                int(self.train_config.lora.alpha),
                int(self.train_config.parameter.gradient_accumulation_steps),
                int(float(self.train_config.lora.drop_out) * 100),
                self.r_word
            )
        else:
            track_config.update({
                "drop_out" : self.train_config.parameter.drop_out,
                "weight_decay" : self.train_config.parameter.weight_decay,
            })
            self.run_name = "FFT_{:.0e}_{}_{}_{}".format(
                float(self.train_config.parameter.lr),
                int(self.train_config.parameter.gradient_accumulation_steps),
                int(float(self.train_config.parameter.drop_out) * 100),
                self.r_word
            )

        # load tokenizer
        self.tokenizer = AutoTokenizer.from_pretrained(self.train_config.model, use_fast=False)

        # load accelerater
        self.tracker = Custom_Mlflow_Tracker(
            tracking_uri = os.environ.get("TRACKING_URI"),
            experiment_name = self.train_config.tracking.experiment_name,
            #artifact_dir = None,
            tags = OmegaConf.to_object(self.train_config.tracking.tags),
            run_name = self.run_name,
            description = self.train_config.tracking.description,
        )
        self.accelerator = Accelerator(
            gradient_accumulation_steps = self.train_config.parameter.gradient_accumulation_steps,
            log_with = self.tracker
        )

        self.accelerator.init_trackers("project", track_config)

        if self.accelerator.is_local_main_process:
            os.remove('r_word.txt')

        
    def print_config(self):
        # check config
        if self.accelerator.is_local_main_process:
            self.accelerator.print("\n####################### train config #######################\n")
            pprint.pprint(OmegaConf.to_container(self.train_config))
            self.accelerator.print("\n##############################################################\n")

    def get_gpu_info(self, nvidia_smi_path='nvidia-smi', keys=('memory.total', 'memory.used', 'utilization.gpu'), no_units=True):
        nu_opt = '' if not no_units else ',nounits'
        cmd = '%s --query-gpu=%s --format=csv,noheader%s' % (nvidia_smi_path, ','.join(keys), nu_opt)
        output = subprocess.check_output(cmd, shell=True)
        lines = output.decode().split('\n')
        lines = [ line.strip() for line in lines if line.strip() != '' ]

        return [ { k: v for k, v in zip(keys, line.split(', ')) } for line in lines ]

    def tokenize(self, prompt, return_tensors="pt"):
        # there's probably a way to do this with the tokenizer settings
        # but again, gotta move fast
        result = self.tokenizer(
            prompt,
            truncation=True,
            max_length=self.train_config.parameter.cutoff_len,
            padding="max_length",
            return_tensors=return_tensors
        )
        
        if return_tensors == "pt":
            result["labels"] = result["input_ids"].clone()
        else:
            result["labels"] = result["input_ids"].copy()

        return result

    def collate_fn(self,examples):
        return self.tokenizer.pad(examples,max_length=self.train_config.parameter.cutoff_len, padding="max_length",return_tensors="pt")

    # dataset preprocessing
    def generate_and_tokenize_prompt(self,data_point):

        train_prompt_template = PromptTemplate(
            input_variables=self.prompt_template[self.train_config.prompt_type]["train"]["input_variables"],
            template=self.prompt_template[self.train_config.prompt_type]["train"]["template"],
        )

        train_prompt = []
        if self.train_config.prompt_type == "text":
            for i in range(len(data_point["text"])):
                train_prompt.append(train_prompt_template.format(text=data_point["text"][i]))
        
        elif self.train_config.prompt_type == "summarize":
            for i in range(len(data_point["text"])):
                train_prompt.append(train_prompt_template.format(text=data_point["text"][i],summarize=data_point["summarize"][i]))

        elif self.train_config.prompt_type == "open-QA":
            for i in range(len(data_point["instruction"])):
                train_prompt.append(train_prompt_template.format(input=data_point["instruction"][i],output=data_point["output"][i]))

        #print(train_prompt)
        return self.tokenize(train_prompt)

    def tokenize_dataset(self, train_dataset):
        if self.accelerator.is_local_main_process:
            self.accelerator.print(train_dataset)

        with self.accelerator.main_process_first():
            processed_train_dataset = train_dataset.shuffle().map(
                self.generate_and_tokenize_prompt,
                batched=True,
                batch_size = self.train_config.parameter.batch_size,
                num_proc=1,
                remove_columns=train_dataset.column_names,
                load_from_cache_file=False,
                desc="Running tokenizer on train_dataset",
            )
      

        self.accelerator.wait_for_everyone()
        self.train_dataloader = DataLoader(processed_train_dataset, shuffle=True, collate_fn=self.collate_fn, batch_size=self.train_config.parameter.batch_size, pin_memory=True)

        return processed_train_dataset

    def train(self):
        # creating model
        if self.train_config.parameter.kbit == 8:
            model = AutoModelForCausalLM.from_pretrained(
                self.train_config.model,
                load_in_8bit=True,
                trust_remote_code=True
            )
            model = prepare_model_for_kbit_training(model,use_gradient_checkpointing=True)
            model.config.use_cache = False

        elif self.train_config.parameter.kbit == 4:
            model = AutoModelForCausalLM.from_pretrained(
                self.train_config.model,
                load_in_4bit=True,
                trust_remote_code=True
            )
            model = prepare_model_for_kbit_training(model,use_gradient_checkpointing=True)
            model.config.use_cache = False

        else:
            model = AutoModelForCausalLM.from_pretrained(
                self.train_config.model,
                torch_dtype=torch.bfloat16,
                trust_remote_code=True
            )

    
        if self.train_config.lora.use:
            model = get_peft_model(model, self.peft_config)
            model.print_trainable_parameters()

        # optimizer
        if self.train_config.parameter.optimizer == "adamw":
            optimizer = torch.optim.AdamW(model.parameters(), lr=self.train_config.parameter.lr, weight_decay=self.train_config.parameter.weight_decay)
        elif self.train_config.parameter.optimizer == "lion":
            optimizer = Lion(model.parameters(), weight_decay=self.train_config.parameter.weight_decay, lr=self.train_config.parameter.lr,use_triton=True)

        # lr scheduler
        lr_scheduler = get_linear_schedule_with_warmup(
            optimizer=optimizer,
            num_warmup_steps=0,
            num_training_steps=(len(self.train_dataloader) * self.train_config.parameter.num_epochs),
        )

        model, self.train_dataloader, optimizer, lr_scheduler = self.accelerator.prepare(
            model, self.train_dataloader, optimizer, lr_scheduler
        )
        self.accelerator.print(model)

        # check deepspeed zero
        is_ds_zero_3 = False
        if getattr(self.accelerator.state, "deepspeed_plugin", None):
            is_ds_zero_3 = self.accelerator.state.deepspeed_plugin.zero_stage == 3

        # training loop
        total_step = 0
        for epoch in range(1,self.train_config.parameter.num_epochs+1):
            model.train()
            total_loss = 0
            step_loss = 0
            pbar = tqdm(self.train_dataloader)
            pbar.set_description(f'[Epoch {epoch}/{self.train_config.parameter.num_epochs}]')
            for step, batch in enumerate(pbar):
                with self.accelerator.accumulate(model):
                    outputs = model(**batch)
                    loss = outputs.loss
                    total_loss += self.accelerator.gather(loss.detach().float())
                    step_loss = self.accelerator.gather(loss.detach().float())
                    self.accelerator.backward(loss)
                    optimizer.step()
                    lr_scheduler.step()
                    optimizer.zero_grad()
                    if self.accelerator.is_local_main_process:
                        step_loss_avg = torch.mean(input=step_loss)
                        step_ppl_avg = torch.exp(step_loss_avg)         
                        gpu_utilize = 0
                        memory_total = 0 
                        memory_used = 0
                        gpu_infos = self.get_gpu_info()
                        for gpu_info in gpu_infos:
                            gpu_utilize += int(gpu_info["utilization.gpu"])
                            memory_total += int(gpu_info["memory.total"]) 
                            memory_used += int(gpu_info["memory.used"])
                        self.accelerator.log({
                            "loss" : step_loss_avg.item(),
                            "ppl" : step_ppl_avg.item(),
                            "gpu_utilize" : gpu_utilize/len(gpu_info),
                            "memory_utilize" : (memory_used/memory_total)*100,
                            "memory_used" : memory_used/1000
                        }, step = total_step)
                    total_step += 1

            train_epoch_loss = torch.mean(input=total_loss) / len(self.train_dataloader)
            train_ppl = torch.exp(train_epoch_loss)
            self.accelerator.print(f"{epoch=}: {train_ppl=} {train_epoch_loss=}")

            # save checkpoint
            self.accelerator.wait_for_everyone()
            unwrapped_model = self.accelerator.unwrap_model(model)
            unwrapped_model.save_pretrained(
                self.train_config.output_dir + "/" + self.run_name + "/checkpoint-{}".format(epoch),
                is_main_process=self.accelerator.is_main_process,
                save_function=self.accelerator.save,
                state_dict=self.accelerator.get_state_dict(model),
            )
            if self.accelerator.is_local_main_process and not self.train_config.lora.use:
                self.tokenizer.save_pretrained(self.train_config.output_dir+"/"+self.run_name+"/checkpoint-{}".format(epoch))

        self.accelerator.end_training()
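
Each epoch writes an adapter checkpoint with save_pretrained. A minimal sketch of loading one of those LoRA checkpoints for inference, assuming the same 4-bit base model; the paths, checkpoint name, and the prompt are placeholders:

load_adapter.py
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
from peft import PeftModel

base_model_dir = "model_dir"  # same path as `model` in train_config.yaml
adapter_dir = "output_dir/<run_name>/checkpoint-10"  # one of the saved checkpoints

tokenizer = AutoTokenizer.from_pretrained(base_model_dir, use_fast=False)
model = AutoModelForCausalLM.from_pretrained(
    base_model_dir,
    load_in_4bit=True,   # match the precision used during training
    device_map="auto",
    trust_remote_code=True,
)
model = PeftModel.from_pretrained(model, adapter_dir)
model.eval()

prompt = "ユーザー:日本の首都はどこですか?\nシステム:"
inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
with torch.no_grad():
    output = model.generate(**inputs, max_new_tokens=64)
print(tokenizer.decode(output[0], skip_special_tokens=True))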

Training script

train.py
from GPT_NeoX_Trainer import Accelerate_Trainer
from datasets import load_dataset, Dataset, load_from_disk
import hydra

@hydra.main(config_name="train_config")
def main(cfg):
    
    dataset = load_from_disk(cfg.dataset)

    stablelm_trainer = Accelerate_Trainer(cfg)
    stablelm_trainer.print_config()
    stablelm_trainer.tokenize_dataset(dataset)
    stablelm_trainer.train()

if __name__ == "__main__":
    main()
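
train.py expects a dataset that was already saved to disk with datasets' save_to_disk, and for the open-QA prompt type the trainer reads the instruction and output columns. A toy sketch of preparing such a dataset (the file name make_dataset.py and the example records are made up):

make_dataset.py
from datasets import Dataset

# two illustrative open-QA records; real data would have many more
records = {
    "instruction": ["日本の首都はどこですか?", "富士山の高さは?"],
    "output": ["東京です。", "3776メートルです。"],
}

dataset = Dataset.from_dict(records)
dataset.save_to_disk("dataset_dir")  # matches `dataset:` in train_config.yaml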

prompt_template.json

{
    "text": {
        "train": {
            "input_variables": [
                "text"
            ],
            "template": "{text}"
        },
        "inf": {
            "input_variables": [
                "text"
            ],
            "template": "{text}"
        }
    },
    "summarize": {
        "train": {
            "input_variables": [
                "text",
                "summarize"
            ],
            "template": "ユーザー:以下の文章を要約してください\n###文章:{text}\nシステム:###要約:{summarize}"
        },
        "inf": {
            "input_variables": [
                "text"
            ],
            "template": "ユーザー:以下の文章を要約してください\n###文章:{text}\nシステム:###要約:"
        }
    },
    "open-QA": {
        "train": {
            "input_variables": [
                "input",
                "output"
            ],
            "template": "ユーザー:{input}\nシステム:{output}"
        },
        "inf": {
            "input_variables": [
                "text"
            ],
            "template": "ユーザー:{input}\n"
        }
    }
}
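
To see what the open-QA training template actually produces, here is a quick check using the same PromptTemplate API as the trainer (the example question and answer are made up):

from langchain import PromptTemplate

template = PromptTemplate(
    input_variables=["input", "output"],
    template="ユーザー:{input}\nシステム:{output}",
)
print(template.format(input="日本の首都はどこですか?", output="東京です。"))
# ユーザー:日本の首都はどこですか?
# システム:東京です。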

train-config

train_config.yaml
dataset: dataset_dir
model: model_dir
output_dir: output_dir
template: template_dir
parameter:
  lr: 3e-4
  optimizer: adamw
  weight_decay: 1e-2
  num_epochs: 10
  batch_size: 2
  cutoff_len: 256
  gradient_accumulation_steps: 1
  drop_out: 0
  kbit: 4
lora:
  use: true
  r: 8
  alpha: 16
  drop_out: 0.05
prompt_type: open-QA
tracking:
  experiment_name: stablelm-qlora
  description: stablelm
  tags: {}  # mlflow run tags read by the tracker; can stay empty

Running the training code

$ mlflow server --host 0.0.0.0
$ accelerate launch train.py
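
A g4dn.12xlarge has 4 T4 GPUs, so accelerate needs to be told to use all of them, either interactively via accelerate config beforehand or with launch flags. A sketch of the flag-based variant (the mlflow server is assumed to be running in a separate terminal):

$ accelerate launch --multi_gpu --num_processes 4 train.py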
