🫥

PointNetのClassification Networkを書いてみる【ディープラーニング勉強会】

2023/03/20に公開

Iwaken Lab.内ディープラーニング勉強会(2023/3/13)向け資料

やったこと

https://qiita.com/opeco17/items/707a5c57bca41a145122

  • 上の記事を見ながらPointNetをPytorchで写経&実装した
  • Scaniverseしたウルトラマンとゼットンの点群から,indexに対するランダムサンプリングとfarthest point samplingをして,ダウンサンプリングされた点群とそのクラスラベルをつけたデータセットを作った
  • クラス分類するように学習して,4つのデータでテストした

(細かい試行錯誤はスクラップにまとめた)
https://zenn.dev/sakutama_11/scraps/404147fb9f8497
コード:
https://github.com/sakutama-11/PointNet

PointNetとは

PointNetは,三次元点群のための機械学習ネットワーク
詳細は詳解 3次元点群処理 Pythonによる基礎アルゴリズムの実装 129ページからあります
原著論文は
https://arxiv.org/pdf/1612.00593.pdf

  • 点群入力の向きを揃えるために,変換行列を求める学習(T-Net)を最初に行う
  • 特徴ベクトルを算出した後も,特徴ベクトルを揃えるための?学習をT-Netで行う
  • 各点への特徴ベクトル算出処理が終わったらmax poolingで各点の特徴を点群全体の特徴にまとめる.この時,max poolingを使うことで,点群の順番に依存しない学習が行える
  • Classification Networkはこうして求められた点群全体の特徴ベクトルを,クラス数にまとめてクラスを算出&学習(点群:ラベル のデータ)
  • Segmentation Networkは,点群全体の特徴ベクトルと各点の特徴ベクトルをつなげて各点のラベルを算出&学習(点:ラベル のデータ)

実装

環境構築

Nvidia Docker イメージからビルド

今回は,Nvidia Dockerを試したかったのもあって,Dockerでやりました
https://blog.shikoan.com/wsl2-ndivid-docker-pytorch/
これの通りにやってそのまま動いた.

データのやり取りや永続化のためにボリュームをマウント

https://docs.docker.jp/engine/userguide/dockervolumes.html

wsl側からWeindows側には

/mnt/c/Users/user_name

でアクセスできるので,Windows配下のディレクトリをDockerにマウントしておけば,docker ⇄ wsl ⇄ Windows でファイル共有可能になる

モデル実装

transform T-Net

class InputTNet(nn.Module):
    def __init__(self, num_points):
        super(InputTNet, self).__init__()
        self.num_points = num_points

        self.transT = nn.Sequential(
            NonLinear(3, 64),
            NonLinear(64, 128),
            NonLinear(128, 1024),
            # 点群数でMax Poolingすることで,点群全体に対する特徴にまとめる
            MaxPool(input_channels=1024, num_kernel=self.num_points),
            NonLinear(1024, 512),
            NonLinear(512, 256),
            # 9次のベクトルを生成することで,後で3x3の変換ベクトルに変形する
            nn.Linear(256, 9)
        )

    # shape of input_data is (batchsize x num_points, channel)
    def forward(self, input_data):
        matrix = self.transT(input_data).view(-1, 3, 3)
        # 点群全体を
        out = torch.matmul(input_data.view(-1, self.num_points, 3), matrix)
        out = out.view(-1, 3)
        return out

NonLinearには,Linear→ReLU→Batch Normalizationをセットにした自作Module

学び①: MaxPoolingで各点の特徴を点群の特徴に

MaxPoolで,1024個の点に対してMax Poolingすることによって,各点の特徴ベクトルを点群の特徴ベクトルに変換している.
ここは点群の数も1024点だし,チャンネル数も1024だったので少し混乱してしまった.
out = input_data.view(-1, self.num_channels, self.num_kernel)の変換が,なぜそうなるのかちょっとモヤモヤしたまま...

class MaxPool(nn.Module):
    def __init__(self, input_channels, num_kernel):
        super(MaxPool, self).__init__()
        self.num_channels = input_channels
        self.num_kernel = num_kernel
        self.main = nn.MaxPool1d(self.num_kernel)

    def forward(self, input_data):
        out = input_data.view(-1, self.num_channels, self.num_kernel)
        out = self.main(out)
        out = out.view(-1, self.num_channels)
        return out

feature T-Net

class FeatureTNet(nn.Module):
    def __init__(self, num_points):
        super(FeatureTNet, self).__init__()
        self.num_points = num_points

        self.featT = nn.Sequential(
            NonLinear(64, 64),
            NonLinear(64, 128),
            NonLinear(128, 1024),
            # 点群数でMax Poolingすることで,各点の特徴を点群全体に対する特徴にまとめる
            MaxPool(input_channels=1024, num_kernel=self.num_points),
            NonLinear(1024, 512),
            NonLinear(512, 256),
            # 64 x 64の変換行列
            nn.Linear(256, 4096)
        )

    # shape of input_data is (batchsize x num_points, channel)
    def forward(self, input_data):
        matrix = self.featT(input_data).view(-1, 64, 64)
        # transform whole point cloud
        out = torch.matmul(input_data.view(-1, self.num_points, 64), matrix)
        out = out.view(-1, 64)
        return out

全体

class PointNet(nn.Module):
    def __init__(self, num_points, num_labels):
        super(PointNet, self).__init__()
        self.num_points = num_points
        self.num_labels = num_labels

        self.pointNet = nn.Sequential(
            InputTNet(self.num_points),
            NonLinear(3, 64),
            NonLinear(64, 64),
            FeatureTNet(self.num_points),
            NonLinear(64, 64),
            NonLinear(64, 128),
            NonLinear(128, 1024),
            MaxPool(1024, self.num_points),
            NonLinear(1024, 512),
            nn.Dropout(p = 0.3),
            NonLinear(512, 256),
            nn.Dropout(p = 0.3),
            # classification label
            NonLinear(256, self.num_labels),
            )

    def forward(self, input_data):
        return self.pointNet(input_data)

データセット実装

DatasetとDataLoaderを理解した

https://zenn.dev/link/comments/ee72cc798e8d1f

作ったデータセット

https://zenn.dev/link/comments/fc4b28aa20249c
transform: データをnormalizeする
Dataset: 点群を点群数分サブサンプリングして部分点群: ラベルのデータセットを作る
indexサンプリング 1024点 × 128点群
farthest point sampling 1024点 × 128点群
を2つの密点群に対して処理
{128(index)+128(FPS)}×2(種類)=512のデータセットにしてみる
DataLoader: Datasetからバッチサイズ分データを取り出す

farthest point sampling: 点数がn点になるように均等にサンプルしてくれるやつ.

FPSでは、はじめに1点をランダムに選択します。次に,この点と他のすべての点との間の距離を計算し、最も距離の大きい点を選択します。そして,今回選択された点と他のすべての点をの距離を計算します。各点について,最初に選ばれた点との距離と,今回選ばれた点との距離のうち、より近いほうの(最小となる)距離に注目し、この値が最大となる点を第三の点として選択します。この操作を繰り返して目的の個数の点を選択します。

→おおよそ等間隔にサンプリングするアルゴリズム!ということ
直書きで書いてあるので,遅い.open3d使うとoctree使って早く計算してくれるけど,今回はデータセットを作ったらキャッシュしておく程度で計算時間は我慢してました.

本当は,向きやスケールをシャッフルしたデータセットを作らないといけなかった

sampler.py
import torch
import os
import json
from pathlib import Path
import warnings
import numpy as np
import random
from torch.utils.data import Dataset


def farthest_point_sample(point, npoint):
    """
    Input:
        xyz: pointcloud data, [N, D]
        npoint: number of samples
    Return:
        centroids: sampled pointcloud index, [npoint, D]
    """
    N, D = point.shape
    xyz = point[:,:3]
    centroids = np.zeros((npoint,))
    distance = np.ones((N,)) * 1e10
    farthest = np.random.randint(0, N)
    for i in range(npoint):
        centroids[i] = farthest
        centroid = xyz[farthest, :]
        dist = np.sum((xyz - centroid) ** 2, -1)
        mask = dist < distance
        distance[mask] = dist[mask]
        farthest = np.argmax(distance, -1)
    point = point[centroids.astype(np.int32)]
    return point


class PcdNormalize(object):
    def __init__(self):
        pass

    def __call__(self, sample):
        sample = sample[:, 0:3]
        centroid = torch.mean(sample, axis=0)
        sample = sample - centroid
        m = torch.max(torch.sqrt(torch.sum(sample ** 2, axis=1)))
        sample = sample / m
        return sample

class PcdDataset(Dataset):
    def __init__(self, data_num=512, npoints=1024, transform=PcdNormalize(), cache_dir='./data/', for_test=False):
        self.npoints = npoints
        self.data_num = data_num
        self.transform = transform

        data_cache_path = Path(cache_dir + 'data.pt')
        label_cache_path = Path(cache_dir + 'label.pt')
        if not for_test and data_cache_path.exists() and label_cache_path.exists():
            self.input_data = torch.load(data_cache_path)
            self.labels = torch.load(label_cache_path)
            return

        class_0_pcd = np.loadtxt('./data/Moebius.txt').astype(np.float32)
        class_1_pcd = np.loadtxt('./data/Zetton.txt').astype(np.float32)
        print("============pcd files loaded")
        pcds_0 = np.empty((0,npoints,class_0_pcd.shape[1]))
        pcds_1 = np.empty((0,npoints,class_1_pcd.shape[1]))
        print("============random sampling")
        for i in range(self.data_num//4):
            random_idx_0 = random.sample(range(len(class_0_pcd)), self.npoints)
            pcds_0 = np.append(pcds_0, np.array([class_0_pcd[random_idx_0]]), axis=0)
            random_idx_1 = random.sample(range(len(class_1_pcd)), self.npoints)
            pcds_1 = np.append(pcds_1, np.array([class_1_pcd[random_idx_1]]), axis=0)

        print("============farthest point sampling")
        for i in range(self.data_num//4):
            sampled_0 = farthest_point_sample(class_0_pcd, self.npoints)
            sampled_1 = farthest_point_sample(class_1_pcd, self.npoints)
            pcds_0 = np.append(pcds_0, np.array([sampled_0]), axis=0)
            pcds_1 = np.append(pcds_1, np.array([sampled_1]), axis=0)

        print("============create label")
        labels_0 = torch.zeros(len(pcds_0))
        labels_1 = torch.ones(len(pcds_1))
        class_0_tensor = torch.from_numpy(pcds_0)
        class_1_tensor = torch.from_numpy(pcds_1)
        self.input_data = torch.cat((class_0_tensor[:, :,0:3], class_1_tensor[:, :,0:3]), dim=0)
        self.labels = torch.cat((labels_0, labels_1), dim=0)
        # dataset = torch.cat((self.input_data, self.labels), dim=1)
        if not for_test:
            torch.save(self.input_data, data_cache_path)
            torch.save(self.labels, label_cache_path)

    def __len__(self):
        return self.data_num

    def __getitem__(self, index):
        data = self.input_data[index]
        label =  self.labels[index]

        if self.transform:
            data = self.transform(data)

        return data.view(-1, 3), label.view(-1, 1)


def main():
    data_set = PcdDataset(32)
    dataloader = torch.utils.data.DataLoader(data_set, batch_size=2, shuffle=True)

    for i in dataloader:
        print(i)
            
if __name__ == '__main__':
    main()

shapeとかnumpyなのかtensorなのかみたいなところに引っかかりつつちまちまデバッグして整形する.
main関数でDataLoaderの使い方もミニマムで学ぶ

Main実装

main.py
from Model import *
from sampler import *


batch_size = 64
num_points = 1024
num_labels = 1
epochs = 100


def main():
    pointnet = PointNet(num_points, num_labels)

    new_param = pointnet.state_dict()
    new_param['pointNet.0.transT.6.bias'] = torch.eye(3, 3).view(-1)
    new_param['pointNet.3.featT.6.bias'] = torch.eye(64, 64).view(-1)
    pointnet.load_state_dict(new_param)
    pointnet = pointnet.float()

    criterion = nn.BCELoss()
    optimizer = optim.Adam(pointnet.parameters(), lr=0.001)

    loss_list = []
    accuracy_list = []
    
    dataset = PcdDataset(512, num_points)
    dataloader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=True)

    for epoch in range(epochs):
        for input_data, labels in dataloader:
            pointnet.zero_grad()

            input_data = input_data.view(-1, 3)
            labels = labels.view(-1, 1)
            output = pointnet(input_data.float())
            output = nn.Sigmoid()(output)
            
            error = criterion(output, labels)
            # ここで勾配を計算している
            error.backward()
            # 各レイヤのパラメタを更新
            optimizer.step()

        with torch.no_grad():
            output[output > 0.5] = 1
            output[output < 0.5] = 0
            accuracy = (output==labels).sum().item()/batch_size
        

        loss_list.append(error.item())
        accuracy_list.append(accuracy)
        torch.save(pointnet.state_dict(), 'pointnet_weight.pth')
        print('epoch : {}   Loss : {}'.format(epoch, error.item()))
        print('epoch : {}   Accuracy : {}'.format(epoch, accuracy))
    
           
if __name__ == '__main__':
    main()

ほぼ写しただけだけど,元のコードと比べるとDataLoaderを使う都合でbatchとepochにループを分けてみたり

テスト実装

test.py
from Model import *
from sampler import *


batch_size = 64
num_points = 1024
num_labels = 1


def test():
    pointnet = PointNet(num_points, num_labels)
    pointnet.load_state_dict(torch.load('pointnet_weight.pth'))
    pointnet = pointnet.float()

    # target = 0
    

    dataset = PcdDataset(4, num_points, for_test=True)
    dataloader = torch.utils.data.DataLoader(dataset, batch_size=4, shuffle=False)

    pointnet.eval()
    for input_data, labels in dataloader:
        input_data = input_data.view(-1, 3)
        labels = labels.view(-1, 1)
        with torch.no_grad():
            pred = pointnet(input_data.float())
            pred = nn.Sigmoid()(pred)
            pred[pred > 0.5] = 1
            pred[pred < 0.5] = 0
            for predicted, actual in zip(pred, labels):
                print(f'Predicted: "{predicted.item()}", Actual: "{actual.item()}"')
                if predicted.item() == actual.item():
                    print("correct!")
                else:
                    print("incorrect...") 
            
if __name__ == '__main__':
    test()

基礎の基礎なんだけど,model.eval()と,with torch.no_grad():で一つからデータを入れられる(evalにしないでモデルに1つデータ入れたら「batch_normできないよ!」と怒られた)

結果

学び

model.state_dict().keys()のアクセスの仕方がわかった

modelの各レイヤのパラメータが格納されている

new_param = pointnet.state_dict()
new_param['pointNet.0.transT.6.bias'] = torch.eye(3, 3).view(-1)
new_param['pointNet.3.featT.6.bias'] = torch.eye(64, 64).view(-1)
pointnet.load_state_dict(new_param)

こんな感じで,各層のパラメータ初期値を任意の値に設定できる.
でも,この'pointNet.0.transT.6.bias'みたいなのは何で決まってるの...と思った

結論

各モジュールで設定した

class InputTNet(nn.Module):
    def __init__(self, num_points):
        super(InputTNet, self).__init__()
        self.num_points = num_points

        self.transT = nn.Sequential(
            NonLinear(3, 64),
            NonLinear(64, 128),
            NonLinear(128, 1024),
            # max pooling by number of points
            MaxPool(input_channels=1024, num_kernel=self.num_points),
            NonLinear(1024, 512),
            NonLinear(512, 256),
            # make 9 dimension feature for transformation matrix
            nn.Linear(256, 9)
        )

    # shape of input_data is (batchsize x num_points, channel)
    def forward(self, input_data):
        matrix = self.transT(input_data).view(-1, 3, 3)
        # transform whole point cloud
        out = torch.matmul(input_data.view(-1, self.num_points, 3), matrix)
        out = out.view(-1, 3)
        return out

forwardで呼び出しているメソッド名 matrix = self.transT(input_data).view(-1, 3, 3)の部分,ここではtransTが入っている.

https://zenn.dev/link/comments/4cf196977ced6e
よって,pytorchの各レイヤのモジュールは(nn.Sequential)[https://pytorch.org/docs/stable/generated/torch.nn.Sequential.html] を使う方法と,nn.Moduleを継承したクラスを使う方法がある.
https://note.nkmk.me/python-pytorch-module-sequential/
https://dreamer-uma.com/pytorch-network/
nn.Moduleを継承して,メソッドにわかりやすい名前をつけると,その名前で参照できる
nn.Sequentialにそのまま並べて各処理を詰め込んで書くと,model.state_dict()のキーは,nn.Sequential()内に書いた順のインデックス(0-6とか)でアクセスされる.これはわかりにくいから,ぜひModuleで頑張って書こうと思った.
(nn.Sequentialにも,名前を指定する書き方はあるみたい,だけど書き方はあんまり気に入らなかった)

DatasetとDataLoaderの使い方を実例で学べた

もやったままのところ

点群ごとの区切りはどこで意識されている?

MaxPoolingでMaxPoolするカーネルを点群の区切りに来るように設定してる?
(batch_size, num_points, 3)ではなく,(batch_size * num_points, 3)なことを認知するまでに時間がかかった
そして納得はしてない...なぜ
お手本があったからそれに倣って

input_data = input_data.view(-1, 3)
labels = labels.view(-1, 1)

こうしたけど,一人では辿り着けないと思う...

MaxPoolの書き方

https://zenn.dev/link/comments/8abaa6275d9a44

class MaxPool(nn.Module):
    def __init__(self, input_channels, num_kernel):
        super(MaxPool, self).__init__()
        self.num_channels = input_channels
        self.num_kernel = num_kernel
        self.main = nn.MaxPool1d(self.num_kernel)

    def forward(self, input_data):
        out = input_data.view(-1, self.num_channels, self.num_kernel)
        out = self.main(out)
        out = out.view(-1, self.num_channels)
        return out

out = input_data.view(-1, self.num_channels, self.num_kernel)がなんのためにあるのか分からなかった

optimizerとlossがどうModelと繋がってるのか

error.backward()
optimizer.step()

この二行だけを見ると,モデルの情報とかパラメータをどう保持しているのかしていないのかが話からなかった
https://zenn.dev/link/comments/ef29bc1980d01c

shared mlp

実装的にどう"shared"が表現されているのか分からなかった

詳解 3次元点群処理の方をもう少し参考にすればよかった

今回参考にしたQiitaの記事を前にも見てやってたからっていうので参考にしたけどせっかくだから詳解 3次元点群処理の方に則ってやればよかった

まとめ

PointNetをPytorchで書いてみて,点群2種類からダウンサンプリングした点群データセットで学習.
新たにダウンサンプリングした点群でテストして正解した.

感想

ネットワークの書き方がちょっとわかった.
データの形をどうすればいいのかがやっぱり躓きどころで,まだ友達になれない

Discussion