🐻

【AI CAPTURE】熊撃退プログラム

に公開

はじめに

現在、Raspberry Piをベースとした、AI CAPTUREという監視カメラシステムを公開中です。

https://aicap.daddysoffice.com

このシステムで動く検知プログラムとして、熊などの害獣を検知するとPush通知と共に音を出して害獣を追い払うプログラムを公開しましたので、そのプログラムの内容について説明します。

全プログラムはGitHubで公開中です。
https://github.com/daddyYukio/AICAPTURE/tree/main/programs/bear_repellent

プログラムの概要

このプログラムは物体検出モデルのYOLO11の物体検知を使用しています。

https://docs.ultralytics.com/ja/tasks/detect/

カメラフレーム画像を取得し、YOLOで推論を行うと、フレーム画像内で検出されたオブジェクトを囲むバウンディングボックスの座標が、クラスラベルと各ボックスの信頼度スコアとともに返却されます。

その結果を見てあらかじめ設定したオブジェクトが検出されたら、Push通知を送信すると同時に、音(WAVファイル)を再生する、というプログラムになります。

音の再生は、AIBOX(Raspberry Pi 5)にUSBで接続されたサウンド出力デバイスに対して行います。

Docker

使用するDockerイメージ

AIBOX OSには、標準でYOLO11をNCNNで実行できるDocker Imageが登録されています。

このイメージは、Ultralyticsが公開しているYOLO11用のイメージをベースに、ncnn変換・実行に必要なモジュールのインストールや、aicapコマンドを実行するための環境構築処理が行われていますので、 AIBOX OS上で稼働させるYOLO11を使用した基本的な検知プログラムは、このイメージから起動したコンテナ上で動作させることが可能になっています。

Dockerfileはこちらです。

https://github.com/daddyYukio/AICAPTURE/blob/main/docker/ultralytics/Dockerfile

ただ、今回実行する熊撃退プログラムは、YOLOによる物体検知の他に、音声を出力するための機能が追加で必要になりますので、この標準搭載のDocker イメージをベースに、音声出力に必要なモジュール等をインストールしたイメージを作成する必要があります。

ベースイメージに音声出力関係のモジュールをインストールしたDockerイメージ作成に使用するDockerfileはこちらを参照してください。
https://github.com/daddyYukio/AICAPTURE/blob/main/docker/audio/Dockerfile

ただし、AIBOXでのDocker イメージの登録には少し手順が必要になります。こちらの記事を参考にして、音声対応のイメージを構築&登録してください。

docker-compose.yml

今回のプログラムでは、ホストにUSB接続されたサウンド出力デバイスを使用します。

そのため、起動するコンテナから、ホストに接続されているオーディオデバイスにアクセスできるように、docker-compose.ymlに、下記の修正が必要です。

  1. ホストに接続されているオーディオデバイスのマウント
  2. 起動するコンテナに、上記デバイスへのアクセス権限を付与

使用するdocker-compose.ymlはこちらです。

https://github.com/daddyYukio/AICAPTURE/blob/main/programs/bear_repellent/docker-compose.yml

まずは、ホストに接続されているオーディオデバイスのパスを[devices]でマッピングします。

devices:
  - /dev/snd:/dev/snd      # オーディオデバイスをマウント

さらに、Linuxでは、オーディオデバイス(/dev/snd/*など)は通常 audio グループに属していますので、コンテナ内のプロセスにも、このaudioグループに追加する必要があります。

group_add:
  - audio              # audioグループに追加(権限付与)

これで、Dockerコンテナ上のPythonプログラムから音を出すことができるようになります。

処理説明

関数一覧

プログラム(extmod.py)は、下記の基本的な4つの関数と、3つの音再生関数、それらを呼び出すメイン関数から構成されています。

基本関数

関数名 説明
get_frame aicap get_frameコマンドを実行します。
フレーム画像は標準出力経由でメモリ上で取得します。
push aicap pushコマンドを実行します。
送信する画像は、標準入力経由で引数に指定して、検出した物体を枠で囲んで静止時間を書き込んだ画像を送信します。
create_result_jpeg get_frameで取得したカメラのフレーム画像に、検出した物体を枠で囲んだJPEG画像を作成しします。
parse_results YOLOから返却される結果を使いやすいように成形します。

音再生関数

関数名 説明
play_wav WAVファイルを再生します。再生は非同期で行われ、stop_wav()関数がコールされるか、WAV_PLAY_TIME_SEC経過するまでリピート再生します。すでに再生中にコールされた場合は、再生停止時間が延長されます
stop_wav 再生中のWAVファイルを停止します。
_play_wav_thread 実際にWAVファイルを再生するスレッド関数です。sounddevice(PortAudio)を使用して、指定されたWAVファイルを停止するまでスープ再生します。

処理の流れとしては、メイン関数内で上記の関数を以下のように繰り返し実行します。

カメラのフレーム画像取得

AIBOX OS組み込みコマンドのaicap get_frameコマンドを実行して、カメラのフレーム画像をJPEGで取得します。
取得した時間をunixtimeで保持した後、Yoloに渡すために、PLI Imageに変換します。

# ビデオ映像取得
frame = get_frame()
timestamp = int(datetime.now(tz=timezone.utc).timestamp())

# PLI Imageに変換
img = Image.open(BytesIO(frame))

aicapコマンドの詳細はこちらを参照してください

YOLOで物体検知

PLIImageに変換したフレーム画像を引数に、物体検知を実行します。

# 物体検知実行
results = model.predict(
    img, 
    conf=CONF, 
    iou=IOU, 
    classes=CLASSES, 
    verbose=True)

引数の説明は以下です。

引数 説明
img 入力画像
conf 信頼度(confidence)閾値
外部変数CONFで定義
iou IoU(Intersection over Union)閾値
外部変数IOUで定義
classes 検出するクラスを限定するためのリスト
例:classes=[0, 2] → クラスID 0(人)と2(車)だけ検出
指定しないとすべてのクラスが対象
外部変数CLASSESで定義
verbose 詳細ログを出すかどうか

このpredictを実行すると、物体検出結果(results)が返却されます。

返却される結果の主なプロパティは以下です。

  • results[i].boxes:
    • xyxy → 各検出/追跡対象のバウンディングボックス座標 [x1, y1, x2, y2] 形式
    • xywh → [x_center, y_center, width, height] 形式
    • conf → 各検出の信頼度スコア
    • cls → 各検出のクラスラベル(整数インデックス)

結果を整形

返却された結果をそのまま扱うには少し面倒なので、今回のプログラムで使用するパラメータだけを抜き出して、結果を扱いやすく成形します。

成形された結果は、以下のデータの配列になります。

{
    "pos"  : {"x" : x, "y" : y}, <= 検出物体中心座標
    "box"  : {"x1" : x1, "y1" : y1, "x2" : x2, "y2" : y2}, <= BOX座標
    "conf" : 検出信頼度[confidence score](0.0 ~ 1.0),
    "cls"  : 検出クラス
}

配列の長さが0の場合は、指定したオブジェクトが検出されなかった、ということになります。

物体が検出された場合

指定した物体が検出された場合は、以下の関数をコールします。

  1. 音を鳴らす(play_wav)
  2. 検知枠を書き込んだJPEG画像を生成する(検知枠を書き込んだJPEG画像の生成)
  3. Push通知を送信(push)
# 物体を検知したか?
if len(res) > 0:

    # 音を鳴らす
    play_wav(WAVFILE_PATH)

    # 検知枠を書き込んだJPEG画像の生成
    frame = create_result_jpeg(img, res)

    # PUSH通知
    # エラーはここでキャッチしてそのまま処理を流す
    try:
        push(timestamp, frame, res)
    except Exception as e:
        print(str(e))    

音を鳴らす処理については、この後、もう少し詳しく説明します。

プレビュー用イメージの保存

最後にプレビュー用の結果画像を保存します。

# 結果確認用のプレビューイメージの保存
with open(PREVIEW_IMAGE_PATH, 'wb') as f:
    f.write(frame)

AIBOX OSのプレビュー表示についてはこちらを参照してください

実際に熊の写真で実行したプレビュー結果です。

音の再生について

実際に音を鳴らす関数は、_play_wav_threadスレッド関数です。

今回のプログラムでは、再生時間の短いWAVファイルを指定時間ループ再生させていますが、単純に再生をループさせると、出力される音がぶつ切りになってしまいます。

原因はいくつか考えられますが、最も大きいと思われる原因は、YOLOによる物体検知の処理負荷が高い為に、ループ再生の間隔がばらけてしまう(間が空いてしまうことがある)為だと思われます。

実際、音声が切れるタイミングが、YOLOの物体検知処理に入ったタイミングと一致します。

NCNN変換したYOLO11mモデル(標準的なもの)を使用していますが、Raspberry Pi 5で回すと、検知処理に約1秒前後かかります。

それを回避するために、YOLOの物体検知処理を行っている間(約1秒)に音の再生が終了してしまい、次のサウンドデータをサウンドデバイスに送る、という処理が実行されないように、出力するサウンドデータの長さを調整する必要があります。

今回のプログラムでは、サウンドデバイスに渡すデータの長さを調整できるように、sounddevice(PortAudio)を使用しています。

sounddeviceでは、サウンドデバイスに音声データを送る必要があると、コールバック関数をコールしてで再生する音声データを要求してきます。
このコールバック関数が呼び出される間隔(内部バッファが枯渇するタイミング)は、blocksizeパラメータで調整します。

blocksizeパラメータは、1回のコールバックで処理するサンプル数を表します。つまり、blocksize / samplerate 秒が、コールバックが呼び出される間隔、ということですので、それを読み込んだWAVファイルのsamplerateから計算して、blocksizeを決定します。

 # WAVファイル読み込み
data, fs = sf.read(wav_path, dtype='float32')

# モノラル対策:1次元配列を (N, 1) に変換
if data.ndim == 1:
    data = data[:, np.newaxis]

channels = data.shape[1] if data.ndim > 1 else 1
length = len(data)

# blocksize をサンプリングレートから計算(目的秒数分)
blocksize = int(fs * BLOCK_DURATION_SEC)
if blocksize < 64:
    blocksize = 64  # 最低値の保護
print(f"sample_rate={fs}, channels={channels}, wav_length_frames={length}, blocksize={blocksize}")

指定する秒数は外部変数 BLOCK_DURATION_SEC(2秒) で指定しています。

ここで計算されたblocksizeを引数に再生を開始します。

# 再生開始
with sd.OutputStream(
    samplerate=fs,
    channels=channels,
    dtype='float32',
    device=device_index,
    blocksize=blocksize, <== ここで指定
    callback=audio_callback
):
    while True:
        # 指定時間経過か、イベントがセットされるまでループ再生
        timestamp = int(datetime.now(tz=timezone.utc).timestamp())
        if timestamp >= stop_wav_time or stop_event.is_set():
            break
        time.sleep(0.1)

Discussion