🎞

Momo に ZeroMQ を組み込んでみる

2021/04/10に公開

はじめに

時雨堂のMomoという製品があります。オープンソースプロダクトで WebRTC で映像や音声を送受信できるネイティブクライアントです。

一方で最近はディープラーニングを中心に画像処理関連の研究の進展が著しいです。有名どころの顔検出をはじめとして、様々なことがリアルタイムでも処理できるようになってきています。

そうなると、この 2 つをがっちゃんこして WebRTC で受信した映像をリアルタイムに処理したくなりますよね。ところが、以下の理由などにより Momo の改造で画像処理を行うのは難しいと考えられていました。

  • Momo が C++で書かれていていること
  • 高速と言ってもさすがに全てのフレームを処理できるほどではない

この記事ではZeroMQというメッセージングライブラリを使うことで Momo 側の処理は最低限にし、別プロセスで様々な画像処理などを行えるよう、Momo を改造した話をします。

ZeroMQ

ZeroMQ とは、メッセージングライブラリの 1 つで、以下のような特徴を持っています。

  • 様々なトランスポートに対応(in-process/IPC/TCP/multicast, ...)
  • 様々なメッセージングパターンに対応(fan-out/pub-sub/req-rep, ...)
  • 様々な言語のバインディングがある(C/C++/Python/Go/Java/Lua/Erlang...)

Momoの改造

音声や外部プロセス→Momo のことはひとまず置いといて、Momo→外部プロセスだけを実装しました。
雑な説明をすると、SDLRendererを参考に、デコードしたフレーム画像を ZeroMQ に送信するZMQSenderクラスを作り、SDLRendererのと差し替える感じにしました。
Fork したリポジトリのmomo-with-zeromqブランチにあります。
プロトタイプなので動作保証は出来ませんし、ビルド方法などの問い合わせにも原則として答えられません。また、独自に改造したものなので時雨堂への問い合わせは絶対に行わないでください。Momo の Discord には参加しているので、そこで直接聞いていただければ答えられるものには答えます。

現状、自分が使えれば良いというだけなのでローカルビルド(追加したスクリプト)と ubuntu-18.04_x86_64 ビルドにのみ対応しています。

ZeroMQメッセージ

PUB-SUB モデルなので、受信側は適宜トピックを SUBSCRIBE する必要があります。現在は以下の 3 種類のトピックでメッセージを送信しています。

  • track/add: トラックが追加されたとき、トラック ID と共に送信されます
  • track/remove: トラックが削除されたとき、トラック ID と共に送信されます
  • frame/<track_id>: デコード後の画像フレームが送信されます。詳細は後述

画像フレームメッセージ

  • トピック(frame/<track_id>
  • フレームカウント(32bit int, ビッグエンディアン)
  • 画像幅(32bit int, ビッグエンディアン)
  • 画像高さ(32bit int, ビッグエンディアン)
  • フレームカウント(32bit int, ビッグエンディアン)
  • 画素データ(点順次、RGB 24 bit/pixel)

外部プロセスの実装例

今回の例はどちらも Python + OpenCV で作りましたが、Pillow でも出来るはずです。
また、ZeroMQ のライブラリさえあれば他の言語でも構わないです。

例1: 動作確認用

import time
import zmq
import numpy as np
import cv2

context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.subscribe("frame/")
socket.set_hwm(1)
socket.bind("tcp://*:5567")

while True:
    #  Wait for next request from client
    message = socket.recv_multipart()
    cnt = int.from_bytes(message[1], byteorder="big")
    w = int.from_bytes(message[2], byteorder="big")
    h = int.from_bytes(message[3], byteorder="big")
    color = np.frombuffer(message[4], dtype=np.uint8).reshape([h, w, 3])
    color = cv2.cvtColor(color, cv2.COLOR_RGB2BGR)

    if cnt % 30 == 0:
        cv2.imwrite("image.png", color)
        print(f'count:{cnt} drop:{drop} W:{w} H:{h} pix:{w*h}')

画像フレームを受信して 30 フレームごとにimage.pngへ保存しています。VS Code やエクスプローラのプレビューなどリアルタイムに反映されるツールで見ていると受信出来ているのが分かります。

画像は RGB で送信されてくるので OpenCV の場合は BGR に置き換えてあげる必要があります。

例2: 画像処理をして送り返す例

import time
import zmq
import numpy as np
import cv2
import pyvirtualcam

context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.subscribe("track/add")
socket.set_hwm(1)
socket.bind("tcp://*:5567")

cam_w = 640
cam_h = 480

num_proc = 0

track_id = None

with pyvirtualcam.Camera(device='/dev/video0', width=cam_w, height=cam_h, fps=30) as cam:
    print(f'Using virtual camera: {cam.device}')
    frame = np.zeros((cam.height, cam.width, 3), np.uint8)  # RGB
    frame[:] = 128
    cam.send(frame)
    cam.sleep_until_next_frame()
    while True:
        #  Wait for next request from client
        message = socket.recv_multipart()
        topic = message[0].decode("utf-8")
        if topic == "track/add":
            if track_id is None:
                track_id = message[1].decode("utf-8")
                socket.subscribe(f"frame/{track_id}")
                print(f"subscribe frame for track {track_id}")
        if topic.startswith("frame"):
            cnt = int.from_bytes(message[1], byteorder="big")
            w = int.from_bytes(message[2], byteorder="big")
            h = int.from_bytes(message[3], byteorder="big")
            color = np.frombuffer(message[4], dtype=np.uint8).reshape([h, w, 3])

            color = cv2.line(color, (0, 0), (w, h), (0, 0, 255), 10)
            cam_color = cv2.resize(color, (cam_w, cam_h))
            cam.sleep_until_next_frame()
            cam.send(cam_color)

            num_proc += 1
            if num_proc % 100 == 0:
                print(f'count:{cnt} drop:{drop} W:{w} H:{h} pix:{w*h}')
                time.sleep(1)

外部プロセス→Momo の受け渡しを実装してなかったのは、この例でも使っているpyvirtualcamを使うと簡単に仮想カメラのようなものが作れるからです。仮想カメラに出力できてさえいれば、Momo の通常の機能でそのカメラ映像を送信することが出来ます。

この例では最初はtrack/addだけを購読して、トラックが追加されるのを待っています。最初に追加されたトラック ID のframe/<track_id>を購読することで、そのトラックの画像フレームのみを受信して処理します。

画像処理としてはここでは例なので単純に左上から右下へ青い線を引いています。

最後でわざとらしく 100 フレーム処理するごとに 1 秒のスリープを入れてます。これは画像処理が重くて次の画像フレームが届くまでに処理が終わらなかった場合どうなるかをデモンストレーションしています。ZeroMQ の PUB-SUB の場合は、メッセージが受け取られなかった場合、黙って破棄されるようになっています。送信側・受信側両方で HWM(High Water Mark)を 1 に設定しているので、どちらにもバッファされないようになっていて、できるだけ最新に近い画像フレームを常に受け取ることが出来ます。

以下が動作させたときの様子です。緑色の背景の接続の画像だけを処理していること、時々止まるけどその後は最新のフレームを処理していることが分かります。

https://youtu.be/2mZEtUb2-Ps

Momo本体への取り込み

Momo 側の変更は小さいので、Momo 本体へ取り込まれると嬉しいです。
ですが、残念ながら ZeroMQ のライブラリである libzmq(LGPL)を組み込むことが出来ないというのが時雨堂の方針とのことです。

nanoMicrosoftg/nng への切り替え(2021/04/11追記)

ライセンス問題で取り込まれないとの話なので、別のライブラリを探してみました。nng というのが "it's a spiritual successor to ZeroMQ" とも呼ばれるくらいには ZeroMQ の後継と言えるライブラリのようです。

早速 momo-with-nng ブランチで試してみました。

Python 側はこんな感じになります。

import time
import numpy as np
import cv2
import pyvirtualcam
from pynng import Sub0

socket = Sub0()
socket.listen("tcp://127.0.0.1:5567")
socket.subscribe(b"frame/")

prev = -1
drop = 0

cam_w = 640
cam_h = 480

num_proc = 0

track_id = None

with pyvirtualcam.Camera(device='/dev/video0', width=cam_w, height=cam_h, fps=30) as cam:
    print(f'Using virtual camera: {cam.device}')
    frame = np.zeros((cam.height, cam.width, 3), np.uint8)  # RGB
    frame[:] = 128
    cam.send(frame)
    cam.sleep_until_next_frame()
    while True:
        message = socket.recv()
        topic_sep_idx = message.find(b"/")
        track_sep_idx = message.find(b"/", topic_sep_idx + 1)
        topic = message[:topic_sep_idx]
        track = message[topic_sep_idx + 1:track_sep_idx]
        track_topic = message[:track_sep_idx]

        if track_id is None:
            socket.unsubscribe(b"frame/")
            socket.subscribe(track_topic)
            track_id = track.decode("utf-8")
            print(f"subscribe frame for track {track_id}")
            continue

        cnt_idx = track_sep_idx + 1
        w_idx = cnt_idx + 4
        h_idx = w_idx + 4
        img_idx = h_idx + 4
        cnt = int.from_bytes(message[cnt_idx:cnt_idx + 4], byteorder="big")
        w = int.from_bytes(message[w_idx:w_idx + 4], byteorder="big")
        h = int.from_bytes(message[h_idx:h_idx + 4], byteorder="big")
        if len(message[img_idx:]) != h * w * 3:
            print(f"incorrect size expected:{h*w*3} actual:{len(message[img_idx:])}")
            continue
        color = np.frombuffer(message[img_idx:], dtype=np.uint8).reshape((h, w,
3))
        color = cv2.cvtColor(color, cv2.COLOR_RGB2BGR)

        cv2.line(color, (0, 0), (w, h), (0, 0, 255), 10)
        cam_color = cv2.cvtColor(cv2.resize(color, (cam_w, cam_h)), cv2.COLOR_BGR2RGB)
        cam.sleep_until_next_frame()
        cam.send(cam_color)
        if drop > -1 and cnt != prev + 1:
            drop += cnt - prev
            print(f"dropped {cnt - prev}")
        prev = cnt
        num_proc += 1
        if num_proc % 100 == 0:
            print(f'count:{cnt} drop:{drop} W:{w} H:{h} pix:{w*h} id:{track_id}')
            time.sleep(1)

接続関係はオプションなしでも期待するバッファ無しの動作をしているようでした。

メッセージオブジェクトを使ったやり取りが Python bindings では書きにくい感じなのでバイナリ列そのままのやり取りになっています。また、マルチパートメッセージもないので自分で分解しています。

メッセージの組み立て・分解関係が一手間かかるのは気になるものの、それ以外は安定して動いているようですし、割と良さそうです。

Discussion