🏄

AWS IoT CoreからTimestreamのTimeにレコード挿入時刻ではなく独自の時刻を使いたいときは

2024/05/14に公開

結論

AWS IoT Core から Timestream へ直接的にレコードを挿入するのではなく、Lambda を介してレコードを挿入するしかありません。現状、AWSマネジメントコンソール上ではディメンションの指定なしでタイムスタンプを指定することができません。

やりたいこと

とあるセンサーから次のようなMQTTトピックのデータを AWS IoT Core に投げ、Timestream に蓄積したいです。

{
  "fetchedAt": 1715650067,
  "temperature": 23.60000038,
  "humidity": 40.01074219,
  "pressure": 974.468125,
  "co2": 459,
  "rssi": -53
}

ここでのポイントは fetchedAt で、これはセンサーが観測を行った時刻(単位は秒)を表しています。Timestream にレコード挿入する際、レコードに挿入した時刻ではなく、この fetchedAt (観測時間)を time (タイムスタンプ)として使いたいというのが今回のやりたいことです。

余談: レコード挿入時間か、観測時間か

time がレコード挿入時間のままでよい場合も存在すると考えられます。

通信状況が安定したセンサーデバイスから送られたデータは「観測時間=レコード挿入時間」と考えても差し支えない場合があります。たとえばセンサーデータの取得時に瞬時に観測が行われて、即座にMQTTトピックを介して Timestream にレコードが挿入される状況が保証できるのであれば、time はレコード挿入時間であるとみなしてもよい、と考えます。

一方、即座にセンサーデータが送られない場合はレコード挿入時間であると不都合が生じるおそれがあります。たとえばセンサーデバイスが一時的にインターネットとの接続を失って、後刻、接続が回復してから送信停止中に観測したセンサーデータを送るとしましょう。ここで time がレコード挿入時間である場合、インターネット接続が回復した瞬間、同一センサーデバイスの同一観測名のデータが、同一の時刻に複数の値で Timestream に投入されることになります。要するに時系列が壊れた状態でデータが投入されてしまいます。

今回の私の場合、センサーデバイスの通信状況が安定している保証がなく、観測とMQTTトピックの発行に30秒以上かかる状況が存在します。よって、Timestream に投入されるデータにおいても time を観測時間として正しく扱う必要があると判断しました。

失敗した方法

以下は解決を試行して結果的に失敗したものです。どちらも AWS IoT Core から Timestream に直接レコードを挿入するように連携したものです。

タイムスタンプを指定する

AWS IoT Core > メッセージのルーティング > ルール > ルールを作成 で次のようなSQLステートメントとルートアクションを設定します。ここで、MQTTトピックは observation/home/esp32-central/sensors、Timestream のディメンションとして observatorfetchedAt を指定します。ルートアクションにて、オプション扱いになっているタイムスタンプは fetchedAt を指定します。

SELECT
  'esp32-central' AS observator
  fetchedAt,
  temperature,
  humidity,
  pressure,
  co2,
  rssi
FROM 'observation/home/esp32-central/sensors'

すると以下のような Timestream レコードが追加されます。

ディメンションに observatorfetchedAt を指定したのでカラムとして存在するのは理解できます。しかしタイムスタンプとして指定しているのに fetchedAt のレコードまで挿入されています。それでいて全ての Time カラムには正しく情報が入っているだけに、ディメンション(カラム)とレコードで冗長に時刻情報が入っているのはいただけません。

fetchedAt を time にリネーム

タイムスタンプが time というカラムになるのであれば、ルールのSQLステートメントで fetchedAttime にリネームすればいいのではないかと考えました。すなわち、以下のような指定方法です。今回はタイムスタンプは指定しません。

SELECT
  'esp32-central' AS observator
  fetchedAt AS time,  -- リネーム
  temperature,
  humidity,
  pressure,
  co2,
  rssi
FROM 'observation/home/esp32-central/sensors'

その結果、以下のようなレコードが Timestream に追加されます。

今回は fetchedAt のディメンションとレコードがなくなり、time が正しく設定されているように見えます。しかし実はこの timefetchedAt ではなく レコードの挿入時間 になっています。注意して time カラムを観察すると、もともとの fetchedAt には持っていなかったはずのミリ秒が追加されています。 これは以下の2つの問題が同時に起こっていると考えられます。

  • time という名前のカラムは型 TIMESTAMP としてテーブル作成時に自動的に作られますが、SQLステートメント上やAWSコンソール上では型を指定する方法がありません。既存のカラムに対する誤った型の挿入はエラーになるため、入力データは無視されます。
  • タイムスタンプの指定が空の場合、レコードの挿入時間が自動で使われます

以上の問題の結果、timefetchedAt ではなくレコードの挿入時間になります。

Lambda を介した方法

IoT Core から一度 Lambda にMQTTデータを投入し、Lambda で Timestream の API を用いてレコードを挿入する方法を試しました。

IoT Core のルールで指定するSQLステートメントは、前述したタイムスタンプを指定した方法と同じものを使います。今回はリネームしません。

SELECT
  'esp32-central' AS observator
  fetchedAt,
  temperature,
  humidity,
  pressure,
  co2,
  rssi
FROM 'observation/home/esp32-central/sensors'

次に IoT Core から呼び出される Lambda のコードを記述します。

import { TimestreamWriteClient, WriteRecordsCommand } from "@aws-sdk/client-timestream-write";

export const handler = async (event) => {
  const client = new TimestreamWriteClient();
  const dimensions = [
    { Name: 'observator', Value: event.observator },
  ];
  const recordBase = {
    Dimensions: dimensions,
    Time: event.fetchedAt.toString(),
    TimeUnit: 'SECONDS',
  };
  const params = {
    DatabaseName: 'observation',
    TableName: 'sensor',
    Records: [
      {
        MeasureName: 'temperature',
        MeasureValue: event.temperature.toString(),
        MeasureValueType: 'DOUBLE',
        ...recordBase,
      },
      {
        MeasureName: 'humidity',
        MeasureValue: event.humidity.toString(),
        MeasureValueType: 'DOUBLE',
        ...recordBase,
      },
      {
        MeasureName: 'pressure',
        MeasureValue: event.pressure.toString(),
        MeasureValueType: 'DOUBLE',
        ...recordBase,
      },
      {
        MeasureName: 'co2',
        MeasureValue: event.co2.toString(),
        MeasureValueType: 'BIGINT',
        ...recordBase,
      },
      {
        MeasureName: 'rssi',
        MeasureValue: event.rssi.toString(),
        MeasureValueType: 'BIGINT',
        ...recordBase,
      },
    ]
  };
  
  const command = new WriteRecordsCommand(params);
  const response = await client.send(command);
  return response;
};

Lambda のハンドラに渡される引数 event には次のような、SQLステートメントの実行結果が渡されます。

{
  observator: 'esp32-central',
  fetchedAt: 1715647607,
  temperature: 23.57999992,
  humidity: 43.79882813,
  pressure: 974.9288281,
  co2: 829,
  rssi: -47
}

Lambda が Timestream へのレコード挿入を行うため、Lambda には以下のポリシを忘れずに付与します(RegionとID部分は書き換えてください)。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "timestream:WriteRecords"
            ],
            "Resource": "arn:aws:timestream:<Region>:<AWS Account ID>:database/observation/table/sensor"
        },
        {
            "Effect": "Allow",
            "Action": [
                "timestream:DescribeEndpoints"
            ],
            "Resource": "*"
        }
    ]
}

Lambda のコード上のポイントは MeasureValue および Time は必ず文字列であることです。これを守らないと SerializationException: NUMBER_VALUE can not be converted to a String とエラーが出ます。

また、MeasureValueType が既にテーブル上に存在する型と不一致であると Measure name already has an assigned measure value type. Each measure name can have only one measure value type and cannot be changed. とエラーが出ます。一度作成されたカラムのデータ型を変更することはできないので、変更したい場合はテーブルの再作成が必要になります。

実行結果

MQTTトピックが発行され、センサデータが IoT Core → Lambda → Timestream と連携し、以下のようにレコードが挿入されます。

fetchedAt 由来の time が正しく設定されていることがわかります。

time は Timestream の中核になるものですし、観測時間を time として扱いたいというケースは多分にあると思われますが、Lambda を介さず、なおかつ冗長なデータを発生させない方法がとれるよう柔軟に指定できるようになってほしいです。

Lambda を介したほうがうまくいくケースは time 以外にも存在するので、本格的な運用を考える際は Lambda 前提で考えたほうが楽になるかもしれません。

参考文献

Discussion