❄️

Snowflake で非同期処理を行うには?

に公開

概要

Snowflake にて、SQLを非同期で処理したい要件に遭遇しました。

Snowflake における非同期処理の手段と、各手段の特徴について考察してみました。

非同期処理

大きく、以下の手段で非同期処理を実現できそうです。

  • Snowflake Task
  • Snowflake Script と ASYNC キーワード
  • Snowpark Container Service と ASYNC オプション

それぞれ概要を記載します。

Snowflake Task

以下いずれかの処理を "定期実行" または "イベント駆動" という形で非同期実行できる仕組みです。

  • 単一の SQL 文
  • ストアドプロシージャの呼び出し
  • Snowflake Script

基本は単一の SQL 文ですが、 ストアドプロシージャや Snowflake Script じか書き等と組み合わせることで、複数の SQL 文を実行できます。

注意点

タスクでは、ストアドプロシージャにおける引数のような、処理実行時に値を動的に注入する仕組みが存在しません。

定期実行

CREATE TASK にて タスクを作成する際、 Cron 式による定期実行設定が可能です。

具体例を含む詳細は、以下公式ドキュメントをご参照ください。

イベント駆動

Snowflake では、 ストリーム という仕組みを使用することで、特定のテーブルで発生した変更をリアルタイムにキャプチャできます。

ストリームでキャプチャしたテーブルへの変更の有無は、 SYSTEM$STREAM_HAS_DATA という関数で判別できます。

これらを Snowflake Task と組み合わせることで、「ストリームへのデータ書き込みを起点にタスクを起動する」といった、イベント駆動でのタスク実行が実現できます。

具体例を含む詳細は、以下公式ドキュメントをご参照ください。

ストリームの詳細は、以下公式ドキュメントをご参照ください。

SYSTEM$STREAM_HAS_DATA の詳細は、以下公式ドキュメントをご参照ください。

Snowflake Script と ASYNC キーワード

Snowflake Script では、 ASYNC キーワードを使用することで、SQLを非同期化できます。
処理完了を待つには、 AWAIT キーワードを使用します。

BEGIN
  LET res RESULTSET := ASYNC (SELECT * FROM your_table);
  AWAIT res;
END;

Snowflake Script の詳細は、以下公式ドキュメントをご参照ください。

注意点

Snowflake Script であるため、基本は Snowflake Task や ストアドプロシージャで処理を定義することになるかと思います。
このとき、 AWAIT で処理完了を待たずに呼び出し元(Snowflake Taskやストアドプロシージャ)が完了した場合、 実行中のSQLは自動的にキャンセルされます

確実に処理を完了させるためにも、 ASYNC で実行した処理は、 AWAIT での待機が必須になるかと思います。

また、AWAIT ALL 文を使用することで、 複数実行した ASYNC 処理が全て完了するまで待機できます。

CREATE OR REPLACE PROCEDURE test_async_proc()
RETURNS STRING
LANGUAGE SQL
AS
$$
BEGIN
  ASYNC (INSERT INTO my_table VALUES ('...'));
  ASYNC (INSERT INTO my_table VALUES ('...'));

  -- すべての非同期ジョブの完了を待つ
  AWAIT ALL;

  RETURN '完了';
END;
$$;

ASYNC/AWAIT の詳細は、以下をご確認ください

Snowpark Container Service と ASYNC オプション

Snowflake には、 事前にコンテナイメージを登録しておき、これを使用して処理を実行する Snowpark Container Service というサービスが用意されています。

そして、コンテナを起動して処理を開始する EXECUTE JOB SERVICE コマンドでは、コンテナ処理を非同期化できる ASYNC オプションが用意されています。

EXECUTE JOB SERVICE
  NAME = 'your_db.your_schema.your_task_name'
  ASYNC = TRUE
  ...

まとめると、 非同期で実行したい処理をコンテナ化して、 EXECUTE JOB SERVICEASYNC = TRUE オプション付きで実行することで、非同期処理を実現します。

一連の Snowpark Container Service の詳細は、以下公式ドキュメントをご参照ください。

やっていることは「コンテナを起動して任意の処理を実行させる」なので、コンテナで動作する処理であれば何でも実行できます。

そのため、Snowflake SQL に留まらず、任意のプログラミング言語を使用した複雑な処理を実現できます。

詳細は、以下公式ドキュメントをご参照ください。

特徴考察

再利用性について

Snowflake Task は実行時に値を動的に設定する仕組みが無いため、再利用性は低そうです。

動的に値を設定する必要がある処理は、ストアドプロシージャや Snowpark Container Service で実装することになりそうです。

非同期処理について

Snowflake Script の ASYNC キーワードによる非同期処理について、
Snowflake タスクはもともと非同期がコンセプトなので、 Snowflake Script の ASYNC キーワードを使うメリットは少なそうです。

そのため、使用するならストアドプロシージャ内の Snowflake Script となりそうです。

その際も、 AWAIT キーワードによる処理完了待ちを忘れないようにしたいです。

一方で、 Snowpark Container Service では、 そういった心配はなさそうです。

実装の容易さ

単一の SQL を非同期に実行するだけなら、 Snowflake タスクが最も容易そうです。
複数 SQL を実行させたい場合、(体感)少しクセのある Snowflake Script で書くことになりますが、それでも Snowflake の SQL で完結する分容易だと思います。

Snowpark Container Service は、動作させるまで最も準備が大変かと思います。
やや手は出しにくいですが、一度仕組みを作ってしまえば汎用性は段違いだと思います。

特徴まとめ

- Snowflake Task Snowflake Script と ASYNC キーワード Snowpark Container Service と ASYNC オプション
実行できる処理 複数のSQL文 複数のSQL文、Javascript、Python コンテナで動くものなら何でも
親タスク終了後も動作するか
実行時引数の受け渡し ✅ (ストアドプロシージャ引数) ✅ (specification template)
事前準備 無し 無し コンテナイメージの用意&EXECUTE JOB SERVICE 用の spec ファイルの用意
強み 定期実行・イベント駆動 引数による柔軟性・再利用性 大抵の処理を実行できる
使い分け 定期実行・イベントのパブリッシュ 手動・自動で実行したい処理 ストアドプロシージャでは事足りない中・大規模処理

参考文献

Discussion