TimestreamからUNLOADしたデータから分析用S3バケットを作ってみた
こんにちは、アキです!
前回はTimestreamからS3へ前日計測したデータをエクスポートしました。
やりたいこと
TimestreamからエクスポートしたデータをETL処理して別のS3バケットへ保存し、さらにそこからデータカタログを作成します。
手順は大まかに以下のようになります。
- EventBridgeで指定した時間になったら、前日計測したデータをTimestreamからS3へデータをエクスポート(前回作成)
- 分析しやすい形式に変換し、別のS3バケットへデータを移動
- 必要に応じてデータカタログを作成・更新
です。
この一連の処理をStepFunctionsを利用して実行してみようと思います。
今回2までの手順をまとめました。
対象データ
前回EventBridgeで設定した時間になった時にTimestreamのUNLOADを実行するLambda関数を紹介しました。今回はそのデータを対象に処理を行っていきますが、前回と異なり、今回利用するデータはパーティションを行っていません。
つまり、データをエクスポートすると
s3://bucket_name/row_data/2023-08-27/results/hogehoge.csv.gz
のようになります。
パーティショニングをやめた理由として、
- ETL処理を行う際にやりずらかった
- そもそもETL処理を行う際にその日付すべてのデータを読み込むので、パーティショニングをしておくメリットがあまりない
全体像
前回はTimestreamのデータをS3へエクスポートしました。そこからETL処理を行い、別なS3バケットへデータを保存するようにします。さらに、そのバケットからデータカタログを作成して、分析を行える状態にしたいと思います。そのために今回新たに利用するのは、
- EventBridge
- Step Functions
- Glue Job
- Glue Crawler
- Lambda
です。
処理の順としては、
1.EventBridgeでcronを設定し、特定の時間になったらLambdaからTimestreamのUNLOADクエリを実行して、データをS3へエクスポートする。
2. 正常にUNLOADが完了したらStepFunctionsを実行
3. StepFunctions内のLambdaからGlueにETL Jobを実行させる
4. 3の処理が完了したら、TimestreamからUNLOADしたとき作成されるmetadataファイルの中身とデータカタログを比較し、スキーマが変更されていればGlue Crawlerを実行する
※今考えると3のETL JobはLambdaから実行させるのは冗長ですね…
図にすると下記のようなイメージです。
ETL処理を行い分析用のバケットにはsensor_idでパーティショニングされ、parquet形式でデータが保存されます。
※実際のパスイメージ
s3://iot-datalake/processed_data/2023-08-28/sensor_id=1/hogehoge.parquet
Glue Job
想定としては、1日おきにTimestreamからS3へデータがUNLOADされるので、ETL処理はそのUNLOADが終わったら実行されます。
一度ソースコードができ、duckDBを使ってparquetファイルの中身を確認してみると、
このようにすべてvarchar型となってしまいました。
単純にETL処理を行うのではだめなようなので、TimestreamからUNLOADするときに勝手に作成されるmetadata.jsonファイルを参照して、型を一致させるように変更しました。
最終的には下記のコードになっています。
import boto3
import sys
import os
import time
from datetime import datetime, timedelta
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.functions import col
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
import json
def addColumnName(glueContext, datasource, column_names) :
#ToDo:単純なリネームならSparkのalias等の手段がある
# カラム名を追加するためにDynamicFrameからDataFrameへ変換
dataframe = datasource.toDF()
# カラム名を変更
for i in range(len(column_names)):
dataframe = dataframe.withColumnRenamed(dataframe.columns[i], column_names[i])
# DataFrameをDynamicFrameに戻す
datasource_with_header = DynamicFrame.fromDF(dataframe, glueContext, "datasource_with_header")
return datasource_with_header
def getColumnNameFromMetaDataJson(metadata) :
# メタデータからカラム名を取得
column_names = [col["Name"] for col in metadata["ColumnInfo"]]
return column_names
def getListObjectsFromYesterdayDir(source_bucket_name, source_directory, yesterday_str, s3_client) :
yesterday_dir_list = s3_client.list_objects_v2(Bucket=source_bucket_name, Prefix=f'{source_directory}/{yesterday_str}/')
return yesterday_dir_list
def getMetadata(source_bucket_name, dir_objects, s3_client) :
# メタデータの読み込み
metadata_file = [obj['Key'] for obj in dir_objects['Contents'] if obj['Key'].endswith('_metadata.json')][0]
metadata_object = s3_client.get_object(Bucket=source_bucket_name, Key=metadata_file)
metadata_content = metadata_object['Body'].read().decode('utf-8')
metadata = json.loads(metadata_content)
return metadata
def getTypes(metadata) :
types = []
for col in metadata["ColumnInfo"] :
types.append((col["Name"], "string", col["Name"], matchType(col["Type"]["ScalarType"])))
return types
# TimestreamからのUNLOADが完了しているかをmetadata.jsonの有無で判断する
# "されていなければ"True
def hasNotUnloaded(source_bucket_name, source_directory, yesterday_str, dir_objects, s3_client) :
target_dir = 'results/'
res = s3_client.list_objects_v2(
Bucket=source_bucket_name,
Prefix=f'{source_directory}/{yesterday_str}/{target_dir}'
)
results_directory_not_exists = True
for content in dir_objects['Contents']:
if content['Key'].startswith(f'{source_directory}/{yesterday_str}/{target_dir}'):
results_directory_not_exists = False
break
return results_directory_not_exists
# metadata.jsonの中身と、Glueでサポートしている型を一致させる
def matchType(input_type):
# 他に型があればここに追加していく
matchDict = {
'VARCHAR': 'string',
'DOUBLE': 'double',
'BIGINT': 'int64',
'BOOLEAN': 'boolean',
'TIMESTAMP': 'timestamp'
}
return matchDict.get(input_type, 'string')
# ◆処理する前の準備
# タイムゾーンの設定
os.environ['TZ'] = 'Asia/Tokyo'
time.tzset()
# バケット名とディレクトリ名の設定
source_bucket_name = "[row_data_bucket]"
source_directory = "row_data"
destination_bucket_name = "[processed_data_bucket]"
destination_directory = "processed_data"
# 前日の日付の計算
yesterday = datetime.now() - timedelta(days=1)
yesterday_str = yesterday.strftime('%Y-%m-%d')
# s3クライアントの準備
s3_client = boto3.client('s3')
# 前日の日付の名称のディレクトリを取得
yesterday_dir_obj = getListObjectsFromYesterdayDir(source_bucket_name, source_directory, yesterday_str, s3_client)
if hasNotUnloaded(source_bucket_name, source_directory, yesterday_str, yesterday_dir_obj, s3_client):
print(f"No data found for {yesterday_str}. Skipping...")
else:
# Glueの設定
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# データの読み込み
datasource0 = glueContext.create_dynamic_frame.from_options(
format="csv",
connection_type="s3",
connection_options={
"paths": [f"s3://{source_bucket_name}/{source_directory}/{yesterday_str}/results/"],
"compression": "gzip"
})
# metadata.jsonの内容を取得
metadata = getMetadata(source_bucket_name, yesterday_dir_obj, s3_client)
# カラム名を定義する処理
column_names = getColumnNameFromMetaDataJson(metadata)
datasource_with_header = addColumnName(glueContext, datasource0, column_names)
# 各カラムの型をmetadataから取得
column_types = getTypes(metadata)
apply_mapping = ApplyMapping.apply(frame = datasource_with_header, mappings = column_types)
# データの書き込み
datasink4 = glueContext.write_dynamic_frame.from_options(
frame=apply_mapping,
connection_type="s3",
connection_options={
"path": f"s3://{destination_bucket_name}/{destination_directory}/{yesterday_str}/",
"partitionKeys": ["sensor_id"]
},
format="parquet"
)
job.commit()
実行すると、
で、timeとmeasured_valueが適切な型になっていることが分かります(timestampは私の作り方が悪いせいでTimestreamにstring型として保存されています)。
今回は型を適切にマッピングするためにmetadata.jsonを利用しましたが、次回予定のデータカタログ作成にもこのファイルを利用します。
以上
割としっかりETL処理を書いたのは今回初めてで、かなり汚いコードかもしれません。
ただ、ETLの基礎的なことを勉強できた気がするので、今回はこれで良しとします。
Discussion