智能体在车联网中的应用:第50天 实现MAPPO与IQL训练信号灯智能体及可视化分析
多智能体强化学习在交通信号控制中的实现与验证 本文介绍了基于SUMO仿真平台的多智能体强化学习(MARL)交通信号控制系统的实现过程。系统采用独立Q学习(IQL)和多智能体近端策略优化(MAPPO)两种算法,在"田"字形路网中对四个信号灯智能体进行训练。 核心实现包括: 模块化训练框架设计,包含环境封装、算法实现和可视化模块 多智能体SUMO环境的关键封装,处理观察空间、动作空
引言:从理论设计到工程实践的跨越
在完成了多智能体强化学习(MARL)问题的严谨形式化设计后,我们来到了整个项目最激动人心的阶段——算法实现与训练验证。理论上的优雅设计需要在代码的“熔炉”中经受考验,而交通流优化的成效最终要体现在车辆排队缩短、通行速度提升的直观画面上。本阶段,我们将选择并实现两种具有代表性的MARL算法——独立Q学习(IQL) 和 多智能体近端策略优化(MAPPO),在SUMO构建的“田”字形路网中对四个信号灯智能体进行训练,并通过详实的可视化对比,揭示从“杂乱无章”到“协同有序”的演进过程。
我们将深入算法核心,剖析代码细节,并直面工程实现中的真实挑战。这不仅是一次技术实践,更是理解分布式智能如何从数据中涌现协同策略的绝佳范例。
第一部分:算法选择与实现架构
1.1 为什么选择IQL和MAPPO?
在众多MARL算法中,我们选择这对“组合”进行对比实现,具有深刻的考量:
- 独立Q学习 (IQL):作为基线算法,它是最简单的去中心化方法。每个智能体将自己视作单智能体,忽略其他智能体的存在,独立学习自己的Q函数。其价值在于:
- 实现简单,是检验问题是否“可学”的第一步。
- 暴露非平稳性问题:其训练的不稳定性直观展示了多智能体环境中独立学习的根本困境,为理解更高级算法的重要性提供了反面教材。
- 多智能体近端策略优化 (MAPPO):作为当前最先进的CTDE(集中训练,分散执行)策略梯度算法之一,它代表了解决协同问题的主流方向。其优势在于:
- 策略优化:直接优化策略,通常比基于价值的Q学习更稳定、采样效率更高。
- CTDE架构:训练时利用全局信息(集中式批评家Critic)指导个体策略(行动者Actor)更新,有效应对信用分配和环境非平稳性;执行时个体仅依赖局部观察。
- PPO的鲁棒性:继承了PPO的信任域约束和裁剪机制,防止策略更新步幅过大,保证训练稳定性。
通过对比IQL的“挣扎”与MAPPO的“协同”,我们能深刻领悟MARL设计思想的力量。
1.2 整体训练框架设计
我们采用模块化设计,构建一个通用的训练框架,可灵活切换IQL和MAPPO算法。
project/
│
├── envs/
│ └── multi_agent_sumo_env.py # 核心:多智能体SUMO环境封装
│
├── algorithms/
│ ├── iql/
│ │ ├── iql_agent.py # IQL智能体网络
│ │ └── iql_trainer.py # IQL训练循环
│ │
│ └── mappo/
│ ├── networks.py # Actor-Critic网络结构
│ ├── mappo_trainer.py # MAPPO训练循环 (含集中式Critic)
│ └── storage.py # 经验存储与采样 (支持GAE)
│
├── configs/
│ ├── grid4x4_config.yaml # 实验配置文件
│ └── mappo_params.yaml # MAPPO超参数
│
├── models/ # 保存训练好的模型
├── results/ # 训练日志、评估数据
└── visualize/ # 可视化脚本与结果
第二部分:核心代码实现剖析
2.1 多智能体SUMO环境封装 (MultiAgentSumoEnv)
这是连接SUMO仿真与RL算法的桥梁,其设计至关重要。
import traci
import numpy as np
from gym import spaces
class MultiAgentSumoEnv:
def __init__(self, config):
self.sumo_cfg = config['sumo_cfg']
self.agents = config['agent_ids'] # e.g., ['tl_n1', 'tl_n2', ...]
self.sim_step = 0
self.decision_interval = 10 # 每10个SUMO步(1秒/步)做一次决策
self.max_steps = config['max_steps']
# 定义每个智能体的观察和动作空间
self.observation_spaces = {}
self.action_spaces = {}
for aid in self.agents:
# 观察:例如[各车道排队(4), 各车道速度(4), 相位(1), 相位时间(1)] -> 10维
self.observation_spaces[aid] = spaces.Box(low=0, high=np.inf, shape=(10,))
# 动作:离散,0=保持当前相位,1=切换到下一相位
self.action_spaces[aid] = spaces.Discrete(2)
self._launch_sumo()
def reset(self):
"""重置环境,返回所有智能体的初始观察字典"""
traci.load(["-c", self.sumo_cfg, "--start"])
self.sim_step = 0
observations = {}
for aid in self.agents:
observations[aid] = self._get_agent_obs(aid)
return observations
def step(self, actions):
"""
执行联合动作。
:param actions: 字典 {agent_id: action}
:return: obs_dict, reward_dict, done_dict, info
"""
# 1. 执行动作:设置信号灯相位
for aid, action in actions.items():
if action == 1: # 切换相位
self._change_phase(aid)
# 2. 推进仿真多个步长(对应一个决策周期)
local_rewards = {aid: 0 for aid in self.agents}
for _ in range(self.decision_interval):
traci.simulationStep()
self.sim_step += 1
# 累积每个步长的局部奖励(如负排队长度)
for aid in self.agents:
local_rewards[aid] += self._compute_local_reward(aid)
# 3. 获取下一观察,判断是否结束
next_obs = {aid: self._get_agent_obs(aid) for aid in self.agents}
dones = {aid: (self.sim_step >= self.max_steps) for aid in self.agents}
dones['__all__'] = all(dones.values())
# 4. 计算全局奖励(用于MAPPO的集中式Critic)和收集信息
global_reward = self._compute_global_reward() # 例如,负的总旅行时间增量
info = {'global_reward': global_reward, 'step': self.sim_step}
return next_obs, local_rewards, dones, info
def _get_agent_obs(self, agent_id):
"""获取单个智能体的局部观察"""
obs = []
# 获取该信号灯控制的所有进口车道ID (需预先在路网中定义映射关系)
for lane in self.agent_lane_map[agent_id]['incoming']:
obs.append(traci.lane.getLastStepHaltingNumber(lane))
obs.append(traci.lane.getLastStepMeanSpeed(lane))
# 相位信息
obs.append(traci.trafficlight.getPhase(agent_id))
obs.append(traci.trafficlight.getPhaseDuration(agent_id))
return np.array(obs, dtype=np.float32)
def _compute_global_reward(self):
"""计算全局奖励,如负的总排队长度"""
total_queue = 0
for lane in traci.lane.getIDList():
total_queue += traci.lane.getLastStepHaltingNumber(lane)
return -total_queue
def get_global_state(self):
"""为集中式Critic提供全局状态(例如,关键链路的车辆密度矩阵)"""
# 这是一个简化示例,实际中可以构建一个路网特征的矩阵表示
state = []
for edge in self.monitored_edges:
density = traci.edge.getLastStepVehicleNumber(edge) / traci.edge.getLength(edge)
state.append(density)
return np.array(state, dtype=np.float32)
2.2 独立Q学习 (IQL) 实现
IQL的实现本质上是为每个智能体运行一个独立的DQN。
import torch
import torch.nn as nn
import random
from collections import deque
class IQLAgent(nn.Module):
def __init__(self, obs_dim, act_dim, hidden_dim=128):
super().__init__()
self.net = nn.Sequential(
nn.Linear(obs_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, act_dim)
)
def forward(self, obs):
return self.net(obs)
class IQLTrainer:
def __init__(self, env, agent_ids, obs_dim, act_dim):
self.env = env
self.agents = {aid: IQLAgent(obs_dim, act_dim) for aid in agent_ids}
self.optimizers = {aid: torch.optim.Adam(agent.parameters(), lr=1e-4) for aid, agent in self.agents.items()}
self.replay_buffers = {aid: deque(maxlen=50000) for aid in agent_ids}
self.gamma = 0.99
self.batch_size = 64
def train_episode(self):
obs_dict = self.env.reset()
total_reward = 0
while True:
# 1. 每个智能体根据自身观察选择动作(ε-greedy)
actions = {}
for aid, agent in self.agents.items():
obs = torch.FloatTensor(obs_dict[aid]).unsqueeze(0)
if random.random() < self.epsilon: # 探索
action = random.randint(0, self.env.action_spaces[aid].n-1)
else: # 利用
with torch.no_grad():
q_values = agent(obs)
action = q_values.argmax().item()
actions[aid] = action
# 2. 环境步进
next_obs_dict, reward_dict, done_dict, _ = self.env.step(actions)
# 3. 为每个智能体独立存储经验
for aid in self.agents.keys():
self.replay_buffers[aid].append((
obs_dict[aid],
actions[aid],
reward_dict[aid],
next_obs_dict[aid],
done_dict[aid]
))
# 4. 为每个智能体独立更新Q网络
for aid, agent in self.agents.items():
if len(self.replay_buffers[aid]) >= self.batch_size:
self._update_agent(aid)
obs_dict = next_obs_dict
total_reward += sum(reward_dict.values())
if done_dict['__all__']:
break
return total_reward
def _update_agent(self, agent_id):
"""标准的DQN更新步骤,但完全独立"""
batch = random.sample(self.replay_buffers[agent_id], self.batch_size)
obs, actions, rewards, next_obs, dones = zip(*batch)
obs = torch.FloatTensor(obs)
actions = torch.LongTensor(actions).unsqueeze(1)
rewards = torch.FloatTensor(rewards)
next_obs = torch.FloatTensor(next_obs)
dones = torch.FloatTensor(dones)
agent = self.agents[agent_id]
# 当前Q值
current_q = agent(obs).gather(1, actions).squeeze()
# 目标Q值
with torch.no_grad():
next_q = agent(next_obs).max(1)[0]
target_q = rewards + self.gamma * next_q * (1 - dones)
# 计算损失并更新
loss = nn.MSELoss()(current_q, target_q)
self.optimizers[agent_id].zero_grad()
loss.backward()
self.optimizers[agent_id].step()
IQL的关键缺陷:在上面的_update_agent函数中,每个智能体用自己的next_obs计算目标Q值时,隐含地假设了环境是平稳的。然而,next_obs是由所有智能体的联合动作共同产生的,当其他智能体策略变化时,这个转移函数就变了,导致学习目标非平稳,这是IQL训练震荡、难以收敛的根本原因。
2.3 多智能体近端策略优化 (MAPPO) 实现
MAPPO的实现更为复杂,核心在于集中式Critic和PPO裁剪机制的运用。
# networks.py - 定义Actor和Centralized Critic网络
class Actor(nn.Module):
"""策略网络,每个智能体独立一个"""
def __init__(self, obs_dim, act_dim, hidden_dim=64):
super().__init__()
self.actor = nn.Sequential(
nn.Linear(obs_dim, hidden_dim),
nn.Tanh(),
nn.Linear(hidden_dim, hidden_dim),
nn.Tanh(),
nn.Linear(hidden_dim, act_dim),
nn.Softmax(dim=-1) # 输出动作概率分布
)
def forward(self, obs):
return self.actor(obs)
def get_action(self, obs, deterministic=False):
probs = self.forward(obs)
dist = torch.distributions.Categorical(probs)
if deterministic:
action = dist.probs.argmax(dim=-1)
else:
action = dist.sample()
log_prob = dist.log_prob(action)
return action, log_prob
class CentralizedCritic(nn.Module):
"""集中式价值网络,输入所有智能体的观察和全局状态"""
def __init__(self, num_agents, obs_dim, state_dim, hidden_dim=128):
super().__init__()
# 首先处理每个智能体的观察
self.obs_encoder = nn.Linear(obs_dim, hidden_dim // 2)
# 然后与全局状态融合
critic_input_dim = (hidden_dim // 2) * num_agents + state_dim
self.critic = nn.Sequential(
nn.Linear(critic_input_dim, hidden_dim),
nn.Tanh(),
nn.Linear(hidden_dim, hidden_dim),
nn.Tanh(),
nn.Linear(hidden_dim, 1) # 输出一个标量价值
)
def forward(self, obs_batch, state_batch):
# obs_batch: [batch_size, num_agents, obs_dim]
# state_batch: [batch_size, state_dim]
batch_size = obs_batch.shape[0]
# 编码每个智能体的观察
encoded_obs = torch.relu(self.obs_encoder(obs_batch)) # [batch, num_agents, hidden//2]
# 展平所有智能体的编码信息
encoded_obs_flat = encoded_obs.view(batch_size, -1) # [batch, num_agents * hidden//2]
# 与全局状态拼接
critic_input = torch.cat([encoded_obs_flat, state_batch], dim=-1)
value = self.critic(critic_input)
return value
# mappo_trainer.py - 核心训练循环 (简化版,省略了GAE和许多优化细节)
class MAPPOTrainer:
def __init__(self, env, config):
self.env = env
self.num_agents = len(env.agents)
self.actors = {aid: Actor(obs_dim, act_dim) for aid, (obs_dim, act_dim) in env.spaces.items()}
self.critic = CentralizedCritic(self.num_agents, obs_dim, state_dim=env.global_state_dim)
# ... 初始化优化器、超参数 (clip_param=0.2, value_coef=0.5, entropy_coef=0.01) ...
def collect_rollout(self, num_steps):
"""收集一批经验数据"""
obs = self.env.reset()
memories = []
for step in range(num_steps):
actions, log_probs = {}, {}
# 分布式执行:每个Actor根据自身观察选择动作
for aid, actor in self.actors.items():
obs_tensor = torch.FloatTensor(obs[aid]).unsqueeze(0)
a, lp = actor.get_action(obs_tensor)
actions[aid] = a.item()
log_probs[aid] = lp.item()
# 环境步进
next_obs, rewards, dones, info = self.env.step(actions)
global_state = self.env.get_global_state()
# 存储经验,注意存储的是全局奖励和全局状态
memories.append({
'obs': obs.copy(),
'actions': actions.copy(),
'log_probs': log_probs.copy(),
'rewards': rewards.copy(),
'global_reward': info['global_reward'],
'global_state': global_state.copy(),
'dones': dones.copy()
})
obs = next_obs
if dones['__all__']:
break
return memories
def update(self, memories):
"""利用收集的经验更新Actor和Critic网络"""
# 1. 将经验转换为张量,计算GAE优势估计 (此处简化,直接使用TD误差)
# 2. 对Critic进行多次更新,拟合状态价值
for _ in range(self.critic_epochs):
# 使用集中式Critic计算预测价值
obs_batch = torch.FloatTensor([m['obs'] for m in memories]) # [T, N, obs_dim]
state_batch = torch.FloatTensor([m['global_state'] for m in memories])
predicted_values = self.critic(obs_batch, state_batch).squeeze()
# 计算价值目标 (例如,使用折扣累积回报)
value_targets = self._compute_returns(memories)
# 计算Critic损失 (MSE)
value_loss = F.mse_loss(predicted_values, value_targets)
# ... 反向传播更新Critic ...
# 3. 对每个Actor进行多次PPO更新
for aid, actor in self.actors.items():
old_log_probs = torch.FloatTensor([m['log_probs'][aid] for m in memories])
obs_batch = torch.FloatTensor([m['obs'][aid] for m in memories])
actions_batch = torch.LongTensor([m['actions'][aid] for m in memories])
advantages = self._compute_advantages(memories, aid) # 利用集中式Critic计算的优势
for _ in range(self.actor_epochs):
# 计算新策略的动作概率和对数概率
new_probs = actor(obs_batch)
dist = torch.distributions.Categorical(new_probs)
new_log_probs = dist.log_prob(actions_batch)
entropy = dist.entropy().mean()
# PPO裁剪目标函数
ratio = torch.exp(new_log_probs - old_log_probs.detach())
surr1 = ratio * advantages
surr2 = torch.clamp(ratio, 1.0 - self.clip_param, 1.0 + self.clip_param) * advantages
actor_loss = -torch.min(surr1, surr2).mean() - self.entropy_coef * entropy
# ... 反向传播更新Actor ...
MAPPO的核心优势:集中式Critic在训练时看到了所有智能体的观察和全局状态,因此它对环境动力学有更全面的理解,能给出更准确的价值估计和优势函数。每个Actor在更新时,使用这个更准确的“指导信号”(优势函数)来调整自己的策略,从而学会了如何在全局目标下进行协作。同时,PPO的裁剪机制确保了策略更新的稳定性。
第三部分:训练过程与结果可视化
3.1 训练曲线分析
我们分别运行IQL和MAPPO进行训练(例如,各1000个回合),并记录关键指标。
# 训练监控指标
metrics = {
'episode': [],
'global_reward': [], # 全局奖励 (负的总排队长度)
'avg_travel_time': [], # 平均旅行时间
'throughput': [], # 总通过车辆数
}
预期训练曲线对比图 (可通过TensorBoard或Matplotlib绘制):
- 全局奖励曲线:MAPPO的曲线应呈现稳步上升并最终收敛的趋势,表明智能体协同能力增强,整体排队长度下降。而IQL的曲线则会大幅震荡,甚至可能没有明显上升趋势,体现其学习的不稳定性。
- 平均旅行时间曲线:与全局奖励负相关,MAPPO训练后的平均旅行时间应有显著下降,IQL则改善有限。
- 智能体间策略相关性分析:可以计算不同智能体动作序列的互信息或相关系数。在MAPPO训练后期,智能体的策略会展现出协同模式(例如,形成特定方向的“绿波”),而在IQL中,这种相关性很弱。
3.2 交通流动态可视化
这是最直观的部分。我们利用SUMO-GUI和traci的记录功能,生成训练前(随机策略或固定时长)、训练后(MAPPO策略)的仿真对比视频。
实现步骤:
- 录制仿真:使用SUMO的
--fcd-output或--netstate-dump选项,或者直接通过traci.gui.screenshot在关键步骤截图。 - 生成对比视频:
# 伪代码:加载训练好的MAPPO策略运行一次仿真,并录制 env = MultiAgentSumoEnv(config) model = load_model('mappo_final.pth') obs = env.reset() frames = [] while not done: actions = {} for aid in env.agents: obs_tensor = torch.FloatTensor(obs[aid]).unsqueeze(0) action, _ = model.actors[aid].get_action(obs_tensor, deterministic=True) actions[aid] = action.item() obs, _, done, _ = env.step(actions) # 截图或保存车辆位置数据 img = traci.gui.screenshot(viewID='View #0') frames.append(img) # 使用imageio或cv2将frames保存为视频 - 关键指标动画:使用Matplotlib的
FuncAnimation,在同一坐标系下绘制训练前后关键路段的车速热图、排队长度时序图。
可视化结果解读:
- 训练前:交通流呈现“走走停停”模式,拥堵在个别交叉口堆积并向上游蔓延,车辆等待时间长。
- 训练后(MAPPO):可以观察到明显的协同现象。例如,当主干道车流到来时,沿线多个交叉口会协调保持绿灯(或适当延长),形成“绿波带”,车辆得以连续通过,排队现象大幅减少。智能体学会了“预见”车流并提前做准备。
- IQL结果:可能在某些局部交叉口有改善,但缺乏整体协调,可能一个交叉口放行过快导致下游溢出,形成新的拥堵点。
第四部分:总结、挑战与展望
通过D77-80的实践,我们成功地将MARL理论应用于交通信号控制,并获得了直观的优化效果。MAPPO展现了其在处理多智能体协同问题上的强大能力,而IQL则作为反面教材揭示了独立学习的局限。
遇到的挑战与解决方案:
- 超参数敏感:MAPPO对学习率、裁剪系数等超参数敏感。我们通过网格搜索或使用自适应优化器(如Adam)来缓解。
- 训练速度慢:SUMO仿真本身是计算瓶颈。我们采用了更快的仿真步长(如0.5秒/步),减少不必要的车辆类型,并使用多线程异步数据收集来加速。
- 奖励函数设计:最初设计的局部负排队奖励导致智能体“自私”。我们引入了考虑下游排队和全局旅行时间增量的奖励塑造,显著改善了协同效果。
更多推荐


所有评论(0)