❄️

troccoで簡易MLOpsをまわしてみる2(Snowflake ver)

2023/05/06に公開

はじめに

  • この記事は前回のこちらからのちょっとした派生です
  • 良かったら先に目を通していただければ嬉しいです😁

今回の利用するサービス

  • Snowflake(DWH)
    • 少し前にpreviewになったPythonのワークシートをメインに使います
  • trocco
    • 前回と同じくデータを取得する部分とワークフローで呼び出す部分を担当します
    • 前回の記事からのUpdateとしてはFreeプランが登場しており、今回はFreeで使える範囲で対応できます

対応内容

  • こちら前回と同じでtitanic問題での利用を想定してみます!(変わり映えがなくてすみません、、)

さっそく実際に動かしてみた

kaggle APIを使ったデータ転送

  • こちらも前回とほぼ同じです
  • troccoを使って、kaggle apiを叩き、snowflakeにデータを転送します
  • trainとtestのデータをそれぞれ同じdatabase>schema配下に格納しました
  • 転送は1経路しか使わない(HTTPS => Snowflake)のでFreeプランでOK

  • しっかりデータもLoadされておりました

元となるPythonコードを用意する

  • 前回はBigQueryMLを使って簡易にSQLで実装するというものでしたが、今回はPythonを使います
  • より開発をイメージした形で行くと、EDA(探索的データ解析)をイメージします
  • ここでtitanicの解法を紹介するのはナンセンスなので(他に死ぬほど記事があるので)3分間クッキング方式でいきます
  • kaggleのコードから拝借してTitanic Data Science Solutionsを使わせていただきます
  • もし日本語で実際に実装を確認されたい方は、むかーしのものですがこちら
  • Downloadコードをさせていただき、.ipynbを入手しました

  • このコードをローカルでPythonファイルに変換します。.pyができました。
jupyter nbconvert --to script titanic-data-science-solutions.ipynb

PythonのコードをProcedure化(task化まで)する

  • SnowflakeでPythonのワークシートを立ち上げます

  • Anacondaベースのライブラリ構成となっており、GUI上でパッケージを追加することが可能です
  • 機械学習周りのライブラリも豊富に揃えられており、今回は非常にBasicなscikit-learnのパッケージ追加のみでOKです
    • NLP(日本語)周りのライブラリなどはないので、このあたりは今後という感じな気持ち

  • 先ほど変換したPythonコードをワークシートにべた張りして、動く形に整えていきます

ちょっとコード変換のポイントだけ

  • 基本コードを大きく変える必要はないです
  • ただ、matplotlibやseabornでの可視化や、printやhead/tailなどでの確認、コメント等などはバーッと削除しました
  • もうちょっとPythonファイル用に最適化された変換ができればよいですけどね
    • Chat-GPTを使えばよかったかも...

データの読み込み部

  • snowparkを使って、Pythonコードを動かす形となります
  • その際、SnowflakeのDataframeを使う場合とpandasのDataframeを使うことができますが、今回はpandasのdataframeを使っています
    • ちょこちょこ関数の引数が違ったりする部分もあるのでSnowflakeのDataframeを使う場合はもう少し換装に時間が必要そうです
  • なので、to_pandas()関数を使用して、pandasのDataframeに変換しています
import snowflake.snowpark as snowpark

train_tableName = 'iwata.titanic."train"'
test_tableName = 'iwata.titanic."test"'

train_df = session.table(train_tableName).to_pandas()
test_df = session.table(test_tableName).to_pandas()

データの掃き出し部

  • 今回は横着して、train(学習) => predict(推論)までを1処理で記載しています
  • したがって、OUTPUTはtestデータに対してのpredict結果となり、テーブルへの格納までをこの処理内で行う想定です
  • 上記の通り、PandasのDataframeで処理が進むので、predict結果をSnowflakeのテーブルに格納する必要があります
  • 方法としては大きく2つです
    • 下記のsubmissionは推論結果のデータが入ったPandasのDataframeとなります
# create_dataframeを使う場合
submission_df = session.create_dataframe(submission)
submission_df.show()
submission_df.write.mode("overwrite").save_as_table("test_predict")
# write_pandasを使う場合
session.write_pandas(submission, table_name="test_predict",database='IWATA', schema='TITANIC',auto_create_table=True, overwrite=True)
  • 基本的にはどちらでも問題なく、動きます(私は後者を選択しました)
    • create_dataframeを使う場合だと、一度SnowflakeのDataframeに書き戻されるので、showなどでコンソール上に表示することも可能です
    • 結果としてはSnowflake上にtest_predictというテーブルが格納されます
    • 公式Docはcreate_dataframewrite_pandas

じゃあ試しに実行だ!

  • 処理の始まりは少し時間がかかりそうですが、Pythonの実処理はサクッと終わりました。
  • 結果としてはreturn「成功」という文言出す形に変更

  • テーブルも作成されていました!predictも入っていそうです。

Procedure化 => Task化へ

  • 問題なく稼働確認が取れたので、Procedure化します
  • 右上の「Deploy」ボタンを押下して、名前を決定するだけ。非常に簡単

  • これでProcedureができました。これを直接callするのもOKなのですが、このあとtrocco側から呼び出しを行うので、Task化まで行っておきます
    • 補足ですが、SQLのProcedureであれば問題なくtroccoのデータマート側から呼び出せるのですが、PythonのProcedureだといくつかエラーが見られたので、一旦Task化して呼び出しました
    • 詳しくは切り分けしてないですが、ウェアハウスのデフォルト指定やcall周りの返りがPythonから来るものとなるので例外が吐かれちゃうとかあるかもしれないです
CREATE or replace TASK titanic_task
  warehouse = 'SELECT_INSERT'
  # pro_train_and_test()はプロシージャ名
  as call pro_train_and_test();
  • 完全な非同期処理となり、単体でのスケジュール実行なども可能です

troccoのデータマートでTaskを呼び出す

  • 最後にtroccoのデータマート上で呼び出しのみを行います。こちらの1行だけの簡単な記載

  • timestampもしっかり更新されてそうでした

最後にワークフロー化へ

  • 本来であれば、学習と推論のパイプラインで分けるべきですが、こんな感じです
  • ここにdbtの処理を混ぜたり、他の転送データやマルチクラウドでのデータマート処理も混ぜられるので、いつもの如く夢が膨らむばかりですね

まとめ

このような感じでしょうか

- SnowflakeのPythonワークシート利用は面白い!ライブラリの互換を見る必要もないし、installやコードのProcedureへDeployも超簡単。
- Pythonのライブラリの拡張(やユーザーでの任意isntall)はより期待したい!
- ML学習の途中結果はStageへの掃き出しやstreamlitでの可視化なども面白いかも
- troccoでの転送やジョブ & ワークフロー管理は安心安定という感じ

引き続き、今後も追いかけていければと思います。

おわり🙇

Discussion