多智能体路径规划算法(1.SIPP改良,考虑边约束)
比如对于agent1在t4时刻在(2,3)位置,在t5时刻在(3,3)位置。如果agent2在t4时刻在(3,3)位置,在t4时刻在(2,3)位置。如下图,如果这个地图中有agent1, agent2,其中agent1的优先级高于agent2,那么比如在t4时刻,agent2就不可以到达(2,3)这个位置。如何描述障碍物呢,通过点约束和边约束。高优先级的路径搜索出之后,每条路径就会产生点约束和边约
·
一、算法介绍
1.关键词解释
(1)优先级:
每个智能体都有一个优先级,对于低优先级的智能体来说,高优先级的是障碍物。如何描述障碍物呢,通过点约束和边约束。
(2)点约束:
每个规划好的agenta在一个时间步t都有一个固定的位置pos1。对于比它优先级低的agentb来说,在时间t,agentb是不允许进入pos1的。
如下图,如果这个地图中有agent1, agent2,其中agent1的优先级高于agent2,那么比如在t4时刻,agent2就不可以到达(2,3)这个位置。

(3)边约束:
点约束并不能处理所有的碰撞情况。比如对于agent1在t4时刻在(2,3)位置,在t5时刻在(3,3)位置。如果agent2在t4时刻在(3,3)位置,在t4时刻在(2,3)位置。这两个智能体在相同时间点并没有在同一个位置,但是它们在t3-t4时间段,交换位置了,产生对向碰撞。
2.整体算法框架
按照优先级遍历每个智能体,对于优先级最高的智能体来说,直接采用A*搜索出一条无碰撞的路径。
高优先级的路径搜索出之后,每条路径就会产生点约束和边约束,低优先级在满足点约束和边约束的情况下进行A*搜索,搜索出一条路径。

二、python代码
import heapq
import matplotlib.pyplot as plt
import matplotlib.patches as patches
import time
import matplotlib.pyplot as plt
import matplotlib.patches as patches
import matplotlib.animation as animation
# ------------------------------
# 地图和 Agent 设置
# ------------------------------
map_data = {
"dimensions": [3, 3],
"obstacles": [(0, 1), (2, 1)]
}
agents = [
{"name": "agent0", "start": (0, 0), "goal": (2, 2)},
{"name": "agent1", "start": (2, 2), "goal": (0, 0)}
]
# ------------------------------
# SIPP 节点
# ------------------------------
class Node:
def __init__(self, x, y, t, cost, parent):
self.x = x
self.y = y
self.t = t
self.cost = cost
self.parent = parent
def __lt__(self, other):
return (self.cost + self.t) < (other.cost + other.t)
# ------------------------------
# SIPP + 优先级路径规划器
# ------------------------------
class SIPPPlanner:
def __init__(self, grid, obstacles, max_time):
self.grid = grid
self.H, self.W = grid
self.obstacles = set(obstacles)
self.max_time = max_time
self.vertex_constraints = set()
self.edge_constraints = set()
def is_valid(self, x, y):
return 0 <= x < self.H and 0 <= y < self.W and (x, y) not in self.obstacles
def is_constrained(self, x, y, t):
return (x, y, t) in self.vertex_constraints
def is_edge_constrained(self, x1, y1, x2, y2, t):
return (x1, y1, x2, y2, t) in self.edge_constraints
def neighbors(self, x, y):
for dx, dy in [(-1, 0), (1, 0), (0, -1), (0, 1), (0, 0)]: # 包括等待
nx, ny = x + dx, y + dy
if self.is_valid(nx, ny):
yield nx, ny
def heuristic(self, x1, y1, x2, y2):
return abs(x1 - x2) + abs(y1 - y2)
def plan(self, start, goal):
open_set = []
heapq.heappush(open_set, (0, Node(start[0], start[1], 0, 0, None)))
visited = set()
while open_set:
_, node = heapq.heappop(open_set)
if (node.x, node.y, node.t) in visited:
continue
visited.add((node.x, node.y, node.t))
if (node.x, node.y) == goal:
return self.reconstruct_path(node)
if node.t >= self.max_time:
continue
for nx, ny in self.neighbors(node.x, node.y):
nt = node.t + 1
if self.is_constrained(nx, ny, nt):
continue
if self.is_edge_constrained(node.x, node.y, nx, ny, nt):
continue
new_node = Node(nx, ny, nt, node.cost + 1, node)
heapq.heappush(open_set, (new_node.cost + self.heuristic(nx, ny, *goal), new_node))
return None
def reconstruct_path(self, node):
path = []
while node:
path.append((node.t, node.x, node.y))
node = node.parent
return list(reversed(path))
def add_constraints_from_path(self, path):
for i in range(len(path)):
t, x, y = path[i]
self.vertex_constraints.add((x, y, t))
if i > 0:
t_prev, x_prev, y_prev = path[i - 1]
# self.edge_constraints.add((x_prev, y_prev, x, y, t))
self.edge_constraints.add((x, y, x_prev, y_prev, t))
# 保持占据终点
final_t, gx, gy = path[-1]
for t in range(final_t + 1, self.max_time + 2):
self.vertex_constraints.add((gx, gy, t))
# ------------------------------
# 可视化函数
# ------------------------------
def animate_paths(grid_dim, obstacles, agents, paths):
H, W = grid_dim
fig, ax = plt.subplots(figsize=(6, 6))
ax.set_xlim(0, W)
ax.set_ylim(0, H)
ax.set_aspect('equal')
ax.invert_yaxis()
ax.set_xticks(range(W))
ax.set_yticks(range(H))
ax.grid(True)
# 画障碍物
for (x, y) in obstacles:
ax.add_patch(patches.Rectangle((y, x), 1, 1, color='black'))
colors = ['red', 'blue', 'green', 'orange', 'purple']
scatters = []
texts = []
for i, agent in enumerate(agents):
scat = ax.scatter([], [], s=300, c=colors[i], label=agent['name'])
scatters.append(scat)
txt = ax.text(0, 0, '', fontsize=12, color=colors[i])
texts.append(txt)
ax.legend(loc='upper right')
max_len = max(len(p) for p in paths)
def update(frame):
for i, path in enumerate(paths):
if frame < len(path):
t, x, y = path[frame]
else:
t, x, y = path[-1]
scatters[i].set_offsets([y + 0.5, x + 0.5])
texts[i].set_position((y + 0.5, x + 0.5))
texts[i].set_text(f"{agents[i]['name']} t={t}")
ax.set_title(f"Time step: {frame}")
return scatters + texts
ani = animation.FuncAnimation(fig, update, frames=max_len, interval=800, blit=True, repeat=False)
ani.save('paths_animation.gif', writer='pillow', fps=1)
plt.show()
# ------------------------------
# 主程序
# ------------------------------
def main():
H, W = map_data["dimensions"]
planner = SIPPPlanner((H, W), map_data["obstacles"], max_time=20)
all_paths = []
for agent in agents:
path = planner.plan(agent["start"], agent["goal"])
if not path:
print(f"No path found for {agent['name']}")
else:
print(f"{agent['name']} path:")
for t, x, y in path:
print(f" t={t}: ({x}, {y})")
planner.add_constraints_from_path(path)
all_paths.append(path)
print(all_paths)
animate_paths(map_data["dimensions"], map_data["obstacles"], agents, all_paths)
if __name__ == '__main__':
main()
三、结果展示

更多推荐
所有评论(0)