🕌

AWS IoT Core触ってみる(第一回: デバイスシャドウ)

に公開

概要

AWS IoT Coreのワークショップをやってみました。ここでは、実際にラズパイなどの物理デバイスに接続するのではなく、仮想デバイスを立ち上げて接続します。
https://catalog.workshops.aws/aws-iot-immersionday-workshop/en-US/aws-iot-core/device-sdk-v2

その際に、lab5.1でシャドウを扱うトピックがあります。そこではAWSのコンソールからデバイスシャドウを編集することで状態を変更して異なる出力が出てくるようになっていたのですが、外部のpythonスクリプトを実行してステートを更新して出力を変えたりしたいと思ったので、そちらの実装をしてみました。

注意:今回はIoT Coreにおけるデバイスシャドウに関する記事なので、SubscribeやPublish, TopicなどのMQTTの基礎部分については省略します。

MQTTから学ぶ必要がある場合は、下記の記事などがおすすめです。
https://zenn.dev/ohke/articles/c98c8fa1e5e392

デバイスシャドウとは

IoTの「デバイスシャドウ(Device Shadow)」は、簡潔に言うとクラウド上にデバイスの状態をクラウドに「キャッシュ」しておく仕組みです。これにより、デバイスがオフラインの時でも、クラウドが代わりに状態を管理/保持してくれます。

例えば、以下の状況を想定します。

  • デバイス(例:エアコン)が オフライン
  • クラウド側で「電源ONにしたい」という要求が来た

この時、下記の手順でデバイスが何かの都合でオフラインだとしてもクラウドで望む状態を保存できるため、デバイスがオンラインになったら差分が更新されて反映されるようになります。

① クラウドが Shadow の desired を更新
    {
     "state": {
       "desired": {
         "power": "ON"
       }
     }
    }

② デバイスがオフラインでもクラウド上の Shadow に desired は保存される
こうすると、差分がdeltaとして反映される
    {
      "state": {
        "desired": {
          "MOTOR": "ON"
        },
        "reported": {
          "MOTOR": "OFF"
        },
        "delta": {
          "MOTOR": "ON"
        }
      }
    }

③ デバイスが後でオンラインになったとき
   → delta(差分)が通知されて、「あ、ONにしてほしいのか」と気づく
   → 実行後、reported を更新して反映
    {
      "state": {
        "desired": {
          "MOTOR": "ON"
        },
        "reported": {
          "MOTOR": "ON"
        }
      }
    }

デバイス側の読み書きについては、下記のように整理することができます。デバイスシャドウ部分をソースコードで実装する際にも、下記の手順に準拠します(本記事ではshadow.pyに相当します)。

  • 読み取り → クラウド上のシャドウ(状態レプリカ)から取得するだけ

    • delta を受け取って実行します
  • 書き込み → desired を更新してクラウド上に反映するだけ

    • 完了後に reported を更新します
ユーザー/クラウド            シャドウストア(クラウド)                     デバイス
      │                             │                                  │
      │ - desired update request -> │                                  │
      │                             │ -- delta notification (MQTT) --> │
      │                             │                                  │
      │                             │ <--- reported update (MQTT) ---- │
      │                             │                                  │


Device Shadowを使うことで、デバイスの状態管理を「非同期」「双方向」「耐障害性あり」にできます

メリットをまとめると、下記のようになります。

  • デバイスの状態をいつでも参照できる

  • オフラインでも事前に指示を出せる(次回復帰時に反映)

  • 状態の監査・履歴もクラウドで保持しやすい

逆にデメリットとしては、デバイスが非常に長期間オフラインだと、desired 状態が古くなって矛盾する可能性があるため、「デバイス最終オンライン時間」や「タイムアウト」も記録して整合性を取っています。

設定

IoT Coreのセットアップ

ログの設定

左のツールバーの「設定」を開くと、「ログ」という項目があるので、「ログの管理」を選択します。

ロールを作成して、「情報提供」を選択して更新します。

ポリシーの作成

次に、左のツールバーの「セキュリティ」→「ポリシー」を選択して、「ポリシーを作成」をクリックします。

そして、下図のようにポリシードキュメントのポリシーアクションとポリシーリソースを「*」にして作成をしてください。

モノを作成

最後にモノを作成します。左サイドメニューから「すべてのデバイス」→「モノ」を選択して、「モノを作成」をクリックしてください。

そして、ステップ1の「モノのプロパティを指定」の項目で名前を適当に作成して、「Device Shadow」の項目で「名前のないシャドウ」を選択します。

名前のないシャドウを選択すると、下記のトピックにpublishする際にpayload(json形式)を指定すればデバイスシャドウの状態を更新することができます。

$aws/things/YourThingName/shadow/update

一方で、名前付きシャドウを指定した場合は下記のようなトピックにアクセスする必要があります。このオプションは1つのThingに複数のシャドウを所持したい場合は名前管理したい場合に有効です。

$aws/things/YourThingName/shadow/name/motor/update
$aws/things/YourThingName/shadow/name/display/update

次に、step2のデバイス証明書を設定するところでは、「新しい証明書を自動作成(推奨)」を選択します。

最後のstep3では、証明書に先ほど作成したポリシーをアタッチして完成です。そうすると、証明書とキーをダウンロードというポップアップが表示されるので、下記の三つをダウンロードしてください。

  • デバイス証明書
    • certificate.pem として保存
  • パブリックキーファイル
    • 今回は使用しないが、今後使う場合は保存しておく
  • プライベートキーファイル
    • privateKey.pem として保存
  • ルートCA証明書(CA1, CA3)
    • AmazonRootCA1.pem として保存

ソースコードの作成

lab5.1のハンズオンと同様に、温度メトリックを送るデバイスをClassic Shadowを作ってデバイスの状態を変更し、振動メトリックも送るようにします。

ハンズオンではコンソールからデバイスシャドウの状態を更新することで実現していましたが、本記事ではpythonコードを実行してデバイスシャドウの状態を変更します。

ディレクトリ構造は下記のとおりです。

.
├── AmazonRootCA1.pem
├── certificate.pem
├── privateKey.pem
├── shadow.py
└── update.py

デバイス側の実装

AWS IoT Device SDK v2 for Python を使ってデバイスシャドウ(Classic Shadow)を操作する受信側を実装します。
lab5.1のソースコードをベースに実装しました。

shadow.py
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0

from awscrt import io, mqtt, auth, http
from awsiot import mqtt_connection_builder
from awsiot import iotshadow
import sys
import time
from random import randint
import json

# Setup our MQTT client and security certificates
# Make sure your certificate names match what you downloaded from AWS IoT
target_ep = '***************.iot.<region>.amazonaws.com'
thing_name = '***************'
cert_filepath = './certificate.pem'
private_key_filepath = './privateKey.pem'
ca_filepath = './AmazonRootCA1.pem'

# Our motor is not currently running.
MOTOR_STATUS = "OFF"
pub_topic_1 = 'data/temperature'
pub_topic_2 = 'data/vibration'

# Callback when connection is accidentally lost
def on_connection_interrupted(connection, error, **kwargs):
    print("Connection interrupted. Error: {}".format(error))

# Callback when an interrupted connection is re-established
def on_connection_resumed(connection, return_code, session_present, **kwargs):
    print("Connection resumed. return_code: {} session_present: {}".format(return_code, session_present))

    if (return_code == mqtt.ConnectReturnCode.ACCEPTED and not session_present):
        print("Session did not persist Resubscribing to existing topics...")
        resubscribe_future, _ = connection.resubscribe_existing_topics()

        # Cannot synchronously wait for resubscribe result because we're on the connection's event-loop thread
        # Evaluate result with a callback instead
        resubscribe_future.add_done_callback(on_resubscribe_complete)

# Callback to resubscribe to previously subscribed topics upon lost session
def on_resubscribe_complete(resubscribe_future):
    resubscribe_results = resubscribe_future.result()
    print("Resubscribe results: {}".format(resubscribe_results))

    for topic, qos in resubscribe_results['topics']:
        if qos is None:
            sys.exit("Server rejected resubscribe to topic: {}".format(topic))

# Spin up resources
event_loop_group = io.EventLoopGroup(1)
host_resolver = io.DefaultHostResolver(event_loop_group)
client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)

proxy_options = None

mqtt_connection = mqtt_connection_builder.mtls_from_path(
    endpoint=target_ep,
    port=8883,
    cert_filepath=cert_filepath,
    pri_key_filepath=private_key_filepath,
    client_bootstrap=client_bootstrap,
    ca_filepath=ca_filepath,
    on_connection_interrupted=on_connection_interrupted,
    on_connection_resumed=on_connection_resumed,
    client_id=thing_name,
    clean_session=False,
    keep_alive_secs=30,
    http_proxy_options=proxy_options
)

print("Connecting to {} with client ID '{}'...".format(target_ep, thing_name))

# Connect to the gateway
while True:
    try:
        connect_future = mqtt_connection.connect()
        # Future.result() waits until a result is available
        connect_result = connect_future.result()
    except:
        print("Connection to IoT Core failed... retrying in 5s.")
        time.sleep(5)
        continue
    else:
        print(connect_result)
        print("Connected!")
        break

# Set up the Classic Shadow handler
shadowClient = iotshadow.IotShadowClient(mqtt_connection)

# Function to update the Classic Shadow
def updateDeviceShadow():
    global shadowClient
    global MOTOR_STATUS
    
    # Set the Classic Shadow with the current motor status and check if it was successful
    # print("Updating shadow with reported motor status")
    payload = {"MOTOR": MOTOR_STATUS}
    shadowMessage = iotshadow.ShadowState(reported=payload)
    update_shadow_request = iotshadow.UpdateShadowRequest(state=shadowMessage, thing_name=thing_name)
    update_shadow_future = shadowClient.publish_update_shadow(request=update_shadow_request, qos=mqtt.QoS.AT_LEAST_ONCE)
    update_shadow_future.add_done_callback(on_classic_shadow_update_complete)

    shadow_delta_updated_subscription = iotshadow.ShadowDeltaUpdatedSubscriptionRequest(thing_name=thing_name)
    shadowClient.subscribe_to_shadow_delta_updated_events(
        request=shadow_delta_updated_subscription,
        qos=mqtt.QoS.AT_LEAST_ONCE,
        callback=on_classic_shadow_update_event
    )

# Callback to update the Classic Shadow
def on_classic_shadow_update_complete(update_shadow_future):
    update_shadow_result = update_shadow_future.result()

# Callback to get the Classic Shadow delta changes
# Add your custom logic to react to these delta changes, like turning ON the motor
def on_classic_shadow_update_event(shadowDeltaUpdatedEvent):
    try:
        desired = shadowDeltaUpdatedEvent.state
        print("🔔 Delta received:", desired)

        if "MOTOR" in desired:
            global MOTOR_STATUS
            MOTOR_STATUS = desired["MOTOR"]
            print(f"✅ MOTOR_STATUS changed to {MOTOR_STATUS}")
            updateDeviceShadow()
        else:
            print("⚠️ No MOTOR key in desired state")

    except Exception as e:
        print(f"❌ Exception in delta handler: {e}")


def publishMessage(message, topic):
    message_json = json.dumps(message)
    publish_future, _ = mqtt_connection.publish(
        topic=topic,
        payload=message_json,
        qos=mqtt.QoS.AT_LEAST_ONCE
    )
    publish_result = publish_future.result()
    return publish_result

# This sends a random temperature message to the topic pub_topic_1
# and random vibration message to the topic pub_topic_2, if the motor is ON
def send():
    temp = randint(0, 100)
    message = {
        'temp': temp,
        'unit': 'F'
    }
    publish_result = publishMessage(message, topic=pub_topic_1)    
    print("Temperature Message Published")

    # Only send motor vibration data if the motor is on.
    if MOTOR_STATUS == "ON":
        vibration = randint(-500, 500)
        message = {
            'vibration' : vibration
        }
        publish_result = publishMessage(message, topic=pub_topic_2)
        print ("Motor is running, Vibration Message Published")

# Listen for delta changes
shadow_delta_updated_subscription = iotshadow.ShadowDeltaUpdatedSubscriptionRequest(thing_name=thing_name)
shadowClient.subscribe_to_shadow_delta_updated_events(
    request=shadow_delta_updated_subscription,
    qos=mqtt.QoS.AT_LEAST_ONCE,
    callback=on_classic_shadow_update_event
)

status = None
while True:
    mode = updateDeviceShadow()
    send()
    time.sleep(5)
    status = MOTOR_STATUS

#To check and see if your message was published to the message broker go to the MQTT Client and subscribe to the iot topic and you should see your JSON Payload

主な変更点としては、mqtt_connection_builder.mtls_from_path において、clean_sessionをFalseにすることで、再接続時にサーバ側で購読が維持されます。

また、異なるpythonファイルからのデバイスシャドウの状態変更に対応できるように、While文の中にupdateDeviceShadow()を入れています。

そして、on_classic_shadow_update_event 関数を作成して、デバイスシャドウのdelta変更分を取得して更新できるようにしています。

# Callback to get the Classic Shadow delta changes
# Add your custom logic to react to these delta changes, like turning ON the motor
def on_classic_shadow_update_event(shadowDeltaUpdatedEvent):
    try:
        desired = shadowDeltaUpdatedEvent.state
        print("🔔 Delta received:", desired)

        if "MOTOR" in desired:
            global MOTOR_STATUS
            MOTOR_STATUS = desired["MOTOR"]
            print(f"✅ MOTOR_STATUS changed to {MOTOR_STATUS}")
            updateDeviceShadow()
        else:
            print("⚠️ No MOTOR key in desired state")

    except Exception as e:
        print(f"❌ Exception in delta handler: {e}")

外部から状態をアップデートする

ここでは、mqtt_connection.publish を実施する際にpayloadを更新することによって、shadow.pyの表示結果を変更させています。$aws/things/{thing_name}/shadow/update のトピックに対してpublishを実行しています。

また、今回はawsiotsdkを使いましたが、pahoなどのライブラリでも同様の実装が可能です。好きな方で実装するのが良いと思います。

update.py
import json
import sys
import time

from awscrt import io, mqtt, auth, http
from awsiot import mqtt_connection_builder

# 証明書とエンドポイントのパス
endpoint = '****************.iot.<region>.amazonaws.com'  # AWS IoT Core エンドポイント
client_id = "****************"  # Thing名と一致する必要はないがユニークなID
thing_name = "****************"

cert_filepath = './certificate.pem'
key_filepath = './privateKey.pem'
ca_filepath = './AmazonRootCA1.pem'

event_loop_group = io.EventLoopGroup(1)
host_resolver = io.DefaultHostResolver(event_loop_group)
client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)

mqtt_connection = mqtt_connection_builder.mtls_from_path(
    endpoint=endpoint,
    cert_filepath=cert_filepath,
    pri_key_filepath=key_filepath,
    client_bootstrap=client_bootstrap,
    ca_filepath=ca_filepath,
    client_id=client_id,
    clean_session=False,
    keep_alive_secs=30,
    port=8883,
)

print(f"Connecting to {endpoint} with client ID '{client_id}'...")
connect_future = mqtt_connection.connect()
connect_future.result()
print("Connected!")

mqtt_connection.subscribe(
    f"$aws/things/{thing_name}/shadow/update/accepted",
    mqtt.QoS.AT_LEAST_ONCE,
    lambda topic, payload, **kwargs: print("✅ accepted", payload.decode())
)

mqtt_connection.subscribe(
    f"$aws/things/{thing_name}/shadow/update/rejected",
    mqtt.QoS.AT_LEAST_ONCE,
    lambda topic, payload, **kwargs: print("✅ rejected", payload.decode())
)

# シャドウの desired ステートを更新
payload = {
    "state": {
        "desired": {
            "MOTOR": "ON",
        }
    }
}
topic = f"$aws/things/{thing_name}/shadow/update"
print(f"Publishing to topic '{topic}'...")

# ★ 完了を待つ
publish_future, _ = mqtt_connection.publish(
    topic=topic,
    payload=json.dumps(payload),
    qos=mqtt.QoS.AT_LEAST_ONCE
)
publish_future.result()

time.sleep(5)

disconnect_future = mqtt_connection.disconnect()
disconnect_future.result()
print("Disconnected!")

Usage

  • shadow.pyを実行して、メッセージをpublishし続ける
  • update.pyを実行して、デバイスシャドウを更新する
  • shadow.pyの出力結果が変更されることを確認する

実行結果

$ python shadow.py
# 接続されたら「Temperature Message Published」が5秒おきに表示される
Connecting to ************.iot.ap-northeast-1.amazonaws.com with client ID '************'...
{'session_present': True}
Connected!
Temperature Message Published
Temperature Message Published

# update.pyを実行してpayloadを"MOTOR": "ON"でpublishすると、「Motor is running, Vibration Message Published」も表示されて、temperatureとvibrationも送信される
Connection interrupted. Error: AWS_ERROR_MQTT_UNEXPECTED_HANGUP: The connection was closed unexpectedly.
Connection resumed. return_code: 0 session_present: True
Temperature Message Published
🔔 Delta received: {'MOTOR': 'ON'}
✅ MOTOR_STATUS changed to ON
Temperature Message Published
Motor is running, Vibration Message Published
Temperature Message Published
Motor is running, Vibration Message Published
Temperature Message Published
Motor is running, Vibration Message Published
Temperature Message Published
Motor is running, Vibration Message Published
Temperature Message Published
Motor is running, Vibration Message Published
Temperature Message Published
Motor is running, Vibration Message Published
Temperature Message Published
Motor is running, Vibration Message Published

# update.pyを実行してpayloadを"MOTOR": "OFF"でpublishすると、temperatureのみ送信される
🔔 Delta received: {'MOTOR': 'OFF'}
✅ MOTOR_STATUS changed to OFF
Temperature Message Published
Temperature Message Published

まとめ

今回はデバイスシャドウの状態更新について、ハンズオンを参考にして実装して、外部のpythonコードからデバイスシャドウのステートを変更できるようにしてみました。今後はこちらをrustで実装してみたり、実際に物理デバイスと接続してゴニョゴニョしてみるかもしれません。

参考資料

https://www.cloudbuilders.jp/articles/4109/

https://zenn.dev/shikira/articles/20240318-aws-iot-immersionday-workshop-iot-core

Discussion