智能化问题分析(Cherry Stdio+ MCP)
背景
当前系统日志分散在Kibana和SkyWalking中,故障排查需要人工关联分析,效率低下,需要建立相关的智能体进行快速定位。
涉及开发语言
java 、python
业务目标
- 实现跨系统的日志追踪关联分析
- 将平均故障定位时间(MTTD)缩短40%
- 建立智能化的根因分析能力
分层架构图

功能流程图

工具:Cherry Stdio 工具作为MCP的客户端。
整体项目步骤介绍
1,了解整体项目的路由策略,目的后续根据url需要查询出所有的代码,例如controller->service->dao
2,将工程代码进行解析,目的了解项目的结构和为库系统做数据准备
3, 了解kinbana 和skywalking的数据存储位置,需要获取相应的链接。
4,涉及到两个项目,一个是java项目(analyseProjectCode),一个是python项目(logAnalyse-MCP)
`analyseProjectCode` 是一个 Java 项目分析工具,能够扫描和解析 Java 源代码及 MyBatis XML 映射文件,并将分析结果存储到数据库中。它还支持建立方法之间的调用关系。
logAnalyse-MCP是MCP Server 端,向LLM提供工具
项目整体演示
1,将工程数据利用java项目进行项目分析

2,启动MCP Server端

3,打开cherry stdio 客户端,并添加MCP服务器

4,根据场景进行分析
1,错误日志分析
在kinbana上获取到traceId,前提是日志中有traceId。就不截图了。
19f0ab40118e4c198cff8aa2d729fe32.1838.17515966802584869
直接发送给LLM,【这是我的traceId19f0ab40118e4c198cff8aa2d729fe32.1838.17515966802584869 帮我分析错误】



后续的建议修复都会有~~
2,性能日志分析
在skywalking 上获取到耗时最长的接口的traceId
764ffdf3596445ea9d21e44c1f913c1e.303.17534333595660373

向LLM发送【请帮分析一下764ffdf3596445ea9d21e44c1f913c1e.303.17534333595660373 性能报告】
跨系统的性能排查:这个接口是跨系统的。



其他的性能排查:



python核心logAnalyseHttp.py 代码:
from __future__ import annotations
import argparse
import asyncio
import json
from typing import Any, AsyncIterator, Callable
import httpx
from fastapi import FastAPI, Request, Response, status
from fastapi.responses import StreamingResponse
import logAnalyse2
# ---------------------------------------------------------------------------
# Server constants
# ---------------------------------------------------------------------------
SERVER_NAME = "LogAnalyse"
SERVER_VERSION = "1.0.0"
PROTOCOL_VERSION = "2024-11-05"
# ---------------------------------------------------------------------------
# FastAPI app and tools registry
# ---------------------------------------------------------------------------
app = FastAPI(title="LogAnalyse HTTP-Stream v8")
# ---------------------------------------------------------------------------
# Dummy tool implementations (to be replaced with actual logic)
# ---------------------------------------------------------------------------
async def query_error_logs(trace_id: str) -> dict:
print(f"traceId:{trace_id}")
result = await logAnalyse2.query_error_logs(trace_id)
print(result)
return result
async def query_skywalking_spans(trace_id: str) -> dict:
return await logAnalyse2.query_skywalking_spans(trace_id)
async def query_common_code(project_code: str = None, humanFlag: str = None) -> dict:
return await logAnalyse2.query_common_code(project_code, humanFlag)
async def query_code_by_className_method(project_code: str = None, fullClassName_method: str = None) -> dict:
return await logAnalyse2.query_code_by_className_method(project_code, fullClassName_method)
async def query_table_create_sql(dbName: str = None) -> dict:
return await logAnalyse2.query_table_create_sql(dbName)
async def query_url_code(project_code: str = None, url: str = None, http_method: str = None) -> dict:
return await logAnalyse2.query_url_code(project_code, url, http_method)
# Tool registry for dispatch
TOOL_FUNCTIONS: dict[str, Callable[..., Any]] = {
"query_error_logs": query_error_logs,
"query_skywalking_spans": query_skywalking_spans,
"query_common_code": query_common_code,
"query_code_by_className_method": query_code_by_className_method,
"query_table_create_sql": query_table_create_sql,
"query_url_code": query_url_code
}
TOOLS_REGISTRY = {
"tools": [
{
"name": "query_error_logs",
"description": "通过 trace_id 获取错误日志信息,主要用途是收集错误日志。",
"inputSchema": {
"type": "object",
"properties": {
"trace_id": {
"type": "string",
"description": "SkyWalking 的 trace_id"
}
},
"required": ["trace_id"]
}
},
{
"name": "query_skywalking_spans",
"description": "通过 trace_id 查询 SkyWalking GraphQL 接口,返回调用链中的数据,主要用途是做性能分析。",
"inputSchema": {
"type": "object",
"properties": {
"trace_id": {
"type": "string",
"description": "SkyWalking 的 trace_id"
}
},
"required": ["trace_id"]
}
},
{
"name": "query_common_code",
"description": "如果错误日志中检查到使用 Feign 拦截器或异步线程池调用(Async),则传入 FeignConfig 或 AsyncConfig 获取相关代码实现。",
"inputSchema": {
"type": "object",
"properties": {
"project_code": {
"type": "string",
"description": "项目编号"
},
"humanFlag": {
"type": "string",
"description": "类名,如 FeignConfig 或 AsyncConfig"
}
},
"required": ["humanFlag"]
}
},
{
"name": "query_code_by_className_method",
"description": "根据类全路径和方法名查询方法实现代码,并递归查询其调用的方法实现。",
"inputSchema": {
"type": "object",
"properties": {
"project_code": {
"type": "string",
"description": "项目编号"
},
"fullClassName_method": {
"type": "string",
"description": "类全路径与方法名,格式如 com.xxx.UserService.findById"
}
},
"required": ["fullClassName_method"]
}
},
{
"name": "query_table_create_sql",
"description": "根据数据库名称查询所有表的建表语句。",
"inputSchema": {
"type": "object",
"properties": {
"dbName": {
"type": "string",
"description": "数据库名称"
}
},
"required": ["dbName"]
}
},
{
"name": "query_url_code",
"description": "根据项目编号、URL 和 HTTP 方法查询接口实现代码路径信息。",
"inputSchema": {
"type": "object",
"properties": {
"project_code": {
"type": "string",
"description": "项目编号"
},
"url": {
"type": "string",
"description": "请求 URL,如 /v1/jyht/cad/anno/modify/users"
},
"http_method": {
"type": "string",
"description": "HTTP 方法,如 POST、GET"
}
},
"required": ["url", "http_method"]
}
}
],
"nextCursor": "qinqing"
}
@app.get("/mcp")
async def mcp_initialize_via_get():
return {
"jsonrpc": "2.0",
"id": 0,
"result": {
"protocolVersion": PROTOCOL_VERSION,
"capabilities": {"streaming": True, "tools": {"listChanged": True}},
"serverInfo": {"name": SERVER_NAME, "version": SERVER_VERSION},
"instructions": "Use the tool to query logs, spans, or code."
}
}
@app.post("/mcp")
async def mcp_endpoint(request: Request):
try:
body = await request.json()
print("💡 收到请求:", json.dumps(body, ensure_ascii=False, indent=2))
except Exception:
return {"jsonrpc": "2.0", "id": None, "error": {"code": -32700,"message": "Parse error"}}
req_id = body.get("id", 1)
method = body.get("method")
print(f"🔧 方法: {method}")
if method == "notifications/initialized":
return Response(status_code=status.HTTP_204_NO_CONTENT)
if method is None:
return {"jsonrpc": "2.0", "id": req_id, "result": {"status": "MCP server online."}}
if method == "initialize":
return {
"jsonrpc": "2.0",
"id": req_id,
"result": {
"protocolVersion": PROTOCOL_VERSION,
"capabilities": {"streaming": True, "tools": {"listChanged": True}},
"serverInfo": {"name": SERVER_NAME, "version": SERVER_VERSION},
"instructions": "Use the tool to query logs, spans, or code."
}
}
if method == "tools/list":
return {"jsonrpc": "2.0", "id": req_id, "result": TOOLS_REGISTRY}
if method == "tools/call":
params = body.get("params", {})
tool_name = params.get("name")
args = params.get("arguments", {})
if tool_name not in TOOL_FUNCTIONS:
return {"jsonrpc": "2.0", "id": req_id, "error": {"code": -32601, "message": f"Tool '{tool_name}' not found"}}
try:
result = await TOOL_FUNCTIONS[tool_name](**args)
return {
"jsonrpc": "2.0",
"id": req_id,
"result": {
"content": [{"type": "text", "text": json.dumps(result, ensure_ascii=False)}],
"isError": False
}
}
except Exception as e:
return {"jsonrpc": "2.0", "id": req_id, "error": {"code": -32000, "message": str(e)}}
return {"jsonrpc": "2.0", "id": req_id, "error": {"code": -32601, "message": "Method not found"}}
# ---------------------------------------------------------------------------
# Runner
# ---------------------------------------------------------------------------
def main() -> None:
parser = argparse.ArgumentParser(description="LogAnalyse MCP HTTP-Stream v8")
parser.add_argument("--host", default="127.0.0.1")
parser.add_argument("--port", type=int, default=8000)
args = parser.parse_args()
import uvicorn
uvicorn.run(app, host=args.host, port=args.port, log_level="info")
if __name__ == "__main__":
main()
java 工程代码:analyseProjectCode
python工程代码:logAnalyse-MCP
待完善的点:
- url 根据通配符进行匹配
更多推荐


所有评论(0)