引言:从单点控制到协同优化的范式转变

在传统的交通信号控制系统中,信号灯往往依据预设的定时方案或基于单一交叉口的感应数据进行孤立的决策。然而,城市交通网络是一个典型的复杂系统,各个交叉口之间的车流相互关联、相互影响。一个交叉口的信号调整,其影响会像涟漪一样传递到上下游路段,形成“拥堵传导”效应。因此,将每个信号灯控制器视为一个独立的智能体(Agent),并使其通过感知、决策与协作来优化整体路网性能,成为了智能交通系统(ITS)研究的前沿方向。多智能体系统(Multi-Agent System, MAS)为解决这一分布式协同优化问题提供了强大的理论框架。

第一部分:构建基础路网——多交叉口场景设计

任何仿真的起点都是创建一个能够反映现实复杂性的路网。我们将构建一个包含四个相邻交叉口的“田”字形路网,这是研究多智能体协同的经典场景。

1.1 使用NETEDIT或Python脚本创建路网

方法一:使用SUMO-GUI的NETEDIT(推荐初学者)

  1. 打开NETEDIT,在“网络”模式下,使用“交叉口”工具创建四个节点(node),例如n1, n2, n3, n4,形成一个正方形。
  2. 使用“路段”工具连接这些节点,形成双向四车道道路。例如,创建从n1n2e1-2)和从n2n1e2-1)的路段。
  3. 为每个交叉口(即节点n1, n2, n3, n4)创建交通信号灯。在NETEDIT中,这通常在创建交叉口连接时自动生成,但需确保每个交叉口都有独立的信号灯逻辑ID(如tl_n1)。

方法二:使用netconvert.nod.xml, .edg.xml文件(更适合自动化)
创建simple_grid.nod.xml:

<nodes>
    <node id="n1" x="0" y="0" type="traffic_light"/>
    <node id="n2" x="200" y="0" type="traffic_light"/>
    <node id="n3" x="0" y="200" type="traffic_light"/>
    <node id="n4" x="200" y="200" type="traffic_light"/>
</nodes>

创建simple_grid.edg.xml:

<edges>
    <edge id="e1-2" from="n1" to="n2" numLanes="2" speed="13.89"/>
    <edge id="e2-1" from="n2" to="n1" numLanes="2" speed="13.89"/>
    <edge id="e1-3" from="n1" to="n3" numLanes="2" speed="13.89"/>
    <edge id="e3-1" from="n3" to="n1" numLanes="2" speed="13.89"/>
    <!-- ... 类似地定义其他边 e2-4, e4-2, e3-4, e4-3 ... -->
</edges>

然后使用命令行生成网络文件:

netconvert --node-files=simple_grid.nod.xml --edge-files=simple_grid.edg.xml --output-file=simple_grid.net.xml

至此,我们得到了一个具有四个信号控制交叉口的物理路网文件simple_grid.net.xml

1.2 定义车流与初始交通需求

创建flows.rou.xml文件,定义从各个方向驶入路网的车辆流。为了增加协同优化的必要性,我们设置主要的交通流向(如从西向东,从北向南),形成潮汐流或交叉流。

<routes>
    <vType id="car" accel="2.6" decel="4.5" sigma="0.5" length="5" maxSpeed="13.89"/>
    <flow id="west_east" type="car" begin="0" end="3600" period="5" departLane="best" from="e3-1" to="e2-4"/>
    <flow id="north_south" type="car" begin="0" end="3600" period="7" departLane="best" from="e2-1" to="e4-3"/>
    <!-- 添加其他次要流,增加路网负载和交互复杂性 -->
</routes>

第二部分:定义多智能体环境——信号灯即智能体

这是本文的核心。我们将使用TraCI(Traffic Control Interface)接口,通过Python编程实现对每个信号灯智能体的控制。

2.1 智能体的抽象与封装

每个信号灯智能体应包含以下核心属性:

  • ID:与SUMO中路网信号灯ID对应(如tl_n1)。
  • 观察(Observation):智能体所能感知的环境信息。
  • 行动(Action):智能体能够执行的决策,即切换信号相位。
  • 奖励(Reward):用于评估其决策好坏的信号。

我们创建一个TrafficLightAgent类:

import traci
import numpy as np

class TrafficLightAgent:
    def __init__(self, tl_id, net_data):
        self.id = tl_id
        self.net_data = net_data  # 包含路网信息,如车道连接关系
        self.phases = self._get_phases_from_net()  # 从路网获取可用相位列表
        self.current_phase_index = 0
        self.phase_duration = 10  # 初始相位持续时间(秒)
        self.last_change_step = 0

    def _get_phases_from_net(self):
        # 通过traci.trafficlight.getCompleteRedYellowGreenDefinition获取相位
        # 简化示例:假设是标准四相位
        return [0, 1, 2, 3]  # 对应SUMO中定义的相位索引

    def get_observation(self):
        """收集局部观察信息,作为智能体状态"""
        obs = []
        # 1. 各进口车道排队长度(关键状态)
        for lane in self._get_incoming_lanes():
            halted_vehicles = traci.lane.getLastStepHaltingNumber(lane)
            obs.append(halted_vehicles)
        # 2. 当前相位已持续时间
        obs.append(traci.simulation.getTime() - self.last_change_step)
        # 3. 相邻交叉口状态(简单通信或间接感知)
        # ... (后续扩展)
        return np.array(obs)

    def _get_incoming_lanes(self):
        # 通过traci.trafficlight.getControlledLanes获取该信号灯控制的所有车道
        all_lanes = traci.trafficlight.getControlledLanes(self.id)
        # 过滤出进口车道(逻辑判断,简化处理)
        incoming_lanes = [lane for lane in all_lanes if lane.startswith('e')]  # 示例
        return incoming_lanes

    def choose_action(self, obs):
        """决策函数:决定是否切换相位或保持"""
        # 这是一个规则策略的示例。在实际RL中,这里应替换为神经网络策略。
        queue_threshold = 5
        max_phase_time = 30
        current_queue = obs[:-1]  # 假设obs最后一位是时间
        current_phase_time = obs[-1]

        # 规则1:如果当前相位对应车道排队过长,且相位时间超过最小绿灯时间,则考虑切换
        if max(current_queue) > queue_threshold and current_phase_time > 10:
            return 1  # 动作:切换到下一个相位
        # 规则2:如果相位时间超过最大值,强制切换
        elif current_phase_time > max_phase_time:
            return 1
        else:
            return 0  # 动作:保持当前相位

    def execute_action(self, action):
        """在SUMO中执行动作"""
        if action == 1:  # 切换相位
            self.current_phase_index = (self.current_phase_index + 1) % len(self.phases)
            traci.trafficlight.setPhase(self.id, self.current_phase_index)
            self.last_change_step = traci.simulation.getTime()
        # action == 0 则不做任何操作,SUMO会维持当前相位

    def compute_reward(self):
        """计算局部奖励,例如负的总排队长度变化"""
        current_total_queue = sum([traci.lane.getLastStepHaltingNumber(lane) for lane in self._get_incoming_lanes()])
        # 这里需要记录上一步的排队长度来计算差分奖励
        reward = self.last_total_queue - current_total_queue
        self.last_total_queue = current_total_queue
        return reward

2.2 多智能体仿真主循环

主程序负责初始化SUMO、创建多个智能体实例,并驱动整个协同仿真过程。

import os, sys
import subprocess

def run_multi_agent_simulation(net_file, route_file, gui=False):
    # 启动SUMO,连接TraCI
    sumo_cmd = ["sumo-gui" if gui else "sumo", "-n", net_file, "-r", route_file, "--time-to-teleport", "300"]
    traci.start(sumo_cmd)

    # 创建多智能体列表
    tl_ids = traci.trafficlight.getIDList()
    agents = {tl_id: TrafficLightAgent(tl_id, None) for tl_id in tl_ids}  # 假设有4个信号灯

    step = 0
    max_steps = 3600  # 仿真1小时

    try:
        while step < max_steps:
            traci.simulationStep()

            for tl_id, agent in agents.items():
                # 每个智能体独立感知、决策、执行
                obs = agent.get_observation()
                action = agent.choose_action(obs)
                agent.execute_action(action)
                # reward = agent.compute_reward()  # 可用于学习

            step += 1

            # 全局数据收集(用于评估)
            if step % 300 == 0:  # 每5分钟打印一次全局数据
                avg_waiting_time = traci.simulation.getAverageWaitingTime() / 1000.0  # 转换为秒
                print(f"Step {step}, Global Avg Waiting Time: {avg_waiting_time:.2f}s")

    except Exception as e:
        print(f"Simulation interrupted: {e}")
    finally:
        traci.close()

if __name__ == "__main__":
    run_multi_agent_simulation("simple_grid.net.xml", "flows.rou.xml", gui=True)

第三部分:实现智能体间的通信与协调机制

独立的决策可能导致“局部最优,全局次优”。因此,智能体间的信息交换至关重要。我们实现两种简单的协调机制:

3.1 基于邻近感知的协调

扩展get_observation函数,使其包含上游或下游交叉口的排队信息。

def get_observation_with_neighbors(self, neighbor_agents):
    obs = self.get_observation()  # 基础局部观察
    neighbor_info = []
    # 获取指定邻居(例如直接上游)的排队长度
    for neighbor_id in self._get_upstream_neighbor_ids(): # 需要预先定义邻居关系
        if neighbor_id in neighbor_agents:
            neighbor_obs = neighbor_agents[neighbor_id].get_observation()
            neighbor_info.append(neighbor_obs[0]) # 取排队长度作为关键信息
    return np.concatenate([obs, np.array(neighbor_info)])

3.2 基于简单通信协议的协调

我们可以设计一个消息传递接口。例如,当一个智能体检测到严重拥堵(排队超过阈值)时,向其下游智能体发送“请求优先”消息。下游智能体在决策时,会考虑这个请求。

class CoordinatedTrafficLightAgent(TrafficLightAgent):
    def __init__(self, tl_id, net_data):
        super().__init__(tl_id, net_data)
        self.inbox = []  # 接收到的消息队列

    def send_message(self, receiver_id, msg_type, content):
        # 在实际多进程/多线程环境中,这里需要进程间通信
        # 简化示例:通过全局字典传递
        global_message_board[receiver_id].append({'from': self.id, 'type': msg_type, 'content': content})

    def process_messages(self):
        priority_request = False
        for msg in self.inbox:
            if msg['type'] == 'PRIORITY_REQUEST':
                if self._is_upstream_of_me(msg['from']):
                    priority_request = True
        self.inbox.clear()
        return priority_request

    def choose_action(self, obs):
        has_priority_request = self.process_messages()
        # 在决策逻辑中融入请求:如果收到上游的优先请求,更倾向于延长或切换到有利于该方向的绿灯
        if has_priority_request and self._current_phase_serves_upstream():
            return 0  # 保持相位
        # ... 原有的决策逻辑

第四部分:优化、评估与展望

4.1 评估指标体系

一个优秀的多智能体信号控制系统应从多维度评估:

  1. 全局效率指标:路网总旅行时间、平均速度、总通过车辆数。
  2. 公平性指标:不同方向车流等待时间的方差。
  3. 鲁棒性指标:在突发流量或事故场景下的性能衰减程度。
  4. 通信开销:智能体间消息传递的频率与数据量。

我们可以在仿真结束后,使用SUMO的输出文件(--tripinfo-output)或通过TraCI实时计算这些指标。

4.2 从规则控制到强化学习

本文实现的基于规则的决策器只是一个起点。真正的“智能”来自于学习。我们可以将每个TrafficLightAgent嵌入到一个深度强化学习框架中:

  • 观察空间get_observation返回的向量。
  • 行动空间:离散行动(切换相位)或连续行动(调整相位时长)。
  • 奖励函数:精心设计是关键。可以是局部排队长度的负值,也可以是包含全局信息的团队奖励(Team Reward),甚至是基于价值分解网络(VDN)或QMIX算法的混合奖励。
  • 训练架构:可以采用集中式训练分布式执行(CTDE)的范式,使用如MADDPG、MAPPO等多智能体强化学习算法。

4.3 挑战与未来方向

  1. 部分可观测性:每个智能体只能看到路网的一部分,决策具有不确定性。
  2. 非平稳性:当所有智能体同时学习时,环境对每个智能体来说都在变化。
  3. 信用分配:如何将全局性能的提升归因于每个智能体的个体动作。
  4. 可扩展性:路网规模增大时,智能体数量增加,协调复杂度指数上升。

未来的探索可以朝向分层控制、图神经网络(GNN)建模智能体间拓扑关系、以及与边缘计算结合实现低延迟分布式决策等方向发展。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐