【dltHub Docs】Tutorial
概要
Version: 0.5.4 (latest) の参照メモ
再出力方法
https://dlthub.com/docs/tutorial/intro
日本語に翻訳してZenn用のマークダウンに整形して
https://dlthub.com/docs/tutorial/load-data-from-an-api
日本語に翻訳してZenn用のマークダウンに整形して
https://dlthub.com/docs/tutorial/grouping-resources
日本語に翻訳してZenn用のマークダウンに整形して
dltを使用したデータパイプライン構築チュートリアル
dltを使用してデータパイプラインを効率的に構築する方法のチュートリアルへようこそ。このチュートリアルでは、dltの基本的な概念を紹介し、基本的および高度な使用シナリオについてガイドします。実践的な例として、GitHub APIからデータを取得し、DuckDBにロードするデータパイプラインを構築します。
学習内容
このチュートリアルでは、以下の内容を学びます:
- GitHub APIからのデータ取得
- データロード動作の理解と管理
- 新しいデータの増分ロードと既存データの重複排除
- データ取得をより動的にし、コードの冗長性を減らす
- シークレットの安全な取り扱い
- 再利用可能なデータソースの作成
さあ、始めましょう!
APIからデータをロードすることから始めます。次のセクションでは、GitHub APIを使用してデータを取得し、DuckDBにロードする方法を詳しく説明します。
dltを使用してAPIからデータをロードする方法
このセクションでは、GitHub APIからデータを取得し、DuckDBにロードする方法を説明します。具体的には、dlt-hub/dltリポジトリからイシューをロードします。DuckDBは軽量で使いやすいインプロセスデータベースなので、目的地として選びました。始める前に、DuckDB依存関係を含むdltをインストールしてください:
pip install "dlt[duckdb]"
パイプラインの作成
まず、パイプラインを作成する必要があります。パイプラインはdltの主要な構成要素で、ソースから目的地へデータをロードするために使用されます。github_issues.py
というファイルを作成し、以下のコードを追加してください:
import dlt
from dlt.sources.helpers import requests
# APIエンドポイントのURLを指定
url = "https://api.github.com/repos/dlt-hub/dlt/issues"
# リクエストを行い、成功したかチェック
response = requests.get(url)
response.raise_for_status()
pipeline = dlt.pipeline(
pipeline_name="github_issues",
destination="duckdb",
dataset_name="github_data",
)
# レスポンスにはイシューのリストが含まれています
load_info = pipeline.run(response.json(), table_name="issues")
print(load_info)
このコードは以下のことを行います:
- GitHub APIエンドポイントにリクエストを送信し、レスポンスが成功したかチェックします。
-
github_issues
という名前のdltパイプラインを作成し、データをduckdb
目的地のgithub_data
データセットにロードするよう指定します。 - APIレスポンスのデータ(
response.json()
)でパイプラインを実行し、issues
テーブルにデータをロードします。
パイプラインの実行
github_issues.py
を保存し、以下のコマンドを実行してください:
python github_issues.py
データがロードされたら、Streamlitアプリを使用して作成されたデータセットを確認できます:
dlt pipeline github_issues show
データの追加または置換
デフォルトのロードモードはappend
です。これは、日次データ更新がある場合などに非常に便利です。最新のデータを取得するには、スクリプトを再度実行する必要がありますが、データを重複させずに行うにはどうすればよいでしょうか?一つの方法は、replace
書き込みディスポジションを使用して、目的地の既存テーブルのデータを置き換えるようdltに指示することです。
ロード動作の宣言
これまではrun
メソッドにデータを直接渡していましたが、頻繁にデータを分割して受け取り、到着時にロードしたい場合があります。そのような場合、Pythonジェネレータをデータソースとして使用できます。
新しいデータのみをロード(増分ロード)
GitHub APIの例を改善し、最後のロード以降に作成されたイシューのみを取得してみましょう。
import dlt
from dlt.sources.helpers import requests
@dlt.resource(table_name="issues", write_disposition="append")
def get_issues(
created_at=dlt.sources.incremental("created_at", initial_value="1970-01-01T00:00:00Z")
):
url = (
"https://api.github.com/repos/dlt-hub/dlt/issues"
"?per_page=100&sort=created&directions=desc&state=open"
)
while True:
response = requests.get(url)
response.raise_for_status()
yield response.json()
if created_at.start_out_of_range:
break
if "next" not in response.links:
break
url = response.links["next"]["url"]
pipeline = dlt.pipeline(
pipeline_name="github_issues_incremental",
destination="duckdb",
dataset_name="github_data_append",
)
load_info = pipeline.run(get_issues)
row_counts = pipeline.last_trace.last_normalize_info
print(row_counts)
print("------")
print(load_info)
このコードでは、@dlt.resource
デコレータを使用してテーブル名を宣言し、append
書き込みディスポジションを指定しています。また、dlt.sources.incremental
を使用してcreated_at
フィールドを追跡し、新しく作成されたイシューをフィルタリングしています。
ページネーションヘルパーの使用
dltには、APIリクエストを簡素化する組み込みのRESTクライアントがあります。次の例では、paginate()
ヘルパーを使用します:
import dlt
from dlt.sources.helpers.rest_client import paginate
@dlt.resource(
table_name="issues",
write_disposition="merge",
primary_key="id",
)
def get_issues(
updated_at=dlt.sources.incremental("updated_at", initial_value="1970-01-01T00:00:00Z")
):
for page in paginate(
"https://api.github.com/repos/dlt-hub/dlt/issues",
params={
"since": updated_at.last_value,
"per_page": 100,
"sort": "updated",
"direction": "desc",
"state": "open",
},
):
yield page
pipeline = dlt.pipeline(
pipeline_name="github_issues_merge",
destination="duckdb",
dataset_name="github_data_merge",
)
load_info = pipeline.run(get_issues)
row_counts = pipeline.last_trace.last_normalize_info
print(row_counts)
print("------")
print(load_info)
この例では、paginate()
関数がページネーションを自動的に処理します。
次のステップ
リソースグループ化とシークレットのチュートリアルに進んでください。dltライブラリを最大限に活用したい場合は、既存のビルディングブロックを使用してソースを構築することを強くお勧めします。
dltを使用したリソースのグループ化とシークレットの管理
このチュートリアルでは、前回のGitHub APIの例を拡張し、以下の方法を学びます:
- 他のGitHub APIエンドポイントからデータをロードする
- リソースをソースにグループ化して管理を容易にする
- シークレットと設定を扱う
ソースデコレータの使用
前回のチュートリアルでは、GitHub APIからイシューをロードしました。今回は、コメントもAPIからロードする準備をします。
import dlt
from dlt.sources.helpers.rest_client import paginate
@dlt.resource(
table_name="comments",
write_disposition="merge",
primary_key="id",
)
def get_comments(
updated_at = dlt.sources.incremental("updated_at", initial_value="1970-01-01T00:00:00Z")
):
for page in paginate(
"https://api.github.com/repos/dlt-hub/dlt/comments",
params={"per_page": 100}
):
yield page
@dlt.source
def github_source():
return [get_issues, get_comments]
pipeline = dlt.pipeline(
pipeline_name='github_with_source',
destination='duckdb',
dataset_name='github_data',
)
load_info = pipeline.run(github_source())
print(load_info)
@dlt.source
デコレータを使用して、リソースのリストを返す関数を作成しました。これにより、イシューとコメントを1回のパイプライン実行でロードできます。
動的リソース
コードの重複を減らすために、共通のフェッチコードを別の関数に抽出し、両方のリソースで使用できます:
import dlt
from dlt.sources.helpers.rest_client import paginate
BASE_GITHUB_URL = "https://api.github.com/repos/dlt-hub/dlt"
def fetch_github_data(endpoint, params={}):
url = f"{BASE_GITHUB_URL}/{endpoint}"
return paginate(url, params=params)
@dlt.source
def github_source():
for endpoint in ["issues", "comments"]:
params = {"per_page": 100}
yield dlt.resource(
fetch_github_data(endpoint, params),
name=endpoint,
write_disposition="merge",
primary_key="id",
)
pipeline = dlt.pipeline(
pipeline_name='github_dynamic_source',
destination='duckdb',
dataset_name='github_data',
)
load_info = pipeline.run(github_source())
この方法では、dlt.resource
を関数として使用し、fetch_github_data()
ジェネレータ関数を直接渡しています。
シークレットの扱い
認証が必要なエンドポイントにアクセスするために、シークレットを扱う方法を学びましょう:
from dlt.sources.helpers.rest_client.auth import BearerTokenAuth
def fetch_github_data(endpoint, params={}, access_token=None):
url = f"{BASE_GITHUB_URL}/{endpoint}"
return paginate(
url,
params=params,
auth=BearerTokenAuth(token=access_token) if access_token else None,
)
@dlt.source
def github_source(
access_token: str = dlt.secrets.value,
):
for endpoint in ["issues", "comments", "traffic/clones"]:
params = {"per_page": 100}
yield dlt.resource(
fetch_github_data(endpoint, params, access_token),
name=endpoint,
write_disposition="merge",
primary_key="id",
)
dlt.secrets.value
を使用することで、シークレットを安全に管理できます。シークレットは~/.dlt/secrets.toml
ファイルに保存されます。
設定可能なソース
最後に、任意のGitHubリポジトリからデータをロードできるように、ソースを再利用可能にします:
import dlt
from dlt.sources.helpers.rest_client import paginate
BASE_GITHUB_URL = "https://api.github.com/repos/{repo_name}"
def fetch_github_data(repo_name, endpoint, params={}, access_token=None):
url = BASE_GITHUB_URL.format(repo_name=repo_name) + f"/{endpoint}"
return paginate(
url,
params=params,
auth=BearerTokenAuth(token=access_token) if access_token else None,
)
@dlt.source
def github_source(
repo_name: str = dlt.config.value,
access_token: str = dlt.secrets.value,
):
for endpoint in ["issues", "comments", "traffic/clones"]:
params = {"per_page": 100}
yield dlt.resource(
fetch_github_data(repo_name, endpoint, params, access_token),
name=endpoint,
write_disposition="merge",
primary_key="id",
)
.dlt/config.toml
ファイルにrepo_name
パラメータを追加することで、任意のリポジトリからデータをロードできるようになりました。
次のステップ
これでチュートリアルは完了です。さらに学びたい場合は、以下のトピックを探索してみてください:
- クラウドでのパイプラインの実行
- トランスフォーマーの接続
- 動的リソースの作成
- データの変換とカスタマイズ
- 本番環境での実行とトレーシング
- リソースの並列実行と最適化
- REST APIクライアントヘルパーの使用
dltを使用して、独自のデータパイプラインを構築し、さまざまなソースからデータをロードできるようになりました。
Discussion