🧠

【Tensorflow】自作ゲームの強化学習をする②

2024/08/06に公開

前回は学習環境(ゲーム)を作成しましたので、今回は学習ループを作っていきます。ライブラリはTensorflowを使って書きます。

学習ループ

学習ループのコードはこちらの公式チュートリアルを参考に書いていきます。学習アルゴリズムはAC学習です。AC学習とはActor-Criticのことを指していて直訳すると俳優と評論家です。Actor(俳優)がゲームをプレイ(行動)し、Critic(評論家)がプレイヤーが取った行動を評価して学習していきます。ですので今回のNNの出力層は2つとなっています。

新しくLearning.pyを作ってコードを書いていきます。

Learning.py
import FlappyBird as fb

import collections
import gym
import numpy as np
import statistics
import tensorflow as tf
import keras
import tqdm

import time
from tensorflow.keras import layers, backend
from typing import Any, List, Sequence, Tuple

env = fb.FlappyBird()
num_actions = env.action_space.n
num_hidden_units = 128
file_name = "AC_FB"

record = {
    "reward_mean": []
}

class ActorCritic(tf.keras.Model):
    def __init__(self, num_actions: int, num_hidden_units: int):
        super(ActorCritic, self).__init__()

        self.num_actions: int = num_actions
        self.num_hidden_units: int = num_hidden_units

        self.common = layers.Dense(num_hidden_units, activation="relu")
        self.actor = layers.Dense(num_actions)
        self.critic = layers.Dense(1)

    def call(self, inputs: tf.Tensor) -> Tuple[tf.Tensor, tf.Tensor]:
        x = self.common(inputs)
        return self.actor(x), self.critic(x)
    
    def get_config(self):
        config = {
            "num_actions": self.num_actions,
            "num_hidden_units": self.num_hidden_units,
        }
        return config


def env_step(action: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    state, reward, done, _ = env.step(action)
    return (state.astype(np.float32), np.array(reward, np.int32), np.array(done, np.int32))

def tf_env_step(action: tf.Tensor) -> List[tf.Tensor]:
  return tf.numpy_function(env_step, [action], 
                           [tf.float32, tf.int32, tf.int32])

def run_episode(
    initial_state: tf.Tensor,  
    model: tf.keras.Model, 
    max_steps: int) -> Tuple[tf.Tensor, tf.Tensor, tf.Tensor]:
    """トレーニングデータを集めるために一つのエピソードを進める"""

    action_probs = tf.TensorArray(dtype=tf.float32, size=0, dynamic_size=True)
    values = tf.TensorArray(dtype=tf.float32, size=0, dynamic_size=True)
    rewards = tf.TensorArray(dtype=tf.int32, size=0, dynamic_size=True)

    death_ = tf.TensorArray(dtype=tf.string, size=0, dynamic_size=True, clear_after_read=False)

    initial_state_shape = initial_state.shape
    state = initial_state

    for t in tf.range(max_steps):
        # ステートをバッチ化されたテンソルデータに変換する (batch size = 1)
        state = tf.expand_dims(state, 0)

        # モデルを実行し、アクションの確率と評論家の値を取得する
        action_logits_t, value = model(state)

        # 確率分布による次のアクションのサンプル
        action = tf.random.categorical(action_logits_t, 1)[0, 0]
        action_probs_t = tf.nn.softmax(action_logits_t)

        # 評論家の値を保管する
        values = values.write(t, tf.squeeze(value))

        # 選択されたアクションの確率をログに保存する
        action_probs = action_probs.write(t, action_probs_t[0, action])

        # 次の状態と報酬を取得するためにアクションを環境に適用する
        state, reward, done = tf_env_step(action)
        state.set_shape(initial_state_shape)

        # 報酬を保管する
        rewards = rewards.write(t, reward)

        if tf.cast(done, tf.bool):
            break

    action_probs = action_probs.stack()
    values = values.stack()
    rewards = rewards.stack()

    return action_probs, values, rewards

def get_expected_return(
    rewards: tf.Tensor, 
    gamma: float, 
    standardize: bool = True) -> tf.Tensor:

    n = tf.shape(rewards)[0]
    returns = tf.TensorArray(dtype=tf.float32, size=n)

    # Start from the end of `rewards` and accumulate reward sums
    # into the `returns` array
    rewards = tf.cast(rewards[::-1], dtype=tf.float32)
    discounted_sum = tf.constant(0.0)
    discounted_sum_shape = discounted_sum.shape
    for i in tf.range(n):
        reward = rewards[i]
        discounted_sum = reward + gamma * discounted_sum
        discounted_sum.set_shape(discounted_sum_shape)
        returns = returns.write(i, discounted_sum)
    returns = returns.stack()[::-1]

    if standardize:
        returns = ((returns - tf.math.reduce_mean(returns)) / 
                (tf.math.reduce_std(returns) + eps))

    return returns

def compute_loss(action_probs: tf.Tensor, values: tf.Tensor, returns: tf.Tensor) -> tf.Tensor:
    """Actor-Criticの損失を算出する"""
    advantage = returns - values

    action_log_probs = tf.math.log(action_probs)
    actor_loss = -tf.math.reduce_sum(action_log_probs * advantage)

    critic_loss = huber_loss(values, returns)

    return actor_loss + critic_loss

@tf.function
def train_step(
    initial_state: tf.Tensor, 
    model: tf.keras.Model, 
    optimizer: tf.keras.optimizers.Optimizer, 
    gamma: float, 
    max_steps_per_episode: int) -> tf.Tensor:
    """Runs a model training step."""

    with tf.GradientTape() as tape:

        # モデルを1エピソード(max:max_steps_per_episode)実行してトレーニングデータを収集する
        action_probs, values, rewards = run_episode(
            initial_state, model, max_steps_per_episode) 

        # 期待される報酬を計算する
        returns = get_expected_return(rewards, gamma)

        # トレーニングデータを適切なTF形式のデータに変換する
        action_probs, values, returns = [
            tf.expand_dims(x, 1) for x in [action_probs, values, returns]] 

        # ネットワークを更新するために損失を計算する
        loss = compute_loss(action_probs, values, returns)

    # 損失から勾配を計算する
    grads = tape.gradient(loss, model.trainable_variables)

    # モデルのパラメーターに勾配を適用する
    optimizer.apply_gradients(zip(grads, model.trainable_variables))

    episode_reward = tf.math.reduce_sum(rewards)

    return episode_reward

from IPython import display as ipythondisplay
from PIL import Image
def render_episode(env: gym.Env, model: tf.keras.Model, max_steps: int): 
    env.render_set_up()
    env.human = False
    state, info = env.reset()
    state = tf.constant(state, dtype=tf.float32)
    screen = env.render()
    images = [Image.fromarray(screen)]

    for i in range(1, max_steps+1):
        #print(state)
        done = False
        
        state = tf.expand_dims(state, 0)
        action_probs, _ = model(state)
        action = np.argmax(np.squeeze(action_probs))

        state, reward, done, info = env.step(action)
        #print(action)
        state = tf.constant(state, dtype=tf.float32)

        if i % 5 == 0:
            screen = env.render()
            images.append(Image.fromarray(screen))

        if done:
            break
    return images

import json
def Output_record(dir):
    f = open(dir, "w")
    json.dump(record, f, ensure_ascii=False)

if __name__ == "__main__":
    seed = 314
    np.random.seed(seed)
    tf.random.set_seed(seed)

    eps = np.finfo(np.float32).eps.item()

    model = ActorCritic(num_actions, num_hidden_units)

    huber_loss = tf.keras.losses.Huber(reduction=tf.keras.losses.Reduction.SUM)
    optimizer = tf.keras.optimizers.Adam(learning_rate=0.01)

    min_episodes_criterion = 100
    max_episodes = 4000  # 最大で何回学習するか
    max_steps_per_episode = 5000  # 1エピソードごとに何回行動できるか

    reward_threshold = 4000  # これ以上のスコアが出れば学習成功とみなす
    running_reward = 0

    # The discount factor for future rewards
    gamma = 0.9

    # Keep the last episodes reward
    episodes_reward: collections.deque = collections.deque(maxlen=min_episodes_criterion)
    
    #Count of desired score
    keep_desired = 0
    count_keep_desired = 10  # reward_threshold以上のスコアが何回続けば学習を終了するか

    """学習ループ"""
    t = tqdm.trange(max_episodes)
    for i in t:
        initial_state, info = env.reset()
        initial_state = tf.constant(initial_state, dtype=tf.float32)
        episode_reward = int(train_step(initial_state, model, optimizer, gamma, max_steps_per_episode))
        cod = env.couse_of_death
        
        episodes_reward.append(episode_reward)
        running_reward = statistics.mean(episodes_reward)   #1エピソードの報酬の平均
        record["reward_mean"].append(running_reward)

        t.set_postfix(episode_reward=episode_reward, running_reward=running_reward)

        # Show the average episode reward every 10 episodes
        if i % 10 == 0:
            pass # print(f'Episode {i}: average reward: {avg_reward}')

        if running_reward >= reward_threshold:
            keep_desired += 1
        else:
            keep_desired = 0

        if keep_desired >= count_keep_desired and i >= min_episodes_criterion:  
            break
    print(f'\nSolved at episode {i}: average reward: {running_reward:.2f}!')

    render_env = fb.FlappyBird()
    images = render_episode(render_env, model, max_steps_per_episode)
    image_file = f"{file_name}.gif"
    images[0].save(image_file, save_all=True, append_images=images[1:], loop=0, duration=10)
    print(f"saved {file_name}.gif")

    Output_record("./output.json")
    print(f"saved record.")

    del model

    print("\nfinish code.")

このコードを実行すると学習が始まります。最低100回は学習が進み、スコア4000以上10回続いたところで学習が成功したとみなし学習を終了します
その後、その学習済みのモデルでテストを行い、画面をgif画像として出力しています。もちろんテストでは学習を行わずに単に行動を選択しているだけです。
また、学習中の1エピソード毎の報酬を記録し、Json形式で書き出しています。これをグラフで表示させることでモデルの学習の推移をみることができます。

出力されたGif画像

ニューラルネットワーク(NN)の保存と復元

Gif画像で学習結果をいつでも見ることができるとはいえ、これでは実際にゲームに組み込む時、モデル自体を再現できないため毎回学習する必要があります。これでは不便でなりません。そのためモデルを保存するコードを追加します。
通常以下のコードで簡単に実装することができます。

model.save('my_model')

しかし今回はACの勾配の計算や最適化処理をつくるため、すべてカスタムで作りました。そのため、NNの構造もカスタムです。ですのでこれでは保存ができません。
ではどのように実装すればよいでしょうか?答えは簡単でモデルの重みアーキテクチャの2つのファイルに分けて保存します。
以下のコードを追加してください。

Learning.py
if __name__ == "__main__":
・・・
    print(f'\nSolved at episode {i}: average reward: {running_reward:.2f}!')

+   model.save_weights(filepath="model/fbWeight.weights.h5")
+   model_json = model.to_json()
+   f = open("./model/fbArchitecture.json", "w")
+   json.dump(model_json, f, ensure_ascii=False)
・・・

これによりモデルの完全な保存に成功します。
また、モデルのロードは以下のコードで実装できます。

model_open = open("./model/fbArchitecture.json", "r")
model_load = json.load(model_open)
model_new = tf.keras.models.model_from_json(model_load)
model_new.compile()
model_new.load_weights("./model/fbWeight.weights.h5")

これにより実際のモデルの復元にかかる時間が大幅にカットされました。

最後に

この記事では非常に簡単なゲームを自作してAI学習をしました。2024年現在、AIを紹介する記事は星の数ほどありますが、この記事のように具体的な手法やチュートリアルはそう多くありません。現状公式のチュートリアルを見ることぐらいがAI作成の手法について学ぶ方法だと私は考えています。この記事で作成したAIは少し奇妙な行動をしていて人間らしくありません。その原因にはニューラルネットワークが不完全であることや報酬システムに何らかの問題があることが考えられます。AI作成ではそこが一番の肝となります。ですので実験を重ねながらその問題を修正するスキルを身に着けることがAI作成を上達させるカギとなります。
この記事にあるコードを修正してみることで効果的な学習ができるかもしれません。

参考記事

https://www.tensorflow.org/guide/keras/save_and_serialize?hl=ja#weights_only_saving_in_savedmodel_format
https://developers.agirobots.com/jp/openai-gym-custom-env/

Discussion