🚢

Claude Codeと走り切るELTパイプライン移行

に公開

ELTパイプラインを Airbyte Cloud に移行しました

この記事では、データパイプライン移行に伴って発生した問題と、その対処をログとして残しておきます。
同様のことを試している人が検索して見つけた時、すこしでも役に立つ内容になっていれば幸いです。

移行の背景

2年前、メダリオンアーキテクチャを採用したDWHをBigQuery上に構築しました。

いろいろとデータ不整合やコスト増加に苦しみながらも、LookerはじめBIツールと接続して社内で便利に使っていました。

課題

ただし、2年の間に当時0.x系だったAirbyteは2.x系までアップデートされる中、社内システムのアップデートはできていませんでした。理由は:

  • ツール移行を試すと、過去データとの不整合が出る & 原因調査に時間を投資できていなかった
  • 古い機能に依存したテーブルが混ざっており、移行コストが見積もりできなかった

なぜ移行を決意したのか

0.x系では

  • カスタムコネクターの実装がかなり難しい(ネストしたJSONリクエストを動的に送れない)
  • 新種のコネクターを軒並み使えない

という課題があり、そのせいでAirbyteがあるにも関わらずデータ同期のスクリプトを社内で別途開発したりしていました。そんなコストをこの先も支払うくらいなら移行してしまおう!というのが今回の意思決定の背景です。

移行先の比較・検討

次に目的の達成方法を検討する事になりますが、今回は以下のように検討し、Airbyte Cloudに決めました。

Airbyte Cloud + dbt Cloud

  • ✅ Airbyte系のすべてのメリットを享受できる
  • ❌ 費用は自前運用より若干上がる

Airbyte OSSの新しいバージョンをデプロイする

  • ✅ データ連携パイプラインの更新自体はできる
  • ❌ dbt連携が組込み機能に存在しない(アプデを経て有償版限定機能になっていました)
  • ❌ 別途Airbyteの状態をpullして動くシステムの開発が必要

Custom transformation is not available for Airbyte Open-Source.
from: https://docs.airbyte.com/platform/using-airbyte/core-concepts/

Airbyteは塩漬けし、スクラッチ開発を繰り返す

  • ✅ 時間はかかるが、確実に成功することが分かっている
  • ❌ メンテしなければいけないシステム資産が増える
  • ❌ 似たような別要件に対応するために、何度もシステム構成を作り直すコストがかかる

Airbyte1.0系でなんとかする

  • ✅ dbt-core連携が組み込まれている
  • ❌ ほしいコネクタが存在しない。かつ、カスタムコネクタでネストしたフィールドに書き込みができない。

全く違うデータパイプラインを探し直す

  • ✅ 長期利用できる新しいシステム資産になる可能性がある
  • ❌ 探すのにも移行にも時間がかかる
  • ❌ 徒労に終わる可能性があり、投資リスクも高い

移行アプローチ

今回の移行準備では、以下のステップを踏みながら移行しました。

フェーズ1: 準備

  • Airbyte Cloud環境のセットアップ
  • 開発用スキーマで同期開始
  • dbtマクロの改修

フェーズ2: 検証

  • LLMとタッグを組み、包括的なデータ整合性チェック
  • ビジネスロジックの動作確認

フェーズ3: 本番切り替え

  • 本番環境への適用

移行前検証の手法

Claude Codeに bq コマンドの実行権限を渡し、複数スキーマを横断してクエリを発行させることで、人間よりも遥かに高速に、正確な差分調査を行うことができました。

人間がやるなら

  1. クエリを書く
  2. 結果をExcelやNotebookに取り込み
  3. 目grepや集計単位を変更して、インサイトを得る

という手順を踏むところ、LLMに任せると高速にクエリを書きながらPDCAを回してくれます。

データ移行などの正解が確実にわかり、データアクセスの手法も確立しているタスクは、まさにLLMの得意分野でした。

実施した検証

全データの全一致は理想ですが、データロードの仕方から変わっているため、そこは諦めて主要なKPIを集計することで、確率的に移行内容が確からしいと判断できる最低ラインを模索しました。

ビジネスインパクトの大きいKPIでいうと、以下は確実に担保する必要があると判断し、完全一致を実現できるか検証を進めました。

  • (そもそも)行数
  • タイムスタンプやタイムゾーンの誤りが無いか
  • 金額などの数値系プロパティに誤りが無いか

実装例は以下:

comparison.sql
WITH prod_data AS (
  -- 本番環境のデータを集計
  SELECT
    作成月,
    ライフサイクルステージ,
    COUNT(*) as record_count
  FROM 本番環境.テーブル
  GROUP BY 作成月, ライフサイクルステージ
),

new_data AS (
  -- 新環境のデータを集計(同じロジック)
  SELECT ...
  FROM 新環境.テーブル
  ...
),

comparison AS (
  -- FULL OUTER JOINで差分を検出
  SELECT
    COALESCE(p.作成月, n.作成月) as 作成月,
    COALESCE(p.ステージ, n.ステージ) as ステージ,
    COALESCE(p.record_count, 0) as prod_count,
    COALESCE(n.record_count, 0) as new_count,
    n.record_count - p.record_count as diff
  FROM prod_data p
  FULL OUTER JOIN new_data n
    ON p.作成月 = n.作成月 AND p.ステージ = n.ステージ
)

SELECT
  *,
  CASE
    WHEN diff = 0 THEN '✓ OK'
    WHEN diff > 0 THEN '⚠ 新環境の方が多い'
    WHEN diff < 0 THEN '⚠ 本番環境の方が多い'
  END as status
FROM comparison
WHERE diff != 0  -- 差分があるもののみ表示
ORDER BY 作成月 DESC
run_comparison.sh
bq query --use_legacy_sql=false < test_scripts/validate_contacts.sql

トラブルシューティング

よくある問題と解決策

HubSpotコネクターのバージョン差分

今回のバージョンアップで、JSON APIの戻り値が勝手に平坦化されるようになりました。

contacts
  ├─ id, createdAt, archived (トップレベル)
  ├─ properties (JSON - 全プロパティ)
  └─ properties_* (個別カラムとして平坦化済み) → 追加
       ├─ properties_lifecyclestage
       ├─ properties_email
       └─ properties_firstname

これまでdbtマクロで平坦化していたので、そこは楽になったのですが、 properties_ というprefixは今後のシステム運用で絶対に無視したく、マクロを用意しました。

remove_properties_prefix.sql
{% macro remove_properties_prefix(dataset_name, table_name) %}

-- このマクロは、Airbyte Cloudが出力した properties_* カラムから
-- properties_ プレフィックスを除去して、既存のスキーマと互換性を保つ

-- 1. テーブルのカラム一覧を取得
{% set get_columns_query %}
select
    column_name,
    data_type
from {{ dataset_name }}.INFORMATION_SCHEMA.COLUMNS
where
    table_name = '{{ table_name }}'
order by ordinal_position
{% endset %}

{% if execute %}

{% set columns = run_query(get_columns_query).rows %}

-- 2. カラムを3つのグループに分類
{% set airbyte_meta_columns = [] %}
{% set top_level_columns = [] %}
{% set properties_columns = [] %}

{% for col in columns %}
    {% set col_name = col["column_name"] %}

    {% if col_name.startswith('_airbyte_') %}
        {# Airbyteメタデータカラムは除外 #}
        {% do airbyte_meta_columns.append(col_name) %}

    {% elif col_name == 'properties' %}
        {# properties JSONカラムも除外 #}

    {% elif col_name.startswith('properties_') %}
        {# properties_* カラムはプレフィックスを除去 #}
        {% do properties_columns.append(col_name) %}

    {% else %}
        {# トップレベルカラムはそのまま #}
        {% do top_level_columns.append(col_name) %}

    {% endif %}
{% endfor %}

-- 3. SELECT文を生成
select
    {# トップレベルカラム(そのまま) #}
    {% for col_name in top_level_columns %}
    {{ col_name }}
    {% if not loop.last or properties_columns|length > 0 %},{% endif %}
    {% endfor %}

    {# properties_* カラム(プレフィックスを除去してリネーム) #}
    {% for col_name in properties_columns %}
    {{ col_name }} as {{ col_name.replace('properties_', '') }}
    {% if not loop.last %},{% endif %}
    {% endfor %}
from {{ dataset_name }}.{{ table_name }}

{% endif %}
{% endmacro %}

これで、dbtでやる事を減らし、平坦化と型キャストの手間をAirbyteに負担させられるようになりました。

Python 3.12でdbtが動かない

問題:

ModuleNotFoundError: No module named 'distutils'

原因: Python 3.12でdistutilsが削除された

解決策: dbt-coreをアップグレード

[project]
dependencies = [
    "dbt-bigquery>=1.8.0",  # Python 3.12対応版
]

まとめ

成功のポイント

今回の移行が比較的容易に進んだ理由は、LLMといっしょにデータ分析・原因調査をした点があります。
LLMが自分でクエリを書き、 bq コマンドを発行してデータを舐めることで、人間の目grepや第6感を凌駕した原因特定が、ものすごい速度で実現しました。

  1. 段階的アプローチ: 一気に移行せず、並行稼働期間を設ける
  2. LLMとの協業: SQLスクリプトで何度でも検証可能
  3. 詳細な追跡: レコードレベルでの差分を全て記録

参考リソース

Discussion