IoTCoreで受けたデータをAmazon Timestreamに連携するハンズオン
概要
IoTCoreで受けたデータをAmazon Timestreamに連携するハンズオン資料です。
Amazon Timestreamを触ってみたかったのと、社内にTimestream仲間を作りたかったので、公式のチュートリアルに沿った社内ハンズオンを開催しました。
参考:https://www.youtube.com/watch?v=00Wersoz2Q4&t=529s
チュートリアルではセンサーデータ取得の部分をPythonのサンプルコードにより模擬していますが、今回のハンズオンではM5Stackを使用してセンサーデータを取得しIoTCoreへMQTTのメッセージを送信するようにしました。
Amazon Timestreamとは
- 時系列データ専用のデータベース
- RDBの最大 1,000 倍の速度と 10 分の 1 のコスト
- 最新データはメモリに保持、履歴データはストレージに保持
- クライアント側はどこに保存されているかを意識しなくて良い
- 分析用の組み込み関数が利用可能
- スケールアップ、スケールダウンが自動で行われるマネージド・サービス
参考:https://aws.amazon.com/jp/timestream/
Amazon Timestreamを使うと嬉しいこと
- 時系列データの保存、管理に関するアーキテクチャに悩まなくて良い
- テーブル設定の画面でポチポチするだけで、データの経過時間によるデータ保存先を設定できる
- クライアント側はどこにデータが保存されているか意識せずにデータ取得できるため実装が楽そう
- データ取得の速度が気になる、メモリ保存時とストレージ保存時の差は?(本記事では検証しない、次回ハンズオンでやりたい)
ハンズオンで使用するデバイス
M5StackとEnvUnitで構成されています。
-
EnvUnitを使って気温、湿度、気圧を定期的に取得しています
-
取得したデータをMQTTに送信します
-
UIFlow
WiFiにつなげて、Envセンサーからデータを取得して、Jsonに詰めて、IoTCoreに送っているだけです。
デザインはサンプルアプリケーションを参考としています。
IoTCoreのMQTTテストクライアント
アーキテクチャ
M5Stack → IoTCore → Timestream
- IoTCoreで受け取ったデータをIoTルールを使用して直接Timestreamに登録します
IoT ルールとは
- IoTCoreのエンドポイントで受け取ったデータとLambda、CloudWatch、DynamoDB等のAWSサービスをつなげるための設定
- 受け取ったデータの連携方法、連携先、エラー時のアクション等を設定する
- メッセージに時間情報が含まれない場合、IoTルール内で設定することも出来る
IoTCoreのエンドポイント
- Topic: dt/sensor/device_id
- Payload
{
"temperature": 23.2,
"pressure": 31.3,
"humidity": 1015.1,
"device_id": "sensor_02",
"building": "Day 1",
"room": "10.01"
}
-
device_id: デバイス固有のID
-
受け取ったデータの連携方法をSQLとして定義する
- SQL例: "SELECT humidity, pressure, temperature FROM 'dt/sensor/#'
- FROM句にはMQTTで受け取るトピックを指定する
-
#
はワイルドカード、全デバイスからデータを受け取る設定 - IoTCoreの組み込み関数も豊富
- Lambdaでデータ加工することも出来る
ハンズオン
Timestreamのデータベース、テーブルを作成する
- Timestreamのコンソールからデータベースを作成を行う
- 標準データベースを選択して作成
- データベース名は
iot
とする
- テーブルを作成
- 作成したデータベースを選択
- テーブル名を入力
- テーブル名は
sensor
とする
- テーブル名は
- データの保持設定
- データの保存先は2種類(MemoryとMagnetic Store)
- Memory:短期保存用、1時間〜1年
- Magnetic Store:長期保存用、1日〜最長7年
- 料金が桁違い、Magnetic Storeの方が安い
- データの保存先は2種類(MemoryとMagnetic Store)
IoTルールを作成する
- メッセージのルーティング > ルールに移動
- 以下の通りルールを作成する
- SQL:
SELECT humidity, pressure, temperature FROM 'dt/sensor/#'
- Action:Timestream Table
- 作成したデータベース名、テーブル名を選択
- ディメンションを定義
- ディメンションとはメッセージ内のメタデータ
- Timestreamのテーブルに時系列データと一緒に保存される
- ディメンション名とディメンション値をそれぞれ以下のペアを設定する
-
device_id
,${device_id}
-
building
,${building}
-
room
,${room}
-
-
${変数名}
とすることでメッセージ内の変数の値を取得できる
- IAMロールの作成
- Timestreamを利用するためのロールを作成する
センサーデータをIoTCoreに送信する
- M5StackからセンサーデータをMQTTに送信開始
Timestreamのクエリエディタを使ってみる
- Timestreamのコンソールからクエリエディタに移動
- 以下のSQLを書いてみる
件数取得
select count(*) from iot.sensordata
全件取得
select * from iot.sensordata
1時間前までのtemperature
のレコードを取得
SELECT * FROM "iot"."sensordata" WHERE time > ago(1h) AND measure_name = 'temperature' ORDER BY time ASC
- 一般的なSQLの他に
ago(1h)
等の組み込みの関数が使えます。
Node.jsのSDKでデータを取得してみる
- Next.jsのバックエンドでSDKを使ってみます
pages/api/measured-data/index.ts
// Next.js API route support: https://nextjs.org/docs/api-routes/introduction
import { QueryCommand, QueryCommandInput, TimestreamQueryClient } from '@aws-sdk/client-timestream-query';
import type { NextApiRequest, NextApiResponse } from 'next'
type Data = {
data: any
}
export default async function handler(
req: NextApiRequest,
res: NextApiResponse<Data>
) {
if(req.method === 'GET') {
const client = new TimestreamQueryClient({ region: "ap-northeast-1" });
const queryCommand:QueryCommandInput = {
QueryString: "select * from iot.sensordata",
}
const apiRes = await client.send(new QueryCommand(queryCommand));
console.log(apiRes);
return res.status(200).json({ data: apiRes})
}
}
-
QueryString
に実行したいSQLを書きます
select * from iot.sensordata
-
.env.local
にawsのprofileを設定します
AWS_PROFILE={プロファイル名}
参考:
- https://docs.aws.amazon.com/ja_jp/timestream/latest/developerguide/code-samples.query-client.html
- https://docs.aws.amazon.com/AWSJavaScriptSDK/v3/latest/clients/client-timestream-query/index.html
- https://docs.aws.amazon.com/ja_jp/timestream/latest/developerguide/what-is-timestream.html
最後に
以上がハンズオンで使用した資料です。
1時間のハンズオンの中でSDKを使用したデータ取得まで出来ました。簡単、スゴい!!
データ取得のクエリの調査検証、SDKを使ったアプリケーション開発、性能調査等、引き続きTimestreamについて勉強しながら触っていきたいと思います。
以上です。
Discussion