🎥
【Python】GetClipを使用してAmazon Kinesis Videoから指定時間分の動画をダウンロードするスクリプト
自分用のメモです🦔
Amazon Kinesis Videoの動画をダウンロードしてローカルに保存するスクリプトです。
GetClipは数時間分の動画を一度にダウンロードできないので、数分単位の動画を時間分ダウンロードします。
アクセスキーとかも雑にスクリプト上で指定しています。
kvs_download.py
import os
import argparse
from datetime import datetime, timedelta
from typing import Any
import boto3 # type: ignore
from tqdm import tqdm # type: ignore
# AWS認証情報の設定
os.environ['AWS_DEFAULT_REGION'] = 'ap-northeast-1'
os.environ['AWS_ACCESS_KEY_ID'] = ''
os.environ['AWS_SECRET_ACCESS_KEY'] = ''
def get_args() -> Any:
parser = argparse.ArgumentParser()
parser.add_argument(
"--year",
type=int,
default=2024,
help="対象年を指定"
)
parser.add_argument(
"--month",
type=int,
default=5,
help="対象月を指定"
)
parser.add_argument(
"--day",
type=int,
default=5,
help="対象日を指定"
)
parser.add_argument(
"--start_hour",
type=int,
default=7,
help="開始時刻の時を指定"
)
parser.add_argument(
"--interval_minute",
type=int,
default=6,
help="動画1つ分の保存時間(分)"
)
parser.add_argument(
"--end_hour",
type=int,
default=9,
help="終了時刻の時を指定"
)
parser.add_argument(
"--kinesis_name",
type=str,
default='',
help="Kinesisストリーム名を指定"
)
args = parser.parse_args()
return args
def download(
stream_name: str,
target_date: str,
hour_duration: int,
interval_minute: int,
) -> None:
"""
指定したKinesisビデオストリームから動画をダウンロードしローカルに保存
:param stream_name: Kinesisビデオストリームの名前
:param target_date: 取得を開始する対象日時(YYYYMMDDHHの形式)
:param hour_duration: 何時間分取得するか
:param interval_minute: ダウンロードする動画の分単位間隔
"""
print(stream_name)
# 開始時刻をUTCに変換
starttime = datetime.strptime(target_date, "%Y%m%d%H") - timedelta(hours=9)
# 指定の間隔で動画クリップをダウンロード
movie_num = int(math.ceil(60 / interval_minute * hour_duration))
for index in tqdm(range(movie_num)):
temp_starttime = starttime + timedelta(
seconds=int(60 * interval_minute * index))
temp_endtime = temp_starttime + timedelta(seconds=int(60 *
interval_minute))
# ディレクトリ名とファイル名用の時刻(JST)
disp_starttime = temp_starttime + timedelta(hours=9)
disp_endtime = temp_endtime + timedelta(hours=9)
# 保存ディレクトリとファイル名を作成
directory_name = f"{disp_starttime.year:04}{disp_starttime.month:02}{disp_starttime.day:02}"
mp4_name = f"{disp_starttime.year:04}{disp_starttime.month:02}{disp_starttime.day:02}_{disp_starttime.hour:02}{disp_starttime.minute:02}_{disp_endtime.hour:02}{disp_endtime.minute:02}.mp4"
save_path = f"{stream_name}/{directory_name}/{mp4_name}"
os.makedirs(f"{stream_name}/{directory_name}", exist_ok=True)
# Kinesis Video Streamsのエンドポイントを取得
kvs_client = boto3.client('kinesisvideo')
kvs_endpoint = kvs_client.get_data_endpoint(
StreamName=stream_name,
APIName='GET_CLIP',
)
kvs_client_for_archived = boto3.client(
'kinesis-video-archived-media',
endpoint_url=kvs_endpoint['DataEndpoint'],
)
# 動画クリップを取得してローカルに保存
response = kvs_client_for_archived.get_clip(
StreamName=stream_name,
ClipFragmentSelector={
'FragmentSelectorType': 'SERVER_TIMESTAMP',
'TimestampRange': {
'StartTimestamp': temp_starttime,
'EndTimestamp': temp_endtime
}
},
)
payload = response['Payload'].read()
with open(save_path, 'wb') as f:
f.write(payload)
def main() -> None:
args = get_args()
# 対象日付の情報を取得
target_year = args.year
target_month = args.month
target_day = args.day
start_hour = args.start_hour
end_hour = args.end_hour
interval_minute = args.interval_minute
hour_duration = timedelta(hours=end_hour - start_hour)
hour_duration = int(hour_duration.total_seconds() // 3600)
# ターゲット日付をYYYYMMDDHH形式にフォーマット
target_date = f"{target_year:04}{target_month:02}{target_day:02}{start_hour:02}"
# 対象Kinesis名
kinesis_name = args.kinesis_name
# Kinesis Video ダウンロード
download(kinesis_name, target_date, hour_duration, interval_minute)
if __name__ == '__main__':
main()
Discussion