強化学習「GRPO」をCartPoleタスクで実装しながら解説
DeepSeek-R1 で有名になった「GRPO」を CartPoleタスクに対して実装しながら解説します(Google Colab上にて)。Pendulum タスク版の実装も用意しています。
強化学習手法「GRPO(Group Relative Policy Optimization)」を実装面から理解したい方におすすめの内容です。
実装には PyTorchを使用しています。計算実行時間はGoogle Colabの CPU環境で約7分です。今回はGPUは使用していません。
本記事で紹介する実装コード(Google ColaboratoryのNotebook)は以下に置いています。
タスクが「CartPole版」と「Pendulum版」の2種類を用意しています。
link: GRPO_CartPole_Yutaro_Ogawa_20250208.ipynb
link: GRPO_Pendulum_Yutaro_Ogawa_20250208.ipynb
本記事の目次です
-
はじめに
0.1 GRPO概説 -
ひとまず適当にCartPole環境を動かしてみます
1.1 CartPole環境を作成
1.2 CartPoleが倒れるまで適当に動かす
1.3 実行結果を動画で可視化 -
GRPOに必要な関数群を定義する
2.1 Policy Networkを定義
2.2 1エピソードを実行し、各stepでのstateやactionなどを収集する関数を定義
2.3 GRPOに基づくAdvantage計算を定義
2.4 GRPOに基づくPolicy Netの重み更新を定義 -
GRPOでPolicy Netを更新しながら、CartPoleを実行する
3.1 初期化とハイパラ設定
3.2 訓練開始(100エピソードを1トライアルとして、繰り返す) -
訓練後のPolicy NetでCartPoleを制御する様子を可視化する
4.1 1エピソードの実施
4.2 実行結果を動画で可視化 -
さいごに
0. はじめに
0.1 GRPO概説
GRPO(Group Relative Policy Optimization)は、LLMの「DeepSeek-R1」 で有名になった強化学習手法の一種です。
ただしその初出は同じ著者らの「DeepSeekMath」論文になります。
-
DeepSeek-R1: Incentivizing Reasoning Capability in LLMs via Reinforcement Learning [link]
-
DeepSeekMath: Pushing the Limits of Mathematical Reasoning in Open Language Models [link]
以下画像はDeepSeek-R1論文内のGRPOの解説部分です。
「GRPOという手法は以下の通りです。以上」という感じなのですが、これだけでは分かりづらいため(少なくとも私には)、本記事では実装しながら理解を深めることにします。
なお本記事では、GRPOの理論的な詳細解説は実施いたしません。
アルゴリズムの流れを数式とともに解説しているコンテンツとしては、horomaryさんの「どこから見てもメンダコ」ブログの記事が素晴らしいので、こちらをご覧ください。
とはいえ「GRPO」実装にあたりその特徴は重要ですので、以下に整理します。
- LLMの強化学習に使われた手法であり、疎な報酬環境(最終時のみに報酬発生)に対応できる
- PPO と REINFORCE をベースに、Advantageの計算に工夫を入れている
- Advantage とは、とあるstepでの状態価値 V と実際の行動 aで得られる価値の差を示す
- 従来手法はAdvantageを求めるために状態価値 V を求めるNetworkを構築・訓練していたが、GRPOではAdvantageを複数試行の報酬の平均値とその差分で表現する(ここが真髄)
- ゆえに状態価値 V を求めるNetworkは不要となる
では、実際に上記を実装するとどうなるのかですが、Google Colab上で CartPoleタスクを対象に実装しながら解説を進めます。
なお実装に際しては以下のブログ記事を参考にさせていただきました。
ブログ: 「group relative policy optimization (GRPO)」 [link]
1. ひとまず適当にCartPole環境を動かしてみます
1.1 CartPole環境を作成
ひとまず、適当な動作(action=0 は Cart を左に押す、1 は右に押す)の状態で、Google Colab上で動作させ、動作する様子を動画としてを描画するまでの内容を実装します。
本記事において、ここから先の部分は、解説がほぼ実装コードのコメント文になっています。
そのため「実装コードのコメント文」も読みながら、記事をご一読いただけますと幸いです。
最初に Gym の CartPole環境を用意します。
import gym
# version確認
print(gym.__version__) # 0.25.2でした
# CartPoleのenvを用意
env = gym.make('CartPole-v0')
1.2 CartPoleが倒れるまで適当に動かす
Actionの Cartをどちら方向に押すかはランダムに決定して、倒れるまで動かしてみます。
なお、動作の様子をのちほど動画で可視化したいので、画面の画像をキャプチャして保管しています。
import random
# 適当に動かす
# ====================
# [0] 環境をリセット+初期変数の設定
state = env.reset()
done = False # episodeの終わりを判定するフラグ
episode_reward = 0 # episodeの総報酬を格納する
frames = [] # 画像保存用リスト
# 倒れてdone=Trueになるまで続ける
while not done:
# [1] 画面キャプチャ
frames.append(env.render(mode="rgb_array")) # ndarray (400, 600, 3)
# [2] Actionを適当に 0 or 1 から選ぶ(左か右に押す)
action = random.choice([0, 1])
# [3] CartPoleの台車にActionを適用してstateを更新する
state, reward, done, _ = env.step(action)
episode_reward += reward
# 実行結果
print("episode_reward:", episode_reward)
env.close()
CartPole では 1 step 立っていると、1の報酬(reward)が与えられます。
その総和である episode_reward は、立ち続けていた step数と等しく、最初は約10から25くらいの値になります。最大で200です(200 step立ち続けると終了になります)。
1.3 実行結果を動画で可視化
倒れて終了するまでの様子を変数frames
に格納したので、それを可視化します。
最初に可視化関数を定義し、実行します。
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from matplotlib import rc
def show_animation(imgs):
"""動画表示する関数。引数 imgs は ndarray (400, 600, 3) のリストを想定"""
rc("animation", html="jshtml")
fig, ax = plt.subplots(1, 1, figsize=(5, 3))
frames = []
# テキストを初期化(step 数)
text = ax.text(10, 20, "", fontsize=12, color="black")
for i, img in enumerate(imgs):
frame = [ax.imshow(img, animated=True)] # 画像を描画
frame.append(ax.text(10, 20, f"Step: {i+1}", animated=True)) # Step数表示
frames.append(frame)
ax.axis("off")
ani = animation.ArtistAnimation(fig, frames, interval=100, blit=True)
# 動画保存
ani.save("cartpole_grpo.mp4", writer="ffmpeg")
ani.save("cartpole_grpo.gif", writer="pillow")
plt.close(fig) # ここで閉じる
return ani
# 可視化の実行
show_animation(frames)
上記を実行すると、mp4ファイルとgifファイルが保存されます。そして、以下のような動画再生HTMLが表示されます。
今何step目なのかが分かりづらいので、動画再生関数内で現在のstep数を画像に追記して描画しています。
2. GRPOに必要な関数群を定義する
2.1 Policy Networkを定義
CartPoleをGoogle Colab上でひとまずは適当に動かすことができたので、次はGRPOで訓練して、立ち続けるようにします。
最初に状態 state(Cartの位置、速度、棒の角度・各速度)の4変数を入力とし、action(左に押すか右に押すか)の離散な2値(0 or 1)を出力する、Policy Neural Network(Policy Net)を用意します。
最終目的は、このPolicy Net の重みをいい感じに訓練して、Poleが立ち続けるようにすることです。
import torch
class PolicyNet(torch.nn.Module):
def __init__(self):
super(PolicyNet, self).__init__()
self.fc1 = torch.nn.Linear(4, 64)
self.fc2 = torch.nn.Linear(64, 2) # 左か右に押すので、最後は2ノード
def forward(self, state):
x = torch.nn.functional.relu(self.fc1(state))
logits = self.fc2(x)
return logits
2.2 1エピソードを実行し、各stepでのstateやactionなどを収集する関数を定義
次に1エピソード(Poleが倒れるか、200 stepが経過するまで)を実行し、各 step での状態 stateや実際に採用した行動 actionなどを格納する実行・収集関数を定義します。
def collect_trajectory(env, net):
# [0] 環境をリセット+初期変数の設定
state = env.reset()
states, log_probs,chosen_actions = [],[],[] # ここに溜めていきます
episode_reward = 0
done = False
while not done:
# [1] そのstepでの各情報を求めます
states.append(state)
logits = net(torch.from_numpy(state).float())
# logitsにsoftmaxで、行動確率を求める
probs = torch.nn.functional.softmax(logits, dim=0)
# 実行するactionを行動確率に基づき選択する
action = torch.multinomial(probs, 1).item()
# 選んだactionが選択される確率のlog値を求める
log_prob = torch.log(probs[action])
log_probs.append(log_prob.item())
# 選んだactionを格納する
chosen_actions.append(action)
# [2] CartPoleの台車にActionを適用してstateを更新する
state, reward, done, _ = env.step(action)
episode_reward += reward
# [3] エピソードの総報酬を最大step数である200で正規化します
normalized_reward = episode_reward / 200.0
return states, log_probs, chosen_actions, normalized_reward
今回重要な変更点として、報酬が各stepで配られるのではなく、1エピソードが終了した時点で、総報酬(=立ち続けたstep数)が渡される、という設定にします。
これはLLMで各単語(正確にはトークン)が生成され、最終的な文章が評価されて報酬が決定されるという、報酬環境が疎(最後にのみ与えられる)という状況に合わせた設定です。
そして報酬を正規化するために、最大step数である200で割り算しています
(※ ただし後続部分で報酬の標準化が入るので今回の場合はこの正規化は不要な気はしています)。
2.3 GRPOに基づくAdvantage計算を定義
ここが、GRPO(Group Relative Policy Optimization)の真髄です。
PPO や その他の Actor-Critic 系のアルゴリズムであれば、価値関数を求める「状態価値関数 V(state)」のNetworkを使用して、Advantage(≒そのstateでの平均価値と実際の行動の価値の差)を求めます。
しかし、GRPOは報酬のみからAdvantageを計算します。
そのため、状態価値(Value)を求めるNeural Networkが不要となります。
実際の計算方法は非常に単純で以下の通りです。
import numpy as np
def calc_advantages_with_grpo(trajectories):
"""trajectoriesから報酬のみを取り出し、各エピソードの報酬を標準化します"""
rewards = [r for o, l, a, r in trajectories] # 最終報酬を取り出して、
mean_reward = sum(rewards) / len(rewards) # 平均値求めて、
std_reward = np.std(rewards) + 1e-8 # 標準偏差求めて(1e-8は0除算対策)
# 最後に各エピソードについて標準化
advantages = [(r - mean_reward) / std_reward for r in rewards]
return advantages
単に複数試行の標準化を実施しています。結果、報酬が多かった試行は大きなAdvantageが割り当てられ、報酬が少なかった試行はマイナスのAdvantageが割り当てられます。
2.4 GRPOに基づくPolicy Netの重み更新を定義
最後に、GRPOで求めた各エピソードのAdvantageを用いて、損失を計算し、勾配法でPolicy Netの重みを更新します。
この重み更新は基本的にはPPOアルゴリズムと同じです。
実装の各行の意味をコメントに記載しています。
def grpo_update(trajectories, net, optimizer, n_iterations=20, eps=0.2):
# [1] [2-3]のGRPOの関数で、各エピソードの標準化されたAdvantageを求めます
advantages = calc_advantages_with_grpo(trajectories)
# [2] Policy NN を更新します。n_iterations回、更新をかけます
for i_iter in range(n_iterations):
loss = 0
for traj, advantage in zip(trajectories, advantages):
# 1エピソードの蓄えられた内容を取り出します
(states, log_probs, chosen_actions, _) = traj
# 1エピソードの損失を0に初期化
trajectory_loss = 0
# [3] エピソード内の各ステップについて損失(loss)を計算
for t in range(len(states)):
# 以下はPPOと同じ手続きです
# ====================================
# [3-1] 更新されたPolicyNNで、実際に step t で
# 選択された行動がどれくらい尤もらしいかを求めます
new_policy_probs = torch.nn.functional.softmax(net(torch.from_numpy(states[t]).float()), dim=0)
new_log_probs = torch.log(new_policy_probs)[chosen_actions[t]]
# [3-2] 更新前のPolicyNNで選択されたactionを選ぶ確率と、
# 更新後のNNの場合の比を求める(対数確率なので引き算してexpする)
ratio = torch.exp(new_log_probs - log_probs[t])
# [3-3] 更新後の選ばれる確率が非常に大きくなってしまうと、
# 更新しすぎなので、epsの範囲にとどめる
clipped_ratio = torch.clamp(ratio, min=1 - eps, max=1 + eps)
# [3-4] 報酬を最大化したいので、マイナスを掛け算して最小化問題に置き換える
# そして平均より良かった施行を増やし、悪かったケースを減らすように更新
trajectory_loss += -clipped_ratio * advantage
# [4] エピソードのsteps数で正規化
trajectory_loss /= len(states)
loss += trajectory_loss
# [5] エピソード数で正規化
loss /= len(trajectories)
# [6] Policy NN の重みを更新
optimizer.zero_grad()
loss.backward()
optimizer.step()
return None # 特に何も返さない
以上の4つの関数の準備でGRPOを実施する用意が整いました。
次章では200 step立ち続けるように実際に訓練を行います。
3. GRPOでPolicy Netを更新しながら、CartPoleを実行する
3.1 初期化とハイパラ設定
# [1] 初期化と初期設定
env = gym.make('CartPole-v0')
net = PolicyNet()
optimizer = torch.optim.Adam(net.parameters(), lr=0.001)
# [2] GRPO で PocicyNetを更新する際のTrajectoryを溜める回数の設定
trajectories_per_update = 5 # group size
変数 trajectories_per_update
は、何エピソード分のデータが溜まったら、GRPOでのAdvantageの計算とPolicy Netの重み更新を実施するかを設定しています。
今回は5エピソードとしています。
3.2 訓練開始(100エピソードを1トライアルとして、繰り返す)
今回は100エピソードを1つの塊とし、その中で5エピソードずつデータを溜めては重みを更新します。
そしてこの100エピソードの総報酬の平均値が195を超えた場合に訓練を終了させます。
# [3] エピソードのループ100回を、平均報酬が195を超えるまで実施
trial_num = 0 # 100回を何回tryしたかを示す
while(True):
# [4] エピソードのループ100回を開始
# trajectories_per_updateが5なので、20で100回になります
for i_episode in range(20):
trajectories, episode_rewards = [], []
# episode_rewardsには実際の総報酬を格納
# [5] GRPO で PocicyNetを更新する際のTrajectoryを溜める(エピソード5回分)
for _ in range(trajectories_per_update):
states, log_probs, chosen_actions, normalized_reward = collect_trajectory(env, net)
trajectories.append((states, log_probs, chosen_actions, normalized_reward))
episode_rewards.append(normalized_reward * 200) # 実際の総報酬を格納
# [6] GRPO で PociyNetの重みを更新
grpo_update(trajectories, net, optimizer)
# [7] 100回の平均報酬を求める
avg_reward = sum(episode_rewards) / len(episode_rewards)
trial_num+=1
# [8] 終了判定
if avg_reward > 195:
print('訓練完了です。トライアル回数: ', trial_num)
break
else:
print(f'トライアル {trial_num}回目, avg reward: {avg_reward:.2f}')
env.close()
おおよそ4トライアル、Google Colabの CPU環境では約7分で訓練が完了します。
最後に訓練後の動作を可視化して確認します。
4. 訓練後のPolicy NetでCartPoleを制御する様子を可視化する
4.1 1エピソードの実施
# [0] 環境をリセット+初期変数の設定
state = env.reset()
done = False # episodeの終わりを判定するフラグ
episode_reward = 0 # episodeの総報酬を格納する
frames = [] # 画像保存用リスト
# 倒れてdone=Trueになるまで続ける
while not done:
# [1] 画面キャプチャ
frames.append(env.render(mode="rgb_array")) # ndarray (400, 600, 3)
# [2] ActionをPolicyNetから選ぶ(左か右に押す)
logits = net(torch.from_numpy(state).float())
# logitsにsoftmaxで、行動確率を求める
probs = torch.nn.functional.softmax(logits, dim=0)
action = torch.multinomial(probs, 1).item() # 実行するactionを確率的に選択
# [3] CartPoleの台車にActionを適用してstateを更新する
state, reward, done, _ = env.step(action)
episode_reward += reward
# 実行結果
print("episode_reward:", episode_reward)
env.close()
4.2 実行結果を動画で可視化
# 可視化
show_animation(frames)
以下に実行結果のGif画像を掲載します。200 step 立ち続けています。
5. さいごに
以上、DeepSeek-R1 で有名になった GRPOの実装を Google Colab上で CartPoleタスクにて解説・例示いたしました。
本記事のJupyter Notebookへのリンクを再掲します。
タスク「CartPole版」と「Pendulum版」を用意しています。
link: GRPO_CartPole_Yutaro_Ogawa_20250208.ipynb
link: GRPO_Pendulum_Yutaro_Ogawa_20250208.ipynb
なおPendulumの場合は行動が連続になるため、Policy Netの出力を「行動値を決める正規分布の平均値(と標準偏差)」に変更しています(これはGRPOとは直接は関係ない変更です)。
本記事の実装方法は分かりやすさ優先としているため、計算効率が悪いです。
CartPoleよりも難しいタスクである Pendulum を GRPOで訓練するには、Google Colabの「T4-GPU環境」で、約50分かかりました(そして私の実装例では、訓練結果は完璧ではなく、いつも直立できるとは限りません)。
PendulumをGRPOで訓練後の様子を以下に示します。
以下の動画の長さは 200 stepです。60 step目あたりから直立します。
ぜひ上記のリンクからNotebookファイルをご自身のGoogle Colab環境でも試してみてください。
最後に雑感を。
人間の知能において、「価値関数の計算・推定部分が存在しない」ケースはほぼないのではと(私は)思います。
そのためPendulumのような、上方向に直立するには一度逆側に振って勢いをつける必要があるタイプのタスク(その他 MountainCar等)のようなものは、全時系列を一律評価する「GRPO」では解きにくいと想定しています(私は)。
上記のPendulumの試行は確かにうまく直立していますが、今回のNotebookの実装例では、成功しないケースもあります。
報酬という観点では、人間が実施するタスクの多くは、途中途中で外界から部分報酬(密な報酬)が与えられることは少なく、最終結果としてうまくできたか、できなかったかが判明し、報酬が最後にのみ発生するタスク・仕事が多いと感じます。
そのため、タスクを自分なりに分解し、分解された小タスクの途中で、自分でそこまでをうまくできているか自己判定し、自分で部分報酬のようなものを与えている行為に近い気がしています。
「GRPO」にはタスク分解や部分報酬といった概念はありませんが、最終結果のみに報酬が与えられる問題を解くという方向性は、人間の仕事・タスクと似ているように感じます。
方策の改善という観点では、人間も確かに上手くできるまでタスク(や仕事)を繰り返しますが、だからといって、価値関数的なもので途中途中に都度の状態評価をすることは少ないでしょう。
それよりも、「あ、さっきよりうまくできた」、「うーん、前々回の方がうまくいっていたな~」、といった「試行間の比較」を通じた行動改善が、直感的にはまずまず正しい気がします。
この「試行間のうまくいった度合いの比較」を直接的に実施している「GRPO」アルゴリズムは「人間の改善方法の直感と合う部分が多いな~」と私は面白く感じ、今回筆を執った次第です。
以上、本記事が読者のみなさまにとって、参考になる点がございましたら、幸甚の至りです。
長文を一読いただき、誠にありがとうございました。
小川 雄太郎
株式会社松尾研究所 シニア・リサーチャー。「知能を創る」PJTに従事
Discussion
これで学習できるのは驚きですよね。 ベースラインを工夫してKL項追加すれば様々なタスクに応用がききそうです。
余談ですがgrpo_updateをsonnetに改善させたら20倍くらい速くなりました。
Yo Mo さま
コメントありがとうございます。嬉しいです!
改善まで試してくださり、ありがとうございます。