Zhangzhe's Blog

The projection of my life.

0%

强化学习(4)——用DQN算法训练Agent玩Taxi游戏

URL

TL;DR

  • 本文介绍了如何使用 DQN (Deep Q Network) 算法训练 AgentTaxi 游戏,重点介绍了 DQN 算法的原理和实现细节。
  • DQN 算法是一种结合了深度学习和 Q 学习的方法,通过使用神经网络来逼近 Q 函数,和 Q Learning 算法类似,只能用于解决离散状态动作下的强化学习问题(例如 Taxi 游戏),并用 Experience Replay 机制解决了和环境交互的数据效率问题。

Algorithm

0. Taxi Game

  • Taxi 游戏是一个强化学习环境,状态空间和动作空间都是离散的,适合用于测试 DQN 等离散状态动作的强化学习算法
    • 状态空间:500500 种状态,表示 Taxi 的位置、乘客的位置和目的地的位置
    • 动作空间:66 个动作,分别是:
      • 0: Move south (down)
      • 1: Move north (up)
      • 2: Move east (right)
      • 3: Move west (left)
      • 4: Pickup passenger
      • 5: Drop off passenger
    • 奖励机制:
      • +20: 送达乘客
      • -10: 乘客在错误位置上车或下车
      • -1: 每一步的惩罚
    • 结束条件:
      • 送达乘客
      • 达到 200 时间步
  • 训练 200episode 后的效果如下:
    episode-episode-200.gif
  • 训练 1000episode 后的效果如下:
    episode-episode-1000.gif

1. Q 函数

  • 价值函数:常用的价值函数有两种,分别是 Q 函数和 V 函数,这里的价值是 累积奖励的期望值(包含折扣),公式表示:E[t=0γtrt]\mathbb E[\sum_{t=0}^{\infty}\gamma^tr_t]
  • Q 函数:状态动作值函数,用于评估在状态 s 下采取动作 a 的价值,即 Q(s, a) 表示在状态 s 下采取动作 a 的价值,Qπ(s,a)=E[t=0γtrts0=s,a0=a,π]Q^{\pi}(s,a)=\mathbb E[\sum_{t=0}^{\infty}\gamma^tr_t|s_0=s,a_0=a,\pi],其中:
    • E\mathbb E 表示期望操作,即求期望值
    • rtr_t 是在时间步 tt 收到的即时奖励
    • γ\gamma 是折扣因子(0γ<10\leq\gamma<1),用于折扣未来的奖励
    • π\pi 是智能体的策略,决定了在每个状态下选择什么动作
  • V 函数:状态值函数,用于评估在状态 s 下的价值,Vπ(s)=E[t=0γtrts0=s,π]V^\pi(s)=\mathbb E[\sum_{t=0}^{\infty}\gamma^tr_t|s_0=s,\pi]

特性 V 函数 (状态价值函数) Q 函数 (状态-动作价值函数)
定义 评估状态 s 的价值,即在状态 s 开始,按照策略 π 执行的期望累积奖励 评估状态 s 和动作 a 的价值,即在状态 s 选择动作 a,然后按照策略 π 执行的期望累积奖励
输入 状态 s 状态 s 和动作 a
输出 状态的价值,即期望的回报 状态-动作对的价值,即从状态 s 执行动作 a 后的期望回报
更新方式 V(s)V(s) 通过 Q(s,a)Q(s,a) 来更新,通常用贝尔曼方程 Q(s,a)Q(s,a) 直接通过贝尔曼方程更新,通常用于 Q-learningDQN
公式 Vπ(s)=E[t=0γtrts0=s,π]V^\pi(s)=\mathbb E[\sum_{t=0}^{\infty}\gamma^tr_t|s_0=s,\pi] Qπ(s,a)=E[t=0γtrts0=s,a0=a,π]Q^\pi(s,a)=\mathbb E[\sum_{t=0}^{\infty}\gamma^tr_t|s_0=s,a_0=a,\pi]
主要用途 用于评估某一状态的好坏,特别适合评估策略 用于评估某一状态下选择特定动作的好坏,通常用于 Q-learning 等算法

2. Q Learning

  • 决策:维护一个 Q TableRnum of state×num of actionQ\ Table\in\mathbb R^{num\ of\ state\times num\ of\ action},用于存储每个状态动作对的价值,对于每一个状态 s,以 1ϵ1-\epsilon 的概率选择具有最大 Q 值的动作 a,以 ϵ\epsilon 的概率随机选择动作 a,即

π(s)={random(a), with probabilityϵargmaxaQ(s,a), with probability1ϵ\pi(s)=\left\{ \begin{array}{rcl} random(a) & \text{, with probability} & \epsilon\\ \arg\max_aQ(s,a) & \text{, with probability} & 1-\epsilon \end{array} \right.

  • 更新:使用 Q Learning 算法更新 Q 函数,即 Q(s,a)Q(s,a)+α[r+γmaxaQ(s,a)Q(s,a)]Q(s,a)\leftarrow Q(s,a)+\alpha[r+\gamma\max_{a'}Q(s',a')-Q(s,a)],其中:
    • α\alpha 是学习率(0<α10<\alpha\leq1),用于控制更新的步长
    • rr 是在状态 s 采取动作 a 后获得的即时奖励
    • γ\gamma 是折扣因子(0γ<10\leq\gamma<1),用于折扣未来的奖励
    • ss' 是在状态 s 采取动作 a 后转移到的下一个状态

3. DQN Algorithm

  • Q Learning 一个问题是:在较大的状态空间和动作空间下,使用 Q Table 存储所有状态动作对的价值是不现实的
  • 因此 DQN 算法使用神经网络来逼近 Q 函数,即 Q(s,a;θ)Q(s,a)Q(s,a;\theta)\approx Q^*(s,a),其中 θ\theta 是神经网络的参数,用于逼近 Q 函数
  • DQN 算法的目标是最小化 Q 函数的均方误差,即:

    Loss(θ)=Est,at,rt+1,st+1[(ytQ(st,at;θ))2]yt=rt+1+γmaxaQ(st+1,a;θ)Loss(\theta)=\mathbb E_{s_t,a_t,r_{t+1},s_{t+1}}[(y_t-Q(s_t,a_t;\theta))^2]\\ y_t=r_{t+1}+\gamma\max_{a'}Q(s_{t+1},a';\theta^-)

    • 此公式是 DQN 最重要的公式,DQN 的整个训练过程都是围绕这个公式展开的
    • Est,at,rt+1,st+1\mathbb E_{s_t,a_t,r_{t+1},s_{t+1}} 这个表示对当前从经验池(Agent 之前的状态动作记录)中采样的数据计算二范数损失,经验池中每个样本都是一个四元组 (st,at,rt+1,st+1)(s_t,a_t,r_{t+1},s_{t+1})表示在状态 sts_t 采取动作 ata_t 后获得的即时奖励 rt+1r_{t+1} 和转移到的下一个状态 st+1s_{t+1}
    • yty_t 是目标 Q 值,表示在状态 sts_t 采取动作 ata_t 后的目标 Q 值(也就是训练的 ground truth),当然这个值是由 Bellman Equation 计算得到的,即 yt=rt+1+γmaxaQ(st+1,a;θ)y_t=r_{t+1}+\gamma\max_{a'}Q(s_{t+1},a';\theta^-),其中:
      • θ\theta^- 是目标网络的参数,是每 T 个时间步更新一次的 DQN 网络参数,用于固定目标 Q
      • rt+1r_{t+1} 是在状态 sts_t 采取动作 ata_t 后获得的即时奖励
      • γ\gamma 是折扣因子(0γ<10\leq\gamma<1),用于折扣未来的奖励
    • Q(st,at;θ)Q(s_t,a_t;\theta) 是当前 Q 值,表示在状态 sts_t 采取动作 ata_t 后的当前 Q 值,即实时 DQN 网络输出的 Q

4. DQN Implementation

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
import numpy as np
import gymnasium as gym
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque
import random
from tqdm import tqdm
from typing import Tuple
# 定义一个简单的神经网络模型来近似 Q 函数
class DQN(nn.Module):
def __init__(self, input_dim, output_dim):
super(DQN, self).__init__()
self.fc1 = nn.Linear(input_dim, 128)
self.fc2 = nn.Linear(128, 128)
self.fc3 = nn.Linear(128, output_dim)
def forward(self, x):
x = torch.nn.functional.one_hot(
x.to(torch.int64), num_classes=env.observation_space.n
).float()
x = torch.relu(self.fc1(x))
x = torch.relu(self.fc2(x))
return self.fc3(x) # 输出每个动作的 Q 值
class DQNAgent:
def __init__(
self,
env,
learning_rate: float,
initial_epsilon: float,
epsilon_decay: float,
final_epsilon: float,
discount_factor: float = 0.95,
batch_size: int = 64,
memory_size: int = 10000,
):
self.env = env
self.lr = learning_rate
self.discount_factor = discount_factor
self.epsilon = initial_epsilon
self.epsilon_decay = epsilon_decay
self.final_epsilon = final_epsilon
self.batch_size = batch_size
self.memory = deque(maxlen=memory_size)
self.training_error = []
# Q 网络和目标网络
self.q_network = DQN(
input_dim=env.observation_space.n, output_dim=env.action_space.n
)
self.target_network = DQN(
input_dim=env.observation_space.n, output_dim=env.action_space.n
)
self.target_network.load_state_dict(self.q_network.state_dict()) # 初始化目标网络
# 优化器
self.optimizer = optim.Adam(self.q_network.parameters(), lr=self.lr)
def get_action(self, obs: Tuple[int, int, bool]) -> int:
"""选择动作(epsilon-greedy)"""
if np.random.random() < self.epsilon:
return self.env.action_space.sample() # 探索:随机选择动作
else:
obs_tensor = torch.tensor([obs], dtype=torch.float32)
q_values = self.q_network(obs_tensor)
return torch.argmax(q_values).item() # 利用:选择 Q 值最大的动作
def update(self):
"""从经验回放中随机抽取一个批次的经验,进行 Q 网络的更新"""
if len(self.memory) < self.batch_size:
return # 如果经验回放池中的样本不足一个批次,则不进行更新
batch = random.sample(self.memory, self.batch_size)
states, actions, rewards, next_states, dones = zip(*batch)
states = torch.tensor(states, dtype=torch.float32)
actions = torch.tensor(actions, dtype=torch.int64)
rewards = torch.tensor(rewards, dtype=torch.float32)
next_states = torch.tensor(next_states, dtype=torch.float32)
dones = torch.tensor(dones, dtype=torch.int64)
# 计算 Q 值:使用当前网络和目标网络
q_values = self.q_network(states)
next_q_values = self.target_network(next_states)
# 选择下一状态的最大 Q 值
max_next_q_values = next_q_values.max(dim=1)[0]
# 计算目标 Q 值
target_q_values = (
rewards + (1 - dones) * self.discount_factor * max_next_q_values
)
# 计算当前 Q 值和目标 Q 值的损失
q_value = q_values.gather(1, actions.unsqueeze(1)).squeeze(1)
loss = nn.functional.mse_loss(q_value, target_q_values)
# 优化 Q 网络
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
self.training_error.append(loss.item())
def decay_epsilon(self):
"""逐渐减少 epsilon"""
self.epsilon = max(self.final_epsilon, self.epsilon - self.epsilon_decay)
def store_experience(self, obs, action, reward, next_obs, done):
"""将经验存储到经验回放池"""
self.memory.append((obs, action, reward, next_obs, done))
def update_target_network(self):
"""每隔一段时间更新目标网络"""
self.target_network.load_state_dict(self.q_network.state_dict())
# 超参数
learning_rate = 0.001
n_episodes = 1000 + 2
start_epsilon = 1.0
epsilon_decay = start_epsilon / (n_episodes / 2)
final_epsilon = 0.1
batch_size = 64
memory_size = 10000
target_update_freq = 10 # 目标网络更新的频率
# 创建环境
env = gym.make("Taxi-v3", render_mode="rgb_array")
env = gym.wrappers.RecordVideo(
env,
video_folder="./Taxi_video",
episode_trigger=lambda episode_id: episode_id % 200 == 0,
name_prefix="episode",
)
# 创建智能体
agent = DQNAgent(
env=env,
learning_rate=learning_rate,
initial_epsilon=start_epsilon,
epsilon_decay=epsilon_decay,
final_epsilon=final_epsilon,
batch_size=batch_size,
memory_size=memory_size,
)
# 训练过程
for episode in tqdm(range(n_episodes)):
obs, info = env.reset()
done = False
# 玩一局
while not done:
action = agent.get_action(obs)
next_obs, reward, terminated, truncated, info = env.step(action)
# 存储经验到经验回放池
agent.store_experience(obs, action, reward, next_obs, terminated or truncated)
# 更新 Q 网络
agent.update()
# 如果当前回合结束,则更新状态
obs = next_obs
done = terminated or truncated
# 每隔一定步骤更新目标网络
if episode % target_update_freq == 0:
agent.update_target_network()
# 衰减 epsilon
agent.decay_epsilon()
env.close()
  • Q 网络:
    • 输入:Rbatch×state_dim\mathbb R^{batch\times state\_dim}
    • 输出是:Rbatch×action_dim\mathbb R^{batch\times action\_dim},表示每个动作的 Q
  • DQNϵ\epsilon 的概率随机选择动作,以 1ϵ1-\epsilon 的概率选择具有最大 Q 值的动作,其中 ϵ\epsilon 是一个逐渐减小的值(从 1.0 逐渐降低到 0.1),用于控制探索和利用的平衡,这一点和 Q Learning 算法一样
  • 为了减少 Q 网络的方差,DQN 算法使用了两个网络:Q 网络和目标网络,Q 网络用于计算当前 Q 值,目标网络用于计算目标 Q 值,每隔一定步骤更新目标网络的参数,用于固定目标 Q
  • DQN 的经验回放池使用了一个 deque 队列(训练数据是在队列中随机采样),用于存储每一步的 st,at,rt+1,st+1,dones_t,a_t,r_{t+1},s_{t+1},done 信息,用 deque 的原因有两个:
    1. 先进先出,可以保证使用的经验是不断被更新的
    2. 不会因为存储经验过多而导致内存溢出,增加训练的稳定性

Thoughts

  • DQN 中利用影子网络当做目标网络的思想,启发了后续很多算法的设计,例如 MoCo 系列算法
  • 参数更新的本质原理还是绕不开贝尔曼方程和时序差分的思想,只是在实现上有所不同