🫐

【dltHub Docs】Tutorial

2024/09/13に公開

概要

https://dlthub.com/docs/tutorial/intro
Version: 0.5.4 (latest) の参照メモ

再出力方法

https://www.perplexity.ai/

https://dlthub.com/docs/tutorial/intro
日本語に翻訳してZenn用のマークダウンに整形して
https://dlthub.com/docs/tutorial/load-data-from-an-api
日本語に翻訳してZenn用のマークダウンに整形して
https://dlthub.com/docs/tutorial/grouping-resources
日本語に翻訳してZenn用のマークダウンに整形して

dltを使用したデータパイプライン構築チュートリアル

dltを使用してデータパイプラインを効率的に構築する方法のチュートリアルへようこそ。このチュートリアルでは、dltの基本的な概念を紹介し、基本的および高度な使用シナリオについてガイドします。実践的な例として、GitHub APIからデータを取得し、DuckDBにロードするデータパイプラインを構築します。

学習内容

このチュートリアルでは、以下の内容を学びます:

  1. GitHub APIからのデータ取得
  2. データロード動作の理解と管理
  3. 新しいデータの増分ロードと既存データの重複排除
  4. データ取得をより動的にし、コードの冗長性を減らす
  5. シークレットの安全な取り扱い
  6. 再利用可能なデータソースの作成

さあ、始めましょう!

APIからデータをロードすることから始めます。次のセクションでは、GitHub APIを使用してデータを取得し、DuckDBにロードする方法を詳しく説明します。



dltを使用してAPIからデータをロードする方法

https://dlthub.com/docs/tutorial/load-data-from-an-api
このセクションでは、GitHub APIからデータを取得し、DuckDBにロードする方法を説明します。具体的には、dlt-hub/dltリポジトリからイシューをロードします。DuckDBは軽量で使いやすいインプロセスデータベースなので、目的地として選びました。始める前に、DuckDB依存関係を含むdltをインストールしてください:

pip install "dlt[duckdb]"

パイプラインの作成

まず、パイプラインを作成する必要があります。パイプラインはdltの主要な構成要素で、ソースから目的地へデータをロードするために使用されます。github_issues.pyというファイルを作成し、以下のコードを追加してください:

import dlt
from dlt.sources.helpers import requests

# APIエンドポイントのURLを指定
url = "https://api.github.com/repos/dlt-hub/dlt/issues"

# リクエストを行い、成功したかチェック
response = requests.get(url)
response.raise_for_status()

pipeline = dlt.pipeline(
    pipeline_name="github_issues",
    destination="duckdb",
    dataset_name="github_data",
)

# レスポンスにはイシューのリストが含まれています
load_info = pipeline.run(response.json(), table_name="issues")

print(load_info)

このコードは以下のことを行います:

  • GitHub APIエンドポイントにリクエストを送信し、レスポンスが成功したかチェックします。
  • github_issuesという名前のdltパイプラインを作成し、データをduckdb目的地のgithub_dataデータセットにロードするよう指定します。
  • APIレスポンスのデータ(response.json())でパイプラインを実行し、issuesテーブルにデータをロードします。

パイプラインの実行

github_issues.pyを保存し、以下のコマンドを実行してください:

python github_issues.py

データがロードされたら、Streamlitアプリを使用して作成されたデータセットを確認できます:

dlt pipeline github_issues show

データの追加または置換

デフォルトのロードモードはappendです。これは、日次データ更新がある場合などに非常に便利です。最新のデータを取得するには、スクリプトを再度実行する必要がありますが、データを重複させずに行うにはどうすればよいでしょうか?一つの方法は、replace書き込みディスポジションを使用して、目的地の既存テーブルのデータを置き換えるようdltに指示することです。

ロード動作の宣言

これまではrunメソッドにデータを直接渡していましたが、頻繁にデータを分割して受け取り、到着時にロードしたい場合があります。そのような場合、Pythonジェネレータをデータソースとして使用できます。

新しいデータのみをロード(増分ロード)

GitHub APIの例を改善し、最後のロード以降に作成されたイシューのみを取得してみましょう。

import dlt
from dlt.sources.helpers import requests

@dlt.resource(table_name="issues", write_disposition="append")
def get_issues(
    created_at=dlt.sources.incremental("created_at", initial_value="1970-01-01T00:00:00Z")
):
    url = (
        "https://api.github.com/repos/dlt-hub/dlt/issues"
        "?per_page=100&sort=created&directions=desc&state=open"
    )

    while True:
        response = requests.get(url)
        response.raise_for_status()
        yield response.json()

        if created_at.start_out_of_range:
            break

        if "next" not in response.links:
            break
        url = response.links["next"]["url"]

pipeline = dlt.pipeline(
    pipeline_name="github_issues_incremental",
    destination="duckdb",
    dataset_name="github_data_append",
)

load_info = pipeline.run(get_issues)
row_counts = pipeline.last_trace.last_normalize_info
print(row_counts)
print("------")
print(load_info)

このコードでは、@dlt.resourceデコレータを使用してテーブル名を宣言し、append書き込みディスポジションを指定しています。また、dlt.sources.incrementalを使用してcreated_atフィールドを追跡し、新しく作成されたイシューをフィルタリングしています。

ページネーションヘルパーの使用

dltには、APIリクエストを簡素化する組み込みのRESTクライアントがあります。次の例では、paginate()ヘルパーを使用します:

import dlt
from dlt.sources.helpers.rest_client import paginate

@dlt.resource(
    table_name="issues",
    write_disposition="merge",
    primary_key="id",
)
def get_issues(
    updated_at=dlt.sources.incremental("updated_at", initial_value="1970-01-01T00:00:00Z")
):
    for page in paginate(
        "https://api.github.com/repos/dlt-hub/dlt/issues",
        params={
            "since": updated_at.last_value,
            "per_page": 100,
            "sort": "updated",
            "direction": "desc",
            "state": "open",
        },
    ):
        yield page

pipeline = dlt.pipeline(
    pipeline_name="github_issues_merge",
    destination="duckdb",
    dataset_name="github_data_merge",
)

load_info = pipeline.run(get_issues)
row_counts = pipeline.last_trace.last_normalize_info
print(row_counts)
print("------")
print(load_info)

この例では、paginate()関数がページネーションを自動的に処理します。

次のステップ

リソースグループ化とシークレットのチュートリアルに進んでください。dltライブラリを最大限に活用したい場合は、既存のビルディングブロックを使用してソースを構築することを強くお勧めします。



dltを使用したリソースのグループ化とシークレットの管理

https://dlthub.com/docs/tutorial/grouping-resources
このチュートリアルでは、前回のGitHub APIの例を拡張し、以下の方法を学びます:

  • 他のGitHub APIエンドポイントからデータをロードする
  • リソースをソースにグループ化して管理を容易にする
  • シークレットと設定を扱う

ソースデコレータの使用

前回のチュートリアルでは、GitHub APIからイシューをロードしました。今回は、コメントもAPIからロードする準備をします。

import dlt
from dlt.sources.helpers.rest_client import paginate

@dlt.resource(
    table_name="comments",
    write_disposition="merge",
    primary_key="id",
)
def get_comments(
    updated_at = dlt.sources.incremental("updated_at", initial_value="1970-01-01T00:00:00Z")
):
    for page in paginate(
        "https://api.github.com/repos/dlt-hub/dlt/comments",
        params={"per_page": 100}
    ):
        yield page

@dlt.source
def github_source():
    return [get_issues, get_comments]

pipeline = dlt.pipeline(
    pipeline_name='github_with_source',
    destination='duckdb',
    dataset_name='github_data',
)

load_info = pipeline.run(github_source())
print(load_info)

@dlt.sourceデコレータを使用して、リソースのリストを返す関数を作成しました。これにより、イシューとコメントを1回のパイプライン実行でロードできます。

動的リソース

コードの重複を減らすために、共通のフェッチコードを別の関数に抽出し、両方のリソースで使用できます:

import dlt
from dlt.sources.helpers.rest_client import paginate

BASE_GITHUB_URL = "https://api.github.com/repos/dlt-hub/dlt"

def fetch_github_data(endpoint, params={}):
    url = f"{BASE_GITHUB_URL}/{endpoint}"
    return paginate(url, params=params)

@dlt.source
def github_source():
    for endpoint in ["issues", "comments"]:
        params = {"per_page": 100}
        yield dlt.resource(
            fetch_github_data(endpoint, params),
            name=endpoint,
            write_disposition="merge",
            primary_key="id",
        )

pipeline = dlt.pipeline(
    pipeline_name='github_dynamic_source',
    destination='duckdb',
    dataset_name='github_data',
)

load_info = pipeline.run(github_source())

この方法では、dlt.resourceを関数として使用し、fetch_github_data()ジェネレータ関数を直接渡しています。

シークレットの扱い

認証が必要なエンドポイントにアクセスするために、シークレットを扱う方法を学びましょう:

from dlt.sources.helpers.rest_client.auth import BearerTokenAuth

def fetch_github_data(endpoint, params={}, access_token=None):
    url = f"{BASE_GITHUB_URL}/{endpoint}"
    return paginate(
        url,
        params=params,
        auth=BearerTokenAuth(token=access_token) if access_token else None,
    )

@dlt.source
def github_source(
    access_token: str = dlt.secrets.value,
):
    for endpoint in ["issues", "comments", "traffic/clones"]:
        params = {"per_page": 100}
        yield dlt.resource(
            fetch_github_data(endpoint, params, access_token),
            name=endpoint,
            write_disposition="merge",
            primary_key="id",
        )

dlt.secrets.valueを使用することで、シークレットを安全に管理できます。シークレットは~/.dlt/secrets.tomlファイルに保存されます。

設定可能なソース

最後に、任意のGitHubリポジトリからデータをロードできるように、ソースを再利用可能にします:

import dlt
from dlt.sources.helpers.rest_client import paginate

BASE_GITHUB_URL = "https://api.github.com/repos/{repo_name}"

def fetch_github_data(repo_name, endpoint, params={}, access_token=None):
    url = BASE_GITHUB_URL.format(repo_name=repo_name) + f"/{endpoint}"
    return paginate(
        url,
        params=params,
        auth=BearerTokenAuth(token=access_token) if access_token else None,
    )

@dlt.source
def github_source(
    repo_name: str = dlt.config.value,
    access_token: str = dlt.secrets.value,
):
    for endpoint in ["issues", "comments", "traffic/clones"]:
        params = {"per_page": 100}
        yield dlt.resource(
            fetch_github_data(repo_name, endpoint, params, access_token),
            name=endpoint,
            write_disposition="merge",
            primary_key="id",
        )

.dlt/config.tomlファイルにrepo_nameパラメータを追加することで、任意のリポジトリからデータをロードできるようになりました。

次のステップ

これでチュートリアルは完了です。さらに学びたい場合は、以下のトピックを探索してみてください:

  • クラウドでのパイプラインの実行
  • トランスフォーマーの接続
  • 動的リソースの作成
  • データの変換とカスタマイズ
  • 本番環境での実行とトレーシング
  • リソースの並列実行と最適化
  • REST APIクライアントヘルパーの使用

dltを使用して、独自のデータパイプラインを構築し、さまざまなソースからデータをロードできるようになりました。

Discussion