PythonでDataflowを書いた時に詰まった話
Fringe81 Advent Calender 2020、18日目の記事です。
今回はPythonを使ってDataflowを実装する際に詰まったお話を書きます。
Dataflowの実装は初めてだったのですが、Pythonは普段から書いてるしなんとかなるだろうと思ってました。
ところが実装を進めていくと思わぬ場所で詰まってしまったので、その備忘録も兼ねてここに記そうと思います。
Dataflowについてはこちら。
DataflowのPythonモジュールについてはこちら。
また、Pythonを用いたDataflow実装がまとめてある記事はこちら。
何に困ったか
まずDataflowを使うと、様々なデータベースからデータを取得することができ、様々なデータベースへデータを保存することができます。今回僕が実装したのは以下のようなフローでした。
そこで何に困ったかと言うと、
- BigQuery経由でCloudSQLからデータを取得する部分
- JSON形式でGCSへデータを書き出す部分
の2箇所で詰まりました。
1つ目の箇所では、そもそもCloudSQLにあるデータを直接MySQLへ接続し取得できれば済む話ではあったのですが、認証部分でうまく接続できず、リソースなどを踏まえてBigQuery経由で取得する方法を試すことにしました。
しかし、BigQueryにあるデータを取得することは簡単にできたのですが、CloudSQLにあるデータをBigQuery経由で取得するために以下のようなクエリを組み込んだところ、うまく実行できませんでした。
query_bq = f"""
SELECT *
FROM EXTERNAL_QUERY("{connection_id}", "{query_sql}")
"""
同じクエリを使ってBigQueryのUI上で試したところ普通に実行できたので、なんでできないのかが全くわからず調べても解決できませんでした。
2つ目の箇所では、以下のようなJSON形式のデータを保存する際に詰まりました。
[
{"name": "hoge", "age": 26, "sex": "male"},
{"name": "fuga", "age": 24, "sex": "female"},
,,,
]
PythonのDataflowモジュールであるapache_beamの公式サイトだと、このあたりの細かな情報が掲載されてないので、Dataflow初心者としては単純にわからん状態でした。
というわけでいろいろと試した結果、なんとか解決できた方法を以下に記します。
BiqQuery経由でCloudSQLからデータを取得する方法
こちらの課題は、BigQueryにあるデータを取得するためのクエリを実行する際に、吐き出されるログに注目することで解決しました。BigQueryに接続してクエリを実行すると、以下のようなログが出力されます。
INFO:apache_beam.io.gcp.bigquery_tools:Using location 'asia-northeast1' from table <TableReference
datasetId: 'dataset_id'
projectId: 'project_id'
tableId: 'table_id'> referenced by query
SELECT...
ここで注目すべきなのが、クエリ実行時にtableIdが一つ定まった状態で返される部分です。これはクエリで参照してるBigQuery上にあるテーブルを指しており、上で書いてたクエリにはBigQuery上のテーブル(つまりtableId)が参照されてないことが原因だとわかりました。なので今回は、紐付けができるBigQuery上にあるテーブルを一緒に参照し、それにSQLから参照したテーブルをJOINさせることで無事解決しました(ここは別の解決方法もありそうなので、もしこういうのがあるよ!などの情報があれば教えていただけますと幸いです)。
JSON形式でGCSへデータを書き出す方法
こちらの課題は案外すんなり解決できました。というのも、こちらを参照して、apache_beamのモジュールをラップすれば解決できる部分でした。
import json
from apache_beam import PTransform
from apache_beam.coders import coders
from apache_beam.io import FileBasedSink
from apache_beam.io.iobase import Write
class _JsonSink(FileBasedSink):
""" Jsonファイルを書き込むためのデータフローシンクを設定 """
def __init__(self, file_path_prefix):
super(_JsonSink, self).__init__(
file_path_prefix,
coder=coders.ToStringCoder(),
file_name_suffix=".json",
num_shards=1, # 作成ファイル数を指定(デフォルトは0)
shard_name_template="", # ファイルの末尾に付く番号を除去(デフォルトはNone)
mime_type="application/json",
)
self.last_rows = dict()
def open(self, temp_path):
""" ファイルを開いた直後の処理を設定 """
file_handle = super(_JsonSink, self).open(temp_path)
# "["で始める
file_handle.write(b"[\n")
return file_handle
def write_record(self, file_handle, value):
""" レコードを書き込む際の処理を設定 """
if self.last_rows.get(file_handle, None) is not None:
file_handle.write(
self.coder.encode(json.dumps(self.last_rows[file_handle]))
)
# ","を末尾に追加
file_handle.write(b",\n")
self.last_rows[file_handle] = value
def close(self, file_handle):
""" ファイルを閉じる直前の処理を設定 """
if file_handle is not None:
# 最後のレコードを追加
file_handle.write(
self.coder.encode(json.dumps(self.last_rows[file_handle]))
)
# "]"で閉じる
file_handle.write(b"\n]\n")
file_handle.close()
class WriteToJson(PTransform):
""" Jsonファイルを書き込むためのPTransform """
def __init__(self, file_path_prefix):
self._sink = _JsonSink(file_path_prefix)
def expand(self, pcoll):
return pcoll | Write(self._sink)
あとはこのラップされたクラスを使って
pcol | WriteToJson("gs://path/to/file")
とすれば実現できました。なお、今回はGCSに複数ファイルを作成したくなかったため、num_shards=0
に設定し、作成されたファイルの語尾に番号を付けたくなかったためshard_name_template=""
にしました。
最後に
初めてDataflowを触るということで、理解するのが難しい箇所が多かったのですが、これを使いこなすことによって、複雑な処理を噛ませたデータの受け渡しができるようになり、僕としてはとても良い学習となりました。
また、SQLへ直接接続する部分がまだできてないので、次回はそこにチャレンジしていきたいと思います。
明日の当番はtenajimaさんです。引き続きFringe81 Advent Calender 2020をお楽しみください。
Discussion