Pysparkによる累積和から日単位へのクロス集計 (Azure SQL Server系のメトリクスを利用して)
モチベーション
Azure SQL Server系(Synapseなども含めるため)は、動的管理ビュー[1]を用いてメトリクスを確認することができる。それらのメトリクスはインスタンスが起動してからの累積和を保管しているものが多い。例えば、index統計など。そのため、日単位でどの程度の変化があったかなどを確認する場合には、累積和の差分を取り確認する必要がある。
そこで、Pysparkを用いて、累積和を日次で取得して、差分を取ることにより日単位の数値を算出し、クロス集計で確認できるようにする。
メトリクスを取得
- 対象からJDBC経由でメトリクス取得をする。取得するメトリクスのSQLはMSのサイトなどを参考に都度必要なものを抽出する。ここでは割愛。
- 今回は、インデックスの統計情報[2]をもとにして、'user_scans'の値をもとに集計を行い、どの程度各テーブルにアクセスがあったのかを確認する。SQL上で合わせて'schema_name','table_name'も取得しておく。
driver_class = 'com.microsoft.sqlserver.jdbc.SQLServerDriver'
jdbcDF = spark.read \
.format("jdbc") \
.option("driver", driver_class) \
.option("url", "jdbc:sqlserver://xxxxxx") \
.option("query", sql) \
.option("user", username) \
.option("password", password) \
.load()
AzureのBlobに格納
- dt(日次)でダイナミックパーティションかつ上書き(option("partitionOverwriteMode", "dynamic")で格納するため、dtカラムを付与する。
- アクセスキーを用いたblob自体へのアクセス設定はこちら[3]
- ADLS GEN2のfilepathであれば、こちら[4]
from pyspark.sql.functions import *
df_dt = jdbcDF.withColumn('dt', current_date())
df_dt.write.mode('overwrite').option("partitionOverwriteMode",
"dynamic").partitionBy('dt').parquet(filePath)
Azure Blob読み込み/集計
- データフレームにdtカラムを付与するために、option("basePath", filePath)を指定し、読み込む。(サンプルは2022/10月分を読み込む)
- 集計前に、同一テーブル内に複数のインデックスが作成されている可能性があるので、集約して合計しておく。(ここは今回、同一テーブル全体の数値を見る想定としたため)
- 集計の仕方としては、lagで前日の値を当日のレコードにセットをし、当日と前日の差を取ることにより、日単位の値を集計している[5]。(前日レコードがない場合にはNullをセット。前日の値の方が大きい場合にはリセットされていると想定し、累積和の値を設定)
- 日単位だけでなく、集計対象の日を合算した値もsumのなかでfor columns[6]するこにより集計している。
from pyspark.sql.window import Window as W
df = spark.read.option("basePath", filePath).parquet(
filePath + '/dt=2022-10-*')
df_user_scans = df.groupby('schema_name', 'table_name', 'dt').agg(
F.sum('user_scans').alias('user_scans'))
windowSpec = W.partitionBy('schema_name', 'table_name').orderBy('dt')
df_user_scans = df_user_scans.withColumn(
'prev_day_user_scans', F.lag('user_scans').over(windowSpec))
df_user_scans = df_user_scans.withColumn('user_scans_perday', F.when(F.col('prev_day_user_scans').isNull(), None).when(F.col(
'user_scans') >= F.col('prev_day_user_scans'), F.col('user_scans') - F.col('prev_day_user_scans')).otherwise(F.col('user_scans')))
df_pivot = df_user_scans.select('schema_name', 'table_name', 'user_scans_perday', 'dt').groupBy(
'schema_name', 'table_name').pivot('dt').sum('user_scans_perday')
df_pivot_total = df_pivot.withColumn('total', sum(F.when(df_pivot[col].isNull(), 0).otherwise(
df_pivot[col]) for col in df_pivot.columns[2:])).orderBy(F.col('total').desc())
-
https://learn.microsoft.com/ja-jp/sql/relational-databases/system-dynamic-management-views/system-dynamic-management-views?view=sql-server-ver16 ↩︎
-
https://learn.microsoft.com/ja-jp/sql/relational-databases/system-dynamic-management-views/sys-dm-db-index-usage-stats-transact-sql?view=sql-server-ver16 ↩︎
-
https://learn.microsoft.com/ja-jp/azure/databricks/data/data-sources/azure/azure-storage#--access-azure-data-lake-storage-gen2-or-blob-storage-using-the-account-key ↩︎
-
https://learn.microsoft.com/ja-jp/azure/storage/blobs/data-lake-storage-introduction-abfs-uri ↩︎
-
https://stackoverflow.com/questions/36725353/applying-a-window-function-to-calculate-differences-in-pyspark ↩︎
-
https://stackoverflow.com/questions/31955309/add-column-sum-as-new-column-in-pyspark-dataframe ↩︎
Discussion