📮

AWS IoT CoreとRaspberry Piの双方向通信

commits9 min read

AWSクラウド ⇔ Raspberry Piの双方向通信するためにやったことを書きます。

書くこと

  • AWSクラウドとRaspberry Piの双方向通信するためにやったこと
  • Lambdaの実装例(Serverless Framework、TypeScript、GreengrassSDKとか)
  • 細かいところでハマったこと

書かないこと

  • Lambdaとは、IoTCoreとは
  • Serverless Frameworkのインストール、TypeScript用の設定周り、デプロイ周り
  • Raspberry PiのOSインストール、設定まわり
  • IAMロール周りの権限設定
  • Node.jsのインストール、Greengrass SDKのインストール
    • GreengrassV2を使いました

ゴール

以下の一連の流れを実現する

  • IoTCoreにMQTTメッセージを送る
  • Raspberry Pi上でMQTTメッセージを契機にLambdaを動かす
  • Raspberry Piのローカルのファイルアクセスする
  • Raspberry PiからMQTTメッセージを送信する
  • IoT CoreのMQTTメッセージ受信を契機にLambdaを実行する

やったこと

IoTCoreにMQTTメッセージを送る

IoTCoreのコンソールからモノを作成して証明書や鍵ファイルを取得します。
コンソールから各種設定、情報取得時のメモ。(メモですみません..)

  • コンソール > モノ > モノを作成
  • 1つのモノを作成
  • モノの名前を設定
  • 新しい証明書を自動生成 (推奨)
  • ポリシーを割当ててモノを作成
    - GreengrassV2IoTThingPolicy
    - GreengrassTESCertificatePolicyGreengrassV2TokenExchangeRoleAlias
  • 証明書とキーをダウンロード
    • キーファイルは本画面でしかダウンロードできないので注意
  • デバイスデータエンドポイントをAWSIoT > 設定 から取得

MQTTメッセージを送信するTypeScriptサンプル
取得した証明書や秘密鍵を使用します。

import awsiot from "aws-iot-device-sdk"

const device = new awsiot.device({
	keyPath:"秘密鍵のパス",
	certPath:"証明書のパス",
	caPath:"ルート証明書のパス",
	clientId:"クライアント名",  
	host:"xxxx.iot.<region>.amazonaws.com" // デバイスデータエンドポイント
})

device.on("connect",()=>{
	const sendData = {message:"test message"}
	device.publish("トピック名", JSON.stringify(sendData))  // メッセージを送信
})

publishMqttMessage({ message: "hello world" }) のようにして呼び出すとMQTTメッセージが送信されます。こんな感じ。

Raspberry Pi上でMQTTメッセージを契機にLambdaを動かす

MQTTメッセージを遅れたので、Greengrassで動作させるを作ってRaspberry Piにデプロイします。

Lambdaのサンプル

export const sampleFunction = (event, context, callback) => {
  context.callbackWaitsForEmptyEventLoop = false;

  const main = async (event) => {
    try {
      console.log(event)  // なんやかんやの処理
    } catch (err) {
      console.warn(err);
      callback(Error());
    }
    return;
  };

  main(event);
  callback(undefined, {});
  return;
};

第一引数にMQTTメッセージの中身が入ってきます。

注意するポイントは非同期関数にしないこと

Unfortunately your handler cannot be an async function.

https://github.com/aws/aws-greengrass-core-sdk-js/issues/5#issuecomment-571206908

公式サンプルも同期関数です。ここを非同期で書くと動きません。半日溶かしました。

Lambdaを同期関数にする時の実装

https://docs.aws.amazon.com/lambda/latest/dg/nodejs-handler.html
context.callbackWaitsForEmptyEventLoop = false;

この1行を入れないとエラーとなります。

Lambdaを作成したので、IoTCoreのコンソールからコンポーネントを作ります。

ポイントはイベントソースの設定です。ここに設定したトピックで受信したメッセージを契機にLambdaが実行されます。

タイプには「AWS IoT Core MQTT」を指定します。

Raspberry Piのローカルのファイルアクセスする

LambdaからRaspberry Piのローカルのファイルアクセスしたい場合は、コンポーネントの設定画面でボリュームを設定します。
この設定により、例えばRaspberry Pi上にあるセンサーデータの読み込みや
書き込みが出来ます。

物理ボリュームにRaspberry Piのパス、論理ボリュームにはLambdaで指定する時のパスを設定します。

これでローカルのファイルへの読み込みと書き込みが出来るようになります。
Lambdaからは論理ボリュームに設定したパス名でアクセスが可能です。

LambdaからCSVファイルにデータを書き込むサンプル

import { createObjectCsvWriter } from "csv-writer";

const csvWriter = createObjectCsvWriter({
	"/home/ggc_user/test.csv",
	header
});

csvWriter.writeRecords(writeData).catch((err) => {
	console.log(err);
});

Raspberry PiからMQTTメッセージを送信する

AWSクラウドからRaspberry Piへのデータ送信ができたので、次はRaspberry PiからAWSクラウドへのデータ送信を実装します。

具体的には、Raspberry PiにデプロイしたLambdaからIoTCoreにMQTTメッセージを送信します。

IoTCoreにMQTTメッセージを送るためには、デプロイ時のコンポーネントの選択において、パブリックコンポーネントのaws.greengrass.LegacySubscriptionRouterを指定する必要があります。

以下、LegacySubscriptionRouterの設定例です。

{
  "subscriptions": {
    "Greengrass_HelloWorld_to_cloud": {
      "id": "Greengrass_HelloWorld_to_cloud",
      "source": "component:com.example.HelloWorldLambda",
      "subject": "hello/world",
      "target": "cloud"
    }
  }
}

sourceには作成したコンポーネントで使用するLambdaを設定します。subjectには送信先のトピック名を設定します。

https://docs.aws.amazon.com/greengrass/v2/developerguide/legacy-subscription-router-component.html?icmpid=docs_gg_console

GreengrassSDKの公式サンプルを参考として、MQTTメッセージ送信を実装しました。

https://github.com/aws/aws-greengrass-core-sdk-js/blob/master/greengrassExamples/HelloWorld/index.js#L40

Lambdaサンプル

import ggSdk, { PublishParams } from "aws-greengrass-core-sdk";

export default class IoTClient {
  constructor(private iotClient = new ggSdk.IotData()) {}

  publishCallback = (err, data) => {
    if (err) {
      console.log(err);
    }
    console.log(data);
  };

  pubOpt = (jsonData): PublishParams => {
    return {
      topic: "hello/world",
      payload: JSON.stringify({
        message: jsonData,
      }),
      queueFullPolicy: "AllOrError",
    };
  };

  sendData = async <T>(publishData: T): Promise<void> => {
    this.iotClient.publish(this.pubOpt(publishData), this.publishCallback);
  };
}

sendDataメソッドを呼んであげると、指定したトピック(ここでは "hello/world")にデータが送信されます。

IoT CoreのMQTTメッセージ受信を契機にLambdaを実行する

Raspberry Pi⇒IoTCoreへの送信が出来たので、その後の処理をLambdaで実装しました。

トリガーにAWS IoTを設定

ルールクエリステートメントにSQL Likeな文を書きます

Lambdaの実装

export const sample = async (event) => {
  console.log(event);
	...
	// Raspberry Piから送られてきたデータをDB保存するなどの処理を書く
}

こちらは非同期関数として実装できます。。。

IoT CoreのテストクライアントからMQTTメッセージを送信して、Lambdaのログに出力されることを確認できます。

省略しましたが、別のAPIを叩いてデータ登録したり、そのままRDS等に書き込んだり、SESと連携してメール通知したり。

小賢しいこと

Raspberry PiにデプロイするLambdaをこんな感じで書くと定期実行処理とイベント駆動の処理を一つの関数でかけます

setInterval(★★なんやかんやの処理★★, 5000);

export const sampleFunction = (event, context, callback) => {
  context.callbackWaitsForEmptyEventLoop = false;

  const main = async (event) => {...
	}

  main(event);
  callback(undefined, {});
  return;
};

デプロイ時のLambda起動時にsetInterval関数が実行されて、以降定期的に★★なんやかんやの処理★★が動いてくれます。

一方で、event駆動の関数はMQTTメッセージの受信を契機に実行されます。

つまり、setIntervalの★★なんやかんやの処理★★ にMQTTメッセージを送信する処理を書くことで、1つのLambdaで双方向通信が出来ることになります。実際出来ます。

ループを避けるため、トピックはそれぞれ別のモノを指定しましょう。

以上、参考まで。

(追記)
公式のサンプルでもsetInterval使ってますね。

https://github.com/aws/aws-greengrass-core-sdk-js/blob/master/greengrassExamples/HelloWorld/index.js

ハマったこと

Componentのバージョン管理でハマったこと

経緯を忘れてしまったので、注意すべきこと、という観点で書きます。

パブリックコンポーネントを指定してデプロイする場合、バージョン指定しない場合はそのコンポーネントの最新版がインストールされるようです。
そのため、依存関係に注意が必要です。

例えば、nucleusコンポーネント2.4.0をインストールしている状態でLambdaManagerコンポーネントをバージョン指定せずにデプロイするとどうなるか。

2021年12月現在は正常にデプロイされますが、LambdaManagerの最新版が2.2.0だった2021年10月~11月頃はデプロイエラーとなりました。

原因はLambdaManagerの2.2.0の依存関係が下記の通りとなっており、既にインストールされていたGreengrass nucleusとの依存関係でエラーとなるためです。

Greengrass nucleus	>=2.5.0 <2.6.0

=> nucleus 2.4.0 がデプロイされているとデプロイエラーとなります。ログにバージョン関係のエラーが吐かれていました。

greengrass.logのエラーを抜粋します。

2021-11-15T03:25:30.357Z [ERROR] (pool-2-thread-21) com.aws.greengrass.componentmanager.ComponentManager: Failed to negotiate version with cloud and no local version to fall back to. {componentName=aws.greengrass.Nucleus, versionRequirement={aws.greengrass.Cli=>=2.1.0 <2.5.0, aws.greengrass.LambdaManager=>=2.5.0 <2.6.0}}
2021-11-15T03:25:30.358Z [ERROR] (pool-2-thread-20) com.aws.greengrass.deployment.DeploymentService: Error occurred while processing deployment. {deploymentId=ccd313a1-0c81-4425-b454-1c612162c908, serviceName=DeploymentService, currentState=RUNNING}
java.util.concurrent.ExecutionException: com.aws.greengrass.componentmanager.exceptions.NoAvailableComponentVersionException: Failed to negotiate component 'aws.greengrass.Nucleus' version with cloud and no local applicable version satisfying requirement '{aws.greengrass.Cli=>=2.1.0 <2.5.0, aws.greengrass.LambdaManager=>=2.5.0 <2.6.0}'.

LambdaManagerの2.1.4版をインストールすることで対応しましが、確認に手間取り、半日溶かしました。。

2.1.4の依存関係

Greengrass nucleus >=2.0.0 <2.5.0

最新の2.2.1では以下の通り。nucleusの対応バージョンが広がったのでデプロイすることができます。

Greengrass nucleus >=2.0.0 <2.6.0

似たようなエラーログがgreengrass.logに出力された場合は、コンポーネントのドキュメントを読んでみてください。

LambdaManagerコンポーネント

Greengrass Cliコンポーネント, Nucleusコンポーネント

やってみての感想

  • 色々と試行錯誤したので、IoTCore、Lambdaとお友達になることができました。特にMQTTでの双方向通信が簡単に出来るようになったので、色々と遊んでみたいです。

  • ServerlessFramework + TypeScriptでLambdaを実装するところも割と簡単に出来ることがわかったし体験できたことも良かったです。

  • AWSのサービスは日々強化されていること、割と大きな修正・変更がサラッと入ることを体験しました。ちゃんとドキュメントを読みます。

以上です。

GitHubで編集を提案

Discussion

ログインするとコメントできます