🔜

adjustのデータをSnowflakeに入れてredashで参照する

2023/03/16に公開

前置き

こんにちは。株式会社GENDAのデータエンジニアのこみぃです。

相変わらず会社のデータをSnowflakeに統合していく業務に勤しむ日々なのですが、先日アプリの計測ツールであるadjustのデータを入れるという業務をやりました。

やり方をググっても日本語の記事が出てこなかった(こみぃ調べ)ので、知見を共有すべく記事にしてみました。

AdjustのデータをSnowflakeに入れる仕組み概要

今回構築したシステムの概要はこんな感じです。

  • Adjustのコンソールの設定でログがS3に置かれるようにする
  • S3のPutをトリガーにLambdaを起動し、ndjsonに変換して別のS3に置く
  • Snowflakeの外部ステージ参照でndjsonのS3を参照する
  • dbtの定期実行で外部ステージを定期的に内部テーブルにコピーする

Adjustのコンソールの設定でログがS3に置かれるようにする

ここはAdjustのコンソールで設定します。
ちなみにこの設定をすると問答無用でデータが置かれるようになりますので、データのサンプルを先に入手できるなら先にLambdaやSnowflakeとのつなぎこみを作成してから作業することをおすすめします。

S3のPutをトリガーにLambdaを起動し、ndjsonに変換して別のS3に置く

S3はファイルが置かれることをトリガーにLambdaを起動することが出来ます。
S3を作ってからLambdaを作り、Lambdaのコンソール画面でトリガーの追加を行うのがいいですね。

Pythonの知識があればLambdaはさくっと作れると思いますが、一応注意点として Adjustが配置するcsvではヘッダーの列名が{}で囲われている という不思議な仕様がありますので、列名から {} を除外しておくことをおすすめします。

Snowflakeの外部ステージ参照でndjsonのS3を参照する

ここは以下を参照して作成します。
https://docs.snowflake.com/ja/user-guide/data-load-s3-create-stage

なお、外部ステージを参照するViewを作っておくと、この後のdbtの構築が非常にスムーズになります。
現状(2023年3月現在)のAdjustのデータだと、おそらくこんな感じのViewになります。

create_view
create or replace view VIEW(
	INSTALLED_AT,
	IDFA,
	GPS_ADID,
	ADID,
	ACTIVITY_KIND,
	ENGAGEMENT_TIME,
	EVENT,
	EVENT_NAME,
	REVENUE_FLOAT,
	CURRENCY,
	ATT_STATUS,
	SK_TS,
	SK_PAYLOAD,
	SK_VERSION,
	SK_NETWORK_ID,
	SK_CAMPAIGN_ID,
	SK_TRANSACTION_ID,
	SK_APP_ID,
	SK_ATTRIBUTION_SIGNATURE,
	SK_REDOWNLOAD,
	SK_SOURCE_APP_ID,
	SK_CONVERSION_VALUE,
	SK_INVALID_SIGNATURE,
	SK_FIDELITY_TYPE,
	SK_DID_WIN,
	SK_INSTALL_DIRECT,
	PARTNER,
	COUNTRY,
	IP_ADDRESS,
	USER_AGENT,
	NETWORK_NAME,
	CAMPAIGN_NAME,
	ADGROUP_NAME,
	CREATIVE_NAME,
	TRACKER,
	RANDOM,
	NONCE
) as
SELECT
    cast($1:installed_at as string) installed_at,
    cast($1:idfa as string) idfa,
    cast($1:gps_adid as string) gps_adid,
    cast($1:adid as string) adid,
    cast($1:activity_kind as string) activity_kind,
    cast($1:engagement_time as string) engagement_time,
    cast($1:event as string) event,
    cast($1:event_name as string) event_name,
    cast($1:revenue_float as string) revenue_float,
    cast($1:currency as string) currency,
    cast($1:att_status as string) att_status,
    cast($1:sk_ts as string) sk_ts,
    cast($1:sk_payload as string) sk_payload,
    cast($1:sk_version as string) sk_version,
    cast($1:sk_network_id as string) sk_network_id,
    cast($1:sk_campaign_id as string) sk_campaign_id,
    cast($1:sk_transaction_id as string) sk_transaction_id,
    cast($1:sk_app_id as string) sk_app_id,
    cast($1:sk_attribution_signature as string) sk_attribution_signature,
    cast($1:sk_redownload as string) sk_redownload,
    cast($1:sk_source_app_id as string) sk_source_app_id,
    cast($1:sk_conversion_value as string) sk_conversion_value,
    cast($1:sk_invalid_signature as string) sk_invalid_signature,
    cast($1:sk_fidelity_type as string) sk_fidelity_type,
    cast($1:sk_did_win as string) sk_did_win,
    cast($1:sk_install_direct as string) sk_install_direct,
    cast($1:partner as string) partner,
    cast($1:country as string) country,
    cast($1:ip_address as string) ip_address,
    cast($1:user_agent as string) user_agent,
    cast($1:network_name as string) network_name,
    cast($1:campaign_name as string) campaign_name,
    cast($1:adgroup_name as string) adgroup_name,
    cast($1:creative_name as string) creative_name,
    cast($1:tracker as string) tracker,
    cast($1:random as string) random,
    cast($1:nonce as string) nonce
FROM
    @外部ステージ名
;

dbtの定期実行で外部ステージを定期的に内部テーブルにコピーする

Snowflakeから外部ステージの参照は非常に重い処理なので、定期的に内部テーブルにコピーする仕組みをdbtで作っておくと参照が爆速になって良いです。
もしSnowflakeのプランがエンタープライズであればMatelialized Viewを使うのも良いと思います。

データを参照するクエリを作る

ndjsonへの変換時に工夫するともう少しよくなるのですが、弊社では一旦すべてのデータをstring扱いで参照するようにしています。
突然データの形式が変わっても集計のクエリを変えれば良いようにするためですね。

Adjustのデータは時間のカラムがunix_timestampなようで、変換するのにちょっとテクが必要だったので、参考にSQLを載せておきます。

convert_to_jst
select
    idfa,
    adid,
    date(to_timestamp(cast(engagement_time as int))) as engagement_date_utc,
    date(CONVERT_TIMEZONE('UTC', 'Asia/Tokyo', to_timestamp(cast(engagement_time as int)))) as engagement_date_jst,
    revenue_float,
    currency
from
    {テーブル名}

さらなる発展

Adjustのログは様々なアプリ内のイベントが混在したデータとなってますので、event_nameごとに中間集計をしておくなどすると便利です。
インストールや課金などを分けておくと、マーケターの方々も毎回whereで抽出する必要がなくなるので非常に良いですね。

本日のまとめ

今回の例だとAdjustでしたが、サードパーティの機能でデータをPUSHで提供してくれるような仕組みは他にもあると思いますので、その際には今回紹介したパターンを思い出していただけると、データパイプラインの構成に悩むことが減るかと思います。

そういうわけで、本日のまとめは非常に簡単ですね。

データパイプラインの構築のパターンを覚えよう!!

結びの言葉

ちなみに、この記事を読んで「AdjustがSnowflakeのデータシェアリングでデータを提供してくれたらもっと楽なんだけどなー」と思ったあなたは非常に鋭いです。Snowflakeのことがよくわかってきてますね!!
そんな世界を夢見つつも、しばらくはこうして地道にパイプラインを作っていきましょう。

最後に一つ宣伝を。
私が所属する株式会社GENDAでは一緒に働く仲間をすごくすごい真剣に求めています。
興味がありましたらぜひお気軽にお声おかけください。
https://genda.jp/

本日はこのあたりで。
それじゃあ、バイバイ!

Discussion