🫡

モダンなタスク管理を可能にするSnowflake Python API

2024/03/15に公開

結論

Taskを管理するならSnowflake Python APIを使おう

Snowflake Python APIとは

Snowflake公式のPythonのオブジェクト管理ライブラリ「snowflake.core」のことです(Public Beta)。Snowflake Python Connectorとは全く別物です。

https://docs.snowflake.com/en/developer-guide/snowflake-python-api/snowflake-python-managing-tasks

Snowflake Python APIを使用すると、Pythonコードを使ってSnowflakeのリソース(Table、Warehouse、Task、Snowpark Container ServiceのCompute Poolなど)を管理することができます。

本記事では、Snowflake Python APIを使ってSnowflakeのタスクとDAG(Directed Acyclic Graph)を管理する方法を詳しく解説します。これにより、StreamlitなどからSQLを直接実行するのではなく、Pythonの型ヒントや補完の恩恵を受けながらタスクを管理できるようになります。

インストールと接続

まず、以下のコマンドでパッケージをインストールします。

pip install snowflake -U

次に、SnowparkまたはSnowflake Python Connectorの接続情報を使ってRootオブジェクトを作成します。ここでは、Snowparkのセッションを使う方法を紹介します。

from snowflake.core import Root
from snowflake.snowpark.context import get_active_session

session = get_active_session()
root = Root(session)

シンプルなタスクの作成と管理

単発のタスクを作成するには、TaskオブジェクトとTaskCollectionオブジェクトを使用します。以下は、my_taskというタスクを作成する例です。
schedule引数にsnowflake.core.task.Cronオブジェクトを指定することで、例えばschedule=Cron("*/10 * * * *", "Asia/Tokyo")のように、10分ごとに実行されるタスクを作成することもできます。
select 1を実行するだけの単純なタスクですが、簡単にタスクが作成できることがわかります。

from datetime import timedelta
from snowflake.core.task import Task
schema = root.databases['my_db'].schemas['my_schema']
tasks = schema.tasks
my_task = Task(name='my_task', definition='select 1', schedule=timedelta(hours=1))
tasks.create(my_task)

タスクの実行、停止、再開などの操作にはTaskResourceオブジェクトを使用します。以下のコードは、既存のmy_taskという名前のタスクを即時実行、停止、再開、削除する例です。

from snowflake.core.task import Task

tasks = root.databases["my_db"].schemas["my_schema"].tasks
task_res = tasks['my_task']

task_res.execute()
task_res.suspend()
task_res.resume()
task_res.delete()

ローカルに存在するPython関数を定期的に実行するタスクを作成する

Snowflake Python APIの真価が発揮されるのはここからです。ストアドプロシージャとしてPython関数を実行するタスクを作成することができます。

以下のコードは、StoredProcedureCallオブジェクトで表されるローカルの関数(Snowparkによって自動的に新しいストアドプロシージャとしてアップロードされます)を定期的に実行するmy_task2というタスクを作成する例です。is_permanent=Trueを指定しないと、タスクが実行される時にストアドプロシージャが消えてしまうので注意してください。

from snowflake.core.task import StoredProcedureCall, Task
from snowflake.snowpark.functions import sproc

def hello_world(session:Session)->str:

    return "Hello World"

stored_procedure_object = sproc(
        func=hello_world,
        is_permanent=True,
        stage_location="@EXAMPLE_STAGE",
        replace=True,
        packages=['snowflake-snowpark-python'],  
        external_access_integrations=['YOUR_EAI_NAME'],
    )

my_task2 = Task(
  StoredProcedureCall(
      stored_procedure_object
  ),
  warehouse="test_warehouse",
  schedule=Cron("10 * * * *", "Asia/Tokyo")
)
tasks = root.databases['my_db'].schemas['my_schema'].tasks
tasks.create(my_task2)

タスクグラフの作成

タスクグラフを使うと、複数のタスクを組み合わせて、それらの依存関係を定義することができます。タスクグラフは最初に実行されるルートタスクと、その終了後に実行される追加タスクのフローです。
以下のコードは、10分ごとにtask1がselect 1を実行し、task1が終了するとtask2がselect 2を実行する、dag_taskという名前のDAGTaskオブジェクトを作成する例です。

schema = root.databases["my_db"].schemas["my_schema"]
dag = DAG(
    name='dag_task',
    schedule=Cron("*/10 * * * *", "Asia/Tokyo"),
)
with dag:
    task1 = DAGTask(
        name="task1",
        definition="select 1",
        warehouse="EXAMPLE_WH",
    )

    task2 = DAGTask(
        name="task2",
        definition="select 2",
        warehouse="EXAMPLE_WH",
    )
    task1 >> task2
op = DAGOperation(schema)
op.deploy(dag, mode=CreateMode.or_replace)

Pythonの関数をDAGとして登録する

前述のタスクグラフをPythonコードで表現することも可能です。以下のコードでは、sproc関数でバインドされたExternal Network Accessを用いて、task1がHTTPリクエストを送信し、そのレスポンスのJSONデータを戻り値として設定します。task2はtask1のリターン値を受け取り、そのJSONデータをパースします。

from snowflake.core.task.context import TaskContext
from snowflake.snowpark import Session
from snowflake.snowpark.functions import sproc
from snowflake.core.task.dagv1 import DAG, DAGTask, DAGOperation, CreateMode
from snowflake.core.task import Cron

schema = root.databases["my_db"].schemas["my_schema"]

def task1_func(session: Session) -> None:
    import requests
    context = TaskContext(session)
    response = requests.get("https://httpbin.org/get")
    json_data = response.json()
    context.set_return_value(str(json_data))

def task2_func(session: Session) -> None:
    import json
    context = TaskContext(session)
    task1_value: str = context.get_predecessor_return_value("task1")
    data = json.loads(task1_value)

    # data = {
    # {
    # "args": {}, 
    # "headers": {
    #     "Accept": "*/*", 
    #     "Host": "httpbin.org"
    #     ...
    #   } 
    # }

stored_procedure_object_1 = sproc(
    func=task1_func,
    is_permanent=True,
    stage_location="@EXAMPLE_STAGE",
    replace=True,
    packages=['snowflake-snowpark-python', 'requests'],
    external_access_integrations=['YOUR_EAI_NAME'],
)
stored_procedure_object_2 = sproc(
    func=task2_func,
    is_permanent=True,
    stage_location="@EXAMPLE_STAGE",
    replace=True,
    packages=['snowflake-snowpark-python'],
)
dag = DAG(
    name='dag_task',
    schedule=Cron("*/10 * * * *", "Asia/Tokyo"),
    use_func_return_value=True
)
with dag:
    task1 = DAGTask(
        name="task1",
        definition=StoredProcedureCall(stored_procedure_object_1),
        warehouse="EXAMPLE_WH",
    )   
    task2 = DAGTask(
        name="task2",
        definition=StoredProcedureCall(stored_procedure_object_2),
        warehouse="EXAMPLE_WH",
    )
    task1 >> task2

op = DAGOperation(schema)
op.deploy(dag, mode=CreateMode.or_replace)

DAG内で共通のConfigを渡す

ストアドプロシージャを作り直すことなく動作を変更したい場合に便利な機能があります。例えばクエリパラメータのみを変えたDAGを作成したい時などは、DAGのconfigパラメータに辞書を渡すことができます。
ストアドプロシージャの引数として実装する方法もありますが、複数のタスクで同様のパラメータを参照する場合、引数を何度も渡すのは面倒な上、センシティブな引数がストアドプロシージャの実行履歴に残ってしまったりするため、configパラメータを優先する方が良いでしょう。
以下のタスクは、DAGに渡したconfigによってアクセスするクエリパラメータを変更する例です。

def task1_func(session: Session) -> None:
    import requests
    
    context = TaskContext(session)
    # configを受け取れる
    config=context.get_task_graph_config()
    
    response = requests.get(f"https://httpbin.org/get?query={config['query_params']}")
    json_data = response.json()
    context.set_return_value(str(json_data))
...
dag = DAG(
    name='dag_task',
    schedule=Cron("10 * * * *", "Asia/Tokyo"),
    # 追加
    config={"query_params":"value"}
)

with dag:
    task1 = DAGTask(
        name="task1",
        definition=StoredProcedureCall(stored_procedure_object_1),
        warehouse="EXAMPLE_WH",
    )
... 

おわりに

Snowflake Python APIを使うことで、タスクグラフを使ってタスクの依存関係を指定したり、Python関数をタスクとして登録したりすることが可能となり、複雑なワークフローを手軽に作成できます。
これまではPythonからSnowflakeのタスクを管理するには、Snowparkのsession.sql("CREATE TASK")...を繰り返し呼び出す必要がありました。しかし、このAPIの登場によって、Airflowなどの外部ツール上でPythonを用いて定義されたタスクを、Snowflakeのタスクへ容易に移行できそうです。

Snowflake Data Heroes

Discussion