🥰

Scala から Databricks の Delta Sharing を Apache Spark 経由で使う

2023/09/13に公開

TL;DR;

  • Delta Sharing は Scala でもできる.
  • ローカルでシングルノードの Spark を動かすには Scala CLI があればいい.
  • json4s と jackson のバージョンを 3.5.x 系にしないと動かない. Scala 2.12.x の場合は 3.5.3 を Scala 2.13.x の場合は 3.5.5 を利用する.
  • Java 8、11、17 で動作確認した. Java 9 以上の場合、モジュールの --add-opens 指定が必要.
  • Python と異なり DeltaSharingRestClient はパブリックには公開されていない.
  • UDT の含まれるテーブルを受信しようとするとエラーになる. つらい.

Delta Sharing は、簡単にいうと Databricks のウェアハウスに保存したデータを Pandas, Polars などのDataFrameライブラリや PowerBI、Tableau などの BI ツールから取得できるようにするプロトコル. テーブルのバッチ読み込みはもちろんストリーミング読み込み(マイクロバッチ)もできる.

公式のサンプルには Python の例しかない が、Scala(や Java) でも利用できるのでここでは Scala を利用してクエリする例を紹介する.

//> using scala "2.13.12"
//> using options "-Xlint"
//> using dep "org.scala-lang.modules::scala-collection-compat:2.11.0"
//> using dep "org.apache.spark::spark-core:3.5.0"
//> using dep "org.apache.spark::spark-streaming:3.5.0"
//> using dep "org.apache.spark::spark-sql:3.5.0"
//> using dep "org.apache.spark::spark-catalyst:3.5.0"
//> using dep "io.delta::delta-core:2.4.0"
//> using dep "io.delta::delta-sharing-spark:0.7.5"
//> using dep "org.codehaus.jackson:jackson-mapper-asl:1.9.13"
//> using dep "com.fasterxml.jackson.core:jackson-core:2.15.2"
// Scala 2.12.X
///> using dep "org.json4s::json4s-ast:3.5.3"
///> using dep "org.json4s::json4s-core:3.5.3"
///> using dep "org.json4s::json4s-jackson:3.5.3"
// Scala 2.13.X
//> using dep "org.json4s::json4s-ast:3.5.5"
//> using dep "org.json4s::json4s-core:3.5.5"
//> using dep "org.json4s::json4s-jackson:3.5.5"
//> using dep "org.apache.httpcomponents:httpclient:4.5.14"

// Java 9+ 向けの設定
//> using javaOptions "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED",
//> using javaOptions "--add-opens=java.base/sun.security.action=ALL-UNNAMED"

// hack to violate encapsulation to use DeltaSharingRestClient
package io.delta.sharing.spark

import org.apache.spark.sql.SparkSession
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.streaming.Trigger
object App extends DeltaSharingConfig {
  def main(args: Array[String]): Unit = {
    println(client.listAllTables().mkString("\n"))

    val spark = SparkSession
      .builder()
      .appName("example")
      .master("local[*]")
      .getOrCreate()

    spark.sparkContext.setLogLevel("WARN")

    import spark.implicits._
    val df = spark.read
      .format("deltaSharing")
      .load(table)
      .limit(1000)

    df.show(10)
  }

  def _main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("example")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._
    import scala.concurrent.duration._
    spark.readStream
      .format("deltaSharing")
      .load(logTable)
      .writeStream
      .format("console")
      .trigger(Trigger.ProcessingTime(30.seconds))
      .outputMode("append")
      .start()
      .awaitTermination()
  }
}

trait DeltaSharingConfig {
  val share = "config.share"
  val table = share + "/#<share name>.<schema name>.<table name>"
  val logTable = share + "/#<share name>.<schema name>.<table name>"

  val provider: DeltaSharingProfileProvider =
    new io.delta.sharing.spark.DeltaSharingFileProfileProvider(
      new Configuration(),
      share
    )
    
  val client = new io.delta.sharing.spark.DeltaSharingRestClient(provider)

}

Discussion