📑
Cloud SchedulerでDataflowを実行してみる
はじめに
データエンジニアのp_katsuです。
DataflowのETLの開発にあたり、全体像やつまづいたところをまとめておきたいと思います。
全体像
実際はソースDBからターゲットDBまでのETL処理ですが、本記事ではstorage(GCS)からターゲットDB(BigQuery)までの流れに絞って記載します。
要件
bucket
└── rawdata
├── 2020-01-01.json.gz
├── 2020-01-02.json.gz
└── 2020-01-03.json.gz
バケットには、ソースDBの最新のスナップショットがjson.gzで毎日転送されてきます。
Dataflowでは最新のファイルの取得、解凍、必要列の選択、jsonでの書き出しを行います。
書き出されたjsonファイルをBigQueryで外部テーブルとしてクエリします。
解凍、jsonでの書き出しだけであればGoogle提供のBulk Decompress Cloud Storage Files テンプレートを利用できたのですが、可逆性を保持して最新ファイルを取得するためにクラシックテンプレートを作成することにしました。
Dataflowコード例
ジョブ設定
import apache_beam as beam
import json
import gzip
from datetime import datetime, date, timedelta
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions, SetupOptions
from apache_beam.io.gcp.gcsio import GcsIO
from apache_beam.io import fileio
class BeamOptions:
def __init__(self,runner):
self.options = PipelineOptions()
# GoogleCloud Option
self.options.view_as(GoogleCloudOptions).job_name = "job_name"
# Setup Option
self.options.view_as(SetupOptions).save_main_session = True
# Standard Option
self.options.view_as(StandardOptions).runner = runner
DoFnクラス
class GetChatPath(beam.DoFn): # 日付を指定してファイル取得
def process(self, element):
dt = (date.today() - timedelta(1)).strftime("%Y-%m-%d")
path = f"gs://bucket/rawdata/{dt}.json.gz"
yield path
class ReadGzipAndParseJson(beam.DoFn): # 解凍
def process(self, element):
gcs = GcsIO()
with gcs.open(element.metadata.path, 'rb') as f:
with gzip.open(f, 'rt', encoding='utf-8') as gz:
data = gz.read()
records = json.loads(data)
# 読み込みはjsonの構造に準じる
class SelectColumns(beam.DoFn): # 必要列の選択
def __init__(self, columns):
self.columns = columns
def process(self, element):
selected_data = {key: element[key] for key in self.columns if key in element}
yield selected_data
class ToJsonString(beam.DoFn): # jsonに変換
def process(self, element):
yield json.dumps(element, ensure_ascii=False)
実行
def run():
columns_to_select = ["id", "text"]
output_uri = "gs://bucket/output"
with beam.Pipeline(options=BeamOptions("DataflowRunner").options) as p:
(
p
| 'Create' >> beam.Create([None])
| 'Get Path' >> beam.ParDo(GetChatPath())
| 'MatchFiles' >> fileio.MatchAll()
| 'ReadMatches' >> fileio.ReadMatches()
| 'Parse JSON' >> beam.ParDo(ReadGzipAndParseJson())
| 'Select Specific Columns' >> beam.ParDo(SelectColumns(columns_to_select))
| 'Convert to JSON String' >> beam.ParDo(ToJsonString())
| 'Write to JSON' >> beam.io.WriteToText(output_uri, file_name_suffix='.json')
)
if __name__ == "__main__":
run()
カスタムテンプレートの作成
GCSのバケットは以下のような構成としています。
bucket
├── rawdata
│ ├── 2020-01-01.json.gz
│ ├── 2020-01-02.json.gz
│ └── 2020-01-03.json.gz
├── output
│ └── text.json
├── staging
└── templates
コマンド実行でテンプレートが作成されます。Dataflow パイプライン オプションを設定する を参考。
python pipeline.py \
--runner DataflowRunner \
--project project-name \
--staging_location gs://bucket/staging \
--template_location gs://bucket/templates/template \
--region asia-northeast1
Cloud Scheduler
全文は割愛しますが、HTTPリクエストのポイントだけ触れます。
projects.locations.templates.launchを参考にしています。
URL
https://dataflow.googleapis.com/v1b3/projects/project-name/locations/asia-northeast1/templates:launch?gcsPath=gs://bucket/templates/template
本文
{
"jobName": "job-name",
"environment": {
"tempLocation": "gs://bucket/staging",
"workerZone": "asia-northeast1-a"
}
}
展望
ひとまずエラーなく動作するようになりました。内部の処理はさらに研究していきます!
今後の発展として、schedulerはcronジョブではなく、GCSにオブジェクトが転送されたタイミングで起動することもできそうです。また、dataflowはストリーミング処理もできそうなので、その辺りを深掘りしていきたいと思います!
Discussion