Synapse パイプラインのスクリプト アクティビティで Synapse サーバーレス SQL を実行する
はじめに
Synapse サーバーレス SQL のスクリプトを Synapse パイプラインのスクリプト アクティビティとして実行する方法についてまとめる。
1. Synapse サーバーレス SQL のデータベースを作成
Synapse ワークスペースで SQL スクリプトを新規作成し、データベースを作成する。
以下は例。
USE master;
-- Create database if not exists
IF NOT EXISTS (SELECT * FROM sys.databases WHERE name = 'demo_data')
CREATE DATABASE demo_data;
ELSE
PRINT('The database exists.')
GO
2. Synapse サーバーレス SQL のリンク サービスを作成
Azure ポータルの Synapse の [概要] ページでサーバーレス SQL エンドポイントの FQDN をコピー
Synapse ワークスペースの [管理ハブ] > [新規] > [Azure Synapse Analytics] を選択
[新しいリンク サービス] 画面で以下の必要事項を入力して [テスト接続] をクリック > 接続成功したら [コミット] をクリック
- 名前: 任意の名前
- アカウントの選択方法: [手動で入力] にチェック
- 完全修飾ドメイン名: 上記でコピーしたサーバーレス SQL の FQDN をペースト
- データベース名: 1 で作成したサーバーレス SQL のデータベース名を入力
- 認証の種類: 任意の種類を選択 (ここではシステム割り当てマネージド ID を選択)
3. Synapse パイプラインを作成
- Synapse ワークスペースの [統合] ハブで新規パイプラインを作成
- アクティビティ一覧のうち、[全般] > [スクリプト] をキャンバスにドラッグ & ドロップ
スクリプト アクティビティの設定
- 全般
- 名前: 任意の名前を入力
- 設定
- リンク サービス: 2 で作成したリンク サービスを選択
- スクリプト: NonQuery を選択、テキスト ボックスに実行したい T-SQL を入力
- (お好みで) 詳細 > ログを有効にするにチェック & アクティビティの出力にチェック
パイプラインの例
以下は作成したパイプラインの例。
この例では、冪等性を持たせるためにスクリプト アクティビティの前に削除アクティビティを実行している。削除アクティビティで Synapse サーバーレス SQL の CETAS (Create External Table As Select) ステートメントの書き込み先ディレクトリを削除している。
スクリプト例
以下はスクリプト例。
外部データ ソースを作成 → 外部ファイル フォーマットを作成 → 外部テーブルを一旦ドロップ → CETAS ステートメントで外部テーブルを作成という流れ。
ストレージ アカウントやパスはダミーの内容のため、適宜置き換えること。
-- Create external data source if not exists
IF NOT EXISTS (SELECT * FROM sys.external_data_sources WHERE name = 'DemoData')
CREATE EXTERNAL DATA SOURCE DemoData WITH (
LOCATION = 'https://sthinadsdemodfs.dfs.core.windows.net/demo-data'
)
ELSE
PRINT('The external data source exists.')
-- Create external file format if not exists
IF NOT EXISTS (SELECT * FROM sys.external_file_formats WHERE name = 'Parquet')
CREATE EXTERNAL FILE FORMAT Parquet WITH (
FORMAT_TYPE = PARQUET
)
ELSE
PRINT('The external file format exists.')
-- Drop external table if exists
IF EXISTS (SELECT * FROM sys.objects WHERE object_id = OBJECT_ID(N'[dbo].[parquet_olist_customers_dataset]') AND type in (N'U'))
DROP EXTERNAL TABLE [dbo].[parquet_olist_customers_dataset]
ELSE
PRINT('The external table does not exist.')
-- Create external tabl
CREATE EXTERNAL TABLE [dbo].[parquet_olist_customers_dataset]
WITH (
LOCATION = 'SynapseServerlessAsPipelineActivity/parquet/olist_customers_dataset/',
DATA_SOURCE = DemoData,
FILE_FORMAT = Parquet
)
AS
SELECT
*
FROM
OPENROWSET(
BULK 'https://sthinadsdemodfs.dfs.core.windows.net/demo-data/SynapseServerlessAsPipelineActivity/csv/olist_customers_dataset.csv',
FORMAT = 'CSV',
HEADER_ROW = TRUE,
PARSER_VERSION = '2.0'
) AS [result]
まとめ
Synapse サーバーレス SQL の CETAS ステートメントによるデータ変換を Synapse パイプラインのスクリプト アクティビティとして組み込めることが分かった。
この記事では扱わなかったが、CETAS ステートメントをストアド プロシージャとして作成しておけば、Synapse パイプラインのストアド プロシージャ アクティビティとして実行することも可能である。
Synapse パイプラインの主要なデータ加工のアクティビティには、コピーやマッピング データ フロー、Synapse Spark ノートブックなどがあるが、比較的手軽に柔軟な処理が行える Synapse サーバーレス SQL の CETAS ステートメントは、既存のデータ加工系アクティビティの間を程よく埋めてくれる存在になりそうだ。
参考サイト