ASOF Join の使いどころ
Intro
Snowflake でもようやく ASOF Join が public preview になりました。
私的には非常に便利な機能だと思っているのですが、イマイチピンとこない人もいると思うので、これがどういう機能で、どういうときに使えるかを解説していきます。
ASOF Join とは何か?
ドキュメントによると、
An ASOF JOIN operation combines rows from two tables based on timestamp values that closely follow each other, precede each other, or match exactly.
となっています。
つまり 「テーブル A のある時刻列の値に対して、テーブル B の時刻列の中で、テーブル A の時刻列の値より小さい/大きい値のうち、最も値が近いものを結合する」 という機能になります。
下記のドキュメントの例では、trades
テーブルの trade_time
列の各値に対して、quotes
テーブルから各値より小さい中で最大の quote_time
を持つ行を結合しています。
+--------------+-------------------------+----------+-------------------------+--------------+
| STOCK_SYMBOL | TRADE_TIME | QUANTITY | QUOTE_TIME | PRICE |
|--------------+-------------------------+----------+-------------------------+--------------|
| AAPL | 2023-10-01 09:00:05.000 | 2000 | 2023-10-01 09:00:03.000 | 139.00000000 |
| SNOW | 2023-10-01 09:00:05.000 | 1000 | 2023-10-01 09:00:02.000 | 163.00000000 |
| SNOW | 2023-10-01 09:00:10.000 | 1500 | 2023-10-01 09:00:08.000 | 165.00000000 |
+--------------+-------------------------+----------+-------------------------+--------------+
この記事では、より practical な例として「マルチクラスターウェアハウスがスケールアウト/スケールインする直前の、仮想ウェアハウスの負荷を取得する」というクエリについて考えていきます。
このクエリは、
-
WAREHOUSE_EVENTS_HISTORY
ビューからスケールアウト/スケールインのイベント (EVENT_NAME IN ('SPINUP_CLUSTER', 'SPINDOWN_CLUSTER')
) を抽出する - 各イベントのタイムスタンプより小さく、かつ、その中で最大のタイムスタンプを持つ仮想ウェアハウスの負荷の記録を
WAREHOUSE_LOAD_HISTORY
ビューから抽出して結合する
という形になります。
従来の方法
ASOF Join が実装される前から、こういった処理を SQL で記述する方法はいくつかありました。
下記はその例になります。
1 - 相関サブクエリ
WAREHOUSE_LOAD_HISTORY
ビューを結合する条件として、end_time
が「各イベントの timestamp
より小さく、かつ、その中で最大の end_time
」を記述した相関サブクエリの結果と一致する行を抽出するようにする方法です。
記述は複雑ですが、比較的理解しやすい方法になると思います。
select e.timestamp, e.warehouse_name, e.event_name, l.end_time, l.* ilike 'AVG_%'
from warehouse_events_history e
left join warehouse_load_history l
on e.warehouse_id = l.warehouse_id
and l.end_time = (
select max(end_time)
from warehouse_load_history
where true
and warehouse_id = e.warehouse_id
and end_time <= e.timestamp
)
where true
and e.event_name in ('SPINUP_CLUSTER', 'SPINDOWN_CLUSTER')
and e.event_state = 'STARTED'
and e.timestamp >= '2024-01-01'
order by e.warehouse_name, e.timestamp desc
;
2 - MAX_BY 関数
昨年実装された MAX_BY
関数と範囲結合条件 e.timestamp >= l.end_time
の組み合わせになります。
これが ASOF Join 以外で最もシンプルな解法になりそうです。
select
e.timestamp, any_value(e.warehouse_name) warehouse_name, e.event_name,
max(l.end_time) end_time,
max_by(l.avg_running, l.end_time) avg_running,
max_by(l.avg_queued_load, l.end_time) avg_queued_load,
max_by(l.avg_queued_provisioning, l.end_time) avg_queued_provisioning,
max_by(l.avg_blocked, l.end_time) avg_blocked
from warehouse_events_history e
left join warehouse_load_history l
on e.warehouse_id = l.warehouse_id and e.timestamp >= l.end_time
where true
and e.event_name in ('SPINUP_CLUSTER', 'SPINDOWN_CLUSTER')
and e.event_state = 'STARTED'
and e.timestamp >= '2024-01-01'
group by e.warehouse_id, e.timestamp, e.event_name
order by warehouse_name, e.timestamp desc
;
3 - MAX 集約 + 自己結合
あらかじめ、それぞれのイベントに対応する「各イベントの timestamp
より小さく、かつ、その中で最大の end_time
」をサブクエリ内で計算しておき、これを WAREHOUSE_EVENTS_HISTORY
ビューに結合した上で、WAREHOUSE_LOAD_HISTORY
ビューの end_time
を「各イベントの timestamp
より小さく、かつ、その中で最大の end_time
」に対して結合する方法です。
CTE を多用してクエリを書いていると、意図せずしてこの形になるケースがありそうです。
with
aggregated as (
select e.timestamp, e.warehouse_id, e.event_name, max(l.end_time) max_load_end_time
from warehouse_events_history e
left join warehouse_load_history l on e.warehouse_id = l.warehouse_id and e.timestamp >= l.end_time
group by e.timestamp, e.warehouse_id, e.event_name
)
select e.timestamp, e.warehouse_name, e.event_name, l.end_time, l.* ilike 'AVG_%'
from warehouse_events_history e
join aggregated a
on e.warehouse_id = a.warehouse_id and e.timestamp = a.timestamp and e.event_name = a.event_name
left join warehouse_load_history l
on a.warehouse_id = l.warehouse_id and a.max_load_end_time = l.end_time
where true
and e.event_name in ('SPINUP_CLUSTER', 'SPINDOWN_CLUSTER')
and e.event_state = 'STARTED'
and e.timestamp >= '2024-01-01'
order by e.warehouse_name, e.timestamp desc
;
4 - ROW_NUMBER/RANK ウィンドウ関数
各イベントをパーティションとして、ウィンドウ関数で l.end_time
を並び替え、最大の行だけ取り出す方法になります。
これも重複排除のメジャーな方法になるので、QUALIFY
に慣れている人のクエリには出現しやすそうな形になります。
select e.timestamp, e.warehouse_name, e.event_name, l.end_time, l.* ilike 'AVG_%'
from warehouse_events_history e
left join warehouse_load_history l
on e.warehouse_id = l.warehouse_id and e.timestamp >= l.end_time
where true
and e.event_name in ('SPINUP_CLUSTER', 'SPINDOWN_CLUSTER')
and e.event_state = 'STARTED'
and e.timestamp >= '2024-01-01'
qualify row_number() over (partition by e.timestamp, e.warehouse_id, e.event_name order by l.end_time desc) = 1
order by warehouse_name, e.timestamp desc
;
5 - LEAD/LAG ウィンドウ関数
あらかじめ WAREHOUSE_LOAD_HISTORY
ビュー側で「次の end_time
」を LEAD
関数で計算しておき、e.timestamp
がちょうど end_time
と「次の end_time
」に挟み込まれる行で結合する方法になります。
トリッキーな方法ですが、パーティションの行数によっては他の方法よりもパフォーマンスがよくなります。
select e.timestamp, e.warehouse_name, e.event_name, l.end_time, l.* ilike 'AVG_%'
from warehouse_events_history e
left join (
select *, nvl(lead(end_time) over (partition by warehouse_id order by end_time), '9999-12-31') next_end_time
from warehouse_load_history
) l
on e.warehouse_id = l.warehouse_id and l.next_end_time > e.timestamp and e.timestamp >= l.end_time
where true
and e.event_name in ('SPINUP_CLUSTER', 'SPINDOWN_CLUSTER')
and e.event_state = 'STARTED'
and e.timestamp >= '2024-01-01'
order by warehouse_name, timestamp desc
;
新しい方法 - ASOF Join
下記が ASOF Join を使って当該処理を記述したクエリです。
select e.timestamp, e.warehouse_name, e.event_name, l.end_time, l.* ilike 'AVG_%'
from warehouse_events_history e
asof join warehouse_load_history l
match_condition (e.timestamp >= l.end_time)
on e.warehouse_id = l.warehouse_id
where true
and e.event_name in ('SPINUP_CLUSTER', 'SPINDOWN_CLUSTER')
and e.event_state = 'STARTED'
and e.timestamp >= '2024-01-01'
order by warehouse_name, timestamp desc
;
MATCH_CONDITION
句に大小関係を、ON
句に残りの結合条件を書くだけなので、書くのも楽だし可読性も上がります。
また、記述がシンプルになることで、内部的な処理の量や複雑性も軽減されるため、パフォーマンスについてもメリットがあります。
下記は同じデータに対して、従来の方法と ASOF Join を XLARGE 仮想ウェアハウスで実行したときの実行時間になります。
方法 | 実行時間(秒) |
---|---|
1 - 相関サブクエリ | 20 |
2 - MAX_BY | 23 |
3 - MAX + 自己結合 | 95 |
4 - ROW_NUMBER/RANK | 50 |
5 - LEAD/LAG | 17 |
ASOF Join | 11 |
データ量としては WAREHOUSE_LOAD_HISTORY
が 3,000,000 行ほど、WAREHOUSE_EVENT_HISTORY
から抽出したイベント数が 500-600 行ほどになります。
上記のように、他の従来の方法と比べても ASOF Join が高速に処理することができることがわかるかと思います。
まとめ
こういったファジーな条件での時系列の結合は、ファイナンスや IoT などのワークロードで比較的よく発生するシチュエーションになるかなと思います。
もし「従来の方法」にあるようなクエリを見つけたら ASOF Join をぜひ試してみてください。
Discussion