系列:Azure AI 全栈实践 第 4 篇(共 6 篇)
发布:2026‑03
阅读时长:约 35 分钟
标签Azure AI Foundry · Microsoft Agent Framework · Multi-Agent · A2A · MCP · AutoGen · Semantic Kernel · 企业AI


摘要

从一个单一 LLM 调用到能够自主规划、使用工具、协作完成复杂任务的 AI Agent 系统——这条路远比想象中陡峭。本文将带你系统掌握:Microsoft Agent Framework(MAF) 的核心架构、Foundry Agent Service 的生产级能力、五大多 Agent 编排模式、Agent‑to‑Agent(A2A)协议实战,以及从单体 Agent 平滑演进到多 Agent 协作系统的工程路径。所有代码示例均基于 azure‑ai‑projects==2.0.0b3,可直接在 Azure AI Foundry 中运行。


目录

  1. 为什么需要 AI Agents?演进路径全景
  2. Microsoft Agent Framework:统一架构
  3. Foundry Agent Service:生产级基础设施
  4. 单体 Agent 实战:工具调用全链路
  5. 五大编排模式:多 Agent 协作蓝图
  6. A2A 协议与 MCP:跨系统 Agent 互联
  7. 持久化记忆:让 Agent 记住你
  8. 生产级多 Agent 系统:企业知识工作流
  9. 可观测性与评估:监控 Agent 行为
  10. 安全、合规与人机协同
  11. 总结与博客 #5 预告
  12. 参考资料

一、为什么需要 AI Agents?演进路径全景

1.1 从 LLM 调用到 Agentic AI

大多数企业的 AI 应用演进经历三个阶段:

阶段一:单次 LLM 调用
  用户输入 → [GPT‑5.2] → 单次响应
  局限:无工具、无记忆、无规划

阶段二:RAG 增强(博客 #3)
  用户输入 → 向量检索 → [GPT‑5.2 + 上下文] → 引用响应
  局限:被动检索,无法主动执行

阶段三:Agentic AI(本篇)
  用户目标 → [Agent 规划] → 工具调用循环 → 多 Agent 协作 → 目标完成
  能力:自主规划、工具执行、跨域协作、持久记忆

什么时候需要 Agent? 当你的任务满足以下任一条件:

  • 需要 多步骤规划(无法一次 LLM 调用完成)
  • 需要 调用外部工具(API、数据库、代码执行)
  • 需要 与其他系统或 Agent 协作
  • 需要 跨会话持久化状态(长期记忆)
  • 需要 自适应决策(根据中间结果调整策略)

1.2 企业 Agentic AI 成熟度模型

┌─────────────────────────────────────────────────────────────────┐
│            企业 Agent 成熟度矩阵(2026 Q1 现状)                  │
├────────────┬──────────────────────┬────────────────────────────┤
│  级别       │  能力                 │  代表场景                   │
├────────────┼──────────────────────┼────────────────────────────┤
│ L1 助手级   │ 单次对话、有限工具     │ 客服问答、文档摘要           │
│ L2 工具级   │ 函数调用、代码解释器   │ 数据分析、报告生成           │
│ L3 自主级   │ 多步规划、自主执行     │ 研究助手、DevOps 自动化     │
│ L4 协作级   │ 多 Agent 编排、A2A    │ 企业工作流、跨部门协同       │
│ L5 生态级   │ 跨组织 Agent 网络     │ 供应链优化、行业 Agent 市场  │
└────────────┴──────────────────────┴────────────────────────────┘
  当前大多数企业处于 L1-L2,本文带你跨越到 L3-L4

1.3 技术栈全景:MAF 的位置

┌──────────────────────────────────────────────────────────────┐
│                     应用层(你的 Agent App)                   │
├──────────────────────────────────────────────────────────────┤
│           Microsoft Agent Framework (MAF)                     │
│  ┌─────────────────┐  ┌──────────────────┐  ┌─────────────┐  │
│  │  Agent Runtime   │  │ Orchestration    │  │  Memory     │  │
│  │  (执行引擎)      │  │ (编排引擎)       │  │  (记忆层)   │  │
│  └─────────────────┘  └──────────────────┘  └─────────────┘  │
├──────────────────────────────────────────────────────────────┤
│              Azure AI Foundry Agent Service                   │
│  工具集成 · 代码解释器 · 文件搜索 · 计算机使用 · 持久线程     │
├───────────────────┬──────────────────────────────────────────┤
│  协议层           │  MCP (Model Context Protocol)             │
│                   │  A2A (Agent-to-Agent Protocol)            │
├───────────────────┼──────────────────────────────────────────┤
│  模型层           │  GPT‑5.2 · o3 · GPT‑4.1‑mini · ...       │
├───────────────────┼──────────────────────────────────────────┤
│  知识层           │  Azure AI Search · Foundry IQ            │
│  (博客 #3 内容)   │  向量索引 · 混合检索 · Cohere Rerank 4    │
└───────────────────┴──────────────────────────────────────────┘

二、Microsoft Agent Framework:统一架构

2.1 MAF 的诞生背景

在 MAF 之前,开发者面临痛苦的选择困境:

框架 优势 劣势
Semantic Kernel 生产稳定、企业级可观测性、丰富连接器 编排能力相对保守,多 Agent 模式有限
AutoGen 实验性多 Agent、群聊编排领先 生产就绪度低,缺乏持久化、合规能力
LangChain/LangGraph 社区庞大、生态丰富 企业级支持弱,与 Azure 原生集成不深

Microsoft 的解法:将 Semantic Kernel 的稳定性与 AutoGen 的编排创新合并,创造 Microsoft Agent Framework(MAF)

Semantic Kernel ──┐
                  ├──► Microsoft Agent Framework (MAF)
AutoGen      ─────┘
              合并后:两个框架的最佳能力 + 企业级生产就绪

2.2 MAF 四大支柱

支柱一:开放标准与互操作性

┌─────────────────────────────────────────────────────┐
│              开放协议栈                              │
│                                                     │
│  MCP(Model Context Protocol)                      │
│  ● 工具发现与调用的标准协议                          │
│  ● 类比:HTTP 之于 Web,MCP 之于 AI 工具            │
│                                                     │
│  A2A(Agent-to-Agent Protocol)                     │
│  ● Agent 间通信的标准协议(Google + Microsoft 联合)│
│  ● 支持跨组织、跨云 Agent 协作                      │
│                                                     │
│  OpenAPI First                                      │
│  ● 任何 REST API → 自动转为 Agent 工具              │
│  ● 无需手写 Function Schema                         │
└─────────────────────────────────────────────────────┘

支柱二:五种编排模式(详见第五章)

  • Sequential(顺序)
  • Concurrent(并发)
  • Handoff(交接)
  • Group Chat(群聊)
  • Magentic(磁力/分层监督)

支柱三:可扩展的记忆存储

# MAF 支持的记忆后端(可插拔)
MEMORY_BACKENDS = {
    "redis":        "Azure Cache for Redis",
    "qdrant":       "Qdrant Vector DB",
    "pinecone":     "Pinecone",
    "elasticsearch":"Elasticsearch",
    "postgres":     "PostgreSQL + pgvector",
    "ai_search":    "Azure AI Search(推荐生产环境)",
    "mem0":         "Mem0 托管记忆服务",
}

支柱四:生产就绪

✅ OpenTelemetry 分布式追踪(开箱即用)
✅ Azure AI Content Safety 内容过滤
✅ Entra ID 身份验证 + RBAC
✅ 持久化工作流(Agent 崩溃可恢复)
✅ Human-in-the-loop 审批节点
✅ GitHub Actions / Azure DevOps CI/CD 集成

2.3 与旧框架的迁移路径

# === Semantic Kernel 旧写法 ===
# kernel = Kernel()
# kernel.add_plugin(MyPlugin(), plugin_name="my_plugin")
# result = await kernel.invoke("my_plugin", "my_function", param=value)

# === MAF 新写法 ===
from azure.ai.agents import Agent, AgentTool

@AgentTool(description="执行我的业务逻辑")
async def my_function(param: str) -> str:
    return f"处理结果: {param}"

agent = Agent(
    name="my-agent",
    model="gpt-5.2",
    tools=[my_function],
)

# === AutoGen 旧写法 ===
# assistant = AssistantAgent("assistant", llm_config=llm_config)
# user_proxy = UserProxyAgent("user_proxy", human_input_mode="NEVER")
# user_proxy.initiate_chat(assistant, message="...")

# === MAF 新写法(AutoGen 用户迁移)===
from azure.ai.agents import ChatAgent, AgentWorkflow

analyst = ChatAgent(name="analyst", model="gpt-5.2", tools=[...])
writer  = ChatAgent(name="writer",  model="gpt-4.1-mini", tools=[...])

workflow = AgentWorkflow()
workflow.add_handoff(from_agent=analyst, to_agent=writer,
                     condition="分析完成后交给 writer 撰写报告")

三、Foundry Agent Service:生产级基础设施

3.1 核心能力矩阵

┌─────────────────────────────────────────────────────────────────────┐
│                  Foundry Agent Service 工具箱                        │
├──────────────────────┬────────────────────────────────────────────┤
│  工具类型             │  说明                                        │
├──────────────────────┼────────────────────────────────────────────┤
│  Function Calling    │  自定义 Python/REST 函数,博客 #2 已详述     │
│  Code Interpreter    │  沙盒 Python 执行环境,支持文件上传下载       │
│  File Search         │  向量化文件搜索,支持 PDF/Word/PPTX          │
│  Web Search          │  Bing 实时网页搜索                           │
│  Azure AI Search     │  企业知识库检索(博客 #3 内容)               │
│  Computer Use ⚡NEW  │  截图理解+UI 操作,实现 RPA 级自动化          │
│  MCP Servers         │  标准化工具发现,连接第三方服务               │
│  Agent-to-Agent (A2A)│  跨 Agent 调用,支持鉴权                     │
└──────────────────────┴────────────────────────────────────────────┘
  ⚡NEW = 2026 Q1 新增预览功能

3.2 Foundry Agent Service vs 自建 Agent

┌──────────────────────────────────────────────────────────────────┐
│  Foundry Agent Service              vs   自建 Agent(LangChain)  │
├─────────────────────────────────────────────────────────────────┤
│  线程管理:托管(自动持久化)          手动管理 DB               │
│  工具集成:内置 10+ 工具              手写 function wrapper       │
│  安全性:Entra ID 原生支持            自行实现 auth               │
│  可观测性:内置 OTel traces           手动集成                   │
│  代码执行:隔离沙盒                   自建沙盒(复杂)            │
│  SLA:99.9% 微软保证                  自行负责                   │
│  迁移成本:零(SDK 标准接口)         框架绑定                   │
│  推荐场景:企业生产环境               快速原型 / 特殊定制         │
└──────────────────────────────────────────────────────────────────┘

3.3 环境配置

# .env(开发环境,勿提交 Git)
# Azure AI Foundry 项目端点
AZURE_AI_PROJECT_ENDPOINT=https://your-project.services.ai.azure.com/api/projects/your-project

# Agent 部署模型(推荐按复杂度分级)
AGENT_MODEL_HIGH=gpt-5.2          # 复杂推理、多步规划
AGENT_MODEL_MID=gpt-4.1           # 标准业务 Agent
AGENT_MODEL_LOW=gpt-4.1-mini      # 简单任务、高并发场景

# 工具配置
BING_CONNECTION_ID=/subscriptions/.../connections/bing-search
AI_SEARCH_CONNECTION_ID=/subscriptions/.../connections/ai-search

# 记忆存储
REDIS_CONNECTION_STRING=redis://...@your-cache.redis.cache.windows.net:6380
MEM0_API_KEY=m0-...

# 可观测性
APPLICATIONINSIGHTS_CONNECTION_STRING=InstrumentationKey=...
# 安装所有依赖
pip install azure-ai-projects==2.0.0b3             azure-identity             azure-ai-agents             openai             azure-monitor-opentelemetry             tenacity             redis             mem0ai             python-dotenv

3.4 统一客户端初始化

# agent_client.py - 统一客户端工厂
import os
from functools import lru_cache
from azure.ai.projects import AIProjectClient
from azure.identity import DefaultAzureCredential
from dotenv import load_dotenv

load_dotenv()

@lru_cache(maxsize=1)
def get_project_client() -> AIProjectClient:
    """获取全局单例 AIProjectClient(支持本地/CI/Azure 三种环境)"""
    credential = DefaultAzureCredential()
    client = AIProjectClient(
        endpoint=os.environ["AZURE_AI_PROJECT_ENDPOINT"],
        credential=credential,
    )
    return client

def setup_observability():
    """初始化 Azure Monitor + OpenTelemetry 追踪"""
    conn_str = os.getenv("APPLICATIONINSIGHTS_CONNECTION_STRING")
    if conn_str:
        from azure.monitor.opentelemetry import configure_azure_monitor
        configure_azure_monitor(connection_string=conn_str)
        print("✅ Azure Monitor 追踪已启用")
    else:
        print("⚠️  未配置 Application Insights,使用控制台追踪")

# 应用启动时调用
setup_observability()

四、单体 Agent 实战:工具调用全链路

4.1 创建你的第一个生产级 Agent

# single_agent_demo.py
import os, json
from azure.ai.projects import AIProjectClient
from azure.ai.projects.models import (
    AgentEventHandler,
    CodeInterpreterTool,
    FileSearchTool,
    BingGroundingTool,
    FunctionTool,
    ToolSet,
    MessageDeltaChunk,
    RunStepDeltaChunk,
    ThreadMessage,
    ThreadRun,
    RunStep,
)
from azure.identity import DefaultAzureCredential
from agent_client import get_project_client

def get_toolset() -> ToolSet:
    """组装工具集(按需启用)"""
    project = get_project_client()
    toolset = ToolSet()

    # 工具 1:代码解释器(数据分析、图表生成)
    toolset.add(CodeInterpreterTool())

    # 工具 2:文件搜索(上传的 PDF/Word 企业文档)
    toolset.add(FileSearchTool(vector_store_ids=[
        os.environ.get("VECTOR_STORE_ID", "")
    ]))

    # 工具 3:Bing 实时搜索(获取最新信息)
    bing_conn = project.connections.get(
        connection_name=os.environ.get("BING_CONNECTION_NAME", "bing-grounding")
    )
    toolset.add(BingGroundingTool(connection_id=bing_conn.id))

    # 工具 4:自定义函数(连接内部系统)
    toolset.add(FunctionTool(functions=get_enterprise_functions()))

    return toolset


def get_enterprise_functions() -> set:
    """注册企业内部业务函数"""
    import json

    def query_sales_data(
        start_date: str,
        end_date: str,
        region: str = "all",
        granularity: str = "monthly"
    ) -> str:
        """
        查询销售数据。
        :param start_date: 开始日期,格式 YYYY-MM-DD
        :param end_date: 结束日期,格式 YYYY-MM-DD
        :param region: 区域,可选 north/south/east/west/all
        :param granularity: 粒度,可选 daily/weekly/monthly
        :return: JSON 格式销售数据
        """
        # 实际生产中调用数据仓库 API
        mock_data = {
            "period": f"{start_date} ~ {end_date}",
            "region": region,
            "total_revenue": 12_580_000,
            "growth_rate": 0.183,
            "top_products": ["产品A", "产品B", "产品C"],
            "units_sold": 8423,
        }
        return json.dumps(mock_data, ensure_ascii=False)

    def create_incident_ticket(
        title: str,
        priority: str,
        description: str,
        assignee: str = "auto"
    ) -> str:
        """
        创建 IT 事故工单。
        :param title: 工单标题
        :param priority: 优先级 P1/P2/P3/P4
        :param description: 问题描述
        :param assignee: 负责人(auto 自动分配)
        :return: 工单 ID
        """
        ticket_id = f"INC-{hash(title) % 100000:05d}"
        return json.dumps({
            "ticket_id": ticket_id,
            "status": "created",
            "assigned_to": "on-call-engineer" if assignee == "auto" else assignee,
            "estimated_sla": "4h" if priority in ["P1", "P2"] else "24h",
        }, ensure_ascii=False)

    return {query_sales_data, create_incident_ticket}


# ===== 流式事件处理器 =====
class EnterpriseAgentEventHandler(AgentEventHandler):
    """处理 Agent 流式输出事件"""

    def on_message_delta(self, delta: MessageDeltaChunk) -> None:
        """逐字流式打印 Agent 输出"""
        if delta.text:
            print(delta.text, end="", flush=True)

    def on_run_step_delta(self, delta: RunStepDeltaChunk) -> None:
        """打印工具调用进度"""
        if delta.delta.step_details:
            details = delta.delta.step_details
            if hasattr(details, "tool_calls"):
                for tc in (details.tool_calls or []):
                    if hasattr(tc, "function") and tc.function:
                        func_name = tc.function.name or ""
                        if func_name:
                            print(f"\n🔧 调用工具: {func_name}", flush=True)

    def on_thread_message(self, message: ThreadMessage) -> None:
        print(f"\n✅ 消息完成 [role={message.role}]")

    def on_thread_run(self, run: ThreadRun) -> None:
        print(f"\n📊 Run 状态: {run.status}")

    def on_run_step(self, step: RunStep) -> None:
        if step.type == "tool_calls":
            print(f"\n⚙️  执行步骤: {step.type} [{step.status}]")

    def on_error(self, data: str) -> None:
        print(f"\n❌ 错误: {data}")

    def on_done(self) -> None:
        print("\n🎯 Agent 运行完成")


# ===== 主要 Agent 类 =====
class EnterpriseAnalystAgent:
    """企业数据分析 Agent(单体)"""

    SYSTEM_PROMPT = """你是一位企业数据分析专家,专注于销售数据分析与业务洞察。

**能力范围**:
- 查询并分析销售数据(支持时间范围、区域、产品筛选)
- 执行 Python 代码进行统计分析和可视化
- 搜索最新行业趋势和竞争情报
- 查阅公司内部知识库文档
- 创建 IT 工单处理技术问题

**输出规范**:
1. 数据分析结论须附具体数字支撑
2. 建议须标注置信度(高/中/低)
3. 图表使用 matplotlib,确保中文字体正确
4. 所有数据引用须标注来源

**禁止事项**:
- 不对未经查询的数据做假设
- 不承诺无法核实的指标
"""

    def __init__(self):
        self.project = get_project_client()
        self.toolset = get_toolset()
        self._agent = None
        self._thread = None

    def _ensure_agent(self):
        """懒加载:首次调用时创建 Agent(复用已有 Agent)"""
        if self._agent is None:
            # 生产建议:按 Agent 名搜索已有实例,避免重复创建
            self._agent = self.project.agents.create_agent(
                model=os.environ.get("AGENT_MODEL_HIGH", "gpt-5.2"),
                name="enterprise-analyst-v2",
                instructions=self.SYSTEM_PROMPT,
                toolset=self.toolset,
                temperature=0.1,    # 数据分析要确定性
                response_format={"type": "auto"},  # 允许混合文本+结构化
                metadata={
                    "version": "2.0",
                    "team": "data-platform",
                    "env": os.getenv("ENVIRONMENT", "dev"),
                },
            )
            print(f"🤖 Agent 已创建: {self._agent.id}")

    def _ensure_thread(self):
        """创建或恢复对话线程"""
        if self._thread is None:
            self._thread = self.project.agents.create_thread()
            print(f"💬 线程已创建: {self._thread.id}")

    def chat(self, user_message: str, stream: bool = True) -> str:
        """发送消息并获取响应"""
        self._ensure_agent()
        self._ensure_thread()

        # 添加用户消息
        self.project.agents.create_message(
            thread_id=self._thread.id,
            role="user",
            content=user_message,
        )

        print(f"\n{'='*60}")
        print(f"👤 用户: {user_message}")
        print(f"{'='*60}")
        print("🤖 Agent: ", end="")

        if stream:
            # 流式输出
            with self.project.agents.run_stream(
                thread_id=self._thread.id,
                agent_id=self._agent.id,
                event_handler=EnterpriseAgentEventHandler(),
                max_completion_tokens=4096,
            ) as stream_ctx:
                stream_ctx.until_done()
        else:
            # 非流式(适合批处理)
            run = self.project.agents.create_and_process_run(
                thread_id=self._thread.id,
                agent_id=self._agent.id,
            )
            if run.status == "failed":
                raise RuntimeError(f"Agent run failed: {run.last_error}")

        # 获取最新响应
        messages = self.project.agents.list_messages(
            thread_id=self._thread.id
        )
        latest = messages.get_last_text_message_by_role("assistant")
        return latest.text.value if latest else ""

    def cleanup(self):
        """清理资源(生产中通常保留 Agent,仅在版本更新时删除)"""
        if self._agent:
            self.project.agents.delete_agent(self._agent.id)
            print(f"🗑️  Agent {self._agent.id} 已删除")


# ===== 使用示例 =====
if __name__ == "__main__":
    agent = EnterpriseAnalystAgent()

    # 示例 1:数据分析
    agent.chat("分析 2026 年 1 月的销售数据,重点关注增长异常的区域")

    # 示例 2:结合实时搜索
    agent.chat("对比我们的 18.3% 增速与行业平均水平,给出竞争建议")

    # 示例 3:代码执行 + 可视化
    agent.chat("用 Python 画出过去 3 个月的销售趋势折线图")

4.2 Code Interpreter:沙盒代码执行

Code Interpreter 是 Agent 最强大的工具之一,让 Agent 能够自主编写并执行 Python 代码:

# code_interpreter_advanced.py
import os
from io import BytesIO
from azure.ai.projects.models import CodeInterpreterTool, FileSearchTool
from agent_client import get_project_client

def create_data_analyst_agent_with_files():
    """创建带文件上传能力的数据分析 Agent"""
    project = get_project_client()

    # 上传分析文件
    with open("sales_data_2025.csv", "rb") as f:
        uploaded_file = project.agents.upload_file_and_poll(
            file=f,
            purpose="assistants",  # 供 Agent 使用
        )
    print(f"📎 文件上传成功: {uploaded_file.id}")

    # 创建带 Code Interpreter 的 Agent
    agent = project.agents.create_agent(
        model=os.environ.get("AGENT_MODEL_HIGH", "gpt-5.2"),
        name="data-analyst-with-files",
        instructions="""你是专业的数据分析师。
使用上传的销售 CSV 文件进行分析:
1. 先用 pandas 读取并探索数据结构
2. 执行描述性统计
3. 生成可视化图表(使用 matplotlib,设置中文字体)
4. 输出关键洞察
分析完成后,将图表保存并提供下载链接。""",
        tools=[CodeInterpreterTool()],
        tool_resources={
            "code_interpreter": {
                "file_ids": [uploaded_file.id]
            }
        },
    )

    thread = project.agents.create_thread()
    project.agents.create_message(
        thread_id=thread.id,
        role="user",
        content="分析销售数据,找出 TOP 5 产品并生成对比柱状图"
    )

    run = project.agents.create_and_process_run(
        thread_id=thread.id,
        agent_id=agent.id,
    )

    # 下载生成的图片
    messages = project.agents.list_messages(thread_id=thread.id)
    for msg in messages.data:
        for attachment in (msg.attachments or []):
            if attachment.file_id:
                image_data = project.agents.get_file_content(attachment.file_id)
                with open(f"chart_{attachment.file_id[:8]}.png", "wb") as out:
                    out.write(image_data.read())
                print(f"🖼️  图表已保存: chart_{attachment.file_id[:8]}.png")

    # 清理文件(生产中定期清理避免存储费用)
    project.agents.delete_file(uploaded_file.id)
    project.agents.delete_agent(agent.id)

五、五大编排模式:多 Agent 协作蓝图

5.1 为什么需要多 Agent 编排?

单体 Agent 的局限性:

❌ 单体 Agent 问题:
  • "全知全能"提示词过长 → Token 浪费 + 性能下降
  • 工具过多 → 选择混乱,错误调用率上升
  • 无法专业分工 → 代码 Agent 被迫做设计决策
  • 单点故障 → 一个工具异常影响整个流程
  • 无法并行 → 串行处理导致延迟高

✅ 多 Agent 解法:
  • 每个 Agent 专注一个领域(专业分工)
  • 并行执行相互独立的任务(速度提升 3-5x)
  • 故障隔离(一个 Agent 失败不影响其他 Agent)
  • 独立可测试、可优化、可替换

5.2 五大编排模式详解

模式一:Sequential(顺序编排)

适用场景:流水线式任务,A 完成后传给 B,B 传给 C

用户请求 → [研究 Agent] → 资料 → [分析 Agent] → 洞察 → [报告 Agent] → 最终报告
# sequential_workflow.py
from azure.ai.projects.models import AgentEventHandler
from agent_client import get_project_client

class ResearchReportPipeline:
    """顺序编排:研究 → 分析 → 报告 三段流水线"""

    def __init__(self):
        self.project = get_project_client()
        self._agents = {}

    def _get_or_create_agent(self, name: str, instructions: str, model: str):
        if name not in self._agents:
            agent = self.project.agents.create_agent(
                model=model,
                name=name,
                instructions=instructions,
            )
            self._agents[name] = agent
        return self._agents[name]

    def run(self, topic: str) -> dict:
        """执行完整研究报告流水线"""

        # === 阶段一:研究 Agent(高推理能力)===
        researcher = self._get_or_create_agent(
            name="researcher",
            model="gpt-5.2",
            instructions="""你是研究专家,专注于信息收集和事实核查。
输出格式:JSON,包含 key_facts、sources、confidence_level。
不做主观判断,只呈现客观事实。""",
        )

        thread_r = self.project.agents.create_thread()
        self.project.agents.create_message(
            thread_id=thread_r.id,
            role="user",
            content=f"研究主题:{topic}\n收集关键事实、数据和可靠来源。"
        )
        run_r = self.project.agents.create_and_process_run(
            thread_id=thread_r.id, agent_id=researcher.id
        )
        research_result = self._get_last_response(thread_r.id)
        print(f"✅ 研究阶段完成 ({len(research_result)} 字符)")

        # === 阶段二:分析 Agent(中等推理)===
        analyst = self._get_or_create_agent(
            name="analyst",
            model="gpt-4.1",
            instructions="""你是商业分析专家,专注于洞察提炼和趋势判断。
输入:研究资料(JSON)
输出:分析报告(JSON),包含 key_insights、risks、opportunities、recommendations。""",
        )

        thread_a = self.project.agents.create_thread()
        self.project.agents.create_message(
            thread_id=thread_a.id,
            role="user",
            content=f"基于以下研究资料进行深度分析:\n{research_result}"
        )
        run_a = self.project.agents.create_and_process_run(
            thread_id=thread_a.id, agent_id=analyst.id
        )
        analysis_result = self._get_last_response(thread_a.id)
        print(f"✅ 分析阶段完成 ({len(analysis_result)} 字符)")

        # === 阶段三:报告 Agent(写作能力)===
        writer = self._get_or_create_agent(
            name="writer",
            model="gpt-4.1-mini",   # 写作任务不需要最强推理,节省成本
            instructions="""你是商业写作专家,擅长将技术分析转化为高管可读的报告。
格式要求:Markdown,包含执行摘要、详细分析、行动建议、附录。
风格:简洁专业,数据支撑,可操作性强。""",
        )

        thread_w = self.project.agents.create_thread()
        self.project.agents.create_message(
            thread_id=thread_w.id,
            role="user",
            content=f"撰写关于"{topic}"的专业报告。\n研究资料:{research_result}\n分析结论:{analysis_result}"
        )
        run_w = self.project.agents.create_and_process_run(
            thread_id=thread_w.id, agent_id=writer.id
        )
        final_report = self._get_last_response(thread_w.id)
        print(f"✅ 报告阶段完成 ({len(final_report)} 字符)")

        return {
            "topic": topic,
            "research": research_result,
            "analysis": analysis_result,
            "report": final_report,
        }

    def _get_last_response(self, thread_id: str) -> str:
        messages = self.project.agents.list_messages(thread_id=thread_id)
        latest = messages.get_last_text_message_by_role("assistant")
        return latest.text.value if latest else ""

模式二:Concurrent(并发编排)

适用场景:相互独立的子任务,可并行执行,最后汇总

                    ┌──[市场分析 Agent]──┐
用户请求 ──[分发]──┼──[技术评估 Agent]──┼──[汇总 Agent]──► 综合报告
                    └──[财务分析 Agent]──┘
# concurrent_workflow.py
import asyncio
from agent_client import get_project_client

async def run_agent_task(project, agent_id: str, prompt: str) -> str:
    """异步执行单个 Agent 任务"""
    thread = project.agents.create_thread()
    project.agents.create_message(
        thread_id=thread.id,
        role="user",
        content=prompt
    )
    run = project.agents.create_and_process_run(
        thread_id=thread.id,
        agent_id=agent_id,
    )
    messages = project.agents.list_messages(thread_id=thread.id)
    latest = messages.get_last_text_message_by_role("assistant")
    return latest.text.value if latest else ""


async def parallel_due_diligence(company: str) -> dict:
    """并发尽职调查:三个维度同时分析,速度提升约 3x"""
    project = get_project_client()

    # 创建三个专业 Agent
    agents = {
        "market": project.agents.create_agent(
            model="gpt-5.2", name="market-analyst",
            instructions="专注市场规模、竞争格局、增长趋势分析。输出 JSON。"
        ),
        "tech": project.agents.create_agent(
            model="gpt-5.2", name="tech-evaluator",
            instructions="专注技术架构、研发能力、专利、技术债务评估。输出 JSON。"
        ),
        "finance": project.agents.create_agent(
            model="gpt-4.1", name="finance-analyst",
            instructions="专注财务报表、现金流、估值、风险评估。输出 JSON。"
        ),
    }

    # 并发执行(asyncio.gather 并行)
    market_task   = run_agent_task(project, agents["market"].id,
                                   f"分析 {company} 的市场地位和竞争优势")
    tech_task     = run_agent_task(project, agents["tech"].id,
                                   f"评估 {company} 的技术栈和研发能力")
    finance_task  = run_agent_task(project, agents["finance"].id,
                                   f"分析 {company} 2025 年财务状况")

    # 等待所有并发任务完成(比顺序执行快约 3x)
    market_result, tech_result, finance_result = await asyncio.gather(
        market_task, tech_task, finance_task
    )
    print("✅ 三项并发分析全部完成")

    # 汇总 Agent:综合三方结论
    synthesizer = project.agents.create_agent(
        model="gpt-5.2", name="synthesizer",
        instructions="将市场、技术、财务三方分析整合为投资决策报告。"
    )
    synthesis = await run_agent_task(
        project, synthesizer.id,
        f"""综合以下三方分析,输出投资建议报告:
市场分析:{market_result}
技术评估:{tech_result}
财务分析:{finance_result}"""
    )

    return {
        "company": company,
        "market": market_result,
        "technology": tech_result,
        "finance": finance_result,
        "recommendation": synthesis,
    }

模式三:Handoff(交接编排)

适用场景:根据对话上下文,动态将控制权转交给最合适的 Agent

用户说"我想退款" → [路由 Agent] ──► [退款专员 Agent]
用户说"产品故障" → [路由 Agent] ──► [技术支持 Agent]
用户说"我要投诉" → [路由 Agent] ──► [投诉处理 Agent]
# handoff_workflow.py
from azure.ai.projects.models import AgentNameTarget
from agent_client import get_project_client

def create_customer_service_handoff_system():
    """客户服务:基于意图的动态交接"""
    project = get_project_client()

    # 专业 Agent 定义
    agents_config = {
        "triage": {
            "name": "triage-agent",
            "model": "gpt-4.1-mini",     # 路由只需轻量模型
            "instructions": """你是客服路由 Agent,负责判断用户意图并转接。
意图分类规则:
- 退款/退货/订单问题 → 转接 refund-agent
- 技术故障/使用问题 → 转接 tech-support-agent
- 投诉/建议/表扬  → 转接 feedback-agent
- 其他             → 自行处理

输出格式:直接使用 handoff 工具转接,不要解释。""",
        },
        "refund": {
            "name": "refund-agent",
            "model": "gpt-4.1",
            "instructions": """你是退款专员,专注处理退款、退货、订单问题。
你有权限查询订单状态并发起退款申请。
处理流程:1)核实身份 2)查询订单 3)评估退款资格 4)处理申请 5)告知结果""",
        },
        "tech_support": {
            "name": "tech-support-agent",
            "model": "gpt-5.2",          # 技术问题需要强推理
            "instructions": """你是技术支持专家,处理产品故障和使用问题。
解决流程:1)收集问题详情 2)远程诊断 3)提供解决方案 4)验证修复 5)预防建议""",
        },
        "feedback": {
            "name": "feedback-agent",
            "model": "gpt-4.1-mini",
            "instructions": "处理用户投诉、建议和表扬,记录并反馈处理结果。",
        },
    }

    created_agents = {}
    for key, cfg in agents_config.items():
        created_agents[key] = project.agents.create_agent(
            model=cfg["model"],
            name=cfg["name"],
            instructions=cfg["instructions"],
        )

    # 创建共享线程(所有 Agent 共用同一对话上下文)
    thread = project.agents.create_thread()

    return created_agents, thread

模式四:Group Chat(群聊编排)

适用场景:多个专家 Agent 共同讨论,迭代优化直到达成共识

               ┌─ [架构师 Agent] ─┐
用户提出需求 → ├─ [安全专家 Agent]─┼──► 多轮讨论 ──► 最终设计方案
               └─ [成本顾问 Agent]─┘
# group_chat_workflow.py
# 群聊模式:多 Agent 专家圆桌讨论
# 注:MAF 原生群聊 API 仍在 Preview,以下为等效实现

import json
from dataclasses import dataclass, field
from typing import List
from agent_client import get_project_client

@dataclass
class AgentMessage:
    agent_name: str
    content: str
    round: int

class ExpertRoundtable:
    """专家圆桌:多 Agent 迭代讨论直到达成共识"""

    EXPERTS = [
        {
            "name": "architect",
            "role": "系统架构师",
            "focus": "技术可行性、扩展性、性能",
            "model": "gpt-5.2",
        },
        {
            "name": "security",
            "role": "安全专家",
            "focus": "安全威胁、合规要求、数据保护",
            "model": "gpt-5.2",
        },
        {
            "name": "cost",
            "role": "成本顾问",
            "focus": "TCO、ROI、运营成本优化",
            "model": "gpt-4.1",
        },
    ]

    def __init__(self, max_rounds: int = 3):
        self.project = get_project_client()
        self.max_rounds = max_rounds
        self._agents = {}

    def discuss(self, proposal: str) -> List[AgentMessage]:
        """组织专家圆桌讨论"""
        history: List[AgentMessage] = []
        context = proposal

        for round_num in range(1, self.max_rounds + 1):
            print(f"\n{'='*50}")
            print(f"第 {round_num} 轮讨论")
            print(f"{'='*50}")

            round_messages = []
            for expert in self.EXPERTS:
                agent = self._get_or_create_expert_agent(expert)
                thread = self.project.agents.create_thread()

                # 包含完整讨论历史,确保 Agent 知晓其他专家的意见
                history_text = "\n".join([
                    f"[{m.agent_name}] 第{m.round}轮: {m.content[:200]}..."
                    for m in history
                ]) if history else "(第一轮,无历史讨论)"

                self.project.agents.create_message(
                    thread_id=thread.id,
                    role="user",
                    content=f"""【提案】{context}

【历史讨论】
{history_text}

请从{expert["role"]}视角(聚焦:{expert["focus"]}):
1. 指出关键风险或优化点(≤3条)
2. 对上一轮其他专家观点的回应
3. 你的具体建议
输出精简,每条不超过 50 字。"""
                )

                run = self.project.agents.create_and_process_run(
                    thread_id=thread.id,
                    agent_id=agent.id,
                )
                messages = self.project.agents.list_messages(thread_id=thread.id)
                response = messages.get_last_text_message_by_role("assistant")
                content = response.text.value if response else ""

                msg = AgentMessage(
                    agent_name=expert["role"],
                    content=content,
                    round=round_num
                )
                round_messages.append(msg)
                history.append(msg)
                print(f"\n🎯 [{expert['role']}]: {content[:300]}...")

            # 检查是否达成共识(简化判断:第三轮直接汇总)
            if round_num == self.max_rounds:
                break

        return history

    def _get_or_create_expert_agent(self, expert_config: dict):
        name = expert_config["name"]
        if name not in self._agents:
            self._agents[name] = self.project.agents.create_agent(
                model=expert_config["model"],
                name=name,
                instructions=f"""你是{expert_config["role"]},
专业聚焦:{expert_config["focus"]}。
风格:简洁专业,直指核心,不重复其他专家的观点。""",
            )
        return self._agents[name]

模式五:Magentic(磁力/监督者编排)

最高级别:监督者 Agent 动态分配任务给最合适的工作 Agent

                   ┌─────────────────────┐
                   │   监督者 Agent       │
                   │  (任务分解 + 分发)   │
                   └─┬──────┬──────┬─────┘
                     ▼      ▼      ▼
              [专家A] [专家B] [专家C]  ← 动态选择
                     ▼      ▼      ▼
                   监督者聚合结果 → 用户
# magentic_workflow.py
# 监督者模式:Supervisor Agent 动态分配任务

from agent_client import get_project_client
import json, re

class SupervisorAgent:
    """监督者 Agent:自主分解任务并调度工作 Agent"""

    SUPERVISOR_INSTRUCTIONS = """你是多 Agent 系统的监督者(Supervisor)。

**你的职责**:
1. 分析用户的复杂请求
2. 将其分解为可并行/顺序执行的子任务
3. 为每个子任务选择最合适的工作 Agent
4. 监控执行进度并整合最终结果

**可用工作 Agent 列表**:
- "researcher": 信息搜集、事实核查(工具:Bing 搜索)
- "data_analyst": 数据分析、统计计算(工具:Code Interpreter)
- "writer": 内容撰写、报告生成
- "code_reviewer": 代码审查、安全扫描(工具:文件搜索)

**任务分解格式**(严格输出 JSON):
{
  "task_plan": [
    {"step": 1, "agent": "researcher", "prompt": "具体指令", "depends_on": []},
    {"step": 2, "agent": "data_analyst", "prompt": "具体指令", "depends_on": [1]},
    {"step": 3, "agent": "writer", "prompt": "具体指令", "depends_on": [1, 2]}
  ],
  "reasoning": "选择此分解方案的原因"
}"""

    def __init__(self):
        self.project = get_project_client()
        self._supervisor = None
        self._workers = {}

    def execute(self, user_request: str) -> str:
        """执行监督者模式的完整工作流"""
        supervisor = self._get_supervisor()
        thread = self.project.agents.create_thread()

        # Step 1:监督者分解任务
        self.project.agents.create_message(
            thread_id=thread.id,
            role="user",
            content=f"用户请求:{user_request}\n请制定任务执行计划(JSON 格式)。"
        )
        run = self.project.agents.create_and_process_run(
            thread_id=thread.id, agent_id=supervisor.id
        )
        messages = self.project.agents.list_messages(thread_id=thread.id)
        plan_response = messages.get_last_text_message_by_role("assistant")
        plan_text = plan_response.text.value if plan_response else "{}"

        # 解析任务计划
        try:
            json_match = re.search(r'\{.*\}', plan_text, re.DOTALL)
            plan = json.loads(json_match.group()) if json_match else {}
        except Exception:
            plan = {}

        task_plan = plan.get("task_plan", [])
        print(f"📋 任务计划:{len(task_plan)} 个子任务")
        print(f"💭 规划理由:{plan.get('reasoning', '未说明')}")

        # Step 2:按计划执行子任务(考虑依赖关系)
        results = {}
        for task in sorted(task_plan, key=lambda x: x["step"]):
            step = task["step"]
            agent_name = task["agent"]
            depends = task.get("depends_on", [])

            # 检查依赖任务是否完成
            dep_results = {d: results.get(d, "") for d in depends}
            dep_context = "\n".join([
                f"步骤{d}结果:{r[:500]}"
                for d, r in dep_results.items()
            ])

            full_prompt = f"{task['prompt']}\n{dep_context}" if dep_context else task["prompt"]

            print(f"\n⚙️  执行步骤 {step}:[{agent_name}]")
            worker = self._get_worker(agent_name)
            worker_thread = self.project.agents.create_thread()
            self.project.agents.create_message(
                thread_id=worker_thread.id,
                role="user",
                content=full_prompt
            )
            worker_run = self.project.agents.create_and_process_run(
                thread_id=worker_thread.id,
                agent_id=worker.id,
            )
            worker_messages = self.project.agents.list_messages(
                thread_id=worker_thread.id
            )
            worker_response = worker_messages.get_last_text_message_by_role("assistant")
            results[step] = worker_response.text.value if worker_response else ""
            print(f"✅ 步骤 {step} 完成 ({len(results[step])} 字符)")

        # Step 3:监督者汇总结果
        all_results = "\n\n".join([
            f"**步骤{s}结果**:\n{r}"
            for s, r in results.items()
        ])
        self.project.agents.create_message(
            thread_id=thread.id,
            role="user",
            content=f"所有子任务已完成,请汇总以下结果并生成最终报告:\n{all_results}"
        )
        final_run = self.project.agents.create_and_process_run(
            thread_id=thread.id, agent_id=supervisor.id
        )
        final_messages = self.project.agents.list_messages(thread_id=thread.id)
        final_response = final_messages.get_last_text_message_by_role("assistant")
        return final_response.text.value if final_response else ""

    def _get_supervisor(self):
        if self._supervisor is None:
            self._supervisor = self.project.agents.create_agent(
                model="gpt-5.2",  # 监督者需要最强推理能力
                name="supervisor-agent",
                instructions=self.SUPERVISOR_INSTRUCTIONS,
            )
        return self._supervisor

    def _get_worker(self, agent_name: str):
        worker_configs = {
            "researcher": ("gpt-5.2", "信息搜集专家,输出结构化 JSON"),
            "data_analyst": ("gpt-4.1", "数据分析专家,擅长统计和可视化"),
            "writer": ("gpt-4.1-mini", "商业写作专家"),
            "code_reviewer": ("gpt-5.2", "代码安全审查专家"),
        }
        if agent_name not in self._workers:
            model, instructions = worker_configs.get(
                agent_name, ("gpt-4.1-mini", "通用助手")
            )
            self._workers[agent_name] = self.project.agents.create_agent(
                model=model,
                name=f"worker-{agent_name}",
                instructions=instructions,
            )
        return self._workers[agent_name]

5.3 编排模式选择决策树

                    你的任务是什么?
                         │
          ┌──────────────┼──────────────┐
     流水线任务        并行任务         对话路由
          │                │                │
    Sequential          Concurrent       Handoff
    (顺序编排)          (并发编排)       (交接编排)
                              │
                    需要专家讨论?
                    ├── 是 → Group Chat(群聊)
                    └── 需要动态分配 → Magentic(监督者)

性能对比(相同任务):
  Sequential:100%(基准)
  Concurrent:30-35%(提速 3x)
  Magentic:  50-60%(质量最高,延迟中等)

六、A2A 协议与 MCP:跨系统 Agent 互联

6.1 A2A 协议:Agent 互联网的 HTTP

Agent-to-Agent(A2A)协议由 Google 与 Microsoft 联合推出,是 AI Agent 跨系统通信的开放标准:

┌──────────────────────────────────────────────────────────────┐
│                    A2A 协议核心概念                           │
├──────────────┬─────────────────────────────────────────────┤
│  Agent Card   │ 每个 Agent 的"名片",描述能力、端点、认证方式│
│  Task        │ Agent 间传递的工作单元(含上下文、状态)      │
│  Message     │ 结构化消息(文本/文件/结构化数据)            │
│  Artifact    │ Agent 执行后产生的输出物                      │
│  Auth        │ 支持 OAuth 2.0、API Key、mTLS                 │
└──────────────┴─────────────────────────────────────────────┘

典型 A2A 场景:
  你公司的 HR Agent ←→ 供应商的 BGC 调查 Agent
  你的财务 Agent   ←→ 银行的风险评估 Agent
  你的 DevOps Agent ←→ 云厂商的部署 Agent
# a2a_integration.py
# 在 Foundry Agent Service 中使用 A2A 工具

import os
from azure.ai.projects.models import FunctionTool, ToolSet
from agent_client import get_project_client
import httpx, json

class A2AAgentConnector:
    """A2A 协议:连接外部 Agent 的连接器"""

    def __init__(self, remote_agent_card_url: str, auth_token: str):
        """
        :param remote_agent_card_url: 远程 Agent 的 Card 地址
                                      形如 https://partner.com/.well-known/agent.json
        :param auth_token: OAuth Bearer Token
        """
        self.card_url = remote_agent_card_url
        self.auth_token = auth_token
        self._agent_card = None

    async def fetch_agent_card(self) -> dict:
        """获取远程 Agent 的能力描述卡"""
        if self._agent_card is None:
            async with httpx.AsyncClient() as client:
                resp = await client.get(self.card_url)
                self._agent_card = resp.json()
        return self._agent_card

    async def delegate_task(self, task_description: str, context: dict) -> str:
        """
        将任务委托给远程 Agent。
        :param task_description: 任务描述
        :param context: 任务上下文(文件、数据、约束等)
        :return: 远程 Agent 的执行结果
        """
        card = await self.fetch_agent_card()
        endpoint = card.get("endpoint", "")

        # A2A Task 消息格式
        task_payload = {
            "id": f"task-{os.urandom(4).hex()}",
            "message": {
                "role": "user",
                "parts": [
                    {"type": "text", "text": task_description}
                ]
            },
            "metadata": context,
        }

        async with httpx.AsyncClient(timeout=120.0) as client:
            resp = await client.post(
                f"{endpoint}/tasks/send",
                json=task_payload,
                headers={
                    "Authorization": f"Bearer {self.auth_token}",
                    "Content-Type": "application/json",
                    "X-A2A-Version": "1.0",
                }
            )
            result = resp.json()

        # 提取 Agent 输出
        artifacts = result.get("artifacts", [])
        if artifacts:
            return artifacts[0].get("parts", [{}])[0].get("text", "")
        return result.get("status", {}).get("message", "任务完成,无文本输出")


def create_a2a_enabled_agent():
    """创建支持 A2A 调用的本地 Agent"""
    project = get_project_client()

    # A2A 连接器实例(实际使用时从配置读取)
    bgc_connector = A2AAgentConnector(
        remote_agent_card_url="https://bgc-partner.example.com/.well-known/agent.json",
        auth_token=os.environ.get("BGC_AGENT_TOKEN", ""),
    )

    # 将 A2A 调用封装为本地 Function Tool
    async def run_background_check(
        candidate_id: str,
        check_type: str = "standard"
    ) -> str:
        """
        调用合作伙伴的背景调查 Agent(A2A 协议)。
        :param candidate_id: 候选人 ID
        :param check_type: 调查类型 standard/enhanced/basic
        :return: 调查结果摘要
        """
        result = await bgc_connector.delegate_task(
            task_description=f"对候选人 {candidate_id} 执行 {check_type} 级别背景调查",
            context={"requestor": "azure-hr-agent", "sla": "48h"}
        )
        return result

    agent = project.agents.create_agent(
        model="gpt-5.2",
        name="hr-agent-with-a2a",
        instructions="""你是 HR 招聘助手,负责候选人评估全流程。
对于背景调查需求,使用 run_background_check 工具连接合作伙伴 Agent。
背景调查结果 48 小时内返回。""",
        tools=[FunctionTool(functions={run_background_check})],
    )
    return agent

6.2 MCP:工具生态的统一接口

Model Context Protocol(MCP)由 Anthropic 提出,Microsoft 已将其纳入 MAF 核心协议栈:

┌────────────────────────────────────────────────────────────┐
│                   MCP 生态全景(2026)                       │
├──────────────────────────────────────────────────────────┤
│  MCP Servers(工具服务端)                                  │
│  ┌─────────┐ ┌─────────┐ ┌──────────┐ ┌───────────────┐  │
│  │ GitHub  │ │  Slack  │ │  Jira    │ │ Azure Blob     │  │
│  │ MCP Svr │ │ MCP Svr │ │ MCP Svr  │ │ MCP Svr        │  │
│  └─────────┘ └─────────┘ └──────────┘ └───────────────┘  │
│                                                            │
│  MCP 协议层(标准化工具发现 + 调用)                         │
│                                                            │
│  MCP Clients(工具使用端)                                  │
│  MAF Agent · Claude · Cursor · VS Code AI Toolkit         │
└────────────────────────────────────────────────────────────┘
# mcp_integration.py
# 在 Foundry Agent Service 中注册 MCP Server

from azure.ai.projects.models import McpTool
from agent_client import get_project_client
import os

def create_mcp_powered_agent():
    """创建连接 MCP 服务的 DevOps Agent"""
    project = get_project_client()

    # 注册 GitHub MCP Server(Foundry 托管)
    github_mcp = McpTool(
        server_url="https://github.mcp.azure.com",
        server_label="github",
        allowed_tools=[
            "create_issue",
            "list_pull_requests",
            "get_repository",
            "create_branch",
            "merge_pull_request",
        ],
        headers={"Authorization": f"Bearer {os.environ['GITHUB_TOKEN']}"},
    )

    # 注册 Jira MCP Server(自托管)
    jira_mcp = McpTool(
        server_url=os.environ.get("JIRA_MCP_SERVER_URL", ""),
        server_label="jira",
        allowed_tools=["create_ticket", "update_status", "assign_ticket"],
        headers={
            "Authorization": f"Bearer {os.environ.get('JIRA_TOKEN', '')}",
            "X-Atlassian-Domain": os.environ.get("JIRA_DOMAIN", ""),
        },
    )

    # 创建 DevOps Agent
    agent = project.agents.create_agent(
        model="gpt-5.2",
        name="devops-agent-mcp",
        instructions="""你是 DevOps 自动化 Agent,专注于 CI/CD 流程管理。

**工作流程**:
1. 代码推送 → 检查 PR 状态 → 运行测试
2. 测试通过 → 自动合并 PR
3. 部署失败 → 创建 Jira 工单 + GitHub Issue + 通知相关人员
4. 定期巡检 → 检查积压 PR(>3天未合并自动提醒)

使用 GitHub 和 Jira 工具完成上述自动化任务。""",
        tools=[github_mcp, jira_mcp],
    )
    return agent

七、持久化记忆:让 Agent 记住你

7.1 Foundry 托管记忆 vs 自建记忆

┌───────────────────────────────────────────────────────────────┐
│               记忆类型对比                                     │
├──────────────┬────────────────────────────────────────────────┤
│  记忆类型     │  说明                                          │
├──────────────┼────────────────────────────────────────────────┤
│  线程内记忆   │  当前会话的完整对话历史(线程 ID 绑定)         │
│              │  → Foundry 自动维护,无需额外开发               │
├──────────────┼────────────────────────────────────────────────┤
│  短期外部记忆 │  跨会话但时效性强的信息(用户偏好、近期决策)   │
│              │  → Redis 缓存(TTL 7-30 天)                   │
├──────────────┼────────────────────────────────────────────────┤
│  长期记忆     │  永久性的用户画像、历史行为、重要结论           │
│              │  → Azure AI Search 向量索引 + Mem0 托管服务    │
├──────────────┼────────────────────────────────────────────────┤
│  程序性记忆   │  Agent 的技能、流程、规则更新                  │
│              │  → Agent 配置版本化(GitOps)                  │
└──────────────┴────────────────────────────────────────────────┘

7.2 Mem0 长期记忆集成

# long_term_memory.py
# 使用 Mem0 + Foundry 实现跨会话长期记忆

import os
from mem0 import MemoryClient
from agent_client import get_project_client

class LongTermMemoryAgent:
    """带长期记忆的 Agent:跨会话记住用户偏好和历史"""

    def __init__(self, user_id: str):
        self.user_id = user_id
        self.project = get_project_client()

        # 初始化 Mem0 客户端
        self.memory = MemoryClient(api_key=os.environ["MEM0_API_KEY"])
        self._agent = None

    def chat(self, user_message: str, session_id: str) -> str:
        """带记忆的对话"""

        # Step 1:从长期记忆检索相关上下文
        relevant_memories = self.memory.search(
            query=user_message,
            user_id=self.user_id,
            limit=5,
        )

        memory_context = ""
        if relevant_memories:
            memory_items = [m["memory"] for m in relevant_memories]
            memory_context = f"""【用户历史记忆】
{chr(10).join(f"- {m}" for m in memory_items)}

请基于以上历史背景个性化你的回答。"""

        # Step 2:构建带记忆上下文的 Agent
        agent = self._get_or_create_agent(
            extra_instructions=memory_context
        )

        # Step 3:使用已有线程(如有)或创建新线程
        thread_id = self._get_or_create_thread(session_id)
        self.project.agents.create_message(
            thread_id=thread_id,
            role="user",
            content=user_message,
        )

        run = self.project.agents.create_and_process_run(
            thread_id=thread_id,
            agent_id=agent.id,
        )

        # Step 4:获取响应
        messages = self.project.agents.list_messages(thread_id=thread_id)
        latest = messages.get_last_text_message_by_role("assistant")
        response = latest.text.value if latest else ""

        # Step 5:异步更新长期记忆(从对话中提取新记忆)
        self._update_memory_async(user_message, response)

        return response

    def _update_memory_async(self, user_message: str, agent_response: str):
        """从对话中提取并存储新记忆"""
        # Mem0 自动提取关键信息(用户偏好、事实、决策等)
        self.memory.add(
            messages=[
                {"role": "user", "content": user_message},
                {"role": "assistant", "content": agent_response},
            ],
            user_id=self.user_id,
            metadata={
                "source": "foundry-agent",
                "importance": "medium",
            }
        )

    def get_user_profile(self) -> list:
        """获取用户完整记忆档案"""
        return self.memory.get_all(user_id=self.user_id)

    def _get_or_create_agent(self, extra_instructions: str = "") -> object:
        base_instructions = """你是个人 AI 助手,了解用户的历史偏好和背景。
提供个性化、有记忆的对话体验。"""
        full_instructions = base_instructions + "\n\n" + extra_instructions

        if self._agent is None:
            self._agent = self.project.agents.create_agent(
                model=os.environ.get("AGENT_MODEL_HIGH", "gpt-5.2"),
                name=f"personal-agent-{self.user_id[:8]}",
                instructions=full_instructions,
            )
        else:
            # 更新记忆上下文(每次对话刷新)
            self._agent = self.project.agents.update_agent(
                agent_id=self._agent.id,
                instructions=full_instructions,
            )
        return self._agent

    def _get_or_create_thread(self, session_id: str) -> str:
        """管理会话线程(简化版:生产中使用 DB 存储 session->thread 映射)"""
        # 实际生产:用 Redis/数据库存储 session_id → thread_id 映射
        if not hasattr(self, "_threads"):
            self._threads = {}
        if session_id not in self._threads:
            thread = self.project.agents.create_thread()
            self._threads[session_id] = thread.id
        return self._threads[session_id]

八、生产级多 Agent 系统:企业知识工作流

8.1 完整示例:企业研究报告系统

整合本文所有技术点,构建一个完整的企业级多 Agent 研究系统:

┌─────────────────────────────────────────────────────────────────────────┐
│                   企业研究报告系统架构                                     │
│                                                                         │
│  用户输入                                                                │
│     │                                                                   │
│     ▼                                                                   │
│  ┌──────────────────────────────────────────────────────────────────┐  │
│  │                监督者 Agent(GPT‑5.2)                            │  │
│  │  • 解析用户意图   • 制定研究计划   • 协调子 Agent   • 汇总输出    │  │
│  └────┬──────────────────┬─────────────────────┬────────────────────┘  │
│       │ 并发执行           │                     │                       │
│       ▼                  ▼                     ▼                       │
│  ┌─────────┐      ┌───────────┐       ┌────────────────┐               │
│  │ 搜索 Agt│      │ 数据分析  │       │  知识库检索    │               │
│  │(Bing搜) │      │ Agt(代码) │       │  Agt(AI Search)│               │
│  └────┬────┘      └─────┬─────┘       └────────┬───────┘               │
│       │                 │                      │                       │
│       └─────────┬────────┘──────────────────────┘                      │
│                 ▼                                                       │
│          ┌─────────────┐                                                │
│          │ 写作 Agent   │                                               │
│          │ (GPT‑4.1)   │                                               │
│          └──────┬───────┘                                               │
│                 ▼                                                       │
│          最终研究报告(Markdown + 数据图表)                              │
└─────────────────────────────────────────────────────────────────────────┘
# enterprise_research_system.py
# 完整的企业研究报告 Agent 系统

import asyncio
import os
from agent_client import get_project_client
from long_term_memory import LongTermMemoryAgent

class EnterpriseResearchSystem:
    """
    企业研究报告系统:
    - 监督者模式(Magentic)编排
    - 并发子 Agent 加速
    - 知识库 + 实时搜索双引擎
    - 长期记忆个性化
    """

    def __init__(self, user_id: str):
        self.project = get_project_client()
        self.user_id = user_id
        self.memory_agent = LongTermMemoryAgent(user_id=user_id)
        self._agents = {}

    async def generate_research_report(
        self,
        topic: str,
        depth: str = "standard",   # brief / standard / comprehensive
        session_id: str = "default",
    ) -> dict:
        """生成完整研究报告"""

        print(f"\n🚀 启动研究任务: {topic} (深度: {depth})")

        # Phase 1:检索用户历史偏好(记忆)
        memories = self.memory_agent.memory.search(
            query=topic,
            user_id=self.user_id,
            limit=3,
        )
        user_context = "\n".join([m["memory"] for m in memories]) if memories else ""

        # Phase 2:并发信息收集
        tasks = [
            self._web_research(topic),           # Bing 实时搜索
            self._knowledge_base_research(topic), # 内部知识库
            self._data_analysis(topic),           # 数据分析
        ]
        web_result, kb_result, data_result = await asyncio.gather(*tasks)
        print("✅ 并发信息收集完成")

        # Phase 3:整合撰写
        writer = self._get_or_create_agent(
            "writer",
            model="gpt-4.1",
            instructions="""你是高级商业分析写手。
将多方研究结果整合为专业报告,要求:
- 执行摘要(3-5 要点)
- 详细分析(分节展开)
- 数据支撑(引用具体数字)
- 风险评估
- 行动建议(优先级排序)
格式:Markdown,可读性强。""",
        )

        writer_thread = self.project.agents.create_thread()
        self.project.agents.create_message(
            thread_id=writer_thread.id,
            role="user",
            content=f"""请生成关于"{topic}"的{depth}级别研究报告。

用户背景:{user_context}

研究材料:
### 最新信息(网络搜索)
{web_result}

### 内部知识库
{kb_result}

### 数据分析
{data_result}"""
        )

        run = self.project.agents.create_and_process_run(
            thread_id=writer_thread.id,
            agent_id=writer.id,
        )
        messages = self.project.agents.list_messages(thread_id=writer_thread.id)
        final_report = messages.get_last_text_message_by_role("assistant")
        report_text = final_report.text.value if final_report else ""

        # Phase 4:更新用户记忆
        self.memory_agent.memory.add(
            messages=[
                {"role": "user", "content": f"研究主题:{topic}"},
                {"role": "assistant", "content": f"已生成报告,主要结论:{report_text[:500]}"},
            ],
            user_id=self.user_id,
        )

        return {
            "topic": topic,
            "user_id": self.user_id,
            "report": report_text,
            "sources": {
                "web": web_result[:500],
                "knowledge_base": kb_result[:500],
                "data_analysis": data_result[:500],
            },
        }

    async def _web_research(self, topic: str) -> str:
        """子任务:实时网络搜索"""
        from azure.ai.projects.models import BingGroundingTool
        bing_conn = self.project.connections.get(
            connection_name=os.environ.get("BING_CONNECTION_NAME", "bing-grounding")
        )
        agent = self._get_or_create_agent(
            "web_researcher",
            model="gpt-5.2",
            instructions="搜索最新信息,提取关键事实和数据。输出结构化 JSON。",
            extra_tools=[BingGroundingTool(connection_id=bing_conn.id)],
        )
        thread = self.project.agents.create_thread()
        self.project.agents.create_message(
            thread_id=thread.id, role="user",
            content=f"搜索关于 '{topic}' 的最新信息和数据(关注 2025-2026)"
        )
        self.project.agents.create_and_process_run(
            thread_id=thread.id, agent_id=agent.id
        )
        messages = self.project.agents.list_messages(thread_id=thread.id)
        resp = messages.get_last_text_message_by_role("assistant")
        return resp.text.value if resp else ""

    async def _knowledge_base_research(self, topic: str) -> str:
        """子任务:内部知识库检索(Azure AI Search)"""
        from azure.ai.projects.models import AzureAISearchTool
        search_conn = self.project.connections.get(
            connection_name=os.environ.get("AI_SEARCH_CONNECTION_NAME", "ai-search")
        )
        agent = self._get_or_create_agent(
            "kb_researcher",
            model="gpt-4.1",
            instructions="从公司内部知识库检索相关政策、流程和案例。",
            extra_tools=[AzureAISearchTool(
                connection_id=search_conn.id,
                index_name=os.environ.get("SEARCH_INDEX_NAME", "enterprise-kb"),
            )],
        )
        thread = self.project.agents.create_thread()
        self.project.agents.create_message(
            thread_id=thread.id, role="user",
            content=f"从内部知识库查找与 '{topic}' 相关的政策、案例和最佳实践"
        )
        self.project.agents.create_and_process_run(
            thread_id=thread.id, agent_id=agent.id
        )
        messages = self.project.agents.list_messages(thread_id=thread.id)
        resp = messages.get_last_text_message_by_role("assistant")
        return resp.text.value if resp else ""

    async def _data_analysis(self, topic: str) -> str:
        """子任务:数据分析(Code Interpreter)"""
        from azure.ai.projects.models import CodeInterpreterTool
        agent = self._get_or_create_agent(
            "data_analyst",
            model="gpt-4.1",
            instructions="执行 Python 数据分析,生成统计摘要和洞察。",
            extra_tools=[CodeInterpreterTool()],
        )
        thread = self.project.agents.create_thread()
        self.project.agents.create_message(
            thread_id=thread.id, role="user",
            content=f"分析与 '{topic}' 相关的数据趋势,生成 Python 统计分析和可视化"
        )
        self.project.agents.create_and_process_run(
            thread_id=thread.id, agent_id=agent.id
        )
        messages = self.project.agents.list_messages(thread_id=thread.id)
        resp = messages.get_last_text_message_by_role("assistant")
        return resp.text.value if resp else ""

    def _get_or_create_agent(
        self,
        name: str,
        model: str,
        instructions: str,
        extra_tools: list = None,
    ):
        if name not in self._agents:
            kwargs = dict(
                model=model,
                name=f"research-{name}",
                instructions=instructions,
            )
            if extra_tools:
                kwargs["tools"] = [t.definitions[0] for t in extra_tools]
                kwargs["tool_resources"] = {}
                for t in extra_tools:
                    if hasattr(t, "resources") and t.resources:
                        kwargs["tool_resources"].update(t.resources)
            self._agents[name] = self.project.agents.create_agent(**kwargs)
        return self._agents[name]


# ===== 使用示例 =====
async def main():
    system = EnterpriseResearchSystem(user_id="analyst-001")

    result = await system.generate_research_report(
        topic="生成式 AI 在制造业中的应用现状与趋势(2026)",
        depth="comprehensive",
        session_id="session-20260301",
    )

    print("\n" + "="*60)
    print("📊 最终研究报告")
    print("="*60)
    print(result["report"])

if __name__ == "__main__":
    asyncio.run(main())

九、可观测性与评估:监控 Agent 行为

9.1 Agent 追踪:理解 Agent 的每一步决策

生产环境中,Agent 的每次工具调用、每个推理步骤都必须可追溯:

# agent_observability.py
from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode
import time, json

tracer = trace.get_tracer("enterprise-agents")

class TracedAgentRunner:
    """带完整 OpenTelemetry 追踪的 Agent 执行器"""

    def __init__(self, project_client):
        self.project = project_client

    def run_with_tracing(
        self,
        agent_id: str,
        thread_id: str,
        user_message: str,
        user_id: str = "anonymous",
    ) -> str:
        """执行 Agent 并记录完整追踪信息"""

        with tracer.start_as_current_span("agent.run") as span:
            span.set_attributes({
                "agent.id": agent_id,
                "thread.id": thread_id,
                "user.id": user_id,
                "input.length": len(user_message),
            })

            start_time = time.time()

            try:
                # 添加用户消息
                self.project.agents.create_message(
                    thread_id=thread_id,
                    role="user",
                    content=user_message,
                )

                # 执行 Agent Run
                run = self.project.agents.create_and_process_run(
                    thread_id=thread_id,
                    agent_id=agent_id,
                )

                elapsed = time.time() - start_time
                span.set_attributes({
                    "run.id": run.id,
                    "run.status": run.status,
                    "run.elapsed_ms": int(elapsed * 1000),
                })

                # 追踪工具调用
                run_steps = self.project.agents.list_run_steps(
                    thread_id=thread_id,
                    run_id=run.id,
                )

                tool_calls_count = 0
                for step in run_steps.data:
                    if step.type == "tool_calls":
                        tool_calls_count += 1
                        with tracer.start_as_current_span("agent.tool_call") as tool_span:
                            details = step.step_details
                            if hasattr(details, "tool_calls"):
                                for tc in (details.tool_calls or []):
                                    tool_span.set_attributes({
                                        "tool.type": tc.type,
                                        "tool.name": getattr(
                                            getattr(tc, "function", None), "name", "unknown"
                                        ),
                                        "step.status": step.status,
                                    })

                span.set_attribute("tool_calls.count", tool_calls_count)

                if run.status == "failed":
                    span.set_status(Status(StatusCode.ERROR, run.last_error.message))
                    raise RuntimeError(f"Agent run failed: {run.last_error}")

                # 获取响应
                messages = self.project.agents.list_messages(thread_id=thread_id)
                latest = messages.get_last_text_message_by_role("assistant")
                response = latest.text.value if latest else ""

                span.set_attribute("output.length", len(response))
                span.set_status(Status(StatusCode.OK))
                return response

            except Exception as e:
                span.record_exception(e)
                span.set_status(Status(StatusCode.ERROR, str(e)))
                raise

9.2 Agent 质量评估指标

┌──────────────────────────────────────────────────────────────────┐
│                   Agent 评估指标体系                               │
├─────────────────┬────────────────────────────────────────────────┤
│  指标类别        │  具体指标                                       │
├─────────────────┼────────────────────────────────────────────────┤
│  效率指标        │  • 任务完成率(目标 >95%)                      │
│                 │  • 平均响应时间(P50/P95/P99)                   │
│                 │  • 工具调用次数(越少越好)                      │
│                 │  • Token 消耗量(按任务类型对比)                │
├─────────────────┼────────────────────────────────────────────────┤
│  质量指标        │  • 任务正确率(人工抽样评估)                    │
│                 │  • 幻觉率(事实准确性)                          │
│                 │  • 用户满意度(CSAT/NPS)                       │
│                 │  • 引用准确率(RAG 场景)                        │
├─────────────────┼────────────────────────────────────────────────┤
│  可靠性指标      │  • 错误率(4xx/5xx)                            │
│                 │  • 重试成功率                                    │
│                 │  • 熔断触发频率                                  │
│                 │  • SLA 达成率                                    │
├─────────────────┼────────────────────────────────────────────────┤
│  成本指标        │  • 每次任务 Token 成本                          │
│                 │  • 模型利用率(GPT‑5.2 vs GPT‑4.1-mini 比例)   │
│                 │  • 缓存命中率(目标 >40%)                       │
└─────────────────┴────────────────────────────────────────────────┘
// Azure Monitor KQL:Agent 性能监控仪表板

// 按 Agent 名称统计任务完成率
customEvents
| where timestamp > ago(7d)
| where name == "agent.run"
| extend agent_id = tostring(customDimensions["agent.id"])
| extend status = tostring(customDimensions["run.status"])
| extend elapsed_ms = toint(customDimensions["run.elapsed_ms"])
| summarize
    total = count(),
    completed = countif(status == "completed"),
    failed = countif(status == "failed"),
    avg_latency_ms = avg(elapsed_ms),
    p95_latency_ms = percentile(elapsed_ms, 95)
    by agent_id, bin(timestamp, 1h)
| extend completion_rate = round(100.0 * completed / total, 1)
| project timestamp, agent_id, total, completion_rate, avg_latency_ms, p95_latency_ms
| order by timestamp desc

// 工具调用分布(发现滥用或效率问题)
customEvents
| where timestamp > ago(1d)
| where name == "agent.tool_call"
| extend tool_name = tostring(customDimensions["tool.name"])
| summarize call_count = count() by tool_name
| order by call_count desc
| render piechart

十、安全、合规与人机协同

10.1 Agent 安全威胁模型

┌──────────────────────────────────────────────────────────────┐
│              Agent 安全威胁矩阵                               │
├────────────────────┬────────────────┬────────────────────────┤
│  威胁类型          │  风险等级      │  缓解措施               │
├────────────────────┼────────────────┼────────────────────────┤
│  提示词注入        │  🔴 高         │  输入验证 + 内容安全    │
│  (Prompt Injection)│               │  沙盒执行隔离           │
├────────────────────┼────────────────┼────────────────────────┤
│  工具权限滥用      │  🔴 高         │  最小权限原则           │
│  (Excessive Agency)│               │  工具调用审批           │
├────────────────────┼────────────────┼────────────────────────┤
│  数据外泄          │  🟠 中高       │  输出过滤 + PII 脱敏    │
│  (Data Exfiltration)│              │  网络出口管控           │
├────────────────────┼────────────────┼────────────────────────┤
│  无限循环          │  🟡 中         │  最大步数限制           │
│  (Runaway Loops)   │               │  超时熔断               │
├────────────────────┼────────────────┼────────────────────────┤
│  幻觉高危操作      │  🟠 中高       │  高风险操作人工审批     │
│  (Hallucinated Actions)│           │  操作前二次确认         │
└────────────────────┴────────────────┴────────────────────────┘

10.2 Human-in-the-Loop(人机协同)

对于高风险操作,必须实现人工审批节点:

# human_in_the_loop.py
# 高风险操作的人工审批模式

import os
from enum import Enum
from dataclasses import dataclass
from typing import Callable, Optional
from agent_client import get_project_client

class RiskLevel(Enum):
    LOW = "low"         # 自动执行,仅记录日志
    MEDIUM = "medium"   # 自动执行,发送通知
    HIGH = "high"       # 需要人工审批
    CRITICAL = "critical" # 需要双人审批

@dataclass
class PendingApproval:
    approval_id: str
    agent_id: str
    action: str
    parameters: dict
    risk_level: RiskLevel
    requested_by: str
    description: str
    status: str = "pending"  # pending / approved / rejected


class ApprovalGate:
    """人工审批关卡:高风险操作拦截与审批"""

    # 高风险操作定义(实际生产中配置化管理)
    HIGH_RISK_ACTIONS = {
        "delete_data": RiskLevel.CRITICAL,
        "send_bulk_email": RiskLevel.HIGH,
        "deploy_to_production": RiskLevel.HIGH,
        "create_incident_ticket": RiskLevel.MEDIUM,
        "update_user_permissions": RiskLevel.HIGH,
        "financial_transaction": RiskLevel.CRITICAL,
    }

    def __init__(self, notify_webhook: str = None):
        self.pending: dict[str, PendingApproval] = {}
        self.notify_webhook = notify_webhook

    def wrap_with_approval(self, func: Callable, action_name: str) -> Callable:
        """将函数包装为需要审批的版本"""
        risk = self.HIGH_RISK_ACTIONS.get(action_name, RiskLevel.LOW)

        def wrapped(**kwargs) -> str:
            if risk in (RiskLevel.HIGH, RiskLevel.CRITICAL):
                return self._request_approval(action_name, kwargs, risk)
            elif risk == RiskLevel.MEDIUM:
                self._send_notification(action_name, kwargs)
                return func(**kwargs)
            else:
                return func(**kwargs)

        wrapped.__name__ = func.__name__
        wrapped.__doc__ = func.__doc__
        return wrapped

    def _request_approval(
        self, action: str, params: dict, risk: RiskLevel
    ) -> str:
        """创建审批请求,暂停 Agent 执行等待人工决策"""
        import uuid
        approval_id = str(uuid.uuid4())[:8]

        approval = PendingApproval(
            approval_id=approval_id,
            agent_id="current-agent",
            action=action,
            parameters=params,
            risk_level=risk,
            requested_by="agent",
            description=f"Agent 请求执行高风险操作:{action},参数:{params}",
        )
        self.pending[approval_id] = approval

        # 发送审批通知(Teams/Email/PagerDuty)
        self._send_approval_request(approval)

        return (
            f"⏸️  操作已暂停,等待人工审批。\n"
            f"审批 ID: {approval_id}\n"
            f"风险级别: {risk.value}\n"
            f"请审批员通过审批系统确认或拒绝此操作。"
        )

    def process_approval(
        self, approval_id: str, approved: bool, approver: str
    ) -> str:
        """处理审批决策(由人工审批员调用)"""
        approval = self.pending.get(approval_id)
        if not approval:
            return f"审批 ID {approval_id} 不存在"

        approval.status = "approved" if approved else "rejected"
        action = "✅ 已批准" if approved else "❌ 已拒绝"
        return f"{action} | 操作: {approval.action} | 审批人: {approver}"

    def _send_approval_request(self, approval: PendingApproval):
        """发送审批通知到 Teams"""
        if self.notify_webhook:
            import httpx
            card = {
                "type": "AdaptiveCard",
                "body": [
                    {"type": "TextBlock", "text": f"🚨 {approval.risk_level.value.upper()} 风险操作审批请求"},
                    {"type": "TextBlock", "text": f"操作:{approval.action}"},
                    {"type": "TextBlock", "text": f"审批 ID:{approval.approval_id}"},
                ],
                "actions": [
                    {"type": "Action.Submit", "title": "✅ 批准", "data": {"action": "approve"}},
                    {"type": "Action.Submit", "title": "❌ 拒绝", "data": {"action": "reject"}},
                ],
            }
            try:
                httpx.post(self.notify_webhook, json=card, timeout=5)
            except Exception as e:
                print(f"通知发送失败: {e}")

    def _send_notification(self, action: str, params: dict):
        """中等风险:发送通知但不阻塞执行"""
        print(f"📢 通知:Agent 正在执行 {action}(中等风险)")

10.3 最小权限 RBAC 配置

#!/bin/bash
# agent_rbac_setup.sh - Agent 最小权限 RBAC 配置

SUBSCRIPTION_ID=$(az account show --query id -o tsv)
RG_NAME="rg-ai-agents-prod"
FOUNDRY_NAME="my-foundry-resource"
AGENT_IDENTITY="agent-managed-identity"

# 1. 创建用户分配托管身份
az identity create   --name $AGENT_IDENTITY   --resource-group $RG_NAME

IDENTITY_ID=$(az identity show   --name $AGENT_IDENTITY   --resource-group $RG_NAME   --query principalId -o tsv)

# 2. Azure OpenAI 权限(仅允许推理,禁止管理)
az role assignment create   --assignee $IDENTITY_ID   --role "Cognitive Services OpenAI User"   --scope "/subscriptions/$SUBSCRIPTION_ID/resourceGroups/$RG_NAME/providers/Microsoft.CognitiveServices/accounts/$FOUNDRY_NAME"

# 3. AI Search 权限(仅读取索引,禁止写入)
az role assignment create   --assignee $IDENTITY_ID   --role "Search Index Data Reader"   --scope "/subscriptions/$SUBSCRIPTION_ID/resourceGroups/$RG_NAME/providers/Microsoft.Search/searchServices/my-search"

# 4. Blob Storage 权限(仅读取指定容器)
az role assignment create   --assignee $IDENTITY_ID   --role "Storage Blob Data Reader"   --scope "/subscriptions/$SUBSCRIPTION_ID/resourceGroups/$RG_NAME/providers/Microsoft.Storage/storageAccounts/myagentstorage/blobServices/default/containers/agent-knowledge-base"

# 5. Application Insights(仅写入遥测)
az role assignment create   --assignee $IDENTITY_ID   --role "Monitoring Metrics Publisher"   --scope "/subscriptions/$SUBSCRIPTION_ID/resourceGroups/$RG_NAME"

echo "✅ Agent RBAC 配置完成"
echo "托管身份 ID: $IDENTITY_ID"

十一、总结与博客 #5 预告

核心收获速查卡

┌─────────────────────────────────────────────────────────────────┐
│              博客 #4 核心决策速查                                │
├───────────────────────────────────────────────────────────────┤
│  场景                    推荐方案                              │
├───────────────────────────────────────────────────────────────┤
│  简单 Q&A / 文档搜索    → 直接 RAG(博客 #3),无需 Agent      │
│  工具调用 + 数据分析    → 单体 Agent(章节 4)                 │
│  多步骤文档处理流水线   → Sequential 编排(章节 5.2)          │
│  独立子任务并行执行     → Concurrent 编排(章节 5.2)          │
│  智能客服分流           → Handoff 编排(章节 5.2)             │
│  多专家协同决策         → Group Chat 编排(章节 5.2)          │
│  复杂目标自主规划       → Magentic 监督者(章节 5.2)          │
│  跨组织 Agent 协作      → A2A 协议(章节 6)                  │
│  第三方工具集成         → MCP 服务(章节 6)                   │
│  个性化跨会话记忆       → Mem0 + Azure AI Search(章节 7)     │
│  高风险操作安全保障     → Human-in-the-Loop(章节 10)         │
└─────────────────────────────────────────────────────────────────┘

本文完整代码索引

文件 功能 核心 API
agent_client.py 统一客户端工厂 AIProjectClient
single_agent_demo.py 单体 Agent + 工具集成 create_agent / run_stream
code_interpreter_advanced.py 代码解释器 + 文件上传 CodeInterpreterTool
sequential_workflow.py 顺序编排三阶段流水线 多线程顺序执行
concurrent_workflow.py 并发编排尽职调查 asyncio.gather
handoff_workflow.py 交接编排客服路由 意图分类 + 路由
group_chat_workflow.py 群聊编排专家圆桌 多轮对话历史
magentic_workflow.py 监督者编排动态分配 任务计划 JSON
a2a_integration.py A2A 协议跨系统调用 A2A Task API
mcp_integration.py MCP 工具服务集成 McpTool
long_term_memory.py Mem0 长期记忆 MemoryClient
enterprise_research_system.py 完整企业研究系统 综合所有模式
agent_observability.py OTel 追踪 OpenTelemetry
human_in_the_loop.py 人工审批关卡 ApprovalGate
agent_rbac_setup.sh 最小权限 RBAC Azure CLI

系列文章进度

篇章 标题 状态
博客 #1 Azure AI Foundry 平台全景与快速上手 ✅ 完成
博客 #2 Azure OpenAI Service 深度实践 — GPT‑5.2 企业级部署 ✅ 完成
博客 #3 Azure AI Search + RAG 架构实战 ✅ 完成
博客 #4 AI Agents 开发实战:单体→多 Agent 协作 本文
博客 #5 Azure AI Foundry 可观测性:全栈监控与评估 🔜 下一篇
博客 #6 企业 AI 治理:安全、合规与 MLOps 🔜 规划中

博客 #5 预告:Azure AI Foundry 全栈可观测性

📋 博客 #5 内容预告

  一、可观测性三支柱:Metrics / Traces / Logs
      • Azure Monitor + Application Insights 深度集成
      • OpenTelemetry 自动埋点 vs 手动埋点

  二、LLM 专属监控:Token 成本 & 延迟分析
      • Prompt Token vs Completion Token 成本归因
      • P50/P95/P99 延迟热力图

  三、Agent 行为追踪
      • 工具调用链路可视化
      • 多 Agent 协作追踪(跨服务 Trace)

  四、RAG 检索质量监控
      • 检索命中率、相关性分数趋势
      • Groundedness 评分自动化

  五、告警与 SLO
      • 自定义 Azure Monitor 告警规则
      • SLO 仪表板(Grafana 集成)

  六、成本归因与优化闭环
      • 按团队/项目/环境分摊 AI 成本
      • 自动化降本建议

参考资料

  1. Microsoft Agent Framework 介绍:https://devblogs.microsoft.com/foundry/introducing-microsoft-agent-framework-the-open-source-engine-for-agentic-ai-apps/
  2. MAF 官方文档:https://learn.microsoft.com/en-us/agent-framework/overview/
  3. Azure AI Agent Orchestration Patterns:https://learn.microsoft.com/en-us/azure/architecture/ai-ml/guide/ai-agent-design-patterns
  4. Foundry Agent Service Ignite 2025 发布:https://techcommunity.microsoft.com/blog/azure-ai-foundry-blog/foundry-agent-service-at-ignite-2025-simple-to-build-powerful-to-deploy-trusted-/4469788
  5. azure-ai-projects 2.0.0b3 PyPI:https://pypi.org/project/azure-ai-projects/2.0.0b3/
  6. A2A 协议发布(Google + Microsoft):https://www.microsoft.com/en-us/microsoft-cloud/blog/2025/05/07/empowering-multi-agent-apps-with-the-open-agent2agent-a2a-protocol/
  7. Agent Factory: MCP & A2A 开放标准:https://azure.microsoft.com/en-us/blog/agent-factory-connecting-agents-apps-and-data-with-new-open-standards-like-mcp-and-a2a/
  8. Foundry Agent 持久化记忆:https://devblogs.microsoft.com/foundry/introducing-memory-in-foundry-agent-service/
  9. Computer Use 工具(预览):https://learn.microsoft.com/en-us/azure/foundry/agents/how-to/tools/computer-use
  10. Semantic Kernel 多 Agent 编排:https://devblogs.microsoft.com/semantic-kernel/semantic-kernel-multi-agent-orchestration/
  11. Mem0 长期记忆 + Azure AI Foundry:https://medium.com/microsoftazure/implement-long-term-memory-in-your-ai-agents-with-mem0-azure-ai-foundry-and-ai-search-56efd8683c03
  12. Agent 编排模式演示仓库:https://github.com/leestott/agent_patterns_foundry_demo
  13. GPT-RAG 参考架构:https://github.com/Azure/GPT-RAG
  14. 企业 AI 安全构建指南:https://learn.microsoft.com/en-us/azure/cloud-adoption-framework/ai-agents/build-secure-process
  15. Azure AI Agents Python SDK:https://pypi.org/project/azure-ai-agents/

Azure AI 全栈实践系列 | 博客 #4 完
下一篇:博客 #5 — Azure AI Foundry 可观测性:从 Token 追踪到 SLO 告警

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐