📌

Airflow で BigQuery のテーブルを clone するなら BigQueryInsertJobOperator を使え

2023/10/16に公開

概要

  • Airflow で BigQuery 上のテーブルを clone したい
  • どうやら BigQueryToBigQueryOperator で実行されるのは copy っぽい
  • clone するなら BigQueryInsertJobOperator を使用すればよい

調査した環境

apache-airflow-providers-google 10.9.0

copy と clone

そもそも,BigQuery におけるテーブルの clone とは 2023 年 5 月に GA された比較的新しい機能です.詳細については以下の記事で詳しく説明されているので本稿では割愛します.

BigQueryToBigQueryOperator はテーブルを copy する

本稿執筆時点では,"Airflow BigQuery copy table" 等でググると BigQueryToBigQueryOperator が最初にヒットします.GPT-4 に質問しても BigQueryToBigQueryOperator が帰ってきます.しかし,ドキュメントに則って以下のソースコードでタスクを生成すると,作成されたテーブルはコピーになります.

copy_with_bq_to_bq_operator = BigQueryToBigQueryOperator(
	source_project_dataset_tables="source_project.source_dataset.source_table",
	destination_project_dataset_table=destination_project.destination_dataset.destination_table,
	...
)

ググった結果では clone する方法がイマイチ分からず,せっかくなのでソースコードを追いかけてみました.これが本稿の執筆動機です.

コードを追いかけてみる

本節はタイトルの通りなので,興味のない方は読み飛ばしてください.

airflow.providers.google.cloud.transfers.bigquery_to_bigquery.BigQueryToBigQueryOperatorソースコード を読んでみます.128 行目を見ると,内部では BigQueryHook.run_copy が呼び出されており,このメソッドがテーブルのコピーを行っていることがわかります.

job_id = hook.run_copy(
	source_project_dataset_tables=self.source_project_dataset_table,
	destination_project_dataset_table=self.destination_project_dataset_table,
	...
)

次に BigQueryHook.run_copyソースコードを読んでみます.1879 行目にこのメソッドは非推奨であり,代わりに BigQueryHook.insert_job を使用しろと書かれています.

This method is deprecated. Please use :func:`.insert_job` instead.

コードを読み進めると,1947 行目で insert_job メソッドを呼び出しており, 1928 行目から insert_job メソッドに渡す configuration という引数を定義しています.BigQueryInsertJobOperatorドキュメントによると,この引数は BigQuery API における Job の configuration と同等のようで,その詳細はこちらになります.

BigQueryHook.run_copy における configuration の中身を見てみると,jobType が copy になっているので,BigQuery へテーブルをコピーするための job を発行していることが分かります.

ドキュメントによると,jobType が copy の際,operationType という項目に COPY, SNAPSHOT, CLONE などを指定することでコピーの形式を指定できるようです.CLONE を指定すればテーブルを clone できる旨は公式ドキュメントに記載されています.

run_copy では何も指定されていないので,デフォルト値が設定されているのでしょうか.未指定の場合の挙動に関してのドキュメントを見つけることはできませんでした.(実際にやってみると copy されたので copy っぽい感じはしますが...)

configuration = {
	"copy": {
		"createDisposition": create_disposition,
		"writeDisposition": write_disposition,
		"sourceTables": source_project_dataset_tables_fixup,
		"destinationTable": {
			"projectId": destination_project,
			"datasetId": destination_dataset,
			"tableId": destination_table,
		},
	}
}

run_copy が非推奨になり,insert_job を使用しろと書かれていることと併せると,これまではコピーのためのメソッドが提供されていたが,今後は自分でコピーのための Job を作成しなければならないと解釈できます.具体的には,BigQueryInsertJobOperator で jobType を copy にし,operationType を CLONE にすれば AirFlow からテーブルを clone できそうな感じがします.

BigQueryInsertJobOperator で clone してみる

以下のように

copy_with_bq_insert_job_operator = BigQueryInsertJobOperator(
	configuration={
		"copy": {
			"sourceTable": {
				"projectId": source_project,
				"datasetId": source_dataset,
				"tableId": destination_table,
			},
			"destinationTable": {
				"projectId": destination_project,
				"datasetId": destination_dataset,
				"tableId": destination_table,
			},
			"createDisposition": "CREATE_IF_NEEDED",
			"writeDisposition": "WRITE_EMPTY",
			"operationType": "CLONE",
		}
	},
	...
)

bq コマンドを使用して確認すると,作成されたテーブルは確かに clone で作成されたものでした.clone で作成されたテーブルでは bq show で帰ってくる json に cloneDefinition という情報が付加されています.

$ bq show --format=prettyjson \
	destination_project.destination_dataset.destination_table \
	| jq '.cloneDefinition.baseTableReference'

{
	"datasetId": "source_dataset",
	"projectId": "source_project",
	"tableId": "source_table"
}

コンソールからもテーブルが copy で作成されたものか,clone で作成されたものかを確認することができます.clone で作成されたテーブルでは以下のようにクローン元のテーブルの情報が表示されます.

clone で作成された場合はベーステーブル情報がテーブル詳細に表示される

まとめ

  • 現時点で Airflow から BQ 上のテーブルを clone するなら BigQueryInsertJobOperator を使用するのが良さそう
  • copy でいいにしても,BigQueryToBigQueryOperator の内部で使用されているメソッドは非推奨になっているので代替案として BigQueryInsertJobOperator は候補に挙げられる

Discussion