COTENのデータ分析基盤とその運用を支える技術
はじめに
こんにちは、COTENで世界史データベースを作っている寺澤です。世界史データベースのデータ品質管理を行っています。
世界史データベースは、メインデータベースにPostgreSQLを使っています。データ入力用のアプリケーションは、このPostgreSQLとやり取りして世界史のデータを登録しています。
さらに、もう一つのデータベースとしてSnowflakeも導入しています。こちらはDWH(データウェアハウス)として、次のような目的で活用しています。
- データの拡充状況をチーム内外で共有する
- 時系列でデータを比較して品質を管理する
このDWHと、データを取り込むパイプラインなどの分析基盤は、私が運用を担当しています。私のメインの仕事は品質管理戦略の策定と実行ですが、分析基盤の運用も兼務している形になります。そのため、できるだけ運用の手間を減らせるよう、様々な工夫を取り入れています。
本記事では、データ分析基盤のアーキテクチャと、その運用を支える技術についてご紹介します。
データ基盤の全体像
まずは、データ基盤の全体像をご覧ください。

主要な技術スタック
- RDBMS: PostgreSQL, Amazon RDS
- Data Lake: Amazon S3
- DWH: Snowflake
- ETL/ELT: dbt, AWS Step Functions, Amazon ECS(Fargate), Amazon EventBridge
- Document Hosting: Amazon S3, Amazon CloudFront
- CI/CD: GitHub Actions
- IaC(Infrastructure as Code): Terraform
主なデータの流れ
データの流れは、本番サービスに影響を与えず、かつ安定的に実行されるように設計されています。
- データ抽出: まず、
Product Accountにある本番データベースのDaily Backupを、Analytics AccountのS3バケットへエクスポートします。これにより、本番サービスに一切負荷をかけることなく、安全にデータを分析環境へ同期できます。 - データ変換・格納: S3に保存された生データは、dbtが分析しやすい形に変換・加工し、データウェアハウスであるSnowflakeに格納します。
- 実行環境: このdbtの処理は、コンテナ実行環境であるECS上で動かしています。さらに、ECSの実行管理にはStep Functionsを導入しており、Event-Drivenに処理を開始するスケジューラー、重複実行の防止、エラー発生時の自動リトライといった、データパイプラインに不可欠な機能の実装に利用しています。
コードによるインフラ管理(IaC)
私たちのデータ基盤では、手作業による設定ミスや属人化を防ぐため、インフラの構成をコードで管理する IaCの考え方を採用しています。
- Terraform: AWSアカウントやSnowflakeアカウントといった、システム全体の土台となるインフラリソースの管理にはTerraformを使用しています。
- dbt: データウェアハウス内のテーブル、ビュー、UDF(ユーザー定義関数)など、データに関する資源の管理にはdbtを活用しています。
特に、サービスデータベースと分析基盤とのデータ構造をスムーズに連携させる仕組みが特徴です。プロダクト開発では、ORM(Object-Relational Mapping)ツールとしてPrismaを利用しています。このPrismaのスキーマ定義ファイルをSource of Truthとして、dbtのモデルを自動生成しています。
この仕組みにより、プロダクト開発チームがデータベースのスキーマを変更した際も、分析チームが手作業で対応することなく、データモデルが自動で追従することが可能になっています。
DWHの3層アーキテクチャ
データ基盤の全体的な見通しを良くし、保守性を高めるために、Snowflake上を Raw, DWH, Martの3つのレイヤーに分割しています。
- Raw Layer: S3からロードした生データを取り込み、テーブル内でのカラム名や型の変更などを吸収する Wrapper Viewを提供する層です。再加工が必要になった際の元データとして機能します。エンジニアのみが触る想定です。
- DWH Layer: 共通して利用される指標やビジネスロジックを定義し、セマンティックレイヤーのようなものとして用意しています。非エンジニアでSQLを書くメンバーはDWH Layer以降を触れる想定です。
- DataMart Layer: ダッシュボードやレポーティングで利用するような特化したデータセットを用意する層です。
Ubieさんのこちらを参考にさせてもらいました。
データパイプラインの詳細
データの流れに沿って、ETLの仕組みを2つのステップで解説します。
Step 1: RDSからData Lake (S3)へのデータ連携
前述したように、RDSのAutomated Snapshot機能をトリガーに、EventBridge経由でStep Functionsを起動します。Step FunctionsがRDSのS3エクスポート機能を実行し、データをParquet形式でS3に日次でダンプします。
また、RDSのバージョンアップ時など、Daily Backup以外にもAutomated Snapshotが作成されるタイミングがあります。そういった場合、S3エクスポートの重複実行を防止しないと、同日のデータが重複して取り込まれてしまいます。これは日付によってS3上のKeyspaceを区切っておき、既に同日のKeyが存在する場合は実行をスキップすることで多重実行を防いでいます。この仕組みは専用のLambdaを用意する必要がなく、Step Functionsのサービス統合に完結して実装できるため、メンテナンス性が高いと思います。
この際、RDSはプロダクトアカウント、S3は分析基盤アカウントにそれぞれ配置されているので、Cross-AccountでExportしたデータに対する暗号化ができるようにKMSのアクセス制御を行っています。
Step 2: Data Lake (S3)からSnowflakeへのロードと変換
同様に、S3に格納された生データをSnowflakeに取り込み、3層アーキテクチャに従って整形・加工します。
EventBridgeをトリガーに、Step FunctionsがECS上でdbtコンテナを起動します。
dbtがS3の生データをSnowflakeのLake Layerにロードします。
その後、dbtのモデルに従ってLake -> DWH -> Martの順にデータの変換処理を実行します。
dbt Models のメンテナンス
前述したように、世界史データベースはメインのDBとしてPostgreSQLを採用しており、アプリケーションからはORMのPrismaを介して利用しています。
PrismaはDSL(Domain Specific Language)でSchema Fileを記述し、そこからデータベースのDDL (Data Definition Language)を生成します。COTENではこのSchema Fileからdbt Modelsを同様に生成することで、Snowflakeのスキーマ追従の自動化を図っています。
@loancrate/prisma-schema-parserという非公式パーサーを利用することで、Schema FileからAST(Abstract Syntax Tree)を取得できます。このASTを元にdbt Modelsを生成するスクリプトを作成しています。あとはGitHub Actionsで、Schema Fileへの変更が合った時にこのスクリプトを実行すれば、自動でスキーマ追従することができます。
PrismaのTypeからDWHのTypeへのコンバートだけ調整してやれば、大体のDWH同様の仕組みが利用できると思います。
function convertUnsupportedType(typ: string): string {
if (typ === "geometry") {
return "String";
} else if (typ === "ltree") {
return "String";
}
throw Error(`new unsupported type in prisma schema: ${typ}`);
}
function convertTypeFromPrismaToSnowflake(prismaScalarType: string): string {
let typ = "";
switch (prismaScalarType) {
case "String": {
typ = "VARCHAR";
break;
}
case "Bytes": {
typ = "BINARY";
break;
}
case "BigInt":
case "Int":
case "Float":
case "Decimal": {
typ = "NUMBER";
break;
}
case "Boolean": {
typ = "BOOLEAN";
break;
}
case "DateTime": {
typ = "TIMESTAMP_TZ";
break;
}
case "List": {
typ = "ARRAY";
break;
}
case "Json": {
typ = "OBJECT";
break;
}
default:
throw Error("cannot convert type from prisma to snowflake");
}
return typ;
}
この仕組みで、S3のデータをそのまま保存するRaw Schema Modelsと、型付けを行ったInteraface Schema Modelsを提供しています。
# Raw Schema Models
models:
- name: ${tableName}
description: ${tableName} table raw data.
columns:
- name: dt
tests:
- not_null
- name: data
tests:
- not_null
- name: _metadata_filename
tests:
- not_null
- name: _metadata_file_row_number
tests:
- not_null
- name: _loaded_at
tests:
- not_null
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- dt
- _metadata_filename
- _metadata_file_row_number
# Interaface Schema Models
#
# columns, pkTest, compositeUniqueTest, uniqueColumnTestなどは
# 後述する「Stateファイル」を元に動的に生成している
models:
- name: if_${tableName}
description: near raw interface for ${tableName} table.
columns:
- name: dt
tests:
- not_null
${columns.join("\n")}
tests:
${[pkTest, compositeUniqueTest, uniqueColumnTest].filter((str) => str.length > 0).join("\n")}
dbt Modelsを管理する上で、テーブルの特性に応じてデータテストを細かく設定する必要があります。
シンプルなケースは、Prismaのスキーマ定義ファイル(schema.yml)から素朴にdbt Modelsを生成することで対応できます。
- 主要テーブル:単一IDを主キーに設定
- 中間テーブル:複数の外部キーを複合主キーに設定
しかし、複雑なケースには対応が困難です。例えば:
- カラム名が変更された際、変更前後の両方のカラムに非NULL制約テストを個別に適用する
こういった複雑なケースに対応するため、独自のStateファイルという仕組みを導入しています。
- 従来:最新のスキーマをそのまま同期
- Stateファイル:スキーマ変更(CREATE TABLE、ALTER TABLEなど)を検知し、差分を追記していく
StateファイルはGitでバージョン管理されており、このファイルをもとにdbtモデルの変更PRを自動で作成します。データテストの設定を詳細にカスタマイズしたい場合は、このStateファイルに設けてあるoverride用のパラメータを直接編集します。これにより、スキーマ定義ファイルとは独立してdbt Modelsを管理しつつ、スキーマファイルの更新時にはその差分を自動で吸収できる、柔軟性のある仕組みを整えています。
また、テーブルやカラムの削除(DROP)命令は、Stateファイルには追記しません。該当のテーブルやカラムはDWH上に物理的に残し、Nullableに変更されたものとして扱います。これにより、過去データへの影響を防いでいます。
ただし、このスキーマ自動追従の仕組みを適用しているのは、Snowflakeの生データ層(Raw Layer)までです。その先のデータウェアハウス層(DWH Layer)以降は、意図しない変更を防ぐため手動でメンテナンスしています。例えば、データモデルのリファクタリングや、カラム名変更に伴う新旧データのマージといった処理は、DWH Layerで手動で実装しており、依然最も手間がかかる作業となっています。
データモデルDocumentのホスティング
dbt Modelsが更新されたPRにおいて、dbt Docsの更新も同時に行っています。
dbt Docs のホスティングにはS3, CloudFrontを利用しており、PRがマージされたタイミングで、GitHub ActionsからS3に最新版のdbt Docsをアップロードしています。
CloudFrontの前段にLambda Edge, Cognitoによる認証を掛けています。Snowflakeのユーザーや権限の管理はTerraformで行っているのですが、そこのマスターデータを元にCognitoのユーザーを作成することで、認証ありの静的サイトが簡単に作れるのでおすすめです。

構成としてはClassmethodさんのこちらの記事とほぼ一緒です。画像もお借りしています。
まとめと今後の展望
構成自体はそんな特殊なものでもないと思うのですが、Prisma Schema Fileからdbt Modelsへの自動変換周りの話はあまり見かけないんじゃないかなと思います。参考になる方がいたら幸いです。
現在は、時系列比較を通じて、データの拡充状況をチーム内外で共有したり、品質を管理するためにSnowflakeを利用しているのですが、今後は複数のクライアントアプリケーションから世界史データを利用するためのHubのような形でも活用していきたいなと考えています。
Discussion