決定木ってなに...?
記事を書いた経緯
最近kaggleのコンペで機械学習にちょっと手を出しておりその関係で、LightGBMというモデルを使用している。
初心者にはよく使われているらしく確かに、原理をあまり理解していなくてもなんとなくパラメータをポチポチ設定をしてなんとなく使用しても、意外と使うことができる。
しかも大してスペックが良くないPCでもそんなに時間がかからない。(複雑なことをしていないからだと思うが数十秒)
これだけ軽いなら、ちょっとした機械学習による分析や異常検知や傾向予測をアプリとかに組み込めそうだなと思いつつどんなものか調べてみた。
[参考記事]
いわく「決定木ベースの勾配ブースティング手法(Gradient Boosting Dicision Tree)」...🤔
正直前提知識がないのでなんのことかわからず、いろんなサイトを見ても要領をえない...決定木ってなんか聞いたことはあるな...という感じだった。
とりあえず簡単なものを作ってみて大まかな原理を知ろうと思ってpythonで決定木のコードを作ろうと思ったのがきっかけ。
決定木のコード
gptに作ってもらったコードをとりあえずprint文を駆使してなんとか呼ばれる順番や中の処理を探ってみました。
[コード]
import numpy as np
# サンプルデータ
# 特徴量 (X1, X2) とラベル (y)
X = np.array([[2, 3],
[10, 15],
[4, 5],
[8, 9],
[3, 4]
])
y = np.array([0, 1, 0, 1, 0]) # 2つのクラス: 0または1
def create_node(feature=None, threshold=None, left=None, right=None, value=None):
return {
'feature': feature,
'threshold': threshold,
'left': left,
'right': right,
'value': value
}
def fit(X, y, max_depth=None):
print("最初にまずfitが呼ばれて次にgrow_treeが深度0,最大深度3で呼ばれる")
def fit(X, y, max_depth=None, n_estimators=100, learning_rate=0.1):
print("最初にまずfitが呼ばれて次にgrow_treeが呼ばれる")
# 初期予測を全体の平均ラベルで設定
predictions = np.full(y.shape, np.mean(y))
for _ in range(n_estimators):
# 残差を計算
residuals = y - predictions
# 残差に基づいて新しい決定木を訓練
tree = grow_tree(X, residuals, depth=0, max_depth=max_depth)
# 新しい決定木の予測を取得
tree_predictions = predict(tree, X)
# 学習率を考慮して予測を更新
predictions += learning_rate * tree_predictions
return predictions
return grow_tree(X, y, depth=0, max_depth=max_depth)
def grow_tree(X, y, depth=0, max_depth=None):
num_samples, num_features = X.shape
print(f"grow_treeの中で{num_samples}が0でないかもしくは{depth}が{max_depth}に到達してないかどうか確認する")
if num_samples == 0 or (max_depth is not None and depth >= max_depth):
print(f"{depth}が最大深度{max_depth}まで到達した場合nodeを作成する。この時most_common_labelをラベルデータ{y}で呼んでcreate_nodeの戻り値を返す")
return create_node(value=most_common_label(y))
print(f"grow_treeの中で最大深度に達していてもしなくてもbest_splitを特徴量{X},ラベル{y},特徴量のshape{num_features}で呼び、結果の{best_split(X, y, num_features)}を変数に格納")
best_feature, best_threshold = best_split(X, y, num_features)
if best_feature is None:
print(f"サンプル数が少なかったりすべてのラベルが同じだったりしてよい分割ができなかった場合(best_feature:{best_feature})")
print(f"create_nodeをラベル{y}のなかの最も頻出するラベル{most_common_label(y)}で呼んで戻り値を返す。")
return create_node(value=most_common_label(y))
left_indices = X[:, best_feature] < best_threshold
right_indices = X[:, best_feature] >= best_threshold
print(f"どちらかの特徴量{X[:, best_feature]}が閾値より小さい場合は左、大きい場合は右へ移動する。")
print("深度を1プラスしながらgrow_treeを再帰的に呼んで、かつcreate_nodeも返す。")
left_child = grow_tree(X[left_indices], y[left_indices], depth + 1, max_depth)
right_child = grow_tree(X[right_indices], y[right_indices], depth + 1, max_depth)
return create_node(best_feature, best_threshold, left_child, right_child)
def best_split(X, y, num_features):
best_gain = -1
best_feature = None
best_threshold = None
for feature in range(num_features):
thresholds = np.unique(X[:, feature])
print(thresholds,":特徴量内のすべてなユニークな値を閾値に設定する。")
for threshold in thresholds:
left_indices = X[:, feature] < threshold
right_indices = X[:, feature] >= threshold
print("閾値と比較して小さいなら左、大きいなら右に値を入れる")
if len(np.unique(y[left_indices])) == 0 or len(np.unique(y[right_indices])) == 0:
print("もうサンプル特徴量が無くなってしまった場合に次の閾値に移る。")
continue
gain = information_gain(y, y[left_indices], y[right_indices])
if gain > best_gain:
print("情報利得がこれまでのbest_gainより大きいならbest_gainをgain,best_featureをfeature,best_thresholdをthresholdにする。")
best_gain = gain
best_feature = feature
best_threshold = threshold
return best_feature, best_threshold
def information_gain(parent, left_child, right_child):
p = float(len(left_child)) / (len(left_child) + len(right_child))
print(f"information_gain情報利得...親を分割したときこれで良かったのかどうかのための情報を得る...親{parent},左{left_child},右{right_child}")
print(p,"""p:左の子ノードのサンプル数が全体のサンプル数に対してどの程度の割合を占めるか。左ノードのが全体に与える影響を示す。""")
print("""
情報利得を計算して返す。具体的には、親ノードのエントロピーから、左ノードと右ノードのエントロピーの影響を引いています。
親ノードのエントロピー: 分割前の不確実性。
左ノードの影響: 左ノードのサンプル数の割合(p)とそのエントロピーの積。
右ノードの影響: 右ノードのサンプル数の割合(1 - p)とそのエントロピーの積。
""")
return entropy(parent) - p * entropy(left_child) - (1 - p) * entropy(right_child)
def entropy(y):
class_labels, counts = np.unique(y, return_counts=True)
print("""
np.unique()関数を使用→yに含まれるユニークなラベル(class_labels)
それぞれのラベルの出現回数(counts)
各ラベルの頻度を出す。
""")
probabilities = counts / len(y)
print(probabilities,"ラベルの頻度をラベルの数で割ってラベルの出現割合を出力する。")
print("""
エントロピーの公式の計算
各ラベルの確率に対して、対数を取り、その結果を合計
最終的に、マイナスの符号を付けてエントロピーの値を返す。
1e-10は、対数でゼロ割りを避けるための極小の値。
""")
return -np.sum(probabilities * np.log(probabilities + 1e-10))
def most_common_label(y):
print(f"渡ってきたラベルデータ{y}のリストから最も多い回数登場しているラベル{np.bincount(y).argmax()}を返す。これは分割される際に一つのノード内に複数のラベルが含まれる可能性があるため",)
return np.bincount(y).argmax()
def predict(tree, X):
print(f"""
tree:
fit→grow_tree→create_nodeの戻り値
'feature'
'threshold'
'left'
'right'
'value'
inputs:
各特徴量のリストに対して順番にinputsに格納
これらを引数にして
single_predictを呼ぶ。"
""")
return np.array([single_predict(tree, inputs) for inputs in X])
def single_predict(node, inputs):
print(f"nodeのvalue{node['value']}を返す。")
while node['value'] is None:
if inputs[node['feature']] < node['threshold']:
print(f"リーフノードに到達するまで、閾値{node['threshold']}より特徴量{node['feature']}が小さい場合は左に、他は右にノードを進める。")
node = node['left']
else:
node = node['right']
print(f"リーフノード終了時に,{node['value']}というノードが持つクラスラベルが格納される。")
return node['value']
tree = fit(X, y, max_depth=3)
# テストデータに対する予測を行う
test_data = np.array([[3, 5], [8, 10], [1, 1]])
y_pred = predict(tree, test_data)
print(f"{y_pred}は特徴量を入れたときにリーフノードに到達した際のラベルの値")
# 予測結果を表示
for i in range(len(test_data)):
print(f'Test data: {test_data[i]}, Predicted label: {y_pred[i]}')
詰まった点&まとめ
葉を伸ばすか否かの部分でエントロピー、情報利得という概念が出てきた。化学で聞いたことはあったがこの場合ではカテゴリ分けされた時親より子の方がラベルの混在具合のようなものを調べている。
親より子のエントロピーが少なければ子の方が混在してない=情報利得が大きいということ。
情報利得が大きければその場合の閾値で枝を伸ばそう!という処理を関数で行っている。
この処理を再帰的に呼び出してどんどん分類を進めていっているというのが処理内容だった。
普段こういった部分にまで踏み込まないので、アルゴリズムの中身を触ることでとても勉強になりました。こういったことは定期的にやる予定です🙌
Discussion