一、算法介绍

1.关键词解释

(1)优先级:

每个智能体都有一个优先级,对于低优先级的智能体来说,高优先级的是障碍物。如何描述障碍物呢,通过点约束和边约束。

(2)点约束:

        每个规划好的agenta在一个时间步t都有一个固定的位置pos1。对于比它优先级低的agentb来说,在时间t,agentb是不允许进入pos1的。

        如下图,如果这个地图中有agent1, agent2,其中agent1的优先级高于agent2,那么比如在t4时刻,agent2就不可以到达(2,3)这个位置。

(3)边约束:

        点约束并不能处理所有的碰撞情况。比如对于agent1在t4时刻在(2,3)位置,在t5时刻在(3,3)位置。如果agent2在t4时刻在(3,3)位置,在t4时刻在(2,3)位置。这两个智能体在相同时间点并没有在同一个位置,但是它们在t3-t4时间段,交换位置了,产生对向碰撞。

2.整体算法框架

按照优先级遍历每个智能体,对于优先级最高的智能体来说,直接采用A*搜索出一条无碰撞的路径。

高优先级的路径搜索出之后,每条路径就会产生点约束和边约束,低优先级在满足点约束和边约束的情况下进行A*搜索,搜索出一条路径。

二、python代码

import heapq
import matplotlib.pyplot as plt
import matplotlib.patches as patches
import time
import matplotlib.pyplot as plt
import matplotlib.patches as patches
import matplotlib.animation as animation

# ------------------------------
# 地图和 Agent 设置
# ------------------------------
map_data = {
    "dimensions": [3, 3],
    "obstacles": [(0, 1), (2, 1)]
}

agents = [
    {"name": "agent0", "start": (0, 0), "goal": (2, 2)},
    {"name": "agent1", "start": (2, 2), "goal": (0, 0)}
]

# ------------------------------
# SIPP 节点
# ------------------------------
class Node:
    def __init__(self, x, y, t, cost, parent):
        self.x = x
        self.y = y
        self.t = t
        self.cost = cost
        self.parent = parent

    def __lt__(self, other):
        return (self.cost + self.t) < (other.cost + other.t)

# ------------------------------
# SIPP + 优先级路径规划器
# ------------------------------
class SIPPPlanner:
    def __init__(self, grid, obstacles, max_time):
        self.grid = grid
        self.H, self.W = grid
        self.obstacles = set(obstacles)
        self.max_time = max_time
        self.vertex_constraints = set()
        self.edge_constraints = set()

    def is_valid(self, x, y):
        return 0 <= x < self.H and 0 <= y < self.W and (x, y) not in self.obstacles

    def is_constrained(self, x, y, t):
        return (x, y, t) in self.vertex_constraints

    def is_edge_constrained(self, x1, y1, x2, y2, t):
        return (x1, y1, x2, y2, t) in self.edge_constraints

    def neighbors(self, x, y):
        for dx, dy in [(-1, 0), (1, 0), (0, -1), (0, 1), (0, 0)]:  # 包括等待
            nx, ny = x + dx, y + dy
            if self.is_valid(nx, ny):
                yield nx, ny

    def heuristic(self, x1, y1, x2, y2):
        return abs(x1 - x2) + abs(y1 - y2)

    def plan(self, start, goal):
        open_set = []
        heapq.heappush(open_set, (0, Node(start[0], start[1], 0, 0, None)))
        visited = set()

        while open_set:
            _, node = heapq.heappop(open_set)
            if (node.x, node.y, node.t) in visited:
                continue
            visited.add((node.x, node.y, node.t))

            if (node.x, node.y) == goal:
                return self.reconstruct_path(node)

            if node.t >= self.max_time:
                continue

            for nx, ny in self.neighbors(node.x, node.y):
                nt = node.t + 1
                if self.is_constrained(nx, ny, nt):
                    continue
                if self.is_edge_constrained(node.x, node.y, nx, ny, nt):
                    continue
                new_node = Node(nx, ny, nt, node.cost + 1, node)
                heapq.heappush(open_set, (new_node.cost + self.heuristic(nx, ny, *goal), new_node))

        return None

    def reconstruct_path(self, node):
        path = []
        while node:
            path.append((node.t, node.x, node.y))
            node = node.parent
        return list(reversed(path))

    def add_constraints_from_path(self, path):
        for i in range(len(path)):
            t, x, y = path[i]
            self.vertex_constraints.add((x, y, t))
            if i > 0:
                t_prev, x_prev, y_prev = path[i - 1]
                # self.edge_constraints.add((x_prev, y_prev, x, y, t))
                self.edge_constraints.add((x, y, x_prev, y_prev, t))

        # 保持占据终点
        final_t, gx, gy = path[-1]
        for t in range(final_t + 1, self.max_time + 2):
            self.vertex_constraints.add((gx, gy, t))

# ------------------------------
# 可视化函数
# ------------------------------
def animate_paths(grid_dim, obstacles, agents, paths):
    H, W = grid_dim
    fig, ax = plt.subplots(figsize=(6, 6))
    ax.set_xlim(0, W)
    ax.set_ylim(0, H)
    ax.set_aspect('equal')
    ax.invert_yaxis()
    ax.set_xticks(range(W))
    ax.set_yticks(range(H))
    ax.grid(True)

    # 画障碍物
    for (x, y) in obstacles:
        ax.add_patch(patches.Rectangle((y, x), 1, 1, color='black'))

    colors = ['red', 'blue', 'green', 'orange', 'purple']
    scatters = []
    texts = []
    for i, agent in enumerate(agents):
        scat = ax.scatter([], [], s=300, c=colors[i], label=agent['name'])
        scatters.append(scat)
        txt = ax.text(0, 0, '', fontsize=12, color=colors[i])
        texts.append(txt)

    ax.legend(loc='upper right')

    max_len = max(len(p) for p in paths)

    def update(frame):
        for i, path in enumerate(paths):
            if frame < len(path):
                t, x, y = path[frame]
            else:
                t, x, y = path[-1]
            scatters[i].set_offsets([y + 0.5, x + 0.5])
            texts[i].set_position((y + 0.5, x + 0.5))
            texts[i].set_text(f"{agents[i]['name']} t={t}")
        ax.set_title(f"Time step: {frame}")
        return scatters + texts

    ani = animation.FuncAnimation(fig, update, frames=max_len, interval=800, blit=True, repeat=False)
    ani.save('paths_animation.gif', writer='pillow', fps=1)

    plt.show()

# ------------------------------
# 主程序
# ------------------------------
def main():
    H, W = map_data["dimensions"]
    planner = SIPPPlanner((H, W), map_data["obstacles"], max_time=20)

    all_paths = []
    for agent in agents:
        path = planner.plan(agent["start"], agent["goal"])
        if not path:
            print(f"No path found for {agent['name']}")
        else:
            print(f"{agent['name']} path:")
            for t, x, y in path:
                print(f"  t={t}: ({x}, {y})")
            planner.add_constraints_from_path(path)
            all_paths.append(path)
    print(all_paths)
    animate_paths(map_data["dimensions"], map_data["obstacles"], agents, all_paths)

if __name__ == '__main__':
    main()

三、结果展示

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐