🎼

Cloud Composer の Day 2 Operation ベタープラクティス

2023/12/04に公開

この記事は Google Cloud Advent Calendar 2023 (通常版) の 12/4 の記事です。

Google Cloud Japan の RyuSA です。👋

Apache Airflow のフルマネージドサービスである Cloud Composer を使っていますか?構築が複雑な Apache Airflow をパッと利用できるフルマネージドサービスは便利ですよね。私は自宅の Kubernetes クラスター上に Apache Airflow を構築し、その複雑さに構築して少し動かして満足してしまった過去があるので、一時期 Cloud Composer はプライベートでも少し使っていました。

Cloud Composer は構築は簡単ですが、運用の目標を整理したり、簡単なアーキテクチャを理解しておくことで大規模な環境での思わぬ影響を防いだり、実運用を安定的に行うことが出来るようになります。特に大規模になればなるほど、その復旧やインパクトは大きくなります。

このドキュメントでは、Cloud Composer を構築した後の長い長い"運用するフェーズ"に注力していくつかのベタープラクティスを共有していきます。スムーズな実運用にむけての勘所としての参考になりましたら幸いです。

ライフサイクル

Cloud Composer 2 を利用する

Cloud Composer には2つのメジャーバージョンが存在し、2023年11月現在 Google Cloud のユーザーはどちらを利用するべきか選択できます。

  • Cloud Composer 1
  • Cloud Composer 2

それぞれの違いは公式ドキュメントに記載されています。Cloud Composer のバージョニングの概要

しかし、多くのユースケースにおいて2023年11月現在の時点では Composer 2 を使うことがおすすめです。すでに Composer 1 は機能追加がないメンテナンスモードへ移行されており、Apache Airflow のバージョン追従も終了することがアナウンスされています。Cloud Composer のバージョニングの概要

そのため、将来的なワークロードのためには Composer 2 を選び、Composer 1 は既存の Composer 1 環境を Composer 2 への移行するための検証として利用するだけに留めておく方が良いでしょう。

Composerの GKE の設計を理解しておく

Composer はユーザーの環境に GKE Autopilot を作成し、そのクラスター内に Apache Airflow の各種コンポーネントをデプロイします。Composer そのものの多くはユーザーのプロジェクトの GKE クラスターの上で動くことで、ユーザーリソースとの通信をやりやすくしている設計になっています。

長期的な運用をするにあたり、Composer ユーザーは Airflow の一部コンポーネントの運用と共に GKE Autopilot の運用もする必要があります。そのため GKE Autopilot が Composer の中でどのように使われているのかは意識しておき、特にデプロイされるネットワークの設計を整える必要があります。

IP アドレスレンジ
GKE Autopilot を構築する際に Pod の CIDR レンジを設定できるように、Composer でも Pod のための IP アドレスのサイズを設定できます。理論上、Composer を起動するための最小の Pod CIDR レンジは公式ドキュメント Configure Shared VPC networking には /23 と記載されています。しかしこの CIDR レンジで環境を作成してしまうと「スケジューラー1台とワーカー3台」のみの環境からスケールできず、また環境のアップグレードが不安定になる場合があります。実際に運用する際には最低限 /21 よりも大きなレンジを確保し、どれくらいスケールするかに合わせて IP アドレス設計をしましょう。

Private IP Environment
Composer の環境を Private にするかどうかを環境作成前に決定しておく必要があります。Private な環境を作成すると、GKE は限定公開モードとして作成されるため、デフォルトではインターネットへの接続できなくなります。そのため必要に応じて Cloud NAT などのネットワーク設計を先に進めておく必要があります。
後述の Composer の高可用性モードを利用するためにも Private IP Environment が必要になるケースもあるため、必要に応じて確認しておくことを推奨します。

GKE へのアクセス方法の確保
Composer の各コンポーネントは GKE クラスター上にデプロイされているため、トラブルシューティングやairflowコマンドを実行するに GKE のエンドポイントへアクセスする必要があります。そのため、特に Private IP Environment な環境を利用される場合、GKE クラスターのエンドポイントへのアクセス方法を確保しておきましょう。

環境を定期的にアップグレードする

Composer はバージョンリリース後の12ヶ月間はサポートを、18ヶ月間はセキュリティに関する通知を受けられます。しかしながら古い環境を塩漬けして使っているとあっという間にサポート期限を過ぎてしまい、障害のサポートが受けられなくなったりやセキュリティ通知の打ち切りなどが発生してします。

Composer の環境を定期的にアップグレードするか、新しいバージョンの新しい環境を作成して移行するなど対応することを推奨します。

参考:Upgrade environments

メンテナンス中に DAG を起動しない

Composer は週に合計12時間のメンテナンス時間を必要としています。メンテナンスの時間枠を指定する
この時間の間で、GKE Autopilot クラスターの自動アップグレードや Airflow コンポーネントのバージョンアップ、再起動など様々なアクションが行われます。つまりこの時間の間は Composer の環境は不安定になり、DAG が正常に実行できない、または正常に終了しない可能性が高いです。メンテナンス時間は Composer の環境を作成するときに設定できますので、 DAG を起動しなくても問題ない時間帯を選んで設定してください。

しかし「どうしてもメンテナンス中に動かしたい DAG がある」場合については次のアクションを取って注意深く実装して利用してみることが一案です。

  • リトライの設定を入れてメンテナンス中に再起動されても再実行されるようにする
  • いつ中断されていつ再起動されても冪等性を担保できるようにDAGを実装する
    • これは「DAG から起動する 外部のリソースも含めて冪等性にする」という意味です
  • 可能な限りメンテナンス時間外に再実行しても問題ない設計にする

監視 / 可観測性

最低限チェックしておくべきメトリクス

Composer は Apache Airflow の StatsD による各メトリクスを Pod airflow-monitoring で受信し、airflow-monitoring はそのメトリクスを変換して Cloud Monitoring へ転送します。そのため Apache Airflow の様々なメトリクスを Google Cloud の中で利用できるようになっています。

Apache Airflow のメトリクスを含め、これらのメトリクスは何らかの形でチェックしておくことを推奨します。

composer.googleapis.com/environment/healthy

これはヘルスチェックとして定期的に稼働している DAG airflow-monitoring の成否のメトリクスです。いわば、簡単な外形監視そのものです。この値が False の間は「環境内で問題が起きて、DAG が起動できない可能性がある状態」であることを示します。

注意するべきこととして、この値が「 True である = すべてがうまくいっている」とは限らないことです。このメトリクスは Composer がデフォルトで起動しているDAG airflow_monitoring の成否"のみ"を参考にしています。たまたま障害が起きていないワーカーで DAG が実行されただけだったり他の Trigger は失敗していたりなど様々な状況が同時に起こりえます。

このメトリクスを含め、他のメトリクスと合わせて必ず監視しておきましょう。

composer.googleapis.com/environment/task_queue_length

実行待ちのタスクのキューサイズです。この値が高いままの場合はワーカーで何かしら問題が起きているか、ワーカーの数が足りない可能性が高いです。その他メトリクスを確認して、スケール設定を見直すことを推奨します。

スケール設定を見直す際、基本的な次の考え方に基づいてチューニングすると良いと思います。

  • ワーカーの最大スケール数を増やす: これは多くのタスクが重たく、終了までに時間がかかる場合に役に立ちます。小さくすぐ終わるタスクが多い場合、最大のワーカー数を増やしてもリソースを持て余して効果が薄い可能性があります。
  • ワーカーの同時タスク実行数を増やす: これは多くのタスクが小さく、すぐ終了する場合に役に立ちます。しかしタスクの処理が多くのリソースを要求し時間がかかる場合、ワーカーの OOM の原因になったり処理がなかなか捌けずと効果が薄い可能性があります。

スケーリングの詳細を確認したい場合はユーザー自身のDAGがどの頻度で起動されるのか、タスクがどのように実装されているのか、現在のワーカーの CPU /メモリーの使用量など様々なメトリクスを含めてチューニングしてみてください。

参考 : Worker concurrency performance considerations

composer.googleapis.com/environment/dag_processing/total_parse_time

airflow-scheduler は定期的にすべての DAG をパースし、DAG にミスがないか/アップデートがないかなどを確認します。このメトリクスはそのパースにかかった時間を表しています。

airflow-scheduler はデフォルトで300秒のパースタイムアウト値(dag_dir_list_interval)が設定されています。パースタイムアウトが頻発すると DAG の更新ができなくなります。この指標が高いのを放置してしまうと、たとえばバグでDAGが停止した際など速やかに修正しても DAG がパースされず、バックオフ実行がなかなかできないといった状態になりかねません。

この値が大きくなってきた際には注意が必要で、いずれかのアクションを取ることを推奨します。

  • 不要な DAG を削除してパース対象を減らす
  • airflow-scheduler をスケールアップしてパース能力を底上げ
  • dag_dir_list_interval の値を大きくしてDAGパース時間を伸ばす

Cloud Logging で GKE のログを確認する

Apache Airflow の各コンポーネントに関するログは Cloud Logging の resource.type="cloud_composer_environment" のログフィルターで確認できます。

このログフィルターには DAG の実行ログや DAG のワーカーへのアサイン情報などが含まれており、一般的な Apache Airflow に関するトラブルシューティングに活用できます。

一方で、その Apache Airflow がデプロイされている GKE クラスターのログは上記のログフィルターには含まれていません。例えば「ワーカーが OOM で停止してしまった」「ワーカーの数が増えた/減った」などインフラストラクチャーに関する情報は含まれていません。これらの情報は GKE のログから確認できます。

OOMに関するログとワーカーのスケールに関するログフィルターを紹介しておきます。またその他詳細な GKE に関するログは公式ドキュメント View GKE logs | Google Kubernetes Engine を参照してみてください。

オートスケールのログ

GKE の composer-system 名前空間にデプロイされている airflow-worker-set-controller という Pod がAirflow のワーカーのスケールアウト/スケールインを行います。

resource.labels.cluster_name="COMPOSER-GKE-CLUSTER-NAME"
resource.labels.namespace_name="composer-system"
labels.k8s-pod/control-plane="worker-set-controller"

このログを調べることで、いつ/なぜ Composer はワーカーをスケールしたのかを確認できます。また、Composer のスケールはCloud Monitoring上の指標 composer.googleapis.com/environment/worker/scale_factor_target を参照しているため、この指標と上記ログを見比べるとより正確なスケールの理解が深まると思います。

OOM(Out of Memory)ログの確認

Apache Airflow のワーカーを構成する際にメモリー量を指定する必要があります。このメモリー量は GKE の Pod のリソース割り当てとして利用され、この量以上のメモリーをタスクが要求すると Kubernetes の機能によってワーカーの Pod が削除されワーカーは再起動します。

このイベントは GKE ダッシュボードのワークロード画面からも確認できますが、次のログフィルターからでも確認できます。

resource.labels.cluster_name="COMPOSER-GKE-CLUSTER-NAME"
resource.type="k8s_node"
log_id("events")
(jsonPayload.reason:("OOMKilling" OR "SystemOOM") OR jsonPayload.message:("OOM encountered" OR "out of memory"))

可用性を高める機能を利用する

2023年5月より Highly Resilient環境 が Composer に提供されています。この機能を有効にするとバックエンドのデータベースや Apache Airflow の一部コンポーネントが高可用性を持つ構成に変更され、ゾーン障害に対してある程度の耐久性を持てるようになります。各コンポーネントが冗長化されるためコストが増えますが、 Composer の環境の停止がビジネスへ大きな影響をもたらす場合は Highly Resilient 環境を検討してみてください。

ただし Highly Resilient 環境はゾーン障害を軽減する(高可用性を持たせ、フェイルオーバーを可能にする)ものであり、Composer の環境を必ずゾーン障害から復旧できることを保証するものではありません。たとえば障害に巻き込まれたワーカー上の DAG は Failed としてマークされタスクが中断される可能性があります。

そのため、本番環境に利用する前にフェイルオーバーテストを検証し、本番環境で万が一の障害の訓練をしておきましょう。復元性に優れた Cloud Composer 環境のフェイルオーバー テストを実行する

DAG の実装周り

DAG そのものの実装に関しては、 Apache Airflow 公式ドキュメントに記載されている通りのプラクティスに従ってください。Best Practices - Airflow Documentation

Composer 固有ベタープラクティスを以下に紹介します。

dags/ 以外のパスに Python モジュールを配置しない

ワーカーの Pod ではデフォルトで /home/airflow/gcs/ 配下に dags/data/ のディレクトリを Cloud Storage からマウントします。ドキュメントでも言及されていますが、data/ ディレクトリに Python モジュールを保存しないでください。DAG 実行に必要なモジュールは dags/ 配下に配置してください。

Composer は dags/ ディレクトリ以下を定期的にスキャンして効率よく Python スクリプトを起動できるような仕組みを持っています。data/ 以下に Python モジュールを配置して無理やり DAG からモジュールを読み込もうとすると、DAG 起動ごとに Cloud Storage から Python モジュールを取得するようになるため実行時間が伸び、場合によっては DAG の実行が不安定になります。

データ処理そのものは Composer の外で処理する

例として、次のとあるデータパイプラインを考えてみます。

このようなデータ処理のパイプラインを考える際、なるべく DAG でデータ処理そのものを行わないことを推奨します。つまり、外部データベースへのクエリーやその集約、モデルのトレーニングはのワーカー上で実行しない方が良いでしょう。データ処理に大きな計算が必要になる DAG はワーカーの CPU やメモリーなどのリソースを占有してしまいます。その結果、他の DAG が遅延することにつながったり余計なオートスケールが発生する原因につながります。

ベタープラクティスとして、なるべくデータ処理そのものは Composer の環境の外で実行することを推奨します。たとえば上記のパイプラインでは、次のように Dataflow を使ったり Vertex Pipelines を使ったりなど Composer 以外のリソースを利用してデータ処理を行い、 Composer はそれらのオーケストレーションに専念する設計が良いでしょう。

外部リソースから DAG をスケジューリングする

Cloud Functions と Pub/Sub を利用して、好きなタイミングでキックすることが可能です。Cloud Functions と Airflow REST API を使用して Cloud Composer DAG をトリガーする

事前にある程度のセットアップが必要になりますが、この機能を使うと Pub/Sub と Cloud Functions を接続して外部サービスとの連携が簡単に実装できます。たとえば動画サイトのチャンネル更新のタイミングや小売店の在庫変動のタイミングなど、様々な通知チャンネルを通じて任意のタイミングで DAG を起動してパイプラインを起動できます。断面データセットの作成やデータリフレッシュなどに利用できると思います。

おわりに

Composer は Apache Airflow の多くの複雑度を解消できるマネージドサービスです。しかしそれでもまだユーザー自身で管理しないといけない領域はあります。
もし Composer を利用する機会があれば、ぜひこのドキュメントを見直してもらえると幸いです。

Google Cloud Japan

Discussion