❄️

ASOF Join の使いどころ

2024/02/29に公開

Intro

Snowflake でもようやく ASOF Join が public preview になりました。

https://docs.snowflake.com/en/sql-reference/constructs/asof-join

私的には非常に便利な機能だと思っているのですが、イマイチピンとこない人もいると思うので、これがどういう機能で、どういうときに使えるかを解説していきます。

ASOF Join とは何か?

ドキュメントによると、

An ASOF JOIN operation combines rows from two tables based on timestamp values that closely follow each other, precede each other, or match exactly.

となっています。

つまり 「テーブル A のある時刻列の値に対して、テーブル B の時刻列の中で、テーブル A の時刻列の値より小さい/大きい値のうち、最も値が近いものを結合する」 という機能になります。

下記のドキュメントの例では、trades テーブルの trade_time 列の各値に対して、quotes テーブルから各値より小さい中で最大の quote_time を持つ行を結合しています。

https://docs.snowflake.com/en/sql-reference/constructs/asof-join#join-with-match-and-on-conditions

+--------------+-------------------------+----------+-------------------------+--------------+
| STOCK_SYMBOL | TRADE_TIME              | QUANTITY | QUOTE_TIME              |        PRICE |
|--------------+-------------------------+----------+-------------------------+--------------|
| AAPL         | 2023-10-01 09:00:05.000 |     2000 | 2023-10-01 09:00:03.000 | 139.00000000 |
| SNOW         | 2023-10-01 09:00:05.000 |     1000 | 2023-10-01 09:00:02.000 | 163.00000000 |
| SNOW         | 2023-10-01 09:00:10.000 |     1500 | 2023-10-01 09:00:08.000 | 165.00000000 |
+--------------+-------------------------+----------+-------------------------+--------------+

この記事では、より practical な例として「マルチクラスターウェアハウスがスケールアウト/スケールインする直前の、仮想ウェアハウスの負荷を取得する」というクエリについて考えていきます。

このクエリは、

  • WAREHOUSE_EVENTS_HISTORY ビューからスケールアウト/スケールインのイベント (EVENT_NAME IN ('SPINUP_CLUSTER', 'SPINDOWN_CLUSTER')) を抽出する
  • 各イベントのタイムスタンプより小さく、かつ、その中で最大のタイムスタンプを持つ仮想ウェアハウスの負荷の記録を WAREHOUSE_LOAD_HISTORY ビューから抽出して結合する

という形になります。

従来の方法

ASOF Join が実装される前から、こういった処理を SQL で記述する方法はいくつかありました。

下記はその例になります。

1 - 相関サブクエリ

WAREHOUSE_LOAD_HISTORY ビューを結合する条件として、end_time が「各イベントの timestamp より小さく、かつ、その中で最大の end_time」を記述した相関サブクエリの結果と一致する行を抽出するようにする方法です。

記述は複雑ですが、比較的理解しやすい方法になると思います。

select e.timestamp, e.warehouse_name, e.event_name, l.end_time, l.* ilike 'AVG_%'
from warehouse_events_history e
left join warehouse_load_history l
    on e.warehouse_id = l.warehouse_id
    and l.end_time = (
      select max(end_time)
      from warehouse_load_history
      where true
      and warehouse_id = e.warehouse_id
      and end_time <= e.timestamp
    )
where true
and e.event_name in ('SPINUP_CLUSTER', 'SPINDOWN_CLUSTER')
and e.event_state = 'STARTED'
and e.timestamp >= '2024-01-01'
order by e.warehouse_name, e.timestamp desc
;

2 - MAX_BY 関数

昨年実装された MAX_BY 関数と範囲結合条件 e.timestamp >= l.end_time の組み合わせになります。

これが ASOF Join 以外で最もシンプルな解法になりそうです。

select
    e.timestamp, any_value(e.warehouse_name) warehouse_name, e.event_name,
    max(l.end_time) end_time,
    max_by(l.avg_running, l.end_time) avg_running,
    max_by(l.avg_queued_load, l.end_time) avg_queued_load,
    max_by(l.avg_queued_provisioning, l.end_time) avg_queued_provisioning,
    max_by(l.avg_blocked, l.end_time) avg_blocked
from warehouse_events_history e
left join warehouse_load_history l
    on e.warehouse_id = l.warehouse_id and e.timestamp >= l.end_time
where true
and e.event_name in ('SPINUP_CLUSTER', 'SPINDOWN_CLUSTER')
and e.event_state = 'STARTED'
and e.timestamp >= '2024-01-01'
group by e.warehouse_id, e.timestamp, e.event_name
order by warehouse_name, e.timestamp desc
;

3 - MAX 集約 + 自己結合

あらかじめ、それぞれのイベントに対応する「各イベントの timestamp より小さく、かつ、その中で最大の end_time」をサブクエリ内で計算しておき、これを WAREHOUSE_EVENTS_HISTORY ビューに結合した上で、WAREHOUSE_LOAD_HISTORY ビューの end_time を「各イベントの timestamp より小さく、かつ、その中で最大の end_time」に対して結合する方法です。

CTE を多用してクエリを書いていると、意図せずしてこの形になるケースがありそうです。

with
aggregated as (
    select e.timestamp, e.warehouse_id, e.event_name, max(l.end_time) max_load_end_time
    from warehouse_events_history e
    left join warehouse_load_history l on e.warehouse_id = l.warehouse_id and e.timestamp >= l.end_time
    group by e.timestamp, e.warehouse_id, e.event_name
)
select e.timestamp, e.warehouse_name, e.event_name, l.end_time, l.* ilike 'AVG_%'
from warehouse_events_history e
join aggregated a
    on e.warehouse_id = a.warehouse_id and e.timestamp = a.timestamp and e.event_name = a.event_name
left join warehouse_load_history l
    on a.warehouse_id = l.warehouse_id and a.max_load_end_time = l.end_time
where true
and e.event_name in ('SPINUP_CLUSTER', 'SPINDOWN_CLUSTER')
and e.event_state = 'STARTED'
and e.timestamp >= '2024-01-01'
order by e.warehouse_name, e.timestamp desc
;

4 - ROW_NUMBER/RANK ウィンドウ関数

各イベントをパーティションとして、ウィンドウ関数で l.end_time を並び替え、最大の行だけ取り出す方法になります。

これも重複排除のメジャーな方法になるので、QUALIFY に慣れている人のクエリには出現しやすそうな形になります。

select e.timestamp, e.warehouse_name, e.event_name, l.end_time, l.* ilike 'AVG_%'
from warehouse_events_history e
left join warehouse_load_history l
    on e.warehouse_id = l.warehouse_id and e.timestamp >= l.end_time
where true
and e.event_name in ('SPINUP_CLUSTER', 'SPINDOWN_CLUSTER')
and e.event_state = 'STARTED'
and e.timestamp >= '2024-01-01'
qualify row_number() over (partition by e.timestamp, e.warehouse_id, e.event_name order by l.end_time desc) = 1
order by warehouse_name, e.timestamp desc
;

5 - LEAD/LAG ウィンドウ関数

あらかじめ WAREHOUSE_LOAD_HISTORY ビュー側で「次の end_time」を LEAD 関数で計算しておき、e.timestamp がちょうど end_time と「次の end_time」に挟み込まれる行で結合する方法になります。

トリッキーな方法ですが、パーティションの行数によっては他の方法よりもパフォーマンスがよくなります。

select e.timestamp, e.warehouse_name, e.event_name, l.end_time, l.* ilike 'AVG_%'
from warehouse_events_history e
left join (
    select *, nvl(lead(end_time) over (partition by warehouse_id order by end_time), '9999-12-31') next_end_time
    from warehouse_load_history
) l
    on e.warehouse_id = l.warehouse_id and l.next_end_time > e.timestamp and e.timestamp >= l.end_time
where true
and e.event_name in ('SPINUP_CLUSTER', 'SPINDOWN_CLUSTER')
and e.event_state = 'STARTED'
and e.timestamp >= '2024-01-01'
order by warehouse_name, timestamp desc
;

新しい方法 - ASOF Join

下記が ASOF Join を使って当該処理を記述したクエリです。

select e.timestamp, e.warehouse_name, e.event_name, l.end_time, l.* ilike 'AVG_%'
from warehouse_events_history e
asof join warehouse_load_history l
    match_condition (e.timestamp >= l.end_time)
    on e.warehouse_id = l.warehouse_id
where true
and e.event_name in ('SPINUP_CLUSTER', 'SPINDOWN_CLUSTER')
and e.event_state = 'STARTED'
and e.timestamp >= '2024-01-01'
order by warehouse_name, timestamp desc
;

MATCH_CONDITION 句に大小関係を、ON 句に残りの結合条件を書くだけなので、書くのも楽だし可読性も上がります。

また、記述がシンプルになることで、内部的な処理の量や複雑性も軽減されるため、パフォーマンスについてもメリットがあります。

下記は同じデータに対して、従来の方法と ASOF Join を XLARGE 仮想ウェアハウスで実行したときの実行時間になります。

方法 実行時間(秒)
1 - 相関サブクエリ 20
2 - MAX_BY 23
3 - MAX + 自己結合 95
4 - ROW_NUMBER/RANK 50
5 - LEAD/LAG 17
ASOF Join 11

データ量としては WAREHOUSE_LOAD_HISTORY が 3,000,000 行ほど、WAREHOUSE_EVENT_HISTORY から抽出したイベント数が 500-600 行ほどになります。

上記のように、他の従来の方法と比べても ASOF Join が高速に処理することができることがわかるかと思います。

まとめ

こういったファジーな条件での時系列の結合は、ファイナンスや IoT などのワークロードで比較的よく発生するシチュエーションになるかなと思います。

もし「従来の方法」にあるようなクエリを見つけたら ASOF Join をぜひ試してみてください。

Discussion