Zhangzhe's Blog

The projection of my life.

0%

强化学习(1)——用REINFORCE算法训练Agent玩倒立摆游戏

URL

TL;DR

  • 本博客是从零开始学习强化学习系列的第一篇,重点在于介绍强化学习的基础概念。主要介绍了 REINFORCE 算法的基本原理,并用 REINFORCE 算法训练 Agent 玩倒立摆游戏
  • REINFORCE 算法是一种基于梯度的策略优化算法,提出时间是 1992 年,算是强化学习的基础算法之一
  • 倒立摆游戏是一个非常简单的强化学习环境,但是可以很好地展示 REINFORCE 算法的效果

Algorithm

1. 强化学习基础

AE_loop_dark.png

  • 强化学习的基本流程如上图所示,主要包括:
    1. Agent:智能体,即我们要训练的模型
    2. Environment:环境,即智能体需要与之交互的环境,比如倒立摆游戏
    3. State:状态,也被称为 Observation (观测) 即环境的状态,比如倒立摆的角度
    4. Action:动作,即智能体在某个状态下可以采取的动作,比如向左或向右
    5. Reward:奖励,即智能体在某个状态下采取某个动作后得到的奖励,比如倒立摆保持平衡时给予正奖励
    6. Policy:策略,是智能体的核心部分,即智能体在某个状态下采取某个动作的概率分布,智能体需要根据策略来选择动作

2. 倒立摆游戏

episode-episode-8000.gif

  • 上图是用强化学习实际学习得到的倒立摆游戏效果,目标推动小车让杆尽可能竖直,这个游戏在 Gymnasium 库中,被定义为:
    1. Observation SpaceBox(-inf, inf, (4,), float64),观测状态用一个长度为 4 的向量表示,每个元素的取值范围为任意实数,其中每个维度数值的含义如下:
      1. 小车的位置
      2. 小车上杆子的垂直角度
      3. 小车的速度
      4. 小车上杆子的角速度
    2. Action SpaceBox(-3.0, 3.0, (1,), float32),动作空间为 [-3, 3] 之间的一个浮点数,表示智能体推小车的力(带方向)
    3. Reward:目标是使倒立摆尽可能长时间直立(在一定角度限制内),因此,当杆直立的每个时间步都会获得 +1 的奖励
    4. Starting State:起始状态为 (0, 0, 0, 0),然后随机施加 [-0.01, 0.01] 的均匀随机噪声
    5. Episode End:一次游戏结束,判定条件为:
      1. Truncation:游戏累积 1000 个时间步
      2. Termination:状态空间中元素出现无穷 或 立杆的垂直角度大于 0.2 弧度(约 11.5 度)

3. REINFORCE算法

  • REINFORCE 算法是一种基于策略梯度的强化学习算法,其核心思想是通过采样得到的轨迹来估计策略梯度,并通过梯度上升的方法来优化策略

3.1 从公式角度讲

  • 具体步骤如下:
    1. 初始化策略:随机初始化策略参数 θ\theta
    2. 采样轨迹:在当前策略 πθ\pi_\theta 下采样一条轨迹(状态、动作、奖励在时间维度上组成的序列) τ=(s0,a0,r1,s1,a1,r2,,sT)\tau = (s_0, a_0, r_1, s_1, a_1, r_2, \ldots, s_T)
    3. 计算累积回报:对于轨迹中的每个时间步 tt,计算从时间步 tt 开始的累积回报 Gt=k=tTγktrkG_t = \sum_{k=t}^T \gamma^{k-t} r_k,其中 γ\gamma 是折扣因子
    4. 计算累积回报期望:计算轨迹中每个时间步的累积回报期望 J(θ)=Eτπθ[t=0Tlogπθ(atst)Gt]J(\theta) = \mathbb{E}_{\tau \sim \pi_\theta} \left[ \sum_{t=0}^T \log \pi_\theta(a_t | s_t) G_t \right]
    5. 更新策略参数:根据轨迹中的状态、动作和回报,计算策略梯度 θJ(θ)=Eτπθ[t=0Tθlogπθ(atst)Gt]\nabla_\theta J(\theta) = \mathbb{E}_{\tau \sim \pi_\theta} \left[ \sum_{t=0}^T \nabla_\theta \log \pi_\theta(a_t | s_t) G_t \right],并使用梯度上升法更新策略参数 θθ+αθJ(θ)\theta \leftarrow \theta + \alpha \nabla_\theta J(\theta),其中 α\alpha 是学习率
  • 通过不断重复上述步骤,策略会逐渐优化,使得智能体在环境中的表现越来越好
  • REINFORCE 算法的优点是简单易实现,但缺点是方差较大,收敛速度较慢
  • 代码实现可以参考 Gymnasium 教程

3.2 从实现代码角度讲

  1. 构建 Policy
    1. 构成:Policy 是一个 MLP 网络
    2. 输入:State(一个长度为 4 的浮点数向量)
    3. 输出:两个标量,分别表示正态分布的均值和标准差
  2. 构建 Agent
    1. 构成:一个 Agent 包含一个 Policy 以及对此 Policy 的使用和更新方法
    2. 使用:即如何使用 Agent 根据当前状态选择动作
    3. 更新:即如何根据环境的反馈(奖励)更新 Policy 的参数
  3. 训练 AgentAgentEnv 交互):
    1. 初始化 Agent 和环境
    2. 采样轨迹:在当前策略下采样动作,形成一条轨迹
    3. 计算回报:计算轨迹中每个时间步的回报
    4. 更新策略:根据策略梯度更新策略参数
    5. 重复上述步骤直到策略收敛
      代码实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import random
import numpy as np
import pandas as pd
import seaborn as sns
from typing import Tuple
import torch
import torch.nn as nn
from torch.distributions.normal import Normal
import matplotlib.pyplot as plt
import gymnasium as gym
plt.rcParams["figure.figsize"] = (10, 5)
class Policy_Network(nn.Module):
"""Parametrized Policy Network."""
def __init__(self, obs_space_dims: int, action_space_dims: int):
"""Initializes a neural network that estimates the mean and standard deviation
of a normal distribution from which an action is sampled from.
Args:
obs_space_dims: Dimension of the observation space
action_space_dims: Dimension of the action space
"""
super().__init__()
hidden_space1 = 16 # Nothing special with 16, feel free to change
hidden_space2 = 32 # Nothing special with 32, feel free to change
# Shared Network
self.shared_net = nn.Sequential(
nn.Linear(obs_space_dims, hidden_space1),
nn.Tanh(),
nn.Linear(hidden_space1, hidden_space2),
nn.Tanh(),
)
# Policy Mean specific Linear Layer
self.policy_mean_net = nn.Sequential(
nn.Linear(hidden_space2, action_space_dims)
)
# Policy Std Dev specific Linear Layer
self.policy_stddev_net = nn.Sequential(
nn.Linear(hidden_space2, action_space_dims)
)
def forward(self, x: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
"""Conditioned on the observation, returns the mean and standard deviation
of a normal distribution from which an action is sampled from.
Args:
x: Observation from the environment
Returns:
action_means: predicted mean of the normal distribution
action_stddevs: predicted standard deviation of the normal distribution
"""
shared_features = self.shared_net(x.float())
action_means = self.policy_mean_net(shared_features)
action_stddevs = torch.log(
1 + torch.exp(self.policy_stddev_net(shared_features))
)
return action_means, action_stddevs
class REINFORCE:
"""REINFORCE algorithm."""
def __init__(self, obs_space_dims: int, action_space_dims: int):
"""Initializes an agent that learns a policy via REINFORCE algorithm [1]
to solve the task at hand (Inverted Pendulum v4).
Args:
obs_space_dims: Dimension of the observation space
action_space_dims: Dimension of the action space
"""
# Hyperparameters
self.learning_rate = 1e-4 # Learning rate for policy optimization
self.gamma = 0.99 # Discount factor
self.eps = 1e-6 # small number for mathematical stability
self.probs = [] # Stores probability values of the sampled action
self.rewards = [] # Stores the corresponding rewards
self.net = Policy_Network(obs_space_dims, action_space_dims)
self.optimizer = torch.optim.AdamW(self.net.parameters(), lr=self.learning_rate)
def sample_action(self, state: np.ndarray) -> float:
"""Returns an action, conditioned on the policy and observation.
Args:
state: Observation from the environment
Returns:
action: Action to be performed
"""
state = torch.tensor(np.array([state]))
action_means, action_stddevs = self.net(state)
# create a normal distribution from the predicted
# mean and standard deviation and sample an action
distrib = Normal(action_means[0] + self.eps, action_stddevs[0] + self.eps)
action = distrib.sample()
prob = distrib.log_prob(action)
action = action.numpy()
self.probs.append(prob)
return action
def update(self):
"""Updates the policy network's weights."""
running_g = 0
gs = []
# Discounted return (backwards) - [::-1] will return an array in reverse
for R in self.rewards[::-1]:
running_g = R + self.gamma * running_g
gs.insert(0, running_g)
deltas = torch.tensor(gs)
log_probs = torch.stack(self.probs)
# Update the loss with the mean log probability and deltas
# Now, we compute the correct total loss by taking the sum of the element-wise products.
loss = -torch.sum(log_probs * deltas)
# Update the policy network
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
# Empty / zero out all episode-centric/related variables
self.probs = []
self.rewards = []
# Create and wrap the environment
env = gym.make("InvertedPendulum-v4")
env = gym.make("InvertedPendulum-v4", render_mode="rgb_array")
wrapped_env = gym.wrappers.RecordVideo(
env,
video_folder="./InvertedPendulum_video",
episode_trigger=lambda episode_id: episode_id % 2000 == 0,
name_prefix="episode",
)
# Observation-space of InvertedPendulum-v4 (4)
obs_space_dims = env.observation_space.shape[0]
# Action-space of InvertedPendulum-v4 (1)
action_space_dims = env.action_space.shape[0]
agent = REINFORCE(obs_space_dims, action_space_dims)
reward_over_episodes = []
for episode in range(total_num_episodes):
obs, info = wrapped_env.reset()
done = False
while not done:
action = agent.sample_action(obs)
obs, reward, terminated, truncated, info = wrapped_env.step(action)
agent.rewards.append(reward)
done = terminated or truncated
reward_over_episodes.append(wrapped_env.return_queue[-1])
agent.update() # 每完成一次轨迹才会更新一次策略

重点代码分析:

  1. Policy 预测采样动作的均值和标准差:
    1
    2
    3
    4
    5
    shared_features = self.shared_net(x.float())
    action_means = self.policy_mean_net(shared_features) # 直接预测采样动作的均值
    action_stddevs = torch.log(
    1 + torch.exp(self.policy_stddev_net(shared_features))
    ) # 预测采样动作的标准差,保证标准差为正
  2. 采样动作:
    1
    2
    3
    distrib = Normal(action_means[0] + self.eps, action_stddevs[0] + self.eps)  # 根据 Policy 预测的均值和标准差构建正态分布
    action = distrib.sample() # 从正态分布中采样动作
    prob = distrib.log_prob(action) # 同时计算采样动作的概率,用于后续计算策略梯度来更新策略
  3. 更新策略:
    1
    2
    3
    4
    5
    6
    7
    8
    running_g = 0
    gs = []
    for R in self.rewards[::-1]:
    running_g = R + self.gamma * running_g
    gs.insert(0, running_g)
    deltas = torch.tensor(gs) # 计算折扣累积回报
    log_probs = torch.stack(self.probs)
    loss = -torch.sum(log_probs * deltas) # 根据策略累积折扣回报和策略概率计算期望策略累积折扣期望,目标是最大化期望

最终效果:
reinforce_learning_v2.png

可以看出,Agent 在训练过程中逐渐学会了如何控制小车,使得倒立摆尽可能直立,训练 5000 步就可以将倒立摆稳定保持 200 时间步

4. 思考和尝试

  • 由于长期做有监督深度学习项目,所以会思考:如果使用深度有监督学习模型来解决倒立摆问题,会有什么不同?
  • 但直接使用深度有监督学习模型来解决倒立摆问题是不现实的,因为不管是用奖励计算损失还是用观测状态计算损失,都无法通过梯度反向传播来优化模型,因为环境并不可微
  • 环境不可微 是强化学习和深度学习的根本区别之一,那么如何解决 “深度有监督学习无法解决倒立摆问题” 呢?
  • 一个简单有效的方法是使用两个阶段的模型:
    1. 第一阶段:训练一个深度有监督学习模型,作为环境仿真器
      1. 输入:状态(观测)+ 随机动作
      2. 输出:预测的下一个状态 + 预测的奖励
      3. 监督:真实环境下,输入随机动作后的新状态和奖励
    2. 第二阶段:训练一个深度有监督学习模型,作为智能体
      1. 输入:状态(观测)
      2. 输出:动作
      3. 监督:环境仿真器(冻结)预测的下一个状态和奖励(目标是奖励尽可能高 且 立杆尽可能竖直 且 小车速度尽可能小 且 立杆线速度尽可能小)
  • 通过两个阶段的模型训练,可以将环境的不可微性质转化为可微性质,从而使用深度有监督学习模型来解决倒立摆问题
  • 实现代码:
  1. 训练环境仿真器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import random
import numpy as np
import torch
import torch.nn as nn
import gymnasium as gym
class EnvPredNet(nn.Module):
def __init__(self, obs_space_dims: int, action_space_dims: int, reward_space_dims: int):
super().__init__()
hidden_space1 = 128 # Nothing special with 16, feel free to change
hidden_space2 = 256 # Nothing special with 32, feel free to change
self.net = nn.Sequential(
nn.Linear(obs_space_dims + action_space_dims, hidden_space1),
nn.ReLU(),
nn.Linear(hidden_space1, hidden_space2),
nn.ReLU(),
nn.Linear(hidden_space2, obs_space_dims + reward_space_dims),
)
def forward(self, x: torch.Tensor):
return self.net(x.float())
class SupervisedAgent:
def __init__(self, obs_space_dims: int, action_space_dims: int, reward_space_dims: int):
# Hyperparameters
self.learning_rate = 1e-4 # Learning rate for policy optimization
self.net = EnvPredNet(obs_space_dims, action_space_dims, reward_space_dims)
self.optimizer = torch.optim.AdamW(self.net.parameters(), lr=self.learning_rate)
self.loss_fn = nn.MSELoss(reduce="sum")
def sample_action(self) -> float:
random_action = torch.clamp(torch.randn(1), -3, 3)
return random_action
def pred_state(self, state, action):
state = torch.tensor(np.array([state]))
state_action = torch.cat((state, action.unsqueeze(0)), dim=1)
next_state = self.net(state_action)
return next_state
def update(self, pred_state, gt_state, reward):
loss = self.loss_fn(
pred_state, torch.tensor([[*gt_state, reward]], dtype=torch.float32)
)
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
return loss.item()
env = gym.make("InvertedPendulum-v4")
wrapped_env = gym.wrappers.RecordEpisodeStatistics(env, 50) # Records episode-reward
total_num_episodes = int(5e4)
obs_space_dims = env.observation_space.shape[0]
action_space_dims = env.action_space.shape[0]
reward_space_dims = 1
agent = SupervisedAgent(obs_space_dims, action_space_dims, reward_space_dims)
agent.net.train()
for episode in range(total_num_episodes):
state, info = wrapped_env.reset()
done = False
while not done:
action = agent.sample_action()
gt_next_state, reward, terminated, truncated, info = wrapped_env.step(action)
pred_state = agent.pred_state(state, action)
loss = agent.update(pred_state, gt_next_state, reward)
state = gt_next_state
done = terminated or truncated
print(f"Episode: {episode}, Loss: {loss}")
torch.save(agent.net.state_dict(), "env_predict_model.pth")
  1. 训练智能体
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import random
import numpy as np
import torch
import torch.nn as nn
import gymnasium as gym
class EnvPredNet(nn.Module):
def __init__(self, obs_space_dims: int, action_space_dims: int, reward_space_dims: int):
super().__init__()
hidden_space1 = 128 # Nothing special with 16, feel free to change
hidden_space2 = 256 # Nothing special with 32, feel free to change
self.net = nn.Sequential(
nn.Linear(obs_space_dims + action_space_dims, hidden_space1),
nn.ReLU(),
nn.Linear(hidden_space1, hidden_space2),
nn.ReLU(),
nn.Linear(hidden_space2, obs_space_dims + reward_space_dims),
)
def forward(self, x: torch.Tensor):
return self.net(x.float())
class ActionPredNet(nn.Module):
def __init__(self, obs_space_dims: int, action_space_dims: int):
super().__init__()
hidden_space1 = 128 # Nothing special with 16, feel free to change
hidden_space2 = 256 # Nothing special with 32, feel free to change
self.net = nn.Sequential(
nn.Linear(obs_space_dims, hidden_space1),
nn.ReLU(),
nn.Linear(hidden_space1, hidden_space2),
nn.ReLU(),
nn.Linear(hidden_space2, action_space_dims),
nn.Tanh(),
)
def forward(self, x: torch.Tensor):
action = self.net(x.float()) * 3
return action
class SupervisedAgent:
def __init__(self, obs_space_dims: int, action_space_dims: int, reward_space_dims: int):
self.env_pred_net = EnvPredNet(obs_space_dims, action_space_dims, reward_space_dims)
self.action_pred_net = ActionPredNet(obs_space_dims, action_space_dims)
self.env_pred_net.load_state_dict(torch.load("env_predict_model.pth"))
self.env_pred_net.eval()
self.action_pred_net.train()
self.learning_rate = 1e-4
self.optimizer = torch.optim.AdamW(
self.action_pred_net.parameters(), lr=self.learning_rate
)
def get_action(self, state) -> float:
action = self.action_pred_net(state)
return action
def pred_state_reward(self, state, action):
state_action = torch.cat([torch.tensor([state]), action], dim=1)
return self.env_pred_net(state_action)
def update(self, pred_state_reward, state):
loss = (
pred_state_reward[0, 1].abs() * 10 # 立杆尽可能竖直
+ pred_state_reward[0, 2:4].abs().sum() # 小车速度和立杆角速度尽可能小
+ (pred_state_reward[0, 0] - state[0]).abs() * 0.1 # 小车位置尽可能不变
- pred_state_reward[0, 4].abs() # 奖励尽可能高
)
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
return loss.item()
# Create and wrap the environment
env = gym.make("InvertedPendulum-v4")
wrapped_env = gym.wrappers.RecordEpisodeStatistics(env, 50) # Records episode-reward
total_num_episodes = int(5e4) # Total number of episodes
obs_space_dims = env.observation_space.shape[0]
action_space_dims = env.action_space.shape[0]
reward_space_dims = 1
agent = SupervisedAgent(obs_space_dims, action_space_dims, reward_space_dims)
for episode in range(total_num_episodes):
state, info = wrapped_env.reset()
done = False
reward_sum = 0
while not done:
action = agent.get_action(torch.tensor([state]))
gt_next_state, reward, terminated, truncated, info = wrapped_env.step(
action.detach().numpy()[0]
)
pred_state_reward = agent.pred_state_reward(state, action)
loss = agent.update(pred_state_reward, state)
state = gt_next_state
done = terminated or truncated
reward_sum += reward
print(f"Episode: {episode}, Reward: {reward_sum}, Loss: {loss}")
torch.save(agent.action_pred_net.state_dict(), "action_predict_model.pth")

5. 总结

  • 通过结合 REINFORCE 算法和倒立摆任务,本文展示了强化学习的基本原理和具体实现。
  • 同时,提出了针对环境不可微问题的创新方法,即通过建立环境仿真器来将不可微问题转化为可微问题,从而使得深度学习能够在强化学习任务中发挥作用。
  • 有监督学习和强化学习的对比:
    • 二阶段有监督学习适合在已知环境模型的基础上,快速训练并优化策略,特别是离线学习和仿真场景。
    • 强化学习则适用于更为复杂、不确定的环境,尤其是无法精确建模的动态场景,并且可以在实时交互中自我改进。