引言:从理论设计到工程实践的跨越

在完成了多智能体强化学习(MARL)问题的严谨形式化设计后,我们来到了整个项目最激动人心的阶段——算法实现与训练验证。理论上的优雅设计需要在代码的“熔炉”中经受考验,而交通流优化的成效最终要体现在车辆排队缩短、通行速度提升的直观画面上。本阶段,我们将选择并实现两种具有代表性的MARL算法——独立Q学习(IQL)多智能体近端策略优化(MAPPO),在SUMO构建的“田”字形路网中对四个信号灯智能体进行训练,并通过详实的可视化对比,揭示从“杂乱无章”到“协同有序”的演进过程。

我们将深入算法核心,剖析代码细节,并直面工程实现中的真实挑战。这不仅是一次技术实践,更是理解分布式智能如何从数据中涌现协同策略的绝佳范例。

第一部分:算法选择与实现架构

1.1 为什么选择IQL和MAPPO?

在众多MARL算法中,我们选择这对“组合”进行对比实现,具有深刻的考量:

  • 独立Q学习 (IQL):作为基线算法,它是最简单的去中心化方法。每个智能体将自己视作单智能体,忽略其他智能体的存在,独立学习自己的Q函数。其价值在于:
    1. 实现简单,是检验问题是否“可学”的第一步。
    2. 暴露非平稳性问题:其训练的不稳定性直观展示了多智能体环境中独立学习的根本困境,为理解更高级算法的重要性提供了反面教材。
  • 多智能体近端策略优化 (MAPPO):作为当前最先进的CTDE(集中训练,分散执行)策略梯度算法之一,它代表了解决协同问题的主流方向。其优势在于:
    1. 策略优化:直接优化策略,通常比基于价值的Q学习更稳定、采样效率更高。
    2. CTDE架构:训练时利用全局信息(集中式批评家Critic)指导个体策略(行动者Actor)更新,有效应对信用分配和环境非平稳性;执行时个体仅依赖局部观察。
    3. PPO的鲁棒性:继承了PPO的信任域约束和裁剪机制,防止策略更新步幅过大,保证训练稳定性。

通过对比IQL的“挣扎”与MAPPO的“协同”,我们能深刻领悟MARL设计思想的力量。

1.2 整体训练框架设计

我们采用模块化设计,构建一个通用的训练框架,可灵活切换IQL和MAPPO算法。

project/
│
├── envs/
│   └── multi_agent_sumo_env.py   # 核心:多智能体SUMO环境封装
│
├── algorithms/
│   ├── iql/
│   │   ├── iql_agent.py          # IQL智能体网络
│   │   └── iql_trainer.py        # IQL训练循环
│   │
│   └── mappo/
│       ├── networks.py           # Actor-Critic网络结构
│       ├── mappo_trainer.py      # MAPPO训练循环 (含集中式Critic)
│       └── storage.py            # 经验存储与采样 (支持GAE)
│
├── configs/
│   ├── grid4x4_config.yaml       # 实验配置文件
│   └── mappo_params.yaml         # MAPPO超参数
│
├── models/                       # 保存训练好的模型
├── results/                      # 训练日志、评估数据
└── visualize/                    # 可视化脚本与结果

第二部分:核心代码实现剖析

2.1 多智能体SUMO环境封装 (MultiAgentSumoEnv)

这是连接SUMO仿真与RL算法的桥梁,其设计至关重要。

import traci
import numpy as np
from gym import spaces

class MultiAgentSumoEnv:
    def __init__(self, config):
        self.sumo_cfg = config['sumo_cfg']
        self.agents = config['agent_ids']  # e.g., ['tl_n1', 'tl_n2', ...]
        self.sim_step = 0
        self.decision_interval = 10  # 每10个SUMO步(1秒/步)做一次决策
        self.max_steps = config['max_steps']
        
        # 定义每个智能体的观察和动作空间
        self.observation_spaces = {}
        self.action_spaces = {}
        for aid in self.agents:
            # 观察:例如[各车道排队(4), 各车道速度(4), 相位(1), 相位时间(1)] -> 10维
            self.observation_spaces[aid] = spaces.Box(low=0, high=np.inf, shape=(10,))
            # 动作:离散,0=保持当前相位,1=切换到下一相位
            self.action_spaces[aid] = spaces.Discrete(2)
        
        self._launch_sumo()

    def reset(self):
        """重置环境,返回所有智能体的初始观察字典"""
        traci.load(["-c", self.sumo_cfg, "--start"])
        self.sim_step = 0
        observations = {}
        for aid in self.agents:
            observations[aid] = self._get_agent_obs(aid)
        return observations

    def step(self, actions):
        """
        执行联合动作。
        :param actions: 字典 {agent_id: action}
        :return: obs_dict, reward_dict, done_dict, info
        """
        # 1. 执行动作:设置信号灯相位
        for aid, action in actions.items():
            if action == 1:  # 切换相位
                self._change_phase(aid)
        
        # 2. 推进仿真多个步长(对应一个决策周期)
        local_rewards = {aid: 0 for aid in self.agents}
        for _ in range(self.decision_interval):
            traci.simulationStep()
            self.sim_step += 1
            # 累积每个步长的局部奖励(如负排队长度)
            for aid in self.agents:
                local_rewards[aid] += self._compute_local_reward(aid)
        
        # 3. 获取下一观察,判断是否结束
        next_obs = {aid: self._get_agent_obs(aid) for aid in self.agents}
        dones = {aid: (self.sim_step >= self.max_steps) for aid in self.agents}
        dones['__all__'] = all(dones.values())
        
        # 4. 计算全局奖励(用于MAPPO的集中式Critic)和收集信息
        global_reward = self._compute_global_reward()  # 例如,负的总旅行时间增量
        info = {'global_reward': global_reward, 'step': self.sim_step}
        
        return next_obs, local_rewards, dones, info
    
    def _get_agent_obs(self, agent_id):
        """获取单个智能体的局部观察"""
        obs = []
        # 获取该信号灯控制的所有进口车道ID (需预先在路网中定义映射关系)
        for lane in self.agent_lane_map[agent_id]['incoming']:
            obs.append(traci.lane.getLastStepHaltingNumber(lane))
            obs.append(traci.lane.getLastStepMeanSpeed(lane))
        # 相位信息
        obs.append(traci.trafficlight.getPhase(agent_id))
        obs.append(traci.trafficlight.getPhaseDuration(agent_id))
        return np.array(obs, dtype=np.float32)
    
    def _compute_global_reward(self):
        """计算全局奖励,如负的总排队长度"""
        total_queue = 0
        for lane in traci.lane.getIDList():
            total_queue += traci.lane.getLastStepHaltingNumber(lane)
        return -total_queue
    
    def get_global_state(self):
        """为集中式Critic提供全局状态(例如,关键链路的车辆密度矩阵)"""
        # 这是一个简化示例,实际中可以构建一个路网特征的矩阵表示
        state = []
        for edge in self.monitored_edges:
            density = traci.edge.getLastStepVehicleNumber(edge) / traci.edge.getLength(edge)
            state.append(density)
        return np.array(state, dtype=np.float32)

2.2 独立Q学习 (IQL) 实现

IQL的实现本质上是为每个智能体运行一个独立的DQN。

import torch
import torch.nn as nn
import random
from collections import deque

class IQLAgent(nn.Module):
    def __init__(self, obs_dim, act_dim, hidden_dim=128):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(obs_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, act_dim)
        )
    
    def forward(self, obs):
        return self.net(obs)

class IQLTrainer:
    def __init__(self, env, agent_ids, obs_dim, act_dim):
        self.env = env
        self.agents = {aid: IQLAgent(obs_dim, act_dim) for aid in agent_ids}
        self.optimizers = {aid: torch.optim.Adam(agent.parameters(), lr=1e-4) for aid, agent in self.agents.items()}
        self.replay_buffers = {aid: deque(maxlen=50000) for aid in agent_ids}
        self.gamma = 0.99
        self.batch_size = 64
        
    def train_episode(self):
        obs_dict = self.env.reset()
        total_reward = 0
        while True:
            # 1. 每个智能体根据自身观察选择动作(ε-greedy)
            actions = {}
            for aid, agent in self.agents.items():
                obs = torch.FloatTensor(obs_dict[aid]).unsqueeze(0)
                if random.random() < self.epsilon:  # 探索
                    action = random.randint(0, self.env.action_spaces[aid].n-1)
                else:  # 利用
                    with torch.no_grad():
                        q_values = agent(obs)
                        action = q_values.argmax().item()
                actions[aid] = action
            
            # 2. 环境步进
            next_obs_dict, reward_dict, done_dict, _ = self.env.step(actions)
            
            # 3. 为每个智能体独立存储经验
            for aid in self.agents.keys():
                self.replay_buffers[aid].append((
                    obs_dict[aid],
                    actions[aid],
                    reward_dict[aid],
                    next_obs_dict[aid],
                    done_dict[aid]
                ))
            
            # 4. 为每个智能体独立更新Q网络
            for aid, agent in self.agents.items():
                if len(self.replay_buffers[aid]) >= self.batch_size:
                    self._update_agent(aid)
            
            obs_dict = next_obs_dict
            total_reward += sum(reward_dict.values())
            
            if done_dict['__all__']:
                break
        
        return total_reward
    
    def _update_agent(self, agent_id):
        """标准的DQN更新步骤,但完全独立"""
        batch = random.sample(self.replay_buffers[agent_id], self.batch_size)
        obs, actions, rewards, next_obs, dones = zip(*batch)
        
        obs = torch.FloatTensor(obs)
        actions = torch.LongTensor(actions).unsqueeze(1)
        rewards = torch.FloatTensor(rewards)
        next_obs = torch.FloatTensor(next_obs)
        dones = torch.FloatTensor(dones)
        
        agent = self.agents[agent_id]
        # 当前Q值
        current_q = agent(obs).gather(1, actions).squeeze()
        # 目标Q值
        with torch.no_grad():
            next_q = agent(next_obs).max(1)[0]
            target_q = rewards + self.gamma * next_q * (1 - dones)
        
        # 计算损失并更新
        loss = nn.MSELoss()(current_q, target_q)
        self.optimizers[agent_id].zero_grad()
        loss.backward()
        self.optimizers[agent_id].step()

IQL的关键缺陷:在上面的_update_agent函数中,每个智能体用自己的next_obs计算目标Q值时,隐含地假设了环境是平稳的。然而,next_obs是由所有智能体的联合动作共同产生的,当其他智能体策略变化时,这个转移函数就变了,导致学习目标非平稳,这是IQL训练震荡、难以收敛的根本原因。

2.3 多智能体近端策略优化 (MAPPO) 实现

MAPPO的实现更为复杂,核心在于集中式Critic和PPO裁剪机制的运用。

# networks.py - 定义Actor和Centralized Critic网络
class Actor(nn.Module):
    """策略网络,每个智能体独立一个"""
    def __init__(self, obs_dim, act_dim, hidden_dim=64):
        super().__init__()
        self.actor = nn.Sequential(
            nn.Linear(obs_dim, hidden_dim),
            nn.Tanh(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.Tanh(),
            nn.Linear(hidden_dim, act_dim),
            nn.Softmax(dim=-1)  # 输出动作概率分布
        )
    
    def forward(self, obs):
        return self.actor(obs)
    
    def get_action(self, obs, deterministic=False):
        probs = self.forward(obs)
        dist = torch.distributions.Categorical(probs)
        if deterministic:
            action = dist.probs.argmax(dim=-1)
        else:
            action = dist.sample()
        log_prob = dist.log_prob(action)
        return action, log_prob

class CentralizedCritic(nn.Module):
    """集中式价值网络,输入所有智能体的观察和全局状态"""
    def __init__(self, num_agents, obs_dim, state_dim, hidden_dim=128):
        super().__init__()
        # 首先处理每个智能体的观察
        self.obs_encoder = nn.Linear(obs_dim, hidden_dim // 2)
        # 然后与全局状态融合
        critic_input_dim = (hidden_dim // 2) * num_agents + state_dim
        self.critic = nn.Sequential(
            nn.Linear(critic_input_dim, hidden_dim),
            nn.Tanh(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.Tanh(),
            nn.Linear(hidden_dim, 1)  # 输出一个标量价值
        )
    
    def forward(self, obs_batch, state_batch):
        # obs_batch: [batch_size, num_agents, obs_dim]
        # state_batch: [batch_size, state_dim]
        batch_size = obs_batch.shape[0]
        # 编码每个智能体的观察
        encoded_obs = torch.relu(self.obs_encoder(obs_batch))  # [batch, num_agents, hidden//2]
        # 展平所有智能体的编码信息
        encoded_obs_flat = encoded_obs.view(batch_size, -1)  # [batch, num_agents * hidden//2]
        # 与全局状态拼接
        critic_input = torch.cat([encoded_obs_flat, state_batch], dim=-1)
        value = self.critic(critic_input)
        return value

# mappo_trainer.py - 核心训练循环 (简化版,省略了GAE和许多优化细节)
class MAPPOTrainer:
    def __init__(self, env, config):
        self.env = env
        self.num_agents = len(env.agents)
        self.actors = {aid: Actor(obs_dim, act_dim) for aid, (obs_dim, act_dim) in env.spaces.items()}
        self.critic = CentralizedCritic(self.num_agents, obs_dim, state_dim=env.global_state_dim)
        # ... 初始化优化器、超参数 (clip_param=0.2, value_coef=0.5, entropy_coef=0.01) ...
        
    def collect_rollout(self, num_steps):
        """收集一批经验数据"""
        obs = self.env.reset()
        memories = []
        
        for step in range(num_steps):
            actions, log_probs = {}, {}
            # 分布式执行:每个Actor根据自身观察选择动作
            for aid, actor in self.actors.items():
                obs_tensor = torch.FloatTensor(obs[aid]).unsqueeze(0)
                a, lp = actor.get_action(obs_tensor)
                actions[aid] = a.item()
                log_probs[aid] = lp.item()
            
            # 环境步进
            next_obs, rewards, dones, info = self.env.step(actions)
            global_state = self.env.get_global_state()
            
            # 存储经验,注意存储的是全局奖励和全局状态
            memories.append({
                'obs': obs.copy(),
                'actions': actions.copy(),
                'log_probs': log_probs.copy(),
                'rewards': rewards.copy(),
                'global_reward': info['global_reward'],
                'global_state': global_state.copy(),
                'dones': dones.copy()
            })
            obs = next_obs
            
            if dones['__all__']:
                break
        
        return memories
    
    def update(self, memories):
        """利用收集的经验更新Actor和Critic网络"""
        # 1. 将经验转换为张量,计算GAE优势估计 (此处简化,直接使用TD误差)
        # 2. 对Critic进行多次更新,拟合状态价值
        for _ in range(self.critic_epochs):
            # 使用集中式Critic计算预测价值
            obs_batch = torch.FloatTensor([m['obs'] for m in memories])  # [T, N, obs_dim]
            state_batch = torch.FloatTensor([m['global_state'] for m in memories])
            predicted_values = self.critic(obs_batch, state_batch).squeeze()
            # 计算价值目标 (例如,使用折扣累积回报)
            value_targets = self._compute_returns(memories)
            # 计算Critic损失 (MSE)
            value_loss = F.mse_loss(predicted_values, value_targets)
            # ... 反向传播更新Critic ...
        
        # 3. 对每个Actor进行多次PPO更新
        for aid, actor in self.actors.items():
            old_log_probs = torch.FloatTensor([m['log_probs'][aid] for m in memories])
            obs_batch = torch.FloatTensor([m['obs'][aid] for m in memories])
            actions_batch = torch.LongTensor([m['actions'][aid] for m in memories])
            advantages = self._compute_advantages(memories, aid)  # 利用集中式Critic计算的优势
            
            for _ in range(self.actor_epochs):
                # 计算新策略的动作概率和对数概率
                new_probs = actor(obs_batch)
                dist = torch.distributions.Categorical(new_probs)
                new_log_probs = dist.log_prob(actions_batch)
                entropy = dist.entropy().mean()
                
                # PPO裁剪目标函数
                ratio = torch.exp(new_log_probs - old_log_probs.detach())
                surr1 = ratio * advantages
                surr2 = torch.clamp(ratio, 1.0 - self.clip_param, 1.0 + self.clip_param) * advantages
                actor_loss = -torch.min(surr1, surr2).mean() - self.entropy_coef * entropy
                
                # ... 反向传播更新Actor ...

MAPPO的核心优势:集中式Critic在训练时看到了所有智能体的观察和全局状态,因此它对环境动力学有更全面的理解,能给出更准确的价值估计和优势函数。每个Actor在更新时,使用这个更准确的“指导信号”(优势函数)来调整自己的策略,从而学会了如何在全局目标下进行协作。同时,PPO的裁剪机制确保了策略更新的稳定性。

第三部分:训练过程与结果可视化

3.1 训练曲线分析

我们分别运行IQL和MAPPO进行训练(例如,各1000个回合),并记录关键指标。

# 训练监控指标
metrics = {
    'episode': [],
    'global_reward': [],  # 全局奖励 (负的总排队长度)
    'avg_travel_time': [],  # 平均旅行时间
    'throughput': [],  # 总通过车辆数
}

预期训练曲线对比图 (可通过TensorBoard或Matplotlib绘制):

  1. 全局奖励曲线:MAPPO的曲线应呈现稳步上升并最终收敛的趋势,表明智能体协同能力增强,整体排队长度下降。而IQL的曲线则会大幅震荡,甚至可能没有明显上升趋势,体现其学习的不稳定性。
  2. 平均旅行时间曲线:与全局奖励负相关,MAPPO训练后的平均旅行时间应有显著下降,IQL则改善有限。
  3. 智能体间策略相关性分析:可以计算不同智能体动作序列的互信息或相关系数。在MAPPO训练后期,智能体的策略会展现出协同模式(例如,形成特定方向的“绿波”),而在IQL中,这种相关性很弱。

3.2 交通流动态可视化

这是最直观的部分。我们利用SUMO-GUI和traci的记录功能,生成训练前(随机策略或固定时长)、训练后(MAPPO策略)的仿真对比视频。

实现步骤

  1. 录制仿真:使用SUMO的--fcd-output--netstate-dump选项,或者直接通过traci.gui.screenshot在关键步骤截图。
  2. 生成对比视频
    # 伪代码:加载训练好的MAPPO策略运行一次仿真,并录制
    env = MultiAgentSumoEnv(config)
    model = load_model('mappo_final.pth')
    obs = env.reset()
    frames = []
    while not done:
        actions = {}
        for aid in env.agents:
            obs_tensor = torch.FloatTensor(obs[aid]).unsqueeze(0)
            action, _ = model.actors[aid].get_action(obs_tensor, deterministic=True)
            actions[aid] = action.item()
        obs, _, done, _ = env.step(actions)
        # 截图或保存车辆位置数据
        img = traci.gui.screenshot(viewID='View #0')
        frames.append(img)
    # 使用imageio或cv2将frames保存为视频
    
  3. 关键指标动画:使用Matplotlib的FuncAnimation,在同一坐标系下绘制训练前后关键路段的车速热图、排队长度时序图。

可视化结果解读

  • 训练前:交通流呈现“走走停停”模式,拥堵在个别交叉口堆积并向上游蔓延,车辆等待时间长。
  • 训练后(MAPPO):可以观察到明显的协同现象。例如,当主干道车流到来时,沿线多个交叉口会协调保持绿灯(或适当延长),形成“绿波带”,车辆得以连续通过,排队现象大幅减少。智能体学会了“预见”车流并提前做准备。
  • IQL结果:可能在某些局部交叉口有改善,但缺乏整体协调,可能一个交叉口放行过快导致下游溢出,形成新的拥堵点。

第四部分:总结、挑战与展望

通过D77-80的实践,我们成功地将MARL理论应用于交通信号控制,并获得了直观的优化效果。MAPPO展现了其在处理多智能体协同问题上的强大能力,而IQL则作为反面教材揭示了独立学习的局限。

遇到的挑战与解决方案

  1. 超参数敏感:MAPPO对学习率、裁剪系数等超参数敏感。我们通过网格搜索或使用自适应优化器(如Adam)来缓解。
  2. 训练速度慢:SUMO仿真本身是计算瓶颈。我们采用了更快的仿真步长(如0.5秒/步),减少不必要的车辆类型,并使用多线程异步数据收集来加速。
  3. 奖励函数设计:最初设计的局部负排队奖励导致智能体“自私”。我们引入了考虑下游排队和全局旅行时间增量的奖励塑造,显著改善了协同效果。
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐