🌊

Workflows を使って依存関係のあるデータパイプラインを作る

2022/10/24に公開

クラウドエースでデータ ML エンジニアをやっている神谷と申します。業務では、データ基盤構築やデータ分析に取り組んでいます。本記事では、軽量なワークフローをサーバレスでサクッと作れる Workflows を使って、依存関係のあるデータ パイプラインを構築するにはどういった実装方法が望ましいか検討・検証します。

1. はじめに

本記事では、Workflows を使って依存関係のあるデータ パイプラインを構築するための実装方法について検討・検証します。Google Cloud でデータパイプラインを構築するにあたり、様々なプロダクトの組み合わせやアーキテクチャがあります。その中でもサーバレスで運用コストが少なく、お手軽に構築できるのが Workflows の特徴ですが、CSV ファイルを ETL し BigQuery に格納するオーソドックスなバッチデータ パイプラインにどれほどフィットするか検証します。

2. Workflows とは

Workflows は Google Cloud が提供するワークフロー管理ツールであり、サーバレスかつ http ベースのジョブ制御を行えるため、軽量なデータ パイプラインを作るのに向いています。

Google Cloud における類似のプロダクトには Cloud Composer、Vertex AI Pipelines 等がありますが、サーバを用意したり、重厚な OSS フレームワークを覚える必要があるため、学習コストや運用コストが高く、導入の敷居が一段高くなりがちです。その点、Workflows は http ベースの呼び出し方でジョブキックを行え、なおかつ yaml でタスクの依存関係を定義できるため、これらの知識がある方にとっては扱いやすいプロダクトとなっています。データ パイプラインのみならず、バックエンドで何かしらのワークフローを組みたい場合にも適しています。

一方で、Workflows は軽量なぶん、冪等性の考慮やエラー ハンドリング、データフローのダッシュボード化等、自前で考えることがそれなりにあります。また、コンピューティングを提供するプロダクトではないため、処理そのものはスケーラビリティのある他の ETL プロダクト(Dataflow、Dataproc、BigQuery 等)に任せる必要があります。これまで、Cloud Functions や Pub/Sub といった Google Cloud において比較的低レイヤーのプロダクトで組んでいたパイプラインに関しては、今後は Workflows で作るほうがベターになるかもしれません。

3. Workflows の書き方

BigQuery のクエリを実行したい場合、以下のような設定を yaml で書くだけで OK です。
やっていることとしては、「googleapis.bigquery.v2.jobs.insert」というエンドポイントに引数を詰めてコールしているだけです。

- create_dwh_main:
    call: googleapis.bigquery.v2.jobs.insert
    args:
      projectId: ${project_id}
      body:
        configuration:
          query:
            query: ${ex_query}
            destinationTable:
              projectId: ${project_id}
              datasetId: ${dataset_id}
              tableId: ${dwh_table_id}
            create_disposition: ${create_disposition}
            write_disposition: ${write_disposition}
            allowLargeResults: true
            useLegacySql: false
            tableDefinitions:
              external_temp_table:
                schema:
                  fields: ${ex_fields}
                sourceUris: ${"gs://" + args.bucket + "/" + args.data.name}
                sourceFormat: "CSV"
                csvOptions:
                  skipLeadingRows: 1
    result: query_result

上記のように、IN/OUT や各種オプションもクライアントライブラリの API のように使えます。また、単なる yaml ではなく DSL となっており、変数やイテレーション、条件分岐等も使えるため、ある程度のプログラミングが可能です。筆者は今回始めて Workflows を利用したのですが、デプロイにかかる時間が短く(数秒)、クラウドでのテストが非常に簡単にできました。Cloud Functions だと各種言語に対応していますが、デプロイには数分かかるので、そこと比較すると開発者体験が良かったです。

4. データ パイプラインの要件とアーキテクチャ

改めて、今回のデータ パイプライン要件は以下のとおりです

  • 複数のデータソース(CSV)を GCS に受信し、データ型変換等の簡単なクレンジングを行った上で BigQuery に書き込む
  • BigQuery に格納した複数の DWH を利用して、BigQuery にデータマートを作成する(ストアド プロシージャを利用)
  • データソース(CSV)と最終的なデータマート(BigQuery)は N:M の関係になる
  • パイプラインが冪等(一回実行しても複数回実行しても同じ結果になる)になるように設計する
  • 同じ種類の CSV が同日複数回連携されることは考えない(GCS のリテンションポリシーによって拒否する)

パイプラインのアーキテクチャは以下のとおりです。オーソドックスな3層構造(ブロンズ:データレイク、シルバー:DWH、ゴールド:データマート)のデータレイヤー(※データレイクとブロンズレイヤーを分ける考えもありますが、本記事では同一とします)となります。

  1. ファイル受信:別システムから CSV 連携で GCS にデータを受信する
  2. クレンジング:型変換や簡単なクレンジングを BigQuery の外部テーブルクエリで実行する
  3. 集計・格納:複雑な集計処理を BigQuery のストアドプロシージャで実行し、データマートに保存する

データレイクは CSV を無加工で GCS に配置します。そして、そこから簡単なクレンジングを施したものを DWH、ガッツリと集計したものをデータマートとして、いずれも BigQuery に保存します。本記事の実装例は簡略化したものではありますが、本番環境相当のスケーラビリティを想定しています。

②のクレンジング処理は、BigQuery の外部テーブルクエリ(GCS にある CSV ファイルを BigQuery のテーブルと見立ててクエリを発行できる。UDF を使えば様々な列加工処理が組めるので便利)を使います。UDF は GCF や Cloud Run の関数も呼べるため、BigQuery の特性を活かしてコスパとスケーラビリティを担保しつつも、実装の自由度や拡張性があります。また、③の集計・格納処理については BigQuery のストアドプロシージャを使って、②と同様にスケーラビリティや運用コストを最適化します。

余談ですが、BigQuery では最近マルチ ステートメント クエリが GA になったため、トランザクションに対する堅牢性が上がりました。これによって、一つのトランザクションの中に複数のクエリを記述できるようになり、データ品質を保ちながらもより複雑なパイプラインを実装可能です。

5. データ パイプラインの実装

Workflows のコードは以下のようになりました。


main:
  params: [args]
  steps:
    - get_gcs_meta_data:
        steps:
            - init_step:
                assign:
                  - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                  - dataset_id: "sample_dataset"
                  - in_csv_name: "${args.data.name}"
                  - create_disposition: "CREATE_IF_NEEDED"
                  - write_disposition: "WRITE_TRUNCATE"
                  - target_date: '${text.replace_all(text.substring(args.data.timeCreated, 0, 10), "-", "")}'
                  - required_dwh_map:
                      mart1: ["sample_tran", "sample_master", "sample_master2"]
                      mart2: ["sample_tran", "sample_master"]
                      mart3: ["sample_tran", "sample_master2"]
                  - today: ${text.replace_all(text.substring(time.format(sys.now(), "Asia/Tokyo"), 0, 10), "-", "")}
                  - done_marts: []
                  - skipped_marts: []
                  - logs: []
            - log_step1:
                call: sys.log
                args:
                    text: ${"[入力データ] " + in_csv_name}
                    severity: INFO
            - random_sleep_step:
                steps:
                  - get_random_number:
                      call: http.get
                      args:
                          url: http://www.randomnumberapi.com/api/v1.0/random
                          query:
                            min: 1
                            max: 20
                            count: 1
                      result: random_number
                  - log_step2:
                      call: sys.log
                      args:
                          text: ${"ランダムスリープ秒数:" + json.encode_to_string(random_number["body"][0])}
                          severity: INFO
                  - sleep_step:
                      call: sys.sleep
                      args:
                          seconds: ${random_number["body"][0]}
    - data_source_mapping:
        switch:
          - condition: ${in_csv_name=="sample_tran.csv"}
            steps:
              - step_a:
                  assign:
                    - dwh_table_id: "sample_tran"
                    - ex_query: ${ "SELECT key1,key2,attribute1 FROM external_temp_table"}
                    - ex_fields:
                        - name: "key1"
                          type: "INT64"
                        - name: "key2"
                          type: "INT64"
                        - name: "attribute1"
                          type: "STRING"
            next: create_dwh
          - condition: ${in_csv_name=="sample_master.csv"}
            steps:
              - step_b:
                  assign:
                    - dwh_table_id: "sample_master"
                    - ex_query: ${ "SELECT key1,attribute2 FROM external_temp_table"}
                    - ex_fields:
                        - name: "key1"
                          type: "INT64"
                        - name: "attribute2"
                          type: "STRING"
            next: create_dwh
          - condition: ${in_csv_name=="sample_master2.csv"}
            steps:
              - step_c:
                  assign:
                    - dwh_table_id: "sample_master2"
                    - ex_query: ${ "SELECT key1,key2,attribute3 FROM external_temp_table"}
                    - ex_fields:
                        - name: "key1"
                          type: "INT64"
                        - name: "key2"
                          type: "INT64"
                        - name: "attribute3"
                          type: "STRING"
            next: create_dwh
    - create_dwh:
        steps:
            - create_dwh_main:
                call: googleapis.bigquery.v2.jobs.insert
                args:
                  projectId: ${project_id}
                  body:
                    configuration:
                      query:
                        query: ${ex_query}
                        destinationTable:
                          projectId: ${project_id}
                          datasetId: ${dataset_id}
                          tableId: ${dwh_table_id}
                        create_disposition: ${create_disposition}
                        write_disposition: ${write_disposition}
                        allowLargeResults: true
                        useLegacySql: false
                        tableDefinitions:
                          external_temp_table:
                            schema:
                              fields: ${ex_fields}
                            sourceUris: ${"gs://" + args.bucket + "/" + args.data.name}
                            sourceFormat: "CSV"
                            csvOptions:
                              skipLeadingRows: 1
                result: query_result
            - log_step3:
                call: sys.log
                args:
                    text: ${"[DWH作成] " + dwh_table_id + "、" + json.encode_to_string(query_result)}
                    severity: INFO
    - create_mart:
        for:
          value: key
          in: ${keys(required_dwh_map)}
          steps:
            - is_target_mart:
                switch:
                  - condition: ${not(dwh_table_id in required_dwh_map[key])}
                    steps:
                      - continue_step:
                          next: continue
            - check_required_dwh:
                for:
                  value: dwh
                  in: ${required_dwh_map[key]}
                  steps:
                    - get:
                        call: googleapis.bigquery.v2.tables.get
                        args:
                          datasetId: ${dataset_id}
                          projectId: ${project_id}
                          tableId: ${dwh}
                        result: status
                    - check:
                        switch:
                          - condition: ${text.replace_all(text.substring(time.format(int(status.lastModifiedTime) / 1000, "Asia/Tokyo"), 0, 10), "-", "")!=today}
                            steps:
                              - STEP_A:
                                  assign:
                                      - skipped_marts: ${list.concat(skipped_marts, key)}
                            next: continue
                    - log_step4:
                        call: sys.log
                        args:
                            text: ${"[前提DWHチェック] " + "データマート:" + key + "、前提DWH:" + dwh + "、作成完了済み " + json.encode_to_string(status)}
                            severity: INFO
            - create_mart_main:
                switch:
                  - condition: ${not(key in skipped_marts)}
                    steps:
                      - call_procedure:
                          call: googleapis.bigquery.v2.jobs.insert
                          args:
                            projectId: ${project_id}
                            body:
                              configuration:
                                query:
                                  query: ${"CALL `ca-int-ml1-research.sample_dataset.create_" + key + "`();"}
                                  useLegacySql: false
                          result: query_result

                      - log_step5:
                          call: sys.log
                          args:
                              text: ${"[データマート作成] " + key + "、作成完了済み " + json.encode_to_string(query_result)}
                              severity: INFO

                      - save_result:
                          assign:
                              - done_marts: ${list.concat(done_marts, key)}
    - the_end:
        return: ${"作成完了データマート:" + json.encode_to_string(done_marts)}

起動方法に関しては、GCPサービスの様々なAPIコールを検知できる Eventarc トリガーから、「google.cloud.storage.object.v1.finalized」イベントを利用しています。これによって、GCSへのファイルを配置(新規作成 or 更新)をトリガーに Workflow のジョブが実行されます。

これらを踏まえて、実装のポイントは以下の通りです。

  • 前半で DWH レイヤー、後半でデータマート レイヤーを作成する
  • DWH、データマート作成はいずれも BigQuery のジョブ(クエリ、プロシージャ)を API 経由でキックする
  • 上流 DWH 作成、下流データマート作成ともに全件洗い替え
  • CSV ファイル一つにつき一つのワークフロージョブが起動するため、複数のファイルを同時受信しても並列実行される
  • 複数の CSV ファイルを同時受信した場合に、前提DWHチェックが間に合わなくなるのを避けるために、冒頭にランダムスリープを入れている
  • 上流(DWH 作成)のいずれかが完了したタイミングで都度下流(データマート作成)が起動し、なおかつ上流の前提条件が揃ったものは全て作成されるようになっている
  • BigQuery 書き込みで「WRITE_TRUNCATE」を使用しているため、アトミック(完全に正しいデータが作られるか、エラーになるかの All or Nothingで、中途半端なデータは作られない)になっている
  • 上流の DWH が作成完了しているかは、対象 BigQuery テーブルの更新日付とシステム日付が一致していることによって判断する

6. データ パイプラインの実行結果

パイプラインの実行結果は以下の通りです。

  • 入力データとして、「sample_master2.csv」を GCS で受け取り、それをトリガーとして「sample_master2」の DWH テーブルを作る
  • 「sample_master2」が前提となっているデータマートについて、その他の前提 DWH の作成状況をチェックし、全ての前提 DWH が作成完了していた場合のみ、自身の「データマート作成」を実行する(「sample_master2」を前提としているデータマートは mart1、mart3 だが、ログを見るとそれらのみ作られている)

データマートと DWH の依存関係については、yaml 内で以下のように定義しています。

- required_dwh_map:
    mart1: ["sample_tran", "sample_master", "sample_master2"]
    mart2: ["sample_tran", "sample_master"]
    mart3: ["sample_tran", "sample_master2"]

BigQuery の各セットは以下のようになっています。

「mart1」を作成するストアドは「create_mart1」であり、内容としては前提となる DWH を結合し、その結果でテーブルごと上書きしています。

7. まとめ

Workflows を使って依存関係のあるデータパイプラインを構築しました。初めて Workflows を実装したのですが、かなり使いやすくて楽しくプログラミングできました。
今回に関しては依存関係管理を外部に持たず、サービス(BigQuery 等)が保持するジョブ履歴を見るようにしました。エラーやリトライ等も考慮に入れて、もっと複雑かつ厳密に依存関係を定義するなら Firestore 等を組み込んでもいいかもしれません。

※ BigQuery enterprise data warehouse、Google Cloud Storage service は Google LLC の商品です。

Discussion