强化学习3-1 Q-learning学习
摘要:Q-learning是一种无模型强化学习算法,无需预先知道环境转移概率,仅通过与环境交互的(s,a,r,s')数据更新Q值。其核心优势在于直接学习状态-动作对的预期累积奖励,取代了对环境建模的需求。与有模型算法(如动态规划)不同,Q-learning仅需当前状态、可选动作、即时奖励和实际转移状态,通过试错学习适应未知环境。文中提供了强化学习框架代码,包含环境可视化、多智能体性能比较等功能,支持多组随机种子实验以确保结果稳定性。
Q-learning 学习
Q-learning 是无模型(model-free)强化学习算法,不需要预先知道环境的转移概率 P(s'|s,a)(即执行动作 a 后从状态 s 转移到 s' 的概率),仅依赖实际交互中观察到的"状态-动作-奖励-下一状态"(s, a, r, s')数据更新 Q 值。

无需转移概率的原因
- Q-learning 直接学习"状态-动作对"的预期累积奖励(Q 值),而非建模环境的转移规则。
- 它通过不断与环境交互,用实际观察到的 s' 替代对所有可能下一状态的概率加权求和,简化了学习过程。

仅需依赖的信息
- 当前状态 s 和可选动作 a(通过 get_legal_actions 获取)。
- 执行动作后的即时奖励 r(环境反馈)。
- 执行动作后到达的实际下一状态 s'(环境反馈)。

与"有模型算法"的区别
- 有模型算法(如动态规划)需要预先知道转移概率 P(s'|s,a) 和奖励函数 R(s,a),才能计算最优策略。
- Q-learning 完全不需要这些先验知识,仅通过试错(trial and error)从交互中学习,更适用于环境模型未知的场景。
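Q-learning 的核心是基于每条交互数据 (s, a, r, s') 对 Q 值做如下迭代更新(alpha 为学习率,gamma 为折扣因子,该公式即后文 QLearningAgent.update 中实现的规则):

Q(s,a) ← (1 - alpha) * Q(s,a) + alpha * (r + gamma * max_{a'} Q(s',a'))

其中 max_{a'} Q(s',a') 即状态价值 V(s'),对应代码中的 get_value 方法。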
首先安装环境依赖
!pip3 install -q gymnasium[classic-control]
接着定义强化学习框架。主要用于训练、评估和可视化不同智能体(agent)在特定环境中的表现。
import os
import sys
import random
import time

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from IPython.display import clear_output
from tqdm import tqdm, trange

%matplotlib inline
SEED = 42
if "google.colab" in sys.modules and not os.path.exists(".setup_complete"):
!wget -q https://raw.githubusercontent.com/yandexdataschool/Practical_RL/master/setup_colab.sh -O- | bash
!touch .setup_complete
# This code creates a virtual display to draw game images on.
# It will have no effect if your machine has a monitor.
if type(os.environ.get("DISPLAY")) is not str or len(os.environ.get("DISPLAY")) == 0:
!bash ../xvfb start
os.environ["DISPLAY"] = ":1"
def moving_average(x, span=100):
return pd.DataFrame({"x": np.asarray(x)}).x.ewm(span=span).mean().values
def seed_everything(env, seed=None):
if seed is None:
seed = SEED
random.seed(seed)
os.environ["PYTHONHASHSEED"] = str(seed)
np.random.seed(seed)
env.reset(seed=seed)
# 可视化智能体在环境中的行为,实时显示每一步的状态和累积奖励
def visualize_agent(env, agent, max_steps=100, delay=0.1):
"""
Visualize the agent's behavior in the environment.
Args:
env: The environment
agent: The trained agent
max_steps: Maximum number of steps to take
delay: Time delay between steps for visualization
"""
s, _ = env.reset()
total_reward = 0
for step in range(max_steps):
# Render the environment
clear_output(True)
plt.figure(figsize=(8, 6))
plt.imshow(env.render())
plt.title(f"Step: {step}, Total Reward: {total_reward:.2f}")
plt.axis("off")
plt.show()
# Get action from the agent
a = agent.get_best_action(s) # Use best action for visualization
# Take a step in the environment
next_s, r, done, _, _ = env.step(a)
# Update state and reward
s = next_s
total_reward += r
# Add delay for better visualization
time.sleep(delay)
if done:
# Show final state
clear_output(True)
plt.figure(figsize=(8, 6))
plt.imshow(env.render())
plt.title(f"Final State - Steps: {step + 1}, Total Reward: {total_reward:.2f}")
plt.axis("off")
plt.show()
break
# 同时训练多个不同的智能体并比较它们的性能
# 支持多组随机种子实验以确保结果稳定性
# 实时绘制平均奖励曲线及置信区间(标准差)
def benchmark_agents(
exp_setups,
num_episodes=1000,
plot_every=100,
t_max=10000,
span=100,
patch_every=None,
patch_foo=None,
num_seeds=3,
):
all_rewards = {}
envs = {exp_setup["name"]: exp_setup["env"]() for exp_setup in exp_setups}
agents_buiders = {exp_setup["name"]: exp_setup["agent_builder"] for exp_setup in exp_setups}
train_foo = {exp_setup["name"]: exp_setup["train_foo"] for exp_setup in exp_setups}
for seed in range(num_seeds):
SEED = seed + 42 # Using different seeds
agents = {agent_name: agent() for agent_name, agent in agents_buiders.items()}
# Create a separate environment for each agent using the env function
for agent_name, agent in agents.items():
agents[agent_name].env = envs[agent_name]
seed_rewards = {agent_name: [] for agent_name in agents_buiders}
# Seed each environment separately
for agent_name in agents:
seed_everything(envs[agent_name], seed=SEED)
tbar = trange(num_episodes)
tbar.set_description(f"Seed {seed + 1}/{num_seeds}")
for i in tbar:
for agent_name, agent in agents.items():
seed_rewards[agent_name].append(train_foo[agent_name](envs[agent_name], agent))
if i % 10 == 0:
tbar.set_postfix({agent_name: seed_rewards[agent_name][-1] for agent_name in agents}, refresh=True)
# Store rewards for this seed
for agent_name, rewards_list in seed_rewards.items():
if agent_name not in all_rewards:
all_rewards[agent_name] = []
all_rewards[agent_name].append(rewards_list)
# Average rewards across seeds
avg_rewards = {
agent_name: np.mean(np.array(seed_results), axis=0) for agent_name, seed_results in all_rewards.items()
}
# Calculate standard deviation for confidence intervals
std_rewards = {
agent_name: np.std(np.array(seed_results), axis=0) for agent_name, seed_results in all_rewards.items()
}
# Plot average performance across seeds with confidence tubes
clear_output(True)
plt.figure(figsize=(10, 6))
for agent_name, rewards_list in avg_rewards.items():
mean_rewards = moving_average(rewards_list, span=span)
std_rewards_smoothed = moving_average(std_rewards[agent_name], span=span)
# Plot mean line
plt.plot(mean_rewards, label=f"{agent_name} (avg of {num_seeds} seeds)")
# Plot confidence tubes (mean ± std)
plt.fill_between(
range(len(mean_rewards)),
mean_rewards - std_rewards_smoothed,
mean_rewards + std_rewards_smoothed,
alpha=0.2,
)
# Draw solid contour lines for the confidence tube borders
plt.plot(range(len(mean_rewards)), mean_rewards - std_rewards_smoothed, "--", color="gray", alpha=0.7)
plt.plot(range(len(mean_rewards)), mean_rewards + std_rewards_smoothed, "--", color="gray", alpha=0.7)
plt.title(
f"{envs[list(envs.keys())[0]].spec.id} - Average performance across {num_seeds} seeds with confidence intervals"
)
plt.legend()
plt.show()
return avg_rewards
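benchmark_agents 在本文后续并未被直接调用,这里给出一个假设性的调用示意(其中 QLearningAgent 与 play_and_train 将在后文定义,实验名称与参数取值仅为示例):

# 假设性示例:比较两组不同学习率的 Q-learning 智能体
import gymnasium as gym

exp_setups = [
    {
        "name": "q-learning (alpha=0.5)",
        "env": lambda: gym.make("Taxi-v3", render_mode="rgb_array"),
        # agent 的 env 会在 benchmark_agents 内部重新赋值,这里先传 None 占位
        "agent_builder": lambda: QLearningAgent(alpha=0.5, epsilon=0.25, discount=0.99, env=None),
        "train_foo": lambda env, agent: play_and_train(env, agent),
    },
    {
        "name": "q-learning (alpha=0.1)",
        "env": lambda: gym.make("Taxi-v3", render_mode="rgb_array"),
        "agent_builder": lambda: QLearningAgent(alpha=0.1, epsilon=0.25, discount=0.99, env=None),
        "train_foo": lambda env, agent: play_and_train(env, agent),
    },
]

avg_rewards = benchmark_agents(exp_setups, num_episodes=1000, num_seeds=3)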
实现Q-learning agent
import math
import random
from collections import defaultdict
import numpy as np
class QLearningAgent:
def __init__(self, alpha, epsilon, discount, env):
"""
Q-Learning Agent
based on https://inst.eecs.berkeley.edu/~cs188/sp19/projects.html
Instance variables you have access to
- self.epsilon (exploration prob)
- self.alpha (learning rate)
- self.discount (discount rate aka gamma)
Functions you should use
- self.get_legal_actions(state) {state, hashable -> list of actions, each is hashable}
which returns legal actions for a state
- self.get_qvalue(state,action)
which returns Q(state,action)
- self.set_qvalue(state,action,value)
which sets Q(state,action) := value
!!!Important!!!
Note: please avoid using self._qValues directly.
There's a special self.get_qvalue/set_qvalue for that.
"""
self.env = env
self._qvalues = defaultdict(lambda: defaultdict(lambda: 0))
self.alpha = alpha
self.epsilon = epsilon
self.discount = discount
def get_legal_actions(self, _state):
return list(range(self.env.action_space.n))
def get_qvalue(self, state, action):
"""Returns Q(state,action)"""
return self._qvalues[state][action]
def set_qvalue(self, state, action, value):
"""Sets the Qvalue for [state,action] to the given value"""
self._qvalues[state][action] = value
# ---------------------START OF YOUR CODE---------------------#
def get_value(self, state):
"""
Compute your agent's estimate of V(s) using current q-values
V(s) = max_over_action Q(state,action) over possible actions.
Note: please take into account that q-values can be negative.
"""
possible_actions = self.get_legal_actions(state)
# If there are no legal actions, return 0.0
if len(possible_actions) == 0:
return 0.0
value = max(self.get_qvalue(state, action) for action in possible_actions)
return value
def update(self, state, action, reward, next_state, *args, **kwargs):
"""
You should do your Q-Value update here:
Q(s,a) := (1 - alpha) * Q(s,a) + alpha * (r + gamma * V(s'))
"""
# agent parameters
gamma = self.discount
learning_rate = self.alpha
current_q = self.get_qvalue(state, action)
next_v = self.get_value(next_state)
new_q_value = (1 - learning_rate) * current_q + learning_rate * (reward + gamma * next_v)
self.set_qvalue(state, action, new_q_value)
def get_best_action(self, state):
"""
Compute the best action to take in a state (using current q-values).
"""
possible_actions = self.get_legal_actions(state)
# If there are no legal actions, return None
if len(possible_actions) == 0:
return None
max_q = max(self.get_qvalue(state, action) for action in possible_actions)
best_actions = [action for action in possible_actions if self.get_qvalue(state, action) == max_q]
best_action = best_actions[0]
return best_action
def get_action(self, state):
"""
Compute the action to take in the current state, including exploration.
With probability self.epsilon, we should take a random action.
otherwise - the best policy action (self.get_best_action).
Note: To pick randomly from a list, use random.choice(list).
To pick True or False with a given probablity, generate uniform number in [0, 1]
and compare it with your probability
"""
# Pick Action
possible_actions = self.get_legal_actions(state)
action = None
# If there are no legal actions, return None
if len(possible_actions) == 0:
return None
# agent parameters:
epsilon = self.epsilon
# Tip: Use self.env.np_random.random() to generate a random number
if self.env.np_random.random() < epsilon:
chosen_action = random.choice(possible_actions)
else:
chosen_action = self.get_best_action(state)
return chosen_action
定义Taxi-v3环境
import gymnasium as gym
env = gym.make("Taxi-v3", render_mode="rgb_array")
n_actions = env.action_space.n

可视化初始状态
s, _ = env.reset(seed=SEED)
plt.imshow(env.render())
plt.show()
模型训练循环
def play_and_train(env, agent, t_max=10**4):
"""
This function should
- run a full game, actions given by agent's e-greedy policy
- train agent using agent.update(...) whenever it is possible
- return total reward
"""
total_reward = 0.0
s, _ = env.reset()
for t in range(t_max):
# get agent to pick action given state s.
a = agent.get_action(s)
next_s, r, terminated, truncated, _ = env.step(a)
done = terminated
# train (update) agent for state s
agent.update(s, a, r, next_s)
s = next_s
total_reward += r
if done:
break
return total_reward
用 Q-Learning 算法训练智能体在 Taxi-v3 环境中学习最优策略,并实时监控训练效果。探索率(epsilon)按每回合乘以 0.99 的方式衰减,减少后期随机探索,让智能体更倾向于选择已知最优动作。例如初始 epsilon=0.25,经过 1000 个回合衰减后约为 0.25 × 0.99^1000 ≈ 1.1e-5,策略已基本退化为贪心策略。
from IPython.display import clear_output
agent = QLearningAgent(alpha=0.5, epsilon=0.25, discount=0.99, env=env)
rewards = []
seed_everything(env)
for i in range(1000):
rewards.append(play_and_train(env, agent))
agent.epsilon *= 0.99
if i % 100 == 0:
clear_output(True)
plt.title("eps = {:e}, mean reward = {:.1f}".format(agent.epsilon, np.mean(rewards[-10:])))
plt.plot(rewards)
plt.plot(moving_average(rewards))
plt.show()
assert env.unwrapped.spec.id == "Taxi-v3" and np.mean(rewards[-100:]) >= 4.5, (
"Please make sure that your agent is able to learn the optimal policy"
)
最终所得训练的结果图如下所示
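训练完成后,还可以抽查智能体在某个状态下学到的 Q 值与贪心动作(示意代码,输出数值取决于训练过程的随机性):

# 示意:查看初始状态下学到的状态价值、贪心动作与各动作的 Q 值
s, _ = env.reset(seed=SEED)
print("V(s) =", agent.get_value(s))
print("best action =", agent.get_best_action(s))
print("Q(s, a):", [agent.get_qvalue(s, a) for a in range(n_actions)])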
离散化状态空间
Q-Learning 算法需要离散状态,但是一些环境的状态是连续值(如小车位置、杆的角度等),解决方案是通过 “离散化” 处理连续状态 —— 将每个连续状态维度按一定精度四舍五入,再组合成元组(可作为字典键或 Q 表索引),让算法能够适用。
关键难点:为每个状态维度选择合适的四舍五入精度(n_digits),精度太高会导致状态数量过多(维度灾难),精度太低会丢失关键信息。
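举例来说,可以用内置的 round 把一个连续观测向量按固定精度离散化成可哈希的元组(下面的观测数值为假设值,仅作示意):

# 示意:把 CartPole 的一个连续观测按 1 位小数离散化成元组
obs = [0.0213, -0.517, 0.033, 0.871]  # 假设的观测:[小车位置, 小车速度, 杆角度, 杆角速度]
discrete_state = tuple(round(x, 1) for x in obs)
print(discrete_state)  # (0.0, -0.5, 0.0, 0.9)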
定义CartPole-v0环境,并打印初始状态
def make_env():
return gym.make("CartPole-v0", render_mode="rgb_array").env # .env unwraps the TimeLimit wrapper
env = make_env()
n_actions = env.action_space.n
print("first state: %s" % (env.reset()[0]))
plt.imshow(env.render())

我们需要估计观测分布。为此,我们将进行若干次试验,并记录所有状态。
可视化 OpenAI Gym 中 CartPole(倒立摆)环境的观测数据分布
def visualize_cartpole_observation_distribution(seen_observations):
seen_observations = np.array(seen_observations)
# The meaning of the observations is documented in
# https://github.com/openai/gym/blob/master/gym/envs/classic_control/cartpole.py
# Get the number of dimensions from the state
n_dims = seen_observations.shape[1]
f, axarr = plt.subplots(1, n_dims, figsize=(16, 4), sharey=True)
titles = ["Cart Position", "Cart Velocity", "Pole Angle", "Pole Velocity At Tip"]
for i in range(n_dims):
ax = axarr[i]
ax.hist(seen_observations[:, i], bins=20)
ax.set_title(titles[i])
xmin, xmax = ax.get_xlim()
ax.set_xlim(min(xmin, -xmax), max(-xmin, xmax))
ax.grid()
f.tight_layout()
进行试验,采样可能的状态
def gather_samples(env, max_steps=100000):
seen_observations = []
total_steps = 0
while total_steps < max_steps:
s, _ = env.reset()
seen_observations.append(s)
done = False
while not done and total_steps < max_steps:
s, r, done, _, _ = env.step(env.action_space.sample())
seen_observations.append(s)
total_steps += 1
if total_steps >= max_steps:
break
return seen_observations
unwrapped_env_samples = gather_samples(env)
visualize_cartpole_observation_distribution(unwrapped_env_samples)
plt.show()
所得的状态分布如下
定义观测包装器(ObservationWrapper),把 CartPole 等环境的连续值观测状态转换成离散值状态,解决连续状态无法直接用于 Q-Learning 等强化学习算法的问题。
from gymnasium.core import ObservationWrapper
class Discretizer(ObservationWrapper):
def __init__(self, env, n_digits):
super().__init__(env)
self.n_digits = n_digits
def observation(self, state):
# Hint: you can do that with round(x, n_digits).
# You may pick a different n_digits for each dimension.
state = [round(x, self.n_digits) for x in state]
return tuple(state) # tuple to make it hashable
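如代码注释中提示的那样,也可以为每个维度选择不同的小数位数。下面是一个假设性的变体示意(类名 PerDimDiscretizer 与参数 n_digits_per_dim 为本文自拟,并非原框架的一部分):

# 假设性变体:为每个观测维度指定不同的小数位数
class PerDimDiscretizer(ObservationWrapper):
    def __init__(self, env, n_digits_per_dim):
        super().__init__(env)
        self.n_digits_per_dim = n_digits_per_dim  # 例如 (1, 1, 2, 1)

    def observation(self, state):
        # 按维度分别取整,再组合成可哈希的元组
        return tuple(round(x, n) for x, n in zip(state, self.n_digits_per_dim))

# 用法示意:给杆角度维度更高的精度
# env = PerDimDiscretizer(make_env(), n_digits_per_dim=(1, 1, 2, 1))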
将观测值转换为离散值
env = Discretizer(make_env(), n_digits=1)
seen_observations = gather_samples(env)
visualize_cartpole_observation_distribution(seen_observations)
plt.show()
所得的离散值分布如下
学习离散化策略
接下来我们训练一个使用离散化状态空间的策略。
提示:
- 增加观测值某一维度的小数位数,会使状态空间的规模以10 倍的因子扩大。
- 若离散化粒度过于精细,智能体需要远超过 10000 步才能收敛。可以选择增加迭代次数并降低 ε 衰减速率,或调整离散化方式。实际应用中发现,这类问题出现的频率相当高。
- 若离散化粒度过于粗糙,智能体可能无法找到最优策略。但在实际操作中发现,在这个特定环境下这类问题很少发生。
- 先从粗粒度的离散化开始,仅在确有必要时再提高粒度。
- 若不对 ε 进行退火(annealing)时训练效果不佳,可考虑加入该机制,但需确保 ε 不会过快衰减至零(一个简单的退火写法见下方示意代码)。
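下面给出一个简单的 ε 退火示意(乘法衰减并设置下限;衰减率与下限均为示例取值),可放在后文训练循环中 OPTIONAL 注释所在的位置:

# 示意:每个回合结束后衰减 epsilon,并保持不低于一个下限
EPS_DECAY = 0.999  # 示例衰减率
EPS_MIN = 0.01     # 示例下限,保证后期仍保留少量探索
agent.epsilon = max(agent.epsilon * EPS_DECAY, EPS_MIN)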
定义agent,绘制收益曲线
agent = QLearningAgent(alpha=0.5, epsilon=0.25, discount=0.99, env=env)
rewards, epsilons = [], []
seed_everything(env)
for i in range(10000):
reward = play_and_train(env, agent)
rewards.append(reward)
epsilons.append(agent.epsilon)
# OPTIONAL: <YOUR CODE: adjust epsilon>
if i % 1000 == 0:
rewards_ewma = moving_average(rewards)
clear_output(True)
plt.plot(rewards, label="rewards")
plt.plot(rewards_ewma, label="rewards ewma@100")
plt.legend()
plt.grid()
plt.title("eps = {:e}, rewards ewma@100 = {:.1f}".format(agent.epsilon, rewards_ewma[-1]))
plt.show()
迭代次数与所得奖励如图所示
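训练完成后,还可以调用前文定义的 visualize_agent 直观观察智能体的行为(示意,max_steps 与 delay 可按需调整):

# 示意:可视化训练后的智能体在离散化 CartPole 环境中的表现
visualize_agent(env, agent, max_steps=200, delay=0.05)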