🎥

【Python】GetClipを使用してAmazon Kinesis Videoから指定時間分の動画をダウンロードするスクリプト

2024/05/07に公開

自分用のメモです🦔

Amazon Kinesis Videoの動画をダウンロードしてローカルに保存するスクリプトです。
GetClipは数時間分の動画を一度にダウンロードできないので、数分単位の動画を時間分ダウンロードします。

アクセスキーとかも雑にスクリプト上で指定しています。

kvs_download.py
import os
import argparse
from datetime import datetime, timedelta

from typing import Any

import boto3  # type: ignore
from tqdm import tqdm  # type: ignore

# AWS認証情報の設定
os.environ['AWS_DEFAULT_REGION'] = 'ap-northeast-1'
os.environ['AWS_ACCESS_KEY_ID'] = ''
os.environ['AWS_SECRET_ACCESS_KEY'] = ''


def get_args() -> Any:
    parser = argparse.ArgumentParser()

    parser.add_argument(
        "--year",
        type=int,
        default=2024,
        help="対象年を指定"
    )
    parser.add_argument(
        "--month",
        type=int,
        default=5,
        help="対象月を指定"
    )
    parser.add_argument(
        "--day",
        type=int,
        default=5,
        help="対象日を指定"
    )
    parser.add_argument(
        "--start_hour",
        type=int,
        default=7,
        help="開始時刻の時を指定"
    )
    parser.add_argument(
        "--interval_minute",
        type=int,
        default=6,
        help="動画1つ分の保存時間(分)"
    )
    parser.add_argument(
        "--end_hour",
        type=int,
        default=9,
        help="終了時刻の時を指定"
    )
    parser.add_argument(
        "--kinesis_name",
        type=str,
        default='',
        help="Kinesisストリーム名を指定"
    )

    args = parser.parse_args()
    return args


def download(
    stream_name: str,
    target_date: str,
    hour_duration: int,
    interval_minute: int,
) -> None:
    """
    指定したKinesisビデオストリームから動画をダウンロードしローカルに保存
    :param stream_name: Kinesisビデオストリームの名前
    :param target_date: 取得を開始する対象日時(YYYYMMDDHHの形式)
    :param hour_duration: 何時間分取得するか
    :param interval_minute: ダウンロードする動画の分単位間隔
    """
    print(stream_name)

    # 開始時刻をUTCに変換
    starttime = datetime.strptime(target_date, "%Y%m%d%H") - timedelta(hours=9)

    # 指定の間隔で動画クリップをダウンロード
    movie_num = int(math.ceil(60 / interval_minute * hour_duration))
    for index in tqdm(range(movie_num)):
        temp_starttime = starttime + timedelta(
            seconds=int(60 * interval_minute * index))
        temp_endtime = temp_starttime + timedelta(seconds=int(60 *
                                                              interval_minute))

        # ディレクトリ名とファイル名用の時刻(JST)
        disp_starttime = temp_starttime + timedelta(hours=9)
        disp_endtime = temp_endtime + timedelta(hours=9)

        # 保存ディレクトリとファイル名を作成
        directory_name = f"{disp_starttime.year:04}{disp_starttime.month:02}{disp_starttime.day:02}"
        mp4_name = f"{disp_starttime.year:04}{disp_starttime.month:02}{disp_starttime.day:02}_{disp_starttime.hour:02}{disp_starttime.minute:02}_{disp_endtime.hour:02}{disp_endtime.minute:02}.mp4"
        save_path = f"{stream_name}/{directory_name}/{mp4_name}"

        os.makedirs(f"{stream_name}/{directory_name}", exist_ok=True)

        # Kinesis Video Streamsのエンドポイントを取得
        kvs_client = boto3.client('kinesisvideo')
        kvs_endpoint = kvs_client.get_data_endpoint(
            StreamName=stream_name,
            APIName='GET_CLIP',
        )
        kvs_client_for_archived = boto3.client(
            'kinesis-video-archived-media',
            endpoint_url=kvs_endpoint['DataEndpoint'],
        )

        # 動画クリップを取得してローカルに保存
        response = kvs_client_for_archived.get_clip(
            StreamName=stream_name,
            ClipFragmentSelector={
                'FragmentSelectorType': 'SERVER_TIMESTAMP',
                'TimestampRange': {
                    'StartTimestamp': temp_starttime,
                    'EndTimestamp': temp_endtime
                }
            },
        )
        payload = response['Payload'].read()
        with open(save_path, 'wb') as f:
            f.write(payload)


def main() -> None:
    args = get_args()

    # 対象日付の情報を取得
    target_year = args.year
    target_month = args.month
    target_day = args.day
    start_hour = args.start_hour
    end_hour = args.end_hour

    interval_minute = args.interval_minute

    hour_duration = timedelta(hours=end_hour - start_hour)
    hour_duration = int(hour_duration.total_seconds() // 3600)

    # ターゲット日付をYYYYMMDDHH形式にフォーマット
    target_date = f"{target_year:04}{target_month:02}{target_day:02}{start_hour:02}"

    # 対象Kinesis名
    kinesis_name = args.kinesis_name

    # Kinesis Video ダウンロード
    download(kinesis_name, target_date, hour_duration, interval_minute)


if __name__ == '__main__':
    main()

Discussion