🎲
マルコフ連鎖モンテカルロを用いたベイズ推定
はじめに
- 以前の記事 でメトロポリス・ヘイスティング法のアルゴリズムの紹介と詳細釣り合い条件が成立することの証明を行なった
- 折角記事にしたので, 実装も行いたいと思う. なお, 今回はPythonで実装するが, アルゴリズムを追うためにライブラリなどは極力用いないようにする.
- 題材として森賀・木田・須山(2022) 1.4.8章の問題を取り上げ実際にシュミレーションを行なった.
メトロポリス-ヘイスティング法のアルゴリズム
前回の記事からの再掲であるが, メトロポリス-ヘイスティング法のアルゴリズムは以下のようになる.
- 候補先
を提案分布\mathbf{x}' からサンプリングする.Q(\mathbf{x}, \mathbf{x'}) -
を受理する確率を以下のように定義する.\mathbf{x}'
-
に従ってr を受理するか否かを決定する.\mathbf{x}' -
を受理した場合は\mathbf{x}' を新たな状態として採用する. そうでない場合は\mathbf{x}' を新たな状態として採用する.\mathbf{x}
問題設定
- 森賀・木田・須山(2022) 1.4.8章の問題設定と同じものを用いる. (以下, 一部改変し引用)
- なお, 使用するアルゴリズムが引用部分の章とは異なるので注意すること
- 参考文献では対数事後分布を用いているが, 今回はメトロポリス・ヘイスティング法をスクラッチで書くことが目的なので事後分布を用いる.
- 参考文献では一様分布からサンプリングを行っているが, 今回は正規分布からサンプリングを行う.
事前分布の確認
- 事前分布を確認する
import matplotlib.pyplot as plt
from scipy.stats import norm
prior_mu = np.float64(0.0)
prior_sigma = np.float64(10.0)
x = np.arange(-20, 20, 0.1)
y = norm.pdf(x=x, loc=prior_mu, scale=prior_sigma)
plt.plot(x, y)
データの生成
- 問題設定で書いたようなデータを生成する.
import numpy as np
import matplotlib.pyplot as plt
true_mu = np.float64(0.5)
true_sigma = np.float64(0.1)
data_size = 100
data = np.random.normal(
loc=true_mu,
scale=true_sigma,
size=100
)
plt.hist(data)
np.random.normal
の戻り値, について
補足: -
公式 Documentには以下のように記載されている.
out ndarray or scalar
- これでは意味不明であるので, コードを書き確認した.
- 私が確認したもの
- size=1の場合は
float64
型の要素を持つndarray - size>1の場合は
float64
型の要素を持つndarray
- size=1の場合は
-
Y.Sakiさんからのコメントをいただきもっと複雑なことが判明
- コメントを元に以下のコードで確認を行った
- Google Colab 2023年8月22日に下記を実行した
戻り値の確認を行うコード
import numpy as np
def test_numpy_random_normal() -> None:
# size なしだと float (np.float64ではない)
expect_float_size_not_exist = np.random.normal(0, 1)
assert type(expect_float_size_not_exist) != np.float64
assert type(expect_float_size_not_exist) == float
# loc, scale に合わせた array
expect_np_ndarray_size_none = np.random.normal(
loc=[0, 1, 2],
scale=[1, 0.5, 0.1],
size=None
)
assert type(expect_np_ndarray_size_none) == np.ndarray
assert type(expect_np_ndarray_size_none[0]) == np.float64
#長さ2の array
expect_np_ndarray_size_int = np.random.normal(
loc=0,
scale=1,
size=2
)
assert type(expect_np_ndarray_size_int) == np.ndarray
assert type(expect_np_ndarray_size_int[0]) == np.float64
# shape が 2x3 の array
expect_np_ndarray_size_tuple = np.random.normal(
loc=0,
scale=1,
size=(2,3)
)
assert type(expect_np_ndarray_size_tuple) == np.ndarray
assert type(expect_np_ndarray_size_tuple[0]) == np.ndarray
assert type(expect_np_ndarray_size_tuple[0][0]) == np.float64
test_numpy_random_normal()
提案分布の実装
今回は簡単にするために, 提案分布
def generate_proposal_mu(x: np.float64) -> np.float64:
if type(x) != np.float64:
raise TypeError("input type of x must be np.float64", type(x))
return np.random.normal(loc=x, scale=1.0, size=1)[0]
P(\mathbf{x'}), P(\mathbf{x}) の計算の実装
- 事前分布の場合一次元の配列が戻るが,
norm.pdf
が返すのは一次元の配列なので,sum
を取ることでnp.float64にする.
from scipy.stats import norm
import numpy as np
def calc_posterior_at_point(
point: np.float64,
data: np.ndarray,
sigma: np.float64,
prior_mu: np.float64,
prior_sigma: np.float64
) -> np.float64:
likelihood: np.float64 = norm.pdf(data, loc=point, scale=1).sum()
prior: np.float64 = norm.pdf(x=point, loc=prior_mu, scale=prior_sigma).sum()
return likelihood * prior
シュミレーション全体の実装
- sampleの統計量にはburn-in区間が含まれないようにしている.
def leaning(
num_of_iter: int = 1000,
burn_in: int = 100,
point_init: np.float64 = np.float64(0.0),
true_sigma: np.float64 = np.float64(1.0),
prior_mu: np.float64 = np.float64(0.0),
prior_sigma: np.float64 = np.float64(1.0),
data: np.ndarray = np.array([0.0])
) -> np.ndarray:
num_of_accepts = 0
sample = []
point_current = point_init
for i in range(num_of_iter):
if i % 10000 == 0 and 10000 <= i:
print("[ learning ]: iteration: ", str(i), point_current)
point_proposed: np.float64 = generate_proposal_mu(x=point_current)
posterior_point_current = calc_posterior_at_point(
point=point_current,
data=data,
sigma=true_sigma,
prior_mu=prior_mu,
prior_sigma=prior_sigma
)
posterior_point_proposed = calc_posterior_at_point(
point=point_proposed,
data=data,
sigma=true_sigma,
prior_mu=prior_mu,
prior_sigma=prior_sigma
)
r = np.random.rand()
# conditionの部分が長くなるので変数化. Qは対称性のため不要
ratio = posterior_point_proposed / posterior_point_current
if ratio > r:
# 受容(accept)する場合
point_current = point_proposed
sample.append(point_current)
num_of_accepts += 1
else:
# 棄却(reject)する場合
sample.append(point_current)
print("[ learning ]: done!")
prob_accepts = round(num_of_accepts / num_of_iter, 4)
print("[ summary ]: Accept Ratio =>", prob_accepts)
sample_removed_burn_in = sample[burn_in:]
mean_of_sample = sum(sample_removed_burn_in) / len(sample_removed_burn_in)
var_of_sample = sum([(s - mean_of_sample)**2 for s in sample_removed_burn_in]) / len(sample_removed_burn_in)
print("[ summary ]: Sample Mean =>", mean_of_sample)
print("[ summary ]: Sample Var =>", var_of_sample)
return np.array(sample)
実験
- 実装ができたので, 実験を行う.
- 初期値はburn-in区間を確認するために大きめに設定している.
- 得られたサンプルの平均と分散にはburn-in区間の除外をしている.
from typing import Final
# 事前分布のパラメータ
class Prior:
prior_mu: Final[np.float64] = np.float64(0.0)
prior_sigma: Final[np.float64] = np.float64(10.0)
prior = Prior()
# データ
class DataSet:
true_mu: Final[np.float64] = np.float64(3.0) # 未知
true_sigma: Final[np.float64] = np.float64(1.0) # 既知
data_size: Final[int]= 100
data: np.ndarray = np.random.normal(
loc=true_mu,
scale=true_sigma,
size=100
)
dataset = DataSet()
# 学習の設定
class LearningSetting:
num_of_iter: Final[int] = 100000
burn_in: Final[int] = 1000
point_init: Final[np.float64] = np.float64(10.0)
learning_setting = LearningSetting()
mh_sample = leaning(
num_of_iter=learning_setting.num_of_iter,
burn_in=learning_setting.burn_in,
point_init=learning_setting.point_init,
true_sigma=dataset.true_sigma,
prior_mu=prior.prior_mu,
prior_sigma=prior.prior_sigma,
data=dataset.data
)
出力
[ learning ]: iteration: 10000 1.697439828599817
[ learning ]: iteration: 20000 0.7404421913663297
[ learning ]: iteration: 30000 2.2163519576496005
[ learning ]: iteration: 40000 4.742790481555529
[ learning ]: iteration: 50000 3.0739867212709924
[ learning ]: iteration: 60000 3.82706682313065
[ learning ]: iteration: 70000 4.858460010771224
[ learning ]: iteration: 80000 1.3750802697037912
[ learning ]: iteration: 90000 1.6392568111494752
[ learning ]: done!
[ summary ]: Accept Ratio => 0.7827
[ summary ]: Sample Mean => 3.078533590108084
[ summary ]: Sample Var => 1.8797483282234986
結果の確認
- トレースプロットを確認する. トレースプロットはサンプルの推移を確認するためのプロットである.
- 初期値を大きめに設定したので, burn-in区間の存在が確認できる.
import matplotlib.pyplot as plt
y = np.arange(0, learning_setting.num_of_iter, 1)
plt.plot(y, mh_sample)
- 事後密度を確認する
- 事後分布は実は共役事前分布から正規分布だということがわかっている
- 実際にプロットしてみたところ, 事後分布が正規分布のような形になっていることがわかる
import seaborn as sns
sns.kdeplot(data=mh_sample[learning_setting.burn_in:])
Discussion
わかりやすい記事をありがとうございました.
np.random.normal
の戻り値について補足情報があるのでよろしければ御覧ください.戻り値
out ndarray or scalar
はsize
が1かそれ以上かではなくNone
(default値) か整数or整数のタプルかによって変わります.size
を指定しないかNone
の場合,ndarray ではなく数値のfloat
が返ります.loc
とscale
に合わせられます.ここには array 値を指定することもでき,この場合は同じ shape の ndarray が返ります.size
の ndarray が返ります(使い道はないですが0でも長さゼロのベクトルarray([])
となります).shape
の ndarray になります.貴重なご意見ありがとうございます.
内容ですが, 大変参考になりました. 中々すごい仕様ですね... 手元で確認し, 記事に反映させていただければと思います. 🙇♂️