🙆‍♀️

[Snowflake] Streamlitでタスク管理ダッシュボードを作ってみた

2023/12/28に公開

はじめに

私が所属しているライフイズテックのデータ基盤グループでは、ELT処理でSnowflake Taskを使っています。複数のSnowflake Taskを利用するにあたって、Task間の依存関係の解決のためにタスクに有向非巡回グラフ(DAG)を設定しています。
DAG内の子タスクを編集する場合にルートタスクを一時停止にする必要がありますが、ルートタスクの再開することを忘れてしまうという事象が発生したことがありました。

そこで各タスクの状態を確認できるようにする必要が発生しました。
確認する方法で下記の2点の要件を満たすために、Streamlitでダッシュボードをつくることにしました。

  • DBやスキーマが別れていると確認が面倒
  • 条件を柔軟に変えたい

BIツールでのダッシュボードやSnowflake内のダッシュボードをつくることもかんがえましたが、今後のことやStreamlitを使ってみたいと思ったので、Streamlitにしたという点が大きいです。

ここでは実際に試しながら進めたことをまとめます。

実際に進めた手順

タスク一覧を出してみる

タスクのstateで絞り込みができるような一覧を作ろうというゴールを置いて、まずはアカウント内のタスクの情報を取得するところをsession.sqlを用いて出してみました。session.sqlによってDataframeが返ってきますが、クエリの実行のためには、collectを呼ぶ必要があります。
st.writeを使うことで、Streamlit上に情報を表示することができるため、まずは表示してみます。

task_list = session.sql("show tasks in account").collect()
st.write(task_list)

一覧の情報を絞る

show tasks in accountのすべてのカラムを表示すると、budgetowner_role_typeなど今回は必要としていないカラムも多く表示され見づらいことに気づいたので、カラムを絞ることにしました。
task_schemaという変数を作成し表示したいカラムを指定しました。

task_schema = StructType([
    StructField("name", StringType()), 
    StructField("database_name", StringType()), 
    StructField("schema_name", StringType()),
    StructField("owner", StringType()),
    StructField("schedule", StringType()),
    StructField("predecessors", StringType()), 
    StructField("state", StringType())
])
task_list = session.sql("show tasks in account").collect()
task_df = session.create_dataframe(task_list, schema=task_schema)
st.write(task_df)

multiselectを使って動的に表示条件を変更する

表示したいデータが決まったので、最後に表示条件を変更できるようにしてみました。
動的にタスクの状態を選択して、表示を変更するためにstreamlitのst.multiselectを用いて、絞り込みを可能にしました。

options = st.multiselect(
    'Task state',
    ['started', 'suspended'],
    ['started']
)

task_schema = StructType([
    StructField("name", StringType()), 
    StructField("database_name", StringType()), 
    StructField("schema_name", StringType()),
    StructField("owner", StringType()),
    StructField("schedule", StringType()),
    StructField("predecessors", StringType()), 
    StructField("state", StringType())
])
task_list = session.sql("show tasks in account").collect()
task_df = session.create_dataframe(task_list, schema=task_schema)
df = task_df[task_df["state"].isin(options)]
st.write(df)

Streamlitの感想

自分はPythonやSnowparkを初心者なため、これだけ簡単な表の表示でも不安がありました。インタラクティブなエディターをSnowflakeが提供してくれているおかげでトライアンドエラーを繰り返すことができました。
いまはシンプルな表を表示だけなので、st.writeだけで済ませましたが、st.dataframeを使うことで表示に関しても柔軟に設定できるので、できることの可能性も広がりそうです。
なによりもUIを簡単に作れるのが楽しい。

宣伝

ライフイズテック サービス開発部では、月毎に気軽にご参加いただけるカジュアルなイベントを実施しています。開催予定のイベントは、 connpass のグループからご確認ください。興味のあるイベントがあったらぜひ参加登録をお願いいたします。皆さんのご参加をお待ちしています!

Discussion