Zhangzhe's Blog

The projection of my life.

0%

URL

TL;DR

  • 本文在 GShard 的基础上,提出了一种新的混合专家语言模型 DeepSeekMoE,通过 孤立共享专家细分专家 的方式,提高了模型性能并降低了计算复杂度。
  • 替换的也是 Transformer 中的 FFN 层。

Architecture

deepseekmoe.png

传统 MoE 模型(例如 GShard

  • 传统 MoE 如图 a 所示,核心思想是将 TransformerFFN 替换为 MoE,每个 token 通过 Gate 机制选择不同的 Expert 来处理。
  • 用公式表示为:
    • htl=i=1N(gi,tFFNi(utl))+utlh_t^l=\sum_{i=1}^{N}(g_{i,t}FFN_i(u^l_t))+u^l_t
    • gi,t={si,t,si,tTopK(sj,t1jN,K),0,otherwise,g_{i,t}=\left\{\begin{array}{ll}{s_{i,t},} & {s_{i,t}\in TopK({s_{j,t}|1\le j\le N}, K),} \\ {0,} & {otherwise,}\end{array}\right.
    • si,t=Softmax(utlTeil)s_{i,t}=Softmax({u_t^{l}}^Te_i^l)
    • 其中:
      • l 表示第 l
      • N 表示 Expert 的数量
      • K 表示每个 token 保留的 Expert 数量

细粒度 MoE 模型

  • 如图 b 所示,和传统 MoE 的区别是将专家切分的更小,专家数量更多,也可以理解为传统 MoE 中的 Expert 也是由多个 Sub-Expert 组成。
  • 用公式表示为:
    • htl=i=1mN(gi,tFFNi(utl))+utlh_t^l=\sum_{i=1}^{mN}(g_{i,t}FFN_i(u^l_t))+u^l_t
    • gi,t={si,t,si,tTopK(sj,t1jmN,mK),0,otherwise,g_{i,t}=\left\{\begin{array}{ll}{s_{i,t},} & {s_{i,t}\in TopK({s_{j,t}|1\le j\le mN}, mK),} \\ {0,} & {otherwise,}\end{array}\right.
    • si,t=Softmax(utlTeil)s_{i,t}=Softmax({u_t^{l}}^Te_i^l)
    • 其中:
      • l 表示第 l
      • N 表示 Expert 的数量
      • m 表示每个 Expert 中包含的 Sub-Expert 的数量
      • K 表示每个 token 保留的 Expert 数量

细粒度 MoE + 孤立共享专家

  • 如图 c 所示,在细粒度 MoE 的基础上,引入了 Isolated Shared Expert,这种专家不参与 Gate 选择,而是在所有 token 之间共享。
  • 用公式表示为:
    • htl=i=1KsFFNi(utl)+i=Ks+1mN(gi,tFFNi(utl))+utlh_t^l=\sum_{i=1}^{K_s}FFN_i(u^l_t)+\sum_{i=K_s+1}^{mN}(g_{i,t}FFN_i(u^l_t))+u^l_t
    • gi,t={si,t,si,tTopK(sj,tKs+1jmN,mKKs),0,otherwise,g_{i,t}=\left\{\begin{array}{ll}{s_{i,t},} & {s_{i,t}\in TopK({s_{j,t}|K_s+1\le j\le mN}, mK-K_s),} \\ {0,} & {otherwise,}\end{array}\right.
    • si,t=Softmax(utlTeil)s_{i,t}=Softmax({u_t^{l}}^Te_i^l)
    • 其中:
      • l 表示第 l
      • N 表示 Expert 的数量
      • m 表示每个 Expert 中包含的 Sub-Expert 的数量
      • K 表示每个 token 保留的 Expert 数量
      • KsK_s 表示 Isolated Shared Expert 的数量

Thoughts

  • 这篇论文名字起的有点大《迈向终极专家专业化的MoE语言模型》,但是实际上只是在 GShard 的基础上做了一些小的改进。

URL

TL;DR

  • DeepSeek 系列是国内一家名叫深度求索的科技公司推出的一种混合专家语言模型,这家公司背后是幻方量化。
  • DeepSeek-V2 的核心思想是 Multi-head Latent Attention (MLA)DeepSeekMoE 两个模块,这两个模块分别替换了传统 Transformer 层的 Attention ModuleFFN,是本博客想要探讨的重点。

Architecture

Multi-head Latent Attention (MLA)

MLA 总体计算流程

MLA.png

  • 核心思想是将 hidden feature 通过 MLP 映射到 latent space,降低计算(attention)和存储(kv cache)的复杂度。
  • 和主流的 kv cache 方案不同,MLA 只需要 cache ctKVc_t^{KV}ktRk_t^R 两部分。

MLA 详细计算流程

  • 输入:htRdh_t \in \mathbb R^d ,表示第 ttoken 在某个 attention layer 层的输入特征
  • 计算 key / value
    • ctKV=WDKVhtc_t^{KV}=W^{DKV}h_t ,其中 WDKVRdc×dW^{DKV} \in \mathbb R^{d_c\times d}DKV 表示 down-projection key value
    • ktC=WUKctKVk_t^C=W^{UK}c_t^{KV} ,其中 WUKRdhnh×dcW^{UK} \in \mathbb R^{d_hn_h\times d_c}UK 表示 up-projection keydcdhnhd_c \ll d_hn_h
    • vtC=WUVctKVv_t^C=W^{UV}c_t^{KV} ,其中 WUVRdhnh×dcW^{UV} \in \mathbb R^{d_hn_h\times d_c}UV 表示 up-projection valuedcdhnhd_c \ll d_hn_h
  • 计算 query
    • ctQ=WDQhtc_t^Q=W^{DQ}h_t ,其中 WDQRdc×dW^{DQ} \in \mathbb R^{d_c'\times d}DQ 表示 down-projection query
    • qtC=WUQctQq_t^C=W^{UQ}c_t^Q ,其中 WUQRdhnh×dcW^{UQ} \in \mathbb R^{d_hn_h\times d_c'}UQ 表示 up-projection querydcdhnhd_c' \ll d_hn_h
  • 计算 RoPE
    • RoPE 是一种在输入 attention layer 之前,对 querykeyposition encoding 的方法
    • RoPEMLA 在设计上是冲突的,因此 MLARoPE 做了一些修改,主要是:额外计算 multi-head query rotaryshared key rotary
    • [qt,1R,qt,2R,...,qt,nhR]=qtR=RoPE(WQRctQ)[q^R_{t,1},q^R_{t,2},...,q^R_{t,n_h}]=q_t^R=RoPE(W^{QR}c_t^Q) ,其中 WQRRdhRnh×dcW^{QR} \in \mathbb R^{d_h^Rn_h\times d_c'}QRQR 表示 query rotary
    • ktR=RoPE(WKRktC)k_t^R=RoPE(W^{KR}k_t^C) ,其中 WKRRdhR×dW^{KR} \in \mathbb R^{d_h^R\times d}KRKR 表示 key rotary
    • qt,i=[qt,iC,qt,iR]q_t,i=[q^C_{t,i},q^R_{t,i}] ,将 queryquery rotary 拼接,得到正式的 query
    • kt,i=[kt,iC,ktR]k_t,i=[k^C_{t,i},k^R_{t}] ,将 keykey rotary 拼接,得到正式的 key
  • 计算 attention
    • ot,i=j=1tSoftmaxj(qt,iTkj,idh+dhR)vj,iCo_{t,i}=\sum^t_{j=1}Softmax_j(\frac{q^T_{t,i}k_{j,i}}{\sqrt{d_h+d_h^R}})v^C_{j,i}
  • 计算 output
    • ut=WO[ot,1,ot,2,...,ot,nh]u_t=W^O[o_{t,1},o_{t,2},...,o_{t,n_h}] ,其中 WORd×dhnhW^O \in \mathbb R^{d\times d_hn_h}

MLA 优势

  • 大幅降低了 kv cache 的存储空间
    deepseek_mla_2.png

DeepSeekMoE

Thoughts

  • MLA 算是对 self-attention 比较深度的改进,兼顾了 kv cacheRoPE 的设计,比 MQAGQA 设计看上去更用心。
  • DeepSeekMoE 感觉在 GShard 上的改进并不大,主要是在 Expert 的基础上加入了 Sub-Expert 的概念,以及常开的 Isolated Shared Expert 设计。
  • 这二者结合,给了 DeepSeek-V2 一个很好的性能提升,可以在相同计算复杂度下,塞下更多的参数量,提高模型表现。

URL

TL;DR

  • LOMO 算法只能和 SGD 优化器联合使用,而在大模型微调阶段,SGD 优化器的实际效果通常不如 Adam 算法
  • 本文提出了 AdaLomo 算法,它将 LOMO 算法和 Adam 算法结合起来,实现了低内存占用的优化器

Algorithm

adalomo_algorithm.png

  • AdaLomo 仅仅是将 LOMOAdam 结合起来,代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
import math
import torch
from torch.optim import Optimizer
import torch.distributed as dist
from transformers.integrations.deepspeed import is_deepspeed_zero3_enabled
class AdaLomo(Optimizer):
"""
一个自定义的优化器类AdaLomo,用于在分布式训练中的梯度更新。
该类实现两个梯度更新函数 :meth:`fuse_update` 和 :meth:`fuse_update_zero3`,分别用于非ZeRO和ZeRO模式下的梯度更新。
:param model: 待优化的模型
:param lr: 学习率,默认值为1e-3
:param eps: 正则化系数。eps[0]防止梯度平方太小,eps[1]用于在根据参数的RMS放缩学习率时防止步长太大
:param clip_threshold: 归一化update矩阵时的阈值
:param decay_rate: 梯度平方移动平均的衰减率
:param clip_grad_norm: 梯度裁剪的范数阈值
.. note::
clip_grad_norm须为正数
:param clip_grad_value: 梯度裁剪的值域阈值
:param weight_decay: 权重衰减系数,默认值为0.0
:param loss_scale: 损失缩放系数,可以用来提高训练精度,但是太大可能会导致nan
"""
def __init__(
self,
model,
lr=1e-3,
loss_scale=2 ** 10,
eps=(1e-30, 1e-3),
clip_threshold=1.0,
decay_rate=-0.8,
clip_grad_norm=None,
clip_grad_value=None,
weight_decay=0.0,
):
self.model = model
self.lr = lr
self.clip_grad_norm = clip_grad_norm
self.clip_grad_value = clip_grad_value
self.weight_decay = weight_decay
self.loss_scale = loss_scale
if self.weight_decay > 0.0:
self.do_weight_decay = True
else:
self.do_weight_decay = False
self.eps = eps
self.step_num = 0
self.decay_rate = decay_rate
self.clip_threshold = clip_threshold
# for grad norm
if self.clip_grad_norm is not None and self.clip_grad_norm <= 0:
raise ValueError(
f"clip_grad_norm should be positive, got {self.clip_grad_norm}."
)
self.gather_norm = False
self.grad_norms = []
self.clip_coef = None
# check if zero3 is enabled
self.zero3_enabled = is_deepspeed_zero3_enabled()
if self.zero3_enabled: # zero3 is enabled
self.grad_func = self.fuse_update_zero3()
else:
self.grad_func = self.fuse_update()
self.exp_avg_sq = {}
self.exp_avg_sq_row = {}
self.exp_avg_sq_col = {}
# register hook function, which will be called through the backward process
for n, p in self.model.named_parameters():
if self.zero3_enabled:
if len(p.ds_shape) == 1:
self.exp_avg_sq[n] = torch.zeros(
p.ds_shape[0], dtype=torch.float32
).cuda()
else:
self.exp_avg_sq_row[n] = torch.zeros(
p.ds_shape[0], dtype=torch.float32
).cuda()
self.exp_avg_sq_col[n] = torch.zeros(
p.ds_shape[1], dtype=torch.float32
).cuda()
else:
if len(p.data.shape) == 1:
self.exp_avg_sq[n] = torch.zeros(
p.data.shape[0], dtype=torch.float32
).cuda()
else:
self.exp_avg_sq_row[n] = torch.zeros(
p.data.shape[0], dtype=torch.float32
).cuda()
self.exp_avg_sq_col[n] = torch.zeros(
p.data.shape[1], dtype=torch.float32
).cuda()
if p.requires_grad:
p.register_hook(self.grad_func)
defaults = dict(
lr=lr,
eps=eps,
weight_decay=weight_decay,
clip_grad_norm=clip_grad_norm,
clip_grad_value=clip_grad_value,
)
super(AdaLomo, self).__init__(self.model.parameters(), defaults)
@staticmethod
def _approx_sq_grad(exp_avg_sq_row, exp_avg_sq_col):
# copy from fairseq's adafactor implementation:
# https://github.com/huggingface/transformers/blob/8395f14de6068012787d83989c3627c3df6a252b/src/transformers/optimization.py#L505
r_factor = (
(exp_avg_sq_row / exp_avg_sq_row.mean(dim=-1, keepdim=True))
.rsqrt_()
.unsqueeze(-1)
)
c_factor = exp_avg_sq_col.unsqueeze(-2).rsqrt()
return torch.mul(r_factor, c_factor)
@staticmethod
def _rms(tensor):
return tensor.norm(2) / (tensor.numel() ** 0.5)
def fuse_update(self):
"""
在非ZeRO模式下更新模型参数的梯度。
:return: func,一个闭包函数,用于更新模型参数的梯度
"""
def func(x):
"""
闭包函数,用于更新模型参数的梯度。
"""
with torch.no_grad():
for n, p in self.model.named_parameters():
if p.requires_grad and p.grad is not None:
grad_fp32 = p.grad.to(torch.float32)
p.grad = None
if self.loss_scale:
grad_fp32.div_(self.loss_scale)
if self.gather_norm:
# we adopt two backward pass for gradient norm computation and parameter update, respectively.
self.grad_norms.append(torch.norm(grad_fp32, 2.0))
else:
# grad clip or norm
if (
self.clip_grad_value is not None
and self.clip_grad_value > 0
):
# Clipping gradients by their value
grad_fp32.clamp_(
min=-self.clip_grad_value, max=self.clip_grad_value
)
if (
self.clip_grad_norm is not None
and self.clip_grad_norm > 0
and self.clip_coef is not None
):
# Normalize the gradient according to its norm (computed in another pass)
grad_fp32.mul_(self.clip_coef)
beta2t = 1.0 - math.pow(self.step_num, self.decay_rate)
update = (grad_fp32 ** 2) + self.eps[0]
if len(p.data.shape) > 1:
self.exp_avg_sq_row[n].mul_(beta2t).add_(
update.mean(dim=-1), alpha=1.0 - beta2t
)
self.exp_avg_sq_col[n].mul_(beta2t).add_(
update.mean(dim=-2), alpha=1.0 - beta2t
)
update = self._approx_sq_grad(
self.exp_avg_sq_row[n], self.exp_avg_sq_col[n]
)
update.mul_(grad_fp32)
else:
self.exp_avg_sq[n].mul_(beta2t).add_(
update, alpha=1.0 - beta2t
)
update = self.exp_avg_sq[n].rsqrt().mul_(grad_fp32)
update.div_(
(self._rms(update) / self.clip_threshold).clamp_(
min=1.0
)
)
p_fp32 = p.data.to(torch.float32)
p_rms = torch.norm(p_fp32, 2.0) / math.sqrt(p.numel())
lr = self.lr
param_scale = max(self.eps[1], p_rms)
lr = lr * param_scale
if self.do_weight_decay:
p_fp32.mul_(1.0 - lr * self.weight_decay)
p_fp32.add_(update, alpha=-lr)
p.data.copy_(p_fp32)
return x
return func
def fuse_update_zero3(self):
"""
在ZeRO模式下更新模型参数的梯度。
:return: func,一个闭包函数,用于更新模型参数的梯度。
"""
def func(x):
with torch.no_grad():
for n, p in self.model.named_parameters():
if p.grad is not None:
torch.distributed.all_reduce(
p.grad, op=torch.distributed.ReduceOp.AVG, async_op=False
)
grad_fp32 = p.grad.to(torch.float32)
p.grad = None
if self.loss_scale:
grad_fp32.div_(self.loss_scale)
if self.gather_norm:
# we adopt two backward pass for gradient norm computation and parameter update, respectively.
self.grad_norms.append(torch.norm(grad_fp32, 2.0))
else: # update param
partition_size = p.ds_tensor.numel()
start = partition_size * self.dp_rank
end = min(start + partition_size, grad_fp32.numel())
if self.clip_grad_value is not None:
# Clipping gradients by their value
grad_fp32.clamp_(
min=-self.clip_grad_value, max=self.clip_grad_value
)
if (
self.clip_grad_norm is not None
and self.clip_grad_norm > 0
and self.clip_coef is not None
):
# Normalize the gradient according to its norm (computed in another pass)
grad_fp32.mul_(self.clip_coef)
beta2t = 1.0 - math.pow(self.step_num, self.decay_rate)
update = (grad_fp32 ** 2) + self.eps[0] # 改成addcmul_
if len(p.ds_shape) > 1:
self.exp_avg_sq_row[n].mul_(beta2t).add_(
update.mean(dim=-1), alpha=1.0 - beta2t
)
self.exp_avg_sq_col[n].mul_(beta2t).add_(
update.mean(dim=-2), alpha=1.0 - beta2t
)
update = self._approx_sq_grad(
self.exp_avg_sq_row[n], self.exp_avg_sq_col[n]
)
update.mul_(grad_fp32)
else:
self.exp_avg_sq[n].mul_(beta2t).add_(
update, alpha=1.0 - beta2t
)
update = self.exp_avg_sq[n].rsqrt().mul_(grad_fp32)
update.div_(
(self._rms(update) / self.clip_threshold).clamp_(
min=1.0
)
)
one_dim_update = update.view(-1)
partitioned_update = one_dim_update.narrow(
0, start, end - start
)
param_fp32 = p.ds_tensor.to(torch.float32)
partitioned_p = param_fp32.narrow(0, 0, end - start)
p_rms = torch.norm(partitioned_p, 2.0) ** 2
dist.all_reduce(p_rms, op=torch.distributed.ReduceOp.SUM)
p_rms = (p_rms / p.ds_numel).sqrt()
lr = self.lr
param_scale = max(self.eps[1], p_rms)
lr = lr * param_scale
if self.do_weight_decay:
partitioned_p.mul_(1.0 - lr * self.weight_decay)
partitioned_p.add_(partitioned_update, alpha=-lr)
p.ds_tensor[: end - start] = partitioned_p
return x
return func
def fused_backward(self, loss, lr):
"""
执行一步反向传播并更新模型的梯度。
:param loss: 模型的loss值
:param lr: 学习率
"""
self.lr = lr
if self.loss_scale:
loss = loss * self.loss_scale
self.step_num += 1
loss.backward()
# update the last parameter since the last parameter in the computaiton graph is not ready when calling hook functions
# the argument of grad_func is just a placeholder, and it can be anything.
self.grad_func(0)
def grad_norm(self, loss):
"""
计算梯度的范数。
:param loss: 模型的loss值
"""
self.gather_norm = True
self.grad_norms = []
if self.loss_scale:
loss = loss * self.loss_scale
loss.backward(retain_graph=True)
# update the last parameter since the last parameter in the computaiton graph is not ready when calling hook functions
# the argument of grad_func is just a placeholder, and it can be anything.
self.grad_func(0)
with torch.no_grad():
# The norm is computed over all gradients together, as if they were
# concatenated into a single vector. Gradients are modified in-place.
self.grad_norms = torch.stack(self.grad_norms)
total_norm = torch.norm(self.grad_norms, 2.0)
self.clip_coef = float(self.clip_grad_norm) / (total_norm + 1e-6)
self.clip_coef = torch.clamp(self.clip_coef, max=1.0)
self.gather_norm = False

Thoughts

  • LOMO 只能用 SGD 优化器这一点限制了它的使用场景,因此 AdaLomo 则将 LOMOAdam 结合起来,可以适应主流大模型的微调场景
  • AdaLomo 需要存储 Adam 的一阶和二阶动量,这又带来了额外的内存开销,感觉和 LOMO 的初衷有些违背

URL

TL;DR

  • 本文提出一种名为 LOMO 的大模型微调方法,它在梯度计算和优化过程中的内存消耗方面进行了精细的优化设计,使得有限资源的设备能够实现对大型语言模型的全参数微调。

Algorithm

  • LOMO 的全称是 LOw Memory Optimizer,它的核心思想是:
    1. SGD 代替 Adam 优化器
    2. 融合梯度计算和梯度更新
    3. 拆分全局梯度和局部梯度

1. 优化器

  • Adam 优化器比 SGD 优化器更消耗显存
  • LOMO 认为 SGD 优化器在大模型微调任务中已足够

2. 融合梯度计算和梯度更新

  • 传统深度学习框架中,梯度计算和梯度更新是分开的,即先将所有层的梯度计算完毕后,再统一进行梯度更新
  • 这样做的问题是,需要保存所有层的梯度,占用大量显存
  • LOMO 提出了一种融合梯度计算和梯度更新的方法,即每计算完一层的梯度,就立即进行梯度更新
    LOMO.png

3. 拆分全局梯度和局部梯度

  • 在大模型微调任务中,有一个常用的稳定训练的策略是在模型梯度上加入梯度裁剪,梯度裁剪方式一般有两种:
    • Clipping Gradient Value:直接裁剪梯度到一个固定范围
    • Clipping Gradient Norm:裁剪梯度的范数到一个固定值
  • Clipping Gradient Value 无需全局梯度
  • Clipping Gradient Norm 需要全局梯度信息,这和 LOMO 的设计相违背,因此 LOMO 不得不单独处理这种情况
  • 具体的处理方式是,如果模型训练过程中需要 Clipping Gradient Norm,则 每个 Step 做两次 forward - backward
    • 第一次用于计算全局梯度的总范数,根据总范数和设定的阈值计算裁剪比例
    • 第二次做真正的 backward,并在 backward 过程中对梯度进行裁剪 + 局部参数更新

4. 代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
class LOMO(Optimizer):
"""
一个自定义的优化器类LOMO,用于在分布式训练中的梯度更新。
该类实现两个梯度更新函数 :meth:`fuse_update` 和 :meth:`fuse_update_zero3`,分别用于非ZeRO和ZeRO模式下的梯度更新。
:param model: 待优化的模型
:param lr: 学习率,默认值为1e-3
:param clip_grad_norm: 梯度裁剪的范数阈值
.. note::
clip_grad_norm须为正数
:param clip_grad_value: 梯度裁剪的值域阈值
"""
def __init__(self, model, lr=1e-3, clip_grad_norm=None, clip_grad_value=None):
self.model = model
self.lr = lr
self.local_rank = int(os.environ["LOCAL_RANK"])
self.world_size = dist.get_world_size()
self.clip_grad_norm = clip_grad_norm
self.clip_grad_value = clip_grad_value
# for grad norm
if self.clip_grad_norm is not None and self.clip_grad_norm <= 0:
raise ValueError(
f"clip_grad_norm should be positive, got {self.clip_grad_norm}."
)
self.gather_norm = False
self.grad_norms = []
self.clip_coef = None
# check if zero3 is enabled
p0 = list(self.model.parameters())[0]
if hasattr(p0, "ds_tensor"): # zero3 is enabled
self.grad_func = self.fuse_update_zero3()
else:
self.grad_func = self.fuse_update()
# check if fp16 is enabled
if p0.dtype == torch.float16:
self.loss_scaler = DynamicLossScaler(
init_scale=2 ** 16,
) # TODO: add args
if self.clip_grad_norm is None:
raise ValueError(
"Loss scaling is recommended to be used with grad norm to get better performance."
)
else:
self.loss_scaler = None
# register hook function, which will be called through the backward process
for n, p in self.model.named_parameters():
if p.requires_grad:
p.register_hook(self.grad_func)
defaults = dict(
lr=lr, clip_grad_norm=clip_grad_norm, clip_grad_value=clip_grad_value
)
super(LOMO, self).__init__(self.model.parameters(), defaults)
def fuse_update(self):
"""
在非ZeRO模式下更新模型参数的梯度。
:return: func,一个闭包函数,用于更新模型参数的梯度
"""
def func(x):
"""
闭包函数,用于更新模型参数的梯度。
"""
with torch.no_grad():
for n, p in self.model.named_parameters():
if p.requires_grad and p.grad is not None:
if self.loss_scaler:
if (
self.loss_scaler.has_overflow_serial
or self.loss_scaler._has_inf_or_nan(p.grad)
):
# if the overflow is detected, drop the gradient
p.grad = None
self.loss_scaler.has_overflow_serial = True
break
grad_fp32 = p.grad.to(torch.float32)
p.grad = None
if self.loss_scaler:
grad_fp32.div_(self.loss_scaler.loss_scale)
if self.gather_norm:
# we adopt two backward pass for gradient norm compuation and parameter update, respectively.
self.grad_norms.append(torch.norm(grad_fp32, 2.0))
else:
if (
self.clip_grad_value is not None
and self.clip_grad_value > 0
):
# Clipping gradients by their value
grad_fp32.clamp_(
min=-self.clip_grad_value, max=self.clip_grad_value
)
if (
self.clip_grad_norm is not None
and self.clip_grad_norm > 0
and self.clip_coef is not None
):
# Normalize the gradient according to its norm (computed in another pass)
grad_fp32.mul_(self.clip_coef)
p_fp32 = p.data.to(torch.float32)
p_fp32.add_(grad_fp32, alpha=-self.lr)
p.data.copy_(p_fp32)
return x
return func
def fuse_update_zero3(self):
"""
在ZeRO模式下更新模型参数的梯度。
:return: func,一个闭包函数,用于更新模型参数的梯度。
"""
def func(x):
with torch.no_grad():
for n, p in self.model.named_parameters():
if p.grad is not None:
torch.distributed.all_reduce(
p.grad, op=torch.distributed.ReduceOp.AVG, async_op=False
)
if self.loss_scaler:
if (
self.loss_scaler.has_overflow_serial
or self.loss_scaler._has_inf_or_nan(p.grad)
):
# if the overflow is detected, drop the gradient
p.grad = None
self.loss_scaler.has_overflow_serial = True
break
grad_fp32 = p.grad.to(torch.float32)
p.grad = None
param_fp32 = p.ds_tensor.to(torch.float32)
if self.loss_scaler:
grad_fp32.div_(self.loss_scaler.loss_scale)
if self.gather_norm:
# we adopt two backward pass for gradient norm compuation and parameter update, respectively.
self.grad_norms.append(torch.norm(grad_fp32, 2.0))
else: # update param
one_dim_grad_fp32 = grad_fp32.view(-1)
partition_size = p.ds_tensor.numel()
start = partition_size * self.local_rank
end = min(start + partition_size, grad_fp32.numel())
partitioned_grad_fp32 = one_dim_grad_fp32.narrow(
0, start, end - start
)
if self.clip_grad_value is not None:
# Clipping gradients by their value
partitioned_grad_fp32.clamp_(
min=-self.clip_grad_value, max=self.clip_grad_value
)
if (
self.clip_grad_norm is not None
and self.clip_grad_norm > 0
and self.clip_coef is not None
):
# Normalize the gradient according to its norm (computed in another pass)
partitioned_grad_fp32.mul_(self.clip_coef)
partitioned_p = param_fp32.narrow(0, 0, end - start)
partitioned_p.add_(partitioned_grad_fp32, alpha=-self.lr)
p.ds_tensor[: end - start] = partitioned_p
return x
return func
def fused_backward(self, loss, lr):
"""
执行一步反向传播并更新模型的梯度(真正计算梯度和更新参数)。
:param loss: 模型的loss值
:param lr: 学习率
"""
self.lr = lr
# Users need call grad_norm themselves and then call backward_step
if (
self.clip_grad_norm is not None
and self.clip_grad_norm > 0
and self.clip_coef is None
):
raise ValueError(
"clip_grad_norm is not None, but clip_coef is None. "
"Please call optimizer.grad_norm() before optimizer.fused_backward()."
)
if self.loss_scaler:
loss = loss * self.loss_scaler.loss_scale
loss.backward()
# update the last parameter since the last parameter in the computaiton graph is not ready when calling hook functions
# the argument of grad_func is just a placeholder, and it can be anything.
self.grad_func(0)
def grad_norm(self, loss):
"""
计算梯度的范数(虽然做了一次 forward + backward,但实际上只是用于计算梯度的范数,后续做 clip grad norm 用)。
:param loss: 模型的loss值
"""
self.gather_norm = True
self.grad_norms = []
if self.loss_scaler:
self.loss_scaler.has_overflow_serial = False
loss = loss * self.loss_scaler.loss_scale
loss.backward(retain_graph=True)
# update the last parameter since the last parameter in the computaiton graph is not ready when calling hook functions
# the argument of grad_func is just a placeholder, and it can be anything.
self.grad_func(0)
if self.loss_scaler and self.loss_scaler.has_overflow_serial:
self.loss_scaler.update_scale(overflow=True)
with torch.no_grad(): # clear gradients
for n, p in self.model.named_parameters():
p.grad = None
return
with torch.no_grad():
# The norm is computed over all gradients together, as if they were
# concatenated into a single vector. Gradients are modified in-place.
self.grad_norms = torch.stack(self.grad_norms)
total_norm = torch.norm(self.grad_norms, 2.0)
self.clip_coef = float(self.clip_grad_norm) / (total_norm + 1e-6)
self.clip_coef = torch.clamp(self.clip_coef, max=1.0)
self.gather_norm = False
  • 以上代码是 LOMO 核心优化器的实现,来自于官方 Github 仓库
  • LOMO 可以和 DeepSpeed 一起使用,实现更好的性能

Thoughts

  • LOMO 原创性的东西感觉不多,很多优化技巧都是借鉴了其他优化方法,比如:
    • 混合精度训练
    • Activation Checkpointing 来做 Activation 的重计算
  • 不过确实是工程上的一个很好的实践

URL

TL;DR

  • BAdam 全称是 Blockwise Adam,是一种内存高效的大型语言模型参数优化方法。
  • BAdam 的核心思想是将模型参数分块,每次只更新一个块的参数,而且对每个块用 Adam 连续更新 K 次,然后再更新下一个块。

Algorithm

BAdam.png

  • BAdam 的算法流程如上图所示,其中 K 是一个超参数,表示每个块的参数用 Adam 更新次数。
  • 当一个块的参数更新 K 次后,就丢掉 这个块的所有优化器信息,包括梯度、一阶动量、二阶动量
  • 假设模型有 M billion 参数,最低内存占用:
    • Adam
      • 参数:2M GBfp16
      • 优化器:
        • 梯度:4M GBfp32
        • 参数:4M GBfp32
        • 一阶动量:4M GBfp32
        • 二阶动量:4M GBfp32
      • 总共:18M GB
    • BAdam
      • 参数:2M GBfp16
      • 优化器:
        • 梯度:4MD\frac{4M}{D} GBfp32
        • 参数:4MD\frac{4M}{D} GBfp32
        • 一阶动量:4MD\frac{4M}{D} GBfp32
        • 二阶动量:4MD\frac{4M}{D} GBfp32
      • 总共:2M+16MD2M+\frac{16M}{D} GB,其中 D 是块的数量。

Thoughts

  • BAdam 的核心思想是将模型参数分块更新,但实际上和 Adam 的思想是 完全不一样
  • 因为 Adam 想要追求的是全训练过程的步长自适应,即每一个 step 的步长都来自于只有所有历史信息。
  • BAdam 只能保证当前步长最多由之前 Kstep 的信息决定,所以 BAdam 的收敛性和 Adam 是不一样的。

URL

TL;DR

  • 本文提出一种梯度低秩映射方法,和 LoRA 思想类似,但支持全参数优化,无需冻结模型也无需额外的训练参数,只需在梯度更新时进行低秩映射,从而在训练时节省显存。
  • 由于优化器状态是大模型微调过程中的主要显存消耗,因此本算法本质是在优化器中记录梯度矩阵的低秩近似,从而减少显存消耗。

Algorithm

总体流程

galore.png

  1. 计算参数的梯度 GRm×nG\in\mathbb R^{m\times n}
  2. 计算梯度的低秩近似(通常用 SVD 奇异值分解法), GUVT,URm×r,VRn×rG\approx UV^T,U\in \mathbb R^{m\times r},V\in \mathbb R^{n\times r}
  3. 记录低秩近似的优化器状态信息,例如:
    1. UUVV
    2. UUVV 的动量信息
    3. UUVV 的二阶动量信息
  4. 将低秩近似的梯度 UVTUV^T 合并变成近似的梯度 GG',并用 GG' 更新参数
  5. 由于梯度具有一定程度的稳定性,为了节省奇异值分解带来的损失,UUVV 不会在每次迭代中都重新计算,而是每隔 TT 次迭代更新一次

用公式表述

  1. 分解梯度 GGUUVV 的乘积
    1. GRm×nG\in \mathbb R^{m\times n}
    2. U,Σ,VT=SVD(G)U,\Sigma,V^T=SVD(G)
    3. URm×m,VRn×n,ΣRmU\in \mathbb R^{m\times m},V\in \mathbb R^{n\times n},\Sigma\in\mathbb R^m
    4. rr 是低秩近似的秩
    5. Ur=U[:,:r],Vr=V[:,:r],UrRm×r,VrRn×rU_r=U[:,:r],V_r=V[:,:r],U_r\in \mathbb R^{m\times r},V_r\in \mathbb R^{n \times r}
    6. Ur=UrΣU_r = U_r \Sigma
    7. G=UrVrTG' = U_r V_r^T
  2. 记录优化器状态信息,以 Adam 优化器为例
    1. mtU=β1mt1U+(1β1)Ur ,  mtV=β1mt1V+(1β1)Vrm_t^U = \beta_1 m_{t-1}^U + (1-\beta_1)U_r\ ,\ \ m_t^V = \beta_1 m_{t-1}^V + (1-\beta_1)V_r
    2. vtU=β2vt1U+(1β2)Ur2 ,  vtV=β2vt1V+(1β2)Vr2v_t^U = \beta_2 v_{t-1}^U + (1-\beta_2)U_r^2\ ,\ \ v_t^V = \beta_2 v_{t-1}^V + (1-\beta_2)V_r^2
    3. mtU=mtU/(1β1t) ,  mtV=mtV/(1β1t)m_t^U = m_t^U/(1-\beta_1^t)\ ,\ \ m_t^V = m_t^V/(1-\beta_1^t)
    4. vtU=vtU/(1β2t) ,  vtV=vtV/(1β2t)v_t^U = v_t^U/(1-\beta_2^t)\ ,\ \ v_t^V = v_t^V/(1-\beta_2^t)
  3. 计算低秩近似梯度并更新参数
    1. G=U×mtVvtV+ϵ+V×mtUvtU+ϵG'=U\times \frac{m_t^V}{\sqrt{v_t^V}+\epsilon} + V\times \frac{m_t^U}{\sqrt{v_t^U}+\epsilon}
    2. ΘΘηG\Theta \leftarrow \Theta - \eta G'
    3. Θ\Theta 是模型参数,η\eta 是学习率
  4. 更新分解矩阵 UUVV
    1. 每隔 TT 次迭代更新一次,也就是说 GGUUVV 可能不是同一个时间步的,GG 是当前时间步的梯度,而 UUVV 是之前更新的
    2. U,Σ,VT=SVD(G)U,\Sigma,V^T=SVD(G)
    3. Ur=U[:,:r],Vr=V[:,:r],UrRm×r,VrRn×rU_r=U[:,:r],V_r=V[:,:r],U_r\in \mathbb R^{m\times r},V_r\in \mathbb R^{n \times r}
  5. 正交化和秩调整(可选)
    1. 为了保持 UUVV 各自的正交性,可以对 UUVV 分别进行正交化处理
    2. UTU=I,VTV=IU^TU=I,V^TV=I
    3. 动态调整秩 rr 以适应训练过程中的梯度变化

Thought

  • 听上去很像 LoRA,终究是效果和显存占用的平衡问题。
  • 大模型微调能改的东西不多,所以传统机器学习用到的一些算法又可以在大模型炒炒冷饭了…

URL

TL;DR

  • 本文是上手训练大模型系列的第二篇,透过 LlamaFactory 视角审视大模型微调全流程,包括:SFTRLHFDPO
  • LlamaFactory 是目前社区内最受欢迎的开源的大模型全流程微调工具,用低代码的方式支持 100+ 大模型微调任务。
  • LlamaFactory 集成了 PytorchtransformersTRLPEFT 等主流框架。

分层看待大模型微调

llamafactory_1.png

  • LlamaFactory 认为,大模型微调框架主要由三部分组成:
    • Model loader:负责操纵大模型完成微调任务,模型结构包括 Large Language ModelVision Language Model 等。
    • Data worker:通过精心设计的数据处理流程,处理不同任务的数据,并支持单轮、多轮对话。
    • Trainer:实施不同的训练/微调策略和方法。

Model loader

  • Model loader 主要分为五个部分:
    • 模型初始化:用 transformers 库加载预训练模型,具体来说:
      • AutoModelForVision2Seq 加载 VLM 模型
      • AutoModelForCausalLM 加载其他类型的模型
      • AutoTokenizer 加载 tokenizer
      • 对于词表大小超过 embedding 层尺寸的模型,需要对 embedding 层进行 resize,并填充噪声
      • 处理 RoPEscaling factor
    • 模型修补:适配不同代码结构,包括:
      • S2S^2 attention:用 Monkey Patching 方式实现
      • flash attention:用 transformers 库实现
      • MoE:为了防止 DeepSpeed ZeRO stage-3 过度分配动态层,将 MoE 层设置为叶子模块
    • 模型量化
      • LLM.int8() 动态量化到 4 / 8 bits 可以由 bitsandbytes 库实现
      • 对于 4bits 量化,用 QLoRA 中的 NF4 数据类型 + 两次量化
      • 支持由 GPTQ / AWQ / AQLM 等量化方法 PTQ 量化后的模型的微调,但只支持 Adapter-base 的微调而不是 weight 直接微调
    • 附加适配器
      • PEFT 库提供了非常方便的适配器添加方法,例如 LoRA / rsLoRA / DoRA / PiSSA 等,只需要替换后端为 Unsloth 库做训练加速即可
      • 为了做 RLHF,需要在模型 head 层添加一个 value head,将每一个 token 映射为一个标量值
    • 精度自适应:为不同的设备选择不同的精度类型,例如:
      • 对于 NVIDIA 设备,如果计算能力大于 8.0,可以选择 BF16 (Brain Float 16) 精度(这里的计算能力是 NVIDIA 标记 GPU 硬件功能的指标,A100 GPU 计算能力是 8H100 GPU 计算能力是 8.9),否则选择 FP16 精度
      • 对于 NPUAMD GPU 设备,可以选择 FP16 精度
      • 对于没有 CUDA 的设备,需选择 FP32 精度
      • 对于混合精度训练,强制所有可训练参数类型为 FP32,目的是为了训练的稳定性
      • 对于半精度训练,则设置所有可训练参数类型为 BF16 (Brain Float 16)

Data worker

  • Data worker 主要由四部分组成:
    • 数据加载:用 HuggingFace 开源的 Datasets 库加载数据集,这个库支持远程数据读取或本地数据读取,并采用 Arrow 格式存储数据,大幅降低数据的存储开销
    • 数据对齐:将不同数据集类型(例如:Plain textAlpaca-like dataShareGPT-like data)对齐到标准格式(LLAMAFACTORY 定义)
      llamafacotry_2.png
    • 数据合并:数据合并是将不同的数据集合并到一起,由于数据已经对齐,所以合并相对容易
      • 对于非流式数据,可以直接在 shuffle 之前合并
      • 对于流式数据,需要在不同数据集之间交替读取
    • 数据预处理
      • 目前大模型的主要应用方向是 Chat,因此 LLAMAFACTORY 提供了许多 Chat template,可自动化根据模型类型匹配对应的模板。
      • 默认只对生成内容监督,不监督 prompt (典型 SFT 模式)
      • 可选用 Squence packing 技术,将多组训练数据打包成一个 batch,提高训练效率

Trainer

  • 高效训练
    • 支持多种微调方法,包括:LoRA / LoRA+ / Galore / BAdam
    • 使用 transformers 库做 pre-trainSFT
    • 使用 TRL 库做 RLHFDPO,以及高级表现优化方法(例如:KTO / ORPO 等),并为这些算法提供了特殊的数据整理流程(通常需要 2n 个样本,其中 nchosen examplesnreject examples
  • 模型共享的 RLHF
    • 传统 RLHF 过程非常复杂,因为需要四个模型:
      • Policy model / Base model:用于生成 action,是待优化的模型
      • Reward model:用于评估 action 的好坏,由人类偏好数据训练,输入是策略模型的回答或一对 chosen / reject 回答,输出是一个分数或更优回答的类别
      • Reference model / Frozen model:基准模型,是 Policy model 的冻结副本,用于计算策略模型的优势,即奖励信号需要减去基准奖励,以减少训练的不稳定性
      • Value model / Critic model:评估策略模型的预期回报,输入是环境状态(即用户输入)和策略模型的回答,输出是预期的回报,可以和策略模型共享参数也可以是单独的模型
    • 本文提出一种四合一的 RLHF 架构,具体步骤如下:
      1. SFT 模型基础上添加要给 Adapter 得到 Policy model
      2. SFT 模型上添加一个 Adapter,并将自带的 Vocab head 替换为输出标量的 Value head,在人类偏好数据上训练,得到 Reward model
      3. Policy model 的基础上替换替换 Vocab head 为一个输出标量的 Value head 其他参数和 Policy model 共享,得到 Value model
      4. 再结合冻结的 SFT 模型作为 Reference model
      5. 即:
        1. 策略模型:SFT model + Adapter 1
        2. 价值模型:SFT model + Adapter 1 + Value head 1除了 head 之外都和策略模型完全共享
        3. 奖励模型:SFT model + Adapter 2 + Value head 2
        4. 基准模型:SFT model
        5. 这里提到的 Adapter 可理解为 LoRA 等适配器训练得到的一组参数,附加在原始模型参数上,在 PEFT 库中可用 set_adapter / disable_adapter 等方法灵活控制
  • 分布式训练:主要借助 DeepSpeed 库实现

其他部分

  • 模型推理:用 transformersvLLM 库实现
  • 模型评估:支持多选任务(例如:MMLU / CMMLU / C-Eval)和计算文本相似度分数任务(例如:BLEU-4 / ROUGE

Thoughts

  • LLAMAFACTORY 主要面向大模型微调而不是预训练,确实做了很多工程化的工作,使得不同预训练模型 + 不同数据集 + 不同微调方法的组合变得更加容易,github 上恐怖的 star 数也是有目共睹
  • 虽然是基于多个基础库的封装,但想要做到如此必须有及其广阔的大模型知识面以及对其进行抽象的能力,这一点是难得可贵的
  • 不过随着大模型范式的不断更新,维护一套这样的工程是非常有挑战性的

TL;DR

  • 本文是上手训练大模型系列的第一篇,主要介绍了如何用指令微调数据集去微调一个大模型。
  • 为了快速走通流程,本文选择了较小的指令微调数据集和较小的模型。

详细介绍

SFT

  • SFT 全称是 Supervised Fine-Tuning,监督微调,是预训练大模型在变成实际可用模型必须经过的一个步骤。
  • 训练的基础配置(例如:optimizerlearning reteweight decay)等一般都是从预训练模型继承过来的。
  • 但实际上训练范式已经完全变化了,从无监督的预训练变成了有监督的微调,具体来说:
    • 预训练过程中,模型只需要预测下一个 token,直到输出 EOS 或者达到最大长度。
    • Instruct-SFT 阶段,模型的输入是一条指令(通常是 Instruction + Input,即指令和输入),输出是指令对应的输出。指令部分的预测不会被用于 loss 的计算,只有输入部分的预测会被用于 loss 的计算。
  • 用一个例子来说明,假如有一条数据:“请计算:1 + 1 = 2”
    • 在预训练阶段
      • 模型 input 是 [“请”,“计算”,“:”,“1”,“+”,“1”,“=”,“2”]
      • 模型 label 是 [“计算”,“:”,“1”,“+”,“1”,“=”,“2”, “pad”],其中 pad 是占位符,表示这个位置不需要预测。
      • pad 之外,其他位置的预测都会被用于 loss 的计算。
    • Instruct-SFT 阶段
      • 模型 input 是 [“请”,“计算”,“:”,“1”,“+”,“1”,“=”]
      • 模型 label 是 [“pad”, “pad”, “pad”, “pad”, “pad”, “pad”, “2”]
      • 只有最后一个位置的预测会被用于 loss 的计算。
  • 从实际推理效果看:
    • SFT 前,模型的输出重复性较高,指令遵循能力很差。
    • SFT 后,模型的输出重复性降低,指令遵循能力明显提升,但可能产生格式依赖问题。

Alpaca-cleaned 数据集

  • Alpaca-cleaned 是一个指令微调数据集,数据集地址:Alpaca-cleaned
  • 数据集包含 51,760 条数据,每条数据包含 instructioninputoutput 三个字段。
  • 其中 instruction 是指令,input 是输入,output 是输出,input 字段可能为空。
  • 例如:
    • instruction 是 “请计算如下数学题:”
    • input 是 “1 + 1 =”
    • output 是 “2”

Llama-3.2-3B

  • Llama-3.2-3Bmeta 开源的预训练大模型(未经过 SFTRLHF),模型地址:Llama-3.2-3B
  • 词表大小 128,256,支持英语、德语、法语、意大利语、葡萄牙语、印地语、西班牙语和泰语等多种语言(就是不支持中文,好烦)。
  • 模型有 32Transformer,每个 Transformer32Head,使用了 GQA 降低了 Transformer 的计算复杂度,hidden size4096

指令微调

  • 指令微调的流程如下:
    1. 加载预训练模型和 tokenizer
    2. 加载数据集并进行预处理
    3. 分布式训练
  • 具体实现代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import os
import torch
import torch.distributed as dist
from datasets import load_dataset
from datasets import Split
from transformers import AutoTokenizer, AutoModelForCausalLM
from torch.utils.data import IterableDataset, DataLoader
import deepspeed
from torch.utils.tensorboard import SummaryWriter
# 初始化 DeepSpeed
deepspeed.init_distributed()
# 设置环境变量以避免 tokenizers 并行化问题
os.environ["TOKENIZERS_PARALLELISM"] = "false"
# 设置分布式训练所需的环境变量
local_rank = int(os.getenv("LOCAL_RANK", "0"))
world_size = int(os.getenv("WORLD_SIZE", "1"))
os.environ["RANK"] = str(local_rank)
os.environ["WORLD_SIZE"] = str(world_size)
os.environ["MASTER_ADDR"] = "localhost"
os.environ["MASTER_PORT"] = "23333"
# NCCL 超时参数配置
os.environ["NCCL_BLOCKING_WAIT"] = "1"
os.environ["NCCL_ASYNC_ERROR_HANDLING"] = "1"
os.environ["NCCL_DEBUG"] = "INFO"
os.environ["NCCL_TIMEOUT"] = "600"
# 初始化进程组
if not dist.is_initialized():
dist.init_process_group(backend="nccl")
# config
model_path = "./models/meta-llama/Llama-3.2-3B"
dataset_path = "./dataset/LLM/yahma___alpaca-cleaned"
ds_config = "./ds_config.json"
log_dir = "./logs"
max_length = 1024
batch_size = 8
train_epochs = 3
# Load the local alpaca dataset
dataset_stream = load_dataset(dataset_path, split=Split.TRAIN, streaming=True)
# tokenizer
tokenizer = AutoTokenizer.from_pretrained(model_path, token=True)
tokenizer.pad_token = tokenizer.eos_token # llama-3.2 tokenizer padding token not set
# model
model = AutoModelForCausalLM.from_pretrained(model_path, token=True)
model.to(f"cuda:{local_rank}") # 将模型移动到对应的 GPU
# dataset
class StreamDataset(IterableDataset):
def __init__(self, data_stream, tokenizer, max_length=max_length):
self.data_stream = data_stream
self.tokenizer = tokenizer
self.max_length = max_length
def __iter__(self):
for data in self.data_stream:
instruction = data["instruction"]
input_text = data["input"]
output = data["output"]
# 构造 Prompt
if input_text.strip():
prompt = f"### Instruction:\n{instruction}\n\n### Input:\n{input_text}\n\n### Response:\n"
else:
prompt = f"### Instruction:\n{instruction}\n\n### Response:\n"
# 拼接完整输入序列:prompt + output
full_text = prompt + output
# Tokenize
input_ids = tokenizer(
full_text,
truncation=True,
max_length=self.max_length,
padding="max_length",
)["input_ids"]
# 构造 labels: 仅监督 output 部分
# 找到 output 起始位置
prompt_length = len(
tokenizer(prompt, padding="do_not_pad", truncation=False)["input_ids"]
)
labels = [-100] * prompt_length + input_ids[prompt_length:] # 忽略 prompt 的部分
labels = labels[: self.max_length] # 截断
labels = labels + [-100] * (max_length - len(labels)) # Padding 保持一致长度
yield {
"input_ids": torch.tensor(input_ids, dtype=torch.long),
"labels": torch.tensor(labels, dtype=torch.long),
}
def __len__(self):
return 51760 # cleaned alpaca dataset rows
def train():
# DataLoader
train_dataset = StreamDataset(dataset_stream, tokenizer)
train_dataloader = DataLoader(train_dataset, batch_size=batch_size)
# TensorBoard SummaryWriter
writer = SummaryWriter(log_dir=log_dir) if dist.get_rank() == 0 else None
# DeepSpeed 初始化
model_engine, optimizer, _, _ = deepspeed.initialize(
model=model,
model_parameters=model.parameters(),
config=ds_config,
training_data=train_dataset,
)
# 训练循环
for epoch in range(train_epochs): # num_train_epochs
for step, batch in enumerate(train_dataloader):
inputs = {
key: value.to(f"cuda:{local_rank}") for key, value in batch.items()
}
outputs = model_engine(**inputs) # 添加 labels 参数
loss = outputs.loss
model_engine.backward(loss)
model_engine.step()
# 日志打印和 TensorBoard 记录
if dist.get_rank() == 0:
if step % 10 == 0:
print(
f"Epoch: {epoch}, Step: {step}/{len(train_dataloader)}, Loss: {loss.item()}"
)
writer.add_scalar(
"Loss/train", loss.item(), epoch * len(train_dataloader) + step
)
checkpoint_path = f"./checkpoints"
model_engine.save_checkpoint(checkpoint_path, tag=f"epoch_{epoch}")
print(f"Checkpoint saved at {checkpoint_path}")
# 关闭 TensorBoard SummaryWriter
if writer:
writer.close()
if __name__ == "__main__":
train()
  • 总体上使用了:
    • transformers 加载模型和 tokenizer
    • datasets 加载开源数据集。
    • deepspeed 进行分布式训练,包含大规模分布式训练和零冗余优化以及混合精度训练等。

模型格式转换

  • 使用 deepspeed 训练和保存的模型格式是 deepspeed 特有的,无法直接加载到 transformers 中,因此需要进行模型格式转换。
  • deepspeed 保存的模型结构如下:
1
2
3
4
5
6
7
8
checkpoints/
├── epoch_xxx
│ ├── mp_rank_00_model_states.pt
│ ├── zero_pp_rank_0_mp_rank_00_optim_states.pt
│ ├── zero_pp_rank_1_mp_rank_00_optim_states.pt
│ ├── zero_pp_rank_2_mp_rank_00_optim_states.pt
│ └── zero_pp_rank_3_mp_rank_00_optim_states.pt
└── latest
  • 转换过程分成两步:
    1. 使用 deepspeed 在保存模型同时提供的 zero_to_fp32.py 脚本将模型转换为 fp32 格式(主要是将不同的 partfp16 模型参数用 ring reduce 聚合成完整的并转成 fp32),转换后的模型格式是 pytorch 适用的(zero_to_fp32.py 只会转换 latest 指向的 epoch)。
    2. pytorch 格式的模型转成 transformers 格式。
  • 第一步代码 deepseed 提供了,第二步代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import os
from transformers import AutoModelForCausalLM, AutoTokenizer
final_model_save_path = "./checkpoints/final_transformers_model" # 保存最终模型的路径
original_model_path = "./models/meta-llama/Llama-3.2-3B" # 原始模型的路径
sharded_model_dir = "./checkpoints/fp32_checkpoints" # SFT fp32 分片模型的路径
# 1. 将原始模型配置复制到最终模型目录,需要让 transformers 自动识别模型
os.system(
f"cp {os.path.join(original_model_path, 'config.json')} {final_model_save_path}"
)
# 2. 将分片模型转换为 transformers 模型
model = AutoModelForCausalLM.from_pretrained(sharded_model_dir)
model.save_pretrained(final_model_save_path)
# 3. 补全 tokenizer 配置
tokenizer = AutoTokenizer.from_pretrained(original_model_path, token=True)
tokenizer.save_pretrained(final_model_save_path)
  • 转换后的模型变成了 transformers 可以识别的格式,可以直接加载到 transformers 中进行推理。

推理

  • 将转换后的模型加载到 transformers 中,即可使用 pipeline 进行推理。
  • 推理代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
import torch
from transformers import pipeline
model_id = "./checkpoints/final_transformers_model"
pipe = pipeline(
"text-generation", model=model_id, torch_dtype=torch.bfloat16, device_map="auto"
)
instruction = "Use Python code to implement."
input_text = "Find prime numbers within 100."
total_input = f"### Instruction:\n{instruction}\n\n### Input:\n{input_text}\n\n### Response:\n"
print(
pipe(total_input, max_length=1024)[0][
"generated_text"
]
)

Thoughts

  • 大模型是个非常依赖工具的领域,transformersdatasetsdeepspeed 等工具的使用对于大模型的最终效果非常重要。
  • 大模型即使在微调阶段,能做的也不多,什么都不能改,数据集的质量对于模型的影响非常大。
  • 指令微调前后,模型差距不是一星半点。

URL

TL;DR

  • Actor-Critic 算法是一种结合了策略梯度和值函数的方法,Actor 负责输出策略,Critic 负责输出状态价值函数。和 REINFORCE 算法相比,Actor-Critic 算法多引入了一个 Critic 网络,用于评估 Actor 的策略。
  • A2C 是对 Actor-Critic 算法的改进,引入了 Advantage 项来评估当前状态的优势,优化了 Actor 的损失计算。
  • A3C 是在 A2C 的基础上引入了多个并行的 Actor-Critic 网络,用于加速训练。
  • PPO 也是 Actor-Critic 架构下的一种算法,引入了 Clipped Surrogate ObjectiveGeneralized Advantage Estimation (GAE),用于提高策略梯度的稳定性和效率。

Algorithms

1. Actor-Critic Algorithms

公式表述

  • 这是一篇 1999 年的强化学习论文,是对于 1992REINFORCE 算法论文 Simple Statistical Gradient-Following Algorithms for Connectionist Reinforcement Learning 的扩展。
  • REINFORCE 算法是一种纯粹的策略梯度算法,也就是只有 Actor 网络,没有 Critic 网络。
  • Actor-Critic 算法在 REINFORCE 算法的基础上引入了一个 Critic 网络,用于评估当前状态的价值(或当前状态-空间的价值)。
  • Actor 的输入是 state,输出是 action 的分布均值和标准差,在均值和标准差组成的正态分布中采样得到 action
  • Critic 的输入是 statestate-action,输出是当前状态的价值,即 V(s)Q(s, a)
  • Actor 的损失:

Loss=t=1Tlogπθ(atst)G(st,at)Loss = -\sum_{t=1}^T \log \pi_{\theta}(a_t|s_t) \cdot G(s_t,a_t)

  • 其中,G(st,at)G(s_t,a_t) 是当前状态的实际累积回报
  • Critic 的损失:

Loss=t=1TMSE(G(st,at)Vθπ(st))Loss=\sum_{t=1}^T MSE(G(s_t,a_t)- V_{\theta}^{\pi}(s_t))

代码表述

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
class ActorCriticNetwork(nn.Module):
"""Actor-Critic 网络,包含策略网络和价值网络。"""
def __init__(self, obs_space_dims: int, action_space_dims: int):
"""初始化 Actor-Critic 网络。
参数:
obs_space_dims: 观测空间维度
action_space_dims: 动作空间维度
"""
super().__init__()
hidden_space1 = 16
hidden_space2 = 32
# 共享网络
self.shared_net = nn.Sequential(
nn.Linear(obs_space_dims, hidden_space1),
nn.Tanh(),
nn.Linear(hidden_space1, hidden_space2),
nn.Tanh(),
)
# 策略网络输出动作的均值和标准差
self.policy_mean_net = nn.Linear(hidden_space2, action_space_dims)
self.policy_stddev_net = nn.Linear(hidden_space2, action_space_dims)
# 价值网络输出状态价值估计
self.value_net = nn.Linear(hidden_space2, 1)
def forward(self, x: torch.Tensor):
"""前向传播,输出动作参数和状态价值。
参数:
x: 环境的观测值
返回:
action_means: 动作均值
action_stddevs: 动作标准差
state_values: 状态价值估计
"""
shared_features = self.shared_net(x.float())
action_means = self.policy_mean_net(shared_features)
action_stddevs = torch.log(
1 + torch.exp(self.policy_stddev_net(shared_features))
)
state_values = self.value_net(shared_features)
return action_means, action_stddevs, state_values
class ActorCriticAgent:
"""Actor-Critic 算法的实现。"""
def __init__(self, obs_space_dims: int, action_space_dims: int):
"""初始化 Agent。
参数:
obs_space_dims: 观测空间维度
action_space_dims: 动作空间维度
"""
self.learning_rate = 1e-4
self.gamma = 0.99
self.eps = 1e-6
self.log_probs = []
self.state_values = []
self.rewards = []
self.net = ActorCriticNetwork(obs_space_dims, action_space_dims)
self.optimizer = torch.optim.AdamW(self.net.parameters(), lr=self.learning_rate)
def sample_action(self, state: np.ndarray):
"""根据策略采样动作。
参数:
state: 环境的观测值
返回:
action: 采样的动作
"""
state = torch.tensor(np.array([state]))
action_means, action_stddevs, state_value = self.net(state)
distrib = Normal(action_means[0] + self.eps, action_stddevs[0] + self.eps)
action = distrib.sample()
log_prob = distrib.log_prob(action)
action = action.numpy()
self.log_probs.append(log_prob)
self.state_values.append(state_value)
return action
def update(self):
"""更新策略网络和价值网络。"""
# 将列表转换为张量
log_probs = torch.stack(self.log_probs)
state_values = torch.cat(self.state_values).squeeze()
rewards = self.rewards
# 计算折扣回报
returns = []
G = 0
for R in rewards[::-1]:
G = R + self.gamma * G
returns.insert(0, G)
returns = torch.tensor(returns)
# 计算损失
policy_loss = -(log_probs * returns.detach()).sum()
value_loss = nn.functional.mse_loss(state_values, returns)
loss = policy_loss + value_loss
# 更新网络参数
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
# 清空存储的值
self.log_probs = []
self.state_values = []
self.rewards = []
  • 实现上和 REINFORCE 算法的代码相比,有以下几点不同:
    • 引入了 Critic 网络,用于评估当前状态的价值(可以理解为训练一个可微的环境仿真器)。
    • 损失函数中包含了 Critic 的损失,用于更新 Critic 网络。

2. Advantage Actor-Critic (A2C)

公式表述

  • A2C 的全称是 Advantage Actor-Critic,是对于 Actor-Critic 算法的改进,引入了 Advantage 项,用于评估当前状态的优势。
  • 具体来说:在计算 Actor 的损失时,使用的是 Advantage 项,而不是直接使用回报,即:

Loss=t=1Tlogπθ(atst)A(st,at)A(st,at)=G(st,at)V(st)Loss = -\sum_{t=1}^T \log \pi_{\theta}(a_t|s_t) \cdot A(s_t,a_t)\\A(s_t,a_t)=G(s_t,a_t) - V(s_t)

  • 其中,A(st,at)A(s_t,a_t) 是当前状态的优势,G(st,at)G(s_t,a_t) 是当前状态的实际回报,V(st)V(s_t) 是当前状态的价值估计。
  • 其他部分和 Actor-Critic 算法基本一致。

代码表述

1
2
3
4
5
# Actor-Critic 算法的实现
actor_loss = -(log_probs * returns.detach()).sum()
# Advantage Actor-Critic 算法的实现
advantages = returns - state_values
actor_loss = -(log_probs * advantages.detach()).sum()

3. Asynchronous Advantage Actor-Critic (A3C)

  • A3C 是在 A2C 的基础上引入了多个并行的 Actor-Critic 网络,用于加速训练。
  • 由于强化学习的数据来自于和环境交互,因此如果可以让多个 worker 并行地和环境交互,然后将数据汇总到一个 learner 网络中,就可以加速训练。A3C 就是这样一种算法。
  • 更偏工程化一些,主要涉及到多线程/多进程的并行计算,以及梯度的更新和参数同步等。

4. Proximal Policy Optimization (PPO)

公式表述

  • PPO 是一种 Actor-Critic 算法,核心思想是:在更新策略参数时,限制新策略和旧策略之间的差异,以此来保证策略更新的稳定性
  • PPO 相较于 A2C
    1. 限制更新幅度:引入了 Clipped Surrogate Objective,限制新策略和旧策略之间的差异,即:

    LCLIP(θ)=Et[min(rt(θ)At,clip(rt(θ),1ϵ,1+ϵ)At)]rt(θ)=πθ(atst)πθold(atst)L^{CLIP}(\theta) = \mathbb{E}_t[\min(r_t(\theta) \cdot A_t, \text{clip}(r_t(\theta), 1-\epsilon, 1+\epsilon) \cdot A_t)]\\r_t(\theta)=\frac{\pi_{\theta}(a_t|s_t)}{\pi_{\theta_{old}}(a_t|s_t)}

    • 其中:
      • LCLIP(θ)L^{CLIP}(\theta)Clipped Surrogate Objective
      • rt(θ)r_t(\theta) 是新旧策略在状态 sts_t 采取动作 ata_t 的概率比
      • AtA_t 是优势函数
      • ϵ\epsilon 是区间宽度超参数。
    1. 优势函数估计:引入了广义优势估计 Generalized Advantage Estimation (GAE),用于提高策略梯度估计的准确性和平滑性。

    At=l=0(γλ)lδt+lδt=rt+γVθ(st+1)Vθ(st)A_t = \sum_{l=0}^{\infty}(\gamma \lambda)^l \delta_{t+l}\\\delta_t=r_t+\gamma V_\theta(s_{t+1}) - V_\theta(s_t)

    • 其中:
      • AtA_t 是广义优势估计
      • δt\delta_t 是时序差分 (TD Error)
      • λ\lambda 是折扣因子
      • γ\gamma 是优势函数的折扣因子
      • Vθ(st)V_\theta(s_t) 是状态价值估计
    1. 多次采样和小批量更新:引入了记忆、多次采样和小批量更新,用于提高数据的利用效率,即:存储一个或多个 episode 所有轨迹,然后每次更新时从中随机抽取数据分 batch 训练
    2. 一阶优化算法:使用一阶优化算法,如 Adam / SGD 等,而不是 TRPO 等算法用到的二阶优化算法,训练复杂度低效率高。
    3. 联合优化:同时优化策略和价值网络,即:

    L(θ)=LCLIP(θ)+c1E[(Vθ(st)Vttarget)2]+c2H(πθ)L(\theta)= L^{CLIP}(\theta) + c_1 \cdot \mathbb E[(V_{\theta}(s_t) - V_t^{target})^2] + c_2 \cdot H(\pi_\theta)

    • 其中:
      • LCLIP(θ)L^{CLIP}(\theta)Clipped Surrogate Objective,用于优化策略
      • 第二项是价值网络的损失,用于优化价值网络
      • 第三项是熵正则项,用于提高策略的探索性

代码表述

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
class PPOActorCriticNetwork(nn.Module):
""" PPO Actor-Critic 网络,包含策略网络和价值网络。 """
def __init__(self, obs_space_dims: int, action_space_dims: int):
super().__init__()
hidden_space1 = 16
hidden_space2 = 32
# 共享网络
self.shared_net = nn.Sequential(
nn.Linear(obs_space_dims, hidden_space1),
nn.Tanh(),
nn.Linear(hidden_space1, hidden_space2),
nn.Tanh(),
)
# 策略网络输出动作的均值和标准差
self.policy_mean_net = nn.Linear(hidden_space2, action_space_dims)
self.policy_stddev_net = nn.Linear(hidden_space2, action_space_dims)
# 价值网络输出状态价值估计
self.value_net = nn.Linear(hidden_space2, 1)
def forward(self, x: torch.Tensor):
"""前向传播,输出动作参数和状态价值。"""
shared_features = self.shared_net(x.float())
action_means = self.policy_mean_net(shared_features)
action_stddevs = torch.nn.functional.softplus(
self.policy_stddev_net(shared_features)
)
action_stddevs = torch.clamp(action_stddevs, min=1e-3) # 标准差裁剪
state_values = self.value_net(shared_features)
return action_means, action_stddevs, state_values
class PPOAgent:
""" PPO 算法的实现。 """
def __init__(self, obs_space_dims: int, action_space_dims: int):
self.learning_rate = 1e-5
self.gamma = 0.99
self.lam = 0.95
self.eps_clip = 0.1
self.entropy_coeff = 0.01
self.epochs = 10
self.batch_size = 64
self.net = PPOActorCriticNetwork(obs_space_dims, action_space_dims)
self.optimizer = torch.optim.Adam(self.net.parameters(), lr=self.learning_rate)
self.memory = []
def sample_action(self, state: np.ndarray):
"""根据策略采样动作。"""
state = torch.tensor(np.array([state]))
action_means, action_stddevs, state_value = self.net(state)
distrib = Normal(action_means[0], action_stddevs[0] + 1e-3)
action = distrib.sample()
action = torch.clamp(action, -3.0, 3.0)
log_prob = distrib.log_prob(action).sum()
return action.numpy(), log_prob, state_value
def store_transition(self, transition):
"""存储轨迹数据。"""
self.memory.append(transition)
def process_batch(self):
"""处理并计算优势函数和目标回报值。"""
states, actions, log_probs, rewards, dones, state_values = zip(*self.memory)
states = torch.tensor(states, dtype=torch.float32)
actions = torch.tensor(actions, dtype=torch.float32)
log_probs = torch.tensor(log_probs, dtype=torch.float32)
state_values = torch.tensor(state_values, dtype=torch.float32).squeeze()
rewards = torch.tensor(rewards, dtype=torch.float32)
dones = torch.tensor(dones, dtype=torch.float32)
# 在 state_values 末尾添加一个 0,以表示最后一个状态的下一个状态价值
next_state_values = torch.cat([state_values[1:], torch.tensor([0.0])]) * (
1 - dones
)
# 计算 delta
deltas = rewards + self.gamma * next_state_values * (1 - dones) - state_values
# 计算优势函数
advantages = torch.zeros_like(rewards)
running_adv = 0
for t in reversed(range(len(rewards))):
running_adv = deltas[t] + self.gamma * self.lam * running_adv * (
1 - dones[t]
)
advantages[t] = running_adv
# 计算目标回报值
returns = advantages + state_values
# 标准化优势函数
advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)
# 清空记忆
self.memory = []
return states, actions, log_probs, returns, advantages
def update(self):
"""更新策略和价值网络。"""
states, actions, old_log_probs, returns, advantages = self.process_batch()
dataset = torch.utils.data.TensorDataset(
states, actions, old_log_probs, returns, advantages
)
loader = torch.utils.data.DataLoader(
dataset, batch_size=self.batch_size, shuffle=True
)
for _ in range(self.epochs):
for (
batch_states,
batch_actions,
batch_old_log_probs,
batch_returns,
batch_advantages,
) in loader:
action_means, action_stddevs, state_values = self.net(batch_states)
distrib = Normal(action_means, action_stddevs + 1e-3)
log_probs = distrib.log_prob(batch_actions).sum(axis=-1)
entropy = distrib.entropy().sum(axis=-1)
# 计算 PPO 损失
# 1. 计算裁剪的策略损失
ratios = torch.exp(log_probs - batch_old_log_probs)
surr1 = ratios * batch_advantages
surr2 = (
torch.clamp(ratios, 1 - self.eps_clip, 1 + self.eps_clip)
* batch_advantages
)
policy_loss = -torch.min(surr1, surr2).mean()
# 2. 计算价值损失
value_loss = nn.functional.mse_loss(
state_values.squeeze(), batch_returns
)
# 3. 计算熵损失
entropy_loss = -entropy.mean()
# 4. 加权得到总损失
loss = (
policy_loss + 0.5 * value_loss + self.entropy_coeff * entropy_loss
)
# 更新网络参数
self.optimizer.zero_grad()
loss.backward()
torch.nn.utils.clip_grad_norm_(self.net.parameters(), max_norm=0.1)
self.optimizer.step()

Thoughts

  • Actor-Critic 算法是一种结合了策略梯度和值函数的方法,多用于连续动作空间的强化学习任务。
  • 结合多种改进算法,Actor-Critic 算法占据了强化学习领域的重要地位。

URL

TL;DR

  • Double DQN: 解决了 DQN 中的过估计问题,使用目标网络和当前网络来共同计算 Q 值,从而减少 Q 值的偏差。
  • Dueling DQN: 提出了对偶网络,将 Q 函数分为价值函数和优势函数,这样可以更好地估计状态的价值,减少动作选择的随机性。
  • Prioritized Experience Replay: 改进了经验回放机制,通过给经验赋予优先级,使得学习过程更加高效。
  • Noisy DQN: 在网络中引入噪声以探索更加多样的策略,提高探索效率。
  • C51: 传统基于 Q 值的学习方式只关注期望值不关心分布,Distributional RL 引入了基于分布的强化学习的概念。
  • Rainbow: 将上述五种方法结合起来,另外还加入了 Multi-step Learning(多步学习),六合一行成了一个强大的 DQN 变体。

Algorithm

1. Double DQN

公式表述

  • DQN 中计算目标 Q 值时,使用的是目标网络的最大 Q 值,即 yt=rt+1+γmaxaQ(st+1,a;θ)y_t=r_{t+1}+\gamma\max_{a'}Q(s_{t+1},a';\theta^-) ,这样会导致过估计问题
  • Double DQN 对其改进非常简单,使用 Q 值网络来选择动作,使用目标网络来评估该动作的 Q 值,即 yt=rt+1+γQ(st+1,argmaxaQ(st+1,a;θ);θ)y_t=r_{t+1}+\gamma Q(s_{t+1},\arg\max_{a'}Q(s_{t+1},a';\theta);\theta^-)

代码表述

1
2
3
4
5
6
7
8
9
10
11
12
# DQN
q_values = self.q_network(states)
next_q_values = self.target_network(next_states)
max_next_q_values = next_q_values.max(dim=1)[0] # 直接使用目标网络的最大 Q 值
target_q_values = rewards + (1 - dones) * self.gamma * max_next_q_values
# Double DQN
q_values = self.q_network(states)
next_q_values = self.q_network(next_states)
next_actions = torch.argmax(next_q_values, dim=1) # 使用 Q 值网络选择动作
next_q_values = self.target_network(next_states)
max_next_q_values = next_q_values.gather(1, next_actions.unsqueeze(1)).squeeze(1) # 使用目标网络评估该动作的 Q 值
target_q_values = rewards + (1 - dones) * self.gamma * max_next_q_values

2. Dueling DQN

公式表述

  • 状态价值函数和状态动作价值函数的定义:请参考 这里
  • 动作优势函数的定义:A(s,a)A(s,a) 表示在状态 ss 下选择动作 aa相对优势
  • DQN / Double DQN 中的 Q 函数是直接输出动作的 Q 值,这样会导致动作选择的随机性
  • Dueling DQN 提出了对偶网络,Q 函数分为价值函数和优势函数,即 Q(s,a)=V(s)+A(s,a)Q(s,a)=V(s)+A(s,a) ,这样可以更好地估计状态的价值
  • 原理也很简单:价值函数表示状态的价值,优势函数表示此状态下动作的相对优势,最后再将二者相加得到 Q
  • 实际使用时,为了保证优势函数的相对性,需要对优势函数进行零均值化处理,即 Q(s,a)=V(s)+A(s,a)1AaA(s,a)Q(s,a)=V(s)+A(s,a)-\frac{1}{|A|}\sum_{a'}A(s,a')

代码表述

1
2
3
4
5
6
# DQN
q_values = self.q_network(states)
# Dueling DQN
value = self.value_network(states)
advantage = self.advantage_network(states)
q_values = value + advantage - advantage.mean(dim=1, keepdim=True)
  • 其他部分和 Double DQN 完全一样
  • 深度学习领域有个说法:如果一个变量可以被拆成多个变量的组合,那么理论上会更好(参数量和计算量都高了,自然是好事)

3. Prioritized Experience Replay

公式表述

  • DQN 中的经验回放机制是随机采样的或顺序采样的,这样会导致一些重要的经验被忽略
  • Prioritized Experience Replay 改进了经验回放机制,通过给经验赋予优先级,使得学习过程更加高效
  • 具体做法是:根据 TD 误差(时序差分误差 Temporal Difference Error)的大小来给经验赋予优先级,然后按照优先级采样
  • 时序差分误差的定义:当前状态下预测值和实际回报之间的差异,即 TD=Rt+1+γV(St+1)V(St)TD=R_{t+1}+\gamma V(S_{t+1})-V(S_t)
    • Rt+1R_{t+1} 表示在 t+1t+1 时刻的奖励
    • V(St+1)V(S_{t+1}) 表示在 t+1t+1 时刻的状态价值
    • V(St)V(S_t) 表示在 tt 时刻的状态价值(即对状态 StS_t 未来回报的预期)
    • γ\gamma 表示折扣因子

代码表述

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
class SumTree:
"""
SumTree data structure to implement Prioritized Experience Replay
"""
def __init__(self, capacity):
self.capacity = capacity
self.size = 0
self.tree = np.zeros(2 * capacity - 1)
self.data = [None] * capacity
def add(self, priority, data):
# 更新 data 节点的值
self.data[self.size % self.capacity] = data
# 更新 tree 节点的值,同时更新父节点
idx = self.size % self.capacity + self.capacity - 1
change = priority - self.tree[idx]
self.tree[idx] = priority
while idx != 0:
idx = (idx - 1) // 2
self.tree[idx] += change
self.size += 1
def get(self, priority_sum):
idx = 0
while idx < self.capacity - 1:
left = 2 * idx + 1
right = left + 1
if priority_sum <= self.tree[left]:
idx = left
else:
priority_sum -= self.tree[left]
idx = right
data_idx = idx - self.capacity + 1
return self.tree[idx], self.data[data_idx]
def total_priority(self):
return self.tree[0]
class PERAgent:
def __init__(
self,
env,
learning_rate: float,
initial_epsilon: float,
epsilon_decay: float,
final_epsilon: float,
discount_factor: float = 0.95,
batch_size: int = 64,
memory_size: int = 10000,
alpha: float = 0.6, # 权重参数,决定优先级的影响
beta: float = 0.4, # 用于重要性采样的权重,随着训练增加
):
self.env = env
self.lr = learning_rate
self.discount_factor = discount_factor
self.epsilon = initial_epsilon
self.epsilon_decay = epsilon_decay
self.final_epsilon = final_epsilon
self.batch_size = batch_size
self.alpha = alpha
self.beta = beta
# 初始化 SumTree 作为经验池
self.memory = SumTree(memory_size)
# Q 网络和目标网络
self.q_network = DQN(
input_dim=env.observation_space.n, output_dim=env.action_space.n
)
self.target_network = DQN(
input_dim=env.observation_space.n, output_dim=env.action_space.n
)
self.target_network.load_state_dict(self.q_network.state_dict()) # 初始化目标网络
# 优化器
self.optimizer = optim.Adam(self.q_network.parameters(), lr=self.lr)
def get_action(self, obs: Tuple[int, int, bool]) -> int:
"""选择动作(epsilon-greedy)"""
if np.random.random() < self.epsilon:
return self.env.action_space.sample() # 探索:随机选择动作
else:
obs_tensor = torch.tensor([obs], dtype=torch.float32)
q_values = self.q_network(obs_tensor)
return torch.argmax(q_values).item() # 利用:选择 Q 值最大的动作
def update(self):
"""从优先经验回放池中抽取一个批次的经验,进行 Q 网络的更新"""
if self.memory.size < self.batch_size:
return # 如果经验回放池中的样本不足一个批次,则不进行更新
batch_indices = np.random.uniform(
0, self.memory.total_priority(), size=self.batch_size
)
# 批量从 SumTree 中获取经验
batch = []
for priority in batch_indices:
_, data = self.memory.get(priority) # 获取对应的经验样本
batch.append(data)
states, actions, rewards, next_states, dones = zip(*batch)
states = torch.tensor(states, dtype=torch.float32)
actions = torch.tensor(actions, dtype=torch.int64)
rewards = torch.tensor(rewards, dtype=torch.float32)
next_states = torch.tensor(next_states, dtype=torch.float32)
dones = torch.tensor(dones, dtype=torch.int64)
# 计算 Q 值:使用当前网络和目标网络
q_values = self.q_network(states)
next_q_values = self.q_network(next_states)
next_actions = torch.argmax(next_q_values, dim=1)
# 计算 next_states 对应的目标 Q 值
next_q_values = self.target_network(next_states)
max_next_q_values = next_q_values.gather(1, next_actions.unsqueeze(1)).squeeze(
1
)
# 计算目标 Q 值
target_q_values = (
rewards + (1 - dones) * self.discount_factor * max_next_q_values
)
# 计算当前 Q 值和目标 Q 值的损失
q_value = q_values.gather(1, actions.unsqueeze(1)).squeeze(1)
# 计算重要性采样权重
weights = torch.tensor(
[(1.0 / self.memory.size) ** self.beta for _ in range(self.batch_size)]
)
loss = (weights * (q_value - target_q_values) ** 2).mean()
# 优化 Q 网络
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
def store_experience(self, obs, action, reward, next_obs, done):
"""计算 TD 误差并存储到优先经验回放池"""
obs_tensor = torch.tensor([obs], dtype=torch.float32)
next_obs_tensor = torch.tensor([next_obs], dtype=torch.float32)
q_values = self.q_network(obs_tensor)
next_q_values = self.q_network(next_obs_tensor)
next_actions = torch.argmax(next_q_values, dim=1)
# 计算 TD 误差
target_q_values = (
self.target_network(next_obs_tensor)
.gather(1, next_actions.unsqueeze(1))
.squeeze(1)
)
q_value = q_values.gather(
1, torch.tensor([action], dtype=torch.int64).unsqueeze(1)
).squeeze(1)
td_error = abs(target_q_values - q_value).item()
# 将经验存储到回放池,TD 误差用作优先级
self.memory.add(td_error + 1e-6, (obs, action, reward, next_obs, done))
def decay_epsilon(self):
"""逐渐减少 epsilon"""
self.epsilon = max(self.final_epsilon, self.epsilon - self.epsilon_decay)
def update_target_network(self):
"""每隔一段时间更新目标网络"""
self.target_network.load_state_dict(self.q_network.state_dict())
  • 代码中的 SumTree 类是一个优先级经验回放池,用于存储经验和计算优先级
  • 实现上,SumTree 由两部分组成:
    • tree:用于存储优先级,是一个完全二叉树
      • 叶子节点存储优先级,长度为 Capacity
      • 非叶子节点存储子节点的优先级之和,长度为 Capacity - 1
      • 总长度为 2 * Capacity - 1,根节点存储所有优先级之和
      • 叶子节点之间无需排序,这就是 SumTree 的优势,可以用 O(logN) 的时间复杂度实现优先级采样
    • data:用于存储经验,是一个列表
      • 长度为 Capacity,存储 tree 中叶子节点对应的经验,和 tree 中优先级一一对应,同步更新
  • SumTree 的作用:输入 [0, total_priority] 范围的均匀分布,输出非均匀的优先级采样结果
  • 在取数据时,只需要在 [0, total_priority] 范围内随机取样,然后 SumTree 的返回结果就是已经按照优先级采样的数据

4. Noisy DQN

公式表述

  • 传统 Q Learning / DQN 都采用 epsilon-greedy,即以 ϵ\epsilon 的概率随机选择动作,以 1ϵ1-\epsilon 的概率选择 Q 值最大的动作
  • 但是,这种方法有一个问题:随机性不够,可能会导致陷入局部最优解;随机性太大,会浪费太多时间在无效搜索上
  • Noisy DQN 引入了噪声网络的概念,即在网络中引入噪声以探索更加多样的策略,提高探索效率
  • 具体做法是:训练阶段在网络的全连接层中引入高斯噪声,即:

y=wx+b   =>   y=(μw+σwϵw)x+μb+σbϵby=wx+b\ \ \ =>\ \ \ y=(\mu^w+\sigma^w\odot \epsilon^w)x+\mu^b+\sigma^b\odot \epsilon^b

  • 其中,μw,μb,σw,σb\mu_w,\mu_b,\sigma_w,\sigma_b 是可训练参数,ϵw,ϵb\epsilon^w,\epsilon^b 是动态高斯噪声
  • 对于噪声的引入,有两种方式:
    • Independent Gaussian Noise(独立高斯噪声):每个权重和偏置都有都独立同分布
      • ϵwRfan_in×fan_outN(0,1)\epsilon^w\in\mathbb R^{fan\_in\times fan\_out}\sim \mathcal N(0,1)
      • ϵbRfan_outN(0,1)\epsilon^b\in\mathbb R^{fan\_out}\sim \mathcal N(0,1)
      • μinitwRfan_in×fan_outU(3fan_in,3fan_in)\mu^w_{init}\in\mathbb R^{fan\_in\times fan\_out}\sim \mathcal U(-\sqrt\frac{3}{fan\_in},\sqrt\frac{3}{fan\_in})
      • μinitbRfan_outU(3fan_in,3fan_in)\mu^b_{init}\in\mathbb R^{fan\_out}\sim \mathcal U(-\sqrt\frac{3}{fan\_in},\sqrt\frac{3}{fan\_in})
      • σinitw[0.017]fan_in×fan_out\sigma^w_{init}\in [0.017]^{fan\_in\times fan\_out}
      • σinitb[0.017]fan_out\sigma^b_{init}\in [0.017]^{fan\_out}
    • Factorized Gaussian Noise (分解高斯噪声):将 ϵw\epsilon_w 分解为两个矩阵的乘积,以减少参数量
      • ϵinRfan_inN(0,1)\epsilon^{in}\in\mathbb R^{fan\_in}\sim \mathcal N(0,1)
      • ϵoutRfan_outN(0,1)\epsilon^{out}\in\mathbb R^{fan\_out}\sim \mathcal N(0,1)
      • ϵw=ϵinϵout\epsilon^w=\epsilon^{in}\otimes\epsilon^{out}
      • ϵb=ϵout\epsilon^b=\epsilon^{out}
      • μinitwRfan_in×fan_outU(1fan_in,1fan_in)\mu^w_{init}\in\mathbb R^{fan\_in\times fan\_out}\sim \mathcal U(-\frac{1}{\sqrt {fan\_in}},\frac{1}{\sqrt{fan\_in}})
      • μinitbRfan_outU(1fan_in,1fan_in)\mu^b_{init}\in\mathbb R^{fan\_out}\sim \mathcal U(-\frac{1}{\sqrt {fan\_in}},\frac{1}{\sqrt{fan\_in}})
      • σinitw[0.5fan_in]fan_in×fan_out\sigma^w_{init}\in[\frac{0.5}{\sqrt{fan\_in}}]^{fan\_in\times fan\_out}
      • σinitb[0.5fan_in]fan_out\sigma^b_{init}\in[\frac{0.5}{\sqrt{fan\_in}}]^{fan\_out}
  • inference 阶段,直接使用 μw\mu^wμb\mu^b 进行计算,不使用 ϵ\epsilonσ\sigma

代码表述

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import torch
import torch.nn as nn
class NoisyLinear(nn.Module):
def __init__(self, in_features, out_features, sigma_init=0.017):
super(NoisyLinear, self).__init__()
self.in_features = in_features
self.out_features = out_features
# 可训练参数
self.weight_mu = nn.Parameter(torch.empty(out_features, in_features))
self.weight_sigma = nn.Parameter(torch.empty(out_features, in_features))
self.bias_mu = nn.Parameter(torch.empty(out_features))
self.bias_sigma = nn.Parameter(torch.empty(out_features))
self.sigma_init = sigma_init
self.reset_parameters()
def reset_parameters(self):
mu_range = 1 / torch.sqrt(torch.tensor(self.in_features, dtype=torch.float32))
self.weight_mu.data.uniform_(-mu_range, mu_range)
self.weight_sigma.data.fill_(self.sigma_init)
self.bias_mu.data.uniform_(-mu_range, mu_range)
self.bias_sigma.data.fill_(self.sigma_init)
def forward(self, input):
if self.training:
# 动态生成噪声 epsilon
weight_epsilon = torch.randn(self.out_features, self.in_features).to(
input.device
)
bias_epsilon = torch.randn(self.out_features).to(input.device)
weight = self.weight_mu + self.weight_sigma * weight_epsilon
bias = self.bias_mu + self.bias_sigma * bias_epsilon
else:
weight = self.weight_mu
bias = self.bias_mu
return nn.functional.linear(input, weight, bias)
class FactorisedNoisyLinear(nn.Module):
def __init__(self, in_features, out_features, sigma_zero=0.5):
super(FactorisedNoisyLinear, self).__init__()
self.in_features = in_features
self.out_features = out_features
self.sigma_zero = sigma_zero / torch.sqrt(
torch.tensor(self.in_features, dtype=torch.float32)
)
# 可训练参数
self.weight_mu = nn.Parameter(torch.empty(out_features, in_features))
self.weight_sigma = nn.Parameter(torch.empty(out_features, in_features))
self.bias_mu = nn.Parameter(torch.empty(out_features))
self.bias_sigma = nn.Parameter(torch.empty(out_features))
self.reset_parameters()
def reset_parameters(self):
mu_range = 1 / torch.sqrt(torch.tensor(self.in_features, dtype=torch.float32))
self.weight_mu.data.uniform_(-mu_range, mu_range)
self.weight_sigma.data.fill_(self.sigma_zero)
self.bias_mu.data.uniform_(-mu_range, mu_range)
self.bias_sigma.data.fill_(self.sigma_zero)
def _scale_noise(self, size):
x = torch.randn(size).to(self.weight_mu.device)
return x.sign() * x.abs().sqrt()
def forward(self, input):
if self.training:
# 动态生成噪声 epsilon
epsilon_in = self._scale_noise(self.in_features)
epsilon_out = self._scale_noise(self.out_features)
weight_epsilon = torch.ger(epsilon_out, epsilon_in)
bias_epsilon = epsilon_out
weight = self.weight_mu + self.weight_sigma * weight_epsilon
bias = self.bias_mu + self.bias_sigma * bias_epsilon
else:
weight = self.weight_mu
bias = self.bias_mu
return nn.functional.linear(input, weight, bias)

5. C51

公式表述

  • C51 (Categorical 51 atoms) 直译是:离散 51 个原子,是 Distributional RL 的一种算法,即基于分布的强化学习。
  • 传统基于 Q 值的学习方式只关注期望值(Q 值,即未来回报的期望)不关心分布(未来回报的概率分布),C51 改进了这一点
  • C51 提出 概率密度函数 Z(s, a),即在状态 s 下选择动作 a未来回报分布
  • 同时提出了基于分布的 Bellman 方程,即:

Z(s,a)=R(s,a)+γZ(s,a)Z(s,a)=R(s,a)+\gamma Z(s',a')

  • 其中:
    • Z(s,a)Z(s,a) 是在状态 s 下选择动作 a 的未来回报分布
    • R(s,a)R(s,a) 是在状态 s 下选择动作 a 的即时回报
    • γ\gamma 是折扣因子
    • Z(s,a)Z(s',a') 是下一个状态 s' 下选择动作 a' 的未来回报分布
  • Q 函数和 Z 函数的关系:Q(s,a)=zZ(s,a)zQ(s,a)=\sum_{z\in Z(s,a)}z
  • 由于连续分布不方便计算,C51 使用离散分布来近似 Z 函数,离散分布由 N 个均匀分布的原子组成,每个原子表示一个未来的回报值和其概率,原子被定义为:

zi=vmin+iΔzΔz=VmaxVminN1z_i=v_{min}+i\Delta z\\\Delta z=\frac{V_{max}-V_{min}}{N-1}

  • 其中,vminv_{min}vmaxv_{max} 分别是未来回报的最小值和最大值,Δz\Delta z 是原子之间的间隔
  • 同时,扩展 Bellman 方程到离散分布:

T^zi=r+γzi\hat{T}z_i=r+\gamma z_i

  • 离散分布的更新需要用到投影,也就是将更新后的分布重新映射到原子上

代码表述

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
class C51(nn.Module):
def __init__(self, input_dim, output_dim, n_atoms, V_min, V_max):
super(C51, self).__init__()
self.n_atoms = n_atoms
self.output_dim = output_dim
self.V_min = V_min
self.V_max = V_max
self.delta_z = (V_max - V_min) / (n_atoms - 1)
self.input_dim = input_dim
self.fc1 = nn.Linear(input_dim, 128)
self.fc2 = nn.Linear(128, 128)
self.fc3 = nn.Linear(128, output_dim * n_atoms)
def forward(self, x):
x = torch.nn.functional.one_hot(
x.to(torch.int64), num_classes=self.input_dim
).float()
x = torch.relu(self.fc1(x))
x = torch.relu(self.fc2(x))
x = self.fc3(x)
x = x.view(-1, self.output_dim, self.n_atoms)
# shape: (batch_size, action_space, n_atoms)
return torch.softmax(x, dim=-1)
class C51Agent:
def __init__(
self,
env,
learning_rate: float,
initial_epsilon: float,
epsilon_decay: float,
final_epsilon: float,
discount_factor: float = 0.95,
batch_size: int = 64,
memory_size: int = 10000,
n_atoms: int = 51,
V_min: float = -10.0,
V_max: float = 10.0,
):
self.env = env
self.lr = learning_rate
self.discount_factor = discount_factor
self.epsilon = initial_epsilon
self.epsilon_decay = epsilon_decay
self.final_epsilon = final_epsilon
self.batch_size = batch_size
self.n_atoms = n_atoms
self.V_min = V_min
self.V_max = V_max
self.delta_z = (V_max - V_min) / (n_atoms - 1)
self.z = torch.linspace(V_min, V_max, n_atoms).to(torch.float32)
self.memory = deque(maxlen=memory_size)
self.training_error = []
self.input_dim = env.observation_space.n
self.output_dim = env.action_space.n
self.q_network = C51(
input_dim=self.input_dim,
output_dim=self.output_dim,
n_atoms=n_atoms,
V_min=V_min,
V_max=V_max,
)
self.target_network = C51(
input_dim=self.input_dim,
output_dim=self.output_dim,
n_atoms=n_atoms,
V_min=V_min,
V_max=V_max,
)
self.target_network.load_state_dict(self.q_network.state_dict())
self.optimizer = optim.Adam(self.q_network.parameters(), lr=self.lr)
def get_action(self, obs: int) -> int:
if np.random.random() < self.epsilon:
return self.env.action_space.sample()
else:
obs_tensor = torch.tensor([obs], dtype=torch.int64)
dist = self.q_network(obs_tensor)
# 用离散概率密度函数计算Q值
q_values = torch.sum(dist * self.z, dim=2)
# 选择最大Q值对应的动作
action = torch.argmax(q_values, dim=1).item()
return action
def update(self):
if len(self.memory) < self.batch_size:
return
batch = random.sample(self.memory, self.batch_size)
states, actions, rewards, next_states, dones = zip(*batch)
states = torch.tensor(states, dtype=torch.int64)
actions = torch.tensor(actions, dtype=torch.long)
rewards = torch.tensor(rewards, dtype=torch.float32)
next_states = torch.tensor(next_states, dtype=torch.int64)
dones = torch.tensor(dones, dtype=torch.float32)
batch_size = states.size(0)
dist = self.q_network(states)
dist = dist[range(batch_size), actions]
with torch.no_grad():
next_dist = self.target_network(next_states)
next_q_values = torch.sum(next_dist * self.z, dim=2)
next_actions = torch.argmax(next_q_values, dim=1)
next_dist = next_dist[range(batch_size), next_actions]
# 离散化Bellman更新
Tz = rewards.unsqueeze(1) + (
1 - dones.unsqueeze(1)
) * self.discount_factor * self.z.unsqueeze(0)
Tz = Tz.clamp(self.V_min, self.V_max)
# 计算投影
b = (Tz - self.V_min) / self.delta_z
l = b.floor().long()
u = b.ceil().long()
l[(u > 0) * (l == u)] -= 1
u[(l < (self.n_atoms - 1)) * (l == u)] += 1
m = torch.zeros(batch_size, self.n_atoms)
offset = (
torch.linspace(0, ((batch_size - 1) * self.n_atoms), batch_size)
.unsqueeze(1)
.expand(batch_size, self.n_atoms)
.long()
)
m.view(-1).index_add_(
0, (l + offset).view(-1), (next_dist * (u.float() - b)).view(-1)
)
m.view(-1).index_add_(
0, (u + offset).view(-1), (next_dist * (b - l.float())).view(-1)
)
dist = dist + 1e-8
loss = -torch.sum(m * torch.log(dist), dim=1).mean()
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
self.training_error.append(loss.item())
def decay_epsilon(self):
self.epsilon = max(self.final_epsilon, self.epsilon - self.epsilon_decay)
def store_experience(self, obs, action, reward, next_obs, done):
self.memory.append((obs, action, reward, next_obs, done))
def update_target_network(self):
self.target_network.load_state_dict(self.q_network.state_dict())
  • C51 算法对 DQN 的优化实际上和 Dueling DQN 有些类似,都是将 Q 函数分解成多个部分,让其计算量和参数量更大,但不对拆解后的部分做监督,最终将所有部分合并起来成为 Q 值,依据 Q 值来选择动作

6. Rainbow

1. Multi-step Learning

  • 出自论文:《Multi-Step Reinforcement Learning: A Unifying Algorithm》
  • DQN 中的目标 Q 值是单步的,即目标 Q 值为: yt=rt+1+γmaxaQ(st+1,a;θ)y_t=r_{t+1}+\gamma\max_{a'}Q(s_{t+1},a';\theta^-)
  • Multi-step Learning 引入了多步学习,即目标 Q 值为: yt=rt+1+γ2rt+2+γ3rt+3+...+γNrt+N+γN+1maxaQ(st+N+1,a;θ)y_t=r_{t+1}+\gamma^2r_{t+2}+\gamma^3r_{t+3}+...+\gamma^Nr_{t+N}+\gamma^{N+1}\max_{a'}Q(s_{t+N+1},a';\theta^-)
    • 其中 NN 为多步数
    • θ\theta^- 是目标网络的参数
  • 这样可以更好地估计长期回报,减少 Q 值的偏差

2. Rainbow

  • Rainbow = DQN + Double DQN + Dueling DQN + Prioritized Experience Replay + Noisy DQN + Distributional RL + Multi-step Learning

Thought

  • 这些算法都分别从不同的角度对 DQN 进行了改进,很多都是非常小的改进,容易理解
  • DQN 系列强化学习算法只能用于离散状态动作空间,绝大多数强化学习都是连续状态动作空间,所以 DQN 系列算法的应用场景有限