Momo改造版を使ってRasPi Zero W+GPUマシンで画像処理をしてみる
はじめに
前回の記事 では画像に加工をして送り返すということをやってみました。今回は Momo の Ayame モードのデータチャンネルを利用して検出結果だけを返し、検出結果に応じて RasPi Zero W 側で GPIO を使ったアクションを取ってみます。
エッジ側では撮影と GPIO だけに専念できるので RasPi Zero W でも十分というデモです。
動作デモはこんな感じになります。カメラの視野をラズパイ本体が遮ると顔が検出されなくなり赤く光る、どかして再び顔が検出されるようになると青く光る、という感じです。
ポイント
エッジ側は単に撮影&GPIO 処理だけでいいので、ラズパイの Zero W でも十分だということです。
構成
こんな感じ。
ラズパイ側の IO には何でも良かったんですけどちょうど手元にあったBlinkt!を使いました。カメラはスパイカメラも手元にあったのですが、ケーブルをコネクタに上手く挿すことが出来ずに挫折しました。適当な Web カメラにしました。
カメラや WebRTC、シグナリングの機能はすべて Momo に任せ、NNG - nanomsg-NGを使って Momo-Python の通信させます。この NNG を使った部分は現時点では私の自家製改造版にすぎません。時雨堂へのお問い合わせなどはしないようよろしくお願いします。
Linux 側の Python プログラムは Docker の中で動いています。Pub-Sub ソケットにデコード済み画像が流れてきたらそれをそのままDBFaceに食わせて顔を検出します。検出した数が 1 以上ならデータチャンネルに\x01、0 個なら\x00 を送るよう、NNG の Push ソケットに書き込みます。
ラズパイ側の Python プログラムは単純にデータチャンネルに流れてくるメッセージを見て、データが\x00 だったら赤、\x01 だったら青に Blinkt を光らせます。
画像の流れは以下のような感じに流れます。USB カメラ->Momo は MJPEG(多分)、Momo-Momo は VP8、Momo-Python は RGB の生データで流れます。
検出結果は Momo-Momo でデータチャンネルに乗り、それ以外は NNG で通信(同一マシン内で TCP)しています。
Pythonプログラム(Pi Zero W側)
import time
from pynng import Sub0
import blinkt
sub_socket = Sub0()
sub_socket.listen("tcp://*:5567")
topic = b"data/message/"
sub_socket.subscribe(topic)
blinkt.clear()
blinkt.show()
blinkt.set_clear_on_exit()
prev = False
print("Initialize done.")
while True:
# Wait for next request from client
message = sub_socket.recv()
message_size_idx = message.find(b"/", len(topic)) + 1
recv_topic = message[:message_size_idx]
message_body_idx = message_size_idx + 4
size = int.from_bytes(message[message_size_idx:message_size_idx + 4], byteorder="big")
message_body = message[message_body_idx:]
print("message", message, "body", message_body)
if message_body == b"\x01":
if not prev:
blinkt.set_all(0, 0, 255)
blinkt.show()
prev = True
else:
if prev:
blinkt.set_all(255, 0, 0)
blinkt.show()
prev = False
データ取って Blinkt を光らせてるだけ。簡単ですね。
Python プログラム(GPU Linux側)
顔検出は OpenCV のカスケード分類器でも良いんですけど、あれは検出がイマイチというのと、もうちょっと重いことしても平気ってアピールしたくて DBFace にしてみました。
import common
import numpy as np
import torch
import torch.nn.functional as F
import torch.nn as nn
import cv2
from pynng import Sub0, Push0
from model.DBFace import DBFace
HAS_CUDA = torch.cuda.is_available()
print(f"HAS_CUDA = {HAS_CUDA}")
def nms(objs, iou=0.5):
if objs is None or len(objs) <= 1:
return objs
objs = sorted(objs, key=lambda obj: obj.score, reverse=True)
keep = []
flags = [0] * len(objs)
for index, obj in enumerate(objs):
if flags[index] != 0:
continue
keep.append(obj)
for j in range(index + 1, len(objs)):
if flags[j] == 0 and obj.iou(objs[j]) > iou:
flags[j] = 1
return keep
def detect(model, image, threshold=0.4, nms_iou=0.5):
mean = [0.408, 0.447, 0.47]
std = [0.289, 0.274, 0.278]
image = common.pad(image)
image = ((image / 255.0 - mean) / std).astype(np.float32)
image = image.transpose(2, 0, 1)
torch_image = torch.from_numpy(image)[None]
if HAS_CUDA:
torch_image = torch_image.cuda()
hm, box, landmark = model(torch_image)
hm_pool = F.max_pool2d(hm, 3, 1, 1)
scores, indices = ((hm == hm_pool).float() * hm).view(1, -1).cpu().topk(1000)
hm_height, hm_width = hm.shape[2:]
scores = scores.squeeze()
indices = indices.squeeze()
ys = list((indices / hm_width).int().data.numpy())
xs = list((indices % hm_width).int().data.numpy())
scores = list(scores.data.numpy())
box = box.cpu().squeeze().data.numpy()
landmark = landmark.cpu().squeeze().data.numpy()
stride = 4
objs = []
for cx, cy, score in zip(xs, ys, scores):
if score < threshold:
break
x, y, r, b = box[:, cy, cx]
xyrb = (np.array([cx, cy, cx, cy]) + [-x, -y, r, b]) * stride
x5y5 = landmark[:, cy, cx]
x5y5 = (common.exp(x5y5 * 4) + ([cx]*5 + [cy]*5)) * stride
box_landmark = list(zip(x5y5[:5], x5y5[5:]))
objs.append(common.BBox(0, xyrb=xyrb, score=score, landmark=box_landmark))
return nms(objs, iou=nms_iou)
def nng_demo():
sub_socket = Sub0()
sub_socket.listen("tcp://0.0.0.0:5567")
sub_socket.subscribe(b"frame/")
data_socket = Push0()
data_socket.listen("tcp://0.0.0.0:5570")
track_id = None
dbface = DBFace()
dbface.eval()
if HAS_CUDA:
dbface.cuda()
dbface.load("model/dbface.pth")
print("Initialize done.")
while True:
message = sub_socket.recv()
topic_sep_idx = message.find(b"/")
stream_sep_idx = message.find(b"/", topic_sep_idx + 1)
track_sep_idx = message.find(b"/", stream_sep_idx + 1)
track = message[stream_sep_idx + 1:track_sep_idx]
track_topic = message[:track_sep_idx]
if track_id is None:
sub_socket.unsubscribe(b"frame/")
sub_socket.subscribe(track_topic)
track_id = track.decode("utf-8")
print(f"subscribe frame for track {track_id}")
cnt_idx = track_sep_idx + 1
w_idx = cnt_idx + 4
h_idx = w_idx + 4
img_idx = h_idx + 4
cnt = int.from_bytes(message[cnt_idx:cnt_idx + 4], byteorder="big")
w = int.from_bytes(message[w_idx:w_idx + 4], byteorder="big")
h = int.from_bytes(message[h_idx:h_idx + 4], byteorder="big")
# print(topic, track_id, cnt, w, h)
if len(message[img_idx:]) != h * w * 3:
print(f"incorrect size expected:{h*w*3} actual:{len(message[img_idx:])} tracktopic:{track_topic} headOfMessage:{message[:80]}")
continue
# 受信したのはRGB画像、処理はBGR画像
print("received image.")
rgb = np.frombuffer(message[img_idx:], dtype=np.uint8).reshape((h, w, 3))
frame = cv2.cvtColor(rgb, cv2.COLOR_RGB2BGR)
objs = detect(dbface, frame)
# 結果をデータチャンネルに送信
print(len(objs), "faces found")
send_message = b"\x01" if len(objs) > 0 else b"\x00"
print(data_socket.send(send_message))
if __name__ == "__main__":
nng_demo()
こちらは環境構築が手間かかるので Docker にしてしまいましょう。
FROM pytorch/pytorch:1.7.1-cuda11.0-cudnn8-runtime
ENV DEBIAN_FRONTEND=noninteractive
RUN apt-get update && apt-get install -yqq --no-install-recommends git libopencv-dev
RUN pip install pynng opencv-python numpy
WORKDIR /app
RUN git clone https://github.com/dlunion/DBFace.git
WORKDIR /app/DBFace
COPY nng_dbface.py ./
CMD ["python", "nng_dbface.py"]
version: '3.9'
services:
nng:
build: ./
runtime: nvidia
environment:
- NVIDIA_VISIBLE_DEVICES=all
command: python nng_dbface.py
ports:
- "5567:5567"
- "5570:5570"
tty: true
deploy:
resources:
reservations:
devices:
- capabilities: [gpu]
Dockerfile とついでに docker-compose.yml を用意して起動しやすくしてみました。
注意事項
繰り返しになりますが、NNG による改造は現時点では私の自家製改造版にすぎません。時雨堂へのお問い合わせは慎んでください。また、私の自家製 NNG 対応も改善していくうちにメッセージのフォーマットなどが変わる可能性が高いです。(Ayame モードでストリーム ID に/が入ってくることが分かっているので、その部分の修正は近いうちにやろうとしている)
デモのパフォーマンス(特に遅延)
LED の色変化が思っていたより遅延がありそうです。これは Ayame モードの Momo が 2 台で通信しているとき、映像コーデックを指定できず、VP8 で接続しているからでしょう。H264 に出来ればハードウエアアクセラレータが働いてフレームレートが高くなるはずです(Ayame ダッシュボードの Web との通信だと Web 側に選択肢があるので切り替えられる)。
Discussion