📷

AWS IoT サービスと S3 署名付きURLを活用したエッジカメラの画像認識システムの実装

に公開

やりたいこと

ラズパイなどにカメラを取り付け、その結果を生成AIで分析させ、結果をラズパイ端末で受け取りたい。

Amazon API Gatewayを使った方法なども考えられるが、エッジからのpushの実装が難しい。また当然、認証されたデバイスとして通信は発生させたくない。MQTT通信で画像をやり取りする方法も、なくは無いが、ペイロードの上限は128 KBまでであり、これはハードリミットである。リッチな画像のやり取りには不向きと考えられる。S3に直接AWS認証情報を与えてS3にアップロードすることもできるが、セキュリティ上避けたい実装である。
ref: https://docs.aws.amazon.com/ja_jp/general/latest/gr/iot-core.html#message-broker-limits

そこで、Amazon S3の署名付きURLを使って画像データは直接pushし、署名付きURLや画像の認識結果などのテキストデータのみをMQTT通信でやり取りする方法を考える。このようにすることで、エッジとクラウド間の通信実装の煩雑さを避け、AWSクラウドのスケーラビリティを享受しつつ、コスト最適化した状態で生成AIを用いた画像認識の結果をエッジ端末で取得することが可能になる。

まずIoT カメラのデプロイにはAWS IoT Greengrass v2を使用する。
AWS IoT Core にはMQTT通信を行う機能があり、指定したトピックにメッセージを流せる。この機能はAWS Lambdaから呼び出すこともでき、またIoT rulesという機能を使うと、特定のトピックへのメッセージの受信をトリガーに指定したAWS Lambda関数を起動することができる。

システムの動作

  1. ラズパイで画像を撮影する
  2. 署名つきURLをリクエストする際のMQTTトピック(エッジ→クラウド)を呼び出す
  3. トピックへの受信をトリガーにAWS Lambda関数を起動し、署名付きURLを生成する
  4. 署名つきURLをラズパイに返すトピック(クラウド→エッジ)に署名付きURLを流す
  5. 署名付きURLを使ってラズパイはS3に画像データをアップロードする
  6. S3への画像データのアップロードをトリガーにして、Amazon Bedrockに画像データを入力し、結果を受け取り、像の認識結果を通知するトピック(クラウド→エッジ)AWS Lambda関数を起動する
  7. ラズパイ側で結果を受け取る

トピック

  • ラズパイが署名つきURLをリクエストする際のトピック(エッジ→クラウド)
    • "service/camera/request/presignedurl"
  • ラズパイが署名つきURLを受け取る際のトピック(クラウド→エッジ)
    • "service/camera/response/presignedurl"
  • 画像の認識結果を通知するトピック
    • "service/camera/imgresult"

署名付きURLを返却するLambda関数のコード

IoT rules の記述

下記のように設定する

SELECT * FROM 'service/camera/request/presignedurl'

Lambda関数は下記のように記述

import os, json, time, boto3, base64

BUCKET_NAME   = os.getenv("BUCKET_NAME", "xxxx")
OBJECT_PREFIX = os.getenv("OBJECT_PREFIX", "uploads/")
URL_TTL_SEC   = int(os.getenv("URL_TTL_SEC", "900"))
PUBLISH_TOPIC = os.getenv("PUBLISH_TOPIC", "service/camera/response/presignedurl")
IOT_ENDPOINT  = os.getenv("IOT_ENDPOINT","xxxx-ats.iot.{region}.amazonaws.com")

s3  = boto3.client("s3")
iot = boto3.client("iot-data", endpoint_url=f"https://{IOT_ENDPOINT}")

def _extract_body(event: dict) -> dict:
    if "thingName" in event:           # そのまま JSON
        return event
    raw = event.get("payload")         # Base64 or str
    if raw is None:
        raise ValueError("payload not found")
    if isinstance(raw, (bytes, bytearray)):
        raw = raw.decode()
    try:                               # Base64 判定
        return json.loads(base64.b64decode(raw))
    except Exception:
        return json.loads(raw)

def lambda_handler(event, context):
    # 1. 受信ペイロード取り出し
    body  = _extract_body(event)
    thing = body.get("thingName", "unknown")
    imgtk = body.get("image_token") 
    if not imgtk:
        raise ValueError("image_token missing")

    # 2. S3 キー生成
    ts  = time.strftime("%Y%m%dT%H%M%SZ", time.gmtime())
    key = f"{OBJECT_PREFIX}{thing}/{ts}_{imgtk}.jpg"

    # 3. presigned URL 作成
    url = s3.generate_presigned_url(
        ClientMethod="put_object",
        Params={"Bucket": BUCKET_NAME,
                "Key": key,
                "ContentType": "image/jpeg"},
        ExpiresIn=URL_TTL_SEC,
        HttpMethod="PUT"
    )

    # 4. Pi へ返送
    resp = {"image_token": imgtk, "url": url, "key": key, "method": "PUT"}
    iot.publish(topic=PUBLISH_TOPIC, qos=1,
                payload=json.dumps(resp).encode())
    return {"status": "OK", **resp}

S3からトリガーされ、画像認識をBedrockで行い、ラズパイに結果を返却するLambda関数のコード

import os, json, base64, boto3
from urllib.parse import unquote_plus
from pathlib import PurePosixPath

IOT_ENDPOINT   = os.getenv("IOT_ENDPOINT",
                           "xxxx-ats.iot.{region}.amazonaws.com")
MODEL_ID       = os.getenv("BEDROCK_MODEL_ID",
                           "us.amazon.nova-pro-v1:0")
IOT_TOPIC      = os.getenv("IOT_RESULT_TOPIC",
                           "service/camera/imgresult")
BEDROCK_REGION = os.getenv("AWS_REGION", "us-east-1")
PROMPT = os.getenv("PROMPT", 
"""
You are an image-inspection AI.
What is this?
""")

s3      = boto3.client("s3")
iot     = boto3.client("iot-data",
                       endpoint_url=f"https://{IOT_ENDPOINT}")
bedrock = boto3.client("bedrock-runtime", region_name=BEDROCK_REGION)

def _describe_image(img_bytes: bytes) -> str:
    img_b64 = base64.b64encode(img_bytes).decode()
    payload = {
        "schemaVersion": "messages-v1",
        "messages": [{
            "role": "user",
            "content": [
                {"image": {"format": "jpeg","source":{"bytes": img_b64}}},
                {"text": PROMPT}
            ]}],
        "inferenceConfig": {"maxTokens": 256, "temperature": 0.2}
    }
    resp = bedrock.invoke_model(
        modelId     = MODEL_ID,
        body        = json.dumps(payload),
        contentType = "application/json",
        accept      = "application/json"
    )
    return json.loads(resp["body"].read())["output"]["message"]["content"][0]["text"]

def _extract_image_token(key: str) -> str | None:
    """
    uploads/raspi/20250615T121528Z_<uuid>.jpg から uuid を取り出す
    """
    name = PurePosixPath(key).name               # ファイル名のみ
    stem = name.rsplit(".", 1)[0]                # 拡張子除去
    parts = stem.split("_")
    return parts[-1] if len(parts) >= 2 else None

def lambda_handler(event, context):
    for rec in event["Records"]:
        bucket = rec["s3"]["bucket"]["name"]
        key    = unquote_plus(rec["s3"]["object"]["key"])

        try:
            img = s3.get_object(Bucket=bucket, Key=key)["Body"].read()

            # Bedrock 推論
            while True:
                res = _describe_image(img).strip()
                if res.lower() in ("true", "false"):
                    break
                print("invalid result:", res)

            # image_token をキーから抽出
            img_tok = _extract_image_token(key)
            if not img_tok:
                raise ValueError("image_token not found in key")

            # IoT Core Publish
            msg = {
                "bucket": bucket,
                "key":    key,
                "image_token": img_tok,
                "bedrock_result": res.lower()
            }
            iot.publish(topic=IOT_TOPIC, qos=1,
                        payload=json.dumps(msg).encode())

        except Exception as e:
            err = {"bucket": bucket, "key": key,
                   "status": "ERROR", "reason": str(e)}
            iot.publish(topic=IOT_TOPIC, qos=0,
                        payload=json.dumps(err).encode())
            raise

ラズパイ側のコード

基本的には次の動作をすればよい

  • スクリプトが起動されると、接続されたUSBカメラを起動して写真撮影を行う
  • その後、service/camera/request/presignedurlトピックにメッセージを送る
  • トピックservice/camera/response/presignedurlをポーリングする。
  • 署名付きURLを使って画像をS3にアップする。
  • トピックservice/camera/imgresultをポーリングする
  • Amazon Bedrockで画像認識した結果がトピックservice/camera/imgresultに返されるため、結果をログに残す。

例えば、下記のようなコードで動くと思われる。
サンプルコードはAWS Greengrass v2を使ってエッジ端末にコードをデプロイする前提である。AWS IoT Core のみを使用する場合は証明書のインポートなども必要となる。
カメラの環境等に合わせて適宜調節が必要。

import os, json, time, uuid, logging, threading
import cv2
import requests
from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2
from awsiot.greengrasscoreipc.model import QOS

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")

# ==== 環境変数(必要に応じて上書き)====
THING_NAME      = os.getenv("THING_NAME") or os.getenv("AWS_IOT_THING_NAME","raspi")
REQ_TOPIC       = os.getenv("REQUEST_TOPIC","service/camera/request/presignedurl")
RESP_TOPIC      = os.getenv("RESPONSE_TOPIC","service/camera/response/presignedurl")
RESULT_TOPIC    = os.getenv("RESULT_TOPIC","service/camera/imgresult")
CAM_INDEX       = int(os.getenv("CAMERA_INDEX","0"))
JPEG_QUALITY    = int(os.getenv("JPEG_QUALITY","90"))
RESP_TIMEOUT_S  = int(os.getenv("RESP_TIMEOUT_SEC","60"))   # presigned URL 取得の待ち
RESULT_TIMEOUT_S= int(os.getenv("RESULT_TIMEOUT_SEC","300"))# Bedrock 結果の待ち

# ==== ユーティリティ ====
def capture_jpeg(cam_index=0, quality=90) -> bytes:
    cap = cv2.VideoCapture(cam_index)
    if not cap.isOpened():
        raise RuntimeError("USBカメラをオープンできませんでした")
    time.sleep(0.4)  # 起動安定待ち(短め)
    ok, frame = cap.read()
    cap.release()
    if not ok:
        raise RuntimeError("フレーム取得に失敗しました")
    ok, buf = cv2.imencode(".jpg", frame, [int(cv2.IMWRITE_JPEG_QUALITY), quality])
    if not ok:
        raise RuntimeError("JPEGエンコードに失敗しました")
    return buf.tobytes()

def http_put(url: str, data: bytes, method="PUT"):
    if requests:
        r = requests.request(method, url, data=data,
                             headers={"Content-Type":"image/jpeg"}, timeout=60)
        return r.status_code, r.text[:200]
    # fallback: urllib
    req = Request(url, data=data, method=method)
    req.add_header("Content-Type","image/jpeg")
    with urlopen(req, timeout=60) as resp:
        # urllib は 200/204 等を返す
        return getattr(resp, "status", 200), ""

# ==== メイン ====
def main():
    # 起動→撮影
    img = capture_jpeg(CAM_INDEX, JPEG_QUALITY)
    image_token = uuid.uuid4().hex
    logging.info("captured image: %d bytes, token=%s", len(img), image_token)

    ipc = GreengrassCoreIPCClientV2()

    # presigned URL 要求を購読→受信待ち(image_token一致のみ拾う)
    presigned_evt = threading.Event()
    presigned_resp = {}

    def on_presigned(event):
        try:
            payload = event.message.payload.decode()
            msg = json.loads(payload)
            if msg.get("image_token") == image_token:
                presigned_resp.update(msg)
                presigned_evt.set()
                logging.info("presigned URL 受信")
        except Exception as e:
            logging.warning("presigned 受信処理エラー: %s", e)

    sub1 = ipc.subscribe_to_iot_core(topic_name=RESP_TOPIC, qos=QOS.AT_LEAST_ONCE,
                                     on_stream_event=on_presigned)

    # presigned URL 要求を Publish
    req_payload = {"thingName": THING_NAME, "image_token": image_token}
    ipc.publish_to_iot_core(topic_name=REQ_TOPIC, qos=QOS.AT_LEAST_ONCE,
                            payload=json.dumps(req_payload).encode())
    logging.info("presigned URL を要求: %s", REQ_TOPIC)

    # presigned URL を待つ
    if not presigned_evt.wait(timeout=RESP_TIMEOUT_S):
        sub1.close()
        raise TimeoutError("presigned URL 応答なし (timeout)")

    sub1.close()
    url = presigned_resp.get("url")
    method = presigned_resp.get("method","PUT")
    if not url:
        raise RuntimeError("presigned URL が応答に含まれていません")

    # 画像を S3 に PUT
    status, body = http_put(url, img, method=method)
    if status not in (200, 201, 204):
        raise RuntimeError(f"S3アップロード失敗 status={status} body={body}")
    logging.info("S3 にアップロード完了 (HTTP %s)", status)

    # 画像認識結果トピックを購読→待機
    result_evt = threading.Event()
    result_payload = {}

    def on_result(event):
        try:
            payload = event.message.payload.decode()
            msg = json.loads(payload)
            if msg.get("image_token") == image_token:
                result_payload.update(msg)
                result_evt.set()
                logging.info("結果受信: %s", msg.get("bedrock_result"))
        except Exception as e:
            logging.warning("結果受信処理エラー: %s", e)

    sub2 = ipc.subscribe_to_iot_core(topic_name=RESULT_TOPIC, qos=QOS.AT_LEAST_ONCE,
                                     on_stream_event=on_result)

    if not result_evt.wait(timeout=RESULT_TIMEOUT_S):
        sub2.close()
        raise TimeoutError("画像認識結果の受信なし (timeout)")

    sub2.close()

    # ログ出力
    logging.info("最終結果: token=%s bucket=%s key=%s bedrock_result=%s",
                 image_token,
                 result_payload.get("bucket"),
                 result_payload.get("key"),
                 result_payload.get("bedrock_result"))

if __name__ == "__main__":
    try:
        main()
    except Exception as e:
        logging.exception("致命的エラー: %s", e)
        raise

Discussion