💨

AirflowのBigQueryInsertJobOperatorのオプションについて

2022/08/30に公開

https://zenn.dev/jcc/articles/accb7fef5efc42

先日「AirflowでGCS外部パーティションテーブルをBigQueryに取り込む方法」についての記事を書きましたが、BigQueryInsertJobOperatorにフォーマットのオプションを追加する際に、実際の挙動は公式ドキュメントの記述と若干違うので、追記しておきます。

なぜ気づいたのか

外部テーブルを取り組む際に、閉じるカッコが見つからないというエラーが出て、

Error detected while parsing row starting at position: 4204. Error: Missing close double quote (") character.

調査してみた結果、改行記号が含まれているのが原因のようでした。stack overflowによるとallowQuotedNewlinesといったCSVのオプションを追加すると解決できるらしいです。

https://stackoverflow.com/questions/65782572/how-to-avoid-missing-close-double-quote-character-error-in-google-bigquer

https://qiita.com/sawanoboly/items/410b3346518e569da581

ただし、configurationにcsvOptionsを追加しても解決されず(同じエラーが出た)、困っていました。

configuration={
    "load": {
	"destinationTable": {
	    "projectId": PROJECT_ID,
	    "datasetId": DATASET_NAME,
	    "tableId": TABLE,
	},
	"sourceUris": ["gs://my-bucket/biostats/*.csv"],
	"autodetect": True,
	"sourceFormat": "CSV",
	"hivePartitioningOptions": {
	    "mode": "AUTO",
	    "sourceUriPrefix": "gs://my-bucket/biostats/",
	},
	"csvOptions": {
	    "allowQuotedNewlines": True
	 },
    }
},

解決法

CLI bq loadを利用す際にどうなっているかを確認したら、どうやらcsvOptionsを明示的に書く必要がなく--allow_quoted_newlinesだけ追加すとでうまくいきました。AirflowのBigQueryInsertJobOperatorで同じことをすると、問題なくloadできました。

configuration={
    "load": {
	"destinationTable": {
	    "projectId": PROJECT_ID,
	    "datasetId": DATASET_NAME,
	    "tableId": TABLE,
	},
	"sourceUris": ["gs://my-bucket/biostats/*.csv"],
	"autodetect": True,
	"sourceFormat": "CSV",
	"hivePartitioningOptions": {
	    "mode": "AUTO",
	    "sourceUriPrefix": "gs://my-bucket/biostats/",
	},
	# csvOptionsの記述は不要 
	"allowQuotedNewlines": True
    }
},

Discussion