背景

当前系统日志分散在Kibana和SkyWalking中,故障排查需要人工关联分析,效率低下,需要建立相关的智能体进行快速定位。

涉及开发语言

java 、python

业务目标

  1. 实现跨系统的日志追踪关联分析
  2. 将平均故障定位时间(MTTD)缩短40%
  3. 建立智能化的根因分析能力

分层架构图

功能流程图

工具:Cherry Stdio 工具作为MCP的客户端。

整体项目步骤介绍

1,了解整体项目的路由策略,目的后续根据url需要查询出所有的代码,例如controller->service->dao

2,将工程代码进行解析,目的了解项目的结构和为库系统做数据准备

3, 了解kinbana 和skywalking的数据存储位置,需要获取相应的链接。

4,涉及到两个项目,一个是java项目(analyseProjectCode),一个是python项目(logAnalyse-MCP)

`analyseProjectCode` 是一个 Java 项目分析工具,能够扫描和解析 Java 源代码及 MyBatis XML 映射文件,并将分析结果存储到数据库中。它还支持建立方法之间的调用关系。

logAnalyse-MCP是MCP Server 端,向LLM提供工具

项目整体演示

1,将工程数据利用java项目进行项目分析

2,启动MCP Server端

 

3,打开cherry stdio 客户端,并添加MCP服务器

4,根据场景进行分析
1,错误日志分析

在kinbana上获取到traceId,前提是日志中有traceId。就不截图了。

19f0ab40118e4c198cff8aa2d729fe32.1838.17515966802584869

直接发送给LLM,【这是我的traceId19f0ab40118e4c198cff8aa2d729fe32.1838.17515966802584869 帮我分析错误】

 

后续的建议修复都会有~~ 

2,性能日志分析

在skywalking 上获取到耗时最长的接口的traceId

764ffdf3596445ea9d21e44c1f913c1e.303.17534333595660373

向LLM发送【请帮分析一下764ffdf3596445ea9d21e44c1f913c1e.303.17534333595660373 性能报告】

跨系统的性能排查:这个接口是跨系统的。

 

 

 其他的性能排查:

 python核心logAnalyseHttp.py 代码:

from __future__ import annotations
import argparse
import asyncio
import json
from typing import Any, AsyncIterator, Callable
import httpx
from fastapi import FastAPI, Request, Response, status
from fastapi.responses import StreamingResponse

import logAnalyse2

# ---------------------------------------------------------------------------
# Server constants
# ---------------------------------------------------------------------------
SERVER_NAME = "LogAnalyse"
SERVER_VERSION = "1.0.0"
PROTOCOL_VERSION = "2024-11-05"

# ---------------------------------------------------------------------------
# FastAPI app and tools registry
# ---------------------------------------------------------------------------
app = FastAPI(title="LogAnalyse HTTP-Stream v8")

# ---------------------------------------------------------------------------
# Dummy tool implementations (to be replaced with actual logic)
# ---------------------------------------------------------------------------
async def query_error_logs(trace_id: str) -> dict:
    print(f"traceId:{trace_id}")
    result = await logAnalyse2.query_error_logs(trace_id)
    print(result)
    return result

async def query_skywalking_spans(trace_id: str) -> dict:
    return await logAnalyse2.query_skywalking_spans(trace_id)
async def query_common_code(project_code: str = None, humanFlag: str = None) -> dict:
    return await logAnalyse2.query_common_code(project_code, humanFlag)

async def query_code_by_className_method(project_code: str = None, fullClassName_method: str = None) -> dict:
    return await logAnalyse2.query_code_by_className_method(project_code, fullClassName_method)

async def query_table_create_sql(dbName: str = None) -> dict:
    return await logAnalyse2.query_table_create_sql(dbName)

async def query_url_code(project_code: str = None, url: str = None, http_method: str = None) -> dict:
    return await logAnalyse2.query_url_code(project_code, url, http_method)

# Tool registry for dispatch
TOOL_FUNCTIONS: dict[str, Callable[..., Any]] = {
    "query_error_logs": query_error_logs,
    "query_skywalking_spans": query_skywalking_spans,
    "query_common_code": query_common_code,
    "query_code_by_className_method": query_code_by_className_method,
    "query_table_create_sql": query_table_create_sql,
    "query_url_code": query_url_code
}

TOOLS_REGISTRY = {
    "tools": [
        {
            "name": "query_error_logs",
            "description": "通过 trace_id 获取错误日志信息,主要用途是收集错误日志。",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "trace_id": {
                        "type": "string",
                        "description": "SkyWalking 的 trace_id"
                    }
                },
                "required": ["trace_id"]
            }
        },
        {
            "name": "query_skywalking_spans",
            "description": "通过 trace_id 查询 SkyWalking GraphQL 接口,返回调用链中的数据,主要用途是做性能分析。",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "trace_id": {
                        "type": "string",
                        "description": "SkyWalking 的 trace_id"
                    }
                },
                "required": ["trace_id"]
            }
        },
        {
            "name": "query_common_code",
            "description": "如果错误日志中检查到使用 Feign 拦截器或异步线程池调用(Async),则传入 FeignConfig 或 AsyncConfig 获取相关代码实现。",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "project_code": {
                        "type": "string",
                        "description": "项目编号"
                    },
                    "humanFlag": {
                        "type": "string",
                        "description": "类名,如 FeignConfig 或 AsyncConfig"
                    }
                },
                "required": ["humanFlag"]
            }
        },
        {
            "name": "query_code_by_className_method",
            "description": "根据类全路径和方法名查询方法实现代码,并递归查询其调用的方法实现。",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "project_code": {
                        "type": "string",
                        "description": "项目编号"
                    },
                    "fullClassName_method": {
                        "type": "string",
                        "description": "类全路径与方法名,格式如 com.xxx.UserService.findById"
                    }
                },
                "required": ["fullClassName_method"]
            }
        },
        {
            "name": "query_table_create_sql",
            "description": "根据数据库名称查询所有表的建表语句。",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "dbName": {
                        "type": "string",
                        "description": "数据库名称"
                    }
                },
                "required": ["dbName"]
            }
        },
        {
            "name": "query_url_code",
            "description": "根据项目编号、URL 和 HTTP 方法查询接口实现代码路径信息。",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "project_code": {
                        "type": "string",
                        "description": "项目编号"
                    },
                    "url": {
                        "type": "string",
                        "description": "请求 URL,如 /v1/jyht/cad/anno/modify/users"
                    },
                    "http_method": {
                        "type": "string",
                        "description": "HTTP 方法,如 POST、GET"
                    }
                },
                "required": ["url", "http_method"]
            }
        }
    ],
    "nextCursor": "qinqing"
}

@app.get("/mcp")
async def mcp_initialize_via_get():
    return {
        "jsonrpc": "2.0",
        "id": 0,
        "result": {
            "protocolVersion": PROTOCOL_VERSION,
            "capabilities": {"streaming": True, "tools": {"listChanged": True}},
            "serverInfo": {"name": SERVER_NAME, "version": SERVER_VERSION},
            "instructions": "Use the tool to query logs, spans, or code."
        }
    }

@app.post("/mcp")
async def mcp_endpoint(request: Request):
    try:
        body = await request.json()
        print("💡 收到请求:", json.dumps(body, ensure_ascii=False, indent=2))
    except Exception:
        return {"jsonrpc": "2.0", "id": None, "error": {"code": -32700,"message": "Parse error"}}

    req_id = body.get("id", 1)
    method = body.get("method")
    print(f"🔧 方法: {method}")

    if method == "notifications/initialized":
        return Response(status_code=status.HTTP_204_NO_CONTENT)

    if method is None:
        return {"jsonrpc": "2.0", "id": req_id, "result": {"status": "MCP server online."}}

    if method == "initialize":
        return {
            "jsonrpc": "2.0",
            "id": req_id,
            "result": {
                "protocolVersion": PROTOCOL_VERSION,
                "capabilities": {"streaming": True, "tools": {"listChanged": True}},
                "serverInfo": {"name": SERVER_NAME, "version": SERVER_VERSION},
                "instructions": "Use the tool to query logs, spans, or code."
            }
        }

    if method == "tools/list":
        return {"jsonrpc": "2.0", "id": req_id, "result": TOOLS_REGISTRY}

    if method == "tools/call":
        params = body.get("params", {})
        tool_name = params.get("name")
        args = params.get("arguments", {})

        if tool_name not in TOOL_FUNCTIONS:
            return {"jsonrpc": "2.0", "id": req_id, "error": {"code": -32601, "message": f"Tool '{tool_name}' not found"}}

        try:
            result = await TOOL_FUNCTIONS[tool_name](**args)
            return {
                "jsonrpc": "2.0",
                "id": req_id,
                "result": {
                    "content": [{"type": "text", "text": json.dumps(result, ensure_ascii=False)}],
                    "isError": False
                }
            }
        except Exception as e:
            return {"jsonrpc": "2.0", "id": req_id, "error": {"code": -32000, "message": str(e)}}

    return {"jsonrpc": "2.0", "id": req_id, "error": {"code": -32601, "message": "Method not found"}}

# ---------------------------------------------------------------------------
# Runner
# ---------------------------------------------------------------------------
def main() -> None:
    parser = argparse.ArgumentParser(description="LogAnalyse MCP HTTP-Stream v8")
    parser.add_argument("--host", default="127.0.0.1")
    parser.add_argument("--port", type=int, default=8000)
    args = parser.parse_args()
    import uvicorn
    uvicorn.run(app, host=args.host, port=args.port, log_level="info")

if __name__ == "__main__":
    main()

java 工程代码:analyseProjectCode

python工程代码:logAnalyse-MCP

待完善的点:

  1. url 根据通配符进行匹配

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐