atama plus techblog
🔖

DataformでBigQueryのPython UDFを管理する事例紹介

に公開

こんにちは、atama plus でデータエンジニアをしている kumewata です。

先日プレビュー版が公開された BigQuery の Python UDFを、Dataform 経由で利用してみたので紹介します。

背景

弊社で利用している BigQuery 上で iCal 形式で保存しているデータ(弊社プロダクトで利用しているスケジュール機能で利用)があり、実際の日付のデータの変換にしたものを BigQuery 上の Warehouse/Mart で提供したいと考えていました。
そのために iCal データを扱えるライブラリを利用したく、たとえば Python だと dateutil が候補に上がりました。
最初は Cloud Run Functions で処理を用意して BigQuery から HTTP リクエストで呼び出すことを検討していましたが、ちょうど Python UDF のプレビュー版が発表されたところだったので、簡単にやりたいことを実現できるこの方法を採用しました。
また弊社では BigQuery の UDF を Dataform で管理しているため、Python UDF も同様に管理することにしました。
調査する中でこの組み合わせの事例を見つけられず、せっかくなので紹介させてください。

前提:Dataform での UDF(SQL)を定義する

Dataform では UDF を直接管理する方法はなく、弊社でも参考記事にあるように operation を使って管理しています。
例えばテーブルの差分チェックをする UDF を SQL で書く場合は、以下のようになります。

config {
  type: "operations",
  hasOutput: true,
  name : "diff_tables",
  tags: ["udf"],
}

create or replace procedure `(Google Project ID).(Dataset ID).diff_tables`(tableA string, tableB string)
options (description="...")
begin
  declare sql_str string;
  -- 動的に差分チェックのクエリを組み立て
  set sql_str = """
    select * from (
      select * from `""" || tableA || """`
      except distinct
      select * from `""" || tableB || """`
    )
    union all
    select * from (
      select * from `""" || tableB || """`
      except distinct
      select * from `""" || tableA || """`
    )
  """;
  -- 組み立てたクエリを実行
  execute immediate sql_str;
end;

Dataform で Python の UDF を定義する

現状 Python UDF では以下のランタイムとプリインストールされた追加パッケージが用意されています(バージョンは変化しうるので、最新情報は公式ドキュメントを参照ください)。これ以外にも、ランタイムのベースイメージに含まれるシステムライブラリやサードパーティのパッケージを使用することもできます。

  • ラインタイム:python-3.11

  • パッケージ

    • numpy 1.26.3
    • pyarrow 14.0.2
    • pandas 2.1.4
    • python-dateutil 2.8.2

今回はたまたま使いたいパッケージ(python-dateutil)がプリインストールされていたため、今回の事例では追加のパッケージ指定は不要でした。
別途検証したところ、options で packages=["pakage name"]を指定することで、Dataform 管理でも PyPI からインストールすることができました。

コード例は以下です。書き方は Dataform 外で定義するときとほぼ変わらなず、すぐに導入できます。

config {
  type: "operations",
  hasOutput: true,
  name : "expand_recurrence_vectorized",
  tags: ["udf"],
}

create or replace function `(Google Project ID).(Dataset ID).expand_recurrence_vectorized`(
  recurrence string,
  end_at timestamp,
  expanded_start_date date,
  expanded_end_date date,
  timezone string
)
returns array<struct<idx int64, start_time datetime, end_time datetime>>
language python
options(runtime_version="python-3.11", entry_point="main")
as r'''
from dateutil import rrule
from datetime import datetime, timedelta, time
import pytz

# 定数定義
DAY_START_TIME = time.min
DAY_END_TIME = time(23, 59, 59)
DEFAULT_EVENT_DURATION = timedelta(hours=1)
UTC = pytz.UTC

def main(recurrence, end_at, expanded_start_date, expanded_end_date, timezone):
    """単一のレコードに対して繰り返し予定を展開"""

    if not recurrence:
        return []

    # タイムゾーンの設定(キャッシュ効果あり)
    tz = pytz.timezone(timezone if timezone else 'UTC')

    # 期間の設定(一度だけ計算)
    start_dt = tz.localize(datetime.combine(expanded_start_date, DAY_START_TIME))
    end_dt = tz.localize(datetime.combine(expanded_end_date, DAY_END_TIME))

    try:
        # DTSTARTを高速に抽出
        dtstart = None
        dtstart_line = None
        for line in recurrence.split('\n'):
            if line.startswith('DTSTART:'):
                dtstart_line = line[8:]  # 'DTSTART:'の長さ分スキップ
                break

        if dtstart_line:
            if dtstart_line.endswith('Z'):
                from dateutil import parser
                dtstart = parser.parse(dtstart_line).replace(tzinfo=UTC)
            else:
                from dateutil import parser
                dtstart = parser.parse(dtstart_line)

        # rrulestrでiCalendar形式をパース
        rule_set = rrule.rrulestr(recurrence, forceset=True)

        # 終日イベントかどうかの判定(一度だけ)
        is_all_day = end_at is None

        # durationの計算
        if is_all_day:
            duration = None
        elif dtstart and end_at:
            if end_at.tzinfo is None:
                end_at = end_at.replace(tzinfo=tz)

            if dtstart.tzinfo == UTC:
                end_at_utc = end_at.astimezone(UTC)
                duration = end_at_utc - dtstart
            else:
                duration = end_at - dtstart
        else:
            duration = DEFAULT_EVENT_DURATION

        # 指定期間内の発生日時を取得
        occurrences = list(rule_set.between(start_dt, end_dt, inc=True))

        if not occurrences:
            return []

        if is_all_day: # 終日イベント
            return [{
                'idx': i,
                'start_time': tz.localize(datetime.combine(
                    occ.astimezone(tz).date(),
                    DAY_START_TIME
                )),
                'end_time': tz.localize(datetime.combine(
                    occ.astimezone(tz).date(),
                    DAY_END_TIME
                ))
            } for i, occ in enumerate(occurrences)]
        else: # 通常イベント
            return [{
                'idx': i,
                'start_time': occ.astimezone(tz) if occ.tzinfo else occ.replace(tzinfo=UTC).astimezone(tz),
                'end_time': (occ.astimezone(tz) if occ.tzinfo else occ.replace(tzinfo=UTC).astimezone(tz)) + duration
            } for i, occ in enumerate(occurrences)]

    except Exception as e:
        print(f"Error processing recurrence: {str(e)}")
        return []
''';

注意点

「BigQuery Python UDF 完全に理解した」でも触れられていますが、Python UDF は初回の実行速度が遅く、追加のパッケージインストールなしでも 3 分ほどかかります(2 回目以降では速くなります)。今回は Dataform 処理中でそれほどパフォーマンスが求められなかったことから、このデメリットは許容しています。

まとめ

以上、Dataform で Python UDF を管理する事例紹介でした。
スペシャルサンクスとして、社内で初めて Python UDF を利用するきっかけを作ってくれた yub0n、ありがとうございました!

参考

https://cloud.google.com/bigquery/docs/user-defined-functions-python?hl=ja

https://buildersbox.corp-sansan.com/entry/2023/12/03/000000

atama plus techblog
atama plus techblog

Discussion