Google Cloud Workflowsを用いたバッチ処理の構成

2022/12/01に公開

Uniposアドベントカレンダー2022

こんにちは!! 
Uniposアドベントカレンダー20221日目です!
僕は現在UniposでSREチームでSREエンジニアとして働いています。
初日は僕がここ1年弱着手していた、バッチ処理の改善で行ったことを書いていこうと思います!

この記事について

この記事ではGoogle Cloud Workflowsを利用してバッチ処理の制御を行う場合の構成の一例として僕が考えた構成を紹介しています。
そのため、Workflowsの細かな機能や、yamlファイルの書き方には触れません。
Workflowsを触ってみた、という情報は結構あるのですが実際にどういう構成で運用しているのか、という記事は少ないような気がしたのでここでで取り扱ってみることにしました。
Workflowsの構成をどのようにしたらよいか迷っている人の一助になればと思います。

Workflowsについて

以下ドキュメントより

信頼性の高いワークフローの実行
Cloud Functions から非公開 API やサードパーティ API まで、あらゆるサービスを呼び出すことができます。コネクタを使用すると、リクエストのフォーマット、再試行、長時間実行オペレーションの完了の待機が処理され、Google Cloud が非常に使いやすくなります。
強力な実行制御
式と関数を使用してレスポンス データを変換し、リクエスト入力を準備します。入力とサービスのレスポンスに基づいて条件を自動化します。再試行ポリシーとエラー処理を指定します。ポーリングとコールバックを使用して非同期のオペレーションやイベントを待機します。

Workflowsでは、stepという概念に基づいてhttpリクエストの実行や簡単なロジックの実行、Workflows固有のconnectorの実行などを順を追って実行していける機能です。
最近並列実行もGAになり、より使いやすくなりました。

具体的には、

  • アプリの統合とマイクロサービス オーケストレーション
  • バッチ処理のオーケストレーション
  • データと ML パイプライン

など様々な用途で使えます。
今回はバッチ処理のオーケストレーションツールとして利用しました。

ドキュメント

https://cloud.google.com/workflows?hl=ja

バッチ処理の概要

今回改善したバッチ処理概要

今回改善したバッチ処理はざっくり以下のような感じです。

  1. 深夜にバッチ処理がスタート
  2. Cloud Spannerをデータストアとして利用しており、そのデータをそのままBigQueryへexportする(エクスポートデータと呼びます)
  3. エクスポートデータをもとに、集計をしやすいような形に整形したデータをBigQueryへ作成する(集計用データと呼びます)
  4. 作成された集計用データをもとに、Uniposの機能を提供するための集計処理を行います。(Uniposの利用状況などを可視化するための機能があるので、そういったデータの集計を行います)

改善前の集計処理の問題点

  1. 深夜に集計がスタートし1~4の処理を行っていくのですが、それらがすべて時間ベースで実行されていました。2のエクスポートは2時までには終わってるはずだから、2時から3の処理を初めて、3の処理は6時までには終わっているはずだから、6時から4を初めて…といった感じ。
    これの問題点としては、想定した時間までに集計処理が終わらないと後続の処理に影響が出てしまいます。こういう問題を「バッチの突き抜け」といいます。
  2. 集計に失敗した時の復旧用のエンドポイントは存在するが、トークンが必要だったり誰でも簡単に復旧できるわけではなかった。
  3. 集計やデータのエクスポートに冪等性がない場合が多く、復旧するのに影響範囲を詳しく調べる必要があった。
  4. 時間ベースでschedulerが起動して処理を開始していたので、処理の流れの全体像をひと目で把握出来るコードやドキュメントが整備されていなかった。

そうなってしまった背景には集計やバッチ処理がここまで大きくなる想定ではなかったことがあったのですが、流石にまずいということで改善することになりました。

バッチ処理の改善内容

参考文献

バッチ処理の構成を考えるにあたって以下の記事を主に参考にしています。先に目を通しておいてもらえるとわかりやすいと思います。
https://engineering.mercari.com/blog/entry/2019-02-27-112650/

https://engineering.mercari.com/blog/entry/2019-03-20-152608/

バッチ処理の構成を考える上で意識したこと

上の記事でも5つのバッチ処理の考慮点があげられていますが、今回僕がUniposのバッチ処理の改善を行う上で特に意識したことは

  • 回復可能か
  • 多重起動可能か
  • バッチの突き抜け対策

の3つです。処理時間と処理件数に関しては、既存のバッチ処理からある程度想定がついていたので、上記の3に注目して設計を考えました。

回復可能か

今までのUniposのバッチ処理では、冪等性のない処理が多かったり、復旧用のエンドポイントの実行が前提知識が無いと出来なかったりと、一度失敗するともとに戻すのが難しく、"容易に"回復が可能ではありませんでした。
なので、「誰でも、簡単に、間違えること無く回復できる」バッチになることを目指すことにしました。

多重起動可能か

今までのUniposのバッチ処理は、あまり並列実行などはされていなかったので、並列実行も簡単にできる状態を目指しました。また、並列実行が出来るようになると実行時間短縮にも繋がり、突き抜け対策にも繋がります。

バッチの突き抜け対策

これに関しては、最終的な完成物でも対策自体はあまりしていません。しかし、並列化を進めることで、全体の処理時間がかなり短くなったので、直近はそこまで気にする必要がなくなっています。今後はそのモニタリングを強化して、突き抜けに対してすぐにアクションを取れる体制を作っていきたいです(願望)。
目指していたところとしては、元々が時間ベースで動いており、突き抜けを起こすとモノスゴイめんどくさかったので、その状態をまずは脱却することを目指しました。

以上がバッチ改善を行う上で僕が主に意識したことになります。

改善後の構成

最終的に出来上がった設計を先に紹介します。
ここで出てくるノードはすべて一つのワークフローになります。
(実際のものはもう少し複雑ですが、簡略化して書いています)

概要説明

ワークフローを多段にしてワークフローからワークフローを呼び出すようにすることで、並列実行しても問題ない処理と、前提が必要になる処理の順序の制御を行うようにしています。また、こうすることで、再実行をしたいワークフローを指定して、細やかに再実行を出来るようにしています。

ノードの説明

batch_job_conductor

このワークフローが深夜にcloud schedulerから起動されます。
これがバッチ処理の流れを管理する大元のワークフローとなります。バッチ処理の順序を管理やretry制御を行っています。

export_conductor

このワークフローは batch_job_conductor から呼び出され、データベースのデータをBigQueryへexportする処理の流れを管理します。
UniposはデータストアとしてCloud Spannerを採用しており、マイクロサービスごとにデータベースがあるので、マイクロサービスごとのexport処理はこいつが並列に実行していきます。
このワークフローは引数で再実行したい処理を受け付けるので、失敗したワークフローだけを呼び出す、といったことが出来るようにしています。

microservice(1,2)_spanner_export

このワークフローは export_conductor から呼び出され、実際のexport処理を行うワークフローになります。実際のexport処理は、Workflowsのconnector機能とCloud Spannerのpoint-in-time-recovery機能を利用して、BigQeuryへexportしています。そのおかげで、export処理すべてに冪等性をもたせられるようになりました。

create_aggregate_data

このワークフローは batch_job_conductor から呼び出され、集計用のBigQueryのデータを作成するワークフローです。このワークフローではconnectorを利用し、BigQueryに直接クエリを実行してデータを作成しています。
exportされたデータに冪等性があるため、集計用データの作成も冪等性がある形になりました。

aggregation_conductor

このワークフローは batch_job_conductor から呼び出され、集計処理の流れを管理します。
今の所、Uniposの集計処理は現時点では並列実行出来るものしかないのでこのワークフローが集計処理を行うワークフローを並列に呼び出します。
このワークフローは引数で再実行したい処理を受け付けるので、失敗したワークフローだけを呼び出す、といったことが出来るようにしています。

aggregation1,2

このワークフローは aggregation_conductor から呼出され、実際に集計処理を行うエンドポイントにhttpリクエストを送ります。集計自体はもともと冪等性があるように作られていました。

補足

お気づきの方もいるかも知れませんが、conductorという名前がついているワークフローは実際にはワークフローを実行しているだけで、実際の処理は何もやっていません。呼び出すワークフローの順序や並列実行を管理しています。そういう意味で「指揮者」という意味の conductor という命名にしています。

最初の問題点は解消されたのか

最初にあげた問題点と順番がリンクしています

  1. 実行順をワークフローで制御して、前段の処理が終われば次の処理に行くようになったので、時間ベースでの「終わっているだろう」実行は解消されました。
  2. 最終的に、すべての集計処理で冪等性をもたせられるたので、大元の daily_batch_conductor を再実行するだけで、常に正しく復旧が出来るように誰でも簡単に出来るようになりました。
  3. それぞれの処理で、WorkflowsやCloud Spannerの機能を使い冪等性があるように作成したので、再実行する人は結果がどうなるかを気にせず、単純にボタンポチで再実行するだけでよくなりました。
  4. ワークフローの定義のyaml自体が実行順の記述になっているので、ワークフローの実装を見れば深夜に実行されているバッチ処理を把握できるようになりました。

まとめ

最初にあげていた問題点はかなり克服できるようになったと思います。
個人的に、もともと復旧がとにかく大変だったので、誰でも簡単にストレス無く復旧できる形を作りたいと思っていたのですが、かなり実現できたように思います。
最終的に失敗したときは、 daily_batch_conductor に実行時間のunix_timeを渡して、画面からボタンポチするだけで良い。という所まで作れたので、僕の目指す状態を作れたかなと思います。
更に実行時間も、もともとは5時間近くかかっていたのが並列処理と、処理が終わったらすぐに次の処理が始まるようになったことで、50分程度まで短縮出来ました。
デメリットとしては、ワークフローを多段にしたことで処理フローの制御はしやすくなったのですが、ネットワーク通信等が増えたため障害を起こす点が増えてしまったというのがデメリットだったなと思います。
Workflowsの安定性の問題もあるのですが、以前の形よりも失敗すること自体は増えてしまいました。(ただ、復旧は簡単なので以前よりは心理的負担は下がったと思います。)

もう一日書く予定なので、そっちではこの改善を行って得た学びとバッチ処理の運用の浸透について書こうかなと思います!

Discussion