🛂

新卒5ヶ月目で任された分析基盤構築 - Digdag/Embulk/dbtによるデータパイプライン開発記

2025/01/30に公開

はじめに

私がカウンターワークスに新卒で入社して4〜5ヶ月ごろ経った頃、弊社のショップカウンターというサービスのデータパイプラインの構築の仕事をすることになりました。
厳密には私はショップカウンター事業部のエンジニアではなく、システム統括部といういわゆるDevOpsチームに所属しておりますが、今回は特別にデータ分析基盤の構築をさせていただきました。

DevOpsチームとは?

一般的にはDevOpsチームは、ソフトウェア開発(Development)と運用(Operations)を統合するチームです。
弊社はOperations専門のチームが存在しないので、厳密にはOpsの側面が強いチームかもしれません。
参考: https://clouddirect.jp.fujitsu.com/service/navi-tech-devops

データベース(DB)のデータとGoogleAnalytics4(GA4)のデータをBigqueryに転送し、
Bigquery内でテーブルを再構築することで扱いやすいデータにしBIツールで参照する環境を作るというものです。

この記事では、データパイプライン構築のことを思い出しつつ、使ったツールや構成の説明をとても簡単にしつつ感じたことを雑多に書いていきたいと思います。

それぞれのツールについては公式ドキュメントや参考にした記事のリンクを貼っておりますので、詳細を知りたい方はぜひご参照ください。

BigQueryについて

BigQueryはGoogleが提供するサービスで、大規模なデータのクエリや分析を簡単に行うことができます。SQLライクなクエリ言語を使用し、大量のデータを処理可能です。
https://cloud.google.com/bigquery?hl=ja

GA4について

Googleが提供するウェブ解析ツール。ユーザーの行動やサイトのパフォーマンスを分析するために使用されます。
https://developers.google.com/analytics?hl=ja

データパイプライン構築の背景と目的

弊社ショップカウンターの事業部が抱えている課題としてGA4のデータとDBのデータを組み合わせた分析ができていなかったり、DBのスキーマが複雑でデータをうまく活用できていなかったりということがありました。
これらの課題を解決するために分析基盤を構築することが今回のゴールでした。

そこで以下の目的でデータをBIツールで分析できるようにするためのデータパイプラインを作りました。

構成

先に最終的な構成をご紹介します。
結論として次のような構成になりました。
データの転送部分や通知の部分は省略しており、この他にも CloudRun関数AWS lambda を使っています。
また、dbt で作成したクエリを定期実行して DWH を作成するのに Bigquery のスケジュールクエリを利用しています。

利用したツール

ツール 主な特徴
Digdag 複雑なデータ処理をワークフローとして定義でき、スケジュール実行やリトライが可能。
Embulk 大規模データのバルク転送が可能なOSSツール
dbt SQLでのデータ変換を簡単に記述でき、依存関係の管理やドキュメント生成が可能。
Qlik Sense BIツール。直感的なデータの探索やインタラクティブなダッシュボード作成、大規模データ処理やAIでの提案機能が付属。

はじめに感じたこと

正直、最初にこの話を聞いた時、とても楽しそうな仕事だと思いました。
新卒で入社して半年も経っていない私は、正直データパイプラインというものを舐めていました。
その少し前にアプリケーションのアクセスログをBigqueryに転送するというタスクをしていた私は、DBのデータをBigqueryに転送するなんて余裕でしょと思っていました。← 大間違いでした。
もちろん転送するだけなら難易度はそこまで高くありませんが、使いやすくデータを整理したり、エラー時の処理を考えたり、スケジュール実行の依存関係を整理したり、コストについて考えたりと次々に考えることが増えていきました。

最初は、先輩が途中まで残していたリポジトリを眺めるところから始まりました。
Digdag? Embulk?
今まで聞いたことのないツールばかりで何をしているか全く検討もつかなかったので、情報を取得するところから始まりました。
どうもこれらは ETLのツールだということを知りました。

ETLとは?

そもそも、ETLとは Extract Transform Load の略です。

ステップ 説明
Extract(抽出) データソースから必要なデータを収集する。
例: データベース、クラウドストレージなど。
Transform(変換) 抽出したデータを整形・変換する。
例: タイムゾーンの統一、データ形式の統一、集約処理など。
Load(書き出し) 変換されたデータをターゲットシステムに保存する。
例: データウェアハウス、BIツールなど。

今回の例で言えば、アプリケーションのDBをBigqueryに転送します。
この時、DBからデータをとるところが Extract 、DBのデータのタイムゾーンを変えたり、データの持ち方を変えるところが Transform 、最終的なデータを Bigquery に投入するところが. Load になります。

Digdagの概要

データパイプラインを作るにあたって苦しんだ理由の一つがDigdagです。
Digdagは、ワークフローを簡単に作成できるオープンソースのワークフローエンジンです。
以下のような digファイル を作成することでタスクをスケジュール実行することができます。

export:
  timezone: Asia/Tokyo
+task1:
  echo>: "Hello, Digdag!"
+task2:
  sh>: script.sh

この Digdag の用語が難しく、task や workflow 、session や attempt という単位があり、最初に理解するまで少し時間がかかりました。

以下にDigdagの用語についてまとめたものを記載します。

task

task は、Digdagにおけるジョブの最小単位です。

具体的には、以下のような一つの処理を指します。

  • Shellスクリプトの実行
  • SQLの実行
workflow

workflow は、1つ1つのtaskを意味のある単位にまとめたものです。

例えば、以下のような一連の処理を表します。

  • RDSからデータを整形し、BigQuery に転送するフロー

Digdagでは1つのworkflowが1つのファイルで管理され、
.dig ファイル(YAML形式)で記述されます。

session

session は、workflowを実行するための予定(スケジュール)を表します。

digdag sessions コマンドを使用すると、session の詳細を確認できます。

attempt

attempt は、session の実際の実行を指します。

  • 通常、1 session につき1 attempt です。
  • 再実行するたびに attempt が増加します。

sessionのステータスは、常に最新の attempt のステータスに対応します。

よく使うコマンド

プロジェクトの登録

Digdagでプロジェクトを登録するには、digdag push コマンドを使用します。これにより、指定したプロジェクトがサーバーにアップロードされ、スケジュールや管理が可能になります。

dig ファイルを更新した場合にも、push が必要です。

digdag push [プロジェクト名]

スケジュールの確認

登録されたプロジェクトのスケジュールを確認するには、digdag schedules コマンドを使用します。これにより、現在設定されているすべてのスケジュールの詳細が表示されます。

digdag schedules

プロジェクトの一覧

サーバーにアップロードされているプロジェクトの一覧を表示するには、digdag projects コマンドを使用します。

digdag projects

ログの見方

実行されたタスクのログを確認するには、digdag log <session_id> コマンドを使用します。これにより、指定したセッションIDに関連するログを取得できます。

digdag log 123

セッションの取得

セッションIDは以下のコマンドで取得できます。

digdag sessions

https://www.digdag.io/

Embulkの概要

続いて、利用したのはEmbulkです。
Embulkは、データベース間のデータの転送を簡単にするためのオープンソースソフトウェアです。
並列実行されているので大量のデータを高速で転送できます。
先ほどのDigdagのワークフローを利用してこのEmbulkのコマンドをスケジュール実行していきます。
Embulk単体では次のようなコマンドで転送することができます。

Embulk run  [config_file.yml]

YAMLファイルは以下のように記述します。興味がある方はぜひご覧ください。

EmbulkのYAML設定ファイル構造
  • in: インポート元のデータソースを定義します。
    例: type: mysql, host: localhost, user: user_name
  • out: エクスポート先のデータソースを定義します。
    例: type: bigquery, project: project_id, dataset: dataset_name

例、基本の設定ファイル:

in:
  type: mysql
  host: localhost
  user: my_user
  password: my_password
  database: my_db
  table: my_table

out:
  type: bigquery
  project: my_project
  dataset: my_dataset
  table: my_table
  mode: append

今回のETLでは、liquid ファイルというテンプレートエンジンを利用しており、inの部分とoutの部分をconfig/_bg.yml.liquid config/_mysql.yml.liquid に分離して記述しています。

結論として最終的なyml.liquidファイルは以下のようになりました。

{% include 'config/mysql' %}
  table: samples
  select: "id, name, created_at, updated_at"
  incremental: true
  incremental_columns: ["updated_at", "id"]

{% include 'config/bq' %}
  table: accounts
  time_partitioning:
    type: DAY
    field: created_at
  column_options:
    - {name: id, type: INTEGER}
    - {name: id, type: STRING}
    - {name: created_at, type: DATETIME, timezone: "Asia/Tokyo"}
    - {name: updated_at, type: DATETIME, timezone: "Asia/Tokyo"}

Embulkでまず大変だったのは、個人情報の取り扱いでした。カウンターワークスでは、DBのデータにある個人情報をBigqueryに転送してはいけないという制限がありました。
最初はカラムごとにデータをマスク化することでの実現を考えていましたが、どのデータが個人情報なのかという判断は難しく開発者に委ねると事故に繋がりかねないということで、転送するカラムはホワイトリスト形式にして確実にOKなものだけを転送することにしました。

もう一つはタイムゾーンの扱いです。異なるデータベースでtimezoneの形式が異なるということがあり、利用者が使いやすくするために全て JST の DATETIME に設定することにしました。
TIMESTAMP形式のデータも明示的に JST の DATETIME に変えることで、意図せず UTC で表示されることを避けました。
この時、データベースに「9999年12月31日23:59」というデータがあったので、 UTC->JST 変換時にエラーが起き大変だったのを覚えています。

https://www.embulk.org/docs/

dbtの概要

さて Digdag と Embulk を用いて db のデータを Bigquery に転送することができました。
しかし、この生の状態では直接 db のデータを見るのと変わりません。
そこで利用者が使いやすい形に整形していく必要があります。
ETL の T の部分ですね。

そこで利用したツールがdbtです。
dbtはデータウェアハウスをSQLクエリベースで作成することができ、かつコード管理できるのでエンジニアとしては簡単にデータウェアハウスを作成することができます。

恥ずかしながら私は、SQLをそれまでちゃんと書いたことがなく、
LEFT JOIN? INNER JOIN? と混乱するレベルだったので必死に勉強しました。

SQLの勉強やデータウェアハウスの設計をしたのち、クエリをひたすら書いていく作業に入りました。
これが、アプリケーションのDB設計についてわからないと難しくとても時間がかかりました。
幸いにも最近作られたテーブル群は説明が丁寧に書かれていたので、比較的簡単でしたが、昔に作られたテーブルはどれが必要なのかの判断に時間がかかり大変でした。
特にカテゴリを管理しているテーブルは、現行で使われているものと、古くに使われたまま更新されていないものがあり、間違って結合をした結果、レコード数が想定よりもはるかに少なくなり、誤りに気づくのに時間がかかりました。

テーブルを作成する中で、特に意識していたことは、データウェアハウス としてどんな形が理想かでした。

データウェアハウス はデータの保管場所(データストア)を示す用語で、データレイク、データウェアハウス、データマート という具合にデータの状態や使われ方によって分類されます。

以下にそれぞれの データストア についての説明を記載しておきます。

データレイクとは?

データレイク は、あらゆる形式のデータをそのままの形で大量に保存する場所(データの湖)です。
(今回はデータレイクに入れる過程でEmbulkでタイムゾーンの整形を行っているので、厳密にはそのままの形ではありません。)

データウェアハウスとは?

データウェアハウス は、大量のデータを効率的に統合し、長期的に保管・分析するための場所(データの倉庫)です。

データマートとは?

データマート は、データウェアハウス の一部を切り出し、特定の目的(ユースケース)に特化して利用する小規模なデータストアです。

データウェアハウスデータマート とは違い、多様なユースケースで利活用できる形に整理されている必要があります。
初めは、データウェアハウスデータマート の境界を定めるのが難しかったです。
通常は、データマート ありきで、それを元に、データウェアハウス を定めるのですが今回はそれができませんでした。
というのも最終的なゴールが自由に分析できる環境ということでしたので、そもそもユースケースが定められていなかったのです。
そこで、データマートから考えるのではなく、データウェアハウス として使いやすい形に設計するということが必要でした。

最終的には今回は、設計方針として スタースキーマ を採用し、中心の テーブル に対して関連するデータをその周囲のテーブルにまとめて結合する形にしました。

理由は、BIツールを利用するユーザーがデータを結合するときに、非正規化されているスタースキーマであればどのテーブルを結合するかがシンプルだからです。

中心テーブルに関連データを結合することで、目的の情報を取得可能な構造になっています。

実際には、ジャンルやカテゴリなどの中間テーブルのようになっているテーブルは、正規化されていた方が使いやすいため、一部分はスノーフレークスキーマのように正規化されている部分もあります。
しかし、多くのデータはスタースキーマのようになっているため、シンプルな結合でデータを抽出することができます。

スタースキーマとは?

スタースキーマは、データウェアハウスで使用される単純で非正規化されたデータモデルです。
特徴:

  • 中央にファクトテーブルがあり、周囲にディメンションテーブルが配置される
  • クエリがシンプルで高速
  • データの重複が発生しやすい
  • データ構造の変更には弱い
    適している場面:クエリパフォーマンスを優先し、分析が簡単な構造を求める場合。
スノーフレークスキーマとは?

スノーフレークスキーマは、データウェアハウスで使用される正規化されたデータモデルです。
特徴:

  • ディメンションテーブルが複数の関連テーブルに分割される
  • ストレージ効率が高い
  • クエリが複雑になる可能性がある
  • データ構造の変更に強い
    適している場面:データの重複を減らし、ストレージ容量を節約したい場合。

参考: dbtドキュメント
https://docs.getdbt.com/docs/build/documentation

スノーフレークとスタースキーマの違いと比較
https://www.integrate.io/jp/blog/snowflake-schemas-vs-star-schemas-what-are-they-and-how-are-they-different-ja/

BIツール選定

最終的にダッシュボードを作るBIツールはQlik Senseを利用することにしました。
選定の大きな軸になったのは、コストの他に 「データ結合まで含めて分析を行える人数」 でした。
データを抽出・分析できる環境を整えることが主目的だったため、データ結合の段階からのライセンスを多くの人に配る必要がありました。
特にBIツールの価格はライセンスごとに分かれていることが多く、最上位のライセンスを多くの人に与えようとすると、どうしてもコストがかかってしまいます。

QlikSenseには Qlik Cloud® Analyticsという定額で20人が最上位のライセンス(フルユーザー)で利用可能なプランがあります。(それ以上の人数も追加で購入可能)

参考: Qlik Cloud® Analyticsのプランと価格
https://www.qlik.com/ja-jp/pricing

これによって、なるべく多くの人数にデータ結合に関わる権限を渡すことができました。

また、BIツールとしてはとても有名な Tableau と 最後まで悩みました。
直感的なビジュアライズやドキュメントの豊富さで Tableau は優れているため、比較資料を作る過程で気持ちが揺れ動いていた記憶があります。
最終的にはコスト面と先述したライセンスの数、そして最低限の利用者の要求を満たせるかという観点で Qlik Sense を選びましたが、どちらになってもおかしくはない状況でした。

https://www.tableau.com/ja-jp

Qlik Senseを使ってみての感想

操作性はやや難しいと使った当初から感じていました。
しかし、だんだん仕様に慣れてきてどこに何が配置されているのかがわかってくると、スムーズにダッシュボードが作成できるようになります。
若干細かい改善をしようとすると、どうすればいいのかわからなくなりますが、おおよそのことはQlik Senseのヘルプページに書かれてあるので問題ありませんでした。

https://help.qlik.com/

感想とこれからの改善

データパイプラインおよび分析の環境を作るまでの道のりはとても長く、想定していたよりも時間がかかりました。
私のスキル不足もありますが、何よりも考慮すべきことが多く、作業自体よりも仕様や要求の洗い出し に時間がかかりました。
例えば、BIツールにせよ、Bigqueryにせよ、社内の人に使ってもらうまでに「権限はどうするか」「クエリ実行のコストは問題ないか」「もし、誤った操作をした場合は?」「間違って操作しないためにどうするか」など考慮すべきことが多く、調査やドキュメントの作成に費やす時間が膨大でした。
特にDigdag、Embulk、 dbt、 Bigquery、 BIツール と役割が異なりながらも互いに近いツールを利用するなかで、 「どの工程で」「何を」 させるのが適切なのかなど設計に苦慮することも多かったです。
実際に運用してみて、まだまだ社内に浸透しているとはいえず改善しなければいけない箇所も多いです。
しかし、データパイプラインを構築しBIツールにまでデータを届かせるという最低限のフローは作れたため、今後さらに改善されて使いやすいものになっていくのではないかと期待しています。

そのための改善策として考えていることは、第一にデータパイプラインやBIツールの知識や仕様の共有です。
今では私はまた別のタスクに注力しているため、なかなかBIツールの改善にまで手が回りません。
そこで、ショップカウンターの開発に関わるエンジニアにも手伝ってもらい、みんなで一丸となって改善していきたい と思っています。

参考記事等

終わりに

ここまで読んでいただきありがとうございました。

たくさん、大変だったことを書きましたが、このプロジェクトを通じて何枚も成長したと感じています。そして何よりも入社して半年も経っていない新卒に重要なタスクを任せてもらえて、カウンターワークスで良かったと心底思っています!

株式会社カウンターワークスでは、
成長意欲にあふれた新卒エンジニアのメンバーも募集しています!
興味のある方はぜひ以下のリンクからご応募ください!

COUNTERWORKS テックブログ

Discussion