Snowflake で非同期処理を行うには?
概要
Snowflake にて、SQLを非同期で処理したい要件に遭遇しました。
Snowflake における非同期処理の手段と、各手段の特徴について考察してみました。
非同期処理
大きく、以下の手段で非同期処理を実現できそうです。
- Snowflake Task
- Snowflake Script と ASYNC キーワード
- Snowpark Container Service と ASYNC オプション
それぞれ概要を記載します。
Snowflake Task
以下いずれかの処理を "定期実行" または "イベント駆動" という形で非同期実行できる仕組みです。
- 単一の SQL 文
- ストアドプロシージャの呼び出し
- Snowflake Script
基本は単一の SQL 文ですが、 ストアドプロシージャや Snowflake Script じか書き等と組み合わせることで、複数の SQL 文を実行できます。
注意点
タスクでは、ストアドプロシージャにおける引数のような、処理実行時に値を動的に注入する仕組みが存在しません。
定期実行
CREATE TASK にて タスクを作成する際、 Cron 式による定期実行設定が可能です。
具体例を含む詳細は、以下公式ドキュメントをご参照ください。
イベント駆動
Snowflake では、 ストリーム という仕組みを使用することで、特定のテーブルで発生した変更をリアルタイムにキャプチャできます。
ストリームでキャプチャしたテーブルへの変更の有無は、 SYSTEM$STREAM_HAS_DATA という関数で判別できます。
これらを Snowflake Task と組み合わせることで、「ストリームへのデータ書き込みを起点にタスクを起動する」といった、イベント駆動でのタスク実行が実現できます。
具体例を含む詳細は、以下公式ドキュメントをご参照ください。
ストリームの詳細は、以下公式ドキュメントをご参照ください。
SYSTEM$STREAM_HAS_DATA の詳細は、以下公式ドキュメントをご参照ください。
Snowflake Script と ASYNC キーワード
Snowflake Script では、 ASYNC キーワードを使用することで、SQLを非同期化できます。
処理完了を待つには、 AWAIT キーワードを使用します。
BEGIN
LET res RESULTSET := ASYNC (SELECT * FROM your_table);
AWAIT res;
END;
Snowflake Script の詳細は、以下公式ドキュメントをご参照ください。
注意点
Snowflake Script であるため、基本は Snowflake Task や ストアドプロシージャで処理を定義することになるかと思います。
このとき、 AWAIT で処理完了を待たずに呼び出し元(Snowflake Taskやストアドプロシージャ)が完了した場合、 実行中のSQLは自動的にキャンセルされます。
確実に処理を完了させるためにも、 ASYNC で実行した処理は、 AWAIT での待機が必須になるかと思います。
また、AWAIT ALL 文を使用することで、 複数実行した ASYNC 処理が全て完了するまで待機できます。
CREATE OR REPLACE PROCEDURE test_async_proc()
RETURNS STRING
LANGUAGE SQL
AS
$$
BEGIN
ASYNC (INSERT INTO my_table VALUES ('...'));
ASYNC (INSERT INTO my_table VALUES ('...'));
-- すべての非同期ジョブの完了を待つ
AWAIT ALL;
RETURN '完了';
END;
$$;
ASYNC/AWAIT の詳細は、以下をご確認ください
Snowpark Container Service と ASYNC オプション
Snowflake には、 事前にコンテナイメージを登録しておき、これを使用して処理を実行する Snowpark Container Service というサービスが用意されています。
そして、コンテナを起動して処理を開始する EXECUTE JOB SERVICE コマンドでは、コンテナ処理を非同期化できる ASYNC オプションが用意されています。
EXECUTE JOB SERVICE
NAME = 'your_db.your_schema.your_task_name'
ASYNC = TRUE
...
まとめると、 非同期で実行したい処理をコンテナ化して、 EXECUTE JOB SERVICE を ASYNC = TRUE オプション付きで実行することで、非同期処理を実現します。
一連の Snowpark Container Service の詳細は、以下公式ドキュメントをご参照ください。
やっていることは「コンテナを起動して任意の処理を実行させる」なので、コンテナで動作する処理であれば何でも実行できます。
そのため、Snowflake SQL に留まらず、任意のプログラミング言語を使用した複雑な処理を実現できます。
詳細は、以下公式ドキュメントをご参照ください。
特徴考察
再利用性について
Snowflake Task は実行時に値を動的に設定する仕組みが無いため、再利用性は低そうです。
動的に値を設定する必要がある処理は、ストアドプロシージャや Snowpark Container Service で実装することになりそうです。
非同期処理について
Snowflake Script の ASYNC キーワードによる非同期処理について、
Snowflake タスクはもともと非同期がコンセプトなので、 Snowflake Script の ASYNC キーワードを使うメリットは少なそうです。
そのため、使用するならストアドプロシージャ内の Snowflake Script となりそうです。
その際も、 AWAIT キーワードによる処理完了待ちを忘れないようにしたいです。
一方で、 Snowpark Container Service では、 そういった心配はなさそうです。
実装の容易さ
単一の SQL を非同期に実行するだけなら、 Snowflake タスクが最も容易そうです。
複数 SQL を実行させたい場合、(体感)少しクセのある Snowflake Script で書くことになりますが、それでも Snowflake の SQL で完結する分容易だと思います。
Snowpark Container Service は、動作させるまで最も準備が大変かと思います。
やや手は出しにくいですが、一度仕組みを作ってしまえば汎用性は段違いだと思います。
特徴まとめ
| - | Snowflake Task | Snowflake Script と ASYNC キーワード | Snowpark Container Service と ASYNC オプション |
|---|---|---|---|
| 実行できる処理 | 複数のSQL文 | 複数のSQL文、Javascript、Python | コンテナで動くものなら何でも |
| 親タスク終了後も動作するか | ✅ | ❌ | ✅ |
| 実行時引数の受け渡し | ❌ | ✅ (ストアドプロシージャ引数) | ✅ (specification template) |
| 事前準備 | 無し | 無し | コンテナイメージの用意&EXECUTE JOB SERVICE 用の spec ファイルの用意 |
| 強み | 定期実行・イベント駆動 | 引数による柔軟性・再利用性 | 大抵の処理を実行できる |
| 使い分け | 定期実行・イベントのパブリッシュ | 手動・自動で実行したい処理 | ストアドプロシージャでは事足りない中・大規模処理 |
参考文献
- https://docs.snowflake.com/ja/sql-reference/sql/create-task#examples
- https://docs.snowflake.com/ja/user-guide/tasks-intro
- https://docs.snowflake.com/ja/user-guide/streams-intro
- https://docs.snowflake.com/ja/sql-reference/functions/system_stream_has_data
- https://docs.snowflake.com/en/sql-reference/snowflake-scripting/await
- https://docs.snowflake.com/ja/developer-guide/snowflake-scripting/index
- https://docs.snowflake.com/ja/developer-guide/snowpark-container-services/working-with-services
- https://docs.snowflake.com/en/sql-reference/sql/execute-job-service
Discussion