AI Agents 开发实战:从单体 Agent 到多 Agent 协作系统
摘要 本文是《Azure AI全栈实践》系列第4篇,重点探讨AI Agent系统的演进与实践。从单一LLM调用到多Agent协作的复杂系统,文章系统介绍了Microsoft Agent Framework(MAF)的核心架构、Foundry Agent Service的生产级能力,以及五大多Agent编排模式。内容涵盖Agent-to-Agent(A2A)协议实战、持久化记忆实现、企业级工作流设计
系列:Azure AI 全栈实践 第 4 篇(共 6 篇)
发布:2026‑03
阅读时长:约 35 分钟
标签:Azure AI Foundry·Microsoft Agent Framework·Multi-Agent·A2A·MCP·AutoGen·Semantic Kernel·企业AI
摘要
从一个单一 LLM 调用到能够自主规划、使用工具、协作完成复杂任务的 AI Agent 系统——这条路远比想象中陡峭。本文将带你系统掌握:Microsoft Agent Framework(MAF) 的核心架构、Foundry Agent Service 的生产级能力、五大多 Agent 编排模式、Agent‑to‑Agent(A2A)协议实战,以及从单体 Agent 平滑演进到多 Agent 协作系统的工程路径。所有代码示例均基于 azure‑ai‑projects==2.0.0b3,可直接在 Azure AI Foundry 中运行。
目录
- 为什么需要 AI Agents?演进路径全景
- Microsoft Agent Framework:统一架构
- Foundry Agent Service:生产级基础设施
- 单体 Agent 实战:工具调用全链路
- 五大编排模式:多 Agent 协作蓝图
- A2A 协议与 MCP:跨系统 Agent 互联
- 持久化记忆:让 Agent 记住你
- 生产级多 Agent 系统:企业知识工作流
- 可观测性与评估:监控 Agent 行为
- 安全、合规与人机协同
- 总结与博客 #5 预告
- 参考资料
一、为什么需要 AI Agents?演进路径全景
1.1 从 LLM 调用到 Agentic AI
大多数企业的 AI 应用演进经历三个阶段:
阶段一:单次 LLM 调用
用户输入 → [GPT‑5.2] → 单次响应
局限:无工具、无记忆、无规划
阶段二:RAG 增强(博客 #3)
用户输入 → 向量检索 → [GPT‑5.2 + 上下文] → 引用响应
局限:被动检索,无法主动执行
阶段三:Agentic AI(本篇)
用户目标 → [Agent 规划] → 工具调用循环 → 多 Agent 协作 → 目标完成
能力:自主规划、工具执行、跨域协作、持久记忆
什么时候需要 Agent? 当你的任务满足以下任一条件:
- 需要 多步骤规划(无法一次 LLM 调用完成)
- 需要 调用外部工具(API、数据库、代码执行)
- 需要 与其他系统或 Agent 协作
- 需要 跨会话持久化状态(长期记忆)
- 需要 自适应决策(根据中间结果调整策略)
1.2 企业 Agentic AI 成熟度模型
┌─────────────────────────────────────────────────────────────────┐
│ 企业 Agent 成熟度矩阵(2026 Q1 现状) │
├────────────┬──────────────────────┬────────────────────────────┤
│ 级别 │ 能力 │ 代表场景 │
├────────────┼──────────────────────┼────────────────────────────┤
│ L1 助手级 │ 单次对话、有限工具 │ 客服问答、文档摘要 │
│ L2 工具级 │ 函数调用、代码解释器 │ 数据分析、报告生成 │
│ L3 自主级 │ 多步规划、自主执行 │ 研究助手、DevOps 自动化 │
│ L4 协作级 │ 多 Agent 编排、A2A │ 企业工作流、跨部门协同 │
│ L5 生态级 │ 跨组织 Agent 网络 │ 供应链优化、行业 Agent 市场 │
└────────────┴──────────────────────┴────────────────────────────┘
当前大多数企业处于 L1-L2,本文带你跨越到 L3-L4
1.3 技术栈全景:MAF 的位置
┌──────────────────────────────────────────────────────────────┐
│ 应用层(你的 Agent App) │
├──────────────────────────────────────────────────────────────┤
│ Microsoft Agent Framework (MAF) │
│ ┌─────────────────┐ ┌──────────────────┐ ┌─────────────┐ │
│ │ Agent Runtime │ │ Orchestration │ │ Memory │ │
│ │ (执行引擎) │ │ (编排引擎) │ │ (记忆层) │ │
│ └─────────────────┘ └──────────────────┘ └─────────────┘ │
├──────────────────────────────────────────────────────────────┤
│ Azure AI Foundry Agent Service │
│ 工具集成 · 代码解释器 · 文件搜索 · 计算机使用 · 持久线程 │
├───────────────────┬──────────────────────────────────────────┤
│ 协议层 │ MCP (Model Context Protocol) │
│ │ A2A (Agent-to-Agent Protocol) │
├───────────────────┼──────────────────────────────────────────┤
│ 模型层 │ GPT‑5.2 · o3 · GPT‑4.1‑mini · ... │
├───────────────────┼──────────────────────────────────────────┤
│ 知识层 │ Azure AI Search · Foundry IQ │
│ (博客 #3 内容) │ 向量索引 · 混合检索 · Cohere Rerank 4 │
└───────────────────┴──────────────────────────────────────────┘
二、Microsoft Agent Framework:统一架构
2.1 MAF 的诞生背景
在 MAF 之前,开发者面临痛苦的选择困境:
| 框架 | 优势 | 劣势 |
|---|---|---|
| Semantic Kernel | 生产稳定、企业级可观测性、丰富连接器 | 编排能力相对保守,多 Agent 模式有限 |
| AutoGen | 实验性多 Agent、群聊编排领先 | 生产就绪度低,缺乏持久化、合规能力 |
| LangChain/LangGraph | 社区庞大、生态丰富 | 企业级支持弱,与 Azure 原生集成不深 |
Microsoft 的解法:将 Semantic Kernel 的稳定性与 AutoGen 的编排创新合并,创造 Microsoft Agent Framework(MAF)。
Semantic Kernel ──┐
├──► Microsoft Agent Framework (MAF)
AutoGen ─────┘
合并后:两个框架的最佳能力 + 企业级生产就绪
2.2 MAF 四大支柱
支柱一:开放标准与互操作性
┌─────────────────────────────────────────────────────┐
│ 开放协议栈 │
│ │
│ MCP(Model Context Protocol) │
│ ● 工具发现与调用的标准协议 │
│ ● 类比:HTTP 之于 Web,MCP 之于 AI 工具 │
│ │
│ A2A(Agent-to-Agent Protocol) │
│ ● Agent 间通信的标准协议(Google + Microsoft 联合)│
│ ● 支持跨组织、跨云 Agent 协作 │
│ │
│ OpenAPI First │
│ ● 任何 REST API → 自动转为 Agent 工具 │
│ ● 无需手写 Function Schema │
└─────────────────────────────────────────────────────┘
支柱二:五种编排模式(详见第五章)
- Sequential(顺序)
- Concurrent(并发)
- Handoff(交接)
- Group Chat(群聊)
- Magentic(磁力/分层监督)
支柱三:可扩展的记忆存储
# MAF 支持的记忆后端(可插拔)
MEMORY_BACKENDS = {
"redis": "Azure Cache for Redis",
"qdrant": "Qdrant Vector DB",
"pinecone": "Pinecone",
"elasticsearch":"Elasticsearch",
"postgres": "PostgreSQL + pgvector",
"ai_search": "Azure AI Search(推荐生产环境)",
"mem0": "Mem0 托管记忆服务",
}
支柱四:生产就绪
✅ OpenTelemetry 分布式追踪(开箱即用)
✅ Azure AI Content Safety 内容过滤
✅ Entra ID 身份验证 + RBAC
✅ 持久化工作流(Agent 崩溃可恢复)
✅ Human-in-the-loop 审批节点
✅ GitHub Actions / Azure DevOps CI/CD 集成
2.3 与旧框架的迁移路径
# === Semantic Kernel 旧写法 ===
# kernel = Kernel()
# kernel.add_plugin(MyPlugin(), plugin_name="my_plugin")
# result = await kernel.invoke("my_plugin", "my_function", param=value)
# === MAF 新写法 ===
from azure.ai.agents import Agent, AgentTool
@AgentTool(description="执行我的业务逻辑")
async def my_function(param: str) -> str:
return f"处理结果: {param}"
agent = Agent(
name="my-agent",
model="gpt-5.2",
tools=[my_function],
)
# === AutoGen 旧写法 ===
# assistant = AssistantAgent("assistant", llm_config=llm_config)
# user_proxy = UserProxyAgent("user_proxy", human_input_mode="NEVER")
# user_proxy.initiate_chat(assistant, message="...")
# === MAF 新写法(AutoGen 用户迁移)===
from azure.ai.agents import ChatAgent, AgentWorkflow
analyst = ChatAgent(name="analyst", model="gpt-5.2", tools=[...])
writer = ChatAgent(name="writer", model="gpt-4.1-mini", tools=[...])
workflow = AgentWorkflow()
workflow.add_handoff(from_agent=analyst, to_agent=writer,
condition="分析完成后交给 writer 撰写报告")
三、Foundry Agent Service:生产级基础设施
3.1 核心能力矩阵
┌─────────────────────────────────────────────────────────────────────┐
│ Foundry Agent Service 工具箱 │
├──────────────────────┬────────────────────────────────────────────┤
│ 工具类型 │ 说明 │
├──────────────────────┼────────────────────────────────────────────┤
│ Function Calling │ 自定义 Python/REST 函数,博客 #2 已详述 │
│ Code Interpreter │ 沙盒 Python 执行环境,支持文件上传下载 │
│ File Search │ 向量化文件搜索,支持 PDF/Word/PPTX │
│ Web Search │ Bing 实时网页搜索 │
│ Azure AI Search │ 企业知识库检索(博客 #3 内容) │
│ Computer Use ⚡NEW │ 截图理解+UI 操作,实现 RPA 级自动化 │
│ MCP Servers │ 标准化工具发现,连接第三方服务 │
│ Agent-to-Agent (A2A)│ 跨 Agent 调用,支持鉴权 │
└──────────────────────┴────────────────────────────────────────────┘
⚡NEW = 2026 Q1 新增预览功能
3.2 Foundry Agent Service vs 自建 Agent
┌──────────────────────────────────────────────────────────────────┐
│ Foundry Agent Service vs 自建 Agent(LangChain) │
├─────────────────────────────────────────────────────────────────┤
│ 线程管理:托管(自动持久化) 手动管理 DB │
│ 工具集成:内置 10+ 工具 手写 function wrapper │
│ 安全性:Entra ID 原生支持 自行实现 auth │
│ 可观测性:内置 OTel traces 手动集成 │
│ 代码执行:隔离沙盒 自建沙盒(复杂) │
│ SLA:99.9% 微软保证 自行负责 │
│ 迁移成本:零(SDK 标准接口) 框架绑定 │
│ 推荐场景:企业生产环境 快速原型 / 特殊定制 │
└──────────────────────────────────────────────────────────────────┘
3.3 环境配置
# .env(开发环境,勿提交 Git)
# Azure AI Foundry 项目端点
AZURE_AI_PROJECT_ENDPOINT=https://your-project.services.ai.azure.com/api/projects/your-project
# Agent 部署模型(推荐按复杂度分级)
AGENT_MODEL_HIGH=gpt-5.2 # 复杂推理、多步规划
AGENT_MODEL_MID=gpt-4.1 # 标准业务 Agent
AGENT_MODEL_LOW=gpt-4.1-mini # 简单任务、高并发场景
# 工具配置
BING_CONNECTION_ID=/subscriptions/.../connections/bing-search
AI_SEARCH_CONNECTION_ID=/subscriptions/.../connections/ai-search
# 记忆存储
REDIS_CONNECTION_STRING=redis://...@your-cache.redis.cache.windows.net:6380
MEM0_API_KEY=m0-...
# 可观测性
APPLICATIONINSIGHTS_CONNECTION_STRING=InstrumentationKey=...
# 安装所有依赖
pip install azure-ai-projects==2.0.0b3 azure-identity azure-ai-agents openai azure-monitor-opentelemetry tenacity redis mem0ai python-dotenv
3.4 统一客户端初始化
# agent_client.py - 统一客户端工厂
import os
from functools import lru_cache
from azure.ai.projects import AIProjectClient
from azure.identity import DefaultAzureCredential
from dotenv import load_dotenv
load_dotenv()
@lru_cache(maxsize=1)
def get_project_client() -> AIProjectClient:
"""获取全局单例 AIProjectClient(支持本地/CI/Azure 三种环境)"""
credential = DefaultAzureCredential()
client = AIProjectClient(
endpoint=os.environ["AZURE_AI_PROJECT_ENDPOINT"],
credential=credential,
)
return client
def setup_observability():
"""初始化 Azure Monitor + OpenTelemetry 追踪"""
conn_str = os.getenv("APPLICATIONINSIGHTS_CONNECTION_STRING")
if conn_str:
from azure.monitor.opentelemetry import configure_azure_monitor
configure_azure_monitor(connection_string=conn_str)
print("✅ Azure Monitor 追踪已启用")
else:
print("⚠️ 未配置 Application Insights,使用控制台追踪")
# 应用启动时调用
setup_observability()
四、单体 Agent 实战:工具调用全链路
4.1 创建你的第一个生产级 Agent
# single_agent_demo.py
import os, json
from azure.ai.projects import AIProjectClient
from azure.ai.projects.models import (
AgentEventHandler,
CodeInterpreterTool,
FileSearchTool,
BingGroundingTool,
FunctionTool,
ToolSet,
MessageDeltaChunk,
RunStepDeltaChunk,
ThreadMessage,
ThreadRun,
RunStep,
)
from azure.identity import DefaultAzureCredential
from agent_client import get_project_client
def get_toolset() -> ToolSet:
"""组装工具集(按需启用)"""
project = get_project_client()
toolset = ToolSet()
# 工具 1:代码解释器(数据分析、图表生成)
toolset.add(CodeInterpreterTool())
# 工具 2:文件搜索(上传的 PDF/Word 企业文档)
toolset.add(FileSearchTool(vector_store_ids=[
os.environ.get("VECTOR_STORE_ID", "")
]))
# 工具 3:Bing 实时搜索(获取最新信息)
bing_conn = project.connections.get(
connection_name=os.environ.get("BING_CONNECTION_NAME", "bing-grounding")
)
toolset.add(BingGroundingTool(connection_id=bing_conn.id))
# 工具 4:自定义函数(连接内部系统)
toolset.add(FunctionTool(functions=get_enterprise_functions()))
return toolset
def get_enterprise_functions() -> set:
"""注册企业内部业务函数"""
import json
def query_sales_data(
start_date: str,
end_date: str,
region: str = "all",
granularity: str = "monthly"
) -> str:
"""
查询销售数据。
:param start_date: 开始日期,格式 YYYY-MM-DD
:param end_date: 结束日期,格式 YYYY-MM-DD
:param region: 区域,可选 north/south/east/west/all
:param granularity: 粒度,可选 daily/weekly/monthly
:return: JSON 格式销售数据
"""
# 实际生产中调用数据仓库 API
mock_data = {
"period": f"{start_date} ~ {end_date}",
"region": region,
"total_revenue": 12_580_000,
"growth_rate": 0.183,
"top_products": ["产品A", "产品B", "产品C"],
"units_sold": 8423,
}
return json.dumps(mock_data, ensure_ascii=False)
def create_incident_ticket(
title: str,
priority: str,
description: str,
assignee: str = "auto"
) -> str:
"""
创建 IT 事故工单。
:param title: 工单标题
:param priority: 优先级 P1/P2/P3/P4
:param description: 问题描述
:param assignee: 负责人(auto 自动分配)
:return: 工单 ID
"""
ticket_id = f"INC-{hash(title) % 100000:05d}"
return json.dumps({
"ticket_id": ticket_id,
"status": "created",
"assigned_to": "on-call-engineer" if assignee == "auto" else assignee,
"estimated_sla": "4h" if priority in ["P1", "P2"] else "24h",
}, ensure_ascii=False)
return {query_sales_data, create_incident_ticket}
# ===== 流式事件处理器 =====
class EnterpriseAgentEventHandler(AgentEventHandler):
"""处理 Agent 流式输出事件"""
def on_message_delta(self, delta: MessageDeltaChunk) -> None:
"""逐字流式打印 Agent 输出"""
if delta.text:
print(delta.text, end="", flush=True)
def on_run_step_delta(self, delta: RunStepDeltaChunk) -> None:
"""打印工具调用进度"""
if delta.delta.step_details:
details = delta.delta.step_details
if hasattr(details, "tool_calls"):
for tc in (details.tool_calls or []):
if hasattr(tc, "function") and tc.function:
func_name = tc.function.name or ""
if func_name:
print(f"\n🔧 调用工具: {func_name}", flush=True)
def on_thread_message(self, message: ThreadMessage) -> None:
print(f"\n✅ 消息完成 [role={message.role}]")
def on_thread_run(self, run: ThreadRun) -> None:
print(f"\n📊 Run 状态: {run.status}")
def on_run_step(self, step: RunStep) -> None:
if step.type == "tool_calls":
print(f"\n⚙️ 执行步骤: {step.type} [{step.status}]")
def on_error(self, data: str) -> None:
print(f"\n❌ 错误: {data}")
def on_done(self) -> None:
print("\n🎯 Agent 运行完成")
# ===== 主要 Agent 类 =====
class EnterpriseAnalystAgent:
"""企业数据分析 Agent(单体)"""
SYSTEM_PROMPT = """你是一位企业数据分析专家,专注于销售数据分析与业务洞察。
**能力范围**:
- 查询并分析销售数据(支持时间范围、区域、产品筛选)
- 执行 Python 代码进行统计分析和可视化
- 搜索最新行业趋势和竞争情报
- 查阅公司内部知识库文档
- 创建 IT 工单处理技术问题
**输出规范**:
1. 数据分析结论须附具体数字支撑
2. 建议须标注置信度(高/中/低)
3. 图表使用 matplotlib,确保中文字体正确
4. 所有数据引用须标注来源
**禁止事项**:
- 不对未经查询的数据做假设
- 不承诺无法核实的指标
"""
def __init__(self):
self.project = get_project_client()
self.toolset = get_toolset()
self._agent = None
self._thread = None
def _ensure_agent(self):
"""懒加载:首次调用时创建 Agent(复用已有 Agent)"""
if self._agent is None:
# 生产建议:按 Agent 名搜索已有实例,避免重复创建
self._agent = self.project.agents.create_agent(
model=os.environ.get("AGENT_MODEL_HIGH", "gpt-5.2"),
name="enterprise-analyst-v2",
instructions=self.SYSTEM_PROMPT,
toolset=self.toolset,
temperature=0.1, # 数据分析要确定性
response_format={"type": "auto"}, # 允许混合文本+结构化
metadata={
"version": "2.0",
"team": "data-platform",
"env": os.getenv("ENVIRONMENT", "dev"),
},
)
print(f"🤖 Agent 已创建: {self._agent.id}")
def _ensure_thread(self):
"""创建或恢复对话线程"""
if self._thread is None:
self._thread = self.project.agents.create_thread()
print(f"💬 线程已创建: {self._thread.id}")
def chat(self, user_message: str, stream: bool = True) -> str:
"""发送消息并获取响应"""
self._ensure_agent()
self._ensure_thread()
# 添加用户消息
self.project.agents.create_message(
thread_id=self._thread.id,
role="user",
content=user_message,
)
print(f"\n{'='*60}")
print(f"👤 用户: {user_message}")
print(f"{'='*60}")
print("🤖 Agent: ", end="")
if stream:
# 流式输出
with self.project.agents.run_stream(
thread_id=self._thread.id,
agent_id=self._agent.id,
event_handler=EnterpriseAgentEventHandler(),
max_completion_tokens=4096,
) as stream_ctx:
stream_ctx.until_done()
else:
# 非流式(适合批处理)
run = self.project.agents.create_and_process_run(
thread_id=self._thread.id,
agent_id=self._agent.id,
)
if run.status == "failed":
raise RuntimeError(f"Agent run failed: {run.last_error}")
# 获取最新响应
messages = self.project.agents.list_messages(
thread_id=self._thread.id
)
latest = messages.get_last_text_message_by_role("assistant")
return latest.text.value if latest else ""
def cleanup(self):
"""清理资源(生产中通常保留 Agent,仅在版本更新时删除)"""
if self._agent:
self.project.agents.delete_agent(self._agent.id)
print(f"🗑️ Agent {self._agent.id} 已删除")
# ===== 使用示例 =====
if __name__ == "__main__":
agent = EnterpriseAnalystAgent()
# 示例 1:数据分析
agent.chat("分析 2026 年 1 月的销售数据,重点关注增长异常的区域")
# 示例 2:结合实时搜索
agent.chat("对比我们的 18.3% 增速与行业平均水平,给出竞争建议")
# 示例 3:代码执行 + 可视化
agent.chat("用 Python 画出过去 3 个月的销售趋势折线图")
4.2 Code Interpreter:沙盒代码执行
Code Interpreter 是 Agent 最强大的工具之一,让 Agent 能够自主编写并执行 Python 代码:
# code_interpreter_advanced.py
import os
from io import BytesIO
from azure.ai.projects.models import CodeInterpreterTool, FileSearchTool
from agent_client import get_project_client
def create_data_analyst_agent_with_files():
"""创建带文件上传能力的数据分析 Agent"""
project = get_project_client()
# 上传分析文件
with open("sales_data_2025.csv", "rb") as f:
uploaded_file = project.agents.upload_file_and_poll(
file=f,
purpose="assistants", # 供 Agent 使用
)
print(f"📎 文件上传成功: {uploaded_file.id}")
# 创建带 Code Interpreter 的 Agent
agent = project.agents.create_agent(
model=os.environ.get("AGENT_MODEL_HIGH", "gpt-5.2"),
name="data-analyst-with-files",
instructions="""你是专业的数据分析师。
使用上传的销售 CSV 文件进行分析:
1. 先用 pandas 读取并探索数据结构
2. 执行描述性统计
3. 生成可视化图表(使用 matplotlib,设置中文字体)
4. 输出关键洞察
分析完成后,将图表保存并提供下载链接。""",
tools=[CodeInterpreterTool()],
tool_resources={
"code_interpreter": {
"file_ids": [uploaded_file.id]
}
},
)
thread = project.agents.create_thread()
project.agents.create_message(
thread_id=thread.id,
role="user",
content="分析销售数据,找出 TOP 5 产品并生成对比柱状图"
)
run = project.agents.create_and_process_run(
thread_id=thread.id,
agent_id=agent.id,
)
# 下载生成的图片
messages = project.agents.list_messages(thread_id=thread.id)
for msg in messages.data:
for attachment in (msg.attachments or []):
if attachment.file_id:
image_data = project.agents.get_file_content(attachment.file_id)
with open(f"chart_{attachment.file_id[:8]}.png", "wb") as out:
out.write(image_data.read())
print(f"🖼️ 图表已保存: chart_{attachment.file_id[:8]}.png")
# 清理文件(生产中定期清理避免存储费用)
project.agents.delete_file(uploaded_file.id)
project.agents.delete_agent(agent.id)
五、五大编排模式:多 Agent 协作蓝图
5.1 为什么需要多 Agent 编排?
单体 Agent 的局限性:
❌ 单体 Agent 问题:
• "全知全能"提示词过长 → Token 浪费 + 性能下降
• 工具过多 → 选择混乱,错误调用率上升
• 无法专业分工 → 代码 Agent 被迫做设计决策
• 单点故障 → 一个工具异常影响整个流程
• 无法并行 → 串行处理导致延迟高
✅ 多 Agent 解法:
• 每个 Agent 专注一个领域(专业分工)
• 并行执行相互独立的任务(速度提升 3-5x)
• 故障隔离(一个 Agent 失败不影响其他 Agent)
• 独立可测试、可优化、可替换
5.2 五大编排模式详解
模式一:Sequential(顺序编排)
适用场景:流水线式任务,A 完成后传给 B,B 传给 C
用户请求 → [研究 Agent] → 资料 → [分析 Agent] → 洞察 → [报告 Agent] → 最终报告
# sequential_workflow.py
from azure.ai.projects.models import AgentEventHandler
from agent_client import get_project_client
class ResearchReportPipeline:
"""顺序编排:研究 → 分析 → 报告 三段流水线"""
def __init__(self):
self.project = get_project_client()
self._agents = {}
def _get_or_create_agent(self, name: str, instructions: str, model: str):
if name not in self._agents:
agent = self.project.agents.create_agent(
model=model,
name=name,
instructions=instructions,
)
self._agents[name] = agent
return self._agents[name]
def run(self, topic: str) -> dict:
"""执行完整研究报告流水线"""
# === 阶段一:研究 Agent(高推理能力)===
researcher = self._get_or_create_agent(
name="researcher",
model="gpt-5.2",
instructions="""你是研究专家,专注于信息收集和事实核查。
输出格式:JSON,包含 key_facts、sources、confidence_level。
不做主观判断,只呈现客观事实。""",
)
thread_r = self.project.agents.create_thread()
self.project.agents.create_message(
thread_id=thread_r.id,
role="user",
content=f"研究主题:{topic}\n收集关键事实、数据和可靠来源。"
)
run_r = self.project.agents.create_and_process_run(
thread_id=thread_r.id, agent_id=researcher.id
)
research_result = self._get_last_response(thread_r.id)
print(f"✅ 研究阶段完成 ({len(research_result)} 字符)")
# === 阶段二:分析 Agent(中等推理)===
analyst = self._get_or_create_agent(
name="analyst",
model="gpt-4.1",
instructions="""你是商业分析专家,专注于洞察提炼和趋势判断。
输入:研究资料(JSON)
输出:分析报告(JSON),包含 key_insights、risks、opportunities、recommendations。""",
)
thread_a = self.project.agents.create_thread()
self.project.agents.create_message(
thread_id=thread_a.id,
role="user",
content=f"基于以下研究资料进行深度分析:\n{research_result}"
)
run_a = self.project.agents.create_and_process_run(
thread_id=thread_a.id, agent_id=analyst.id
)
analysis_result = self._get_last_response(thread_a.id)
print(f"✅ 分析阶段完成 ({len(analysis_result)} 字符)")
# === 阶段三:报告 Agent(写作能力)===
writer = self._get_or_create_agent(
name="writer",
model="gpt-4.1-mini", # 写作任务不需要最强推理,节省成本
instructions="""你是商业写作专家,擅长将技术分析转化为高管可读的报告。
格式要求:Markdown,包含执行摘要、详细分析、行动建议、附录。
风格:简洁专业,数据支撑,可操作性强。""",
)
thread_w = self.project.agents.create_thread()
self.project.agents.create_message(
thread_id=thread_w.id,
role="user",
content=f"撰写关于"{topic}"的专业报告。\n研究资料:{research_result}\n分析结论:{analysis_result}"
)
run_w = self.project.agents.create_and_process_run(
thread_id=thread_w.id, agent_id=writer.id
)
final_report = self._get_last_response(thread_w.id)
print(f"✅ 报告阶段完成 ({len(final_report)} 字符)")
return {
"topic": topic,
"research": research_result,
"analysis": analysis_result,
"report": final_report,
}
def _get_last_response(self, thread_id: str) -> str:
messages = self.project.agents.list_messages(thread_id=thread_id)
latest = messages.get_last_text_message_by_role("assistant")
return latest.text.value if latest else ""
模式二:Concurrent(并发编排)
适用场景:相互独立的子任务,可并行执行,最后汇总
┌──[市场分析 Agent]──┐
用户请求 ──[分发]──┼──[技术评估 Agent]──┼──[汇总 Agent]──► 综合报告
└──[财务分析 Agent]──┘
# concurrent_workflow.py
import asyncio
from agent_client import get_project_client
async def run_agent_task(project, agent_id: str, prompt: str) -> str:
"""异步执行单个 Agent 任务"""
thread = project.agents.create_thread()
project.agents.create_message(
thread_id=thread.id,
role="user",
content=prompt
)
run = project.agents.create_and_process_run(
thread_id=thread.id,
agent_id=agent_id,
)
messages = project.agents.list_messages(thread_id=thread.id)
latest = messages.get_last_text_message_by_role("assistant")
return latest.text.value if latest else ""
async def parallel_due_diligence(company: str) -> dict:
"""并发尽职调查:三个维度同时分析,速度提升约 3x"""
project = get_project_client()
# 创建三个专业 Agent
agents = {
"market": project.agents.create_agent(
model="gpt-5.2", name="market-analyst",
instructions="专注市场规模、竞争格局、增长趋势分析。输出 JSON。"
),
"tech": project.agents.create_agent(
model="gpt-5.2", name="tech-evaluator",
instructions="专注技术架构、研发能力、专利、技术债务评估。输出 JSON。"
),
"finance": project.agents.create_agent(
model="gpt-4.1", name="finance-analyst",
instructions="专注财务报表、现金流、估值、风险评估。输出 JSON。"
),
}
# 并发执行(asyncio.gather 并行)
market_task = run_agent_task(project, agents["market"].id,
f"分析 {company} 的市场地位和竞争优势")
tech_task = run_agent_task(project, agents["tech"].id,
f"评估 {company} 的技术栈和研发能力")
finance_task = run_agent_task(project, agents["finance"].id,
f"分析 {company} 2025 年财务状况")
# 等待所有并发任务完成(比顺序执行快约 3x)
market_result, tech_result, finance_result = await asyncio.gather(
market_task, tech_task, finance_task
)
print("✅ 三项并发分析全部完成")
# 汇总 Agent:综合三方结论
synthesizer = project.agents.create_agent(
model="gpt-5.2", name="synthesizer",
instructions="将市场、技术、财务三方分析整合为投资决策报告。"
)
synthesis = await run_agent_task(
project, synthesizer.id,
f"""综合以下三方分析,输出投资建议报告:
市场分析:{market_result}
技术评估:{tech_result}
财务分析:{finance_result}"""
)
return {
"company": company,
"market": market_result,
"technology": tech_result,
"finance": finance_result,
"recommendation": synthesis,
}
模式三:Handoff(交接编排)
适用场景:根据对话上下文,动态将控制权转交给最合适的 Agent
用户说"我想退款" → [路由 Agent] ──► [退款专员 Agent]
用户说"产品故障" → [路由 Agent] ──► [技术支持 Agent]
用户说"我要投诉" → [路由 Agent] ──► [投诉处理 Agent]
# handoff_workflow.py
from azure.ai.projects.models import AgentNameTarget
from agent_client import get_project_client
def create_customer_service_handoff_system():
"""客户服务:基于意图的动态交接"""
project = get_project_client()
# 专业 Agent 定义
agents_config = {
"triage": {
"name": "triage-agent",
"model": "gpt-4.1-mini", # 路由只需轻量模型
"instructions": """你是客服路由 Agent,负责判断用户意图并转接。
意图分类规则:
- 退款/退货/订单问题 → 转接 refund-agent
- 技术故障/使用问题 → 转接 tech-support-agent
- 投诉/建议/表扬 → 转接 feedback-agent
- 其他 → 自行处理
输出格式:直接使用 handoff 工具转接,不要解释。""",
},
"refund": {
"name": "refund-agent",
"model": "gpt-4.1",
"instructions": """你是退款专员,专注处理退款、退货、订单问题。
你有权限查询订单状态并发起退款申请。
处理流程:1)核实身份 2)查询订单 3)评估退款资格 4)处理申请 5)告知结果""",
},
"tech_support": {
"name": "tech-support-agent",
"model": "gpt-5.2", # 技术问题需要强推理
"instructions": """你是技术支持专家,处理产品故障和使用问题。
解决流程:1)收集问题详情 2)远程诊断 3)提供解决方案 4)验证修复 5)预防建议""",
},
"feedback": {
"name": "feedback-agent",
"model": "gpt-4.1-mini",
"instructions": "处理用户投诉、建议和表扬,记录并反馈处理结果。",
},
}
created_agents = {}
for key, cfg in agents_config.items():
created_agents[key] = project.agents.create_agent(
model=cfg["model"],
name=cfg["name"],
instructions=cfg["instructions"],
)
# 创建共享线程(所有 Agent 共用同一对话上下文)
thread = project.agents.create_thread()
return created_agents, thread
模式四:Group Chat(群聊编排)
适用场景:多个专家 Agent 共同讨论,迭代优化直到达成共识
┌─ [架构师 Agent] ─┐
用户提出需求 → ├─ [安全专家 Agent]─┼──► 多轮讨论 ──► 最终设计方案
└─ [成本顾问 Agent]─┘
# group_chat_workflow.py
# 群聊模式:多 Agent 专家圆桌讨论
# 注:MAF 原生群聊 API 仍在 Preview,以下为等效实现
import json
from dataclasses import dataclass, field
from typing import List
from agent_client import get_project_client
@dataclass
class AgentMessage:
agent_name: str
content: str
round: int
class ExpertRoundtable:
"""专家圆桌:多 Agent 迭代讨论直到达成共识"""
EXPERTS = [
{
"name": "architect",
"role": "系统架构师",
"focus": "技术可行性、扩展性、性能",
"model": "gpt-5.2",
},
{
"name": "security",
"role": "安全专家",
"focus": "安全威胁、合规要求、数据保护",
"model": "gpt-5.2",
},
{
"name": "cost",
"role": "成本顾问",
"focus": "TCO、ROI、运营成本优化",
"model": "gpt-4.1",
},
]
def __init__(self, max_rounds: int = 3):
self.project = get_project_client()
self.max_rounds = max_rounds
self._agents = {}
def discuss(self, proposal: str) -> List[AgentMessage]:
"""组织专家圆桌讨论"""
history: List[AgentMessage] = []
context = proposal
for round_num in range(1, self.max_rounds + 1):
print(f"\n{'='*50}")
print(f"第 {round_num} 轮讨论")
print(f"{'='*50}")
round_messages = []
for expert in self.EXPERTS:
agent = self._get_or_create_expert_agent(expert)
thread = self.project.agents.create_thread()
# 包含完整讨论历史,确保 Agent 知晓其他专家的意见
history_text = "\n".join([
f"[{m.agent_name}] 第{m.round}轮: {m.content[:200]}..."
for m in history
]) if history else "(第一轮,无历史讨论)"
self.project.agents.create_message(
thread_id=thread.id,
role="user",
content=f"""【提案】{context}
【历史讨论】
{history_text}
请从{expert["role"]}视角(聚焦:{expert["focus"]}):
1. 指出关键风险或优化点(≤3条)
2. 对上一轮其他专家观点的回应
3. 你的具体建议
输出精简,每条不超过 50 字。"""
)
run = self.project.agents.create_and_process_run(
thread_id=thread.id,
agent_id=agent.id,
)
messages = self.project.agents.list_messages(thread_id=thread.id)
response = messages.get_last_text_message_by_role("assistant")
content = response.text.value if response else ""
msg = AgentMessage(
agent_name=expert["role"],
content=content,
round=round_num
)
round_messages.append(msg)
history.append(msg)
print(f"\n🎯 [{expert['role']}]: {content[:300]}...")
# 检查是否达成共识(简化判断:第三轮直接汇总)
if round_num == self.max_rounds:
break
return history
def _get_or_create_expert_agent(self, expert_config: dict):
name = expert_config["name"]
if name not in self._agents:
self._agents[name] = self.project.agents.create_agent(
model=expert_config["model"],
name=name,
instructions=f"""你是{expert_config["role"]},
专业聚焦:{expert_config["focus"]}。
风格:简洁专业,直指核心,不重复其他专家的观点。""",
)
return self._agents[name]
模式五:Magentic(磁力/监督者编排)
最高级别:监督者 Agent 动态分配任务给最合适的工作 Agent
┌─────────────────────┐
│ 监督者 Agent │
│ (任务分解 + 分发) │
└─┬──────┬──────┬─────┘
▼ ▼ ▼
[专家A] [专家B] [专家C] ← 动态选择
▼ ▼ ▼
监督者聚合结果 → 用户
# magentic_workflow.py
# 监督者模式:Supervisor Agent 动态分配任务
from agent_client import get_project_client
import json, re
class SupervisorAgent:
"""监督者 Agent:自主分解任务并调度工作 Agent"""
SUPERVISOR_INSTRUCTIONS = """你是多 Agent 系统的监督者(Supervisor)。
**你的职责**:
1. 分析用户的复杂请求
2. 将其分解为可并行/顺序执行的子任务
3. 为每个子任务选择最合适的工作 Agent
4. 监控执行进度并整合最终结果
**可用工作 Agent 列表**:
- "researcher": 信息搜集、事实核查(工具:Bing 搜索)
- "data_analyst": 数据分析、统计计算(工具:Code Interpreter)
- "writer": 内容撰写、报告生成
- "code_reviewer": 代码审查、安全扫描(工具:文件搜索)
**任务分解格式**(严格输出 JSON):
{
"task_plan": [
{"step": 1, "agent": "researcher", "prompt": "具体指令", "depends_on": []},
{"step": 2, "agent": "data_analyst", "prompt": "具体指令", "depends_on": [1]},
{"step": 3, "agent": "writer", "prompt": "具体指令", "depends_on": [1, 2]}
],
"reasoning": "选择此分解方案的原因"
}"""
def __init__(self):
self.project = get_project_client()
self._supervisor = None
self._workers = {}
def execute(self, user_request: str) -> str:
"""执行监督者模式的完整工作流"""
supervisor = self._get_supervisor()
thread = self.project.agents.create_thread()
# Step 1:监督者分解任务
self.project.agents.create_message(
thread_id=thread.id,
role="user",
content=f"用户请求:{user_request}\n请制定任务执行计划(JSON 格式)。"
)
run = self.project.agents.create_and_process_run(
thread_id=thread.id, agent_id=supervisor.id
)
messages = self.project.agents.list_messages(thread_id=thread.id)
plan_response = messages.get_last_text_message_by_role("assistant")
plan_text = plan_response.text.value if plan_response else "{}"
# 解析任务计划
try:
json_match = re.search(r'\{.*\}', plan_text, re.DOTALL)
plan = json.loads(json_match.group()) if json_match else {}
except Exception:
plan = {}
task_plan = plan.get("task_plan", [])
print(f"📋 任务计划:{len(task_plan)} 个子任务")
print(f"💭 规划理由:{plan.get('reasoning', '未说明')}")
# Step 2:按计划执行子任务(考虑依赖关系)
results = {}
for task in sorted(task_plan, key=lambda x: x["step"]):
step = task["step"]
agent_name = task["agent"]
depends = task.get("depends_on", [])
# 检查依赖任务是否完成
dep_results = {d: results.get(d, "") for d in depends}
dep_context = "\n".join([
f"步骤{d}结果:{r[:500]}"
for d, r in dep_results.items()
])
full_prompt = f"{task['prompt']}\n{dep_context}" if dep_context else task["prompt"]
print(f"\n⚙️ 执行步骤 {step}:[{agent_name}]")
worker = self._get_worker(agent_name)
worker_thread = self.project.agents.create_thread()
self.project.agents.create_message(
thread_id=worker_thread.id,
role="user",
content=full_prompt
)
worker_run = self.project.agents.create_and_process_run(
thread_id=worker_thread.id,
agent_id=worker.id,
)
worker_messages = self.project.agents.list_messages(
thread_id=worker_thread.id
)
worker_response = worker_messages.get_last_text_message_by_role("assistant")
results[step] = worker_response.text.value if worker_response else ""
print(f"✅ 步骤 {step} 完成 ({len(results[step])} 字符)")
# Step 3:监督者汇总结果
all_results = "\n\n".join([
f"**步骤{s}结果**:\n{r}"
for s, r in results.items()
])
self.project.agents.create_message(
thread_id=thread.id,
role="user",
content=f"所有子任务已完成,请汇总以下结果并生成最终报告:\n{all_results}"
)
final_run = self.project.agents.create_and_process_run(
thread_id=thread.id, agent_id=supervisor.id
)
final_messages = self.project.agents.list_messages(thread_id=thread.id)
final_response = final_messages.get_last_text_message_by_role("assistant")
return final_response.text.value if final_response else ""
def _get_supervisor(self):
if self._supervisor is None:
self._supervisor = self.project.agents.create_agent(
model="gpt-5.2", # 监督者需要最强推理能力
name="supervisor-agent",
instructions=self.SUPERVISOR_INSTRUCTIONS,
)
return self._supervisor
def _get_worker(self, agent_name: str):
worker_configs = {
"researcher": ("gpt-5.2", "信息搜集专家,输出结构化 JSON"),
"data_analyst": ("gpt-4.1", "数据分析专家,擅长统计和可视化"),
"writer": ("gpt-4.1-mini", "商业写作专家"),
"code_reviewer": ("gpt-5.2", "代码安全审查专家"),
}
if agent_name not in self._workers:
model, instructions = worker_configs.get(
agent_name, ("gpt-4.1-mini", "通用助手")
)
self._workers[agent_name] = self.project.agents.create_agent(
model=model,
name=f"worker-{agent_name}",
instructions=instructions,
)
return self._workers[agent_name]
5.3 编排模式选择决策树
你的任务是什么?
│
┌──────────────┼──────────────┐
流水线任务 并行任务 对话路由
│ │ │
Sequential Concurrent Handoff
(顺序编排) (并发编排) (交接编排)
│
需要专家讨论?
├── 是 → Group Chat(群聊)
└── 需要动态分配 → Magentic(监督者)
性能对比(相同任务):
Sequential:100%(基准)
Concurrent:30-35%(提速 3x)
Magentic: 50-60%(质量最高,延迟中等)
六、A2A 协议与 MCP:跨系统 Agent 互联
6.1 A2A 协议:Agent 互联网的 HTTP
Agent-to-Agent(A2A)协议由 Google 与 Microsoft 联合推出,是 AI Agent 跨系统通信的开放标准:
┌──────────────────────────────────────────────────────────────┐
│ A2A 协议核心概念 │
├──────────────┬─────────────────────────────────────────────┤
│ Agent Card │ 每个 Agent 的"名片",描述能力、端点、认证方式│
│ Task │ Agent 间传递的工作单元(含上下文、状态) │
│ Message │ 结构化消息(文本/文件/结构化数据) │
│ Artifact │ Agent 执行后产生的输出物 │
│ Auth │ 支持 OAuth 2.0、API Key、mTLS │
└──────────────┴─────────────────────────────────────────────┘
典型 A2A 场景:
你公司的 HR Agent ←→ 供应商的 BGC 调查 Agent
你的财务 Agent ←→ 银行的风险评估 Agent
你的 DevOps Agent ←→ 云厂商的部署 Agent
# a2a_integration.py
# 在 Foundry Agent Service 中使用 A2A 工具
import os
from azure.ai.projects.models import FunctionTool, ToolSet
from agent_client import get_project_client
import httpx, json
class A2AAgentConnector:
"""A2A 协议:连接外部 Agent 的连接器"""
def __init__(self, remote_agent_card_url: str, auth_token: str):
"""
:param remote_agent_card_url: 远程 Agent 的 Card 地址
形如 https://partner.com/.well-known/agent.json
:param auth_token: OAuth Bearer Token
"""
self.card_url = remote_agent_card_url
self.auth_token = auth_token
self._agent_card = None
async def fetch_agent_card(self) -> dict:
"""获取远程 Agent 的能力描述卡"""
if self._agent_card is None:
async with httpx.AsyncClient() as client:
resp = await client.get(self.card_url)
self._agent_card = resp.json()
return self._agent_card
async def delegate_task(self, task_description: str, context: dict) -> str:
"""
将任务委托给远程 Agent。
:param task_description: 任务描述
:param context: 任务上下文(文件、数据、约束等)
:return: 远程 Agent 的执行结果
"""
card = await self.fetch_agent_card()
endpoint = card.get("endpoint", "")
# A2A Task 消息格式
task_payload = {
"id": f"task-{os.urandom(4).hex()}",
"message": {
"role": "user",
"parts": [
{"type": "text", "text": task_description}
]
},
"metadata": context,
}
async with httpx.AsyncClient(timeout=120.0) as client:
resp = await client.post(
f"{endpoint}/tasks/send",
json=task_payload,
headers={
"Authorization": f"Bearer {self.auth_token}",
"Content-Type": "application/json",
"X-A2A-Version": "1.0",
}
)
result = resp.json()
# 提取 Agent 输出
artifacts = result.get("artifacts", [])
if artifacts:
return artifacts[0].get("parts", [{}])[0].get("text", "")
return result.get("status", {}).get("message", "任务完成,无文本输出")
def create_a2a_enabled_agent():
"""创建支持 A2A 调用的本地 Agent"""
project = get_project_client()
# A2A 连接器实例(实际使用时从配置读取)
bgc_connector = A2AAgentConnector(
remote_agent_card_url="https://bgc-partner.example.com/.well-known/agent.json",
auth_token=os.environ.get("BGC_AGENT_TOKEN", ""),
)
# 将 A2A 调用封装为本地 Function Tool
async def run_background_check(
candidate_id: str,
check_type: str = "standard"
) -> str:
"""
调用合作伙伴的背景调查 Agent(A2A 协议)。
:param candidate_id: 候选人 ID
:param check_type: 调查类型 standard/enhanced/basic
:return: 调查结果摘要
"""
result = await bgc_connector.delegate_task(
task_description=f"对候选人 {candidate_id} 执行 {check_type} 级别背景调查",
context={"requestor": "azure-hr-agent", "sla": "48h"}
)
return result
agent = project.agents.create_agent(
model="gpt-5.2",
name="hr-agent-with-a2a",
instructions="""你是 HR 招聘助手,负责候选人评估全流程。
对于背景调查需求,使用 run_background_check 工具连接合作伙伴 Agent。
背景调查结果 48 小时内返回。""",
tools=[FunctionTool(functions={run_background_check})],
)
return agent
6.2 MCP:工具生态的统一接口
Model Context Protocol(MCP)由 Anthropic 提出,Microsoft 已将其纳入 MAF 核心协议栈:
┌────────────────────────────────────────────────────────────┐
│ MCP 生态全景(2026) │
├──────────────────────────────────────────────────────────┤
│ MCP Servers(工具服务端) │
│ ┌─────────┐ ┌─────────┐ ┌──────────┐ ┌───────────────┐ │
│ │ GitHub │ │ Slack │ │ Jira │ │ Azure Blob │ │
│ │ MCP Svr │ │ MCP Svr │ │ MCP Svr │ │ MCP Svr │ │
│ └─────────┘ └─────────┘ └──────────┘ └───────────────┘ │
│ │
│ MCP 协议层(标准化工具发现 + 调用) │
│ │
│ MCP Clients(工具使用端) │
│ MAF Agent · Claude · Cursor · VS Code AI Toolkit │
└────────────────────────────────────────────────────────────┘
# mcp_integration.py
# 在 Foundry Agent Service 中注册 MCP Server
from azure.ai.projects.models import McpTool
from agent_client import get_project_client
import os
def create_mcp_powered_agent():
"""创建连接 MCP 服务的 DevOps Agent"""
project = get_project_client()
# 注册 GitHub MCP Server(Foundry 托管)
github_mcp = McpTool(
server_url="https://github.mcp.azure.com",
server_label="github",
allowed_tools=[
"create_issue",
"list_pull_requests",
"get_repository",
"create_branch",
"merge_pull_request",
],
headers={"Authorization": f"Bearer {os.environ['GITHUB_TOKEN']}"},
)
# 注册 Jira MCP Server(自托管)
jira_mcp = McpTool(
server_url=os.environ.get("JIRA_MCP_SERVER_URL", ""),
server_label="jira",
allowed_tools=["create_ticket", "update_status", "assign_ticket"],
headers={
"Authorization": f"Bearer {os.environ.get('JIRA_TOKEN', '')}",
"X-Atlassian-Domain": os.environ.get("JIRA_DOMAIN", ""),
},
)
# 创建 DevOps Agent
agent = project.agents.create_agent(
model="gpt-5.2",
name="devops-agent-mcp",
instructions="""你是 DevOps 自动化 Agent,专注于 CI/CD 流程管理。
**工作流程**:
1. 代码推送 → 检查 PR 状态 → 运行测试
2. 测试通过 → 自动合并 PR
3. 部署失败 → 创建 Jira 工单 + GitHub Issue + 通知相关人员
4. 定期巡检 → 检查积压 PR(>3天未合并自动提醒)
使用 GitHub 和 Jira 工具完成上述自动化任务。""",
tools=[github_mcp, jira_mcp],
)
return agent
七、持久化记忆:让 Agent 记住你
7.1 Foundry 托管记忆 vs 自建记忆
┌───────────────────────────────────────────────────────────────┐
│ 记忆类型对比 │
├──────────────┬────────────────────────────────────────────────┤
│ 记忆类型 │ 说明 │
├──────────────┼────────────────────────────────────────────────┤
│ 线程内记忆 │ 当前会话的完整对话历史(线程 ID 绑定) │
│ │ → Foundry 自动维护,无需额外开发 │
├──────────────┼────────────────────────────────────────────────┤
│ 短期外部记忆 │ 跨会话但时效性强的信息(用户偏好、近期决策) │
│ │ → Redis 缓存(TTL 7-30 天) │
├──────────────┼────────────────────────────────────────────────┤
│ 长期记忆 │ 永久性的用户画像、历史行为、重要结论 │
│ │ → Azure AI Search 向量索引 + Mem0 托管服务 │
├──────────────┼────────────────────────────────────────────────┤
│ 程序性记忆 │ Agent 的技能、流程、规则更新 │
│ │ → Agent 配置版本化(GitOps) │
└──────────────┴────────────────────────────────────────────────┘
7.2 Mem0 长期记忆集成
# long_term_memory.py
# 使用 Mem0 + Foundry 实现跨会话长期记忆
import os
from mem0 import MemoryClient
from agent_client import get_project_client
class LongTermMemoryAgent:
"""带长期记忆的 Agent:跨会话记住用户偏好和历史"""
def __init__(self, user_id: str):
self.user_id = user_id
self.project = get_project_client()
# 初始化 Mem0 客户端
self.memory = MemoryClient(api_key=os.environ["MEM0_API_KEY"])
self._agent = None
def chat(self, user_message: str, session_id: str) -> str:
"""带记忆的对话"""
# Step 1:从长期记忆检索相关上下文
relevant_memories = self.memory.search(
query=user_message,
user_id=self.user_id,
limit=5,
)
memory_context = ""
if relevant_memories:
memory_items = [m["memory"] for m in relevant_memories]
memory_context = f"""【用户历史记忆】
{chr(10).join(f"- {m}" for m in memory_items)}
请基于以上历史背景个性化你的回答。"""
# Step 2:构建带记忆上下文的 Agent
agent = self._get_or_create_agent(
extra_instructions=memory_context
)
# Step 3:使用已有线程(如有)或创建新线程
thread_id = self._get_or_create_thread(session_id)
self.project.agents.create_message(
thread_id=thread_id,
role="user",
content=user_message,
)
run = self.project.agents.create_and_process_run(
thread_id=thread_id,
agent_id=agent.id,
)
# Step 4:获取响应
messages = self.project.agents.list_messages(thread_id=thread_id)
latest = messages.get_last_text_message_by_role("assistant")
response = latest.text.value if latest else ""
# Step 5:异步更新长期记忆(从对话中提取新记忆)
self._update_memory_async(user_message, response)
return response
def _update_memory_async(self, user_message: str, agent_response: str):
"""从对话中提取并存储新记忆"""
# Mem0 自动提取关键信息(用户偏好、事实、决策等)
self.memory.add(
messages=[
{"role": "user", "content": user_message},
{"role": "assistant", "content": agent_response},
],
user_id=self.user_id,
metadata={
"source": "foundry-agent",
"importance": "medium",
}
)
def get_user_profile(self) -> list:
"""获取用户完整记忆档案"""
return self.memory.get_all(user_id=self.user_id)
def _get_or_create_agent(self, extra_instructions: str = "") -> object:
base_instructions = """你是个人 AI 助手,了解用户的历史偏好和背景。
提供个性化、有记忆的对话体验。"""
full_instructions = base_instructions + "\n\n" + extra_instructions
if self._agent is None:
self._agent = self.project.agents.create_agent(
model=os.environ.get("AGENT_MODEL_HIGH", "gpt-5.2"),
name=f"personal-agent-{self.user_id[:8]}",
instructions=full_instructions,
)
else:
# 更新记忆上下文(每次对话刷新)
self._agent = self.project.agents.update_agent(
agent_id=self._agent.id,
instructions=full_instructions,
)
return self._agent
def _get_or_create_thread(self, session_id: str) -> str:
"""管理会话线程(简化版:生产中使用 DB 存储 session->thread 映射)"""
# 实际生产:用 Redis/数据库存储 session_id → thread_id 映射
if not hasattr(self, "_threads"):
self._threads = {}
if session_id not in self._threads:
thread = self.project.agents.create_thread()
self._threads[session_id] = thread.id
return self._threads[session_id]
八、生产级多 Agent 系统:企业知识工作流
8.1 完整示例:企业研究报告系统
整合本文所有技术点,构建一个完整的企业级多 Agent 研究系统:
┌─────────────────────────────────────────────────────────────────────────┐
│ 企业研究报告系统架构 │
│ │
│ 用户输入 │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────────────┐ │
│ │ 监督者 Agent(GPT‑5.2) │ │
│ │ • 解析用户意图 • 制定研究计划 • 协调子 Agent • 汇总输出 │ │
│ └────┬──────────────────┬─────────────────────┬────────────────────┘ │
│ │ 并发执行 │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────┐ ┌───────────┐ ┌────────────────┐ │
│ │ 搜索 Agt│ │ 数据分析 │ │ 知识库检索 │ │
│ │(Bing搜) │ │ Agt(代码) │ │ Agt(AI Search)│ │
│ └────┬────┘ └─────┬─────┘ └────────┬───────┘ │
│ │ │ │ │
│ └─────────┬────────┘──────────────────────┘ │
│ ▼ │
│ ┌─────────────┐ │
│ │ 写作 Agent │ │
│ │ (GPT‑4.1) │ │
│ └──────┬───────┘ │
│ ▼ │
│ 最终研究报告(Markdown + 数据图表) │
└─────────────────────────────────────────────────────────────────────────┘
# enterprise_research_system.py
# 完整的企业研究报告 Agent 系统
import asyncio
import os
from agent_client import get_project_client
from long_term_memory import LongTermMemoryAgent
class EnterpriseResearchSystem:
"""
企业研究报告系统:
- 监督者模式(Magentic)编排
- 并发子 Agent 加速
- 知识库 + 实时搜索双引擎
- 长期记忆个性化
"""
def __init__(self, user_id: str):
self.project = get_project_client()
self.user_id = user_id
self.memory_agent = LongTermMemoryAgent(user_id=user_id)
self._agents = {}
async def generate_research_report(
self,
topic: str,
depth: str = "standard", # brief / standard / comprehensive
session_id: str = "default",
) -> dict:
"""生成完整研究报告"""
print(f"\n🚀 启动研究任务: {topic} (深度: {depth})")
# Phase 1:检索用户历史偏好(记忆)
memories = self.memory_agent.memory.search(
query=topic,
user_id=self.user_id,
limit=3,
)
user_context = "\n".join([m["memory"] for m in memories]) if memories else ""
# Phase 2:并发信息收集
tasks = [
self._web_research(topic), # Bing 实时搜索
self._knowledge_base_research(topic), # 内部知识库
self._data_analysis(topic), # 数据分析
]
web_result, kb_result, data_result = await asyncio.gather(*tasks)
print("✅ 并发信息收集完成")
# Phase 3:整合撰写
writer = self._get_or_create_agent(
"writer",
model="gpt-4.1",
instructions="""你是高级商业分析写手。
将多方研究结果整合为专业报告,要求:
- 执行摘要(3-5 要点)
- 详细分析(分节展开)
- 数据支撑(引用具体数字)
- 风险评估
- 行动建议(优先级排序)
格式:Markdown,可读性强。""",
)
writer_thread = self.project.agents.create_thread()
self.project.agents.create_message(
thread_id=writer_thread.id,
role="user",
content=f"""请生成关于"{topic}"的{depth}级别研究报告。
用户背景:{user_context}
研究材料:
### 最新信息(网络搜索)
{web_result}
### 内部知识库
{kb_result}
### 数据分析
{data_result}"""
)
run = self.project.agents.create_and_process_run(
thread_id=writer_thread.id,
agent_id=writer.id,
)
messages = self.project.agents.list_messages(thread_id=writer_thread.id)
final_report = messages.get_last_text_message_by_role("assistant")
report_text = final_report.text.value if final_report else ""
# Phase 4:更新用户记忆
self.memory_agent.memory.add(
messages=[
{"role": "user", "content": f"研究主题:{topic}"},
{"role": "assistant", "content": f"已生成报告,主要结论:{report_text[:500]}"},
],
user_id=self.user_id,
)
return {
"topic": topic,
"user_id": self.user_id,
"report": report_text,
"sources": {
"web": web_result[:500],
"knowledge_base": kb_result[:500],
"data_analysis": data_result[:500],
},
}
async def _web_research(self, topic: str) -> str:
"""子任务:实时网络搜索"""
from azure.ai.projects.models import BingGroundingTool
bing_conn = self.project.connections.get(
connection_name=os.environ.get("BING_CONNECTION_NAME", "bing-grounding")
)
agent = self._get_or_create_agent(
"web_researcher",
model="gpt-5.2",
instructions="搜索最新信息,提取关键事实和数据。输出结构化 JSON。",
extra_tools=[BingGroundingTool(connection_id=bing_conn.id)],
)
thread = self.project.agents.create_thread()
self.project.agents.create_message(
thread_id=thread.id, role="user",
content=f"搜索关于 '{topic}' 的最新信息和数据(关注 2025-2026)"
)
self.project.agents.create_and_process_run(
thread_id=thread.id, agent_id=agent.id
)
messages = self.project.agents.list_messages(thread_id=thread.id)
resp = messages.get_last_text_message_by_role("assistant")
return resp.text.value if resp else ""
async def _knowledge_base_research(self, topic: str) -> str:
"""子任务:内部知识库检索(Azure AI Search)"""
from azure.ai.projects.models import AzureAISearchTool
search_conn = self.project.connections.get(
connection_name=os.environ.get("AI_SEARCH_CONNECTION_NAME", "ai-search")
)
agent = self._get_or_create_agent(
"kb_researcher",
model="gpt-4.1",
instructions="从公司内部知识库检索相关政策、流程和案例。",
extra_tools=[AzureAISearchTool(
connection_id=search_conn.id,
index_name=os.environ.get("SEARCH_INDEX_NAME", "enterprise-kb"),
)],
)
thread = self.project.agents.create_thread()
self.project.agents.create_message(
thread_id=thread.id, role="user",
content=f"从内部知识库查找与 '{topic}' 相关的政策、案例和最佳实践"
)
self.project.agents.create_and_process_run(
thread_id=thread.id, agent_id=agent.id
)
messages = self.project.agents.list_messages(thread_id=thread.id)
resp = messages.get_last_text_message_by_role("assistant")
return resp.text.value if resp else ""
async def _data_analysis(self, topic: str) -> str:
"""子任务:数据分析(Code Interpreter)"""
from azure.ai.projects.models import CodeInterpreterTool
agent = self._get_or_create_agent(
"data_analyst",
model="gpt-4.1",
instructions="执行 Python 数据分析,生成统计摘要和洞察。",
extra_tools=[CodeInterpreterTool()],
)
thread = self.project.agents.create_thread()
self.project.agents.create_message(
thread_id=thread.id, role="user",
content=f"分析与 '{topic}' 相关的数据趋势,生成 Python 统计分析和可视化"
)
self.project.agents.create_and_process_run(
thread_id=thread.id, agent_id=agent.id
)
messages = self.project.agents.list_messages(thread_id=thread.id)
resp = messages.get_last_text_message_by_role("assistant")
return resp.text.value if resp else ""
def _get_or_create_agent(
self,
name: str,
model: str,
instructions: str,
extra_tools: list = None,
):
if name not in self._agents:
kwargs = dict(
model=model,
name=f"research-{name}",
instructions=instructions,
)
if extra_tools:
kwargs["tools"] = [t.definitions[0] for t in extra_tools]
kwargs["tool_resources"] = {}
for t in extra_tools:
if hasattr(t, "resources") and t.resources:
kwargs["tool_resources"].update(t.resources)
self._agents[name] = self.project.agents.create_agent(**kwargs)
return self._agents[name]
# ===== 使用示例 =====
async def main():
system = EnterpriseResearchSystem(user_id="analyst-001")
result = await system.generate_research_report(
topic="生成式 AI 在制造业中的应用现状与趋势(2026)",
depth="comprehensive",
session_id="session-20260301",
)
print("\n" + "="*60)
print("📊 最终研究报告")
print("="*60)
print(result["report"])
if __name__ == "__main__":
asyncio.run(main())
九、可观测性与评估:监控 Agent 行为
9.1 Agent 追踪:理解 Agent 的每一步决策
生产环境中,Agent 的每次工具调用、每个推理步骤都必须可追溯:
# agent_observability.py
from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode
import time, json
tracer = trace.get_tracer("enterprise-agents")
class TracedAgentRunner:
"""带完整 OpenTelemetry 追踪的 Agent 执行器"""
def __init__(self, project_client):
self.project = project_client
def run_with_tracing(
self,
agent_id: str,
thread_id: str,
user_message: str,
user_id: str = "anonymous",
) -> str:
"""执行 Agent 并记录完整追踪信息"""
with tracer.start_as_current_span("agent.run") as span:
span.set_attributes({
"agent.id": agent_id,
"thread.id": thread_id,
"user.id": user_id,
"input.length": len(user_message),
})
start_time = time.time()
try:
# 添加用户消息
self.project.agents.create_message(
thread_id=thread_id,
role="user",
content=user_message,
)
# 执行 Agent Run
run = self.project.agents.create_and_process_run(
thread_id=thread_id,
agent_id=agent_id,
)
elapsed = time.time() - start_time
span.set_attributes({
"run.id": run.id,
"run.status": run.status,
"run.elapsed_ms": int(elapsed * 1000),
})
# 追踪工具调用
run_steps = self.project.agents.list_run_steps(
thread_id=thread_id,
run_id=run.id,
)
tool_calls_count = 0
for step in run_steps.data:
if step.type == "tool_calls":
tool_calls_count += 1
with tracer.start_as_current_span("agent.tool_call") as tool_span:
details = step.step_details
if hasattr(details, "tool_calls"):
for tc in (details.tool_calls or []):
tool_span.set_attributes({
"tool.type": tc.type,
"tool.name": getattr(
getattr(tc, "function", None), "name", "unknown"
),
"step.status": step.status,
})
span.set_attribute("tool_calls.count", tool_calls_count)
if run.status == "failed":
span.set_status(Status(StatusCode.ERROR, run.last_error.message))
raise RuntimeError(f"Agent run failed: {run.last_error}")
# 获取响应
messages = self.project.agents.list_messages(thread_id=thread_id)
latest = messages.get_last_text_message_by_role("assistant")
response = latest.text.value if latest else ""
span.set_attribute("output.length", len(response))
span.set_status(Status(StatusCode.OK))
return response
except Exception as e:
span.record_exception(e)
span.set_status(Status(StatusCode.ERROR, str(e)))
raise
9.2 Agent 质量评估指标
┌──────────────────────────────────────────────────────────────────┐
│ Agent 评估指标体系 │
├─────────────────┬────────────────────────────────────────────────┤
│ 指标类别 │ 具体指标 │
├─────────────────┼────────────────────────────────────────────────┤
│ 效率指标 │ • 任务完成率(目标 >95%) │
│ │ • 平均响应时间(P50/P95/P99) │
│ │ • 工具调用次数(越少越好) │
│ │ • Token 消耗量(按任务类型对比) │
├─────────────────┼────────────────────────────────────────────────┤
│ 质量指标 │ • 任务正确率(人工抽样评估) │
│ │ • 幻觉率(事实准确性) │
│ │ • 用户满意度(CSAT/NPS) │
│ │ • 引用准确率(RAG 场景) │
├─────────────────┼────────────────────────────────────────────────┤
│ 可靠性指标 │ • 错误率(4xx/5xx) │
│ │ • 重试成功率 │
│ │ • 熔断触发频率 │
│ │ • SLA 达成率 │
├─────────────────┼────────────────────────────────────────────────┤
│ 成本指标 │ • 每次任务 Token 成本 │
│ │ • 模型利用率(GPT‑5.2 vs GPT‑4.1-mini 比例) │
│ │ • 缓存命中率(目标 >40%) │
└─────────────────┴────────────────────────────────────────────────┘
// Azure Monitor KQL:Agent 性能监控仪表板
// 按 Agent 名称统计任务完成率
customEvents
| where timestamp > ago(7d)
| where name == "agent.run"
| extend agent_id = tostring(customDimensions["agent.id"])
| extend status = tostring(customDimensions["run.status"])
| extend elapsed_ms = toint(customDimensions["run.elapsed_ms"])
| summarize
total = count(),
completed = countif(status == "completed"),
failed = countif(status == "failed"),
avg_latency_ms = avg(elapsed_ms),
p95_latency_ms = percentile(elapsed_ms, 95)
by agent_id, bin(timestamp, 1h)
| extend completion_rate = round(100.0 * completed / total, 1)
| project timestamp, agent_id, total, completion_rate, avg_latency_ms, p95_latency_ms
| order by timestamp desc
// 工具调用分布(发现滥用或效率问题)
customEvents
| where timestamp > ago(1d)
| where name == "agent.tool_call"
| extend tool_name = tostring(customDimensions["tool.name"])
| summarize call_count = count() by tool_name
| order by call_count desc
| render piechart
十、安全、合规与人机协同
10.1 Agent 安全威胁模型
┌──────────────────────────────────────────────────────────────┐
│ Agent 安全威胁矩阵 │
├────────────────────┬────────────────┬────────────────────────┤
│ 威胁类型 │ 风险等级 │ 缓解措施 │
├────────────────────┼────────────────┼────────────────────────┤
│ 提示词注入 │ 🔴 高 │ 输入验证 + 内容安全 │
│ (Prompt Injection)│ │ 沙盒执行隔离 │
├────────────────────┼────────────────┼────────────────────────┤
│ 工具权限滥用 │ 🔴 高 │ 最小权限原则 │
│ (Excessive Agency)│ │ 工具调用审批 │
├────────────────────┼────────────────┼────────────────────────┤
│ 数据外泄 │ 🟠 中高 │ 输出过滤 + PII 脱敏 │
│ (Data Exfiltration)│ │ 网络出口管控 │
├────────────────────┼────────────────┼────────────────────────┤
│ 无限循环 │ 🟡 中 │ 最大步数限制 │
│ (Runaway Loops) │ │ 超时熔断 │
├────────────────────┼────────────────┼────────────────────────┤
│ 幻觉高危操作 │ 🟠 中高 │ 高风险操作人工审批 │
│ (Hallucinated Actions)│ │ 操作前二次确认 │
└────────────────────┴────────────────┴────────────────────────┘
10.2 Human-in-the-Loop(人机协同)
对于高风险操作,必须实现人工审批节点:
# human_in_the_loop.py
# 高风险操作的人工审批模式
import os
from enum import Enum
from dataclasses import dataclass
from typing import Callable, Optional
from agent_client import get_project_client
class RiskLevel(Enum):
LOW = "low" # 自动执行,仅记录日志
MEDIUM = "medium" # 自动执行,发送通知
HIGH = "high" # 需要人工审批
CRITICAL = "critical" # 需要双人审批
@dataclass
class PendingApproval:
approval_id: str
agent_id: str
action: str
parameters: dict
risk_level: RiskLevel
requested_by: str
description: str
status: str = "pending" # pending / approved / rejected
class ApprovalGate:
"""人工审批关卡:高风险操作拦截与审批"""
# 高风险操作定义(实际生产中配置化管理)
HIGH_RISK_ACTIONS = {
"delete_data": RiskLevel.CRITICAL,
"send_bulk_email": RiskLevel.HIGH,
"deploy_to_production": RiskLevel.HIGH,
"create_incident_ticket": RiskLevel.MEDIUM,
"update_user_permissions": RiskLevel.HIGH,
"financial_transaction": RiskLevel.CRITICAL,
}
def __init__(self, notify_webhook: str = None):
self.pending: dict[str, PendingApproval] = {}
self.notify_webhook = notify_webhook
def wrap_with_approval(self, func: Callable, action_name: str) -> Callable:
"""将函数包装为需要审批的版本"""
risk = self.HIGH_RISK_ACTIONS.get(action_name, RiskLevel.LOW)
def wrapped(**kwargs) -> str:
if risk in (RiskLevel.HIGH, RiskLevel.CRITICAL):
return self._request_approval(action_name, kwargs, risk)
elif risk == RiskLevel.MEDIUM:
self._send_notification(action_name, kwargs)
return func(**kwargs)
else:
return func(**kwargs)
wrapped.__name__ = func.__name__
wrapped.__doc__ = func.__doc__
return wrapped
def _request_approval(
self, action: str, params: dict, risk: RiskLevel
) -> str:
"""创建审批请求,暂停 Agent 执行等待人工决策"""
import uuid
approval_id = str(uuid.uuid4())[:8]
approval = PendingApproval(
approval_id=approval_id,
agent_id="current-agent",
action=action,
parameters=params,
risk_level=risk,
requested_by="agent",
description=f"Agent 请求执行高风险操作:{action},参数:{params}",
)
self.pending[approval_id] = approval
# 发送审批通知(Teams/Email/PagerDuty)
self._send_approval_request(approval)
return (
f"⏸️ 操作已暂停,等待人工审批。\n"
f"审批 ID: {approval_id}\n"
f"风险级别: {risk.value}\n"
f"请审批员通过审批系统确认或拒绝此操作。"
)
def process_approval(
self, approval_id: str, approved: bool, approver: str
) -> str:
"""处理审批决策(由人工审批员调用)"""
approval = self.pending.get(approval_id)
if not approval:
return f"审批 ID {approval_id} 不存在"
approval.status = "approved" if approved else "rejected"
action = "✅ 已批准" if approved else "❌ 已拒绝"
return f"{action} | 操作: {approval.action} | 审批人: {approver}"
def _send_approval_request(self, approval: PendingApproval):
"""发送审批通知到 Teams"""
if self.notify_webhook:
import httpx
card = {
"type": "AdaptiveCard",
"body": [
{"type": "TextBlock", "text": f"🚨 {approval.risk_level.value.upper()} 风险操作审批请求"},
{"type": "TextBlock", "text": f"操作:{approval.action}"},
{"type": "TextBlock", "text": f"审批 ID:{approval.approval_id}"},
],
"actions": [
{"type": "Action.Submit", "title": "✅ 批准", "data": {"action": "approve"}},
{"type": "Action.Submit", "title": "❌ 拒绝", "data": {"action": "reject"}},
],
}
try:
httpx.post(self.notify_webhook, json=card, timeout=5)
except Exception as e:
print(f"通知发送失败: {e}")
def _send_notification(self, action: str, params: dict):
"""中等风险:发送通知但不阻塞执行"""
print(f"📢 通知:Agent 正在执行 {action}(中等风险)")
10.3 最小权限 RBAC 配置
#!/bin/bash
# agent_rbac_setup.sh - Agent 最小权限 RBAC 配置
SUBSCRIPTION_ID=$(az account show --query id -o tsv)
RG_NAME="rg-ai-agents-prod"
FOUNDRY_NAME="my-foundry-resource"
AGENT_IDENTITY="agent-managed-identity"
# 1. 创建用户分配托管身份
az identity create --name $AGENT_IDENTITY --resource-group $RG_NAME
IDENTITY_ID=$(az identity show --name $AGENT_IDENTITY --resource-group $RG_NAME --query principalId -o tsv)
# 2. Azure OpenAI 权限(仅允许推理,禁止管理)
az role assignment create --assignee $IDENTITY_ID --role "Cognitive Services OpenAI User" --scope "/subscriptions/$SUBSCRIPTION_ID/resourceGroups/$RG_NAME/providers/Microsoft.CognitiveServices/accounts/$FOUNDRY_NAME"
# 3. AI Search 权限(仅读取索引,禁止写入)
az role assignment create --assignee $IDENTITY_ID --role "Search Index Data Reader" --scope "/subscriptions/$SUBSCRIPTION_ID/resourceGroups/$RG_NAME/providers/Microsoft.Search/searchServices/my-search"
# 4. Blob Storage 权限(仅读取指定容器)
az role assignment create --assignee $IDENTITY_ID --role "Storage Blob Data Reader" --scope "/subscriptions/$SUBSCRIPTION_ID/resourceGroups/$RG_NAME/providers/Microsoft.Storage/storageAccounts/myagentstorage/blobServices/default/containers/agent-knowledge-base"
# 5. Application Insights(仅写入遥测)
az role assignment create --assignee $IDENTITY_ID --role "Monitoring Metrics Publisher" --scope "/subscriptions/$SUBSCRIPTION_ID/resourceGroups/$RG_NAME"
echo "✅ Agent RBAC 配置完成"
echo "托管身份 ID: $IDENTITY_ID"
十一、总结与博客 #5 预告
核心收获速查卡
┌─────────────────────────────────────────────────────────────────┐
│ 博客 #4 核心决策速查 │
├───────────────────────────────────────────────────────────────┤
│ 场景 推荐方案 │
├───────────────────────────────────────────────────────────────┤
│ 简单 Q&A / 文档搜索 → 直接 RAG(博客 #3),无需 Agent │
│ 工具调用 + 数据分析 → 单体 Agent(章节 4) │
│ 多步骤文档处理流水线 → Sequential 编排(章节 5.2) │
│ 独立子任务并行执行 → Concurrent 编排(章节 5.2) │
│ 智能客服分流 → Handoff 编排(章节 5.2) │
│ 多专家协同决策 → Group Chat 编排(章节 5.2) │
│ 复杂目标自主规划 → Magentic 监督者(章节 5.2) │
│ 跨组织 Agent 协作 → A2A 协议(章节 6) │
│ 第三方工具集成 → MCP 服务(章节 6) │
│ 个性化跨会话记忆 → Mem0 + Azure AI Search(章节 7) │
│ 高风险操作安全保障 → Human-in-the-Loop(章节 10) │
└─────────────────────────────────────────────────────────────────┘
本文完整代码索引:
| 文件 | 功能 | 核心 API |
|---|---|---|
agent_client.py |
统一客户端工厂 | AIProjectClient |
single_agent_demo.py |
单体 Agent + 工具集成 | create_agent / run_stream |
code_interpreter_advanced.py |
代码解释器 + 文件上传 | CodeInterpreterTool |
sequential_workflow.py |
顺序编排三阶段流水线 | 多线程顺序执行 |
concurrent_workflow.py |
并发编排尽职调查 | asyncio.gather |
handoff_workflow.py |
交接编排客服路由 | 意图分类 + 路由 |
group_chat_workflow.py |
群聊编排专家圆桌 | 多轮对话历史 |
magentic_workflow.py |
监督者编排动态分配 | 任务计划 JSON |
a2a_integration.py |
A2A 协议跨系统调用 | A2A Task API |
mcp_integration.py |
MCP 工具服务集成 | McpTool |
long_term_memory.py |
Mem0 长期记忆 | MemoryClient |
enterprise_research_system.py |
完整企业研究系统 | 综合所有模式 |
agent_observability.py |
OTel 追踪 | OpenTelemetry |
human_in_the_loop.py |
人工审批关卡 | ApprovalGate |
agent_rbac_setup.sh |
最小权限 RBAC | Azure CLI |
系列文章进度
| 篇章 | 标题 | 状态 |
|---|---|---|
| 博客 #1 | Azure AI Foundry 平台全景与快速上手 | ✅ 完成 |
| 博客 #2 | Azure OpenAI Service 深度实践 — GPT‑5.2 企业级部署 | ✅ 完成 |
| 博客 #3 | Azure AI Search + RAG 架构实战 | ✅ 完成 |
| 博客 #4 | AI Agents 开发实战:单体→多 Agent 协作 | ✅ 本文 |
| 博客 #5 | Azure AI Foundry 可观测性:全栈监控与评估 | 🔜 下一篇 |
| 博客 #6 | 企业 AI 治理:安全、合规与 MLOps | 🔜 规划中 |
博客 #5 预告:Azure AI Foundry 全栈可观测性
📋 博客 #5 内容预告
一、可观测性三支柱:Metrics / Traces / Logs
• Azure Monitor + Application Insights 深度集成
• OpenTelemetry 自动埋点 vs 手动埋点
二、LLM 专属监控:Token 成本 & 延迟分析
• Prompt Token vs Completion Token 成本归因
• P50/P95/P99 延迟热力图
三、Agent 行为追踪
• 工具调用链路可视化
• 多 Agent 协作追踪(跨服务 Trace)
四、RAG 检索质量监控
• 检索命中率、相关性分数趋势
• Groundedness 评分自动化
五、告警与 SLO
• 自定义 Azure Monitor 告警规则
• SLO 仪表板(Grafana 集成)
六、成本归因与优化闭环
• 按团队/项目/环境分摊 AI 成本
• 自动化降本建议
参考资料
- Microsoft Agent Framework 介绍:https://devblogs.microsoft.com/foundry/introducing-microsoft-agent-framework-the-open-source-engine-for-agentic-ai-apps/
- MAF 官方文档:https://learn.microsoft.com/en-us/agent-framework/overview/
- Azure AI Agent Orchestration Patterns:https://learn.microsoft.com/en-us/azure/architecture/ai-ml/guide/ai-agent-design-patterns
- Foundry Agent Service Ignite 2025 发布:https://techcommunity.microsoft.com/blog/azure-ai-foundry-blog/foundry-agent-service-at-ignite-2025-simple-to-build-powerful-to-deploy-trusted-/4469788
- azure-ai-projects 2.0.0b3 PyPI:https://pypi.org/project/azure-ai-projects/2.0.0b3/
- A2A 协议发布(Google + Microsoft):https://www.microsoft.com/en-us/microsoft-cloud/blog/2025/05/07/empowering-multi-agent-apps-with-the-open-agent2agent-a2a-protocol/
- Agent Factory: MCP & A2A 开放标准:https://azure.microsoft.com/en-us/blog/agent-factory-connecting-agents-apps-and-data-with-new-open-standards-like-mcp-and-a2a/
- Foundry Agent 持久化记忆:https://devblogs.microsoft.com/foundry/introducing-memory-in-foundry-agent-service/
- Computer Use 工具(预览):https://learn.microsoft.com/en-us/azure/foundry/agents/how-to/tools/computer-use
- Semantic Kernel 多 Agent 编排:https://devblogs.microsoft.com/semantic-kernel/semantic-kernel-multi-agent-orchestration/
- Mem0 长期记忆 + Azure AI Foundry:https://medium.com/microsoftazure/implement-long-term-memory-in-your-ai-agents-with-mem0-azure-ai-foundry-and-ai-search-56efd8683c03
- Agent 编排模式演示仓库:https://github.com/leestott/agent_patterns_foundry_demo
- GPT-RAG 参考架构:https://github.com/Azure/GPT-RAG
- 企业 AI 安全构建指南:https://learn.microsoft.com/en-us/azure/cloud-adoption-framework/ai-agents/build-secure-process
- Azure AI Agents Python SDK:https://pypi.org/project/azure-ai-agents/
Azure AI 全栈实践系列 | 博客 #4 完
下一篇:博客 #5 — Azure AI Foundry 可观测性:从 Token 追踪到 SLO 告警
更多推荐


所有评论(0)