はじめに
前回まででスタンプラリーアプリの構築については完了しました。今回は、スタンプラリーアプリにおいて複数のユーザーが同時にスタンプラリーを行う際に、主催者がスタンプラリーの進行状況をリアルタイムで確認できるようにするために、AWS IoT Coreを使用してスタンプラリーアプリとクラウドを連携させる方法について解説します。
AWS IoT Coreとは
AWS IoT Coreは、インターネットに接続されたデバイスとクラウドアプリケーションとの間でセキュアな通信を可能にするフルマネージド型のクラウドサービスです。AWS IoT Coreを使用することで、デバイスとクラウドアプリケーション間でのデータの送受信を簡単に行うことができます。
例えば、AWS IoT Coreは以下のようなモジュール・デバイスやプロトコルをサポートしています。
-
対応デバイスおよびモジュール:
- 組み込みデバイス
- マイクロコントローラーやマイクロプロセッサを搭載し、特定のタスクを実行するために設計されたデバイス。
- 例) センサー、アクチュエーター、スマート家電など。
- ゲートウェイデバイス
- 複数のIoTデバイスを集約し、クラウドと通信する役割を持つデバイス。
- 例) プロトコル変換、データフィルタリング、またはセキュリティ強化の役割を果たす場合もある。
- モバイルデバイス
- スマートフォンやタブレットなどのモバイル端末。
- 例) AWS IoT CoreのモバイルSDKなどを利用することで、データ収集、デバイス制御、またはクラウドとのシームレスな連携が可能。
- 産業用デバイス
- 工場の機械やセンサー、ロボットなどの産業機器。
- 例) リアルタイムのデータ収集、分析、および予測メンテナンスを通じて、運用効率や稼働時間の向上を支援。
- 車載デバイス
- 自動車内のセンサーやエンターテインメントシステム。
- 例) 車両データの収集、リモート管理、およびクラウドを利用した高度な分析が可能。
- 組み込みデバイス
-
対応通信プロトコル
- MQTT(メッセージキューイング通信モデル)
- 軽量なパブリッシュ/サブスクライブ型のメッセージングプロトコル。
- 低帯域幅や高遅延のネットワーク環境に適している。
- HTTP/HTTPS
- 標準的なウェブ通信プロトコル。
- 既存のウェブ技術との統合が容易。
- WebSocket
- 双方向通信を可能にするプロトコル。
- リアルタイムアプリケーションに適している。
- LoRaWAN
- 低電力・長距離通信を実現するプロトコル。
- AWS IoT Core for LoRaWANを通じてサポート。
- MQTT(メッセージキューイング通信モデル)
-
メッセージ受信後の処理
- データストレージ:
- Amazon S3やAmazon DynamoDBにデータを保存。
- リアルタイム処理:
- AWS Lambdaを使用して、受信データに基づくリアルタイムな処理や分析を実行。
- 通知とアラート:
- Amazon SNSを利用して、特定の条件に基づく通知やアラートを送信。
- デバイスシャドウの更新:
- デバイスの現在の状態をクラウド上に保持し、デバイスとアプリケーション間での状態同期を実現。
- ルールエンジンの活用:
- AWS IoT Coreのルールエンジンを使用して、受信データに基づく自動処理や他のAWSサービスとの連携を設定。
- データストレージ:
モジュールやデバイスから送信されたデータ(例: 温度や位置情報など)は、メッセージブローカーを介してAWS IoT Coreに送信されます。AWS IoT Coreは、受信したメッセージをサブスクライバー(例えば、他のデバイスやクラウドサービス)にリアルタイムで配信することが可能です。
また、AWS IoT Coreには「ルールエンジン」という機能があり、受信したメッセージに特定の条件を設定して自動的に処理を行うことができます。例えば、温度がしきい値を超えた場合に通知を送信したり、GPSデータをAmazon DynamoDBに保存したりするアクションを実行できます。この仕組みにより、AWS IoT Coreは効率的で柔軟なメッセージ処理を実現します。
AWS IoT Coreは、セキュリティを最優先に設計されています。デバイスとクラウド間の通信は、業界標準のTLS(Transport Layer Security)プロトコルを使用して暗号化され、不正アクセスや盗聴のリスクを低減します。
さらに、AWS IoT Coreは認証と認可の仕組みを提供しています。デバイスやアプリケーションがAWSリソースにアクセスする際には、IAM(Identity and Access Management)ロールを使用してアクセス権限を細かく制御できます。また、デバイスごとに発行されるX.509証明書を使用した認証により、不正なデバイスがネットワークに接続するリスクを排除します。
例えば、工場内のIoTデバイスがAWS IoT Coreを介してデータを送信する場合、TLSで暗号化された通信を行うとともに、IAMロールを適用して特定のS3バケットにのみアクセスを許可することが可能です。この仕組みにより、セキュリティが強化されるだけでなく、柔軟なアクセス管理が実現します。
今回はMQTTプロトコルを使用し、モバイルデバイス(スタンプラリーアプリ)からAWS IoT Coreにデータを送信し、DynamoDBにデータを保存する仕組みを構築します。
モノの作成
まずは、AWS IoT Core上に「モノ(Thing)」を作成します。モノは、AWS IoT Coreに接続されるモジュール・デバイスやアプリケーションを表します。モノを作成することで、デバイスやアプリケーションがAWS IoT Coreに接続し、メッセージの送受信を行う準備が整います。ここではAWS Management Consoleからモノを作成します。
- AWS IoT Coreのコンソール画面を開く
- サイドバーの管理 > すべてのデバイス > モノを選択すると、モノの一覧が表示される
- モノの一覧画面で「モノを作成」をクリックする
- 作成するモノの数は「1 つのモノを作成」のまま「次へ」を押す
- 任意のモノの名前を入力、他の項目はそのまま「次へ」を押す
- デバイス証明書は「新しい証明書を自動生成」を選択し、「次へ」を押す
- 「ポリシーを作成」を押す
- 別タブ「ポリシーを作成」で以下の通り入力し、「作成」を押す
- ポリシー名:
flutter_app_policy
- ポリシー効果:
許可
- ポリシーアクション:
*
- ポリシーリソース:
*
あとで、ポリシーを制限します。
- ポリシー名:
- 元のタブに戻り、作成したポリシーを選択し、「モノを作成」を押す
- 「証明書とキーをダウンロード」タブが表示されるので、「デバイス証明書」「パブリック/プライベートキーファイル」「ECC 256 ビットキー: Amazon ルート CA 1」をダウンロードし、「完了」を押す
- モノが作成できていることを確認する
これで、AWS IoT Core上にモノが作成されました。次に、スタンプラリーアプリからAWS IoT Coreにデータを送信するための設定を行います。
MQTTクライアントの作成
モバイルアプリからAWS IoT Coreにデータを送信するためには、先ほどダウンロードした3つのファイルを使用して、接続するためのMqttServerClientを作成します。使用するファイルは以下の通りです。
- デバイス証明書: モノ(デバイス)を識別するための一意な証明書。デバイスが正当なものであることを証明する。
- パブリック/プライベートキーファイル: 暗号化通信と認証のために使用されるキーペア。
- ECC 256 ビットキー: Amazon ルート CA 1: AWS IoT Coreに接続するためのルート証明書。AWS IoT Coreが信頼できるサービスであることを証明する。
これらのファイルを使用して、スタンプラリーアプリからAWS IoT Coreにデータを送信するための証明書を設定します。
- ダウンロードした3つの証明書とキーファイルをプロジェクトの
assets/cert
ディレクトリに配置する。
-
pubspec.yaml
ファイルに以下の設定を追加する。flutter: assets: - ... // 他のassetsの設定 - assets/certs/AmazonRootCA1.pem - assets/certs/certificate.pem.crt - assets/certs/private.pem.key
-
lib
ディレクトリにaws_iot.dart
ファイルを作成し、以下のコードを記述する。import 'dart:io'; import 'package:flutter/services.dart'; import 'package:mqtt_client/mqtt_server_client.dart'; // AWS IoT Coreに接続するためのMqttServerClientを作成する Future<MqttServerClient> createMqttClient(String clientId) async { // MqttServerClientを作成 // AWS IoT CoreのエンドポイントとクライアントIDを指定 final client = MqttServerClient( '**********.iot.ap-northeast-1.amazonaws.com', clientId, ); client.port = 8883; client.secure = true; // TLSを使用 client.setProtocolV311(); // MQTTプロトコルバージョン3.1.1を使用 client.keepAlivePeriod = 60; // 接続維持のためのタイムアウト時間 client.autoReconnect = true; // 自動再接続を有効化 client.connectTimeoutPeriod = 300; // 接続タイムアウト時間 final context = SecurityContext.defaultContext; try { // 証明書とキーをロード final rootCert = await rootBundle.load('assets/certs/AmazonRootCA1.pem'); final deviceCert = await rootBundle.load('assets/certs/certificate.pem.crt'); final privateKey = await rootBundle.load('assets/certs/private.pem.key'); // セキュリティコンテキストに証明書とキーを設定 context ..setTrustedCertificatesBytes(rootCert.buffer.asUint8List()) ..useCertificateChainBytes(deviceCert.buffer.asUint8List()) ..usePrivateKeyBytes(privateKey.buffer.asUint8List()); client.securityContext = context; print('Certificates loaded successfully.'); } catch (e) { print('Error loading certificates: $e'); throw Exception('Failed to load certificates'); } // ログ出力を有効化 client.logging(on: true); client.onConnected = () => print('Connected to AWS IoT Core'); // 接続成功時のコールバック client.onDisconnected = () => print('Disconnected from AWS IoT Core'); // 切断時のコールバック client.pongCallback = () => print('Ping response received'); // ピンポン応答時のコールバック return client; }
AWS IoT Coreのエンドポイントは、AWS IoTのコンソール画面のサイドバー > ドメイン設定から確認することができます。
デフォルトでドメイン「iot:Data-ATS」が存在するので、このドメイン名をエンドポイントに指定します。
これで、AWS IoT Coreに接続するためのMqttServerClientを作成する関数が定義されました。次に、スタンプラリーアプリからAWS IoT Coreにデータを送信するための処理を実装します。
メッセージの送信
今回作成するスタンプラリーアプリでは、チェックインの状況をAWS IoT Coreに送信(Publish)します。送信するメッセージは、以下のような形式とします。
{
"userId": "ユーザーID",
"checkpointId": "チェックポイントID",
"checkinType": "チェックインタイプ(0:GPS, 1:BLE)",
"timestamp": "チェックイン日時",
"checkInNumber": "チェックイン回数"
}
今回のアプリにユーザ作成/認証認可といった認証機能は持たせていないので、ユーザーIDにはUUID v4で生成します。UUID v4は高い一意性を持つため、ユーザーを一意に識別するのに適しています。一度生成したユーザーIDはshared_preferences
パッケージを使用することで、デバイスのローカルストレージに保存し、アプリケーションの再起動時にも再利用できます。
以下は、最初にUUID v4でユーザーIDを生成しローカルストレージに保存、2回目以降はローカルストレージからユーザーIDを取得する処理です。
import 'package:uuid/uuid.dart';
import 'package:shared_preferences/shared_preferences.dart';
// userIdを取得する関数
Future<String> getUserId() async {
final prefs = await SharedPreferences.getInstance();
String? userId = prefs.getString('userId');
if (userId == null) {
userId = const Uuid().v4(); // UUID を生成
await prefs.setString('userId', userId); // 永続的に保存
}
return userId;
}
その他にもAWS Amplifyを使用してCognitoを利用ことや、Firebase Authenticationを利用することで、ユーザー認証機能を追加することができます。
続いて、チェックインの状況をAWS IoT Coreに送信する処理を実装します。
import 'package:mqtt_client/mqtt_client.dart';
// チェックイン情報をAWS IoT Coreに送信
Future<void> publishCheckin({
required int checkpointId,
required int checkInType,
required String timestamp,
required int checkInNumber,
}) async {
final userId = await getUserId();
final clientId = 'flutter-client-$userId';
final client = await createMqttClient(clientId);
try {
await client.connect();
} catch (e) {
print('Connection failed: $e');
client.disconnect();
return;
}
if (client.connectionStatus?.state == MqttConnectionState.connected) {
const topic = 'devices/checkIns';
final timestamp = DateTime.now().toIso8601String();
// メッセージ
final payload = """
{
"userId": "$userId",
"checkpointId": "$checkpointId",
"checkinType": $checkInType,
"timestamp": "$timestamp",
"checkInNumber": $checkInNumber
}
""";
client.publishMessage(
topic,
MqttQos.atLeastOnce,
MqttClientPayloadBuilder().addString(payload).payload!,
);
// 確認のため少し待機
await Future.delayed(Duration(seconds: 5));
client.disconnect();
} else {
print('Failed to connect, status: ${client.connectionStatus}');
client.disconnect();
}
}
この関数では次の処理が順番に行われます。
- クライアントIDの生成
ユーザーIDを元にクライアントIDを生成しています。
クライアントIDは、例えばflutter-client-ユーザーID
のようにユーザーごとに異なるIDを指定することで、MQTTの要件であるクライアントIDの一意性を保証しています。 - AWS IoT Coreに接続
createMqttClient
関数を使用して、MQTTクライアントを作成し、AWS IoT Coreへの接続を試みます。
接続が成功するとcreateMqttClient
関数で設定したコールバック関数が呼び出され、ログにConnected to AWS IoT Core
が表示されます。 - トピックへのメッセージの送信
devices/checkIns
というトピックを指定し、チェックイン情報をJSON形式のペイロードとしてメッセージを送信します。
トピックとは、メッセージの送信先を指定するためのアドレスのようなもので、AWS IoT Coreではトピックを使用してメッセージの送受信を行います。トピックはスラッシュ(/
)で階層構造を持つことができ、例えばdevices/checkIns
というトピックはdevices
という親トピックの下にcheckIns
という子トピックがある構造を持ちます。 - 接続の切断
メッセージの送信後、一定時間待機してから接続を切断します。
あとは、チェックイン処理が行われるタイミングでpublishCheckin
関数を呼び出すことで、AWS IoT Coreにチェックイン情報を送信することができます。
// チェックイン済みの数を取得
final checkedCount =
await SpotFeatureHelper.getSpots(widget.database).then((spots) {
return spots.where((spot) => spot.isChecked == 1).length;
});
// チェックイン処理
checkIn() async {
// チェックイン情報をDBに保存
SpotFeatureHelper.updateSpotChecked(widget.database, checkInSpotInfo.id);
// チェックインバッヂを表示
_mapController.addCheckedMarker(checkInSpotInfo.coordinates);
// チェックイン情報をAWS IoT Coreに送信
await publishCheckin(
checkpointId: checkInSpotInfo.id,
checkInType: 0, // GPSチェックイン
timestamp: DateTime.now().toIso8601String(),
checkInNumber: checkedCount + 1);
}
AWS IoT Coreでの受信検証
正常にメッセージが送信されたことを確認するには、AWS IoT Coreの「MQTT テストクライアント」を使用します。
-
AWS IoTのサイドバー > テスト > MQTTテストクライアントを選択する
-
「接続の詳細」のエンドポイントが``createMqttClient`関数で指定したエンドポイントと一致していることを確認する
MQTTバージョンは、デバイスが使用するのはMQTT 3.1.1ですが、MQTT 5は下位互換性があるため、どちらでも問題ありません。 -
タブ「トピックをサブスクライブする」にて、トピックフィルターに
devices/checkIns
を入力して「サブスクライブ」を押下する
-
デバイスからチェックイン情報を送信する
今回はチェックインボタンを押下すると、AWS IoT Coreにチェックイン情報が送信されるようになっています。 -
メッセージが正常に送信されたことを確認する
これで、デバイス(スタンプラリーアプリ)からチェックイン情報をMQTTプロトコルを使用して送信し、AWS IoT Coreのメッセージブローカーが受信されることが確認できました。
なお、現在モノにアタッチしている証明書のポリシーは全てのリソースに対して全てのアクションを許可するので、以下のように設定することをお勧めします。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "iot:Connect",
"Resource": "arn:aws:iot:<region>:<account-id>:client/flutter-client-*",
"Condition": {
"Bool": {
"iot:Connection.Thing.IsAttached": "true"
}
}
},
{
"Effect": "Allow",
"Action": "iot:Publish",
"Resource": "arn:aws:iot:<region>:<account-id>:topic/devices/checkIns"
}
]
}
設定内容か以下の通りです。
-
iot:Connect
: モノがAWS IoT Coreに接続する際のアクションを許可
ここでは、flutter-client-*
というクライアントIDを持つモノで、かつモノにアタッチされていることを条件としている。 -
iot:Publish
: モノが指定したトピックにメッセージを送信する際のアクションを許可
ここでは、devices/checkIns
というトピックにメッセージを送信するアクションを許可している。
DynamoDBへのデータ保存
AWS IoT Coreにはルールエンジンという機能があり、受信したメッセージに基づいて自動的に処理を行うことができます。この機能を使用して、AWS IoT Coreから受信したチェックイン情報をDynamoDBに保存する処理を実装します。
チェックイン情報は2つのテーブルに保存します。
- UserCheckinsテーブル
- ユーザーごとのチェックイン情報を保存するテーブル~
- 主キー: userId(パーティションキー), timestamp(ソートキー)
- 属性: userId, timestamp, checkpointId, checkinType, checkInNumber
- CheckpointCheckinsテーブル
- チェックポイントごとのチェックイン情報を保存するテーブル
- 主キー: checkpointId(パーティションキー), timestamp(ソートキー)
- 属性: checkpointId, timestamp, userId, checkinType, checkInNumber
まずは、DynamoDBにチェックイン情報を保存するためのテーブルを作成します。
- DynamoDBのコンソール画面を開き、サイドバー > テーブルを選択、テーブル一覧画面で「テーブルの作成」を押下する
- テーブル名を「UserCheckins」と入力し、パーティションキーに「userId」、ソートキーに「timestamp」を押下する
または、AWS CLIを使用して以下のコマンドを実行します。
aws dynamodb create-table \
--table-name UserCheckins \
--attribute-definitions \
AttributeName=userId,AttributeType=S \
AttributeName=timestamp,AttributeType=S \
--key-schema \
AttributeName=userId,KeyType=HASH \
AttributeName=timestamp,KeyType=RANGE \
--billing-mode PAY_PER_REQUEST \
--region ap-northeast-1
- 同様に、テーブル名を「CheckpointCheckins」と入力し、パーティションキーに「checkpointId」、ソートキーに「timestamp」を押下する
または、AWS CLIを使用して以下のコマンドを実行します。
aws dynamodb create-table \
--table-name CheckpointCheckins \
--attribute-definitions \
AttributeName=checkpointId,AttributeType=S \
AttributeName=timestamp,AttributeType=S \
--key-schema \
AttributeName=checkpointId,KeyType=HASH \
AttributeName=timestamp,KeyType=RANGE \
--billing-mode PAY_PER_REQUEST \
--region ap-northeast-1
- テーブルが作成されたことを確認する
テーブル一覧に「UserCheckins」と「CheckpointCheckins」が表示されていれば成功です。
続いて、AWS IoT Coreのルールエンジンを使用して、受信したチェックイン情報をDynamoDBに保存する処理を実装します。
- AWS IoT Coreのコンソール画面を開き、サイドバー > メッセージのルーティング > ルールを選択する
- 「ルールの作成」を押下し、適当なルール名を入力する
- SQLステートメントには以下のように入力する
SELECT * FROM 'devices/checkIns'
メッセージブローカーが受信したメッセージのトピックがdevices/checkIns
の場合にこのルールが適用されるように設定しています。
4. アクション1には「DynamoDB」を選択し、以下の設定を入力する
- テーブル名:
UserCheckins
- パーティションキー:
userId
- パーティションキーの値:
${userId}
- ソートキー:
timestamp
- ソートキーの値:
${timestamp}
IAMロールを割り当てる必要があるので、新しいロールを作成するか、既存のロールを選択します。
「新しいロールを作成」を選択すると、自動的に必要な権限が追加されます。
-
アクション2には「DynamoDB」を選択し、以下の設定を入力する
- テーブル名:
CheckpointCheckins
- パーティションキー:
checkpointId
- パーティションキーの値:
${checkpointId}
- ソートキー:
timestamp
- ソートキーの値:
${timestamp}
IAMロールを割り当てる必要があるので、新しいロールを作成するか、既存のロールを選択します。
「新しいロールを作成」を選択すると、同じく自動的に必要な権限が追加されます。
- テーブル名:
-
「次へ」を押下し、ルールの概要を確認し、「作成」を押下する
-
データがDynamoDBに保存されていることを確認する
DynamoDBのコンソール画面を開き、テーブル「UserCheckins」と「CheckpointCheckins」にチェックイン情報が保存されていることを確認します。
ちなみに、通常の「DynamoDB」アクションでは、パーテーションキーとソートキー以外はPayloadの属性をそのまま保持しますが、「DynamoDBv2」を選択することで、Payloadの属性を変換して画像のように保存することができます。またパーテーションキーとソートキーも自動的に適用されるため、テーブルの紐付けのみで設定が完了します。
今回はAWS IoT Coreから直接DynamoDBにデータを保存する方法を紹介しましたが、AWS Lambdaを使用してデータの変換や加工を行ったり、AWS SNSやAWS SQSを使用して通知やキューイングを行ったり、「Republish to AWS IoT topic」アクションを使用して他のトピックにメッセージを転送することも可能です。
まとめ
今回は、スタンプラリーアプリからAWS IoT Coreにデータを送信し、DynamoDBに保存する方法を紹介しました。AWS IoT Coreを使用することで、モバイルアプリやデバイスからのデータを簡単にクラウドに送信し、リアルタイムで処理や分析を行うことができます。また、ルールエンジンを使用することで、受信したデータに基づいて自動的に処理を行うことができるため、効率的なデータ処理が可能です。