♨️

M5Stack Module LLM で カスタムモデルを動かす ~ 3.pythonデモコードでリアルタイム推論

2024/12/10に公開

はじめに

この記事では@PINTO03091氏によるYOLOv9カスタムモデルのwoholebody17のONNXモデルをM5Stack Module LLMのNPUで実行します。実行には1.モデル変換で作ったモデルと2.python用runtimeビルド
で制作したruntimeを使用します。

プロジェクトの概要については0.概要をご確認ください。

環境

実行はM5Stack LLM Module上にSSH接続して行います。

M5Stack LLM Moduleの接続

様々な実装がありますが、ここではカメラ画像のリアルタイム推論を行うため、以下の図のようにハードを接続してください。

M5Stack Core2はModule LLMへ電源供給するためにのみ使用していますので、Basic、CoreS3、Gray、Fire、などのいずれかのモデルで代用可能です。

ウェブカメラにはModule LLMのTypeC USBポートからUSBハブを介して接続しています。USBハブは使えるものと使えないものが在るようですがDAISOのTypeC to AなUSBハブが使用可能であったとの報告があります。
https://x.com/nnn112358/status/1859010353851756670

python実行環境

必要なライブラリ類をインストールします。

apt update
apt install libgl1
pip install opencv-python
pip install flask

作業用フォルダ

容量に比較的余裕がある/optに作業フォルダを作成して移動します。

mkdir /opt/workspace
cd /opt/workspace

1.モデル変換で作ったyolov9_t_wholebody17_relu_480x640_cut_coco4.axmodelと2.python用runtimeビルド
で制作したax_yolov9_module.soを/opt/workspaceにコピーします。

実行用デモコード

Webカメラで撮影した画像を推論にかけ、Flaskを使って結果をウェブページ上にストリーミングします。
まず、表示用のhtmlファイルを準備します。

mdkir templates
cd templates

index.htmlを以下の通り作成して保存してください。

<html>
   <head>
       <title>{{ title }} from M5Stack Module LLM</title>
   </head>   <body>
       <h3>from {{ user.username }}.</h3>
       <h3>Module LLM CV demo Live Streaming.</h3>
       <img src="{{ url_for('video_feed') }}">
       <h3>model:{{ user.modelname }}</h3>
   </body>
</html>

作業用フォルダに戻り、デモ用スクリプトを制作して保存します。
スクリプト名はapp.pyとしてください。

cd /opt/workspace

デモコード

from flask import render_template, Flask, Response

import numpy as np
import cv2
import ax_yolov9_module
import time
import os
import re

from typing import List

app = Flask(__name__)


# クラス名リスト
class_names = [
    "Body", "Adult", "Child", "Male", "Female", "Body_with_wheelchair", "Body_with_crutches", 
    "Head", "Face", "Eye", "Nose", "Mouth", "Ear", "Hand", "Hand_left", "Hand_right", "Foot"
]

num_class = 17

# ラインの色をクラスごとに割り当て(ランダムな色)
np.random.seed(0)  # 再現性のため固定
class_colors = {class_id: tuple(np.random.randint(0, 255, 3).tolist()) for class_id in range(len(class_names))}
scale = 0
PROB_THRESHOLD = 0.30  # 必要に応じて変更
num_frames = 300
codec = cv2.VideoWriter_fourcc('m', 'p', '4', 'v')
fps = 15  # フレームレート(必要に応じて変更)


# リサイズ&パディング関数
def resize_and_pad(image, target_height, target_width):
    global scale


    """
    画像を高さと幅に基づいてリサイズし、足りない部分をパディングして目的のサイズに調整する
    """
    h, w, _ = image.shape
    scale_h = target_height / h  # 高さ方向のスケール
    scale_w = target_width / w  # 幅方向のスケール
    scale = min(scale_h, scale_w)  # 短辺に基づいてスケールを決定
    #print("scale:"+ str(scale))

    # 新しいサイズを計算
    new_h, new_w = int(h * scale), int(w * scale)

    # リサイズ
    resized_image = cv2.resize(image, (new_w, new_h))

    # パディングを計算(下と右に限定)
    top_pad = 0
    bottom_pad = target_height - new_h
    left_pad = 0
    right_pad = target_width - new_w

    # パディングを追加(黒色で埋める)
    padded_image = cv2.copyMakeBorder(
        resized_image, top_pad, bottom_pad, left_pad, right_pad, cv2.BORDER_CONSTANT, value=[0, 0, 0]
    )

    return padded_image

# バウンディングボックスを描画する関数
def draw_bounding_boxes(image, results, original_size, target_size, inference_time):
    """
    推論結果に基づきバウンディングボックスを描画します。
    """
    #print("scale:" + str(scale))
    cv2.putText(image, str(int(inference_time*1000)) + "ms", (20, 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 3, lineType=cv2.LINE_AA)
    cv2.putText(image, str(int(inference_time*1000)) + "ms", (20, 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0,), 1, lineType=cv2.LINE_AA)
    
    for result in results:
        label, prob, coords = result

        # 信頼度が閾値以下の場合はスキップ
        if prob < PROB_THRESHOLD:
            continue

        x_min, y_min, x_max, y_max = coords

        # バウンディングボックスの座標を元のサイズに変換
        x_min = int(x_min / scale)
        y_min = int(y_min / scale)
        x_max = int(x_max / scale)
        y_max = int(y_max / scale)

        # クラスIDからクラス名と色を取得
        class_name = class_names[label] if label < len(class_names) else f"Class_{label}"
        color = class_colors.get(label, (0, 255, 0))  # デフォルト色

        # バウンディングボックスを描画
        cv2.rectangle(image, (x_min, y_min), (x_max, y_max), color, 2)

        # クラス名と確率を描画
        label_text = f"{class_name}: {prob:.2f}"
        label_size, _ = cv2.getTextSize(label_text, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)
        label_x, label_y = x_min, y_min - 10 if y_min - 10 > 10 else y_min + 10
        cv2.putText(image, label_text, (label_x, label_y), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 3, lineType=cv2.LINE_AA)
        cv2.putText(image, label_text, (label_x, label_y), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1, lineType=cv2.LINE_AA)
    

def extract_model_dimensions(model_file: str):
    """
    モデルファイル名から画像のheightとwidthを抽出します。

    Parameters:
        model_file (str): モデルファイル名

    Returns:
        tuple: (height, width) または None(抽出できなかった場合)
    """
    # 正規表現で "数字x数字" の形式を抽出
    match = re.search(r'(\d+)x(\d+)', model_file)
    if match:
        # 抽出した値を整数に変換してタプルで返す
        height, width = map(int, match.groups())
        return height, width
    else:
        # 該当しない場合は 固定値 を返す
        return 480,640


# モデルファイル
model_file = "compiled.axmodel"
def gen_frames():
    target_height, target_width = extract_model_dimensions(model_file)

    # カメラを初期化
    cap = cv2.VideoCapture(0)  # デフォルトカメラを指定(インデックスを変更可能)
    if not cap.isOpened():
        raise ValueError("Failed to open the camera")

    cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)  
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480) 


    print("カメラが起動しました。フレームを取得しています...")

    # 動画保存用設定
    output_video_path = model_file.replace(".axmodel", ".mp4")
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # コーデック(MP4形式)

    frame_size = (int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)))

    # VideoWriterを初期化
    i = 0
    while True:
        start_frame_time = time.time()  # フレーム処理の開始時刻
        #print(i)

        start_read_time = time.time()  # pad処理の開始時刻
        ret, image = cap.read()
        if not ret:
            print(f"フレーム取得に失敗しました: フレーム番号 {i}")
            break
        end_read_time = time.time()  # pad処理の終了時刻
        read_time = end_read_time - start_read_time
        # オリジナル画像のサイズを取得
        h_original, w_original, _ = image.shape

        # 画像をリサイズ&パディング
        start_pad_time = time.time()  # pad処理の開始時刻
        padded_image = resize_and_pad(image, target_height, target_width)
        end_pad_time = time.time()  # pad処理の終了時刻
        pad_time = end_pad_time - start_pad_time

        # モデルの推論には Numpy 配列を渡す必要がある
        image_array = np.asarray(padded_image, dtype=np.uint8)

        # NPUを初期化(最初のフレームでのみ実行)
        if i == 0:
            ret = ax_yolov9_module.initialize(model_file, num_class)
            print(ret)
            i = 1

        # 推論処理の時間を測定
        start_inference_time = time.time()  # 推論処理の開始時刻
        # 推論を実行
        results = ax_yolov9_module.inference(model_file, image_array, [target_height, target_width], num_class)
        end_inference_time = time.time()  # 推論処理の終了時刻
        inference_time = end_inference_time - start_inference_time

        # バウンディングボックスをオリジナル画像に描画
        start_drawBB_time = time.time()  # pad処理の開始時刻
        draw_bounding_boxes(image, results, (h_original, w_original), (target_height, target_width), inference_time)
        end_drawBB_time = time.time()  # pad処理の終了時刻
        drawBB_time = end_drawBB_time - start_drawBB_time
        

        # フレームを動画に書き込む
        start_write_time = time.time()  # 推論処理の開始時刻
        #video_writer.write(image)
        ret, buffer = cv2.imencode('.jpg',image)
        # bytesデータ化
        image = buffer.tobytes()
        yield (b'--image\r\n'
               b'Content-Type: image/jpeg\r\n\r\n' + image + b'\r\n')

        end_write_time = time.time()  # 推論処理の終了時刻
        write_time = end_write_time - start_write_time


        # フレーム処理全体の時間を測定
        end_frame_time = time.time()  # フレーム処理の終了時刻
        frame_processing_time = end_frame_time - start_frame_time

        # 処理時間を出力
        print(f"フレーム {i}: read:{read_time:.3f} 秒, pad:{pad_time:.3f} 秒, 推論:{inference_time:.3f} 秒, drawBB{drawBB_time:.3f} 秒, write:{write_time:.3f} 秒, フレーム:{frame_processing_time:.3f} 秒")
        
    # NPUを終了
    ret = ax_yolov9_module.finalize()
    print(ret)

    # リソースを解放
    cap.release()

@app.route('/video_feed')
def video_feed():
    #imgタグに埋め込まれるResponseオブジェクトを返す
    return Response(gen_frames(), mimetype='multipart/x-mixed-replace; boundary=image')

@app.route('/')
@app.route('/index')
def index():
   
    user = {'username' : 'M5Stack Module LLM',
            'modelname': model_file}
    return render_template('index.html', title='home', user=user)

実行コマンド

flask run --host=0.0.0.0

実行すると閲覧用のUrlが表示されるため、同じネットワーク内のPCのウェブブラウザから確認してください。

このように表示されたら成功です。
https://x.com/AirpocketRobot/status/1861755055801802771

おわりに

今回のプロジェクトではM5Stack Module LLMを使ってお気に入りのCVモデルを変換、実行しました。対象としたのはYOLOv9のカスタムモデルでしたが、AXERA社のYOLOv9モデルとほぼ同じ操作で変換、実行できたため難易度はそれほど高くない内容でした。
変換に伴う精度劣化が生じており、最適化を進める余地は十分にありそうです。

一方で、AXERA社が対応していないモデルの変換については、より深いモデルとPulsar2の理解が必要です。精度劣化を抑えたモデルの刈込とポストプロセスの組み立てについてはまだ理解が及んでいないため、更なる勉強が必要です。

Discussion