🎲

マルコフ連鎖モンテカルロを用いたベイズ推定

2023/08/20に公開
2

はじめに

  • 以前の記事 でメトロポリス・ヘイスティング法のアルゴリズムの紹介と詳細釣り合い条件が成立することの証明を行なった
  • 折角記事にしたので, 実装も行いたいと思う. なお, 今回はPythonで実装するが, アルゴリズムを追うためにライブラリなどは極力用いないようにする.
  • 題材として森賀・木田・須山(2022) 1.4.8章の問題を取り上げ実際にシュミレーションを行なった.

メトロポリス-ヘイスティング法のアルゴリズム

前回の記事からの再掲であるが, メトロポリス-ヘイスティング法のアルゴリズムは以下のようになる.

  1. 候補先\mathbf{x}'を提案分布Q(\mathbf{x}, \mathbf{x'})からサンプリングする.
  2. \mathbf{x}'を受理する確率を以下のように定義する.
\begin{align} r = \min \left( 1, \frac{P(\mathbf{x'}) Q(\mathbf{x'} , \mathbf{x})}{P(\mathbf{x}) Q(\mathbf{x} ,\mathbf{x'})} \right) \end{align}
  1. rに従って\mathbf{x}'を受理するか否かを決定する.
  2. \mathbf{x}'を受理した場合は\mathbf{x}'を新たな状態として採用する. そうでない場合は\mathbf{x}を新たな状態として採用する.

問題設定

\begin{align} & \mu \sim \text{Norm}(\mu|\mu, \sigma)\\ & x_n | \mu \sim \text{Norm}(x|\mu, 1.0^2), \; n=1,\cdots 100\\ \end{align}
  • なお, 使用するアルゴリズムが引用部分の章とは異なるので注意すること
    • 参考文献では対数事後分布を用いているが, 今回はメトロポリス・ヘイスティング法をスクラッチで書くことが目的なので事後分布を用いる.
    • 参考文献では一様分布からサンプリングを行っているが, 今回は正規分布からサンプリングを行う.

事前分布の確認

  • 事前分布を確認する
import matplotlib.pyplot as plt
from scipy.stats import norm

prior_mu = np.float64(0.0)
prior_sigma = np.float64(10.0)

x = np.arange(-20, 20, 0.1)
y = norm.pdf(x=x, loc=prior_mu, scale=prior_sigma)
plt.plot(x, y)

データの生成

  • 問題設定で書いたようなデータを生成する.
\begin{align} & x_n | \mu \sim \text{Norm}(x|\mu, 1.0^2), \; n=1,\cdots 100\\ \end{align}
import numpy as np
import matplotlib.pyplot as plt

true_mu = np.float64(0.5)
true_sigma = np.float64(0.1)
data_size = 100

data = np.random.normal(
    loc=true_mu,
    scale=true_sigma,
    size=100
)

plt.hist(data)

補足: np.random.normalの戻り値, について

  • 公式 Documentには以下のように記載されている.
    • out ndarray or scalar
    • これでは意味不明であるので, コードを書き確認した.
  • 私が確認したもの
    • size=1の場合はfloat64型の要素を持つndarray
    • size>1の場合はfloat64型の要素を持つndarray
  • Y.Sakiさんからのコメントをいただきもっと複雑なことが判明
    • コメントを元に以下のコードで確認を行った
    • Google Colab 2023年8月22日に下記を実行した
戻り値の確認を行うコード
import numpy as np

def test_numpy_random_normal() -> None:
    # size なしだと float (np.float64ではない)
    expect_float_size_not_exist = np.random.normal(0, 1)
    assert type(expect_float_size_not_exist) != np.float64
    assert type(expect_float_size_not_exist) == float

    # loc, scale に合わせた array
    expect_np_ndarray_size_none = np.random.normal(
        loc=[0, 1, 2],
        scale=[1, 0.5, 0.1],
        size=None
    )
    assert type(expect_np_ndarray_size_none) == np.ndarray
    assert type(expect_np_ndarray_size_none[0]) == np.float64

    #長さ2の array
    expect_np_ndarray_size_int = np.random.normal(
        loc=0,
        scale=1,
        size=2
    )
    assert type(expect_np_ndarray_size_int) == np.ndarray
    assert type(expect_np_ndarray_size_int[0]) == np.float64

    # shape が 2x3 の array
    expect_np_ndarray_size_tuple = np.random.normal(
        loc=0,
        scale=1,
        size=(2,3)
    )
    assert type(expect_np_ndarray_size_tuple) == np.ndarray
    assert type(expect_np_ndarray_size_tuple[0]) == np.ndarray
    assert type(expect_np_ndarray_size_tuple[0][0]) == np.float64

test_numpy_random_normal()

提案分布の実装

今回は簡単にするために, 提案分布Q(\mathbf{x}, \mathbf{x'})を分散1の正規分布とする.

def generate_proposal_mu(x: np.float64) -> np.float64:
    if type(x) != np.float64:
        raise TypeError("input type of x must be np.float64", type(x))
    return np.random.normal(loc=x, scale=1.0, size=1)[0]

P(\mathbf{x'}), P(\mathbf{x})の計算の実装

  • 事前分布の場合一次元の配列が戻るが, norm.pdfが返すのは一次元の配列なので, sumを取ることでnp.float64にする.
from scipy.stats import norm
import numpy as np

def calc_posterior_at_point(
    point: np.float64,
    data: np.ndarray,
    sigma: np.float64,
    prior_mu: np.float64,
    prior_sigma: np.float64
) -> np.float64:
    likelihood: np.float64 = norm.pdf(data, loc=point, scale=1).sum()
    prior: np.float64 = norm.pdf(x=point, loc=prior_mu, scale=prior_sigma).sum()
    return likelihood * prior

シュミレーション全体の実装

  • sampleの統計量にはburn-in区間が含まれないようにしている.
def leaning(
    num_of_iter: int = 1000,
    burn_in: int = 100,
    point_init: np.float64 = np.float64(0.0),
    true_sigma: np.float64 = np.float64(1.0),
    prior_mu: np.float64 = np.float64(0.0),
    prior_sigma: np.float64 = np.float64(1.0),
    data: np.ndarray = np.array([0.0])
) -> np.ndarray:
    num_of_accepts = 0
    sample = []
    point_current = point_init
    for i in range(num_of_iter):
        if i % 10000 == 0 and 10000 <= i:
            print("[ learning ]: iteration: ", str(i), point_current)

        point_proposed: np.float64 = generate_proposal_mu(x=point_current)

        posterior_point_current = calc_posterior_at_point(
            point=point_current,
            data=data,
            sigma=true_sigma,
            prior_mu=prior_mu,
            prior_sigma=prior_sigma
        )
        posterior_point_proposed = calc_posterior_at_point(
            point=point_proposed,
            data=data,
            sigma=true_sigma,
            prior_mu=prior_mu,
            prior_sigma=prior_sigma
        )

        r = np.random.rand()

        # conditionの部分が長くなるので変数化. Qは対称性のため不要
        ratio = posterior_point_proposed / posterior_point_current
        if ratio > r:
            # 受容(accept)する場合
            point_current = point_proposed
            sample.append(point_current)
            num_of_accepts += 1
        else:
            # 棄却(reject)する場合
            sample.append(point_current)

    print("[ learning ]: done!")

    prob_accepts = round(num_of_accepts / num_of_iter, 4)
    print("[ summary ]: Accept Ratio =>", prob_accepts)

    sample_removed_burn_in = sample[burn_in:]
    mean_of_sample = sum(sample_removed_burn_in) / len(sample_removed_burn_in)
    var_of_sample = sum([(s - mean_of_sample)**2 for s in sample_removed_burn_in]) / len(sample_removed_burn_in)
    print("[ summary ]: Sample Mean =>", mean_of_sample)
    print("[ summary ]: Sample Var =>", var_of_sample)

    return np.array(sample)

実験

  • 実装ができたので, 実験を行う.
  • 初期値はburn-in区間を確認するために大きめに設定している.
  • 得られたサンプルの平均と分散にはburn-in区間の除外をしている.
from typing import Final

# 事前分布のパラメータ
class Prior:
    prior_mu: Final[np.float64] = np.float64(0.0)
    prior_sigma: Final[np.float64] = np.float64(10.0)
prior = Prior()


# データ
class DataSet:
    true_mu: Final[np.float64] = np.float64(3.0) # 未知
    true_sigma: Final[np.float64] = np.float64(1.0) # 既知
    data_size: Final[int]= 100

    data: np.ndarray = np.random.normal(
        loc=true_mu,
        scale=true_sigma,
        size=100
    )
dataset = DataSet()


# 学習の設定
class LearningSetting:
    num_of_iter: Final[int] = 100000 
    burn_in: Final[int] = 1000
    point_init: Final[np.float64] = np.float64(10.0)
learning_setting = LearningSetting()


mh_sample = leaning(
    num_of_iter=learning_setting.num_of_iter,
    burn_in=learning_setting.burn_in,
    point_init=learning_setting.point_init,
    true_sigma=dataset.true_sigma,
    prior_mu=prior.prior_mu,
    prior_sigma=prior.prior_sigma,
    data=dataset.data
)

出力

[ learning ]: iteration:  10000 1.697439828599817
[ learning ]: iteration:  20000 0.7404421913663297
[ learning ]: iteration:  30000 2.2163519576496005
[ learning ]: iteration:  40000 4.742790481555529
[ learning ]: iteration:  50000 3.0739867212709924
[ learning ]: iteration:  60000 3.82706682313065
[ learning ]: iteration:  70000 4.858460010771224
[ learning ]: iteration:  80000 1.3750802697037912
[ learning ]: iteration:  90000 1.6392568111494752
[ learning ]: done!
[ summary ]: Accept Ratio => 0.7827
[ summary ]: Sample Mean => 3.078533590108084
[ summary ]: Sample Var => 1.8797483282234986

結果の確認

  • トレースプロットを確認する. トレースプロットはサンプルの推移を確認するためのプロットである.
  • 初期値を大きめに設定したので, burn-in区間の存在が確認できる.
import matplotlib.pyplot as plt

y = np.arange(0, learning_setting.num_of_iter, 1)
plt.plot(y, mh_sample)

  • 事後密度を確認する
  • 事後分布は実は共役事前分布から正規分布だということがわかっている
  • 実際にプロットしてみたところ, 事後分布が正規分布のような形になっていることがわかる
import seaborn as sns

sns.kdeplot(data=mh_sample[learning_setting.burn_in:])

参考文献

Discussion

Y. SakiY. Saki

わかりやすい記事をありがとうございました.
np.random.normal の戻り値について補足情報があるのでよろしければ御覧ください.

戻り値 out ndarray or scalarsize が1かそれ以上かではなく None(default値) か整数or整数のタプルかによって変わります.

  • size を指定しないか None の場合,ndarray ではなく数値の float が返ります.
    • より正確には,パラメータの locscale に合わせられます.ここには array 値を指定することもでき,この場合は同じ shape の ndarray が返ります.
  • 1を含む非負整数の場合,長さ=size の ndarray が返ります(使い道はないですが0でも長さゼロのベクトル array([]) となります).
  • タプルの場合,対応した shape の ndarray になります.
>>> np.random.normal(0, 1) # size なしだと float
-0.4889771972920983
>>> np.random.normal([0, 1, 2], [1, 0.5, 0.1], size=None)
array([-0.3134048 , -0.27584239,  2.01921351]) # loc, scale に合わせた array
>>> np.random.normal(0, 1, size=2) #長さ2の array
array([-0.14941973,  1.2294889 ])
>>> np.random.normal(0, 1, size=(2,3)) # shape が 2x3 の array
array([[-0.28363238,  2.21540244, -0.32698727],
       [-1.36822202, -0.95087651, -0.71768491]])
shunsockshunsock

@Y.Saki さん

貴重なご意見ありがとうございます.
内容ですが, 大変参考になりました. 中々すごい仕様ですね... 手元で確認し, 記事に反映させていただければと思います. 🙇‍♂️