Airflow で BigQuery のテーブルを clone するなら BigQueryInsertJobOperator を使え
概要
- Airflow で BigQuery 上のテーブルを clone したい
- どうやら
BigQueryToBigQueryOperator
で実行されるのは copy っぽい - clone するなら
BigQueryInsertJobOperator
を使用すればよい
調査した環境
apache-airflow-providers-google 10.9.0
copy と clone
そもそも,BigQuery におけるテーブルの clone とは 2023 年 5 月に GA された比較的新しい機能です.詳細については以下の記事で詳しく説明されているので本稿では割愛します.
- https://cloud.google.com/bigquery/docs/table-clones-intro?hl=ja
- https://dev.classmethod.jp/articles/bigquery-clone/
BigQueryToBigQueryOperator
はテーブルを copy する
本稿執筆時点では,"Airflow BigQuery copy table" 等でググると BigQueryToBigQueryOperator
が最初にヒットします.GPT-4 に質問しても BigQueryToBigQueryOperator
が帰ってきます.しかし,ドキュメントに則って以下のソースコードでタスクを生成すると,作成されたテーブルはコピーになります.
copy_with_bq_to_bq_operator = BigQueryToBigQueryOperator(
source_project_dataset_tables="source_project.source_dataset.source_table",
destination_project_dataset_table=destination_project.destination_dataset.destination_table,
...
)
ググった結果では clone する方法がイマイチ分からず,せっかくなのでソースコードを追いかけてみました.これが本稿の執筆動機です.
コードを追いかけてみる
本節はタイトルの通りなので,興味のない方は読み飛ばしてください.
airflow.providers.google.cloud.transfers.bigquery_to_bigquery.BigQueryToBigQueryOperator
の ソースコード を読んでみます.128 行目を見ると,内部では BigQueryHook.run_copy
が呼び出されており,このメソッドがテーブルのコピーを行っていることがわかります.
job_id = hook.run_copy(
source_project_dataset_tables=self.source_project_dataset_table,
destination_project_dataset_table=self.destination_project_dataset_table,
...
)
次に BigQueryHook.run_copy
のソースコードを読んでみます.1879 行目にこのメソッドは非推奨であり,代わりに BigQueryHook.insert_job
を使用しろと書かれています.
This method is deprecated. Please use :func:`.insert_job` instead.
コードを読み進めると,1947 行目で insert_job
メソッドを呼び出しており, 1928 行目から insert_job
メソッドに渡す configuration
という引数を定義しています.BigQueryInsertJobOperator
のドキュメントによると,この引数は BigQuery API における Job の configuration と同等のようで,その詳細はこちらになります.
BigQueryHook.run_copy
における configuration の中身を見てみると,jobType が copy になっているので,BigQuery へテーブルをコピーするための job を発行していることが分かります.
ドキュメントによると,jobType が copy の際,operationType という項目に COPY, SNAPSHOT, CLONE などを指定することでコピーの形式を指定できるようです.CLONE を指定すればテーブルを clone できる旨は公式ドキュメントに記載されています.
run_copy
では何も指定されていないので,デフォルト値が設定されているのでしょうか.未指定の場合の挙動に関してのドキュメントを見つけることはできませんでした.(実際にやってみると copy されたので copy っぽい感じはしますが...)
configuration = {
"copy": {
"createDisposition": create_disposition,
"writeDisposition": write_disposition,
"sourceTables": source_project_dataset_tables_fixup,
"destinationTable": {
"projectId": destination_project,
"datasetId": destination_dataset,
"tableId": destination_table,
},
}
}
run_copy
が非推奨になり,insert_job
を使用しろと書かれていることと併せると,これまではコピーのためのメソッドが提供されていたが,今後は自分でコピーのための Job を作成しなければならないと解釈できます.具体的には,BigQueryInsertJobOperator
で jobType を copy にし,operationType を CLONE にすれば AirFlow からテーブルを clone できそうな感じがします.
BigQueryInsertJobOperator
で clone してみる
以下のように
copy_with_bq_insert_job_operator = BigQueryInsertJobOperator(
configuration={
"copy": {
"sourceTable": {
"projectId": source_project,
"datasetId": source_dataset,
"tableId": destination_table,
},
"destinationTable": {
"projectId": destination_project,
"datasetId": destination_dataset,
"tableId": destination_table,
},
"createDisposition": "CREATE_IF_NEEDED",
"writeDisposition": "WRITE_EMPTY",
"operationType": "CLONE",
}
},
...
)
bq コマンドを使用して確認すると,作成されたテーブルは確かに clone で作成されたものでした.clone で作成されたテーブルでは bq show
で帰ってくる json に cloneDefinition
という情報が付加されています.
$ bq show --format=prettyjson \
destination_project.destination_dataset.destination_table \
| jq '.cloneDefinition.baseTableReference'
{
"datasetId": "source_dataset",
"projectId": "source_project",
"tableId": "source_table"
}
コンソールからもテーブルが copy で作成されたものか,clone で作成されたものかを確認することができます.clone で作成されたテーブルでは以下のようにクローン元のテーブルの情報が表示されます.
まとめ
- 現時点で Airflow から BQ 上のテーブルを clone するなら
BigQueryInsertJobOperator
を使用するのが良さそう - copy でいいにしても,
BigQueryToBigQueryOperator
の内部で使用されているメソッドは非推奨になっているので代替案としてBigQueryInsertJobOperator
は候補に挙げられる
Discussion