AWS Glue ETL Jobで新しくテーブルを作った後はメタデータが足りないかも?
この記事は 2020-11-10 時点で確認した内容を元に記載しています。
TL; DR
DataSink
で新規作成したテーブルはメタデータが足りないため Redshift Spectrum で読み込むとエラーが発生します。不足しているメタデータ(Table.StorageDescriptor.SerdeInfo.Parameters
)に適当な値を設定しましょう。
Glue ETL Job でテーブルが作れる
Creating Tables, Updating Schema, and Adding New Partitions in the Data Catalog from AWS Glue ETL Jobs にもある通り DataSink
を利用すると Crawler を使わずにテーブルを作成(DataCatalog のメタデータとデータ実体の出力)することができます。
例えば以下のようなコードを書けば foo_db.bar_table
というテーブルを作ることができます。個別に Crawler の設定をせずともテーブルが作成できるので便利ですね。
...
# サンプルコードを一部改変
# https://docs.aws.amazon.com/glue/latest/dg/update-from-job.html
sink = glueContext.getSink(
connection_type="s3",
path="s3://path/to/data",
enableUpdateCatalog=True,
updateBehavior="UPDATE_IN_DATABASE",
partitionKeys=["partition_key0", "partition_key1"]
)
sink.setFormat("glueparquet")
sink.setCatalogInfo(catalogDatabase="foo_db", catalogTableName="bar_table")
sink.writeFrame(last_transform)
Glue ETL Job 実行後 DataCatalog を確認するとテーブルができていることが分かります。
新規作成したテーブルはそのままでは読み込めない
ただ, 上記の方法で作ったテーブルをそのまま Redshift Spectrum で読み込もうとしたら以下のようなエラーが発生してしまいました。デシリアライズでエラーが発生しているようです。
Error running query:
Invalid DataCatalog response for external table "foo_db"."bar_table":
Cannot deserialize table.
Missing mandatory field:
Parameters in response from external catalog.
データの実体(Parquet ファイル)自体は S3 に出力されていたのでメタデータの方に問題がありそうです。そこで aws glue get-table
コマンドを使って
- Redshift Spectrum で読み込めるテーブルのメタデータ
- Glue ETL Job で新規作成したテーブルのメタデータ
を比較してみました。すると Table.StorageDescriptor.SerdeInfo.Parameters
の有無に差があることが分かりました。
--- エラーが出るテーブルのメタデータ
+++ エラーが出ないテーブルのメタデータ
"Compressed": false,
"NumberOfBuckets": 0,
"SerdeInfo": {
- "SerializationLibrary": "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"
+ "SerializationLibrary": "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
+ "Parameters": {}
},
"SortColumns": [],
"StoredAsSubDirectories": false
エラーメッセージの Missing mandatory field
と併せて考えるとどうやら SerializationLibrary
に与えるパラメータに対応する key が足りなくてエラーがでていたようです。
パラメータを設定すれば読み込めるようになる
そこで Table.StorageDescriptor.SerdeInfo.Parameters
が存在しない場合は空 Object を設定する終期処理(_finalize()
)を追加するようにしたところ Redshift Spectrum での読み込み時エラーは発生しなくなりました。
やはり DataSink
で新規作成されたテーブルに
Table.StorageDescriptor.SerdeInfo.Parameters
が存在しないことが原因だったようです。
def _get_update_table_parameters(
table_definition: dict,
) -> dict:
table_input = table_definition.copy()
database_name = table_input.pop("DatabaseName")
for delete_key in (
"CatalogId",
"CreateTime",
"UpdateTime",
"CreatedBy",
"IsRegisteredWithLakeFormation",
):
if delete_key in table_input:
del table_input[delete_key]
return dict(
DatabaseName=database_name,
TableInput=table_input,
)
def _finalize(table_name: str, database: str) -> None:
glue = boto3.client("glue", "ap-northeast-1")
table: dict = glue.get_table(
DatabaseName=database, Name=table_name
)["Table"]
serde_info = table["StorageDescriptor"]["SerdeInfo"]
if "Parameters" in serde_info:
return
serde_info["Parameters"] = {}
update_table_parameters = _get_update_table_parameters(table)
update_table_parameters[
"TableInput"
]["StorageDescriptor"]["SerdeInfo"] = serde_info
glue.update_table(**update_table_parameters)
...
sink = glueContext.getSink(
connection_type="s3",
path="s3://path/to/data",
enableUpdateCatalog=True,
updateBehavior="UPDATE_IN_DATABASE",
partitionKeys=["partition_key0", "partition_key1"]
)
sink.setFormat("glueparquet")
sink.setCatalogInfo(
catalogDatabase="foo_db", catalogTableName="bar_table"
)
sink.writeFrame(last_transform)
_finalize("bar_table", "foo_db")
...
まとめ
ということで DataSink
を使って新規作成したテーブルはそのままの状態だとメタデータが足りず Redshift Spectrum からの読み込み時にエラーが発生することが分かりました。そして今回は Boto3 で無理やり不足しているメタデータを設定することで問題は解消しました。
この手の処理は DataSink.writeFrame()
の中で解決してもらうのが本来あるべき姿だと思うので AWS による改善を待ちます。
自分向け調査メモ
Glue に不慣れだったので調査に少し時間がかかってしまいました。もし同じようなエラーに遭遇したときはまず以下を確認したほうが良さそうです。
- S3 上にデータが出力されているか
- 実体がそもそも存在していなければ当然読み込めません。
-
aws glue get-table
でメタデータが正しく出力されているか- AWS Console だと見えない部分もあるので CLI を使って確認したほうが良いです。
- 正常に読み込めるデータとエラーがでるデータの diff を見るとなにか気付けるかも知れません。
Discussion