Zhangzhe's Blog

The projection of my life.

0%

URL

TL;DR

  • 本文介绍了如何使用 DQN (Deep Q Network) 算法训练 AgentTaxi 游戏,重点介绍了 DQN 算法的原理和实现细节。
  • DQN 算法是一种结合了深度学习和 Q 学习的方法,通过使用神经网络来逼近 Q 函数,和 Q Learning 算法类似,只能用于解决离散状态动作下的强化学习问题(例如 Taxi 游戏),并用 Experience Replay 机制解决了和环境交互的数据效率问题。

Algorithm

0. Taxi Game

  • Taxi 游戏是一个强化学习环境,状态空间和动作空间都是离散的,适合用于测试 DQN 等离散状态动作的强化学习算法
    • 状态空间:500500 种状态,表示 Taxi 的位置、乘客的位置和目的地的位置
    • 动作空间:66 个动作,分别是:
      • 0: Move south (down)
      • 1: Move north (up)
      • 2: Move east (right)
      • 3: Move west (left)
      • 4: Pickup passenger
      • 5: Drop off passenger
    • 奖励机制:
      • +20: 送达乘客
      • -10: 乘客在错误位置上车或下车
      • -1: 每一步的惩罚
    • 结束条件:
      • 送达乘客
      • 达到 200 时间步
  • 训练 200episode 后的效果如下:
    episode-episode-200.gif
  • 训练 1000episode 后的效果如下:
    episode-episode-1000.gif

1. Q 函数

  • 价值函数:常用的价值函数有两种,分别是 Q 函数和 V 函数,这里的价值是 累积奖励的期望值(包含折扣),公式表示:E[t=0γtrt]\mathbb E[\sum_{t=0}^{\infty}\gamma^tr_t]
  • Q 函数:状态动作值函数,用于评估在状态 s 下采取动作 a 的价值,即 Q(s, a) 表示在状态 s 下采取动作 a 的价值,Qπ(s,a)=E[t=0γtrts0=s,a0=a,π]Q^{\pi}(s,a)=\mathbb E[\sum_{t=0}^{\infty}\gamma^tr_t|s_0=s,a_0=a,\pi],其中:
    • E\mathbb E 表示期望操作,即求期望值
    • rtr_t 是在时间步 tt 收到的即时奖励
    • γ\gamma 是折扣因子(0γ<10\leq\gamma<1),用于折扣未来的奖励
    • π\pi 是智能体的策略,决定了在每个状态下选择什么动作
  • V 函数:状态值函数,用于评估在状态 s 下的价值,Vπ(s)=E[t=0γtrts0=s,π]V^\pi(s)=\mathbb E[\sum_{t=0}^{\infty}\gamma^tr_t|s_0=s,\pi]

特性 V 函数 (状态价值函数) Q 函数 (状态-动作价值函数)
定义 评估状态 s 的价值,即在状态 s 开始,按照策略 π 执行的期望累积奖励 评估状态 s 和动作 a 的价值,即在状态 s 选择动作 a,然后按照策略 π 执行的期望累积奖励
输入 状态 s 状态 s 和动作 a
输出 状态的价值,即期望的回报 状态-动作对的价值,即从状态 s 执行动作 a 后的期望回报
更新方式 V(s)V(s) 通过 Q(s,a)Q(s,a) 来更新,通常用贝尔曼方程 Q(s,a)Q(s,a) 直接通过贝尔曼方程更新,通常用于 Q-learningDQN
公式 Vπ(s)=E[t=0γtrts0=s,π]V^\pi(s)=\mathbb E[\sum_{t=0}^{\infty}\gamma^tr_t|s_0=s,\pi] Qπ(s,a)=E[t=0γtrts0=s,a0=a,π]Q^\pi(s,a)=\mathbb E[\sum_{t=0}^{\infty}\gamma^tr_t|s_0=s,a_0=a,\pi]
主要用途 用于评估某一状态的好坏,特别适合评估策略 用于评估某一状态下选择特定动作的好坏,通常用于 Q-learning 等算法

2. Q Learning

  • 决策:维护一个 Q TableRnum of state×num of actionQ\ Table\in\mathbb R^{num\ of\ state\times num\ of\ action},用于存储每个状态动作对的价值,对于每一个状态 s,以 1ϵ1-\epsilon 的概率选择具有最大 Q 值的动作 a,以 ϵ\epsilon 的概率随机选择动作 a,即

π(s)={random(a), with probabilityϵargmaxaQ(s,a), with probability1ϵ\pi(s)=\left\{ \begin{array}{rcl} random(a) & \text{, with probability} & \epsilon\\ \arg\max_aQ(s,a) & \text{, with probability} & 1-\epsilon \end{array} \right.

  • 更新:使用 Q Learning 算法更新 Q 函数,即 Q(s,a)Q(s,a)+α[r+γmaxaQ(s,a)Q(s,a)]Q(s,a)\leftarrow Q(s,a)+\alpha[r+\gamma\max_{a'}Q(s',a')-Q(s,a)],其中:
    • α\alpha 是学习率(0<α10<\alpha\leq1),用于控制更新的步长
    • rr 是在状态 s 采取动作 a 后获得的即时奖励
    • γ\gamma 是折扣因子(0γ<10\leq\gamma<1),用于折扣未来的奖励
    • ss' 是在状态 s 采取动作 a 后转移到的下一个状态

3. DQN Algorithm

  • Q Learning 一个问题是:在较大的状态空间和动作空间下,使用 Q Table 存储所有状态动作对的价值是不现实的
  • 因此 DQN 算法使用神经网络来逼近 Q 函数,即 Q(s,a;θ)Q(s,a)Q(s,a;\theta)\approx Q^*(s,a),其中 θ\theta 是神经网络的参数,用于逼近 Q 函数
  • DQN 算法的目标是最小化 Q 函数的均方误差,即:

    Loss(θ)=Est,at,rt+1,st+1[(ytQ(st,at;θ))2]yt=rt+1+γmaxaQ(st+1,a;θ)Loss(\theta)=\mathbb E_{s_t,a_t,r_{t+1},s_{t+1}}[(y_t-Q(s_t,a_t;\theta))^2]\\ y_t=r_{t+1}+\gamma\max_{a'}Q(s_{t+1},a';\theta^-)

    • 此公式是 DQN 最重要的公式,DQN 的整个训练过程都是围绕这个公式展开的
    • Est,at,rt+1,st+1\mathbb E_{s_t,a_t,r_{t+1},s_{t+1}} 这个表示对当前从经验池(Agent 之前的状态动作记录)中采样的数据计算二范数损失,经验池中每个样本都是一个四元组 (st,at,rt+1,st+1)(s_t,a_t,r_{t+1},s_{t+1})表示在状态 sts_t 采取动作 ata_t 后获得的即时奖励 rt+1r_{t+1} 和转移到的下一个状态 st+1s_{t+1}
    • yty_t 是目标 Q 值,表示在状态 sts_t 采取动作 ata_t 后的目标 Q 值(也就是训练的 ground truth),当然这个值是由 Bellman Equation 计算得到的,即 yt=rt+1+γmaxaQ(st+1,a;θ)y_t=r_{t+1}+\gamma\max_{a'}Q(s_{t+1},a';\theta^-),其中:
      • θ\theta^- 是目标网络的参数,是每 T 个时间步更新一次的 DQN 网络参数,用于固定目标 Q
      • rt+1r_{t+1} 是在状态 sts_t 采取动作 ata_t 后获得的即时奖励
      • γ\gamma 是折扣因子(0γ<10\leq\gamma<1),用于折扣未来的奖励
    • Q(st,at;θ)Q(s_t,a_t;\theta) 是当前 Q 值,表示在状态 sts_t 采取动作 ata_t 后的当前 Q 值,即实时 DQN 网络输出的 Q

4. DQN Implementation

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
import numpy as np
import gymnasium as gym
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque
import random
from tqdm import tqdm
from typing import Tuple
# 定义一个简单的神经网络模型来近似 Q 函数
class DQN(nn.Module):
def __init__(self, input_dim, output_dim):
super(DQN, self).__init__()
self.fc1 = nn.Linear(input_dim, 128)
self.fc2 = nn.Linear(128, 128)
self.fc3 = nn.Linear(128, output_dim)
def forward(self, x):
x = torch.nn.functional.one_hot(
x.to(torch.int64), num_classes=env.observation_space.n
).float()
x = torch.relu(self.fc1(x))
x = torch.relu(self.fc2(x))
return self.fc3(x) # 输出每个动作的 Q 值
class DQNAgent:
def __init__(
self,
env,
learning_rate: float,
initial_epsilon: float,
epsilon_decay: float,
final_epsilon: float,
discount_factor: float = 0.95,
batch_size: int = 64,
memory_size: int = 10000,
):
self.env = env
self.lr = learning_rate
self.discount_factor = discount_factor
self.epsilon = initial_epsilon
self.epsilon_decay = epsilon_decay
self.final_epsilon = final_epsilon
self.batch_size = batch_size
self.memory = deque(maxlen=memory_size)
self.training_error = []
# Q 网络和目标网络
self.q_network = DQN(
input_dim=env.observation_space.n, output_dim=env.action_space.n
)
self.target_network = DQN(
input_dim=env.observation_space.n, output_dim=env.action_space.n
)
self.target_network.load_state_dict(self.q_network.state_dict()) # 初始化目标网络
# 优化器
self.optimizer = optim.Adam(self.q_network.parameters(), lr=self.lr)
def get_action(self, obs: Tuple[int, int, bool]) -> int:
"""选择动作(epsilon-greedy)"""
if np.random.random() < self.epsilon:
return self.env.action_space.sample() # 探索:随机选择动作
else:
obs_tensor = torch.tensor([obs], dtype=torch.float32)
q_values = self.q_network(obs_tensor)
return torch.argmax(q_values).item() # 利用:选择 Q 值最大的动作
def update(self):
"""从经验回放中随机抽取一个批次的经验,进行 Q 网络的更新"""
if len(self.memory) < self.batch_size:
return # 如果经验回放池中的样本不足一个批次,则不进行更新
batch = random.sample(self.memory, self.batch_size)
states, actions, rewards, next_states, dones = zip(*batch)
states = torch.tensor(states, dtype=torch.float32)
actions = torch.tensor(actions, dtype=torch.int64)
rewards = torch.tensor(rewards, dtype=torch.float32)
next_states = torch.tensor(next_states, dtype=torch.float32)
dones = torch.tensor(dones, dtype=torch.int64)
# 计算 Q 值:使用当前网络和目标网络
q_values = self.q_network(states)
next_q_values = self.target_network(next_states)
# 选择下一状态的最大 Q 值
max_next_q_values = next_q_values.max(dim=1)[0]
# 计算目标 Q 值
target_q_values = (
rewards + (1 - dones) * self.discount_factor * max_next_q_values
)
# 计算当前 Q 值和目标 Q 值的损失
q_value = q_values.gather(1, actions.unsqueeze(1)).squeeze(1)
loss = nn.functional.mse_loss(q_value, target_q_values)
# 优化 Q 网络
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
self.training_error.append(loss.item())
def decay_epsilon(self):
"""逐渐减少 epsilon"""
self.epsilon = max(self.final_epsilon, self.epsilon - self.epsilon_decay)
def store_experience(self, obs, action, reward, next_obs, done):
"""将经验存储到经验回放池"""
self.memory.append((obs, action, reward, next_obs, done))
def update_target_network(self):
"""每隔一段时间更新目标网络"""
self.target_network.load_state_dict(self.q_network.state_dict())
# 超参数
learning_rate = 0.001
n_episodes = 1000 + 2
start_epsilon = 1.0
epsilon_decay = start_epsilon / (n_episodes / 2)
final_epsilon = 0.1
batch_size = 64
memory_size = 10000
target_update_freq = 10 # 目标网络更新的频率
# 创建环境
env = gym.make("Taxi-v3", render_mode="rgb_array")
env = gym.wrappers.RecordVideo(
env,
video_folder="./Taxi_video",
episode_trigger=lambda episode_id: episode_id % 200 == 0,
name_prefix="episode",
)
# 创建智能体
agent = DQNAgent(
env=env,
learning_rate=learning_rate,
initial_epsilon=start_epsilon,
epsilon_decay=epsilon_decay,
final_epsilon=final_epsilon,
batch_size=batch_size,
memory_size=memory_size,
)
# 训练过程
for episode in tqdm(range(n_episodes)):
obs, info = env.reset()
done = False
# 玩一局
while not done:
action = agent.get_action(obs)
next_obs, reward, terminated, truncated, info = env.step(action)
# 存储经验到经验回放池
agent.store_experience(obs, action, reward, next_obs, terminated or truncated)
# 更新 Q 网络
agent.update()
# 如果当前回合结束,则更新状态
obs = next_obs
done = terminated or truncated
# 每隔一定步骤更新目标网络
if episode % target_update_freq == 0:
agent.update_target_network()
# 衰减 epsilon
agent.decay_epsilon()
env.close()
  • Q 网络:
    • 输入:Rbatch×state_dim\mathbb R^{batch\times state\_dim}
    • 输出是:Rbatch×action_dim\mathbb R^{batch\times action\_dim},表示每个动作的 Q
  • DQNϵ\epsilon 的概率随机选择动作,以 1ϵ1-\epsilon 的概率选择具有最大 Q 值的动作,其中 ϵ\epsilon 是一个逐渐减小的值(从 1.0 逐渐降低到 0.1),用于控制探索和利用的平衡,这一点和 Q Learning 算法一样
  • 为了减少 Q 网络的方差,DQN 算法使用了两个网络:Q 网络和目标网络,Q 网络用于计算当前 Q 值,目标网络用于计算目标 Q 值,每隔一定步骤更新目标网络的参数,用于固定目标 Q
  • DQN 的经验回放池使用了一个 deque 队列(训练数据是在队列中随机采样),用于存储每一步的 st,at,rt+1,st+1,dones_t,a_t,r_{t+1},s_{t+1},done 信息,用 deque 的原因有两个:
    1. 先进先出,可以保证使用的经验是不断被更新的
    2. 不会因为存储经验过多而导致内存溢出,增加训练的稳定性

Thoughts

  • DQN 中利用影子网络当做目标网络的思想,启发了后续很多算法的设计,例如 MoCo 系列算法
  • 参数更新的本质原理还是绕不开贝尔曼方程和时序差分的思想,只是在实现上有所不同

TL;DR

  • 本文是一篇关于强化学习算法分类的综述性文章,主要介绍了强化学习算法的分类方法,包括基于模型的分类、基于价值函数的分类、基于策略的分类、基于目标的分类、基于学习方法的分类等

强化学习算法分类

reinforce_learnings.png
这张图表展示了强化学习(RL)算法的分类,分为无模型强化学习(Model-Free RL)和基于模型的强化学习(Model-Based RL)。

1. 无模型强化学习(Model-Free RL

无模型强化学习方法 不依赖于环境的模型,而是通过与环境的交互来学习最优策略。它进一步分为策略优化(Policy Optimization)和 Q 学习(Q-Learning)。

1.1 策略优化(Policy Optimization

策略优化方法通过直接优化策略函数来找到最优策略。常见的策略优化算法包括:

  • Policy Gradient: 通过计算策略梯度来更新策略函数,例如 REINFORCE 算法。
  • Actor-Critic (A2C、A3C): 结合了策略梯度(Policy Gradient)和价值迭代(Value Iteration)的思想。
  • Proximal Policy Optimization (PPO): 一种改进的 Actor-Critic 策略梯度方法,通过引入信任区域来提高训练的稳定性。
  • Trust Region Policy Optimization (TRPO): 另一种策略梯度方法,通过限制策略的更新范围来提高训练的稳定性。

1.2 Q 学习(Q-Learning

Q 学习方法通过学习动作值函数(Q 函数)来找到最优策略。常见的 Q 学习算法包括:

  • Deep Q-Network (DQN): 一种结合了深度学习和 Q 学习的方法,通过使用神经网络来逼近 Q 函数。
  • Double Deep Q-Network (DDQN): 一种改进的 DQN 方法,通过引入双网络结构来减少过估计问题。
  • Twin Delayed Deep Deterministic Policy Gradient (TD3): 一种改进的 DDPG 方法,通过引入延迟和双网络结构来提高训练的稳定性。
  • Soft Actor-Critic (SAC): 一种基于最大熵强化学习的方法,通过引入熵正则化来提高探索能力。

1.3 其他 Q 学习方法

  • Categorical DQN (C51): 一种改进的 DQN 方法,通过将 Q 函数离散化为多个类别来提高训练的稳定性。
  • Quantile Regression DQN (QR-DQN): 一种改进的 DQN 方法,通过引入分位数回归来提高训练的稳定性。
  • Hindsight Experience Replay (HER): 一种改进的 DQN 方法,通过引入后见经验回放来提高探索能力。

2. 基于模型的强化学习(Model-Based RL

基于模型的强化学习方法 依赖于环境的模型,通过使用模型来预测环境的动态性,从而找到最优策略。它进一步分为学习模型(Learn the Model)和给定模型(Given the Model)。

2.1 学习模型(Learn the Model

学习模型方法通过与环境的交互来学习环境的模型,然后使用模型来预测环境的动态性。常见的学习模型算法包括:

  • World Models: 一种基于模型的方法,通过使用神经网络来逼近环境的动态性。
  • Integrated Actionable Beliefs (IA2): 一种基于模型的方法,通过将信念状态和动作空间集成起来来提高训练的稳定性。

2.2 给定模型(Given the Model

给定模型方法 假设环境的模型是已知的,然后使用模型来预测环境的动态性。常见的给定模型算法包括:

  • AlphaZero: 一种基于模型的方法,通过使用蒙特卡洛树搜索和神经网络来逼近环境的动态性。
  • Model-Based Policy Optimization (MBPO): 一种基于模型的方法,通过使用模型来预测环境的动态性,然后使用策略优化方法来找到最优策略。
  • Model-Based Value Expansion (MBVE): 一种基于模型的方法,通过使用模型来预测环境的动态性,然后使用价值迭代方法来找到最优策略。

URL

TL;DR

  • 本文通过一个简单的例子介绍了如何用 Q Learning 算法训练一个 AgentBlackjack 游戏,环境是 OpenAI Gym 提供的 Blackjack-v0
  • Q Learning 算法是一种用于有限离散状态和动作环境下的强化学习算法,通过基于贝尔曼方程的更新策略,不断更新 Q Table 来学习最优策略,最终实现 Agent 的自动决策

Algorithm

q_learning_algo.jpeg

Q Learning 算法

1. 环境介绍

  1. Blackjack 游戏规则
graph TD
    A[开始] --> B[给玩家发两张牌,给庄家发两张牌,一张明牌,一张暗牌]
    
    B --> H{玩家是否有21点?}
    H -->|是| I[玩家有自然牌,检查庄家]
    H -->|否| N{玩家选择是否继续要牌?}
    
    I --> K{庄家是否也有自然牌?}
    K -->|是| L[平局]
    K -->|否| M[玩家胜利]
    
    N -->|要牌| O[发一张牌给玩家]
    N -->|停牌| P[玩家停牌,轮到庄家]
    
    O --> Q{玩家是否爆掉(超过21点)?}
    Q -->|是| R[玩家爆掉,庄家胜利]
    Q -->|否| N
    P --> S{庄家是否点数≥17?}
    S -->|是| T[庄家停牌]
    S -->|否| U[庄家要牌]
    
    U --> V{庄家是否爆掉?}
    V -->|是| M[玩家胜利]
    V -->|否| S
    
    T --> X{比较玩家和庄家的点数}
    X -->|玩家点数更大| M[玩家胜利]
    X -->|庄家点数更大| Z[庄家胜利]
    X -->|点数相同| L[平局]
    R --> AA[游戏结束]
    Z --> AA
    L --> AA
    M --> AA
  1. Blackjack 环境定义:
    • Observation space: Tuple(Discrete(32), Discrete(11), Discrete(2)),用一个三元组表示当前状态,分别是:
      • 玩家的点数
      • 庄家的明牌点数
      • 玩家是否有 Ace
    • Action space: Discrete(2)
      • 0 表示 stick (停牌)
      • 1 表示 hit (要牌)
    • Reward: +1 表示玩家胜利,-1 表示庄家胜利,0 表示平局
    • Starting state:
      • 玩家牌之和为 4 - 21 之间
      • 庄家明牌为 1 - 10 之间
      • 玩家是否有 Ace01
    • Episode End:
      • 玩家停牌
      • 玩家爆牌

2. Q Learning 算法

  1. Q Learning 算法是一种 Off-Policy 的强化学习算法,通过不断更新 Q Table 来学习最优策略
  2. Q Table 是一个 State-Action 表,用于存储每个状态下每个动作的 Q Value,因此 Q Table 的大小是 Observation space length * Action space length
  3. Q Value 的更新公式:

Q(s,a)=Q(s,a)+α(r+γmaxaQ(s,a)Q(s,a))Q(s, a) = Q(s, a) + \alpha \cdot (r + \gamma \cdot \max_{a'} Q(s', a') - Q(s, a))

  • Q(s,a)Q(s, a) 是当前状态 s 下采取动作 aQ Value
  • α\alpha 是学习率
  • rr 是当前状态下采取动作 a 后的奖励
  • γ\gamma 是折扣因子,用于平衡当前奖励和未来奖励,越大表示越重视未来奖励,越小表示越重视当前奖励
  • maxaQ(s,a)\max_{a'} Q(s', a') 是下一个状态 s' 下所有动作中最大的 Q Value

3. 使用 Q Learning 算法训练 AgentBlackjack

实现代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
from collections import defaultdict
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
from matplotlib.patches import Patch
from tqdm import tqdm
from typing import Tuple
import gymnasium as gym
class BlackjackAgent:
def __init__(
self,
env,
learning_rate: float,
initial_epsilon: float,
epsilon_decay: float,
final_epsilon: float,
discount_factor: float = 0.95,
):
"""Initialize a Reinforcement Learning agent with an empty dictionary
of state-action values (q_values), a learning rate and an epsilon.
Args:
learning_rate: The learning rate
initial_epsilon: The initial epsilon value
epsilon_decay: The decay for epsilon
final_epsilon: The final epsilon value
discount_factor: The discount factor for computing the Q-value
"""
self.q_values = defaultdict(lambda: np.zeros(env.action_space.n))
self.lr = learning_rate
self.discount_factor = discount_factor
self.epsilon = initial_epsilon
self.epsilon_decay = epsilon_decay
self.final_epsilon = final_epsilon
self.training_error = []
def get_action(self, env, obs: Tuple[int, int, bool]) -> int:
"""
Returns the best action with probability (1 - epsilon)
otherwise a random action with probability epsilon to ensure exploration.
"""
# with probability epsilon return a random action to explore the environment
if np.random.random() < self.epsilon:
return env.action_space.sample()
# with probability (1 - epsilon) act greedily (exploit)
else:
return int(np.argmax(self.q_values[obs]))
def update(
self,
obs: Tuple[int, int, bool],
action: int,
reward: float,
terminated: bool,
next_obs: Tuple[int, int, bool],
):
"""Updates the Q-value of an action."""
future_q_value = (not terminated) * np.max(self.q_values[next_obs])
temporal_difference = (
reward + self.discount_factor * future_q_value - self.q_values[obs][action]
)
self.q_values[obs][action] = (
self.q_values[obs][action] + self.lr * temporal_difference
)
self.training_error.append(temporal_difference)
def decay_epsilon(self):
self.epsilon = max(self.final_epsilon, self.epsilon - self.epsilon_decay)
env = gym.make("Blackjack-v1", sab=True)
# hyperparameters
learning_rate = 0.01
n_episodes = 100_000
start_epsilon = 1.0
epsilon_decay = start_epsilon / (n_episodes / 2) # reduce the exploration over time
final_epsilon = 0.1
agent = BlackjackAgent(
env=env,
learning_rate=learning_rate,
initial_epsilon=start_epsilon,
epsilon_decay=epsilon_decay,
final_epsilon=final_epsilon,
)
env = gym.wrappers.RecordEpisodeStatistics(env, buffer_length=n_episodes)
for episode in tqdm(range(n_episodes)):
obs, info = env.reset()
done = False
# play one episode
while not done:
action = agent.get_action(env, obs)
next_obs, reward, terminated, truncated, info = env.step(action)
# update the agent
agent.update(obs, action, reward, terminated, next_obs)
# update if the environment is done and the current obs
done = terminated or truncated
obs = next_obs
agent.decay_epsilon()
  1. 代码中定义了一个 BlackjackAgent 类,用于实现 Q Learning 算法
  2. 初始化:
    1. 初始化 Q Table 为一个空的字典,Q Value0
    2. 学习率为 0.01
    3. 折扣因子为 0.95
    4. 初始 epsilon1.0,随着训练逐渐减小为 0.1
  3. get_action 方法用于根据当前状态选择动作,以 epsilon 的概率选择随机动作,以 1 - epsilon 的概率选择最优动作
  4. update 方法用于更新 Q Value,根据 Q Learning 公式更新 Q Value

4. 训练结果

q_learning.png
q_learning_1.png
q_learning_2.png

Thoughts

  • Q Learning 算法的关键是更新 Q Table 的过程,原理是贝尔曼方程:Q(s,a)=E[Rt+1+γ maxaQ(st+1,a)St=s,At=a]Q(s,a)=\mathbb E[R_{t+1}+\gamma\ max_{a'}Q(s_{t+1},a')|S_t=s,A_t=a],表示当前状态下采取动作 aQ Value 等于即时奖励加上折扣后未来(下一个时间步)状态的最大 Q Value
  • 对于 State SpaceAction Space 比较大的问题,Q Learning 算法的 Q Table 可能会非常庞大,因此需要使用 Deep Q Learning 算法,用神经网络来近似 Q Value,以减少存储空间,也就是 DQN 算法

URL

TL;DR

  • 本博客是从零开始学习强化学习系列的第一篇,重点在于介绍强化学习的基础概念。主要介绍了 REINFORCE 算法的基本原理,并用 REINFORCE 算法训练 Agent 玩倒立摆游戏
  • REINFORCE 算法是一种基于梯度的策略优化算法,提出时间是 1992 年,算是强化学习的基础算法之一
  • 倒立摆游戏是一个非常简单的强化学习环境,但是可以很好地展示 REINFORCE 算法的效果

Algorithm

1. 强化学习基础

AE_loop_dark.png

  • 强化学习的基本流程如上图所示,主要包括:
    1. Agent:智能体,即我们要训练的模型
    2. Environment:环境,即智能体需要与之交互的环境,比如倒立摆游戏
    3. State:状态,也被称为 Observation (观测) 即环境的状态,比如倒立摆的角度
    4. Action:动作,即智能体在某个状态下可以采取的动作,比如向左或向右
    5. Reward:奖励,即智能体在某个状态下采取某个动作后得到的奖励,比如倒立摆保持平衡时给予正奖励
    6. Policy:策略,是智能体的核心部分,即智能体在某个状态下采取某个动作的概率分布,智能体需要根据策略来选择动作

2. 倒立摆游戏

episode-episode-8000.gif

  • 上图是用强化学习实际学习得到的倒立摆游戏效果,目标推动小车让杆尽可能竖直,这个游戏在 Gymnasium 库中,被定义为:
    1. Observation SpaceBox(-inf, inf, (4,), float64),观测状态用一个长度为 4 的向量表示,每个元素的取值范围为任意实数,其中每个维度数值的含义如下:
      1. 小车的位置
      2. 小车上杆子的垂直角度
      3. 小车的速度
      4. 小车上杆子的角速度
    2. Action SpaceBox(-3.0, 3.0, (1,), float32),动作空间为 [-3, 3] 之间的一个浮点数,表示智能体推小车的力(带方向)
    3. Reward:目标是使倒立摆尽可能长时间直立(在一定角度限制内),因此,当杆直立的每个时间步都会获得 +1 的奖励
    4. Starting State:起始状态为 (0, 0, 0, 0),然后随机施加 [-0.01, 0.01] 的均匀随机噪声
    5. Episode End:一次游戏结束,判定条件为:
      1. Truncation:游戏累积 1000 个时间步
      2. Termination:状态空间中元素出现无穷 或 立杆的垂直角度大于 0.2 弧度(约 11.5 度)

3. REINFORCE算法

  • REINFORCE 算法是一种基于策略梯度的强化学习算法,其核心思想是通过采样得到的轨迹来估计策略梯度,并通过梯度上升的方法来优化策略

3.1 从公式角度讲

  • 具体步骤如下:
    1. 初始化策略:随机初始化策略参数 θ\theta
    2. 采样轨迹:在当前策略 πθ\pi_\theta 下采样一条轨迹(状态、动作、奖励在时间维度上组成的序列) τ=(s0,a0,r1,s1,a1,r2,,sT)\tau = (s_0, a_0, r_1, s_1, a_1, r_2, \ldots, s_T)
    3. 计算累积回报:对于轨迹中的每个时间步 tt,计算从时间步 tt 开始的累积回报 Gt=k=tTγktrkG_t = \sum_{k=t}^T \gamma^{k-t} r_k,其中 γ\gamma 是折扣因子
    4. 计算累积回报期望:计算轨迹中每个时间步的累积回报期望 J(θ)=Eτπθ[t=0Tlogπθ(atst)Gt]J(\theta) = \mathbb{E}_{\tau \sim \pi_\theta} \left[ \sum_{t=0}^T \log \pi_\theta(a_t | s_t) G_t \right]
    5. 更新策略参数:根据轨迹中的状态、动作和回报,计算策略梯度 θJ(θ)=Eτπθ[t=0Tθlogπθ(atst)Gt]\nabla_\theta J(\theta) = \mathbb{E}_{\tau \sim \pi_\theta} \left[ \sum_{t=0}^T \nabla_\theta \log \pi_\theta(a_t | s_t) G_t \right],并使用梯度上升法更新策略参数 θθ+αθJ(θ)\theta \leftarrow \theta + \alpha \nabla_\theta J(\theta),其中 α\alpha 是学习率
  • 通过不断重复上述步骤,策略会逐渐优化,使得智能体在环境中的表现越来越好
  • REINFORCE 算法的优点是简单易实现,但缺点是方差较大,收敛速度较慢
  • 代码实现可以参考 Gymnasium 教程

3.2 从实现代码角度讲

  1. 构建 Policy
    1. 构成:Policy 是一个 MLP 网络
    2. 输入:State(一个长度为 4 的浮点数向量)
    3. 输出:两个标量,分别表示正态分布的均值和标准差
  2. 构建 Agent
    1. 构成:一个 Agent 包含一个 Policy 以及对此 Policy 的使用和更新方法
    2. 使用:即如何使用 Agent 根据当前状态选择动作
    3. 更新:即如何根据环境的反馈(奖励)更新 Policy 的参数
  3. 训练 AgentAgentEnv 交互):
    1. 初始化 Agent 和环境
    2. 采样轨迹:在当前策略下采样动作,形成一条轨迹
    3. 计算回报:计算轨迹中每个时间步的回报
    4. 更新策略:根据策略梯度更新策略参数
    5. 重复上述步骤直到策略收敛
      代码实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import random
import numpy as np
import pandas as pd
import seaborn as sns
from typing import Tuple
import torch
import torch.nn as nn
from torch.distributions.normal import Normal
import matplotlib.pyplot as plt
import gymnasium as gym
plt.rcParams["figure.figsize"] = (10, 5)
class Policy_Network(nn.Module):
"""Parametrized Policy Network."""
def __init__(self, obs_space_dims: int, action_space_dims: int):
"""Initializes a neural network that estimates the mean and standard deviation
of a normal distribution from which an action is sampled from.
Args:
obs_space_dims: Dimension of the observation space
action_space_dims: Dimension of the action space
"""
super().__init__()
hidden_space1 = 16 # Nothing special with 16, feel free to change
hidden_space2 = 32 # Nothing special with 32, feel free to change
# Shared Network
self.shared_net = nn.Sequential(
nn.Linear(obs_space_dims, hidden_space1),
nn.Tanh(),
nn.Linear(hidden_space1, hidden_space2),
nn.Tanh(),
)
# Policy Mean specific Linear Layer
self.policy_mean_net = nn.Sequential(
nn.Linear(hidden_space2, action_space_dims)
)
# Policy Std Dev specific Linear Layer
self.policy_stddev_net = nn.Sequential(
nn.Linear(hidden_space2, action_space_dims)
)
def forward(self, x: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
"""Conditioned on the observation, returns the mean and standard deviation
of a normal distribution from which an action is sampled from.
Args:
x: Observation from the environment
Returns:
action_means: predicted mean of the normal distribution
action_stddevs: predicted standard deviation of the normal distribution
"""
shared_features = self.shared_net(x.float())
action_means = self.policy_mean_net(shared_features)
action_stddevs = torch.log(
1 + torch.exp(self.policy_stddev_net(shared_features))
)
return action_means, action_stddevs
class REINFORCE:
"""REINFORCE algorithm."""
def __init__(self, obs_space_dims: int, action_space_dims: int):
"""Initializes an agent that learns a policy via REINFORCE algorithm [1]
to solve the task at hand (Inverted Pendulum v4).
Args:
obs_space_dims: Dimension of the observation space
action_space_dims: Dimension of the action space
"""
# Hyperparameters
self.learning_rate = 1e-4 # Learning rate for policy optimization
self.gamma = 0.99 # Discount factor
self.eps = 1e-6 # small number for mathematical stability
self.probs = [] # Stores probability values of the sampled action
self.rewards = [] # Stores the corresponding rewards
self.net = Policy_Network(obs_space_dims, action_space_dims)
self.optimizer = torch.optim.AdamW(self.net.parameters(), lr=self.learning_rate)
def sample_action(self, state: np.ndarray) -> float:
"""Returns an action, conditioned on the policy and observation.
Args:
state: Observation from the environment
Returns:
action: Action to be performed
"""
state = torch.tensor(np.array([state]))
action_means, action_stddevs = self.net(state)
# create a normal distribution from the predicted
# mean and standard deviation and sample an action
distrib = Normal(action_means[0] + self.eps, action_stddevs[0] + self.eps)
action = distrib.sample()
prob = distrib.log_prob(action)
action = action.numpy()
self.probs.append(prob)
return action
def update(self):
"""Updates the policy network's weights."""
running_g = 0
gs = []
# Discounted return (backwards) - [::-1] will return an array in reverse
for R in self.rewards[::-1]:
running_g = R + self.gamma * running_g
gs.insert(0, running_g)
deltas = torch.tensor(gs)
log_probs = torch.stack(self.probs)
# Update the loss with the mean log probability and deltas
# Now, we compute the correct total loss by taking the sum of the element-wise products.
loss = -torch.sum(log_probs * deltas)
# Update the policy network
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
# Empty / zero out all episode-centric/related variables
self.probs = []
self.rewards = []
# Create and wrap the environment
env = gym.make("InvertedPendulum-v4")
env = gym.make("InvertedPendulum-v4", render_mode="rgb_array")
wrapped_env = gym.wrappers.RecordVideo(
env,
video_folder="./InvertedPendulum_video",
episode_trigger=lambda episode_id: episode_id % 2000 == 0,
name_prefix="episode",
)
# Observation-space of InvertedPendulum-v4 (4)
obs_space_dims = env.observation_space.shape[0]
# Action-space of InvertedPendulum-v4 (1)
action_space_dims = env.action_space.shape[0]
agent = REINFORCE(obs_space_dims, action_space_dims)
reward_over_episodes = []
for episode in range(total_num_episodes):
obs, info = wrapped_env.reset()
done = False
while not done:
action = agent.sample_action(obs)
obs, reward, terminated, truncated, info = wrapped_env.step(action)
agent.rewards.append(reward)
done = terminated or truncated
reward_over_episodes.append(wrapped_env.return_queue[-1])
agent.update() # 每完成一次轨迹才会更新一次策略

重点代码分析:

  1. Policy 预测采样动作的均值和标准差:
    1
    2
    3
    4
    5
    shared_features = self.shared_net(x.float())
    action_means = self.policy_mean_net(shared_features) # 直接预测采样动作的均值
    action_stddevs = torch.log(
    1 + torch.exp(self.policy_stddev_net(shared_features))
    ) # 预测采样动作的标准差,保证标准差为正
  2. 采样动作:
    1
    2
    3
    distrib = Normal(action_means[0] + self.eps, action_stddevs[0] + self.eps)  # 根据 Policy 预测的均值和标准差构建正态分布
    action = distrib.sample() # 从正态分布中采样动作
    prob = distrib.log_prob(action) # 同时计算采样动作的概率,用于后续计算策略梯度来更新策略
  3. 更新策略:
    1
    2
    3
    4
    5
    6
    7
    8
    running_g = 0
    gs = []
    for R in self.rewards[::-1]:
    running_g = R + self.gamma * running_g
    gs.insert(0, running_g)
    deltas = torch.tensor(gs) # 计算折扣累积回报
    log_probs = torch.stack(self.probs)
    loss = -torch.sum(log_probs * deltas) # 根据策略累积折扣回报和策略概率计算期望策略累积折扣期望,目标是最大化期望

最终效果:
reinforce_learning_v2.png

可以看出,Agent 在训练过程中逐渐学会了如何控制小车,使得倒立摆尽可能直立,训练 5000 步就可以将倒立摆稳定保持 200 时间步

4. 思考和尝试

  • 由于长期做有监督深度学习项目,所以会思考:如果使用深度有监督学习模型来解决倒立摆问题,会有什么不同?
  • 但直接使用深度有监督学习模型来解决倒立摆问题是不现实的,因为不管是用奖励计算损失还是用观测状态计算损失,都无法通过梯度反向传播来优化模型,因为环境并不可微
  • 环境不可微 是强化学习和深度学习的根本区别之一,那么如何解决 “深度有监督学习无法解决倒立摆问题” 呢?
  • 一个简单有效的方法是使用两个阶段的模型:
    1. 第一阶段:训练一个深度有监督学习模型,作为环境仿真器
      1. 输入:状态(观测)+ 随机动作
      2. 输出:预测的下一个状态 + 预测的奖励
      3. 监督:真实环境下,输入随机动作后的新状态和奖励
    2. 第二阶段:训练一个深度有监督学习模型,作为智能体
      1. 输入:状态(观测)
      2. 输出:动作
      3. 监督:环境仿真器(冻结)预测的下一个状态和奖励(目标是奖励尽可能高 且 立杆尽可能竖直 且 小车速度尽可能小 且 立杆线速度尽可能小)
  • 通过两个阶段的模型训练,可以将环境的不可微性质转化为可微性质,从而使用深度有监督学习模型来解决倒立摆问题
  • 实现代码:
  1. 训练环境仿真器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import random
import numpy as np
import torch
import torch.nn as nn
import gymnasium as gym
class EnvPredNet(nn.Module):
def __init__(self, obs_space_dims: int, action_space_dims: int, reward_space_dims: int):
super().__init__()
hidden_space1 = 128 # Nothing special with 16, feel free to change
hidden_space2 = 256 # Nothing special with 32, feel free to change
self.net = nn.Sequential(
nn.Linear(obs_space_dims + action_space_dims, hidden_space1),
nn.ReLU(),
nn.Linear(hidden_space1, hidden_space2),
nn.ReLU(),
nn.Linear(hidden_space2, obs_space_dims + reward_space_dims),
)
def forward(self, x: torch.Tensor):
return self.net(x.float())
class SupervisedAgent:
def __init__(self, obs_space_dims: int, action_space_dims: int, reward_space_dims: int):
# Hyperparameters
self.learning_rate = 1e-4 # Learning rate for policy optimization
self.net = EnvPredNet(obs_space_dims, action_space_dims, reward_space_dims)
self.optimizer = torch.optim.AdamW(self.net.parameters(), lr=self.learning_rate)
self.loss_fn = nn.MSELoss(reduce="sum")
def sample_action(self) -> float:
random_action = torch.clamp(torch.randn(1), -3, 3)
return random_action
def pred_state(self, state, action):
state = torch.tensor(np.array([state]))
state_action = torch.cat((state, action.unsqueeze(0)), dim=1)
next_state = self.net(state_action)
return next_state
def update(self, pred_state, gt_state, reward):
loss = self.loss_fn(
pred_state, torch.tensor([[*gt_state, reward]], dtype=torch.float32)
)
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
return loss.item()
env = gym.make("InvertedPendulum-v4")
wrapped_env = gym.wrappers.RecordEpisodeStatistics(env, 50) # Records episode-reward
total_num_episodes = int(5e4)
obs_space_dims = env.observation_space.shape[0]
action_space_dims = env.action_space.shape[0]
reward_space_dims = 1
agent = SupervisedAgent(obs_space_dims, action_space_dims, reward_space_dims)
agent.net.train()
for episode in range(total_num_episodes):
state, info = wrapped_env.reset()
done = False
while not done:
action = agent.sample_action()
gt_next_state, reward, terminated, truncated, info = wrapped_env.step(action)
pred_state = agent.pred_state(state, action)
loss = agent.update(pred_state, gt_next_state, reward)
state = gt_next_state
done = terminated or truncated
print(f"Episode: {episode}, Loss: {loss}")
torch.save(agent.net.state_dict(), "env_predict_model.pth")
  1. 训练智能体
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import random
import numpy as np
import torch
import torch.nn as nn
import gymnasium as gym
class EnvPredNet(nn.Module):
def __init__(self, obs_space_dims: int, action_space_dims: int, reward_space_dims: int):
super().__init__()
hidden_space1 = 128 # Nothing special with 16, feel free to change
hidden_space2 = 256 # Nothing special with 32, feel free to change
self.net = nn.Sequential(
nn.Linear(obs_space_dims + action_space_dims, hidden_space1),
nn.ReLU(),
nn.Linear(hidden_space1, hidden_space2),
nn.ReLU(),
nn.Linear(hidden_space2, obs_space_dims + reward_space_dims),
)
def forward(self, x: torch.Tensor):
return self.net(x.float())
class ActionPredNet(nn.Module):
def __init__(self, obs_space_dims: int, action_space_dims: int):
super().__init__()
hidden_space1 = 128 # Nothing special with 16, feel free to change
hidden_space2 = 256 # Nothing special with 32, feel free to change
self.net = nn.Sequential(
nn.Linear(obs_space_dims, hidden_space1),
nn.ReLU(),
nn.Linear(hidden_space1, hidden_space2),
nn.ReLU(),
nn.Linear(hidden_space2, action_space_dims),
nn.Tanh(),
)
def forward(self, x: torch.Tensor):
action = self.net(x.float()) * 3
return action
class SupervisedAgent:
def __init__(self, obs_space_dims: int, action_space_dims: int, reward_space_dims: int):
self.env_pred_net = EnvPredNet(obs_space_dims, action_space_dims, reward_space_dims)
self.action_pred_net = ActionPredNet(obs_space_dims, action_space_dims)
self.env_pred_net.load_state_dict(torch.load("env_predict_model.pth"))
self.env_pred_net.eval()
self.action_pred_net.train()
self.learning_rate = 1e-4
self.optimizer = torch.optim.AdamW(
self.action_pred_net.parameters(), lr=self.learning_rate
)
def get_action(self, state) -> float:
action = self.action_pred_net(state)
return action
def pred_state_reward(self, state, action):
state_action = torch.cat([torch.tensor([state]), action], dim=1)
return self.env_pred_net(state_action)
def update(self, pred_state_reward, state):
loss = (
pred_state_reward[0, 1].abs() * 10 # 立杆尽可能竖直
+ pred_state_reward[0, 2:4].abs().sum() # 小车速度和立杆角速度尽可能小
+ (pred_state_reward[0, 0] - state[0]).abs() * 0.1 # 小车位置尽可能不变
- pred_state_reward[0, 4].abs() # 奖励尽可能高
)
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
return loss.item()
# Create and wrap the environment
env = gym.make("InvertedPendulum-v4")
wrapped_env = gym.wrappers.RecordEpisodeStatistics(env, 50) # Records episode-reward
total_num_episodes = int(5e4) # Total number of episodes
obs_space_dims = env.observation_space.shape[0]
action_space_dims = env.action_space.shape[0]
reward_space_dims = 1
agent = SupervisedAgent(obs_space_dims, action_space_dims, reward_space_dims)
for episode in range(total_num_episodes):
state, info = wrapped_env.reset()
done = False
reward_sum = 0
while not done:
action = agent.get_action(torch.tensor([state]))
gt_next_state, reward, terminated, truncated, info = wrapped_env.step(
action.detach().numpy()[0]
)
pred_state_reward = agent.pred_state_reward(state, action)
loss = agent.update(pred_state_reward, state)
state = gt_next_state
done = terminated or truncated
reward_sum += reward
print(f"Episode: {episode}, Reward: {reward_sum}, Loss: {loss}")
torch.save(agent.action_pred_net.state_dict(), "action_predict_model.pth")

5. 总结

  • 通过结合 REINFORCE 算法和倒立摆任务,本文展示了强化学习的基本原理和具体实现。
  • 同时,提出了针对环境不可微问题的创新方法,即通过建立环境仿真器来将不可微问题转化为可微问题,从而使得深度学习能够在强化学习任务中发挥作用。
  • 有监督学习和强化学习的对比:
    • 二阶段有监督学习适合在已知环境模型的基础上,快速训练并优化策略,特别是离线学习和仿真场景。
    • 强化学习则适用于更为复杂、不确定的环境,尤其是无法精确建模的动态场景,并且可以在实时交互中自我改进。

URL

关键问题列表(以 Qwen2.5-0.5B-prefill 为例)

  1. pulsar2 llm_build 模式和标准编译模式的区别?共用了什么部分?
    1. 标准编译模式有前端,llm_build 模式没有前端
    2. llm_build 量化不共用标准编译,weight 8 bit minmax / decoder layer 之间用 bf16 浮点 / 层内动态量化
  2. ax-llm-build / ax-llm 两个 repo 的关系?他们用来做什么?
    1. ax-llm-build 给上板准备除了模型之外的文件,例如 embed / 数据类型转换等
    2. ax-llm 输入编译后模型文件,输出是板上可运行的 chatbot
  3. 模型输出文件的含义?l%d.axmodel 表示层,qwen_post.axmodel 表示 hidden size -> vocab size 的 fc 吗?model.embed_tokens.weight.bfloat16.bin 是 token embedding 吗?
    1. token embedding / post.axmodel 都用 fc / 查表去做
  4. 为什么分层部署?动态量化发生在 decode layer 层之间吗?在 ppl 里用 cpu 动态量化 activation 吗?decoder layer 中有没有用到动态量化?
    1. teng 可以做动态量化,不需要用 cpu
  5. decode layer 用 flash attention 了吗?v2 / v3 ?
    1. 没有,用了标准 attention,softmax 单独计算
  6. 用 kv cache 了吗?如何用?DRAM 够用吗?
    1. 用了,squence length 限制到 2k 之后,DRAM 够用
  7. GQA 用了吗?如何用?repeat 再做 self-attention?
    1. 有实际节约效率
  8. 有没有用到 llm 专用的量化算法?
    1. 没有
  9. decode method 在 ppl 中用 cpu 实现吗?可以放到 npu 上吗?
    1. 用 cpu 实现,目前直接用 argmax 了
    2. 用 npu 实现的话可以用 top-k
  10. continue batching 用到了吗?(单 batch 似乎不用考虑这个问题,假如 ax650 作为云端芯片就需要考虑了)
    1. 没有
  11. paged attention 用到了吗?(似乎用内存分页管理就够了?)
    1. 没有
  12. 有哪部分是用浮点运行的?哪部分用定点运行的?
    1. 除了 conv 相关的都用浮点,例如 softmax
  13. 对于长上下文如何处理?qwen2 似乎禁用了 sliding_window,那么如何处理长上下文?计算效率如何?
    1. 最长测试过 1024
  14. ssm 状态空间模型支持吗?例如 mamba
    1. 没有,后续可能从节省 kv cache 角度会考虑支持
  15. 假如有一个 sparse attention 模型,会快吗?还是要补成 dense 再算?
    1. 没有专用加速算子,可能不会太快
  16. 可能考虑投机解码这种相对更高端的加速技术吗?例如 SpecInfer 这种
    1. 没有

0. 背景

  • 本文主要介绍 hugging face 托管的 LLM 如何下载,并用 transformers repo 推理

1. 相关工具介绍

  1. hugging face 是一个托管 LLM 的网站,类似 github / dockerhub,可以上传下载大模型,也可以在线推理等
  2. transformers 是一个 hugging face 维护的开源大模型代码库,包含约 300 个开源大模型的网络结构,可以配合 hugging face 下载的大模型,实现一键推理

2. 模型文件下载

2.1 下载工具

  1. hugging face 有专用工具 huggingface-cli,作用类似于低配版 git,可以实现远程 hugging face repo 的登录、登出、上传、下载等
  2. 对于 Qwen2-0.5B-Instruct 模型,huggingface-cli 可直接免登录下载,而例如 Llama3.2-1B 等模型需要向 meta 提交申请并获得同意后下载

2.2 下载文件内容

  1. hugging face 模型文件格式较为固定,一般为:
    1. config.json
      1. 这个文件包含了模型的配置信息,比如模型的架构参数、词表大小、激活函数类型等
      2. 它通常用于在加载模型时,确保模型的配置与原始模型一致
    2. generation_config.json
      1. 这个文件包含了生成文本时的配置信息,比如最大长度、停止序列、解码相关配置(温度、top-ktop-p、惩罚系数)等,以及依赖的 transformers 版本
      2. 这些参数影响模型生成文本的行为
    3. LICENSE
      1. 这个文件包含了模型的许可证信息,说明了你可以如何使用这个模型,以及使用时需要遵守的法律条款
    4. merges.txt
      1. 这个文件通常用于字节对编码(Byte Pair Encoding, BPE)分词器,它包含了词汇的合并规则
      2. BPE 是一种用于创建词汇表和编码文本的算法
    5. model.safetensors
      1. 这是一个保存模型权重的文件,使用了 SafeTensors 格式
      2. 这是一种高效的文件格式,用于存储和加载深度学习模型的权重
    6. README.md
      1. 这个文件包含了模型的说明文档,通常包括模型的简介、如何使用模型、训练细节、性能指标等信息
    7. tokenizer_config.json
      1. 这个文件包含了分词器的配置信息,比如分词器的类型、是否使用 BPE
    8. tokenizer.json
      1. 这个文件包含了分词器的预训练信息,比如词汇表、特殊标记等
      2. 它用于将文本转换为模型可以理解的数值输入
    9. vocab.json
      1. 这个文件包含了模型的词汇表,即模型在训练时使用的所有单词或标记的列表
      2. 分词器使用这个词汇表将文本分割成模型可以理解的输入
  2. Qwen2-0.5B-Instruct 中的 "Instruct" 是指此模型是经过指令遵循微调后的模型

2.3 模型参数解析

  1. 下载的文件中 model.safetensors 就是模型参数
  2. safetensors 是由 Hugging Face 开发的一种新型文件格式,专门用于安全地存储和加载机器学习模型的权重,这种格式特别关注模型的安全性、隐私保护和快速加载
    1. 安全性:safetensors 格式的设计目标之一是提高模型存储的安全性,它不允许执行代码,从而减少了模型文件被恶意篡改的风险
    2. 快速加载:与传统的 PyTorch 序列化格式(如 .pth.bin)相比,safetensors 可以更快地加载模型权重,尤其是在 CPUGPU
    3. 跨语言和跨框架兼容性:safetensors 支持在不同的编程语言和机器学习框架之间共享模型权重,例如可以在 Python 中序列化并在 C++ / Java / JavaScript 中加载
    4. 懒加载:支持懒加载,即可以选择只读取文件中的一部分张量,这对于分布式设置中的多节点或 GPU 非常有用
  3. 如何解析一个 model.safetensors 文件?
1
2
3
4
5
6
from safetensors import safe_open
tensors = {}
# framework="pt" means pytorch
with safe_open("model.safetensors", framework="pt", device="cpu") as f:
for key in f.keys():
tensors[key] = f.get_tensor(key)

3. 构建模型并加载预训练参数

1
2
3
4
from transformers import AutoModelForCausalLM
model = AutoModelForCausalLM.from_pretrained(
model_path, torch_dtype="auto", device_map="auto"
)
  • hugging facetransformers 库提供了一个 AutoModelForCausalLM 类,可以根据模型路径,读取配置文件并自动加载模型

4. 输入构建

输入构建分成三个部分:

  1. 初始化 tokenizer
  2. prompt 指令化
  3. text to token idtokenize

4.1 初始化 tokenizer

1
2
from transformers import AutoTokenizer
tokenizer = AutoTokenizer.from_pretrained(model_path)
  • transformers 会自动读取 tokenizer 相关配置文件,创建对应的 tokenizer 对象

4.2 prompt 指令化

1
2
3
4
5
6
7
8
9
10
11
12
13
prompt = "艾萨克牛顿是谁?"
messages = [
{
"role": "system",
"content": "You are Qwen, created by Alibaba Cloud. You are a helpful assistant.",
},
{"role": "user", "content": prompt},
]
text = tokenizer.apply_chat_template(
messages,
tokenize=False,
add_generation_prompt=True,
)
  • 上面的 messageQwen2 指令微调后模型的输入格式,包含了 rolecontent 两个字段
  • 具体来说需要两个步骤:
    1. prompt to message:将 prompt 转换为 message 格式
    2. message to chat:在 message 中插入特殊字符,形成 chat 格式,特殊字符主要包括:
      1. <|im_start|>:表示对话开始
      2. <|im_end|>:表示对话结束
      3. \n:间隔 rolecontent
  • 最终生成的 text 实际内容为:
1
<|im_start|>system\nYou are Qwen, created by Alibaba Cloud. You are a helpful assistant.<|im_end|>\n<|im_start|>user\n艾萨克牛顿是谁?<|im_end|>\n<|im_start|>assistant
  • 一些额外知识:
    1. prompt 输入模板信息在 tokenizer_config.json 中被定义过,keychat_template
      1
      {% for message in messages %}{% if loop.first and messages[0]['role'] != 'system' %}{{ '<|im_start|>system\nYou are a helpful assistant.<|im_end|>\n' }}{% endif %}{{'<|im_start|>' + message['role'] + '\n' + message['content'] + '<|im_end|>' + '\n'}}{% endfor %}{% if add_generation_prompt %}{{ '<|im_start|>assistant\n' }}{% endif %}
    2. 模板用到的代码是 Jinja2 写的,Jinja2 是一种模板引擎,tokenizer 会根据这个模板生成 chat 格式的 text
    3. 代码中的 messagesadd_generation_prompt 是模板的输入参数

4.3 tokenize(text to token id)

1
model_inputs = tokenizer([text], return_tensors="pt").to(model.device)
  • 这个过程就是 tokenize,将 text 转换为 token id
    • 首先根据 tokenizer.json 中声明的 token encode 方法(例如 BPE),将 text 分词
    • 然后根据 vocab.json 中的词汇表,将分词后的词转换为 token id(查表)
    • 最后将 token id 转换为 tensor 格式(tensor of uint64 dtype

5. 模型前向传播

1
2
3
4
generated_ids = model.generate(
**model_inputs,
max_new_tokens=512,
)
  • generate 方法是 transformers 提供的一个高级封装方法,可以实现 LLM 的推理
    • max_new_tokens 参数表示最大生成的 token 数量
    • generate 方法会自动迭代调用模型的前向传播方法和解码方法,实现文本生成
  • 本章节的重点关注 generate 的模型推理过程,可以分成:
    1. token id to token embedding
    2. position emebedding 构造
    3. attention mask 构造
    4. decoder layer forward

5.1 token id to token embedding

  • nn.Embedding 本质就是查表,将 token id 转换为 token embedding

5.2 position emebedding 构造

  • Qwen2 模型使用的 position embedding 方法是 RoPE (rotary position embedding)

5.2.1 RoPE 介绍

RoPE 的核心思想是将 query embeddingkey embedding 旋转一定角度,让模型感知到序列中的位置信息。

  • RoPE 的输入是:
    1. position id(例如:[0, 1, 2, ..., seq_len - 1]
    2. query embedding (shape = [seq_len, head_dim])
    3. key embedding (shape = [seq_len, head_dim])
  • RoPE 的输出是:
    1. 经过 RoPE 处理后的 query embedding
    2. 经过 RoPE 处理后的 key embedding
  • RoPE 的计算过程是:
    1. 构造 cosRseq_len×head_dim\cos\in\mathbb{R}^{seq\_len\times head\_dim}sinRseq_len×head_dim\sin\in\mathbb{R}^{seq\_len\times head\_dim} 两个矩阵
    2. 用两个矩阵去旋转 query embeddingkey embedding
  • RoPE 的实现代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
# 构造 RoPE 旋转矩阵
inv_freq = 1.0 / (1000000 ** (torch.arange(0, dim, 2, dtype=torch.int64).float().to(device) / dim)) # 1. 计算频率的倒数,shape = [dim // 2]
freq = inv_freq_expanded.float() @ position_ids_expanded.float() # 2. 计算频率,shape = [seq_len, dim // 2]
emb = torch.cat((freqs, freqs), dim=-1) # 3. 构造 RoPE 矩阵,shape = [seq_len, dim]
cos = emb.cos()[None, None, ...] # 4. 计算 RoPE 的 cos,扩展到多 batch 和多头,shape = [1, 1, seq_len, dim]
sin = emb.sin()[None, None, ...] # 5. 计算 RoPE 的 sin,扩展到多 batch 和多头,shape = [1, 1, seq_len, dim]
# 旋转 query embedding 和 key embedding
def rotate_half(x):
x1 = x[..., : x.shape[-1] // 2]
x2 = x[..., x.shape[-1] // 2 :]
return torch.cat((-x2, x1), dim=-1)
q = (q * cos) + (rotate_half(q) * sin) # 6. 旋转 query embedding,shape = [batch_size, num_heads, seq_len, head_dim]
k = (k * cos) + (rotate_half(k) * sin) # 7. 旋转 key embedding,shape = [batch_size, num_heads, seq_len, head_dim]
  • 以上 RoPE 的公式解释:
    • xm,i=xm,icos(mθi)xm,i+d/2sin(mθi)x'_{m,i}=x_{m,i}\cdot \cos (m\theta_i)-x_{m,i+d/2}\cdot \sin (m\theta_i)
    • xm,i+d/2=xm,icos(mθi)+xm,i+d/2sin(mθi)x'_{m,i+d/2}=x_{m,i}\cdot \cos (m\theta_i)+x_{m,i+d/2}\cdot \sin (m\theta_i)
    • 其中:
      • xm,ix_{m,i}query embeddingkey embedding 的第 mm 个位置的第 ii 个维度的值
      • θi=10000002id\theta_i=1000000^{-\frac{2i}{d}}
      • mθi=m10000002idm\theta_i=m\cdot 1000000^{-\frac{2i}{d}}mm 是位置 id
      • ddhead_dim
  • Qwen 模型中的 RoPE 实现代码是 RoPE 的一个简化版本,主要是为了提高计算效率,官方的 RoPE 计算公式如下:
    • xm,2i=xm,2icos(mθi)xm,2i+1sin(mθi)x'_{m,2i}=x_{m,2i}\cdot \cos (m\theta_i)-x_{m,2i+1}\cdot \sin (m\theta_i)
    • xm,2i+1=xm,2icos(mθi)+xm,2i+1sin(mθi)x'_{m,2i+1}=x_{m,2i}\cdot \cos (m\theta_i)+x_{m,2i+1}\cdot \sin (m\theta_i)
    • RdΘ,mx=[x1x2x3x4xd1xd][cos(mθ1)cos(mθ1)cos(mθ2)cos(mθ2)cos(mθd/2)cos(mθd/2)]+[x2x1x4x3xdxd1][sin(mθ1)sin(mθ1)sin(mθ2)sin(mθ2)sin(mθd/2)sin(mθd/2)]\mathbf{R}_{d\Theta,m}\mathbf{x} = \begin{bmatrix} \mathbf{x}_1 \\ \mathbf{x}_2 \\ \mathbf{x}_3 \\ \mathbf{x}_4 \\ \vdots \\ \mathbf{x}_{d-1} \\ \mathbf{x}_d \end{bmatrix} \otimes \begin{bmatrix} \cos(m\theta_1) \\ \cos(m\theta_1) \\ \cos(m\theta_2) \\ \cos(m\theta_2) \\ \cdots \\ \cos(m\theta_{d/2}) \\ \cos(m\theta_{d/2}) \end{bmatrix} + \begin{bmatrix} -\mathbf{x}_2 \\ \mathbf{x}_1 \\ -\mathbf{x}_4 \\ \mathbf{x}_3 \\ \vdots \\ -\mathbf{x}_d \\ \mathbf{x}_{d-1} \end{bmatrix} \otimes \begin{bmatrix} \sin(m\theta_1) \\ \sin(m\theta_1) \\ \sin(m\theta_2) \\ \sin(m\theta_2) \\ \cdots \\ \sin(m\theta_{d/2}) \\ \sin(m\theta_{d/2}) \end{bmatrix}
    • 区别在于:
      • 官方的 RoPE 是同一个位置的相邻维度(2i 和 2i+1)之间旋转
      • QwenRoPE 是同一个位置的跳跃维度(i 和 i+d/2)之间旋转

5.3 attention mask 构造

  • 实现中,attention mask 实际上并不需要构造,因为 torch.nn.functional.scaled_dot_product_attentionis_causal 参数可以自动实现 attention mask
  • 具体来说:
    1. torch.nn.functional.scaled_dot_product_attentionis_causal == True 时,会自动构造一个 attention biasshape = [query_len, key_len],其中 bias[i, j] = -inf if i > j else 0
    2. query @ key.transpose(-2, -1) 得到 attention score 后,会自动加上这个 attention bias
    3. 然后再经过 softmax,这样就实现了 causal attention
  • 而且 causal attentionGPT-like LLM 中,只在 prefill 阶段使用,generate 阶段不使用(或者说隐式使用)

5.4 decoder layer forward

  • LLM 的计算量几乎都体现在 decoder layer forward
  • layer norm 位置角度来看,Qwen2 模型属于 pre-LM,即:
    1. x=x+MHA(LayerNorm(x))x=x+MHA(LayerNorm(x))
    2. x=x+FFN(LayerNorm(x))x=x+FFN(LayerNorm(x))
    3. x=LayerNorm(x)x = LayerNorm(x)
  • 模型包含 24decoder layer,每层包含 2 个子层:
    1. multi-head self-attentionMHA
    2. feed forward networkFFN
  • 为了减小计算量和参数量,Qwen2 模型使用了 GQA (grouped query attention) 方法:
    1. 对于 query 来讲,每层 MHA 包含 14 个头,每个头的 head_dim = 64
    2. 对于 keyvalue 来讲,每层 MHA 包含 2 个头,每个头的 head_dim = 64
    3. 计算时,需要将 keyvaluehead 通过 repeat 扩展到 14 个头
  • 同时,Qwen2 使用了 KV cache 算法,即:
    1. prefill 阶段,keyvalue 会被缓存下来
    2. generate 阶段,之前的 keyvalue 会被直接使用不再计算,只计算最后一个 tokenquery / key / value
  • prefill 阶段:
    • 输入为 hidden stateshape = [1, 36, 896],其中 36seq_len896hidden_dim
    • 输入逐层经过 decoder layer,最后输出 hidden state,和输入 shape 一样(shape = [1, 36, 896]
    • 在这一阶段,每一层的每个头的每个位置(token position)的 keyvalue 都会被缓存下来,shape = [2, 1, 24, 2, 36, 64],其中:
      • 2key / value
      • 1batch_size
      • 24layers
      • 36seq_len
      • 2key_value_head_num
      • 64head_dim
    • 每个 decoder layer 层的 MHA 都需要 causal attention,即 is_causal = True
    • 输出的 hidden state 最后一个 token positionfeature (shape = [1, 896]) 会被输入到 fc 层,输出 shape = [1, 151936]151936vocab_size
    • 然后通过 decode method 得到最终的 token id
  • generate 阶段:
    • 输入为 预填充阶段最终输出的 token id 对应的 token embeddingshape = [1, 1, 896]
    • 输入逐层经过 decoder layer,最后输出 hidden state,和输入 shape 一样(shape = [1, 1, 896]
    • 在这一阶段,每一层的每个头的之前的位置(token position)的 keyvalue 都会被直接使用,不再计算;只计算最后一个 tokenquery / key / value,并将计算得到的 key / value 缓存下来,缓存的 key / value 会拼接到之前的 key / value 上,形成新的 key / valueshape = [2, 1, 24, 14, 36 + 1, 64]
    • 每个 decoder layer 层的 MHA 的输入 shape 如下:
      • query: shape = [1, 14, 1, 64]
      • key / value: shape = [1, 14, 37, 64]
      • is_causal = False,因为 query 长度为 1,不需要 causal attention
    • 最后一层 decoder layer 的输出经过 fcdecode method 得到最终的 token id
    • 重复直到生成的 token id 数量达到 max_new_tokens 或生成的 token ideos 为止

6. decode method

  • decode method 实际上属于模型 forward 过程中的一部分,但是为了方便理解,这里单独拎出来讲
  • decode method 的作用是将 vocab logits 映射到 token id
  • 最简答的 decode methodargmax 方法,即取 vocab logits 中最大的值对应的 token id,但是这种方法会导致生成的文本过于单一
  • 实际上 Qwen2 使用了五个串联的 decode method,分别是:
    • RepetitionPenaltyLogitsProcessor
    • TemperatureLogitsWarper
    • TopKLogitsWarper
    • TopPLogitsWarper
    • SoftmaxSampler

6.1 RepetitionPenaltyLogitsProcessor

RepetitionPenaltyLogitsProcessor 的作用是惩罚重复的 token id,即:

  1. 在预测的 vocab logits 中,找到之前所有 token id 对应的 logits
  2. 如果 logits > 0,则将 logits 除以一个 penaltypenalty 的值取 1.1
  3. 如果 logits < 0,则将 logits 乘以一个 penaltypenalty 的值取 1.1
  4. 目的是让模型更倾向于生成不重复的 token id

6.2 TemperatureLogitsWarper

TemperatureLogitsWarper 的作用是调整 vocab logits 的温度,即:

  1. vocab logits 除以一个 temperaturetemperature 的值取 0.7
  2. 目的是提高模型的生成多样性

6.3 TopKLogitsWarper

TopKLogitsWarper 的作用是截断 vocab logits,即:

  1. 保留 vocab logits 中最大的 k 个值,其他值置为 -infk 的值取 20
  2. 目的是降低模型的生成多样性,提高生成的准确性

6.4 TopPLogitsWarper

TopPLogitsWarper 的作用是截断 vocab probs,即:

  1. vocab logit 通过 softmax 变成 vocab probs
  2. vocab probs 从大到小排序,累加到 p 大于 top-p 为止,保留这些 vocab logits,其他值置为 -inf
  3. top-p 的值取 0.8,最终至少需要保留一个 token id
  4. 目的是降低模型的生成多样性,提高生成的准确性

6.5 SoftmaxSampler

  1. vocab logits (此时只有少量元素不是 -inf) 通过 softmax 变成 vocab probs
  2. 根据 vocab probs 采样一个 token id,作为最终的输出

7. 总结

  • 本文主要介绍了 hugging facetransformers 的使用方法,以及 Qwen2-0.5B-Instruct 模型的推理过程
  • GPT-like 模型的推理过程主要包括 tokenizeRoPEattention maskdecoder layer forwarddecode method 等步骤
  • Qwen2 模型使用了 RoPEGQAKV cache 等技术,提高了模型的计算效率和参数量
  • decode method 使用了 RepetitionPenaltyLogitsProcessorTemperatureLogitsWarperTopKLogitsWarperTopPLogitsWarperSoftmaxSampler 等方法,提高了模型的生成多样性和准确性

0. 背景知识

  1. NAS 是什么?
    • NAS 的全称是 Network Attached Storage,网络附加存储,说白了就是一个低性能高容量的私有云服务器
  2. NAS 能干什么用?
    • NAS 可以作为家庭影院数据中心,NAS + 局域网文件传输协议 + 智能电视 + 家庭影院软件可以实现非常好的家庭观影体验
    • NAS 可以作为私有云服务器,承担例如图床/视频床等功能,也可以建网站
  3. 怎么做一个 NAS
    • 通常情况下,直接买一个成品 NAS 是一个省心的选择,外观看通常是一个可以插硬盘的盒子,本质是一个 PC
    • 我更喜欢自己手搓

1. 手搓 NAS 方案

  1. NAS 手搓需要搞定 NAS 本体和网络两大块
    1. NAS 本体负责下载数据,保存数据和局域网内数据共享
    2. 网络部分负责让 NAS 本体成为一个公网可访问的设备
  2. NAS 本体用树莓派 5 实现是一个不错的选择
  3. 网络部分用光猫桥接 + DDNS + 端口转发方案

2. NAS 本体

硬件

  1. NAS 本体是一个 4GB 版本树莓派 5
  2. 一个 32GBSD 卡作为系统盘
  3. 一块 1TBUSB 3.0 的移动机械硬盘(后续根据需求扩容)

软件

  1. 操作系统:Ubuntu 24.04 Desktop arm64下载地址
    1. Ubuntu 的好处:社区活跃,遇到问题容易解决
    2. 坏处:预安装软件很多,比较大
  2. 其他软件:根据需求安装,例如:
    1. Samba 局域网共享软件
    2. RDP 远程桌面软件
    3. Transmission 磁力链接下载器

3. 网络

NAS 本体比起来,网络才是手搓 NAS 最难的部分,主要包含如下几个部分

公网 IP

  1. 选择的宽带运营商是电信,电信的入户光纤是动态公网 IPv4 地址(这一点还是比较良心的)
  2. 光猫默认开启了路由模式,因此路由器的输入已经变成了局域网,为了简化网络拓扑,需要给电信客服打电话要求将光猫改成桥接模式,即光猫只承担光电转换功能,输出端口 IP 和输入端口 IP 一致
  3. 光猫开启桥接模式之后,路由器输入端口变成了公网 IP,通过 PPPoE 拨号上网,同时实测发现,光猫桥接模式下,下载速度更快

DDNS

  1. 虽然路由器的输入已经变成公网 IP,但是动态的,几乎每天都在发生变化
  2. 因此需要用到 DDNS 服务,让动态 IP 绑定到静态的域名上,域名之前已购买,只需要在顶级域名之前加上一个 A 记录的子域名
  3. 由于家里用的路由器是小米 be6500 pro 型号,这个路由器本身并不支持腾讯云(dnspod)的 DDNS 服务,所以有两条路可以选:
    1. 刷机到 Openwrt 路由器固件,这个开源固件功能十分强大,但坏处是路由器无法再绑定到米家 APP 实现远程控制的一些官方功能
    2. 另外一种方法是通过某种方法开启路由器的 ssh 权限,登录到路由器内部,在内部开启 dnspod DDNS
  4. 这里选择了第二种方法,ssh 开启教程在 这里
  5. 开启 ssh 后,就可以登录到路由器 terminal,然而路由器使用的是小米自己魔改的 XiaoQiang Linux 操作系统,包管理工具以及软件源什么的完全搞不懂,也很难查到相关资料,所以常用的 DDNS 服务软件例如 ddns-go 什么的也无法正常安装和配置
  6. 在仔细了解了 DDNS 服务的基本原理之后,决定用 SHELL 手写一个 DDNS 服务
    1. DDNS 的原理(仅在 dnspod 服务商上测试过):客户端(路由器)定期检查输入端口 IP,当发现和 dnspod 服务商记录的这个域名绑定的 IP 不一致时,就给 dnspod 发一条修改绑定关系的请求,dnspod 更改后,新的 “域名——IP” 绑定关系就建立了
    2. 这里涉及到几个关键:
      1. API Token:上面这个修改过程显然不是任何人都可以改的,你只能修改自己名下的域名绑定的 IP,所以需要一个密钥来和 dnspod 服务器交互,这个密钥在 https://console.dnspod.cn/account/token/token 创建
      2. 查找域名和记录对应的 ID:由于 dnspodDDNS API 要求域名和对应的记录是以 ID 的方式描述的,所以需要查到域名 ID 和记录 ID
        1. 域名 ID
          1. curl -s "https://dnsapi.cn/Domain.List" -d "login_token=<your_token>&format=json" 得到输出查询结果
          2. 然后在查询结果中找到名下多个域名中想要查询域名的 ID
        2. 记录 ID
          1. curl -s "https://dnsapi.cn/Record.List" -d "login_token=<your_token>&format=json&domain_id=<your_domain_id>&sub_domain=<your_sub_domain>" 得到查询结果
          2. 在查询到的此域名下多条记录中,找到关注的记录 ID
      3. 查询当前 IPcurl -s http://ipinfo.io/ip
      4. 更新 DNS 记录:curl -s -X POST "https://dnsapi.cn/Record.Modify" -d "login_token=${API_TOKEN}" -d "format=json" -d "domain_id=${DOMAIN_ID}" -d "record_id=${RECORD_ID}" -d "sub_domain=${SUB_DOMAIN}" -d "record_line=${RECORD_LINE}" -d "record_type=${RECORD_TYPE}" -d "value=${CURRENT_IP}"
    3. 然后将此 DDNS_update.sh 文件注册到 crontab 中,每 5 分钟更新一次
  7. 确实可以正常更新 DNS,不过忙完之后才发现可以在树莓派上实现,不用自己写 SHELL

端口转发

  • 外网只能访问到路由器,如果想要通过 ssh 实现外网直连树莓派,那么需要在路由器上配置端口转发
  • 由于目前只有 SSHRDP 两个远程访问需求,所以只开了 223389TCP 端口转发(开的越少内网设备越安全)

4. 使用体验和后续计划

使用体验

  1. 用外网可以直连树莓派,可以用 SSH 远程给树莓派下发一些下载任务,也可以用 RDP 处理一些需要 GUI 的需求
  2. 设置了移动硬盘开机自动挂载,索尼电视安装 KODI 通过 Samba 协议看 4K 电影体验很震撼

后续计划

  1. 开放 http 协议端口,在树莓派上用 Nginx 等引擎让树莓派作为个人博客的图床和视频床
  2. 扩展硬盘架,树莓派外接 16 pin PCIE 转多口 SATA 扩展板 ,连接机械硬盘架

URL

TL;DR

  • 本文提出一种名为 Low Rank Adaption (LoRA) 的大模型微调技术,可有效降低大模型微调过程中的可微调参数(降低 10000 倍)和显存占用(降低 3 倍)
  • 具体做法是在 Linear 算子和 Embedding 算子中插入可训练的参数量较少的低秩分解矩阵,冻结原始参数,只训练低秩分解矩阵

Algorithm

总体流程

lora_1.png

  • 这张图几乎包含了 LoRA 全部的信息量:
    1. LoRA 在原始 Linear 参数的基础上,加入了低秩分解矩阵 ARd×r,BRr×dA\in\mathbb{R}^{d\times r},B\in\mathbb{R}^{r\times d}
    2. r << d,所以叫
    3. 原始参数冻结,只训练 AB
    4. 矩阵 A 用均值为 0 的正态分布初始化
    5. 矩阵 B 用全 0 初始化

对应代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import torch
import torch.nn as nn
class LoRALinear(nn.Module):
def __init__(self, in_features, out_features, r):
super(LoRALinear, self).__init__()
self.in_features = in_features
self.out_features = out_features
self.r = r
# 线性层的权重矩阵
self.weight = nn.Parameter(torch.randn(out_features, in_features))
self.bias = nn.Parameter(torch.zeros(out_features))
# LoRA的低秩分解矩阵
self.A = nn.Parameter(torch.randn(r, out_features))
self.B = nn.Parameter(torch.zeros(in_features, r))
def forward(self, x):
# 应用线性层
output = torch.matmul(x, self.weight) + self.bias
# 应用LoRA的低秩分解矩阵
output = output + torch.matmul(x, torch.matmul(self.B, self.A))
return output
def convert_to_standard_linear(self):
# 将LoRA参数转换到标准线性层中
self.weight = nn.Parameter(self.weight + torch.matmul(self.B, self.A))
# 删除LoRA的低秩分解矩阵
del self.A
del self.B
return self
class LoRATransformerLayer(nn.Module):
def __init__(self, d_model, r):
super(LoRATransformerLayer, self).__init__()
self.d_model = d_model
self.r = r
# 自注意力模块的权重矩阵
self.Wq = LoRALinear(d_model, d_model, r)
self.Wk = LoRALinear(d_model, d_model, r)
self.Wv = LoRALinear(d_model, d_model, r)
self.Wo = nn.Linear(d_model, d_model)
def forward(self, x):
# 计算查询、键和值的投影
q = self.Wq(x)
k = self.Wk(x)
v = self.Wv(x)
# 计算自注意力得分和输出
attn_scores = torch.matmul(q, k.transpose(-2, -1)) / (self.d_model**0.5)
attn_weights = torch.softmax(attn_scores, dim=-1)
attn_output = torch.matmul(attn_weights, v)
# 计算最终的输出
output = self.Wo(attn_output)
return output
def convert_to_standard_transformer(self):
# 将LoRA参数转换到标准Transformer网络中
self.Wq = self.Wq.convert_to_standard_linear()
self.Wk = self.Wk.convert_to_standard_linear()
self.Wv = self.Wv.convert_to_standard_linear()
return self
# 示例用法
d_model = 512
r = 8
layer = LoRATransformerLayer(d_model, r)
input_tensor = torch.randn(10, 32, d_model)
output_tensor = layer(input_tensor)
print(output_tensor.shape) # 输出: torch.Size([10, 32, 512])
# 转换到标准Transformer网络
standard_layer = layer.convert_to_standard_transformer()
print(standard_layer)

实际使用

  1. 实际使用时,可以用 LoRA_Layer替换大模型中所有 Transformer 层的 LinearEmbedding,对 FFN 中的 MLP 不做替换
  2. 可以不同的任务微调不同的 LoRA 模型
  3. 部署时可以用重参数化融合的方法,将 LoRA 训练参数融合到原始模型中,不会付出任何推理代价

Thought

  • 有道理,也挺好用,但据说真正做大模型预训练 / 微调的没人用

URL

TL;DR

  • 本文提出一种类似 GPT-4 的图文多模态模型 Large Language and Vision Assistant (LLaVA),基于开源的 CLIPLLaMA 分别作为图文编码器,因此 LLaVA 也完全开源

Algorithm

多模态指令遵循数据生成

  • 已有:图形——文本对数据集
  • 需要:图文指令遵循数据集,格式为:
    • 图片:原始图片
    • 问题:由 GPT-4 生成,输入原始 “图片-文本” 给 GPT-4,让 GPT-4 就这些信息提问
    • 答案:同上,让 GPT-4 回答自己提出的问题

模型结构

  • 图像模型:CLIP ViT-L/14 已做过图像文本对齐的预训练图像编码器模型
  • 大语言模型:LLaMA 预训练模型
  • 连接层:简单的线性映射层

如何训练和微调

训练

  • 冻结图像编码模型
  • 冻结 LLM 模型
  • 训练连接层

微调

  • 冻结图像编码模型
  • 训练 LLM 模型
  • 训练连接层
graph TD;
    A([视觉编码器]) --> B([连接层])
    B --> C([LLaMA语言模型])
    D[语言指令(例如:“请根据这张图片生成一个详细的描述”)] --> C
    C --> E[文本响应]
    F[图像] --> A
    G[系统消息(例如:对话历史记录)] --> C

Thought

  • 简单直接,来自开源,也回馈开源,很棒!

URL

TL;DR

  • 本文提出一种跨模态开放集目标检测算法,即:输入一张图片 和 需要检测内容的文本描述,给出框
  • 其中文本描述可以是开放的(任意内容的文本)
  • 本文最重要的部分是模型结构中图文多模态内容的融合

Algorithm

groundingdino.png

  • 本质是通过多次 Cross-Attention 来做多模态信息融合
  • text backbone 实际是 BERT
  • image backbone 实际是 SwinTransformer
  • 其中的 Language-guide Query Selection 是根据文本特征,找到图像特征中最匹配的部分初始化跨模态解码器

Thought

  • 这篇论文想要解决的任务时开放集目标检测,但其多模态信息融合方式让其出圈,成了多模态领域的经典