👋

IoTCoreで受けたデータをAmazon Timestreamに連携するハンズオン

2022/09/11に公開

概要

IoTCoreで受けたデータをAmazon Timestreamに連携するハンズオン資料です。
Amazon Timestreamを触ってみたかったのと、社内にTimestream仲間を作りたかったので、公式のチュートリアルに沿った社内ハンズオンを開催しました。
参考:https://www.youtube.com/watch?v=00Wersoz2Q4&t=529s

チュートリアルではセンサーデータ取得の部分をPythonのサンプルコードにより模擬していますが、今回のハンズオンではM5Stackを使用してセンサーデータを取得しIoTCoreへMQTTのメッセージを送信するようにしました。

Amazon Timestreamとは

  • 時系列データ専用のデータベース
  • RDBの最大 1,000 倍の速度と 10 分の 1 のコスト
  • 最新データはメモリに保持、履歴データはストレージに保持
    • クライアント側はどこに保存されているかを意識しなくて良い
  • 分析用の組み込み関数が利用可能
  • スケールアップ、スケールダウンが自動で行われるマネージド・サービス

参考:https://aws.amazon.com/jp/timestream/

Amazon Timestreamを使うと嬉しいこと

  • 時系列データの保存、管理に関するアーキテクチャに悩まなくて良い
    • テーブル設定の画面でポチポチするだけで、データの経過時間によるデータ保存先を設定できる
  • クライアント側はどこにデータが保存されているか意識せずにデータ取得できるため実装が楽そう
    • データ取得の速度が気になる、メモリ保存時とストレージ保存時の差は?(本記事では検証しない、次回ハンズオンでやりたい)

ハンズオンで使用するデバイス

M5StackとEnvUnitで構成されています。

  • EnvUnitを使って気温、湿度、気圧を定期的に取得しています

  • 取得したデータをMQTTに送信します

  • UIFlow

WiFiにつなげて、Envセンサーからデータを取得して、Jsonに詰めて、IoTCoreに送っているだけです。
デザインはサンプルアプリケーションを参考としています。

IoTCoreのMQTTテストクライアント

アーキテクチャ

M5Stack → IoTCore → Timestream

  • IoTCoreで受け取ったデータをIoTルールを使用して直接Timestreamに登録します

IoT ルールとは

  • IoTCoreのエンドポイントで受け取ったデータとLambda、CloudWatch、DynamoDB等のAWSサービスをつなげるための設定
  • 受け取ったデータの連携方法、連携先、エラー時のアクション等を設定する
  • メッセージに時間情報が含まれない場合、IoTルール内で設定することも出来る

IoTCoreのエンドポイント

  • Topic: dt/sensor/device_id
  • Payload
{
  "temperature": 23.2,
  "pressure": 31.3,
  "humidity": 1015.1,
  "device_id": "sensor_02",
  "building": "Day 1",
  "room": "10.01"
}
  • device_id: デバイス固有のID

  • 受け取ったデータの連携方法をSQLとして定義する

    • SQL例: "SELECT humidity, pressure, temperature FROM 'dt/sensor/#'
    • FROM句にはMQTTで受け取るトピックを指定する
    • # はワイルドカード、全デバイスからデータを受け取る設定
    • IoTCoreの組み込み関数も豊富
      • Lambdaでデータ加工することも出来る

ハンズオン

Timestreamのデータベース、テーブルを作成する

  1. Timestreamのコンソールからデータベースを作成を行う
  • 標準データベースを選択して作成
  • データベース名はiotとする
  1. テーブルを作成
  • 作成したデータベースを選択
  • テーブル名を入力
    • テーブル名はsensorとする
  • データの保持設定
    • データの保存先は2種類(MemoryとMagnetic Store)
      • Memory:短期保存用、1時間〜1年
      • Magnetic Store:長期保存用、1日〜最長7年
      • 料金が桁違い、Magnetic Storeの方が安い

IoTルールを作成する

  1. メッセージのルーティング > ルールに移動
  2. 以下の通りルールを作成する
  • SQL:SELECT humidity, pressure, temperature FROM 'dt/sensor/#'
  • Action:Timestream Table
    • 作成したデータベース名、テーブル名を選択
    • ディメンションを定義
      • ディメンションとはメッセージ内のメタデータ
      • Timestreamのテーブルに時系列データと一緒に保存される
      • ディメンション名とディメンション値をそれぞれ以下のペアを設定する
        • device_id, ${device_id}
        • building, ${building}
        • room, ${room}
      • ${変数名}とすることでメッセージ内の変数の値を取得できる
  • IAMロールの作成
    • Timestreamを利用するためのロールを作成する

センサーデータをIoTCoreに送信する

  1. M5StackからセンサーデータをMQTTに送信開始

Timestreamのクエリエディタを使ってみる

  1. Timestreamのコンソールからクエリエディタに移動
  2. 以下のSQLを書いてみる

件数取得

select count(*) from iot.sensordata

全件取得

select * from iot.sensordata

1時間前までのtemperatureのレコードを取得

SELECT * FROM "iot"."sensordata" WHERE time > ago(1h) AND measure_name = 'temperature' ORDER BY time ASC

Node.jsのSDKでデータを取得してみる

  • Next.jsのバックエンドでSDKを使ってみます

pages/api/measured-data/index.ts

// Next.js API route support: https://nextjs.org/docs/api-routes/introduction
import { QueryCommand, QueryCommandInput, TimestreamQueryClient } from '@aws-sdk/client-timestream-query';
import type { NextApiRequest, NextApiResponse } from 'next'

type Data = {
  data: any
}

export default async function handler(
  req: NextApiRequest,
  res: NextApiResponse<Data>
) {
  if(req.method === 'GET') {
    const client = new TimestreamQueryClient({ region: "ap-northeast-1" });
    const queryCommand:QueryCommandInput = {
      QueryString: "select * from iot.sensordata",
    }
    const apiRes = await client.send(new QueryCommand(queryCommand));
    console.log(apiRes);
    return res.status(200).json({ data: apiRes})
  }
}
  • QueryString に実行したいSQLを書きます
select * from iot.sensordata
  • .env.localにawsのprofileを設定します
AWS_PROFILE={プロファイル名}

参考:

最後に

以上がハンズオンで使用した資料です。

1時間のハンズオンの中でSDKを使用したデータ取得まで出来ました。簡単、スゴい!!
データ取得のクエリの調査検証、SDKを使ったアプリケーション開発、性能調査等、引き続きTimestreamについて勉強しながら触っていきたいと思います。

以上です。

GitHubで編集を提案

Discussion