🤖

ラズパイ+OpenCV+Streamlitで顔認証アプリをつくってみた

2023/11/17に公開

はじめに

会社のハッカソンイベントで、顔認証アプリをつくってみたのでその記録です。
具体的にやったこととしては以下になります。

  • RaspberryPi4+公式CameraModule3で撮影してみる
  • カメラで撮った映像をOpenCVで顔認証処理してみる
  • Streamlitで上記をWebアプリ化してみる

一緒につくったメンバー

開発環境

  • RaspberryPi4
  • Bullseye 64bit
  • Python 3.9.2
  • CameraModule3

ラズパイとカメラで撮影

こちら非常に苦労しまして長くなったのでこちらの記事にまとめています。

撮影画像をOpenCVで顔認証処理

パッケージのインストール

picamera2とOpenCVをインストールします。

pip install picamera2
pip install opencv-python

学習済みモデルのダウンロード

顔画像の保存

画像から顔を検出して切り出して顔画像として保存します。

  • ソースコード
generate_aligned_faces.py
import os
import argparse
import cv2

def main():
    # 引数をパースする
    parser = argparse.ArgumentParser("generate aligned face images from an image")
    parser.add_argument("image", help="input image file path (./image.jpg)")
    args = parser.parse_args()

    # 引数から画像ファイルのパスを取得
    path = args.image
    directory = os.path.dirname(args.image)
    if not directory:
        directory = os.path.dirname(__file__)
        path = os.path.join(directory, args.image)

    # 画像を開く
    image = cv2.imread(path)
    if image is None:
        exit()

    # 画像が3チャンネル以外の場合は3チャンネルに変換する
    channels = 1 if len(image.shape) == 2 else image.shape[2]
    if channels == 1:
        image = cv2.cvtColor(image, cv2.COLOR_GRAY2BGR)
    if channels == 4:
        image = cv2.cvtColor(image, cv2.COLOR_BGRA2BGR)

    # モデルを読み込む
    weights = "onnx/yunet_n_640_640.onnx"
    face_detector = cv2.FaceDetectorYN_create(weights, "", (0, 0))
    weights = "onnx/face_recognizer_fast.onnx"
    face_recognizer = cv2.FaceRecognizerSF_create(weights, "")

    # 入力サイズを指定する
    height, width, _ = image.shape
    face_detector.setInputSize((width, height))

    # 顔を検出する
    _, faces = face_detector.detect(image)

    # 検出された顔を切り抜く
    aligned_faces = []
    if faces is not None:
        for face in faces:
            aligned_face = face_recognizer.alignCrop(image, face)
            aligned_faces.append(aligned_face)

    # 画像を表示、保存する
    for i, aligned_face in enumerate(aligned_faces):
        cv2.imshow("aligned_face {:03}".format(i + 1), aligned_face)
        cv2.imwrite(os.path.join(directory, "face{:03}.jpg".format(i + 1)), aligned_face)

    cv2.waitKey(0)
    cv2.destroyAllWindows()

if __name__ == '__main__':
    main()
  • 実行コマンド
python generate_aligned_faces.py input.jpg
  • 入力画像(input.jpg)

  • 実行結果

face001.jpg face002.jpg

特徴を抽出して辞書として保存

顔画像から特徴を抽出して特徴辞書として保存します。

  • ソースコード
generate_feature_dictionary.py
import os
import sys
import argparse
import numpy as np
import cv2

def main():
    # 引数をパースする
    parser = argparse.ArgumentParser("generate face feature dictionary from an face image")
    parser.add_argument("image", help="input face image file path (./<face名>.jpg)")
    args = parser.parse_args()
    print(args.image)

    # 引数から画像ファイルのパスを取得
    path = args.image
    directory = os.path.dirname(args.image)
    if not directory:
        directory = os.path.dirname(__file__)
        path = os.path.join(directory, args.image)

    # 画像を開く
    image = cv2.imread(path)
    if image is None:
        exit()

    # 画像が3チャンネル以外の場合は3チャンネルに変換する
    channels = 1 if len(image.shape) == 2 else image.shape[2]
    if channels == 1:
        image = cv2.cvtColor(image, cv2.COLOR_GRAY2BGR)
    if channels == 4:
        image = cv2.cvtColor(image, cv2.COLOR_BGRA2BGR)

    # モデルを読み込む
    weights = "onnx/face_recognizer_fast.onnx"
    face_recognizer = cv2.FaceRecognizerSF_create(weights, "")

    # 特徴を抽出する
    face_feature = face_recognizer.feature(image)
    print(face_feature)
    print(type(face_feature))

    # 特徴を保存する
    basename = os.path.splitext(os.path.basename(args.image))[0]
    dictionary = os.path.join(directory, basename)
    np.save(dictionary, face_feature)

if __name__ == '__main__':
    main()
  • 実行コマンド
python generate_feature_dictionary.py face001.jpg
  • 入力画像(1枚ごとにソースコードを実行)
face001.jpg face002.jpg
  • 実行結果
    face001.npy及びface002.npyが出力されます。

顔認証処理

撮影画像から顔を検出し特徴を抽出、特徴辞書と比較して顔認証します。

  • ソースコード
camface_recognizer.py
import cv2
import os
import glob
import numpy as np
from picamera2 import Picamera2
from libcamera import controls

COSINE_THRESHOLD = 0.363
NORML2_THRESHOLD = 1.128

# 特徴を辞書と比較してマッチしたユーザーとスコアを返す関数
def match(recognizer, feature1, dictionary):
    for element in dictionary:
        user_id, feature2 = element
        score = recognizer.match(feature1, feature2, cv2.FaceRecognizerSF_FR_COSINE)
        if score > COSINE_THRESHOLD:
            return True, (user_id, score)
    return False, ("", 0.0)

def main():
    camera = Picamera2()
    camera.configure(camera.create_preview_configuration(main={"format": 'XRGB8888', "size": (640, 480)}))
    camera.start()
    camera.set_controls({'AfMode': controls.AfModeEnum.Continuous})

    # 特徴を読み込む
    dictionary = []
    files = glob.glob(os.path.join("sample_data", "*.npy"))
    for file in files:
        feature = np.load(file)
        user_id = os.path.splitext(os.path.basename(file))[0]
        dictionary.append((user_id, feature))
    
    # モデルを読み込む
    weights = "onnx/yunet_n_640_640.onnx"
    face_detector = cv2.FaceDetectorYN_create(weights, "", (0, 0))
    weights = "onnx/face_recognizer_fast.onnx"
    face_recognizer = cv2.FaceRecognizerSF_create(weights, "")

    while True:
        image = camera.capture_array()

        # 画像が3チャンネル以外の場合は3チャンネルに変換する
        channels = 1 if len(image.shape) == 2 else image.shape[2]
        if channels == 1:
            image = cv2.cvtColor(image, cv2.COLOR_GRAY2BGR)
        if channels == 4:
            image = cv2.cvtColor(image, cv2.COLOR_BGRA2BGR)
        
        # 入力サイズを指定する
        height, width, _ = image.shape
        face_detector.setInputSize((width, height))

        # 顔を検出する
        result, faces = face_detector.detect(image)
        faces = faces if faces is not None else []

        for face in faces:
            # 顔を切り抜き特徴を抽出する
            aligned_face = face_recognizer.alignCrop(image, face)
            feature = face_recognizer.feature(aligned_face)

            # 辞書とマッチングする
            result, user = match(face_recognizer, feature, dictionary)

            # 顔のバウンディングボックスを描画する
            box = list(map(int, face[:4]))
            color = (0, 255, 0) if result else (0, 0, 255)
            thickness = 2
            cv2.rectangle(image, box, color, thickness, cv2.LINE_AA)

            # 認識の結果を描画する
            id, score = user if result else ("unknown", 0.0)
            text = "{0} ({1:.2f})".format(id, score)
            position = (box[0], box[1] - 10)
            font = cv2.FONT_HERSHEY_SIMPLEX
            scale = 0.6
            cv2.putText(image, text, position, font, scale, color, thickness, cv2.LINE_AA)

        # 画像を表示する
        cv2.imshow("face recognition", image)
        key = cv2.waitKey(1)
        if key == ord('q'):
            break

    camera.close()
    cv2.destroyAllWindows()
    
if __name__ == '__main__':
    main()
  • 実行コマンド
python camface_recognizer.py
  • 実行結果

StreamlitでWebアプリ化

Streamlitについて

Streamlitは、PythonでWebアプリを作成できるオープンソースのフレームワークです。UIコンポーネントが豊富にあり、短いコードで直感的に書けるため、WebUI付きのアプリを手軽に作ることができます。デモ用アプリ等の小規模な開発に向いています。

パッケージのインストール

Streamlitに関連するパッケージをインストールします。

pip install streamlit
pip install streamlit-webrtc
pip install streamlit_server_state

撮影映像をWebアプリの画面に表示

  • ソースコード
run_streamlit.py
import cv2
import os
import glob
import numpy as np
from picamera2 import Picamera2
from libcamera import controls
import streamlit as st
from streamlit_server_state import server_state, server_state_lock

# ラズパイカメラ画像の準備
# セッションに関わらずサーバー内でカメラを共有するために、server_stateを使用する
def get_camera() -> Picamera2:
    with server_state_lock["camera"]:
        if "camera" not in server_state:
            camera = Picamera2()
            camera.configure(camera.create_preview_configuration(main={
                "format": 'XRGB8888',
                "size": (640, 480)
            }))
            camera.start()
            camera.set_controls({'AfMode': controls.AfModeEnum.Continuous})
            server_state.camera = camera
    return server_state.camera

def main():

    # アプリにタイトルを表示
    st.markdown("# Face Recognition Application")
    
    # カメラを読み込む
    camera = get_camera()

    try:
        image_loc = st.empty()
        while True:
            image = camera.capture_array()

            # 画像が3チャンネル以外の場合は3チャンネルに変換する
            channels = 1 if len(image.shape) == 2 else image.shape[2]
            if channels == 1:
                image = cv2.cvtColor(image, cv2.COLOR_GRAY2BGR)
            if channels == 4:
                image = cv2.cvtColor(image, cv2.COLOR_BGRA2BGR)
            
            image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
            image_loc.image(image)
    except KeyboardInterrupt:
        print('app stop!!')
    except Exception as e:
        st.markdown("### エラーです。")
        st.markdown(f'{e}')

    camera.close()
    cv2.destroyAllWindows()

if __name__ == '__main__':
    main()
  • 実行コマンド
python run_streamlit.py

顔認証処理を加える

  • ソースコード
run_streamlit.py
import cv2
import os
import glob
import numpy as np
from picamera2 import Picamera2
from libcamera import controls
import streamlit as st
from streamlit_server_state import server_state, server_state_lock

COSINE_THRESHOLD = 0.363
NORML2_THRESHOLD = 1.128

# 特徴量のファイルの絶対パス
GLOB_PATH = '*********/sample_data/*.npy'
FACE_DETECTOR_WEIGHTS = '*********/onnx/yunet_n_640_640.onnx'
FACE_RECOGNIZER_WEIGHTS = '*********/onnx/face_recognizer_fast.onnx'

# 特徴を辞書と比較してマッチしたユーザーとスコアを返す関数
def match(recognizer, feature1, dictionary) -> tuple[bool,set[str,float]]:
    for element in dictionary:
        user_id, feature2 = element
        score = recognizer.match(feature1, feature2, cv2.FaceRecognizerSF_FR_COSINE)
        if score > COSINE_THRESHOLD:
            return True, (user_id, score)
    return False, ("", 0.0)

# ラズパイカメラ画像の準備
# セッションに関わらずサーバー内でカメラを共有するために、server_stateを使用する
def get_camera() -> Picamera2:
    with server_state_lock["camera"]:
        if "camera" not in server_state:
            camera = Picamera2()
            camera.configure(camera.create_preview_configuration(main={
                "format": 'XRGB8888',
                "size": (640, 480)
            }))
            camera.start()
            camera.set_controls({'AfMode': controls.AfModeEnum.Continuous})
            server_state.camera = camera
    return server_state.camera

def recognize_face(face_recognizer, dictionary, image, face) -> np.ndarray:
    # 顔を切り抜き特徴を抽出する
    aligned_face = face_recognizer.alignCrop(image, face)
    feature = face_recognizer.feature(aligned_face)

    # 辞書とマッチングする
    result, user = match(face_recognizer, feature, dictionary)

    # 顔のバウンディングボックスを描画する
    box = list(map(int, face[:4]))
    color = (0, 255, 0) if result else (0, 0, 255)
    thickness = 2
    cv2.rectangle(image, box, color, thickness, cv2.LINE_AA)

    # 認識の結果を描画する
    id, score = user if result else ("unknown", 0.0)
    text = "{0} ({1:.2f})".format(id, score)
    position = (box[0], box[1] - 10)
    font = cv2.FONT_HERSHEY_SIMPLEX
    scale = 0.6
    cv2.putText(image, text, position, font, scale, color, thickness, cv2.LINE_AA)

    return image

def main():

    # アプリにタイトルを表示
    st.markdown("# Face Recognition Application")

    # 特徴を読み込む
    dictionary = []
    files = glob.glob(GLOB_PATH)
    for file in files:
        feature = np.load(file)
        user_id = os.path.splitext(os.path.basename(file))[0]
        dictionary.append((user_id, feature))

    # モデルを読み込む
    face_detector = cv2.FaceDetectorYN_create(FACE_DETECTOR_WEIGHTS, "", (0, 0))
    face_recognizer = cv2.FaceRecognizerSF_create(FACE_RECOGNIZER_WEIGHTS, "")
    
    # カメラを読み込む
    camera = get_camera()

    try:
        image_loc = st.empty()
        while True:
            image = camera.capture_array()

            # 画像が3チャンネル以外の場合は3チャンネルに変換する
            channels = 1 if len(image.shape) == 2 else image.shape[2]
            if channels == 1:
                image = cv2.cvtColor(image, cv2.COLOR_GRAY2BGR)
            if channels == 4:
                image = cv2.cvtColor(image, cv2.COLOR_BGRA2BGR)
            
            # 入力サイズを指定する
            height, width, _ = image.shape
            face_detector.setInputSize((width, height))

            # 顔を検出する
            result, faces = face_detector.detect(image)
            faces = faces if faces is not None else []

            for face in faces:
                image = recognize_face(face_recognizer, dictionary,image, face)
            image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
            image_loc.image(image)
    except KeyboardInterrupt:
        print('app stop!!')
    except Exception as e:
        st.markdown("### エラーです。")
        st.markdown(f'{e}')

    camera.close()
    cv2.destroyAllWindows()

if __name__ == '__main__':
    main()
  • 実行コマンド
python run_streamlit.py
  • 実行結果

Streamlitでつまづいたこと

Streamlitは、ユーザーがアプリにアクセスする度にスクリプトを最初から実行する仕様になっています。そのため、あるユーザーがアプリでカメラ映像を見ている間に他ユーザーがアクセスすると、カメラの初期設定処理が再実行されてサーバーが停止してしまうことが発生しました。これは、streamlit_server_stateライブラリを使用して、セッションに関わらずサーバー内でカメラを共有することで解決できました。

AWS Rekognition Videoとの比較

Amazon Rekognition Video

Amazon Rekognition ストリーミング動画イベントは、新規または既存の Kinesis Video Streams からの動画を処理します。Rekognition は、動画分析を開始するための通知を送信した場合にのみ、Kinesis Video ストリームの処理を開始し、イベントごとに最大 120 秒の動画を分析できます。Amazon Rekognition によって処理された動画の量に対してのみ支払います。注: Amazon Kinesis Video Streams サービスは別途お支払いいただきます。

  • Amazon Rekognition Imageだとそこまで高額ではないが、Amazon Rekognition Videoは思ったより費用が高い。
  • 動画をストリーミングするためにAmazon Kinesis Video Streamsを使用する場合は、別途サービスの料金が発生する。
    • Amazon Kinesis Video Streamsはかなり安めの設定のため、費用はあまりかからない印象。詳細はdocs参照。
  • 動画での顔検索:0.15USD / 1m
    • ≒ 1300円 / 1h
    • ≒ 31,200円 / 24h

ラズパイ

  • OpenCV等を使用すれば基本的には無料。
  • ラズパイ本体などのハードウェア代は導入時に発生。(1万前後)
    • ハードウェアありきなので、故障などの追加費用や運用費等は発生する可能性があり。
    • ただ、Amazon Rekognition Videoでも外部カメラは用意する必要があるため、どっちにしろハードウェアの運用方法は検討必要。
      モデルの作成をどうするか、どこまでの精度を求めるか、などの要因もありますが、値段だけを考えると圧倒的にラズパイでの運用がコストが低そうです。

参考記事

NCDCエンジニアブログ

Discussion