Google Cloud Application Integrationで実行前後にコネクタを一時停止/再開する
はじめに
Google Cloud Advent Calendar 2023 (通常版) の 12/10 の記事です。
皆さん、Application Integration 使っていますか!?
Application Integration は Google Cloud に2023年7月にGAされた新しいサービスで、GUI上でローコードでシステム連携やデータ処理を行える、俗にいうiPaaS (Integration Platform as a Serivce) カテゴリに属するものです。
現在コネクタはGoogle Cloudの各種サービスからSaaS、汎用的なプロトコル(HTTP,FTP,SMTPなど)100以上提供されていて、複数のシステム間でデータ変換を伴う連携を実現するのに役立ちます。Google Cloudのサービスらしく、ローコードツールながら低頻度な実行からハイトランザクションな領域までスケールアウトできる良いサービスだと思うので、まだ触ったことがない方はぜひ試してみてください。
この投稿では、Application Integration をまだ使いはじめたばかりであったり、1日に数回程度の低頻度でまずは利用してみたい!!という際にコストを抑えるTipsをご紹介します。
より気軽に Application Integration を使ってみたい
Application Integration自体の実行は 1実行 (最大100タスク) あたり$0.0005 (2025/01/22現在)と、Cloud Functions のようなServerlessサービスと同様に実行された分だけ費用が発生し、実行しなければ費用は発生しません。
しかしインテグレーション実行内で各種システムやサービスと接続を行う Integration Connectors (以下、コネクタ) は処理を実行していなくてもノードが稼働しているだけで一定の費用が発生します。
コネクタを利用することで認証の管理やデータをクエリした際のページング処理、接続先システムのメタデータやスキーマ情報の自動取得など、様々な機能が使えて便利なのですが、例えば1日1回短時間だけ実行するような軽度なインテグレーションユースケースの場合だと、何もしてない大半の時間にもそこそこ費用がかかるという課題があります。 (接続先が BigQuery など Google Cloud のサービスの場合はGoogle Cloud組織ごとに2ノードまで無料)
そこで、コネクタ接続にある一時停止/再開する機能を利用して
- pre-process 1. インテグレーション処理実行前にコネクタ接続へ再開指示する
- pre-process 2. コネクタ接続のステータスを確認し、有効になるのを待つ
- process 対象となるインテグレーションを実行する
- post-process 1. インテグレーションの実行完了後にコネクタ接続の一時停止を指示する
- post-process 2. コネクタ接続のステータスを確認し、停止するのを確認する
の実現方法をご紹介します。
コネクタ接続の一時停止/再開方法
Integration Connectorsの一時停止/再開の方法自体は特に難しいことはありません。
まずGoogle CloudのコンソールからGUIでコネクタ接続ごとに一時停止/再開を制御できます。
またGUIで設定する以外にも一時停止/再開を行うためのAPIが用意されています。
今回はこちらを利用します。
コネクタ接続の一時停止/再開のAPI
一時停止/再開はコネクタ接続のリソースに対してPATCHリクエストを送ることで行えます。suspendedをtrueとした場合は一時停止、falseにした場合は再開となります。
# 例: my-prjプロジェクト、Tokyoリージョンのmysfdcconnectionという名称の接続を一時停止
$ curl --location --request PATCH 'https://connectors.googleapis.com/v1/projects/my-prj/locations/asia-northeast1/connections/mysfdcconnection?updateMask=suspended' \
--header 'Content-Type: application/json' \
--header 'Authorization: <<Token>>' \
--data '{"suspended": true}'
コネクタ接続のステータスを確認するAPI
上記コネクタ接続の一時停止/再開のAPIはすぐにレスポンスを返すのですが、APIの呼び出しが完了したからといって実際にコネクタ接続の一時停止/再開が完了しているわけではありません。
コネクタ接続のステータスを確認する場合はリソースに対してgetリクエストを行うことで、レスポンス内にステータスが含まれて返却されるので、そちらを確認する必要があります。
# 例: my-prjプロジェクト、Tokyoリージョンのmysfdcconnectionとい名称の接続のステータスを確認
$ curl --location --request GET 'https://connectors.googleapis.com/v1/projects/my-prj/locations/asia-northeast1/connections/mysfdcconnection' \
--header 'Content-Type: application/json' \
--header 'Authorization: <<Token>>'
Application Integrationからコネクタ接続のステータスを操作する
APIがあるならShellなり任意のプログラムなりから叩けばいいのでは?と思うかもしれませんが、せっかくローコードツールを使っているので、この処理自体もマネージドに構築したいですよね。
幸いApplication Integrationにはコネクタではなく組み込み機能(コネクタの費用がかからない)で Call REST Endpoint タスクが提供されていて、シンプルなものであれば任意の公開(グローバルIPを持つ) HTTPエンドポイントを呼び出すことが可能です。
上記コネクタ接続の操作を行うAPIをCall REST Endpoint タスクから呼び出すことで、Application Integration自身でコネクタを一時停止/再開することが可能です。
具体的には以下のような感じでフローを組むと実現できます。
要所を見ていきましょう。
メインの実行
メインのAPIトリガーの役割はシンプルです。どのプロジェクトのどのコネクタ接続を一時停止/再開するかを受け取り、対象コネクタ接続分だけ For Each Parallel タスクで並列ループ処理を行います。コネクタ接続のAPIがエラーステータスを返してくるケース (Integration実行のエラーではなく) をハンドリングして任意の後続処理(担当者にメール通知など)を行うといったことも可能です。
フローの中で数多くの変数が定義されていますが、Application Integration では For Each や While ループは別の API トリガー(もしくはPrivateトリガー) を呼び出すことが前提となっています。
API トリガーのループ内呼び出しは同一インテグレーション内であっても呼び出し元の変数値を呼び出し先は参照できない(別の実行扱い)ので 変数の値をループ処理系タスクの設定で受け渡します。
この辺りは Application Integration を触るにあたって最初のつまづきポイントなので知っておくと良いでしょう。
コネクタ接続の一時停止・再開処理
コネクタ接続の一時停止・再開処理の責務はApplication IntegrationのAPIを叩いて実際にコネクタ接続のステータスを更新 (依頼)をすることと、コネクタ接続のステータスチェックをループで呼び出し完了まで待つことです。
コネクタの接続URLや条件分岐の部分には $valName$ の形式で変数を使用できるので、実行元からもらったプロジェクト名やコネクタ接続名の変数をもとにURLを組み立ててPATCHリクエストを行います。
Call REST Endpoint タスクでは認証プロファイル (Google OIDCやサービスアカウントの利用、OAuth2 AccessToken取得などを裏で行ってくれる機能)や、Integration自体に設定するサービスアカウントを使って呼び出し時に認証情報を渡すことができるのでこれを利用すれば良いでしょう。
$toSuspend$ がtrueかfalseかによって期待するコネクタ接続のステータスが異なるので、フォーク/ジョインで分岐を行います。
このフォーク/ジョインは並列処理時にも利用する想定のため、条件分岐の場合はジョイン後のタスクの実行条件を「When any task succeeds」などにしておかないと、条件が成立せず後続タスクが実行されない点は注意が必要です。
コネクタ接続のステータス確認
コネクタ接続のステータスチェックも例によってCall REST Endpoint タスクを使って行います。
レスポンスはJSONで帰ってくるのですが、肝心のステータスまでは多少の階層があるのでData Mapper Taskを使ってステータスを抽出します。
ステータスが完了 (一時停止ならSUSPENDED,再開ならACTIVE) と一致するものだった場合は呼び出し元のループを抜けるので問題ないですが、まだ処理中だった場合はそのまま終了するとすぐ次のループ処理が開始され無駄に大量のAPIコールが発生するため、5秒待つ処理を入れています。
これにより、5秒ごとにループ実行され、ステータスが完了したらループを抜け処理が完了します。
あとはこのフローを実際の業務処理を行うインテグレーションフローの前と後で呼び出せば、業務処理の前後でコネクタの一時停止/再開が可能です。
コードサンプル
GitLab Snippetsに置いておいたので、このJSONをImportしてサービスアカウントとか認証周りを調整すればすぐ動かすことができると思います。
難易度的には Application Integraiton の実装方法を覚えるのにちょうど良い要素が色々あるので、お時間ある方はぜひご自身で実装してみて下さい。
注意点など
コネクタ接続の一時停止/再開は通常だいたい1分程度で完了するのですが、状況によっては時間がかかることもあります。
Appplication Integration はデフォルトの同期実行の場合だと最大2分でタイムアウトするという制限があるので、コネクタ接続のステータス変更完了を確実に捕捉したい場合は最大10分まで処理可能な非同期実行を検討しても良いかもしれません。
ただしその際はステータス変更が完了したことを何かしらの通知で受け取る様に処理を追加する必要がある点は注意してください。Cloud Pub/Subなどを使うのが一番手早く実現できそうです。
またフローの実行は今回APIトリガーとなっていますが、実行をキックする側もCloud Pub/Sub トリガーを使って行うことも可能です。
注意点としてCloud Pub/Sub トリガー構成すると自動的に該当のTopicに対するSubScrpitionを作成するのですが、このSubscriptionはPush型になっているため現状だと1回限りの配信(Exactly-once delivery)が使用できません。
Cloud Pub/Subからのメッセージ受け取り後の実行が必ず1回となるようにフラグなどを持つ様にするか、仮に複数回処理が行われても問題がない様に作っておくと良いです。
Discussion