⏲️

TimestreamのデータをLambdaを使ってS3へエクスポートする

2023/08/12に公開

こんにちは、アキです!
今回は、AWS TimestreamからS3へデータをエクスポートする処理を、Lambdaを使って実装したいと思います。

やりたいこと

前回までデータレイクのあれこれを学習し、アウトプットをしていました。ある程度理解できるようになってきたので、実際にデータを収集するところからデータレイクでデータを管理し、分析するところまでを構築してみようと思いました。
丁度使っていないRaspberry Piを発見したので、IoTデータの計測データをもとにデータレイクの構築を行おうと思います。
今回はデータを受け取り、一度Timestreamに保存されたデータを定期的にデータレイク用のS3に格納するために、Event Bridge+Lambdaを使おうと思います。

Timestreamを使う理由

サーバー構成図からもわかる通り、データレイクはS3ですが、その前段にTimestreamがあります。いったんTimestreamでデータを受ける理由は、

  1. IoT Coreから直接データを受け取り、保存することができる
  2. 時系列でデータを保持できる
  3. カラムの変更があっても柔軟に対応できる
  4. 計測されたデータをすぐに分析することができる

ためです。
リアルタイムデータを分析することができるTimestreamとIoTとの相性は非常に良いと感じます。

LambdaでTimestreamからS3へエクスポート

とはいえ、Timestreamは料金がまあまあ高く、リアルタイムな分析を必要としない場合には他のDBなどにデータを移行したほうが良いのではないか考えました。
TimestreamはS3へデータエクスポートをサポートしているので、今回は毎日0時に前日のデータをS3へ移行することを想定した処理をLambda+Event Bridgeで実行しようと思います。

Timestreamテーブル

  • DB名→IoT-Database
  • テーブル名→sensor-data
    スキーマは以下です。

    ※measure_value::varchar (varchar)は誤って作成してしまったカラムなので利用しません。

Lambda関数

Lambdaで実行するコードです。
ランタイムはPython3.11です。

import boto3
import os
from datetime import datetime, timedelta

def lambda_handler(event, context):
    client = boto3.client('timestream-query')
    
    start_date     = datetime.now() - timedelta(days=1)
    end_date = start_date + timedelta(days=1)
    start_date_str     = start_date.strftime('%Y-%m-%d')
    end_date_str = end_date.strftime('%Y-%m-%d')
    
    query = f"""
    UNLOAD (SELECT timestamp, measure_name, time, measure_value::double, sensor_id 
    FROM "IoT-Database"."sensor-data" WHERE time >= '{end_date_str} 15:00:00'
    AND time < '{start_date_str} 15:00:00')
    TO 's3://iot-data-row/row_data/{end_date_str}'
    WITH( partitioned_by=ARRAY['sensor_id'])
    """

    try:
        response = client.query(QueryString=query)
    except Exception as e:
        print(f"Failed to execute query: {e}")
        raise e

このコードではS3へデータをエクスポートする際に、sensor_idでパーティショニングを行っています。パーティショニングを行う際に、WITHを使うのですが、この中で指定するカラム名はSELECT内の最後に来ないとエラーが発生します。
また、TimestreamのtimeはUTC時間でデータを保存するため、日本時間で前日の丸一日分を範囲選択するようなクエリにしています。
実行をすると、
S3にsensor_id=のフォルダが作成され、.gz形式でデータが保存されていることが分かります。

以上

データレイクの構築といいつつ、その前段の内容の紹介で終わってしまいました。次回は生のデータレイクから、分析用のデータレイク構築までを実践できたらなと思います!

Discussion