📑

Cloud SchedulerでDataflowを実行してみる

2024/12/18に公開

はじめに

データエンジニアのp_katsuです。
DataflowのETLの開発にあたり、全体像やつまづいたところをまとめておきたいと思います。

全体像

実際はソースDBからターゲットDBまでのETL処理ですが、本記事ではstorage(GCS)からターゲットDB(BigQuery)までの流れに絞って記載します。

要件

bucket
└── rawdata
    ├── 2020-01-01.json.gz
    ├── 2020-01-02.json.gz
    └── 2020-01-03.json.gz

バケットには、ソースDBの最新のスナップショットがjson.gzで毎日転送されてきます。
Dataflowでは最新のファイルの取得解凍必要列の選択jsonでの書き出しを行います。
書き出されたjsonファイルをBigQueryで外部テーブルとしてクエリします。

解凍jsonでの書き出しだけであればGoogle提供のBulk Decompress Cloud Storage Files テンプレートを利用できたのですが、可逆性を保持して最新ファイルを取得するためにクラシックテンプレートを作成することにしました。

Dataflowコード例

ジョブ設定

import apache_beam as beam
import json
import gzip
from datetime import datetime, date, timedelta
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions, SetupOptions
from apache_beam.io.gcp.gcsio import GcsIO
from apache_beam.io import fileio

class BeamOptions:
    def __init__(self,runner):
        self.options = PipelineOptions()
        # GoogleCloud Option
        self.options.view_as(GoogleCloudOptions).job_name = "job_name"
        # Setup Option
        self.options.view_as(SetupOptions).save_main_session = True
        # Standard Option
        self.options.view_as(StandardOptions).runner = runner

DoFnクラス

class GetChatPath(beam.DoFn): # 日付を指定してファイル取得
    def process(self, element):
        dt = (date.today() - timedelta(1)).strftime("%Y-%m-%d")
        path = f"gs://bucket/rawdata/{dt}.json.gz"
        yield path

class ReadGzipAndParseJson(beam.DoFn): # 解凍
    def process(self, element):
        gcs = GcsIO()
        with gcs.open(element.metadata.path, 'rb') as f:
            with gzip.open(f, 'rt', encoding='utf-8') as gz:
                data = gz.read()
        records = json.loads(data)
        # 読み込みはjsonの構造に準じる

class SelectColumns(beam.DoFn): # 必要列の選択
    def __init__(self, columns):
        self.columns = columns

    def process(self, element):
        selected_data = {key: element[key] for key in self.columns if key in element}
        yield selected_data

class ToJsonString(beam.DoFn): # jsonに変換
    def process(self, element):
        yield json.dumps(element, ensure_ascii=False)

実行

def run():
    columns_to_select = ["id", "text"]
    output_uri = "gs://bucket/output"

    with beam.Pipeline(options=BeamOptions("DataflowRunner").options) as p:
        (
            p
            | 'Create' >> beam.Create([None])
            | 'Get Path' >> beam.ParDo(GetChatPath())
            | 'MatchFiles' >> fileio.MatchAll()
            | 'ReadMatches' >> fileio.ReadMatches()
            | 'Parse JSON' >> beam.ParDo(ReadGzipAndParseJson())
            | 'Select Specific Columns' >> beam.ParDo(SelectColumns(columns_to_select))
            | 'Convert to JSON String' >> beam.ParDo(ToJsonString())
            | 'Write to JSON' >> beam.io.WriteToText(output_uri, file_name_suffix='.json')
        )

if __name__ == "__main__":
    run()

カスタムテンプレートの作成

GCSのバケットは以下のような構成としています。

bucket
├── rawdata
│   ├── 2020-01-01.json.gz
│   ├── 2020-01-02.json.gz
│   └── 2020-01-03.json.gz
├── output
│   └── text.json
├── staging
└── templates

コマンド実行でテンプレートが作成されます。Dataflow パイプライン オプションを設定する を参考。

python pipeline.py \
    --runner DataflowRunner \
    --project project-name \
    --staging_location gs://bucket/staging \
    --template_location gs://bucket/templates/template \
    --region asia-northeast1

Cloud Scheduler

全文は割愛しますが、HTTPリクエストのポイントだけ触れます。
projects.locations.templates.launchを参考にしています。
URL

https://dataflow.googleapis.com/v1b3/projects/project-name/locations/asia-northeast1/templates:launch?gcsPath=gs://bucket/templates/template

本文

 {
    "jobName": "job-name",
    "environment": {
        "tempLocation": "gs://bucket/staging",
        "workerZone": "asia-northeast1-a"
    }
}

展望

ひとまずエラーなく動作するようになりました。内部の処理はさらに研究していきます!

今後の発展として、schedulerはcronジョブではなく、GCSにオブジェクトが転送されたタイミングで起動することもできそうです。また、dataflowはストリーミング処理もできそうなので、その辺りを深掘りしていきたいと思います!

Discussion