Google Cloud Application Integrationで実行前後にコネクタを一時停止/再開する
はじめに
Google Cloud Advent Calendar 2023 (通常版) の 12/10 の記事です。
皆さん、Application Integration 使っていますか!?
Application Integration は Google Cloud に2023年7月にGAされた新しいサービスで、GUI上でローコードでシステム連携やデータ処理を行える、俗にいうiPaaS (Integration Platform as a Serivce) カテゴリに属するものです。
現在コネクタはGoogle Cloudの各種サービスからSaaS、汎用的なプロトコル(HTTP,FTP,SMTPなど)70以上提供されていて、複数のシステム間でデータ変換を伴う連携を実現するのに役立ちます。Google Cloudのサービスらしく、ローコードツールながら低頻度な実行からハイトランザクションな領域までスケールアウトできる良いサービスだと思うので、まだ触ったことがない方はぜひ試してみてください。
この投稿では、Application Integration をまだ使いはじめたばかりであったり、1日に数回程度の低頻度でまずは利用してみたい!!という際にコストを抑えるTipsをご紹介します。
より気軽に Application Integration を使ってみたい
Application Integration自体の実行は 1実行 (最大100タスク) あたり$0.0005 (2023/12/10現在)と、Cloud Functions のようなServerlessサービスと同様に実行された時だけ費用が発生し、利用してない期間には発生しません。
しかしインテグレーション実行内で各種システムやサービスと接続を行う Integration Connectors (以下、コネクタ) は処理を実行していなくてもノードが稼働しているだけで一定の費用が発生します。
コネクタを利用することで認証の管理や大きめのデータをクエリした際のページング処理、接続先システムのメタデータやスキーマ情報の自動取得など、様々な機能が使えて便利なのですが、例えば1日1回短時間だけ実行するような軽度なインテグレーションユースケースの場合だと、何もしてない大半の時間にもそこそこ費用がかかるという課題があります。 (接続先が BigQuery など Google Cloud のサービスの場合はプロジェクトで2ノードまで無料)
そこで、コネクタ接続にある一時停止/再開する機能を利用して
- pre-process 1. インテグレーション処理実行前にコネクタ接続へ再開指示する
- pre-process 2. コネクタ接続のステータスを確認し、有効になるのを待つ
- process 対象となるインテグレーションを実行する
- post-process 1. インテグレーションの実行完了後にコネクタ接続の一時停止を指示する
- post-process 2. コネクタ接続のステータスを確認し、停止するのを確認する
の実現方法をご紹介します。
コネクタ接続の一時停止/再開方法
Integration Connectorsの一時停止/再開の方法自体は特に難しいことはありません。
まずGoogle CloudのコンソールからGUIでコネクタ接続ごとに一時停止/再開を制御できます。
またGUIで設定する以外にも、この一時停止/再開を行うためのAPIも用意されています。
今回はこちらを活用します。
コネクタ接続の一時停止/再開のAPI
一時停止/再開はコネクタ接続のリソースに対してPATCHリクエストを送ることで行えます。suspendedをtrueとした場合は一時停止、falseにした場合は再開となります。
# 例: my-prjプロジェクト、Tokyoリージョンのmysfdcconnectionという名称の接続を一時停止
$ curl --location --request PATCH 'https://connectors.googleapis.com/v1/projects/my-prj/locations/asia-northeast1/connections/mysfdcconnection?updateMask=suspended' \
--header 'Content-Type: application/json' \
--header 'Authorization: <<Token>>' \
--data '{"suspended": true}'
コネクタ接続のステータスを確認するAPI
コネクタ接続の一時停止/再開のAPI自体はすぐにレスポンスを返すのですが、APIの呼び出しが完了したからといって実際にコネクタ接続の一時停止/再開が完了しているわけではありません。
コネクタ接続のステータスを確認する場合はリソースに対してgetリクエストを行うことで、レスポンス内にステータスが含まれて返却されるので、そちらを確認する必要があります。
# 例: my-prjプロジェクト、Tokyoリージョンのmysfdcconnectionとい名称の接続のステータスを確認
$ curl --location --request GET 'https://connectors.googleapis.com/v1/projects/my-prj/locations/asia-northeast1/connections/mysfdcconnection' \
--header 'Content-Type: application/json' \
--header 'Authorization: <<Token>>' \
--data '{"suspended": true}'
Application Integrationからコネクタ接続のステータスを操作する
APIがあるならShellなりGolangなりから叩けばいいのでは?と思うかもしれませんが、せっかくローコードツールを使っているので、この処理自体もマネージドに構築したいですよね。
幸いApplication Integrationにはコネクタではなく組み込み機能(コネクタの費用がかからない)で Call REST Endpoint タスクが提供されていて、シンプルなものであれば任意の公開(グローバルIPを持つ) HTTPエンドポイントを呼び出すことが可能です。
上記コネクタ接続の操作を行うAPIをこのタスクから呼び出すことで、Application Integration自身でコネクタを一時停止/再開するしたい場合、具体的には以下のような感じでフローを組むと実現できます。
要所を見ていきましょう。
メインの実行
メインのAPIトリガーの役割はシンプルです。どのプロジェクトにあるどのコネクタ接続を一時停止/再開するかを受け取り、対象コネクタ接続分だけ For Each Parallel タスクで並列ループ処理を行います。万が一コネクタ接続自体がエラーを返してくるケース (Integration実行のエラーではなく) に備え、合致した場合はReturn Taskでステータスコード 500 をクライアントに返却するなどもできたりします。
実行時の引数となる変数が数多く定義されていますが、Application Integration では For Each や While ループは別の API トリガーを呼び出すことが前提となっています。
また同一インテグレーション内であっても呼び出し元の中にある変数を呼び出し先から参照できない(別の実行扱い)ので 変数の内容などはループ処理系タスクの設定で受け渡します。
この辺りは Application Integration を触るにあたって最初のつまづきポイントなので知っておくと良いでしょう。
コネクタ接続の一時停止・再開処理
コネクタ接続の一時停止・再開処理の責務はApplication IntegrationのAPIを叩いて実際にコネクタ接続のステータスを更新 (依頼)をすることと、コネクタ接続のステータスチェックをループで呼び出し完了まで待つことです。
コネクタの接続URLや条件分岐の部分には $valName$ の形式で変数を使用できるので、実行元からもらったプロジェクト名やコネクタ接続名の変数をもとにURLを組み立ててPATCHリクエストを行います。
Call REST Endpoint タスクでは認証プロファイル (Google OIDCやサービスアカウントの利用、OAuth2 AccessToken取得などを裏で行ってくれる機能) を選択して呼び出し時に認証情報を渡すことができるのでこれを利用すれば良いでしょう。
suspendedがtrueかfalseかによってどちらの期待するコネクタ接続のステータスが異なるので、フォーク/ジョインとエッジ/エッジ条件で分岐を行います。
ここで利用しているフォークやエッジは並列処理での利用するため、条件分岐の場合はジョイン時の条件を「When any task succeeds」などにしておかないと条件が成立しなかった側のタスクが成功せずに後続タスクが実行されない点は注意が必要です。
コネクタ接続の一時停止・再開処理
コネクタ接続のステータスチェックも例によってCall REST Endpoint Taskを使って行います。
ここでレスポンスはJSONで帰ってくるのですが、肝心のステータスまでは多少の階層があるのでData Mapper Taskを使ってステータス文字列を抽出します。
ステータスが完了 (一時停止ならSUSPENDED,再開ならACTIVE) と一致するものだった場合は呼び出し元のループを抜けるので問題ないですが、まだ処理中だった場合はそのまま終了するとすぐ次のループ処理が開始され無駄に大量のAPIコールが発生するため、5秒待つ処理を入れています。
これにより、5秒ごとにループ実行され、ステータスが完了したらLoopを抜け処理が完了します。
あとは上記を実際のインテグレーション処理の前と後に配備しておけば、実際のビジネス的なデータ処理する前後でコネクタの一時停止/再開が可能です。
コードサンプル
GitLab Snippetsに置いておいたので、このJSONをImportしてサービスアカウントとか認証周りとIntegrationの入れ子呼び出し(Foreach ParallelタスクとかWhileタスクとか)の名称を調整すればすぐ動かすことができると思います。
難易度的には Application Integraiton の実装方法を覚えるのにちょうど良い要素が色々あるので、お時間ある方はぜひご自身で実装してみて下さい。
注意点など
コネクタ接続の一時停止/再開は通常だいたい1分程度で完了するので問題なく実行できますが、状況によっては時間がかかることもあります。
Appplication Integration は同期実行の場合最大2分でタイムアウトするという制限があるので、ステータス変更の完了までより確実に補足したい場合、非同期実行を前提にすれば最大50分まで処理可能です。
その際は完了したことを何かしらの通知で受け取る様に処理を変更する必要がある点は注意してください。Cloud Pub/Subを使うのが一番手早く実現できそうです。
Cloud Pub/Sub トリガーは構成すると自動的に該当のTopicに対するSubScrpitionを作成するのですが、このSubscriptionはPush型になっているため、現状だと1回限りの配信(Exactly-once delivery)は使用できません。
Cloud Pub/Subの停止を受け取り後の実行が必ず1回となるようにフラグなどを持つ様にするか、仮に複数回処理が行われても問題がない様に作っておくと良いです。
Discussion