💪🏻

dbtとデータパーティショニングで、大量データを扱う

2022/12/20に公開

dbt Advent Calendar 2022 の20日目の記事です。

背景

筆者は、dbtを使った広告プラットフォームのデータ基盤の構築・運用をしています。
この基盤は、最初からdbtを使っていたわけではなく、過去にフルスクラッチから、dbtへのリプレイスをしました。
広告レポーティング基盤に、dbtを導入したら別物になった話

そのdbtへのリプレイスで、当初困ったことがありました。世の中で紹介されているdbtのサンプルコードは、データ量が少ないもの(広告に比べると)を前提にしているので、大量データを扱っている筆者にとっては参考に出来るものがありませんでした。
けれども、元々フルスクラッチで実装していた時に、採用していたパーティショニングを使ったデータ処理のパターンが、dbtでの実装においても、非常に有効だったので、今回はそれについてシェアします。
今回、紹介する設計は、データウェアハウス特有の機能を使った設計ではないので、ほとんどのデータウェアハウスで、適用可能なものとなっていますので、ご安心ください。
一方で、サンプルとして示すコードは、何かのデータウェアハウスを使って示す必要があるため、Snowflakeを前提として進めさせて頂きます。
もし、この設計手法を他のデータウェアハウスで行いたい場合は?については、最後に書いておきます。

用語集

  • impression
    • 広告表示イベント。ブラウザやアプリ上に、広告が表示されると計上されるイベント。
  • click
    • 広告クリックイベント。ブラウザやアプリ上、で表示された広告がクリックされると計上されるイベント。

素直にincremental insertでは、間に合わない

広告のログは、dailyで数億データ発生します。そのため、 materialized='table' では効率が悪いので、逐次処理ができるmaterialized='incremental' を採用しています。
まずは、よくあるインターネットサンプルを真似て、以下のようなdbt modelを構築をしていました。

ref('impressions') は、S3にPutされたログを、データウェアハウスに取り込んだだけのdbt modelがあると思ってください。

ref('impressions') に入ってるサンプルデータ。

IMPRESSION_ID IMPRESSION_AT
hogebb25-8acf-4f27-a864-39baff9f9d12 2022-11-04T17:43:42Z
bar23f0c-4d04-495e-bf43-8b6cc6777c38 2022-11-04T17:11:27Z
aar23f0c-4d04-495e-bf43-8b6cc6777c38 2022-11-04T17:12:27Z

素直に実装したdbt modelの例。

{{
  config(
    materialized='incremental',
    unique_key='impression_id', -- 同じidのものがあったら上書きしたい
  )
}}

with source as (

  select * from {{ ref('impressions') }}
  {% if is_incremental() %}
    where _load_timestamp > (select max(_load_timestamp) from {{ this }}) -- 新しいimpressionログを取り込む
  {% endif %}

),

final as (
  select
    -- 様々な処理
  from source
)

select * from final

上記のdbt modelで大量データを扱うには、いくつか問題があります。その中でもクリティカルなのは、materialized='incremental'unique_key は、全てのデータをスキャンしようとする挙動です。
この話題は、以下のdbt-coreのissueにも上がっています。
Incremental Load Predicates To Bound unique_id scans #3293

論理パーティションごとに、incremental insert

上記のクエリでは実装できないため、先程紹介したissueにあるようなはcustom materializedを作成するか考えましたが、フルスクラッチで作っていたデータパイプラインで使っていたパーティションごとに、処理するパターンで実装したところ、すんなり終わったので、そちらを採用しました。

使っているデータウェアハウスによっては、物理的なパーティション機能を提供していないかもしれませんが、論理的にデータのパーティションできるカラムを作ってやれば問題ありません。
また、データのパーティションの単位には、基本的に時間(一般的には、datestampと呼ばれるもの)を使っています。
これは、S3のようなデータストレージを使っていると、多くの場合、時間ラベル付きのディレクトリ構成(eg: s3://{logname}/yyyy/mm/dd/hh)で管理されていて、しかも、データウェアハウスへの取り込みジョブが、例えば1日に1回で行われているなら、毎日新しい日付パーティションが作成されていることを意味します。
この実態を、素直に表現できて、フィルタリングにも有効であるため、時間ベースのパーティションを採用しています。

時間ベースパーティションを利用した例。

{{
  config(
    materialized='incremental',
    incremental_strategy='delete+insert',
    unique_key='_partition_hourly',
  )
}}

with source as (

  select * from {{ ref('impressions') }}
  where _partition_hourly = {{ dbt.safe_cast("'"~var('target_partition')~"'", api.Column.translate_type("timestamp")) }} -- var target_partitionは、dbt run時に渡される eg: 12-16T10:00

),

final as (
  select
    -- 様々な処理
  from source
)

select * from final

挙動は、以下の通りです。

  • sourceで、_partition_hourly = '2022-12-16 10:00' のデータを取得する
  • finalで、必要な処理を施す。
  • 最後に、_partition_hourly = '2022-12-16 10:00' を持つデータを全て削除し、今回生成した結果をinsertする。

これは、俗に言う洗い替え処理が、unique_key='_partition_hourly' と指定するだけで実現しています。

ref('impressions') に入ってるサンプルデータ。

IMPRESSION_ID IMPRESSION_AT _PARTITION_HOURLY
11111111-9d42-479b-b5c8-268e76f6c971 2022-11-01T08:58:00Z 2022-11-01T08:00:00Z
22222222-2322-4177-aecf-c13de108d3b9 2022-11-01T08:59:00Z 2022-11-01T08:00:00Z
33333333-d799-4fb0-a667-dcc12f920986 2022-11-01T08:59:59Z 2022-11-01T09:00:00Z
  • impression_id は、それぞれに振られたID。(ソースシステムで発行される)
  • impression_at は、データが生成された時間。(ソースシステムで発行される)
  • _partition_hourly が、時間ベースの論理パーティション。

_partition_hourly に設定する値は、筆者は、S3の時間ラベル付きディレクトリのパスの情報を使っています。
例えば、ad-log/impressions/y=2022/m=11/d=01/h=09 というパスにログがあるなら、_partition_hourly には、2022-11-01T09:00 を設定します。

上流であるS3から、下流まで、このパーティションをベースに、データパイプラインを構築していきます。

論理パーティションごとに、処理をするメリット

クエリの効率が良い

ログの発生時間ベースで、<<=でフィルタリングするよりも、パーティションを = でフィルタリングする方が、クエリの効率は良いです。

-- slow...
where '2022-11-01T08:00:00' <= impression_at and impression_at < '2022-11-01T09:00:00'

-- high speed
where _partition_hourly = '2022-11-01T08:00:00'

また、ログの時間でフィルタリングすると、遅れてやってくるログが(1時間後のS3パスに入ってしまうなど)、処理対象から漏れてしまう問題を考慮に入れる必要もあったりもするので、私は基本的に、データパイプライン内で、処理範囲を決定するのに、ログの時間を使わないようにしています。

データの総量が増えても、クエリパフォーマンスが落ちない

最初にやろうとしていた方法だと、データウェアハウスに格納されているデータの総量に応じて、日に日にクエリのパフォーマンスが落ちます。
しかし、今回紹介した方法の場合、論理パーティションごと、今回でいうと、1時間のレンジで処理をしていくため、時間経過と共にクエリが遅くなる問題が発生しなくなる。(もちろん単位時間あたりの流量が増えれば、別の問題は発生する)

ロジックの変更とデータの変更を分離できる

ビジネスルールは、常に変わります。同じ指標の話であっても、計算ロジックに微調整が必要になることがあります。もちろんロジックは変更するのですが、過去データは、その当時の計算ロジックで生成されたものを残したい。
全てのデータを洗い替えする方式でやっている場合、過去分の事実も丸っと新しいロジックが反映されてしまいます。ですが、今回紹介したパーティションごとに処理をしていく方式だと、ただ事実を積み上げていくだけになるので、過去データには特に影響がありません。
この性質は、実装ミスによって生まれた間違ったデータの再集計や、単純なデータ再取り込みにも役立つでしょう。

dbt test戦略

パーティショニングをうまく使うことで、データの流れを明快にしつつ、パフォーマンスも上げれました。
しかし、まだ課題があります。それがdbt testです。
dbt testは、基本的に、全量のデータをスキャンします。そのため、素直にやると非常にコストが高いクエリになってしまいます。
全量はテストしたくないんだけど、直近の変更があったデータだけテストしてえ・・・そこで、dbt modelが走った時間をカラム化することで、必要な範囲だけテストするを実現しています。

{{
  config(
    materialized='incremental',
    incremental_strategy='delete+insert',
    unique_key='_partition_hourly',
  )
}}

with source as (

  select * from {{ ref('impressions') }}
  where _partition_hourly = {{ dbt.safe_cast("'"~var('target_partition')~"'", api.Column.translate_type("timestamp")) }}

),

final as (
  select
    ...,
    {{ dbt.current_timestamp() }} as _last_dbt_run_at
  from source
)

select * from final

_last_dbt_run_at で、test範囲を絞り込む。

version: 2

models:
  - name: fct_impressions
    columns:
      - name: impression_id
        tests:
          - not_null:
              config:
                where: "_last_dbt_run_at >= dateadd('hour', -1, current_timestamp())"

パーティションを超えて、処理が必要な場合

パーティションを超えて、処理が必要なケースがありますが、その場合は、累積テーブルを作ると良いでしょう。
以下に簡単な累積テーブルの例を示します。
これが役に立つのは、そのパーティションだけ見てても事実が判定できず、過去の積み上げによって生まれる事実を、dbt modelにしたい時です。

筆者のケースだと、いくつかのクリックで、どれが一番最初のクリックで、どれが一番最後のクリックだったか?を判定する時に使っています。

{{
  config(
    materialized='incremental',
    incremental_strategy='delete+insert',
    unique_key='_partition_hourly',
  )
}}

{%- set datehour = var('target_partition') -%}
{%- set start_datehour = dbt.dateadd(datepart="hour", interval=-7, from_date_or_timestamp="'"~datehour~"'") -%}
{%- set end_datehour = dbt.safe_cast("'"~datehour~"'", api.Column.translate_type("timestamp")) -%}

with source as (

  select * from {{ ref('clicks') }}
  where 
    {{ start_datehour }} <= _partition_hourly
    and _partition_hourly <= {{ end_datehour }}
),

final as (
  select
    -- 様々な処理
  from source
)

select * from final

まとめ

パーティショニングとdbtを組み合わせると、dbtのデフォルトの実装だけで、大量データを扱うことが出来ます。また、パーティショニング自体は、データの量の多さに関係なく、便利な場面もあるので、是非お試しあれ。

Redshiftでの実装

デフォルトのincrementalで同じことを実装できます。

{{
    config(
        materialized='incremental',
        unique_key='_partition_hourly',
    )
}}

BigQueryでの実装

BigQuqeryの場合、パーティション機能が用意されているので、それを使えばできます。
https://docs.getdbt.com/reference/resource-configs/bigquery-configs#static-partitions

Discussion