📢本篇文章是博主多智能体路径规划(MAPF)领域学习时,用于个人学习、研究或者欣赏使用,并基于博主对相关等领域的一些理解而记录的学习摘录和笔记,若有不当和侵权之处,指出后将会立即改正,还望谅解。文章分类在👉强化学习专栏:

       【MAPF】多智能体路径规划---(1)《CBS 算法:多智能体路径规划的经典最优解框架》

CBS 算法:多智能体路径规划(MAPF)的经典最优解框架

目录

1. MAPF 的形式化定义

2. CBS 的核心:两层搜索

2.1 高层:在“约束树 CT(Constraint Tree)”上搜索

2.2 低层:单智能体最短路(通常 A*)

3. CBS 的完整流程

4. CBS 的伪代码

[Python] 一个可运行的CBS实例

[Results] 运行结果

[Notice]  为什么冲突不直接用“等待”动作替代?

核心误区 1:等待 ≠ 合法的“约束”

核心误区 2:等待一步可能引发连锁冲突

核心误区 3:等待一步可能导致代价不是最优

核心误区 4:有些冲突根本无法用等待解决

那 CBS 的“高层约束”到底是怎么加的?

5. 复杂度与优缺点

6.总结


        基于冲突搜索算法CBS(Conflict-Based Search)是解决 多智能体路径规划(Multi-Agent Path Finding, MAPF) 的经典算法之一。它的核心魅力在于:

  • 最优(可证明得到全局最小总代价解)

  • 两层搜索(高层在“约束树”上搜索,低层对单智能体做 A*)

  • 冲突驱动(只有出现冲突才增加约束)

在仓储机器人、AGV 调度、游戏 AI、群体机器人协作等场景中都很常见。


1. MAPF 的形式化定义

把环境表示为图 G=(V,E)(网格也属于一种图)。有 (n) 个智能体(a_1,\dots,a_n)

  • 智能体 (a_i)起点:(s_i\in V)

  • 终点:(g_i\in V)

  • 时间离散:(t=0,1,2,\dots)

  • 每步动作:移动到相邻点或等待

定义智能体(a_i)的路径为一个顶点序列(随时间变化):

\pi_i(t) \in V

且满足可行转移:

(\pi_i(t), \pi_i(t+1)) \in E \ \ \text{or}\ \ \pi_i(t+1)=\pi_i(t)

冲突(Conflict)

CBS 常用两类冲突:

(1) 顶点冲突 Vertex Conflict

\exists i\neq j,\ \pi_i(t)=\pi_j(t)

表示为:\langle i, v, t\rangle \Rightarrow \pi_i(t)\neq v

(2) 边冲突 Edge Conflict(交换位置)

\exists i\neq j,\ \pi_i(t)=\pi_j(t+1)\ \wedge\ \pi_i(t+1)=\pi_j(t)

表示为:\langle i, (u\to v), t\rangle \Rightarrow \neg(\pi_i(t)=u\ \wedge\ \pi_i(t+1)=v)

目标函数(最常见:总代价之和 SOC)

单个智能体成本(通常是步数):

cost(\pi_i)=|\pi_i|-1

总代价:

SOC(\pi)=\sum_{i=1}^{n} cost(\pi_i)

MAPF 的目标:找到一组无冲突路径 {\pi_i}使 (SOC) 最小。


2. CBS 的核心:两层搜索

CBS 把问题拆成两层:

2.1 高层:在“约束树 CT(Constraint Tree)”上搜索

高层节点(CT 节点)包含:

  • 约束集合 (C)

  • 当前解(每个智能体一条路径)\Pi = {\pi_i}

  • 代价(SOC(\Pi))

高层用 best-first(优先队列)按 (SOC) 从小到大扩展。

2.2 低层:单智能体最短路(通常 A*)

给定某个 CT 节点的约束集合 (C),对每个智能体分别求:

\pi_i = \arg\min_{\pi} cost(\pi)\quad \text{s.t.}\ \pi \ \text{}\ C \ \text{}\ s_i\to g_i


3. CBS 的完整流程

  1. 根节点:(C=\emptyset),对每个智能体跑一次 A* 得到最短路集合(\Pi)

  2. 检查(\Pi)中是否存在冲突:

  • 无冲突:返回(\Pi)(这是最优解)

  • 有冲突:取一个冲突 (i,j,\dots)分裂成两个子节点

  • 冲突分裂(核心):

  • 如果是顶点冲突(i,j,v,t)

    • 子节点 1:加约束 \langle i,v,t\rangle

    • 子节点 2:加约束\langle j,v,t\rangle

  • 如果是边冲突 (i,j,u\leftrightarrow v,t)

    • 子节点 1:加约束\langle i,(u\to v),t\rangle

    • 子节点 2:加约束\langle j,(v\to u),t\rangle

对每个子节点,只需要对新增约束影响到的那个智能体重新跑 A*,其他智能体路径沿用。


4. CBS 的伪代码

CBS(G, starts, goals):
  root.constraints = ∅
  root.paths[i] = A*(G, starts[i], goals[i], root.constraints, agent=i)  for all i
  root.cost = sum(cost(root.paths[i]))
  OPEN = {root}  (priority by cost)

  while OPEN not empty:
    P = pop lowest-cost node from OPEN
    conflict = first_conflict(P.paths)
    if conflict is None:
      return P.paths

    (i, j, type, details, t) = conflict
    for agent k in {i, j}:
      child = copy(P)
      child.constraints += constraint_for(agent=k, conflict)
      child.paths[k] = A*(..., constraints=child.constraints, agent=k)
      if child.paths[k] exists:
        child.cost = sum(cost(child.paths))
        push child into OPEN

  return failure

[Python] 一个可运行的CBS实例

 项目我可参考的代码已经放入GitCode里面,可以通过下面链接跳转:🔥

【MAPF】--- CBS算法 

后续相关MAPF算法也会不断在【MAPF】项目里更新,如果该项目对你有所帮助,请帮我点一个星星✨✨✨✨✨,鼓励分享,十分感谢!!!

若是下面代码复现困难或者有问题,也欢迎评论区留言

CBS算法代码

"""基于冲突的搜索(Conflict-Based Search)算法"""
from a_star import A_Star
from entity import *
from ct_node import CTNode

from copy import deepcopy
import heapq
import sys

"""CBS类"""
class CBS:
    def __init__(self, agents, size, obstacles):
        self.agents = agents # 智能体列表
        self.size = size # 地图大小
        self.obstacles = obstacles # 障碍物
        self.a_star = A_Star(size, obstacles) # 初始化底层规划器(A*算法)
        self.open_list = [] # 待扩展(检查)节点列表
        self.closed_list = [] # 已扩展(检查)节点列表

    # 检查问题合理性(是否有起点重复、终点重复、起点终点与障碍物冲突)
    def check_problem(self):
        start_positions = [] # 起点列表
        goal_positions = [] # 终点列表
        for agent in self.agents:
            # 检查起点重复
            if agent.start in start_positions:
                print("!!起点重复!!")
                return False
            start_positions.append(agent.start)
            # 检查终点重复
            if agent.goal in goal_positions:
                print("!!终点重复!!")
                return False
            goal_positions.append(agent.goal)
            # 检查起点终点与障碍物冲突
            if agent.start in self.obstacles or agent.goal in self.obstacles:
                print("!!起点终点与障碍物冲突!!")
                return False
            if(agent.start[0] < 0 or agent.start[0] >= self.size[0] or agent.start[1] < 0 or agent.start[1] >= self.size[1]):
                print("!!起点超出地图范围!!")
                return False
            if(agent.goal[0] < 0 or agent.goal[0] >= self.size[0] or agent.goal[1] < 0 or agent.goal[1] >= self.size[1]):
                print("!!终点超出地图范围!!")
                return False
        return True

    """冲突检测(第一个冲突)"""
    def search_first_conflict(self,solution):
        if solution is None:
            return 404 # 无解的解决方案

        paths = solution.paths
        # 检查顶点冲突
        for i in range(len(paths)):
            for j in range(i+1, len(paths)):
                max_t = max(len(paths[i].locations),len(paths[j].locations))
                for t in range(max_t):
                    # 注:到达终点后仍可能被碰撞
                    if t>=len(paths[j].locations):
                        if paths[i].locations[t] == paths[j].locations[len(paths[j].locations)-1]:
                            return VertexConflict(self.agents[i], self.agents[j], paths[i].locations[t], t)                        
                    elif t>=len(paths[i].locations):
                        if paths[j].locations[t] == paths[i].locations[len(paths[i].locations)-1]:
                            return VertexConflict(self.agents[i], self.agents[j], paths[j].locations[t], t)  
                    else: # 智能体i和j在t时刻到达同一位置
                        if paths[i].locations[t] == paths[j].locations[t]:
                            return VertexConflict(self.agents[i], self.agents[j], paths[i].locations[t], t)
        # 检查边冲突
        for i in range(len(paths)):
            for j in range(i+1, len(paths)):
                # 智能体i和j在t时刻交换位置
                for t in range(len(paths[i].locations)-1):
                    if t>=len(paths[j].locations)-1:
                        break
                    if(paths[i].locations[t] == paths[j].locations[t+1] and paths[i].locations[t+1] == paths[j].locations[t]):
                        return EdgeConflict(self.agents[i], self.agents[j], paths[i].locations[t], paths[j].locations[t], t+1)
        return None

    """冲突解决"""
    def resolve_conflict(self, best_node, conflict):
        # 顶点冲突
        if(isinstance(conflict, VertexConflict)):
            # 生成约束
            constraint1 = VertexConstraint(conflict.agent1.id, conflict.location, conflict.time)
            constraint2 = VertexConstraint(conflict.agent2.id, conflict.location, conflict.time)
            
            # 生成CT子节点,并求解决方案(含计算总成本)
            best_node.set_left_child([constraint1]) 
            best_node.set_right_child([constraint2])
            best_node.left_child.set_solution(self.get_solution(best_node.left_child))
            best_node.right_child.set_solution(self.get_solution(best_node.right_child))
            
            # 添加子节点进open_list
            heapq.heappush(self.open_list, best_node.left_child)
            heapq.heappush(self.open_list, best_node.right_child)

        # 边冲突
        elif(isinstance(conflict, EdgeConflict)):
            # 生成约束
            constraint1 = EdgeConstraint(conflict.agent1.id, conflict.begin, conflict.end, conflict.time)
            #### 注意:agent2的begin和end是相反的
            constraint2 = EdgeConstraint(conflict.agent2.id, conflict.end, conflict.begin, conflict.time)
            
            # 生成CT子节点,并求解决方案(含计算总成本)
            best_node.set_left_child([constraint1]) 
            best_node.set_right_child([constraint2])
            best_node.left_child.set_solution(self.get_solution(best_node.left_child))
            best_node.right_child.set_solution(self.get_solution(best_node.right_child))
            
            # 添加子节点进open_list
            heapq.heappush(self.open_list, best_node.left_child)
            heapq.heappush(self.open_list, best_node.right_child)
        return None

    """获取解决方案"""
    def get_solution(self, ctNode): 
        all_constraints = ctNode.get_all_constraints() # 获取完整约束(从根节点到当前节点)
        solution = Solution() # 初始化解决方案

        # 根节点: 初始化所有解决方案
        if(ctNode.parent is None): 
            for agent in self.agents:
                # 获取待求解智能体的全部约束
                agent_constraints = ctNode.get_agent_constraints(agent, all_constraints)
                # 调用底层规划求解
                path = self.a_star.search(agent, agent_constraints)
                
                if path is None: # 无解
                    print(str(agent.id)+"在当前节点约束下无解决方案")
                    return None
                solution.add_path(path)

        # 非根节点: 更新新增约束的智能体的解决方案,继承父节点其他解决方案
        else:
            solution = deepcopy(ctNode.parent.solution)
            for constraint in ctNode.constraints:
                # 获取待求解智能体的信息
                current_agent_id = constraint.agent_id
                current_agent = None
                current_agent_index = -1
                for i in range(len(self.agents)):
                    if(self.agents[i].id == current_agent_id):
                        current_agent = self.agents[i]
                        current_agent_index = i

                # 智能体不存在
                if(current_agent is None):
                    print("agent"+str(current_agent_id)+"不存在")
                    return None
                # 求解    
                else:
                    # 获取待求解智能体的全部约束
                    current_agent_constraints = ctNode.get_agent_constraints(current_agent, all_constraints)
                    # 调用底层规划求解
                    path = self.a_star.search(current_agent, current_agent_constraints)
                    
                    if path is None: # 无解
                        print(str(current_agent_id)+"在当前节点约束下无解决方案")
                        return None
                    else:
                        solution.paths[current_agent_index] = path # 更新该智能体的路径解决方案
        return solution

"""CBS主函数"""
def cbs_main(agents, size, obstacles):
    cbs = CBS(agents, size, obstacles) # 初始化环境
    if(not cbs.check_problem()): # 检查问题合理性
        return None
    root = CTNode([], None) # 根节点
    heapq.heappush(cbs.open_list, root) # 添加根节点
    root.set_solution(cbs.get_solution(root))

    # 主循环
    count = 0 # 计数
    while(cbs.open_list):
        count+=1
        if(count>999999999):
            print("扩展节点数超过999999999,退出")
            return None
        sys.stdout.write(f"\r————————————进度: 第{count}个节点————————————")
        sys.stdout.flush()

        best_node = heapq.heappop(cbs.open_list) # 最小成本节点(最佳优先搜索)
        cbs.closed_list.append(best_node) # 添加到已扩展节点列表

        # 冲突检测
        conflict = cbs.search_first_conflict(best_node.solution) 

        # 无解节点
        if conflict == 404: # 无解节点
            # cbs.open_list.remove(best_node) # 从待扩展节点列表中移除
            # cbs.closed_list.append(best_node) # 添加到已扩展节点列表
            continue
        # 有冲突,解决冲突
        elif(conflict):
            cbs.resolve_conflict(best_node, conflict) # 冲突解决
        # 无冲突,结束
        else:
            print()
            print("成功找到解决方案,共扩展了 "+str(count)+" 个节点")
            return best_node
    return None

[Results] 运行结果

​# 环境配置
Python                  3.11.5
torch                   2.1.0
torchvision             0.16.0
gym                     0.26.2


[Notice]  为什么冲突不直接用“等待”动作替代?

初学CBS算法的时候会有一个困惑:为什么不把所有智能体的路径用在时间树上展开,然后判断那个时间步里智能体出现了冲突。如果这样的话,只需要在冲突的时候,让冲突的智能体中的某一个暂停一个时间步即可避免冲突,实际上并不需要重规划,只需要给智能体添加一个暂停等待动作即可,这样的话算法不会复杂很多

这是在很多人第一次学 CBS 时最容易误解、但又最关键的点上

CBS 的高层约束 ≠ 简单地“让某个智能体等一步”

如果只是在冲突时强制等待一步:

  • ❌ 不能保证最优

  • ❌ 不能保证可行

  • ❌ 很多情况下根本解决不了冲突

CBS 必须通过“约束 + 重新规划”,而不仅是插一个 wait 动作。这个想发想法非常自然,但它只在极少数简单冲突中成立

我们的理解是:

  1. 把每个智能体的路径按时间步展开

  2. 在时间 t 发现冲突

  3. 让其中一个智能体在 t 等一步

  4. 冲突就没了

👉 那为什么还要重规划?

这在单一、局部、孤立冲突确实可能有效,但 CBS 要解决的是全局最优 MAPF,下面几个关键点会让这个策略崩溃。

核心误区 1:等待 ≠ 合法的“约束”

CBS 的约束是禁止某件事发生

而不是强制做某个动作

形式上:

  • 约束是:

  • 而“等一步”是:

这两者本质不同

CBS 约束 强制等待
本质 剪掉一部分搜索空间 强行固定路径
是否最优 ✅ 保证 ❌ 可能更差
是否完备
是否局部最优 ❌(允许绕路) ✅(只做局部修改)

CBS 从不指定“你必须怎么走”,它只说:

“你不能在 这个时间 占据 这个位置 / 这条边

剩下的由 A* 自己决定。


核心误区 2:等待一步可能引发连锁冲突

来看一个反例(这是关键)。

🔴 反例 1:狭窄走廊(经典死锁)

A → → → ← ← ← B

  • 一条 1 格宽走廊

  • A、B 从两端对向而行

  • 最短路:直走

你提出的方法:

  • 第一次冲突:让 A 等 1 步

  • 下一步:还是冲突

  • 再让 B 等?

  • 无限来回等待 → 死锁

CBS 的做法:

  • 给 A 或 B 加“边约束”

  • 重新规划

  • 发现:其中一个必须整体绕远路

  • 等待 ≠ 解决方案

核心误区 3:等待一步可能导致代价不是最优

🔴反例 2:等待比绕路更贵

#######
# A . #
# . . #
# . B #
#######
  • A、B 有一条公共短路径

  • 冲突点在中间

等待策略:

  • A 等 1 步 → B 先过

  • 总成本:+1

CBS 重规划:

  • A 绕一小圈(不等)

  • 总成本:+0 或 +1 更小

局部等待 ≠ 全局最优

CBS 的目标函数是:

不是:

“尽快解决当前这个冲突”

核心误区 4:有些冲突根本无法用等待解决

🔴 反例 3:边冲突(交换位置)

A ↔ B

  • 时间 t:A 在 u,B 在 v

  • 时间 t+1:A 在 v,B 在 u

让 A 等?

  • A 等 → B 仍然要从 v → u

  • 还是冲突(顶点冲突)

唯一解法:

  • 改变路径结构

  • 等待不够


那 CBS 的“高层约束”到底是怎么加的?

1️⃣ 高层不是“时间展开的路径树”

约束树 ≠ 把所有时间步都展开

✅ 正确理解:

约束树(CT)是一棵“假设树”

每个节点表示:

  • “假设某些行为不允许发生”

  • 在这些假设下,最优路径是什么


2️⃣ 冲突 → 二分世界(这是 CBS 的灵魂)

假设在节点 (N) 中发现冲突:

(a_i, a_j, v, t)

CBS 不是说“让谁等”,而是说:

世界 1:假设 i 不能在 (t) 到达 (v)
世界 2:假设 j 不能在 (t) 到达 (v)

也就是:

        N
       / \
   N_i     N_j
  • 左子树:约束 ⟨i, v, t⟩

  • 右子树:约束 ⟨j, v, t⟩

这两个世界里:

  • 智能体可以等

  • 也可以绕路

  • 也可以提前或延后到达

  • 只要不违反约束


3️⃣ 为什么必须“重规划”?

因为约束是:

“禁止某状态”

而不是:

“强制某动作”

所以你必须重新求解:

如果你只是插一个 wait:

  • 你是在 人为固定一条次优路径

  • 而不是在新的可行空间里找最优解

我们的想法它其实对应的是另一类算法

✔ 优先级规划(Prioritized Planning)

  • 给智能体固定顺序

  • 后面的对前面的等待

  • 快,但 不最优、不完备

✔ WHCA*

  • 局部时间窗

  • 冲突时插等待

  • 实时性好,但非最优

CBS 和它们的本质区别就在这里:

CBS 不“临时修补路径”,而是“系统性排除冲突模式”

CBS 的高层不是在修路径,而是在修“世界假设”

等待只是低层搜索中可能出现的一种结果,
永远不能被高层强制指定


5. 复杂度与优缺点

  • 最优性:若低层 A* 最优 + 高层按总代价 best-first,则 CBS 对 SOC 是最优的

  • 完备性:有限图 + 有等待动作 + 有界规划时域下可保证

  • 最坏复杂度:指数级(冲突多会导致 CT 节点爆炸)

  • 优势:对中等规模、多数情况下冲突不那么密集的问题非常强

  • 劣势:高密度拥挤场景会很慢;低层 A* 频繁重规划也会贵


6.总结

        CBS(Conflict-Based Search,冲突基搜索)是一种用于多智能体路径规划(MAPF)的经典最优算法,它采用“两层搜索”结构:高层在约束树上进行搜索,先让每个智能体独立规划最短路径并检测冲突,一旦发现顶点冲突或边冲突,就将当前节点分裂为两个子节点,分别对冲突双方中的一个添加“在某时刻禁止占据某位置/某条边”的约束;低层则只对新增约束影响到的那个智能体重新运行 A* 求最短路径并复用其他路径,循环直到所有路径无冲突,从而在保证完备性的同时得到总代价最小的全局最优解。

参考文献:Conflict-based search for optimal multi-agent path finding

参考代码:GitHub - YeBai2503/MAPF-CBS: 多智能体路径规划——基于冲突算法CBS的实现

 更多【MAPF】文章,请前往:【MAPF】多智能体路径规划专栏


        博客都是给自己看的笔记,如有误导深表抱歉。文章若有不当和不正确之处,还望理解与指出。由于部分文字、图片等来源于互联网,无法核实真实出处,如涉及相关争议,请联系博主删除。如有错误、疑问和侵权,欢迎评论留言联系作者,或者添加VX:Rainbook_2,联系作者。✨

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐