🍳

OSSなPython製ETLライブラリ「dlt」の紹介

2024/12/15に公開

dbt アドベントカレンダー 2024 11日目の記事です。

前置き

普段はdbt Cloudを使っていて、Google Cloud に BigQueryとLookerStudio な技術スタックで社内のデータ基盤を構築・運用しています。

データ基盤で扱うSourceの拡充をするため、ETL/ELT ツールをいくつも試してたどり着いたのが、dltというツールです。

世間では非エンジニアでも画面上でポチポチ設定をしていくだけで、簡単にELT パイプラインが構築できるサービスを使われている話をよく聞きます。エンジニアのいない組織でも、すぐに各種データを収集して分析を始められる点では重宝されるのでしょう。しかしながら、チームメンバーがエンジニア主体の我々にとって、コスト感覚がマッチしなかった、というのが正直なところです。(私だけ?)

各種SaaSのAPIやDB接続をイチから用意してELTパイプライン構築をするのは大変だけど、多少のコードは読み書きができる、パイプラインの実行環境も構築可能、そんなチームで扱いやすいツールを探していました。 dltは、比較検討した他のツールのなかでもライブラリによる処理の共通化・隠蔽化具合がちょうど良い塩梅なところが気に入っています。

対象読者

  • dbtをキーワードに情報収集中の方
  • 各種データベースやSaaSサービスからのELTツールを探している
  • 多少のPythonコードは読み書きできる
  • web系の開発経験がある

dlt って何?

Pythonで作られたデータ連携のためのOSSのELT ライブラリです。2024年9月16日に、dlt 1.0.0がリリースされました。

dbtと一文字違いなので混同しないようにお気をつけください。dbtとは別物ながら、dbtとの連携機能も用意されています。(後述)

Introducing dlt 1.0.0: A Production-Ready Python

元データ(Source)、転送先(Destination)も十分に揃っていて、大きく3つに分けられます

  • SQL database
  • cloud storage or filesystem
  • Verified Source
  • REST API

ひとつずつ見ていきます

SQL database

直接データベースに接続して読み書きができる場合の第一選択肢でしょう。

  • PostgreSQL
  • MySQL
  • SQLite
  • Oracle
  • Microsoft SQL Server
  • MariaDB
  • IBM DB2 and Informix
  • Google BigQuery
  • Snowflake
  • Redshift
  • Apache Hive and Presto
  • SAP Hana
  • CockroachDB
  • Firebird
  • Teradata Vantage

これらに加えて、DuckDB も対応しています(後述)

Cloud Storage or Filesystem

  • AWS S3
  • Google Cloud Storage
  • Google Drive
  • Azure Blob Storage
  • remote filesystem (via SFTP)
  • local filesystem

Cloud Storage であればアクセスに必要なKEYやSecret・バケット名を設定ファイルに書くだけで、クラウド毎に必要な認証処理はdlt側で判別して処理されます。

ファイル形式は CSVParquet, and JSONL に対応しています

Filesystem source | dlt Docs

REST API

HTTP形式でGET/POSTするものであれば、大概のものは対応できるでしょう。PaginateやRetry処理にも対応したRESTClient が用意されています。

RESTClient | dlt Docs

Verified Source

よく使われるSaaSのAPIは、ライブラリが用意されています。これらは、v1.0.0 リリースと合わせて、別リポジトリで管理されるようになりました。

verified-sources/sources at master · dlt-hub/verified-sources

パイイプライン のサンプルコード

データが取得できることを確認、取得されたデータのテーブル構成などを確認する際には、DuckDBが非常に便利です。dltのドキュメント内でもdestination先として頻繁に使われています。

An in-process SQL OLAP database management system

Slackユーザーを取得する

初めて作成する場合は、 dlt init で初期セットアップを行うのが便利です。パイプラインに必要なファイルを一式作成してくれます。

引数は、 dlt init [source] [destination] の形式になっていて、次のコマンドは slack から取得したものを duckdb に保存するパイプラインを作成します。

dlt init slack duckdb

冒頭のimport部分を省略すると、自分で書くコード量としては、このくらいのものです。

def slack_pipeline() -> None:
    pipeline = dlt.pipeline(
        pipeline_name="slack",
        destination='duckdb',
    )
    
    load_info = pipeline.run(load_slack_users())

if __name__ == "__main__":
    slack_pipeline()

開発時には、.dlt/secret.toml に、Slack bot のアクセストークンを設定しておきます。このファイルは秘匿情報が記載されるため、バージョン管理に含めてはいけません。本番環境では環境変数での設定が想定されています。

[sources.slack]
access_token = "xoxp-xxxxxxxxxxxxxxxxxxxxxxxxxxxx"
  • verified source のリポジトリから持ってきた、 @source@resource

    今回、関連テーブルは不要だったため、 @sourcemax_table_nesting を1に変更しました。
    slack_source のパラメータは多いですが、デフォルト値のままで良いか使用しないものが多く、パイプラインからは何も指定せずに呼び出しています。

    @dlt.source(name="slack", max_table_nesting=1)
    def load_slack_users(
            page_size: int = 100,
            access_token: str = dlt.secrets.value,
            start_date: Optional[TAnyDateTime] = DEFAULT_START_DATE,
            end_date: Optional[TAnyDateTime] = None,
            selected_channels: Optional[List[str]] = dlt.config.value,
            table_per_channel: bool = True,
            replies: bool = False,
    ) -> Iterable[DltResource]:
        """
        The source for the Slack pipeline. Available resources are conversations, conversations_history
        and access_logs.
    
        Args:
            page_size: The max number of items to fetch per page. Defaults to 1000.
            access_token: the oauth access_token used to authenticate.
            start_date: The start time of the range for which to load. Defaults to January 1st 2000.
            end_date: The end time of the range for which to load data.
            selected_channels: The list of channels to load. If None, all channels will be loaded.
            table_per_channel: Boolean flag, True by default. If True - for each channel separate table with messages is created.
                Otherwise, all messages are put in one table.
            replies: Boolean flag indicating if you want a replies table to be present as well. False by default.
    
        Returns:
            Iterable[DltResource]: A list of DltResource objects representing the data resources.
        """
    
        end_dt: Optional[DateTime] = ensure_dt_type(end_date)
        start_dt: Optional[DateTime] = ensure_dt_type(start_date)
        write_disposition: Literal["append", "merge"] = (
            "append" if end_date is None else "merge"
        )
    
        api = SlackAPI(
            access_token=access_token,
            page_size=page_size,
        )
    
        @dlt.resource(name="users", primary_key="id", write_disposition="replace")
        def users_resource() -> Iterable[TDataItem]:
            """
            Yield all users as a DLT resource.
    
            Yields:
                Iterable[TDataItem]: A list of users.
            """
    
            for page_data in api.get_pages(
                    resource="users.list",
                    response_path="$.members[*]",
                    params=dict(include_locale=True),
                    datetime_fields=DEFAULT_DATETIME_FIELDS,
            ):
                yield page_data
    
        yield from (
            users_resource,
        )
    
    

REST API で Notion のユーザーを取得する

Notionのユーザー情報は、REST APIのhelperライブラリを使って取得してみます。

dlt init rest_api duckdb

上のslackパイプラインと見比べて見てください。pipeline_nameとdataset_nameのほか、 pipeline.run() に渡している引数の load_* の関数名が違うくらいの差分になっています。

def notion_user_pipeline() -> None:
    pipeline = dlt.pipeline(
        pipeline_name="notion_users",
        destination='duckdb',
    )

    load_info = pipeline.run(load_notion_users())

if __name__ == "__main__":
    notion_user_pipeline()

Notionのapi_keyは、先程同様に、 .dlt/secret.toml に設定しておきます

[notion_users]
api_key = "secret_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"

次にNotion APIのドキュメントを見ながらload_notion_users() の実装を書いていきます。

@dlt.source(name="notion_user")
def load_notion_users(api_key: str = dlt.secrets.value) -> Iterable[DltResource]:
    """
    
    Args:
        api_key (str): Notion Integration Key
    """

    client = RESTClient(
        base_url="https://api.notion.com",
        headers={"Notion-Version": "2022-06-28"},
        auth=BearerTokenAuth(token=api_key),
        paginator=JSONResponseCursorPaginator(cursor_path="next_cursor", cursor_param="start_cursor"),
        data_selector="results",
    )
    
    
    @dlt.resource(write_disposition="replace", primary_key="id")
    def user_list() -> Iterable[TDataItem]:
        for page in client.paginate("/v1/users"):
            yield from page
    

    return (
        user_list(),
    )

RESTClientにbase_urlやNotion API特有で必須のHeaderを指定することができます。一度のリクエストで取得できる上限が100件、数百アカウントが登録されている環境なので複数回にわけて取得する必要があります。paginatorで、JSONResponseCursorPaginator() を使ってcursor_pathやcursor_param を指定することができました。

取得したJSONのうち、resultsのなかに目的のユーザー配列が格納されていますので、data_selectorにresultsを指定しました。

client = RESTClient(
        base_url="https://api.notion.com",
        headers={"Notion-Version": "2022-06-28"},
        auth=BearerTokenAuth(token=api_key),
        paginator=JSONResponseCursorPaginator(cursor_path="next_cursor", cursor_param="start_cursor"),
        data_selector="results",
    )

clientを作成できたら、いよいよエンドポイント毎のデータ取得部分に入っていきます。

    @dlt.resource(write_disposition="replace", primary_key="id")
    def user_list() -> Iterable[TDataItem]:
        for page in client.paginate("/v1/users"):
            yield from page

write_disposition は replace にしました。 APIレスポンスに更新日時が格納されている場合は、merge で増分更新方式にすることができます。

    return (
        user_list(),
    )

複数APIから取得する場合は、@dlt.resource を複数作成し、最後のreturnに関数名を追加していきます。

パイプラインができたら実行してみます。

python slack.py

Macで実行すると、プラットフォームが未サポートの警告メッセージが出てきます。
念の為、本番環境を想定したUbuntu環境を作って実行したりもしましたが、今のところ動作が違って困ることは発生していません。

> ALTS: Platforms other than Linux and Windows are not supported

DuckDBに取り込まれたデータを確認してみましょう。

duckdb notion_users.duckdb
D .tables
_dlt_loads                                   
_dlt_pipeline_state                          
_dlt_version
user_list

_ から始まるテーブルは、DLTがパイプライン管理に使っているものです。

user_list のテーブルができていますね。

D DESCRIBE lake_notion_users.user_list;
┌───────────────────────┬─────────────┬─────────┬─────────┬─────────┬─────────┐
│      column_name      │ column_type │  null   │   key   │ default │  extra  │
│        varchar        │   varchar   │ varchar │ varchar │ varchar │ varchar │
├───────────────────────┼─────────────┼─────────┼─────────┼─────────┼─────────┤
│ object                │ VARCHAR     │ YES     │         │         │         │
│ id                    │ VARCHAR     │ NO      │         │         │         │
│ name                  │ VARCHAR     │ YES     │         │         │         │
│ type                  │ VARCHAR     │ YES     │         │         │         │
│ person__email         │ VARCHAR     │ YES     │         │         │         │
│ _dlt_load_id          │ VARCHAR     │ NO      │         │         │         │
│ _dlt_id               │ VARCHAR     │ NO      │         │         │         │
│ avatar_url            │ VARCHAR     │ YES     │         │         │         │
│ bot__owner__type      │ VARCHAR     │ YES     │         │         │         │
│ bot__owner__workspace │ BOOLEAN     │ YES     │         │         │         │
│ bot__workspace_name   │ VARCHAR     │ YES     │         │         │         │
├───────────────────────┴─────────────┴─────────┴─────────┴─────────┴─────────┤
│ 11 rows                                                           6 columns │
└─────────────────────────────────────────────────────────────────────────────┘

D select count(id) from lake_notion_users.user_list;
┌───────────┐
│ count(id) │
│   int64   │
├───────────┤
│       543 │
└───────────┘

テーブルのスキーマ情報と、レコード数を取得することができました。(件数はダミーです)

Destination を BigQuery に変更

これがdltを使うメリットのひとつで、インターフェイスが統一されているので、気軽にDestinationを切り替えることができます。

※ Google Cloud上でのプロジェクト作成や、BigQuery のAPI利用開始、そしてパイプラインを実行する環境での認証などは事前に行っておいてください。

BigQueryを使う環境が揃ったら、destinationを bigquery に変更、取り込み先のdataset名をdataset_nameに指定します。

def notion_user_pipeline() -> None:
    pipeline = dlt.pipeline(
        pipeline_name="notion_users",
        destination='bigquery',
        dataset_name="lake_notion_users",
    )

デフォルトで USリージョン に設定されています。リージョンを変更する場合は .dlt/secrets.toml で設定します。

[destination.bigquery]
location = "asia-northeast1"

コンソールからレコード件数を取得するSQLを実行してみましょう

SELECT count(id) FROM `プロジェクト名.notion_users.user_list`

┌───────────┐
│ count(id) │
├───────────┤
│       543 │
└───────────┘

DuckDBを使って確認したのと同じ件数がBigQueryに保存されたことが確認できました。

DBTとの連携

ここまでお読みいただきありがとうございます。ようやくdbtとの連携が出てきます。

dlt pipelineで DBや各種APIから集めたデータをdestinationへ格納したあと、データの変換が必要な場合もあるでしょう。そこでdbtをTransformのために使う機能が用意されています。

dbt-core を使う場合

# Get runner, optionally pass the venv
dbt = dlt.dbt.package(
    pipeline,
    package_location="./dbt", # git repository url to be cloned or a local path
    package_profiles_dir=os.path.abspath("."),  # profiles.yml must be placed in this dir
    package_profile_name="duckdb_dlt_dbt_test",  # name of the profile
)

models = dbt.run_all()

dbt-cloud を使う場合

# Initialize the client
client = DBTCloudClientV2(api_token="YOUR_API_TOKEN", account_id="YOUR_ACCOUNT_ID")

# Example: Trigger a job run
job_run_id = client.trigger_job_run(job_id=1234, data={"cause": "Triggered via API"})
print(f"Job run triggered successfully. Run ID: {job_run_id}")

# Example: Get run status
run_status = client.get_run_status(run_id=job_run_id)
print(f"Job run status: {run_status['status_humanized']}")

dbt-cloudの認証情報も .dlt/secret.toml に記載 もしくは 環境変数をつかって設定します。

[dbt_cloud]
api_token = "set me up!" # required for authentication
account_id = "set me up!" # required for both helper functions
job_id = "set me up!" # optional only for the run_dbt_cloud_job function (you can pass this explicitly as an argument to the function)
run_id = "set me up!" # optional for the get_dbt_cloud_run_status function (you can pass this explicitly as an argument to the function)
DBT_CLOUD__API_TOKEN="set me up!"
DBT_CLOUD__ACCOUNT_ID="set me up!"
DBT_CLOUD__JOB_ID="set me up!"

dbtとの連携まで紹介し終えて、ようやくdbtアドベントカレンダーにこの記事を投稿する理由(わけ)をお伝えすることができました。dltを使ってデータ基盤をスモールスタートしようというときに、一緒に導入すると良さそうです。

dltのクラウドサービス dlt+ では dbtのコード生成ツールも提供されるそうで、これが使えるとかなりアドバンテージになりそうだなと思っています。

dlthub.com

まとめ

公式ドキュメントには、GitHub Actions、Google Cloud Composer (Apache Airflow)、Google Cloud Run と Cloud Functions、 Kestra、Dagster、Prefect、Modal といったツールとの連携方法も紹介されています。Python3が使える環境という意味では、オンプレミス・クラウド問わず、実行環境は比較的自由に選ぶことができるでしょう。

ということで、web系エンジニア視点から、dbtをキーワードに新技術スタックを模索している方に向けて、データを収集するお手軽なツール「dlt」を紹介してみました。

株式会社ゆめみ

Discussion