モダンなタスク管理を可能にするSnowflake Python API
結論
Taskを管理するならSnowflake Python API
を使おう
Snowflake Python APIとは
Snowflake公式のPythonのオブジェクト管理ライブラリ「snowflake.core」のことです(Public Beta)。Snowflake Python Connector
とは全く別物です。
Snowflake Python APIを使用すると、Pythonコードを使ってSnowflakeのリソース(Table、Warehouse、Task、Snowpark Container ServiceのCompute Poolなど)を管理することができます。
本記事では、Snowflake Python APIを使ってSnowflakeのタスクとDAG(Directed Acyclic Graph)を管理する方法を詳しく解説します。これにより、StreamlitなどからSQLを直接実行するのではなく、Pythonの型ヒントや補完の恩恵を受けながらタスクを管理できるようになります。
インストールと接続
まず、以下のコマンドでパッケージをインストールします。
pip install snowflake -U
次に、SnowparkまたはSnowflake Python Connectorの接続情報を使ってRootオブジェクトを作成します。ここでは、Snowparkのセッションを使う方法を紹介します。
from snowflake.core import Root
from snowflake.snowpark.context import get_active_session
session = get_active_session()
root = Root(session)
シンプルなタスクの作成と管理
単発のタスクを作成するには、TaskオブジェクトとTaskCollectionオブジェクトを使用します。以下は、my_taskというタスクを作成する例です。
schedule引数にsnowflake.core.task.Cron
オブジェクトを指定することで、例えばschedule=Cron("*/10 * * * *", "Asia/Tokyo")
のように、10分ごとに実行されるタスクを作成することもできます。
select 1
を実行するだけの単純なタスクですが、簡単にタスクが作成できることがわかります。
from datetime import timedelta
from snowflake.core.task import Task
schema = root.databases['my_db'].schemas['my_schema']
tasks = schema.tasks
my_task = Task(name='my_task', definition='select 1', schedule=timedelta(hours=1))
tasks.create(my_task)
タスクの実行、停止、再開などの操作にはTaskResourceオブジェクトを使用します。以下のコードは、既存のmy_taskという名前のタスクを即時実行、停止、再開、削除する例です。
from snowflake.core.task import Task
tasks = root.databases["my_db"].schemas["my_schema"].tasks
task_res = tasks['my_task']
task_res.execute()
task_res.suspend()
task_res.resume()
task_res.delete()
ローカルに存在するPython関数を定期的に実行するタスクを作成する
Snowflake Python APIの真価が発揮されるのはここからです。ストアドプロシージャとしてPython関数を実行するタスクを作成することができます。
以下のコードは、StoredProcedureCallオブジェクトで表されるローカルの関数(Snowparkによって自動的に新しいストアドプロシージャとしてアップロードされます)を定期的に実行するmy_task2というタスクを作成する例です。is_permanent=True
を指定しないと、タスクが実行される時にストアドプロシージャが消えてしまうので注意してください。
from snowflake.core.task import StoredProcedureCall, Task
from snowflake.snowpark.functions import sproc
def hello_world(session:Session)->str:
return "Hello World"
stored_procedure_object = sproc(
func=hello_world,
is_permanent=True,
stage_location="@EXAMPLE_STAGE",
replace=True,
packages=['snowflake-snowpark-python'],
external_access_integrations=['YOUR_EAI_NAME'],
)
my_task2 = Task(
StoredProcedureCall(
stored_procedure_object
),
warehouse="test_warehouse",
schedule=Cron("10 * * * *", "Asia/Tokyo")
)
tasks = root.databases['my_db'].schemas['my_schema'].tasks
tasks.create(my_task2)
タスクグラフの作成
タスクグラフを使うと、複数のタスクを組み合わせて、それらの依存関係を定義することができます。タスクグラフは最初に実行されるルートタスクと、その終了後に実行される追加タスクのフローです。
以下のコードは、10分ごとにtask1がselect 1
を実行し、task1が終了するとtask2がselect 2
を実行する、dag_taskという名前のDAGTaskオブジェクトを作成する例です。
schema = root.databases["my_db"].schemas["my_schema"]
dag = DAG(
name='dag_task',
schedule=Cron("*/10 * * * *", "Asia/Tokyo"),
)
with dag:
task1 = DAGTask(
name="task1",
definition="select 1",
warehouse="EXAMPLE_WH",
)
task2 = DAGTask(
name="task2",
definition="select 2",
warehouse="EXAMPLE_WH",
)
task1 >> task2
op = DAGOperation(schema)
op.deploy(dag, mode=CreateMode.or_replace)
Pythonの関数をDAGとして登録する
前述のタスクグラフをPythonコードで表現することも可能です。以下のコードでは、sproc
関数でバインドされたExternal Network Accessを用いて、task1がHTTPリクエストを送信し、そのレスポンスのJSONデータを戻り値として設定します。task2はtask1のリターン値を受け取り、そのJSONデータをパースします。
from snowflake.core.task.context import TaskContext
from snowflake.snowpark import Session
from snowflake.snowpark.functions import sproc
from snowflake.core.task.dagv1 import DAG, DAGTask, DAGOperation, CreateMode
from snowflake.core.task import Cron
schema = root.databases["my_db"].schemas["my_schema"]
def task1_func(session: Session) -> None:
import requests
context = TaskContext(session)
response = requests.get("https://httpbin.org/get")
json_data = response.json()
context.set_return_value(str(json_data))
def task2_func(session: Session) -> None:
import json
context = TaskContext(session)
task1_value: str = context.get_predecessor_return_value("task1")
data = json.loads(task1_value)
# data = {
# {
# "args": {},
# "headers": {
# "Accept": "*/*",
# "Host": "httpbin.org"
# ...
# }
# }
stored_procedure_object_1 = sproc(
func=task1_func,
is_permanent=True,
stage_location="@EXAMPLE_STAGE",
replace=True,
packages=['snowflake-snowpark-python', 'requests'],
external_access_integrations=['YOUR_EAI_NAME'],
)
stored_procedure_object_2 = sproc(
func=task2_func,
is_permanent=True,
stage_location="@EXAMPLE_STAGE",
replace=True,
packages=['snowflake-snowpark-python'],
)
dag = DAG(
name='dag_task',
schedule=Cron("*/10 * * * *", "Asia/Tokyo"),
use_func_return_value=True
)
with dag:
task1 = DAGTask(
name="task1",
definition=StoredProcedureCall(stored_procedure_object_1),
warehouse="EXAMPLE_WH",
)
task2 = DAGTask(
name="task2",
definition=StoredProcedureCall(stored_procedure_object_2),
warehouse="EXAMPLE_WH",
)
task1 >> task2
op = DAGOperation(schema)
op.deploy(dag, mode=CreateMode.or_replace)
DAG内で共通のConfigを渡す
ストアドプロシージャを作り直すことなく動作を変更したい場合に便利な機能があります。例えばクエリパラメータのみを変えたDAGを作成したい時などは、DAGのconfigパラメータに辞書を渡すことができます。
ストアドプロシージャの引数として実装する方法もありますが、複数のタスクで同様のパラメータを参照する場合、引数を何度も渡すのは面倒な上、センシティブな引数がストアドプロシージャの実行履歴に残ってしまったりするため、configパラメータを優先する方が良いでしょう。
以下のタスクは、DAGに渡したconfigによってアクセスするクエリパラメータを変更する例です。
def task1_func(session: Session) -> None:
import requests
context = TaskContext(session)
# configを受け取れる
config=context.get_task_graph_config()
response = requests.get(f"https://httpbin.org/get?query={config['query_params']}")
json_data = response.json()
context.set_return_value(str(json_data))
...
dag = DAG(
name='dag_task',
schedule=Cron("10 * * * *", "Asia/Tokyo"),
# 追加
config={"query_params":"value"}
)
with dag:
task1 = DAGTask(
name="task1",
definition=StoredProcedureCall(stored_procedure_object_1),
warehouse="EXAMPLE_WH",
)
...
おわりに
Snowflake Python APIを使うことで、タスクグラフを使ってタスクの依存関係を指定したり、Python関数をタスクとして登録したりすることが可能となり、複雑なワークフローを手軽に作成できます。
これまではPythonからSnowflakeのタスクを管理するには、Snowparkのsession.sql("CREATE TASK")...
を繰り返し呼び出す必要がありました。しかし、このAPIの登場によって、Airflowなどの外部ツール上でPythonを用いて定義されたタスクを、Snowflakeのタスクへ容易に移行できそうです。
Snowlfake データクラウドのユーザ会 SnowVillage のメンバーで運営しています。 Publication参加方法はこちらをご参照ください。 zenn.dev/dataheroes/articles/db5da0959b4bdd
Discussion