PyAirbyteで始める簡単Data Ingest Pipeline
はじめに
PyAirbyteがリリースされました。(2024/03/16時点ではBeta版なのでご注意を)
PyAirbyteはExtractのコネクタ部分をPythonのライブラリとして提供してPandasに格納するという機能を提供しているらしい。
つまり、BigQueryのクライアントと合わせればExtractとLoadの部分を過疎結合にしつつ、スケジューラーでPythonを呼び出すだけのシンプルなData Ingest Pipelineを作ることが可能なのでは!?ということで検証します。
個人的に考えるData Ingestツールの抱える課題点
- FivetranのようなSaaSを使い始める際は規約確認や、契約がとても面倒
- Airbyteは契約関連の面倒な部分は無いが、運用工数が大きすぎる
- worker, sever, temporal, api, dbなどなど(ちゃんと拡張性を考えてやるなら)
- SaaSの場合はBigQueryにアップロードする際にネットワーク周りで特別な対応が必要なケースがある
- SSHトンネルを行うためのサーバーを一つ用意する必要がある
- スケジューラーとの連携周りが微妙
- Airflowでスケジューリング機能があって、Fivetranでもスケジューリング機能があって、Airflowの各種OperatorでELの機能があって、FivetranでELの機能があってのように機能面の重複があるのも微妙...(Google Cloud workflowsで簡単に呼べ出せない)
- 設定のバージョン管理
- AirbyteではTerraformで管理出来そうだけど、管理物は増える
といった課題があると個人的に思っています。
実装のイメージ
GCPでミニマムにやるならスケジューラーはCloud ScheudlerでPythonの実行をCloud Runとかの組み合わせだと簡単かも?
※ 今回は普通にPythonが実行できるところだけを検証します。
個人的に以下の点がAirbyteやFivetranを管理・導入するのに比べて良い点だと思っています
- Extractで利用するツールと、Loadで利用するツールがPandasで過疎結合になっている
- 部分的な置き換えがしやすい
- 中身はExtractの部分はPythonのコードで実装するので、バージョン管理やlintでの管理がしやすい
- Airbyteと比べて管理する対象が少ない
- スケジューラーと相互連携しやすい
- 実行ログもスケジューラーに統一できる
逆に微妙な点としては
- GUIが無いのでPythonが触れないビジネスサイドは使いにくい
- Airbyteは日本国内のSaaSのコネクタが少ないので、連携したいSaaSによっては使えない
といった点が微妙かなと考えています。
具体的な実装
セットアップ
# python 3.11.4で作成しました(python >= 3.9なら大丈夫なはず)
python -m venv .venv
# 必要なライブラリをinstall
source .venv/bin/activate
pip install --upgrade pip
pip install 'git+https://github.com/airbytehq/PyAirbyte.git'
pip install google-cloud-bigquery
pip install pyarrow
requirements.txt
airbyte @ git+https://github.com/airbytehq/PyAirbyte.git@8f6c011db163efba3122f3ec55b2d7e118ccaff0
airbyte-cdk==0.58.9
airbyte-protocol-models==0.5.1
asn1crypto==1.5.1
attrs==23.2.0
backoff==2.2.1
bracex==2.4
cachetools==5.3.3
cattrs==23.2.3
certifi==2024.2.2
cffi==1.16.0
charset-normalizer==3.3.2
cryptography==41.0.7
Deprecated==1.2.14
dpath==2.0.8
duckdb==0.9.2
duckdb-engine==0.9.2
filelock==3.13.1
genson==1.2.2
google-api-core==2.17.1
google-auth==2.28.2
google-cloud-bigquery==3.19.0
google-cloud-core==2.4.1
google-crc32c==1.5.0
google-resumable-media==2.7.0
googleapis-common-protos==1.63.0
grpcio==1.62.1
grpcio-status==1.62.1
idna==3.6
isodate==0.6.1
Jinja2==3.1.3
jsonref==0.3.0
jsonschema==3.2.0
markdown-it-py==3.0.0
MarkupSafe==2.1.5
mdurl==0.1.2
numpy==1.26.4
orjson==3.9.15
overrides==7.7.0
packaging==24.0
pandas==2.1.4
pendulum==2.1.2
platformdirs==3.11.0
protobuf==4.25.3
psycopg2-binary==2.9.9
pyasn1==0.5.1
pyasn1-modules==0.3.0
pycparser==2.21
pydantic==1.10.14
Pygments==2.17.2
PyJWT==2.8.0
pyOpenSSL==23.3.0
pyrate-limiter==3.1.1
pyrsistent==0.20.0
python-dateutil==2.9.0.post0
python-dotenv==1.0.1
python-ulid==2.2.0
pytz==2024.1
pytzdata==2020.1
PyYAML==6.0.1
requests==2.31.0
requests-cache==1.2.0
rich==13.7.1
rsa==4.9
six==1.16.0
snowflake-connector-python==3.6.0
snowflake-sqlalchemy==1.5.1
sortedcontainers==2.4.0
SQLAlchemy==1.4.51
sqlalchemy-bigquery==1.9.0
tomlkit==0.12.4
types-PyYAML==6.0.12.20240311
typing_extensions==4.10.0
tzdata==2024.1
ulid==1.1
url-normalize==1.4.3
urllib3==2.2.1
wcmatch==8.4
wrapt==1.16.0
ファイルを作成
"""PyAirbyte demo"""
import json
import airbyte as ab
from google.cloud import bigquery
from google.oauth2 import service_account
# 検証用のため雑に文字列をペーストして検証した。
# 本番運用では真似しないでください。
CREDENTIAL_JSON = r"""
"""
credential_dict = json.loads(CREDENTIAL_JSON)
source = ab.get_source(
# source-{connector名}で指定する
# 利用可能なコネクタ名は以下を指定
# see. https://docs.airbyte.com/using-airbyte/pyairbyte/getting-started#available-connectors
name="source-gcs",
# 各コネクタの設定を記述
# gcsの場合は以下を参照した
# see. https://docs.airbyte.com/integrations/sources/gcs#reference
config={
"streams": [
{
"name": "gcs-demo", # ここの名前は任意
"format": {
"filetype": "csv",
"skip_rows_after_header": 1,
"header_definition": {
"header_definition_type": "User Provided",
# 後続のpandasのロードで英語名のカラム名を指定しないと上手くロード出来なかった
"column_names": [
"name",
"id",
],
},
},
# ここも **/*.csvなどが使えるはずだが上手く動かなかった
# bucket直下にフォルダを作って区切ると上手くいった(GCSなので厳密なフォルダではなくキー名だけど)
"globs": ["upload/*.csv"],
# これはheader_definitionと一致している必要がある
# ここを指定しないとcsvの場合は全ての項目がstringで処理される
"input_schema": '{"name":"string","id":"integer"}',
}
],
"service_account": CREDENTIAL_JSON,
"bucket": "{bucket_name}",
},
install_if_missing=True,
)
source.check()
source.select_all_streams()
result = source.read(force_full_refresh=True)
gcp_service_account = service_account.Credentials.from_service_account_info(
credential_dict
)
client = bigquery.Client(project="{project_id}", credentials=gcp_service_account)
for name, records in result.streams.items():
records_pandas = records.to_pandas()
client.load_table_from_dataframe(
dataframe=records_pandas,
destination="{dataset_id}.{table_name}",
)
感想
簡単な手順と簡単なコードでお手軽データ連携できるのはとても良かった。
ツールも疎結合で繋がっていて、Loadの部分は公式のライブラリを使うので安心だし新機能も使いやすくなるのは嬉しい。Extractの部分もコミュニティが強く欲しいと思った機能がよく実装されるのはいい感じ。
一方で、Beta版なのもあってドキュメントが不足していたりinstallが上手くいかないとかは結構詰まったので、すぐに採用は難しいかも。
最初はS3からのロードを検証しようと思ったけど、上手く動かずに失敗した
こんな感じのエラーが出ている
snappyのinstallがpoetryベースのプロジェクトでないと正常に行かないらしいけど、poetryで管理しても新しくvenv(install_if_missing=True)で作る際にpipでinstallしに行くから落ちているっぽい。
多分poetryでsource-s3でinstallする環境をpyairbyteの環境と別で用意すれば出来るのだろうけど...
self._run_subprocess_and_raise_on_failure(
File "/Users/kashira/Documents/work/pyairbyte_bigquery_dataframes/.venv/lib/python3.11/site-packages/airbyte/_executor.py", line 176, in _run_subprocess_and_raise_on_failure
raise exc.AirbyteSubprocessFailedError(
airbyte.exceptions.AirbyteSubprocessFailedError: AirbyteSubprocessFailedError: Subprocess failed.
Run Args: ['/Users/kashira/Documents/work/pyairbyte_bigquery_dataframes/.venv-source-s3/bin/pip', 'install', 'airbyte-source-s3']
Exit Code: 1
Log output:
error: subprocess-exited-with-error
× Building wheel for python-snappy (pyproject.toml) did not run successfully.
│ exit code: 1
╰─> [27 lines of output]
/private/var/folders/zp/38rxgcyd5m14drsx0gckmhww0000gn/T/pip-build-env-ft_xdaok/overlay/lib/python3.11/site-packages/setuptools/_distutils/dist.py:265: UserWarning: Unknown distribution option: 'cffi_modules'
warnings.warn(msg)
running bdist_wheel
running build
running build_py
creating build
creating build/lib.macosx-13.4-arm64-cpython-311
creating build/lib.macosx-13.4-arm64-cpython-311/snappy
copying src/snappy/snappy.py -> build/lib.macosx-13.4-arm64-cpython-311/snappy
copying src/snappy/snappy_cffi.py -> build/lib.macosx-13.4-arm64-cpython-311/snappy
copying src/snappy/__init__.py -> build/lib.macosx-13.4-arm64-cpython-311/snappy
copying src/snappy/hadoop_snappy.py -> build/lib.macosx-13.4-arm64-cpython-311/snappy
copying src/snappy/snappy_formats.py -> build/lib.macosx-13.4-arm64-cpython-311/snappy
copying src/snappy/__main__.py -> build/lib.macosx-13.4-arm64-cpython-311/snappy
copying src/snappy/snappy_cffi_builder.py -> build/lib.macosx-13.4-arm64-cpython-311/snappy
running build_ext
building 'snappy._snappy' extension
creating build/temp.macosx-13.4-arm64-cpython-311
creating build/temp.macosx-13.4-arm64-cpython-311/src
creating build/temp.macosx-13.4-arm64-cpython-311/src/snappy
clang -Wsign-compare -Wunreachable-code -DNDEBUG -g -fwrapv -O3 -Wall -I/Library/Developer/CommandLineTools/SDKs/MacOSX.sdk/usr/include -I/Library/Developer/CommandLineTools/SDKs/MacOSX.sdk/usr/include -I/Users/kashira/Documents/work/pyairbyte_bigquery_dataframes/.venv-source-s3/include -I/Users/kashira/.pyenv/versions/3.11.4/include/python3.11 -c src/snappy/crc32c.c -o build/temp.macosx-13.4-arm64-cpython-311/src/snappy/crc32c.o
clang -Wsign-compare -Wunreachable-code -DNDEBUG -g -fwrapv -O3 -Wall -I/Library/Developer/CommandLineTools/SDKs/MacOSX.sdk/usr/include -I/Library/Developer/CommandLineTools/SDKs/MacOSX.sdk/usr/include -I/Users/kashira/Documents/work/pyairbyte_bigquery_dataframes/.venv-source-s3/include -I/Users/kashira/.pyenv/versions/3.11.4/include/python3.11 -c src/snappy/snappymodule.cc -o build/temp.macosx-13.4-arm64-cpython-311/src/snappy/snappymodule.o
src/snappy/snappymodule.cc:33:10: fatal error: 'snappy-c.h' file not found
#include <snappy-c.h>
^~~~~~~~~~~~
1 error generated.
error: command '/usr/bin/clang' failed with exit code 1
[end of output]
note: This error originates from a subprocess, and is likely not a problem with pip.
ERROR: Failed building wheel for python-snappy
ERROR: Could not build wheels for python-snappy, which is required to install pyproject.toml-based projects
パフォーマンスが気になるけど、面倒でやっていないので誰か検証したら教えてください🙇♂️
またPyAirbyte以外にもExtractをPythonで簡単に行えるOSSは最近出ているので、別のOSSを使うのも良さそうで気になっています。(こんなツールがおすすめなどあれば教えてください)
ここまで読んでいただきありがとうございました。
Discussion