📝

FirestoreからAWSにデータを同期するパイプラインをGitHub Actionsで簡単に作る

2023/11/20に公開

背景

  • AWSメインでアプリケーションを構築しており、ほとんどのデータはRDS(Aurora)に保存、アプリケーションサーバー(Laravel)もFargate上で動作している
  • 一部のデータだけリアルタイム通信が必要だった背景からFirestoreに保存されている
  • この度、Firestore上に保存されているデータを集計することで特定のユーザーの行動量を「多い」「普通」「少なめ」の3段階で評価しフロントに表示する要件ができた
  • Firestore上に保存されている集計対象のデータは累計70万件を超える
  • 当然ながらFirestoreは複雑な集計クエリをサポートしていないため、LaravelアプリケーションからFirestoreに直接接続してクエリするのはデータ量の観点から見ても現実的ではない
  • 以上から、Firestore上のデータを何らかの方法で集計クエリ発行可能なデータストアに同期(定期的、またはストリーミング)し、そこに対してLaravelからクエリすることで要件を実現可能にしたい

制約条件・前提条件

  • AWSメインでアプリケーションを構築していることから、同期のパイプラインもAWSに寄せたり、GCP特有の仕組みに依存しないようにしたい
  • 上記に関連して、本機能は事業にとって最重要かというとそうでもないため、学習コストが高くなるような仕組みにはしたくない(ROIが合わない)
  • チーム全体が把握しているインフラ相当の知識として、PHPからのAWS SDKの利用、簡単なシェルスクリプトの実装、GitHub Actionsの利用、SQLの記述、S3やIAMなどメジャーなAWSサービスの利用などがあり、学習コストの観点ではこれらに依存することが望ましい
  • 仕様上、即時性の高いデータ反映は不要。具体的には、集計自体は日次で反映されるくらいの遅延を許せる
  • Firestore上のデータにもユーザーIDが紐づいているから、RDS上のユーザーデータと突合することで誰のデータか判別可能

構築したパイプライン

全体感

以下のタイムラインで複数のステップを回しています。

  1. 毎日0時にFirestoreからGCSにExport
  2. 毎日深夜2時にGitHub Actionsを動かし、BigQueryへのロード、クエリ、結果のS3への吐き出しを実施
  3. 毎日早朝4時にLaravel上でバッチを動かし、S3からデータを読み込んでRDSへ書き込み

これでRDS上に集計済みFirestoreデータが入るので、あとはフロントからAPIが叩かれたときに適宜JOINして返せばよいです。

Step1: Firestore->GCSに日次Export

Firebase Cloud Functionsから、GCSに対してExportができます。

export default () => {
  return functions.pubsub
    .schedule('0 0 * * *')
    .timeZone('Asia/Tokyo')
    .onRun(async _ => {
      const databaseName = client.databasePath(functions.config().hoge_export.project, '(default)');

      // export先のバケットのフォルダ名の規則
      // hogeCollection + yyyymmdd
      // 例:hogeCollection20231010
      const folderName = `hogeCollection${new Date().toISOString().slice(0, 10).replace(/-/g, '')}`;

      return client
        .exportDocuments({
          name: databaseName,
          outputUriPrefix: functions.config().hoge_export.output_uri_prefix + '/' + folderName,
          collectionIds: ['hogeCollection'],
        })
        // 以下略
    });
};

弊社の場合は既存コードで類似した処理をやっている事例があったのでここは踏襲しましたが、gcloudコマンドでもExportできるみたいなので、後述するGitHub Actionsとの統合をどのみちやるなら、そちらに処理を統一してもいいかもしれません。

https://firebase.google.com/docs/firestore/manage-data/export-import?hl=ja

Step2: [GitHub Actions] bq loadコマンドでGCS内のバックアップをBigQuery上に読み込み

ここからいきなりGitHub Actions上での対応になります。
GitHub Actions内でGCPに対して認証を行う方法は後述します(IAMではなくWorkload Identity Federationを使いました)。

以下2つのコマンドを実行することで読み込みをします。愚直ですがExport時のバケット名にYYYYMMDDを足すことで、毎日Exportしていても当日のExportを読み込めるようにします。

YESTERDAY=$(date '+%Y%m%d')
TABLE_NAME="hogeCollection$YESTERDAY"
echo "TABLE_NAME=$TABLE_NAME" >> $GITHUB_ENV
bq load --source_format=DATASTORE_BACKUP --replace firestore_export.hoge_collection gs://${{ matrix.gcp_bucket }}/$TABLE_NAME/all_namespaces/kind_messages/all_namespaces_kind_messages.export_metadata

GCSに吐き出されたFirestoreバックアップはCSVやJSON形式なのかなと思ったら当然ながらそんなはずもなく独自バイナリのようでした。絶望していたのですが--source_format=DATASTORE_BACKUPオプションをつけることでBigQueryへの読み込みができました。

Step3: [GitHub Actions] bq queryコマンドで集計クエリを実行

bq loadコマンドの実行が終わるとBigQuery上にテーブルが展開されているので、その流れでbq queryを実行しちゃいます。

リファレンス: https://cloud.google.com/bigquery/docs/reference/bq-cli-reference?hl=ja#bq_query

          DATASET_NAME="firestore_export.hoge_collection"
          
          SQL=$(cat << EOF
          SELECT
          // 中略
          FROM
          \`${{ matrix.gcp_project }}.$DATASET_NAME\`
          WHERE
          // 中略
          AND TIMESTAMP(createTime) > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL XXX DAY)
          GROUP BY
          // 中略
          ORDER BY
          // 中略;
          EOF
          )
          
          bq query --use_legacy_sql=false -n=100000 --format=csv "$SQL" > result.csv

ちょっと力技で解決した罠として、-nオプションを指定しないとデフォルトでは100行しか返ってこないという点があります。集計データの特性上、事業が相当先のフェーズにならないと10万行なんて超えそうにも無いなと思ったので一旦10万を指定しました。

クエリ結果はとりあえずCSV形式で吐き出したうえでresult.csvに置きます。

Step4: [GitHub Actions] S3へのアップロード

とにかくデータをAWS内に持ってきて終わらせたいのでS3にアップロードします。割愛していますがもちろんaws-actions/configure-aws-credentialsを使った認証を行います。

aws s3 cp result.csv s3://${{ matrix.aws_s3_bucket }}/$TABLE_NAME.csv

Step5: [GitHub Actions] 成功失敗の通知

こういう処理が無言で落ちると怖いので、以下のように成功失敗の通知をSlackに流すようにしておきました。

      - name: Notify Failure
        if: ${{ failure() }}
        uses: 8398a7/action-slack@v3.14.0
        with:
          status: ${{ job.status }}
          author_name: Firestore -> S3へのチャットインポートバッチが失敗しました
          fields: repo,message,commit,action
        env:
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
          SLACK_WEBHOOK_URL: ${{ secrets.SLACK_DEPLOY_NOTIFY_WEBHOOK_URL }}

Step6: [Laravel] Artisan Commandの実装

最後はLaravel依存の解説にはなってしまいますが、Artisan CommandからS3上のファイルをダウンロードし、最後にもう一度アプリケーション用途に合わせた集計をして結果をテーブルに入れています。BigQuery上の集計はデータ整形の目的が強く、それをRAWデータとしてArtisan Command上でよりアプリケーション仕様に合わせた集計をするように二層構造にしました。

    public function handle(): void
    {
        $date = Carbon::yesterday()->format('Ymd');
        $path = "hogeCollection{$date}.csv";

        $contents = Storage::disk('hoge_bucket')->get($path);

        $rows = array_map('str_getcsv', explode("\n", $contents));
        array_shift($rows);

        $dataToInsert = [];
        foreach ($rows as $row) {
            if (count($row) === 4) {
                $dataToInsert[] = [
                    // 中略
                ];
            }
        }

        if (empty($dataToInsert)) {
            $this->error('No data to insert!');
            Log::error('Error');

            return Command::FAILURE;
        }

        DB::transaction(function () use ($dataToInsert) {
            DB::table('table_name')->delete();
            DB::table('table_name')->insert($dataToInsert);
        });

        SaveQueryResultToTable::execute(
            'table_name2',
            <<<'SQL'
                // 中略
                SQL
        );

        $this->info('Data imported successfully!');
        return Command::SUCCESS;
    }

上記クエリで言うところのtable_name2テーブルがフロントから呼ばれるAPIを実行したときにユーザー側テーブルとJOINされるテーブルです。

解説

GitHub Actions上でWorkload Identity Federationを利用した認証

CIのためにIAMキーをいちいち発行しているとキリがないし、まあまあ強い権限のキーが各所に散らばることになり怖いので、Workload Identity Federationという仕組みを使うことがおすすめです。

公式ドキュメント
https://cloud.google.com/iam/docs/workload-identity-federation?hl=ja
https://cloud.google.com/iam/docs/workload-identity-federation-with-deployment-pipelines?hl=ja

AWSでも類似の仕組みがあります。
https://zenn.dev/kou_pg_0131/articles/gh-actions-oidc-aws

公式ドキュメントの説明を読みながら進めると意外と簡単に終わります。
最終的にGitHub Actionsでの認証は以下のようになります。

jobs:
  main:
    strategy:
      matrix:
        include:
          - gcp_project: hoge
            gcp_project_id: hoge
            gcp_bucket: hoge
            gcp_pool_id: hoge
            gcp_service_account: hoge
            aws_s3_bucket: hoge
          - gcp_project: hoge
            gcp_project_id: hoge
            gcp_bucket: hoge
            gcp_pool_id: hoge
            gcp_service_account: hoge
            aws_s3_bucket: hoge

    runs-on: ubuntu-latest
    permissions:
      contents: 'read'
      id-token: 'write'

    steps:
      - id: 'auth'
        uses: 'google-github-actions/auth@v1'
        with:
          workload_identity_provider: 'projects/${{ matrix.gcp_project_id }}/locations/global/workloadIdentityPools/${{ matrix.gcp_pool_id }}/providers/github-actions'
          service_account: ${{ matrix.gcp_service_account }}

      - name: 'Set up Cloud SDK'
        uses: 'google-github-actions/setup-gcloud@v1'
        with:
          version: '>= 363.0.0'

      - name: Use gcloud CLI
        run: gcloud info

取り立ててSecretの必要なくパイプラインを構築することができました。

課題点

これにてFirestoreのデータを集計してAWS上で稼働するアプリケーションから取得することができるようになりましたが、以下の課題点は残っていると思います。

  • データがリアルタイム同期できない
    • 今回は要件的に日次でよかったのでこれで行けましたが、リアルタイム性が求められるとFirestoreからBqへの同期をStreamingにした上で、BigQueryにGoogle DataStreamを用いてRDSから逆にBigQueryにデータをStreamingでコピーし、LaravelからBigQueryを叩いて要件を満たすといった力技が必要になるかもしれません
  • CIの不足
    • GitHub Actionsが明確に落ちた場合は検知できるものの、落ちてはいないがデータがなんかおかしくなった、みたいなケースを検知するのがこのままだと難しいです。具体的にいえばFirestore上のデータ形式がなんらかの仕様変更で変わったときに、当然ながら今回のパイプラインも影響範囲ですが、そのときに誰がそれを思い出すのか、という課題があります

検討した他の選択肢と回避した理由

FirestoreからAWSにデータを持っていきたいなどという頻出過ぎるであろう要件に対して、愚直にGitHub Actionsからコマンドを叩くような原始的な方法しか無いわけがなく、調べたところ色々な方法がヒットしました。ただ、いずれもこの施策のためにわざわざ新しい仕組みを入れるというところと費用対効果の観点で納得の行くものではなく(本施策はサービス内での重要度が全然高くなく、今回の実装が起因してエンジニアのベースラインを上げることはしたくなかった)、GitHub Actionsでゴリゴリやることにしました。

際どかったのはS3にデータを置いた後Athenaを通してS3上のデータをクエリするという選択肢で、Athena自体の利用実績もあったので全然やってよかったのですが、最終的にRDS上のデータをJOINしないといけない要件だったのでRDSに突っ込むことになるんだよなぁと思うとわざわざAthenaを介する気もしなくなりました。

Amazon Athena Google BigQuery コネクタ

AthenaからBigQueryにクエリが飛ばせるらしいです。便利ですね。bq loadまではGitHub Actionsで組んで、そこから先はLaravelからAthenaクエリをコールする選択肢もあったかもしれません。
https://docs.aws.amazon.com/ja_jp/athena/latest/ug/connectors-bigquery.html

ただ、理論的にはAWSからGCP上にアクセス権限をつける必要があるはずで、それがどうもデータソースコネクタというものみたいなのですが、ここの概念から学習するとなると手間の割に得られるメリットが感じられないなぁと思いました。
https://docs.aws.amazon.com/ja_jp/athena/latest/ug/connect-to-a-data-source-lambda.html

クエリ結果を Amazon S3 にエクスポートする

BigQueryからS3に接続させる設定を完了していると、BigQueryのクエリ結果をS3にエクスポートできるらしいです。S3に置くことができたらば、それをLaravelバッチから読んでもいいし、Athenaを通して読んでもいいですからね。
https://cloud.google.com/bigquery/docs/omni-aws-export-results-to-s3?hl=ja

ただこれもさっきの逆バージョンでGCPからAWSへのアクセス権限を付ける方法が専用であるらしく、こんなことをやるなら要件的にはGitHub Actionsから双方にアクセスで良いんですよねと思ってしまいました。
https://cloud.google.com/bigquery/docs/omni-aws-create-connection?hl=ja

BigQueryから再度GCSにおいて、それをS3に持っていく

もうここまで来ると冗長なだけなのではという気もするのですが一応検討しました。

https://cloud.google.com/bigquery/docs/exporting-data?hl=ja
https://tech.rhythm-corp.com/transfer-bq-data-to-s3-and-run-in-athena/

選択肢一覧

検討した選択肢の全体感は以下の通りです。

マナリンク Tech Blog

Discussion