👏

PyMC:打ち切りと切断

2023/07/08に公開

はじめに

今回は打ち切りと切断に関して紹介します。

ライブラリのインポート

import os
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import arviz as az
from copy import copy

import pymc as pm
import pytensor.tensor as pt


# Initialize random number generator
RANDOM_SEED = 8927
rng = np.random.default_rng(RANDOM_SEED)
az.style.use("arviz-darkgrid")

打ち切りと切断

打ち切り(censored)と切断(truncated)は欠損データの問題ですが、2つを混同しやすいので定義を図と数式で見ていきます。
このようなデータに対して普通の線形回帰を実行すると、勾配を過小評価してしまう可能性が高いことは見た目からもわかるかと思います。そのため、このようなデータに対しては通常切断回帰と打ち切り回帰(別名:トービット回帰)を実行することで、データのバイアスを除いた回帰係数を求めることができるようになります。
https://www.jstage.jst.go.jp/article/ojjams/24/1/24_1_129/_pdf

切断(truncated)

目的変数Yがある範囲から外れるデータを単に知らないというものです。コードで内容を確認しましょう。

# bounds外のデータは観測されていない
def truncate_y(x, y, bounds):
    keep = (y >= bounds[0]) & (y <= bounds[1])
    return (x[keep], y[keep])

# デモデータ
slope, intercept, σ, N = 1, 0, 2, 200
x = rng.uniform(-10, 10, N)
y = rng.normal(loc=slope * x + intercept, scale=σ)
bounds = [-5, 5]
xt, yt = truncate_y(x, y, bounds)

# plot
plt.plot(x, y, ".", c=[0.7, 0.7, 0.7])
plt.axhline(bounds[0], c="r", ls="--")
plt.axhline(bounds[1], c="r", ls="--")
plt.xlabel("x")
plt.ylabel("y")
plt.plot(xt, yt, ".", c=[0, 0, 0])

上記のコードを数式で表すと以下のようになりなります。

y^* = x\beta + \alpha \\ y = y^* \space (5 > y^* > -5)

打ち切り(censored)

目的変数Yがある範囲から外れる際に、境界の外側のデータを廃棄するのではなく、境界の値を測定値として記録します。こちらもコードで内容を確認しましょう。

# bounds外のデータは境界値に置き換えられる
def censor_y(x, y, bounds):
    cy = copy(y)
    cy[y <= bounds[0]] = bounds[0]
    cy[y >= bounds[1]] = bounds[1]
    return (x, cy)

# デモデータ
slope, intercept, σ, N = 1, 0, 2, 200
x = rng.uniform(-10, 10, N)
y = rng.normal(loc=slope * x + intercept, scale=σ)
bounds = [-5, 5]
xc, yc = censor_y(x, y, bounds)

# plot
plt.plot(x, y, ".", c=[0.7, 0.7, 0.7])
plt.axhline(bounds[0], c="r", ls="--")
plt.axhline(bounds[1], c="r", ls="--")
plt.xlabel("x")
plt.ylabel("y")
plt.plot(xc, yc, ".", c=[0, 0, 0])

上記のコードを数式で表すと以下のようになりなります。

y^* = x\beta + \alpha \\ y = \left\{ \begin{array}{ll} 5 & (y^* \ge 5) \\ y^* & (5 \ge y^* \ge -5) \\ -5 & (-5 \ge y^*) \end{array} \right.

PyMCでの分布

切断(truncated)分布

pm.Truncated()を使用するだけです。切断されており、下限値-1より小さい値は観測されていないことがわかります。

d = pm.Normal.dist(mu=0, sigma=1)
truncated_d = pm.Truncated.dist(d, lower=-1, upper=None)

samples = pm.draw(d, draws=1000)
truncated_samples = pm.draw(truncated_d, draws=1000)
plt.hist(samples, density=True, alpha=0.5, label="normal")
plt.hist(truncated_samples, density=True, alpha=0.5, label="truncated normal")
plt.legend()

打ち切り(censored)分布

pm.Censored()を使用するだけです。下限値-1より小さい値は全て-1として観測されていることがわかります。

d = pm.Normal.dist(mu=0, sigma=1)
censored_d = pm.Censored.dist(d, lower=-1, upper=None)

samples = pm.draw(d, draws=1000)
censored_samples = pm.draw(truncated_d, draws=1000)
plt.hist(samples, density=True, alpha=0.5, label="normal")
plt.hist(censored_samples, density=True, alpha=0.5, label="censored normal")
plt.legend()

回帰

上記のようなデータに対して以下の3つのアプローチを試します。3番目のアプローチは実際には打ち切りされており目的変数の値のみが観測されていないデータなので、Yに対して欠損値があるとして欠損値の問題として切断データをモデル化しています。この方法のメリットとしては、本来は情報を持っていないデータに対しても事後分布から真の値を推定することができることです。

  1. 切断回帰(truncated regression)
  2. 打ち切り回帰(censored regression)
  3. 欠損値を埋また打ち切り回帰(imputed censored regression)

通常の線形回帰

def normal_regression(x, y):
    with pm.Model() as model:
        # define prior
        slope = pm.Normal("slope", mu=0, sigma=1)
        intercept = pm.Normal("intercept", mu=0, sigma=1)
        σ = pm.HalfNormal("σ", sigma=1)
        
        pm.Normal("obs", mu=slope * x + intercept, sigma=σ, observed=y)
    return model

normal_model = normal_regression(xt, yt)

with normal_model:
    normal_idata = pm.sample(3000, 
                      tune=1000, 
                      chains=4,
                      idata_kwargs={"log_likelihood": True},
                      random_seed=42, 
                      target_accept=0.90, 
                      return_inferencedata=True)

az.summary(normal_idata, round_to=2)

切断回帰(truncated regression)

def truncated_regression(x, y, bounds):
    with pm.Model() as model:
        # define prior
        slope = pm.Normal("slope", mu=0, sigma=1)
        intercept = pm.Normal("intercept", mu=0, sigma=1)
        σ = pm.HalfNormal("σ", sigma=1)
        
        normal_dist = pm.Normal.dist(mu=slope * x + intercept, sigma=σ)
        pm.Truncated("obs", normal_dist, lower=bounds[0], upper=bounds[1], observed=y)
    return model
    
truncated_model = truncated_regression(xt, yt, bounds)

with truncated_model:
    truncated_idata = pm.sample(3000, 
                      tune=1000, 
                      chains=4,
                      idata_kwargs={"log_likelihood": True},
                      random_seed=42, 
                      target_accept=0.90, 
                      return_inferencedata=True)

az.summary(truncated_idata, round_to=2)

打ち切り回帰(censored regression)

def censored_regression(x, y, bounds):
    with pm.Model() as model:
        # define prior
        slope = pm.Normal("slope", mu=0, sigma=1)
        intercept = pm.Normal("intercept", mu=0, sigma=1)
        σ = pm.HalfNormal("σ", sigma=1)
        
        y_latent = pm.Normal.dist(mu=slope * x + intercept, sigma=σ)
        obs = pm.Censored("obs", y_latent, lower=bounds[0], upper=bounds[1], observed=y)

    return model
    
censored_model = censored_regression(xc, yc, bounds)

with censored_model:
    censored_idata = pm.sample(3000, 
                      tune=1000, 
                      chains=4,
                      idata_kwargs={"log_likelihood": True},
                      random_seed=42, 
                      target_accept=0.90, 
                      return_inferencedata=True)

az.summary(censored_idata, round_to=2)

欠損値を埋また打ち切り回帰(imputed censored regression)

# 右側打ち切りされたデータ数
n_right_censored = sum(yc >= bounds[1])
# 左側打ち切りされたデータ数
n_left_censored = sum(yc <= bounds[0])
# 実際の値を観測したデータ数
n_observed = len(yc) - n_right_censored - n_left_censored

# 打ち切りされたデータ
xc_right_censored = xc[yc >= bounds[1]]
xc_left_censored = xc[yc <= bounds[0]]
# 打ち切りされていないデータ
xc_uncensored = xc[(yc > bounds[0]) & (yc < bounds[1])]
yc_uncensored = yc[(yc > bounds[0]) & (yc < bounds[1])]
def imputed_censored_regression(x, y, bounds):
    low = bounds[0]
    high = bounds[1]
    with pm.Model() as model:
        # define prior
        slope = pm.Normal("slope", mu=0, sigma=1)
        intercept = pm.Normal("intercept", mu=0, sigma=1)
        σ = pm.HalfNormal("σ", sigma=1)
        
        # 右側打ち切り分布
        # 右側打ち切りされたデータの数だけ生成
        # 初期値をinitvalで設定
        right_censored = pm.Normal(
            "right_censored",
            slope * xc_right_censored + intercept,
            σ,
            transform=pm.distributions.transforms.Interval(high, None),
            shape=int(n_right_censored),
            initval=np.full(n_right_censored, high + 1),
        )
        # 左側打ち切り分布
        # 左側打ち切りされたデータの数だけ生成
        # 初期値をinitvalで設定
        left_censored = pm.Normal(
            "left_censored",
            slope * xc_left_censored + intercept,
            σ ,
            transform=pm.distributions.transforms.Interval(None, low),
            shape=int(n_left_censored),
            initval=np.full(n_left_censored, low - 1),
        )
        
        obs = pm.Normal("obs", mu=slope * xc_uncensored + intercept, sigma=σ, observed=yc_uncensored, shape=int(n_observed))
        
    return model
    
imputed_censored_model = imputed_censored_regression(xc, yc, bounds)

with imputed_censored_model:
    imputed_censored_idata = pm.sample(3000, 
                      tune=1000, 
                      chains=4,
                      idata_kwargs={"log_likelihood": True},
                      random_seed=42, 
                      target_accept=0.90, 
                      return_inferencedata=True)

az.summary(imputed_censored_idata, round_to=2)

結果の比較

真の係数の値は1なのですが、通常の線形回帰ではかなり低めに見積もられていることがわかります。それに対して、他の3つのアプローチでは真の係数の値に近い値を推定できています。また、今回のケースでは、切断回帰より打ち切りデータの方が真の値に近い値を推定できています。これはケースバイケースですが、XとYの両方のデータがない切断データより、打ち切りデータの方は目的変数Yの値のみが欠損しているだけなので、より良い推定値が得られているようです。

fig, ax = plt.subplots(1, 4, figsize=(10, 5), sharex=True)

az.plot_posterior(normal_idata, var_names=["slope"], ref_val=slope, ax=ax[0])
ax[0].set(title="normal regression\n(truncated data)", xlabel="slope")

az.plot_posterior(truncated_idata, var_names=["slope"], ref_val=slope, ax=ax[1])
ax[1].set(title="Truncated regression\n(truncated data)", xlabel="slope")

az.plot_posterior(censored_idata, var_names=["slope"], ref_val=slope, ax=ax[2])
ax[2].set(title="Censored regression\n(censored data)", xlabel="slope");

az.plot_posterior(imputed_censored_idata, var_names=["slope"], ref_val=slope, ax=ax[3])
ax[3].set(title="Imputed Censored regression\n(censored data)", xlabel="slope");

最後に

今回は打ち切りと切断を紹介し、それぞれの定義やpymcでの扱い方、欠損値の問題としての考え方を理解できました。他のトピックとしては、上限値や下限値がわからない場合はその値にも事前分布をおくこともできます。

Discussion