🎇

PySpark(Spark 3.4+)でSQL ServerにJDBC接続しCTEを利用する

2023/08/07に公開

はじめに

Spark 3.4からSQL ServerのJDBC接続において、CTEが利用できるようになった。しかし、option(prepareQuery) で指定する形で通常のSQLと異なる形となるので実際に試してみる。
結果だけを知りたい方は、実行結果の最初のコードを参照ください。
実際はCTEを試すための準備が一番大変でした....

https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html

前提条件

  • Spark 3.4+(下記のPRでCTEの対応がされた)
    https://github.com/apache/spark/pull/36440
  • PySparkはpoetry環境でインストール
  • M1 Macのローカル環境で実行(これはSQL Serverのdockerコンテナに影響)
  • SQL Serverへの接続にはAzure Data Studioを利用

実行内容

JDBCドライバーを準備

  • ./.venv/lib/python3.10/site-packages/pyspark/jarsにJDBCドライバー(mssql-jdbc-12.4.0.jre8.jar)を格納(JDBCドライバーのダウンロード[1])。それにより、別途Path指定をする必要なし。
  • 該当ディレクトリに保管しない場合は、"spark.driver.extraClassPath"、"spark.executor.extraClassPath"、や"spark.jars"などでドライバーのjarを指定する必要があるはず。(未検証)
オプション説明(with ChatGPT)

"spark.driver.extraClassPath"、"spark.executor.extraClassPath"、および"spark.jars"の違いについて説明します。

  1. spark.driver.extraClassPath:
    このオプションは、Sparkのドライバープロセスのクラスパスを設定します。クラスパスは、JavaやScalaのコードを実行する際にクラスやライブラリのロードを指示するパスのリストです。"spark.driver.extraClassPath"を使用することで、特定のJARファイルやディレクトリをドライバーのクラスパスに追加することができます。これにより、ドライバープロセスが必要なクラスやライブラリを見つけることができるようになります。

  2. spark.executor.extraClassPath:
    このオプションは、Sparkのエグゼキュータープロセスのクラスパスを設定します。ドライバーとは異なり、エグゼキューターノードはタスクの実行を担当しており、"spark.executor.extraClassPath"を使用してエグゼキューターのクラスパスに特定のJARファイルやディレクトリを追加することで、タスクが必要なクラスやライブラリを見つけることができるようになります。

  3. spark.jars:
    このオプションは、クラスパスに追加するJARファイルのリストを指定します。これにより、ドライバーおよびエグゼキューターノードの両方で使用するJARファイルを指定できます。"spark.jars"を使用する場合、指定されたJARファイルはドライバーとエグゼキューターの両方のクラスパスに追加されます。

要約すると、これらのオプションはSparkアプリケーションの実行に必要な外部ライブラリやクラスをクラスパスに追加するためのものですが、対象となるプロセス(ドライバーまたはエグゼキューター)および追加するファイルの範囲が異なります。

接続するSQL Serverを準備

  • こちら[2]を参考にM1 Mac上にdockerでSQL serverを構築
  • Azure Data Studioから事前準備のDDLやDMLを実行(データベースはcompany_database)
  • テーブルやデータはChatGPTがCTEのサンプルのために提示してくれたものを利用するため、中身に意味はない
  • 以下はCTEのサンプルを含めたAzure Data Studioでの実行結果
  • これと同様のCTEをJDBC接続で実行する
SELECT *
FROM employees;

WITH
    dept_avg_salary
    AS
    (
        SELECT emp_department, AVG(emp_salary) AS avg_salary
        FROM employees
        GROUP BY emp_department
    ),
    dept_max_salary
    AS
    (
        SELECT emp_department, MAX(emp_salary) AS max_salary
        FROM employees
        GROUP BY emp_department
    )
SELECT d1.emp_department, d1.avg_salary, d2.max_salary
FROM dept_avg_salary d1
    JOIN dept_max_salary d2 ON d1.emp_department = d2.emp_department;

PySparkにてCTEを実行

実行結果のとおり、CTEを実行するにはCTE部分をprepareQueryのoptionで指定し、CTEを利用したSQL部分をqueryのoptionで指定する。

実行結果

# sparkセッション確立は省略

prepare_sql = """
WITH
    dept_avg_salary
    AS
    (
        SELECT emp_department, AVG(emp_salary) AS avg_salary
        FROM employees
        GROUP BY emp_department
    ),
    dept_max_salary
    AS
    (
        SELECT emp_department, MAX(emp_salary) AS max_salary
        FROM employees
        GROUP BY emp_department
    )
"""
sql = """
SELECT d1.emp_department, d1.avg_salary, d2.max_salary
FROM dept_avg_salary d1
    JOIN dept_max_salary d2 ON d1.emp_department = d2.emp_department
"""

username = "SA" # お試しのため、SA(管理者ユーザ)を利用
password = "password" # SQL Serverコンテナ起動時に指定したもの

driver_class = "com.microsoft.sqlserver.jdbc.SQLServerDriver"

df = (
    spark.read.format("jdbc")
    .option(
        "url",
        "jdbc:sqlserver://localhost:1433;databaseName=company_database;trustServerCertificate=true;",
    )
    .option("Driver", driver_class)
    .option("prepareQuery", prepare_sql)
    .option("query", sql)
    .option("user", username)
    .option("password", password)
    .load()
)

df.show()

+--------------+------------+----------+
|emp_department|  avg_salary|max_salary|
+--------------+------------+----------+
|       Finance|52000.000000|  52000.00|
|            HR|49000.000000|  50000.00|
|            IT|57500.000000|  60000.00|
+--------------+------------+----------+

その他補足

urlオプションにてtrustServerCertificate=trueを指定しない場合、下記のエラーとなる。利用するJDBCドライバーのバージョンでは、暗号化モードがデフォルトで"encrypt":true、"trustServerCertificate":false(つまり、暗号化の有効、TLS証明書の検証有効)となっており、お試しのためTLS証明書は認証機関発行のものでないため。[3]

Py4JJavaError: An error occurred while calling o206.load.
: com.microsoft.sqlserver.jdbc.SQLServerException: "encrypt" property is set to "true" and "trustServerCertificate" property is set to "false" but the driver could not establish a secure connection to SQL Server by using Secure Sockets Layer (SSL) encryption: Error: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target.

queryオプションで指定するSQLにWITH句を指定した場合には、syntaxエラーとなる。

Py4JJavaError: An error occurred while calling o45.load.
: com.microsoft.sqlserver.jdbc.SQLServerException: Incorrect syntax near the keyword 'WITH'.
脚注
  1. https://learn.microsoft.com/en-us/sql/connect/jdbc/download-microsoft-jdbc-driver-for-sql-server?view=sql-server-ver16 ↩︎

  2. https://pine619.hatenablog.com/entry/m1-mac-sql-server ↩︎

  3. https://learn.microsoft.com/ja-jp/sql/connect/jdbc/understanding-ssl-support?view=sql-server-ver16 ↩︎

Discussion