🔥

Snowflake機械学習:SnowparkでRandomForest推論の実行

2021/08/05に公開


uewtwo: Twitter

本記事の内容

  1. Snowparkを使ってみたい
  2. Snowflakeのコンピューティングリソースで推論がしたい
  3. PMMLを使ってirisをSnowflakeでRandomForestする!

参考記事

環境

  • OS:
    • MacOS BigSur
  • VSCode:
    • Metals: 1.10.6
  • Scala:
    • scala: version 2.12.13
    • snowpark: 0.6.0
    • typesafe: 1.4.1
    • pmml4s: 0.9.11
  • Python:
    • python: 3.8.6
    • pandas: 1.2.3
    • sklearn: 0.24.2
    • sklearn2pmml: 0.73.2
    • pypmml: 0.9.11

前準備

流れ

今回の記事の流れは

  1. sklearnを用いてiris.dataをRandomForestで分類するモデルをPMML形式で作成する
  2. SnowparkからPMMLファイルを利用しiris.dataに対し分類するUDFを作成する
  3. Snowparkを実行し分類モデルをSnowflakeのデータに対し適用する
  4. HAPPY

推論用モデルを作成する

まず最初に、sklearnのRandomForestClassifierを使ってクラスアタリングモデルの実装をやっていきます。
私は試行錯誤しながら作業したかったため、Jupyter Notebookを使用しました。

まずはダウンロードしておいたiris.dataの読み込みから。
ヘッダーは、Snowflakeに取り込んだ際のカラム名と合わせておきます。

create_clasification_model.py
import pandas as pd

headers = [
	"SEPAL_LENGTH",
	"SEPAL_WIDTH",
	"PETAL_LENGTH",
	"PETAL_WIDTH",
	"CLASS"
]
iris_df = pd.read_csv("iris.data", names=headers)
iris_df.head()
SEPAL_LENGTH	SEPAL_WIDTH	PETAL_LENGTH	PETAL_WIDTH	CLASS
0	5.1	3.5	1.4	0.2	Iris-setosa
1	4.9	3.0	1.4	0.2	Iris-setosa
2	4.7	3.2	1.3	0.2	Iris-setosa
3	4.6	3.1	1.5	0.2	Iris-setosa
4	5.0	3.6	1.4	0.2	Iris-setosa

読み込み確認までは簡単。

次にPMML形式でモデルを作成していきます。
データを特徴量とターゲットに分けて

create_clasification_model.py
from sklearn.model_selection import train_test_split

iris_X = iris_df[iris_df.columns.difference(["CLASS"])]
iris_y = iris_df["CLASS"]

X_train, X_test, y_train, y_test = train_test_split(iris_X, iris_y, test_size=0.3) # 70% 

PMML形式で分類モデルを作成し

create_clustering_model.py
from sklearn.ensemble import RandomForestClassifier
from sklearn2pmml.pipeline import PMMLPipeline

pipeline = PMMLPipeline([
    ("classifier", RandomForestClassifier(n_estimators=100))
])
pipeline.fit(X_train, y_train)

一応精度を確認します。

create_clustering_model.py
from sklearn import metrics

y_pred = pipeline.predict(X_test)
metrics.accuracy_score(y_test, y_pred)
0.9111111111111111

ある程度accuracyが出ている分類モデルの作成ができているようですね。
次にpmmlファイルとして書き出します。

create_clustering_model.py
from sklearn2pmml import sklearn2pmml

sklearn2pmml(pipeline, "RandomForestIris.pmml", with_repr = True)

これでカレントディレクトリに RandomForestIris.pmml ファイルができていると思います。
念の為、ちゃんとモデルが生成されているか確認していきましょう。

ファイルを読み込んで、iris_dfに適用してみます。

create_clustering_model.py
from pypmml import Model

model = Model.fromFile("RandomForestIris.pmml")
result = model.predict(iris_df)

result.head()
  probability(Iris-setosa)	probability(Iris-versicolor)	probability(Iris-virginica)
0	1.00	0.00	0.0
1	0.99	0.01	0.0
2	1.00	0.00	0.0
3	1.00	0.00	0.0
4	1.00	0.00	0.0

ちゃんと推論できていそうですね。

依存関係解決のための準備

作成したモデルや各種UDF実行に必要なファイル/ライブラリをjarにしておきます。
配置場所は src/main/resouces 下にしています。

モデル

作成したPMMLモデルをjarファイルにします。
プロジェクトルートより

cd src/main/resouces && jar cvf iris.jar path/to/model/RandomForestIris.pmml

その他

下記ライブラリについてjarファイルをDL/生成

Snowparkを使って、UDFを作成しSnowflakeへアップロードする

ここからは、VSCode上で作業していきます。
Snowflake公式チュートリアルを元にMetalsがインストール済みでhello-world PJが作成してあることを前提とします。
まずはbuild.sbtに依存関係を追加していきます。

build.sbt
# 追記
resolvers += "OSGeo Release Repository" at "https://repo.osgeo.org/repository/release/"

libraryDependencies ++= Seq(
    "org.scala-lang.modules" %% "scala-parser-combinators" % "1.1.2",
    "com.snowflake" % "snowpark" % "0.6.0",
    "com.typesafe" % "config" % "1.4.1",
    "org.pmml4s" %% "pmml4s" % "0.9.11"
)

変更を保存したらimport changesを押下し、依存パッケージをダウンロードしていきます。

データに適用してみる

チュートリアルにもありますが、まずはSnowflakeのセッションを作成していきます。
今回は趣味でtypesafeのConfigFactoryを使ってconfigを作成していきます。

src/main/resources/application.conf
snowflake {
    url = "https://{YOUR_SNOWFLAKE_ACCOUNT}.snowflakecomputing.com:443",
    user = "{USER}",
    password = "{PASSWORD}",
    role = "{ROLE}",
    warehouse = "{WAREHOUSE}",
    db = "{DATABASE}",
    schema = "{SCHEMA}"
}

セッションを作成し

src/main/scala/Main.scala
object Main {
  def main(args: Array[String]): Unit = {
...

	val conf = ConfigFactory.load
	val configs = Map(
		"URL" -> conf.getString("snowflake.url"),
		"USER" -> conf.getString("snowflake.user"),
		"PASSWORD" -> conf.getString("snowflake.password"),
		"ROLE" -> conf.getString("snowflake.role"),
		"WAREHOUSE" -> conf.getString("snowflake.warehouse"),
		"DB" -> conf.getString("snowflake.db"),
		"SCHEMA" -> conf.getString("snowflake.schema")
	)

	val session = Session.builder.configs(configs).create

...
}

依存関係をセッションに含めます。
この時の注意点として、DataFrames のユーザー定義関数(UDFs)の作成 — UDF からのファイルの読み取りにもある通り

Snowparkライブラリはサーバーに UDFs をアップロードして実行します。UDF がファイルからデータを読み取る必要がある場合は、ファイルが UDF とともにアップロードされていることを確認する必要があります。

今回、UDFが学習済みのPMMLモデルファイルを読み込む必要があるため、モデルファイルも依存先に追加していきます。

src/main/scala/Main.scala
object Main {
  def main(args: Array[String]): Unit = {
...

	val libPath = new java.io.File("").getAbsolutePath
	session.addDependency(s"$libPath/src/main/resources/pmml4s_2.12-0.9.11.jar")
	session.addDependency(s"$libPath/src/main/resources/spray-json_2.12-1.3.6.jar")
	session.addDependency(s"$libPath/src/main/resources/scala-xml_2.12-1.2.0.jar")
	session.addDependency(s"$libPath/src/main/resources/iris.jar")

...
}

試しに一部のデータが描画できるか見てみましょう。

src/main/scala/Main.scala
object Main {
  def main(args: Array[String]): Unit = {
...

	val irisSchema = StructType(
		StructField("sepal_length", DoubleType, nullable = true) ::
		StructField("sepal_width", DoubleType, nullable = true) ::
		StructField("petal_length", DoubleType, nullable = true) ::
		StructField("petal_width", DoubleType, nullable = true) ::
		StructField("class", StringType, nullable = true) ::
		Nil
	)
	val df = session.read.schema(irisSchema).table("iris_data")
	println(df.show())

...
}

実行はVSCodeならエディタ上のMain Objectの上に位置する行に run|debug とある run から実行確認を行います。

---------------------------------------------------------------------------------
|"SEPAL_LENGTH"  |"SEPAL_WIDTH"  |"PETAL_LENGTH"  |"PETAL_WIDTH"  |"CLASS"      |
---------------------------------------------------------------------------------
|5.1             |3.5            |1.4             |0.2            |Iris-setosa  |
|4.9             |3.0            |1.4             |0.2            |Iris-setosa  |
|4.7             |3.2            |1.3             |0.2            |Iris-setosa  |
|4.6             |3.1            |1.5             |0.2            |Iris-setosa  |
|5.0             |3.6            |1.4             |0.2            |Iris-setosa  |
|5.4             |3.9            |1.7             |0.4            |Iris-setosa  |
|4.6             |3.4            |1.4             |0.3            |Iris-setosa  |
|5.0             |3.4            |1.5             |0.2            |Iris-setosa  |
|4.4             |2.9            |1.4             |0.2            |Iris-setosa  |
|4.9             |3.1            |1.5             |0.1            |Iris-setosa  |
---------------------------------------------------------------------------------

結果が返ってきました!

次に、モデルを適用するためのUDFを定義していきます。

モデルをsessionに追加したjarファイルから読み込むように、UDF内で定義すると上手くいきます。

var resourceName = "/RandomForestIris.pmml"

注意点が何点かあります。

  • ここではjarファイルの中身を展開した時のPMMLファイルパスを指定するようにします。
  • model.predict()List[Any]を返すので数値にCASTし、関数が扱えるようにしています。
  • RandomForestIris.pmmlファイルを見てみると、schemaとして定義されているカラムの順番がpetal -> sepalの順になっています。(もしかしたら学習時にそのようなカラム順で指定していたかもしれないですが)それに合わせ、modelに渡すカラムを並び替えています。
    • 流石に勝手に並び替えることは恐らくないので、作業手順の誤りな気がしています
  • RandomForestは分類したいクラスごとにprobabilityがあるため、それらの数値のうちmaxとなるindexでModelクラス内に保持しているclassesから実際の分類先を引きます。
    • もっと適した方法があれば教えて欲しいです
src/main/scala/Main.scala
class SerTestFunc extends Serializable {
  val rfFunc = (
    petal_length: Double,
    petal_width: Double,
    sepal_length: Double,
    sepal_width: Double) => {
      import java.io._
      var resourceName = "/RandomForestIris.pmml"
      var inputStream = classOf[com.snowflake.snowpark.DataFrame]
        .getResourceAsStream(resourceName)
      val model = Model.fromInputStream(inputStream)
      val v = Array[Double](petal_length, petal_width, sepal_length, sepal_width)
      val pred = model.predict(v).map(_.asInstanceOf[Double])
      model.classes(pred.indices.maxBy(pred)).toString()
    }
}

さて、いよいよ実際にモデルを適用してみましょう!
Spark DataFrameと同様に遅延評価されるので

println(df)

のようなことをしても、データフレームの中身が評価され値が表示されることはありません。

src/main/scala/Main.scala
object Main {
  def main(args: Array[String]): Unit = {
    val df = getIrisDf(session)

    val transformationFunc = new SerTransformationFunc()
    val irisTransformationUDF = udf(transformationFunc.rfFunc)
  
    val dfFitted = df.withColumn(
      "label", irisTransformationUDF(
        col("petal_length"), col("petal_width"), col("sepal_length"), col("sepal_width"))
    )
    println(dfFitted.show(150))

...
|4.6             |3.4            |1.4             |0.3            |Iris-setosa      |Iris-setosa      |
|5.0             |3.4            |1.5             |0.2            |Iris-setosa      |Iris-setosa      |
|4.4             |2.9            |1.4             |0.2            |Iris-setosa      |Iris-setosa      |
|4.9             |3.1            |1.5             |0.1            |Iris-setosa      |Iris-setosa      |
|5.4             |3.7            |1.5             |0.2            |Iris-setosa      |Iris-setosa      |
|4.8             |3.4            |1.6             |0.2            |Iris-setosa      |Iris-setosa      |
|4.8             |3.0            |1.4             |0.1            |Iris-setosa      |Iris-setosa      |
|4.3             |3.0            |1.1             |0.1            |Iris-setosa      |Iris-setosa      |
|5.8             |4.0            |1.2             |0.2            |Iris-setosa      |Iris-setosa      |
|5.7             |4.4            |1.5             |0.4            |Iris-setosa      |Iris-setosa      |
|5.4             |3.9            |1.3             |0.4            |Iris-setosa      |Iris-setosa      |
...

おーーーーーーーー!!
CLASSとLABELがそこそこ一致しています!!できていそう!!

最近少しScalaを触り始めて難しいなと思いながらやっていきます。

Discussion