🤖

Pysparkでhiveテーブルを上書きしようとしたらハマった備忘録

2022/01/07に公開約2,600字

はじめに

GlueのSparkジョブからパーティショニングされたhiveテーブルの上書きをしようとした際の備忘録になります。

やりたかったこと

table_path = "database_name" + "." + "table_name"
df = df.dropDulicates() 
df.write.mode("overwrite").partitionBy("xxx").insertInto(table_path, overWrite=True)

S3上にあるhiveテーブルの重複削除を行うために、
Spark DataFrameのdropDulicates()により重複削除したDataFrameにより
hiveテーブルのレコードを上書きしようとしたところ、
下記のようなエラーが出力されました。

java.io.FileNotFoundException: File does not exist: hdfs://folder_name/part-xxxx-xxx.snappy.parquet It is possible the underlying files have been updated. 
You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.

おそらくhiveのメタストアの更新ができていないため、
REFRESH TABLE tableNameしてパーティションの情報を更新してねという解釈をしました。

エラーメッセージ通り

spark.sql("REFRESH TABLE tableName")

を実行するも変わらず、、

試しに

spark.sql("MSCK REPAIR TABLE tableName")

を試すも、こちらも変化なし

結論

Glueジョブ内でSparkContextを初期化する前に、SparkConfに色々と設定することで
DataFrameのoverwriteを用いてhiveテーブルを上書きすることができました。


import sys
from awsglue.transforms import ApplyMapping
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext, SparkConf#これを追加
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame

args = getResolvedOptions(sys.argv, [
    'JOB_NAME'
])

JOB_NAME = args['JOB_NAME']

# SparkConfの設定
conf = SparkConf()
conf.set("spark.sql.catalogImplementation", "hive")
conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
conf.set("hive.metastore.client.factory.class", "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory")

# SparkConfをSparkContextに渡して初期化
sc = SparkContext(conf=conf)
glueContext = GlueContext(sc)
spark = glueContext.spark_session

~~省略~~

# 重複削除したDataFrameでhiveテーブルを上書き
table_path = "database_name" + "." + "table_name"
df = df.dropDulicates() 
df.write.mode("overwrite").partitionBy("xxx").insertInto(table_path, overWrite=True)

参考

sparksession経由でhiveにアクセスできない場合

【分散処理】PySpark ~ パーティション単位で上書きするには ~

How to enable or disable Hive support in spark-shell through Spark property (Spark 1.6)?

https://spark.apache.org/docs/latest/sql-data-sources-hive-tables.html

Discussion

ログインするとコメントできます