RocketMQ For AI:多智能体异步通信新方案
摘要:RocketMQ推出LiteTopic特性,专为AI场景设计的多智能体异步通信架构。LiteTopic支持轻量级动态创建、自动生命周期管理和高性能订阅,解决AI应用中的长耗时任务阻塞和会话连续性挑战。其核心优势包括排他消费、顺序性保障和百万级轻量级主题支持,已在阿里云RocketMQ 5.x实例部署并提交至开源社区。典型应用场景包括Multi-Agent异步通信(实现任务并行调度与结果异步回
基于 RocketMQ LiteTopic 的多智能体异步通信架构
AI 应用的异步通信挑战
随着人工智能技术的发展,企业级 AI 应用呈现出长耗时任务、算力资源稀缺且昂贵、流量峰谷波动等新特征。
在多智能体(Multi-Agent)协作场景下,传统同步调用模式会导致线程长时间阻塞,难以应对大规模协作需求。例如,一个 Supervisor Agent(负责协调与管理) 将任务分配给多个 Sub-Agent 执行,如果采用同步等待 Sub-Agent 返回结果,Supervisor 的线程将被长时间占用,系统难以扩展。另一挑战是在网络异常或节点故障时如何保持会话连续性,避免重复处理任务浪费算力。
为解决这些问题,消息队列作为异步解耦中间件成为关键方案:将阻塞的同步调用改为异步通知,利用消息队列天然的削峰填谷能力平滑流量,并结合消费限流、消息优先级等特性提高资源利用率。
RocketMQ for AI 的核心: LiteTopic
针对 AI 场景需求,Apache RocketMQ 进行了战略升级,推出了面向 AI 场景的新特性,其中核心即 Lite Topic 。LiteTopic 可以理解为 RocketMQ 中一种动态、轻量级的子主题,支持根据业务需要按需创建和订阅,实现更细粒度的消息路由。与传统 Topic 不同,LiteTopic 具有以下关键特性:
• 轻量资源:LiteTopic 属于轻量级资源,可在一个父级 Topic 下创建百万级别的 LiteTopic,以支持海量任务的隔离通信需求。
• 自动生命周期管理:LiteTopic 的创建和销毁全自动完成。Producer 发送消息时如果指定的 LiteTopic 不存在,系统会自动创建;若长时间未收到新消息则根据过期时间自动删除,免除手工管理开销。
• 高性能订阅:Consumer 动态订阅/取消订阅 LiteTopic,每个消费者可订阅多达成千上万个 LiteTopic。这使应用能够根据会话或任务动态调整订阅,而无需预先创建所有子主题。
• 排他消费:同一时刻每个 LiteTopic 只会被单个消费者订阅和消费。这一机制确保了特定会话或任务的消息只会有一个消费者处理,避免并发竞争,在会话保持场景中至关重要。
• 顺序性保障:每个 LiteTopic 内部的消息严格按照发送顺序存储和消费,保证了例如多轮对话响应顺序的一致性。
上述能力目前已在阿里云消息队列 RocketMQ 5.x 实例中发布,并已在 Apache RocketMQ 开源社区提交 PR。同时其也在和 Agent Scope 的开发团队合作,尝试将 LiteTopic 引入 Agent Scope 中,为 AI 应用提供更加强大的异步通信能力。
更多持续更新前沿技术、Agent开发等资源加入 赋范空间 免费领取
LiteTopic 应用场景
Multi-Agent 的异步通信,解决长耗时调用阻塞痛点
在一个典型的多智能体协作场景下,一个 Supervisor Agent 负责接收用户请求并拆解成若干子任务,由不同Sub-Agent执行并返回给Supervisor Agent,最后汇总结果反馈给用户

- 任务请求投递:
为每个 Sub-Agent 创建一个专属的请求 Topic 充当任务队列,Supervisor 将拆解后的子任务分别发送到对应的 Sub-Agent 请求Topic中。每个 Sub-Agent 独享其请求队列,从而实现任务的并行调度与流量隔离。
- 结果异步返回:
Supervisor 与 Sub-Agent 间通过 Response Topic 和 LiteTopic 实现结果的异步回传:
a. Supervisor 创建一个用于接收所有任务结果的 Response Topic,并订阅该 Topic 。这样Supervisor即可监听子任务的结果消息。
b. 当 Sub-Agent 完成任务后,将结果消息发送到上述 Response Topic 下,并为每个独立任务动态创建一个专属 LiteTopic 来承载该任务的结果。RocketMQ 检测到新的 LiteTopic 名称后会自动创建相应资源,无需预先配置。
c. Supervisor 实时订阅到 Response Topic 下新建的 LiteTopic 消息,从中异步获取各个任务结果,然后通过 HTTP SSE 等方式将结果即时推送给前端用户。
每个 LiteTopic 内消息顺序消费,确保如果子任务的产出 分多条消息(如流式输出),Supervisor 接收到的顺序与产生顺序一致。
分布式会话状态管理,终结 AI 应用的会话状态管理难题
AI 应用的交互模式具有特殊性,即长耗时、多轮次且高度依赖高成本计算的会话。当应用依赖 SSE 等长连接时,一旦由于网络等原因连接中断,不仅会导致当前会话上下文的丢失,更会直接造成已投入的 AI 任务作废,从而浪费宝贵的算力资源。
在一个典型的长连接会话场景下,例如:对话类AI应用,用户与AI可能进行多轮对话,且对话过程耗时较长。如果对话过程中客户端断线并重连到不同服务节点,如何不中断地续接对话并避免任务重复执行

- 用户连接会话
用户会话由一个应用节点处理,该节点订阅对应会话的 LiteTopic 并连续向前端推送AI回复。
- 由于网络或节点故障,用户重连到了集群中另一节点:
a. 新节点会立即订阅此会话对应的 LiteTopic
b. 原节点则取消订阅
确保同一会话的 LiteTopic 在任一时刻只由一个节点消费。新节点可以从上次中断的位置继续消费会话消息流。
这样保证用户在前端感受到的对话是连续的,中途换节点并不会丢失之前的上下文,也不会因为重新订阅而导致重复执行已完成的AI任务。
通过这种方式,RocketMQ LiteTopic 模型有效解决了会话状态跨节点迁移和断线续传的问题,大大提高了多节点部署下用户会话的可靠性。
更多持续更新前沿技术、Agent开发等资源加入 赋范空间 免费领取
更多参考资料
github : Lite Topic: A New Message Model
阿里云 开发参考:LiteTopic
RocketMQ LiteTopic + Google ADK 实现 多 Agent 异步交互示例:rocketmq-multi-agent-demo
更多推荐



所有评论(0)