Momo改造版を使ってRasPi Zero W+GPUマシンで画像処理をしてみる

8 min read読了の目安(約7800字

はじめに

前回の記事 では画像に加工をして送り返すということをやってみました。今回はMomoのAyameモードのデータチャンネルを利用して検出結果だけを返し、検出結果に応じてRasPi Zero W側でGPIOを使ったアクションを取ってみようと思います。

エッジ側では撮影とGPIOだけに専念できるのでRasPi Zero Wでも十分というデモです。

動作デモはこんな感じになります。カメラの視野をラズパイ本体が遮ると顔が検出されなくなり赤く光る、どかして再び顔が検出されるようになると青く光る、という感じです。

https://youtu.be/rtgz85Lhp3U

ポイント

エッジ側は単に撮影&GPIO処理だけでいいので、ラズパイのZero Wでも十分だということです。

構成

こんな感じ。

ラズパイ側のIOには何でも良かったんですけどちょうど手元にあったBlinkt!を使いました。カメラはスパイカメラも手元にあったのですが、ケーブルをコネクタに上手く挿すことが出来ずに挫折しました。適当なWebカメラにしました。

カメラやWebRTC、シグナリングの機能はすべてMomoに任せ、NNG - nanomsg-NGを使ってMomo-Pythonの通信を行わせます。このNNGを使った部分は現時点では私の自家製改造版にすぎません。時雨堂へのお問い合わせなどはしないようよろしくお願いします。

Linux側のPythonプログラムはDockerの中で動いており、Pub-Subソケットにデコード済み画像が流れてきたらそれをそのままDBFaceに食わせて顔を検出、検出した数が1以上ならデータチャンネルに\x01、0個なら\x00を送るよう、NNGのPushソケットに書き込みます。

ラズパイ側のPythonプログラムは単純にデータチャンネルに流れてくるメッセージを見て、データが\x00だったら赤、\x01だったら青にBlinktを光らせます。

画像の流れは以下のような感じに流れます。USBカメラ->MomoはMJPEG(多分)、Momo-MomoはVP8、Momo-PythonはRGBの生データで流れます。

検出結果はMomo-Momoでデータチャンネルに乗り、それ以外はNNGで通信(同一マシン内でTCP)しています。

Pythonプログラム(Pi Zero W側)

import time
from pynng import Sub0
import blinkt

sub_socket = Sub0()
sub_socket.listen("tcp://*:5567")
topic = b"data/message/"
sub_socket.subscribe(topic)

blinkt.clear()
blinkt.show()
blinkt.set_clear_on_exit()
prev = False

print("Initialize done.")
while True:
    #  Wait for next request from client
    message = sub_socket.recv()
    message_size_idx = message.find(b"/", len(topic)) + 1
    recv_topic = message[:message_size_idx]
    message_body_idx = message_size_idx + 4
    size = int.from_bytes(message[message_size_idx:message_size_idx + 4], byteorder="big")
    message_body = message[message_body_idx:]
    print("message", message, "body", message_body)

    if message_body == b"\x01":
        if not prev:
            blinkt.set_all(0, 0, 255)
            blinkt.show()
            prev = True
    else:
        if prev:
            blinkt.set_all(255, 0, 0)
            blinkt.show()
            prev = False

データ取ってBlinktを光らせてるだけ。簡単ですね。

Python プログラム(GPU Linux側)

顔検出はOpenCVのカスケード分類器でも良いんですけど、あれは検出がイマイチというのと、もうちょっと重いことしても平気ってアピールしたくてDBFaceにしてみました。

import common
import numpy as np
import torch
import torch.nn.functional as F
import torch.nn as nn
import cv2
from pynng import Sub0, Push0
from model.DBFace import DBFace

HAS_CUDA = torch.cuda.is_available()
print(f"HAS_CUDA = {HAS_CUDA}")


def nms(objs, iou=0.5):

    if objs is None or len(objs) <= 1:
        return objs

    objs = sorted(objs, key=lambda obj: obj.score, reverse=True)
    keep = []
    flags = [0] * len(objs)
    for index, obj in enumerate(objs):

        if flags[index] != 0:
            continue

        keep.append(obj)
        for j in range(index + 1, len(objs)):
            if flags[j] == 0 and obj.iou(objs[j]) > iou:
                flags[j] = 1
    return keep


def detect(model, image, threshold=0.4, nms_iou=0.5):

    mean = [0.408, 0.447, 0.47]
    std = [0.289, 0.274, 0.278]

    image = common.pad(image)
    image = ((image / 255.0 - mean) / std).astype(np.float32)
    image = image.transpose(2, 0, 1)

    torch_image = torch.from_numpy(image)[None]
    if HAS_CUDA:
        torch_image = torch_image.cuda()

    hm, box, landmark = model(torch_image)
    hm_pool = F.max_pool2d(hm, 3, 1, 1)
    scores, indices = ((hm == hm_pool).float() * hm).view(1, -1).cpu().topk(1000)
    hm_height, hm_width = hm.shape[2:]

    scores = scores.squeeze()
    indices = indices.squeeze()
    ys = list((indices / hm_width).int().data.numpy())
    xs = list((indices % hm_width).int().data.numpy())
    scores = list(scores.data.numpy())
    box = box.cpu().squeeze().data.numpy()
    landmark = landmark.cpu().squeeze().data.numpy()

    stride = 4
    objs = []
    for cx, cy, score in zip(xs, ys, scores):
        if score < threshold:
            break

        x, y, r, b = box[:, cy, cx]
        xyrb = (np.array([cx, cy, cx, cy]) + [-x, -y, r, b]) * stride
        x5y5 = landmark[:, cy, cx]
        x5y5 = (common.exp(x5y5 * 4) + ([cx]*5 + [cy]*5)) * stride
        box_landmark = list(zip(x5y5[:5], x5y5[5:]))
        objs.append(common.BBox(0, xyrb=xyrb, score=score, landmark=box_landmark))
    return nms(objs, iou=nms_iou)


def nng_demo():
    sub_socket = Sub0()
    sub_socket.listen("tcp://0.0.0.0:5567")
    sub_socket.subscribe(b"frame/")

    data_socket = Push0()
    data_socket.listen("tcp://0.0.0.0:5570")

    track_id = None

    dbface = DBFace()
    dbface.eval()

    if HAS_CUDA:
        dbface.cuda()

    dbface.load("model/dbface.pth")

    print("Initialize done.")

    while True:
        message = sub_socket.recv()
        topic_sep_idx = message.find(b"/")
        stream_sep_idx = message.find(b"/", topic_sep_idx + 1)
        track_sep_idx = message.find(b"/", stream_sep_idx + 1)
        track = message[stream_sep_idx + 1:track_sep_idx]
        track_topic = message[:track_sep_idx]

        if track_id is None:
            sub_socket.unsubscribe(b"frame/")
            sub_socket.subscribe(track_topic)
            track_id = track.decode("utf-8")
            print(f"subscribe frame for track {track_id}")

        cnt_idx = track_sep_idx + 1
        w_idx = cnt_idx + 4
        h_idx = w_idx + 4
        img_idx = h_idx + 4
        cnt = int.from_bytes(message[cnt_idx:cnt_idx + 4], byteorder="big")
        w = int.from_bytes(message[w_idx:w_idx + 4], byteorder="big")
        h = int.from_bytes(message[h_idx:h_idx + 4], byteorder="big")
        # print(topic, track_id, cnt, w, h)
        if len(message[img_idx:]) != h * w * 3:
            print(f"incorrect size expected:{h*w*3} actual:{len(message[img_idx:])} tracktopic:{track_topic} headOfMessage:{message[:80]}")
            continue
        # 受信したのはRGB画像、処理はBGR画像
        print("received image.")
        rgb = np.frombuffer(message[img_idx:], dtype=np.uint8).reshape((h, w, 3))
        frame = cv2.cvtColor(rgb, cv2.COLOR_RGB2BGR)
        objs = detect(dbface, frame)

        # 結果をデータチャンネルに送信
        print(len(objs), "faces found")
        send_message = b"\x01" if len(objs) > 0 else b"\x00"
        print(data_socket.send(send_message))


if __name__ == "__main__":
    nng_demo()    

こちらは環境構築が手間かかるのでDockerにしてしまいましょう。

FROM pytorch/pytorch:1.7.1-cuda11.0-cudnn8-runtime

ENV DEBIAN_FRONTEND=noninteractive
RUN apt-get update && apt-get install -yqq --no-install-recommends git libopencv-dev
RUN pip install pynng opencv-python numpy

WORKDIR /app
RUN git clone https://github.com/dlunion/DBFace.git

WORKDIR /app/DBFace
COPY nng_dbface.py ./

CMD ["python", "nng_dbface.py"]
version: '3.9'

services:
    nng:
        build: ./
        runtime: nvidia
        environment:
          - NVIDIA_VISIBLE_DEVICES=all
        command: python nng_dbface.py
        ports:
            - "5567:5567"
            - "5570:5570"
        tty: true
        deploy:
            resources:
                reservations:
                    devices:
                    - capabilities: [gpu]

Dockerfileとついでにdocker-compose.ymlを用意して起動しやすくしてみました。

注意事項

繰り返しになりますが、NNGによる改造は現時点では私の自家製改造版にすぎません。時雨堂へのお問い合わせは慎んでください。また、私の自家製NNG対応も改善していくうちにメッセージのフォーマットなどが変わる可能性が高いです(AyameモードでストリームIDに/が入ってくることが分かっているので、その部分の修正は近いうちにやろうとしている)。

デモのパフォーマンス(特に遅延)

LEDの色変化が思っていたより遅延があるように思います。これはAyameモードのMomoが2台で通信しているとき、映像コーデックを指定することができず、VP8で接続しているからだと思います。H264に出来ればハードウエアアクセラレータが働いてフレームレートが高くなるはずです(AyameダッシュボードのWebとの通信だとWeb側に選択肢があるので切り替えられる)。