Kubernetes + SQL Server で Spark を使い始める
この記事について
この記事は、Azure SQL & Synapse Analytics Advent Calendar 2020 2 日目の記事です。
この記事では、SQL Server 2019 で追加された Spark 機能を使う方法について記載しています。
SQL Server 2019 では、Kubernetes 上で稼働する SQL Server として、SQL Server Big Data Cluster
が新登場しました。
この SQL Server Big Data Cluster によって、SQL Server や Spark、HDFS のコンテナーを Kubernetes 上にスケーラブルに展開することができ、ビックデータの読み取り・書き込み・処理を行うことができます。
SQL Server で Spark を使うメリット
ビッグデータ環境の管理が容易になります。
Microsoft が提供する HDFS と Spark、分析ツールなど、データレイクを作成するために必要なすべてのものが付属しており、すべて SQL Server と緊密に統合され、Microsoft によって完全にサポートされています。
これで、構造化データと非構造化データに対してアプリ、分析、AI を実行できます。使い慣れた T-SQL クエリを使用するか、Spark に精通している人が Python、R、Scala、または Java を同じ統合クラスター内で使用してデータの準備や分析のために Spark ジョブを実行できます。
(Microsoft Docs より引用)
インストール
BIg Data Cluster のインストールは、基本的に以下の記事を参考にしています。
環境
この記事では、以下の環境を使っています。
- OS: Windows 10 Pro 10.0.19042 N/A ビルド 19042
Python のインストール
以下のダウンロードサイトより、Python のインストーラーをダウンロードし、インストールします。
Azure Data CLI (azdata) のインストール
Azure Data CLI (azdata) は、REST API 経由でデータ サービスをブートストラップし管理できる、Python で記述されたコマンドライン ユーティリティです。
以前の azdata は pip 経由でのインストールでした。今は Windows インストーラー (.msi) を使用します。
古い azdata が存在している場合は、削除を行います。
pip list --format columns
pip freeze | grep azdata-* | xargs pip uninstall -y
その後、Windows インストーラーを使用して、azdata をインストールします。
Azure Data Studio のインストール
Azure Data Studio がまだ未インストールの場合は、インストールします。
Data Virtualization 拡張機能のインストール
Azure Data Studio の拡張機能として、Data Virtualization をインストールします。
SQL Server Big Data Cluster のインストール
今回は、Azure Kubernetes Service を使用していきます。
AKS 以外の Kubernetes 環境にもデプロイは可能です。詳しくは、以下のサイトを確認してください。
Azure Data Studio を開きます。Azure Data Studio の ターミナル
で、以下のコマンドを実行し、Azure にログインします。
az login
ブラウザでログイン情報の入力を求められるので、入力を行います。
認証に成功したら、以下のような内容が Azure Data Studio に表示されるはずです。
You have logged in. Now let us find all the subscriptions to which you have access...
...(以下、認証情報の JSON)
リソースグループを作成し、AKS を構築し、接続した後、AKS のノードの状態を確認します。
詳しくは、先述の Qiita 記事を見てください。
az group create --name sqlbdc-rg --location japaneast
az aks create -n sqlbdc -g sqlbdc-rg --generate-ssh-key --node-vm-size Standard_L8s -c 1
az aks get-credentials -n sqlbdc -g sqlbdc-rg
kubectl get nodes
3 つ目のコマンドで、C:\Users\<your username>\.kube\config
配下に kubeconfig ファイルを作成します。
上書きすることで、他の環境への接続に影響しないよう、他で Kubernetes を使用している人は注意してください。
最後までコマンドが正常に実行されれば、以下のような AKS のノードの情報が返ってくるはずです。
NAME STATUS ROLES AGE VERSION
aks-nodepool1-22556056-vmss000000 Ready agent 10m v1.18.10
AKS の構築が完了したら、SQL Server Big Data Cluster をインストールしていきます。
今回は、既定で用意されている構成設定のうち、基本となる構成の aks-dev-test
を使用します。
SET AZDATA_USERNAME=admin
SET AZDATA_PASSWORD=<複雑なパスワード>
※複雑なパスワード
は、!
や &
や '
は使用できません。また、@
の利用は検証用途などであれば可能な限り避けた方が作業がスムーズです。
azdata bdc create --config aks-dev-test --accept-eula yes
コマンドを入力すると、以下のような文章が出てきます。
NOTE: Cluster creation can take a significant amount of time depending on configuration, network speed, and the number of nodes in the cluster.
Starting cluster deployment.
Waiting for cluster controller to start.
SQL Server Big Data Cluster の構築には時間がかかるということなので、15 ~ 30 分ぐらい待ちます。
Waiting for cluster controller to start.
Waiting for cluster controller to start.
Waiting for cluster controller to start.
Waiting for cluster controller to start.
Waiting for cluster controller to start.
Waiting for cluster controller to start.
Waiting for cluster controller to start.
Cluster controller endpoint is available at xxx.xxx.xxx.xxx:30080.
Cluster control plane is ready.
Data pool is ready.
Storage pool is ready.
Cluster 'mssql-cluster' is not ready after 15.0 minutes. Check controller logs for more details.
Master pool is ready.
Compute pool is ready.
Cluster 'mssql-cluster' deployed successfully.
Cluster 'mssql-cluster' deployed successfully.
の出力が行われれば、SQL Server Big Data Cluster のインストールは完了です。
SQL Server Big Data Cluster の設定
インストールファイルの確認
azdata bdc config init --source aks-dev-test --path <構成ファイルを出力したいフォルダのパス>
start <構成ファイルを出力したいフォルダのパス>
その他、インストールされたコンポーネントや、ポッド、サービスの確認方法については、先述の Qiita 記事を見てください。
Azure Data Studio で接続
SQL Server Big Data Cluster に Azure Data Studio で接続します。
まずは、kubectl config get-contexts で sqlbdc の AUTHINFO
の設定値を確認します。
kubectl config get-contexts sqlbdc
(出力結果例)
CURRENT NAME CLUSTER AUTHINFO NAMESPACE
* sqlbdc sqlbdc clusterUser_sqlbdc-rg_sqlbdc
新しく名前空間を指定して、コンテキストを作成します。
kubectl config set-context mssql-cluster --namespace=mssql-cluster --cluster=sqlbdc --user=<確認した AUTHINFO 値>
(実行例)
kubectl config set-context mssql-cluster --namespace=mssql-cluster --cluster=sqlbdc --user=clusterUser_sqlbdc-rg_sqlbdc
コンテキストを切り替えます。
kubectl config use-context mssql-cluster
kubectl get svc を実行して、service/master-svc-external
の EXTERNAL-IP
と PORT(S)
を確認します。
ポート番号は、既定で 31433
が設定されているはずです。
kubectl get svc
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S)
...(省略)...
master-svc-external LoadBalancer 10.0.73.95 xxx.xxx.xxx.xxx 31433:30793/TCP
Azure Data Studio で、SQL Server Big Data Cluster に接続します。
入力する情報は、以下の通りです。
- 接続の種類: Microsoft SQL Server
- Server: <前項で確認した EXTERNAL-IP>,31433
- User name: admin
- Password: AZDATA_PASSWORD で指定したパスワード
- Database: <既定>
- サーバーグループ: <既定>
- Name (optional): ※任意で好きな名前を設定してください
サンプルデータの挿入
Spark で処理を行うために、SQL Server Big Data Cluster にサンプルデータを挿入します。
Windows コマンドプロンプトを開き、以下のコマンドを実行します。
cd <サンプルデータを保存する任意のディレクトリ>
curl -o bootstrap-sample-db.cmd "https://raw.githubusercontent.com/Microsoft/sql-server-samples/master/samples/features/sql-big-data-cluster/bootstrap-sample-db.cmd"
curl -o bootstrap-sample-db.sql "https://raw.githubusercontent.com/Microsoft/sql-server-samples/master/samples/features/sql-big-data-cluster/bootstrap-sample-db.sql"
指定したディレクトリに、以下のファイルが出力されるはずです。
- bootstrap-sample-db.cmd
- bootstrap-sample-db.sql
こちらのバッチファイル (.bat)
を使用して、サンプルデータを挿入します。
バッチファイル実行のために、追加で指定が必要となる Knox (gateway-svc-external)
のエンドポイント (EXTERNAL-IP
) 値を確認します。
Azure Data Studio で SQL Server Big Data Cluster に接続する際に確認したように、改めて kubectl get svc
で情報を確認します。
kubectl config get-contexts sqlbdc
kubectl config use-context mssql-cluster
kubectl get svc master-svc-external gateway-svc-external
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
master-svc-external LoadBalancer 10.0.73.95 xxx.xxx.xxx.xxx 31433:30793/TCP 10h
gateway-svc-external LoadBalancer 10.0.170.105 xxx.xxx.xxx.xxx 30443:32618/TCP 10h
取得したバッチファイルの実行には、AZDATA_USERNAME
および AZDATA_PASSWORD
の 2 つの環境変数を利用します。
SQL Server Big Data Cluster のインストール時に実施した事と同じ内容を、再度 Windows コマンドプロンプトでも実施します。
SET AZDATA_USERNAME=admin
SET AZDATA_PASSWORD=<AKS の admin ユーザーの管理パスワード>
環境変数を設定したら、以下のコマンドを実行して、SQL Server Big Data Cluster にサンプルデータを投入します。
.\bootstrap-sample-db.cmd mssql-cluster <master-svc-external の EXTERNAL-IP 値> <gateway-svc-external の EXTERNAL-IP 値>
このバッチ処理を初回実行すると、AppData\Local\Temp
の下に一時ファイルをダウンロードします。20 分程度時間がかかるので、完了まで待ちます。
Azure Data Studio で SQL Server Big Data Cluster に接続し、Sales
データベースが存在していることを確認してください。
また、HDFS の下に clickstream_data
フォルダが作成され web_clickstream.csv
ファイルが存在していることを確認してください。
Spark 実行環境の設定
Azure Data Studio の SERVERS
ウインドウで、作成した SQL Server Big Data Cluster の接続情報を選択し、右クリック で マスターインスタンスのサーバーダッシュボード
を表示します。
New Query
を選択 して、クエリ画面を開きます。
MSSQL-Spark
コネクタのアクセス許可を作成します。
USE sales
CREATE LOGIN sample_user WITH PASSWORD ='password123!#'
CREATE USER sample_user FROM LOGIN sample_user
-- To create external tables in data pools
GRANT ALTER ANY EXTERNAL DATA SOURCE TO sample_user;
-- To create external tables
GRANT CREATE TABLE TO sample_user;
GRANT ALTER ANY SCHEMA TO sample_user;
-- To view database state for Sales
GRANT VIEW DATABASE STATE ON DATABASE::Sales TO sample_user;
ALTER ROLE [db_datareader] ADD MEMBER sample_user
ALTER ROLE [db_datawriter] ADD MEMBER sample_user
設定が終わったら、データプールへの外部データソースを作成します。
USE sales
GO
IF NOT EXISTS(SELECT * FROM sys.external_data_sources WHERE name = 'SqlDataPool')
CREATE EXTERNAL DATA SOURCE SqlDataPool
WITH (LOCATION = 'sqldatapool://controller-svc/default');
データプールで、web_clickstreams_spark_results という名前の外部テーブルを作成します。
USE sales
GO
IF NOT EXISTS(SELECT * FROM sys.external_tables WHERE name = 'web_clickstreams_spark_results')
CREATE EXTERNAL TABLE [web_clickstreams_spark_results]
("wcs_click_date_sk" BIGINT , "wcs_click_time_sk" BIGINT , "wcs_sales_sk" BIGINT , "wcs_item_sk" BIGINT , "wcs_web_page_sk" BIGINT , "wcs_user_sk" BIGINT)
WITH
(
DATA_SOURCE = SqlDataPool,
DISTRIBUTION = ROUND_ROBIN
);
実行が完了したら、データプールのログインを作成し、ユーザーにアクセス許可を与えます。
EXECUTE('CREATE LOGIN sample_user WITH PASSWORD = ''password123!#'' ;') AT DATA_SOURCE SqlDataPool;
EXECUTE('CREATE USER sample_user; ALTER ROLE [db_datareader] ADD MEMBER sample_user; ALTER ROLE [db_datawriter] ADD MEMBER sample_user;') AT DATA_SOURCE SqlDataPool;
Spark 実行
Azure Data Studio の SERVERS
ウインドウで、作成した SQL Server Big Data Cluster の接続情報を選択し、右クリック で マスターインスタンスのサーバーダッシュボード
を表示します。
New Notebook
を選択 して、ノートブック画面を開きます。
Kernel
欄で Spark | Scala を選択します。
Configure Python Rumtime
の画面が表示された場合は、Installation Type
で Use existing Python installation を選択の上、Next を選択し、次の Install Dependencies
画面で、Install を選択してください。
上記を実施すると、バックグラウンドでモジュールの追加インストールが行われます。
インストールは TASKS
ウインドウで進捗を確認できます。
インストール処理が完了するまで、待ってください。
import org.apache.spark.sql.types._
import org.apache.spark.sql.{SparkSession, SaveMode, Row, DataFrame}
// Change per your installation
val user= "sample_user"
val password= "password123!#"
val database = "MyTestDatabase"
val sourceDir = "/clickstream_data"
val datapool_table = "web_clickstreams_spark_results"
val datasource_name = "SqlDataPool"
val schema = StructType(Seq(
StructField("wcs_click_date_sk",LongType,true), StructField("wcs_click_time_sk",LongType,true),
StructField("wcs_sales_sk",LongType,true), StructField("wcs_item_sk",LongType,true),
StructField("wcs_web_page_sk",LongType,true), StructField("wcs_user_sk",LongType,true)
))
val hostname = "master-p-svc"
val port = 1433
val url = s"jdbc:sqlserver://${hostname}:${port};database=${database};user=${user};password=${password};"
import org.apache.spark.sql.{SparkSession, SaveMode, Row, DataFrame}
val df = spark.readStream.format("csv").schema(schema).option("header", true).load(sourceDir)
val query = df.writeStream.outputMode("append").foreachBatch{ (batchDF: DataFrame, batchId: Long) =>
batchDF.write
.format("com.microsoft.sqlserver.jdbc.spark")
.mode("append")
.option("url", url)
.option("dbtable", datapool_table)
.option("user", user)
.option("password", password)
.option("dataPoolDataSource",datasource_name).save()
}.start()
query.awaitTermination(40000)
query.stop()
データの確認
USE sales
GO
SELECT count(*) FROM [web_clickstreams_spark_results];
SELECT TOP 10 * FROM [web_clickstreams_spark_results];
Discussion