【ML】Machine Learning Pipeline Template in PyTorch
This time, I introduce a template for a machine learning pipeline in PyTorch.
I envision this being used as a baseline when building a machine learning model from scratch.
Note: it doesn't work as is; please modify it to fit your dataset and model.
0. Import Libraries
Import the necessary libraries.
import os
import gc
import random
import shutil
import time
import typing as tp
from pathlib import Path

import numpy as np
import pandas as pd
import psutil
import torch
import torchvision
from torch import nn
from torch import optim
from torch.optim import lr_scheduler
from torch.cuda import amp
from torch.nn import BCELoss
from torch.utils.data import Dataset
from tqdm import tqdm
from mamba_ssm import Mamba
# etc.
Define the config class.
class CFG:
    TRAIN_CLM_PATH = Path('../../data/processed/20000_50per_CLM.parquet')
    TEST_ENC_PATH = Path('../../data/external/test_enc.parquet')
    TRAIN_PATH = Path('../../data/raw/train.parquet')
    TEST_PATH = Path('../../data/raw/test.parquet')
    folds = 5
    n_classes = 3  # number of target classes (matches the three label columns used below)
    max_epoch = 9  # maximum number of epochs. One epoch is one full pass over the training dataset.
    batch_size = 32  # batch size: number of samples passed to the network in one training step
    lr = 1.0e-03  # learning rate: determines the step size when updating the model's weights
    weight_decay = 1.0e-02  # weight decay: adds a regularization term to prevent overfitting
    es_patience = 5  # early-stopping patience. If there is no improvement within this number of epochs, training stops early.
    seed = 1086  # random seed
    deterministic = True  # enable/disable deterministic behavior. If enabled, the same initial conditions and inputs produce the same results on every run.
    enable_amp = False  # enable/disable Automatic Mixed Precision (lower-precision floating point for speed and memory savings)
    device = "cuda"
    # etc.
1. Prepare data
Prepare the train/test data.
train = train_data # pd.DataFrame
test = test_data # pd.DataFrame
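For instance, if your data lives in the parquet files referenced in CFG, the placeholders above could be filled like this (a minimal sketch; it assumes those files exist and contain the columns used later):
train = pd.read_parquet(CFG.TRAIN_PATH)  # assumed to contain the feature and label columns used below
test = pd.read_parquet(CFG.TEST_PATH)
print(train.shape, test.shape)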
2. Split into folds
Split the data into folds. Refer to the scikit-learn documentation as needed.
from sklearn.model_selection import KFold
def split_fold(df: pd.DataFrame):
    # config
    N_FOLDS = 5
    RANDOM_SEED = 42
    df['fold'] = -1
    # splitter object
    kf = KFold(n_splits=N_FOLDS, shuffle=True, random_state=RANDOM_SEED)
    for i, (train_index, test_index) in enumerate(kf.split(df)):
        df.loc[test_index, 'fold'] = i
    return df
train = split_fold(train)
train.head()
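As a quick sanity check, count the rows assigned to each fold; they should each hold roughly len(train) / 5 samples:
print(train['fold'].value_counts().sort_index())  # one count per fold id 0..4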
3. Dataset
Define the dataset.
class EXDataset(Dataset):
    def __init__(
        self,
        train: pd.DataFrame,
        label: pd.DataFrame = pd.DataFrame(),
        is_test: bool = False,
        transform = None
    ):
        self.train = train
        self.label = label
        self.is_test = is_test
        self.transform = transform

    def __len__(self):
        # return the total number of samples
        return len(self.train)

    def __getitem__(self, index: int):
        # return the data and target associated with the index
        X = self.train.iloc[index]
        X = self._apply_transform(X)
        if self.is_test:
            y = np.argmax(np.zeros(CFG.n_classes))  # dummy label for test data
            # y = [0, 0, 0]
        else:
            y = np.argmax(self.label.iloc[index].values)
            # y = self.label.iloc[index].values
        return X, y

    def _apply_transform(self, X):
        if self.transform:
            X = self.transform(X)
        return X
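A minimal usage sketch (assuming your train DataFrame actually contains the feature columns '0'..'383' and the label columns 'bind1'/'bind2'/'bind3' used in the training code below; the inline lambda mirrors the to_tensor helper defined in section 5):
feature_columns = [str(i) for i in range(384)]
label_columns = ['bind1', 'bind2', 'bind3']
ds = EXDataset(train=train[feature_columns],
               label=train[label_columns],
               transform=lambda x: torch.tensor(x.values, dtype=torch.float32))
X, y = ds[0]
print(X.shape, y)  # X: torch.Size([384]), y: index of the largest label value for the first row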
4. Model
Define the model. I use the Mamba model as an example.
class MambaModel(nn.Module):
    def __init__(self,
                 dim_model=384,  # model dimension d_model (embedding size)
                 d_state=16,     # SSM state expansion factor
                 d_conv=4,       # local convolution width
                 expand=2,       # block expansion factor
                 output=3        # number of classes (or simply the number of outputs)
                ):
        super().__init__()
        self.model = Mamba(
            d_model=dim_model,
            d_state=d_state,
            d_conv=d_conv,
            expand=expand,
        ).to("cuda")
        # Mamba keeps the feature size unchanged, so a linear head maps dim_model to the number of outputs.
        self.output = nn.Linear(dim_model, output)
        self.softmax = nn.Softmax(dim=-1)

    def forward(self, x):
        # add a sequence-length dimension if the input has only 2 dimensions
        if len(x.shape) == 2:
            x = x.unsqueeze(1)
        x = self.model(x)
        x = self.output(x)
        x = x.squeeze(1)  # squeeze only the length dimension so a batch of size 1 keeps its batch dimension
        # x = self.softmax(x)  # nn.CrossEntropyLoss already includes softmax; don't forget to apply softmax at inference time.
        return x
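Before training, it can help to push a dummy batch through the model to check the shapes (a quick sketch; it assumes a CUDA device is available, since the Mamba block is moved to "cuda" in __init__):
model = MambaModel().to("cuda")
dummy = torch.randn(CFG.batch_size, 384, device="cuda")  # (batch, dim_model); forward adds the length dimension
with torch.no_grad():
    out = model(dummy)
print(out.shape)  # expected: torch.Size([32, 3]) after the squeeze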
5. Train
We train the model here, but first we need to define a few helper functions.
Set seeds
def set_random_seed(seed: int = 42, deterministic: bool = False):
"""Set seeds"""
random.seed(seed)
np.random.seed(seed)
os.environ["PYTHONHASHSEED"] = str(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed(seed) # type: ignore
torch.backends.cudnn.deterministic = deterministic # type: ignore
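Typical usage, plus one optional extra for stricter reproducibility (the benchmark flag is not part of the original helper):
set_random_seed(CFG.seed, deterministic=CFG.deterministic)
torch.backends.cudnn.benchmark = False  # optional: avoid non-deterministic cuDNN algorithm selection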
Move tensors to the device
def to_device(
tensors: tp.Union[tp.Tuple[torch.Tensor], tp.Dict[str, torch.Tensor]],
device: torch.device, *args, **kwargs
):
if isinstance(tensors, tuple):
return tuple(t.to(device, *args, **kwargs) for t in tensors)
elif isinstance(tensors, dict):
return {
k: t.to(device, *args, **kwargs) for k, t in tensors.items()}
else:
return tensors.to(device, *args, **kwargs)
Transform values into a tensor
def to_tensor(x):
return torch.tensor(x.values, dtype=torch.float32)
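A tiny sketch of how these helpers fit together (assuming CFG.device is available; the 384-dimensional dummy row is only for illustration):
device = torch.device(CFG.device)
x = to_tensor(pd.Series(np.zeros(384)))  # one dummy feature row -> float32 tensor of shape (384,)
x = to_device(x, device)                 # a single tensor falls through to the else branch
batch = to_device((x, x), device)        # a tuple is moved element by element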
The preparation is done. Let's train.
Train
def train_one_fold(CFG,
val_fold: int,
train: pd.DataFrame,
output_path
):
feature_columns = [str(i) for i in range(384)]
label_columns = ['bind1', 'bind2', 'bind3']
set_random_seed(CFG.seed, deterministic=CFG.deterministic)
device = torch.device(CFG.device)
train_dataset = EXDataset(train = train[feature_columns][train['fold']!=val_fold].reset_index(drop=True),
label = train[label_columns][train['fold']!=val_fold].reset_index(drop=True),
transform = to_tensor)
val_dataset = EXDataset(train = train[feature_columns][train['fold']==val_fold].reset_index(drop=True),
label = train[label_columns][train['fold']==val_fold].reset_index(drop=True),
transform = to_tensor)
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=CFG.batch_size, num_workers=4, shuffle=True, drop_last=True)
val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=CFG.batch_size, num_workers=4, shuffle=False, drop_last=False)
model = MambaModel()
model.to(device)
optimizer = optim.AdamW(params=model.parameters(), lr=CFG.lr, weight_decay=CFG.weight_decay)
scheduler = lr_scheduler.OneCycleLR(
optimizer=optimizer, epochs=CFG.max_epoch,
pct_start=0.0, steps_per_epoch=len(train_loader),
max_lr=CFG.lr, div_factor=25, final_div_factor=4.0e-01
)
loss_func = nn.CrossEntropyLoss()
loss_func.to(device)
loss_func_val = nn.CrossEntropyLoss()
use_amp = CFG.enable_amp
scaler = amp.GradScaler(enabled=use_amp)
best_val_loss = 1.0e+09
best_epoch = 0
train_loss = 0
val_loss = 0
for epoch in range(1, CFG.max_epoch + 1):
epoch_start = time.time()
model.train()
for batch in train_loader:
x, t = batch
# print(x)
# print(t)
x = to_device(x, device)
t = to_device(t, device)
# sys.exit()
optimizer.zero_grad()
with amp.autocast(use_amp):
y = model(x)
loss = loss_func(y, t)
scaler.scale(loss).backward()
scaler.step(optimizer)
scaler.update()
train_loss += loss.item()
scheduler.step()
# Monitor memory usage
process = psutil.Process()
mem_info = process.memory_info()
print(f"Epoch: {epoch}, Memory Usage: {mem_info.rss / (1024 * 1024)} MB")
# Optional: sleep for a bit to avoid cluttering output
time.sleep(0.1)
train_loss /= len(train_loader)
model.eval()
for batch in val_loader:
x, t = batch
x = to_device(x, device)
with torch.no_grad(), amp.autocast(use_amp):
y = model(x)
# y = torch.sigmoid(y)
y = y.detach().cpu().to(torch.float32)
loss = loss_func_val(y, t)
val_loss += loss.item()
val_loss /= len(val_loader)
if val_loss < best_val_loss:
best_epoch = epoch
best_val_loss = val_loss
# print("save model")
torch.save(model.state_dict(), str(output_path / f'snapshot_epoch_{epoch}.pth'))
elapsed_time = time.time() - epoch_start
print(
f"[epoch {epoch}] train loss: {train_loss: .6f}, val loss: {val_loss: .6f}, elapsed_time: {elapsed_time: .3f}")
if epoch - best_epoch > CFG.es_patience:
print("Early Stopping!")
break
train_loss = 0
val_loss = 0
return val_fold, best_epoch, best_val_loss
Now run the function above for each fold; a model snapshot is saved whenever the validation loss improves.
score_list = []
for fold_id in range(CFG.folds):
output_path = Path(f"fold{fold_id}")
output_path.mkdir(exist_ok=True)
print(f"[fold{fold_id}]")
score_list.append(train_one_fold(CFG, fold_id, train, output_path))
Check the result.
print(score_list)
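Since each entry of score_list is a (fold, best_epoch, best_val_loss) tuple, the cross-validation score is simply the mean of the last element:
cv_loss = np.mean([best_val_loss for (_, _, best_val_loss) in score_list])
print(f"mean best val loss over folds: {cv_loss:.6f}")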
Keep only the best snapshot per fold and delete the rest:
# select the best model and delete others
best_log_list = []
for (fold_id, best_epoch, _) in score_list:
# select the best model
exp_dir_path = Path(f"fold{fold_id}")
best_model_path = exp_dir_path / f"snapshot_epoch_{best_epoch}.pth"
# copy to new place
copy_to = f"./best_model_fold{fold_id}.pth"
shutil.copy(best_model_path, copy_to)
for p in exp_dir_path.glob("*.pth"):
# delete
p.unlink()
6. Inference
Define a function for inference.
def run_inference_loop(model, loader, device):
    model.to(device)
    model.eval()
    pred_list = []
    with torch.no_grad():
        for batch in tqdm(loader):
            x = to_device(batch[0], device)
            y = model(x)
            pred_list.append(y.detach().cpu().numpy())
    # concatenate the batch predictions vertically into one long array
    pred_arr = np.concatenate(pred_list)
    del pred_list
    return pred_arr
Now run the inference.
def inference(test):
    feature_columns = [str(i) for i in range(384)]
    device = torch.device(CFG.device)
    test_pred_arr = np.zeros((CFG.folds, len(test), CFG.n_classes))
    for fold_id in range(CFG.folds):
        print(f"\n[fold {fold_id}]")
        test_dataset = EXDataset(test[feature_columns], is_test=True, transform=to_tensor)
        test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=CFG.batch_size, num_workers=4, shuffle=False, drop_last=False)
        # get the best model for this fold
        model_path = f"./best_model_fold{fold_id}.pth"
        model = MambaModel()
        model.load_state_dict(torch.load(model_path, map_location=device))
        # inference
        test_pred = run_inference_loop(model, test_loader, device)
        test_pred_arr[fold_id] = test_pred
        del model, test_loader
        torch.cuda.empty_cache()
        gc.collect()
    return test_pred_arr
test_preds_arr = inference(test)
Average the predictions over the folds.
test_pred = test_preds_arr.mean(axis=0)
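Since the model returns raw logits (softmax is commented out in forward and handled by nn.CrossEntropyLoss during training), apply softmax here if you need probabilities. The submission layout below is only a hypothetical example; adjust the column names and format to your task:
test_prob = torch.softmax(torch.tensor(test_pred, dtype=torch.float32), dim=-1).numpy()  # logits -> probabilities
submission = pd.DataFrame(test_prob, columns=['bind1', 'bind2', 'bind3'])  # hypothetical column names
submission.to_csv('submission.csv', index=False)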
That's it!
Summary
This time, I introduced a machine learning pipeline template in PyTorch. Please note that it doesn't work as is; modify it to fit your dataset and model. Thank you for reading.