🥰
Scala から Databricks の Delta Sharing を Apache Spark 経由で使う
TL;DR;
- Delta Sharing は Scala でもできる.
- ローカルでシングルノードの Spark を動かすには Scala CLI があればいい.
- json4s と jackson のバージョンを 3.5.x 系にしないと動かない. Scala 2.12.x の場合は 3.5.3 を Scala 2.13.x の場合は 3.5.5 を利用する.
- Java 8、11、17 で動作確認した. Java 9 以上の場合、モジュールの
--add-opens
指定が必要. - Python と異なり DeltaSharingRestClient はパブリックには公開されていない.
- UDT の含まれるテーブルを受信しようとするとエラーになる. つらい.
Delta Sharing は、簡単にいうと Databricks のウェアハウスに保存したデータを Pandas, Polars などのDataFrameライブラリや PowerBI、Tableau などの BI ツールから取得できるようにするプロトコル. テーブルのバッチ読み込みはもちろんストリーミング読み込み(マイクロバッチ)もできる.
公式のサンプルには Python の例しかない が、Scala(や Java) でも利用できるのでここでは Scala を利用してクエリする例を紹介する.
//> using scala "2.13.12"
//> using options "-Xlint"
//> using dep "org.scala-lang.modules::scala-collection-compat:2.11.0"
//> using dep "org.apache.spark::spark-core:3.5.0"
//> using dep "org.apache.spark::spark-streaming:3.5.0"
//> using dep "org.apache.spark::spark-sql:3.5.0"
//> using dep "org.apache.spark::spark-catalyst:3.5.0"
//> using dep "io.delta::delta-core:2.4.0"
//> using dep "io.delta::delta-sharing-spark:0.7.5"
//> using dep "org.codehaus.jackson:jackson-mapper-asl:1.9.13"
//> using dep "com.fasterxml.jackson.core:jackson-core:2.15.2"
// Scala 2.12.X
///> using dep "org.json4s::json4s-ast:3.5.3"
///> using dep "org.json4s::json4s-core:3.5.3"
///> using dep "org.json4s::json4s-jackson:3.5.3"
// Scala 2.13.X
//> using dep "org.json4s::json4s-ast:3.5.5"
//> using dep "org.json4s::json4s-core:3.5.5"
//> using dep "org.json4s::json4s-jackson:3.5.5"
//> using dep "org.apache.httpcomponents:httpclient:4.5.14"
// Java 9+ 向けの設定
//> using javaOptions "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED",
//> using javaOptions "--add-opens=java.base/sun.security.action=ALL-UNNAMED"
// hack to violate encapsulation to use DeltaSharingRestClient
package io.delta.sharing.spark
import org.apache.spark.sql.SparkSession
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.streaming.Trigger
object App extends DeltaSharingConfig {
def main(args: Array[String]): Unit = {
println(client.listAllTables().mkString("\n"))
val spark = SparkSession
.builder()
.appName("example")
.master("local[*]")
.getOrCreate()
spark.sparkContext.setLogLevel("WARN")
import spark.implicits._
val df = spark.read
.format("deltaSharing")
.load(table)
.limit(1000)
df.show(10)
}
def _main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.appName("example")
.master("local[*]")
.getOrCreate()
import spark.implicits._
import scala.concurrent.duration._
spark.readStream
.format("deltaSharing")
.load(logTable)
.writeStream
.format("console")
.trigger(Trigger.ProcessingTime(30.seconds))
.outputMode("append")
.start()
.awaitTermination()
}
}
trait DeltaSharingConfig {
val share = "config.share"
val table = share + "/#<share name>.<schema name>.<table name>"
val logTable = share + "/#<share name>.<schema name>.<table name>"
val provider: DeltaSharingProfileProvider =
new io.delta.sharing.spark.DeltaSharingFileProfileProvider(
new Configuration(),
share
)
val client = new io.delta.sharing.spark.DeltaSharingRestClient(provider)
}
Discussion