PyTorchでDQNをやってみた
ミニゲームをAIに攻略させてみましょうか
ごあいさつ
ダヴノー ニ ヴィージェリシ!(お久しぶりです!)ミ・ズターニャ・ソヴィエツカです!
今回はPythonの基本的な使い方がわかり、AIに興味がある人向けとして、ミニゲームを攻略するAIを作ってみようと思います!
今回は降ってくるアイテムを左右に移動して回収するゲームを作り、プレイさせてみようと思います!
一緒に頑張りましょう!
AI開発に興味がある方は、是非参考にしてくださると嬉しいです!
また、ふんわりと雰囲気でやっていくので、数学的な話や高度な話はできるだけしません!
ここどういうことなのだろう?とか、何をしているのだろう?とか疑問もったら、是非調べてみてください!
私は独学で行っているため文章中に間違った情報がある可能性があります!
また、記事の最後にソースコードの全文をMITライセンスで公開します!
AI開発に挑戦される方やPyTorchを使ってゲームを攻略するAIを作ってみたい方にとって、この記事が役立つよう頑張ります!
今回は事前にPyTorchのインストール方法がわかっていると楽かも…?
この画像のようなものを作ります。
【必要なライブラリと環境】#1
このチュートリアルでは、以下のライブラリと環境を使用しました!
- Windows 10
- Python 3.10.6
- PyTorch 2.3.1(CUDA12.1)
- NumPy
- PyGame
OSはWindows10を使用している前提で進めます。
前回はTensorFlowを使用しましたが、今回はPyTorchを使用してみようと思います!
PyTorch
は、機械学習やディープラーニングのために高度な数学計算やモデルの構築をサポートしてくれるフレームワークです!
こちらもニューラルネットワークのモデルを作成し、予測をさせたりすることが可能です。
NumPy
は数値計算を効率的に行うことができます!
このチュートリアルでは直接的には少しだけ利用します。
PyGame
はPythonでゲームを作成するのにつかわれるライブラリの一つです!簡単に2Dゲームを作成することが可能です!
ライブラリのインストールしてみましょうか
では早速ライブラリをインストールしてみましょう!
コマンドプロンプトやPowerShellを開き、
まずは以下のコマンドを入力して実行してください!
python -m pip install --upgrade pip
または
python3 -m pip install --upgrade pip
その後、PyTorch以外のライブラリをインストールしましょう!
python -m pip install [ライブラリの名前]
または
pip install [ライブラリの名前]
をすることでインストールできます!
pip install PyGame
等
PyTorch以外を上記のコマンドでインストールしたら、
PyTorchの公式サイトを開きましょう!
INSTALL PYTORCHの項目で自分の環境に合わせて選択をしてください!
ちょっと難しいので、今回はCPUで動くように作成するので自分でGPUを使用するつもりがない場合はCPUを選んでもいいと思います!
私の場合は
PyTorch Build: Stable (2.3.1)
Your OS: Windows
Package: Pip
Language: Python
Compute Platform: CUDA 12.1
を選択しました!
すると、Run this Commandの欄にコマンドが出てくるのでコピーしてコマンドプロンプトやPowerShellに張り付けて実行しましょう!
暫く待てばインストールが完了すると思います!
ライセンス
一応、明示しておきますが、この記事及びソースコードにおける、私の権利についてはMITライセンスになります。
MIT License
Copyright (c) 2024 mi_ztyanya
本ソフトウェアおよび関連文書ファイル(以下「本ソフトウェア」という、)
以下に定める条件に従い、本ソフトウェアの複製を取得するすべての人に対し、ソフトウェアを無制限に扱うことを無償で許可します。
これには、ソフトウェアの複製を使用、複写、変更、結合、掲載、頒布、サブライセンス、および/または販売する権利、およびソフトウェアを提供する相手に同じことを許可する権利も無制限に含まれます。
上記の著作権表示および本許諾表示を、ソフトウェアのすべての複製または重要な部分に記載するものとします。
ソフトウェアは「現状のまま」で、明示であるか暗黙であるかを問わず、何らの保証もなく提供されます。
ここでいう保証とは、商品性、特定の目的への適合性、および権利非侵害についての保証も含みますが、
それに限定されるものではありません。
作者または著作権者は、契約行為、不法行為、またはそれ以外であろうと、ソフトウェアに起因または関連し、
あるいはソフトウェアの使用またはその他の扱いによって生じる一切の請求、損害、
その他の義務について何らの責任も負わないものとします。
【まずはゲームを作っていきましょうか】#2
メインループ
まずは、AIにプレイしてもらう為のゲームを作っていきます!
この項はそこまで重要ではないのでスキップしてソースコードをコピーして利用しても構いません。
スキップする場合は 【DQNを作成してみましょうか】#3 から始めるといいと思います!
PyGameを使用しますが、今回は重要ではないのでこのライブラリの解説は特にしません。
では、上から落ちてくるアイテムを左右によけるゲームを作りましょう!
まずはメインループを作成することにします。
早速PyGameをインポートして実行してみましょう!
main.py
を作成し、以下のコードを記入してみましょうか
import pygame
# Pygameの初期化
pygame.init()
# ゲームウィンドウのサイズ
screen_width = 800
screen_height = 600
screen = pygame.display.set_mode((screen_width, screen_height))
# フォントの設定
font = pygame.font.Font(None, 36) # デフォルトフォントをサイズ36で使用
# 色の定義
white = (255, 255, 255)
black = (0, 0, 0)
# アイテムの最大数
max_items = 10
# メインループ
run = True
while run:
for event in pygame.event.get():
if event.type == pygame.QUIT:
run = False
# 描画
screen.fill(white)
# テキストを左上に描画
text = font.render(f"{int(counter/30)}", True, black) # 黒色で描画
screen.blit(text, (10, 10))
# 画面を更新
pygame.display.update()
pygame.quit()
はい!
白い画面が表示されましたね…?
とりあえずメインループを用意して画面全体に描画することができましたね!
オブジェクトを作成
では、次はゲームに表示するオブジェクトのクラスを作っていきましょう!
一気に行きますので、ついてきてください!
entities.py
を作成してimportをしましょう。
ここでimportするものはこの二つです!
import random
import pygame
まずはプレイヤーのクラスから!
# プレイヤー
class Player:
def __init__(self, screen_height, lane_count, lane_width):
self.width = 50
self.height = 70
self.lane = lane_count // 2 # 初期にいるレーンは中央
self.y = screen_height - self.height - 10
self.speed = 5
self.lane_count = lane_count
self.lane_width = lane_width
def draw(self, screen, color):
"""
描画
:param screen: 描画先
:param color: 色
:return:
"""
x = self.lane * self.lane_width + (self.lane_width - self.width) // 2
pygame.draw.rect(screen, color, (x, self.y, self.width, self.height))
# 移動
def move(self, direction):
"""
車を移動させる
:param direction: 移動方向
:return: None
"""
if direction == "left" and self.lane > 0:
self.lane -= 1
elif direction == "right" and self.lane < self.lane_count - 1:
self.lane += 1
次はアイテムのクラスです!
# アイテム
class Item:
def __init__(self, lane_count, lane_width):
self.width = 50
self.height = 70
self.lane = random.randint(0, lane_count - 1)
self.y = -self.height
self.speed = 5
self.lane_count = lane_count
self.lane_width = lane_width
def draw(self, screen, color):
"""
描画
:param screen: 描画先
:param color: 色
:return:
"""
x = self.lane * self.lane_width + (self.lane_width - self.width) // 2
pygame.draw.rect(screen, color, (x, self.y, self.width, self.height))
def move(self):
"""
アイテムを移動させる
:return: None
"""
self.y += self.speed
これでゲームに登場するクラスは完成ですね!
描画してみる
実際にゲームに登場させましょうか。
先ほど作成したクラスをインポートしましょう!
from entities import Player, Item
色の定義の下に変数を追記しちゃいましょう。
# 色の定義
white = (255, 255, 255)
black = (0, 0, 0)
red = (255, 0, 0)
# フレームレート調整用
clock = pygame.time.Clock()
# レーンの設定
lane_count = 11 # レーンの数
lane_width = screen_width // lane_count
# ゲームオブジェクト
player = Player(screen_height, lane_count, lane_width)
items = []
メインループに描画と移動を実装しましょう。
counter = 0
run = True
while run:
action = 1
for event in pygame.event.get():
if event.type == pygame.QUIT:
run = False
elif event.type == pygame.KEYDOWN:
if event.key == pygame.K_LEFT:
action = 0
elif event.key == pygame.K_RIGHT:
action = 2
if action == 0:
player.move("left")
elif action == 2:
player.move("right")
# アイテムの生成
if max_items > len(items):
if random.randint(1, 10) == 1:
items.append(Item(lane_count, lane_width))
# アイテムの移動と削除
for obs in items:
obs.move()
if obs.y > screen_height:
items.remove(obs)
# 描画
screen.fill(white)
player.draw(screen, red)
for obs in items:
obs.draw(screen, black)
# テキストをレンダリング
text = font.render(
f"time{int(counter / 30)}::life{life}", True, black
) # 黒色で描画
# テキストを左上に描画
screen.blit(text, (10, 10))
# 画面を更新
pygame.display.update()
clock.tick(30)
# カウンターを更新
counter += 1
さっそく実行してみましょうか。
どうでしょうか?
上からアイテムがふり、方向キーでプレイヤーを左右に移動させられましたか?
一度ゲームを完成させる
次は当たり判定を実装し、ゲームを一度完成させてみましょうか。
新しく関数を作成しプレイヤーの移動処理も移しましょう…
# 行動する
def take_action(player, items, action):
if action == 0:
player.move("left")
elif action == 2:
player.move("right")
point = 0
for obs in items:
# アイテムの移動
obs.move()
# プレイヤーに当たった場合
if (
player.lane == obs.lane
and player.y < obs.y + obs.height
and player.height + player.y > obs.y
):
items.remove(obs)
point += 1
# アイテムの削除
if obs.y > screen_height - 10:
items.remove(obs)
point -= 1
return point
メインループにlifeがなくなれば終了にする処理を追加して・・・
point = take_action(player, items, action) # 行動の結果を反映
life += point
done = False
# lifeがなくなれば終了
if life < 0:
done = True
elif 10 < life:
life = 10
# プレイヤーのHPが0より下
if done:
player = Player(screen_height, lane_count, lane_width)
items = []
counter = 0
life = 0
メインループも関数にしちゃいましょう!
# メインループ
def game_loop():
# ゲームオブジェクト
player = Player(screen_height, lane_count, lane_width)
items = []
run = True
counter = 0
life = 0
while run:
action = 1
for event in pygame.event.get():
if event.type == pygame.QUIT:
run = False
elif event.type == pygame.KEYDOWN:
if event.key == pygame.K_LEFT:
action = 0
elif event.key == pygame.K_RIGHT:
action = 2
# アイテムの生成
if max_items > len(items):
if random.randint(1, 10) == 1:
items.append(Item(lane_count, lane_width))
point = take_action(player, items, action) # 行動の結果を反映
life += point
done = False
# lifeがなくなれば終了
if life < 0:
done = True
elif 10 < life:
life = 10
# 描画
screen.fill(white)
player.draw(screen, red)
for obs in items:
obs.draw(screen, black)
# テキストをレンダリング
text = font.render(
f"time{int(counter / 30)}::life{life}", True, black
) # 黒色で描画
# テキストを左上に描画
screen.blit(text, (10, 10))
# 画面を更新
pygame.display.update()
clock.tick(30)
# カウンターを更新
counter += 1
# プレイヤーのHPが0より下
if done:
player = Player(screen_height, lane_count, lane_width)
items = []
counter = 0
life = 0
pygame.quit()
では、最後にgame_loopを呼び出しましょう!
# ゲームの開始
game_loop()
どうですか?
プレイできましたか?
ここまでのコード
main.py
import random
import pygame
from entities import Player, Item
# メインループ
def game_loop():
# ゲームオブジェクト
player = Player(screen_height, lane_count, lane_width)
items = []
run = True
counter = 0
life = 0
while run:
action = 1
for event in pygame.event.get():
if event.type == pygame.QUIT:
run = False
elif event.type == pygame.KEYDOWN:
if event.key == pygame.K_LEFT:
action = 0
elif event.key == pygame.K_RIGHT:
action = 2
# アイテムの生成
if max_items > len(items):
if random.randint(1, 10) == 1:
items.append(Item(lane_count, lane_width))
point = take_action(player, items, action) # 行動の結果を反映
life += point
done = False
# lifeがなくなれば終了
if life < 0:
done = True
elif 10 < life:
life = 10
# 描画
screen.fill(white)
player.draw(screen, red)
for obs in items:
obs.draw(screen, black)
# テキストをレンダリング
text = font.render(
f"time{int(counter / 30)}::life{life}", True, black
) # 黒色で描画
# テキストを左上に描画
screen.blit(text, (10, 10))
# 画面を更新
pygame.display.update()
clock.tick(30)
# カウンターを更新
counter += 1
# プレイヤーのHPが0より下
if done:
player = Player(screen_height, lane_count, lane_width)
items = []
counter = 0
life = 0
pygame.quit()
# 行動する
def take_action(player, items, action):
if action == 0:
player.move("left")
elif action == 2:
player.move("right")
point = 0
for obs in items:
# アイテムの移動
obs.move()
# プレイヤーに当たった場合
if (
player.lane == obs.lane
and player.y < obs.y + obs.height
and player.height + player.y > obs.y
):
items.remove(obs)
point += 1
# アイテムの削除
if obs.y > screen_height - 10:
items.remove(obs)
point -= 1
return point
# Pygameの初期化
pygame.init()
# フォントの設定
font = pygame.font.Font(None, 36) # デフォルトフォントをサイズ36で使用
# ゲームウィンドウのサイズ
screen_width = 800
screen_height = 600
screen = pygame.display.set_mode((screen_width, screen_height))
# 色の定義
white = (255, 255, 255)
black = (0, 0, 0)
red = (255, 0, 0)
# フレームレート調整用
clock = pygame.time.Clock()
# レーンの設定
lane_count = 11 # レーンの数
lane_width = screen_width // lane_count
# アイテムの最大数
max_items = 10
# ゲームの開始
game_loop()
entities.py
import random
import pygame
# プレイヤー
class Player:
def __init__(self, screen_height, lane_count, lane_width):
self.width = 50
self.height = 70
self.lane = lane_count // 2 # 初期にいるレーンは中央
self.y = screen_height - self.height - 10
self.speed = 5
self.lane_count = lane_count
self.lane_width = lane_width
def draw(self, win, color):
"""
描画
:param win: 描画先
:param color: 色
:return:
"""
x = self.lane * self.lane_width + (self.lane_width - self.width) // 2
pygame.draw.rect(win, color, (x, self.y, self.width, self.height))
# 移動
def move(self, direction):
"""
車を移動させる
:param direction: 移動方向
:return: None
"""
if direction == "left" and self.lane > 0:
self.lane -= 1
elif direction == "right" and self.lane < self.lane_count - 1:
self.lane += 1
# アイテム
class Item:
def __init__(self, lane_count, lane_width):
self.width = 50
self.height = 70
self.lane = random.randint(0, lane_count - 1)
self.y = -self.height
self.speed = 5
self.lane_count = lane_count
self.lane_width = lane_width
def draw(self, win, color):
"""
描画
:param win: 描画先
:param color: 色
:return:
"""
x = self.lane * self.lane_width + (self.lane_width - self.width) // 2
pygame.draw.rect(win, color, (x, self.y, self.width, self.height))
def move(self):
"""
アイテムを移動させる
:return: None
"""
self.y += self.speed
【DQNを作成してみましょうか】#3
今回作成するAI
ゲーム部分ができたところで、AIを作成していきましょう!
今回作成するAIモデルはDQNと呼ばれるものになります!
AIモデル作成開始
ではmodel.py
を作成して、
インポートするものはこちらです!
import random
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
Qネットワーク
まずはpytorchでQネットワーク
の定義から作っていきましょう!
# Qネットワーク定義
class QNetwork(nn.Module):
def __init__(self):
super(QNetwork, self).__init__()
def forward(self, state):
pass
定義を用意したので、構成を考えていきましょう!
今回は単純にLinear三層で、入力から中間、出力にしたいと思います!
# 入力層から中間層へ
self.fc1 = nn.Linear(state_size, hidden_size)
# 中間層からさらに中間層へ
self.fc2 = nn.Linear(hidden_size, hidden_size)
# 中間層から出力層(行動数)へ
self.fc3 = nn.Linear(hidden_size, action_size)
pytorchでは、このようにネットワークの層を設定することができます!
これに合わせてforward(実行処理)を追記しましょう!
# Qネットワーク定義
class QNetwork(nn.Module):
def __init__(self, state_size, action_size, hidden_size=255):
super(QNetwork, self).__init__()
# 入力層から中間層へ
self.fc1 = nn.Linear(state_size, hidden_size)
# 中間層からさらに中間層へ
self.fc2 = nn.Linear(hidden_size, hidden_size)
# 中間層から出力層(行動数)へ
self.fc3 = nn.Linear(hidden_size, action_size)
def forward(self, state):
# ReLU活性化関数を使用して、各層の出力を次の層に渡す
x = F.relu(self.fc1(state))
x = F.relu(self.fc2(x))
return self.fc3(x) # 最終層の出力を返す
試しに実行してみましょう。
q_network = QNetwork(3, 3)
state = [[1.0, 1.0, 1.0]]
state = torch.FloatTensor(state)
response = q_network(state)
print(f"response:{response}")
action = torch.argmax(response).item()
print(f"action:{action}")
私の場合、結果は以下のようになりました!
response:tensor([[-0.0297, -0.0628, -0.0253]], grad_fn=<AddmmBackward0>)
action:2
結果は固定ではないので、問題なく出力されればOKです!
Qネットワークはこれで完成です!
Qネットワーク定義全体
# Qネットワーク定義
class QNetwork(nn.Module):
def __init__(self, state_size, action_size, hidden_size=255):
super(QNetwork, self).__init__()
# 入力層から中間層へ
self.fc1 = nn.Linear(state_size, hidden_size)
# 中間層からさらに中間層へ
self.fc2 = nn.Linear(hidden_size, hidden_size)
# 中間層から出力層(行動数)へ
self.fc3 = nn.Linear(hidden_size, action_size)
def forward(self, state):
# ReLU活性化関数を使用して、各層の出力を次の層に渡す
x = F.relu(self.fc1(state))
x = F.relu(self.fc2(x))
return self.fc3(x) # 最終層の出力を返す
優先度付き経験再生バッファ
Qネットワークができたので、経験を蓄積する仕組みと、取り出す仕組みを作っていきましょう!
経験に優先度を設定して優先度が高い経験ほど、学習に使われる確率が高くしていきます。
# 優先度付き経験再生バッファの定義
class PrioritizedReplayBuffer:
def __init__(self):
pass
# 経験をバッファに追加
def add(self):
pass
# 優先度に基づいて経験をサンプリング
def sample(self):
pass
# 優先度を更新
def update(self):
pass
def __len__(self):
pass
定義を用意したので、実装していきましょう!
初期化
まずは変数から設定していきましょう!
必要なのは
- どれくらい経験を蓄積できるかの容量
- 優先度をどれくらい活用するかの値
- 経験を蓄積するリスト(循環バッファー)
- 現在経験リスト上のどこにいるかわかる目印
- 優先度をつけたリスト
が必要だと考えました!
self.capacity = capacity # バッファの容量
self.alpha = alpha # 優先度をどの程度重視するかを決めるハイパーパラメータ
self.memory = [] # 経験を保存するリスト
self.pos = 0 # 現在の挿入位置
self.priorities = np.zeros((capacity,), dtype=np.float32) # 優先度のリスト
こんな感じですね!
def __init__(self, capacity, alpha):
self.capacity = capacity # バッファの容量
self.alpha = alpha # 優先度をどの程度重視するかを決めるハイパーパラメータ
self.memory = [] # 経験を保存するリスト
self.pos = 0 # 現在の挿入位置
self.priorities = np.zeros((capacity,), dtype=np.float32) # 優先度のリスト
経験を蓄積
次は経験を蓄積するメソッドadd
を実装しましょう!
このメソッドでは新しい経験をバッファに追加します。もしバッファがいっぱいなら、古い経験と置き換えます。
処理の流れは
- 現在の優先度の最大値を取得
- 経験を記録
- 優先度を設定
- 目印を移動
です!
現在の優先度の最大値を取得しましょう!
経験が空の場合は1.0にします。
max_priority = self.priorities.max() if self.memory else 1.0
受け取るデータはstate
action
reward
next_state
done
状況、行動、評価、行動後の状況、フラグ
にするつもりなので、タプルでまとめて経験にします!
experience = (state, action, reward, next_state, done)
経験を蓄積はself.memory.append(experience)
ですが、容量を決めているので、容量を超えているかどうかで処理を分岐しましょう。
容量がキャパシティ以下
if len(self.memory) < self.capacity
容量を超えていた場合は経験を上書きしましょう。
self.memory[self.pos] = experience
# メモリがバッファーの容量未満の場合、単純に経験を追加します。
if len(self.memory) < self.capacity:
self.memory.append(experience)
else:
# メモリがバッファーの容量に達している場合、古い経験を上書きします。
self.memory[self.pos] = experience
次に優先度を設定ですが、とりあえず、最大値にします。
self.priorities[self.pos] = max_priority
現在地の更新をしますが今回、経験は循環バッファーなので、終端に来たら先頭に戻します。
self.pos = (self.pos + 1) % self.capacity
完成したらこんな感じです!
def add(self, state, action, reward, next_state, done):
max_priority = self.priorities.max() if self.memory else 1.0
experience = (state, action, reward, next_state, done)
if len(self.memory) < self.capacity:
self.memory.append(experience)
else:
self.memory[self.pos] = experience
self.priorities[self.pos] = max_priority
self.pos = (self.pos + 1) % self.capacity
経験をサンプリング
次は経験を取り出すメソッド、sample
を実装しましょう!
このメソッドでは、経験を優先度に応じてランダムに取り出します。
処理の流れは
- 優先度を元にしてサンプリング確率を計算
- 確率に応じてバッチサイズの分取り出す
- 重み付けを計算
- 経験をもとのデータに戻して返却
ですね!
サンプリング確率を計算する前に経験の量で処理を変えます。
if len(self.memory) == self.capacity
メモリが最大になっているならそのまま使用します。
priorities = self.priorities
もしメモリが容量より少ないなら、現在の地点までの経験を使用しましょう。
priorities = self.priorities[: self.pos]
if len(self.memory) == self.capacity:
priorities = self.priorities
else:
priorities = self.priorities[: self.pos]
準備ができたので、処理を実装していきましょう。
まずは優先度で確率を計算しましょう。
優先度のリストに対して優先度の強さをかけて正規化し、確率分布に変換します。
probabilities = priorities**self.alpha
probabilities /= probabilities.sum()
次はバッチサイズの分データを取り出していきましょう
まずは確率をもとにバッチサイズの分、経験のキーのリストを生成します。
indices = np.random.choice(len(self.memory), batch_size, p=probabilities)
生成したキーのリストを使用してデータを取り出します。
experiences = [self.memory[idx] for idx in indices]
取り出したデータの重み付けを計算します。
サイズを取得し
total = len(self.memory)
重み付けを生成
weights = (total * probabilities[indices]) ** (-beta)
重みの合計が 1 になるように正規化
weights /= weights.max()
処理の実装が終わったので、データを返却しましょう。
データは経験としてタプルにしているので取り出します。
states, actions, rewards, next_states, dones = zip(*experiences)
取り出したデータとサンプリングのキーリストと重み付けを返却しましょう。
return states, actions, rewards, next_states, dones, indices, weights
優先度の更新
大体終わりましたが、経験の優先度を更新する必要があるので、メソッドupdate
が必要ですね!
self.priorities[idx] = priority
def update(self, idx, priority):
self.priorities[idx] = priority
これでupdateは実装完了です。
メモリ量を取得
最後に経験量を取得できるように__len__を実装しちゃいましょう。
def __len__(self):
return len(self.memory)
こうすることで、クラスのインスタンスに対してlenを使えば、経験量を取得できるようになりました!
PrioritizedReplayBuffer全体
# 優先度付き経験再生バッファの定義
class PrioritizedReplayBuffer:
def __init__(self, capacity, alpha):
self.capacity = capacity # バッファの容量
self.alpha = alpha # 優先度をどの程度重視するかを決めるハイパーパラメータ
self.memory = [] # 経験を保存するリスト
self.pos = 0 # 現在の挿入位置
self.priorities = np.zeros((capacity,), dtype=np.float32) # 優先度のリスト
# 経験をバッファに追加
def add(self, state, action, reward, next_state, done):
max_priority = self.priorities.max() if self.memory else 1.0
experience = (state, action, reward, next_state, done)
if len(self.memory) < self.capacity:
self.memory.append(experience)
else:
self.memory[self.pos] = experience
self.priorities[self.pos] = max_priority
self.pos = (self.pos + 1) % self.capacity
# 優先度に基づいて経験をサンプリング
def sample(self, batch_size, beta=0.4):
if len(self.memory) == self.capacity:
priorities = self.priorities
else:
priorities = self.priorities[: self.pos]
probabilities = priorities**self.alpha
probabilities /= probabilities.sum()
indices = np.random.choice(len(self.memory), batch_size, p=probabilities)
experiences = [self.memory[idx] for idx in indices]
total = len(self.memory)
weights = (total * probabilities[indices]) ** (-beta)
weights /= weights.max()
states, actions, rewards, next_states, dones = zip(*experiences)
return states, actions, rewards, next_states, dones, indices, weights
# 優先度を更新
def update(self, idx, priority):
self.priorities[idx] = priority
def __len__(self):
return len(self.memory)
DQN
まずは定義から
必要なメソッドは
- 初期化
- モデルの更新
- 経験を蓄積
- 行動
- 学習
です!
# DQNエージェントの定義
class DQNAgent:
def __init__(self):
pass
# ターゲットネットワークをポリシーネットワークのパラメータで更新
def update_target_network(self):
pass
# 経験をメモリに追加
def remember(self):
pass
# 行動を選択
def action(self):
pass
# 経験をリプレイしてネットワークを訓練
def replay(self):
pass
実装するとこんな感じです!
初期化
初期化処理を実装していきます!
今回必要なことは
- 入力次元数
- 出力(行動)数
- 優先度付き経験再生バッファを作成
- 割引率を設定
- 探索率を設定
- 探索率の減少率
- バッチサイズ
- 学習率
- ターゲットネットワークの更新頻度
- 優先度付き経験再生のサンプリングに使用する初期ベータ値
- ベータ値が1になるまでのフレーム数(再学習数)
- 現在のフレーム数
- ポリシーネットワーク
- ターゲットネットワーク
- ターゲットネットワークのパラメータをポリシーネットワークで初期化
- オプティマイザーの設定
ちょっと多いですね…?
頑張りましょう!
def __init__(
self,
state_size,
action_size,
buffer_size=10000,
batch_size=64,
gamma=0.99,
learning_rate=0.001,
target_update_freq=10,
alpha=0.6,
beta_start=0.4,
beta_frames=100000,
):
self.state_size = state_size # 状態の次元数
self.action_size = action_size # 行動の数
# 優先度付き経験再生バッファを初期化
self.memory = PrioritizedReplayBuffer(buffer_size, alpha)
self.gamma = gamma # 割引率
self.epsilon = 1.0 # 探索率
self.epsilon_decay = 0.995 # 探索率の減少率
self.epsilon_min = 0.01 # 最小探索率
self.batch_size = batch_size # ミニバッチのサイズ
self.learning_rate = learning_rate # 学習率
self.target_update_freq = target_update_freq # ターゲットネットワークの更新頻度
self.beta_start = beta_start # 優先度付き経験再生の初期β値
self.beta_frames = beta_frames # β値が1に達するまでのフレーム数
self.frame = 0 # フレーム数をトラッキングするための変数
# ポリシーネットワークのインスタンス
self.model = QNetwork(state_size, action_size)
# ターゲットネットワークのインスタンス
self.target_model = QNetwork(state_size, action_size)
# ターゲットネットワークをポリシーネットワークのパラメータで初期化
self.target_model.load_state_dict(self.model.state_dict())
# Adamオプティマイザを使用してネットワークのパラメータを最適化
self.optimizer = optim.Adam(self.model.parameters(), lr=self.learning_rate)
ターゲットネットワークの更新
update_target_network
を実装してターゲットネットワークをポリシーネットワークで初期化しましょう
初期化で実装しましたね!
def update_target_network(self):
self.target_model.load_state_dict(self.model.state_dict())
ということは初期化のほうも
# ターゲットネットワークをポリシーネットワークのパラメータで初期化
self.update_target_network()
メソッドを使用するように変更しましょう!
蓄積
次はさっき実装した経験を蓄積するメソッドを呼び出して経験を蓄積するメソッドremember
を実装しましょう。
self.memory.add(state, action, reward, next_state, done)
になるはずなので
def remember(self, state, action, reward, next_state, done):
self.memory.add(state, action, reward, next_state, done)
こうですね!
行動
では、DQN君が行動を選択するメソッドを実装してみましょうか。
今回は強化学習の手法の一つであるε-グリーディー法
を使用します!
最初はランダムに行動してほしいので、
if np.random.rand() <= self.epsilon
ランダムに行動を決定して返却。
return random.randrange(self.action_size)
状態をfloatに変換し、テンソルにしましょう
state = torch.FloatTensor(state).unsqueeze(0)
そのデータをネットワークに入力して各行動のQ値を計算してもらいましょう。
act_values = self.model(state)
一番Q値が大きかったものを採用して返却
return torch.argmax(act_values).item()
def action(self, state):
# 探索かネットワークの予測に基づいて行動を選択するか決定
if np.random.rand() <= self.epsilon:
return random.randrange(self.action_size) # ランダムに行動を選択
state = torch.FloatTensor(state).unsqueeze(
0
) # 状態をテンソルに変換してネットワークに入力
with torch.no_grad(): # 勾配計算を無効化
act_values = self.model(state) # 各行動のQ値を計算
return torch.argmax(act_values).item() # 最大のQ値を持つ行動を選択
学習
最後に経験をリプレイして学習していくメソッド、replay
を実装してみましょうか。
必要な処理の流れは
- ベータ値の更新
- 経験のメモリから経験を取り出す
- 一度Q値を計算
- ターゲットネットワークで次の状況でのQ値を計算
- 損失の計算
- 逆伝播して更新
- 優先度を更新
- 探索率を減少させる
- ターゲットネットワークを更新させる
です!
ちょっと大変ですが、一緒に頑張りましょう!
学習するための経験がそろっているかの確認が必要ですね…
if len(self.memory) < self.batch_size
この場合はそのまま終了しちゃいましょう!
経験がそろっていれば
フレーム数を更新して
self.frame += 1
ベータ値を設定しましょう
beta = min(1.0,self.beta_start + self.frame * (1.0 - self.beta_start) / self.beta_frames,)
経験を取り出します。
states, actions, rewards, next_states, dones, indices, weights = (self.memory.sample(self.batch_size, beta))
取り出したら、テンソルに変換します。
states = torch.FloatTensor(states)
actions = torch.LongTensor(actions).unsqueeze(1)
rewards = torch.FloatTensor(rewards)
next_states = torch.FloatTensor(next_states)
dones = torch.FloatTensor(dones)
weights = torch.FloatTensor(weights).unsqueeze(1)
データの準備が完了したので学習処理を実装していきます!
まずは現在の状況からQ値を計算
current_q_values = self.model(states).gather(1, actions)
ターゲットネットワークを使って次の状況のQ値を計算
next_q_values = self.target_model(next_states).max(1)[0].detach()
学習に使うターゲットを設定
target_q_values = rewards + (1 - dones) * self.gamma * next_q_values.unsqueeze(1)
損失の計算を実装しましょう。
まずはTD誤差を二乗します。
(current_q_values - target_q_values).pow(2)
重み付けを掛けます。
loss = (current_q_values - target_q_values).pow(2) * weights
平均損失を計算します。
loss = loss.mean()
これにより、現在とターゲットの間でどれだけ学習する必要があるのか損失を出します。
必要な作業は終了したので、モデルに反映します。
self.optimizer.zero_grad() # 勾配をリセット
loss.backward() # 勾配を逆伝播
self.optimizer.step() # パラメータを更新
後処理として、優先度を更新し探索率を下げ、ターゲットネットワークを更新しましょう。
優先度を更新
for idx in indices:
self.memory.update(idx, loss.item())
探索率が最低値以上であれば
if self.epsilon > self.epsilon_min
探索率を減少
self.epsilon *= self.epsilon_decay
更新間隔であれば
if self.frame % self.target_update_freq == 0
ターゲットネットワークを更新
self.update_target_network()
はい!これでmodel.py
は完成です!
ためしに実行してみましょうか!
# エージェントの作成
state_size = 3 # 状態
action_size = 3 # 行動
model = DQNAgent(state_size, action_size, batch_size=1)
state = [1.0, 1.0, 1.0]
next_state = [1.0, 0, 1.0]
action = model.act(state)
print(action)
model.remember(state, action, reward=-100, next_state=next_state, done=True)
# 経験を記憶
model.replay() # 経験を用いて学習
model.update_target_network()
action = model.act(state)
print(action)
とりあえず、動いたら大丈夫です!
DQNAgent全体
# DQNエージェントの定義
class DQNAgent:
def __init__(
self,
state_size,
action_size,
buffer_size=10000,
batch_size=64,
gamma=0.99,
learning_rate=0.001,
target_update_freq=10,
alpha=0.6,
beta_start=0.4,
beta_frames=100000,
):
self.state_size = state_size # 状態の次元数
self.action_size = action_size # 行動の数
# 優先度付き経験再生バッファを初期化
self.memory = PrioritizedReplayBuffer(buffer_size, alpha)
self.gamma = gamma # 割引率
self.epsilon = 1.0 # 探索率
self.epsilon_decay = 0.995 # 探索率の減少率
self.epsilon_min = 0.01 # 最小探索率
self.batch_size = batch_size # ミニバッチのサイズ
self.learning_rate = learning_rate # 学習率
self.target_update_freq = target_update_freq # ターゲットネットワークの更新頻度
self.beta_start = beta_start # 優先度付き経験再生の初期β値
self.beta_frames = beta_frames # β値が1に達するまでのフレーム数
self.frame = 0 # フレーム数をトラッキングするための変数
# ポリシーネットワークのインスタンス
self.model = QNetwork(state_size, action_size)
# ターゲットネットワークのインスタンス
self.target_model = QNetwork(state_size, action_size)
# ターゲットネットワークをポリシーネットワークのパラメータで初期化
self.update_target_network()
# Adamオプティマイザを使用してネットワークのパラメータを最適化
self.optimizer = optim.Adam(self.model.parameters(), lr=self.learning_rate)
# ターゲットネットワークをポリシーネットワークのパラメータで更新
def update_target_network(self):
self.target_model.load_state_dict(self.model.state_dict())
# 経験をメモリに追加
def remember(self, state, action, reward, next_state, done):
self.memory.add(state, action, reward, next_state, done)
# 行動を選択
def action(self, state):
# 探索かネットワークの予測に基づいて行動を選択するか決定
if np.random.rand() <= self.epsilon:
return random.randrange(self.action_size) # ランダムに行動を選択
state = torch.FloatTensor(state).unsqueeze(
0
) # 状態をテンソルに変換してネットワークに入力
with torch.no_grad(): # 勾配計算を無効化
act_values = self.model(state) # 各行動のQ値を計算
return torch.argmax(act_values).item() # 最大のQ値を持つ行動を選択
# 経験をリプレイしてネットワークを訓練
def replay(self):
if len(self.memory) < self.batch_size:
return # メモリが十分に溜まるまではリプレイを実行しない
self.frame += 1 # フレーム数をカウント
# β値をフレーム数に基づいて更新
beta = min(
1.0,
self.beta_start + self.frame * (1.0 - self.beta_start) / self.beta_frames,
)
# メモリから優先度に基づいたサンプリングを実行
states, actions, rewards, next_states, dones, indices, weights = (
self.memory.sample(self.batch_size, beta)
)
states = torch.FloatTensor(states)
actions = torch.LongTensor(actions).unsqueeze(1)
rewards = torch.FloatTensor(rewards)
next_states = torch.FloatTensor(next_states)
dones = torch.FloatTensor(dones)
weights = torch.FloatTensor(weights).unsqueeze(1)
# 現在の状態でのQ値を取得
current_q_values = self.model(states).gather(1, actions)
# 次の状態での最大Q値をターゲットネットワークから取得
next_q_values = self.target_model(next_states).max(1)[0].detach()
# ターゲットQ値を計算
target_q_values = rewards + (1 - dones) * self.gamma * next_q_values.unsqueeze(
1
)
# TD誤差の二乗に重みを掛けた損失を計算
loss = (current_q_values - target_q_values).pow(2) * weights
loss = loss.mean() # 平均損失を計算
self.optimizer.zero_grad() # 勾配をリセット
loss.backward() # 勾配を逆伝播
self.optimizer.step() # パラメータを更新
# サンプルした経験の優先度を更新
for idx in indices:
self.memory.update(idx, loss.item())
# 探索率を減少
if self.epsilon > self.epsilon_min:
self.epsilon *= self.epsilon_decay
# ターゲットネットワークの更新
if self.frame % self.target_update_freq == 0:
self.update_target_network()
【ゲームがプレイできるようにしましょうか】#4
修正
DQNモデルが完成したので、次は最初に作ったゲームをプレイしてもらえるように修正しましょう!
main.py
を開いてください。
importを追加しましょう。
from model import DQNAgent
DQNを作成しましょう。
state_size
は
- プレイヤーの現在地(x)
- アイテムの現在地のリスト(x, y)
で合計三つにです!
state_size = 3
です。
action_size
は
- 左
- 中央
- 右
の合計三つです
action_size = 3
です。
これらの変数を使って、DQNAgent
を作成します!
agent_model = DQNAgent(state_size, action_size)
game_loopに渡すことにします。
game_loop(agent_model)
受け取れるように定義を修正しましょう。
def game_loop(model)
現在の状況を取得する関数が必要ですね。
get_state
を定義し、実装しましょう。
# 状態を取得する関数
def get_state(player, items):
state = [float(player.lane)] # player.laneをfloatにキャストしておく
# 最新のアイテム max_items 個まで考慮
for obs in items[:max_items]: # 最新 max_items 個のアイテム
state.extend([float(obs.lane), float(obs.y)])
# アイテムが max_items より少ない場合、デフォルト値で埋める
while len(state) < 2 * max_items + 1: # プレイヤーのレーン + 2 * max_items
state.extend([0.0, 0.0]) # デフォルト値で埋める
return state
必要な情報は
- プレイヤーの現在地(x)
- アイテムの現在地のリスト(x, y)
でしたので、オブジェクトがなかったときは
[0.0, 0.0]で埋めておきましょう!
次はtake_action
関数を修正しましょう!
DQNの学習に使用するrewardを出力するように変更を加えましょう!
# 行動する
def take_action(car, items, action):
reward = 0
if action == 0:
car.move("left")
elif action == 2:
car.move("right")
point = 0
for obs in items:
# アイテムの移動
obs.move()
# プレイヤーに当たった場合
if (
car.lane == obs.lane
and car.y < obs.y + obs.height
and car.height + car.y > obs.y
):
items.remove(obs)
point += 1
reward = 10
# アイテムの削除
if obs.y > screen_height - 10:
items.remove(obs)
point -= 1
reward = -100
return reward, point
最後にgame_loopを対応させましょう!
アイテムの生成の下に
今作った状況取得関数を呼び
state = get_state(player, items)
DQNモデルに渡して行動を決めてもらいましょう。
action = model.act(state)
take_actionも変更があったので対応させましょう。
reward, point = take_action(player, items, action)
lifeの処理の下に行動後の状況を取得する処理を用意し
next_state = get_state(player, items) # 次の状態を取得
モデルの経験として蓄積しましょう。
model.remember(state, action, reward, next_state, done)
その後学習を呼び出しをして
model.replay()
完成です!
# メインループ
def game_loop(model):
# ゲームオブジェクト
player = Player(screen_height, lane_count, lane_width)
items = []
run = True
counter = 0
life = 0
while run:
action = 1
for event in pygame.event.get():
if event.type == pygame.QUIT:
run = False
# アイテムの生成
if max_items > len(items):
if random.randint(1, 10) == 1:
items.append(Item(lane_count, lane_width))
state = get_state(player, items) # 状態を取得
action = model.action(state) # AIによる行動選択
reward, point = take_action(player, items, action) # 行動の結果を反映
life += point
done = False
# lifeがなくなれば終了
if life < 0:
done = True
elif 10 < life:
life = 10
next_state = get_state(player, items) # 次の状態を取得
model.remember(state, action, reward, next_state, done) # 経験を記憶
model.replay() # 経験を用いて学習
# 描画
screen.fill(white)
player.draw(screen, red)
for obs in items:
obs.draw(screen, black)
# テキストをレンダリング
text = font.render(
f"time{int(counter / 30)}::life{life}", True, black
) # 黒色で描画
# テキストを左上に描画
screen.blit(text, (10, 10))
# 画面を更新
pygame.display.update()
clock.tick(30)
# カウンターを更新
counter += 1
# プレイヤーのHPが0より下
if done:
player = Player(screen_height, lane_count, lane_width)
items = []
counter = 0
life = 0
pygame.quit()
【最後にプレイさせてみましょうか】#5
ついに完成しました!
さっそく実行しましょう!
最初はランダムに行動し
だんだんプレイできるようになっていくと思います!
プレイ時間が伸びていきますね!
【おしまい 今日のソースコード】#6
お疲れ様でした!
今日はDQNでAIにゲームをプレイさせてみました!
この記事が皆さんのサポートになればうれしいです!
あなたのAI開発への挑戦を応援しています!
最後に今回のソースコードを共有します。
クリックして展開
MIT License
Copyright (c) 2024 mi_ztyanya
本ソフトウェアおよび関連文書ファイル(以下「本ソフトウェア」という、)
以下に定める条件に従い、本ソフトウェアの複製を取得するすべての人に対し、ソフトウェアを無制限に扱うことを無償で許可します。
これには、ソフトウェアの複製を使用、複写、変更、結合、掲載、頒布、サブライセンス、および/または販売する権利、およびソフトウェアを提供する相手に同じことを許可する権利も無制限に含まれます。
上記の著作権表示および本許諾表示を、ソフトウェアのすべての複製または重要な部分に記載するものとします。
ソフトウェアは「現状のまま」で、明示であるか暗黙であるかを問わず、何らの保証もなく提供されます。
ここでいう保証とは、商品性、特定の目的への適合性、および権利非侵害についての保証も含みますが、
それに限定されるものではありません。
作者または著作権者は、契約行為、不法行為、またはそれ以外であろうと、ソフトウェアに起因または関連し、
あるいはソフトウェアの使用またはその他の扱いによって生じる一切の請求、損害、
その他の義務について何らの責任も負わないものとします。
main.py
import random
import pygame
from entities import Player, Item
from model import DQNAgent
# メインループ
def game_loop(model):
# ゲームオブジェクト
player = Player(screen_height, lane_count, lane_width)
items = []
run = True
counter = 0
life = 0
while run:
action = 1
for event in pygame.event.get():
if event.type == pygame.QUIT:
run = False
# アイテムの生成
if max_items > len(items):
if random.randint(1, 10) == 1:
items.append(Item(lane_count, lane_width))
state = get_state(player, items) # 状態を取得
action = model.act(state) # AIによる行動選択
reward, point = take_action(player, items, action) # 行動の結果を反映
life += point
done = False
# lifeがなくなれば終了
if life < 0:
done = True
elif 10 < life:
life = 10
next_state = get_state(player, items) # 次の状態を取得
model.remember(state, action, reward, next_state, done) # 経験を記憶
model.replay() # 経験を用いて学習
# 描画
screen.fill(white)
player.draw(screen, red)
for obs in items:
obs.draw(screen, black)
# テキストをレンダリング
text = font.render(
f"time{int(counter / 30)}::life{life}", True, black
) # 黒色で描画
# テキストを左上に描画
screen.blit(text, (10, 10))
# 画面を更新
pygame.display.update()
clock.tick(30)
# カウンターを更新
counter += 1
# プレイヤーのHPが0より下
if done:
player = Player(screen_height, lane_count, lane_width)
items = []
counter = 0
life = 0
pygame.quit()
# 行動する
def take_action(car, items, action):
reward = 0
if action == 0:
car.move("left")
elif action == 2:
car.move("right")
point = 0
for obs in items:
# アイテムの移動
obs.move()
# プレイヤーに当たった場合
if (
car.lane == obs.lane
and car.y < obs.y + obs.height
and car.height + car.y > obs.y
):
items.remove(obs)
point += 1
reward = 10
# アイテムの削除
if obs.y > screen_height - 10:
items.remove(obs)
point -= 1
reward = -100
return reward, point
# 状態を取得する関数
def get_state(player, items):
state = [float(player.lane)] # player.laneをfloatにキャストしておく
# 最新のアイテム max_items 個まで考慮
for obs in items[:max_items]: # 最新 max_items 個のアイテム
state.extend([float(obs.lane), float(obs.y)])
# アイテムが max_items より少ない場合、デフォルト値で埋める
while len(state) < 2 * max_items + 1: # プレイヤーのレーン + 2 * max_items
state.extend([0.0, 0.0]) # デフォルト値で埋める
return state
# Pygameの初期化
pygame.init()
# フォントの設定
font = pygame.font.Font(None, 36) # デフォルトフォントをサイズ36で使用
# ゲームウィンドウのサイズ
screen_width = 800
screen_height = 600
screen = pygame.display.set_mode((screen_width, screen_height))
# 色の定義
white = (255, 255, 255)
black = (0, 0, 0)
red = (255, 0, 0)
# フレームレート調整用
clock = pygame.time.Clock()
# レーンの設定
lane_count = 11 # レーンの数
lane_width = screen_width // lane_count
# アイテムの最大数
max_items = 10
# エージェントの作成
state_size = 1 + (max_items * 2) # 状態空間のサイズ
action_size = 3 # 行動の数(左、そのまま、右)
agent_model = DQNAgent(state_size, action_size)
# ゲームの開始
game_loop(agent_model)
entities.py
import random
import pygame
# プレイヤー
class Player:
def __init__(self, screen_height, lane_count, lane_width):
self.width = 50
self.height = 70
self.lane = lane_count // 2 # 初期にいるレーンは中央
self.y = screen_height - self.height - 10
self.speed = 5
self.lane_count = lane_count
self.lane_width = lane_width
def draw(self, win, color):
"""
描画
:param win: 描画先
:param color: 色
:return:
"""
x = self.lane * self.lane_width + (self.lane_width - self.width) // 2
pygame.draw.rect(win, color, (x, self.y, self.width, self.height))
# 移動
def move(self, direction):
"""
車を移動させる
:param direction: 移動方向
:return: None
"""
if direction == "left" and self.lane > 0:
self.lane -= 1
elif direction == "right" and self.lane < self.lane_count - 1:
self.lane += 1
# アイテム
class Item:
def __init__(self, lane_count, lane_width):
self.width = 50
self.height = 70
self.lane = random.randint(0, lane_count - 1)
self.y = -self.height
self.speed = 5
self.lane_count = lane_count
self.lane_width = lane_width
def draw(self, win, color):
"""
描画
:param win: 描画先
:param color: 色
:return:
"""
x = self.lane * self.lane_width + (self.lane_width - self.width) // 2
pygame.draw.rect(win, color, (x, self.y, self.width, self.height))
def move(self):
"""
アイテムを移動させる
:return: None
"""
self.y += self.speed
model.py
import random
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
# Qネットワーク定義
class QNetwork(nn.Module):
def __init__(self, state_size, action_size, hidden_size=255):
super(QNetwork, self).__init__()
# 入力層から中間層へ
self.fc1 = nn.Linear(state_size, hidden_size)
# 中間層からさらに中間層へ
self.fc2 = nn.Linear(hidden_size, hidden_size)
# 中間層から出力層(行動数)へ
self.fc3 = nn.Linear(hidden_size, action_size)
def forward(self, state):
# ReLU活性化関数を使用して、各層の出力を次の層に渡す
x = F.relu(self.fc1(state))
x = F.relu(self.fc2(x))
return self.fc3(x) # 最終層の出力を返す
# 優先度付き経験再生バッファの定義
class PrioritizedReplayBuffer:
def __init__(self, capacity, alpha):
self.capacity = capacity # バッファの容量
self.alpha = alpha # 優先度をどの程度重視するかを決めるハイパーパラメータ
self.memory = [] # 経験を保存するリスト
self.pos = 0 # 現在の挿入位置
self.priorities = np.zeros((capacity,), dtype=np.float32) # 優先度のリスト
# 経験をバッファに追加
def add(self, state, action, reward, next_state, done):
max_priority = self.priorities.max() if self.memory else 1.0
experience = (state, action, reward, next_state, done)
if len(self.memory) < self.capacity:
self.memory.append(experience)
else:
self.memory[self.pos] = experience
self.priorities[self.pos] = max_priority
self.pos = (self.pos + 1) % self.capacity
# 優先度に基づいて経験をサンプリング
def sample(self, batch_size, beta=0.4):
if len(self.memory) == self.capacity:
priorities = self.priorities
else:
priorities = self.priorities[: self.pos]
probabilities = priorities**self.alpha
probabilities /= probabilities.sum()
indices = np.random.choice(len(self.memory), batch_size, p=probabilities)
experiences = [self.memory[idx] for idx in indices]
total = len(self.memory)
weights = (total * probabilities[indices]) ** (-beta)
weights /= weights.max()
states, actions, rewards, next_states, dones = zip(*experiences)
return states, actions, rewards, next_states, dones, indices, weights
# 優先度を更新
def update(self, idx, priority):
self.priorities[idx] = priority
def __len__(self):
return len(self.memory)
# DQNエージェントの定義
class DQNAgent:
def __init__(
self,
state_size,
action_size,
buffer_size=10000,
batch_size=64,
gamma=0.99,
learning_rate=0.001,
target_update_freq=10,
alpha=0.6,
beta_start=0.4,
beta_frames=100000,
):
self.state_size = state_size # 状態の次元数
self.action_size = action_size # 行動の数
# 優先度付き経験再生バッファを初期化
self.memory = PrioritizedReplayBuffer(buffer_size, alpha)
self.gamma = gamma # 割引率
self.epsilon = 1.0 # 探索率
try:
with open("epsilon.txt", "r", encoding="utf-8") as file:
epsilon_value_str = file.read().strip()
if epsilon_value_str:
self.epsilon = float(epsilon_value_str)
print("探索率をロードしました。")
else:
# ファイルが空だった場合
print("探索率が見つかりませんでした。")
except FileNotFoundError:
# ファイルが存在しない場合
print("探索率が見つかりませんでした。")
except ValueError:
# ファイルから読み込んだ値がfloatに変換できなかった場合の処理
print("探索率のファイルから読み込んだ値が正しい形式ではありません。")
self.epsilon_decay = 0.995 # 探索率の減少率
self.epsilon_min = 0.01 # 最小探索率
self.batch_size = batch_size # ミニバッチのサイズ
self.learning_rate = learning_rate # 学習率
self.target_update_freq = target_update_freq # ターゲットネットワークの更新頻度
self.beta_start = beta_start # 優先度付き経験再生の初期β値
self.beta_frames = beta_frames # β値が1に達するまでのフレーム数
self.frame = 0 # フレーム数をトラッキングするための変数
# ポリシーネットワークのインスタンス
self.model = QNetwork(state_size, action_size)
# ターゲットネットワークのインスタンス
self.target_model = QNetwork(state_size, action_size)
# ターゲットネットワークをポリシーネットワークのパラメータで初期化
self.update_target_network()
# Adamオプティマイザを使用してネットワークのパラメータを最適化
self.optimizer = optim.Adam(self.model.parameters(), lr=self.learning_rate)
# ターゲットネットワークをポリシーネットワークのパラメータで更新
def update_target_network(self):
self.target_model.load_state_dict(self.model.state_dict())
# 経験をメモリに追加
def remember(self, state, action, reward, next_state, done):
self.memory.add(state, action, reward, next_state, done)
# ε-グリーディー法で行動を選択
def action(self, state):
# 探索かネットワークの予測に基づいて行動を選択するか決定
if np.random.rand() <= self.epsilon:
return random.randrange(self.action_size) # ランダムに行動を選択
state = torch.FloatTensor(state).unsqueeze(
0
) # 状態をテンソルに変換してネットワークに入力
with torch.no_grad(): # 勾配計算を無効化
act_values = self.model(state) # 各行動のQ値を計算
return torch.argmax(act_values).item() # 最大のQ値を持つ行動を選択
# 経験をリプレイしてネットワークを訓練
def replay(self):
if len(self.memory) < self.batch_size:
return # メモリが十分に溜まるまではリプレイを実行しない
self.frame += 1 # フレーム数をカウント
# β値をフレーム数に基づいて更新
beta = min(
1.0,
self.beta_start + self.frame * (1.0 - self.beta_start) / self.beta_frames,
)
# メモリから優先度に基づいたサンプリングを実行
states, actions, rewards, next_states, dones, indices, weights = (
self.memory.sample(self.batch_size, beta)
)
states = torch.FloatTensor(states)
actions = torch.LongTensor(actions).unsqueeze(1)
rewards = torch.FloatTensor(rewards)
next_states = torch.FloatTensor(next_states)
dones = torch.FloatTensor(dones)
weights = torch.FloatTensor(weights).unsqueeze(1)
# 現在の状態でのQ値を取得
current_q_values = self.model(states).gather(1, actions)
# 次の状態での最大Q値をターゲットネットワークから取得
next_q_values = self.target_model(next_states).max(1)[0].detach()
# ターゲットQ値を計算
target_q_values = rewards + (1 - dones) * self.gamma * next_q_values.unsqueeze(
1
)
# TD誤差の二乗に重みを掛けた損失を計算
loss = (current_q_values - target_q_values).pow(2) * weights
loss = loss.mean() # 平均損失を計算
self.optimizer.zero_grad() # 勾配をリセット
loss.backward() # 勾配を逆伝播
self.optimizer.step() # パラメータを更新
# サンプルした経験の優先度を更新
for idx in indices:
self.memory.update(idx, loss.item())
# 探索率を減少
if self.epsilon > self.epsilon_min:
self.epsilon *= self.epsilon_decay
with open("epsilon.txt", "w", encoding="utf-8") as file:
file.write(str(self.epsilon))
# ターゲットネットワークの更新
if self.frame % self.target_update_freq == 0:
self.update_target_network()
Discussion