前言

随着云原生技术、微服务架构和容器编排体系的普及,IT系统的复杂度持续攀升。传统运维观测手段难以应对高维度数据、复杂链路与快速迭代带来的挑战,诊断效率与交互体验均面临瓶颈。云舟观测智能体系统( GC Agent System)由此推出。系统以自然语言交互为入口,通过统一编排层与标准化工具体系,实现对观测数据的智能分析、故障辅助诊断与知识检索,为企业构建新一代的自动化运维能力。

本文聚焦系统架构设计、核心模块原理与工程实现,为大家呈现一个完整可靠、可扩展、企业级的监控观测智能体系统技术蓝图。


一、总体架构设计

1.1 架构理念

系统采用 模块化、松耦合、可扩展 的架构体系。上层业务逻辑、中层编排调度与下层执行工具使用标准接口隔离,使系统既能保持稳定性,也能在未来轻松扩展至更多模型、更多场景和更多观测工具。

1.2 分层架构

系统整体分为 用户/网关层、编排层、Agent工具与能力层、协议层、持久层等五个层次,各层职责清晰、边界明确。

后端核心技术组件主要有以下:

选型以 性能优先、生态成熟、支持异步、运维友好 为原则,核心能力基于langgraph/langchain构建。

1.3 系统任务流程

示例:

一个典型的用户交互请求(如“帮我诊断下主机故障”)在系统中的流转过程如下:

  1. 请求接入与极速响应:用户发起请求后,API 层立即生成会话 ID,并在进入任何耗时操作之前,通过 SSE 推送首包事件。这种“首包零延迟”机制能够显著减少用户等待时间。

  2. 纵深防御鉴权:系统基于 Cookie 获取用户态信息,并将其编码到 x-ai Header 中。该凭证会伴随整个请求链路,最终传递到最底层的 MCP 工具,从而实现数据级的权限隔离。

  3. 智能决策编排:

    • 当请求来自明确的指令(例如前端按钮触发),系统会直接进入路由专家模式。

    • 当请求为自然语言输入时,系统进入自由探索模式。意图识别组件会对请求进行七类分类(如问候、知识查询、故障排查等),并据此选择最合适的执行策略。

  4. 动态代理构建:系统根据任务类型,通过 MCPLoader 动态构建执行代理,加载所需的 MCP 工具,并配置相应提示模板,使代理具备完成任务的能力。

  5. 推理与执行循环:代理按照“推理—调用工具—获取结果”的工作方式进行循环。在执行过程中,系统会实时记录关键中间状态并通过 SSE 反馈给前端,以便用户了解任务的执行进度。

  6. 结果生成与记忆:当任务执行完成后,系统会将最终结果写入 Redis 的会话历史。同时,后台会异步触发标题生成流程,为该轮会话生成简洁、可索引的主题描述,形成完整的交互闭环。

二、核心组件深度解析

2.1 GCExecutor - 核心引擎

GCExecutor是整个系统的核心编排器,负责协调Agent生态系统的运作。它采用了创新的双模式执行策略,能够根据任务特性自动选择最优的执行路径。

2.1.1 路由专家模式(Route Mode)

路由专家模式是为专业任务设计的快速执行通道。当用户明确知道需要哪个专业Agent时,系统会直接路由到对应的Agent,跳过意图识别环节,大幅提升响应速度。适用于需求明确、流程固定的场景。

核心特性:

  • 直接路由:通过特定参数指定目标Agent

  • 工具精简:只加载Agent配置中定义的5-10个必要工具

  • 响应可预测:执行路径固定,便于调试和优化

  • 性能优先:减少LLM调用次数,降低延迟

目前适用场景:

  • 主机诊断:直接调用主机诊断专家。

  • 知识查询:直接调用知识库专家。

  • 定向任务:用户明确需要特定功能(待开放)

2.1.2 AI自由探索模式(Explore Mode)

探索模式是系统的智能化体现,适用于复杂、开放性的问题。系统会加载所有可用工具,让AI自主分析问题并选择最优工具组合。

核心特性:

  • 全量工具:加载25+可用工具

  • 智能选择:AI根据上下文自主决策

  • 多轮迭代:支持复杂的多步骤任务

  • 动态调整:根据执行结果调整策略

适用场景:

  • 复杂故障诊断

  • 多维度数据分析

  • 探索性问题解决

  • 跨领域综合任务

为保障多租户隔离,每次执行均会重新构建执行环境,注入用户级鉴权信息。

2.1.3 意图识别系统

意图识别是探索模式的第一道关卡,通过多级过滤和智能分析,精准理解用户意图。

五级识别流程:

  1. 输入净化:移除特殊字符,防止注入攻击

  2. 可疑检测:识别恶意输入模式

  3. 相关性判断:检查是否属于平台业务范围

  4. 历史融合:结合对话历史理解上下文

  5. LLM分析:深度语义理解,返回7种意图类型

意图类型体系:

  • 无效/恶意输入

  • 知识查询

  • 通用任务

  • 问候类输入

  • 身份信息询问

  • 上下文相关短问

  • 可疑/异常输入(防注入)

2.1.4 编排核心逻辑

编排器的核心在于协调各个组件的协同工作,确保请求的高效处理。

执行流程:

  1. 请求初始化:生成session_id和response_id

  2. 模式判断:路由模式 vs 探索模式

  3. Agent创建:动态创建Agent实例(不使用缓存)

  4. 工具加载:根据模式加载相应工具集

  5. 执行监控:实时跟踪执行状态

  6. 结果聚合:整合多个工具的返回结果

关键实现代码:

class GCExecutor:
    """GC Agent 执行器"""

    def __init__(self, redis_url: str = None):
        """初始化执行器"""
        # 初始化LLM
        self.llm = GCLLMManager.get_llm()
        
        # 获取MCP加载器
        self.mcp_loader = get_mcp_loader()
        
        # Agent缓存(注意:实际不使用缓存)
        self.agents = {}
        
        # 初始化Redis存储
        self.redis_store = GCRedisStore(redis_url=redis_url)
        
        # 初始化反馈服务
        self.feedback_service = GCFeedbackService(redis_url=redis_url)

    async def execute(self, request: GCRequest, start_sequence: int = 0) -> AsyncIterator[str]:
        """执行请求 - 生成SSE流"""
        
        # 保存headers到实例,供后续工具创建使用
        self.current_request_headers = request.headers
        
        # 确定执行模式
        if request.direct_agent:
            # 直接路由模式 - 使用指定的Agent
            agent_id = request.direct_agent
            async for event in self._execute_with_agent(
                    agent_id,
                    request.message,
                    session_id,
                    response_id,
                    sequence
            ):
                yield event
        else:
            # AI自由探索模式
            async for event in self._execute_explore_mode(
                    request.message,
                    session_id,
                    response_id,
                    sequence
            ):
                yield event

意图识别的完整实现:

async def _analyze_intent(self, query: str, session_id: str) -> Dict[str, Any]:
    """增强的意图识别方法(带防护机制)"""
    
    # 步骤1:输入净化
    cleaned_query = self._sanitize_input(query)
    
    # 步骤2:可疑输入检测
    if self._is_suspicious_input(cleaned_query):
        return {
            "intent_type": "invalid",
            "intent_name": "可疑输入",
            "confidence": 1.0,
            "direct_response": "抱歉,您的输入包含不支持的内容..."
        }
    
    # 步骤3:主题相关性检测
    if not self._is_observation_related(cleaned_query):
        pass  # 继续LLM判断
    
    # 步骤4:获取历史对话
    history_messages = await self.redis_store.get_conversation_history(
        session_id=session_id,
        limit=20
    )
    
    # 步骤5:LLM意图分析
    response = await self.llm.ainvoke([
        SystemMessage(content="你是一个意图识别专家..."),
        HumanMessage(content=prompt)
    ])

小结:GCExecutor通过双模式执行策略,既保证了专业任务的高效执行,又提供了复杂问题的智能探索能力。意图识别系统的多级过滤机制确保了系统的安全性和准确性。

2.2 MCP工具加载器和Header透传

MCP(Model Context Protocol)是系统与外部工具交互的统一协议,Headers透传机制确保了认证信息在整个调用链路中的完整传递。

async def _create_tools_for_agent(
    self, 
    agent_config: Dict[str, Any], 
    explore_mode: bool = False,
    request_headers: Optional[Dict[str, str]] = None
) -> List[Tool]:
    """为Agent创建工具"""
    
    # 合并headers,确保所有值都是字符串
    final_headers = {}
    
    # 处理配置中的headers
    if mcp_server.get("headers"):
        for key, value in mcp_server["headers"].items():
            final_headers[key] = str(value)
    
    # 处理请求中的headers(包含x-ai认证信息)
    if request_headers:
        for key, value in request_headers.items():
            final_headers[key] = str(value)
    
    # 创建MCP客户端配置
    server_config = {
        "server": {
            "url": url,
            "transport": "streamable_http",
            "headers": final_headers  # Headers透传到MCP服务
        }
    }
    
    # 每次都创建新客户端,不缓存
    client = MultiServerMCPClient(server_config)
    
    # 根据模式过滤工具
    if not explore_mode and defined_tools:
        # 路由模式:只返回定义的工具
        return [t for t in all_tools if t.name in defined_tools]
    else:
        # 探索模式:返回所有工具
        return all_tools

小结:MCP协议的引入统一了工具调用接口,Headers透传机制保证了认证链路的完整性,这是系统安全性的重要保障。

三、技术创新亮点

3.1 双模式执行策略

class ExecutionModeStrategy:
    """
    创新的双模式执行策略
    """
    
    async def execute_explore_mode(self, query: str):
        """
        探索模式 - 适用于复杂、开放性问题
        
        特点:
        1. 加载所有可用工具(20+)
        2. AI自主选择最优工具组合
        3. 支持多轮工具调用
        4. 动态调整执行策略
        """
        # 加载完整工具集
        all_tools = await self.load_all_tools()
        
        # 创建超级Agent
        super_agent = create_react_agent(
            llm=self.llm,
            tools=all_tools,
            prompt=self.exploration_prompt
        )
        
        # 智能执行
        return await super_agent.astream(query)
    
    async def execute_route_mode(self, query: str, agent_id: str):
        """
        路由模式 - 适用于明确、专业的任务
        
        特点:
        1. 直接路由到专业Agent
        2. 只加载必要工具(5-10个)
        3. 执行速度快
        4. 响应可预测
        """
        # 加载指定Agent
        agent = await self.load_agent(agent_id)
        
        # 快速执行
        return await agent.astream(query)

3.2 流式响应系统

class StreamingResponseSystem:
    """
    SSE流式响应系统
    """
    
    async def generate_sse_stream(self, request: GCRequest):
        """
        生成SSE事件流
        
        技术特点:
        1. 首包延迟 < 100ms
        2. 思维链实时可视化
        3. 内容渐进式输出
        4. 错误优雅降级
        """
        
        # 立即发送首个事件
        yield self._format_sse({
            "type": "session_response_id",
            "message": response_id,
            "timestamp": datetime.now().isoformat()
        })
        
        # 实时发送思考步骤
        async for step in self.executor.think():
            yield self._format_sse({
                "type": "thinking_step",
                "message": step.description,
                "step": step.type
            })
        
        # 流式输出内容
        async for token in self.executor.generate():
            yield self._format_sse({
                "type": "token",
                "message": token
            })

3.3 智能工具调用格式化

def _get_tool_description(self, tool_name: str, tool_args: Dict) -> str:
    """
    智能工具描述生成
    
    创新点:
    1. 根据工具类型生成友好描述
    2. 参数智能提取和展示
    3. 执行意图清晰表达
    """
    
    # 工具语义映射
    semantic_map = {
        "getSystemSpace": "获取系统空间信息",
        "queryHostCoreMetrics": "查询主机核心指标",
        "queryHostProcData": "分析进程运行状态",
        "getHostErrorLogs": "检索系统异常日志"
    }
    
    # 智能描述生成
    base_desc = semantic_map.get(tool_name, f"执行{tool_name}")
    
    # 参数增强
    if "hosts" in tool_args:
        base_desc += f" - 目标: {tool_args['hosts']}"
    if "metrics" in tool_args:
        base_desc += f" - 指标: {tool_args['metrics']}"
    
    return base_desc

3.4 企业级会话管理与用户隔离

class SessionManagement:
    """
    企业级会话管理系统
    """
    
    async def manage_session(self, session_id: str, user_id: str):
        """
        会话管理核心
        
        技术特点:
        1. Redis持久化 - 分布式会话存储
        2. 用户隔离 - 基于Cookie的认证
        3. 上下文保持 - 历史消息关联
        4. 标题生成 - AI智能摘要
        """
        
        # 会话创建/获取
        session = await self.redis_store.get_or_create_session(
            session_id=session_id,
            user_id=user_id
        )
        
        # 历史上下文构建
        history = await self.redis_store.get_conversation_history(
            session_id=session_id,
            limit=20  # 保持最近20条消息
        )
        
        # 智能标题生成
        if not session.title:
            title = await self._generate_session_title(
                user_question=query,
                ai_response=response
            )
            await self.redis_store.update_session_title(session_id, title)

小结:通过双模式执行、流式响应、智能工具格式化等创新技术,系统实现了效率与智能的完美平衡。

四、上下文/记忆管理

记忆是 Agent 能够进行连续对话的基础。本系统基于 Redis 构建了一套层次化的分布式记忆系统,实现了会话管理、历史记忆和用户隔离等核心功能。

class GCRedisStore:
    """GC Agent的Redis存储实现"""
    
    def __init__(self, redis_url: str = None):
        """初始化Redis连接"""
        if redis_url:
            self.redis = redis.from_url(redis_url)
        else:
            # 从配置文件读取
            config = self._load_config()
            redis_config = config.get('redis', {})
            self.redis = redis.Redis(
                host=redis_config.get('host', 'localhost'),
                port=redis_config.get('port', 6379),
                db=redis_config.get('db', 0),
                decode_responses=True
            )
    
    async def get_or_create_session(self, session_id: str, user_id: str = "default") -> SessionContext:
        """获取或创建会话上下文"""
        key = f"session:{session_id}"
        context_data = await self.redis.hgetall(key)
        
        if not context_data:
            # 创建新会话
            context = SessionContext(
                session_id=session_id,
                user_id=user_id,
                created_at=datetime.now(),
                updated_at=datetime.now()
            )
            await self.redis.hset(key, mapping=context.dict())
        else:
            context = SessionContext(**context_data)
        
        return context
    
    async def add_user_message(self, session_id: str, content: str):
        """添加用户消息到历史"""
        message = {
            "role": "user",
            "content": content,
            "timestamp": datetime.now().isoformat()
        }
        
        key = f"messages:{session_id}"
        await self.redis.rpush(key, json.dumps(message))
        
        # 更新会话时间
        await self._update_session_time(session_id)

小结:Redis存储层通过精心设计的Key结构和过期策略,实现了高效的会话管理和历史记忆功能。同时,为了提升历史会话的可读性,系统会在首轮对话结束后,异步触发一个轻量级 LLM 任务。该任务会根据用户的提问和 AI 的回答,提炼出精准的会话摘要(15 字以内),并更新到 Redis 中。这一过程对用户完全透明,不阻塞主交互流程。

五、认证机制

系统采用创新的三层鉴权架构,形成完整的安全闭环,保证数据安全可靠。

5.1 三层鉴权体系

第一层:网关层权限校验

拦截所有流量,通过与企业鉴权中心的交互,验证 Cookie 的合法性并解析出用户身份。这一层阻挡了所有未授权的外部访问。

  • Cookie交换:解析用户Cookie获取智汇云用户态信息

  • 用户识别:从Cookie中提取当前用户的各类信息

第二层:MCP Server鉴权

在 API 业务逻辑中,系统强制校验 Session ID 与 User ID 的绑定关系。即使用户猜到了其他人的 Session ID,也无法查看或操作对应的会话内容,确保了租户间的绝对隔离。

  • 服务认证:MCP Server自身的访问控制

  • 未来扩展:支持MCP Server市场的统一鉴权链接

  • 服务隔离:不同MCP Server间的权限隔离

第三层:数据权限校验

这是最具创新性的一层。系统将用户身份信息编码为 `x-ai` Header,并通过 MCP 协议透传至最底层的工具服务。这意味着,权限控制下沉到了数据源头——LLM 无法"看到"用户无权访问的主机或日志,从根本上解决了 AI 系统常见的数据泄露风险。

  • Headers透传:将用户信息透传到每个工具

  • 细粒度控制:每个MCP Server的Tool做各自的数据权限验证

  • 动态授权:基于用户角色的动态权限分配

5.2 认证服务实现

class AuthService:
    """统一的认证服务"""
    
    async def get_user_info(self, request: Request) -> Tuple[str, str]:
        """从请求中获取用户信息"""
        
        # 尝试从Cookie获取
        cookie_value = request.cookies.get("sessionid") or request.cookies.get("JSESSIONID")
        
        if cookie_value:
            try:
                # 解析Cookie获取用户信息
                user_info = self._parse_cookie(cookie_value)
                user_name = user_info.get("userName", "default")
                return json.dumps(user_info), user_name
            except Exception as e:
                logger.warning(f"Cookie解析失败: {e}")
        
        # 降级到默认用户
        default_user = {
            "userName": "default",
            "userId": "default",
            "timestamp": datetime.now().isoformat()
        }
        return json.dumps(default_user), "default"
    
    def get_x_ai_header(self, user_info: str) -> str:
        """生成X-AI header"""
        # Base64编码用户信息
        return base64.b64encode(user_info.encode()).decode()
    
    async def verify_session_ownership(
        self, 
        session_id: str, 
        user_name: str,
        redis_store: GCRedisStore
    ) -> bool:
        """验证会话所有权"""
        session_context = await redis_store.get_context(session_id)
        
        if not session_context:
            return False
            
        return session_context.user_id == user_name

小结:三层鉴权机制从网关、服务、数据三个维度构建了完整的安全体系,确保了系统的安全性和可靠性。

六、系统可观测性与可进化能力

6.1 Trace数据上报

系统通过Traceloop统一上报采集数据到LLM应用监测模块,实现全链路追踪。

监控维度:

  • LLM调用监控:Token使用量、响应时间、错误率

  • 工具执行追踪:每个工具的执行时间和成功率

  • 会话分析:用户行为模式、热点问题识别

  • 性能指标:P99延迟、QPS、并发数

实现代码:

from traceloop.sdk import Traceloop
from traceloop.sdk.tracing.tracing import Instruments

# 先禁用 MCP 自动初始化
Traceloop.init(
    app_name="gc-agent",
    resource_attributes={
        "service.name": "gc-agent",
        "service.version": "v1.0",
        "service.xHawkUserSpaceId": "xxx"
    },
    api_endpoint="http://traces.xxxx.lycc.qihoo.net:55683",
    block_instruments={Instruments.MCP},
    disable_batch=True
)

# 手动初始化 MCP instrumentor
mcp_instrumentor = None
try:
    from opentelemetry.instrumentation.mcp import McpInstrumentor
    mcp_instrumentor = McpInstrumentor()
        # 验证初始化状态
    logger.info(f"MCP instrumentor is instrumented: {mcp_instrumentor.is_instrumented_by_opentelemetry}")

except Exception as e:
    logger.error(f"Failed to initialize MCP instrumentor: {e}")
    mcp_instrumentor = None
# 添加src路径
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'src'))

6.2 反馈服务系统

反馈服务主要用于收集用户对AI响应的评价(点赞/点踩),为系统后期优化提供数据支持。

  • 评价收集:支持用户对每个响应进行评价

  • 数据持久化:将反馈数据存储在Redis中

  • 统计分析:支持按会话、时间等维度统计

6.3 实现代码

class GCFeedbackService:
    """反馈服务"""
    
    def __init__(self, redis_url: str = None):
        """初始化反馈服务"""
        if redis_url:
            self.redis = redis.from_url(redis_url)
        else:
            # 从配置文件读取
            config = self._load_config()
            redis_config = config.get('redis', {})
            self.redis = redis.Redis(
                host=redis_config.get('host', 'localhost'),
                port=redis_config.get('port', 6379),
                db=redis_config.get('feedback_db', 1),  # 使用独立的DB
                decode_responses=True
            )
    
    async def save_response(
        self,
        response_id: str,
        session_id: str,
        user_question: str,
        agent_response: str
    ) -> Feedback:
        """保存响应记录"""
        feedback = Feedback(
            response_id=response_id,
            session_id=session_id,
            user_question=user_question,
            agent_response=agent_response,
            created_at=datetime.now(),
            updated_at=datetime.now()
        )
        
        # 保存到Redis
        key = f"feedback:{response_id}"
        await self.redis.hset(key, mapping=feedback.dict())
        
        # 添加到会话索引
        session_key = f"session_feedbacks:{session_id}"
        await self.redis.sadd(session_key, response_id)
        
        return feedback
    
    async def update_rating(
        self,
        response_id: str,
        rating: Optional[str],
        session_id: Optional[str] = None
    ) -> bool:
        """更新反馈评分"""
        key = f"feedback:{response_id}"
        
        # 检查反馈是否存在
        exists = await self.redis.exists(key)
        if not exists:
            logger.warning(f"反馈记录不存在: {response_id}")
            return False
        
        # 更新评分
        updates = {
            "rating": rating or "null",
            "updated_at": datetime.now().isoformat()
        }
        
        await self.redis.hset(key, mapping=updates)
        logger.info(f"反馈评分已更新: {response_id} -> {rating}")
        
        return True

小结:反馈服务通过简单有效的设计,为系统的持续优化提供了重要的数据基础。


结语

云舟观测智能体系统通过分层架构设计、标准化工具体系和可扩展的智能编排能力,为企业提供了结构清晰、可审计、易集成的智能化监控观测能力。在当前架构基础上,系统将围绕以下方向持续演进:

技术演进方向:

  • Agent 市场化:支持第三方 Agent 接入,构建内部能力生态。

  • 主动式运维:从被动式操作协助迈向预测式、预防式运维。

  • 多模态支持:实现图表、日志、指标等多类型观测数据的统一分析。

产品化路线:

  • 私有化部署:支持与智汇云体系的集成与打包部署,满足企业级落地需求。

  • 插件化架构:允许用户按需扩展 MCP 工具与流程,实现业务级定制能力。

  • 生态化发展:与更多观测工具与运维平台深度集成,构建全链路协同生态。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐