初手dlt?
目的
- dltを使ってみる
- dltでできそうなこと・できなさそうなことを洗い出す
Caveat
コードが断片的です
内容
最小限の実装
まずはインストール
面倒なときは公式の colab を使おう
pipx install "dlt[duckdb]==0.4.2"
QuickStart にあるように以下のコードで最低限の実装になる
import dlt
from dlt.sources.helpers import requests
# Create a dlt pipeline that will load
# chess player data to the DuckDB destination
pipeline = dlt.pipeline(
pipeline_name='chess_pipeline',
destination='duckdb',
dataset_name='player_data'
)
# Grab some player data from Chess.com API
data = []
for player in ['magnuscarlsen', 'rpragchess']:
response = requests.get(f'https://api.chess.com/pub/player/{player}')
response.raise_for_status()
data.append(response.json())
# Extract, normalize, and load the data
pipeline.run(data, table_name='player')
python main.py
で実行すると chess_pipeline.duckdb ができていることを確認できる
ローカルで可視化
vs code の拡張機能で duckdb の中身を操作したかったがこれはうまくいかなかった
より正確には、公式にあるサンプルの.dbは読み込めたが自分で生成した.dbを読み込めなかった
代わりにここでは dlt 組み込みの show コマンドを用いて確認する
その際に依存ライブラリがないと怒られるのでそれも入れる
pip install "pandas==2.2.0"
pip install "streamlit==1.31.0"
dlt pipeline chess_pipeline show
すると streamlit が立ち上がる http://localhost:8501
ちなみにパイプラインは ls $HOME/.dlt/pipelines
で確認できる
プロジェクトの作成
では少し掘り下げてみていく
dlt init コマンドでプロジェクト作成ができる
dlt init <source> <destination>
ここではそれぞれ filesystem, duckdb とした
❯ tree -L 2
.
├── filesystem
│ ├── README.md
│ ├── __init__.py
│ ├── helpers.py
│ ├── readers.py
│ └── settings.py
├── filesystem_pipeline.py
└── requirements.txt
豊富なインテグレーション
ちなみに現時点でsourceは27、destinationは11ある
destination に athena あるのが地味に嬉しい
❯ dlt init --list-verified-sources
Looking up for verified sources in https://github.com/dlt-hub/verified-sources.git...
kinesis: Reads messages from Kinesis queue.
inbox: Reads messages and attachments from e-mail inbox via IMAP protocol
mux: Loads Mux views data using https://docs.mux.com/api-reference
google_sheets: Loads Google Sheets data from tabs, named and explicit ranges. Contains the main source functions.
google_analytics: Defines all the sources and resources needed for Google Analytics V4
pokemon: This source provides data extraction from an example source as a starting point for new pipelines.
pipedrive: Highly customizable source for Pipedrive, supports endpoint addition, selection and column rename
workable: This source uses Workable API and dlt to load data such as Candidates, Jobs, Events, etc. to the database.
sql_database: Source that loads tables form any SQLAlchemy supported database, supports batching requests and incremental loads.
facebook_ads: Loads campaigns, ads sets, ads, leads and insight data from Facebook Marketing API
personio: Fetches Personio Employees, Absences, Attendances.
filesystem: Reads files in s3, gs or azure buckets using fsspec and provides convenience resources for chunked reading of various file formats
mongodb: Source that loads collections form any a mongo database, supports incremental loads.
notion: A source that extracts data from Notion API
hubspot: This is a module that provides a DLT source to retrieve data from multiple endpoints of the HubSpot API using a specified API key. The retrieved data is returned as a tuple of Dlt resources, one for each endpoint.
airtable: Source that loads tables form Airtable.
shopify_dlt: Fetches Shopify Orders and Products.
jira: This source uses Jira API and dlt to load data such as Issues, Users, Workflows and Projects to the database.
github: Source that load github issues, pull requests and reactions for a specific repository via customizable graphql query. Loads events incrementally.
zendesk: Defines all the sources and resources needed for ZendeskSupport, ZendeskChat and ZendeskTalk
kafka: A source to extract Kafka messages. [needs update: dlt<0.4,>=0.3.25]
asana_dlt: This source provides data extraction from the Asana platform via their API.
unstructured_data: This source converts unstructured data from a specified data resource to structured data using provided queries.
chess: A source loading player profiles and games from chess.com api
salesforce: Source for Salesforce depending on the simple_salesforce python package.
slack: Fetches Slack Conversations, History and logs.
strapi: Basic strapi source
stripe_analytics: This source uses Stripe API and dlt to load data such as Customer, Subscription, Event etc. to the database and to calculate the MRR and churn rate.
matomo: Loads reports and raw visits data from Matomo
destination をみんな大好き bigquery にできる、その場合はサービスアカウントを作成しsecrets.tomlに記載する
{
"type": "service_account",
"project_id": "XXXXX",
"private_key_id": "XXXXX",
"private_key": "XXXXX",
"client_email": "XXXXX@XXXXX.iam.gserviceaccount.com",
"client_id": "XXXXX",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"client_x509_cert_url": "XXXXX.iam.gserviceaccount.com",
"universe_domain": "googleapis.com"
}
パイプラインは次のように書き換える
pipeline = dlt.pipeline(
pipeline_name="hogehogehoge",
destination="bigquery",
dataset_name="xxxxxxxxxxxxx",
full_refresh = False
)
差分更新upsert と 加工
では bigquery にデータ投入してみよう
iecremental load という機能があるのでupesert とし差分更新されているか見てみる。もちろん複数キーを指定する
ちなみに single atomic transaction で実装されているらしい
投入するデータはローカルにある data/hoge.csv
とする
col1,col22,第二カラム,00xx,lastlast
211,c1,222,233,
111,c2,32,3333,
1121,c3,32,3333,
データ投入時に少し加工してみる。具体的にはカラム名の変更とレコードの置換
(現状、日本語カラムがある場合うまくデータ自体は投入されたがカラム名は変更された)
import dlt
from dlt.sources.helpers import requests
import posixpath
from dlt.common.libs.pyarrow import remove_null_columns
try:
from .filesystem import FileItemDict, filesystem, readers, read_csv # type: ignore
except ImportError:
from filesystem import (
FileItemDict,
filesystem,
readers,
read_csv,
)
pipeline = dlt.pipeline(
pipeline_name="bb",
destination='duckdb',
dataset_name="bbdataset",
)
bucket_url = dlt.secrets.get("sources.filesystem.readers.bucket_url")
file_glob = dlt.secrets.get("sources.filesystem.readers.file_glob")
@dlt.resource(primary_key=("col1", "col22"), write_disposition="merge")
def csv_data():
yield from readers(
bucket_url=posixpath.abspath(bucket_url),
file_glob=file_glob
).read_csv()
def func_rename_col(d):
result = {}
for k, v in d.items():
if k == '第二カラム':
result["rename_col2"] = v
else:
result[k] = v
return result
def pseudonymize_name(doc):
doc['00xx'] = "hashed"
return doc
a = csv_data()
a = a.add_map(func_rename_col).add_map(pseudonymize_name)
for row in a:
print(row)
load_info = pipeline.run(csv_data, table_name="unun")
ローカルのcsvのレコードを書き換えてもう一度パイプラインを実行すると確かにupsertされていた、
sourceのレコードが削除されてもdestinationへは反映されなかった
upesrtのデバッグ
ちなみにcloudloggingのログからどんなクエリが動いてたのか見てみた
DELETE
FROM `hoge`.`xxxxxxxxxxxxx`.`read_csv` AS d
WHERE EXISTS
(
SELECT 1
FROM `hoge`.`xxxxxxxxxxxxx_staging`.`read_csv` AS s
WHERE d.`col1` = s.`col1`);INSERT INTO `hoge`.`xxxxxxxxxxxxx`.`read_csv`
(
`col1`,
`col2`,
`col3`,
`_dlt_load_id`,
`_dlt_id`
)
SELECT `col1`,
`col2`,
`col3`,
`_dlt_load_id`,
`_dlt_id`
FROM (
SELECT row_number() OVER (partition BY `col1` ORDER BY (
SELECT NULL)) AS _dlt_dedup_rn,
`col1`,
`col2`,
`col3`,
`_dlt_load_id`,
`_dlt_id`
FROM `hoge`.`xxxxxxxxxxxxx_staging`.`read_csv` ) AS _dlt_dedup_numbered
WHERE _dlt_dedup_rn = 1;
_staging というサフィックスのテーブルが使われている。ドキュメントにもあるようにこれはmerge用の一時テーブルである
インテグレーションを独自実装しているとわかると思うが merge を使った差分更新は少しだけ面倒、以下のライブラリでも issue が立っている。これを dlt 側が担ってくれるのは地味に嬉しい
スキーマ、データのバリデーション
パイプラインを実装するときにスキーマ変更を検知すると落ちて欲しいなどはあると思う、そんなときはfreezeを使うといい
pipeline.run(
csv_data,
schema_contract="freeze"
)
ただドキュメントを見ている感じ pydantic の機能があるのでスキーマやデータの検証は柔軟にできそうだ
dltのメタデータ
おそらくこの記事を読んでる人は dbt を使っており freshness が気になっているころだろう
メタデータを持ったテーブルが作成されておりその inserted_at を使えば良い
mydata._dlt_loads
デプロイ
デプロイ周りも充実しており githubactions から cloudfunction, airflow などがある
通知、監視
通知の機能は切り出せという考えもありそうだが簡易にスラック通知できるのは嬉しい
まだまだ話を掘り下げたいが一旦ここで終わるが、スキーマ周りとメタデータ周りを時間があるときに追加する
余談
duckdb は count が登場したときに初めて知った
あれから motherduck が出たりしてたがその盛り上がりはかなり限定的だったと思う
少なくとも自分の周りでこの技術に超感動してたのは地理空間データを触ってる人たちのみだった
dltを機にいろいろ盛り上がって欲しい
参考
Discussion