Zhangzhe's Blog

The projection of my life.

0%

强化学习(5)——DQN的变体们

URL

TL;DR

  • Double DQN: 解决了 DQN 中的过估计问题,使用目标网络和当前网络来共同计算 Q 值,从而减少 Q 值的偏差。
  • Dueling DQN: 提出了对偶网络,将 Q 函数分为价值函数和优势函数,这样可以更好地估计状态的价值,减少动作选择的随机性。
  • Prioritized Experience Replay: 改进了经验回放机制,通过给经验赋予优先级,使得学习过程更加高效。
  • Noisy DQN: 在网络中引入噪声以探索更加多样的策略,提高探索效率。
  • C51: 传统基于 Q 值的学习方式只关注期望值不关心分布,Distributional RL 引入了基于分布的强化学习的概念。
  • Rainbow: 将上述五种方法结合起来,另外还加入了 Multi-step Learning(多步学习),六合一行成了一个强大的 DQN 变体。

Algorithm

1. Double DQN

公式表述

  • DQN 中计算目标 Q 值时,使用的是目标网络的最大 Q 值,即 yt=rt+1+γmaxaQ(st+1,a;θ)y_t=r_{t+1}+\gamma\max_{a'}Q(s_{t+1},a';\theta^-) ,这样会导致过估计问题
  • Double DQN 对其改进非常简单,使用 Q 值网络来选择动作,使用目标网络来评估该动作的 Q 值,即 yt=rt+1+γQ(st+1,argmaxaQ(st+1,a;θ);θ)y_t=r_{t+1}+\gamma Q(s_{t+1},\arg\max_{a'}Q(s_{t+1},a';\theta);\theta^-)

代码表述

1
2
3
4
5
6
7
8
9
10
11
12
# DQN
q_values = self.q_network(states)
next_q_values = self.target_network(next_states)
max_next_q_values = next_q_values.max(dim=1)[0] # 直接使用目标网络的最大 Q 值
target_q_values = rewards + (1 - dones) * self.gamma * max_next_q_values
# Double DQN
q_values = self.q_network(states)
next_q_values = self.q_network(next_states)
next_actions = torch.argmax(next_q_values, dim=1) # 使用 Q 值网络选择动作
next_q_values = self.target_network(next_states)
max_next_q_values = next_q_values.gather(1, next_actions.unsqueeze(1)).squeeze(1) # 使用目标网络评估该动作的 Q 值
target_q_values = rewards + (1 - dones) * self.gamma * max_next_q_values

2. Dueling DQN

公式表述

  • 状态价值函数和状态动作价值函数的定义:请参考 这里
  • 动作优势函数的定义:A(s,a)A(s,a) 表示在状态 ss 下选择动作 aa相对优势
  • DQN / Double DQN 中的 Q 函数是直接输出动作的 Q 值,这样会导致动作选择的随机性
  • Dueling DQN 提出了对偶网络,Q 函数分为价值函数和优势函数,即 Q(s,a)=V(s)+A(s,a)Q(s,a)=V(s)+A(s,a) ,这样可以更好地估计状态的价值
  • 原理也很简单:价值函数表示状态的价值,优势函数表示此状态下动作的相对优势,最后再将二者相加得到 Q
  • 实际使用时,为了保证优势函数的相对性,需要对优势函数进行零均值化处理,即 Q(s,a)=V(s)+A(s,a)1AaA(s,a)Q(s,a)=V(s)+A(s,a)-\frac{1}{|A|}\sum_{a'}A(s,a')

代码表述

1
2
3
4
5
6
# DQN
q_values = self.q_network(states)
# Dueling DQN
value = self.value_network(states)
advantage = self.advantage_network(states)
q_values = value + advantage - advantage.mean(dim=1, keepdim=True)
  • 其他部分和 Double DQN 完全一样
  • 深度学习领域有个说法:如果一个变量可以被拆成多个变量的组合,那么理论上会更好(参数量和计算量都高了,自然是好事)

3. Prioritized Experience Replay

公式表述

  • DQN 中的经验回放机制是随机采样的或顺序采样的,这样会导致一些重要的经验被忽略
  • Prioritized Experience Replay 改进了经验回放机制,通过给经验赋予优先级,使得学习过程更加高效
  • 具体做法是:根据 TD 误差(时序差分误差 Temporal Difference Error)的大小来给经验赋予优先级,然后按照优先级采样
  • 时序差分误差的定义:当前状态下预测值和实际回报之间的差异,即 TD=Rt+1+γV(St+1)V(St)TD=R_{t+1}+\gamma V(S_{t+1})-V(S_t)
    • Rt+1R_{t+1} 表示在 t+1t+1 时刻的奖励
    • V(St+1)V(S_{t+1}) 表示在 t+1t+1 时刻的状态价值
    • V(St)V(S_t) 表示在 tt 时刻的状态价值(即对状态 StS_t 未来回报的预期)
    • γ\gamma 表示折扣因子

代码表述

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
class SumTree:
"""
SumTree data structure to implement Prioritized Experience Replay
"""
def __init__(self, capacity):
self.capacity = capacity
self.size = 0
self.tree = np.zeros(2 * capacity - 1)
self.data = [None] * capacity
def add(self, priority, data):
# 更新 data 节点的值
self.data[self.size % self.capacity] = data
# 更新 tree 节点的值,同时更新父节点
idx = self.size % self.capacity + self.capacity - 1
change = priority - self.tree[idx]
self.tree[idx] = priority
while idx != 0:
idx = (idx - 1) // 2
self.tree[idx] += change
self.size += 1
def get(self, priority_sum):
idx = 0
while idx < self.capacity - 1:
left = 2 * idx + 1
right = left + 1
if priority_sum <= self.tree[left]:
idx = left
else:
priority_sum -= self.tree[left]
idx = right
data_idx = idx - self.capacity + 1
return self.tree[idx], self.data[data_idx]
def total_priority(self):
return self.tree[0]
class PERAgent:
def __init__(
self,
env,
learning_rate: float,
initial_epsilon: float,
epsilon_decay: float,
final_epsilon: float,
discount_factor: float = 0.95,
batch_size: int = 64,
memory_size: int = 10000,
alpha: float = 0.6, # 权重参数,决定优先级的影响
beta: float = 0.4, # 用于重要性采样的权重,随着训练增加
):
self.env = env
self.lr = learning_rate
self.discount_factor = discount_factor
self.epsilon = initial_epsilon
self.epsilon_decay = epsilon_decay
self.final_epsilon = final_epsilon
self.batch_size = batch_size
self.alpha = alpha
self.beta = beta
# 初始化 SumTree 作为经验池
self.memory = SumTree(memory_size)
# Q 网络和目标网络
self.q_network = DQN(
input_dim=env.observation_space.n, output_dim=env.action_space.n
)
self.target_network = DQN(
input_dim=env.observation_space.n, output_dim=env.action_space.n
)
self.target_network.load_state_dict(self.q_network.state_dict()) # 初始化目标网络
# 优化器
self.optimizer = optim.Adam(self.q_network.parameters(), lr=self.lr)
def get_action(self, obs: Tuple[int, int, bool]) -> int:
"""选择动作(epsilon-greedy)"""
if np.random.random() < self.epsilon:
return self.env.action_space.sample() # 探索:随机选择动作
else:
obs_tensor = torch.tensor([obs], dtype=torch.float32)
q_values = self.q_network(obs_tensor)
return torch.argmax(q_values).item() # 利用:选择 Q 值最大的动作
def update(self):
"""从优先经验回放池中抽取一个批次的经验,进行 Q 网络的更新"""
if self.memory.size < self.batch_size:
return # 如果经验回放池中的样本不足一个批次,则不进行更新
batch_indices = np.random.uniform(
0, self.memory.total_priority(), size=self.batch_size
)
# 批量从 SumTree 中获取经验
batch = []
for priority in batch_indices:
_, data = self.memory.get(priority) # 获取对应的经验样本
batch.append(data)
states, actions, rewards, next_states, dones = zip(*batch)
states = torch.tensor(states, dtype=torch.float32)
actions = torch.tensor(actions, dtype=torch.int64)
rewards = torch.tensor(rewards, dtype=torch.float32)
next_states = torch.tensor(next_states, dtype=torch.float32)
dones = torch.tensor(dones, dtype=torch.int64)
# 计算 Q 值:使用当前网络和目标网络
q_values = self.q_network(states)
next_q_values = self.q_network(next_states)
next_actions = torch.argmax(next_q_values, dim=1)
# 计算 next_states 对应的目标 Q 值
next_q_values = self.target_network(next_states)
max_next_q_values = next_q_values.gather(1, next_actions.unsqueeze(1)).squeeze(
1
)
# 计算目标 Q 值
target_q_values = (
rewards + (1 - dones) * self.discount_factor * max_next_q_values
)
# 计算当前 Q 值和目标 Q 值的损失
q_value = q_values.gather(1, actions.unsqueeze(1)).squeeze(1)
# 计算重要性采样权重
weights = torch.tensor(
[(1.0 / self.memory.size) ** self.beta for _ in range(self.batch_size)]
)
loss = (weights * (q_value - target_q_values) ** 2).mean()
# 优化 Q 网络
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
def store_experience(self, obs, action, reward, next_obs, done):
"""计算 TD 误差并存储到优先经验回放池"""
obs_tensor = torch.tensor([obs], dtype=torch.float32)
next_obs_tensor = torch.tensor([next_obs], dtype=torch.float32)
q_values = self.q_network(obs_tensor)
next_q_values = self.q_network(next_obs_tensor)
next_actions = torch.argmax(next_q_values, dim=1)
# 计算 TD 误差
target_q_values = (
self.target_network(next_obs_tensor)
.gather(1, next_actions.unsqueeze(1))
.squeeze(1)
)
q_value = q_values.gather(
1, torch.tensor([action], dtype=torch.int64).unsqueeze(1)
).squeeze(1)
td_error = abs(target_q_values - q_value).item()
# 将经验存储到回放池,TD 误差用作优先级
self.memory.add(td_error + 1e-6, (obs, action, reward, next_obs, done))
def decay_epsilon(self):
"""逐渐减少 epsilon"""
self.epsilon = max(self.final_epsilon, self.epsilon - self.epsilon_decay)
def update_target_network(self):
"""每隔一段时间更新目标网络"""
self.target_network.load_state_dict(self.q_network.state_dict())
  • 代码中的 SumTree 类是一个优先级经验回放池,用于存储经验和计算优先级
  • 实现上,SumTree 由两部分组成:
    • tree:用于存储优先级,是一个完全二叉树
      • 叶子节点存储优先级,长度为 Capacity
      • 非叶子节点存储子节点的优先级之和,长度为 Capacity - 1
      • 总长度为 2 * Capacity - 1,根节点存储所有优先级之和
      • 叶子节点之间无需排序,这就是 SumTree 的优势,可以用 O(logN) 的时间复杂度实现优先级采样
    • data:用于存储经验,是一个列表
      • 长度为 Capacity,存储 tree 中叶子节点对应的经验,和 tree 中优先级一一对应,同步更新
  • SumTree 的作用:输入 [0, total_priority] 范围的均匀分布,输出非均匀的优先级采样结果
  • 在取数据时,只需要在 [0, total_priority] 范围内随机取样,然后 SumTree 的返回结果就是已经按照优先级采样的数据

4. Noisy DQN

公式表述

  • 传统 Q Learning / DQN 都采用 epsilon-greedy,即以 ϵ\epsilon 的概率随机选择动作,以 1ϵ1-\epsilon 的概率选择 Q 值最大的动作
  • 但是,这种方法有一个问题:随机性不够,可能会导致陷入局部最优解;随机性太大,会浪费太多时间在无效搜索上
  • Noisy DQN 引入了噪声网络的概念,即在网络中引入噪声以探索更加多样的策略,提高探索效率
  • 具体做法是:训练阶段在网络的全连接层中引入高斯噪声,即:

y=wx+b   =>   y=(μw+σwϵw)x+μb+σbϵby=wx+b\ \ \ =>\ \ \ y=(\mu^w+\sigma^w\odot \epsilon^w)x+\mu^b+\sigma^b\odot \epsilon^b

  • 其中,μw,μb,σw,σb\mu_w,\mu_b,\sigma_w,\sigma_b 是可训练参数,ϵw,ϵb\epsilon^w,\epsilon^b 是动态高斯噪声
  • 对于噪声的引入,有两种方式:
    • Independent Gaussian Noise(独立高斯噪声):每个权重和偏置都有都独立同分布
      • ϵwRfan_in×fan_outN(0,1)\epsilon^w\in\mathbb R^{fan\_in\times fan\_out}\sim \mathcal N(0,1)
      • ϵbRfan_outN(0,1)\epsilon^b\in\mathbb R^{fan\_out}\sim \mathcal N(0,1)
      • μinitwRfan_in×fan_outU(3fan_in,3fan_in)\mu^w_{init}\in\mathbb R^{fan\_in\times fan\_out}\sim \mathcal U(-\sqrt\frac{3}{fan\_in},\sqrt\frac{3}{fan\_in})
      • μinitbRfan_outU(3fan_in,3fan_in)\mu^b_{init}\in\mathbb R^{fan\_out}\sim \mathcal U(-\sqrt\frac{3}{fan\_in},\sqrt\frac{3}{fan\_in})
      • σinitw[0.017]fan_in×fan_out\sigma^w_{init}\in [0.017]^{fan\_in\times fan\_out}
      • σinitb[0.017]fan_out\sigma^b_{init}\in [0.017]^{fan\_out}
    • Factorized Gaussian Noise (分解高斯噪声):将 ϵw\epsilon_w 分解为两个矩阵的乘积,以减少参数量
      • ϵinRfan_inN(0,1)\epsilon^{in}\in\mathbb R^{fan\_in}\sim \mathcal N(0,1)
      • ϵoutRfan_outN(0,1)\epsilon^{out}\in\mathbb R^{fan\_out}\sim \mathcal N(0,1)
      • ϵw=ϵinϵout\epsilon^w=\epsilon^{in}\otimes\epsilon^{out}
      • ϵb=ϵout\epsilon^b=\epsilon^{out}
      • μinitwRfan_in×fan_outU(1fan_in,1fan_in)\mu^w_{init}\in\mathbb R^{fan\_in\times fan\_out}\sim \mathcal U(-\frac{1}{\sqrt {fan\_in}},\frac{1}{\sqrt{fan\_in}})
      • μinitbRfan_outU(1fan_in,1fan_in)\mu^b_{init}\in\mathbb R^{fan\_out}\sim \mathcal U(-\frac{1}{\sqrt {fan\_in}},\frac{1}{\sqrt{fan\_in}})
      • σinitw[0.5fan_in]fan_in×fan_out\sigma^w_{init}\in[\frac{0.5}{\sqrt{fan\_in}}]^{fan\_in\times fan\_out}
      • σinitb[0.5fan_in]fan_out\sigma^b_{init}\in[\frac{0.5}{\sqrt{fan\_in}}]^{fan\_out}
  • inference 阶段,直接使用 μw\mu^wμb\mu^b 进行计算,不使用 ϵ\epsilonσ\sigma

代码表述

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import torch
import torch.nn as nn
class NoisyLinear(nn.Module):
def __init__(self, in_features, out_features, sigma_init=0.017):
super(NoisyLinear, self).__init__()
self.in_features = in_features
self.out_features = out_features
# 可训练参数
self.weight_mu = nn.Parameter(torch.empty(out_features, in_features))
self.weight_sigma = nn.Parameter(torch.empty(out_features, in_features))
self.bias_mu = nn.Parameter(torch.empty(out_features))
self.bias_sigma = nn.Parameter(torch.empty(out_features))
self.sigma_init = sigma_init
self.reset_parameters()
def reset_parameters(self):
mu_range = 1 / torch.sqrt(torch.tensor(self.in_features, dtype=torch.float32))
self.weight_mu.data.uniform_(-mu_range, mu_range)
self.weight_sigma.data.fill_(self.sigma_init)
self.bias_mu.data.uniform_(-mu_range, mu_range)
self.bias_sigma.data.fill_(self.sigma_init)
def forward(self, input):
if self.training:
# 动态生成噪声 epsilon
weight_epsilon = torch.randn(self.out_features, self.in_features).to(
input.device
)
bias_epsilon = torch.randn(self.out_features).to(input.device)
weight = self.weight_mu + self.weight_sigma * weight_epsilon
bias = self.bias_mu + self.bias_sigma * bias_epsilon
else:
weight = self.weight_mu
bias = self.bias_mu
return nn.functional.linear(input, weight, bias)
class FactorisedNoisyLinear(nn.Module):
def __init__(self, in_features, out_features, sigma_zero=0.5):
super(FactorisedNoisyLinear, self).__init__()
self.in_features = in_features
self.out_features = out_features
self.sigma_zero = sigma_zero / torch.sqrt(
torch.tensor(self.in_features, dtype=torch.float32)
)
# 可训练参数
self.weight_mu = nn.Parameter(torch.empty(out_features, in_features))
self.weight_sigma = nn.Parameter(torch.empty(out_features, in_features))
self.bias_mu = nn.Parameter(torch.empty(out_features))
self.bias_sigma = nn.Parameter(torch.empty(out_features))
self.reset_parameters()
def reset_parameters(self):
mu_range = 1 / torch.sqrt(torch.tensor(self.in_features, dtype=torch.float32))
self.weight_mu.data.uniform_(-mu_range, mu_range)
self.weight_sigma.data.fill_(self.sigma_zero)
self.bias_mu.data.uniform_(-mu_range, mu_range)
self.bias_sigma.data.fill_(self.sigma_zero)
def _scale_noise(self, size):
x = torch.randn(size).to(self.weight_mu.device)
return x.sign() * x.abs().sqrt()
def forward(self, input):
if self.training:
# 动态生成噪声 epsilon
epsilon_in = self._scale_noise(self.in_features)
epsilon_out = self._scale_noise(self.out_features)
weight_epsilon = torch.ger(epsilon_out, epsilon_in)
bias_epsilon = epsilon_out
weight = self.weight_mu + self.weight_sigma * weight_epsilon
bias = self.bias_mu + self.bias_sigma * bias_epsilon
else:
weight = self.weight_mu
bias = self.bias_mu
return nn.functional.linear(input, weight, bias)

5. C51

公式表述

  • C51 (Categorical 51 atoms) 直译是:离散 51 个原子,是 Distributional RL 的一种算法,即基于分布的强化学习。
  • 传统基于 Q 值的学习方式只关注期望值(Q 值,即未来回报的期望)不关心分布(未来回报的概率分布),C51 改进了这一点
  • C51 提出 概率密度函数 Z(s, a),即在状态 s 下选择动作 a未来回报分布
  • 同时提出了基于分布的 Bellman 方程,即:

Z(s,a)=R(s,a)+γZ(s,a)Z(s,a)=R(s,a)+\gamma Z(s',a')

  • 其中:
    • Z(s,a)Z(s,a) 是在状态 s 下选择动作 a 的未来回报分布
    • R(s,a)R(s,a) 是在状态 s 下选择动作 a 的即时回报
    • γ\gamma 是折扣因子
    • Z(s,a)Z(s',a') 是下一个状态 s' 下选择动作 a' 的未来回报分布
  • Q 函数和 Z 函数的关系:Q(s,a)=zZ(s,a)zQ(s,a)=\sum_{z\in Z(s,a)}z
  • 由于连续分布不方便计算,C51 使用离散分布来近似 Z 函数,离散分布由 N 个均匀分布的原子组成,每个原子表示一个未来的回报值和其概率,原子被定义为:

zi=vmin+iΔzΔz=VmaxVminN1z_i=v_{min}+i\Delta z\\\Delta z=\frac{V_{max}-V_{min}}{N-1}

  • 其中,vminv_{min}vmaxv_{max} 分别是未来回报的最小值和最大值,Δz\Delta z 是原子之间的间隔
  • 同时,扩展 Bellman 方程到离散分布:

T^zi=r+γzi\hat{T}z_i=r+\gamma z_i

  • 离散分布的更新需要用到投影,也就是将更新后的分布重新映射到原子上

代码表述

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
class C51(nn.Module):
def __init__(self, input_dim, output_dim, n_atoms, V_min, V_max):
super(C51, self).__init__()
self.n_atoms = n_atoms
self.output_dim = output_dim
self.V_min = V_min
self.V_max = V_max
self.delta_z = (V_max - V_min) / (n_atoms - 1)
self.input_dim = input_dim
self.fc1 = nn.Linear(input_dim, 128)
self.fc2 = nn.Linear(128, 128)
self.fc3 = nn.Linear(128, output_dim * n_atoms)
def forward(self, x):
x = torch.nn.functional.one_hot(
x.to(torch.int64), num_classes=self.input_dim
).float()
x = torch.relu(self.fc1(x))
x = torch.relu(self.fc2(x))
x = self.fc3(x)
x = x.view(-1, self.output_dim, self.n_atoms)
# shape: (batch_size, action_space, n_atoms)
return torch.softmax(x, dim=-1)
class C51Agent:
def __init__(
self,
env,
learning_rate: float,
initial_epsilon: float,
epsilon_decay: float,
final_epsilon: float,
discount_factor: float = 0.95,
batch_size: int = 64,
memory_size: int = 10000,
n_atoms: int = 51,
V_min: float = -10.0,
V_max: float = 10.0,
):
self.env = env
self.lr = learning_rate
self.discount_factor = discount_factor
self.epsilon = initial_epsilon
self.epsilon_decay = epsilon_decay
self.final_epsilon = final_epsilon
self.batch_size = batch_size
self.n_atoms = n_atoms
self.V_min = V_min
self.V_max = V_max
self.delta_z = (V_max - V_min) / (n_atoms - 1)
self.z = torch.linspace(V_min, V_max, n_atoms).to(torch.float32)
self.memory = deque(maxlen=memory_size)
self.training_error = []
self.input_dim = env.observation_space.n
self.output_dim = env.action_space.n
self.q_network = C51(
input_dim=self.input_dim,
output_dim=self.output_dim,
n_atoms=n_atoms,
V_min=V_min,
V_max=V_max,
)
self.target_network = C51(
input_dim=self.input_dim,
output_dim=self.output_dim,
n_atoms=n_atoms,
V_min=V_min,
V_max=V_max,
)
self.target_network.load_state_dict(self.q_network.state_dict())
self.optimizer = optim.Adam(self.q_network.parameters(), lr=self.lr)
def get_action(self, obs: int) -> int:
if np.random.random() < self.epsilon:
return self.env.action_space.sample()
else:
obs_tensor = torch.tensor([obs], dtype=torch.int64)
dist = self.q_network(obs_tensor)
# 用离散概率密度函数计算Q值
q_values = torch.sum(dist * self.z, dim=2)
# 选择最大Q值对应的动作
action = torch.argmax(q_values, dim=1).item()
return action
def update(self):
if len(self.memory) < self.batch_size:
return
batch = random.sample(self.memory, self.batch_size)
states, actions, rewards, next_states, dones = zip(*batch)
states = torch.tensor(states, dtype=torch.int64)
actions = torch.tensor(actions, dtype=torch.long)
rewards = torch.tensor(rewards, dtype=torch.float32)
next_states = torch.tensor(next_states, dtype=torch.int64)
dones = torch.tensor(dones, dtype=torch.float32)
batch_size = states.size(0)
dist = self.q_network(states)
dist = dist[range(batch_size), actions]
with torch.no_grad():
next_dist = self.target_network(next_states)
next_q_values = torch.sum(next_dist * self.z, dim=2)
next_actions = torch.argmax(next_q_values, dim=1)
next_dist = next_dist[range(batch_size), next_actions]
# 离散化Bellman更新
Tz = rewards.unsqueeze(1) + (
1 - dones.unsqueeze(1)
) * self.discount_factor * self.z.unsqueeze(0)
Tz = Tz.clamp(self.V_min, self.V_max)
# 计算投影
b = (Tz - self.V_min) / self.delta_z
l = b.floor().long()
u = b.ceil().long()
l[(u > 0) * (l == u)] -= 1
u[(l < (self.n_atoms - 1)) * (l == u)] += 1
m = torch.zeros(batch_size, self.n_atoms)
offset = (
torch.linspace(0, ((batch_size - 1) * self.n_atoms), batch_size)
.unsqueeze(1)
.expand(batch_size, self.n_atoms)
.long()
)
m.view(-1).index_add_(
0, (l + offset).view(-1), (next_dist * (u.float() - b)).view(-1)
)
m.view(-1).index_add_(
0, (u + offset).view(-1), (next_dist * (b - l.float())).view(-1)
)
dist = dist + 1e-8
loss = -torch.sum(m * torch.log(dist), dim=1).mean()
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
self.training_error.append(loss.item())
def decay_epsilon(self):
self.epsilon = max(self.final_epsilon, self.epsilon - self.epsilon_decay)
def store_experience(self, obs, action, reward, next_obs, done):
self.memory.append((obs, action, reward, next_obs, done))
def update_target_network(self):
self.target_network.load_state_dict(self.q_network.state_dict())
  • C51 算法对 DQN 的优化实际上和 Dueling DQN 有些类似,都是将 Q 函数分解成多个部分,让其计算量和参数量更大,但不对拆解后的部分做监督,最终将所有部分合并起来成为 Q 值,依据 Q 值来选择动作

6. Rainbow

1. Multi-step Learning

  • 出自论文:《Multi-Step Reinforcement Learning: A Unifying Algorithm》
  • DQN 中的目标 Q 值是单步的,即目标 Q 值为: yt=rt+1+γmaxaQ(st+1,a;θ)y_t=r_{t+1}+\gamma\max_{a'}Q(s_{t+1},a';\theta^-)
  • Multi-step Learning 引入了多步学习,即目标 Q 值为: yt=rt+1+γ2rt+2+γ3rt+3+...+γNrt+N+γN+1maxaQ(st+N+1,a;θ)y_t=r_{t+1}+\gamma^2r_{t+2}+\gamma^3r_{t+3}+...+\gamma^Nr_{t+N}+\gamma^{N+1}\max_{a'}Q(s_{t+N+1},a';\theta^-)
    • 其中 NN 为多步数
    • θ\theta^- 是目标网络的参数
  • 这样可以更好地估计长期回报,减少 Q 值的偏差

2. Rainbow

  • Rainbow = DQN + Double DQN + Dueling DQN + Prioritized Experience Replay + Noisy DQN + Distributional RL + Multi-step Learning

Thought

  • 这些算法都分别从不同的角度对 DQN 进行了改进,很多都是非常小的改进,容易理解
  • DQN 系列强化学习算法只能用于离散状态动作空间,绝大多数强化学习都是连续状态动作空间,所以 DQN 系列算法的应用场景有限