【Tensorflow】自作ゲームの強化学習をする②
前回は学習環境(ゲーム)を作成しましたので、今回は学習ループを作っていきます。ライブラリはTensorflowを使って書きます。
学習ループ
学習ループのコードはこちらの公式チュートリアルを参考に書いていきます。学習アルゴリズムはAC学習です。AC学習とはActor-Criticのことを指していて直訳すると俳優と評論家です。Actor(俳優)
がゲームをプレイ(行動)し、Critic(評論家)
がプレイヤーが取った行動を評価して学習していきます。ですので今回のNNの出力層は2つとなっています。
新しくLearning.py
を作ってコードを書いていきます。
import FlappyBird as fb
import collections
import gym
import numpy as np
import statistics
import tensorflow as tf
import keras
import tqdm
import time
from tensorflow.keras import layers, backend
from typing import Any, List, Sequence, Tuple
env = fb.FlappyBird()
num_actions = env.action_space.n
num_hidden_units = 128
file_name = "AC_FB"
record = {
"reward_mean": []
}
class ActorCritic(tf.keras.Model):
def __init__(self, num_actions: int, num_hidden_units: int):
super(ActorCritic, self).__init__()
self.num_actions: int = num_actions
self.num_hidden_units: int = num_hidden_units
self.common = layers.Dense(num_hidden_units, activation="relu")
self.actor = layers.Dense(num_actions)
self.critic = layers.Dense(1)
def call(self, inputs: tf.Tensor) -> Tuple[tf.Tensor, tf.Tensor]:
x = self.common(inputs)
return self.actor(x), self.critic(x)
def get_config(self):
config = {
"num_actions": self.num_actions,
"num_hidden_units": self.num_hidden_units,
}
return config
def env_step(action: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
state, reward, done, _ = env.step(action)
return (state.astype(np.float32), np.array(reward, np.int32), np.array(done, np.int32))
def tf_env_step(action: tf.Tensor) -> List[tf.Tensor]:
return tf.numpy_function(env_step, [action],
[tf.float32, tf.int32, tf.int32])
def run_episode(
initial_state: tf.Tensor,
model: tf.keras.Model,
max_steps: int) -> Tuple[tf.Tensor, tf.Tensor, tf.Tensor]:
"""トレーニングデータを集めるために一つのエピソードを進める"""
action_probs = tf.TensorArray(dtype=tf.float32, size=0, dynamic_size=True)
values = tf.TensorArray(dtype=tf.float32, size=0, dynamic_size=True)
rewards = tf.TensorArray(dtype=tf.int32, size=0, dynamic_size=True)
death_ = tf.TensorArray(dtype=tf.string, size=0, dynamic_size=True, clear_after_read=False)
initial_state_shape = initial_state.shape
state = initial_state
for t in tf.range(max_steps):
# ステートをバッチ化されたテンソルデータに変換する (batch size = 1)
state = tf.expand_dims(state, 0)
# モデルを実行し、アクションの確率と評論家の値を取得する
action_logits_t, value = model(state)
# 確率分布による次のアクションのサンプル
action = tf.random.categorical(action_logits_t, 1)[0, 0]
action_probs_t = tf.nn.softmax(action_logits_t)
# 評論家の値を保管する
values = values.write(t, tf.squeeze(value))
# 選択されたアクションの確率をログに保存する
action_probs = action_probs.write(t, action_probs_t[0, action])
# 次の状態と報酬を取得するためにアクションを環境に適用する
state, reward, done = tf_env_step(action)
state.set_shape(initial_state_shape)
# 報酬を保管する
rewards = rewards.write(t, reward)
if tf.cast(done, tf.bool):
break
action_probs = action_probs.stack()
values = values.stack()
rewards = rewards.stack()
return action_probs, values, rewards
def get_expected_return(
rewards: tf.Tensor,
gamma: float,
standardize: bool = True) -> tf.Tensor:
n = tf.shape(rewards)[0]
returns = tf.TensorArray(dtype=tf.float32, size=n)
# Start from the end of `rewards` and accumulate reward sums
# into the `returns` array
rewards = tf.cast(rewards[::-1], dtype=tf.float32)
discounted_sum = tf.constant(0.0)
discounted_sum_shape = discounted_sum.shape
for i in tf.range(n):
reward = rewards[i]
discounted_sum = reward + gamma * discounted_sum
discounted_sum.set_shape(discounted_sum_shape)
returns = returns.write(i, discounted_sum)
returns = returns.stack()[::-1]
if standardize:
returns = ((returns - tf.math.reduce_mean(returns)) /
(tf.math.reduce_std(returns) + eps))
return returns
def compute_loss(action_probs: tf.Tensor, values: tf.Tensor, returns: tf.Tensor) -> tf.Tensor:
"""Actor-Criticの損失を算出する"""
advantage = returns - values
action_log_probs = tf.math.log(action_probs)
actor_loss = -tf.math.reduce_sum(action_log_probs * advantage)
critic_loss = huber_loss(values, returns)
return actor_loss + critic_loss
@tf.function
def train_step(
initial_state: tf.Tensor,
model: tf.keras.Model,
optimizer: tf.keras.optimizers.Optimizer,
gamma: float,
max_steps_per_episode: int) -> tf.Tensor:
"""Runs a model training step."""
with tf.GradientTape() as tape:
# モデルを1エピソード(max:max_steps_per_episode)実行してトレーニングデータを収集する
action_probs, values, rewards = run_episode(
initial_state, model, max_steps_per_episode)
# 期待される報酬を計算する
returns = get_expected_return(rewards, gamma)
# トレーニングデータを適切なTF形式のデータに変換する
action_probs, values, returns = [
tf.expand_dims(x, 1) for x in [action_probs, values, returns]]
# ネットワークを更新するために損失を計算する
loss = compute_loss(action_probs, values, returns)
# 損失から勾配を計算する
grads = tape.gradient(loss, model.trainable_variables)
# モデルのパラメーターに勾配を適用する
optimizer.apply_gradients(zip(grads, model.trainable_variables))
episode_reward = tf.math.reduce_sum(rewards)
return episode_reward
from IPython import display as ipythondisplay
from PIL import Image
def render_episode(env: gym.Env, model: tf.keras.Model, max_steps: int):
env.render_set_up()
env.human = False
state, info = env.reset()
state = tf.constant(state, dtype=tf.float32)
screen = env.render()
images = [Image.fromarray(screen)]
for i in range(1, max_steps+1):
#print(state)
done = False
state = tf.expand_dims(state, 0)
action_probs, _ = model(state)
action = np.argmax(np.squeeze(action_probs))
state, reward, done, info = env.step(action)
#print(action)
state = tf.constant(state, dtype=tf.float32)
if i % 5 == 0:
screen = env.render()
images.append(Image.fromarray(screen))
if done:
break
return images
import json
def Output_record(dir):
f = open(dir, "w")
json.dump(record, f, ensure_ascii=False)
if __name__ == "__main__":
seed = 314
np.random.seed(seed)
tf.random.set_seed(seed)
eps = np.finfo(np.float32).eps.item()
model = ActorCritic(num_actions, num_hidden_units)
huber_loss = tf.keras.losses.Huber(reduction=tf.keras.losses.Reduction.SUM)
optimizer = tf.keras.optimizers.Adam(learning_rate=0.01)
min_episodes_criterion = 100
max_episodes = 4000 # 最大で何回学習するか
max_steps_per_episode = 5000 # 1エピソードごとに何回行動できるか
reward_threshold = 4000 # これ以上のスコアが出れば学習成功とみなす
running_reward = 0
# The discount factor for future rewards
gamma = 0.9
# Keep the last episodes reward
episodes_reward: collections.deque = collections.deque(maxlen=min_episodes_criterion)
#Count of desired score
keep_desired = 0
count_keep_desired = 10 # reward_threshold以上のスコアが何回続けば学習を終了するか
"""学習ループ"""
t = tqdm.trange(max_episodes)
for i in t:
initial_state, info = env.reset()
initial_state = tf.constant(initial_state, dtype=tf.float32)
episode_reward = int(train_step(initial_state, model, optimizer, gamma, max_steps_per_episode))
cod = env.couse_of_death
episodes_reward.append(episode_reward)
running_reward = statistics.mean(episodes_reward) #1エピソードの報酬の平均
record["reward_mean"].append(running_reward)
t.set_postfix(episode_reward=episode_reward, running_reward=running_reward)
# Show the average episode reward every 10 episodes
if i % 10 == 0:
pass # print(f'Episode {i}: average reward: {avg_reward}')
if running_reward >= reward_threshold:
keep_desired += 1
else:
keep_desired = 0
if keep_desired >= count_keep_desired and i >= min_episodes_criterion:
break
print(f'\nSolved at episode {i}: average reward: {running_reward:.2f}!')
render_env = fb.FlappyBird()
images = render_episode(render_env, model, max_steps_per_episode)
image_file = f"{file_name}.gif"
images[0].save(image_file, save_all=True, append_images=images[1:], loop=0, duration=10)
print(f"saved {file_name}.gif")
Output_record("./output.json")
print(f"saved record.")
del model
print("\nfinish code.")
このコードを実行すると学習が始まります。最低100回は学習が進み、スコア4000以上が10回続いたところで学習が成功したとみなし学習を終了します。
その後、その学習済みのモデルでテストを行い、画面をgif画像
として出力しています。もちろんテストでは学習を行わずに単に行動を選択しているだけです。
また、学習中の1エピソード毎の報酬を記録し、Json形式
で書き出しています。これをグラフで表示させることでモデルの学習の推移をみることができます。
出力されたGif画像
ニューラルネットワーク(NN)の保存と復元
Gif画像で学習結果をいつでも見ることができるとはいえ、これでは実際にゲームに組み込む時、モデル自体を再現できないため毎回学習する必要があります。これでは不便でなりません。そのためモデルを保存するコードを追加します。
通常以下のコードで簡単に実装することができます。
model.save('my_model')
しかし今回はACの勾配の計算や最適化処理をつくるため、すべてカスタムで作りました。そのため、NNの構造もカスタムです。ですのでこれでは保存ができません。
ではどのように実装すればよいでしょうか?答えは簡単でモデルの重み
とアーキテクチャ
の2つのファイルに分けて保存します。
以下のコードを追加してください。
if __name__ == "__main__":
・・・
print(f'\nSolved at episode {i}: average reward: {running_reward:.2f}!')
+ model.save_weights(filepath="model/fbWeight.weights.h5")
+ model_json = model.to_json()
+ f = open("./model/fbArchitecture.json", "w")
+ json.dump(model_json, f, ensure_ascii=False)
・・・
これによりモデルの完全な保存に成功します。
また、モデルのロードは以下のコードで実装できます。
model_open = open("./model/fbArchitecture.json", "r")
model_load = json.load(model_open)
model_new = tf.keras.models.model_from_json(model_load)
model_new.compile()
model_new.load_weights("./model/fbWeight.weights.h5")
これにより実際のモデルの復元にかかる時間が大幅にカットされました。
最後に
この記事では非常に簡単なゲームを自作してAI学習をしました。2024年現在、AIを紹介する記事は星の数ほどありますが、この記事のように具体的な手法やチュートリアルはそう多くありません。現状公式のチュートリアルを見ることぐらいがAI作成の手法について学ぶ方法だと私は考えています。この記事で作成したAIは少し奇妙な行動をしていて人間らしくありません。その原因にはニューラルネットワークが不完全であることや報酬システムに何らかの問題があることが考えられます。AI作成ではそこが一番の肝となります。ですので実験を重ねながらその問題を修正するスキルを身に着けることがAI作成を上達させるカギとなります。
この記事にあるコードを修正してみることで効果的な学習ができるかもしれません。
参考記事
Discussion