AWS IoT CoreのDevice ShadowをRubyクライアントでお試し
はじめに
IoT Core の Device Shadow のアイデアは分かるんだが、実際のクライアントはどう書くの?[1]と思ったので Ruby クライアントを書いて試したという話。「AWS IoTにRubyのMQTTクライアントから接続」を見ておくと話が早いです。
Device Shadow
IoT Core には Device Shadow という実装があります。Device Shadow は簡単にいうと、物理的なデバイスに紐つく JSON のデータで、クラウド側に保存されます。
例えばデバイスが乗り物またはロボットみたいなものだとして、そいつには fast
, normal
, slow
という動作モードがあるとします。このモードを IoT Core がデバイスに紐つけて Shadow として持つわけです。オペレーターのアプリケーションがデバイスの動作モードを切り替えたいときには、フィールドのデバイスと直接やり取りせずにクラウド内にある Shadow のモードを書き換えると、あとは IoT Core がデバイスといい感じに同期してくれます。
物理デバイスが関係する IoT の世界では、ネットワークが不安定だったり、デバイスの電源が切れていたりで、アプリケーションとデバイスとは必ずしもリアルタイムに状態同期できないので、異常系の処理がめんどくさくなりがちです。そこをアプリで頑張らなくても、Shadow が仲介して同期してくれると言う訳です。
コード
コード見た方が早いですね。とりあえず貼り付けます。
- デバイスと Device Shadow 間の Shadow データのやり取りには MQTT が推奨されています[2]。今回は ruby-mqtt を使います。
- デバイスは Shadow に対するアクションである取得, 更新, 削除を、それぞれ
/get
,/update
,/delete
トピックにメッセージを投げることで実行します。 - シャドウでやりとりするメッセージは Device Shadow サービスドキュメントで決められています。
- 実行結果は
/get/accepted
,/get/rejected
など、コード内の@topics
にあるMQTT トピックに帰ってくるのでこれらをsubscribe
しておきます[3]。ここはループが他の動作をブロックしないように Thread にしています。
デバイス
require 'mqtt'
require 'json'
class Device
attr_reader :delta
attr_accessor :local_state
def initialize
@mqtt_config = {
host: "<your-endpoint>-ats.iot.ap-northeast-1.amazonaws.com",
port: 8883,
ssl: true,
cert_file: "device.pem.crt",
key_file: "private.pem.key",
ca_file: "AmazonRootCA1.pem"
}
@thingName = "ta"
@shadowName = "sb"
# classic shadow
# @shadow_topic = "$aws/things/#{@thingName}/shadow"
# named shadow
@shadow_topic = "$aws/things/#{@thingName}/shadow/name/#{@shadowName}"
@message = { state: { reported: { mode: nil } } }
@topics = [
@shadow_topic + "/delete/accepted",
@shadow_topic + "/delete/rejected",
@shadow_topic + "/get/accepted",
@shadow_topic + "/get/rejected",
@shadow_topic + "/update/accepted",
@shadow_topic + "/update/rejected",
@shadow_topic + "/update/delta",
@shadow_topic + "/update/documents"
]
end
def connect
@client = MQTT::Client.connect(**@mqtt_config)
end
def subscribe
Thread.new do
@client.get(@topics) do |topic, message|
puts topic
puts JSON.pretty_generate(JSON.parse(message))
if topic.end_with?("delta")
@delta = JSON.parse(message, symbolize_names: true)
end
end
end
end
def get
@client.publish(@shadow_topic + "/get")
end
def sync
@local_state = @delta[:state][:mode]
end
def report
@message[:state][:reported][:mode] = @local_state
@client.publish(@shadow_topic + "/update", @message.to_json)
end
def delete
@client.publish(@shadow_topic + "/delete")
end
end
アプリ
アプリ側は aws
コマンドを使うことにします。
THING_NAME=ta
SHADOW_NAME=sb
MODE=$1
aws iot-data update-thing-shadow --thing-name $THING_NAME --shadow-name $SHADOW_NAME \
--cli-binary-format raw-in-base64-out \
--payload "{\"state\":{\"desired\":{\"mode\":\"$MODE\"}}}" /dev/stdout \
| jq .
aws iot-data get-thing-shadow --thing-name da --shadow-name sb /dev/stdout | jq .
同期シナリオ
いろんなシナリオがあると思いますが、ここでは以下のような感じでやってみます。
- デバイスが現在のモード
reported
を Shadow に伝える - アプリが望ましいモード
desierd
を Shadow に伝える - その差分
delta
を元にデバイスが現在のモードをdesired
なもので書き換えて同期
reported
を Shadow に伝える
1. デバイスが現在のモード デバイス側は irb を使ってインタラクティブにやっていきます。
irb(main):006:0> require './device.rb'
irb(main):007:0> d = Device.new
irb(main):008:0> d.connect
irb(main):009:0> d.subscribe
irb(main):010:0> d.local_state="normal"
irb(main):011:0> d.report
ここで Shadow の状態はこんな感じかと
{
"state": {
"reported": {
"mode": "normal"
}
}
}
desierd
を Shadow に伝える
2. アプリが望ましいモード アプリが desired
を送る
./app-desire.sh normal
Shadow はマージされてこうなる
{
"state": {
"desired": {
"mode": "normal"
},
"reported": {
"mode": "normal"
}
}
}
ここで normal
から fast
にしてみます
./app-desire.sh fast
Shadow が差分を計算して delta
ができました。
{
"state": {
"desired": {
"mode": "fast"
},
"reported": {
"mode": "normal"
},
"delta": {
"mode": "fast"
}
}
}
delta
を元にデバイスが現在のモードを desired
なもので書き換えて同期
3. ネットワークが正常であれば、先ほどのアプリのメッセージをトリガーに delta
が $aws/things/ta/shadow/name/sb/update/delta
トピックを通じて、以下のようにデバイスに伝わってきているはずです。デバイスがオフラインであり、再接続後に改めて取りに行く場合はデバイスが d.get
で取得できます。
{
"version": 10,
"timestamp": 1670636133,
"state": {
"mode": "fast"
},
"metadata": {
"mode": {
"timestamp": 1670636133
}
}
}
delta
をデバイスの local_state
と同期します。
irb(main):015:0> d.sync
=> "fast"
irb(main):016:0> d.local_state
=> "fast"
最後に、同期したことを Shadow に伝えます。
irb(main):019:0> d.report
Shadow 側も fast
になりました。
{
"state": {
"desired": {
"mode": "fast"
},
"reported": {
"mode": "fast"
}
}
}
めでたし、めでたし。
まとめ
何となく理解していた Device Shadow を Ruby で簡単に実装して試してみました。こういうケースは irb をつかってインタラクティブにやるのが分かりやすい。ここで Shadow を消したら... とか異常系を含めた、いろんなシナリオを試すことができる。Shadow については、「デジタルツインとは ? その概念と実装を、AWS IoT Core の「デバイスシャドウ」で追う」も Shadow の使い所がうまく説明されています。
この記事が、誰かの時間の節約になれば。
Discussion