1. 概述

这两年MCP Server很火爆,但是我一直不了解什么是MCP Server,借着这个机会,实践了一把MCP Server的搭建,因为资源有限,后续再补充LLM模型应用MCP Server的部分。本demo介绍了如何从零开始构建一个基于MCP的磁盘监控服务器Docker镜像。该镜像包含一个MCP服务器,提供磁盘SMART状态检查和磁盘列表查询功能,以及对应的客户端测试工具。

1.1 功能特性

  • MCP服务器端: 提供RESTful API接口,支持SSE传输模式
  • 磁盘监控工具:
    • check_disk_smart: 检查硬盘SMART自检状态
    • list_disks: 查询服务器磁盘列表信息
  • 客户端工具: 完整的Python客户端实现,支持异步调用
  • 容器化部署: 完整的Docker镜像构建和运行方案

2. 环境准备

2.1 系统要求

  • 操作系统: EulerOS 2.x系列或EulerOS 22.03 LTS系列,ubuntu用apt-get下载docker,其余应该都一样
  • 网络: 可访问外部软件仓库

2.2 Docker安装

2.2.1 添加Docker仓库
# 添加Docker官方仓库
yum-config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo
2.2.2 适配EulerOS版本

根据EulerOS版本修改仓库配置:

对于EulerOS 2.x系列(对应CentOS 7):

sed -i 's/$releasever/7/g' /etc/yum.repos.d/docker-ce.repo

对于EulerOS 22.03 LTS系列(对应CentOS 8):

sed -i 's/$releasever/8/g' /etc/yum.repos.d/docker-ce.repo
2.2.3 清理并重建缓存
yum clean all
yum makecache
2.2.4 安装Docker
yum install docker-ce
2.2.5 验证安装
docker --version

成功输出Docker版本信息表示安装成功。

2.3 启动Docker服务

# 启动Docker服务
systemctl start docker
# 设置开机自启
systemctl enable docker
# 查看服务状态
systemctl status docker

2.4(可选)挂载/var到硬盘上

因docker镜像默认保存在/var/lib路径下,若空间不够,可挂载到硬盘上存储,步骤如下:

  1. nvme6n1 上创建分区和文件系统
  2. 临时挂载并迁移 /var 数据
  3. 修改 /etc/fstab 实现永久挂载*
2.4.1:在 nvme6n1 上创建分区
# 使用 fdisk 或 parted 创建新分区(这里以 fdisk 为例)
sudo fdisk /dev/nvme6n1

在 fdisk 交互界面中:

  • 输入 n 创建新分区
  • 选择默认分区类型(主分区)
  • 设置分区大小(建议分配全部空间或按需分配)
  • 输入 w 保存并退出

创建后查看新分区设备名(通常是 /dev/nvme6n1p1):

sudo lsblk /dev/nvme6n1
2.4.2:创建文件系统
# 创建 ext4 文件系统(也可根据需求选择 xfs 等)
sudo mkfs.ext4 /dev/nvme6n1p1
2.4.3:临时挂载并迁移数据
# 创建临时挂载点
sudo mkdir /mnt/var_new

# 挂载新分区
sudo mount /dev/nvme6n1p1 /mnt/var_new

# 复制 /var 数据到新位置(保留权限属性)
sudo rsync -avx /var/ /mnt/var_new/

# 重命名原 /var 目录作为备份
sudo mv /var /var.backup

# 创建新的空 /var 目录
sudo mkdir /var

# 卸载临时挂载点
sudo umount /mnt/var_new

# 将新分区挂载到 /var
sudo mount /dev/nvme6n1p1 /var
2.4.4:配置永久挂载

编辑 /etc/fstab 文件:

sudo vi /etc/fstab

添加以下行(根据实际文件系统类型调整):

/dev/nvme6n1p1 /var ext4 defaults 0 0
2.4.5:验证和清理
# 重新挂载所有文件系统
sudo mount -a

# 验证挂载
df -h /var
lsblk

# 确认服务正常运行后,可删除备份(谨慎操作)
# sudo rm -rf /var.backup

3. 项目文件准备

3.1 项目结构

mcp-disk-monitor/
├── Dockerfile
├── requirements.txt
├── mcp_server.py
└── mcp_client.py

3.2 依赖文件 (requirements.txt)

fastmcp>=0.1.0
langchain-mcp-adapters>=0.1.0
aiohttp

3.3 MCP服务器端 (mcp_server.py)

import subprocess
import json
from typing import List, Dict, Any
from fastmcp import FastMCP

# 初始化 MCP 服务实例
mcp = FastMCP(name="DiskMonitorServer")

@mcp.tool
def check_disk_smart(device: str = "/dev/sda") -> str:
    """
    检查指定硬盘的 SMART 自检状态。
    
    Args:
        device (str): 要检查的硬盘设备路径,默认为 /dev/sda。
    
    Returns:
        str: 返回 SMART 自检结果,为 "PASSED" 或 "FAILED"。
    """
    try:
        # 使用 smartctl 命令查询 SMART 健康状态
        # 注意:此命令需要 root 权限或 sudo 配置
        result = subprocess.run(
            ["sudo", "smartctl", "-H", device],
            capture_output=True,
            text=True,
            check=False
        )
        output = result.stdout.lower()
        
        if "passed" in output:
            return "PASSED"
        elif "failed" in output:
            return "FAILED"
        else:
            # 如果命令输出无法解析,可能表示命令执行失败或输出格式不符
            return f"UNKNOWN or ERROR: {result.stderr[:100]}"
            
    except FileNotFoundError:
        return "ERROR: 'smartctl' command not found. Please install smartmontools."
    except Exception as e:
        return f"ERROR: {str(e)}"

@mcp.tool
def list_disks() -> List[Dict[str, Any]]:
    """
    执行 lsblk 命令查询服务器上的磁盘列表。
    
    Returns:
        List[Dict]: 返回一个字典列表,每个字典包含磁盘的名称、大小和类型等信息。
    """
    try:
        # 使用 lsblk 命令以 JSON 格式输出磁盘信息
        result = subprocess.run(
            ["lsblk", "-J", "-o", "NAME,SIZE,TYPE,MOUNTPOINT"],
            capture_output=True,
            text=True,
            check=True
        )
        data = json.loads(result.stdout)
        # 从返回的 JSON 中提取 blockdevices 列表
        disks = data.get("blockdevices", [])
        return disks
        
    except FileNotFoundError:
        return [{"error": "'lsblk' command not found."}]
    except json.JSONDecodeError:
        return [{"error": "Failed to parse lsblk output."}]
    except subprocess.CalledProcessError as e:
        return [{"error": f"lsblk command failed: {e.stderr[:100]}"}]
    except Exception as e:
        return [{"error": f"Unexpected error: {str(e)}"}]

if __name__ == "__main__":
    # 启动服务,使用 SSE 传输模式,监听所有网络接口的 6666 端口
    mcp.run(transport="sse", host="0.0.0.0", port=6666)

3.4 MCP客户端 (mcp_client.py)

import asyncio
import json
from typing import Dict, Any, List
from langchain_mcp_adapters.client import MultiServerMCPClient
from mcp.types import TextContent

class DiskMonitorClient:
    """
    使用 MultiServerMCPClient 的 MCP 客户端
    """
    
    def __init__(self, server_url: str = "http://localhost:6666/sse"):
        self.server_url = server_url
        self.server_name = "disk-monitor"
        
        # 创建 MultiServerMCPClient
        self.client = MultiServerMCPClient({
            self.server_name: {
                "url": server_url,
                "transport": "sse"
            }
        })
        
    async def connect(self):
        """连接服务器"""
        print(f"连接到 MCP 服务器: {self.server_url}")
        
        # 获取工具列表
        tools = await self.client.get_tools(server_name=self.server_name)
        print(f"可用工具: {[tool.name for tool in tools]}")
        
        return tools
        
    async def call_tool(self, tool_name: str, arguments: Dict[str, Any] = None) -> Any:
        """调用服务器工具"""
        if arguments is None:
            arguments = {}
            
        print(f"\n调用工具: {tool_name}")
        print(f"参数: {arguments}")
        
        async with self.client.session(self.server_name) as session:
            # 调用工具
            tool_result = await session.call_tool(
                name=tool_name, 
                arguments=arguments
            )
            
            # 处理结果
            if tool_result.isError:
                print(f"工具调用错误: {tool_result}")
                return None
                
            # 提取文本内容
            result_list = []
            for item in tool_result.content:
                if isinstance(item, TextContent):
                    text = item.text
                    try:
                        # 尝试解析为 JSON
                        json_obj = json.loads(text)
                        result_list.append(json_obj)
                    except json.JSONDecodeError:
                        # 如果不是 JSON,保持为文本
                        result_list.append(text)
            
            print(f"工具返回类型: {type(result_list)}")
            print(f"工具返回长度: {len(result_list)}")
            
            return result_list
            
    async def list_tools(self):
        """列出所有可用工具"""
        async with self.client.session(self.server_name) as session:
            tool_list = await session.list_tools()
            return tool_list.tools
            
    async def disconnect(self):
        """断开连接"""
        # MultiServerMCPClient 会自动管理连接
        pass

def format_disk_info(disks: List[Dict]) -> str:
    """格式化磁盘信息以便阅读"""
    if not disks:
        return "没有找到磁盘信息"
    
    output = []
    
    def format_device(device: Dict, indent: int = 0):
        """递归格式化设备树"""
        prefix = "  " * indent
        name = device.get('name', '未知')
        size = device.get('size', '未知')
        dev_type = device.get('type', '未知')
        mountpoint = device.get('mountpoint') or '(未挂载)'
        
        # 主设备信息
        device_line = f"{prefix}├─ {name} ({dev_type}, {size})"
        if mountpoint and mountpoint != '(未挂载)':
            device_line += f" [挂载点: {mountpoint}]"
        output.append(device_line)
        
        # 子设备(分区)
        children = device.get('children', [])
        for i, child in enumerate(children):
            child_prefix = "  " * (indent + 1)
            last_child = (i == len(children) - 1)
            connector = "└─" if last_child else "├─"
            
            child_name = child.get('name', '未知')
            child_size = child.get('size', '未知')
            child_type = child.get('type', '未知')
            child_mount = child.get('mountpoint') or '(未挂载)'
            
            child_line = f"{child_prefix}{connector} {child_name} ({child_type}, {child_size})"
            if child_mount and child_mount != '(未挂载)':
                child_line += f" [挂载点: {child_mount}]"
            output.append(child_line)
            
            # 如果有孙设备(如 LVM 逻辑卷)
            grandchildren = child.get('children', [])
            for grandchild in grandchildren:
                format_device(grandchild, indent + 2)
    
    # 处理所有顶级设备
    for i, disk in enumerate(disks):
        if 'error' in disk:
            output.append(f"错误: {disk['error']}")
            continue
            
        if i > 0:
            output.append("")  # 设备间空行
        
        format_device(disk, 0)
    
    return "\n".join(output)

async def main():
    """主函数:测试 MCP 服务器工具"""
    client = DiskMonitorClient()
    
    try:
        # 1. 连接到服务器
        print("="*50)
        print("连接到 MCP 服务器...")
        print("="*50)
        
        tools = await client.connect()
        print(f"找到 {len(tools)} 个工具:")
        for tool in tools:
            print(f"  - {tool.name}: {tool.description}")
        
        # 2. 测试 list_disks 工具
        print("\n" + "="*50)
        print("测试 list_disks 工具...")
        print("="*50)
        
        disks_result = await client.call_tool("list_disks")
        
        if disks_result:
            print(f"\nlist_disks 响应类型: {type(disks_result)}")
            
            # 处理响应
            if isinstance(disks_result, list) and len(disks_result) > 0:
                # 第一个元素可能是工具响应
                first_result = disks_result[0]
                
                if isinstance(first_result, dict):
                    # 如果是字典,尝试获取 blockdevices
                    disks = first_result.get('blockdevices', [])
                    if not disks and isinstance(first_result, dict):
                        # 或者直接就是磁盘列表
                        disks = first_result
                elif isinstance(first_result, str):
                    # 如果是字符串,尝试解析为JSON
                    try:
                        parsed = json.loads(first_result)
                        if isinstance(parsed, dict):
                            disks = parsed.get('blockdevices', [])
                        else:
                            disks = parsed
                    except:
                        disks = []
                else:
                    disks = first_result
                
                if isinstance(disks, list):
                    print(f"\n找到 {len(disks)} 个磁盘设备:")
                    print("-" * 40)
                    
                    # 格式化显示
                    formatted = format_disk_info(disks)
                    print(formatted)
                else:
                    print(f"磁盘信息格式不正确: {type(disks)}")
                    print(f"内容: {disks}")
            else:
                print(f"响应格式: {disks_result}")
        else:
            print("获取磁盘列表失败")
        
        # 3. 测试 check_disk_smart 工具
        print("\n" + "="*50)
        print("测试 check_disk_smart 工具...")
        print("="*50)
        
        # 检查 /dev/sda
        smart_result = await client.call_tool(
            "check_disk_smart", 
            {"device": "/dev/sda"}
        )
        
        if smart_result:
            print(f"/dev/sda 的 SMART 状态:")
            if isinstance(smart_result, list) and len(smart_result) > 0:
                result = smart_result[0]
                if isinstance(result, dict):
                    print(json.dumps(result, indent=2, ensure_ascii=False))
                else:
                    print(result)
            else:
                print(smart_result)
        else:
            print("检查 SMART 状态失败")
            
            
    except Exception as e:
        print(f"客户端错误: {e}")
        import traceback
        traceback.print_exc()
        
    finally:
        # 断开连接
        await client.disconnect()
        print("\n程序结束")

if __name__ == "__main__":
    # 运行客户端
    asyncio.run(main())

4. Docker镜像构建

4.1 Dockerfile

FROM python:3.11

WORKDIR /app

# 安装系统依赖(包括 smartmontools 用于 smartctl)
RUN apt-get update && apt-get install -y \
    smartmontools \
    util-linux \
    sudo \
    && rm -rf /var/lib/apt/lists/*

# 复制项目文件
COPY requirements.txt .
COPY mcp_server.py .
COPY mcp_client.py .

# 安装 Python 依赖
RUN pip install --no-cache-dir -r requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple --proxy ""

# 允许非 root 用户使用 sudo 执行 smartctl(在容器中通常不需要)
RUN echo "ALL ALL=(ALL) NOPASSWD: /usr/sbin/smartctl" >> /etc/sudoers

# 暴露端口
EXPOSE 6666

# 默认启动服务器
CMD ["python", "mcp_server.py"]

4.2 构建镜像

在项目目录下执行以下命令:

docker build -t mcp-disk-monitor .

构建过程说明:

  1. 使用Python 3.11作为基础镜像
  2. 安装系统依赖:smartmontools、util-linux、sudo
  3. 复制项目文件到容器内
  4. 使用清华镜像源安装Python依赖
  5. 配置sudo权限,允许非root用户执行smartctl命令
  6. 暴露6666端口供MCP服务使用
  7. 设置默认启动命令为MCP服务器

5. 运行与测试

5.1 启动MCP服务器容器

docker run -d --name mcp-server -p 6666:6666 mcp-disk-monitor

参数说明:

  • -d: 后台运行容器
  • --name mcp-server: 指定容器名称
  • -p 6666:6666: 将容器内6666端口映射到宿主机6666端口

5.2 验证容器状态

# 查看容器运行状态
docker ps | grep mcp-server

CONTAINER ID   IMAGE              COMMAND                  CREATED          STATUS          PORTS                                       NAMES
c7867de1dfb0   mcp-disk-monitor   "python mcp_server.py"   48 minutes ago   Up 48 minutes   0.0.0.0:6666->6666/tcp, :::6666->6666/tcp   mcp-server

# 查看容器日志
docker logs mcp-server

╭──────────────────────────────────────────────────────────────────────────────╮
│                                                                              │
│                                                                              │
│                         ▄▀▀ ▄▀█ █▀▀ ▀█▀ █▀▄▀█ █▀▀ █▀█                        │
│                         █▀  █▀█ ▄▄█  █  █ ▀ █ █▄▄ █▀▀                        │
│                                                                              │
│                                                                              │
│                                FastMCP 2.14.3                                │
│                            https://gofastmcp.com                             │
│                                                                              │
│                    🖥  Server:      DiskMonitorServer                         │
│                    🚀 Deploy free: https://fastmcp.cloud                     │
│                                                                              │
╰──────────────────────────────────────────────────────────────────────────────╯
╭──────────────────────────────────────────────────────────────────────────────╮
│                          ✨ FastMCP 3.0 is coming!                           │
│       Pin `fastmcp < 3` in production, then upgrade when you're ready.       │
╰──────────────────────────────────────────────────────────────────────────────╯


[01/22/26 08:01:43] INFO     Starting MCP server                  server.py:2585
                             'DiskMonitorServer' with transport
                             'sse' on http://0.0.0.0:6666/sse
INFO:     Started server process [1]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:6666 (Press CTRL+C to quit)
INFO:     172.17.0.1:44662 - "GET /sse HTTP/1.1" 200 OK
INFO:     182.200.112.34:37226 - "GET /sse HTTP/1.1" 200 OK


### 5.3 运行客户端测试
```bash
# 在宿主机上运行客户端测试
python mcp_client.py

输出:

../bin/python mcp_client.py
==================================================
连接到 MCP 服务器...
==================================================
连接到 MCP 服务器: http://xxxx:6666/sse
可用工具: ['check_disk_smart', 'list_disks']
找到 2 个工具:
  - check_disk_smart: 检查指定硬盘的 SMART 自检状态。

Args:
    device (str): 要检查的硬盘设备路径,默认为 /dev/sda。

Returns:
    str: 返回 SMART 自检结果,为 "PASSED""FAILED"。
  - list_disks: 执行 lsblk 命令查询服务器上的磁盘列表。

Returns:
    List[Dict]: 返回一个字典列表,每个字典包含磁盘的名称、大小和类型等信息。

==================================================
测试 list_disks 工具...
==================================================

调用工具: list_disks
参数: {}
工具返回类型: <class 'list'>
工具返回长度: 1

list_disks 响应类型: <class 'list'>

找到 8 个磁盘设备:
----------------------------------------
├─ sda (disk, 447.1G)
  ├─ md126 (raid1, 424.8G)
    ├─ md126p1 (part, 600M)
    ├─ md126p2 (part, 1G)
    ├─ md126p3 (part, 423.2G)
  └─ md127 (None, 0B)

├─ sdb (disk, 447.1G)
  ├─ md126 (raid1, 424.8G)
    ├─ md126p1 (part, 600M)
    ├─ md126p2 (part, 1G)
    ├─ md126p3 (part, 423.2G)
  └─ md127 (None, 0B)

6. 高级配置与优化

6.1 安全配置建议

# 创建非root用户运行应用
RUN useradd -m -u 1000 appuser && \
    chown -R appuser:appuser /app

USER appuser

6.2 性能优化

# 使用多阶段构建减少镜像大小
FROM python:3.11-slim as builder

# 构建阶段...

FROM python:3.11-slim
COPY --from=builder /app /app

6.3 健康检查

# 添加健康检查
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:6666/health || exit 1
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐