🐓

BigQuery Python UDF 完全に理解した

に公開

マイベスト データエンジニアの snhryt です。
先月頭に、BigQueryにユーザー定義関数 (UDF) をPythonで書ける機能がプレビュー版でやってきました🎉

少し時間が空いてしまいましたが、自分でもいろいろ触って完全に理解した[1]ので、体験記をまとめます。

Python UDFの基本構文

早速、カラムxとカラムyの足し算をするだけの簡単なPython UDFを書いてみましょう↓

create or replace function sandbox.python_udf_sample(
  x int64,
  y int64
)
returns int64
language python
options (
  runtime_version = "python-3.11",
  entry_point = "main"
)
as r'''
import pandas as pd

def main(df: pd.DataFrame):
    result = df["x"] + df["y"]
    return result
''';

with data as (
  select 1 as x, 1 as y
  union all select 3, 3
  union all select 5, 5
)
select
  sandbox.python_udf_sample(x, y)
from
  data
;

得られる結果はこんな感じ。ちゃんと足し算できてますね。

外部ライブラリの利用

先ほどまでのはお遊びで、実践上期待されているのはここからです。やはり、Python UDFの最大の魅力はPythonの豊富なライブラリ群の恩恵にあやかれる点です。

デフォルトでは NumPy, Pandas, Pyarrow, python-dateutil しか使えませんが、PyPIで公開されているライブラリであれば、バージョン指定のうえでインストールして使えます。しかも packages=["ライブラリ名"] をワンライナー追加するだけで利用可能です。 わざわざ .js ファイルをGCSに置かないと外部のライブラリが使えないJavaScript UDFと比べて、利便性が高くて大変いいですね。

例題

scipy で対応のあるt検定をして、t統計量、p値、効果量 (Cohen's d) の3つをSTRUCT型で返す処理を書いてみます。

create or replace function sandbox.ttest(
  before int64,
  after int64
)
returns struct<t float64, p float64, d float64>
language python
options (
  runtime_version = "python-3.11",
  entry_point = "main",
  packages=["scipy"]
)
as r'''
import pandas as pd
from scipy import stats

def main(df: pd.DataFrame):
    # floatに変換しないとエラーが出る
    before = df["before"].astype(float)
    after = df["after"].astype(float)

    # 対応のあるt検定
    t_stat, p_value = stats.ttest_rel(before, after)

    # 効果量(Cohen s d for paired samples)
    diff = after - before
    cohen_d = diff.mean() / diff.std(ddof=1)  # ddof=1 で標本標準偏差
    results = pd.Series(
        [{"t": t_stat, "p": p_value, "d": cohen_d}] * len(df)
    )
    return results
''';

with data as (
  -- after のほうが before よりも大きめに出ているダミーデータ
  select 70 as before, 74 as after
  union all select 68, 71
  union all select 65, 68
  union all select 72, 75
  union all select 71, 75
  union all select 69, 70
  union all select 67, 72
  union all select 70, 73
  union all select 73, 76
  union all select 66, 67
  union all select 68, 69
  union all select 69, 72
  union all select 65, 67
  union all select 67, 68
  union all select 72, 74
  union all select 74, 76
  union all select 70, 72
  union all select 66, 66
  union all select 68, 69
  union all select 71, 71
)
select distinct
  sandbox.ttest(before, after)
from
  data
;

実行結果はこちら↓

ちゃんとscipyを使ってt検定ができています。すばらしい!

ポイント1: Nレコードに対して1つの結果を出すためのテクニック

ポイントは

results = pd.Series(
    [{"t": t_stat, "p": p_value, "d": cohen_d}] * len(df)
)

select distinct
  sandbox.ttest(before, after)

の部分です。

BigQueryのUDFは1レコードに対して1つ結果を返すものなので、たとえPythonの処理内容がNレコードに対して1つ結果を得るようなものだったとしても、関数の戻り値は入力と同じ行数になっている必要があります。 そのため、Python側では * len(df) とし、ただそのままだとレコード数分同じ結果が複製されてしまうので、それをSQL側の distinct で重複排除する、という意図です。[2]

仮に入出力のレコード数が不一致の場合、以下のようなエラーをお見舞いされます。これが入出力レコード数の不一致エラーだとはわからんよ・・・

ちなみに、知名度がそんなに高くない印象ですが、BigQueryにはUDFの集計関数版 UDAF があります。で、JavaScriptはこれに対応しているのですが、Pythonは残念ながらまだ未対応でした。

Python UDF自体のGAもまだなので、UDAF対応は少し先になる気はしますが、もしUDAFに対応すれば今回のような小細工は不要になり、使い勝手が向上すると思います。

ポイント2: STRUCTで返すときはキー名を明記する

これはPython UDFに限らないBigQuery UDFのマナーですが、以下のようにキー名を指定する必要があります。

returns struct<t float64, p float64, d float64>

仮に以下のように書いてしまうと、実行前にエラーは出ない&実行自体は成功するものの、値がすべてnullになってしまうのでご注意を。

returns struct<float64, float64, float64>

ポイント3: main関数の戻り値の型

STRUCT型で返すためには、Pythonの関数の戻り値は dict の pd.Series になっている必要があります。return dfreturn df[["sum", "multiple"]] ではダメで、それだとこんな感じで怒られてしまいます↓

速度関連の落とし穴

使ってみて分かったのですが、Python UDFは速度面にわりとツラみがあります。

どこに時間がかかっているのか

公式ドキュメントには以下のように記載があります。

CREATE FUNCTION ステートメントを使用して Python UDF を作成すると、BigQuery はベースイメージに基づくコンテナ イメージを作成または更新します。コンテナは、コードと指定されたパッケージ依存関係を使用してベースイメージ上にビルドされます。コンテナの作成は、実行時間が長いプロセスです。CREATE FUNCTION ステートメント実行後の最初のクエリは、イメージの作成を自動的に待機する場合があります。外部依存関係がない場合、通常、コンテナ イメージは 1 分以内に作成されます。

こちらを一読したうえで、冒頭のクエリの処理結果を見てましょう↓

これを見ると、以下のように読み取る方が多いのではないでしょうか?

  • 全体: 3分12秒
  • UDF作成(1段目の処理): 12秒ぐらい
  • UDF実行(2段目の処理): 3分ぐらい

かくいう私も「ドキュメントの書きっぷり的にビルドもっと時間かかると思ってたけど、案外速いじゃん」と思いました。が、実はこれが罠で、大半の時間はこのステップには現れない1.5段目的な部分にかかる時間です。 初回起動時はコンテナの立ち上げに時間がかかること自体は納得ですが、その時間はUDF実行側の時間として計上される仕様のようです。

その証拠に、UDF作成&一度呼び出した後であれば、以下の処理は数秒で終わります。

with data as (
  select 1 as x, 1 as y
  union all select 3, 3
  union all select 5, 5
)
select
  sandbox.python_udf_sample(x, y)
from
  data

毎回UDFを CREATE OR REPLACE するケースもまずありえないと思いますが、Python UDFではビルドの部分が時間的な最大のボトルネックになることは、しっかり意識しないといけませんね。

実行速度のボラティリティ

ビルドさえ終われば実行は秒で・・ともいかず、そのスピードにもぼちぼちのばらつきがあります。しかも、そのばらつきはBigQueryの機嫌次第😇

試しに、先ほどと同じく2カラムの足し算をするPython UDF(作成&一度呼び出し済)を題材に、冒頭利用したダミーデータ(3レコード)と、BigQuery公開データセット nataly(138Mレコード)それぞれに対して、何度かUDFを実行し、その処理速度をまとめた結果が以下になります。

3レコードの足し算 [s] 138Mレコードの足し算 [s]
1回目 13 28
2回目 0.6 17
3回目 0.7 34
4回目 1 24
5回目 0.5 41

レコード数の多寡に依らず一定のボラがあり、レコード数が増えるとそれが10数秒レベルになることがわかります。[3]

ここで、3レコードの足し算のケースを見ると、1回目だけ時間がかかって2回目以降は早くなっとるやん、と思われたかもしれません。が、これはただの偶然です。 というのも、Python UDFではキャッシュが効かない旨がばっちり公式ドキュメントでも言及されています。

Python UDF の戻り値は常に非確定的であると想定されるため、Python UDF を呼び出すクエリの結果はキャッシュに保存されません。

138Mレコードの足し算のケースでは同様の傾向になっていないことも、偶然であることを裏付けていますね。

まとめ

速度面の気になりポイントや、BigQueryのエディタ上での開発体験の悪さ(実行前にエラーに気付けない、実行してからエラーに気づくまでに時間がかかる)などの課題はあるものの、これらをクリアできれば、あるいは気にしなければ、無限の可能性を秘めた機能であることは間違いありません[4]。今後の本機能の展望に期待です!

We're hiring!

なんでもやっていきなデータエンジニア募集中です!
https://open.talentio.com/r/1/c/my-best/pages/99801

脚注
  1. 説明するのも野暮ですが、ダニング=クルーガー効果でいう1つ目の山を指しています。一応。 ↩︎

  2. あるいは、ARRAY_AGG で集約して、UDFにはハナから1行だけのDataFrameとして渡してあげる、というパターンも考えられます。この場合は before = np.array(df["before"].to_list()[0]) のように、index=0に格納されているlistをndarrayに変換してあげればOK。 ↩︎

  3. ポジティブに捉えるならば、レコード数によらず10数秒程度のボラで済んでいる、とも言えるかもしれません。 ↩︎

  4. 今後、HEXのようにSQL/Pythonをシームレスに切り替えれるようになると最高だなーって思います。 ↩︎

Discussion