🪄

TimestreamからUNLOADしたデータから分析用S3バケットを作ってみた

2023/09/09に公開

こんにちは、アキです!
前回はTimestreamからS3へ前日計測したデータをエクスポートしました。

やりたいこと

TimestreamからエクスポートしたデータをETL処理して別のS3バケットへ保存し、さらにそこからデータカタログを作成します。
手順は大まかに以下のようになります。

  1. EventBridgeで指定した時間になったら、前日計測したデータをTimestreamからS3へデータをエクスポート(前回作成)
  2. 分析しやすい形式に変換し、別のS3バケットへデータを移動
  3. 必要に応じてデータカタログを作成・更新
    です。
    この一連の処理をStepFunctionsを利用して実行してみようと思います。
    今回2までの手順をまとめました。

対象データ

前回EventBridgeで設定した時間になった時にTimestreamのUNLOADを実行するLambda関数を紹介しました。今回はそのデータを対象に処理を行っていきますが、前回と異なり、今回利用するデータはパーティションを行っていません。
つまり、データをエクスポートすると

s3://bucket_name/row_data/2023-08-27/results/hogehoge.csv.gz

のようになります。
パーティショニングをやめた理由として、

  • ETL処理を行う際にやりずらかった
  • そもそもETL処理を行う際にその日付すべてのデータを読み込むので、パーティショニングをしておくメリットがあまりない

全体像

前回はTimestreamのデータをS3へエクスポートしました。そこからETL処理を行い、別なS3バケットへデータを保存するようにします。さらに、そのバケットからデータカタログを作成して、分析を行える状態にしたいと思います。そのために今回新たに利用するのは、

  • EventBridge
  • Step Functions
  • Glue Job
  • Glue Crawler
  • Lambda

です。
処理の順としては、

1.EventBridgeでcronを設定し、特定の時間になったらLambdaからTimestreamのUNLOADクエリを実行して、データをS3へエクスポートする。
2. 正常にUNLOADが完了したらStepFunctionsを実行
3. StepFunctions内のLambdaからGlueにETL Jobを実行させる
4. 3の処理が完了したら、TimestreamからUNLOADしたとき作成されるmetadataファイルの中身とデータカタログを比較し、スキーマが変更されていればGlue Crawlerを実行する
※今考えると3のETL JobはLambdaから実行させるのは冗長ですね…

図にすると下記のようなイメージです。

ETL処理を行い分析用のバケットにはsensor_idでパーティショニングされ、parquet形式でデータが保存されます。
※実際のパスイメージ

s3://iot-datalake/processed_data/2023-08-28/sensor_id=1/hogehoge.parquet

Glue Job

想定としては、1日おきにTimestreamからS3へデータがUNLOADされるので、ETL処理はそのUNLOADが終わったら実行されます。
一度ソースコードができ、duckDBを使ってparquetファイルの中身を確認してみると、

このようにすべてvarchar型となってしまいました。
単純にETL処理を行うのではだめなようなので、TimestreamからUNLOADするときに勝手に作成されるmetadata.jsonファイルを参照して、型を一致させるように変更しました。
最終的には下記のコードになっています。

import boto3
import sys
import os
import time
from datetime import datetime, timedelta
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.functions import col
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
import json

def addColumnName(glueContext, datasource, column_names) :
    #ToDo:単純なリネームならSparkのalias等の手段がある
    # カラム名を追加するためにDynamicFrameからDataFrameへ変換
    dataframe = datasource.toDF()
    # カラム名を変更
    for i in range(len(column_names)):
        dataframe = dataframe.withColumnRenamed(dataframe.columns[i], column_names[i])

    # DataFrameをDynamicFrameに戻す
    datasource_with_header = DynamicFrame.fromDF(dataframe, glueContext, "datasource_with_header")
    return datasource_with_header

def getColumnNameFromMetaDataJson(metadata) :
    # メタデータからカラム名を取得
    column_names = [col["Name"] for col in metadata["ColumnInfo"]]
    return column_names

def getListObjectsFromYesterdayDir(source_bucket_name, source_directory, yesterday_str, s3_client) :
    yesterday_dir_list = s3_client.list_objects_v2(Bucket=source_bucket_name, Prefix=f'{source_directory}/{yesterday_str}/')
    return yesterday_dir_list

def getMetadata(source_bucket_name, dir_objects, s3_client) :
    # メタデータの読み込み
    metadata_file = [obj['Key'] for obj in dir_objects['Contents'] if obj['Key'].endswith('_metadata.json')][0]
    metadata_object = s3_client.get_object(Bucket=source_bucket_name, Key=metadata_file)
    metadata_content = metadata_object['Body'].read().decode('utf-8')
    metadata = json.loads(metadata_content)
    return metadata

def getTypes(metadata) :
    types = []
    for col in metadata["ColumnInfo"] :
        types.append((col["Name"], "string", col["Name"], matchType(col["Type"]["ScalarType"])))
    return types

# TimestreamからのUNLOADが完了しているかをmetadata.jsonの有無で判断する
# "されていなければ"True
def hasNotUnloaded(source_bucket_name, source_directory, yesterday_str, dir_objects, s3_client) :
    target_dir = 'results/'
    res = s3_client.list_objects_v2(
        Bucket=source_bucket_name, 
        Prefix=f'{source_directory}/{yesterday_str}/{target_dir}'
        )
    results_directory_not_exists = True
    for content in dir_objects['Contents']:
        if content['Key'].startswith(f'{source_directory}/{yesterday_str}/{target_dir}'):
            results_directory_not_exists = False
            break
        
    return results_directory_not_exists

# metadata.jsonの中身と、Glueでサポートしている型を一致させる
def matchType(input_type):
    # 他に型があればここに追加していく
    matchDict = {
        'VARCHAR': 'string',
        'DOUBLE': 'double',
        'BIGINT': 'int64',
        'BOOLEAN': 'boolean',
        'TIMESTAMP': 'timestamp'
    }
    return matchDict.get(input_type, 'string')


# ◆処理する前の準備
# タイムゾーンの設定
os.environ['TZ'] = 'Asia/Tokyo'
time.tzset()
# バケット名とディレクトリ名の設定
source_bucket_name = "[row_data_bucket]"
source_directory = "row_data"
destination_bucket_name = "[processed_data_bucket]"
destination_directory = "processed_data"
# 前日の日付の計算
yesterday = datetime.now() - timedelta(days=1)
yesterday_str = yesterday.strftime('%Y-%m-%d')
# s3クライアントの準備
s3_client = boto3.client('s3')
# 前日の日付の名称のディレクトリを取得
yesterday_dir_obj = getListObjectsFromYesterdayDir(source_bucket_name, source_directory, yesterday_str, s3_client)

if hasNotUnloaded(source_bucket_name, source_directory, yesterday_str, yesterday_dir_obj, s3_client):
    print(f"No data found for {yesterday_str}. Skipping...")
else:
    # Glueの設定
    args = getResolvedOptions(sys.argv, ['JOB_NAME'])
    sc = SparkContext()
    glueContext = GlueContext(sc)
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)
    
    # データの読み込み
    datasource0 = glueContext.create_dynamic_frame.from_options(
        format="csv",
        connection_type="s3",
        connection_options={
            "paths": [f"s3://{source_bucket_name}/{source_directory}/{yesterday_str}/results/"],
            "compression": "gzip"
        })
    
    # metadata.jsonの内容を取得
    metadata = getMetadata(source_bucket_name, yesterday_dir_obj, s3_client)
    # カラム名を定義する処理
    column_names = getColumnNameFromMetaDataJson(metadata)
    datasource_with_header = addColumnName(glueContext, datasource0, column_names)

    # 各カラムの型をmetadataから取得
    column_types = getTypes(metadata)
    apply_mapping = ApplyMapping.apply(frame = datasource_with_header, mappings = column_types)
    
    # データの書き込み
    datasink4 = glueContext.write_dynamic_frame.from_options(
        frame=apply_mapping,
        connection_type="s3",
        connection_options={
            "path": f"s3://{destination_bucket_name}/{destination_directory}/{yesterday_str}/",
            "partitionKeys": ["sensor_id"]
        },
        format="parquet"
    )
    job.commit()

実行すると、

で、timeとmeasured_valueが適切な型になっていることが分かります(timestampは私の作り方が悪いせいでTimestreamにstring型として保存されています)。
今回は型を適切にマッピングするためにmetadata.jsonを利用しましたが、次回予定のデータカタログ作成にもこのファイルを利用します。

以上

割としっかりETL処理を書いたのは今回初めてで、かなり汚いコードかもしれません。
ただ、ETLの基礎的なことを勉強できた気がするので、今回はこれで良しとします。

Discussion