AutoGen智能体开发:autogen_ext.runtimes.grpc包
autogen_ext.runtimes.grpc
class GrpcWorkerAgentRuntime(host_address: str, tracer_provider: TracerProvider | None = None, extra_grpc_config: Sequence[Tuple[str, Any]] | None = None, payload_serialization_format: str = JSON_DATA_CONTENT_TYPE)[source]
基类: AgentRuntime
用于运行远程或跨语言代理的代理运行时。
代理消息使用来自 agent_worker.proto 的 protobuf 和来自 cloudevent.proto 的 CloudEvent。
跨语言代理还将要求所有代理对代理之间发送的任何消息类型使用共享的 protobuf 模式。
在后台任务中启动运行时。
立即停止运行时。
async stop_when_signal(signals: Sequence[Signals] = (signal.SIGTERM, signal.SIGINT)) → None[source]
当接收到信号时停止运行时。
async send_message(message: Any, recipient: AgentId, *, sender: AgentId | None = None, cancellation_token: CancellationToken | None = None, message_id: str | None = None) → Any[source]
向代理发送消息并获取响应。
参数:
-
message (Any) – 要发送的消息。
-
recipient (AgentId) – 接收消息的代理。
-
sender (AgentId | None, 可选) – 发送消息的代理。仅当消息不是由任何代理发送(例如直接发送到外部运行时)时才应为 None。默认为 None。
-
cancellation_token (CancellationToken | None, 可选) – 用于取消正在进行的令牌。默认为 None。
引发:
-
CantHandleException – 如果收件人无法处理消息。
-
UndeliverableException – 如果消息无法送达。
-
Other – 收件人引发的任何其他异常。
返回:
Any – 代理的响应。
async publish_message(message: Any, topic_id: TopicId, *, sender: AgentId | None = None, cancellation_token: CancellationToken | None = None, message_id: str | None = None) → None[source]
向给定命名空间中的所有代理发布消息;如果未提供命名空间,则发布到发送者的命名空间。
发布操作不期望任何响应。
参数:
-
message (Any) – 要发布的消息。
-
topic_id (TopicId) – 发布消息的主题。
-
sender (AgentId | None, 可选) – 发送消息的代理。默认为 None。
-
cancellation_token (CancellationToken | None, 可选) – 用于取消正在进行的令牌。默认为 None。
-
message_id (str | None, 可选) – 消息 ID。如果为 None,将生成一个新的消息 ID。默认为 None。此消息 ID 必须是唯一的,建议使用 UUID。
引发:
UndeliverableException – 如果消息无法送达。
async save_state() → Mapping[str, Any][source]
保存整个运行时(包括所有托管代理)的状态。恢复状态的唯一方法是将其传递给 load_state()。
状态的结构由实现定义,可以是任何 JSON 可序列化对象。
返回:
Mapping[str, Any] – 保存的状态。
async load_state(state: Mapping[str, Any]) → None[source]
加载整个运行时(包括所有托管代理)的状态。该状态应与 save_state() 返回的状态相同。
参数:
state (Mapping[str, Any]) – 保存的状态。
async agent_metadata(agent: AgentId) → AgentMetadata[source]
获取代理的元数据。
参数:
agent (AgentId) – 代理 ID。
返回:
AgentMetadata – 代理元数据。
async agent_save_state(agent: AgentId) → Mapping[str, Any][source]
保存单个代理的状态。
状态的结构由实现定义,可以是任何 JSON 可序列化对象。
参数:
agent (AgentId) – 代理 ID。
返回:
Mapping[str, Any] – 保存的状态。
async agent_load_state(agent: AgentId, state: Mapping[str, Any]) → None[source]
加载单个代理的状态。
参数:
async register_factory(type: str | AgentType, agent_factory: Callable[[], T | Awaitable[T]], *, expected_class: type[T] | None = None) → AgentType[source]
向运行时注册与特定类型关联的代理工厂。类型必须唯一。此 API 不添加任何订阅。
注意
这是一个低级 API,通常应使用代理类的 register 方法,因为这也会自动处理订阅。
示例
from dataclasses import dataclass
from autogen_core import AgentRuntime, MessageContext, RoutedAgent, event
from autogen_core.models import UserMessage
@dataclass
class MyMessage:
content: str
class MyAgent(RoutedAgent):
def __init__(self) -> None:
super().__init__("My core agent")
@event
async def handler(self, message: UserMessage, context: MessageContext) -> None:
print("Event received: ", message.content)
async def my_agent_factory():
return MyAgent()
async def main() -> None:
runtime: AgentRuntime = ... # type: ignore
await runtime.register_factory("my_agent", lambda: MyAgent())
import asyncio
asyncio.run(main())
参数:
-
type (str) – 此工厂创建的代理类型。它与代理类名不同。type 参数用于区分不同的工厂函数,而不是代理类。
-
agent_factory (Callable[[], T]) – 创建代理的工厂,其中 T 是具体的 Agent 类型。在工厂内部,使用 autogen_core.AgentInstantiationContext 访问当前运行时和代理 ID 等变量。
-
expected_class (type[T] | None, 可选) – 代理的预期类,用于工厂的运行时验证。默认为 None。如果为 None,则不执行验证。
async register_agent_instance(agent_instance: Agent, agent_id: AgentId) → AgentId[source]
向运行时注册代理实例。类型可以重用,但每个 agent_id 必须唯一。同一类型中的所有代理实例必须是相同的对象类型。此 API 不添加任何订阅。
注意
这是一个低级 API,通常应使用代理类的 register_instance 方法,因为这也会自动处理订阅。
示例
from dataclasses import dataclass
from autogen_core import AgentId, AgentRuntime, MessageContext, RoutedAgent, event
from autogen_core.models import UserMessage
@dataclass
class MyMessage:
content: str
class MyAgent(RoutedAgent):
def __init__(self) -> None:
super().__init__("My core agent")
@event
async def handler(self, message: UserMessage, context: MessageContext) -> None:
print("Event received: ", message.content)
async def main() -> None:
runtime: AgentRuntime = ... # type: ignore
agent = MyAgent()
await runtime.register_agent_instance(
agent_instance=agent, agent_id=AgentId(type="my_agent", key="default")
)
import asyncio
asyncio.run(main())
参数:
async try_get_underlying_agent_instance(id: AgentId, type: Type[T] = Agent) → T[source]
尝试通过名称和命名空间获取底层代理实例。这通常不被鼓励(因此名称很长),但在某些情况下可能很有用。
如果底层代理无法访问,这将引发异常。
参数:
-
id (AgentId) – 代理 ID。
-
type (Type[T], 可选) – 代理的预期类型。默认为 Agent。
返回:
T – 具体代理实例。
引发:
-
LookupError – 如果找不到代理。
-
NotAccessibleError – 如果代理不可访问,例如它位于远程位置。
-
TypeError – 如果代理不是预期类型。
async add_subscription(subscription: Subscription) → None[source]
添加一个新的订阅,运行时在处理已发布消息时应履行该订阅
参数:
subscription (Subscription) – 要添加的订阅
async remove_subscription(id: str) → None[source]
从运行时中移除订阅
参数:
id (str) – 要移除的订阅 ID
引发:
LookupError – 如果订阅不存在
async get(id_or_type: AgentId | AgentType | str, /, key: str = 'default', *, lazy: bool = True) → AgentId[source]
add_message_serializer(serializer: MessageSerializer[Any] | Sequence[MessageSerializer[Any]]) → None[source]
向运行时添加新的消息序列化器
注意:这将根据 type_name 和 data_content_type 属性对序列化器进行去重
参数:
serializer (MessageSerializer[Any] | Sequence[MessageSerializer[Any]]) – 要添加的序列化器
class GrpcWorkerAgentRuntimeHost(address: str, extra_grpc_config: Sequence[Tuple[str, Any]] | None = None)[source]
基类: object
在后台任务中启动服务器。
async stop(grace: int = 5) → None[source]
停止服务器。
async stop_when_signal(grace: int = 5, signals: Sequence[Signals] = (signal.SIGTERM, signal.SIGINT)) → None[source]
当接收到信号时停止服务器。
class GrpcWorkerAgentRuntimeHostServicer[source]
基类: AgentRpcServicer
一个 gRPC 服务器,用于托管代理的消息传递服务。
async OpenChannel(request_iterator: AsyncIterator[Message], context: ServicerContext[Message, Message]) → AsyncIterator[Message][source]
在 .proto 文件中缺少相关的文档注释。
async OpenControlChannel(request_iterator: AsyncIterator[ControlMessage], context: ServicerContext[ControlMessage, ControlMessage]) → AsyncIterator[ControlMessage]#
在 .proto 文件中缺少相关的文档注释。
async RegisterAgent(request: RegisterAgentTypeRequest, context: ServicerContext[RegisterAgentTypeRequest, RegisterAgentTypeResponse]) → RegisterAgentTypeResponse[source]
在 .proto 文件中缺少相关的文档注释。
async AddSubscription(request: AddSubscriptionRequest, context: ServicerContext[AddSubscriptionRequest, AddSubscriptionResponse]) → AddSubscriptionResponse[source]
在 .proto 文件中缺少相关的文档注释。
async RemoveSubscription(request: RemoveSubscriptionRequest, context: ServicerContext[RemoveSubscriptionRequest, RemoveSubscriptionResponse]) → RemoveSubscriptionResponse[source]
在 .proto 文件中缺少相关的文档注释。
async GetSubscriptions(request: GetSubscriptionsRequest, context: ServicerContext[GetSubscriptionsRequest, GetSubscriptionsResponse]) → GetSubscriptionsResponse[source]
在 .proto 文件中缺少相关的文档注释。
《AI提示工程必知必会》为读者提供了丰富的AI提示工程知识与实战技能。《AI提示工程必知必会》主要内容包括各类提示词的应用,如问答式、指令式、状态类、建议式、安全类和感谢类提示词,以及如何通过实战演练掌握提示词的使用技巧;使用提示词进行文本摘要、改写重述、语法纠错、机器翻译等语言处理任务,以及在数据挖掘、程序开发等领域的应用;AI在绘画创作上的应用,百度文心一言和阿里通义大模型这两大智能平台的特性与功能,以及市场调研中提示词的实战应用。通过阅读《AI提示工程必知必会》,读者可掌握如何有效利用AI提示工程提升工作效率,创新工作流程,并在职场中脱颖而出。

更多推荐
所有评论(0)