BigQueryに保存されたGA4のデータをSnowflakeに連携する
こんにちは! スターフェスティバルでインフラエンジニア/データ基盤エンジニアをやっております @koonagiです。
先月Snowflakeユーザー会のオフラインイベントに参加して、その中でGA4のデータをSnowflakeに連携したことについてLTしました! 今回は、LTの内容について、LT話せなかった部分を補足しながらブログにまとめていきたいと思います。
オフラインイベント自体は新しい技術的な発見やSnowflakeユーザーとの繋がりができたりして、本当に有意義で楽しい時間でした🙌
LT資料では、S3からSnowflakeの連携部分をバルクでインサートしていたのですが、snowpipeのほうが楽だったので本ブログではsnowpipeでの実装したことについて記載しています。
目次
① やったこと
② STEP1 BigQuery(GA4) -> S3データ連携【embulk】
③ STEP2 S3 -> Snowflakeデータ連携【snowpipe】
④ まとめ
① やったこと
BigQueryに保存されているGA4のデータを、Snowflakeに連携し、Snowflake上でGA4のデータを閲覧できるようにしました。
ざっくりイメージは以下のような形です。
構成図
Snowflakeへの連携は、S3を経由させているため、2ステップで連携しています。
- STEP1 : BigQueryからS3への連携は、Embulkを使って日次バッチで取り込み
- STEP2 : S3からSnowflakeの連携は、snowpipeを使ってS3に連携されたファイルを適宜Snowflakeに取り込む
直接BigQueryから直接Snowflakeへの連携も可能なのですが、AWS上からサクッとデータ確認したいというときにS3にデータがあればAthenaで確認できるのでS3を挟むようにしてます。
② STEP1 BigQuery(GA4) -> S3データ連携【embulk】
BigQueryからS3への連携は、OSSのembulkを使って、日次で差分連携しています。
BigQueryは日付ごとにテーブルがことなる分割テーブルになっているため、関数を使って取得したい日時のテーブルを指定しています。また、S3への出力も後からAthenaでパーティション化できるように年月日で階層を分けるようにしました。
embulk 設定ファイル
最初の4行が関数を使って年月日を取得している部分です。最新の日付テーブルが二日前のテーブルだったので、二日前の日付を取得しています。
関数を利用するのでymlではなく、liquidファイルを利用します。
{% capture day_before_yesterday %}{{ 'today' | date: '%s' | minus: 172800 | date: '%Y%m%d' }}{% endcapture %}
{% assign year = day_before_yesterday | slice: 0, 4 %}
{% assign month = day_before_yesterday | slice: 4, 2 %}
{% assign day = day_before_yesterday | slice: 6, 2 %}
in:
type: bigquery_extract_files
project: <プロジェクト名>
json_keyfile: '/config/production/keyfile-bigquery-embulk-input.json'
gcs_uri: <gcs uri>
temp_local_path: /tmp/embulk/data
dataset: <データセット名>
table: events_{{ day_before_yesterday }}
file_format: 'NEWLINE_DELIMITED_JSON'
compression: 'GZIP'
decoders:
- {type: gzip}
parser:
type: json
out:
type: s3_parquet
bucket: <bucket_name>
path_prefix: parquet/embulk/ga4_analytics_events/{{ year }}/{{ month }}/{{ day }}/data.
file_ext: snappy.parquet
compression_codec: snappy
default_timezone: Asia/Tokyo
region: ap-northeast-1
Embulkのプラグインは以下と使っています。
GCPへのアクセスはAPIキーをjson_keyfileパラメータに渡す必要があるのですが、APIキーの情報をSecret Managerに保存して、CodeBuildでコンテナをビルドする際に、ファイルを生成するようにしました。
抽出したファイルの中身
抽出したファイルの内容はこんな形になります。JSONデータがパースされずにそのまま1レコードに入ってきています。
③ STEP2 S3 -> Snowflakeデータ連携【snowpipe】
S3に連携されたデータはsnowpipeを使って、随時Snowflakeへ連携していきます。
テーブル定義やsnowpipeの設定は以下のブログを参考にさせていただきました。Snowflakeに取り込むタイミングで加工などはせずに、生データを一旦そのままSnowflakeに入れるようにしています。
テーブル定義
RAW_DATAに、GA4のデータをそのままJSON形式で保管しています。
カラム名 | 説明 | サンプル |
---|---|---|
RAW_DATA | GA4のJSONデータがそのまま入る | {"record": "XXX"} |
_METADATA_FILENAME | S3のファイルパス | /2022/12/22/data.snappy.parquet |
_METADATA_FILE_ROW_NUMBER | ファイルの何行目のデータか | 1 |
_LOAD_AT | ロードされた時間 | 2023-05-25 17:21:21.392 |
_LOAD_DAILY DATE | ロードされた日付 | 2023-05-25 |
create or replace TABLE <テーブル名> (
RAW_DATA VARIANT,
_METADATA_FILENAME VARCHAR(16777216),
_METADATA_FILE_ROW_NUMBER NUMBER(38,0),
_LOAD_AT TIMESTAMP_NTZ(9),
_LOAD_DAILY DATE
);
パイプ定義
snowpipeにはメタデータをいくつかもっており、メタデータを使ってロード元のS3のファイルパスとかロードの時間を取得してきています。
create or replace pipe <snowpipe名> auto_ingest=true as copy into <テーブル名>
FROM(
SELECT
$1::variant as raw_data,
metadata$filename::varchar as _metadata_filename,
metadata$file_row_number::bigint as _metadata_file_row_number,
metadata$start_scan_time::timestamp_ntz as _load_at,
time_slice(_load_at, 1, 'day')::date as _load_daily
FROM @GA4_ANALYTICS_EVENTS_stage
);
snowpipeの実装方法は公式サイトが丁寧にまとめてくださっているのでこちらご参照ください!
データ確認
snowpipeの実行まで終われば、後はEmbulkを定期実行することで、自動でSnowflakeまでデータ連携されるようになります!
保存されたデータは以下のような感じで利用できます👌
後はビュー作ったり、dbtやpythonで加工することで、GA4のデータを可視化、分析できるようになります!
さいごに
初めてSnowflakeを触って、データロードをやってみたのですが、snowpipeを使うとかなり楽にロードできるので、思ったよりもハードルが低かったです。とりあえず、S3やGCSなどにデータが吐かれているシステムはすぐに連携できると思います。
データロードできたので、次はsnowpark for pythonを使った分析に挑戦しようと思います!
Discussion