🎥
ATOM Camの常時記録動画からタイムラプス動画を生成するPythonスクリプトを書いてみた
はじめに
税込2,500円の格安ネットワークカメラ「ATOM Cam」(Amazonでも買えます)で撮影した常時記録動画からタイムラプス動画を生成するPythonスクリプトを書いたので、メモを残しておきます。
背景
ATOM Camには、それ自身にもタイムラプス動画の作成機能がありますが、撮影開始前に条件(撮影間隔など)を決める必要があり、また不意に撮影が停止してしまうことがあったことから、別の方法を考えました。
ATOM Camには、常時映像&音声を記録し、microSDカードに書き込む機能があります。MP4形式、フルHD(1980x1080)、20フレーム/秒の動画ファイルが1分毎のファイルとして生成されます。
以前はそれらの動画ファイル群(撮影時間、条件にもよりますが10時間で6GBほど)をMP4Box
コマンドで結合し、ffmpeg
コマンドで速度の調整を行っていました。
ただ、中間ファイルの容量もそれなりになってしまうため、直接タイムラプス動画を生成するPythonスクリプトを実装しました。
環境
今回はDocker内で実行しました。Ubuntu 20.04 LTS、Python 3系、OpenCVを使って実装しました。Dockerfile
は以下の通りです。
FROM ubuntu:20.04
RUN apt-get update \
&& DEBIAN_FRONTEND=noninteractive apt-get install --yes --no-install-recommends \
libopencv-dev \
python3-dev \
python3-pip \
&& rm --recursive --force /var/lib/apt/lists/*
RUN python3 -m pip install opencv-python==4.4.0.46
Pythonスクリプト
タイムラプス動画を生成するPythonスクリプトは以下の通りです。だいぶ手抜きなので各種パラメータはハードコーディングされています。
- カレントディレクトリ以下の
*.mp4
を読み込む。 - カレントディレクトリに
out.mp4
というファイル名で動画を描き出す。 - 600フレームに1フレームの割合で書き出す。(つまり600倍速)
- 出力動画の解像度はフルHD(1980x1080)。
- 出力動画のフレームレートは20フレーム/秒。
- 動画読み込みスレッド、動画書き出しスレッドの2スレッドで動作。
#!/usr/bin/env python3
import pathlib
import queue
import threading
import cv2
def read_frame(target_paths, frame_queue):
for path in target_paths:
capture = cv2.VideoCapture(str(path))
if not capture.isOpened():
return
frame_index = 0
while True:
result, frame = capture.read()
if not result:
break
print("[read] {}:{}".format(path, frame_index))
frame_queue.put(frame)
frame_index += 1
capture.release()
frame_queue.put(None)
def write_frame(frame_queue):
video_writer = cv2.VideoWriter(
"out.mp4", cv2.VideoWriter_fourcc("m", "p", "4", "v"), 20, (1920, 1080)
)
frame_index = 0
while True:
frame = frame_queue.get()
try:
if frame is None:
break
if frame_index % 600 == 0:
print("[write] {}".format(frame_index))
video_writer.write(frame)
frame_index += 1
finally:
frame_queue.task_done()
video_writer.release()
target_dir = pathlib.Path(".")
target_paths = sorted(target_dir.glob("**/*.mp4"))
frame_queue = queue.Queue(maxsize=10)
read_frame_worker = threading.Thread(
target=read_frame,
daemon=True,
kwargs={"target_paths": target_paths, "frame_queue": frame_queue},
)
read_frame_worker.start()
write_frame_worker = threading.Thread(
target=write_frame, daemon=True, kwargs={"frame_queue": frame_queue},
)
write_frame_worker.start()
read_frame_worker.join()
write_frame_worker.join()
print("done")
何かの参考になれば幸いです。
Discussion