📽️

InsightFaceを使って動画から顔検出してみた

2021/08/10に公開

目次

初めに

前回の記事でInsightFaceをGPU上で動かすことができました。

ある程度の速度(1フレームあたり50ミリ秒)で動かすとができるようになったので、InsightFaceを動画に適用し、結果を簡易的に可視化してみました。

認識結果

まずは、顔検出の結果(バウンディングボックスのみ)を可視化した動画(アニメーションGIFですが)をご覧ください。
顔検出の結果は安定しており、かなり角度がついた状態でも顔を認識できていることが分かります。(元の動画は30fpsなので、実際にはもっと滑らかに動きます)

動画はPixabayからお借りしました。

ほぼ自分用のメモ: MP4動画からアニメーションGIFへの変換には、FFmpegとImageMagickを使いました。(以下の例は、フレームレートが5フレーム/秒、先頭5秒を切り出す場合)

ffmpeg -i output.mp4 -an -r 5 -t 5 -s 480x270 tmp/%04d.png
convert tmp/*.png output.gif

InsightFaceを動画に適用する

InsightFaceを動画に適用するにあたり、新たに実装した処理は以下の通りです。

  • 動画からフレーム画像を読み込む
  • マルチスレッド/マルチプロセスで処理する
  • 可視化する
  • その他の工夫

動画からフレーム画像を読み込む

動画からフレーム画像を読み込む処理は、OpenCV(のPythonライブラリ)を使って簡単に実装することができます。

import cv2

video_capture = cv2.VideoCapture("foo.mp4")
result, frame = video_capture.read()

フレーム画像(上記ではframe)に対してInsightFaceを適用するだけで、各フレームの顔検出が行えます。簡単ですね。

なお、MOT(Multiple Object Tracking)を適用して、数フレームおきに顔検出を行い、残りはトラッキングで処理する方法もありますが、今回は単純化のためにすべてのフレームで顔検出を行っています。

マルチスレッド/マルチプロセスで処理する

上記の通り、動画からのフレーム画像の取得と顔検出は簡単に行えるのですが、フレーム数分処理する必要があるため、ある程度の時間が掛かります。
できるだけ短時間で処理したいため、今回は顔検出処理を別プロセスに分割し、複数のプロセスを束ねて処理できるようにしました。

以前の記事で実装した顔検出サーバ(detector-insightface)を少し改造し、JPEG画像、PNG画像の他にNumPy形式の画像を処理できるようにしました。
こうすることで、劣化することなく顔検出サーバに画像を渡すことができます。また、余分なエンコード/デコードも不要です。
ただし、圧縮されていない状態の画像を扱うことになるため、ある程度の通信帯域を必要とします。(今回は同一マシンで動作させたため、あまり気になりませんでしたが)

顔検出サーバを呼び出す側のプロセスも、複数のスレッドに分割してパイプライン処理するようにしました。具体的には、以下の3つのスレッドに分割しました。

  • 動画ファイルからのフレーム画像の読み込み(デコード)
  • 顔検出サーバの呼び出し(顔検出)
  • 結果ファイルへの書き出し(書き出し)

デコード、書き出しについては、それぞれ1スレッドで処理を行っています。
顔検出については複数のスレッドで処理し、顔検出サーバの能力や数に応じてスレッド数を調整することができます。
簡単に実験してみた結果、GPU 1台につき顔検出サーバを2プロセス、顔検出サーバ 1プロセスにつき2スレッドを用意すると、最大のスループットを得ることができました。

マルチスレッド処理については標準ライブラリのthreadingqueueを使って愚直に実装しました。
パイプライン処理を実装するのに便利なライブラリなどがあれば教えて頂けると嬉しいです。

可視化する

本格的な可視化は、別途Next.jsを使ったウェブアプリ側で実施する予定のため、今回は簡易的な可視化だけを実装しました。
具体的にはOpenCVを使い、バウンディングボックスをオーバーレイした動画を出力しています。

その他の工夫

その他の工夫した点として、各スレッドにて処理回数、処理時間、スループットなどを算出して出力するようにしました。
スレッド数の調整などに便利でした。

ビルド&実行

コード全体はGitHub上のリポジトリ「202107-face-detector」にあります。
この記事に対応するタグは20210810aです。

https://github.com/nayutaya/202107-face-detector/tree/20210810a

ビルド、実行例は以下の通りです。

# リポジトリを取得
git clone https://github.com/nayutaya/202107-face-detector.git
cd 202107-face-detector
# タグをチェックアウト
git checkout 20210810a
# Dockerイメージをビルドする
docker-compose build
# Dockerコンテナをバックグラウンドで起動する
docker-compose up -d
# video-analyzerコンテナを起動する
docker-compose run --rm video-analyzer

# video-analyzerコンテナ内: 顔検出する
./src/main.py pixabay_76889_960x540.mp4 pixabay_76889_960x540.mp4.jsonl
# video-analyzerコンテナ内: 結果を動画として書き出す
./src/overlay.py pixabay_76889_960x540.mp4 pixabay_76889_960x540.mp4.jsonl output.mp4

# (試したあとに)Dockerコンテナを停止する
docker-compose down

コード

特に解説、コメントなどはありませんが、主要なコードを貼っておきます。

video-analyzer/src/main.py

今回は手抜きのためハードコーディングされていますが、detector_base_urlsを調整することでバックエンドを変更することができます。
また、エラー処理はほとんど実装されていませんのでご注意ください。

#!/usr/bin/env python3

import hashlib
import io
import itertools
import json
import logging
import pathlib
import queue
import threading
import time

import click
import cv2
import numpy as np
import requests


def calc_sha1_hash(path):
    with path.open("rb") as file:
        return hashlib.sha1(file.read()).hexdigest()


def make_video_meta(video_file_path, video_capture):
    return {
        "sha1": calc_sha1_hash(video_file_path),
        "size": video_file_path.stat().st_size,
        "width": int(video_capture.get(cv2.CAP_PROP_FRAME_WIDTH)),
        "height": int(video_capture.get(cv2.CAP_PROP_FRAME_HEIGHT)),
        "fps": video_capture.get(cv2.CAP_PROP_FPS),
        "numberOfFrames": int(video_capture.get(cv2.CAP_PROP_FRAME_COUNT)),
    }


def dump_json_compact(obj):
    return json.dumps(obj, sort_keys=True, separators=(",", ":"), ensure_ascii=False)


def dump_numpy(ary):
    bio = io.BytesIO()
    np.save(bio, ary)
    return bio.getvalue()


def read_thread(video_file_path, video_capture, output_queue):
    logging.info("start")

    total_count = 0
    total_time_ns = 0

    for frame_index in itertools.count():
        start_ns = time.perf_counter_ns()
        result, frame = video_capture.read()
        time_ns = time.perf_counter_ns() - start_ns
        if not result:
            break

        output_queue.put(
            {
                "video_file_path": video_file_path,
                "frame_index": frame_index,
                "frame": frame,
            }
        )

        total_count += 1
        total_time_ns += time_ns

    logging.info("total_count=%d", total_count)
    logging.info("total_time[ms]=%d", total_time_ns / 1000 / 1000)
    logging.info("mean_time[ms]=%f", total_time_ns / total_count / 1000 / 1000)
    logging.info("throughput/sec=%f", 1_000_000_000 / (total_time_ns / total_count))
    logging.info("end")


def detection_thread(input_queue, output_queue, detector_base_url):
    logging.info("start")
    logging.info("detector_base_url=%s", detector_base_url)

    url = detector_base_url + "/detect"
    total_count = 0
    total_time_ns = 0

    while True:
        frame_info = input_queue.get()
        try:
            if frame_info is None:
                break
            logging.info("frame_index=%d", frame_info["frame_index"])

            start_ns = time.perf_counter_ns()

            files = {
                "file": (
                    frame_info["video_file_path"].name,
                    dump_numpy(frame_info["frame"]),
                    "application/octet-stream",
                ),
            }
            response = requests.post(url, files=files)
            assert response.status_code == 200
            result = response.json()

            time_ns = time.perf_counter_ns() - start_ns

            output_queue.put(
                {
                    "video_file_path": frame_info["video_file_path"],
                    "frame_index": frame_info["frame_index"],
                    "frame": frame_info["frame"],
                    "result": result,
                }
            )

            total_count += 1
            total_time_ns += time_ns
        finally:
            input_queue.task_done()

    logging.info("total_count=%d", total_count)
    logging.info("total_time[ms]=%d", total_time_ns / 1000 / 1000)
    logging.info("mean_time[ms]=%f", total_time_ns / total_count / 1000 / 1000)
    logging.info("throughput/sec=%f", 1_000_000_000 / (total_time_ns / total_count))
    logging.info("end")


def write_thread(input_queue, output_file_path, video_meta):
    logging.info("start")
    logging.info("output_file_path=%s", str(output_file_path))

    total_count = 0
    total_time_ns = 0

    with output_file_path.open("w") as output_file:
        output_file.write(dump_json_compact(video_meta) + "\n")
        while True:
            frame_info = input_queue.get()
            try:
                if frame_info is None:
                    break

                start_ns = time.perf_counter_ns()

                record = {
                    "frame_index": frame_info["frame_index"],
                    "result": frame_info["result"],
                }
                output_file.write(dump_json_compact(record) + "\n")

                time_ns = time.perf_counter_ns() - start_ns

                total_count += 1
                total_time_ns += time_ns
            finally:
                input_queue.task_done()

    logging.info("total_count=%d", total_count)
    logging.info("total_time[ms]=%d", total_time_ns / 1000 / 1000)
    logging.info("mean_time[ms]=%f", total_time_ns / total_count / 1000 / 1000)
    logging.info("throughput/sec=%f", 1_000_000_000 / (total_time_ns / total_count))
    logging.info("end")


def start_read_worker(**kwargs):
    worker = threading.Thread(
        name="read",
        target=read_thread,
        daemon=True,
        kwargs=kwargs,
    )
    worker.start()
    return worker


def start_detection_workers(detector_base_urls, input_queue, output_queue):
    workers = []

    for thread_index, detector_base_url in enumerate(detector_base_urls):
        worker = threading.Thread(
            name="detection#{}".format(thread_index),
            target=detection_thread,
            daemon=True,
            kwargs={
                "input_queue": input_queue,
                "output_queue": output_queue,
                "detector_base_url": detector_base_url,
            },
        )
        worker.start()
        workers.append(worker)

    return workers


def start_write_worker(**kwargs):
    worker = threading.Thread(
        name="write",
        target=write_thread,
        daemon=True,
        kwargs=kwargs,
    )
    worker.start()
    return worker


def join_all_workers(workers, input_queue):
    for _ in workers:
        input_queue.put(None)
    for worker in workers:
        worker.join()


@click.command()
@click.argument("video_file_path", type=click.Path(exists=True))
@click.argument("output_file_path", type=click.Path(exists=False))
def main(video_file_path, output_file_path):
    threading.current_thread().name = "main"
    logging.info("start")

    video_file_path = pathlib.Path(video_file_path)
    output_file_path = pathlib.Path(output_file_path)
    detector_base_urls = [
        "http://detector-insightface:8000",
        "http://detector-insightface:8000",
    ]

    video_capture = cv2.VideoCapture(str(video_file_path))
    assert video_capture.isOpened()
    video_meta = make_video_meta(video_file_path, video_capture)

    frame_queue = queue.Queue(maxsize=10)
    detection_queue = queue.Queue()

    read_worker = start_read_worker(
        video_file_path=video_file_path,
        video_capture=video_capture,
        output_queue=frame_queue,
    )
    detection_workers = start_detection_workers(
        detector_base_urls=detector_base_urls,
        input_queue=frame_queue,
        output_queue=detection_queue,
    )
    write_worker = start_write_worker(
        input_queue=detection_queue,
        output_file_path=output_file_path,
        video_meta=video_meta,
    )

    read_worker.join()
    join_all_workers(detection_workers, frame_queue)
    join_all_workers([write_worker], detection_queue)

    logging.info("end")


if __name__ == "__main__":
    logging.basicConfig(
        format="%(asctime)s %(levelname)s [%(threadName)s] %(message)s",
        level=logging.INFO,
    )
    main()

video-analyzer/src/overlay.py

#!/usr/bin/env python3

import itertools
import json

import click
import cv2


def read_result_file(file_path):
    video_frame_table = {}
    with open(file_path, "r") as file:
        video_meta = json.loads(file.readline())
        for line in file:
            if line is None:
                break
            record = json.loads(line)
            frame_index = record["frame_index"]
            video_frame_table[frame_index] = record
    return (video_meta, video_frame_table)


def draw_detection_result(frame, result):
    color = (255, 255, 255)
    faces = result["result"]["response"]["faces"]
    for face in faces:
        bbox = face["boundingBox"]
        x1, y1, x2, y2 = bbox["x1"], bbox["y1"], bbox["x2"], bbox["y2"]
        cv2.rectangle(frame, (int(x1), int(y1)), (int(x2), int(y2)), color, thickness=2)


@click.command()
@click.argument("video_file_path", type=click.Path(exists=True))
@click.argument("result_file_path", type=click.Path(exists=True))
@click.argument("output_file_path", type=click.Path(exists=False))
def main(video_file_path, result_file_path, output_file_path):
    video_meta, video_frame_table = read_result_file(result_file_path)

    video_capture = cv2.VideoCapture(video_file_path)
    video_writer = cv2.VideoWriter(
        output_file_path,
        cv2.VideoWriter_fourcc("m", "p", "4", "v"),
        video_meta["fps"],
        (video_meta["width"], video_meta["height"]),
    )

    for frame_index in itertools.count():
        result, frame = video_capture.read()
        if not result:
            break

        draw_detection_result(frame, video_frame_table[frame_index])
        video_writer.write(frame)

    video_writer.release()


if __name__ == "__main__":
    main()

終わりに

InsightFaceを動画に適用し、それっぽい結果を得ることができました。

今回はOpenCVで顔検出の結果をオーバーレイして動画として出力しましたが、これでは結果の動画ファイルが増えて管理が大変です。
次回は、ウェブアプリ上でのオーバーレイを試してみたいと思っています。

InsightFaceの顔検出結果をブラウザ上で動画にオーバーレイ表示してみた』に続く。

Discussion