❄️

Frosty Friday Week 33 Hard - Clustering depthをモニタリングせよ!

2024/12/19に公開

こんにちは!KDDIアジャイル開発センターのmakotyoです。

今回は、Snowflakeのクラスタリング深度をモニタリングするタスクを作成する問題に挑戦しました。

Week 33のチャレンジ内容

今回挑んだ問題はこちらです。
https://frostyfriday.org/blog/2023/02/10/week-33-hard/

Frosty Friday社では、クラスタリング深度の監視に関する要件があり、これを監視するためのDAGを構築する必要があります。

まず、以下のようなテーブルを作成します:

create or replace table cluster_depth_monitoring (
    database_name varchar,
    schema_name varchar,
    table_name varchar,
    clustering_depth float,
    inserted_at timestamp,
    inserted_by varchar
);

そして、以下の3つのタスクを作成し、連携させる必要があります:

  1. CM_IDENTIFY_TABLES:

    • データベース内のクラスタ化されたテーブルを特定
    • この情報を一時テーブルに格納
  2. CM_INSERT_DETAILS:

    • Snowpark stored procedureを使用して各テーブルのクラスタリング深度を調査
    • 結果をcluster_depth_monitoringテーブルに挿入
  3. CM_CLEAN_UP:

    • CM_IDENTIFY_TABLESで作成した一時テーブルを削除

重要な要件として、すべてのクエリに'cluster_depth_monitoring'というタグを付ける必要があります。

解答

まずは問題文にある通り監視用のテーブルを作成しましょう:

CREATE OR REPLACE TABLE CLUSTER_DEPTH_MONITORING (
    DATABASE_NAME VARCHAR,
    SCHEMA_NAME VARCHAR,
    TABLE_NAME VARCHAR,
    CLUSTERING_DEPTH FLOAT,
    INSERTED_AT TIMESTAMP,
    INSERTED_BY VARCHAR
);

タスク1: クラスタ化テーブルの特定

CREATE OR REPLACE TASK CM_IDENTIFY_TABLES
AS 
BEGIN
    ALTER SESSION SET QUERY_TAG='cluster_depth_monitoring';
    SHOW TABLES IN DATABASE SNOWFLAKE_SAMPLE_DATA;
    CREATE OR REPLACE TRANSIENT TABLE CLUSTERED_TABLE_INFO AS
    SELECT * FROM TABLE(RESULT_SCAN(LAST_QUERY_ID())) 
    WHERE "cluster_by" != '';
END;

このタスクでは

  • SHOW TABLESコマンドでテーブル一覧を取得
  • クラスタ化されているテーブル(cluster_byが空でない)のみを抽出

します。

タスク2: クラスタリング深度の取得と記録

CREATE OR REPLACE PROCEDURE CM_INSERT_DETAILS()
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.11'
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'main'
EXECUTE AS CALLER
AS
$$
from snowflake import snowpark
from snowflake.snowpark.functions import col, lit, udf, call_builtin, concat_ws, current_timestamp
from snowflake.snowpark.types import IntegerType, StringType, StructField, StructType, FloatType

def get_clustering_depth(session, table_name: str) -> int:
    query = f"SELECT SYSTEM$CLUSTERING_DEPTH('{table_name}') AS depth"
    result = session.sql(query).collect()
    return result[0][0]

def main(session: snowpark.Session): 
    session.query_tag = 'cluster_depth_monitoring'

    # PUBLIC.CLUSTERED_TABLE_INFOテーブルからデータを取得
    tableName = 'PUBLIC.CLUSTERED_TABLE_INFO'
    df = session.table(tableName)
    
    # pandas DataFrameに変換
    df_pd = df.to_pandas()
    
    # クラスタリング深度のリストを作成
    depth_list = []
    for index, row in df_pd.iterrows():
        # 完全修飾テーブル名を作成
        table_name = f'{row["database_name"]}.{row["schema_name"]}.{row["name"]}'
        depth = get_clustering_depth(session, table_name)
        depth_list.append([depth, table_name])
    
    # スキーマを定義
    schema = StructType([
        StructField("clustering_depth", FloatType()), 
        StructField("table_name", StringType())
    ])
    
    # クラスタリング深度のDataFrameを作成
    clustering_depth_df = session.create_dataframe(depth_list, schema)

    
    # 元のDataFrameとクラスタリング深度のDataFrameを結合
    result_df = df.join(clustering_depth_df, 
                        concat_ws(lit('.'), df.col('"database_name"'), df.col('"schema_name"'), df.col('"name"')) == clustering_depth_df.col("table_name"))

    
    # 必要な列を選択
    result_df = result_df.select(
        col('"database_name"').alias("DATABASE_NAME"),
        col('"schema_name"').alias("SCHEMA_NAME"),
        col('"name"').alias("TABLE_NAME"),
        col("CLUSTERING_DEPTH"),
        current_timestamp().alias("INSERTED_AT"),
        call_builtin('system$current_user_task_name').alias("INSERTED_BY")
    ).write.save_as_table("cluster_depth_monitoring", mode="append")
    
    return "OK"
$$;

CREATE OR REPLACE TASK CM_INSERT_DETAILS
    AFTER CM_IDENTIFY_TABLES
AS CALL CM_INSERT_DETAILS();

このタスクの中身をざっくりと説明すると

  1. 最初のタスクで作成したCLUSTERED_TABLE_INFOからクラスタ化されたテーブルのリストを取得
  2. テーブルそれぞれに対してSYSTEM$CLUSTERING_DEPTH関数を実行してクラスタリング深度を取得
  3. 取得したクラスタリング深度をCLUSTER_DEPTH_MONITORINGテーブルに挿入

SYSTEM$CLUSTERING_DEPTH関数は固定値しか引数に取れないため、テーブルのリストをfor文で回してクエリを実行する必要があるのがちょっとややこしいですね。

タスク3: クリーンアップ

CREATE OR REPLACE TASK CM_CLEAN_UP
    AFTER CM_INSERT_DETAILS
AS 
BEGIN
    ALTER SESSION SET QUERY_TAG='cluster_depth_monitoring';
    DROP TABLE IF EXISTS CLUSTERED_TABLE_INFO;
END;

最後に、タスクを有効化して実行:

ALTER TASK CM_CLEAN_UP RESUME;
ALTER TASK CM_INSERT_DETAILS RESUME;
EXECUTE TASK CM_IDENTIFY_TABLES;

実装時のポイント

  1. SYSTEM$CLUSTERING_DEPTH関数の制約:

    • 引数に動的な値を渡せない
    • テーブルごとに個別にクエリを実行する必要がある
  2. クエリタグの設定:

    • 各タスクで明示的に設定
    • ストアドプロシージャ内ではEXECUTE AS CALLERが必要
  3. システム関数の活用:

    • SYSTEM$CLUSTERING_DEPTH
    • SYSTEM$CURRENT_USER_TASK_NAME

まとめ

Snowflakeのみで簡易的なデータパイプラインを作成する方法が学べる良い問題でした。

  • タスクを組み合わせてDAGを構築する方法
  • システム関数の利用
  • クエリタグの設定
  • Snowparkを使用したPythonプログラミング

DAGがうまく動いているのを見るのは爽快です!

Snowflake Data Heroes

Discussion