時系列データを長さ L のバッファに 確率 p で ランダムな位置のデータと置換する際の性質
バッファの定義
バッファの長さを
時系列データはこのように得られる。
ここで、 最初
そうすると、このように実装が与えられる。
import random
L: int # > 0 の整数
p: float # [0,1]の浮動小数
buffer = []
def add(data):
if len(buffer) < L: # 最初 L 個は追加
buffer.append(data)
else:
if random.random() > p: # 追加しない
return
index = random.randint(0, L - 1) # ランダムな位置を取得
buffer[index] = data
なぜこのようなバッファを考えるか
強化学習、オンライン学習において、経験バッファを実装する際に使うため。
バッファ長が有限であるが、過去のデータもある程度保持しておきたいといったときに、このバッファを用いる。
順序は保持せず、ただデータを持つだけの場合に使う。
T ステップ前のデータの生存確率
ここで、 時刻が十分に経過したとして (
t-k のデータを含む確率 q(k)
特定のスロットが時刻 新しいデータが追加された際、確率
一般化すると:
t-T 以前のデータを含む確率を求める
あるスロットが時刻ステップ1. 定義を代入する
これは時刻
ステップ2: 定数項を取り出す
ステップ3: 記号の簡略化
計算を簡単にするために、
ステップ4: 和の変形
ここで、
ステップ5: 無限等比級数の和の公式を適用
ステップ6: 公式を代入して計算
r を元の式に戻す
ステップ7: したがって、特定のスロットが時刻
L 個のスロットが時刻t-T+1 以降のデータのみを含む確率
全全
求める関数
したがって、バッファが
可視化
ソースコード
import numpy as np
import matplotlib.pyplot as plt
import japanize_matplotlib
def f(L, p, T):
"""
バッファがTステップ前までのデータを持つ確率を計算する関数
引数:
L -- バッファの長さ
p -- 置換確率
T -- ステップ数
戻り値:
バッファがTステップ前までのデータを持つ確率
"""
return 1 - (1 - (1 - p/L)**(T-1))**L
# パラメータの設定
L = 10
p_values = np.arange(0.1, 1.1, 0.1)
T_max = 500
T_values = np.arange(1, T_max + 1)
# プロットの設定
plt.figure(figsize=(12, 8))
# 各pに対してグラフをプロット
for p in p_values:
f_values = [f(L, p, T) for T in T_values]
plt.plot(T_values, f_values, label=f'p = {p:.1f}')
# グラフの装飾
plt.title(f'バッファがTステップ前までのデータを持つ確率 (L = {L})', fontsize=14)
plt.xlabel('T (ステップ数)', fontsize=12)
plt.ylabel('f(L,p,T) (確率)', fontsize=12)
plt.grid(True, alpha=0.3)
plt.legend(loc='best')
plt.ylim(0, 1.05)
# グラフの表示
plt.tight_layout()
plt.show()
バッファが平均的に何ステップ前までデータを保持するか
先ほど、バッファがTステップ以前のデータを持つ確率として:
を導出した。
となる。
これを用いて期待値を計算する:
簡略化のために
二項定理を適用して:
和の順序を入れ替え:
ここで
したがって、期待値の厳密な式は:
E[T] の近似解の導出
厳密な期待値の計算は、
そこで、近似値を導出する。
f が十分に小さくなるまで 足し合わせる。
方法1: これは愚直だけれども簡単な方法で、 割と大きい値まで 正確に求められる。
普通にPythonで書くと遅くて仕方がないので、 numba
を使って高速化する。
ソースコード
import numba
from math import log, exp
@numba.jit(nopython=True)
def expected_oldest_data_age(L, p, max_steps, cutoff=1e-10):
"""
バッファ内の最も古いデータの年齢の期待値を近似計算
Parameters:
L (int): バッファの長さ
p (float): データ置換確率 (0 < p <= 1)
max_steps (int): 最大ステップ数
cutoff (float): 打ち切り精度
Returns:
float: 期待値 E[T]
"""
expectation = 0.0
q = 1 - p/L
log_q = log(q)
# 最初の項(t=1)
expectation += 1.0 # P(T >= 1) = 1
for t in range(2, max_steps + 1):
# 効率的に確率を計算
log_term = (t-1) * log_q
# 数値アンダーフロー対策
if log_term < -700: # exp(-700)はほぼ0
prob = 0
else:
term = exp(log_term)
prob = 1 - (1 - term)**L
expectation += prob
# 確率がカットオフより小さくなったら終了
if prob < cutoff:
break
return expectation, t, prob
def main():
# パラメータ設定例
L_values = [10, 100, 1000, 10_000, 100_000]
p_values = [0.1, 0.5, 0.9]
max_steps = 100_000_000
print("バッファ内の最も古いデータの年齢の期待値:")
print("-" * 100)
for L in L_values:
for p in p_values:
expected, t, prob = expected_oldest_data_age(L, p, max_steps)
print(f"L: {L: >7}\tp {p}\t期待値: {str(expected)[:10]}\t導出時間ステップ: {t: >10} \t打ち切り確率: {prob:.2e}")
print("-" * 100)
if __name__ == "__main__":
main()
実行結果
ッファ内の最も古いデータの年齢の期待値:
----------------------------------------------------------------------------------------------------
L: 10 p 0.1 期待値: 291.929888 導出時間ステップ: 2522 打ち切り確率: 9.92e-11
----------------------------------------------------------------------------------------------------
L: 10 p 0.5 期待値: 57.6023617 導出時間ステップ: 495 打ち切り確率: 9.90e-11
----------------------------------------------------------------------------------------------------
L: 10 p 0.9 期待値: 31.5565915 導出時間ステップ: 270 打ち切り確率: 9.60e-11
----------------------------------------------------------------------------------------------------
L: 100 p 0.1 期待値: 5185.28339 導出時間ステップ: 27619 打ち切り確率: 9.99e-11
----------------------------------------------------------------------------------------------------
L: 100 p 0.5 期待値: 1035.37964 導出時間ステップ: 5514 打ち切り確率: 9.97e-11
----------------------------------------------------------------------------------------------------
L: 100 p 0.9 期待値: 574.277682 導出時間ステップ: 3058 打ち切り確率: 9.94e-11
----------------------------------------------------------------------------------------------------
L: 1000 p 0.1 期待値: 74851.4658 導出時間ステップ: 299325 打ち切り確率: 9.99e-11
----------------------------------------------------------------------------------------------------
L: 1000 p 0.5 期待値: 14967.6986 導出時間ステップ: 59854 打ち切り確率: 9.99e-11
----------------------------------------------------------------------------------------------------
L: 1000 p 0.9 期待値: 8313.94654 導出時間ステップ: 33246 打ち切り確率: 9.99e-11
----------------------------------------------------------------------------------------------------
L: 10000 p 0.1 期待値: 978756.209 導出時間ステップ: 3223130 打ち切り確率: 9.99e-11
----------------------------------------------------------------------------------------------------
L: 10000 p 0.5 期待値: 195747.726 導出時間ステップ: 644614 打ち切り確率: 9.99e-11
----------------------------------------------------------------------------------------------------
L: 10000 p 0.9 期待値: 108746.784 導出時間ステップ: 358113 打ち切り確率: 9.99e-11
----------------------------------------------------------------------------------------------------
L: 100000 p 0.1 期待値: 12090140.5 導出時間ステップ: 34485493 打ち切り確率: 9.99e-11
----------------------------------------------------------------------------------------------------
L: 100000 p 0.5 期待値: 2418023.68 導出時間ステップ: 6897086 打ち切り確率: 9.99e-11
----------------------------------------------------------------------------------------------------
L: 100000 p 0.9 期待値: 1343344.02 導出時間ステップ: 3831707 打ち切り確率: 9.99e-11
----------------------------------------------------------------------------------------------------
方法2: 近似式で求める。
どうやら、 次の式で近似的に求められるらしい (Claude曰く)
ここで
ただ、このまま使用するとこの近似式が正しいかどうかわからないし、おそらくいくつかの微調整項が必要だろう。
よって、
そして、 厳密な解や、先に導出した 方法1の正確な近似値を用いて、これらの定数項を最適化する。
ソースコード
import numpy as np
import matplotlib.pyplot as plt
from scipy.optimize import minimize
import numba
from math import log, exp
import time
import japanize_matplotlib
# オイラー・マスケローニ定数
gamma = 0.57721566490153286
@numba.jit(nopython=True)
def expected_oldest_data_age(L, p, max_steps, cutoff=1e-10):
"""
バッファ内の最も古いデータの年齢の期待値を近似計算
Parameters:
L (int): バッファの長さ
p (float): データ置換確率 (0 < p <= 1)
max_steps (int): 最大ステップ数
cutoff (float): 打ち切り精度
Returns:
float: 期待値 E[T]
"""
expectation = 0.0
q = 1 - p/L
log_q = log(q)
# 最初の項(t=1)
expectation += 1.0 # P(T >= 1) = 1
for t in range(2, max_steps + 1):
# 効率的に確率を計算
log_term = (t-1) * log_q
# 数値アンダーフロー対策
if log_term < -700: # exp(-700)はほぼ0
prob = 0
else:
term = exp(log_term)
prob = 1 - (1 - term)**L
expectation += prob
# 確率がカットオフより小さくなったら終了
if prob < cutoff:
break
return expectation
def generate_data():
"""より広範なL, pの組み合わせでデータを生成"""
# 幅広いLとpの値を設定
L_values = [10, 20, 50, 100, 200, 500, 1000, 2000, 5000, 10000]
p_values = [0.01, 0.05, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 0.95, 0.99]
max_steps = 100_000_000
results = []
total = len(L_values) * len(p_values)
count = 0
print(f"全{total}通りの組み合わせで計算中...")
start_time = time.time()
for L in L_values:
for p in p_values:
count += 1
if count % 10 == 0 or count == total:
elapsed = time.time() - start_time
print(f"進捗: {count}/{total} ({count/total*100:.1f}%) 経過時間: {elapsed:.1f}秒")
expected = expected_oldest_data_age(L, p, max_steps)
results.append((L, p, expected))
return results
def approximation(L, p, alpha, beta):
"""近似式の計算: E[T] ≈ α(L/p)(ln(L) + γ) + β"""
return alpha * (L/p) * (np.log(L) + gamma) + beta
def objective_function(params, data):
"""最小化する目標関数(二乗相対誤差の和)"""
alpha, beta = params
error_sum = 0
for L, p, exact_value in data:
predicted = approximation(L, p, alpha, beta)
# 相対誤差の二乗を使用
relative_error = (predicted - exact_value) / exact_value
error_sum += relative_error ** 2
return error_sum
def optimize_parameters(data):
"""最適なα, βを求める"""
# 初期値
initial_guess = [1.0, 0.0]
# 最適化の実行
result = minimize(
lambda params: objective_function(params, data),
initial_guess,
method='Nelder-Mead',
options={'maxiter': 1000000}
)
return result.x
def validate_approximation(data, alpha, beta):
"""近似式の精度を検証"""
errors = []
for L, p, exact_value in data:
predicted = approximation(L, p, alpha, beta)
error_percent = 100 * abs(predicted - exact_value) / exact_value
errors.append(error_percent)
return np.mean(errors), np.std(errors), np.max(errors), np.min(errors)
def plot_results(data, alpha, beta):
"""結果をプロット"""
# データ整形
L_values = sorted(list(set([d[0] for d in data])))
p_values = sorted(list(set([d[1] for d in data])))
# マーカーと色のセット
markers = ['o', 's', '^', 'v', 'D', '*', 'p', 'h', 'X', '+']
colors = plt.cm.tab10(np.linspace(0, 1, len(p_values)))
plt.figure(figsize=(14, 8))
# 各pごとにプロット
for i, p in enumerate(p_values):
p_data = [(L, exact) for L, p_val, exact in data if p_val == p]
if not p_data:
continue
L_subset, exact_values = zip(*p_data)
# 実際の値
plt.scatter(L_subset, exact_values,
marker=markers[i % len(markers)],
color=colors[i],
label=f'p={p} (実測値)')
# 近似値
L_range = np.array(L_subset)
approx_values = [approximation(L, p, alpha, beta) for L in L_range]
plt.plot(L_range, approx_values, '--', color=colors[i])
plt.xscale('log')
plt.yscale('log')
plt.xlabel('バッファ長さ (L)')
plt.ylabel('最も古いデータの期待年齢 E[T]')
plt.title(f'バッファ内の最も古いデータの期待年齢\n近似式: E[T] ≈ {alpha:.4f}(L/p)(ln(L) + γ) + {beta:.2f}')
plt.grid(True, which='both', linestyle='--', alpha=0.6)
plt.legend(loc='upper left', ncol=2)
plt.tight_layout()
plt.show()
def main():
# データ生成
print("データを生成中...")
data = generate_data()
print(f"生成されたデータポイント数: {len(data)}")
# パラメータ最適化
print("最適なパラメータを計算中...")
optimal_alpha, optimal_beta = optimize_parameters(data)
print(f"最適なパラメータ: alpha = {optimal_alpha:.6f}, beta = {optimal_beta:.6f}")
# 精度検証
mean_error, std_error, max_error, min_error = validate_approximation(data, optimal_alpha, optimal_beta)
print("\n近似式の精度:")
print(f"平均相対誤差: {mean_error:.2f}%")
print(f"標準偏差: {std_error:.2f}%")
print(f"最大誤差: {max_error:.2f}%")
print(f"最小誤差: {min_error:.2f}%")
# 最終的な近似式
print(f"\n最適化された近似式: E[T] ≈ {optimal_alpha:.4f} × (L/p) × (ln(L) + {gamma:.4f}) + {optimal_beta:.2f}")
# L値に対する厳密値と近似値の比較表示(pごとにグループ化)
print("\n厳密値vs近似値の比較 (サンプル):")
p_values = sorted(list(set([d[1] for d in data])))
L_values = sorted(list(set([d[0] for d in data])))
L_sample = [10, 100, 1000, 10000] # サンプル表示用
for p in [0.1, 0.5, 0.9]: # サンプル表示用
print(f"\np = {p}:")
print("-" * 70)
print(f"{'L':<10}{'厳密値':<15}{'近似値':<15}{'相対誤差 (%)':<15}")
print("-" * 70)
for L in L_sample:
# データから該当する値を検索
exact_value = next((exact for L_val, p_val, exact in data if L_val == L and p_val == p), None)
if exact_value is not None:
predicted = approximation(L, p, optimal_alpha, optimal_beta)
error_percent = 100 * abs(predicted - exact_value) / exact_value
print(f"{L:<10}{exact_value:<15.2f}{predicted:<15.2f}{error_percent:<15.2f}")
# 結果のプロット
plot_results(data, optimal_alpha, optimal_beta)
if __name__ == "__main__":
main()
出力
最適なパラメータ: alpha = 1.000486, beta = -0.398272
近似式の精度:
平均相対誤差: 0.19%
標準偏差: 0.29%
最大誤差: 1.61%
最小誤差: 0.01%
最適化された近似式: E[T] ≈ 1.0005 × (L/p) × (ln(L) + 0.5772) + -0.40
厳密値vs近似値の比較 (サンプル):
p = 0.1:
----------------------------------------------------------------------
L 厳密値 近似値 相対誤差 (%)
----------------------------------------------------------------------
10 291.93 287.72 1.44
100 5185.28 5184.51 0.01
1000 74851.47 74885.72 0.05
10000 978756.21 979231.29 0.05
p = 0.5:
----------------------------------------------------------------------
L 厳密値 近似値 相対誤差 (%)
----------------------------------------------------------------------
10 57.60 57.23 0.65
100 1035.38 1036.58 0.12
1000 14967.70 14976.83 0.06
10000 195747.73 195845.94 0.05
p = 0.9:
----------------------------------------------------------------------
L 厳密値 近似値 相対誤差 (%)
----------------------------------------------------------------------
10 31.56 31.62 0.19
100 574.28 575.70 0.25
1000 8313.95 8320.28 0.08
10000 108746.78 108803.12 0.05
よって、 近似値を求める際はこのようにすると良いだろう。
ただ、どちらの値もLが大きい時に、ほとんど計算には影響しないので、使わなくても良いだろう。
Claudeに教えてもらった内容
E[T] \approx \frac{L}{p}\ln{L} + O(1) の導出
理論的近似値 この近似式の導出を詳細に解説します。これは確率論における極値理論(Extreme Value Theory)の応用例です。
1. 問題の再確認
まず、問題設定を振り返ります:
- 長さ
のバッファがあるL - 各時刻で確率
で新しいデータがランダムに選ばれた位置に挿入されるp - バッファ内の最も古いデータの年齢
の期待値を求めたいT
2. 個々のスロットのデータ寿命分布
各バッファスロットは、時刻ごとに確率
ここで
この幾何分布の期待値は:
3. 最大値の漸近的挙動
バッファ内の最も古いデータの年齢
この最大値の期待値の漸近的挙動を求めるために、極値理論を適用します。
4. 極値理論の適用
幾何分布は指数分布の離散版と見なせます。極値理論によると、独立同分布に従う確率変数の最大値の分布は、適切にスケーリングすると、極値分布(ここではタイプIのグンベル分布)に収束します。
幾何分布の最大値の期待値について、次の近似が成り立ちます:
ここで
5. 最終的な近似式
常数項をまとめて
という近似式が得られます。
6. 直感的な解釈
この近似式は直感的にも理解できます:
- 各スロット単体のデータの期待寿命は
\frac{L}{p} -
個の独立なスロットの最大値を取ると、対数因子L が掛かる\ln{L} - したがって、最も古いデータの期待年齢は
のオーダー\frac{L}{p}\ln{L}
関連する重要な定理
この導出に関連する重要な定理には以下があります:
-
フィッシャー・ティペット・グネデンコの定理(Fisher-Tippett-Gnedenko theorem):
独立同分布に従う確率変数の最大値の分布は、適切な正規化を行うと、極値分布の一つに収束します。 -
レンガーの定理(Rényi's theorem):
指数分布や幾何分布の最大値の漸近挙動を特徴づけます。 -
オーダー統計量の理論(Order Statistics Theory):
確率変数を順序付けした際の統計的性質を扱う理論です。
極値理論は、洪水や地震などの極端な事象のモデリング、金融リスク管理、待ち行列理論など、様々な応用分野で重要な役割を果たしています。
T になるように 確率 p を定める
期待生存長が 単に式変形をして、