TROCCOワークフローからdatabricks jobを起動する
この記事はDatabricks Advent calendar 2024の18日目の記事です。
はじめに
TROCCOは2024年からDatabricksとの連携をサポートしました。
主な用途としては、Databricksにデータをdelta lake形式で転送したり(参考: 転送先Databricks)、逆にdatabricksのテーブルを外部のSaaSや別のDWHに書き出すようないわゆるリバースETL(参考: 転送元databricks)をコーディング不要で実現できます。
それぞれの機能については @manabian さんがわかりやすい記事をまとめてくださっているのでぜひそちらを参照ください(いつも本当にありがとうございます…!)。
さて、TROCCOとして公式に機能としてサポートしているdatabricksの連携は以上の2パターンなのですが、今回はもう少しニッチ、あるいはハック的な連携方法を紹介します。
それがタイトルの通り、「TROCCOのワークフローからdatabricks jobを起動する」ユースケースです。
まず具体的に利用する機能と手順を見てから、どんな時に役立ちそうか説明します。
まず大まかな手順としては以下のとおりです。
- Databricks上でノートブックを作成
- 1のノートブックをdatabricks jobに紐づける
- TROCCOのワークフローを作成
- 3のTROCCOのワークフローにHTTPタスクを登録し、2のdatabricks job apiにPOSTリクエストを送り、jobを実行する
これでTROCCOのワークフロー -> databricksのjobを順番に起動することができます。
早速具体的な方法を見ていきましょう。
Databricks側の準備
ノートブックの作成
任意のノートブックを作成し、jobに紐づけましょう。
今回は test
というノートブックを作成します。
今回はセル2つからなる、ウィジェットで値を受け取って、そのまま表示するだけの非常にシンプルなノートブックを定義します。
# ウィジェットを定義
dbutils.widgets.text("trigger", "", "Workflow Trigger (e.g., manual, scheduled, <your-app-name>)")
dbutils.widgets.text("target_date", "", "Target Date (format: YYYY-MM-DD)")
# ウィジェットから値を取得
trigger = dbutils.widgets.get("trigger")
target_date = dbutils.widgets.get("target_date")
# 実行パラメータをログ出力
print("Job executed with the following parameters:")
print(f"Trigger: {trigger}")
print(f"Target Date: {target_date}")
作成したノートブックのJobへの紐づけ
作成したノートブックをjobに紐づけましょう。
jobs画面より設定が可能です。
先ほどのノートブックに定義したウィジェットに値が渡せるよう、job parameters設定もお忘れなく。ジョブ詳細画面の右メニューを少し下にスクロールすると表示されます。
APIを叩くのに必要な情報を取得する
TROCCOからAPIを叩くのに必要な以下の情報を取得しておきましょう。
- API endpoint: お使いのdatabricksのURLです。 e.g.
https://<your-workspace>.cloud.databricks.com
- job id: 設定したジョブのジョブIDです。ジョブ詳細画面の右メニューからコピーできます。
- Access Token: User Settings > Developer > Access tokens 画面より取得できます。
TROCCO側の準備
今回使うTROCCO ワークフローの概観です。ワークフローを新規作成し、HTTPタスクを一つだけ追加した状態です。
HTTPタスクを編集しましょう。
主な設定項目は以下です。
- URL: 先ほど取得したdatabricks workspace URLのホスト名の末尾に
api/2.1/jobs/run-now
を加えてください。 - HTTPメソッド:
POST
を選択してください。 - HTTPパラメータ: 設定不要
- HTTPリクエストボディ: 以下のように設定してください。
{
"job_id": <your-job-id>,
"job_parameters":{
"trigger":"TROCCO",
"target_date":"$target_date$"
}
}
- HTTPヘッダ
- Authorization: 先ほどdatabricksのUIから取得したトークンを以下の形式で格納します
Bearer: <your-token>
- Content-Type:
application/json
- Authorization: 先ほどdatabricksのUIから取得したトークンを以下の形式で格納します
実際の設定画面も載せておきます。
ポイントとしては、databricksに渡せるパラメータ、 target_date
を可変にできるようTROCCOのカスタム変数として定義しています。
TROCCOのカスタム変数については以下も参照ください。
今回はカスタム変数は単純に実行時刻をベースに展開する設定にしていますが、カスタム変数ループ実行と組み合わせることによって、パラメータを変えながらdatabricks jobsを繰り返し実行することも可能になりますね。
さっそく実行してみる
早速実行してみましょう。
正常終了し、カスタム変数は 2024-12-13
という値として展開されたことがわかりますね。
さて、databricks jobが正常に起動しているか見に行きましょう。
databricksの Workflows > Jobs > Runsより履歴を参照すると、無事に正常にじっこうされていることが確認できました。
実行詳細画面を見ると、確かにdatabricksのノートブックにAPIを叩いた際のパラメータが渡っています。
これで一通りやりたいことが実現できました。
もう少し現実的なユースケースを考えてみる
今回は簡単に説明するために余計なパーツを入れませんでしたが、これだけでは当然実業務での利用には耐えません。
実際の業務上考えられるユースケースとしては例えば、kintoneなどのSaaSなど複数のデータをTROCCOの転送先databricksを使って転送した後、databricksで後続のワークフローを実行したいケースなどがあると思います。
databricksに転送したいジョブが一つもしくは少ない場合、databricks上で処理したデータを転送元databricksで参照していわゆるリバースETLを実行したい場合は、逆にdatabricksのnotebookから、TROCCO APIを使ってTROCCOの転送ジョブを実行すると良さそうです。
ちなみにその場合において非常に役に立つ記事をやはり @manabianさんが執筆いただいているのでそちらもぜひご参考まで。
さいごに: 今後の展望
王道ではない?、TROCCOとdatabricksの連携ユースケースを紹介してみました。
ただ察しの良い方はお気づきかも知れませんが、今回のユースケースには限界もあります。
TROCCOのワークフローHTTPタスクは2024年12月現在、タスク内で単一のHTTPリクエストしかできず、またひとつのHTTPタスクの返り値を後続のHTTPタスクで使うこともできません。なので残念ながらTROCCOのワークフロー側からはdatabricksのjobが起動されたことはわかってもジョブの実行状況を確認して後続のタスクを実行したりといったことはできません。。
今回のdatabricks jobs apiはjobを起動した際 run_idを返してくれるので、例えばそれを使ってポーリングし、databricks jobsの実行が正常終了してからTROCCOのワークフローを終了したり、後続のタスクを実行できるとより幅広いユースケースに対応できそうですね。
そちらはぜひ今後のTROCCOのアップデートにご期待ください。
Discussion