🦔

初手dlt?

2024/02/07に公開

目的

  • dltを使ってみる
  • dltでできそうなこと・できなさそうなことを洗い出す

https://dlthub.com/

Caveat

コードが断片的です

内容

最小限の実装

まずはインストール

面倒なときは公式の colab を使おう

https://colab.research.google.com/drive/1NfSB1DpwbbHX9_t5vlalBTf13utwpMGx?usp=sharing#scrollTo=GSfjew3L5Efg

pipx install "dlt[duckdb]==0.4.2"

QuickStart にあるように以下のコードで最低限の実装になる

import dlt
from dlt.sources.helpers import requests
# Create a dlt pipeline that will load
# chess player data to the DuckDB destination
pipeline = dlt.pipeline(
    pipeline_name='chess_pipeline',
    destination='duckdb',
    dataset_name='player_data'
)
# Grab some player data from Chess.com API
data = []
for player in ['magnuscarlsen', 'rpragchess']:
    response = requests.get(f'https://api.chess.com/pub/player/{player}')
    response.raise_for_status()
    data.append(response.json())
# Extract, normalize, and load the data
pipeline.run(data, table_name='player')
python main.py

で実行すると chess_pipeline.duckdb ができていることを確認できる

ローカルで可視化

vs code の拡張機能で duckdb の中身を操作したかったがこれはうまくいかなかった
より正確には、公式にあるサンプルの.dbは読み込めたが自分で生成した.dbを読み込めなかった

https://marketplace.visualstudio.com/items?itemName=RandomFractalsInc.duckdb-sql-tools

代わりにここでは dlt 組み込みの show コマンドを用いて確認する
その際に依存ライブラリがないと怒られるのでそれも入れる

pip install "pandas==2.2.0"
pip install "streamlit==1.31.0"

dlt pipeline chess_pipeline show

すると streamlit が立ち上がる http://localhost:8501
ちなみにパイプラインは ls $HOME/.dlt/pipelines で確認できる

プロジェクトの作成

では少し掘り下げてみていく
dlt init コマンドでプロジェクト作成ができる

dlt init <source> <destination>

ここではそれぞれ filesystem, duckdb とした

❯ tree -L 2                                       
.
├── filesystem
│   ├── README.md
│   ├── __init__.py
│   ├── helpers.py
│   ├── readers.py
│   └── settings.py
├── filesystem_pipeline.py
└── requirements.txt

豊富なインテグレーション

ちなみに現時点でsourceは27、destinationは11ある

destination に athena あるのが地味に嬉しい

https://dlthub.com/docs/dlt-ecosystem

❯  dlt init --list-verified-sources
Looking up for verified sources in https://github.com/dlt-hub/verified-sources.git...
kinesis: Reads messages from Kinesis queue.
inbox: Reads messages and attachments from e-mail inbox via IMAP protocol
mux: Loads Mux views data using https://docs.mux.com/api-reference
google_sheets: Loads Google Sheets data from tabs, named and explicit ranges. Contains the main source functions.
google_analytics: Defines all the sources and resources needed for Google Analytics V4
pokemon: This source provides data extraction from an example source as a starting point for new pipelines.
pipedrive: Highly customizable source for Pipedrive, supports endpoint addition, selection and column rename
workable: This source uses Workable API and dlt to load data such as Candidates, Jobs, Events, etc. to the database.
sql_database: Source that loads tables form any SQLAlchemy supported database, supports batching requests and incremental loads.
facebook_ads: Loads campaigns, ads sets, ads, leads and insight data from Facebook Marketing API
personio: Fetches Personio Employees, Absences, Attendances.
filesystem: Reads files in s3, gs or azure buckets using fsspec and provides convenience resources for chunked reading of various file formats
mongodb: Source that loads collections form any a mongo database, supports incremental loads.
notion: A source that extracts data from Notion API
hubspot: This is a module that provides a DLT source to retrieve data from multiple endpoints of the HubSpot API using a specified API key. The retrieved data is returned as a tuple of Dlt resources, one for each endpoint.
airtable: Source that loads tables form Airtable.
shopify_dlt: Fetches Shopify Orders and Products.
jira: This source uses Jira API and dlt to load data such as Issues, Users, Workflows and Projects to the database.
github: Source that load github issues, pull requests and reactions for a specific repository via customizable graphql query. Loads events incrementally.
zendesk: Defines all the sources and resources needed for ZendeskSupport, ZendeskChat and ZendeskTalk
kafka: A source to extract Kafka messages. [needs update: dlt<0.4,>=0.3.25]
asana_dlt: This source provides data extraction from the Asana platform via their API.
unstructured_data: This source converts unstructured data from a specified data resource to structured data using provided queries.
chess: A source loading player profiles and games from chess.com api
salesforce: Source for Salesforce depending on the simple_salesforce python package.
slack: Fetches Slack Conversations, History and logs.
strapi: Basic strapi source
stripe_analytics: This source uses Stripe API and dlt to load data such as Customer, Subscription, Event etc. to the database and to calculate the MRR and churn rate.
matomo: Loads reports and raw visits data from Matomo

destination をみんな大好き bigquery にできる、その場合はサービスアカウントを作成しsecrets.tomlに記載する

{
  "type": "service_account",
  "project_id": "XXXXX",
  "private_key_id": "XXXXX",
  "private_key": "XXXXX",
  "client_email": "XXXXX@XXXXX.iam.gserviceaccount.com",
  "client_id": "XXXXX",
  "auth_uri": "https://accounts.google.com/o/oauth2/auth",
  "token_uri": "https://oauth2.googleapis.com/token",
  "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
  "client_x509_cert_url": "XXXXX.iam.gserviceaccount.com",
  "universe_domain": "googleapis.com"
}

パイプラインは次のように書き換える

 pipeline = dlt.pipeline(
   pipeline_name="hogehogehoge",
  destination="bigquery",
   dataset_name="xxxxxxxxxxxxx",
  full_refresh = False
)

差分更新upsert と 加工

では bigquery にデータ投入してみよう

iecremental load という機能があるのでupesert とし差分更新されているか見てみる。もちろん複数キーを指定する
ちなみに single atomic transaction で実装されているらしい

投入するデータはローカルにある data/hoge.csv とする

col1,col22,第二カラム,00xx,lastlast
211,c1,222,233,
111,c2,32,3333,
1121,c3,32,3333,

データ投入時に少し加工してみる。具体的にはカラム名の変更とレコードの置換

(現状、日本語カラムがある場合うまくデータ自体は投入されたがカラム名は変更された)

https://dlthub.com/docs/general-usage/incremental-loading

import dlt
from dlt.sources.helpers import requests
import posixpath
from dlt.common.libs.pyarrow import remove_null_columns

try:
    from .filesystem import FileItemDict, filesystem, readers, read_csv  # type: ignore
except ImportError:
    from filesystem import (
        FileItemDict,
        filesystem,
        readers,
        read_csv,
    )


pipeline = dlt.pipeline(
    pipeline_name="bb",
    destination='duckdb',
    dataset_name="bbdataset",
)

bucket_url = dlt.secrets.get("sources.filesystem.readers.bucket_url")
file_glob = dlt.secrets.get("sources.filesystem.readers.file_glob")


@dlt.resource(primary_key=("col1", "col22"), write_disposition="merge")
def csv_data():
    yield from readers(
        bucket_url=posixpath.abspath(bucket_url),
        file_glob=file_glob
    ).read_csv()

def func_rename_col(d):
    result = {}
    for k, v in d.items():
        if k == '第二カラム':
            result["rename_col2"] = v
        else:
            result[k] = v
    return result

def pseudonymize_name(doc):
    doc['00xx'] = "hashed"
    return doc


a = csv_data()
a = a.add_map(func_rename_col).add_map(pseudonymize_name)
for row in a:
    print(row)



load_info = pipeline.run(csv_data, table_name="unun")

ローカルのcsvのレコードを書き換えてもう一度パイプラインを実行すると確かにupsertされていた、
sourceのレコードが削除されてもdestinationへは反映されなかった

upesrtのデバッグ

ちなみにcloudloggingのログからどんなクエリが動いてたのか見てみた

DELETE
FROM   `hoge`.`xxxxxxxxxxxxx`.`read_csv` AS d
WHERE  EXISTS
       (
              SELECT 1
              FROM   `hoge`.`xxxxxxxxxxxxx_staging`.`read_csv` AS s
              WHERE  d.`col1` = s.`col1`);INSERT INTO `hoge`.`xxxxxxxxxxxxx`.`read_csv`
            (
                        `col1`,
                        `col2`,
                        `col3`,
                        `_dlt_load_id`,
                        `_dlt_id`
            )
SELECT `col1`,
       `col2`,
       `col3`,
       `_dlt_load_id`,
       `_dlt_id`
FROM   (
                SELECT   row_number() OVER (partition BY `col1` ORDER BY (
                                SELECT NULL)) AS _dlt_dedup_rn,
                         `col1`,
                         `col2`,
                         `col3`,
                         `_dlt_load_id`,
                         `_dlt_id`
                FROM     `hoge`.`xxxxxxxxxxxxx_staging`.`read_csv` ) AS _dlt_dedup_numbered
WHERE  _dlt_dedup_rn = 1;

_staging というサフィックスのテーブルが使われている。ドキュメントにもあるようにこれはmerge用の一時テーブルである

インテグレーションを独自実装しているとわかると思うが merge を使った差分更新は少しだけ面倒、以下のライブラリでも issue が立っている。これを dlt 側が担ってくれるのは地味に嬉しい

https://github.com/googleapis/python-bigquery-pandas/issues/323

スキーマ、データのバリデーション

パイプラインを実装するときにスキーマ変更を検知すると落ちて欲しいなどはあると思う、そんなときはfreezeを使うといい

pipeline.run(
  csv_data,
  schema_contract="freeze"
)

ただドキュメントを見ている感じ pydantic の機能があるのでスキーマやデータの検証は柔軟にできそうだ

https://dlthub.com/docs/general-usage/schema-contracts

https://dlthub.com/docs/general-usage/schema

dltのメタデータ

おそらくこの記事を読んでる人は dbt を使っており freshness が気になっているころだろう

https://dlthub.com/docs/general-usage/destination-tables#load-packages-and-load-ids

メタデータを持ったテーブルが作成されておりその inserted_at を使えば良い

mydata._dlt_loads

https://dlthub.com/docs/general-usage/state

デプロイ

デプロイ周りも充実しており githubactions から cloudfunction, airflow などがある

https://dlthub.com/docs/walkthroughs/deploy-a-pipeline/

通知、監視

通知の機能は切り出せという考えもありそうだが簡易にスラック通知できるのは嬉しい

https://dlthub.com/docs/running-in-production/running

まだまだ話を掘り下げたいが一旦ここで終わるが、スキーマ周りとメタデータ周りを時間があるときに追加する

余談

duckdb は count が登場したときに初めて知った
あれから motherduck が出たりしてたがその盛り上がりはかなり限定的だったと思う

少なくとも自分の周りでこの技術に超感動してたのは地理空間データを触ってる人たちのみだった

https://github.com/pacificspatial/flateau/

dltを機にいろいろ盛り上がって欲しい

参考

https://count.co

https://motherduck.com/

Discussion