Q-learning

Q-learning is a model-free reinforcement learning algorithm: it does not require the environment's transition probabilities P(s'|s,a) (the probability of ending up in state s' after taking action a in state s) to be known in advance. It updates its Q-values using only the state-action-reward-next-state tuples (s, a, r, s') observed during actual interaction.


  1. Why no transition probabilities are needed

    • Q-learning learns the expected cumulative reward of each state-action pair (the Q-value) directly, instead of modelling the environment's transition rules.
    • Through repeated interaction, it replaces the probability-weighted sum over all possible next states with the single next state s' that was actually observed, which greatly simplifies learning.
  2. The only information it relies on

    • The current state s and the available actions a (obtained via get_legal_actions).
    • The immediate reward r returned by the environment after the action.
    • The actual next state s' reached after the action, also returned by the environment.
  3. Difference from model-based algorithms

    • Model-based methods (such as dynamic programming) need P(s'|s,a) and the reward function R(s,a) up front in order to compute an optimal policy.
    • Q-learning needs none of that prior knowledge; it learns purely by trial and error from interaction, which makes it well suited to settings where the environment model is unknown.
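
Concretely, after every observed transition (s, a, r, s') the algorithm nudges a single table entry towards a bootstrapped target (this is the same rule implemented in the agent's update method further below):

$$Q(s,a) \leftarrow (1-\alpha)\,Q(s,a) + \alpha\bigl(r + \gamma \max_{a'} Q(s',a')\bigr)$$

Here α (the learning rate) controls how strongly new experience overrides the old estimate, γ is the discount factor, and max_{a'} Q(s',a') plays the role of V(s'), the agent's current value estimate of the next state.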

First, install the dependencies.

!pip3 install -q "gymnasium[classic-control]"

Next, define a small reinforcement-learning harness, used to train, evaluate and visualize different agents in a given environment.

import os
import random
import sys
import time

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from IPython.display import clear_output
from tqdm import tqdm, trange

%matplotlib inline

SEED = 42

if "google.colab" in sys.modules and not os.path.exists(".setup_complete"):
    !wget -q https://raw.githubusercontent.com/yandexdataschool/Practical_RL/master/setup_colab.sh -O- | bash

    !touch .setup_complete

# This code creates a virtual display to draw game images on.
# It will have no effect if your machine has a monitor.
if type(os.environ.get("DISPLAY")) is not str or len(os.environ.get("DISPLAY")) == 0:
    !bash ../xvfb start
    os.environ["DISPLAY"] = ":1"


def moving_average(x, span=100):
    return pd.DataFrame({"x": np.asarray(x)}).x.ewm(span=span).mean().values


def seed_everything(env, seed=None):
    if seed is None:
        seed = SEED
    random.seed(seed)
    os.environ["PYTHONHASHSEED"] = str(seed)
    np.random.seed(seed)
    env.reset(seed=seed)

# Visualize the agent's behaviour in the environment, showing the state and the cumulative reward at every step
def visualize_agent(env, agent, max_steps=100, delay=0.1):
    """
    Visualize the agent's behavior in the environment.

    Args:
        env: The environment
        agent: The trained agent
        max_steps: Maximum number of steps to take
        delay: Time delay between steps for visualization
    """
    s, _ = env.reset()
    total_reward = 0

    for step in range(max_steps):
        # Render the environment
        clear_output(True)
        plt.figure(figsize=(8, 6))
        plt.imshow(env.render())
        plt.title(f"Step: {step}, Total Reward: {total_reward:.2f}")
        plt.axis("off")
        plt.show()

        # Get action from the agent
        a = agent.get_best_action(s)  # Use best action for visualization

        # Take a step in the environment
        next_s, r, done, _, _ = env.step(a)

        # Update state and reward
        s = next_s
        total_reward += r

        # Add delay for better visualization
        time.sleep(delay)

        if done:
            # Show final state
            clear_output(True)
            plt.figure(figsize=(8, 6))
            plt.imshow(env.render())
            plt.title(f"Final State - Steps: {step + 1}, Total Reward: {total_reward:.2f}")
            plt.axis("off")
            plt.show()
            break

# Train several agents side by side and compare their performance.
# Runs multiple random seeds so that the comparison is stable.
# Plots the average reward curve with a confidence band (mean ± one standard deviation) in real time.

def benchmark_agents(
    exp_setups,
    num_episodes=1000,
    plot_every=100,
    t_max=10000,
    span=100,
    patch_every=None,
    patch_foo=None,
    num_seeds=3,
):
    all_rewards = {}
    envs = {exp_setup["name"]: exp_setup["env"]() for exp_setup in exp_setups}
    agents_builders = {exp_setup["name"]: exp_setup["agent_builder"] for exp_setup in exp_setups}
    train_foo = {exp_setup["name"]: exp_setup["train_foo"] for exp_setup in exp_setups}

    for seed in range(num_seeds):
        SEED = seed + 42  # Using different seeds
        agents = {agent_name: agent() for agent_name, agent in agents_builders.items()}

        # Create a separate environment for each agent using the env function
        for agent_name, agent in agents.items():
            agents[agent_name].env = envs[agent_name]

        seed_rewards = {agent_name: [] for agent_name in agents_builders}

        # Seed each environment separately
        for agent_name in agents:
            seed_everything(envs[agent_name], seed=SEED)

        tbar = trange(num_episodes)
        tbar.set_description(f"Seed {seed + 1}/{num_seeds}")
        for i in tbar:
            for agent_name, agent in agents.items():
                seed_rewards[agent_name].append(train_foo[agent_name](envs[agent_name], agent))
            if i % 10 == 0:
                tbar.set_postfix({agent_name: seed_rewards[agent_name][-1] for agent_name in agents}, refresh=True)

        # Store rewards for this seed
        for agent_name, rewards_list in seed_rewards.items():
            if agent_name not in all_rewards:
                all_rewards[agent_name] = []
            all_rewards[agent_name].append(rewards_list)

        # Average rewards across seeds
        avg_rewards = {
            agent_name: np.mean(np.array(seed_results), axis=0) for agent_name, seed_results in all_rewards.items()
        }

        # Calculate standard deviation for confidence intervals
        std_rewards = {
            agent_name: np.std(np.array(seed_results), axis=0) for agent_name, seed_results in all_rewards.items()
        }

        # Plot average performance across seeds with confidence tubes
        clear_output(True)
        plt.figure(figsize=(10, 6))
        for agent_name, rewards_list in avg_rewards.items():
            mean_rewards = moving_average(rewards_list, span=span)
            std_rewards_smoothed = moving_average(std_rewards[agent_name], span=span)

            # Plot mean line
            plt.plot(mean_rewards, label=f"{agent_name} (avg of {num_seeds} seeds)")

            # Plot confidence tubes (mean ± std)
            plt.fill_between(
                range(len(mean_rewards)),
                mean_rewards - std_rewards_smoothed,
                mean_rewards + std_rewards_smoothed,
                alpha=0.2,
            )

            # Draw solid contour lines for the confidence tube borders
            plt.plot(range(len(mean_rewards)), mean_rewards - std_rewards_smoothed, "--", color="gray", alpha=0.7)
            plt.plot(range(len(mean_rewards)), mean_rewards + std_rewards_smoothed, "--", color="gray", alpha=0.7)

        plt.title(
            f"{envs[list(envs.keys())[0]].spec.id} - Average performance across {num_seeds} seeds with confidence intervals"
        )
        plt.legend()
        plt.show()

    return avg_rewards
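
The harness above is not invoked in the code shown in this post, so it helps to see the shape of the input it expects. Below is a minimal, hedged sketch: it refers to QLearningAgent, make_env and play_and_train, which are only defined further down, and the hyperparameters are illustrative placeholders.

# A sketch of the expected experiment-setup structure (not executed in this post).
# "env" is a zero-argument environment factory, "agent_builder" builds a fresh agent,
# and "train_foo" runs one training episode and returns its total reward.
# benchmark_agents assigns agent.env after building, so env=None in the builder is safe.
exp_setups = [
    {
        "name": "Q-learning (alpha=0.5)",
        "env": make_env,
        "agent_builder": lambda: QLearningAgent(alpha=0.5, epsilon=0.25, discount=0.99, env=None),
        "train_foo": play_and_train,
    },
    {
        "name": "Q-learning (alpha=0.1)",
        "env": make_env,
        "agent_builder": lambda: QLearningAgent(alpha=0.1, epsilon=0.25, discount=0.99, env=None),
        "train_foo": play_and_train,
    },
]

avg_rewards = benchmark_agents(exp_setups, num_episodes=1000, num_seeds=3)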

Implement the Q-learning agent

import math
import random
from collections import defaultdict

import numpy as np


class QLearningAgent:
    def __init__(self, alpha, epsilon, discount, env):
        """
        Q-Learning Agent
        based on https://inst.eecs.berkeley.edu/~cs188/sp19/projects.html
        Instance variables you have access to
          - self.epsilon (exploration prob)
          - self.alpha (learning rate)
          - self.discount (discount rate aka gamma)

        Functions you should use
          - self.get_legal_actions(state) {state, hashable -> list of actions, each is hashable}
            which returns legal actions for a state
          - self.get_qvalue(state,action)
            which returns Q(state,action)
          - self.set_qvalue(state,action,value)
            which sets Q(state,action) := value
        !!!Important!!!
        Note: please avoid using self._qvalues directly.
            There's a special self.get_qvalue/set_qvalue for that.
        """

        self.env = env
        self._qvalues = defaultdict(lambda: defaultdict(lambda: 0))
        self.alpha = alpha
        self.epsilon = epsilon
        self.discount = discount

    def get_legal_actions(self, _state):
        return list(range(self.env.action_space.n))

    def get_qvalue(self, state, action):
        """Returns Q(state,action)"""
        return self._qvalues[state][action]

    def set_qvalue(self, state, action, value):
        """Sets the Qvalue for [state,action] to the given value"""
        self._qvalues[state][action] = value

    # ---------------------START OF YOUR CODE---------------------#

    def get_value(self, state):
        """
        Compute your agent's estimate of V(s) using current q-values
        V(s) = max_over_action Q(state,action) over possible actions.
        Note: please take into account that q-values can be negative.
        """
        possible_actions = self.get_legal_actions(state)

        # If there are no legal actions, return 0.0
        if len(possible_actions) == 0:
            return 0.0

        value = max(self.get_qvalue(state, action) for action in possible_actions)

        return value

    def update(self, state, action, reward, next_state, *args, **kwargs):
        """
        You should do your Q-Value update here:
           Q(s,a) := (1 - alpha) * Q(s,a) + alpha * (r + gamma * V(s'))
        """

        # agent parameters
        gamma = self.discount
        learning_rate = self.alpha

        current_q = self.get_qvalue(state, action)
        next_v = self.get_value(next_state)
        new_q_value = (1 - learning_rate) * current_q + learning_rate * (reward + gamma * next_v)

        self.set_qvalue(state, action, new_q_value)

    def get_best_action(self, state):
        """
        Compute the best action to take in a state (using current q-values).
        """
        possible_actions = self.get_legal_actions(state)

        # If there are no legal actions, return None
        if len(possible_actions) == 0:
            return None

        max_q = max(self.get_qvalue(state, action) for action in possible_actions)
        best_actions = [action for action in possible_actions if self.get_qvalue(state, action) == max_q]
        # Break ties deterministically by picking the first maximizer
        best_action = best_actions[0]

        return best_action

    def get_action(self, state):
        """
        Compute the action to take in the current state, including exploration.
        With probability self.epsilon, we should take a random action.
            otherwise - the best policy action (self.get_best_action).

        Note: To pick randomly from a list, use random.choice(list).
              To pick True or False with a given probability, generate a uniform number in [0, 1]
              and compare it with your probability
        """

        # Pick Action
        possible_actions = self.get_legal_actions(state)
        action = None

        # If there are no legal actions, return None
        if len(possible_actions) == 0:
            return None

        # agent parameters:
        epsilon = self.epsilon

        # Tip: Use self.env.np_random.random() to generate a random number
        if self.env.np_random.random() < epsilon:
            chosen_action = random.choice(possible_actions)
        else:
            chosen_action = self.get_best_action(state)

        return chosen_action
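
As a quick sanity check (a minimal sketch, assuming gymnasium is installed; the state and action indices and the reward below are arbitrary toy values), a single Q-value update can be verified by hand: with all Q-values initialised to zero, V(s') = 0 and the update reduces to alpha * r.

import gymnasium as gym

toy_env = gym.make("Taxi-v3")
toy_agent = QLearningAgent(alpha=0.5, epsilon=0.1, discount=0.9, env=toy_env)

# With all Q-values at 0: new Q(0, 1) = (1 - 0.5) * 0 + 0.5 * (1.0 + 0.9 * 0) = 0.5
toy_agent.update(state=0, action=1, reward=1.0, next_state=2)
assert abs(toy_agent.get_qvalue(0, 1) - 0.5) < 1e-9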

Define the Taxi-v3 environment

import gymnasium as gym

env = gym.make("Taxi-v3", render_mode="rgb_array")

n_actions = env.action_space.n


Visualize the initial state

s, _ = env.reset(seed=SEED)
plt.imshow(env.render())
plt.show()

The training loop

def play_and_train(env, agent, t_max=10**4):
    """
    This function should
    - run a full game, actions given by agent's e-greedy policy
    - train agent using agent.update(...) whenever it is possible
    - return total reward
    """
    total_reward = 0.0
    s, _ = env.reset()

    for t in range(t_max):
        # get agent to pick action given state s.
        a = agent.get_action(s)

        next_s, r, terminated, truncated, _ = env.step(a)
        done = terminated or truncated  # stop when the episode ends, whether terminated or time-limit truncated
        # train (update) agent for state s
        agent.update(s, a, r, next_s)

        s = next_s
        total_reward += r
        if done:
            break

    return total_reward

Now train the agent with Q-learning so that it learns the optimal policy in the Taxi-v3 environment, monitoring progress as training proceeds. The exploration rate (epsilon) is multiplied by 0.99 after every episode, so the agent gradually explores less and relies more on the actions it currently believes are best.

from IPython.display import clear_output

agent = QLearningAgent(alpha=0.5, epsilon=0.25, discount=0.99, env=env)

rewards = []
seed_everything(env)

for i in range(1000):
    rewards.append(play_and_train(env, agent))
    agent.epsilon *= 0.99

    if i % 100 == 0:
        clear_output(True)
        plt.title("eps = {:e}, mean reward = {:.1f}".format(agent.epsilon, np.mean(rewards[-10:])))
        plt.plot(rewards)
        plt.plot(moving_average(rewards))
        plt.show()

assert env.unwrapped.spec.id == "Taxi-v3" and np.mean(rewards[-100:]) >= 4.5, (
    "Please make sure that your agent is able to learn the optimal policy"
)

The resulting training curves are shown below.
(figure: Taxi-v3 episode rewards and their moving average)

Discretizing the state space

Q-learning needs discrete states, but in some environments the observations are continuous (cart position, pole angle, and so on). The usual workaround is to discretize: round each continuous dimension to a chosen precision and pack the result into a tuple, which is hashable and can therefore serve as a dictionary key or Q-table index.
The tricky part is choosing a suitable rounding precision (n_digits) for each dimension: too fine and the number of states explodes (the curse of dimensionality); too coarse and important information is lost.
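
As a small illustration of this trade-off (a sketch only, with made-up numbers; the actual wrapper used for training is defined below), rounding each dimension and packing the result into a tuple yields a hashable state, and n_digits directly controls how many distinct states the agent can encounter:

# Round a CartPole-like observation at different precisions (toy values).
obs = [0.0213, -0.7641, 0.0438, 1.1923]

for n_digits in (0, 1, 2):
    discrete_state = tuple(round(x, n_digits) for x in obs)
    print(n_digits, discrete_state)
# n_digits=0 collapses almost everything into a handful of states,
# while each extra digit multiplies the number of reachable states
# per dimension by roughly a factor of 10.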

Define the CartPole-v0 environment and print its initial state.

def make_env():
    return gym.make("CartPole-v0", render_mode="rgb_array").env  # .env unwraps the TimeLimit wrapper


env = make_env()
n_actions = env.action_space.n

print("first state: %s" % (env.reset()[0]))
plt.imshow(env.render())

We need to estimate the distribution of the observations. To do so, we run a number of rollouts and record every state we see.
The helper below visualizes the distribution of observations in the CartPole (inverted pendulum) environment.

def visualize_cartpole_observation_distribution(seen_observations):
    seen_observations = np.array(seen_observations)

    # The meaning of the observations is documented in
    # https://github.com/openai/gym/blob/master/gym/envs/classic_control/cartpole.py

    # Get the number of dimensions from the state
    n_dims = seen_observations.shape[1]

    f, axarr = plt.subplots(1, n_dims, figsize=(16, 4), sharey=True)
    titles = ["Cart Position", "Cart Velocity", "Pole Angle", "Pole Velocity At Tip"]

    for i in range(n_dims):
        ax = axarr[i]
        ax.hist(seen_observations[:, i], bins=20)
        ax.set_title(titles[i])
        xmin, xmax = ax.get_xlim()
        ax.set_xlim(min(xmin, -xmax), max(-xmin, xmax))
        ax.grid()
    f.tight_layout()

Run rollouts to sample the reachable states.

def gather_samples(env, max_steps=100000):
    seen_observations = []
    total_steps = 0

    while total_steps < max_steps:
        s, _ = env.reset()
        seen_observations.append(s)
        done = False

        while not done and total_steps < max_steps:
            s, r, done, _, _ = env.step(env.action_space.sample())
            seen_observations.append(s)
            total_steps += 1

        if total_steps >= max_steps:
            break

    return seen_observations


unwrapped_env_samples = gather_samples(env)
visualize_cartpole_observation_distribution(unwrapped_env_samples)
plt.show()

The resulting observation distributions look as follows.
(figure: histograms of the four CartPole observation dimensions)
Now define an observation wrapper (ObservationWrapper) that converts the continuous observations of environments such as CartPole into discrete values, so that tabular methods like Q-learning can be applied.

from gymnasium.core import ObservationWrapper

class Discretizer(ObservationWrapper):
    def __init__(self, env, n_digits):
        super().__init__(env)
        self.n_digits = n_digits

    def observation(self, state):
        # Hint: you can do that with round(x, n_digits).
        # You may pick a different n_digits for each dimension.
        state = [round(x, self.n_digits) for x in state]

        return tuple(state)  # tuple to make it hashable

Convert the observations to discrete values.

env = Discretizer(make_env(), n_digits=1)
seen_observations = gather_samples(env)
visualize_cartpole_observation_distribution(seen_observations)
plt.show()

The distributions of the discretized observations look as follows.
(figure: histograms of the discretized CartPole observations)

Learning a policy on the discretized states

Next we train a policy on the discretized state space.
Hints:

  1. Adding one more decimal digit to a dimension of the observation multiplies the size of the state space by a factor of 10.
  2. If the discretization is too fine, the agent will need far more than 10,000 steps to converge. You can either increase the number of iterations and slow down the epsilon decay, or change the discretization. In practice this situation comes up quite often.
  3. If the discretization is too coarse, the agent may fail to find the optimal policy. In practice this rarely happens in this particular environment.
  4. Start with a coarse discretization and refine it only when clearly necessary.
  5. If training does not work well without annealing epsilon, add annealing, but make sure epsilon does not decay to zero too quickly (one possible scheme is sketched after this list).
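
One possible annealing scheme (an illustrative sketch, not the only option; the decay factor and floor below are placeholder values) multiplies epsilon by a constant every episode while keeping it above a small floor, so the agent never stops exploring entirely:

EPS_DECAY = 0.999  # hypothetical per-episode decay factor
EPS_MIN = 0.01     # hypothetical lower bound on exploration

def anneal_epsilon(agent):
    # Decay epsilon multiplicatively but never let it reach zero
    agent.epsilon = max(agent.epsilon * EPS_DECAY, EPS_MIN)

A call such as anneal_epsilon(agent) could replace the optional epsilon-adjustment placeholder in the training loop below.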

Define the agent and plot the reward curve.

agent = QLearningAgent(alpha=0.5, epsilon=0.25, discount=0.99, env=env)

rewards, epsilons = [], []
seed_everything(env)

for i in range(10000):
    reward = play_and_train(env, agent)
    rewards.append(reward)
    epsilons.append(agent.epsilon)

    # OPTIONAL: <YOUR CODE: adjust epsilon>

    if i % 1000 == 0:
        rewards_ewma = moving_average(rewards)

        clear_output(True)
        plt.plot(rewards, label="rewards")
        plt.plot(rewards_ewma, label="rewards ewma@100")
        plt.legend()
        plt.grid()
        plt.title("eps = {:e}, rewards ewma@100 = {:.1f}".format(agent.epsilon, rewards_ewma[-1]))
        plt.show()

The reward as a function of training episode is shown below.
(figure: CartPole training rewards and their EWMA)
