FirestoreからAWSにデータを同期するパイプラインをGitHub Actionsで簡単に作る
背景
- AWSメインでアプリケーションを構築しており、ほとんどのデータはRDS(Aurora)に保存、アプリケーションサーバー(Laravel)もFargate上で動作している
- 一部のデータだけリアルタイム通信が必要だった背景からFirestoreに保存されている
- この度、Firestore上に保存されているデータを集計することで特定のユーザーの行動量を「多い」「普通」「少なめ」の3段階で評価しフロントに表示する要件ができた
- Firestore上に保存されている集計対象のデータは累計70万件を超える
- 当然ながらFirestoreは複雑な集計クエリをサポートしていないため、LaravelアプリケーションからFirestoreに直接接続してクエリするのはデータ量の観点から見ても現実的ではない
- 以上から、Firestore上のデータを何らかの方法で集計クエリ発行可能なデータストアに同期(定期的、またはストリーミング)し、そこに対してLaravelからクエリすることで要件を実現可能にしたい
制約条件・前提条件
- AWSメインでアプリケーションを構築していることから、同期のパイプラインもAWSに寄せたり、GCP特有の仕組みに依存しないようにしたい
- 上記に関連して、本機能は事業にとって最重要かというとそうでもないため、学習コストが高くなるような仕組みにはしたくない(ROIが合わない)
- チーム全体が把握しているインフラ相当の知識として、PHPからのAWS SDKの利用、簡単なシェルスクリプトの実装、GitHub Actionsの利用、SQLの記述、S3やIAMなどメジャーなAWSサービスの利用などがあり、学習コストの観点ではこれらに依存することが望ましい
- 仕様上、即時性の高いデータ反映は不要。具体的には、集計自体は日次で反映されるくらいの遅延を許せる
- Firestore上のデータにもユーザーIDが紐づいているから、RDS上のユーザーデータと突合することで誰のデータか判別可能
構築したパイプライン
全体感
以下のタイムラインで複数のステップを回しています。
- 毎日0時にFirestoreからGCSにExport
- 毎日深夜2時にGitHub Actionsを動かし、BigQueryへのロード、クエリ、結果のS3への吐き出しを実施
- 毎日早朝4時にLaravel上でバッチを動かし、S3からデータを読み込んでRDSへ書き込み
これでRDS上に集計済みFirestoreデータが入るので、あとはフロントからAPIが叩かれたときに適宜JOINして返せばよいです。
Step1: Firestore->GCSに日次Export
Firebase Cloud Functionsから、GCSに対してExportができます。
export default () => {
return functions.pubsub
.schedule('0 0 * * *')
.timeZone('Asia/Tokyo')
.onRun(async _ => {
const databaseName = client.databasePath(functions.config().hoge_export.project, '(default)');
// export先のバケットのフォルダ名の規則
// hogeCollection + yyyymmdd
// 例:hogeCollection20231010
const folderName = `hogeCollection${new Date().toISOString().slice(0, 10).replace(/-/g, '')}`;
return client
.exportDocuments({
name: databaseName,
outputUriPrefix: functions.config().hoge_export.output_uri_prefix + '/' + folderName,
collectionIds: ['hogeCollection'],
})
// 以下略
});
};
弊社の場合は既存コードで類似した処理をやっている事例があったのでここは踏襲しましたが、gcloudコマンドでもExportできるみたいなので、後述するGitHub Actionsとの統合をどのみちやるなら、そちらに処理を統一してもいいかもしれません。
Step2: [GitHub Actions] bq loadコマンドでGCS内のバックアップをBigQuery上に読み込み
ここからいきなりGitHub Actions上での対応になります。
GitHub Actions内でGCPに対して認証を行う方法は後述します(IAMではなくWorkload Identity Federationを使いました)。
以下2つのコマンドを実行することで読み込みをします。愚直ですがExport時のバケット名にYYYYMMDDを足すことで、毎日Exportしていても当日のExportを読み込めるようにします。
YESTERDAY=$(date '+%Y%m%d')
TABLE_NAME="hogeCollection$YESTERDAY"
echo "TABLE_NAME=$TABLE_NAME" >> $GITHUB_ENV
bq load --source_format=DATASTORE_BACKUP --replace firestore_export.hoge_collection gs://${{ matrix.gcp_bucket }}/$TABLE_NAME/all_namespaces/kind_messages/all_namespaces_kind_messages.export_metadata
GCSに吐き出されたFirestoreバックアップはCSVやJSON形式なのかなと思ったら当然ながらそんなはずもなく独自バイナリのようでした。絶望していたのですが--source_format=DATASTORE_BACKUP
オプションをつけることでBigQueryへの読み込みができました。
Step3: [GitHub Actions] bq queryコマンドで集計クエリを実行
bq loadコマンドの実行が終わるとBigQuery上にテーブルが展開されているので、その流れでbq queryを実行しちゃいます。
リファレンス: https://cloud.google.com/bigquery/docs/reference/bq-cli-reference?hl=ja#bq_query
DATASET_NAME="firestore_export.hoge_collection"
SQL=$(cat << EOF
SELECT
// 中略
FROM
\`${{ matrix.gcp_project }}.$DATASET_NAME\`
WHERE
// 中略
AND TIMESTAMP(createTime) > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL XXX DAY)
GROUP BY
// 中略
ORDER BY
// 中略;
EOF
)
bq query --use_legacy_sql=false -n=100000 --format=csv "$SQL" > result.csv
ちょっと力技で解決した罠として、-n
オプションを指定しないとデフォルトでは100行しか返ってこないという点があります。集計データの特性上、事業が相当先のフェーズにならないと10万行なんて超えそうにも無いなと思ったので一旦10万を指定しました。
クエリ結果はとりあえずCSV形式で吐き出したうえでresult.csvに置きます。
Step4: [GitHub Actions] S3へのアップロード
とにかくデータをAWS内に持ってきて終わらせたいのでS3にアップロードします。割愛していますがもちろんaws-actions/configure-aws-credentials
を使った認証を行います。
aws s3 cp result.csv s3://${{ matrix.aws_s3_bucket }}/$TABLE_NAME.csv
Step5: [GitHub Actions] 成功失敗の通知
こういう処理が無言で落ちると怖いので、以下のように成功失敗の通知をSlackに流すようにしておきました。
- name: Notify Failure
if: ${{ failure() }}
uses: 8398a7/action-slack@v3.14.0
with:
status: ${{ job.status }}
author_name: Firestore -> S3へのチャットインポートバッチが失敗しました
fields: repo,message,commit,action
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_DEPLOY_NOTIFY_WEBHOOK_URL }}
Step6: [Laravel] Artisan Commandの実装
最後はLaravel依存の解説にはなってしまいますが、Artisan CommandからS3上のファイルをダウンロードし、最後にもう一度アプリケーション用途に合わせた集計をして結果をテーブルに入れています。BigQuery上の集計はデータ整形の目的が強く、それをRAWデータとしてArtisan Command上でよりアプリケーション仕様に合わせた集計をするように二層構造にしました。
public function handle(): void
{
$date = Carbon::yesterday()->format('Ymd');
$path = "hogeCollection{$date}.csv";
$contents = Storage::disk('hoge_bucket')->get($path);
$rows = array_map('str_getcsv', explode("\n", $contents));
array_shift($rows);
$dataToInsert = [];
foreach ($rows as $row) {
if (count($row) === 4) {
$dataToInsert[] = [
// 中略
];
}
}
if (empty($dataToInsert)) {
$this->error('No data to insert!');
Log::error('Error');
return Command::FAILURE;
}
DB::transaction(function () use ($dataToInsert) {
DB::table('table_name')->delete();
DB::table('table_name')->insert($dataToInsert);
});
SaveQueryResultToTable::execute(
'table_name2',
<<<'SQL'
// 中略
SQL
);
$this->info('Data imported successfully!');
return Command::SUCCESS;
}
上記クエリで言うところのtable_name2
テーブルがフロントから呼ばれるAPIを実行したときにユーザー側テーブルとJOINされるテーブルです。
解説
GitHub Actions上でWorkload Identity Federationを利用した認証
CIのためにIAMキーをいちいち発行しているとキリがないし、まあまあ強い権限のキーが各所に散らばることになり怖いので、Workload Identity Federationという仕組みを使うことがおすすめです。
公式ドキュメント
AWSでも類似の仕組みがあります。
公式ドキュメントの説明を読みながら進めると意外と簡単に終わります。
最終的にGitHub Actionsでの認証は以下のようになります。
jobs:
main:
strategy:
matrix:
include:
- gcp_project: hoge
gcp_project_id: hoge
gcp_bucket: hoge
gcp_pool_id: hoge
gcp_service_account: hoge
aws_s3_bucket: hoge
- gcp_project: hoge
gcp_project_id: hoge
gcp_bucket: hoge
gcp_pool_id: hoge
gcp_service_account: hoge
aws_s3_bucket: hoge
runs-on: ubuntu-latest
permissions:
contents: 'read'
id-token: 'write'
steps:
- id: 'auth'
uses: 'google-github-actions/auth@v1'
with:
workload_identity_provider: 'projects/${{ matrix.gcp_project_id }}/locations/global/workloadIdentityPools/${{ matrix.gcp_pool_id }}/providers/github-actions'
service_account: ${{ matrix.gcp_service_account }}
- name: 'Set up Cloud SDK'
uses: 'google-github-actions/setup-gcloud@v1'
with:
version: '>= 363.0.0'
- name: Use gcloud CLI
run: gcloud info
取り立ててSecretの必要なくパイプラインを構築することができました。
課題点
これにてFirestoreのデータを集計してAWS上で稼働するアプリケーションから取得することができるようになりましたが、以下の課題点は残っていると思います。
- データがリアルタイム同期できない
- 今回は要件的に日次でよかったのでこれで行けましたが、リアルタイム性が求められるとFirestoreからBqへの同期をStreamingにした上で、BigQueryにGoogle DataStreamを用いてRDSから逆にBigQueryにデータをStreamingでコピーし、LaravelからBigQueryを叩いて要件を満たすといった力技が必要になるかもしれません
- CIの不足
- GitHub Actionsが明確に落ちた場合は検知できるものの、落ちてはいないがデータがなんかおかしくなった、みたいなケースを検知するのがこのままだと難しいです。具体的にいえばFirestore上のデータ形式がなんらかの仕様変更で変わったときに、当然ながら今回のパイプラインも影響範囲ですが、そのときに誰がそれを思い出すのか、という課題があります
検討した他の選択肢と回避した理由
FirestoreからAWSにデータを持っていきたいなどという頻出過ぎるであろう要件に対して、愚直にGitHub Actionsからコマンドを叩くような原始的な方法しか無いわけがなく、調べたところ色々な方法がヒットしました。ただ、いずれもこの施策のためにわざわざ新しい仕組みを入れるというところと費用対効果の観点で納得の行くものではなく(本施策はサービス内での重要度が全然高くなく、今回の実装が起因してエンジニアのベースラインを上げることはしたくなかった)、GitHub Actionsでゴリゴリやることにしました。
際どかったのはS3にデータを置いた後Athenaを通してS3上のデータをクエリするという選択肢で、Athena自体の利用実績もあったので全然やってよかったのですが、最終的にRDS上のデータをJOINしないといけない要件だったのでRDSに突っ込むことになるんだよなぁと思うとわざわざAthenaを介する気もしなくなりました。
Amazon Athena Google BigQuery コネクタ
AthenaからBigQueryにクエリが飛ばせるらしいです。便利ですね。bq loadまではGitHub Actionsで組んで、そこから先はLaravelからAthenaクエリをコールする選択肢もあったかもしれません。
ただ、理論的にはAWSからGCP上にアクセス権限をつける必要があるはずで、それがどうもデータソースコネクタというものみたいなのですが、ここの概念から学習するとなると手間の割に得られるメリットが感じられないなぁと思いました。
クエリ結果を Amazon S3 にエクスポートする
BigQueryからS3に接続させる設定を完了していると、BigQueryのクエリ結果をS3にエクスポートできるらしいです。S3に置くことができたらば、それをLaravelバッチから読んでもいいし、Athenaを通して読んでもいいですからね。
ただこれもさっきの逆バージョンでGCPからAWSへのアクセス権限を付ける方法が専用であるらしく、こんなことをやるなら要件的にはGitHub Actionsから双方にアクセスで良いんですよねと思ってしまいました。
BigQueryから再度GCSにおいて、それをS3に持っていく
もうここまで来ると冗長なだけなのではという気もするのですが一応検討しました。
選択肢一覧
検討した選択肢の全体感は以下の通りです。
オンライン家庭教師マナリンクを運営するスタートアップNoSchoolのテックブログです。 manalink.jp/ 創業以来年次200%前後で売上成長しつつ、技術面・組織面での課題に日々向き合っています。 カジュアル面談はこちら! forms.gle/fGAk3vDqKv4Dg2MN7
Discussion