Zhangzhe's Blog

The projection of my life.

0%

URL

TL;DR

  • 本文是上手训练大模型系列的第二篇,透过 LlamaFactory 视角审视大模型微调全流程,包括:SFTRLHFDPO
  • LlamaFactory 是目前社区内最受欢迎的开源的大模型全流程微调工具,用低代码的方式支持 100+ 大模型微调任务。
  • LlamaFactory 集成了 PytorchtransformersTRLPEFT 等主流框架。

分层看待大模型微调

llamafactory_1.png

  • LlamaFactory 认为,大模型微调框架主要由三部分组成:
    • Model loader:负责操纵大模型完成微调任务,模型结构包括 Large Language ModelVision Language Model 等。
    • Data worker:通过精心设计的数据处理流程,处理不同任务的数据,并支持单轮、多轮对话。
    • Trainer:实施不同的训练/微调策略和方法。

Model loader

  • Model loader 主要分为五个部分:
    • 模型初始化:用 transformers 库加载预训练模型,具体来说:
      • AutoModelForVision2Seq 加载 VLM 模型
      • AutoModelForCausalLM 加载其他类型的模型
      • AutoTokenizer 加载 tokenizer
      • 对于词表大小超过 embedding 层尺寸的模型,需要对 embedding 层进行 resize,并填充噪声
      • 处理 RoPEscaling factor
    • 模型修补:适配不同代码结构,包括:
      • S2S^2 attention:用 Monkey Patching 方式实现
      • flash attention:用 transformers 库实现
      • MoE:为了防止 DeepSpeed ZeRO stage-3 过度分配动态层,将 MoE 层设置为叶子模块
    • 模型量化
      • LLM.int8() 动态量化到 4 / 8 bits 可以由 bitsandbytes 库实现
      • 对于 4bits 量化,用 QLoRA 中的 NF4 数据类型 + 两次量化
      • 支持由 GPTQ / AWQ / AQLM 等量化方法 PTQ 量化后的模型的微调,但只支持 Adapter-base 的微调而不是 weight 直接微调
    • 附加适配器
      • PEFT 库提供了非常方便的适配器添加方法,例如 LoRA / rsLoRA / DoRA / PiSSA 等,只需要替换后端为 Unsloth 库做训练加速即可
      • 为了做 RLHF,需要在模型 head 层添加一个 value head,将每一个 token 映射为一个标量值
    • 精度自适应:为不同的设备选择不同的精度类型,例如:
      • 对于 NVIDIA 设备,如果计算能力大于 8.0,可以选择 BF16 (Brain Float 16) 精度(这里的计算能力是 NVIDIA 标记 GPU 硬件功能的指标,A100 GPU 计算能力是 8H100 GPU 计算能力是 8.9),否则选择 FP16 精度
      • 对于 NPUAMD GPU 设备,可以选择 FP16 精度
      • 对于没有 CUDA 的设备,需选择 FP32 精度
      • 对于混合精度训练,强制所有可训练参数类型为 FP32,目的是为了训练的稳定性
      • 对于半精度训练,则设置所有可训练参数类型为 BF16 (Brain Float 16)

Data worker

  • Data worker 主要由四部分组成:
    • 数据加载:用 HuggingFace 开源的 Datasets 库加载数据集,这个库支持远程数据读取或本地数据读取,并采用 Arrow 格式存储数据,大幅降低数据的存储开销
    • 数据对齐:将不同数据集类型(例如:Plain textAlpaca-like dataShareGPT-like data)对齐到标准格式(LLAMAFACTORY 定义)
      llamafacotry_2.png
    • 数据合并:数据合并是将不同的数据集合并到一起,由于数据已经对齐,所以合并相对容易
      • 对于非流式数据,可以直接在 shuffle 之前合并
      • 对于流式数据,需要在不同数据集之间交替读取
    • 数据预处理
      • 目前大模型的主要应用方向是 Chat,因此 LLAMAFACTORY 提供了许多 Chat template,可自动化根据模型类型匹配对应的模板。
      • 默认只对生成内容监督,不监督 prompt (典型 SFT 模式)
      • 可选用 Squence packing 技术,将多组训练数据打包成一个 batch,提高训练效率

Trainer

  • 高效训练
    • 支持多种微调方法,包括:LoRA / LoRA+ / Galore / BAdam
    • 使用 transformers 库做 pre-trainSFT
    • 使用 TRL 库做 RLHFDPO,以及高级表现优化方法(例如:KTO / ORPO 等),并为这些算法提供了特殊的数据整理流程(通常需要 2n 个样本,其中 nchosen examplesnreject examples
  • 模型共享的 RLHF
    • 传统 RLHF 过程非常复杂,因为需要四个模型:
      • Policy model / Base model:用于生成 action,是待优化的模型
      • Reward model:用于评估 action 的好坏,由人类偏好数据训练,输入是策略模型的回答或一对 chosen / reject 回答,输出是一个分数或更优回答的类别
      • Reference model / Frozen model:基准模型,是 Policy model 的冻结副本,用于计算策略模型的优势,即奖励信号需要减去基准奖励,以减少训练的不稳定性
      • Value model / Critic model:评估策略模型的预期回报,输入是环境状态(即用户输入)和策略模型的回答,输出是预期的回报,可以和策略模型共享参数也可以是单独的模型
    • 本文提出一种四合一的 RLHF 架构,具体步骤如下:
      1. SFT 模型上添加一个 Adapter 和一个 Value head,在人类偏好数据上训练,得到 Reward model,同时也可以被看做是 Value model
      2. SFT 模型基础上重新添加一个 Adapter 和一个 Value head,用 PPO 强化学习算法训练,得到 Policy model
      3. 再结合冻结的 SFT 模型作为 Reference model
      4. 即:
        1. 奖励模型:SFT model + Adapter 1 + Value head 1
        2. 价值模型:SFT model + Adapter 1 + Value head 1
        3. 策略模型:SFT model + Adapter 2 + Value head 2
        4. 基准模型:SFT model
        5. 这里提到的 Adapter 可理解为 LoRA 等适配器训练得到的一组参数,附加在原始模型参数上,在 PEFT 库中可用 set_adapter / disable_adapter 等方法灵活控制
  • 分布式训练:主要借助 DeepSpeed 库实现

其他部分

  • 模型推理:用 transformersvLLM 库实现
  • 模型评估:支持多选任务(例如:MMLU / CMMLU / C-Eval)和计算文本相似度分数任务(例如:BLEU-4 / ROUGE

Thoughts

  • LLAMAFACTORY 主要面向大模型微调而不是预训练,确实做了很多工程化的工作,使得不同预训练模型 + 不同数据集 + 不同微调方法的组合变得更加容易,github 上恐怖的 star 数也是有目共睹
  • 虽然是基于多个基础库的封装,但想要做到如此必须有及其广阔的大模型知识面以及对其进行抽象的能力,这一点是难得可贵的
  • 不过随着大模型范式的不断更新,维护一套这样的工程是非常有挑战性的

TL;DR

  • 本文是上手训练大模型系列的第一篇,主要介绍了如何用指令微调数据集去微调一个大模型。
  • 为了快速走通流程,本文选择了较小的指令微调数据集和较小的模型。

详细介绍

SFT

  • SFT 全称是 Supervised Fine-Tuning,监督微调,是预训练大模型在变成实际可用模型必须经过的一个步骤。
  • 训练的基础配置(例如:optimizerlearning reteweight decay)等一般都是从预训练模型继承过来的。
  • 但实际上训练范式已经完全变化了,从无监督的预训练变成了有监督的微调,具体来说:
    • 预训练过程中,模型只需要预测下一个 token,直到输出 EOS 或者达到最大长度。
    • Instruct-SFT 阶段,模型的输入是一条指令(通常是 Instruction + Input,即指令和输入),输出是指令对应的输出。指令部分的预测不会被用于 loss 的计算,只有输入部分的预测会被用于 loss 的计算。
  • 用一个例子来说明,假如有一条数据:“请计算:1 + 1 = 2”
    • 在预训练阶段
      • 模型 input 是 [“请”,“计算”,“:”,“1”,“+”,“1”,“=”,“2”]
      • 模型 label 是 [“计算”,“:”,“1”,“+”,“1”,“=”,“2”, “pad”],其中 pad 是占位符,表示这个位置不需要预测。
      • pad 之外,其他位置的预测都会被用于 loss 的计算。
    • Instruct-SFT 阶段
      • 模型 input 是 [“请”,“计算”,“:”,“1”,“+”,“1”,“=”]
      • 模型 label 是 [“pad”, “pad”, “pad”, “pad”, “pad”, “pad”, “2”]
      • 只有最后一个位置的预测会被用于 loss 的计算。
  • 从实际推理效果看:
    • SFT 前,模型的输出重复性较高,指令遵循能力很差。
    • SFT 后,模型的输出重复性降低,指令遵循能力明显提升,但可能产生格式依赖问题。

Alpaca-cleaned 数据集

  • Alpaca-cleaned 是一个指令微调数据集,数据集地址:Alpaca-cleaned
  • 数据集包含 51,760 条数据,每条数据包含 instructioninputoutput 三个字段。
  • 其中 instruction 是指令,input 是输入,output 是输出,input 字段可能为空。
  • 例如:
    • instruction 是 “请计算如下数学题:”
    • input 是 “1 + 1 =”
    • output 是 “2”

Llama-3.2-3B

  • Llama-3.2-3Bmeta 开源的预训练大模型(未经过 SFTRLHF),模型地址:Llama-3.2-3B
  • 词表大小 128,256,支持英语、德语、法语、意大利语、葡萄牙语、印地语、西班牙语和泰语等多种语言(就是不支持中文,好烦)。
  • 模型有 32Transformer,每个 Transformer32Head,使用了 GQA 降低了 Transformer 的计算复杂度,hidden size4096

指令微调

  • 指令微调的流程如下:
    1. 加载预训练模型和 tokenizer
    2. 加载数据集并进行预处理
    3. 分布式训练
  • 具体实现代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import os
import torch
import torch.distributed as dist
from datasets import load_dataset
from datasets import Split
from transformers import AutoTokenizer, AutoModelForCausalLM
from torch.utils.data import IterableDataset, DataLoader
import deepspeed
from torch.utils.tensorboard import SummaryWriter
# 初始化 DeepSpeed
deepspeed.init_distributed()
# 设置环境变量以避免 tokenizers 并行化问题
os.environ["TOKENIZERS_PARALLELISM"] = "false"
# 设置分布式训练所需的环境变量
local_rank = int(os.getenv("LOCAL_RANK", "0"))
world_size = int(os.getenv("WORLD_SIZE", "1"))
os.environ["RANK"] = str(local_rank)
os.environ["WORLD_SIZE"] = str(world_size)
os.environ["MASTER_ADDR"] = "localhost"
os.environ["MASTER_PORT"] = "23333"
# NCCL 超时参数配置
os.environ["NCCL_BLOCKING_WAIT"] = "1"
os.environ["NCCL_ASYNC_ERROR_HANDLING"] = "1"
os.environ["NCCL_DEBUG"] = "INFO"
os.environ["NCCL_TIMEOUT"] = "600"
# 初始化进程组
if not dist.is_initialized():
dist.init_process_group(backend="nccl")
# config
model_path = "./models/meta-llama/Llama-3.2-3B"
dataset_path = "./dataset/LLM/yahma___alpaca-cleaned"
ds_config = "./ds_config.json"
log_dir = "./logs"
max_length = 1024
batch_size = 8
train_epochs = 3
# Load the local alpaca dataset
dataset_stream = load_dataset(dataset_path, split=Split.TRAIN, streaming=True)
# tokenizer
tokenizer = AutoTokenizer.from_pretrained(model_path, token=True)
tokenizer.pad_token = tokenizer.eos_token # llama-3.2 tokenizer padding token not set
# model
model = AutoModelForCausalLM.from_pretrained(model_path, token=True)
model.to(f"cuda:{local_rank}") # 将模型移动到对应的 GPU
# dataset
class StreamDataset(IterableDataset):
def __init__(self, data_stream, tokenizer, max_length=max_length):
self.data_stream = data_stream
self.tokenizer = tokenizer
self.max_length = max_length
def __iter__(self):
for data in self.data_stream:
instruction = data["instruction"]
input_text = data["input"]
output = data["output"]
# 构造 Prompt
if input_text.strip():
prompt = f"### Instruction:\n{instruction}\n\n### Input:\n{input_text}\n\n### Response:\n"
else:
prompt = f"### Instruction:\n{instruction}\n\n### Response:\n"
# 拼接完整输入序列:prompt + output
full_text = prompt + output
# Tokenize
input_ids = tokenizer(
full_text,
truncation=True,
max_length=self.max_length,
padding="max_length",
)["input_ids"]
# 构造 labels: 仅监督 output 部分
# 找到 output 起始位置
prompt_length = len(
tokenizer(prompt, padding="do_not_pad", truncation=False)["input_ids"]
)
labels = [-100] * prompt_length + input_ids[prompt_length:] # 忽略 prompt 的部分
labels = labels[: self.max_length] # 截断
labels = labels + [-100] * (max_length - len(labels)) # Padding 保持一致长度
yield {
"input_ids": torch.tensor(input_ids, dtype=torch.long),
"labels": torch.tensor(labels, dtype=torch.long),
}
def __len__(self):
return 51760 # cleaned alpaca dataset rows
def train():
# DataLoader
train_dataset = StreamDataset(dataset_stream, tokenizer)
train_dataloader = DataLoader(train_dataset, batch_size=batch_size)
# TensorBoard SummaryWriter
writer = SummaryWriter(log_dir=log_dir) if dist.get_rank() == 0 else None
# DeepSpeed 初始化
model_engine, optimizer, _, _ = deepspeed.initialize(
model=model,
model_parameters=model.parameters(),
config=ds_config,
training_data=train_dataset,
)
# 训练循环
for epoch in range(train_epochs): # num_train_epochs
for step, batch in enumerate(train_dataloader):
inputs = {
key: value.to(f"cuda:{local_rank}") for key, value in batch.items()
}
outputs = model_engine(**inputs) # 添加 labels 参数
loss = outputs.loss
model_engine.backward(loss)
model_engine.step()
# 日志打印和 TensorBoard 记录
if dist.get_rank() == 0:
if step % 10 == 0:
print(
f"Epoch: {epoch}, Step: {step}/{len(train_dataloader)}, Loss: {loss.item()}"
)
writer.add_scalar(
"Loss/train", loss.item(), epoch * len(train_dataloader) + step
)
checkpoint_path = f"./checkpoints"
model_engine.save_checkpoint(checkpoint_path, tag=f"epoch_{epoch}")
print(f"Checkpoint saved at {checkpoint_path}")
# 关闭 TensorBoard SummaryWriter
if writer:
writer.close()
if __name__ == "__main__":
train()
  • 总体上使用了:
    • transformers 加载模型和 tokenizer
    • datasets 加载开源数据集。
    • deepspeed 进行分布式训练,包含大规模分布式训练和零冗余优化以及混合精度训练等。

模型格式转换

  • 使用 deepspeed 训练和保存的模型格式是 deepspeed 特有的,无法直接加载到 transformers 中,因此需要进行模型格式转换。
  • deepspeed 保存的模型结构如下:
1
2
3
4
5
6
7
8
checkpoints/
├── epoch_xxx
│ ├── mp_rank_00_model_states.pt
│ ├── zero_pp_rank_0_mp_rank_00_optim_states.pt
│ ├── zero_pp_rank_1_mp_rank_00_optim_states.pt
│ ├── zero_pp_rank_2_mp_rank_00_optim_states.pt
│ └── zero_pp_rank_3_mp_rank_00_optim_states.pt
└── latest
  • 转换过程分成两步:
    1. 使用 deepspeed 在保存模型同时提供的 zero_to_fp32.py 脚本将模型转换为 fp32 格式(主要是将不同的 partfp16 模型参数用 ring reduce 聚合成完整的并转成 fp32),转换后的模型格式是 pytorch 适用的(zero_to_fp32.py 只会转换 latest 指向的 epoch)。
    2. pytorch 格式的模型转成 transformers 格式。
  • 第一步代码 deepseed 提供了,第二步代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import os
from transformers import AutoModelForCausalLM, AutoTokenizer
final_model_save_path = "./checkpoints/final_transformers_model" # 保存最终模型的路径
original_model_path = "./models/meta-llama/Llama-3.2-3B" # 原始模型的路径
sharded_model_dir = "./checkpoints/fp32_checkpoints" # SFT fp32 分片模型的路径
# 1. 将原始模型配置复制到最终模型目录,需要让 transformers 自动识别模型
os.system(
f"cp {os.path.join(original_model_path, 'config.json')} {final_model_save_path}"
)
# 2. 将分片模型转换为 transformers 模型
model = AutoModelForCausalLM.from_pretrained(sharded_model_dir)
model.save_pretrained(final_model_save_path)
# 3. 补全 tokenizer 配置
tokenizer = AutoTokenizer.from_pretrained(original_model_path, token=True)
tokenizer.save_pretrained(final_model_save_path)
  • 转换后的模型变成了 transformers 可以识别的格式,可以直接加载到 transformers 中进行推理。

推理

  • 将转换后的模型加载到 transformers 中,即可使用 pipeline 进行推理。
  • 推理代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
import torch
from transformers import pipeline
model_id = "./checkpoints/final_transformers_model"
pipe = pipeline(
"text-generation", model=model_id, torch_dtype=torch.bfloat16, device_map="auto"
)
instruction = "Use Python code to implement."
input_text = "Find prime numbers within 100."
total_input = f"### Instruction:\n{instruction}\n\n### Input:\n{input_text}\n\n### Response:\n"
print(
pipe(total_input, max_length=1024)[0][
"generated_text"
]
)

Thoughts

  • 大模型是个非常依赖工具的领域,transformersdatasetsdeepspeed 等工具的使用对于大模型的最终效果非常重要。
  • 大模型即使在微调阶段,能做的也不多,什么都不能改,数据集的质量对于模型的影响非常大。
  • 指令微调前后,模型差距不是一星半点。

URL

TL;DR

  • Actor-Critic 算法是一种结合了策略梯度和值函数的方法,Actor 负责输出策略,Critic 负责输出状态价值函数。和 REINFORCE 算法相比,Actor-Critic 算法多引入了一个 Critic 网络,用于评估 Actor 的策略。
  • A2C 是对 Actor-Critic 算法的改进,引入了 Advantage 项来评估当前状态的优势,优化了 Actor 的损失计算。
  • A3C 是在 A2C 的基础上引入了多个并行的 Actor-Critic 网络,用于加速训练。
  • PPO 也是 Actor-Critic 架构下的一种算法,引入了 Clipped Surrogate ObjectiveGeneralized Advantage Estimation (GAE),用于提高策略梯度的稳定性和效率。

Algorithms

1. Actor-Critic Algorithms

公式表述

  • 这是一篇 1999 年的强化学习论文,是对于 1992REINFORCE 算法论文 Simple Statistical Gradient-Following Algorithms for Connectionist Reinforcement Learning 的扩展。
  • REINFORCE 算法是一种纯粹的策略梯度算法,也就是只有 Actor 网络,没有 Critic 网络。
  • Actor-Critic 算法在 REINFORCE 算法的基础上引入了一个 Critic 网络,用于评估当前状态的价值(或当前状态-空间的价值)。
  • Actor 的输入是 state,输出是 action 的分布均值和标准差,在均值和标准差组成的正态分布中采样得到 action
  • Critic 的输入是 statestate-action,输出是当前状态的价值,即 V(s)Q(s, a)
  • Actor 的损失:

Loss=t=1Tlogπθ(atst)G(st,at)Loss = -\sum_{t=1}^T \log \pi_{\theta}(a_t|s_t) \cdot G(s_t,a_t)

  • 其中,G(st,at)G(s_t,a_t) 是当前状态的实际累积回报
  • Critic 的损失:

Loss=t=1TMSE(G(st,at)Vθπ(st))Loss=\sum_{t=1}^T MSE(G(s_t,a_t)- V_{\theta}^{\pi}(s_t))

代码表述

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
class ActorCriticNetwork(nn.Module):
"""Actor-Critic 网络,包含策略网络和价值网络。"""
def __init__(self, obs_space_dims: int, action_space_dims: int):
"""初始化 Actor-Critic 网络。
参数:
obs_space_dims: 观测空间维度
action_space_dims: 动作空间维度
"""
super().__init__()
hidden_space1 = 16
hidden_space2 = 32
# 共享网络
self.shared_net = nn.Sequential(
nn.Linear(obs_space_dims, hidden_space1),
nn.Tanh(),
nn.Linear(hidden_space1, hidden_space2),
nn.Tanh(),
)
# 策略网络输出动作的均值和标准差
self.policy_mean_net = nn.Linear(hidden_space2, action_space_dims)
self.policy_stddev_net = nn.Linear(hidden_space2, action_space_dims)
# 价值网络输出状态价值估计
self.value_net = nn.Linear(hidden_space2, 1)
def forward(self, x: torch.Tensor):
"""前向传播,输出动作参数和状态价值。
参数:
x: 环境的观测值
返回:
action_means: 动作均值
action_stddevs: 动作标准差
state_values: 状态价值估计
"""
shared_features = self.shared_net(x.float())
action_means = self.policy_mean_net(shared_features)
action_stddevs = torch.log(
1 + torch.exp(self.policy_stddev_net(shared_features))
)
state_values = self.value_net(shared_features)
return action_means, action_stddevs, state_values
class ActorCriticAgent:
"""Actor-Critic 算法的实现。"""
def __init__(self, obs_space_dims: int, action_space_dims: int):
"""初始化 Agent。
参数:
obs_space_dims: 观测空间维度
action_space_dims: 动作空间维度
"""
self.learning_rate = 1e-4
self.gamma = 0.99
self.eps = 1e-6
self.log_probs = []
self.state_values = []
self.rewards = []
self.net = ActorCriticNetwork(obs_space_dims, action_space_dims)
self.optimizer = torch.optim.AdamW(self.net.parameters(), lr=self.learning_rate)
def sample_action(self, state: np.ndarray):
"""根据策略采样动作。
参数:
state: 环境的观测值
返回:
action: 采样的动作
"""
state = torch.tensor(np.array([state]))
action_means, action_stddevs, state_value = self.net(state)
distrib = Normal(action_means[0] + self.eps, action_stddevs[0] + self.eps)
action = distrib.sample()
log_prob = distrib.log_prob(action)
action = action.numpy()
self.log_probs.append(log_prob)
self.state_values.append(state_value)
return action
def update(self):
"""更新策略网络和价值网络。"""
# 将列表转换为张量
log_probs = torch.stack(self.log_probs)
state_values = torch.cat(self.state_values).squeeze()
rewards = self.rewards
# 计算折扣回报
returns = []
G = 0
for R in rewards[::-1]:
G = R + self.gamma * G
returns.insert(0, G)
returns = torch.tensor(returns)
# 计算损失
policy_loss = -(log_probs * returns.detach()).sum()
value_loss = nn.functional.mse_loss(state_values, returns)
loss = policy_loss + value_loss
# 更新网络参数
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
# 清空存储的值
self.log_probs = []
self.state_values = []
self.rewards = []
  • 实现上和 REINFORCE 算法的代码相比,有以下几点不同:
    • 引入了 Critic 网络,用于评估当前状态的价值(可以理解为训练一个可微的环境仿真器)。
    • 损失函数中包含了 Critic 的损失,用于更新 Critic 网络。

2. Advantage Actor-Critic (A2C)

公式表述

  • A2C 的全称是 Advantage Actor-Critic,是对于 Actor-Critic 算法的改进,引入了 Advantage 项,用于评估当前状态的优势。
  • 具体来说:在计算 Actor 的损失时,使用的是 Advantage 项,而不是直接使用回报,即:

Loss=t=1Tlogπθ(atst)A(st,at)A(st,at)=G(st,at)V(st)Loss = -\sum_{t=1}^T \log \pi_{\theta}(a_t|s_t) \cdot A(s_t,a_t)\\A(s_t,a_t)=G(s_t,a_t) - V(s_t)

  • 其中,A(st,at)A(s_t,a_t) 是当前状态的优势,G(st,at)G(s_t,a_t) 是当前状态的实际回报,V(st)V(s_t) 是当前状态的价值估计。
  • 其他部分和 Actor-Critic 算法基本一致。

代码表述

1
2
3
4
5
# Actor-Critic 算法的实现
actor_loss = -(log_probs * returns.detach()).sum()
# Advantage Actor-Critic 算法的实现
advantages = returns - state_values
actor_loss = -(log_probs * advantages.detach()).sum()

3. Asynchronous Advantage Actor-Critic (A3C)

  • A3C 是在 A2C 的基础上引入了多个并行的 Actor-Critic 网络,用于加速训练。
  • 由于强化学习的数据来自于和环境交互,因此如果可以让多个 worker 并行地和环境交互,然后将数据汇总到一个 learner 网络中,就可以加速训练。A3C 就是这样一种算法。
  • 更偏工程化一些,主要涉及到多线程/多进程的并行计算,以及梯度的更新和参数同步等。

4. Proximal Policy Optimization (PPO)

公式表述

  • PPO 是一种 Actor-Critic 算法,核心思想是:在更新策略参数时,限制新策略和旧策略之间的差异,以此来保证策略更新的稳定性
  • PPO 相较于 A2C
    1. 限制更新幅度:引入了 Clipped Surrogate Objective,限制新策略和旧策略之间的差异,即:

    LCLIP(θ)=Et[min(rt(θ)At,clip(rt(θ),1ϵ,1+ϵ)At)]rt(θ)=πθ(atst)πθold(atst)L^{CLIP}(\theta) = \mathbb{E}_t[\min(r_t(\theta) \cdot A_t, \text{clip}(r_t(\theta), 1-\epsilon, 1+\epsilon) \cdot A_t)]\\r_t(\theta)=\frac{\pi_{\theta}(a_t|s_t)}{\pi_{\theta_{old}}(a_t|s_t)}

    • 其中:
      • LCLIP(θ)L^{CLIP}(\theta)Clipped Surrogate Objective
      • rt(θ)r_t(\theta) 是新旧策略在状态 sts_t 采取动作 ata_t 的概率比
      • AtA_t 是优势函数
      • ϵ\epsilon 是区间宽度超参数。
    1. 优势函数估计:引入了广义优势估计 Generalized Advantage Estimation (GAE),用于提高策略梯度估计的准确性和平滑性。

    At=l=0(γλ)lδt+lδt=rt+γV(st+1)V(st)A_t = \sum_{l=0}^{\infty}(\gamma \lambda)^l \delta_{t+l}\\\delta_t=r_t+\gamma V(s_{t+1}) - V(s_t)

    • 其中:
      • AtA_t 是广义优势估计
      • δt\delta_t 是时序差分 (TD Error)
      • λ\lambda 是折扣因子
      • γ\gamma 是优势函数的折扣因子
      • V(st)V(s_t) 是状态价值估计
    1. 多次采样和小批量更新:引入了记忆、多次采样和小批量更新,用于提高数据的利用效率,即:存储一个或多个 episode 所有轨迹,然后每次更新时从中随机抽取数据分 batch 训练
    2. 一阶优化算法:使用一阶优化算法,如 Adam / SGD 等,而不是 TRPO 等算法用到的二阶优化算法,训练复杂度低效率高。
    3. 联合优化:同时优化策略和价值网络,即:

    L(θ)=LCLIP(θ)+c1E[(Vθ(st)Vttarget)2]+c2H(πθ)L(\theta)= L^{CLIP}(\theta) + c_1 \cdot \mathbb E[(V_{\theta}(s_t) - V_t^{target})^2] + c_2 \cdot H(\pi_\theta)

    • 其中:
      • LCLIP(θ)L^{CLIP}(\theta)Clipped Surrogate Objective,用于优化策略
      • 第二项是价值网络的损失,用于优化价值网络
      • 第三项是熵正则项,用于提高策略的探索性

代码表述

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
class PPOActorCriticNetwork(nn.Module):
""" PPO Actor-Critic 网络,包含策略网络和价值网络。 """
def __init__(self, obs_space_dims: int, action_space_dims: int):
super().__init__()
hidden_space1 = 16
hidden_space2 = 32
# 共享网络
self.shared_net = nn.Sequential(
nn.Linear(obs_space_dims, hidden_space1),
nn.Tanh(),
nn.Linear(hidden_space1, hidden_space2),
nn.Tanh(),
)
# 策略网络输出动作的均值和标准差
self.policy_mean_net = nn.Linear(hidden_space2, action_space_dims)
self.policy_stddev_net = nn.Linear(hidden_space2, action_space_dims)
# 价值网络输出状态价值估计
self.value_net = nn.Linear(hidden_space2, 1)
def forward(self, x: torch.Tensor):
"""前向传播,输出动作参数和状态价值。"""
shared_features = self.shared_net(x.float())
action_means = self.policy_mean_net(shared_features)
action_stddevs = torch.nn.functional.softplus(
self.policy_stddev_net(shared_features)
)
action_stddevs = torch.clamp(action_stddevs, min=1e-3) # 标准差裁剪
state_values = self.value_net(shared_features)
return action_means, action_stddevs, state_values
class PPOAgent:
""" PPO 算法的实现。 """
def __init__(self, obs_space_dims: int, action_space_dims: int):
self.learning_rate = 1e-5
self.gamma = 0.99
self.lam = 0.95
self.eps_clip = 0.1
self.entropy_coeff = 0.01
self.epochs = 10
self.batch_size = 64
self.net = PPOActorCriticNetwork(obs_space_dims, action_space_dims)
self.optimizer = torch.optim.Adam(self.net.parameters(), lr=self.learning_rate)
self.memory = []
def sample_action(self, state: np.ndarray):
"""根据策略采样动作。"""
state = torch.tensor(np.array([state]))
action_means, action_stddevs, state_value = self.net(state)
distrib = Normal(action_means[0], action_stddevs[0] + 1e-3)
action = distrib.sample()
action = torch.clamp(action, -3.0, 3.0)
log_prob = distrib.log_prob(action).sum()
return action.numpy(), log_prob, state_value
def store_transition(self, transition):
"""存储轨迹数据。"""
self.memory.append(transition)
def process_batch(self):
"""处理并计算优势函数和目标回报值。"""
states, actions, log_probs, rewards, dones, state_values = zip(*self.memory)
states = torch.tensor(states, dtype=torch.float32)
actions = torch.tensor(actions, dtype=torch.float32)
log_probs = torch.tensor(log_probs, dtype=torch.float32)
state_values = torch.tensor(state_values, dtype=torch.float32).squeeze()
rewards = torch.tensor(rewards, dtype=torch.float32)
dones = torch.tensor(dones, dtype=torch.float32)
# 在 state_values 末尾添加一个 0,以表示最后一个状态的下一个状态价值
next_state_values = torch.cat([state_values[1:], torch.tensor([0.0])]) * (
1 - dones
)
# 计算 delta
deltas = rewards + self.gamma * next_state_values * (1 - dones) - state_values
# 计算优势函数
advantages = torch.zeros_like(rewards)
running_adv = 0
for t in reversed(range(len(rewards))):
running_adv = deltas[t] + self.gamma * self.lam * running_adv * (
1 - dones[t]
)
advantages[t] = running_adv
# 计算目标回报值
returns = advantages + state_values
# 标准化优势函数
advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)
# 清空记忆
self.memory = []
return states, actions, log_probs, returns, advantages
def update(self):
"""更新策略和价值网络。"""
states, actions, old_log_probs, returns, advantages = self.process_batch()
dataset = torch.utils.data.TensorDataset(
states, actions, old_log_probs, returns, advantages
)
loader = torch.utils.data.DataLoader(
dataset, batch_size=self.batch_size, shuffle=True
)
for _ in range(self.epochs):
for (
batch_states,
batch_actions,
batch_old_log_probs,
batch_returns,
batch_advantages,
) in loader:
action_means, action_stddevs, state_values = self.net(batch_states)
distrib = Normal(action_means, action_stddevs + 1e-3)
log_probs = distrib.log_prob(batch_actions).sum(axis=-1)
entropy = distrib.entropy().sum(axis=-1)
# 计算 PPO 损失
# 1. 计算裁剪的策略损失
ratios = torch.exp(log_probs - batch_old_log_probs)
surr1 = ratios * batch_advantages
surr2 = (
torch.clamp(ratios, 1 - self.eps_clip, 1 + self.eps_clip)
* batch_advantages
)
policy_loss = -torch.min(surr1, surr2).mean()
# 2. 计算价值损失
value_loss = nn.functional.mse_loss(
state_values.squeeze(), batch_returns
)
# 3. 计算熵损失
entropy_loss = -entropy.mean()
# 4. 加权得到总损失
loss = (
policy_loss + 0.5 * value_loss + self.entropy_coeff * entropy_loss
)
# 更新网络参数
self.optimizer.zero_grad()
loss.backward()
torch.nn.utils.clip_grad_norm_(self.net.parameters(), max_norm=0.1)
self.optimizer.step()

Thoughts

  • Actor-Critic 算法是一种结合了策略梯度和值函数的方法,多用于连续动作空间的强化学习任务。
  • 结合多种改进算法,Actor-Critic 算法占据了强化学习领域的重要地位。

URL

TL;DR

  • Double DQN: 解决了 DQN 中的过估计问题,使用目标网络和当前网络来共同计算 Q 值,从而减少 Q 值的偏差。
  • Dueling DQN: 提出了对偶网络,将 Q 函数分为价值函数和优势函数,这样可以更好地估计状态的价值,减少动作选择的随机性。
  • Prioritized Experience Replay: 改进了经验回放机制,通过给经验赋予优先级,使得学习过程更加高效。
  • Noisy DQN: 在网络中引入噪声以探索更加多样的策略,提高探索效率。
  • C51: 传统基于 Q 值的学习方式只关注期望值不关心分布,Distributional RL 引入了基于分布的强化学习的概念。
  • Rainbow: 将上述五种方法结合起来,另外还加入了 Multi-step Learning(多步学习),六合一行成了一个强大的 DQN 变体。

Algorithm

1. Double DQN

公式表述

  • DQN 中计算目标 Q 值时,使用的是目标网络的最大 Q 值,即 yt=rt+1+γmaxaQ(st+1,a;θ)y_t=r_{t+1}+\gamma\max_{a'}Q(s_{t+1},a';\theta^-) ,这样会导致过估计问题
  • Double DQN 对其改进非常简单,使用 Q 值网络来选择动作,使用目标网络来评估该动作的 Q 值,即 yt=rt+1+γQ(st+1,argmaxaQ(st+1,a;θ);θ)y_t=r_{t+1}+\gamma Q(s_{t+1},\arg\max_{a'}Q(s_{t+1},a';\theta);\theta^-)

代码表述

1
2
3
4
5
6
7
8
9
10
11
12
# DQN
q_values = self.q_network(states)
next_q_values = self.target_network(next_states)
max_next_q_values = next_q_values.max(dim=1)[0] # 直接使用目标网络的最大 Q 值
target_q_values = rewards + (1 - dones) * self.gamma * max_next_q_values
# Double DQN
q_values = self.q_network(states)
next_q_values = self.q_network(next_states)
next_actions = torch.argmax(next_q_values, dim=1) # 使用 Q 值网络选择动作
next_q_values = self.target_network(next_states)
max_next_q_values = next_q_values.gather(1, next_actions.unsqueeze(1)).squeeze(1) # 使用目标网络评估该动作的 Q 值
target_q_values = rewards + (1 - dones) * self.gamma * max_next_q_values

2. Dueling DQN

公式表述

  • 状态价值函数和状态动作价值函数的定义:请参考 这里
  • 动作优势函数的定义:A(s,a)A(s,a) 表示在状态 ss 下选择动作 aa相对优势
  • DQN / Double DQN 中的 Q 函数是直接输出动作的 Q 值,这样会导致动作选择的随机性
  • Dueling DQN 提出了对偶网络,Q 函数分为价值函数和优势函数,即 Q(s,a)=V(s)+A(s,a)Q(s,a)=V(s)+A(s,a) ,这样可以更好地估计状态的价值
  • 原理也很简单:价值函数表示状态的价值,优势函数表示此状态下动作的相对优势,最后再将二者相加得到 Q
  • 实际使用时,为了保证优势函数的相对性,需要对优势函数进行零均值化处理,即 Q(s,a)=V(s)+A(s,a)1AaA(s,a)Q(s,a)=V(s)+A(s,a)-\frac{1}{|A|}\sum_{a'}A(s,a')

代码表述

1
2
3
4
5
6
# DQN
q_values = self.q_network(states)
# Dueling DQN
value = self.value_network(states)
advantage = self.advantage_network(states)
q_values = value + advantage - advantage.mean(dim=1, keepdim=True)
  • 其他部分和 Double DQN 完全一样
  • 深度学习领域有个说法:如果一个变量可以被拆成多个变量的组合,那么理论上会更好(参数量和计算量都高了,自然是好事)

3. Prioritized Experience Replay

公式表述

  • DQN 中的经验回放机制是随机采样的或顺序采样的,这样会导致一些重要的经验被忽略
  • Prioritized Experience Replay 改进了经验回放机制,通过给经验赋予优先级,使得学习过程更加高效
  • 具体做法是:根据 TD 误差(时序差分误差 Temporal Difference Error)的大小来给经验赋予优先级,然后按照优先级采样
  • 时序差分误差的定义:当前状态下预测值和实际回报之间的差异,即 TD=Rt+1+γV(St+1)V(St)TD=R_{t+1}+\gamma V(S_{t+1})-V(S_t)
    • Rt+1R_{t+1} 表示在 t+1t+1 时刻的奖励
    • V(St+1)V(S_{t+1}) 表示在 t+1t+1 时刻的状态价值
    • V(St)V(S_t) 表示在 tt 时刻的状态价值(即对状态 StS_t 未来回报的预期)
    • γ\gamma 表示折扣因子

代码表述

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
class SumTree:
"""
SumTree data structure to implement Prioritized Experience Replay
"""
def __init__(self, capacity):
self.capacity = capacity
self.size = 0
self.tree = np.zeros(2 * capacity - 1)
self.data = [None] * capacity
def add(self, priority, data):
# 更新 data 节点的值
self.data[self.size % self.capacity] = data
# 更新 tree 节点的值,同时更新父节点
idx = self.size % self.capacity + self.capacity - 1
change = priority - self.tree[idx]
self.tree[idx] = priority
while idx != 0:
idx = (idx - 1) // 2
self.tree[idx] += change
self.size += 1
def get(self, priority_sum):
idx = 0
while idx < self.capacity - 1:
left = 2 * idx + 1
right = left + 1
if priority_sum <= self.tree[left]:
idx = left
else:
priority_sum -= self.tree[left]
idx = right
data_idx = idx - self.capacity + 1
return self.tree[idx], self.data[data_idx]
def total_priority(self):
return self.tree[0]
class PERAgent:
def __init__(
self,
env,
learning_rate: float,
initial_epsilon: float,
epsilon_decay: float,
final_epsilon: float,
discount_factor: float = 0.95,
batch_size: int = 64,
memory_size: int = 10000,
alpha: float = 0.6, # 权重参数,决定优先级的影响
beta: float = 0.4, # 用于重要性采样的权重,随着训练增加
):
self.env = env
self.lr = learning_rate
self.discount_factor = discount_factor
self.epsilon = initial_epsilon
self.epsilon_decay = epsilon_decay
self.final_epsilon = final_epsilon
self.batch_size = batch_size
self.alpha = alpha
self.beta = beta
# 初始化 SumTree 作为经验池
self.memory = SumTree(memory_size)
# Q 网络和目标网络
self.q_network = DQN(
input_dim=env.observation_space.n, output_dim=env.action_space.n
)
self.target_network = DQN(
input_dim=env.observation_space.n, output_dim=env.action_space.n
)
self.target_network.load_state_dict(self.q_network.state_dict()) # 初始化目标网络
# 优化器
self.optimizer = optim.Adam(self.q_network.parameters(), lr=self.lr)
def get_action(self, obs: Tuple[int, int, bool]) -> int:
"""选择动作(epsilon-greedy)"""
if np.random.random() < self.epsilon:
return self.env.action_space.sample() # 探索:随机选择动作
else:
obs_tensor = torch.tensor([obs], dtype=torch.float32)
q_values = self.q_network(obs_tensor)
return torch.argmax(q_values).item() # 利用:选择 Q 值最大的动作
def update(self):
"""从优先经验回放池中抽取一个批次的经验,进行 Q 网络的更新"""
if self.memory.size < self.batch_size:
return # 如果经验回放池中的样本不足一个批次,则不进行更新
batch_indices = np.random.uniform(
0, self.memory.total_priority(), size=self.batch_size
)
# 批量从 SumTree 中获取经验
batch = []
for priority in batch_indices:
_, data = self.memory.get(priority) # 获取对应的经验样本
batch.append(data)
states, actions, rewards, next_states, dones = zip(*batch)
states = torch.tensor(states, dtype=torch.float32)
actions = torch.tensor(actions, dtype=torch.int64)
rewards = torch.tensor(rewards, dtype=torch.float32)
next_states = torch.tensor(next_states, dtype=torch.float32)
dones = torch.tensor(dones, dtype=torch.int64)
# 计算 Q 值:使用当前网络和目标网络
q_values = self.q_network(states)
next_q_values = self.q_network(next_states)
next_actions = torch.argmax(next_q_values, dim=1)
# 计算 next_states 对应的目标 Q 值
next_q_values = self.target_network(next_states)
max_next_q_values = next_q_values.gather(1, next_actions.unsqueeze(1)).squeeze(
1
)
# 计算目标 Q 值
target_q_values = (
rewards + (1 - dones) * self.discount_factor * max_next_q_values
)
# 计算当前 Q 值和目标 Q 值的损失
q_value = q_values.gather(1, actions.unsqueeze(1)).squeeze(1)
# 计算重要性采样权重
weights = torch.tensor(
[(1.0 / self.memory.size) ** self.beta for _ in range(self.batch_size)]
)
loss = (weights * (q_value - target_q_values) ** 2).mean()
# 优化 Q 网络
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
def store_experience(self, obs, action, reward, next_obs, done):
"""计算 TD 误差并存储到优先经验回放池"""
obs_tensor = torch.tensor([obs], dtype=torch.float32)
next_obs_tensor = torch.tensor([next_obs], dtype=torch.float32)
q_values = self.q_network(obs_tensor)
next_q_values = self.q_network(next_obs_tensor)
next_actions = torch.argmax(next_q_values, dim=1)
# 计算 TD 误差
target_q_values = (
self.target_network(next_obs_tensor)
.gather(1, next_actions.unsqueeze(1))
.squeeze(1)
)
q_value = q_values.gather(
1, torch.tensor([action], dtype=torch.int64).unsqueeze(1)
).squeeze(1)
td_error = abs(target_q_values - q_value).item()
# 将经验存储到回放池,TD 误差用作优先级
self.memory.add(td_error + 1e-6, (obs, action, reward, next_obs, done))
def decay_epsilon(self):
"""逐渐减少 epsilon"""
self.epsilon = max(self.final_epsilon, self.epsilon - self.epsilon_decay)
def update_target_network(self):
"""每隔一段时间更新目标网络"""
self.target_network.load_state_dict(self.q_network.state_dict())
  • 代码中的 SumTree 类是一个优先级经验回放池,用于存储经验和计算优先级
  • 实现上,SumTree 由两部分组成:
    • tree:用于存储优先级,是一个完全二叉树
      • 叶子节点存储优先级,长度为 Capacity
      • 非叶子节点存储子节点的优先级之和,长度为 Capacity - 1
      • 总长度为 2 * Capacity - 1,根节点存储所有优先级之和
      • 叶子节点之间无需排序,这就是 SumTree 的优势,可以用 O(logN) 的时间复杂度实现优先级采样
    • data:用于存储经验,是一个列表
      • 长度为 Capacity,存储 tree 中叶子节点对应的经验,和 tree 中优先级一一对应,同步更新
  • SumTree 的作用:输入 [0, total_priority] 范围的均匀分布,输出非均匀的优先级采样结果
  • 在取数据时,只需要在 [0, total_priority] 范围内随机取样,然后 SumTree 的返回结果就是已经按照优先级采样的数据

4. Noisy DQN

公式表述

  • 传统 Q Learning / DQN 都采用 epsilon-greedy,即以 ϵ\epsilon 的概率随机选择动作,以 1ϵ1-\epsilon 的概率选择 Q 值最大的动作
  • 但是,这种方法有一个问题:随机性不够,可能会导致陷入局部最优解;随机性太大,会浪费太多时间在无效搜索上
  • Noisy DQN 引入了噪声网络的概念,即在网络中引入噪声以探索更加多样的策略,提高探索效率
  • 具体做法是:训练阶段在网络的全连接层中引入高斯噪声,即:

y=wx+b   =>   y=(μw+σwϵw)x+μb+σbϵby=wx+b\ \ \ =>\ \ \ y=(\mu^w+\sigma^w\odot \epsilon^w)x+\mu^b+\sigma^b\odot \epsilon^b

  • 其中,μw,μb,σw,σb\mu_w,\mu_b,\sigma_w,\sigma_b 是可训练参数,ϵw,ϵb\epsilon^w,\epsilon^b 是动态高斯噪声
  • 对于噪声的引入,有两种方式:
    • Independent Gaussian Noise(独立高斯噪声):每个权重和偏置都有都独立同分布
      • ϵwRfan_in×fan_outN(0,1)\epsilon^w\in\mathbb R^{fan\_in\times fan\_out}\sim \mathcal N(0,1)
      • ϵbRfan_outN(0,1)\epsilon^b\in\mathbb R^{fan\_out}\sim \mathcal N(0,1)
      • μinitwRfan_in×fan_outU(3fan_in,3fan_in)\mu^w_{init}\in\mathbb R^{fan\_in\times fan\_out}\sim \mathcal U(-\sqrt\frac{3}{fan\_in},\sqrt\frac{3}{fan\_in})
      • μinitbRfan_outU(3fan_in,3fan_in)\mu^b_{init}\in\mathbb R^{fan\_out}\sim \mathcal U(-\sqrt\frac{3}{fan\_in},\sqrt\frac{3}{fan\_in})
      • σinitw[0.017]fan_in×fan_out\sigma^w_{init}\in [0.017]^{fan\_in\times fan\_out}
      • σinitb[0.017]fan_out\sigma^b_{init}\in [0.017]^{fan\_out}
    • Factorized Gaussian Noise (分解高斯噪声):将 ϵw\epsilon_w 分解为两个矩阵的乘积,以减少参数量
      • ϵinRfan_inN(0,1)\epsilon^{in}\in\mathbb R^{fan\_in}\sim \mathcal N(0,1)
      • ϵoutRfan_outN(0,1)\epsilon^{out}\in\mathbb R^{fan\_out}\sim \mathcal N(0,1)
      • ϵw=ϵinϵout\epsilon^w=\epsilon^{in}\otimes\epsilon^{out}
      • ϵb=ϵout\epsilon^b=\epsilon^{out}
      • μinitwRfan_in×fan_outU(1fan_in,1fan_in)\mu^w_{init}\in\mathbb R^{fan\_in\times fan\_out}\sim \mathcal U(-\frac{1}{\sqrt {fan\_in}},\frac{1}{\sqrt{fan\_in}})
      • μinitbRfan_outU(1fan_in,1fan_in)\mu^b_{init}\in\mathbb R^{fan\_out}\sim \mathcal U(-\frac{1}{\sqrt {fan\_in}},\frac{1}{\sqrt{fan\_in}})
      • σinitw[0.5fan_in]fan_in×fan_out\sigma^w_{init}\in[\frac{0.5}{\sqrt{fan\_in}}]^{fan\_in\times fan\_out}
      • σinitb[0.5fan_in]fan_out\sigma^b_{init}\in[\frac{0.5}{\sqrt{fan\_in}}]^{fan\_out}
  • inference 阶段,直接使用 μw\mu^wμb\mu^b 进行计算,不使用 ϵ\epsilonσ\sigma

代码表述

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import torch
import torch.nn as nn
class NoisyLinear(nn.Module):
def __init__(self, in_features, out_features, sigma_init=0.017):
super(NoisyLinear, self).__init__()
self.in_features = in_features
self.out_features = out_features
# 可训练参数
self.weight_mu = nn.Parameter(torch.empty(out_features, in_features))
self.weight_sigma = nn.Parameter(torch.empty(out_features, in_features))
self.bias_mu = nn.Parameter(torch.empty(out_features))
self.bias_sigma = nn.Parameter(torch.empty(out_features))
self.sigma_init = sigma_init
self.reset_parameters()
def reset_parameters(self):
mu_range = 1 / torch.sqrt(torch.tensor(self.in_features, dtype=torch.float32))
self.weight_mu.data.uniform_(-mu_range, mu_range)
self.weight_sigma.data.fill_(self.sigma_init)
self.bias_mu.data.uniform_(-mu_range, mu_range)
self.bias_sigma.data.fill_(self.sigma_init)
def forward(self, input):
if self.training:
# 动态生成噪声 epsilon
weight_epsilon = torch.randn(self.out_features, self.in_features).to(
input.device
)
bias_epsilon = torch.randn(self.out_features).to(input.device)
weight = self.weight_mu + self.weight_sigma * weight_epsilon
bias = self.bias_mu + self.bias_sigma * bias_epsilon
else:
weight = self.weight_mu
bias = self.bias_mu
return nn.functional.linear(input, weight, bias)
class FactorisedNoisyLinear(nn.Module):
def __init__(self, in_features, out_features, sigma_zero=0.5):
super(FactorisedNoisyLinear, self).__init__()
self.in_features = in_features
self.out_features = out_features
self.sigma_zero = sigma_zero / torch.sqrt(
torch.tensor(self.in_features, dtype=torch.float32)
)
# 可训练参数
self.weight_mu = nn.Parameter(torch.empty(out_features, in_features))
self.weight_sigma = nn.Parameter(torch.empty(out_features, in_features))
self.bias_mu = nn.Parameter(torch.empty(out_features))
self.bias_sigma = nn.Parameter(torch.empty(out_features))
self.reset_parameters()
def reset_parameters(self):
mu_range = 1 / torch.sqrt(torch.tensor(self.in_features, dtype=torch.float32))
self.weight_mu.data.uniform_(-mu_range, mu_range)
self.weight_sigma.data.fill_(self.sigma_zero)
self.bias_mu.data.uniform_(-mu_range, mu_range)
self.bias_sigma.data.fill_(self.sigma_zero)
def _scale_noise(self, size):
x = torch.randn(size).to(self.weight_mu.device)
return x.sign() * x.abs().sqrt()
def forward(self, input):
if self.training:
# 动态生成噪声 epsilon
epsilon_in = self._scale_noise(self.in_features)
epsilon_out = self._scale_noise(self.out_features)
weight_epsilon = torch.ger(epsilon_out, epsilon_in)
bias_epsilon = epsilon_out
weight = self.weight_mu + self.weight_sigma * weight_epsilon
bias = self.bias_mu + self.bias_sigma * bias_epsilon
else:
weight = self.weight_mu
bias = self.bias_mu
return nn.functional.linear(input, weight, bias)

5. C51

公式表述

  • C51 (Categorical 51 atoms) 直译是:离散 51 个原子,是 Distributional RL 的一种算法,即基于分布的强化学习。
  • 传统基于 Q 值的学习方式只关注期望值(Q 值,即未来回报的期望)不关心分布(未来回报的概率分布),C51 改进了这一点
  • C51 提出 概率密度函数 Z(s, a),即在状态 s 下选择动作 a未来回报分布
  • 同时提出了基于分布的 Bellman 方程,即:

Z(s,a)=R(s,a)+γZ(s,a)Z(s,a)=R(s,a)+\gamma Z(s',a')

  • 其中:
    • Z(s,a)Z(s,a) 是在状态 s 下选择动作 a 的未来回报分布
    • R(s,a)R(s,a) 是在状态 s 下选择动作 a 的即时回报
    • γ\gamma 是折扣因子
    • Z(s,a)Z(s',a') 是下一个状态 s' 下选择动作 a' 的未来回报分布
  • Q 函数和 Z 函数的关系:Q(s,a)=zZ(s,a)zQ(s,a)=\sum_{z\in Z(s,a)}z
  • 由于连续分布不方便计算,C51 使用离散分布来近似 Z 函数,离散分布由 N 个均匀分布的原子组成,每个原子表示一个未来的回报值和其概率,原子被定义为:

zi=vmin+iΔzΔz=VmaxVminN1z_i=v_{min}+i\Delta z\\\Delta z=\frac{V_{max}-V_{min}}{N-1}

  • 其中,vminv_{min}vmaxv_{max} 分别是未来回报的最小值和最大值,Δz\Delta z 是原子之间的间隔
  • 同时,扩展 Bellman 方程到离散分布:

T^zi=r+γzi\hat{T}z_i=r+\gamma z_i

  • 离散分布的更新需要用到投影,也就是将更新后的分布重新映射到原子上

代码表述

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
class C51(nn.Module):
def __init__(self, input_dim, output_dim, n_atoms, V_min, V_max):
super(C51, self).__init__()
self.n_atoms = n_atoms
self.output_dim = output_dim
self.V_min = V_min
self.V_max = V_max
self.delta_z = (V_max - V_min) / (n_atoms - 1)
self.input_dim = input_dim
self.fc1 = nn.Linear(input_dim, 128)
self.fc2 = nn.Linear(128, 128)
self.fc3 = nn.Linear(128, output_dim * n_atoms)
def forward(self, x):
x = torch.nn.functional.one_hot(
x.to(torch.int64), num_classes=self.input_dim
).float()
x = torch.relu(self.fc1(x))
x = torch.relu(self.fc2(x))
x = self.fc3(x)
x = x.view(-1, self.output_dim, self.n_atoms)
# shape: (batch_size, action_space, n_atoms)
return torch.softmax(x, dim=-1)
class C51Agent:
def __init__(
self,
env,
learning_rate: float,
initial_epsilon: float,
epsilon_decay: float,
final_epsilon: float,
discount_factor: float = 0.95,
batch_size: int = 64,
memory_size: int = 10000,
n_atoms: int = 51,
V_min: float = -10.0,
V_max: float = 10.0,
):
self.env = env
self.lr = learning_rate
self.discount_factor = discount_factor
self.epsilon = initial_epsilon
self.epsilon_decay = epsilon_decay
self.final_epsilon = final_epsilon
self.batch_size = batch_size
self.n_atoms = n_atoms
self.V_min = V_min
self.V_max = V_max
self.delta_z = (V_max - V_min) / (n_atoms - 1)
self.z = torch.linspace(V_min, V_max, n_atoms).to(torch.float32)
self.memory = deque(maxlen=memory_size)
self.training_error = []
self.input_dim = env.observation_space.n
self.output_dim = env.action_space.n
self.q_network = C51(
input_dim=self.input_dim,
output_dim=self.output_dim,
n_atoms=n_atoms,
V_min=V_min,
V_max=V_max,
)
self.target_network = C51(
input_dim=self.input_dim,
output_dim=self.output_dim,
n_atoms=n_atoms,
V_min=V_min,
V_max=V_max,
)
self.target_network.load_state_dict(self.q_network.state_dict())
self.optimizer = optim.Adam(self.q_network.parameters(), lr=self.lr)
def get_action(self, obs: int) -> int:
if np.random.random() < self.epsilon:
return self.env.action_space.sample()
else:
obs_tensor = torch.tensor([obs], dtype=torch.int64)
dist = self.q_network(obs_tensor)
# 用离散概率密度函数计算Q值
q_values = torch.sum(dist * self.z, dim=2)
# 选择最大Q值对应的动作
action = torch.argmax(q_values, dim=1).item()
return action
def update(self):
if len(self.memory) < self.batch_size:
return
batch = random.sample(self.memory, self.batch_size)
states, actions, rewards, next_states, dones = zip(*batch)
states = torch.tensor(states, dtype=torch.int64)
actions = torch.tensor(actions, dtype=torch.long)
rewards = torch.tensor(rewards, dtype=torch.float32)
next_states = torch.tensor(next_states, dtype=torch.int64)
dones = torch.tensor(dones, dtype=torch.float32)
batch_size = states.size(0)
dist = self.q_network(states)
dist = dist[range(batch_size), actions]
with torch.no_grad():
next_dist = self.target_network(next_states)
next_q_values = torch.sum(next_dist * self.z, dim=2)
next_actions = torch.argmax(next_q_values, dim=1)
next_dist = next_dist[range(batch_size), next_actions]
# 离散化Bellman更新
Tz = rewards.unsqueeze(1) + (
1 - dones.unsqueeze(1)
) * self.discount_factor * self.z.unsqueeze(0)
Tz = Tz.clamp(self.V_min, self.V_max)
# 计算投影
b = (Tz - self.V_min) / self.delta_z
l = b.floor().long()
u = b.ceil().long()
l[(u > 0) * (l == u)] -= 1
u[(l < (self.n_atoms - 1)) * (l == u)] += 1
m = torch.zeros(batch_size, self.n_atoms)
offset = (
torch.linspace(0, ((batch_size - 1) * self.n_atoms), batch_size)
.unsqueeze(1)
.expand(batch_size, self.n_atoms)
.long()
)
m.view(-1).index_add_(
0, (l + offset).view(-1), (next_dist * (u.float() - b)).view(-1)
)
m.view(-1).index_add_(
0, (u + offset).view(-1), (next_dist * (b - l.float())).view(-1)
)
dist = dist + 1e-8
loss = -torch.sum(m * torch.log(dist), dim=1).mean()
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
self.training_error.append(loss.item())
def decay_epsilon(self):
self.epsilon = max(self.final_epsilon, self.epsilon - self.epsilon_decay)
def store_experience(self, obs, action, reward, next_obs, done):
self.memory.append((obs, action, reward, next_obs, done))
def update_target_network(self):
self.target_network.load_state_dict(self.q_network.state_dict())
  • C51 算法对 DQN 的优化实际上和 Dueling DQN 有些类似,都是将 Q 函数分解成多个部分,让其计算量和参数量更大,但不对拆解后的部分做监督,最终将所有部分合并起来成为 Q 值,依据 Q 值来选择动作

6. Rainbow

1. Multi-step Learning

  • 出自论文:《Multi-Step Reinforcement Learning: A Unifying Algorithm》
  • DQN 中的目标 Q 值是单步的,即目标 Q 值为: yt=rt+1+γmaxaQ(st+1,a;θ)y_t=r_{t+1}+\gamma\max_{a'}Q(s_{t+1},a';\theta^-)
  • Multi-step Learning 引入了多步学习,即目标 Q 值为: yt=rt+1+γ2rt+2+γ3rt+3+...+γNrt+N+γN+1maxaQ(st+N+1,a;θ)y_t=r_{t+1}+\gamma^2r_{t+2}+\gamma^3r_{t+3}+...+\gamma^Nr_{t+N}+\gamma^{N+1}\max_{a'}Q(s_{t+N+1},a';\theta^-)
    • 其中 NN 为多步数
    • θ\theta^- 是目标网络的参数
  • 这样可以更好地估计长期回报,减少 Q 值的偏差

2. Rainbow

  • Rainbow = DQN + Double DQN + Dueling DQN + Prioritized Experience Replay + Noisy DQN + Distributional RL + Multi-step Learning

Thought

  • 这些算法都分别从不同的角度对 DQN 进行了改进,很多都是非常小的改进,容易理解
  • DQN 系列强化学习算法只能用于离散状态动作空间,绝大多数强化学习都是连续状态动作空间,所以 DQN 系列算法的应用场景有限

URL

TL;DR

  • 本文介绍了如何使用 DQN (Deep Q Network) 算法训练 AgentTaxi 游戏,重点介绍了 DQN 算法的原理和实现细节。
  • DQN 算法是一种结合了深度学习和 Q 学习的方法,通过使用神经网络来逼近 Q 函数,和 Q Learning 算法类似,只能用于解决离散状态动作下的强化学习问题(例如 Taxi 游戏),并用 Experience Replay 机制解决了和环境交互的数据效率问题。

Algorithm

0. Taxi Game

  • Taxi 游戏是一个强化学习环境,状态空间和动作空间都是离散的,适合用于测试 DQN 等离散状态动作的强化学习算法
    • 状态空间:500500 种状态,表示 Taxi 的位置、乘客的位置和目的地的位置
    • 动作空间:66 个动作,分别是:
      • 0: Move south (down)
      • 1: Move north (up)
      • 2: Move east (right)
      • 3: Move west (left)
      • 4: Pickup passenger
      • 5: Drop off passenger
    • 奖励机制:
      • +20: 送达乘客
      • -10: 乘客在错误位置上车或下车
      • -1: 每一步的惩罚
    • 结束条件:
      • 送达乘客
      • 达到 200 时间步
  • 训练 200episode 后的效果如下:
    episode-episode-200.gif
  • 训练 1000episode 后的效果如下:
    episode-episode-1000.gif

1. Q 函数

  • 价值函数:常用的价值函数有两种,分别是 Q 函数和 V 函数,这里的价值是 累积奖励的期望值(包含折扣),公式表示:E[t=0γtrt]\mathbb E[\sum_{t=0}^{\infty}\gamma^tr_t]
  • Q 函数:状态动作值函数,用于评估在状态 s 下采取动作 a 的价值,即 Q(s, a) 表示在状态 s 下采取动作 a 的价值,Qπ(s,a)=E[t=0γtrts0=s,a0=a,π]Q^{\pi}(s,a)=\mathbb E[\sum_{t=0}^{\infty}\gamma^tr_t|s_0=s,a_0=a,\pi],其中:
    • E\mathbb E 表示期望操作,即求期望值
    • rtr_t 是在时间步 tt 收到的即时奖励
    • γ\gamma 是折扣因子(0γ<10\leq\gamma<1),用于折扣未来的奖励
    • π\pi 是智能体的策略,决定了在每个状态下选择什么动作
  • V 函数:状态值函数,用于评估在状态 s 下的价值,Vπ(s)=E[t=0γtrts0=s,π]V^\pi(s)=\mathbb E[\sum_{t=0}^{\infty}\gamma^tr_t|s_0=s,\pi]

特性 V 函数 (状态价值函数) Q 函数 (状态-动作价值函数)
定义 评估状态 s 的价值,即在状态 s 开始,按照策略 π 执行的期望累积奖励 评估状态 s 和动作 a 的价值,即在状态 s 选择动作 a,然后按照策略 π 执行的期望累积奖励
输入 状态 s 状态 s 和动作 a
输出 状态的价值,即期望的回报 状态-动作对的价值,即从状态 s 执行动作 a 后的期望回报
更新方式 V(s)V(s) 通过 Q(s,a)Q(s,a) 来更新,通常用贝尔曼方程 Q(s,a)Q(s,a) 直接通过贝尔曼方程更新,通常用于 Q-learningDQN
公式 Vπ(s)=E[t=0γtrts0=s,π]V^\pi(s)=\mathbb E[\sum_{t=0}^{\infty}\gamma^tr_t|s_0=s,\pi] Qπ(s,a)=E[t=0γtrts0=s,a0=a,π]Q^\pi(s,a)=\mathbb E[\sum_{t=0}^{\infty}\gamma^tr_t|s_0=s,a_0=a,\pi]
主要用途 用于评估某一状态的好坏,特别适合评估策略 用于评估某一状态下选择特定动作的好坏,通常用于 Q-learning 等算法

2. Q Learning

  • 决策:维护一个 Q TableRnum of state×num of actionQ\ Table\in\mathbb R^{num\ of\ state\times num\ of\ action},用于存储每个状态动作对的价值,对于每一个状态 s,以 1ϵ1-\epsilon 的概率选择具有最大 Q 值的动作 a,以 ϵ\epsilon 的概率随机选择动作 a,即

π(s)={random(a), with probabilityϵargmaxaQ(s,a), with probability1ϵ\pi(s)=\left\{ \begin{array}{rcl} random(a) & \text{, with probability} & \epsilon\\ \arg\max_aQ(s,a) & \text{, with probability} & 1-\epsilon \end{array} \right.

  • 更新:使用 Q Learning 算法更新 Q 函数,即 Q(s,a)Q(s,a)+α[r+γmaxaQ(s,a)Q(s,a)]Q(s,a)\leftarrow Q(s,a)+\alpha[r+\gamma\max_{a'}Q(s',a')-Q(s,a)],其中:
    • α\alpha 是学习率(0<α10<\alpha\leq1),用于控制更新的步长
    • rr 是在状态 s 采取动作 a 后获得的即时奖励
    • γ\gamma 是折扣因子(0γ<10\leq\gamma<1),用于折扣未来的奖励
    • ss' 是在状态 s 采取动作 a 后转移到的下一个状态

3. DQN Algorithm

  • Q Learning 一个问题是:在较大的状态空间和动作空间下,使用 Q Table 存储所有状态动作对的价值是不现实的
  • 因此 DQN 算法使用神经网络来逼近 Q 函数,即 Q(s,a;θ)Q(s,a)Q(s,a;\theta)\approx Q^*(s,a),其中 θ\theta 是神经网络的参数,用于逼近 Q 函数
  • DQN 算法的目标是最小化 Q 函数的均方误差,即:

    Loss(θ)=Est,at,rt+1,st+1[(ytQ(st,at;θ))2]yt=rt+1+γmaxaQ(st+1,a;θ)Loss(\theta)=\mathbb E_{s_t,a_t,r_{t+1},s_{t+1}}[(y_t-Q(s_t,a_t;\theta))^2]\\ y_t=r_{t+1}+\gamma\max_{a'}Q(s_{t+1},a';\theta^-)

    • 此公式是 DQN 最重要的公式,DQN 的整个训练过程都是围绕这个公式展开的
    • Est,at,rt+1,st+1\mathbb E_{s_t,a_t,r_{t+1},s_{t+1}} 这个表示对当前从经验池(Agent 之前的状态动作记录)中采样的数据计算二范数损失,经验池中每个样本都是一个四元组 (st,at,rt+1,st+1)(s_t,a_t,r_{t+1},s_{t+1})表示在状态 sts_t 采取动作 ata_t 后获得的即时奖励 rt+1r_{t+1} 和转移到的下一个状态 st+1s_{t+1}
    • yty_t 是目标 Q 值,表示在状态 sts_t 采取动作 ata_t 后的目标 Q 值(也就是训练的 ground truth),当然这个值是由 Bellman Equation 计算得到的,即 yt=rt+1+γmaxaQ(st+1,a;θ)y_t=r_{t+1}+\gamma\max_{a'}Q(s_{t+1},a';\theta^-),其中:
      • θ\theta^- 是目标网络的参数,是每 T 个时间步更新一次的 DQN 网络参数,用于固定目标 Q
      • rt+1r_{t+1} 是在状态 sts_t 采取动作 ata_t 后获得的即时奖励
      • γ\gamma 是折扣因子(0γ<10\leq\gamma<1),用于折扣未来的奖励
    • Q(st,at;θ)Q(s_t,a_t;\theta) 是当前 Q 值,表示在状态 sts_t 采取动作 ata_t 后的当前 Q 值,即实时 DQN 网络输出的 Q

4. DQN Implementation

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
import numpy as np
import gymnasium as gym
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque
import random
from tqdm import tqdm
from typing import Tuple
# 定义一个简单的神经网络模型来近似 Q 函数
class DQN(nn.Module):
def __init__(self, input_dim, output_dim):
super(DQN, self).__init__()
self.fc1 = nn.Linear(input_dim, 128)
self.fc2 = nn.Linear(128, 128)
self.fc3 = nn.Linear(128, output_dim)
def forward(self, x):
x = torch.nn.functional.one_hot(
x.to(torch.int64), num_classes=env.observation_space.n
).float()
x = torch.relu(self.fc1(x))
x = torch.relu(self.fc2(x))
return self.fc3(x) # 输出每个动作的 Q 值
class DQNAgent:
def __init__(
self,
env,
learning_rate: float,
initial_epsilon: float,
epsilon_decay: float,
final_epsilon: float,
discount_factor: float = 0.95,
batch_size: int = 64,
memory_size: int = 10000,
):
self.env = env
self.lr = learning_rate
self.discount_factor = discount_factor
self.epsilon = initial_epsilon
self.epsilon_decay = epsilon_decay
self.final_epsilon = final_epsilon
self.batch_size = batch_size
self.memory = deque(maxlen=memory_size)
self.training_error = []
# Q 网络和目标网络
self.q_network = DQN(
input_dim=env.observation_space.n, output_dim=env.action_space.n
)
self.target_network = DQN(
input_dim=env.observation_space.n, output_dim=env.action_space.n
)
self.target_network.load_state_dict(self.q_network.state_dict()) # 初始化目标网络
# 优化器
self.optimizer = optim.Adam(self.q_network.parameters(), lr=self.lr)
def get_action(self, obs: Tuple[int, int, bool]) -> int:
"""选择动作(epsilon-greedy)"""
if np.random.random() < self.epsilon:
return self.env.action_space.sample() # 探索:随机选择动作
else:
obs_tensor = torch.tensor([obs], dtype=torch.float32)
q_values = self.q_network(obs_tensor)
return torch.argmax(q_values).item() # 利用:选择 Q 值最大的动作
def update(self):
"""从经验回放中随机抽取一个批次的经验,进行 Q 网络的更新"""
if len(self.memory) < self.batch_size:
return # 如果经验回放池中的样本不足一个批次,则不进行更新
batch = random.sample(self.memory, self.batch_size)
states, actions, rewards, next_states, dones = zip(*batch)
states = torch.tensor(states, dtype=torch.float32)
actions = torch.tensor(actions, dtype=torch.int64)
rewards = torch.tensor(rewards, dtype=torch.float32)
next_states = torch.tensor(next_states, dtype=torch.float32)
dones = torch.tensor(dones, dtype=torch.int64)
# 计算 Q 值:使用当前网络和目标网络
q_values = self.q_network(states)
next_q_values = self.target_network(next_states)
# 选择下一状态的最大 Q 值
max_next_q_values = next_q_values.max(dim=1)[0]
# 计算目标 Q 值
target_q_values = (
rewards + (1 - dones) * self.discount_factor * max_next_q_values
)
# 计算当前 Q 值和目标 Q 值的损失
q_value = q_values.gather(1, actions.unsqueeze(1)).squeeze(1)
loss = nn.functional.mse_loss(q_value, target_q_values)
# 优化 Q 网络
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
self.training_error.append(loss.item())
def decay_epsilon(self):
"""逐渐减少 epsilon"""
self.epsilon = max(self.final_epsilon, self.epsilon - self.epsilon_decay)
def store_experience(self, obs, action, reward, next_obs, done):
"""将经验存储到经验回放池"""
self.memory.append((obs, action, reward, next_obs, done))
def update_target_network(self):
"""每隔一段时间更新目标网络"""
self.target_network.load_state_dict(self.q_network.state_dict())
# 超参数
learning_rate = 0.001
n_episodes = 1000 + 2
start_epsilon = 1.0
epsilon_decay = start_epsilon / (n_episodes / 2)
final_epsilon = 0.1
batch_size = 64
memory_size = 10000
target_update_freq = 10 # 目标网络更新的频率
# 创建环境
env = gym.make("Taxi-v3", render_mode="rgb_array")
env = gym.wrappers.RecordVideo(
env,
video_folder="./Taxi_video",
episode_trigger=lambda episode_id: episode_id % 200 == 0,
name_prefix="episode",
)
# 创建智能体
agent = DQNAgent(
env=env,
learning_rate=learning_rate,
initial_epsilon=start_epsilon,
epsilon_decay=epsilon_decay,
final_epsilon=final_epsilon,
batch_size=batch_size,
memory_size=memory_size,
)
# 训练过程
for episode in tqdm(range(n_episodes)):
obs, info = env.reset()
done = False
# 玩一局
while not done:
action = agent.get_action(obs)
next_obs, reward, terminated, truncated, info = env.step(action)
# 存储经验到经验回放池
agent.store_experience(obs, action, reward, next_obs, terminated or truncated)
# 更新 Q 网络
agent.update()
# 如果当前回合结束,则更新状态
obs = next_obs
done = terminated or truncated
# 每隔一定步骤更新目标网络
if episode % target_update_freq == 0:
agent.update_target_network()
# 衰减 epsilon
agent.decay_epsilon()
env.close()
  • Q 网络:
    • 输入:Rbatch×state_dim\mathbb R^{batch\times state\_dim}
    • 输出是:Rbatch×action_dim\mathbb R^{batch\times action\_dim},表示每个动作的 Q
  • DQNϵ\epsilon 的概率随机选择动作,以 1ϵ1-\epsilon 的概率选择具有最大 Q 值的动作,其中 ϵ\epsilon 是一个逐渐减小的值(从 1.0 逐渐降低到 0.1),用于控制探索和利用的平衡,这一点和 Q Learning 算法一样
  • 为了减少 Q 网络的方差,DQN 算法使用了两个网络:Q 网络和目标网络,Q 网络用于计算当前 Q 值,目标网络用于计算目标 Q 值,每隔一定步骤更新目标网络的参数,用于固定目标 Q
  • DQN 的经验回放池使用了一个 deque 队列(训练数据是在队列中随机采样),用于存储每一步的 st,at,rt+1,st+1,dones_t,a_t,r_{t+1},s_{t+1},done 信息,用 deque 的原因有两个:
    1. 先进先出,可以保证使用的经验是不断被更新的
    2. 不会因为存储经验过多而导致内存溢出,增加训练的稳定性

Thoughts

  • DQN 中利用影子网络当做目标网络的思想,启发了后续很多算法的设计,例如 MoCo 系列算法
  • 参数更新的本质原理还是绕不开贝尔曼方程和时序差分的思想,只是在实现上有所不同

TL;DR

  • 本文是一篇关于强化学习算法分类的综述性文章,主要介绍了强化学习算法的分类方法,包括基于模型的分类、基于价值函数的分类、基于策略的分类、基于目标的分类、基于学习方法的分类等

强化学习算法分类

reinforce_learnings.png
这张图表展示了强化学习(RL)算法的分类,分为无模型强化学习(Model-Free RL)和基于模型的强化学习(Model-Based RL)。

1. 无模型强化学习(Model-Free RL

无模型强化学习方法 不依赖于环境的模型,而是通过与环境的交互来学习最优策略。它进一步分为策略优化(Policy Optimization)和 Q 学习(Q-Learning)。

1.1 策略优化(Policy Optimization

策略优化方法通过直接优化策略函数来找到最优策略。常见的策略优化算法包括:

  • Policy Gradient: 通过计算策略梯度来更新策略函数,例如 REINFORCE 算法。
  • Actor-Critic (A2C、A3C): 结合了策略梯度(Policy Gradient)和价值迭代(Value Iteration)的思想。
  • Proximal Policy Optimization (PPO): 一种改进的 Actor-Critic 策略梯度方法,通过引入信任区域来提高训练的稳定性。
  • Trust Region Policy Optimization (TRPO): 另一种策略梯度方法,通过限制策略的更新范围来提高训练的稳定性。

1.2 Q 学习(Q-Learning

Q 学习方法通过学习动作值函数(Q 函数)来找到最优策略。常见的 Q 学习算法包括:

  • Deep Q-Network (DQN): 一种结合了深度学习和 Q 学习的方法,通过使用神经网络来逼近 Q 函数。
  • Double Deep Q-Network (DDQN): 一种改进的 DQN 方法,通过引入双网络结构来减少过估计问题。
  • Twin Delayed Deep Deterministic Policy Gradient (TD3): 一种改进的 DDPG 方法,通过引入延迟和双网络结构来提高训练的稳定性。
  • Soft Actor-Critic (SAC): 一种基于最大熵强化学习的方法,通过引入熵正则化来提高探索能力。

1.3 其他 Q 学习方法

  • Categorical DQN (C51): 一种改进的 DQN 方法,通过将 Q 函数离散化为多个类别来提高训练的稳定性。
  • Quantile Regression DQN (QR-DQN): 一种改进的 DQN 方法,通过引入分位数回归来提高训练的稳定性。
  • Hindsight Experience Replay (HER): 一种改进的 DQN 方法,通过引入后见经验回放来提高探索能力。

2. 基于模型的强化学习(Model-Based RL

基于模型的强化学习方法 依赖于环境的模型,通过使用模型来预测环境的动态性,从而找到最优策略。它进一步分为学习模型(Learn the Model)和给定模型(Given the Model)。

2.1 学习模型(Learn the Model

学习模型方法通过与环境的交互来学习环境的模型,然后使用模型来预测环境的动态性。常见的学习模型算法包括:

  • World Models: 一种基于模型的方法,通过使用神经网络来逼近环境的动态性。
  • Integrated Actionable Beliefs (IA2): 一种基于模型的方法,通过将信念状态和动作空间集成起来来提高训练的稳定性。

2.2 给定模型(Given the Model

给定模型方法 假设环境的模型是已知的,然后使用模型来预测环境的动态性。常见的给定模型算法包括:

  • AlphaZero: 一种基于模型的方法,通过使用蒙特卡洛树搜索和神经网络来逼近环境的动态性。
  • Model-Based Policy Optimization (MBPO): 一种基于模型的方法,通过使用模型来预测环境的动态性,然后使用策略优化方法来找到最优策略。
  • Model-Based Value Expansion (MBVE): 一种基于模型的方法,通过使用模型来预测环境的动态性,然后使用价值迭代方法来找到最优策略。

URL

TL;DR

  • 本文通过一个简单的例子介绍了如何用 Q Learning 算法训练一个 AgentBlackjack 游戏,环境是 OpenAI Gym 提供的 Blackjack-v0
  • Q Learning 算法是一种用于有限离散状态和动作环境下的强化学习算法,通过基于贝尔曼方程的更新策略,不断更新 Q Table 来学习最优策略,最终实现 Agent 的自动决策

Algorithm

q_learning_algo.jpeg

Q Learning 算法

1. 环境介绍

  1. Blackjack 游戏规则
graph TD
    A[开始] --> B[给玩家发两张牌,给庄家发两张牌,一张明牌,一张暗牌]
    
    B --> H{玩家是否有21点?}
    H -->|是| I[玩家有自然牌,检查庄家]
    H -->|否| N{玩家选择是否继续要牌?}
    
    I --> K{庄家是否也有自然牌?}
    K -->|是| L[平局]
    K -->|否| M[玩家胜利]
    
    N -->|要牌| O[发一张牌给玩家]
    N -->|停牌| P[玩家停牌,轮到庄家]
    
    O --> Q{玩家是否爆掉(超过21点)?}
    Q -->|是| R[玩家爆掉,庄家胜利]
    Q -->|否| N
    P --> S{庄家是否点数≥17?}
    S -->|是| T[庄家停牌]
    S -->|否| U[庄家要牌]
    
    U --> V{庄家是否爆掉?}
    V -->|是| M[玩家胜利]
    V -->|否| S
    
    T --> X{比较玩家和庄家的点数}
    X -->|玩家点数更大| M[玩家胜利]
    X -->|庄家点数更大| Z[庄家胜利]
    X -->|点数相同| L[平局]
    R --> AA[游戏结束]
    Z --> AA
    L --> AA
    M --> AA
  1. Blackjack 环境定义:
    • Observation space: Tuple(Discrete(32), Discrete(11), Discrete(2)),用一个三元组表示当前状态,分别是:
      • 玩家的点数
      • 庄家的明牌点数
      • 玩家是否有 Ace
    • Action space: Discrete(2)
      • 0 表示 stick (停牌)
      • 1 表示 hit (要牌)
    • Reward: +1 表示玩家胜利,-1 表示庄家胜利,0 表示平局
    • Starting state:
      • 玩家牌之和为 4 - 21 之间
      • 庄家明牌为 1 - 10 之间
      • 玩家是否有 Ace01
    • Episode End:
      • 玩家停牌
      • 玩家爆牌

2. Q Learning 算法

  1. Q Learning 算法是一种 Off-Policy 的强化学习算法,通过不断更新 Q Table 来学习最优策略
  2. Q Table 是一个 State-Action 表,用于存储每个状态下每个动作的 Q Value,因此 Q Table 的大小是 Observation space length * Action space length
  3. Q Value 的更新公式:

Q(s,a)=Q(s,a)+α(r+γmaxaQ(s,a)Q(s,a))Q(s, a) = Q(s, a) + \alpha \cdot (r + \gamma \cdot \max_{a'} Q(s', a') - Q(s, a))

  • Q(s,a)Q(s, a) 是当前状态 s 下采取动作 aQ Value
  • α\alpha 是学习率
  • rr 是当前状态下采取动作 a 后的奖励
  • γ\gamma 是折扣因子,用于平衡当前奖励和未来奖励,越大表示越重视未来奖励,越小表示越重视当前奖励
  • maxaQ(s,a)\max_{a'} Q(s', a') 是下一个状态 s' 下所有动作中最大的 Q Value

3. 使用 Q Learning 算法训练 AgentBlackjack

实现代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
from collections import defaultdict
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
from matplotlib.patches import Patch
from tqdm import tqdm
from typing import Tuple
import gymnasium as gym
class BlackjackAgent:
def __init__(
self,
env,
learning_rate: float,
initial_epsilon: float,
epsilon_decay: float,
final_epsilon: float,
discount_factor: float = 0.95,
):
"""Initialize a Reinforcement Learning agent with an empty dictionary
of state-action values (q_values), a learning rate and an epsilon.
Args:
learning_rate: The learning rate
initial_epsilon: The initial epsilon value
epsilon_decay: The decay for epsilon
final_epsilon: The final epsilon value
discount_factor: The discount factor for computing the Q-value
"""
self.q_values = defaultdict(lambda: np.zeros(env.action_space.n))
self.lr = learning_rate
self.discount_factor = discount_factor
self.epsilon = initial_epsilon
self.epsilon_decay = epsilon_decay
self.final_epsilon = final_epsilon
self.training_error = []
def get_action(self, env, obs: Tuple[int, int, bool]) -> int:
"""
Returns the best action with probability (1 - epsilon)
otherwise a random action with probability epsilon to ensure exploration.
"""
# with probability epsilon return a random action to explore the environment
if np.random.random() < self.epsilon:
return env.action_space.sample()
# with probability (1 - epsilon) act greedily (exploit)
else:
return int(np.argmax(self.q_values[obs]))
def update(
self,
obs: Tuple[int, int, bool],
action: int,
reward: float,
terminated: bool,
next_obs: Tuple[int, int, bool],
):
"""Updates the Q-value of an action."""
future_q_value = (not terminated) * np.max(self.q_values[next_obs])
temporal_difference = (
reward + self.discount_factor * future_q_value - self.q_values[obs][action]
)
self.q_values[obs][action] = (
self.q_values[obs][action] + self.lr * temporal_difference
)
self.training_error.append(temporal_difference)
def decay_epsilon(self):
self.epsilon = max(self.final_epsilon, self.epsilon - self.epsilon_decay)
env = gym.make("Blackjack-v1", sab=True)
# hyperparameters
learning_rate = 0.01
n_episodes = 100_000
start_epsilon = 1.0
epsilon_decay = start_epsilon / (n_episodes / 2) # reduce the exploration over time
final_epsilon = 0.1
agent = BlackjackAgent(
env=env,
learning_rate=learning_rate,
initial_epsilon=start_epsilon,
epsilon_decay=epsilon_decay,
final_epsilon=final_epsilon,
)
env = gym.wrappers.RecordEpisodeStatistics(env, buffer_length=n_episodes)
for episode in tqdm(range(n_episodes)):
obs, info = env.reset()
done = False
# play one episode
while not done:
action = agent.get_action(env, obs)
next_obs, reward, terminated, truncated, info = env.step(action)
# update the agent
agent.update(obs, action, reward, terminated, next_obs)
# update if the environment is done and the current obs
done = terminated or truncated
obs = next_obs
agent.decay_epsilon()
  1. 代码中定义了一个 BlackjackAgent 类,用于实现 Q Learning 算法
  2. 初始化:
    1. 初始化 Q Table 为一个空的字典,Q Value0
    2. 学习率为 0.01
    3. 折扣因子为 0.95
    4. 初始 epsilon1.0,随着训练逐渐减小为 0.1
  3. get_action 方法用于根据当前状态选择动作,以 epsilon 的概率选择随机动作,以 1 - epsilon 的概率选择最优动作
  4. update 方法用于更新 Q Value,根据 Q Learning 公式更新 Q Value

4. 训练结果

q_learning.png
q_learning_1.png
q_learning_2.png

Thoughts

  • Q Learning 算法的关键是更新 Q Table 的过程,原理是贝尔曼方程:Q(s,a)=E[Rt+1+γ maxaQ(st+1,a)St=s,At=a]Q(s,a)=\mathbb E[R_{t+1}+\gamma\ max_{a'}Q(s_{t+1},a')|S_t=s,A_t=a],表示当前状态下采取动作 aQ Value 等于即时奖励加上折扣后未来(下一个时间步)状态的最大 Q Value
  • 对于 State SpaceAction Space 比较大的问题,Q Learning 算法的 Q Table 可能会非常庞大,因此需要使用 Deep Q Learning 算法,用神经网络来近似 Q Value,以减少存储空间,也就是 DQN 算法

URL

TL;DR

  • 本博客是从零开始学习强化学习系列的第一篇,重点在于介绍强化学习的基础概念。主要介绍了 REINFORCE 算法的基本原理,并用 REINFORCE 算法训练 Agent 玩倒立摆游戏
  • REINFORCE 算法是一种基于梯度的策略优化算法,提出时间是 1992 年,算是强化学习的基础算法之一
  • 倒立摆游戏是一个非常简单的强化学习环境,但是可以很好地展示 REINFORCE 算法的效果

Algorithm

1. 强化学习基础

AE_loop_dark.png

  • 强化学习的基本流程如上图所示,主要包括:
    1. Agent:智能体,即我们要训练的模型
    2. Environment:环境,即智能体需要与之交互的环境,比如倒立摆游戏
    3. State:状态,也被称为 Observation (观测) 即环境的状态,比如倒立摆的角度
    4. Action:动作,即智能体在某个状态下可以采取的动作,比如向左或向右
    5. Reward:奖励,即智能体在某个状态下采取某个动作后得到的奖励,比如倒立摆保持平衡时给予正奖励
    6. Policy:策略,是智能体的核心部分,即智能体在某个状态下采取某个动作的概率分布,智能体需要根据策略来选择动作

2. 倒立摆游戏

episode-episode-8000.gif

  • 上图是用强化学习实际学习得到的倒立摆游戏效果,目标推动小车让杆尽可能竖直,这个游戏在 Gymnasium 库中,被定义为:
    1. Observation SpaceBox(-inf, inf, (4,), float64),观测状态用一个长度为 4 的向量表示,每个元素的取值范围为任意实数,其中每个维度数值的含义如下:
      1. 小车的位置
      2. 小车上杆子的垂直角度
      3. 小车的速度
      4. 小车上杆子的角速度
    2. Action SpaceBox(-3.0, 3.0, (1,), float32),动作空间为 [-3, 3] 之间的一个浮点数,表示智能体推小车的力(带方向)
    3. Reward:目标是使倒立摆尽可能长时间直立(在一定角度限制内),因此,当杆直立的每个时间步都会获得 +1 的奖励
    4. Starting State:起始状态为 (0, 0, 0, 0),然后随机施加 [-0.01, 0.01] 的均匀随机噪声
    5. Episode End:一次游戏结束,判定条件为:
      1. Truncation:游戏累积 1000 个时间步
      2. Termination:状态空间中元素出现无穷 或 立杆的垂直角度大于 0.2 弧度(约 11.5 度)

3. REINFORCE算法

  • REINFORCE 算法是一种基于策略梯度的强化学习算法,其核心思想是通过采样得到的轨迹来估计策略梯度,并通过梯度上升的方法来优化策略

3.1 从公式角度讲

  • 具体步骤如下:
    1. 初始化策略:随机初始化策略参数 θ\theta
    2. 采样轨迹:在当前策略 πθ\pi_\theta 下采样一条轨迹(状态、动作、奖励在时间维度上组成的序列) τ=(s0,a0,r1,s1,a1,r2,,sT)\tau = (s_0, a_0, r_1, s_1, a_1, r_2, \ldots, s_T)
    3. 计算累积回报:对于轨迹中的每个时间步 tt,计算从时间步 tt 开始的累积回报 Gt=k=tTγktrkG_t = \sum_{k=t}^T \gamma^{k-t} r_k,其中 γ\gamma 是折扣因子
    4. 计算累积回报期望:计算轨迹中每个时间步的累积回报期望 J(θ)=Eτπθ[t=0Tlogπθ(atst)Gt]J(\theta) = \mathbb{E}_{\tau \sim \pi_\theta} \left[ \sum_{t=0}^T \log \pi_\theta(a_t | s_t) G_t \right]
    5. 更新策略参数:根据轨迹中的状态、动作和回报,计算策略梯度 θJ(θ)=Eτπθ[t=0Tθlogπθ(atst)Gt]\nabla_\theta J(\theta) = \mathbb{E}_{\tau \sim \pi_\theta} \left[ \sum_{t=0}^T \nabla_\theta \log \pi_\theta(a_t | s_t) G_t \right],并使用梯度上升法更新策略参数 θθ+αθJ(θ)\theta \leftarrow \theta + \alpha \nabla_\theta J(\theta),其中 α\alpha 是学习率
  • 通过不断重复上述步骤,策略会逐渐优化,使得智能体在环境中的表现越来越好
  • REINFORCE 算法的优点是简单易实现,但缺点是方差较大,收敛速度较慢
  • 代码实现可以参考 Gymnasium 教程

3.2 从实现代码角度讲

  1. 构建 Policy
    1. 构成:Policy 是一个 MLP 网络
    2. 输入:State(一个长度为 4 的浮点数向量)
    3. 输出:两个标量,分别表示正态分布的均值和标准差
  2. 构建 Agent
    1. 构成:一个 Agent 包含一个 Policy 以及对此 Policy 的使用和更新方法
    2. 使用:即如何使用 Agent 根据当前状态选择动作
    3. 更新:即如何根据环境的反馈(奖励)更新 Policy 的参数
  3. 训练 AgentAgentEnv 交互):
    1. 初始化 Agent 和环境
    2. 采样轨迹:在当前策略下采样动作,形成一条轨迹
    3. 计算回报:计算轨迹中每个时间步的回报
    4. 更新策略:根据策略梯度更新策略参数
    5. 重复上述步骤直到策略收敛
      代码实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import random
import numpy as np
import pandas as pd
import seaborn as sns
from typing import Tuple
import torch
import torch.nn as nn
from torch.distributions.normal import Normal
import matplotlib.pyplot as plt
import gymnasium as gym
plt.rcParams["figure.figsize"] = (10, 5)
class Policy_Network(nn.Module):
"""Parametrized Policy Network."""
def __init__(self, obs_space_dims: int, action_space_dims: int):
"""Initializes a neural network that estimates the mean and standard deviation
of a normal distribution from which an action is sampled from.
Args:
obs_space_dims: Dimension of the observation space
action_space_dims: Dimension of the action space
"""
super().__init__()
hidden_space1 = 16 # Nothing special with 16, feel free to change
hidden_space2 = 32 # Nothing special with 32, feel free to change
# Shared Network
self.shared_net = nn.Sequential(
nn.Linear(obs_space_dims, hidden_space1),
nn.Tanh(),
nn.Linear(hidden_space1, hidden_space2),
nn.Tanh(),
)
# Policy Mean specific Linear Layer
self.policy_mean_net = nn.Sequential(
nn.Linear(hidden_space2, action_space_dims)
)
# Policy Std Dev specific Linear Layer
self.policy_stddev_net = nn.Sequential(
nn.Linear(hidden_space2, action_space_dims)
)
def forward(self, x: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
"""Conditioned on the observation, returns the mean and standard deviation
of a normal distribution from which an action is sampled from.
Args:
x: Observation from the environment
Returns:
action_means: predicted mean of the normal distribution
action_stddevs: predicted standard deviation of the normal distribution
"""
shared_features = self.shared_net(x.float())
action_means = self.policy_mean_net(shared_features)
action_stddevs = torch.log(
1 + torch.exp(self.policy_stddev_net(shared_features))
)
return action_means, action_stddevs
class REINFORCE:
"""REINFORCE algorithm."""
def __init__(self, obs_space_dims: int, action_space_dims: int):
"""Initializes an agent that learns a policy via REINFORCE algorithm [1]
to solve the task at hand (Inverted Pendulum v4).
Args:
obs_space_dims: Dimension of the observation space
action_space_dims: Dimension of the action space
"""
# Hyperparameters
self.learning_rate = 1e-4 # Learning rate for policy optimization
self.gamma = 0.99 # Discount factor
self.eps = 1e-6 # small number for mathematical stability
self.probs = [] # Stores probability values of the sampled action
self.rewards = [] # Stores the corresponding rewards
self.net = Policy_Network(obs_space_dims, action_space_dims)
self.optimizer = torch.optim.AdamW(self.net.parameters(), lr=self.learning_rate)
def sample_action(self, state: np.ndarray) -> float:
"""Returns an action, conditioned on the policy and observation.
Args:
state: Observation from the environment
Returns:
action: Action to be performed
"""
state = torch.tensor(np.array([state]))
action_means, action_stddevs = self.net(state)
# create a normal distribution from the predicted
# mean and standard deviation and sample an action
distrib = Normal(action_means[0] + self.eps, action_stddevs[0] + self.eps)
action = distrib.sample()
prob = distrib.log_prob(action)
action = action.numpy()
self.probs.append(prob)
return action
def update(self):
"""Updates the policy network's weights."""
running_g = 0
gs = []
# Discounted return (backwards) - [::-1] will return an array in reverse
for R in self.rewards[::-1]:
running_g = R + self.gamma * running_g
gs.insert(0, running_g)
deltas = torch.tensor(gs)
log_probs = torch.stack(self.probs)
# Update the loss with the mean log probability and deltas
# Now, we compute the correct total loss by taking the sum of the element-wise products.
loss = -torch.sum(log_probs * deltas)
# Update the policy network
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
# Empty / zero out all episode-centric/related variables
self.probs = []
self.rewards = []
# Create and wrap the environment
env = gym.make("InvertedPendulum-v4")
env = gym.make("InvertedPendulum-v4", render_mode="rgb_array")
wrapped_env = gym.wrappers.RecordVideo(
env,
video_folder="./InvertedPendulum_video",
episode_trigger=lambda episode_id: episode_id % 2000 == 0,
name_prefix="episode",
)
# Observation-space of InvertedPendulum-v4 (4)
obs_space_dims = env.observation_space.shape[0]
# Action-space of InvertedPendulum-v4 (1)
action_space_dims = env.action_space.shape[0]
agent = REINFORCE(obs_space_dims, action_space_dims)
reward_over_episodes = []
for episode in range(total_num_episodes):
obs, info = wrapped_env.reset()
done = False
while not done:
action = agent.sample_action(obs)
obs, reward, terminated, truncated, info = wrapped_env.step(action)
agent.rewards.append(reward)
done = terminated or truncated
reward_over_episodes.append(wrapped_env.return_queue[-1])
agent.update() # 每完成一次轨迹才会更新一次策略

重点代码分析:

  1. Policy 预测采样动作的均值和标准差:
    1
    2
    3
    4
    5
    shared_features = self.shared_net(x.float())
    action_means = self.policy_mean_net(shared_features) # 直接预测采样动作的均值
    action_stddevs = torch.log(
    1 + torch.exp(self.policy_stddev_net(shared_features))
    ) # 预测采样动作的标准差,保证标准差为正
  2. 采样动作:
    1
    2
    3
    distrib = Normal(action_means[0] + self.eps, action_stddevs[0] + self.eps)  # 根据 Policy 预测的均值和标准差构建正态分布
    action = distrib.sample() # 从正态分布中采样动作
    prob = distrib.log_prob(action) # 同时计算采样动作的概率,用于后续计算策略梯度来更新策略
  3. 更新策略:
    1
    2
    3
    4
    5
    6
    7
    8
    running_g = 0
    gs = []
    for R in self.rewards[::-1]:
    running_g = R + self.gamma * running_g
    gs.insert(0, running_g)
    deltas = torch.tensor(gs) # 计算折扣累积回报
    log_probs = torch.stack(self.probs)
    loss = -torch.sum(log_probs * deltas) # 根据策略累积折扣回报和策略概率计算期望策略累积折扣期望,目标是最大化期望

最终效果:
reinforce_learning_v2.png

可以看出,Agent 在训练过程中逐渐学会了如何控制小车,使得倒立摆尽可能直立,训练 5000 步就可以将倒立摆稳定保持 200 时间步

4. 思考和尝试

  • 由于长期做有监督深度学习项目,所以会思考:如果使用深度有监督学习模型来解决倒立摆问题,会有什么不同?
  • 但直接使用深度有监督学习模型来解决倒立摆问题是不现实的,因为不管是用奖励计算损失还是用观测状态计算损失,都无法通过梯度反向传播来优化模型,因为环境并不可微
  • 环境不可微 是强化学习和深度学习的根本区别之一,那么如何解决 “深度有监督学习无法解决倒立摆问题” 呢?
  • 一个简单有效的方法是使用两个阶段的模型:
    1. 第一阶段:训练一个深度有监督学习模型,作为环境仿真器
      1. 输入:状态(观测)+ 随机动作
      2. 输出:预测的下一个状态 + 预测的奖励
      3. 监督:真实环境下,输入随机动作后的新状态和奖励
    2. 第二阶段:训练一个深度有监督学习模型,作为智能体
      1. 输入:状态(观测)
      2. 输出:动作
      3. 监督:环境仿真器(冻结)预测的下一个状态和奖励(目标是奖励尽可能高 且 立杆尽可能竖直 且 小车速度尽可能小 且 立杆线速度尽可能小)
  • 通过两个阶段的模型训练,可以将环境的不可微性质转化为可微性质,从而使用深度有监督学习模型来解决倒立摆问题
  • 实现代码:
  1. 训练环境仿真器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import random
import numpy as np
import torch
import torch.nn as nn
import gymnasium as gym
class EnvPredNet(nn.Module):
def __init__(self, obs_space_dims: int, action_space_dims: int, reward_space_dims: int):
super().__init__()
hidden_space1 = 128 # Nothing special with 16, feel free to change
hidden_space2 = 256 # Nothing special with 32, feel free to change
self.net = nn.Sequential(
nn.Linear(obs_space_dims + action_space_dims, hidden_space1),
nn.ReLU(),
nn.Linear(hidden_space1, hidden_space2),
nn.ReLU(),
nn.Linear(hidden_space2, obs_space_dims + reward_space_dims),
)
def forward(self, x: torch.Tensor):
return self.net(x.float())
class SupervisedAgent:
def __init__(self, obs_space_dims: int, action_space_dims: int, reward_space_dims: int):
# Hyperparameters
self.learning_rate = 1e-4 # Learning rate for policy optimization
self.net = EnvPredNet(obs_space_dims, action_space_dims, reward_space_dims)
self.optimizer = torch.optim.AdamW(self.net.parameters(), lr=self.learning_rate)
self.loss_fn = nn.MSELoss(reduce="sum")
def sample_action(self) -> float:
random_action = torch.clamp(torch.randn(1), -3, 3)
return random_action
def pred_state(self, state, action):
state = torch.tensor(np.array([state]))
state_action = torch.cat((state, action.unsqueeze(0)), dim=1)
next_state = self.net(state_action)
return next_state
def update(self, pred_state, gt_state, reward):
loss = self.loss_fn(
pred_state, torch.tensor([[*gt_state, reward]], dtype=torch.float32)
)
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
return loss.item()
env = gym.make("InvertedPendulum-v4")
wrapped_env = gym.wrappers.RecordEpisodeStatistics(env, 50) # Records episode-reward
total_num_episodes = int(5e4)
obs_space_dims = env.observation_space.shape[0]
action_space_dims = env.action_space.shape[0]
reward_space_dims = 1
agent = SupervisedAgent(obs_space_dims, action_space_dims, reward_space_dims)
agent.net.train()
for episode in range(total_num_episodes):
state, info = wrapped_env.reset()
done = False
while not done:
action = agent.sample_action()
gt_next_state, reward, terminated, truncated, info = wrapped_env.step(action)
pred_state = agent.pred_state(state, action)
loss = agent.update(pred_state, gt_next_state, reward)
state = gt_next_state
done = terminated or truncated
print(f"Episode: {episode}, Loss: {loss}")
torch.save(agent.net.state_dict(), "env_predict_model.pth")
  1. 训练智能体
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import random
import numpy as np
import torch
import torch.nn as nn
import gymnasium as gym
class EnvPredNet(nn.Module):
def __init__(self, obs_space_dims: int, action_space_dims: int, reward_space_dims: int):
super().__init__()
hidden_space1 = 128 # Nothing special with 16, feel free to change
hidden_space2 = 256 # Nothing special with 32, feel free to change
self.net = nn.Sequential(
nn.Linear(obs_space_dims + action_space_dims, hidden_space1),
nn.ReLU(),
nn.Linear(hidden_space1, hidden_space2),
nn.ReLU(),
nn.Linear(hidden_space2, obs_space_dims + reward_space_dims),
)
def forward(self, x: torch.Tensor):
return self.net(x.float())
class ActionPredNet(nn.Module):
def __init__(self, obs_space_dims: int, action_space_dims: int):
super().__init__()
hidden_space1 = 128 # Nothing special with 16, feel free to change
hidden_space2 = 256 # Nothing special with 32, feel free to change
self.net = nn.Sequential(
nn.Linear(obs_space_dims, hidden_space1),
nn.ReLU(),
nn.Linear(hidden_space1, hidden_space2),
nn.ReLU(),
nn.Linear(hidden_space2, action_space_dims),
nn.Tanh(),
)
def forward(self, x: torch.Tensor):
action = self.net(x.float()) * 3
return action
class SupervisedAgent:
def __init__(self, obs_space_dims: int, action_space_dims: int, reward_space_dims: int):
self.env_pred_net = EnvPredNet(obs_space_dims, action_space_dims, reward_space_dims)
self.action_pred_net = ActionPredNet(obs_space_dims, action_space_dims)
self.env_pred_net.load_state_dict(torch.load("env_predict_model.pth"))
self.env_pred_net.eval()
self.action_pred_net.train()
self.learning_rate = 1e-4
self.optimizer = torch.optim.AdamW(
self.action_pred_net.parameters(), lr=self.learning_rate
)
def get_action(self, state) -> float:
action = self.action_pred_net(state)
return action
def pred_state_reward(self, state, action):
state_action = torch.cat([torch.tensor([state]), action], dim=1)
return self.env_pred_net(state_action)
def update(self, pred_state_reward, state):
loss = (
pred_state_reward[0, 1].abs() * 10 # 立杆尽可能竖直
+ pred_state_reward[0, 2:4].abs().sum() # 小车速度和立杆角速度尽可能小
+ (pred_state_reward[0, 0] - state[0]).abs() * 0.1 # 小车位置尽可能不变
- pred_state_reward[0, 4].abs() # 奖励尽可能高
)
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
return loss.item()
# Create and wrap the environment
env = gym.make("InvertedPendulum-v4")
wrapped_env = gym.wrappers.RecordEpisodeStatistics(env, 50) # Records episode-reward
total_num_episodes = int(5e4) # Total number of episodes
obs_space_dims = env.observation_space.shape[0]
action_space_dims = env.action_space.shape[0]
reward_space_dims = 1
agent = SupervisedAgent(obs_space_dims, action_space_dims, reward_space_dims)
for episode in range(total_num_episodes):
state, info = wrapped_env.reset()
done = False
reward_sum = 0
while not done:
action = agent.get_action(torch.tensor([state]))
gt_next_state, reward, terminated, truncated, info = wrapped_env.step(
action.detach().numpy()[0]
)
pred_state_reward = agent.pred_state_reward(state, action)
loss = agent.update(pred_state_reward, state)
state = gt_next_state
done = terminated or truncated
reward_sum += reward
print(f"Episode: {episode}, Reward: {reward_sum}, Loss: {loss}")
torch.save(agent.action_pred_net.state_dict(), "action_predict_model.pth")

5. 总结

  • 通过结合 REINFORCE 算法和倒立摆任务,本文展示了强化学习的基本原理和具体实现。
  • 同时,提出了针对环境不可微问题的创新方法,即通过建立环境仿真器来将不可微问题转化为可微问题,从而使得深度学习能够在强化学习任务中发挥作用。
  • 有监督学习和强化学习的对比:
    • 二阶段有监督学习适合在已知环境模型的基础上,快速训练并优化策略,特别是离线学习和仿真场景。
    • 强化学习则适用于更为复杂、不确定的环境,尤其是无法精确建模的动态场景,并且可以在实时交互中自我改进。

关键问题列表(以 Qwen2.5-0.5B-prefill 为例)

  1. pulsar2 llm_build 模式和标准编译模式的区别?共用了什么部分?
    1. 标准编译模式有前端,llm_build 模式没有前端
    2. llm_build 量化不共用标准编译,weight 8 bit minmax / decoder layer 之间用 bf16 浮点 / 层内动态量化
  2. ax-llm-build / ax-llm 两个 repo 的关系?他们用来做什么?
    1. ax-llm-build 给上板准备除了模型之外的文件,例如 embed / 数据类型转换等
    2. ax-llm 输入编译后模型文件,输出是板上可运行的 chatbot
  3. 模型输出文件的含义?l%d.axmodel 表示层,qwen_post.axmodel 表示 hidden size -> vocab size 的 fc 吗?model.embed_tokens.weight.bfloat16.bin 是 token embedding 吗?
    1. token embedding / post.axmodel 都用 fc / 查表去做
  4. 为什么分层部署?动态量化发生在 decode layer 层之间吗?在 ppl 里用 cpu 动态量化 activation 吗?decoder layer 中有没有用到动态量化?
    1. teng 可以做动态量化,不需要用 cpu
  5. decode layer 用 flash attention 了吗?v2 / v3 ?
    1. 没有,用了标准 attention,softmax 单独计算
  6. 用 kv cache 了吗?如何用?DRAM 够用吗?
    1. 用了,DRAM 够用
  7. GQA 用了吗?如何用?repeat 再做 self-attention?
    1. 有实际节约效率
  8. 有没有用到 llm 专用的量化算法?
    1. 没有
  9. decode method 在 ppl 中用 cpu 实现吗?可以放到 npu 上吗?
    1. 用 cpu 实现,目前直接用 argmax 了
    2. 用 npu 实现的话可以用 top-k
  10. continue batching 用到了吗?(单 batch 似乎不用考虑这个问题,假如 ax650 作为云端芯片就需要考虑了)
    1. 没有
  11. paged attention 用到了吗?(似乎用内存分页管理就够了?)
    1. 没有
  12. 有哪部分是用浮点运行的?哪部分用定点运行的?
    1. 除了 conv 相关的都用浮点,例如 softmax
  13. 对于长上下文如何处理?qwen2 似乎禁用了 sliding_window,那么如何处理长上下文?计算效率如何?
    1. 最长测试过 1024
  14. ssm 状态空间模型支持吗?例如 mamba
    1. 没有
  15. 假如有一个 sparse attention 模型,会快吗?还是要补成 dense 再算?
    1. 没有专用加速算子,可能不会太快
  16. 可能考虑投机解码这种相对更高端的加速技术吗?例如 SpecInfer 这种
    1. 没有

0. 背景

  • 本文主要介绍 hugging face 托管的 LLM 如何下载,并用 transformers repo 推理

1. 相关工具介绍

  1. hugging face 是一个托管 LLM 的网站,类似 github / dockerhub,可以上传下载大模型,也可以在线推理等
  2. transformers 是一个 hugging face 维护的开源大模型代码库,包含约 300 个开源大模型的网络结构,可以配合 hugging face 下载的大模型,实现一键推理

2. 模型文件下载

2.1 下载工具

  1. hugging face 有专用工具 huggingface-cli,作用类似于低配版 git,可以实现远程 hugging face repo 的登录、登出、上传、下载等
  2. 对于 Qwen2-0.5B-Instruct 模型,huggingface-cli 可直接免登录下载,而例如 Llama3.2-1B 等模型需要向 meta 提交申请并获得同意后下载

2.2 下载文件内容

  1. hugging face 模型文件格式较为固定,一般为:
    1. config.json
      1. 这个文件包含了模型的配置信息,比如模型的架构参数、词表大小、激活函数类型等
      2. 它通常用于在加载模型时,确保模型的配置与原始模型一致
    2. generation_config.json
      1. 这个文件包含了生成文本时的配置信息,比如最大长度、停止序列、解码相关配置(温度、top-ktop-p、惩罚系数)等,以及依赖的 transformers 版本
      2. 这些参数影响模型生成文本的行为
    3. LICENSE
      1. 这个文件包含了模型的许可证信息,说明了你可以如何使用这个模型,以及使用时需要遵守的法律条款
    4. merges.txt
      1. 这个文件通常用于字节对编码(Byte Pair Encoding, BPE)分词器,它包含了词汇的合并规则
      2. BPE 是一种用于创建词汇表和编码文本的算法
    5. model.safetensors
      1. 这是一个保存模型权重的文件,使用了 SafeTensors 格式
      2. 这是一种高效的文件格式,用于存储和加载深度学习模型的权重
    6. README.md
      1. 这个文件包含了模型的说明文档,通常包括模型的简介、如何使用模型、训练细节、性能指标等信息
    7. tokenizer_config.json
      1. 这个文件包含了分词器的配置信息,比如分词器的类型、是否使用 BPE
    8. tokenizer.json
      1. 这个文件包含了分词器的预训练信息,比如词汇表、特殊标记等
      2. 它用于将文本转换为模型可以理解的数值输入
    9. vocab.json
      1. 这个文件包含了模型的词汇表,即模型在训练时使用的所有单词或标记的列表
      2. 分词器使用这个词汇表将文本分割成模型可以理解的输入
  2. Qwen2-0.5B-Instruct 中的 "Instruct" 是指此模型是经过指令遵循微调后的模型

2.3 模型参数解析

  1. 下载的文件中 model.safetensors 就是模型参数
  2. safetensors 是由 Hugging Face 开发的一种新型文件格式,专门用于安全地存储和加载机器学习模型的权重,这种格式特别关注模型的安全性、隐私保护和快速加载
    1. 安全性:safetensors 格式的设计目标之一是提高模型存储的安全性,它不允许执行代码,从而减少了模型文件被恶意篡改的风险
    2. 快速加载:与传统的 PyTorch 序列化格式(如 .pth.bin)相比,safetensors 可以更快地加载模型权重,尤其是在 CPUGPU
    3. 跨语言和跨框架兼容性:safetensors 支持在不同的编程语言和机器学习框架之间共享模型权重,例如可以在 Python 中序列化并在 C++ / Java / JavaScript 中加载
    4. 懒加载:支持懒加载,即可以选择只读取文件中的一部分张量,这对于分布式设置中的多节点或 GPU 非常有用
  3. 如何解析一个 model.safetensors 文件?
1
2
3
4
5
6
from safetensors import safe_open
tensors = {}
# framework="pt" means pytorch
with safe_open("model.safetensors", framework="pt", device="cpu") as f:
for key in f.keys():
tensors[key] = f.get_tensor(key)

3. 构建模型并加载预训练参数

1
2
3
4
from transformers import AutoModelForCausalLM
model = AutoModelForCausalLM.from_pretrained(
model_path, torch_dtype="auto", device_map="auto"
)
  • hugging facetransformers 库提供了一个 AutoModelForCausalLM 类,可以根据模型路径,读取配置文件并自动加载模型

4. 输入构建

输入构建分成三个部分:

  1. 初始化 tokenizer
  2. prompt 指令化
  3. text to token idtokenize

4.1 初始化 tokenizer

1
2
from transformers import AutoTokenizer
tokenizer = AutoTokenizer.from_pretrained(model_path)
  • transformers 会自动读取 tokenizer 相关配置文件,创建对应的 tokenizer 对象

4.2 prompt 指令化

1
2
3
4
5
6
7
8
9
10
11
12
13
prompt = "艾萨克牛顿是谁?"
messages = [
{
"role": "system",
"content": "You are Qwen, created by Alibaba Cloud. You are a helpful assistant.",
},
{"role": "user", "content": prompt},
]
text = tokenizer.apply_chat_template(
messages,
tokenize=False,
add_generation_prompt=True,
)
  • 上面的 messageQwen2 指令微调后模型的输入格式,包含了 rolecontent 两个字段
  • 具体来说需要两个步骤:
    1. prompt to message:将 prompt 转换为 message 格式
    2. message to chat:在 message 中插入特殊字符,形成 chat 格式,特殊字符主要包括:
      1. <|im_start|>:表示对话开始
      2. <|im_end|>:表示对话结束
      3. \n:间隔 rolecontent
  • 最终生成的 text 实际内容为:
1
<|im_start|>system\nYou are Qwen, created by Alibaba Cloud. You are a helpful assistant.<|im_end|>\n<|im_start|>user\n艾萨克牛顿是谁?<|im_end|>\n<|im_start|>assistant
  • 一些额外知识:
    1. prompt 输入模板信息在 tokenizer_config.json 中被定义过,keychat_template
      1
      {% for message in messages %}{% if loop.first and messages[0]['role'] != 'system' %}{{ '<|im_start|>system\nYou are a helpful assistant.<|im_end|>\n' }}{% endif %}{{'<|im_start|>' + message['role'] + '\n' + message['content'] + '<|im_end|>' + '\n'}}{% endfor %}{% if add_generation_prompt %}{{ '<|im_start|>assistant\n' }}{% endif %}
    2. 模板用到的代码是 Jinja2 写的,Jinja2 是一种模板引擎,tokenizer 会根据这个模板生成 chat 格式的 text
    3. 代码中的 messagesadd_generation_prompt 是模板的输入参数

4.3 tokenize(text to token id)

1
model_inputs = tokenizer([text], return_tensors="pt").to(model.device)
  • 这个过程就是 tokenize,将 text 转换为 token id
    • 首先根据 tokenizer.json 中声明的 token encode 方法(例如 BPE),将 text 分词
    • 然后根据 vocab.json 中的词汇表,将分词后的词转换为 token id(查表)
    • 最后将 token id 转换为 tensor 格式(tensor of uint64 dtype

5. 模型前向传播

1
2
3
4
generated_ids = model.generate(
**model_inputs,
max_new_tokens=512,
)
  • generate 方法是 transformers 提供的一个高级封装方法,可以实现 LLM 的推理
    • max_new_tokens 参数表示最大生成的 token 数量
    • generate 方法会自动迭代调用模型的前向传播方法和解码方法,实现文本生成
  • 本章节的重点关注 generate 的模型推理过程,可以分成:
  1. token id to token embedding
  2. position emebedding 构造
  3. attention mask 构造
  4. decoder layer forward

5.1 token id to token embedding

  • nn.Embedding 本质就是查表,将 token id 转换为 token embedding

5.2 position emebedding 构造

  • Qwen2 模型使用的 position embedding 方法是 RoPE (rotary position embedding)

5.2.1 RoPE 介绍

RoPE 的核心思想是将 query embeddingkey embedding 旋转一定角度,让模型感知到序列中的位置信息。

  • RoPE 的输入是:
    1. position id(例如:[0, 1, 2, ..., seq_len - 1]
    2. query embedding (shape = [seq_len, head_dim])
    3. key embedding (shape = [seq_len, head_dim])
  • RoPE 的输出是:
    1. 经过 RoPE 处理后的 query embedding
    2. 经过 RoPE 处理后的 key embedding
  • RoPE 的计算过程是:
    1. 构造 cosRseq_len×head_dim\cos\in\mathbb{R}^{seq\_len\times head\_dim}sinRseq_len×head_dim\sin\in\mathbb{R}^{seq\_len\times head\_dim} 两个矩阵
    2. 用两个矩阵去旋转 query embeddingkey embedding
  • RoPE 的实现代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
# 构造 RoPE 旋转矩阵
inv_freq = 1.0 / (1000000 ** (torch.arange(0, dim, 2, dtype=torch.int64).float().to(device) / dim)) # 1. 计算频率的倒数,shape = [dim // 2]
freq = inv_freq_expanded.float() @ position_ids_expanded.float() # 2. 计算频率,shape = [seq_len, dim // 2]
emb = torch.cat((freqs, freqs), dim=-1) # 3. 构造 RoPE 矩阵,shape = [seq_len, dim]
cos = emb.cos()[None, None, ...] # 4. 计算 RoPE 的 cos,扩展到多 batch 和多头,shape = [1, 1, seq_len, dim]
sin = emb.sin()[None, None, ...] # 5. 计算 RoPE 的 sin,扩展到多 batch 和多头,shape = [1, 1, seq_len, dim]
# 旋转 query embedding 和 key embedding
def rotate_half(x):
x1 = x[..., : x.shape[-1] // 2]
x2 = x[..., x.shape[-1] // 2 :]
return torch.cat((-x2, x1), dim=-1)
q = (q * cos) + (rotate_half(q) * sin) # 6. 旋转 query embedding,shape = [batch_size, num_heads, seq_len, head_dim]
k = (k * cos) + (rotate_half(k) * sin) # 7. 旋转 key embedding,shape = [batch_size, num_heads, seq_len, head_dim]
  • 以上 RoPE 的公式解释:
    • xm,i=xm,icos(mθi)xm,i+d/2sin(mθi)x'_{m,i}=x_{m,i}\cdot \cos (m\theta_i)-x_{m,i+d/2}\cdot \sin (m\theta_i)
    • xm,i+d/2=xm,icos(mθi)+xm,i+d/2sin(mθi)x'_{m,i+d/2}=x_{m,i}\cdot \cos (m\theta_i)+x_{m,i+d/2}\cdot \sin (m\theta_i)
    • 其中:
      • xm,ix_{m,i}query embeddingkey embedding 的第 mm 个位置的第 ii 个维度的值
      • θi=10000002id\theta_i=1000000^{-\frac{2i}{d}}
      • mθi=m10000002idm\theta_i=m\cdot 1000000^{-\frac{2i}{d}}mm 是位置 id
      • ddhead_dim
  • Qwen 模型中的 RoPE 实现代码是 RoPE 的一个简化版本,主要是为了提高计算效率,官方的 RoPE 计算公式如下:
    • xm,2i=xm,2icos(mθi)xm,2i+1sin(mθi)x'_{m,2i}=x_{m,2i}\cdot \cos (m\theta_i)-x_{m,2i+1}\cdot \sin (m\theta_i)
    • xm,2i+1=xm,2icos(mθi)+xm,2i+1sin(mθi)x'_{m,2i+1}=x_{m,2i}\cdot \cos (m\theta_i)+x_{m,2i+1}\cdot \sin (m\theta_i)
    • RdΘ,mx=[x1x2x3x4xd1xd][cos(mθ1)cos(mθ1)cos(mθ2)cos(mθ2)cos(mθd/2)cos(mθd/2)]+[x2x1x4x3xdxd1][sin(mθ1)sin(mθ1)sin(mθ2)sin(mθ2)sin(mθd/2)sin(mθd/2)]\mathbf{R}_{d\Theta,m}\mathbf{x} = \begin{bmatrix} \mathbf{x}_1 \\ \mathbf{x}_2 \\ \mathbf{x}_3 \\ \mathbf{x}_4 \\ \vdots \\ \mathbf{x}_{d-1} \\ \mathbf{x}_d \end{bmatrix} \otimes \begin{bmatrix} \cos(m\theta_1) \\ \cos(m\theta_1) \\ \cos(m\theta_2) \\ \cos(m\theta_2) \\ \cdots \\ \cos(m\theta_{d/2}) \\ \cos(m\theta_{d/2}) \end{bmatrix} + \begin{bmatrix} -\mathbf{x}_2 \\ \mathbf{x}_1 \\ -\mathbf{x}_4 \\ \mathbf{x}_3 \\ \vdots \\ -\mathbf{x}_d \\ \mathbf{x}_{d-1} \end{bmatrix} \otimes \begin{bmatrix} \sin(m\theta_1) \\ \sin(m\theta_1) \\ \sin(m\theta_2) \\ \sin(m\theta_2) \\ \cdots \\ \sin(m\theta_{d/2}) \\ \sin(m\theta_{d/2}) \end{bmatrix}
    • 区别在于:
      • 官方的 RoPE 是同一个位置的相邻维度(2i 和 2i+1)之间旋转
      • QwenRoPE 是同一个位置的跳跃维度(i 和 i+d/2)之间旋转

5.3 attention mask 构造

  • 实现中,attention mask 实际上并不需要构造,因为 torch.nn.functional.scaled_dot_product_attentionis_causal 参数可以自动实现 attention mask
  • 具体来说:
    1. torch.nn.functional.scaled_dot_product_attentionis_causal == True 时,会自动构造一个 attention biasshape = [query_len, key_len],其中 bias[i, j] = -inf if i > j else 0
    2. query @ key.transpose(-2, -1) 得到 attention score 后,会自动加上这个 attention bias
    3. 然后再经过 softmax,这样就实现了 causal attention
  • 而且 causal attentionGPT-like LLM 中,只在 prefill 阶段使用,generate 阶段不使用(或者说隐式使用)

5.4 decoder layer forward

  • LLM 的计算量几乎都体现在 decoder layer forward
  • layer norm 位置角度来看,Qwen2 模型属于 pre-LM,即:
    1. x=x+MHA(LayerNorm(x))x=x+MHA(LayerNorm(x))
    2. x=x+FFN(LayerNorm(x))x=x+FFN(LayerNorm(x))
    3. x=LayerNorm(x)x = LayerNorm(x)
  • 模型包含 24decoder layer,每层包含 2 个子层:
    1. multi-head self-attentionMHA
    2. feed forward networkFFN
  • 为了减小计算量和参数量,Qwen2 模型使用了 GQA (grouped query attention) 方法:
    1. 对于 query 来讲,每层 MHA 包含 14 个头,每个头的 head_dim = 64
    2. 对于 keyvalue 来讲,每层 MHA 包含 2 个头,每个头的 head_dim = 64
    3. 计算时,需要将 keyvaluehead 通过 repeat 扩展到 14 个头
  • 同时,Qwen2 使用了 KV cache 算法,即:
    1. prefill 阶段,keyvalue 会被缓存下来
    2. generate 阶段,之前的 keyvalue 会被直接使用不再计算,只计算最后一个 tokenquery / key / value
  • prefill 阶段:
    • 输入为 hidden stateshape = [1, 36, 896],其中 36seq_len896hidden_dim
    • 输入逐层经过 decoder layer,最后输出 hidden state,和输入 shape 一样(shape = [1, 36, 896]
    • 在这一阶段,每一层的每个头的每个位置(token position)的 keyvalue 都会被缓存下来,shape = [2, 1, 24, 2, 36, 64],其中:
      • 2key / value
      • 1batch_size
      • 24layers
      • 36seq_len
      • 2key_value_head_num
      • 64head_dim
    • 每个 decoder layer 层的 MHA 都需要 causal attention,即 is_causal = True
    • 输出的 hidden state 最后一个 token positionfeature (shape = [1, 896]) 会被输入到 fc 层,输出 shape = [1, 151936]151936vocab_size
    • 然后通过 decode method 得到最终的 token id
  • generate 阶段:
    • 输入为 预填充阶段最终输出的 token id 对应的 token embeddingshape = [1, 1, 896]
    • 输入逐层经过 decoder layer,最后输出 hidden state,和输入 shape 一样(shape = [1, 1, 896]
    • 在这一阶段,每一层的每个头的之前的位置(token position)的 keyvalue 都会被直接使用,不再计算;只计算最后一个 tokenquery / key / value,并将计算得到的 key / value 缓存下来,缓存的 key / value 会拼接到之前的 key / value 上,形成新的 key / valueshape = [2, 1, 24, 14, 36 + 1, 64]
    • 每个 decoder layer 层的 MHA 的输入 shape 如下:
      • query: shape = [1, 14, 1, 64]
      • key / value: shape = [1, 14, 37, 64]
      • is_causal = False,因为 query 长度为 1,不需要 causal attention
    • 最后一层 decoder layer 的输出经过 fcdecode method 得到最终的 token id
    • 重复直到生成的 token id 数量达到 max_new_tokens 或生成的 token ideos 为止

6. decode method

  • decode method 实际上属于模型 forward 过程中的一部分,但是为了方便理解,这里单独拎出来讲
  • decode method 的作用是将 vocab logits 映射到 token id
  • 最简答的 decode methodargmax 方法,即取 vocab logits 中最大的值对应的 token id,但是这种方法会导致生成的文本过于单一
  • 实际上 Qwen2 使用了五个串联的 decode method,分别是:
    • RepetitionPenaltyLogitsProcessor
    • TemperatureLogitsWarper
    • TopKLogitsWarper
    • TopPLogitsWarper
    • SoftmaxSampler

6.1 RepetitionPenaltyLogitsProcessor

RepetitionPenaltyLogitsProcessor 的作用是惩罚重复的 token id,即:

  1. 在预测的 vocab logits 中,找到之前所有 token id 对应的 logits
  2. 如果 logits > 0,则将 logits 除以一个 penaltypenalty 的值取 1.1
  3. 如果 logits < 0,则将 logits 乘以一个 penaltypenalty 的值取 1.1
  4. 目的是让模型更倾向于生成不重复的 token id

6.2 TemperatureLogitsWarper

TemperatureLogitsWarper 的作用是调整 vocab logits 的温度,即:

  1. vocab logits 除以一个 temperaturetemperature 的值取 0.7
  2. 目的是提高模型的生成多样性

6.3 TopKLogitsWarper

TopKLogitsWarper 的作用是截断 vocab logits,即:

  1. 保留 vocab logits 中最大的 k 个值,其他值置为 -infk 的值取 20
  2. 目的是降低模型的生成多样性,提高生成的准确性

6.4 TopPLogitsWarper

TopPLogitsWarper 的作用是截断 vocab probs,即:

  1. vocab logit 通过 softmax 变成 vocab probs
  2. vocab probs 从大到小排序,累加到 p 大于 top-p 为止,保留这些 vocab logits,其他值置为 -inf
  3. top-p 的值取 0.8,最终至少需要保留一个 token id
  4. 目的是降低模型的生成多样性,提高生成的准确性

6.5 SoftmaxSampler

  1. vocab logits (此时只有少量元素不是 -inf) 通过 softmax 变成 vocab probs
  2. 根据 vocab probs 采样一个 token id,作为最终的输出

7. 总结

  • 本文主要介绍了 hugging facetransformers 的使用方法,以及 Qwen2-0.5B-Instruct 模型的推理过程
  • GPT-like 模型的推理过程主要包括 tokenizeRoPEattention maskdecoder layer forwarddecode method 等步骤
  • Qwen2 模型使用了 RoPEGQAKV cache 等技术,提高了模型的计算效率和参数量
  • decode method 使用了 RepetitionPenaltyLogitsProcessorTemperatureLogitsWarperTopKLogitsWarperTopPLogitsWarperSoftmaxSampler 等方法,提高了模型的生成多样性和准确性