💻

Kubernetes + SQL Server で Spark を使い始める

2020/12/02に公開

この記事について

この記事は、Azure SQL & Synapse Analytics Advent Calendar 2020 2 日目の記事です。

この記事では、SQL Server 2019 で追加された Spark 機能を使う方法について記載しています。
SQL Server 2019 では、Kubernetes 上で稼働する SQL Server として、SQL Server Big Data Cluster が新登場しました。
この SQL Server Big Data Cluster によって、SQL Server や Spark、HDFS のコンテナーを Kubernetes 上にスケーラブルに展開することができ、ビックデータの読み取り・書き込み・処理を行うことができます。

SQL Server で Spark を使うメリット

ビッグデータ環境の管理が容易になります。
Microsoft が提供する HDFS と Spark、分析ツールなど、データレイクを作成するために必要なすべてのものが付属しており、すべて SQL Server と緊密に統合され、Microsoft によって完全にサポートされています。
これで、構造化データと非構造化データに対してアプリ、分析、AI を実行できます。使い慣れた T-SQL クエリを使用するか、Spark に精通している人が Python、R、Scala、または Java を同じ統合クラスター内で使用してデータの準備や分析のために Spark ジョブを実行できます。
(Microsoft Docs より引用)

インストール

BIg Data Cluster のインストールは、基本的に以下の記事を参考にしています。

環境

この記事では、以下の環境を使っています。

  • OS: Windows 10 Pro 10.0.19042 N/A ビルド 19042

Python のインストール

以下のダウンロードサイトより、Python のインストーラーをダウンロードし、インストールします。

Azure Data CLI (azdata) のインストール

Azure Data CLI (azdata) は、REST API 経由でデータ サービスをブートストラップし管理できる、Python で記述されたコマンドライン ユーティリティです。
以前の azdata は pip 経由でのインストールでした。今は Windows インストーラー (.msi) を使用します。
古い azdata が存在している場合は、削除を行います。

pip list --format columns
pip freeze | grep azdata-* | xargs pip uninstall -y

その後、Windows インストーラーを使用して、azdata をインストールします。

Azure Data Studio のインストール

Azure Data Studio がまだ未インストールの場合は、インストールします。

Data Virtualization 拡張機能のインストール

Azure Data Studio の拡張機能として、Data Virtualization をインストールします。

SQL Server Big Data Cluster のインストール

今回は、Azure Kubernetes Service を使用していきます。
AKS 以外の Kubernetes 環境にもデプロイは可能です。詳しくは、以下のサイトを確認してください。

Azure Data Studio を開きます。Azure Data Studio の ターミナル で、以下のコマンドを実行し、Azure にログインします。

az login

ブラウザでログイン情報の入力を求められるので、入力を行います。
認証に成功したら、以下のような内容が Azure Data Studio に表示されるはずです。

You have logged in. Now let us find all the subscriptions to which you have access...
...(以下、認証情報の JSON)

リソースグループを作成し、AKS を構築し、接続した後、AKS のノードの状態を確認します。
詳しくは、先述の Qiita 記事を見てください。

az group create --name sqlbdc-rg --location japaneast
az aks create -n sqlbdc -g sqlbdc-rg --generate-ssh-key --node-vm-size Standard_L8s -c 1
az aks get-credentials -n sqlbdc -g sqlbdc-rg
kubectl get nodes

3 つ目のコマンドで、C:\Users\<your username>\.kube\config 配下に kubeconfig ファイルを作成します。
上書きすることで、他の環境への接続に影響しないよう、他で Kubernetes を使用している人は注意してください。

最後までコマンドが正常に実行されれば、以下のような AKS のノードの情報が返ってくるはずです。

NAME                                STATUS   ROLES   AGE   VERSION
aks-nodepool1-22556056-vmss000000   Ready    agent   10m   v1.18.10

AKS の構築が完了したら、SQL Server Big Data Cluster をインストールしていきます。
今回は、既定で用意されている構成設定のうち、基本となる構成の aks-dev-test を使用します。

SET AZDATA_USERNAME=admin
SET AZDATA_PASSWORD=<複雑なパスワード>

複雑なパスワードは、!&' は使用できません。また、@ の利用は検証用途などであれば可能な限り避けた方が作業がスムーズです。

azdata bdc create --config aks-dev-test --accept-eula yes

コマンドを入力すると、以下のような文章が出てきます。

NOTE: Cluster creation can take a significant amount of time depending on configuration, network speed, and the number of nodes in the cluster.

Starting cluster deployment.
Waiting for cluster controller to start.

SQL Server Big Data Cluster の構築には時間がかかるということなので、15 ~ 30 分ぐらい待ちます。

Waiting for cluster controller to start.
Waiting for cluster controller to start.
Waiting for cluster controller to start.
Waiting for cluster controller to start.
Waiting for cluster controller to start.
Waiting for cluster controller to start.
Waiting for cluster controller to start.
Cluster controller endpoint is available at xxx.xxx.xxx.xxx:30080.
Cluster control plane is ready.
Data pool is ready. 
Storage pool is ready. 
Cluster 'mssql-cluster' is not ready after 15.0 minutes. Check controller logs for more details.
Master pool is ready. 
Compute pool is ready. 
Cluster 'mssql-cluster' deployed successfully.

Cluster 'mssql-cluster' deployed successfully. の出力が行われれば、SQL Server Big Data Cluster のインストールは完了です。

SQL Server Big Data Cluster の設定

インストールファイルの確認

azdata bdc config init --source aks-dev-test --path <構成ファイルを出力したいフォルダのパス>
start <構成ファイルを出力したいフォルダのパス>

その他、インストールされたコンポーネントや、ポッド、サービスの確認方法については、先述の Qiita 記事を見てください。

Azure Data Studio で接続

SQL Server Big Data Cluster に Azure Data Studio で接続します。
まずは、kubectl config get-contexts で sqlbdc の AUTHINFO の設定値を確認します。

kubectl config get-contexts sqlbdc
(出力結果例)
CURRENT   NAME     CLUSTER   AUTHINFO                       NAMESPACE
*         sqlbdc   sqlbdc    clusterUser_sqlbdc-rg_sqlbdc

新しく名前空間を指定して、コンテキストを作成します。

kubectl config set-context mssql-cluster --namespace=mssql-cluster --cluster=sqlbdc --user=<確認した AUTHINFO 値>
(実行例)
kubectl config set-context mssql-cluster --namespace=mssql-cluster --cluster=sqlbdc --user=clusterUser_sqlbdc-rg_sqlbdc

コンテキストを切り替えます。

kubectl config use-context mssql-cluster

kubectl get svc を実行して、service/master-svc-externalEXTERNAL-IPPORT(S)を確認します。
ポート番号は、既定で 31433 が設定されているはずです。

kubectl get svc

NAME                      TYPE           CLUSTER-IP     EXTERNAL-IP      PORT(S)
...(省略)...
master-svc-external       LoadBalancer   10.0.73.95     xxx.xxx.xxx.xxx       31433:30793/TCP

Azure Data Studio で、SQL Server Big Data Cluster に接続します。
入力する情報は、以下の通りです。

  • 接続の種類: Microsoft SQL Server
  • Server: <前項で確認した EXTERNAL-IP>,31433
  • User name: admin
  • Password: AZDATA_PASSWORD で指定したパスワード
  • Database: <既定>
  • サーバーグループ: <既定>
  • Name (optional): ※任意で好きな名前を設定してください

サンプルデータの挿入

Spark で処理を行うために、SQL Server Big Data Cluster にサンプルデータを挿入します。

Windows コマンドプロンプトを開き、以下のコマンドを実行します。

cd <サンプルデータを保存する任意のディレクトリ>
curl -o bootstrap-sample-db.cmd "https://raw.githubusercontent.com/Microsoft/sql-server-samples/master/samples/features/sql-big-data-cluster/bootstrap-sample-db.cmd"
curl -o bootstrap-sample-db.sql "https://raw.githubusercontent.com/Microsoft/sql-server-samples/master/samples/features/sql-big-data-cluster/bootstrap-sample-db.sql"

指定したディレクトリに、以下のファイルが出力されるはずです。

  • bootstrap-sample-db.cmd
  • bootstrap-sample-db.sql

こちらのバッチファイル (.bat) を使用して、サンプルデータを挿入します。
バッチファイル実行のために、追加で指定が必要となる Knox (gateway-svc-external) のエンドポイント (EXTERNAL-IP) 値を確認します。
Azure Data Studio で SQL Server Big Data Cluster に接続する際に確認したように、改めて kubectl get svc で情報を確認します。

kubectl config get-contexts sqlbdc
kubectl config use-context mssql-cluster
kubectl get svc master-svc-external gateway-svc-external

NAME                   TYPE           CLUSTER-IP     EXTERNAL-IP     PORT(S)           AGE
master-svc-external    LoadBalancer   10.0.73.95     xxx.xxx.xxx.xxx 31433:30793/TCP   10h
gateway-svc-external   LoadBalancer   10.0.170.105   xxx.xxx.xxx.xxx 30443:32618/TCP   10h

取得したバッチファイルの実行には、AZDATA_USERNAME および AZDATA_PASSWORD の 2 つの環境変数を利用します。
SQL Server Big Data Cluster のインストール時に実施した事と同じ内容を、再度 Windows コマンドプロンプトでも実施します。

SET AZDATA_USERNAME=admin
SET AZDATA_PASSWORD=<AKS の admin ユーザーの管理パスワード>

環境変数を設定したら、以下のコマンドを実行して、SQL Server Big Data Cluster にサンプルデータを投入します。

.\bootstrap-sample-db.cmd mssql-cluster <master-svc-external の EXTERNAL-IP 値> <gateway-svc-external の EXTERNAL-IP 値>

このバッチ処理を初回実行すると、AppData\Local\Temp の下に一時ファイルをダウンロードします。20 分程度時間がかかるので、完了まで待ちます。
Azure Data Studio で SQL Server Big Data Cluster に接続し、Sales データベースが存在していることを確認してください。
また、HDFS の下に clickstream_data フォルダが作成され web_clickstream.csv ファイルが存在していることを確認してください。

Spark 実行環境の設定

Azure Data Studio の SERVERS ウインドウで、作成した SQL Server Big Data Cluster の接続情報を選択し、右クリックマスターインスタンスのサーバーダッシュボード を表示します。

New Query選択 して、クエリ画面を開きます。
MSSQL-Spark コネクタのアクセス許可を作成します。

USE sales
CREATE LOGIN sample_user  WITH PASSWORD ='password123!#' 
CREATE USER sample_user FROM LOGIN sample_user

-- To create external tables in data pools
GRANT ALTER ANY EXTERNAL DATA SOURCE TO sample_user;

-- To create external tables
GRANT CREATE TABLE TO sample_user;
GRANT ALTER ANY SCHEMA TO sample_user;

-- To view database state for Sales
GRANT VIEW DATABASE STATE ON DATABASE::Sales TO sample_user;

ALTER ROLE [db_datareader] ADD MEMBER sample_user
ALTER ROLE [db_datawriter] ADD MEMBER sample_user

設定が終わったら、データプールへの外部データソースを作成します。

USE sales
GO
IF NOT EXISTS(SELECT * FROM sys.external_data_sources WHERE name = 'SqlDataPool')
  CREATE EXTERNAL DATA SOURCE SqlDataPool
  WITH (LOCATION = 'sqldatapool://controller-svc/default');

データプールで、web_clickstreams_spark_results という名前の外部テーブルを作成します。

USE sales
GO
IF NOT EXISTS(SELECT * FROM sys.external_tables WHERE name = 'web_clickstreams_spark_results')
   CREATE EXTERNAL TABLE [web_clickstreams_spark_results]
   ("wcs_click_date_sk" BIGINT , "wcs_click_time_sk" BIGINT , "wcs_sales_sk" BIGINT , "wcs_item_sk" BIGINT , "wcs_web_page_sk" BIGINT , "wcs_user_sk" BIGINT)
   WITH
   (
      DATA_SOURCE = SqlDataPool,
      DISTRIBUTION = ROUND_ROBIN
   );

実行が完了したら、データプールのログインを作成し、ユーザーにアクセス許可を与えます。

EXECUTE('CREATE LOGIN sample_user  WITH PASSWORD = ''password123!#'' ;') AT DATA_SOURCE SqlDataPool;
EXECUTE('CREATE USER sample_user; ALTER ROLE [db_datareader] ADD MEMBER sample_user;  ALTER ROLE [db_datawriter] ADD MEMBER sample_user;') AT DATA_SOURCE SqlDataPool;

Spark 実行

Azure Data Studio の SERVERS ウインドウで、作成した SQL Server Big Data Cluster の接続情報を選択し、右クリックマスターインスタンスのサーバーダッシュボード を表示します。

New Notebook選択 して、ノートブック画面を開きます。
Kernel 欄で Spark | Scala を選択します。

Configure Python Rumtime の画面が表示された場合は、Installation TypeUse existing Python installation を選択の上、Next を選択し、次の Install Dependencies 画面で、Install を選択してください。

上記を実施すると、バックグラウンドでモジュールの追加インストールが行われます。
インストールは TASKS ウインドウで進捗を確認できます。
インストール処理が完了するまで、待ってください。

import org.apache.spark.sql.types._
import org.apache.spark.sql.{SparkSession, SaveMode, Row, DataFrame}

// Change per your installation
val user= "sample_user"
val password= "password123!#"
val database =  "MyTestDatabase"
val sourceDir = "/clickstream_data"
val datapool_table = "web_clickstreams_spark_results"
val datasource_name = "SqlDataPool"
val schema = StructType(Seq(
StructField("wcs_click_date_sk",LongType,true), StructField("wcs_click_time_sk",LongType,true), 
StructField("wcs_sales_sk",LongType,true), StructField("wcs_item_sk",LongType,true),
StructField("wcs_web_page_sk",LongType,true), StructField("wcs_user_sk",LongType,true)
))

val hostname = "master-p-svc"
val port = 1433
val url = s"jdbc:sqlserver://${hostname}:${port};database=${database};user=${user};password=${password};"
import org.apache.spark.sql.{SparkSession, SaveMode, Row, DataFrame}

val df = spark.readStream.format("csv").schema(schema).option("header", true).load(sourceDir)
val query = df.writeStream.outputMode("append").foreachBatch{ (batchDF: DataFrame, batchId: Long) => 
          batchDF.write
           .format("com.microsoft.sqlserver.jdbc.spark")
           .mode("append")
            .option("url", url)
            .option("dbtable", datapool_table)
            .option("user", user)
            .option("password", password)
            .option("dataPoolDataSource",datasource_name).save()
         }.start()

query.awaitTermination(40000)
query.stop()

データの確認

USE sales
GO
SELECT count(*) FROM [web_clickstreams_spark_results];
SELECT TOP 10 * FROM [web_clickstreams_spark_results];

参考情報

Microsoft Docs

Discussion