📘

Databricksのメダリオンアーキテクチャを踏襲したエンジニアリング演習をしてみたけど....

に公開1

やったこと

YahooFinanceの株価データを使用して、DatabricksとBronze,Silver,Goldレイヤーをそれぞれ作ってみた。メダリオンアーキテクチャを踏襲したDatabricksでETLを練習してみることが目的だったので、エンジニアリングの方針はchatGPTに作らせた(この記事の骨組みもGPT任せ)。
メダリオンアーキテクチャについてはこちら:https://www.databricks.com/jp/glossary/medallion-architecture

先んじてまとめと感想

メダリオンアーキテクチャを踏襲してエンジニアリングをやってみたけど、正直いまいちピンときてないのが本音。業務レベルでやればもう少し実感湧くのかしら...

でもこのアーキテクチャというか思想はとても良いと思う!
2年前の初の分析案件で、クレンジングと特徴量エンジニアリングを同じ工程でやらされて結構しんどい目にあったので...

使用技術

  • Databricks: データ処理環境
  • PySpark: 分散データ処理
  • Delta Lake: データ保存形式
  • SQL: データクエリおよび管理

ステップ 1: データ取得と準備

Yahoo Finance APIを利用して、日経225構成銘柄のうち10銘柄の株価データを取得しました。
その後、取得したデータをBronzeレイヤーとして、Databricks内に保存しました。
この段階ではデータに対してまだ加工や整形は行ってないです

!pip install yfinance -q
import yfinance as yf
import pandas as pd
from datetime import datetime
from pyspark.sql.functions import col


# 取得株価
stocks = ["7203.T", "6758.T", "7974.T", "8306.T", "9433.T", 
           "9983.T", "6594.T", "9432.T", "4063.T", "8035.T"]

# 取得期間
start_date = "2020-01-01"
end_date = datetime.today().strftime('%Y-%m-%d')

# データ取得
stock_df = pd.DataFrame()
for stock in stocks:
  df = yf.download(stock, start=start_date, end=end_date)
  df["stock"] = stock
  df.columns = [col[0] for col in df.columns]
  df = df.reset_index()
  stock_df = pd.concat([stock_df, df], axis=0)
stock_df = stock_df[["stock","Date","Close","High","Low","Open","Volume"]]

テーブル保存
df_spark = spark.createDataFrame(stock_df)
df_spark.write.mode("overwrite").format("delta").saveAsTable(path_bronze)

ステップ 2: データ整形(Silverレイヤー)

まずは以下3種のデータ整形を行いました
・全カラムに欠損値があるレコードを削除
・株価が負の値をもつレコードを削除
・株式コードと日付の組み合わせに対して重複を削除

次に整形後のデータをSilverレイヤーとして保存しました。
このデータは、分析や可視化のための基盤となるそう。

from pyspark.sql.functions import col,upper
from functools import reduce

# Brozeデータ読み込み
bronze_df = spark.read.table(path_bronze)

# 整形
# 欠損レコード削除
print(bronze_df.count())
non_null_cond = [col(c).isNotNull() for c in bronze_df.columns]
silver_df = bronze_df.filter(reduce(lambda x,y: x&y ,non_null_cond))
print(silver_df.count())

# 値が負のレコード除外
print(silver_df.count())
non_minus_cond = [col(c)>=0 for c in silver_df.columns if c not in ["stock", "Date"]]
silver_df = silver_df.filter(reduce(lambda x,y: x&y ,non_minus_cond))
print(silver_df.count())

# 重複削除
print(silver_df.count())
silver_df = silver_df.dropDuplicates(["stock", "Date"])
print(silver_df.count())

# 保存
silver_df.write.format("delta").mode("overwrite").saveAsTable(path_silver)

ちなみにreduceって見たことあるけど使ったことなかったから、勉強する良い機会になった。
特定の値(今回は&)で配列の中身をくっつけるイメージだと思ってる

silver_df.filter(reduce(lambda x,y: x&y ,non_minus_cond))

ステップ 3: トレンド分析(Goldレイヤー)

3.1 移動平均と前日差分の計算

株価データに対して、直近5日間および25日間の終値の移動平均と、直近5日間の出来高の移動平均を算出しました。

from pyspark.sql.window import Window
from pyspark.sql.functions import col, avg, lag, round, trunc, year, month, count, max, min, when, abs

# 銘柄ごと・日付順のウィンドウ定義
stock_window = Window.partitionBy("stock").orderBy("Date")
df_trend = spark.read.table(silver_path)

# ウィンドウ定義:銘柄ごと・日付順
stock_window = Window.partitionBy("stock").orderBy("Date")

# 直近5日間の終値の移動平均、直近25日間の終値の移動平均、直近5日間の出来高の移動平均、前日からの終値の変化率(%)、出来高の前日差分
df_trend = df_trend.withColumn("ma_5", round(avg("close").over(stock_window.rowsBetween(-4, 0)), 2)) \
                   .withColumn("ma_25", round(avg("close").over(stock_window.rowsBetween(-24, 0)), 2)) \
                   .withColumn("vol_ma_5", round(avg("volume").over(stock_window.rowsBetween(-4, 0)), 2)) \
                   .withColumn("pct_change_close", round(((col("close") - lag("close").over(stock_window)) / lag("close").over(stock_window)) * 100, 2)) \
                   .withColumn("vol_change", col("volume") - lag("volume").over(stock_window))

# 保存
df_trend.write.format("delta").mode("overwrite").saveAsTable(gold_trend_metrics_path)

3.2 株価変動の計算

前日からの終値の変化率(%)を計算し、出来高の差分も算出しました。これにより、日々の株価の変動や出来高の急増を把握することができます。

df_monthly_summary = spark.read.table(silver_path)
# 日付から年月カラムを作成
df_monthly_summary = df_monthly_summary.withColumn("year", year("Date"))\
                                .withColumn("month", month("Date"))

# 月次で集計(終値・出来高の平均、最大、最小、件数)
monthly_summary = df_monthly.groupBy("stock", "year", "month").agg(
    round(avg("close"), 2).alias("avg_close"),
    round(max("close"), 2).alias("max_close"),
    round(min("close"), 2).alias("min_close"),
    round(avg("volume"), 2).alias("avg_volume"),
    count("*").alias("record_count")
)

# 保存
monthly_summary.write.format("delta").mode("overwrite").saveAsTable(gold_monthly_summary_path)

3.3 変動フラグの付与

次に、株価変動が大きいレコードに対してフラグを付けました。

  • flag_price_spike: 株価の前日比が±10%以上の変動があった場合にフラグ付け
  • flag_volume_spike: 出来高の前日差分が、過去5日間の出来高の移動平均の2倍以上であった場合にフラグ付け
df_flags = df_trend # Step1のデータフレームを使用
# 変動が大きいレコードにフラグ付け(前日比 ±10%以上 or 出来高差が平均の2倍以上)
df_flags = df_flags.withColumn("flag_price_spike", when(abs(col("pct_change_close")) > 10, 1).otherwise(0)) \
                             .withColumn("flag_volume_spike", when(abs(col("vol_change")) > 2 * col("vol_ma_5"), 1).otherwise(0))

# 保存
df_flags.write.format("delta").mode("overwrite").saveAsTable(gold_quality_flags_path)

Discussion

manabianmanabian

こんにちは。私なりの意見を述べさせていただきます。

記事に書かれているように、私は開発と運用を実施してメダリオンアーキテクチャの理解が深まりました。

例えば、メダリオンアーキテクチャのメリットについては、運用時のエラー発生時に実感することがあります。データが残っているので、どの時点でおかしくなったのかなどが調査できて便利です。

それに加えて、リネージも役立つときがあります。同時の書き込みのエラーが発生したさいに、どの処理が競合しているかを把握できました。そこから、データを確認して、どのデータが影響しているのかの調査につながります。