Chapter 06

応用: AWS IoT Core連携

MIERUNE Inc.
MIERUNE Inc.
2024.12.06に更新

はじめに

前回まででスタンプラリーアプリの構築については完了しました。今回は、スタンプラリーアプリにおいて複数のユーザーが同時にスタンプラリーを行う際に、主催者がスタンプラリーの進行状況をリアルタイムで確認できるようにするために、AWS IoT Coreを使用してスタンプラリーアプリとクラウドを連携させる方法について解説します。

AWS IoT Coreとは

AWS IoT Coreは、インターネットに接続されたデバイスとクラウドアプリケーションとの間でセキュアな通信を可能にするフルマネージド型のクラウドサービスです。AWS IoT Coreを使用することで、デバイスとクラウドアプリケーション間でのデータの送受信を簡単に行うことができます。

例えば、AWS IoT Coreは以下のようなモジュール・デバイスやプロトコルをサポートしています。

  • 対応デバイスおよびモジュール:

    1. 組み込みデバイス
      • マイクロコントローラーやマイクロプロセッサを搭載し、特定のタスクを実行するために設計されたデバイス。
      • 例) センサー、アクチュエーター、スマート家電など。
    2. ゲートウェイデバイス
      • 複数のIoTデバイスを集約し、クラウドと通信する役割を持つデバイス。
      • 例) プロトコル変換、データフィルタリング、またはセキュリティ強化の役割を果たす場合もある。
    3. モバイルデバイス
      • スマートフォンやタブレットなどのモバイル端末。
      • 例) AWS IoT CoreのモバイルSDKなどを利用することで、データ収集、デバイス制御、またはクラウドとのシームレスな連携が可能。
    4. 産業用デバイス
      • 工場の機械やセンサー、ロボットなどの産業機器。
      • 例) リアルタイムのデータ収集、分析、および予測メンテナンスを通じて、運用効率や稼働時間の向上を支援。
    5. 車載デバイス
      • 自動車内のセンサーやエンターテインメントシステム。
      • 例) 車両データの収集、リモート管理、およびクラウドを利用した高度な分析が可能。
  • 対応通信プロトコル

    • MQTT(メッセージキューイング通信モデル)
      • 軽量なパブリッシュ/サブスクライブ型のメッセージングプロトコル。
      • 低帯域幅や高遅延のネットワーク環境に適している。
    • HTTP/HTTPS
      • 標準的なウェブ通信プロトコル。
      • 既存のウェブ技術との統合が容易。
    • WebSocket
      • 双方向通信を可能にするプロトコル。
      • リアルタイムアプリケーションに適している。
    • LoRaWAN
      • 低電力・長距離通信を実現するプロトコル。
      • AWS IoT Core for LoRaWANを通じてサポート。
  • メッセージ受信後の処理

    • データストレージ:
      • Amazon S3やAmazon DynamoDBにデータを保存。
    • リアルタイム処理:
      • AWS Lambdaを使用して、受信データに基づくリアルタイムな処理や分析を実行。
    • 通知とアラート:
      • Amazon SNSを利用して、特定の条件に基づく通知やアラートを送信。
    • デバイスシャドウの更新:
      • デバイスの現在の状態をクラウド上に保持し、デバイスとアプリケーション間での状態同期を実現。
    • ルールエンジンの活用:
      • AWS IoT Coreのルールエンジンを使用して、受信データに基づく自動処理や他のAWSサービスとの連携を設定。

モジュールやデバイスから送信されたデータ(例: 温度や位置情報など)は、メッセージブローカーを介してAWS IoT Coreに送信されます。AWS IoT Coreは、受信したメッセージをサブスクライバー(例えば、他のデバイスやクラウドサービス)にリアルタイムで配信することが可能です。

また、AWS IoT Coreには「ルールエンジン」という機能があり、受信したメッセージに特定の条件を設定して自動的に処理を行うことができます。例えば、温度がしきい値を超えた場合に通知を送信したり、GPSデータをAmazon DynamoDBに保存したりするアクションを実行できます。この仕組みにより、AWS IoT Coreは効率的で柔軟なメッセージ処理を実現します。

AWS IoT Coreは、セキュリティを最優先に設計されています。デバイスとクラウド間の通信は、業界標準のTLS(Transport Layer Security)プロトコルを使用して暗号化され、不正アクセスや盗聴のリスクを低減します。
さらに、AWS IoT Coreは認証と認可の仕組みを提供しています。デバイスやアプリケーションがAWSリソースにアクセスする際には、IAM(Identity and Access Management)ロールを使用してアクセス権限を細かく制御できます。また、デバイスごとに発行されるX.509証明書を使用した認証により、不正なデバイスがネットワークに接続するリスクを排除します。
例えば、工場内のIoTデバイスがAWS IoT Coreを介してデータを送信する場合、TLSで暗号化された通信を行うとともに、IAMロールを適用して特定のS3バケットにのみアクセスを許可することが可能です。この仕組みにより、セキュリティが強化されるだけでなく、柔軟なアクセス管理が実現します。

今回はMQTTプロトコルを使用し、モバイルデバイス(スタンプラリーアプリ)からAWS IoT Coreにデータを送信し、DynamoDBにデータを保存する仕組みを構築します。

モノの作成

まずは、AWS IoT Core上に「モノ(Thing)」を作成します。モノは、AWS IoT Coreに接続されるモジュール・デバイスやアプリケーションを表します。モノを作成することで、デバイスやアプリケーションがAWS IoT Coreに接続し、メッセージの送受信を行う準備が整います。ここではAWS Management Consoleからモノを作成します。

  1. AWS IoT Coreのコンソール画面を開く

  2. サイドバーの管理 > すべてのデバイス > モノを選択すると、モノの一覧が表示される
  3. モノの一覧画面で「モノを作成」をクリックする
  4. 作成するモノの数は「1 つのモノを作成」のまま「次へ」を押す
  5. 任意のモノの名前を入力、他の項目はそのまま「次へ」を押す
  6. デバイス証明書は「新しい証明書を自動生成」を選択し、「次へ」を押す
  7. 「ポリシーを作成」を押す
  8. 別タブ「ポリシーを作成」で以下の通り入力し、「作成」を押す
    • ポリシー名: flutter_app_policy
    • ポリシー効果: 許可
    • ポリシーアクション: *
    • ポリシーリソース: *

      あとで、ポリシーを制限します。
  9. 元のタブに戻り、作成したポリシーを選択し、「モノを作成」を押す
  10. 「証明書とキーをダウンロード」タブが表示されるので、「デバイス証明書」「パブリック/プライベートキーファイル」「ECC 256 ビットキー: Amazon ルート CA 1」をダウンロードし、「完了」を押す
  11. モノが作成できていることを確認する

これで、AWS IoT Core上にモノが作成されました。次に、スタンプラリーアプリからAWS IoT Coreにデータを送信するための設定を行います。

MQTTクライアントの作成

モバイルアプリからAWS IoT Coreにデータを送信するためには、先ほどダウンロードした3つのファイルを使用して、接続するためのMqttServerClientを作成します。使用するファイルは以下の通りです。

  • デバイス証明書: モノ(デバイス)を識別するための一意な証明書。デバイスが正当なものであることを証明する。
  • パブリック/プライベートキーファイル: 暗号化通信と認証のために使用されるキーペア。
  • ECC 256 ビットキー: Amazon ルート CA 1: AWS IoT Coreに接続するためのルート証明書。AWS IoT Coreが信頼できるサービスであることを証明する。

これらのファイルを使用して、スタンプラリーアプリからAWS IoT Coreにデータを送信するための証明書を設定します。

  1. ダウンロードした3つの証明書とキーファイルをプロジェクトのassets/certディレクトリに配置する。
  2. pubspec.yamlファイルに以下の設定を追加する。
    flutter:
        assets:
            - ... // 他のassetsの設定
            - assets/certs/AmazonRootCA1.pem
            - assets/certs/certificate.pem.crt
            - assets/certs/private.pem.key
    
  3. libディレクトリにaws_iot.dartファイルを作成し、以下のコードを記述する。
    import 'dart:io';
    import 'package:flutter/services.dart';
    import 'package:mqtt_client/mqtt_server_client.dart';
    
    // AWS IoT Coreに接続するためのMqttServerClientを作成する
    Future<MqttServerClient> createMqttClient(String clientId) async {
    
      // MqttServerClientを作成
      // AWS IoT CoreのエンドポイントとクライアントIDを指定
      final client = MqttServerClient(
          '**********.iot.ap-northeast-1.amazonaws.com',
          clientId,
      );
    
      client.port = 8883;
      client.secure = true; // TLSを使用 
      client.setProtocolV311(); // MQTTプロトコルバージョン3.1.1を使用
      client.keepAlivePeriod = 60; // 接続維持のためのタイムアウト時間
      client.autoReconnect = true; // 自動再接続を有効化
      client.connectTimeoutPeriod = 300; // 接続タイムアウト時間
    
      final context = SecurityContext.defaultContext;
      try {
          // 証明書とキーをロード
          final rootCert = await rootBundle.load('assets/certs/AmazonRootCA1.pem');
          final deviceCert =
              await rootBundle.load('assets/certs/certificate.pem.crt');
          final privateKey = await rootBundle.load('assets/certs/private.pem.key');
    
          // セキュリティコンテキストに証明書とキーを設定
          context
          ..setTrustedCertificatesBytes(rootCert.buffer.asUint8List())
          ..useCertificateChainBytes(deviceCert.buffer.asUint8List())
          ..usePrivateKeyBytes(privateKey.buffer.asUint8List());
    
          client.securityContext = context;
          print('Certificates loaded successfully.');
      } catch (e) {
          print('Error loading certificates: $e');
          throw Exception('Failed to load certificates');
      }
    
      // ログ出力を有効化
      client.logging(on: true);
    
      client.onConnected = () => print('Connected to AWS IoT Core'); // 接続成功時のコールバック
      client.onDisconnected = () => print('Disconnected from AWS IoT Core'); // 切断時のコールバック
      client.pongCallback = () => print('Ping response received'); // ピンポン応答時のコールバック
    
      return client;
    }
    

AWS IoT Coreのエンドポイントは、AWS IoTのコンソール画面のサイドバー > ドメイン設定から確認することができます。
デフォルトでドメイン「iot:Data-ATS」が存在するので、このドメイン名をエンドポイントに指定します。

これで、AWS IoT Coreに接続するためのMqttServerClientを作成する関数が定義されました。次に、スタンプラリーアプリからAWS IoT Coreにデータを送信するための処理を実装します。

メッセージの送信

今回作成するスタンプラリーアプリでは、チェックインの状況をAWS IoT Coreに送信(Publish)します。送信するメッセージは、以下のような形式とします。

{
"userId": "ユーザーID",
"checkpointId": "チェックポイントID",
"checkinType": "チェックインタイプ(0:GPS, 1:BLE)",
"timestamp": "チェックイン日時",
"checkInNumber": "チェックイン回数"
}

今回のアプリにユーザ作成/認証認可といった認証機能は持たせていないので、ユーザーIDにはUUID v4で生成します。UUID v4は高い一意性を持つため、ユーザーを一意に識別するのに適しています。一度生成したユーザーIDはshared_preferencesパッケージを使用することで、デバイスのローカルストレージに保存し、アプリケーションの再起動時にも再利用できます。

以下は、最初にUUID v4でユーザーIDを生成しローカルストレージに保存、2回目以降はローカルストレージからユーザーIDを取得する処理です。

import 'package:uuid/uuid.dart';
import 'package:shared_preferences/shared_preferences.dart';

// userIdを取得する関数
Future<String> getUserId() async {
  final prefs = await SharedPreferences.getInstance();
  String? userId = prefs.getString('userId');

  if (userId == null) {
    userId = const Uuid().v4(); // UUID を生成
    await prefs.setString('userId', userId); // 永続的に保存
  }

  return userId;
}

その他にもAWS Amplifyを使用してCognitoを利用ことや、Firebase Authenticationを利用することで、ユーザー認証機能を追加することができます。

続いて、チェックインの状況をAWS IoT Coreに送信する処理を実装します。

import 'package:mqtt_client/mqtt_client.dart';

// チェックイン情報をAWS IoT Coreに送信
Future<void> publishCheckin({
  required int checkpointId,
  required int checkInType,
  required String timestamp,
  required int checkInNumber,
}) async {
  final userId = await getUserId();

  final clientId = 'flutter-client-$userId';
  final client = await createMqttClient(clientId);

  try {
    await client.connect();
  } catch (e) {
    print('Connection failed: $e');
    client.disconnect();
    return;
  }

  if (client.connectionStatus?.state == MqttConnectionState.connected) {
    const topic = 'devices/checkIns';
    final timestamp = DateTime.now().toIso8601String();

    // メッセージ
    final payload = """
      {
        "userId": "$userId",
        "checkpointId": "$checkpointId",
        "checkinType": $checkInType,
        "timestamp": "$timestamp",
        "checkInNumber": $checkInNumber
      }
    """;

    client.publishMessage(
      topic,
      MqttQos.atLeastOnce,
      MqttClientPayloadBuilder().addString(payload).payload!,
    );

    // 確認のため少し待機
    await Future.delayed(Duration(seconds: 5));
    client.disconnect();
  } else {
    print('Failed to connect, status: ${client.connectionStatus}');
    client.disconnect();
  }
}

この関数では次の処理が順番に行われます。

  1. クライアントIDの生成
    ユーザーIDを元にクライアントIDを生成しています。
    クライアントIDは、例えばflutter-client-ユーザーIDのようにユーザーごとに異なるIDを指定することで、MQTTの要件であるクライアントIDの一意性を保証しています。
  2. AWS IoT Coreに接続
    createMqttClient関数を使用して、MQTTクライアントを作成し、AWS IoT Coreへの接続を試みます。
    接続が成功するとcreateMqttClient関数で設定したコールバック関数が呼び出され、ログにConnected to AWS IoT Coreが表示されます。
  3. トピックへのメッセージの送信
    devices/checkInsというトピックを指定し、チェックイン情報をJSON形式のペイロードとしてメッセージを送信します。
    トピックとは、メッセージの送信先を指定するためのアドレスのようなもので、AWS IoT Coreではトピックを使用してメッセージの送受信を行います。トピックはスラッシュ(/)で階層構造を持つことができ、例えばdevices/checkInsというトピックはdevicesという親トピックの下にcheckInsという子トピックがある構造を持ちます。
  4. 接続の切断
    メッセージの送信後、一定時間待機してから接続を切断します。

あとは、チェックイン処理が行われるタイミングでpublishCheckin関数を呼び出すことで、AWS IoT Coreにチェックイン情報を送信することができます。

// チェックイン済みの数を取得
final checkedCount =
    await SpotFeatureHelper.getSpots(widget.database).then((spots) {
    return spots.where((spot) => spot.isChecked == 1).length;
});

// チェックイン処理
checkIn() async {
    // チェックイン情報をDBに保存
    SpotFeatureHelper.updateSpotChecked(widget.database, checkInSpotInfo.id);

    // チェックインバッヂを表示
    _mapController.addCheckedMarker(checkInSpotInfo.coordinates);

    // チェックイン情報をAWS IoT Coreに送信
    await publishCheckin(
        checkpointId: checkInSpotInfo.id,
        checkInType: 0, // GPSチェックイン
        timestamp: DateTime.now().toIso8601String(),
        checkInNumber: checkedCount + 1);
}

AWS IoT Coreでの受信検証

正常にメッセージが送信されたことを確認するには、AWS IoT Coreの「MQTT テストクライアント」を使用します。

  1. AWS IoTのサイドバー > テスト > MQTTテストクライアントを選択する

  2. 「接続の詳細」のエンドポイントが``createMqttClient`関数で指定したエンドポイントと一致していることを確認する

    MQTTバージョンは、デバイスが使用するのはMQTT 3.1.1ですが、MQTT 5は下位互換性があるため、どちらでも問題ありません。

  3. タブ「トピックをサブスクライブする」にて、トピックフィルターにdevices/checkInsを入力して「サブスクライブ」を押下する

  4. デバイスからチェックイン情報を送信する

    今回はチェックインボタンを押下すると、AWS IoT Coreにチェックイン情報が送信されるようになっています。

  5. メッセージが正常に送信されたことを確認する

これで、デバイス(スタンプラリーアプリ)からチェックイン情報をMQTTプロトコルを使用して送信し、AWS IoT Coreのメッセージブローカーが受信されることが確認できました。

なお、現在モノにアタッチしている証明書のポリシーは全てのリソースに対して全てのアクションを許可するので、以下のように設定することをお勧めします。

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "iot:Connect",
      "Resource": "arn:aws:iot:<region>:<account-id>:client/flutter-client-*",
      "Condition": {
        "Bool": {
          "iot:Connection.Thing.IsAttached": "true"
        }
      }
    },
    {
      "Effect": "Allow",
      "Action": "iot:Publish",
      "Resource": "arn:aws:iot:<region>:<account-id>:topic/devices/checkIns"
    }
  ]
}

設定内容か以下の通りです。

  • iot:Connect: モノがAWS IoT Coreに接続する際のアクションを許可
    ここでは、flutter-client-*というクライアントIDを持つモノで、かつモノにアタッチされていることを条件としている。
  • iot:Publish: モノが指定したトピックにメッセージを送信する際のアクションを許可
    ここでは、devices/checkInsというトピックにメッセージを送信するアクションを許可している。

DynamoDBへのデータ保存

AWS IoT Coreにはルールエンジンという機能があり、受信したメッセージに基づいて自動的に処理を行うことができます。この機能を使用して、AWS IoT Coreから受信したチェックイン情報をDynamoDBに保存する処理を実装します。

チェックイン情報は2つのテーブルに保存します。

  1. UserCheckinsテーブル
    • ユーザーごとのチェックイン情報を保存するテーブル~
    • 主キー: userId(パーティションキー), timestamp(ソートキー)
    • 属性: userId, timestamp, checkpointId, checkinType, checkInNumber
  2. CheckpointCheckinsテーブル
    • チェックポイントごとのチェックイン情報を保存するテーブル
    • 主キー: checkpointId(パーティションキー), timestamp(ソートキー)
    • 属性: checkpointId, timestamp, userId, checkinType, checkInNumber

まずは、DynamoDBにチェックイン情報を保存するためのテーブルを作成します。

  1. DynamoDBのコンソール画面を開き、サイドバー > テーブルを選択、テーブル一覧画面で「テーブルの作成」を押下する
  2. テーブル名を「UserCheckins」と入力し、パーティションキーに「userId」、ソートキーに「timestamp」を押下する

または、AWS CLIを使用して以下のコマンドを実行します。

aws dynamodb create-table \
    --table-name UserCheckins \
    --attribute-definitions \
        AttributeName=userId,AttributeType=S \
        AttributeName=timestamp,AttributeType=S \
    --key-schema \
        AttributeName=userId,KeyType=HASH \
        AttributeName=timestamp,KeyType=RANGE \
    --billing-mode PAY_PER_REQUEST \
    --region ap-northeast-1
  1. 同様に、テーブル名を「CheckpointCheckins」と入力し、パーティションキーに「checkpointId」、ソートキーに「timestamp」を押下する

または、AWS CLIを使用して以下のコマンドを実行します。

aws dynamodb create-table \
    --table-name CheckpointCheckins \
    --attribute-definitions \
        AttributeName=checkpointId,AttributeType=S \
        AttributeName=timestamp,AttributeType=S \
    --key-schema \
        AttributeName=checkpointId,KeyType=HASH \
        AttributeName=timestamp,KeyType=RANGE \
    --billing-mode PAY_PER_REQUEST \
    --region ap-northeast-1
  1. テーブルが作成されたことを確認する
    テーブル一覧に「UserCheckins」と「CheckpointCheckins」が表示されていれば成功です。

続いて、AWS IoT Coreのルールエンジンを使用して、受信したチェックイン情報をDynamoDBに保存する処理を実装します。

  1. AWS IoT Coreのコンソール画面を開き、サイドバー > メッセージのルーティング > ルールを選択する
  2. 「ルールの作成」を押下し、適当なルール名を入力する
  3. SQLステートメントには以下のように入力する
SELECT * FROM 'devices/checkIns'

メッセージブローカーが受信したメッセージのトピックがdevices/checkInsの場合にこのルールが適用されるように設定しています。
4. アクション1には「DynamoDB」を選択し、以下の設定を入力する

  • テーブル名: UserCheckins
  • パーティションキー: userId
  • パーティションキーの値: ${userId}
  • ソートキー: timestamp
  • ソートキーの値: ${timestamp}

IAMロールを割り当てる必要があるので、新しいロールを作成するか、既存のロールを選択します。
「新しいロールを作成」を選択すると、自動的に必要な権限が追加されます。

  1. アクション2には「DynamoDB」を選択し、以下の設定を入力する

    • テーブル名: CheckpointCheckins
    • パーティションキー: checkpointId
    • パーティションキーの値: ${checkpointId}
    • ソートキー: timestamp
    • ソートキーの値: ${timestamp}

    IAMロールを割り当てる必要があるので、新しいロールを作成するか、既存のロールを選択します。
    「新しいロールを作成」を選択すると、同じく自動的に必要な権限が追加されます。

  2. 「次へ」を押下し、ルールの概要を確認し、「作成」を押下する

  3. データがDynamoDBに保存されていることを確認する
    DynamoDBのコンソール画面を開き、テーブル「UserCheckins」と「CheckpointCheckins」にチェックイン情報が保存されていることを確認します。

ちなみに、通常の「DynamoDB」アクションでは、パーテーションキーとソートキー以外はPayloadの属性をそのまま保持しますが、「DynamoDBv2」を選択することで、Payloadの属性を変換して画像のように保存することができます。またパーテーションキーとソートキーも自動的に適用されるため、テーブルの紐付けのみで設定が完了します。

今回はAWS IoT Coreから直接DynamoDBにデータを保存する方法を紹介しましたが、AWS Lambdaを使用してデータの変換や加工を行ったり、AWS SNSやAWS SQSを使用して通知やキューイングを行ったり、「Republish to AWS IoT topic」アクションを使用して他のトピックにメッセージを転送することも可能です。

まとめ

今回は、スタンプラリーアプリからAWS IoT Coreにデータを送信し、DynamoDBに保存する方法を紹介しました。AWS IoT Coreを使用することで、モバイルアプリやデバイスからのデータを簡単にクラウドに送信し、リアルタイムで処理や分析を行うことができます。また、ルールエンジンを使用することで、受信したデータに基づいて自動的に処理を行うことができるため、効率的なデータ処理が可能です。