1. 项目准备

1.1. 问题导入

Sarsa算法和Q-learning算法是两种基于表格的经典强化学习方法,本文将对比探究这两种方法在解决悬崖行走(Cliff Walking)问题时的表现。

1.2. 环境介绍

本次实验所用的训练环境为gym库的“悬崖行走”(CliffWalking-v0)环境。

如上图所示,该问题需要智能体从起点S点出发,到达终点G,同时避免掉进悬崖(cliff)。智能体每走一步就有-1分的惩罚,掉进悬崖会有−100分的惩罚,但游戏不会结束,智能体会回到出发点,然后游戏继续,直到智能体到达重点结束游戏。


2. SARSA算法

2.1. 算法简介

  • SARSA全称是state-action-reward-state'-action',目的是学习特定的state下,特定action的价值Q,最终建立和优化一个Q表格,以state为行,action为列,根据与环境交互得到的reward来更新Q表格,更新公式为:

  • SARSA在训练中为了更好的探索环境,采用ε-greedy方式来训练,有一定概率随机选择动作输出。

2.2. 算法伪码

2.3. 算法实现

(1) 前期准备

  • 导入模块
1
2
import numpy as np
import gym
  • 设置超参数
1
2
3
4
5
6
7
8
TRAIN_EPOCHS = 500             # 训练轮数
LOG_GAP = 50 # 日志打印间隔

LEARNING_RATE = 0.1 # 学习率
GAMMA = 0.95 # 奖励衰减因子
EPSILON = 0.1 # 随机选取动作的概率

MODEL_PATH = "./sarsa.npy" # Q表格保存路径

(2) 构建智能体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
class SarsaAgent(object):
def __init__(self, obs_dim, act_dim, learning_rate=0.01,
gamma=0.9, epsilon=0.1):
self.act_dim = act_dim # 动作维度,即可选动作数
self.lr = learning_rate # 学习率
self.gamma = gamma # reward衰减因子
self.epsilon = epsilon # 随机选取动作的概率
self.Q = np.zeros((obs_dim, act_dim)) # Q表格

# 依据输入的状态,采样输出的动作值,包含探索
def sample(self, obs):
if np.random.uniform(0, 1) < self.epsilon:
return np.random.choice(self.act_dim) # 随机探索选取动作
else: # 根据table的Q值选动作
return self.predict(obs) # 根据表格的Q值选动作

# 依据输入的观察值,预测输出的动作值
def predict(self, obs):
Q_list = self.Q[obs, :]
maxQ = np.max(Q_list)
act_list = np.where(Q_list == maxQ)[0] # 找出最大Q值对应的动作
return np.random.choice(act_list) # 随机选取一个动作

# 更新Q-Table的学习方法
def learn(self, obs, action, reward, next_obs, next_act, done):
'''【On-Policy】
obs:交互前的状态,即s[t];
action:本次交互选择的动作,即a[t];
reward:本次动作获得的奖励,即r;
next_obs:本次交互后的状态,即s[t+1];
next_act:根据当前Q表格,针对next_obs会选择的动作,即a[t+1];
done:episode是否结束;
'''
current_Q = self.Q[obs, action] # 当前的Q值
if done: # 如果没有下一个状态了,即当前episode已结束
target_Q = reward # 目标值就是本次动作的奖励值
else: # 否则采用SARSA的公式获取目标值
target_Q = reward + self.gamma * self.Q[next_obs, next_act]
self.Q[obs, action] += self.lr * (target_Q - current_Q)

# 保存Q表格的数据到文件
def save(self, path):
np.save(path, self.Q)
print("\033[1;32m Save data into file: `%s`. \033[0m" % path)

# 从文件中读取数据到Q表格
def restore(self, path):
self.Q = np.load(path)
print("\033[1;33m Load data from file: `%s`. \033[0m" % path)

(3) 训练与测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# run_episode()是agent在一个episode中训练学习的函数,
# 它使用agent.sample()与环境交互,使用agent.learn()训练Q表格
def run_episode(env, agent, render=False):
done, total_steps, total_reward = False, 0, 0
obs = env.reset() # 重置环境,开始新的episode
action = agent.sample(obs) # 根据状态选择动作

while not done:
next_obs, reward, done, _ = env.step(action) # 与环境进行一个交互
next_act = agent.sample(next_obs) # 根据状态选取动作
agent.learn(obs, action, reward, next_obs, next_act, done) # 学习

obs, action = next_obs, next_act # 记录新的状态和动作
total_reward += reward
total_steps += 1
if render: # 如果需要渲染一帧图形
env.render()
return total_reward, total_steps


# test_episode()是agent在一个episode中测试效果的函数,
# 需要评估agent能在一个episode中拿到多少奖励total_reward
def test_episode(env, agent, render=False):
agent.restore(MODEL_PATH) # 读取训练好的模型参数
done, total_reward = False, 0
obs = env.reset() # 重置环境,开始新的episode
while not done:
action = agent.predict(obs) # 根据状态预测动作
next_obs, reward, done, _ = env.step(action) # 与环境进行一个交互
total_reward += reward
obs = next_obs
if render: # 如果需要渲染一帧图形
env.render()
return total_reward
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
env = gym.make("CliffWalking-v0")   # 创建悬崖环境

agent = SarsaAgent(
env.observation_space.n, # 状态的数量
env.action_space.n, # 动作的种类数
learning_rate=LEARNING_RATE, # 学习率
gamma=GAMMA, # 奖励衰减因子
epsilon=EPSILON, # 随机选取动作的概率
) # 创建SARSA智能体

for ep in range(TRAIN_EPOCHS + 1):
ep_reward, ep_steps = run_episode(env, agent, False)
if ep % LOG_GAP == 0: # 定期输出一次分数
print("Episode: %3d; Steps: %3d; Reward: %.1f" %
(ep, ep_steps, ep_reward))

agent.save(MODEL_PATH) # 保存模型参数(Q表格)
test_reward = test_episode(env, agent, False) # 测试模型
print("【Eval】\t Reward: %.1f" % test_reward)

实验结果如下(Reward值越大,说明学习效果越好):

1
2
3
4
5
6
7
8
9
10
11
12
13
Episode:   0; Steps: 857; Reward: -2144.0
Episode: 50; Steps: 33; Reward: -33.0
Episode: 100; Steps: 30; Reward: -129.0
Episode: 150; Steps: 44; Reward: -44.0
Episode: 200; Steps: 15; Reward: -15.0
Episode: 250; Steps: 19; Reward: -118.0
Episode: 300; Steps: 26; Reward: -125.0
Episode: 350; Steps: 19; Reward: -19.0
Episode: 400; Steps: 17; Reward: -17.0
Episode: 450; Steps: 22; Reward: -22.0
Episode: 500; Steps: 19; Reward: -19.0

【Eval】 Reward: -15.0

3. Q-learning算法

3.1. 算法简介

  • Q-learning也是采用Q表格的方式存储Q值(状态动作价值),决策部分与SARSA是一样的,采用ε-greedy方式增加探索。

  • Q-learningSARSA不一样的地方是更新Q表格的方式。

  • SARSA是on-policy的更新方式,先做出动作再更新。
  • Q-learning是off-policy的更新方式,更新learn()时无需获取下一步实际做出的动作next_action,并假设下一步动作是取最大Q值的动作。
  • Q-learning的更新公式为:

3.2. 算法伪码

3.3. 算法实现

(1) 前期准备

  • 导入模块
1
2
import numpy as np
import gym
  • 设置超参数
1
2
3
4
5
6
7
8
TRAIN_EPOCHS = 500               # 训练轮数
LOG_GAP = 50 # 日志打印间隔

LEARNING_RATE = 0.1 # 学习率
GAMMA = 0.95 # 奖励衰减因子
EPSILON = 0.1 # 随机选取动作的概率

MODEL_PATH = "./q_learning.npy" # Q表格保存路径

(2) 构建智能体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
class QLearningAgent(object):
def __init__(self, obs_dim, act_dim, learning_rate=0.01,
gamma=0.9, epsilon=0.1):
self.act_dim = act_dim # 动作维度,即可选动作数
self.lr = learning_rate # 学习率
self.gamma = gamma # 奖励衰减因子
self.epsilon = epsilon # 随机选取动作的概率
self.Q = np.zeros((obs_dim, act_dim)) # Q表格

# 依据输入的状态,采样输出的动作值,包含探索
def sample(self, obs):
if np.random.uniform(0, 1) < self.epsilon:
return np.random.choice(self.act_dim) # 随机探索选取动作
else: # 根据table的Q值选动作
return self.predict(obs) # 根据表格的Q值选动作

# 依据输入的观察值,预测输出的动作值
def predict(self, obs):
Q_list = self.Q[obs, :]
maxQ = np.max(Q_list)
act_list = np.where(Q_list == maxQ)[0] # 找出最大Q值对应的动作
return np.random.choice(act_list) # 随机选取一个动作

# 更新Q-Table的学习方法
def learn(self, obs, action, reward, next_obs, done):
'''【Off-Policy】
obs:交互前的状态,即s[t];
action:本次交互选择的动作,即a[t];
reward:本次动作获得的奖励,即r;
next_obs:本次交互后的状态,即s[t+1];
done:episode是否结束;
'''
cur_Q = self.Q[obs, action]
if done:
target_Q = reward
else:
target_Q = reward + self.gamma * np.max(self.Q[next_obs, :])
self.Q[obs, action] += self.lr * (target_Q - cur_Q) # 更新表格

# 保存Q表格的数据到文件
def save(self, path):
np.save(path, self.Q)
print("\033[1;32m Save data into file: `%s`. \033[0m" % path)

# 从文件中读取数据到Q表格
def restore(self, path):
self.Q = np.load(path)
print("\033[1;33m Load data from file: `%s`. \033[0m" % path)

(3) 训练与测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# run_episode()是agent在一个episode中训练学习的函数,
# 它使用agent.sample()与环境交互,使用agent.learn()训练Q表格
def run_episode(env, agent, render=False):
done, total_steps, total_reward = False, 0, 0
obs = env.reset() # 重开一局,重置环境

while not done:
action = agent.sample(obs) # 根据状态选择动作
next_obs, reward, done, _ = env.step(action) # 与环境进行一次交互
agent.learn(obs, action, reward, next_obs, done) # 学习

obs = next_obs # 记录新的状态
total_reward += reward
total_steps += 1
if render: # 如果需要渲染一帧图形
env.render()
return total_reward, total_steps


# test_episode()是agent在一个episode中测试效果的函数,
# 需要评估agent能在一个episode中拿到多少奖励total_reward
def test_episode(env, agent, render=False):
agent.restore(MODEL_PATH) # 读取训练好的模型参数
done, total_reward = False, 0
obs = env.reset() # 重开一局,重置环境
while not done:
action = agent.predict(obs) # 根据状态选取动作
next_obs, reward, done, _ = env.step(action) # 与环境进行一次交互
total_reward += reward
obs = next_obs # 记录新的状态
if render:
env.render()
return total_reward
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
env = gym.make("CliffWalking-v0")   # 创建悬崖环境

agent = QLearningAgent(
env.observation_space.n, # 状态维度
env.action_space.n, # 动作维度
learning_rate=LEARNING_RATE, # 学习率
gamma=GAMMA, # 奖励衰减因子
epsilon=EPSILON, # 随机选取动作的概率
) # 创建Q-learning智能体

for ep in range(TRAIN_EPOCHS + 1):
ep_reward, ep_steps = run_episode(env, agent, False)
if ep % LOG_GAP == 0: # 定期输出一次分数
print("Episode: %3d; Steps: %3d; Reward: %.1f" %
(ep, ep_steps, ep_reward))

agent.save(MODEL_PATH) # 保存模型参数(Q表格)
test_reward = test_episode(env, agent, False) # 测试模型
print("【Eval】\t Reward: %.1f" % test_reward)

实验结果如下(Reward值越大,说明学习效果越好):

1
2
3
4
5
6
7
8
9
10
11
12
13
Episode:   0; Steps: 519; Reward: -1608.0
Episode: 50; Steps: 20; Reward: -20.0
Episode: 100; Steps: 21; Reward: -21.0
Episode: 150; Steps: 47; Reward: -146.0
Episode: 200; Steps: 18; Reward: -18.0
Episode: 250; Steps: 30; Reward: -228.0
Episode: 300; Steps: 20; Reward: -20.0
Episode: 350; Steps: 13; Reward: -13.0
Episode: 400; Steps: 17; Reward: -17.0
Episode: 450; Steps: 14; Reward: -14.0
Episode: 500; Steps: 50; Reward: -248.0

【Eval】 Reward: -13.0

4. 实验结论

在解决悬崖行走问题的过程中,我们发现:

  • Q-learning对环境的探索比较激进胆大,更倾向于最优路线
  • SARSA对环境的探索就比较谨慎胆小,更倾向于安全路线


写在最后

  • 如果您发现项目存在问题,或者如果您有更好的建议,欢迎在下方评论区中留言讨论~
  • 这是本项目的链接:实验项目 - AI Studio,点击fork可直接在AI Studio运行~
  • 这是我的个人主页:个人主页 - AI Studio,来AI Studio互粉吧,等你哦~