BigQuery Python UDF 完全に理解した
マイベスト データエンジニアの snhryt です。
先月頭に、BigQueryにユーザー定義関数 (UDF) をPythonで書ける機能がプレビュー版でやってきました🎉
少し時間が空いてしまいましたが、自分でもいろいろ触って完全に理解した[1]ので、体験記をまとめます。
Python UDFの基本構文
早速、カラムxとカラムyの足し算をするだけの簡単なPython UDFを書いてみましょう↓
create or replace function sandbox.python_udf_sample(
x int64,
y int64
)
returns int64
language python
options (
runtime_version = "python-3.11",
entry_point = "main"
)
as r'''
import pandas as pd
def main(df: pd.DataFrame):
result = df["x"] + df["y"]
return result
''';
with data as (
select 1 as x, 1 as y
union all select 3, 3
union all select 5, 5
)
select
sandbox.python_udf_sample(x, y)
from
data
;
得られる結果はこんな感じ。ちゃんと足し算できてますね。
外部ライブラリの利用
先ほどまでのはお遊びで、実践上期待されているのはここからです。やはり、Python UDFの最大の魅力はPythonの豊富なライブラリ群の恩恵にあやかれる点です。
デフォルトでは NumPy, Pandas, Pyarrow, python-dateutil しか使えませんが、PyPIで公開されているライブラリであれば、バージョン指定のうえでインストールして使えます。しかも packages=["ライブラリ名"]
をワンライナー追加するだけで利用可能です。 わざわざ .js ファイルをGCSに置かないと外部のライブラリが使えないJavaScript UDFと比べて、利便性が高くて大変いいですね。
例題
scipy で対応のあるt検定をして、t統計量、p値、効果量 (Cohen's d) の3つをSTRUCT型で返す処理を書いてみます。
create or replace function sandbox.ttest(
before int64,
after int64
)
returns struct<t float64, p float64, d float64>
language python
options (
runtime_version = "python-3.11",
entry_point = "main",
packages=["scipy"]
)
as r'''
import pandas as pd
from scipy import stats
def main(df: pd.DataFrame):
# floatに変換しないとエラーが出る
before = df["before"].astype(float)
after = df["after"].astype(float)
# 対応のあるt検定
t_stat, p_value = stats.ttest_rel(before, after)
# 効果量(Cohen s d for paired samples)
diff = after - before
cohen_d = diff.mean() / diff.std(ddof=1) # ddof=1 で標本標準偏差
results = pd.Series(
[{"t": t_stat, "p": p_value, "d": cohen_d}] * len(df)
)
return results
''';
with data as (
-- after のほうが before よりも大きめに出ているダミーデータ
select 70 as before, 74 as after
union all select 68, 71
union all select 65, 68
union all select 72, 75
union all select 71, 75
union all select 69, 70
union all select 67, 72
union all select 70, 73
union all select 73, 76
union all select 66, 67
union all select 68, 69
union all select 69, 72
union all select 65, 67
union all select 67, 68
union all select 72, 74
union all select 74, 76
union all select 70, 72
union all select 66, 66
union all select 68, 69
union all select 71, 71
)
select distinct
sandbox.ttest(before, after)
from
data
;
実行結果はこちら↓
ちゃんとscipyを使ってt検定ができています。すばらしい!
ポイント1: Nレコードに対して1つの結果を出すためのテクニック
ポイントは
results = pd.Series(
[{"t": t_stat, "p": p_value, "d": cohen_d}] * len(df)
)
と
select distinct
sandbox.ttest(before, after)
の部分です。
BigQueryのUDFは1レコードに対して1つ結果を返すものなので、たとえPythonの処理内容がNレコードに対して1つ結果を得るようなものだったとしても、関数の戻り値は入力と同じ行数になっている必要があります。 そのため、Python側では * len(df)
とし、ただそのままだとレコード数分同じ結果が複製されてしまうので、それをSQL側の distinct
で重複排除する、という意図です。[2]
仮に入出力のレコード数が不一致の場合、以下のようなエラーをお見舞いされます。これが入出力レコード数の不一致エラーだとはわからんよ・・・
ちなみに、知名度がそんなに高くない印象ですが、BigQueryにはUDFの集計関数版 UDAF があります。で、JavaScriptはこれに対応しているのですが、Pythonは残念ながらまだ未対応でした。
Python UDF自体のGAもまだなので、UDAF対応は少し先になる気はしますが、もしUDAFに対応すれば今回のような小細工は不要になり、使い勝手が向上すると思います。
ポイント2: STRUCTで返すときはキー名を明記する
これはPython UDFに限らないBigQuery UDFのマナーですが、以下のようにキー名を指定する必要があります。
returns struct<t float64, p float64, d float64>
仮に以下のように書いてしまうと、実行前にエラーは出ない&実行自体は成功するものの、値がすべてnullになってしまうのでご注意を。
returns struct<float64, float64, float64>
ポイント3: main関数の戻り値の型
STRUCT型で返すためには、Pythonの関数の戻り値は dict の pd.Series になっている必要があります。return df
や return df[["sum", "multiple"]]
ではダメで、それだとこんな感じで怒られてしまいます↓
速度関連の落とし穴
使ってみて分かったのですが、Python UDFは速度面にわりとツラみがあります。
どこに時間がかかっているのか
公式ドキュメントには以下のように記載があります。
CREATE FUNCTION ステートメントを使用して Python UDF を作成すると、BigQuery はベースイメージに基づくコンテナ イメージを作成または更新します。コンテナは、コードと指定されたパッケージ依存関係を使用してベースイメージ上にビルドされます。コンテナの作成は、実行時間が長いプロセスです。CREATE FUNCTION ステートメント実行後の最初のクエリは、イメージの作成を自動的に待機する場合があります。外部依存関係がない場合、通常、コンテナ イメージは 1 分以内に作成されます。
こちらを一読したうえで、冒頭のクエリの処理結果を見てましょう↓
これを見ると、以下のように読み取る方が多いのではないでしょうか?
- 全体: 3分12秒
- UDF作成(1段目の処理): 12秒ぐらい
- UDF実行(2段目の処理): 3分ぐらい
かくいう私も「ドキュメントの書きっぷり的にビルドもっと時間かかると思ってたけど、案外速いじゃん」と思いました。が、実はこれが罠で、大半の時間はこのステップには現れない1.5段目的な部分にかかる時間です。 初回起動時はコンテナの立ち上げに時間がかかること自体は納得ですが、その時間はUDF実行側の時間として計上される仕様のようです。
その証拠に、UDF作成&一度呼び出した後であれば、以下の処理は数秒で終わります。
with data as (
select 1 as x, 1 as y
union all select 3, 3
union all select 5, 5
)
select
sandbox.python_udf_sample(x, y)
from
data
毎回UDFを CREATE OR REPLACE するケースもまずありえないと思いますが、Python UDFではビルドの部分が時間的な最大のボトルネックになることは、しっかり意識しないといけませんね。
実行速度のボラティリティ
ビルドさえ終われば実行は秒で・・ともいかず、そのスピードにもぼちぼちのばらつきがあります。しかも、そのばらつきはBigQueryの機嫌次第😇
試しに、先ほどと同じく2カラムの足し算をするPython UDF(作成&一度呼び出し済)を題材に、冒頭利用したダミーデータ(3レコード)と、BigQuery公開データセット nataly(138Mレコード)それぞれに対して、何度かUDFを実行し、その処理速度をまとめた結果が以下になります。
3レコードの足し算 [s] | 138Mレコードの足し算 [s] | |
---|---|---|
1回目 | 13 | 28 |
2回目 | 0.6 | 17 |
3回目 | 0.7 | 34 |
4回目 | 1 | 24 |
5回目 | 0.5 | 41 |
レコード数の多寡に依らず一定のボラがあり、レコード数が増えるとそれが10数秒レベルになることがわかります。[3]
ここで、3レコードの足し算のケースを見ると、1回目だけ時間がかかって2回目以降は早くなっとるやん、と思われたかもしれません。が、これはただの偶然です。 というのも、Python UDFではキャッシュが効かない旨がばっちり公式ドキュメントでも言及されています。
Python UDF の戻り値は常に非確定的であると想定されるため、Python UDF を呼び出すクエリの結果はキャッシュに保存されません。
138Mレコードの足し算のケースでは同様の傾向になっていないことも、偶然であることを裏付けていますね。
まとめ
速度面の気になりポイントや、BigQueryのエディタ上での開発体験の悪さ(実行前にエラーに気付けない、実行してからエラーに気づくまでに時間がかかる)などの課題はあるものの、これらをクリアできれば、あるいは気にしなければ、無限の可能性を秘めた機能であることは間違いありません[4]。今後の本機能の展望に期待です!
We're hiring!
なんでもやっていきなデータエンジニア募集中です!

株式会社マイベストのテックブログです! 採用情報はこちら > notion.so/mybestcom/mybest-information-for-Engineers-8beadd9c91ef4dc2b21171d48a4b0c49
Discussion