ServerlessFrameworkでdltをAWS Lambdaにデプロイする:PostgreSQL→BigQueryのデータ転送
はじめに
こんにちは、健常者エミュレータ事例集の管理人をしていますcontradiction29です。健常者エミュレータ事例集はバックエンドでPostgreSQL (on Supabase)を利用しているのですが、PostgreSQLの中身をデータ分析用のBigQueryに同期させる仕組みの一部でdltを活用しています。
dltはデータのロードに特化したオープンソースのPython用ライブラリです。
近い役割を果たすツールとしてFivetranやAirbyteがありますが、これらはELTの中でE(Extract : データソースからのデータ抽出)とL(Load : デスティネーションへのデータの書き込み)を一貫して行うのに対して、dltはLoadの部分に特化しています。ELTが何かに関しては以下の記事をご覧ください。
実際にdltの公式ページにあるサンプルコードを見てみると、Extractの部分がsqlalchemyで書いてあるのがわかります。dltはデータのロードに特化した、より疎結合な志向性を持ったツールであるといえます。
import dlt
from sqlalchemy import create_engine
# Use any SQL database supported by SQLAlchemy, below we use a public
# MySQL instance to get data.
# NOTE: you'll need to install pymysql with `pip install pymysql`
# NOTE: loading data from public mysql instance may take several seconds
engine = create_engine("mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam")
with engine.connect() as conn:
# Select genome table, stream data in batches of 100 elements
query = "SELECT * FROM genome LIMIT 1000"
rows = conn.execution_options(yield_per=100).exec_driver_sql(query)
pipeline = dlt.pipeline(
pipeline_name="from_database",
destination="duckdb",
dataset_name="genome_data",
)
# Convert the rows into dictionaries on the fly with a map function
load_info = pipeline.run(map(lambda row: dict(row._mapping), rows), table_name="genome")
print(load_info)
今回は、dltを実際に使ってみたので、その実装方法を紹介します。使うスタックは以下の通りです。
- PostgreSQL
- データソースとして利用します
- 実態はSupabaseです
- Google BigQuery
- データのロード先として使います
- もはや説明不要
- dlt
- データのロードのために使います
- 今回の目玉
- ServerlessFramework
- デプロイのために使います
- 通常のzipファイル形式のデプロイだと容量オーバーになるため、ECR経由でのデプロイを行います
- AWS Secret Manager
- シークレットの格納に使います
では、実装に入っていきます
TL;DR
実装は下記のレポジトリに格納されています。
上記レポジトリには今回の話題には関係ないコードもあるので、今回のdltに関するコードのみ抜粋します。
tree
.
├── ExtractAndLoadToBQ
│ ├── Dockerfile
│ ├── lambda_function.py
│ └── requirements.txt
└── serverless.yml
service: hpefunctions
frameworkVersion: "3"
provider:
name: aws
architecture: x86_64
region: ap-northeast-1
stage: prod
runtime: python3.9
iam:
role: arn:aws:iam::662924458234:role/lambda_execution_role
ecr:
images:
extract_and_load_to_bq:
path: ./ExtractAndLoadToBQ
package:
individually: true
exclude:
- ./**
functions:
ExtractAndLoadToBQ:
image:
name: extract_and_load_to_bq
package:
include:
- ExtractAndLoadToBQ/**
module: ExtractAndLoadToBQ
timeout: 900
events:
- schedule: cron(0 16 * * ? *)
environment:
DLT_DATA_DIR : /tmp
DLT_PIPELINE_DIR : /tmp
DLT_PROJECT_DIR : /tmp
plugins:
- serverless-python-requirements
FROM public.ecr.aws/lambda/python:3.9
COPY lambda_function.py ./lambda_function.py
COPY requirements.txt ./requirements.txt
RUN pip install -r requirements.txt
CMD ["lambda_function.lambda_handler"]
import dlt
from sqlalchemy import create_engine, text
from concurrent.futures import ThreadPoolExecutor
from time import time
import boto3
import json
BQ_DATASET = "hpe_raw"
def get_secrets():
secretmanager_client = boto3.client("secretsmanager")
secret_value = secretmanager_client.get_secret_value(SecretId="DLT_CONNECTION_PARAMS")
return json.loads(secret_value["SecretString"])
def process_table(table_name, engine, secrets):
try:
with engine.connect() as conn:
start_time = time()
query = f"SELECT * FROM {table_name}"
rows = conn.execute(text(query))
pipeline = dlt.pipeline(
pipeline_name=f"extract_{table_name}",
destination=dlt.destinations.bigquery(
credentials=secrets,
),
dataset_name="HPE_RAW",
)
pipeline.run(
rows,
table_name=table_name,
write_disposition="replace",
)
end_time = time()
print(f"Table {table_name} processed in {end_time - start_time:.2f} seconds")
except Exception as e:
print(f"Failed to process {table_name}: {e}")
raise e
def lambda_handler(event, context):
secrets = get_secrets()
connection_string = secrets["connection_string"]
engine = create_engine(
connection_string,
connect_args={"connect_timeout": 60 * 15}
)
with engine.connect() as conn:
res = conn.execute(text("SELECT tablename FROM pg_catalog.pg_tables where schemaname='public'"))
table_names = [row[0] for row in res]
with ThreadPoolExecutor() as executor:
executor.map(lambda table_name: process_table(table_name, engine, secrets), table_names)
if __name__ == "__main__":
lambda_handler(None, None)
sqlalchemy==2.0.29
dlt==0.4.8
psycopg2-binary==2.9.9
dlt[bigquery]
躓きポイント
結構つまずくところがあったので補足します。
1. デプロイ方式について
通常のzipファイル形式のデプロイだと、dlt
パッケージとdlt[bigquery]
のサイズが大きすぎて50MB(zipped)のサイズ上限を突破してしまいます
そのため、ECRを経由したコンテナイメージ経由のデプロイ方式に変更し、サイズ上限を回避しています
2. 環境変数について
- dltはデータをロードする際、一時的な作業用ディレクトリを
/home
配下に作ろうとします - しかし、AWS Lambdaでは
/tmp
ディレクトリ配下以外に書き込みアクセスをすることができない仕様になっています
- そのため、環境変数を以下のように上書きし、一時的な作業ディレクトリを
/tmp
配下にするように上書き設定しています
environment:
DLT_DATA_DIR : /tmp
DLT_PIPELINE_DIR : /tmp
DLT_PROJECT_DIR : /tmp
3. クレデンシャルについて
- Google Cloud側で作成した秘密鍵をAWS Secret Managerにアップロードする際、GUI上から「キー/値」で編集を行うと「\n」がエスケープされ「\\n」に変換されます。これだと接続がうまくいきません
- private_keyを「プレーンテキスト」から編集し、不必要なエスケープを無効化する必要があります
- なお、クレデンシャルの値の受け渡しはdestinationを設定する以下の箇所で行っています
pipeline = dlt.pipeline(
pipeline_name=f"extract_{table_name}",
destination=dlt.destinations.bigquery(
credentials=secrets,
),
dataset_name="HPE_RAW",
)
- そのほかクレデンシャルに関する設定に関しては、以下の公式ドキュメントをご覧ください。
おわりに
AirbyteやFivetranと比較すると、dltを使いたくなるのは以下のようなユースケースだと思いました。
- 特定の情報提供型APIからローカル環境で動いているデータベースにデータを格納し、分析・機械学習を行いたい
- FivetranやAirbyteでは明らかに対応していなさそうなデータソースには、dltは向いていると思います
- 両者ともにカスタムコネクター機能がありますが、「手元のJupyter Notebookで使うだけだし、そんなのを使うほどじゃないしな...」と思ったらdltの出番です
裏を返すと、PostgresSQLやMySQLのようなメジャーなデータソースの場合、FivetranやAirbyteの方に軍配が上がりそうに思えます。
ELTアーキテクチャを構成する一つのパーツとして、選択肢の一つに入れてみてはいかがでしょうか。
参考ドキュメント
Discussion