💨
AirflowのBigQueryInsertJobOperatorのオプションについて
先日「AirflowでGCS外部パーティションテーブルをBigQueryに取り込む方法」についての記事を書きましたが、BigQueryInsertJobOperator
にフォーマットのオプションを追加する際に、実際の挙動は公式ドキュメントの記述と若干違うので、追記しておきます。
なぜ気づいたのか
外部テーブルを取り組む際に、閉じるカッコが見つからないというエラーが出て、
Error detected while parsing row starting at position: 4204. Error: Missing close double quote (") character.
調査してみた結果、改行記号が含まれているのが原因のようでした。stack overflowによるとallowQuotedNewlines
といったCSVのオプションを追加すると解決できるらしいです。
ただし、configurationにcsvOptions
を追加しても解決されず(同じエラーが出た)、困っていました。
configuration={
"load": {
"destinationTable": {
"projectId": PROJECT_ID,
"datasetId": DATASET_NAME,
"tableId": TABLE,
},
"sourceUris": ["gs://my-bucket/biostats/*.csv"],
"autodetect": True,
"sourceFormat": "CSV",
"hivePartitioningOptions": {
"mode": "AUTO",
"sourceUriPrefix": "gs://my-bucket/biostats/",
},
"csvOptions": {
"allowQuotedNewlines": True
},
}
},
解決法
CLI bq load
を利用す際にどうなっているかを確認したら、どうやらcsvOptions
を明示的に書く必要がなく--allow_quoted_newlines
だけ追加すとでうまくいきました。AirflowのBigQueryInsertJobOperator
で同じことをすると、問題なくloadできました。
configuration={
"load": {
"destinationTable": {
"projectId": PROJECT_ID,
"datasetId": DATASET_NAME,
"tableId": TABLE,
},
"sourceUris": ["gs://my-bucket/biostats/*.csv"],
"autodetect": True,
"sourceFormat": "CSV",
"hivePartitioningOptions": {
"mode": "AUTO",
"sourceUriPrefix": "gs://my-bucket/biostats/",
},
# csvOptionsの記述は不要
"allowQuotedNewlines": True
}
},
Discussion