Cloud Workflows 内でコールしたCloud Functions, Cloud Runのログを紐付けて調査効率を高める
モチベーション
Cloud Workflows内でコールしたCloud FunctionsやCloud RunなどのAPIログは各々独立しているため、各ログがどのWorkflowの実行によるものかLogEntryの時間情報くらいしか紐付ける情報がないため、API横断で流れを追う調査をする際に苦労することがありました。
そこで、API側で構造化ログ機能を用いてLogEntryにWorkflowの実行IDをを仕込むことで、横断でログを閲覧できるようにして調査効率を高める工夫をした際の備忘録です。
検証アプリ
Cloud Function、Cloud Run を順に1回コールするだけのシンプルなWorkflowを作って検証しました。各APIはログを出力するだけのPython API(Flask)です。
(構造化ログに対応していればPythonじゃなくても応用できると思います。)
全ソースコード
検証アプリ Cloud Workflows
workflowsのソースコードです。
workflows/v1.yml
main:
params: [input]
steps:
- set_trace_id:
assign:
- execution_id: ${sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
- api_url_function: "https://asia-northeast1-xxx.cloudfunctions.net/logging-example-v1" # TODO set your environment
- api_url_cloudrun: "https://logging-example-v1-xxx-an.a.run.app" # TODO set your environment
- call_function:
call: http.post
args:
url: ${api_url_function}
body:
workflow_execution_id: ${execution_id}
auth:
type: OIDC
result: result_function
- call_cloudrun:
call: http.post
args:
url: ${api_url_cloudrun}
body:
workflow_execution_id: ${execution_id}
auth:
type: OIDC
result: result_cloudrun
- set_result:
assign:
- result_map:
message: ${execution_id}
function_execution_id: ${result_function.headers["Function-Execution-Id"]}
function_trace_id: ${text.split(result_function.headers["X-Cloud-Trace-Context"], ";o=")[0]}
cloudrun_trace_id: ${text.split(result_cloudrun.headers["X-Cloud-Trace-Context"], ";o=")[0]}
- log_result:
call: sys.log
args:
json: ${result_map}
- return_output:
return: ${result_map}
ポイントは、${sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
で実行IDを取得して、各APIのリクエストパラメーターに仕込んでいる点です。
また、調査用途として使えそうな情報として、各APIのレスポンスヘッダー内にTraceID X-Cloud-Trace-Context
が含まれていたので返却値として取得しています。functionsではfunctions側の実行ID Function-Execution-Id
が含まれていました。
検証アプリ Cloud Functions
functionsのソースコードです。
functions/v1/main.py
import json
from flask import jsonify, request
GCP_PROJECT = os.environ.get('GCP_PROJECT')
def main(request):
log('Cloud Functions msg1')
log('Cloud Functions msg1', 'WARNING')
return jsonify(dict(result='ok'))
def log(message, severity='INFO'):
log_fields = {}
# refs: https://cloud.google.com/functions/docs/monitoring/logging#writing_structured_logs
if request and request.headers.get("X-Cloud-Trace-Context") and GCP_PROJECT:
trace = request.headers["X-Cloud-Trace-Context"].split("/")
log_fields["logging.googleapis.com/trace"] = f"projects/{GCP_PROJECT}/traces/{trace[0]}"
request_json = request.get_json()
if request_json and request_json.get("workflow_execution_id"):
log_fields["logging.googleapis.com/labels"] = {'workflows.googleapis.com/execution_id': request_json["workflow_execution_id"]}
payload = dict(message=message, severity=severity, **log_fields)
print(json.dumps(payload))
ポイントは、構造化ログ機能を用いて、logging.googleapis.com/labels
に workflows.googleapis.com/execution_id
に 引き渡された workflowsの実行ID の値をセットしています。(TraceIDの設定についてはドキュメント通りです。)
なお、実行IDを仕込むフィールドは、構造化ログの対応特殊フィールドを眺めて決めましたが、より適切な項目があれば教えてほしいです。
labelsのキーに指定した workflows.googleapis.com/execution_id
は Workflows実行時と終了時に自動で吐かれる Execution Logsで用いていたものを指定しています。
検証アプリ Cloud Run
Cloud Runのソースコードです。
cloudruns/v1/main.py
import os
import json
from flask import Flask, jsonify, request
GCP_PROJECT = os.environ.get('GCP_PROJECT')
app = Flask(__name__)
@app.route("/", methods=["POST"])
def main():
log('Cloud Run msg1')
log('Cloud Run msg1', 'WARNING')
return jsonify(dict(result='ok'))
def log(message, severity='INFO'):
log_fields = {}
# refs: https://cloud.google.com/functions/docs/monitoring/logging#writing_structured_logs
if request and request.headers.get("X-Cloud-Trace-Context") and GCP_PROJECT:
trace = request.headers["X-Cloud-Trace-Context"].split("/")
log_fields["logging.googleapis.com/trace"] = f"projects/{GCP_PROJECT}/traces/{trace[0]}"
request_json = request.get_json()
if request_json and request_json.get("workflow_execution_id"):
log_fields["logging.googleapis.com/labels"] = {'workflows.googleapis.com/execution_id': request_json["workflow_execution_id"]}
payload = dict(message=message, severity=severity, **log_fields)
print(json.dumps(payload))
if __name__ == "__main__":
app.run(debug=True, host="0.0.0.0", port=int(os.environ.get("PORT", 8080)))
ポイントは、functionsと同様、構造化ログ機能でworkflowsの実行IDをセットしている点です。
結果
Workflowを実行後、以下の様なログ用の検索クエリでWorkflowの実行IDを指定して横断で検索できるようになりました。
labels."workflows.googleapis.com/execution_id"="<Workflow 実行ID>" OR
labels.execution_id="<Workflow 実行ID>"
少しログ量を増やしたアプリv2(※GitHubソースコード参照)だとこんな感じです。
Cloud Workflows, Cloud Functions, Cloud Run のログが一括で確認できます。
おまけ
クエリ条件をよりシンプルに1行 labels."workflows.googleapis.com/execution_id"="<Workflow 実行ID>"
のみにしたかったので、Cloud Workflows側の sys.log コール時のLogEntryにも同様のlabelsをセットしようと試みましたが、jsonPayload 内に展開されてしまい、条件をシンプルにすることは叶いませんでした。まだ構造化ログには対応していないのかな?
ログ送信時に構造化ログでlabelsを仕込んでみたサブワークフロー
※ ソースコード全体は GitHub v2アプリ参照
log:
params: [message]
steps:
- set_log_data:
assign:
- execution_id_map: {}
- execution_id_map["workflows.googleapis.com/execution_id"]: ${sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
- data: {}
- data["message"]: ${message}
- data["logging.googleapis.com/labels"]: ${execution_id_map}
- call_sys_log:
call: sys.log
args:
json: ${data}
LogEntry
以下ポストのように直接 Cloud Logging APIをコールすればできそうですが、未検証です。
参考
- LogEntry
- Cloud Trace でのグルーピング仕様の参考に
-
X-Cloud-Trace-Context
仕様
- 構造化ログ機能(Python) について
Discussion