DataformでBigQueryのPython UDFを管理する事例紹介
こんにちは、atama plus でデータエンジニアをしている kumewata です。
先日プレビュー版が公開された BigQuery の Python UDFを、Dataform 経由で利用してみたので紹介します。
背景
弊社で利用している BigQuery 上で iCal 形式で保存しているデータ(弊社プロダクトで利用しているスケジュール機能で利用)があり、実際の日付のデータの変換にしたものを BigQuery 上の Warehouse/Mart で提供したいと考えていました。
そのために iCal データを扱えるライブラリを利用したく、たとえば Python だと dateutil が候補に上がりました。
最初は Cloud Run Functions で処理を用意して BigQuery から HTTP リクエストで呼び出すことを検討していましたが、ちょうど Python UDF のプレビュー版が発表されたところだったので、簡単にやりたいことを実現できるこの方法を採用しました。
また弊社では BigQuery の UDF を Dataform で管理しているため、Python UDF も同様に管理することにしました。
調査する中でこの組み合わせの事例を見つけられず、せっかくなので紹介させてください。
前提:Dataform での UDF(SQL)を定義する
Dataform では UDF を直接管理する方法はなく、弊社でも参考記事にあるように operation を使って管理しています。
例えばテーブルの差分チェックをする UDF を SQL で書く場合は、以下のようになります。
config {
type: "operations",
hasOutput: true,
name : "diff_tables",
tags: ["udf"],
}
create or replace procedure `(Google Project ID).(Dataset ID).diff_tables`(tableA string, tableB string)
options (description="...")
begin
declare sql_str string;
-- 動的に差分チェックのクエリを組み立て
set sql_str = """
select * from (
select * from `""" || tableA || """`
except distinct
select * from `""" || tableB || """`
)
union all
select * from (
select * from `""" || tableB || """`
except distinct
select * from `""" || tableA || """`
)
""";
-- 組み立てたクエリを実行
execute immediate sql_str;
end;
Dataform で Python の UDF を定義する
現状 Python UDF では以下のランタイムとプリインストールされた追加パッケージが用意されています(バージョンは変化しうるので、最新情報は公式ドキュメントを参照ください)。これ以外にも、ランタイムのベースイメージに含まれるシステムライブラリやサードパーティのパッケージを使用することもできます。
-
ラインタイム:python-3.11
-
パッケージ
- numpy 1.26.3
- pyarrow 14.0.2
- pandas 2.1.4
- python-dateutil 2.8.2
今回はたまたま使いたいパッケージ(python-dateutil)がプリインストールされていたため、今回の事例では追加のパッケージ指定は不要でした。
別途検証したところ、options で packages=["pakage name"]を指定することで、Dataform 管理でも PyPI からインストールすることができました。
コード例は以下です。書き方は Dataform 外で定義するときとほぼ変わらなず、すぐに導入できます。
config {
type: "operations",
hasOutput: true,
name : "expand_recurrence_vectorized",
tags: ["udf"],
}
create or replace function `(Google Project ID).(Dataset ID).expand_recurrence_vectorized`(
recurrence string,
end_at timestamp,
expanded_start_date date,
expanded_end_date date,
timezone string
)
returns array<struct<idx int64, start_time datetime, end_time datetime>>
language python
options(runtime_version="python-3.11", entry_point="main")
as r'''
from dateutil import rrule
from datetime import datetime, timedelta, time
import pytz
# 定数定義
DAY_START_TIME = time.min
DAY_END_TIME = time(23, 59, 59)
DEFAULT_EVENT_DURATION = timedelta(hours=1)
UTC = pytz.UTC
def main(recurrence, end_at, expanded_start_date, expanded_end_date, timezone):
"""単一のレコードに対して繰り返し予定を展開"""
if not recurrence:
return []
# タイムゾーンの設定(キャッシュ効果あり)
tz = pytz.timezone(timezone if timezone else 'UTC')
# 期間の設定(一度だけ計算)
start_dt = tz.localize(datetime.combine(expanded_start_date, DAY_START_TIME))
end_dt = tz.localize(datetime.combine(expanded_end_date, DAY_END_TIME))
try:
# DTSTARTを高速に抽出
dtstart = None
dtstart_line = None
for line in recurrence.split('\n'):
if line.startswith('DTSTART:'):
dtstart_line = line[8:] # 'DTSTART:'の長さ分スキップ
break
if dtstart_line:
if dtstart_line.endswith('Z'):
from dateutil import parser
dtstart = parser.parse(dtstart_line).replace(tzinfo=UTC)
else:
from dateutil import parser
dtstart = parser.parse(dtstart_line)
# rrulestrでiCalendar形式をパース
rule_set = rrule.rrulestr(recurrence, forceset=True)
# 終日イベントかどうかの判定(一度だけ)
is_all_day = end_at is None
# durationの計算
if is_all_day:
duration = None
elif dtstart and end_at:
if end_at.tzinfo is None:
end_at = end_at.replace(tzinfo=tz)
if dtstart.tzinfo == UTC:
end_at_utc = end_at.astimezone(UTC)
duration = end_at_utc - dtstart
else:
duration = end_at - dtstart
else:
duration = DEFAULT_EVENT_DURATION
# 指定期間内の発生日時を取得
occurrences = list(rule_set.between(start_dt, end_dt, inc=True))
if not occurrences:
return []
if is_all_day: # 終日イベント
return [{
'idx': i,
'start_time': tz.localize(datetime.combine(
occ.astimezone(tz).date(),
DAY_START_TIME
)),
'end_time': tz.localize(datetime.combine(
occ.astimezone(tz).date(),
DAY_END_TIME
))
} for i, occ in enumerate(occurrences)]
else: # 通常イベント
return [{
'idx': i,
'start_time': occ.astimezone(tz) if occ.tzinfo else occ.replace(tzinfo=UTC).astimezone(tz),
'end_time': (occ.astimezone(tz) if occ.tzinfo else occ.replace(tzinfo=UTC).astimezone(tz)) + duration
} for i, occ in enumerate(occurrences)]
except Exception as e:
print(f"Error processing recurrence: {str(e)}")
return []
''';
注意点
「BigQuery Python UDF 完全に理解した」でも触れられていますが、Python UDF は初回の実行速度が遅く、追加のパッケージインストールなしでも 3 分ほどかかります(2 回目以降では速くなります)。今回は Dataform 処理中でそれほどパフォーマンスが求められなかったことから、このデメリットは許容しています。
まとめ
以上、Dataform で Python UDF を管理する事例紹介でした。
スペシャルサンクスとして、社内で初めて Python UDF を利用するきっかけを作ってくれた yub0n、ありがとうございました!
参考
Discussion