gcloud-aio-bigqueryでasync/awaitを使ってBigQueryのクエリを叩く
PythonからBigQueryを叩くには公式の google-cloud-bigquery があるのですが、asyncioに対応していません。
一部のライブラリではv2側でasyncioに対応しているのですが、BigQueryはv2でも対応していないように見えます。ちなみにv1はほとんどの実装でREST APIを叩いて、v2はほとんどの実装でRPC APIを叩いています(どちらも全部確認したわけではありません)。一部のライブラリでv2のほうを叩くとサーバーエラーが多発したこともあり、今のところなるべくv1でasyncioを使用したいです。
GCPのPythonライブラリをasyncioで動くように書き換えたものが gcloud-aio で用意されており、その中に gcloud-aio-bigquery があります。あまり使われているのを見かけませんが、AirflowのGCP周りの一部で使われているようです。
ただしREST APIそのままの実装になっているため、REST APIの知識が必要です。手軽に使えるとは言い難いです。
REST APIでの流れ
まずクエリを実行するには jobs.query を呼びます。
代表的なパラメーターは以下の通りです。
- query : 必須。SQLクエリを指定する。
- maxResults : 1レスポンスに含まれる行数。指定しなくてもバイト数でページングされるが、指定しておいたほうが安全。
- timeoutMs : タイムアウトの秒数
- useLegacySql : BigQueryのレガシーSQLを使用するかどうか。デフォルトtrueなのでfalseを指定しておいたほうがいい。
実行するとジョブが作成されます。
レスポンスを確認してデータが完結しているか確認します。 jobComplete
が true の場合、ジョブが完了して結果の行を取得できる状態です。この場合、 rows
に結果の行が含まれています。また、 schema
にスキーマが含まれています。 rows
だけ見てもどの列の何のデータがの判別が難しいので、 schema
と合わせて確認します。
jobComplete
が true で、 pageToken
が含まれている場合、まだ読み込むべきデータが残っているので続きを読み込みます。 pageToken
が無ければ終了です。
データが完結していない場合は、レスポンスの jobReference
の jobId
を拾って、( pageToken
があればそれも一緒に) jobs.getQueryResults を実行します。結果は jobs.query と同じです。
全体の流れをまとめると以下のようになります。
- jobs.query でジョブを作成
- jobComplete が true なら
- rows がない(totalRows=0)→終了
- rows がある→そのデータを使う
- pageToken がない→終了
- jobComplete が false 、または pageToken がある→ jobs.getQueryResults を実行し、上の判定ロジックに戻る
実際に使ってみる
実際に動くものが以下になります。
import asyncio
from collections.abc import AsyncGenerator
import aiohttp
from gcloud.aio.bigquery import Job
from google.cloud import bigquery
from google.cloud.bigquery._helpers import _rows_from_json
from google.cloud.bigquery.table import _parse_schema_resource # type:ignore
async def query(
sql: str, max_results: int = 100
) -> AsyncGenerator[list[bigquery.Row], None]:
loop = asyncio.get_running_loop()
async with aiohttp.ClientSession(loop=loop) as session:
job = Job(session=session)
query_request = {
"query": sql,
"maxResults": max_results,
"useLegacySql": "false",
}
response = await job.query(query_request=query_request)
while True:
errors = response.get("errors")
page_token = response.get("pageToken")
if errors is not None:
raise RuntimeError(errors)
if response.get("jobComplete"):
if int(response.get("totalRows", 0)) == 0:
return
schema = _parse_schema_resource(response.get("schema", {}))
yield _rows_from_json(response.get("rows", ()), schema)
if page_token is None:
return
else:
await asyncio.sleep(1.0) # 終わってなさそうなので少し待つ
# ジョブの結果を確認
job = Job(
job_id=response["jobReference"]["jobId"],
project=response["jobReference"]["projectId"],
session=session,
)
params = {
"location": response["jobReference"]["location"],
"maxResults": max_results,
"pageToken": page_token,
}
response = await job.get_query_results(params=params)
async def main() -> None:
async for rows in query("SELECT * FROM `project_id.dataset_id.table`"):
for row in rows:
# row["key"] で値を取得可能
for key, value in row.items():
print(f"{key}={value}")
if __name__ == "__main__":
asyncio.run(main())
雑に書いているので必要に応じて書き換えてください。タイムアウトの設定や例外の処理などが別途必要になります。
結果行の整形に公式の google-cloud-bigquery
も使っています。また、 aiohttp
パッケージも必要です。
プロジェクトIDを指定したい場合は Job のコンストラクタで指定してもいいですし、環境変数 GOOGLE_CLOUD_PROJECT
などで指定しても反映されます。
まとめ
公式で対応してほしいです。
Discussion