フェズのデータ基盤を支えるCloud Composer
Cloud Composerで支えるデータパイプライン
データ基盤チームのよしたけです。フェズのデータ基盤の構築、運用を担当しています。
今回はフェズのデータ基盤構成を簡単に紹介しつつ、各タスクのパイプラインを管理、制御するCloud Composerの運用周りについて紹介したいと思います。
Airflow/Cloud Composerとは
フェズではデータ基盤のワークフロー管理にGCPのサービスの一つであるCloud Composerを使用しています。
Cloud ComposerはGCP上で動作するマネージドなAirflowのサービスです。
Airflowはデータパイプラインを管理するのによく利用される、オープンソースのワークフロー管理ツールです。
Airflowでは以下のことができます。
- ジョブのスケジューリング
- cron形式で、開始時間を指定可能
- タスクの依存関係をコード記述
- Pythonでタスク間の依存関係を有向グラフで記述
- Airflow自体もPythonで書かれている
- WebUIによる各種レポートのビジュアライズ
また、Cloud ComposerではIAMを使用したアクセス制御も可能なので、社内で使用しているGoogleアカウントに対して権限付与を行うことも可能です。
フェズのデータ基盤構成
フェズのデータ基盤では、小売事業者様からお預かりしたデータをソースとし、それを加工や整形、集計を行い、その結果をデータウェアハウスやデータマートとして活用しています。
フェズのデータ基盤は主に以下のサービスを用いて構築されています。
- Cloud Storage
- GCPのオブジェクトストレージサービス
- 元データとなるCSVファイルなどを格納するのに使用している
- BigQuery
- GCPのフルマネージドのデータウェアハウス
- フェズではデータレイク、データウェアハウスの役割として使用している
- Dataflow
- フルマネージドのデータ処理サービス
- CSVファイルのデータを加工し、BigQueryのデータレイク領域に格納している
- Cloud Functions
- サーバーレスのランタイム環境
- ETL処理などに使用している
- Cloud Run
- コンテナ化したアプリケーションの実行環境
- データ転送やETLなどのトリガーとして使用している
- 最初はCloud Functionsで実装していたもののうち、時間がかかるものをCloud Runへ移行している
これらのサービスを使った、フェズでのデータフローは以下のような流れになります。
Cloud Composer導入以前
Cloud Composerを導入する以前は、Cloud SchedulerとPub/Subを用いてタスクのスケジュール、実行管理を行っていました。
しかし、以下のような問題が生じていました。
- タスクの実行成否に関わらず、次のタスクがスケジュール実行されてしまう
- 前のタスクのおおよその終了時間を見越して、次のタスクの実行時間をスケジュール設定
- タスク間の依存関係を管理できないため、スケジュール設定のみに頼った煩雑な管理になってしまう
そこで、上記の問題を解決するためにCloud Composerの導入を行いました。
各サービスの呼び出し方
DAGは大体はデータソースごとに分けて管理しています。データソースによって入手タイミングが異なるため、提供される時間に併せて各DAGのスケジュールを設定し、実行されるようにしています。
タスクの流れは前述のようなデータフローに沿ってDAGを組んでいます。
各GCPサービスの呼び出し方は以下の方針で行っています。
Dataflow
-
DataflowStartFlexTemplateOperator
を用いてDataflowを起動しています。 - Flex TemplateファイルはGCSにデプロイされているため、それをパラメータとして与えることで実行されるようになっています。
Cloud Functions
-
CloudFunctionInvokeFunctionOperator
を基本的には利用しています。 - 但し、60秒の制限があるため、処理時間がそれ以上のFunctionについてはタイムアウトになってしまうので
BashOperator
でgcloud functions call
コマンドをBashOperator
で実行しています。
Cloud Run
- コンテナで
Flask
を起動してhttpをトリガーとしているので、PythonOperator
でrequests
ライブラリを用いて認証、リクエスト実行を行っています。 - アクセスするエンドポイントURLは固定値で保持するのではなく、
BashOperator
でgcloud run services describe
を都度実行して取得しています。 - 処理実行に長時間(大体15分以上)かかる場合、終了判定が行われずOperatorが実行中のままストールしてしまう問題があります。
- httpセッションが切れてしまっている?
- 現在はストールに気づいたときにAirflownのコンソールから
Mark Success
に切り替える運用でカバーしています… - Cloud Run側の作りを変更して、実行リクエストの受付とタスク終了ステータス取得のエンドポイントを分けることを検討中
スケジュール実行
運用しているDAGのほとんどは日次でデータを受け取り、加工処理を行っています。そのため各タスクでは、いつのデータに対する処理か、パラメータを受け取るようにしています。
このパラメータはDAG実行時に与えられる execution_date
をもとに生成しています。また、手動実行時に日付指定ができるよう、パラメータで受け取ることができるような実装をしています。DAGの最初のタスクで execution_date
から日付を取るか、パラメータから取得するかを決定し、その日付パラメータをXComに積んでおき、各OperatorではXComから値を取得するようにしています。(現在使用しているAirflowのバージョンが2.1.4なので、 execution_date
を使用しています)
ステージ管理
開発の流れとして、本番、ステージング、QA、開発の4段階に分けており、それぞれのステージでCloud Composerを起動できるようにしています。
ただ、すべてのステージで常時起動するのはコスト的に問題があるので、本番環境のみ常時起動で、それ以外の環境は必要なタイミングで起動し、夜10時になるとスクリプトで自動的に落とす運用にしています。
それぞれのステージの判定は、環境変数で持つようにしています。Operator内で環境変数の値を参照し、GCPのプロジェクトやGCSのバケット名などを切り替えられるような仕組みを実装しています。
テスト
DAGのテストはpytestを利用しています。AirflowのドキュメントにあるBest Practicesを参考に、各Operatorの設定値などに問題がないことをテストしています。各Operatorから呼び出されるGCPサービスのテストは、それぞれの実装を管理しているリポジトリ内で別途テストが用意されているので、ここではそれらの実行内容に関しては重視せず、あくまでDAGが正しいかどうかを見るようにしています。
今後の課題
現在ELT処理で実行するBigQueryのクエリは、Pythonのコード内で管理しており多少煩雑なところがあります。これらのクエリやテーブルスキーマ、データリネージの管理でdbtの利用を検討しています。dbtでの処理もCloud Composerのパイプラインに組み込んでいければいいなと考えてます。
また、今も簡単なデータバリデーションチェックはしているのですが、Great Expectationsを導入してパイプラインに組み込んでいくことで品質向上に繋げられないか、ということも考えています。
その他、リソースの最適化やCloud Composer 2への移行、Airflow 2.2系へのバージョンアップなど、まだまだ課題が多くあるので今後も本ブログでご報告していけたらと思っております。
フェズは、「情報と商品と売場を科学し、リテール産業の新たな常識をつくる。」をミッションに掲げ、リテールメディア事業・リテールDX事業を展開しています。 fez-inc.jp/recruit
Discussion