ATOM Camの常時記録動画からタイムラプス動画を生成するPythonスクリプトを書いてみた

3 min読了の目安(約2800字TECH技術記事

はじめに

税込2,500円の格安ネットワークカメラ「ATOM Cam」(Amazonでも買えます)で撮影した常時記録動画からタイムラプス動画を生成するPythonスクリプトを書いたので、メモを残しておきます。

背景

ATOM Camには、それ自身にもタイムラプス動画の作成機能がありますが、撮影開始前に条件(撮影間隔など)を決める必要があり、また不意に撮影が停止してしまうことがあったことから、別の方法を考えました。
ATOM Camには、常時映像&音声を記録し、microSDカードに書き込む機能があります。MP4形式、フルHD(1980x1080)、20フレーム/秒の動画ファイルが1分毎のファイルとして生成されます。

以前はそれらの動画ファイル群(撮影時間、条件にもよりますが10時間で6GBほど)をMP4Boxコマンドで結合し、ffmpegコマンドで速度の調整を行っていました。
ただ、中間ファイルの容量もそれなりになってしまうため、直接タイムラプス動画を生成するPythonスクリプトを実装しました。

環境

今回はDocker内で実行しました。Ubuntu 20.04 LTS、Python 3系、OpenCVを使って実装しました。Dockerfileは以下の通りです。

FROM ubuntu:20.04
RUN apt-get update \
  && DEBIAN_FRONTEND=noninteractive apt-get install --yes --no-install-recommends \
    libopencv-dev \
    python3-dev \
    python3-pip \
  && rm --recursive --force /var/lib/apt/lists/*
RUN python3 -m pip install opencv-python==4.4.0.46

Pythonスクリプト

タイムラプス動画を生成するPythonスクリプトは以下の通りです。だいぶ手抜きなので各種パラメータはハードコーディングされています。

  • カレントディレクトリ以下の*.mp4を読み込む。
  • カレントディレクトリにout.mp4というファイル名で動画を描き出す。
  • 600フレームに1フレームの割合で書き出す。(つまり600倍速)
  • 出力動画の解像度はフルHD(1980x1080)。
  • 出力動画のフレームレートは20フレーム/秒。
  • 動画読み込みスレッド、動画書き出しスレッドの2スレッドで動作。
#!/usr/bin/env python3

import pathlib
import queue
import threading

import cv2


def read_frame(target_paths, frame_queue):
    for path in target_paths:
        capture = cv2.VideoCapture(str(path))
        if not capture.isOpened():
            return
        frame_index = 0
        while True:
            result, frame = capture.read()
            if not result:
                break
            print("[read] {}:{}".format(path, frame_index))
            frame_queue.put(frame)
            frame_index += 1
        capture.release()
    frame_queue.put(None)


def write_frame(frame_queue):
    video_writer = cv2.VideoWriter(
        "out.mp4", cv2.VideoWriter_fourcc("m", "p", "4", "v"), 20, (1920, 1080)
    )

    frame_index = 0
    while True:
        frame = frame_queue.get()
        try:
            if frame is None:
                break
            if frame_index % 600 == 0:
                print("[write] {}".format(frame_index))
                video_writer.write(frame)
            frame_index += 1
        finally:
            frame_queue.task_done()

    video_writer.release()


target_dir = pathlib.Path(".")
target_paths = sorted(target_dir.glob("**/*.mp4"))

frame_queue = queue.Queue(maxsize=10)

read_frame_worker = threading.Thread(
    target=read_frame,
    daemon=True,
    kwargs={"target_paths": target_paths, "frame_queue": frame_queue},
)
read_frame_worker.start()

write_frame_worker = threading.Thread(
    target=write_frame, daemon=True, kwargs={"frame_queue": frame_queue},
)
write_frame_worker.start()

read_frame_worker.join()
write_frame_worker.join()

print("done")

何かの参考になれば幸いです。