🌟

Export Firebase Analytics from BigQuery to Snowflake

2021/12/27に公開

https://medium.com/snowflake/google-analytics-4-in-snowflake-sql-cookbook-d455087882c9

先日、このような記事が上がっていて、やってみました。
※かなり久々に書いてるので、クラメソさんみたいな凄まじく美しいブログは書けぬことを痛感した、まじすごいです、アレ。一旦、細かいところでの記載やスクショが整備できていない状態ですが、こちらで一旦公開します。

前半に、BigQuery上にあるGA4のRAWデータをSnowflakeに持ってくる仕組みが書いています。

  • BigQueryからGCSへエクスポートする
    • サービスアカウントを作り、権限を付与する(BigQueryのDatasetの閲覧、GCSのBucket管理者権限)
    • Cloud FunctionsでBigQueryJob(EXPORT DATA句)を実行
    • Cloud PubSubをつくり、トリガーにする(httpトリガーではできなかった)
    • Cloud Scheduleでスケジュール実行(翌日の13時頃が良さそう) ※後述
  • Snowflakeで統合ストレージを構成し、発行された読み込み用のサービスアカウントにGCSバケットの読み込み権限を付与する
  • 外部ステージを構成し、COPY INTOでSnowflakeに取り込む
  • 取り込みをSnowpipeで自動化する
    • CREATE  PIPEする
    • Cloud Pubsubでトピックを作る
    • Cloud Pubsubでサブスクリプションを作る
    • PIPE作成時に生成されたサービスアカウントに↑のサブスクリプションにPub/Subサブスクライバー権限を付与する

業務で、SnowflakeでCDP構築をしていて、すべてSnowflakeに寄せたい!
Firebase Analyticsももっていかねば・・・なんて思っているところだったので、渡りに船でした。

Export from BigQuery to GCS

bq extract --destination_format=PARQUET --compression=SNAPPY 
bigquery-public-data:ga4_obfuscated_sample_ecommerce.events_20201202  
gs://your-bucket/yourprefix/ga4sample-20201202-*

このようなbqコマンドをたたいて、GCSにExportする・・・のですが、これを毎日のバッチで自動化したいとおもいます。
やり方としては

  • bqコマンドを動かすGAEなどのインスタンスを用意して、cronなりで動かす
  • BigQueryのEXPORT DATA句を使って、GCSへエクスポートする
    可能な限りManagedサービスで動かしたかったので、EXPORT DATA句を用いる方法をとります
EXPORT DATA OPTIONS(
  uri='gs://$(bucket_name)/firebase_analytics/events-20211224-*',
  format='PARQUET',
  compression='SNAPPY',
  overwrite=true
  ) AS 
  SELECT * FROM $(project_id).$(dataset_name).events_20211224;

これを毎日前日分を動かす形にします。
一応、Pythonで作って、Cloud Functionsで実装、Cloud Schedulerで動かすことにします。
https://github.com/tashirogakuca/export_firebase_to_snowflake

BigQueryのスケジュールドクエリでやる方法もあると思います。ただ、上記のように「前日分」を指定で処理をしたかったので、Python実装することにしました
・・・って作った後に、スケジュールドクエリに「@run_time」って入れる方もあるらしいとしった・・・orz
参考:https://qiita.com/hogeta_/items/5582d7d7ea7393a5cacb

【追記】結局、下記のようなSQL(Scripting)で実現しました。
ワイルドカードテーブルに、変数を使えないとか色々あったので、2つのSQLを使ってる形です。

DECLARE
  uri string;
DECLARE
  target_date string;
  -- 下記は環境依存だが、テーブル名に変数を埋め込むのが大変なので、ハードコーディングにする。
  -- declare target_dataset string;
  -- declare target_gcs string;
  -- set target_dataset = '(PROJECT_ID).analytics_*********'; # prd
  -- set target_gcs = 'gs://export_firebase2gcs/firebase_analytics'
SET
  target_date = FORMAT_DATE("%Y%m%d", CURRENT_DATE() - 2);
SET
  uri = CONCAT('gs://export_firebase2gcs/firebase_analytics/events-', target_date, '-*');
CREATE OR REPLACE TABLE
  `(PROJECT_ID).analytics_*********.events_target_date` AS
SELECT
  *
FROM
  `(PROJECT_ID).analytics_*********.events_*`
WHERE
--  _TABLE_SUFFIX = FORMAT_DATE("%Y%m%d", CURRENT_DATE() - 2);
_TABLE_SUFFIX = target_date;
  -- sleep 120sec
CALL
  `(PROJECT_ID).analytics_*********`.sleep(60);
  -- export data
EXPORT DATA
  OPTIONS( uri=(uri),
    format='PARQUET',
    compression='SNAPPY',
    overwrite=TRUE ) AS
SELECT
  *
FROM
  `(PROJECT_ID).analytics_*********.events_target_date`;

sleep

BEGIN
  DECLARE now TIMESTAMP DEFAULT CURRENT_TIMESTAMP();
  DECLARE finish TIMESTAMP DEFAULT TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL sec SECOND);

  WHILE now < finish DO
    SET now = CURRENT_TIMESTAMP();
  END WHILE;
END

Prepare your Snowflake account to read from GCS

https://docs.snowflake.com/ja/user-guide/data-load-gcs-config.html

  • storage integrationの作成
  • Snowflakeアカウント用のGCPサービスアカウントを作成
    • JSONキー
  • バケットオブジェクトにアクセスするためのサービスアカウント権限を付与する
    • カスタムIAMロールの作成 (データロードのみを行う権限にしました)
      • storage.buckets.get
      • storage.objects.get
      • storage.objects.list
      • (storage.objects.create) ※アンロードもする場合
      • (storage.objects.delete) ※アンロードもする場合
    • アカウントへカスタムロールの割当
  • 外部ステージを作成する

Create a table in Snowflake, read the exported Parquet files

list @fh_gcp_stage; -- check files exist
create or replace table ga4_variant(v variant);
copy into ga4_variant
from @fh_gcp_stage/yourprefix/
pattern='yourprefix/ga4sample-.*'
file_format = (type='PARQUET');

をしますが、これをすると

と、カラムがv::variant となってしまうので、ちょっと工夫します

【追記】
下記を行うと、PARQUETのカラム名がそのままになります。
小文字だと、Snowflake的にあまり良くない  "hoge" とか、" をつけないといけないテーブルになってしまいます。
いろいろ試してみたのですが、現時点ではできなかった・・・・
なので、一旦、v variantで取り込み、その後展開したマート層(ウェアハウス層)を作るのがいいかなと思っています。
実際にはできなかったのですが

  • v variant
  • target_date timestamp_ntz (こっちは取り込む時に、追加したい)
    にして、取り込みテーブルにSTREAMを設定

増分のみを、TASKでをもちいて、VARIANTを展開して、フラットで使いやすいテーブル形式にするマートテーブルに追記するのが良いと思います。
(VIEWでもいいのですが、Firebaseテーブルはカラム数が多くなりそうなので、パフォーマンスを考えると実テーブル、かつ、毎回全部作り直すのは非効率なので、追記分のみ がいいのではないかと)

https://dev.classmethod.jp/articles/try-snowflake-create-table-using-template/
を参考にします。

INFER_SCHEMA句をつかって、PARQUETの半構造化データのカラム定義を取り込みます。

create or replace table firebase_analytics_tbl
  using template (
    select array_agg(object_construct(*))
      from table(
        infer_schema(
            location=>'@firebase_gcp_stage/events-20211219-000000000000'
            , file_format = (type='PARQUET')
        )
      ));

この後はとりこみます。

-- parquetファイルをCOPY
copy into cdp_l0_app.firebase_analytics_tbl
from @daisetsu_prd_firebase
pattern='firebase_analytics/events-2[0-9]+-[0-9]+'
file_format = (type='PARQUET')
MATCH_BY_COLUMN_NAME = CASE_SENSITIVE
;

Discussion