智能体(Agents)可以通过多种方式协同工作来解决问题。诸如 AutoGenMetaGPT 和 ChatDev 等研究工作表明,多智能体系统在软件开发等复杂任务上表现优于单智能体系统。

多智能体设计模式是信息协议中出现的一种结构:它描述了智能体如何相互作用以解决问题。例如,上一节中的配备工具的智能体采用了名为 ReAct 的设计模式,该模式涉及智能体与工具的交互。

您可以使用 AutoGen 智能体实现任何多智能体设计模式。在接下来的两节中,我们将讨论两种常见的设计模式:用于任务分解的群聊,以及用于增强鲁棒性的反思。

在本节中,我们将探讨多个代理并发工作的使用。我们涵盖了三种主要模式:

  1. 单一消息与多个处理器
    演示单个消息如何被订阅同一主题的多个代理同时处理。

  2. 多个消息与多个处理器
    阐释如何根据主题将特定消息类型路由到专用代理。

  3. 直接消息传递
    重点介绍代理之间以及从运行时到代理发送消息。

import asyncio
from dataclasses import dataclass

from autogen_core import (
    AgentId,
    ClosureAgent,
    ClosureContext,
    DefaultTopicId,
    MessageContext,
    RoutedAgent,
    SingleThreadedAgentRuntime,
    TopicId,
    TypeSubscription,
    default_subscription,
    message_handler,
    type_subscription,
)

 

@dataclass
class Task:
    task_id: str


@dataclass
class TaskResponse:
    task_id: str
    result: str

 

单一消息与多个处理器

第一种模式展示了单个消息如何被多个代理同时处理。

  • 每个 Processor 代理使用 default_subscription() 装饰器订阅默认主题。

  • 当向默认主题发布消息时,所有注册的代理都将独立处理该消息。

注意

下面,我们使用 default_subscription() 装饰器订阅 Processor;还有一种不使用装饰器订阅代理的替代方法,如订阅和发布主题所示,这种方式使得同一个代理类可以订阅到不同的主题。

@default_subscription
class Processor(RoutedAgent):
    @message_handler
    async def on_task(self, message: Task, ctx: MessageContext) -> None:
        print(f"{self._description} starting task {message.task_id}")
        await asyncio.sleep(2)  # Simulate work
        print(f"{self._description} finished task {message.task_id}")
runtime = SingleThreadedAgentRuntime()

await Processor.register(runtime, "agent_1", lambda: Processor("Agent 1"))
await Processor.register(runtime, "agent_2", lambda: Processor("Agent 2"))

runtime.start()

await runtime.publish_message(Task(task_id="task-1"), topic_id=DefaultTopicId())

await runtime.stop_when_idle()
Agent 1 starting task task-1
Agent 2 starting task task-1
Agent 1 finished task task-1
Agent 2 finished task task-1

多个消息与多个处理器

其次,这种模式演示了将不同类型的消息路由到特定处理器。

  • UrgentProcessor 订阅“紧急”主题

  • NormalProcessor 订阅“普通”主题

我们使用 type_subscription() 装饰器让代理订阅特定的主题类型。

TASK_RESULTS_TOPIC_TYPE = "task-results"
task_results_topic_id = TopicId(type=TASK_RESULTS_TOPIC_TYPE, source="default")


@type_subscription(topic_type="urgent")
class UrgentProcessor(RoutedAgent):
    @message_handler
    async def on_task(self, message: Task, ctx: MessageContext) -> None:
        print(f"Urgent processor starting task {message.task_id}")
        await asyncio.sleep(1)  # Simulate work
        print(f"Urgent processor finished task {message.task_id}")

        task_response = TaskResponse(task_id=message.task_id, result="Results by Urgent Processor")
        await self.publish_message(task_response, topic_id=task_results_topic_id)


@type_subscription(topic_type="normal")
class NormalProcessor(RoutedAgent):
    @message_handler
    async def on_task(self, message: Task, ctx: MessageContext) -> None:
        print(f"Normal processor starting task {message.task_id}")
        await asyncio.sleep(3)  # Simulate work
        print(f"Normal processor finished task {message.task_id}")

        task_response = TaskResponse(task_id=message.task_id, result="Results by Normal Processor")
        await self.publish_message(task_response, topic_id=task_results_topic_id)

注册代理后,我们可以向“紧急”和“普通”主题发布消息。

runtime = SingleThreadedAgentRuntime()

await UrgentProcessor.register(runtime, "urgent_processor", lambda: UrgentProcessor("Urgent Processor"))
await NormalProcessor.register(runtime, "normal_processor", lambda: NormalProcessor("Normal Processor"))

runtime.start()

await runtime.publish_message(Task(task_id="normal-1"), topic_id=TopicId(type="normal", source="default"))
await runtime.publish_message(Task(task_id="urgent-1"), topic_id=TopicId(type="urgent", source="default"))

await runtime.stop_when_idle()
Normal processor starting task normal-1
Urgent processor starting task urgent-1
Urgent processor finished task urgent-1
Normal processor finished task normal-1

收集结果

在前面的示例中,我们依靠控制台打印来验证任务完成情况。然而,在实际应用中,我们通常希望以编程方式收集和处理结果。

为了收集这些消息,我们将使用 ClosureAgent。我们定义了一个专用主题 TASK_RESULTS_TOPIC_TYPEUrgentProcessor 和 NormalProcessor 都将结果发布到此主题。然后 ClosureAgent 将处理来自此主题的消息。

queue = asyncio.Queue[TaskResponse]()


async def collect_result(_agent: ClosureContext, message: TaskResponse, ctx: MessageContext) -> None:
    await queue.put(message)


runtime.start()

CLOSURE_AGENT_TYPE = "collect_result_agent"
await ClosureAgent.register_closure(
    runtime,
    CLOSURE_AGENT_TYPE,
    collect_result,
    subscriptions=lambda: [TypeSubscription(topic_type=TASK_RESULTS_TOPIC_TYPE, agent_type=CLOSURE_AGENT_TYPE)],
)

await runtime.publish_message(Task(task_id="normal-1"), topic_id=TopicId(type="normal", source="default"))
await runtime.publish_message(Task(task_id="urgent-1"), topic_id=TopicId(type="urgent", source="default"))

await runtime.stop_when_idle()
Normal processor starting task normal-1
Urgent processor starting task urgent-1
Urgent processor finished task urgent-1
Normal processor finished task normal-1
while not queue.empty():
    print(await queue.get())
TaskResponse(task_id='urgent-1', result='Results by Urgent Processor')
TaskResponse(task_id='normal-1', result='Results by Normal Processor')

直接消息

与前面的模式相反,此模式侧重于直接消息。这里我们演示了两种发送方式:

  • 代理之间的直接消息传递

  • 从运行时向特定代理发送消息

以下示例中需要考虑的事项:

  • 消息使用 AgentId 进行寻址。

  • 发送方可以预期接收来自目标代理的响应。

  • 我们只注册 WorkerAgent 类一次;但是,我们向两个不同的 worker 发送任务。

    • 如何实现?如代理生命周期所述,当使用 AgentId 传递消息时,运行时将获取实例,如果不存在则创建一个。在这种情况下,运行时在发送这两个消息时创建了两个 worker 实例。

class WorkerAgent(RoutedAgent):
    @message_handler
    async def on_task(self, message: Task, ctx: MessageContext) -> TaskResponse:
        print(f"{self.id} starting task {message.task_id}")
        await asyncio.sleep(2)  # Simulate work
        print(f"{self.id} finished task {message.task_id}")
        return TaskResponse(task_id=message.task_id, result=f"Results by {self.id}")


class DelegatorAgent(RoutedAgent):
    def __init__(self, description: str, worker_type: str):
        super().__init__(description)
        self.worker_instances = [AgentId(worker_type, f"{worker_type}-1"), AgentId(worker_type, f"{worker_type}-2")]

    @message_handler
    async def on_task(self, message: Task, ctx: MessageContext) -> TaskResponse:
        print(f"Delegator received task {message.task_id}.")

        subtask1 = Task(task_id="task-part-1")
        subtask2 = Task(task_id="task-part-2")

        worker1_result, worker2_result = await asyncio.gather(
            self.send_message(subtask1, self.worker_instances[0]), self.send_message(subtask2, self.worker_instances[1])
        )

        combined_result = f"Part 1: {worker1_result.result}, " f"Part 2: {worker2_result.result}"
        task_response = TaskResponse(task_id=message.task_id, result=combined_result)
        return task_response
runtime = SingleThreadedAgentRuntime()

await WorkerAgent.register(runtime, "worker", lambda: WorkerAgent("Worker Agent"))
await DelegatorAgent.register(runtime, "delegator", lambda: DelegatorAgent("Delegator Agent", "worker"))

runtime.start()

delegator = AgentId("delegator", "default")
response = await runtime.send_message(Task(task_id="main-task"), recipient=delegator)

print(f"Final result: {response.result}")
await runtime.stop_when_idle()
Delegator received task main-task.
worker/worker-1 starting task task-part-1
worker/worker-2 starting task task-part-2
worker/worker-1 finished task task-part-1
worker/worker-2 finished task task-part-2
Final result: Part 1: Results by worker/worker-1, Part 2: Results by worker/worker-2

额外资源

如果您对并发处理的更多信息感兴趣,请查看 代理混合 模式,该模式严重依赖于并发代理。

 

《AI提示工程必知必会》为读者提供了丰富的AI提示工程知识与实战技能。《AI提示工程必知必会》主要内容包括各类提示词的应用,如问答式、指令式、状态类、建议式、安全类和感谢类提示词,以及如何通过实战演练掌握提示词的使用技巧;使用提示词进行文本摘要、改写重述、语法纠错、机器翻译等语言处理任务,以及在数据挖掘、程序开发等领域的应用;AI在绘画创作上的应用,百度文心一言和阿里通义大模型这两大智能平台的特性与功能,以及市场调研中提示词的实战应用。通过阅读《AI提示工程必知必会》,读者可掌握如何有效利用AI提示工程提升工作效率,创新工作流程,并在职场中脱颖而出。

 

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐