セマンティックモデルのインポートトリガーをAPIで行うケースと方法
はじめに
私はたまにSnowflakeとPowerBIを用いて開発することがありますが、この記事ではAPIを用いて、PowerBIのセマンティックモデル(データセット)の更新をトリガーする方法を共有します。
この方法はPowerBIでインポートモードを使用しているケースが対象です。インポートモードとDirect Queryモードの違いはほかに多くの記事があるのでそちらを参照してください。最近はOnelake関連でDirect Lakeモードというこれらの中間のような方法もでていますね。
さらに以下のようなニーズがある場合に検討されると考えます。
Case 1 先行するETL処理の終了直後にデータセットの更新をしたいとき
いくらかのSaaSではETL処理更新後、マネージドにPowerBIのセマンティックモデルのインポートをトリガーする機能も公開されているようですが、私が使用するツールでは対応していなく、
コード化することで解決しました。
データウェアハウス内でのETLのオーケストレーションにMWAA等を持ちいる場合、以下のような構成でデータインジェストからBIのデータ更新まで一連の処理を管理できます。インポートモードを使いたいが、データの鮮度を求めるケースにも対応できますね。
Case 2 セマンティックモデル更新のリトライを実施したいとき
GUIから設定するインポートの設定ではリトライの設定は不可能です。もしデータ量が多くなり、更新の失敗が続く場合は、API経由でのデータセット更新を試してみる価値はあるかもしれません。
Case 3 複雑な増分更新を利用したいとき
2のケースと同じく、データ量が多い場合にセマンティックモデルの増分更新を検討するケースもあるでしょう。
増分更新では下の2つのデータの考え方が存在します。
更新の設定方法ですが、増分期間を「実行日からn{日・月・年}前」とPowerBI Desktopから指定することができます。
※アーカイブデータの期間も指定できます
結果、テーブルにdate型の列(日付列と呼ぶ)が存在するという条件のもので
実行日-増分期間 < 日付列 < 実行日
となる行が更新される仕様となっています。(やや複雑ですね...)
通常の増分更新を実行する際はPowerBI Desktopから設定し、PowerBI Serviceにあげておけば実行できるのですが、以下のようなケースは通常の更新では対応できません。
- 増分更新と全行更新を切り替えたいとき
例)Rawデータの洗い替え等で増分更新対象になっていない期間の元データが更新された場合、BIに反映するために全行更新を実施したいとき -
実行日を別の日付で上書きしたいとき
例)予算管理の目的等で未来の日付分もテーブルにあり、実行日だと更新したい範囲が含まれないとき
この場合はAPI経由での増分更新を検討する価値はありそうです。
実装
準備
今回はサービスプリンパルを利用しログインします。以下の情報を事前に準備し、環境変数等に格納しておきます。
- TenantId
- ApplicationId
- Secret
コード
Pythonを利用します。
ワークスペース名、セマンティックモデルの名称を与え、ワークスペース、セマンティックモデルのIDを特定し更新をトリガーするという流れです。
(通名はセマンティックモデルになりましたが、url中はdatasetsのままなのはご愛敬ですね)
コード
import os
import json
import requests
from azure.identity import ClientSecretCredential
# 認証情報
TENANT_ID = os.environ['TenantId']
APP_ID = os.environ['ApplicationId']
APP_SECRET = os.environ['Secret']
API_SCOPE = 'https://analysis.windows.net/powerbi/api/.default'
# 対象
TARGET_WORKSPACE = 'ABC_WORKSPACE'
TARGET_DATASET_NAME = 'XYZ_DATA'
BASE_URL = 'https://api.powerbi.com/v1.0/myorg/'
def get_access_token():
"""Azure ADを利用してアクセストークンを取得"""
credential = ClientSecretCredential(
authority='https://login.microsoftonline.com/',
tenant_id=TENANT_ID,
client_id=APP_ID,
client_secret=APP_SECRET
)
token = credential.get_token(API_SCOPE).token
return token
def get_workspace_id(headers, target_workspace_name):
"""指定された名前のワークスペースIDを取得"""
response = requests.get(BASE_URL + 'groups', headers=headers)
workspaces = response.json().get('value', [])
for workspace in workspaces:
if workspace['name'] == target_workspace_name:
return workspace['id']
return None
def get_dataset_id(headers, workspace_id, target_name):
"""指定された名前に一致するデータセットIDを取得"""
datasets_url = f"{BASE_URL}groups/{workspace_id}/datasets"
response = requests.get(datasets_url, headers=headers)
datasets = response.json().get('value', [])
for dataset in datasets:
if dataset['name'] == target_name:
return dataset['id']
return None
def refresh_dataset(headers, workspace_id, dataset_id):
"""指定されたデータセットの更新をリクエスト"""
refresh_url = f"{BASE_URL}groups/{workspace_id}/datasets/{dataset_id}/refreshes"
data = {
"type": "full",
"commitMode": "transactional",
"maxParallelism": 2
}
response = requests.post(refresh_url, headers=headers, json=data)
return response
def main():
# アクセストークンの取得
access_token = get_access_token()
headers = {'Authorization': f'Bearer {access_token}'}
# ワークスペースIDの取得
workspace_id = get_workspace_id(headers, TARGET_WORKSPACE)
if not workspace_id:
print(f"Workspace '{TARGET_WORKSPACE}' not found.")
return
# データセットIDの取得(完全一致)
dataset_id = get_dataset_id(headers, workspace_id, TARGET_DATASET_NAME)
if not dataset_id:
print(f"Dataset '{TARGET_DATASET_NAME}' not found.")
return
# データセット更新のリクエスト
response = refresh_dataset(headers, workspace_id, dataset_id)
if response.status_code == 202:
print("Dataset refresh initiated successfully.")
else:
print(f"Failed to initiate dataset refresh: {response.status_code} - {response.text}")
if __name__ == "__main__":
main()
上の例のデータセットの更新をリクエストする箇所にdataを渡していますが、実際は以下のパラメータも渡すことができます。
パラメータ | 用途 | デフォルト |
---|---|---|
retryCount | リトライ回数の設定 | 0 |
applyRefreshPolicy | 増分・全行更新の切り替え | true (増分更新設定がある場合に適用) |
effectiveDate | 増分更新時の実行日(期間末尾)の上書き | 実行日 |
他にもパラメータはあります、詳細はドキュメントを。
こうして実行し、PowerBI Serviceでセマンティックモデルの更新履歴を確認すると、以下のように進行中と表示されています。
考察
ここまでPythonを用いて、API経由でPowerBIのセマンティックモデルの更新をトリガーする方法を紹介しました。
ここで今更ですが、そもそもセマンティックモデルをインポートモードではなく、Direct Queryモードを用いる場合には、上記のような開発は全く不要です。
特に上であげた
2. セマンティックモデル更新のリトライを実施したいとき
3. 複雑な増分更新を利用したいとき
の場合はデータ量が多いことが起因でこのアプローチに至ったことでしょう。
データ量が多いケースではインポートよりもかえってDirect Queryモードのほうが安定したり、速度もそこそこ期待できるケースも多いため、インポートモードでしか使用できないDAX関数が必要な場合を除きDirect Queryモードの利用も視野にいれてみてはいかがでしょうか?
なので実際にAPIでセマンティックモデルを更新すべきケースは
1.ETL処理の終了直後にデータセットの更新をしたい
だけになるのかなと考えています。
また余談ですがPowerBIのREST APIをPythonで利用する日本語記事も少なく感じてます。上のコードをセマンティックモデルの更新以外の用途にも転用していただくことも容易かなと考えています。
ここでまで読んでいただきありがとうございました。
Discussion