🐱

【ML】Machine Learning Pipeline Template in PyTorch

Published 2024/07/02

This time, I introduce a machine learning pipeline template in PyTorch.
I envision it being used as a baseline when building a machine learning model from scratch.

Note: it doesn't work as is; please modify it to fit your dataset and model.

0. Import Libraries

Import the necessary libraries.

import os
import gc
import random
import shutil
import time
import typing as tp
from pathlib import Path

import numpy as np
import pandas as pd
import psutil
import torch
import torchvision
from torch import nn
from torch import optim
from torch.optim import lr_scheduler
from torch.cuda import amp
from torch.nn import BCELoss
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm
from mamba_ssm import Mamba
# etc.

Define the config class.

class CFG:
    TRAIN_CLM_PATH = Path('../../data/processed/20000_50per_CLM.parquet')
    TEST_ENC_PATH = Path('../../data/external/test_enc.parquet')
    TRAIN_PATH = Path('../../data/raw/train.parquet')
    TEST_PATH = Path('../../data/raw/test.parquet')
    folds = 5
    n_classes = 3             # number of target classes (used by the dataset and inference)
    max_epoch = 9             # maximum number of epochs. One epoch is one full pass over the training dataset.
    batch_size = 32           # batch size: number of samples passed to the network in one training step
    lr = 1.0e-03              # learning rate: determines the step size when updating the model's weights
    weight_decay = 1.0e-02    # weight decay: adds a regularization term to prevent overfitting
    es_patience = 5           # early-stopping patience. If there is no improvement within this number of epochs, training stops early.
    seed = 1086               # random seed
    deterministic = True      # enable/disable deterministic behavior. If enabled, the program produces the same results every run given the same initial conditions and inputs.
    enable_amp = False        # enable/disable Automatic Mixed Precision (mixed float16/float32 computation)
    device = "cuda"
    # etc.
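
If you might also run parts of this on a machine without a GPU, the device setting can be made conditional. A minimal sketch (note that the Mamba block used below itself assumes CUDA):

# fall back to CPU when CUDA is not available (the Mamba layers themselves require a GPU)
CFG.device = "cuda" if torch.cuda.is_available() else "cpu"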

1. Prepare data

Prepare the train/test data.

train = train_data # pd.DataFrame
test = test_data # pd.DataFrame
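
For example, if the raw parquet files referenced in CFG actually exist at those paths (an assumption of this template), loading them could look like this:

# assumes the parquet files defined in CFG exist at these paths
train_data = pd.read_parquet(CFG.TRAIN_PATH)
test_data = pd.read_parquet(CFG.TEST_PATH)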

2. Split fold

Split the data into folds. Please refer to the scikit-learn documentation [1] as needed.

from sklearn.model_selection import KFold

def split_fold(df: pd.DataFrame):
    # config
    N_FOLDS = 5
    RANDOM_SEED = 42
    df['fold'] = -1

    # object
    kf = KFold(n_splits=N_FOLDS, shuffle=True, random_state=RANDOM_SEED)

    # assign a fold id to each row based on its validation indices
    for i, (train_index, valid_index) in enumerate(kf.split(df)):
        df.loc[valid_index, 'fold'] = i

    return df

train = split_fold(train)
train.head()
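
A quick sanity check that the rows are spread evenly across the folds (if your labels are imbalanced, StratifiedKFold from the same module is a common alternative):

# each fold should contain roughly len(train) / N_FOLDS rows
print(train['fold'].value_counts())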

3. Dataset

Define the dataset.

class EXDataset(Dataset):
    def __init__(
        self,
        train: pd.DataFrame,
        label: pd.DataFrame = pd.DataFrame(),
        is_test: bool = False,
        transform = None
    ):
        self.train = train
        self.label = label
        self.is_test = is_test
        self.transform = transform
        
    def __len__(self):
        # return the total number of samples
        return len(self.train)

    def __getitem__(self, index: int):
        # return the data and target associated with the index
        X = self.train.iloc[index]
        X = self._apply_transform(X)

        if self.is_test:
            # dummy label for the test set (no ground truth available)
            y = np.argmax(np.zeros(CFG.n_classes))
            # y = [0, 0, 0]
        else:
            y = np.argmax(self.label.iloc[index].values)
            # y = self.label.iloc[index].values
        return X, y

    def _apply_transform(self, X):
        if self.transform:
            X = self.transform(X)
        return X
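
As a quick check, you can build a dataset from the training DataFrame and look at one sample. This sketch assumes the feature/label column names used later in the training section ('0'–'383' and 'bind1'–'bind3'):

# hypothetical column names, matching those used in the training section below
feature_columns = [str(i) for i in range(384)]
label_columns = ['bind1', 'bind2', 'bind3']

dataset = EXDataset(
    train=train[feature_columns],
    label=train[label_columns],
    transform=lambda x: torch.tensor(x.values, dtype=torch.float32),
)
X, y = dataset[0]
print(X.shape, y)  # e.g. torch.Size([384]) and an integer class index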

4. Model

Define the model. I use a Mamba model as an example.

class MambaModel(nn.Module):
    def __init__(self,
                 dim_model=384,  # model dimension d_model (embedding size)
                 d_state=16,     # SSM state expansion factor
                 d_conv=4,       # local convolution width
                 expand=2,       # block expansion factor
                 output=3        # number of classes (or simply the output size)
                 ):
        super().__init__()
        self.model = Mamba(
            d_model=dim_model,
            d_state=d_state,
            d_conv=d_conv,
            expand=expand,
        ).to("cuda")
        # Mamba passes the input through with its feature size unchanged,
        # so the head maps dim_model -> output.
        self.output = nn.Linear(dim_model, output)
        self.softmax = nn.Softmax(dim=-1)

    def forward(self, x):
        # add the length dimension if the input has only 2 dimensions
        if len(x.shape) == 2:
            x = x.unsqueeze(1)

        x = self.model(x)
        x = self.output(x)
        x = x.squeeze(1)  # drop the length dimension (squeeze(1) stays safe even when the batch size is 1)
        # x = self.softmax(x)  # nn.CrossEntropyLoss already includes softmax; apply it at inference time if you need probabilities.
        return x
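
A quick shape check with a random batch, as a minimal sketch. It assumes a CUDA device is available, since the Mamba block is placed on the GPU:

# smoke test: a batch of 2 samples with 384 features should give logits of shape (2, 3)
model = MambaModel().to("cuda")
dummy = torch.randn(2, 384, device="cuda")
print(model(dummy).shape)  # torch.Size([2, 3])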

5. Train

We train the model here, but first we need to define a few helper functions.

Set seeds:

def set_random_seed(seed: int = 42, deterministic: bool = False):
    """Set seeds"""
    random.seed(seed)
    np.random.seed(seed)
    os.environ["PYTHONHASHSEED"] = str(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)  # type: ignore
    torch.backends.cudnn.deterministic = deterministic  # type: ignore
Move tensors to the device:

def to_device(
    tensors: tp.Union[tp.Tuple[torch.Tensor], tp.Dict[str, torch.Tensor]],
    device: torch.device, *args, **kwargs
):
    if isinstance(tensors, tuple):
        return (t.to(device, *args, **kwargs) for t in tensors)
    elif isinstance(tensors, dict):
        return {
            k: t.to(device, *args, **kwargs) for k, t in tensors.items()}
    else:
        return tensors.to(device, *args, **kwargs)
Transform values to tensors:

def to_tensor(x):
    return torch.tensor(x.values, dtype=torch.float32)
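
A small usage sketch of these helpers, assuming the hypothetical feature columns '0'–'383' exist in `train` and CFG.device is available:

# convert one row of features to a tensor and move it to the configured device
sample = to_tensor(train[[str(i) for i in range(384)]].iloc[0])
sample = to_device(sample, torch.device(CFG.device))
print(sample.shape, sample.device)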

The preparation is done. Let's train.

Train

def train_one_fold(CFG,
                   val_fold: int,
                   train: pd.DataFrame,
                   output_path
                   ):
    feature_columns = [str(i) for i in range(384)]
    label_columns = ['bind1', 'bind2', 'bind3']

    set_random_seed(CFG.seed, deterministic=CFG.deterministic)
    device = torch.device(CFG.device)
    train_dataset = EXDataset(train = train[feature_columns][train['fold']!=val_fold].reset_index(drop=True), 
                              label = train[label_columns][train['fold']!=val_fold].reset_index(drop=True), 
                              transform = to_tensor)
    val_dataset = EXDataset(train = train[feature_columns][train['fold']==val_fold].reset_index(drop=True), 
                            label = train[label_columns][train['fold']==val_fold].reset_index(drop=True), 
                            transform = to_tensor)

    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=CFG.batch_size, num_workers=4, shuffle=True, drop_last=True)
    val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=CFG.batch_size, num_workers=4, shuffle=False, drop_last=False)

    model = MambaModel()
    model.to(device)

    optimizer = optim.AdamW(params=model.parameters(), lr=CFG.lr, weight_decay=CFG.weight_decay)
    scheduler = lr_scheduler.OneCycleLR(
        optimizer=optimizer, epochs=CFG.max_epoch,
        pct_start=0.0, steps_per_epoch=len(train_loader),
        max_lr=CFG.lr, div_factor=25, final_div_factor=4.0e-01
    )
    loss_func = nn.CrossEntropyLoss()
    loss_func.to(device)
    loss_func_val = nn.CrossEntropyLoss()

    use_amp = CFG.enable_amp
    scaler = amp.GradScaler(enabled=use_amp)

    best_val_loss = 1.0e+09
    best_epoch = 0
    train_loss = 0
    val_loss = 0
    
    for epoch in range(1, CFG.max_epoch + 1):
        epoch_start = time.time()
        model.train()
        for batch in train_loader:
            
            x, t = batch
            x = to_device(x, device)
            t = to_device(t, device)
                
            optimizer.zero_grad()
            with amp.autocast(use_amp):
                y = model(x)
                loss = loss_func(y, t)
            
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()
            train_loss += loss.item()
            scheduler.step()
            
            # Monitor memory usage
            process = psutil.Process()
            mem_info = process.memory_info()
            print(f"Epoch: {epoch}, Memory Usage: {mem_info.rss / (1024 * 1024)} MB")

            # Optional: sleep for a bit to avoid cluttering output
            time.sleep(0.1)
            
        train_loss /= len(train_loader)
            
        model.eval()
        for batch in val_loader:
            x, t = batch
            x = to_device(x, device)
            with torch.no_grad(), amp.autocast(use_amp):
                y = model(x)
#                 y = torch.sigmoid(y)
            y = y.detach().cpu().to(torch.float32)
            loss = loss_func_val(y, t)
            val_loss += loss.item()
        val_loss /= len(val_loader)
        
        if val_loss < best_val_loss:
            best_epoch = epoch
            best_val_loss = val_loss
            # print("save model")
            torch.save(model.state_dict(), str(output_path / f'snapshot_epoch_{epoch}.pth'))
        
        elapsed_time = time.time() - epoch_start
        print(
            f"[epoch {epoch}] train loss: {train_loss: .6f}, val loss: {val_loss: .6f}, elapsed_time: {elapsed_time: .3f}")
        
        if epoch - best_epoch > CFG.es_patience:
            print("Early Stopping!")
            break
            
        train_loss = 0
        val_loss = 0
            
    return val_fold, best_epoch, best_val_loss

Now run the function above for each fold; it saves a model snapshot whenever the validation loss improves.

score_list = []
for fold_id in range(CFG.folds):
    output_path = Path(f"fold{fold_id}")
    output_path.mkdir(exist_ok=True)
    print(f"[fold{fold_id}]")
    score_list.append(train_one_fold(CFG, fold_id, train, output_path))

Check the result.

print(score_list)
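
Each entry is (fold_id, best_epoch, best_val_loss), so the mean cross-validation loss can be computed like this:

# average the best validation loss across folds
cv_loss = np.mean([best_val_loss for (_, _, best_val_loss) in score_list])
print(f"CV loss: {cv_loss:.6f}")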

Keep only the best model for each fold and delete the rest:

# keep the best snapshot for each fold and delete the others
for (fold_id, best_epoch, _) in score_list:

    # select the best snapshot
    exp_dir_path = Path(f"fold{fold_id}")
    best_model_path = exp_dir_path / f"snapshot_epoch_{best_epoch}.pth"
    # copy it to a new location
    copy_to = f"./best_model_fold{fold_id}.pth"
    shutil.copy(best_model_path, copy_to)

    # delete all remaining snapshots in the fold directory
    for p in exp_dir_path.glob("*.pth"):
        p.unlink()

6. Inference

Define a function for the inference loop.

def run_inference_loop(model, loader, device):
    model.to(device)
    model.eval()
    pred_list = []
    with torch.no_grad():
        for batch in tqdm(loader):
            x = to_device(batch[0], device)
            y = model(x)
            # y = torch.softmax(y, dim=-1)  # apply softmax here if you need probabilities (see the note in MambaModel.forward)
            pred_list.append(y.detach().cpu().numpy())

    # concatenate along the row dimension into one long array
    pred_arr = np.concatenate(pred_list)
    del pred_list
    return pred_arr

Now run inference.

def inference(test):
    # hold one set of predictions per fold, then average them afterwards
    test_pred_arr = np.zeros((CFG.folds, len(test), CFG.n_classes))

    for fold_id in range(CFG.folds):
        print(f"\n[fold {fold_id}]")
        device = torch.device(CFG.device)

        feature_columns = [str(i) for i in range(384)]
        test_dataset = EXDataset(test[feature_columns], is_test=True, transform=to_tensor)

        test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=CFG.batch_size, num_workers=4, shuffle=False, drop_last=False)

        # get the model for this fold
        model_path = f"./best_model_fold{fold_id}.pth"
        model = MambaModel()
        model.load_state_dict(torch.load(model_path, map_location=device))

        # inference
        test_pred = run_inference_loop(model, test_loader, device)
        test_pred_arr[fold_id] = test_pred

        del model, test_loader
        torch.cuda.empty_cache()
        gc.collect()
    return test_pred_arr

test_preds_arr = inference(test)

Average the predictions across folds.

test_pred = test_preds_arr.mean(axis=0)
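
If you need class probabilities or a submission file, something like the following works. The column names 'bind1'–'bind3' follow the hypothetical label columns assumed above; adjust them to your task:

# convert the averaged logits to probabilities and wrap them in a DataFrame
probs = torch.softmax(torch.tensor(test_pred), dim=-1).numpy()
submission = pd.DataFrame(probs, columns=['bind1', 'bind2', 'bind3'])
submission.to_csv('submission.csv', index=False)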

That's it!

Summary

This time, I introduced a machine learning pipeline template in PyTorch. Please note that it doesn't work as is; modify it for your own dataset and model. Thank you for reading.

Reference

[1] scikit-learn documentation
