05-多服务器架构与最佳实践

概述

在前四篇文章中,我们分别介绍了 MCP 协议基础、服务器开发、客户端开发和 LLM 集成。本文将深入探讨多服务器架构的设计与实现,以及构建生产级 MCP 应用的最佳实践。

多服务器架构设计

为什么需要多服务器

在实际应用中,单一 MCP 服务器往往难以满足复杂需求。多服务器架构可以带来以下优势:

  • 功能分离: 不同服务器负责不同领域(数学、BMI、数据库等)
  • 团队协作: 多个团队独立开发和维护各自的服务器
  • 扩展性: 根据需求动态添加或移除服务器
  • 容错性: 单个服务器故障不影响其他功能
  • 负载均衡: 分发请求到多个实例

架构模式

1. 网关模式
客户端
  ↓
网关层 (Gateway)
  ├─→ 数学服务器 (Stdio)
  ├─→ BMI 服务器 (HTTP)
  └─→ 数据库服务器 (HTTP)
2. 总线模式
客户端 → 消息总线 → 多个服务器
3. 直接连接模式
客户端
  ├─→ 数学服务器
  ├─→ BMI 服务器
  └─→ 其他服务器

MultiServerMCPClient 实现

基础配置

from langchain_mcp_adapters.client import MultiServerMCPClient

client = MultiServerMCPClient({
    "math": {
        "command": "python",
        "args": ["math_mcp_server_stdio.py"],
        "transport": "stdio",
    },
    "bmi": {
        "url": "http://localhost:8000/mcp",
        "transport": "streamable_http",
    }
})

会话管理

async with client.session("math") as math_session, \
             client.session("bmi") as bmi_session:

    # 获取数学服务器的工具
    math_tools = await client.get_tools(server_name="math")

    # 获取 BMI 服务器的工具
    bmi_tools = await client.get_tools(server_name="bmi")

    # 合并所有工具
    all_tools = math_tools + bmi_tools

完整实现示例

import os
import json
from typing import List
from typing_extensions import TypedDict
from typing import Annotated
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage, ToolMessage
from langgraph.prebuilt import tools_condition, ToolNode
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import AnyMessage, add_messages
from langgraph.checkpoint.memory import MemorySaver
from langchain_mcp_adapters.client import MultiServerMCPClient
import asyncio
from dotenv import load_dotenv

# 加载环境变量
load_dotenv()

# 配置多个服务器
client = MultiServerMCPClient({
    "math": {
        "command": "python",
        "args": ["math_mcp_server_stdio.py"],
        "transport": "stdio",
    },
    "bmi": {
        "url": "http://localhost:8000/mcp",
        "transport": "streamable_http",
    }
})

状态定义

class State(TypedDict):
    messages: Annotated[List[AnyMessage], add_messages]

创建图

async def create_graph():
    # 创建 LLM
    api_key = os.getenv("DEEPSEEK_API_KEY")
    if not api_key:
        raise ValueError("DEEPSEEK_API_KEY not found in environment variables")

    llm = ChatOpenAI(
        model="deepseek-chat",
        temperature=0,
        api_key=api_key,
        base_url="https://api.deepseek.com"
    )

    # 获取所有服务器的工具
    tools = await client.get_tools()
    llm_with_tool = llm.bind_tools(tools)

    # 获取系统提示
    system_prompt = await client.get_prompt(
        server_name="math",
        prompt_name="system_prompt"
    )

    prompt_template = ChatPromptTemplate.from_messages([
        ("system", system_prompt[0].content),
        MessagesPlaceholder("messages")
    ])
    chat_llm = prompt_template | llm_with_tool

    # Chat Node
    def chat_node(state: State) -> State:
        messages = state["messages"]

        # 智能过滤消息
        filtered_messages = filter_messages(messages)
        state["messages"] = chat_llm.invoke({"messages": filtered_messages})
        return state

    # 构建图
    graph_builder = StateGraph(State)
    graph_builder.add_node("chat_node", chat_node)
    graph_builder.add_node("tool_node", ToolNode(tools=tools))
    graph_builder.add_edge(START, "chat_node")
    graph_builder.add_conditional_edges(
        "chat_node",
        tools_condition,
        {"tools": "tool_node", "__end__": END}
    )
    graph_builder.add_edge("tool_node", "chat_node")
    graph = graph_builder.compile(checkpointer=MemorySaver())
    return graph

消息过滤

def filter_messages(messages):
    """智能过滤消息"""
    filtered = []
    i = 0

    while i < len(messages):
        msg = messages[i]

        if isinstance(msg, HumanMessage):
            filtered.append(msg)
            i += 1
        elif isinstance(msg, AIMessage):
            filtered.append(msg)
            if hasattr(msg, 'tool_calls') and msg.tool_calls:
                tool_call_ids = {tc['id'] for tc in msg.tool_calls}
                j = i + 1
                while j < len(messages) and isinstance(messages[j], ToolMessage):
                    tool_msg = messages[j]
                    if hasattr(tool_msg, 'tool_call_id') and tool_msg.tool_call_id in tool_call_ids:
                        # 确保 ToolMessage content 是字符串
                        if not isinstance(tool_msg.content, str):
                            if hasattr(tool_msg.content, '__dict__'):
                                tool_msg.content = json.dumps(tool_msg.content.__dict__, ensure_ascii=False)
                            else:
                                tool_msg.content = str(tool_msg.content)
                        filtered.append(tool_msg)
                        j += 1
                    else:
                        break
                i = j
            else:
                i += 1
        elif isinstance(msg, ToolMessage):
            i += 1
        else:
            i += 1

    return filtered

主函数

async def main():
    config = {"configurable": {"thread_id": 1234}}
    agent = await create_graph()

    print("=" * 50)
    print("Multi-Server LangGraph Agent Ready!")
    print("=" * 50)
    print("\n可用服务器:")
    print("  - math: 数学计算服务器")
    print("  - bmi: BMI计算服务器")
    print("\n开始对话...\n")

    while True:
        message = input("User: ")
        if message.lower() in ['quit', 'exit', 'q']:
            print("\n正在退出程序...")
            break

        response = await agent.ainvoke({"messages": message}, config=config)
        print(f"AI: {response['messages'][-1].content}\n")

if __name__ == "__main__":
    asyncio.run(main())

服务器部署策略

1. 单机部署

所有服务器运行在同一台机器上:

# 终端 1: 数学服务器
python math_mcp_server_stdio.py

# 终端 2: BMI 服务器
python bmi_mcp_server.py

# 终端 3: 客户端
python bmi_client.py

2. 分布式部署

不同服务器运行在不同机器上:

# 配置远程服务器
client = MultiServerMCPClient({
    "math": {
        "url": "http://10.0.0.1:8000/mcp",
        "transport": "streamable_http",
    },
    "bmi": {
        "url": "http://10.0.0.2:8000/mcp",
        "transport": "streamable_http",
    },
    "database": {
        "url": "http://10.0.0.3:8000/mcp",
        "transport": "streamable_http",
    }
})

3. 容器化部署

使用 Docker 部署服务器:

# Dockerfile for Math Server
FROM python:3.11-slim

WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt

COPY . .

CMD ["python", "math_mcp_server_stdio.py"]
# docker-compose.yml
version: '3.8'

services:
  math-server:
    build: .
    command: python math_mcp_server_stdio.py
    ports:
      - "8001:8000"

  bmi-server:
    build: .
    command: python bmi_mcp_server.py
    ports:
      - "8002:8000"

  client:
    build: .
    command: python bmi_client.py
    depends_on:
      - math-server
      - bmi-server

4. 负载均衡

使用 Nginx 实现负载均衡:

upstream mcp_servers {
    server 10.0.0.1:8000;
    server 10.0.0.2:8000;
    server 10.0.0.3:8000;
}

server {
    listen 80;
    server_name mcp.example.com;

    location /mcp {
        proxy_pass http://mcp_servers;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
    }
}

监控和日志

1. 服务器健康检查

import aiohttp

async def check_server_health(url: str) -> bool:
    """检查服务器健康状态"""
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(f"{url}/health", timeout=5) as response:
                return response.status == 200
    except:
        return False

async def monitor_servers(servers: dict):
    """监控所有服务器"""
    while True:
        for name, config in servers.items():
            url = config.get("url")
            if url:
                is_healthy = await check_server_health(url)
                status = "UP" if is_healthy else "DOWN"
                print(f"Server {name}: {status}")
        await asyncio.sleep(30)

2. 性能指标收集

import time
from collections import defaultdict

class MetricsCollector:
    def __init__(self):
        self.metrics = defaultdict(list)

    def record_tool_call(self, tool_name: str, duration: float):
        """记录工具调用性能"""
        self.metrics[tool_name].append(duration)

    def get_average_time(self, tool_name: str) -> float:
        """获取平均响应时间"""
        times = self.metrics.get(tool_name, [])
        return sum(times) / len(times) if times else 0

    def get_stats(self):
        """获取统计信息"""
        stats = {}
        for tool_name, times in self.metrics.items():
            stats[tool_name] = {
                "avg": sum(times) / len(times),
                "min": min(times),
                "max": max(times),
                "count": len(times)
            }
        return stats

3. 日志系统

import logging
from logging.handlers import RotatingFileHandler

def setup_logging(service_name: str):
    """配置日志系统"""
    logger = logging.getLogger(service_name)
    logger.setLevel(logging.INFO)

    # 文件处理器
    file_handler = RotatingFileHandler(
        f"{service_name}.log",
        maxBytes=10*1024*1024,
        backupCount=5
    )
    file_handler.setFormatter(logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    ))

    # 控制台处理器
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(logging.Formatter(
        '%(asctime)s - %(levelname)s - %(message)s'
    ))

    logger.addHandler(file_handler)
    logger.addHandler(console_handler)

    return logger

错误处理和容错

1. 自动重试机制

from tenacity import retry, stop_after_attempt, wait_exponential

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=10)
)
async def robust_tool_call(session, tool_name, arguments):
    """带重试的工具调用"""
    return await session.call_tool(tool_name, arguments)

2. 熔断器模式

from circuitbreaker import circuit

@circuit(failure_threshold=5, recovery_timeout=60)
async def call_with_circuit_breaker(session, tool_name, arguments):
    """带熔断器的工具调用"""
    return await session.call_tool(tool_name, arguments)

3. 降级策略

async def call_with_fallback(session, tool_name, arguments):
    """带降级的工具调用"""
    try:
        return await session.call_tool(tool_name, arguments)
    except Exception as e:
        logger.error(f"Tool call failed: {e}")
        # 返回降级结果
        return {
            "content": [{"text": f"工具 {tool_name} 暂时不可用,请稍后重试"}]
        }

安全性考虑

1. 身份验证

from functools import wraps

def require_auth(f):
    """身份验证装饰器"""
    @wraps(f)
    async def wrapper(*args, **kwargs):
        token = os.getenv("AUTH_TOKEN")
        if not token:
            raise ValueError("AUTH_TOKEN not configured")

        # 验证 token
        if not validate_token(token):
            raise PermissionError("Invalid authentication token")

        return await f(*args, **kwargs)
    return wrapper

@mcp.tool()
@require_auth
def secure_add(a: int, b: int) -> str:
    """需要身份验证的工具"""
    return f"{a} + {b} = {a + b}"

2. 参数验证

from pydantic import BaseModel, validator

class AddParams(BaseModel):
    a: int
    b: int

    @validator('a', 'b')
    def validate_positive(cls, v):
        if v < 0:
            raise ValueError('Values must be positive')
        return v

@mcp.tool()
def validated_add(params: AddParams) -> str:
    """带参数验证的工具"""
    result = params.a + params.b
    return f"{params.a} + {params.b} = {result}"

3. 速率限制

from slowapi import Limiter
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)

@mcp.tool()
@limiter.limit("10/minute")
def rate_limited_add(a: int, b: int) -> str:
    """带速率限制的工具"""
    return f"{a} + {b} = {a + b}"

性能优化

1. 连接池

from aiohttp import ClientSession, TCPConnector

class MCPConnectionPool:
    def __init__(self, pool_size=10):
        self.connector = TCPConnector(limit=pool_size)
        self.session = None

    async def __aenter__(self):
        self.session = ClientSession(connector=self.connector)
        return self.session

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.session.close()
        await self.connector.close()

2. 缓存策略

from functools import lru_cache
import hashlib

def cache_key(tool_name: str, arguments: dict) -> str:
    """生成缓存键"""
    args_str = json.dumps(arguments, sort_keys=True)
    return f"{tool_name}:{hashlib.md5(args_str.encode()).hexdigest()}"

@lru_cache(maxsize=1000)
async def cached_tool_call(tool_name: str, arguments: dict):
    """带缓存工具调用"""
    # 实际的工具调用逻辑
    pass

3. 批量处理

async def batch_tool_calls(session, calls: list):
    """批量调用工具"""
    tasks = [
        session.call_tool(call['name'], call['arguments'])
        for call in calls
    ]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    return results

测试策略

1. 单元测试

import pytest
from unittest.mock import AsyncMock, patch

@pytest.mark.asyncio
async def test_multi_server_client():
    with patch('langchain_mcp_adapters.client.MultiServerMCPClient') as mock_client:
        mock_session = AsyncMock()
        mock_session.get_tools = AsyncMock(return_value=[])

        result = await mock_session.get_tools()

        assert isinstance(result, list)

2. 集成测试

@pytest.mark.asyncio
async def test_integration():
    # 启动多个服务器
    # 执行集成测试
    # 清理资源
    pass

3. 压力测试

import asyncio
import statistics

async def stress_test(tool_name: str, arguments: dict, concurrent_users: int = 100):
    """压力测试"""
    start_time = time.time()
    tasks = [
        call_tool(tool_name, arguments)
        for _ in range(concurrent_users)
    ]
    results = await asyncio.gather(*tasks)
    duration = time.time() - start_time

    avg_time = duration / concurrent_users
    print(f"平均响应时间: {avg_time:.2f}s")
    print(f"吞吐量: {concurrent_users / duration:.2f} req/s")

最佳实践总结

1. 架构设计

  • 分离关注点: 不同服务器负责不同功能
  • 松耦合: 服务器之间保持独立
  • 可扩展: 支持动态添加服务器
  • 容错性: 单点故障不影响整体

2. 代码质量

  • 类型注解: 使用 Python 类型提示
  • 文档字符串: 完整的函数文档
  • 单元测试: 高测试覆盖率
  • 代码审查: 定期代码审查

3. 运维管理

  • 配置管理: 集中化配置管理
  • 监控告警: 实时监控系统状态
  • 日志收集: 集中化日志收集
  • 自动部署: CI/CD 自动化部署

4. 性能优化

  • 连接复用: 使用连接池
  • 缓存策略: 合理使用缓存
  • 异步处理: 充分利用异步特性
  • 批量操作: 减少网络往返

5. 安全保障

  • 身份验证: 实现身份验证机制
  • 参数验证: 严格验证输入参数
  • 速率限制: 防止滥用
  • 数据加密: 传输数据加密

故障排查

问题 1: 服务器连接失败

原因:

  • 服务器未启动
  • 网络问题
  • 防火墙阻止

解决方案:

  • 检查服务器状态
  • 验证网络连接
  • 配置防火墙规则

问题 2: 工具调用超时

原因:

  • 工具执行时间过长
  • 网络延迟
  • 服务器负载过高

解决方案:

  • 增加超时时间
  • 优化工具实现
  • 扩展服务器资源

问题 3: 内存泄漏

原因:

  • 未正确释放资源
  • 缓存未清理
  • 对象循环引用

解决方案:

  • 使用上下文管理器
  • 定期清理缓存
  • 使用内存分析工具

总结

本文详细介绍了多服务器架构的设计与实现,包括:

  • 多服务器架构模式
  • MultiServerMCPClient 实现
  • 服务器部署策略
  • 监控和日志系统
  • 错误处理和容错机制
  • 安全性考虑
  • 性能优化
  • 最佳实践

通过本系列文章的学习,您应该已经掌握了 MCP 协议的完整应用,从基础概念到生产级部署。希望这些知识能够帮助您构建强大的 AI 应用。

阅读顺序建议

  1. 01-MCP协议入门指南: 了解 MCP 基本概念和核心组件
  2. 02-快速构建MCP服务器: 使用 FastMCP 构建服务器
  3. 03-MCP客户端开发实战: 开发 stdio 和 HTTP 客户端
  4. 04-LLM与MCP集成实践: 集成到 LangGraph 构建智能代理
  5. 05-多服务器架构与最佳实践: 多服务器架构和生产部署

参考资源

文章标签

多服务器架构, MCP最佳实践, 生产部署, 性能优化, 安全性, 监控, 容错, Python开发, 系统架构
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐