🐕

ServerlessFrameworkでdltをAWS Lambdaにデプロイする:PostgreSQL→BigQueryのデータ転送

2024/04/20に公開

はじめに

こんにちは、健常者エミュレータ事例集の管理人をしていますcontradiction29です。健常者エミュレータ事例集はバックエンドでPostgreSQL (on Supabase)を利用しているのですが、PostgreSQLの中身をデータ分析用のBigQueryに同期させる仕組みの一部でdltを活用しています。

dltはデータのロードに特化したオープンソースのPython用ライブラリです。

https://dlthub.com/docs/intro

近い役割を果たすツールとしてFivetranやAirbyteがありますが、これらはELTの中でE(Extract : データソースからのデータ抽出)とL(Load : デスティネーションへのデータの書き込み)を一貫して行うのに対して、dltはLoadの部分に特化しています。ELTが何かに関しては以下の記事をご覧ください。

https://medium.com/@habibsahab973/elt-fundamentals-101-15ab13154092

実際にdltの公式ページにあるサンプルコードを見てみると、Extractの部分がsqlalchemyで書いてあるのがわかります。dltはデータのロードに特化した、より疎結合な志向性を持ったツールであるといえます。

import dlt
from sqlalchemy import create_engine

# Use any SQL database supported by SQLAlchemy, below we use a public
# MySQL instance to get data.
# NOTE: you'll need to install pymysql with `pip install pymysql`
# NOTE: loading data from public mysql instance may take several seconds
engine = create_engine("mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam")

with engine.connect() as conn:
    # Select genome table, stream data in batches of 100 elements
    query = "SELECT * FROM genome LIMIT 1000"
    rows = conn.execution_options(yield_per=100).exec_driver_sql(query)

    pipeline = dlt.pipeline(
        pipeline_name="from_database",
        destination="duckdb",
        dataset_name="genome_data",
    )

    # Convert the rows into dictionaries on the fly with a map function
    load_info = pipeline.run(map(lambda row: dict(row._mapping), rows), table_name="genome")

print(load_info)

今回は、dltを実際に使ってみたので、その実装方法を紹介します。使うスタックは以下の通りです。

  • PostgreSQL
    • データソースとして利用します
    • 実態はSupabaseです
  • Google BigQuery
    • データのロード先として使います
    • もはや説明不要
  • dlt
    • データのロードのために使います
    • 今回の目玉
  • ServerlessFramework
    • デプロイのために使います
    • 通常のzipファイル形式のデプロイだと容量オーバーになるため、ECR経由でのデプロイを行います
  • AWS Secret Manager
    • シークレットの格納に使います

では、実装に入っていきます

TL;DR

実装は下記のレポジトリに格納されています。

https://github.com/sora32127/healthy-person-emulator

上記レポジトリには今回の話題には関係ないコードもあるので、今回のdltに関するコードのみ抜粋します。

tree
tree
.
├── ExtractAndLoadToBQ
│   ├── Dockerfile
│   ├── lambda_function.py
│   └── requirements.txt
└── serverless.yml
serverless.yml
service: hpefunctions
frameworkVersion: "3"

provider:
  name: aws
  architecture: x86_64
  region: ap-northeast-1
  stage: prod
  runtime: python3.9
  iam:
    role: arn:aws:iam::662924458234:role/lambda_execution_role
  ecr:
    images:
      extract_and_load_to_bq:
        path: ./ExtractAndLoadToBQ

package:
  individually: true
  exclude:
    - ./**

functions:
  ExtractAndLoadToBQ:
    image:
      name: extract_and_load_to_bq
    package:
      include:
        - ExtractAndLoadToBQ/**
    module: ExtractAndLoadToBQ
    timeout: 900
    events:
     - schedule: cron(0 16 * * ? *)
    environment:
      DLT_DATA_DIR : /tmp
      DLT_PIPELINE_DIR : /tmp
      DLT_PROJECT_DIR : /tmp
plugins:
  - serverless-python-requirements

Dockerfile
FROM public.ecr.aws/lambda/python:3.9 

COPY lambda_function.py ./lambda_function.py
COPY requirements.txt ./requirements.txt

RUN pip install -r requirements.txt

CMD ["lambda_function.lambda_handler"]
lambda_function.py
import dlt
from sqlalchemy import create_engine, text
from concurrent.futures import ThreadPoolExecutor
from time import time
import boto3
import json
BQ_DATASET = "hpe_raw"

def get_secrets():
    secretmanager_client = boto3.client("secretsmanager")
    secret_value = secretmanager_client.get_secret_value(SecretId="DLT_CONNECTION_PARAMS")
    return json.loads(secret_value["SecretString"])


def process_table(table_name, engine, secrets):
    try:
        with engine.connect() as conn:
            start_time = time()
            query = f"SELECT * FROM {table_name}"
            rows = conn.execute(text(query))
            pipeline = dlt.pipeline(
                pipeline_name=f"extract_{table_name}",
                destination=dlt.destinations.bigquery(
                    credentials=secrets,
                    ),
                dataset_name="HPE_RAW",
            )
            pipeline.run(
                rows,
                table_name=table_name,
                write_disposition="replace",
            )
            end_time = time()

            print(f"Table {table_name} processed in {end_time - start_time:.2f} seconds")
            
    except Exception as e:
        print(f"Failed to process {table_name}: {e}")
        raise e



def lambda_handler(event, context):
    secrets = get_secrets()
    connection_string = secrets["connection_string"]
    engine = create_engine(
        connection_string,
        connect_args={"connect_timeout": 60 * 15}
    )

    with engine.connect() as conn:
        res = conn.execute(text("SELECT tablename FROM pg_catalog.pg_tables where schemaname='public'"))
        table_names = [row[0] for row in res]
    with ThreadPoolExecutor() as executor:
        executor.map(lambda table_name: process_table(table_name, engine, secrets), table_names)

if __name__ == "__main__":
    lambda_handler(None, None)

requirements.txt
sqlalchemy==2.0.29
dlt==0.4.8
psycopg2-binary==2.9.9
dlt[bigquery]

躓きポイント

結構つまずくところがあったので補足します。

1. デプロイ方式について

通常のzipファイル形式のデプロイだと、dltパッケージとdlt[bigquery]のサイズが大きすぎて50MB(zipped)のサイズ上限を突破してしまいます

https://docs.aws.amazon.com/lambda/latest/dg/gettingstarted-limits.html

そのため、ECRを経由したコンテナイメージ経由のデプロイ方式に変更し、サイズ上限を回避しています

https://docs.aws.amazon.com/ja_jp/lambda/latest/dg/images-create.html

2. 環境変数について

  • dltはデータをロードする際、一時的な作業用ディレクトリを/home配下に作ろうとします
  • しかし、AWS Lambdaでは/tmpディレクトリ配下以外に書き込みアクセスをすることができない仕様になっています

https://docs.aws.amazon.com/lambda/latest/api/API_EphemeralStorage.html

  • そのため、環境変数を以下のように上書きし、一時的な作業ディレクトリを/tmp配下にするように上書き設定しています
environment:
  DLT_DATA_DIR : /tmp
  DLT_PIPELINE_DIR : /tmp
  DLT_PROJECT_DIR : /tmp

3. クレデンシャルについて

  • Google Cloud側で作成した秘密鍵をAWS Secret Managerにアップロードする際、GUI上から「キー/値」で編集を行うと「\n」がエスケープされ「\\n」に変換されます。これだと接続がうまくいきません
  • private_keyを「プレーンテキスト」から編集し、不必要なエスケープを無効化する必要があります
  • なお、クレデンシャルの値の受け渡しはdestinationを設定する以下の箇所で行っています
pipeline = dlt.pipeline(
                pipeline_name=f"extract_{table_name}",
                destination=dlt.destinations.bigquery(
                    credentials=secrets,
                    ),
                dataset_name="HPE_RAW",
            )
  • そのほかクレデンシャルに関する設定に関しては、以下の公式ドキュメントをご覧ください。

https://dlthub.com/docs/dlt-ecosystem/destinations/bigquery

おわりに

AirbyteやFivetranと比較すると、dltを使いたくなるのは以下のようなユースケースだと思いました。

  • 特定の情報提供型APIからローカル環境で動いているデータベースにデータを格納し、分析・機械学習を行いたい
    • FivetranやAirbyteでは明らかに対応していなさそうなデータソースには、dltは向いていると思います
    • 両者ともにカスタムコネクター機能がありますが、「手元のJupyter Notebookで使うだけだし、そんなのを使うほどじゃないしな...」と思ったらdltの出番です

裏を返すと、PostgresSQLやMySQLのようなメジャーなデータソースの場合、FivetranやAirbyteの方に軍配が上がりそうに思えます。

ELTアーキテクチャを構成する一つのパーツとして、選択肢の一つに入れてみてはいかがでしょうか。

参考ドキュメント

https://dlthub.com/docs/dlt-ecosystem/destinations/bigquery

https://dlthub.com/docs/blog/dlt-aws-taktile-blog

Discussion