PointNetのClassification Networkを書いてみる【ディープラーニング勉強会】
Iwaken Lab.内ディープラーニング勉強会(2023/3/13)向け資料
やったこと
- 上の記事を見ながらPointNetをPytorchで写経&実装した
- Scaniverseしたウルトラマンとゼットンの点群から,indexに対するランダムサンプリングとfarthest point samplingをして,ダウンサンプリングされた点群とそのクラスラベルをつけたデータセットを作った
- クラス分類するように学習して,4つのデータでテストした
(細かい試行錯誤はスクラップにまとめた)
コード:PointNetとは
PointNetは,三次元点群のための機械学習ネットワーク
詳細は詳解 3次元点群処理 Pythonによる基礎アルゴリズムの実装 129ページからあります
原著論文は
- 点群入力の向きを揃えるために,変換行列を求める学習(T-Net)を最初に行う
- 特徴ベクトルを算出した後も,特徴ベクトルを揃えるための?学習をT-Netで行う
- 各点への特徴ベクトル算出処理が終わったらmax poolingで各点の特徴を点群全体の特徴にまとめる.この時,max poolingを使うことで,点群の順番に依存しない学習が行える
- Classification Networkはこうして求められた点群全体の特徴ベクトルを,クラス数にまとめてクラスを算出&学習(点群:ラベル のデータ)
- Segmentation Networkは,点群全体の特徴ベクトルと各点の特徴ベクトルをつなげて各点のラベルを算出&学習(点:ラベル のデータ)
実装
環境構築
Nvidia Docker イメージからビルド
今回は,Nvidia Dockerを試したかったのもあって,Dockerでやりました
これの通りにやってそのまま動いた.データのやり取りや永続化のためにボリュームをマウント
wsl側からWeindows側には
/mnt/c/Users/user_name
でアクセスできるので,Windows配下のディレクトリをDockerにマウントしておけば,docker ⇄ wsl ⇄ Windows でファイル共有可能になる
モデル実装
transform T-Net
class InputTNet(nn.Module):
def __init__(self, num_points):
super(InputTNet, self).__init__()
self.num_points = num_points
self.transT = nn.Sequential(
NonLinear(3, 64),
NonLinear(64, 128),
NonLinear(128, 1024),
# 点群数でMax Poolingすることで,点群全体に対する特徴にまとめる
MaxPool(input_channels=1024, num_kernel=self.num_points),
NonLinear(1024, 512),
NonLinear(512, 256),
# 9次のベクトルを生成することで,後で3x3の変換ベクトルに変形する
nn.Linear(256, 9)
)
# shape of input_data is (batchsize x num_points, channel)
def forward(self, input_data):
matrix = self.transT(input_data).view(-1, 3, 3)
# 点群全体を
out = torch.matmul(input_data.view(-1, self.num_points, 3), matrix)
out = out.view(-1, 3)
return out
NonLinearには,Linear→ReLU→Batch Normalizationをセットにした自作Module
学び①: MaxPoolingで各点の特徴を点群の特徴に
MaxPoolで,1024個の点に対してMax Poolingすることによって,各点の特徴ベクトルを点群の特徴ベクトルに変換している.
ここは点群の数も1024点だし,チャンネル数も1024だったので少し混乱してしまった.
out = input_data.view(-1, self.num_channels, self.num_kernel)
の変換が,なぜそうなるのかちょっとモヤモヤしたまま...
class MaxPool(nn.Module):
def __init__(self, input_channels, num_kernel):
super(MaxPool, self).__init__()
self.num_channels = input_channels
self.num_kernel = num_kernel
self.main = nn.MaxPool1d(self.num_kernel)
def forward(self, input_data):
out = input_data.view(-1, self.num_channels, self.num_kernel)
out = self.main(out)
out = out.view(-1, self.num_channels)
return out
feature T-Net
class FeatureTNet(nn.Module):
def __init__(self, num_points):
super(FeatureTNet, self).__init__()
self.num_points = num_points
self.featT = nn.Sequential(
NonLinear(64, 64),
NonLinear(64, 128),
NonLinear(128, 1024),
# 点群数でMax Poolingすることで,各点の特徴を点群全体に対する特徴にまとめる
MaxPool(input_channels=1024, num_kernel=self.num_points),
NonLinear(1024, 512),
NonLinear(512, 256),
# 64 x 64の変換行列
nn.Linear(256, 4096)
)
# shape of input_data is (batchsize x num_points, channel)
def forward(self, input_data):
matrix = self.featT(input_data).view(-1, 64, 64)
# transform whole point cloud
out = torch.matmul(input_data.view(-1, self.num_points, 64), matrix)
out = out.view(-1, 64)
return out
全体
class PointNet(nn.Module):
def __init__(self, num_points, num_labels):
super(PointNet, self).__init__()
self.num_points = num_points
self.num_labels = num_labels
self.pointNet = nn.Sequential(
InputTNet(self.num_points),
NonLinear(3, 64),
NonLinear(64, 64),
FeatureTNet(self.num_points),
NonLinear(64, 64),
NonLinear(64, 128),
NonLinear(128, 1024),
MaxPool(1024, self.num_points),
NonLinear(1024, 512),
nn.Dropout(p = 0.3),
NonLinear(512, 256),
nn.Dropout(p = 0.3),
# classification label
NonLinear(256, self.num_labels),
)
def forward(self, input_data):
return self.pointNet(input_data)
データセット実装
DatasetとDataLoaderを理解した
作ったデータセット
Dataset: 点群を点群数分サブサンプリングして部分点群: ラベルのデータセットを作る
indexサンプリング 1024点 × 128点群
farthest point sampling 1024点 × 128点群
を2つの密点群に対して処理
{128(index)+128(FPS)}×2(種類)=512のデータセットにしてみる
DataLoader: Datasetからバッチサイズ分データを取り出す
farthest point sampling: 点数がn点になるように均等にサンプルしてくれるやつ.
FPSでは、はじめに1点をランダムに選択します。次に,この点と他のすべての点との間の距離を計算し、最も距離の大きい点を選択します。そして,今回選択された点と他のすべての点をの距離を計算します。各点について,最初に選ばれた点との距離と,今回選ばれた点との距離のうち、より近いほうの(最小となる)距離に注目し、この値が最大となる点を第三の点として選択します。この操作を繰り返して目的の個数の点を選択します。
→おおよそ等間隔にサンプリングするアルゴリズム!ということ
直書きで書いてあるので,遅い.open3d使うとoctree使って早く計算してくれるけど,今回はデータセットを作ったらキャッシュしておく程度で計算時間は我慢してました.
本当は,向きやスケールをシャッフルしたデータセットを作らないといけなかった
import torch
import os
import json
from pathlib import Path
import warnings
import numpy as np
import random
from torch.utils.data import Dataset
def farthest_point_sample(point, npoint):
"""
Input:
xyz: pointcloud data, [N, D]
npoint: number of samples
Return:
centroids: sampled pointcloud index, [npoint, D]
"""
N, D = point.shape
xyz = point[:,:3]
centroids = np.zeros((npoint,))
distance = np.ones((N,)) * 1e10
farthest = np.random.randint(0, N)
for i in range(npoint):
centroids[i] = farthest
centroid = xyz[farthest, :]
dist = np.sum((xyz - centroid) ** 2, -1)
mask = dist < distance
distance[mask] = dist[mask]
farthest = np.argmax(distance, -1)
point = point[centroids.astype(np.int32)]
return point
class PcdNormalize(object):
def __init__(self):
pass
def __call__(self, sample):
sample = sample[:, 0:3]
centroid = torch.mean(sample, axis=0)
sample = sample - centroid
m = torch.max(torch.sqrt(torch.sum(sample ** 2, axis=1)))
sample = sample / m
return sample
class PcdDataset(Dataset):
def __init__(self, data_num=512, npoints=1024, transform=PcdNormalize(), cache_dir='./data/', for_test=False):
self.npoints = npoints
self.data_num = data_num
self.transform = transform
data_cache_path = Path(cache_dir + 'data.pt')
label_cache_path = Path(cache_dir + 'label.pt')
if not for_test and data_cache_path.exists() and label_cache_path.exists():
self.input_data = torch.load(data_cache_path)
self.labels = torch.load(label_cache_path)
return
class_0_pcd = np.loadtxt('./data/Moebius.txt').astype(np.float32)
class_1_pcd = np.loadtxt('./data/Zetton.txt').astype(np.float32)
print("============pcd files loaded")
pcds_0 = np.empty((0,npoints,class_0_pcd.shape[1]))
pcds_1 = np.empty((0,npoints,class_1_pcd.shape[1]))
print("============random sampling")
for i in range(self.data_num//4):
random_idx_0 = random.sample(range(len(class_0_pcd)), self.npoints)
pcds_0 = np.append(pcds_0, np.array([class_0_pcd[random_idx_0]]), axis=0)
random_idx_1 = random.sample(range(len(class_1_pcd)), self.npoints)
pcds_1 = np.append(pcds_1, np.array([class_1_pcd[random_idx_1]]), axis=0)
print("============farthest point sampling")
for i in range(self.data_num//4):
sampled_0 = farthest_point_sample(class_0_pcd, self.npoints)
sampled_1 = farthest_point_sample(class_1_pcd, self.npoints)
pcds_0 = np.append(pcds_0, np.array([sampled_0]), axis=0)
pcds_1 = np.append(pcds_1, np.array([sampled_1]), axis=0)
print("============create label")
labels_0 = torch.zeros(len(pcds_0))
labels_1 = torch.ones(len(pcds_1))
class_0_tensor = torch.from_numpy(pcds_0)
class_1_tensor = torch.from_numpy(pcds_1)
self.input_data = torch.cat((class_0_tensor[:, :,0:3], class_1_tensor[:, :,0:3]), dim=0)
self.labels = torch.cat((labels_0, labels_1), dim=0)
# dataset = torch.cat((self.input_data, self.labels), dim=1)
if not for_test:
torch.save(self.input_data, data_cache_path)
torch.save(self.labels, label_cache_path)
def __len__(self):
return self.data_num
def __getitem__(self, index):
data = self.input_data[index]
label = self.labels[index]
if self.transform:
data = self.transform(data)
return data.view(-1, 3), label.view(-1, 1)
def main():
data_set = PcdDataset(32)
dataloader = torch.utils.data.DataLoader(data_set, batch_size=2, shuffle=True)
for i in dataloader:
print(i)
if __name__ == '__main__':
main()
shapeとかnumpyなのかtensorなのかみたいなところに引っかかりつつちまちまデバッグして整形する.
main関数でDataLoaderの使い方もミニマムで学ぶ
Main実装
from Model import *
from sampler import *
batch_size = 64
num_points = 1024
num_labels = 1
epochs = 100
def main():
pointnet = PointNet(num_points, num_labels)
new_param = pointnet.state_dict()
new_param['pointNet.0.transT.6.bias'] = torch.eye(3, 3).view(-1)
new_param['pointNet.3.featT.6.bias'] = torch.eye(64, 64).view(-1)
pointnet.load_state_dict(new_param)
pointnet = pointnet.float()
criterion = nn.BCELoss()
optimizer = optim.Adam(pointnet.parameters(), lr=0.001)
loss_list = []
accuracy_list = []
dataset = PcdDataset(512, num_points)
dataloader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=True)
for epoch in range(epochs):
for input_data, labels in dataloader:
pointnet.zero_grad()
input_data = input_data.view(-1, 3)
labels = labels.view(-1, 1)
output = pointnet(input_data.float())
output = nn.Sigmoid()(output)
error = criterion(output, labels)
# ここで勾配を計算している
error.backward()
# 各レイヤのパラメタを更新
optimizer.step()
with torch.no_grad():
output[output > 0.5] = 1
output[output < 0.5] = 0
accuracy = (output==labels).sum().item()/batch_size
loss_list.append(error.item())
accuracy_list.append(accuracy)
torch.save(pointnet.state_dict(), 'pointnet_weight.pth')
print('epoch : {} Loss : {}'.format(epoch, error.item()))
print('epoch : {} Accuracy : {}'.format(epoch, accuracy))
if __name__ == '__main__':
main()
ほぼ写しただけだけど,元のコードと比べるとDataLoaderを使う都合でbatchとepochにループを分けてみたり
テスト実装
from Model import *
from sampler import *
batch_size = 64
num_points = 1024
num_labels = 1
def test():
pointnet = PointNet(num_points, num_labels)
pointnet.load_state_dict(torch.load('pointnet_weight.pth'))
pointnet = pointnet.float()
# target = 0
dataset = PcdDataset(4, num_points, for_test=True)
dataloader = torch.utils.data.DataLoader(dataset, batch_size=4, shuffle=False)
pointnet.eval()
for input_data, labels in dataloader:
input_data = input_data.view(-1, 3)
labels = labels.view(-1, 1)
with torch.no_grad():
pred = pointnet(input_data.float())
pred = nn.Sigmoid()(pred)
pred[pred > 0.5] = 1
pred[pred < 0.5] = 0
for predicted, actual in zip(pred, labels):
print(f'Predicted: "{predicted.item()}", Actual: "{actual.item()}"')
if predicted.item() == actual.item():
print("correct!")
else:
print("incorrect...")
if __name__ == '__main__':
test()
基礎の基礎なんだけど,model.eval()
と,with torch.no_grad():
で一つからデータを入れられる(evalにしないでモデルに1つデータ入れたら「batch_normできないよ!」と怒られた)
結果
学び
model.state_dict().keys()のアクセスの仕方がわかった
modelの各レイヤのパラメータが格納されている
new_param = pointnet.state_dict()
new_param['pointNet.0.transT.6.bias'] = torch.eye(3, 3).view(-1)
new_param['pointNet.3.featT.6.bias'] = torch.eye(64, 64).view(-1)
pointnet.load_state_dict(new_param)
こんな感じで,各層のパラメータ初期値を任意の値に設定できる.
でも,この'pointNet.0.transT.6.bias'みたいなのは何で決まってるの...と思った
結論
各モジュールで設定した
class InputTNet(nn.Module):
def __init__(self, num_points):
super(InputTNet, self).__init__()
self.num_points = num_points
self.transT = nn.Sequential(
NonLinear(3, 64),
NonLinear(64, 128),
NonLinear(128, 1024),
# max pooling by number of points
MaxPool(input_channels=1024, num_kernel=self.num_points),
NonLinear(1024, 512),
NonLinear(512, 256),
# make 9 dimension feature for transformation matrix
nn.Linear(256, 9)
)
# shape of input_data is (batchsize x num_points, channel)
def forward(self, input_data):
matrix = self.transT(input_data).view(-1, 3, 3)
# transform whole point cloud
out = torch.matmul(input_data.view(-1, self.num_points, 3), matrix)
out = out.view(-1, 3)
return out
forwardで呼び出しているメソッド名 matrix = self.transT(input_data).view(-1, 3, 3)
の部分,ここではtransTが入っている.
https://pytorch.org/docs/stable/generated/torch.nn.Sequential.html] を使う方法と,nn.Moduleを継承したクラスを使う方法がある.
nn.Moduleを継承して,メソッドにわかりやすい名前をつけると,その名前で参照できる
nn.Sequentialにそのまま並べて各処理を詰め込んで書くと,model.state_dict()のキーは,nn.Sequential()内に書いた順のインデックス(0-6とか)でアクセスされる.これはわかりにくいから,ぜひModuleで頑張って書こうと思った.
(nn.Sequentialにも,名前を指定する書き方はあるみたい,だけど書き方はあんまり気に入らなかった)
DatasetとDataLoaderの使い方を実例で学べた
もやったままのところ
点群ごとの区切りはどこで意識されている?
MaxPoolingでMaxPoolするカーネルを点群の区切りに来るように設定してる?
(batch_size, num_points, 3)
ではなく,(batch_size * num_points, 3)
なことを認知するまでに時間がかかった
そして納得はしてない...なぜ
お手本があったからそれに倣って
input_data = input_data.view(-1, 3)
labels = labels.view(-1, 1)
こうしたけど,一人では辿り着けないと思う...
MaxPoolの書き方
class MaxPool(nn.Module):
def __init__(self, input_channels, num_kernel):
super(MaxPool, self).__init__()
self.num_channels = input_channels
self.num_kernel = num_kernel
self.main = nn.MaxPool1d(self.num_kernel)
def forward(self, input_data):
out = input_data.view(-1, self.num_channels, self.num_kernel)
out = self.main(out)
out = out.view(-1, self.num_channels)
return out
out = input_data.view(-1, self.num_channels, self.num_kernel)
がなんのためにあるのか分からなかった
optimizerとlossがどうModelと繋がってるのか
error.backward()
optimizer.step()
この二行だけを見ると,モデルの情報とかパラメータをどう保持しているのかしていないのかが話からなかった
shared mlp
実装的にどう"shared"が表現されているのか分からなかった
詳解 3次元点群処理の方をもう少し参考にすればよかった
今回参考にしたQiitaの記事を前にも見てやってたからっていうので参考にしたけどせっかくだから詳解 3次元点群処理の方に則ってやればよかった
まとめ
PointNetをPytorchで書いてみて,点群2種類からダウンサンプリングした点群データセットで学習.
新たにダウンサンプリングした点群でテストして正解した.
感想
ネットワークの書き方がちょっとわかった.
データの形をどうすればいいのかがやっぱり躓きどころで,まだ友達になれない
Discussion