🍎

Jetson Nano上に構築したk3sでGPUを使ったアプリを走らせる

2023/05/05に公開

やりたいこと

前回 の続き。
PyTorchのチュートリアル のテキスト分類モデルを借用して、Jetson上で動かしてみようと思う。

構造としては、以下のような流れ。

MQTTを使ったことがないので、勉強がてら遊んでみる。

MQTTサーバーを建てる

MQTTサーバーとして Mosquittoコンテナ をMac上に建てる。

https://mosquitto.org

単に コンテナイメージをそのままインスタンス化してもいいのだけれど、せっかくDocker for Macをインストールしているので、Kubernetesを使ってコンテナをインスタンス化する。
まず、 mosquitto.conf の設定を記述した ConfigMap を用意する。
マニフェストは以下の通り。

mosquitto_conf.yaml
apiVersion: v1
data:
  mosquitto.conf: |
    listener 1883
    allow_anonymous true
kind: ConfigMap
metadata:
  creationTimestamp: null
  name: mosquitto

このマニフェストを Mac上に建てた Kubernetesクラスタに適用する。
次に Mosquitto サーバーを建てるためのマニフェストを定義する。内容としては以下のとおり。
本当は StatefulSet にしたり、ログ等を永続化するような設定をすべきだろうけれど、今回は簡単のためにPodで定義し永続化も設定しないでおく。

mqtt_server.yaml
apiVersion: v1
kind: Pod
metadata:
  creationTimestamp: null
  labels:
    run: mqtt
  name: mqtt
spec:
  containers:
  - image: eclipse-mosquitto:2.0.15
    name: mqtt
    ports:
    - containerPort: 1883
    resources: {}
    volumeMounts:
      - name: conf
        mountPath: "/mosquitto/config/"
        readOnly: true
  dnsPolicy: ClusterFirst
  restartPolicy: Always
  volumes:
    - name: conf
      configMap:
        name: mosquitto

status: {}

---

apiVersion: v1
kind: Service
metadata:
  creationTimestamp: null
  labels:
    run: mqtt
  name: mqtt
spec:
  ports:
  - port: 1883
    name: "default-port"
    protocol: TCP
    nodePort: 31884
    targetPort: 1883
  selector:
    run: mqtt
  type: NodePort
status:
  loadBalancer: {}

これでひとまず MacとJetson双方がアクセスできる MQTTサーバーを建てることができた。

Jetson Nanoの設定

前回、Jetson Nanoにk3sをインストールしてKubernetes環境を構築したが、Device Pluginをインストールしていないのでこのままでは、k3sがGPUを認識できない。

$ sudo kubectl create -f https://raw.githubusercontent.com/NVIDIA/k8s-device-plugin/v0.13.0/nvidia-device-plugin.yml

を実行して nvidia-device-plugin をインストールする。Device Plugin 自体は DaemonSet なので、 kubectl -n kube-system get ds を実行すれば現在の状態を確認することができる。

kubectl describe pod などでk3sが GPUを正常に認識できていることを確認したら、
有志が作った deviceQuery を実行するコンテナイメージを使って確認用のPodを建ててみる。正常に設定できていれば、PASS と出るはず。

nvidia-query.yaml
apiVersion: v1
kind: Pod
metadata:
name: nvidia-query
spec:
  restartPolicy: OnFailure
  containers:
  - name: nvidia-query
    image: xift/jetson_devicequery:r32.5.0
    command: [ "./deviceQuery" ]
    resources:
      limits:
        nvidia.com/gpu: 1
  tolerations:
  - key: nvidia.com/gpu
    operator: Exists
    effect: NoSchedule

デモアプリの作成

ひとまず準備は整ったので、デモアプリを作成する。デモアプリ上で動作させるモデルは前述の通り、PyTorchのチュートリアルText classification with the Torchtext library で作成するモデルを利用する。

https://pytorch.org/tutorials/beginner/text_sentiment_ngrams_tutorial.html

Mac側のプログラム

Mac側の作業ディレクトリの構造は以下の通り。

.
├── pyproject.toml
├── pub.py
├── sub.py
└── text-classify
    └── jit_vocab.pt

Pythonのライブラリの管理にはPoetryを使っている。
https://python-poetry.org

Poetryの各種設定を記述している pyproject.toml の内容は以下の通りで、 Python 3.11以上の環境をターゲットにしている。

pyproject.toml
[tool.poetry]
name = "jetson-demo"
version = "0.1.0"
description = ""
authors = [""]
license = "MIT"
packages = [{include = "jetson_demo"}]

[tool.poetry.dependencies]
python = "^3.11"
paho-mqtt = "^1.6.1"
torch = "^2.0.0"
torchtext = "^0.15.1"

[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

では以下から、PublishとSubscribe用のソースコードについて簡単に説明する。

Publish用のプログラム

MQTTのPublish用のプログラム。本当は単純に受け取った文字列をそのままサーバーに送るだけにしたかったが、 Jetson Nanoが Armプロセッサのため、torchtext のインストールが面倒なので、Publish側で一通りの前処理を済ませた後にサーバーに送るような構造とした。

なお、途中の vocab = torch.jit.load(path / "jit_vocab.pt") は、PyTorchのチュートリアルで作成した torchtext.vocab.build_vocab_from_iterator の結果 をTorchScript化した上で、jit_vocab.ptとして torch.jit.save したもの。
また、実行する前に環境変数 MQTT_PORT でMQTTサーバーにアクセスするためのポートを指定する必要がある。

pub.py
import os
import time
from pathlib import Path
import torch
from torchtext.data.utils import get_tokenizer
import paho.mqtt.client as mqtt

def on_connect(client, userdata, flag, rc):
    print("Connected with result code " + str(rc))

def on_disconnect(client, userdata, rc):
    if rc != 0:
       print("Unexpected disconnection.")

def on_publish(client, userdata, mid):
    print(f"publish: {mid}")

def create_text_pipeline(path):
    tokenizer = get_tokenizer("basic_english")
    vocab = torch.jit.load(path / "jit_vocab.pt")
    text_pipeline = lambda x: vocab(tokenizer(x))
    return text_pipeline

def main():
    client = mqtt.Client()
    client.on_connect = on_connect
    client.on_disconnect = on_disconnect
    client.on_publish = on_publish

    client.connect("localhost", int(os.environ["MQTT_PORT"]))
    client.loop_start()
    text_pipeline = create_text_pipeline(Path("text-classify"))

    while True:
        text = "MEMPHIS, Tenn. – Four days ago, Jon Rahm was \
        enduring the season’s worst weather conditions on Sunday at The \
        Open on his way to a closing 75 at Royal Portrush, which \
        considering the wind and the rain was a respectable showing. \
        Thursday’s first round at the WGC-FedEx St. Jude Invitational \
        was another story. With temperatures in the mid-80s and hardly any \
        wind, the Spaniard was 13 strokes better in a flawless round. \
        Thanks to his best putting performance on the PGA Tour, Rahm \
        finished with an 8-under 62 for a three-stroke lead, which \
        was even more impressive considering he’d never played the \
        front nine at TPC Southwind."

    send_text = ",".join(list(map(lambda x: str(x), text_pipeline(text))))
    client.publish("jetson-demo/question", send_text)
    time.sleep(3)

if __name__ == '__main__':
    main()

Subscribe用のプログラム

次に最終的な分類結果を受信するプログラム。
こっちは単に指定したトピック (jetson-demo/answer) にメッセージが届いたら print するだけ。こちらも、実行する前に環境変数 MQTT_PORT でMQTTサーバーにアクセスするためのポートを指定する必要がある。

sub.py
import os
import paho.mqtt.client as mqtt
 
def on_connect(client, userdata, flag, rc):
    print("Connected with result code " + str(rc))
    client.subscribe("jetson-demo/answer")

def on_disconnect(client, userdata, rc):
    if  rc != 0:
      print("Unexpected disconnection.")

def on_message(client, userdata, msg):
    print("Received message '" + str(msg.payload) + "' on topic '" + msg.topic + "' with QoS " + str(msg.qos))

def main():
    client = mqtt.Client()
    client.on_connect = on_connect
    client.on_disconnect = on_disconnect
    client.on_message = on_message
 
    client.connect("localhost", int(os.environ["MQTT_PORT"]))
 
    client.loop_forever()
    
if __name__ == '__main__':
    main()

Jetson側のプログラム

Jetson側の作業ディレクトリの構成は以下の通り。

.
├── Dockerfile
├── jetson-app.py
├── jetson-demo.yaml
└── text-classify
    └── TextClassificationModel.pt

まずはじめに、以下の内容の jetson-app.py を 組み込んだコンテナを生成する。

jetson-app.py
import os
import time
from pathlib import Path
import torch
import torch.nn as nn
import paho.mqtt.client as mqtt


class TextClassificationModel(nn.Module):
    def __init__(self, vocab_size, embed_dim, num_class):
        super(TextClassificationModel, self).__init__()
        self.embedding = nn.EmbeddingBag(vocab_size, embed_dim, sparse=False)
        self.fc = nn.Linear(embed_dim, num_class)
        self.init_weights()

    def init_weights(self):
        initrange = 0.5
        self.embedding.weight.data.uniform_(-initrange, initrange)
        self.fc.weight.data.uniform_(-initrange, initrange)
        self.fc.bias.data.zero_()

    def forward(self, text, offsets):
        embedded = self.embedding(text, offsets)
        return self.fc(embedded)


def get_text_classifier(path):
    vocab_size = 95811
    emb_size = 64
    n_class = 4

    model = TextClassificationModel(vocab_size, emb_size, n_class)
    model.load_state_dict(torch.load(path / "TextClassificationModel.pt"))
    return model.half()


class Predictor:
    def __init__(self, path):
        self.ag_news_label = {
            1: "World",
            2: "Sports",
            3: "Business",
            4: "Sci/Tec"
        }

        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        print(self.device)
        self.model = get_text_classifier(path).to(self.device)

    def predict(self, text):
        with torch.no_grad():
            text = torch.tensor(text).to(self.device)
            output = self.model(text, torch.tensor([0]).to(self.device))

            label_index = output.argmax(1).item() + 1
            return self.ag_news_label[label_index]


predictor = Predictor(Path("text-classify"))


def on_connect(client, userdata, flag, rc):
    print("Connected with result code " + str(rc))
    client.subscribe("jetson-demo/question")


def on_message(client: mqtt.Client, userdata, msg):
    client.loop_start()
    text = msg.payload.decode("utf-8")
    text = [int(i) for i in text.split(",")]
    ans = predictor.predict(text)

    client.publish("jetson-demo/answer", ans)
    print(f"publish answer: {ans}")
    time.sleep(3)


def main():
    client = mqtt.Client()
    client.on_connect = on_connect
    client.on_message = on_message

    client.connect(os.environ["MQTT_SERVER"], int(os.environ["MQTT_PORT"]))

    client.loop_forever()


if __name__ == '__main__':
    main()

そして、 Dockerfile の内容としては以下の通り。

Dockerfile
FROM nvcr.io/nvidia/l4t-pytorch:r32.6.1-pth1.9-py3

RUN pip3 install paho-mqtt
WORKDIR /jetson_demo
COPY text-classify/ /jetson_demo/text-classify/
COPY jetson-app.py /jetson_demo

ENTRYPOINT ["python3", "-u", "jetson-app.py"]

Baseイメージは NVIDIA Container Registry にある Jetsonシリーズ向けのものを使っている。

https://catalog.ngc.nvidia.com/orgs/nvidia/containers/l4t-pytorch

なお、PyTorch v1.9がインストールされたコンテナイメージを使っているが、これはコンテナイメージを Jetson NanoにインストールされているJetPack 4.6.1 (r32.6.1) に対応する物と揃えた時、最新のものが v1.9であったため。
https://developer.nvidia.com/embedded/linux-tegra-r3261

最後にJetson側で建てるPodのマニフェスト。
ここが意外と曲者で、元々Jetson NanoにインストールされていたNVIDIA Container Runtimeのバージョンを前回更新したのが原因で、ホストマシンにあるCUDAやcuDNNといったGPUを使う上で重要なのファイルたちをコンテナ側に自動的にマウントしてくれなくなっている。そのため、こちらで明示的に設定してあげる必要がある。

jetson-demo.yaml
apiVersion: v1
kind: Pod
metadata:
  name: jetson-demo
  namespace: default
  labels:
    app: text-clf
spec:
  restartPolicy: OnFailure
  containers:
  - name: torch
    image: jetson:trc
    command: ["/bin/sh"]
    args: ["-c", "python3 jetson-app.py"]
    resources:
      limits:
        nvidia.com/gpu: 1
    env:
      - name: NVIDIA_VISIBLE_DEVICES
        value: all
      - name: NVIDIA_DRIVER_CAPABILITIES
        value: all
      - name: MQTT_SERVER
        value: "MQTTサーバーを建てているMacのIP"
      - name: MQTT_PORT
        value: "31884"
    # ここと
    volumeMounts:
      - name: cuda
        mountPath: /usr/local/cuda-10.2
        readOnly: true
      - name: cudnn
        mountPath: /usr/lib/aarch64-linux-gnu/libcudnn.so.8
        readOnly: true
  # ここでCUDA関連のファイルをマウント
  volumes:
    - name: cuda
      hostPath:
        path: /usr/local/cuda-10.2
    - name: cudnn
      hostPath:
        path: /usr/lib/aarch64-linux-gnu/libcudnn.so.8

---

apiVersion: v1
kind: Service
metadata:
  name: jetson-demo-svc
  namespace: default
  labels:
    app: text-clf
spec:
  ports:
  - protocol: TCP
    port: 1883
    targetPort: 1883
    nodePort: 31884
  selector:
    app: text-clf
  type: NodePort

作成したマニフェストをJetson Nano上で適用した上で、Mac上で pub.pysub.py を起動してあげれば、処理が始まる。

最終的な結果を受け取る sub.py の出力は

Received message 'b'Sports'' on topic 'jetson-demo/answer' with QoS 0

が受信されるはずだ。

まとめ

とりあえず、MacとJetson Nanoを使ってMQTT経由で通信をするデモアプリを作成することができた。
実質的にサポートが切れているJetson NanoでGPUを使えるようにするのは中々骨が折れた。

とりあえずGWの自由研究はこれで終わり。気が向いたら何か別のものを投稿してみようと思う。

Discussion