🌚

AWS IoT CoreのDevice ShadowをRubyクライアントでお試し

2022/12/10に公開約6,200字

はじめに

IoT Core の Device Shadow のアイデアは分かるんだが、実際のクライアントはどう書くの?[1]と思ったので Ruby クライアントを書いて試したという話。「AWS IoTにRubyのMQTTクライアントから接続」を見ておくと話が早いです。

Device Shadow

IoT Core には Device Shadow という実装があります。Device Shadow は簡単にいうと、物理的なデバイスに紐つく JSON のデータで、クラウド側に保存されます。

例えばデバイスが乗り物またはロボットみたいなものだとして、そいつには fast, normal, slow という動作モードがあるとします。このモードを IoT Core がデバイスに紐つけて Shadow として持つわけです。オペレーターのアプリケーションがデバイスの動作モードを切り替えたいときには、フィールドのデバイスと直接やり取りせずにクラウド内にある Shadow のモードを書き換えると、あとは IoT Core がデバイスといい感じに同期してくれます。

物理デバイスが関係する IoT の世界では、ネットワークが不安定だったり、デバイスの電源が切れていたりで、アプリケーションとデバイスとは必ずしもリアルタイムに状態同期できないので、異常系の処理がめんどくさくなりがちです。そこをアプリで頑張らなくても、Shadow が仲介して同期してくれると言う訳です。

コード

コード見た方が早いですね。とりあえず貼り付けます。

  • デバイスと Device Shadow 間の Shadow データのやり取りには MQTT が推奨されています[2]。今回は ruby-mqtt を使います。
  • デバイスは Shadow に対するアクションである取得, 更新, 削除を、それぞれ /get, /update, /delete トピックにメッセージを投げることで実行します。
  • シャドウでやりとりするメッセージは Device Shadow サービスドキュメントで決められています。
  • 実行結果は /get/accepted, /get/rejected など、コード内の @topics にあるMQTT トピックに帰ってくるのでこれらを subscribe しておきます[3]。ここはループが他の動作をブロックしないように Thread にしています。

デバイス

device.rb
require 'mqtt'
require 'json'

class Device
  attr_reader :delta
  attr_accessor :local_state

  def initialize
    @mqtt_config = {
      host: "<your-endpoint>-ats.iot.ap-northeast-1.amazonaws.com",
      port: 8883,
      ssl: true,
      cert_file: "device.pem.crt",
      key_file: "private.pem.key",
      ca_file: "AmazonRootCA1.pem"
    }
    @thingName = "ta"
    @shadowName = "sb"
    
    # classic shadow
    # @shadow_topic = "$aws/things/#{@thingName}/shadow"
    
    # named shadow
    @shadow_topic = "$aws/things/#{@thingName}/shadow/name/#{@shadowName}" 
    
    @message = { state: { reported: { mode: nil } } }
    @topics = [
      @shadow_topic + "/delete/accepted",
      @shadow_topic + "/delete/rejected",
      @shadow_topic + "/get/accepted",
      @shadow_topic + "/get/rejected",
      @shadow_topic + "/update/accepted",
      @shadow_topic + "/update/rejected",
      @shadow_topic + "/update/delta",
      @shadow_topic + "/update/documents"
    ]
  end

  def connect
    @client =  MQTT::Client.connect(**@mqtt_config)
  end

  def subscribe
    Thread.new do 
      @client.get(@topics) do |topic, message|
        puts topic
        puts JSON.pretty_generate(JSON.parse(message))
        if topic.end_with?("delta")
          @delta = JSON.parse(message, symbolize_names: true)
        end
      end
    end
  end

  def get
    @client.publish(@shadow_topic + "/get")
  end

  def sync
    @local_state = @delta[:state][:mode]
  end

  def report
    @message[:state][:reported][:mode] =  @local_state 
    @client.publish(@shadow_topic + "/update", @message.to_json)
  end

  def delete
    @client.publish(@shadow_topic + "/delete")
  end

end

アプリ

アプリ側は aws コマンドを使うことにします。

app-desire.sh
THING_NAME=ta
SHADOW_NAME=sb
MODE=$1

aws iot-data update-thing-shadow --thing-name $THING_NAME --shadow-name $SHADOW_NAME \
    --cli-binary-format raw-in-base64-out \
    --payload "{\"state\":{\"desired\":{\"mode\":\"$MODE\"}}}" /dev/stdout \
    | jq .
app-get.sh
aws iot-data get-thing-shadow --thing-name da --shadow-name sb /dev/stdout | jq .

同期シナリオ

いろんなシナリオがあると思いますが、ここでは以下のような感じでやってみます。

  1. デバイスが現在のモード reported を Shadow に伝える
  2. アプリが望ましいモード desierd を Shadow に伝える
  3. その差分 delta を元にデバイスが現在のモードを desired なもので書き換えて同期

1. デバイスが現在のモード reported を Shadow に伝える

デバイス側は irb を使ってインタラクティブにやっていきます。

irb(main):006:0> require './device.rb'
irb(main):007:0> d = Device.new
irb(main):008:0> d.connect
irb(main):009:0> d.subscribe
irb(main):010:0> d.local_state="normal"
irb(main):011:0> d.report

ここで Shadow の状態はこんな感じかと

{
  "state": {
    "reported": {
      "mode": "normal"
    }
  }
}

2. アプリが望ましいモード desierd を Shadow に伝える

アプリが desired を送る

./app-desire.sh normal

Shadow はマージされてこうなる

{
  "state": {
    "desired": {
      "mode": "normal"
    },
    "reported": {
      "mode": "normal"
    }
  }
}

ここで normal から fast にしてみます

./app-desire.sh fast

Shadow が差分を計算して delta ができました。

{
  "state": {
    "desired": {
      "mode": "fast"
    },
    "reported": {
      "mode": "normal"
    },
    "delta": {
      "mode": "fast"
    }
  }
}

3. delta を元にデバイスが現在のモードを desired なもので書き換えて同期

ネットワークが正常であれば、先ほどのアプリのメッセージをトリガーに delta$aws/things/ta/shadow/name/sb/update/delta トピックを通じて、以下のようにデバイスに伝わってきているはずです。デバイスがオフラインであり、再接続後に改めて取りに行く場合はデバイスが d.get で取得できます。

$aws/things/ta/shadow/name/sb/update/delta
{
  "version": 10,
  "timestamp": 1670636133,
  "state": {
    "mode": "fast"
  },
  "metadata": {
    "mode": {
      "timestamp": 1670636133
    }
  }
}

delta をデバイスの local_state と同期します。

irb(main):015:0> d.sync
=> "fast"
irb(main):016:0> d.local_state
=> "fast"

最後に、同期したことを Shadow に伝えます。

irb(main):019:0> d.report

Shadow 側も fast になりました。

{
  "state": {
    "desired": {
      "mode": "fast"
    },
    "reported": {
      "mode": "fast"
    }
  }
}

めでたし、めでたし。

まとめ

何となく理解していた Device Shadow を Ruby で簡単に実装して試してみました。こういうケースは irb をつかってインタラクティブにやるのが分かりやすい。ここで Shadow を消したら... とか異常系を含めた、いろんなシナリオを試すことができる。Shadow については、「デジタルツインとは ? その概念と実装を、AWS IoT Core の「デバイスシャドウ」で追う」も Shadow の使い所がうまく説明されています。

この記事が、誰かの時間の節約になれば。

脚注
  1. 公式サイトに Device SDK を使った実装があるがわかりやすいとは思えなかった。 ↩︎

  2. HTTPでもできる。 ↩︎

  3. MQTT は基本的に Request / Response 型のプロコトルではないのでこの辺りがややめんどくさい。MQTT5では Request / Response がサポートされるが動作としては似たようなものである。 ↩︎

Discussion

ログインするとコメントできます