🙃

PythonでKinesisVideoStreamsをほぼリアルタイム解析する

2021/09/27に公開

はじめに

  • KinesisVideoStreamsの動画からフレームを切り出して、リアルタイムで画像処理と推論をする必要があったのですが、ダイレクトに参考になる記事が見つからなかったので書きました
  • 色々試した結果、KinesisVideoからではなくKinesisVideoArchivedMediaから取得することでフレーム欠けも重複もなく、きれいに動いています(いちおう数か月の動作実績もあります)
  • 自分の環境ではプロデューサータイムスタンプで現在時刻から2秒前のフラグメントを取得できてますので、リアルタイムの要件がよほど厳しくなければ問題ないレベルかと思います

実装手順

準備

  • 環境をDockerで構築していきます
  • Dockerfileは以下のとおり
    • mkvのフレームデータの変換にffmpegを使います
    • 画像の加工用にOpenCVとlibgl1-mesa-devを入れてますが、加工しないなら不要です
    • (2022/1/17追記)imageioのバージョンが新しいとこの後のソースでRuntimeError: ffmpeg can not handle the given uri.が発生するのでバージョンを固定します
FROM ubuntu:18.04

RUN apt-get update && \
  apt-get install -y \
  python3-pip \
  ffmpeg \
  libgl1-mesa-dev

RUN pip3 install -U pip

RUN pip3 install \
  opencv-python \
  boto3 \
  imageio==2.9.0 \
  imageio-ffmpeg=0.4.3

取得処理の実装

  • 前提として、KinesisVideoStreamsのプロデューサー側でフラグメントの継続時間は1秒にしています
  • フラグメントを1秒おきにサブスレッドで取得し、フレームをdequeに溜めていきます
    • 実際には別のスレッドでこのdequeから取得してますが省略
    • フレームレートが高い場合は適当に間引いてください
  • やり方は時間指定でフラグメントの番号のリストを取得して、そこからGetMediaしています
  • AWS_ACCESS_KEY_ID、AWS_SECRET_ACCESS_KEY、STREAM_NAME、REGION_NAMEは環境に合わせて設定してください(アクセスキーは使わずにIAMで制御すべきですね)
import cv2
import imageio
import boto3
import time
import io
import operator
import timeit
import threading
from datetime import datetime, timedelta
from logging import getLogger, StreamHandler, Formatter, INFO, DEBUG
from collections import deque
from urllib3.exceptions import ProtocolError

# loggerまわりの設定
logger = getLogger(__name__)
logger.setLevel(INFO)  # loglevel変更時はここをDEBUGにする

# 標準出力用の設定
s_handler = StreamHandler()
s_handler.setFormatter(Formatter("%(asctime)s %(levelname)s %(name)s %(funcName)s(): %(message)s"))

# loggerにハンドラを設定
logger.addHandler(s_handler)

# KVSのディレイ秒 (Kinesis-video-archived-mediaから取得するため少し時間差を設定する)
KVS_DELAY_SEC= 2

# KVSのフラグメント継続時間 [s] 
FRAGMENT_DURATION = 1.0

class GetKinesisVideo(threading.Thread):

    def __init__(self):
        super().__init__()
        
        self.last_time = datetime.now() - timedelta(seconds=KVS_DELAY_SEC)

        kvs_client = boto3.client('kinesisvideo',
            aws_access_key_id=AWS_ACCESS_KEY_ID,
            aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
            region_name=REGION_NAME
        )
        endpoint_url = kvs_client.get_data_endpoint(
            StreamName=STREAM_NAME,
            APIName='LIST_FRAGMENTS'
        )['DataEndpoint']

        self.media_client = boto3.client('kinesis-video-archived-media',
            aws_access_key_id=AWS_ACCESS_KEY_ID,
            aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
            region_name=REGION_NAME,
            endpoint_url=endpoint_url
        )

        self.run_flg = True
        self.setDaemon(True)
        self.frames = deque()

    # フレーム取得
    def get_frame(self):
        # デフォルトタイマ取得
        df_timer = timeit.default_timer()

        # 取得開始時刻
        start_time = self.last_time
        # 取得終了時刻
        end_time = datetime.now() - timedelta(seconds=KVS_DELAY_SEC)

        list_fragment = self.media_client.list_fragments(
            StreamName=STREAM_NAME,
            FragmentSelector={
                'FragmentSelectorType': 'PRODUCER_TIMESTAMP',
                'TimestampRange': {
                    'StartTimestamp': start_time,
                    'EndTimestamp': end_time
                }
            }
        )

        # フラグメントがなかったらここで抜ける
        if list_fragment.get('Fragments') == []:
            # フラグメント継続時間の周期でフラグメントリストを取得しているが
            # プロデューサー側でたまに遅延することがあり、リストが取得できない場合がある
            # 即座に再呼び出しせず少しwaitする
            time.sleep(0.2)
            return

        # 念の為、時刻順にソートしておく
        sorted_list_fragment = sorted(list_fragment['Fragments'], key=operator.itemgetter('ProducerTimestamp'))
        logger.debug(f"start_time:{start_time}, end_time:{end_time}, sorted_list_fragment:{sorted_list_fragment}")
        
        for fragment in sorted_list_fragment:

            try:
                fragment_list = [fragment['FragmentNumber']]
                media = self.media_client.get_media_for_fragment_list(
                    StreamName=STREAM_NAME,
                    Fragments=fragment_list
                )

                # MKV形式のチャンクを取得
                chunk = media['Payload'].read()

                reader = imageio.get_reader(io.BytesIO(chunk), 'ffmpeg') # class 'imageio.plugins.ffmpeg.FfmpegFormat.Reader'
                
                for i, frame in enumerate(reader):
                    frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
                    # dequeはスレッドセーフなのでlock不要
                    self.frames.append(frame)
            
            # urllib3.exceptions.ProtocolError: media['Payload'].read() でConnectionResetErrorから発生することがある
            except (OSError, ProtocolError) as e:
                logger.info(e)
                self.run_flg = False
                return

        # 終了時刻更新
        self.last_time = end_time

        # 時間調整のwait(チャンクの取得やffmpegでの処理が早く終わると次回のフラグメントリスト取得が空振るため)
        wait_time = FRAGMENT_DURATION - (timeit.default_timer() - df_timer)
        if wait_time > 0:
            time.sleep(wait_time)

    def run(self):
        while self.run_flg:
            start_time = timeit.default_timer()
            self.get_frame()
            logger.debug(f'Time between fragment download and frame processing: {timeit.default_timer() - start_time}')

動かし方

  • 上記のGetKinesisVideoクラスを生成してstart()してやればよいです

注意点

  • 例外処理が雑なのと、たまにffmpegでエラーになったり極まれにKinesisからProtocolErrorが返ってくることがあります
    • そのフラグメントをスキップすれば(欠損はするものの)処理は継続できるので、上記のように即刻終了しない方がいいかもです
  • KinesisVideoに対してアクセスする場合は、同一ストリームに対して1秒間に5回までのリクエスト制限がありますが、KinesisVideoArchivedMediaの場合は特に制限はなさそうでした
  • 上記の場合、フラグメント継続時間とアクセス周期を同じ1秒にしてるので、フラグメントリストと言いつつ、基本的に1つのフラグメント番号が取れますが、時間指定が長い場合やフラグメントが細かい場合はたくさんのフラグメント番号が取れるので一度に取り切れないことがあります(こちらの対応については後述)

応用編

一度に大量のフラグメントを取得

  • KinesisVideoStreamsのコンソール画面からだと、GetClipで3分間までの動画しか取れず不便ですが、上記の応用で大量のフラグメントリストを取得して変換もできます
  • 時刻順でソートしたフラグメントリスト(sorted_list_fragment)の1回目の取得までは上記と同じですが、そのあとNextTokenが空かどうかチェックします
    # 結合用のフラグメントリスト
    combined_fragmentList = []

    for i in sorted_list_fragment:
        combined_fragmentList.append(i['FragmentNumber'])

    while list_fragment.get('NextToken') is not None:
        list_fragment = kvam.list_fragments(
            StreamName=STREAM_NAME,
            NextToken=list_fragment['NextToken']
        )
        
        sorted_list_fragment = sorted(list_fragment['Fragments'], key=operator.itemgetter('ProducerTimestamp'))
        
        for i in sorted_list_fragment:
            combined_fragmentList.append(i['FragmentNumber'])
  • あとは、このcombined_fragmentListに対してget_media_for_fragment_list()を実行すればOKです
  • あまりにフラグメントが多いとそのあとmedia['Payload'].read()したときにかなりの負荷がかかるっぽいので、数分とかで区切りつつ1日分の動画を吐き出したりしてます

あとがき

  • AWS公式からはJavaのパーサーしか出てないようなので、半ばあきらめていたのですが、OpenCVで加工したり推論に使う関係でなるべくPythonでやりたかったので色々調べてこの形に行きつきました

Discussion