用 FastAPI + LangGraph + MCP 搭 AI Agent,我踩了哪些坑
摘要 本文记录了作者整合 FastAPI、LangGraph 和 MCP 构建 AI Agent 过程中遇到的四个主要技术问题及解决方案: MCP Server 与 LangGraph 的事件循环冲突:因异步上下文管理机制冲突导致"Event loop is closed"错误,通过全局复用 MCP Client 解决。 全局变量在 FastAPI 多进程环境失效:发现 Fas
用 FastAPI + LangGraph + MCP 搭 AI Agent,我踩了哪些坑
从"看起来很简单"到"这都能报错?"的真实记录
说实话,一开始我以为这事挺简单的。
FastAPI 写 API 我熟,LangGraph 做过几个 Demo,MCP 协议也看过文档。把这三拼在一起搭个 AI Agent,能有多难?
结果断断续续折腾了两天,我才把第一个能跑通的版本搞出来。
以下是我踩过的坑,以及我是怎么(暂时)爬出来的。
坑一:MCP Server 和 LangGraph 的"时差"
报错现场:
RuntimeError: Event loop is closed
当时我在干嘛:
按照文档,我用 FastMCP 启动了一个本地 MCP Server,然后在 LangGraph 的 Agent 里想调用它。
代码大概长这样:
from mcp import ClientSession, StdioServerParameters
# 启动 MCP Server
server_params = StdioServerParameters(
command="python",
args=["mcp_server.py"]
)
# 在 LangGraph 节点里用
async def agent_node(state):
async with ClientSession(server_params) as session:
result = await session.call_tool("search", {"query": state["question"]})
return {"result": result}
看起来没问题对吧?但一跑就报 Event loop is closed。
我花了多久发现: 大概40分钟,加两杯咖啡。
问题在哪:
MCP 的 ClientSession 是基于异步上下文管理器的,每次 async with 都会新建连接。但 LangGraph 的节点函数可能被多次调用,而且 FastAPI 本身有自己的事件循环管理机制。
简单说就是:MCP 想独占一个事件循环,但 LangGraph 和 FastAPI 不想给。
我是怎么解决的:
把 MCP Client 的初始化提到 Graph 外部,作为全局变量复用:
# 全局初始化,只建立一次连接
mcp_client = None
async def get_mcp_client():
global mcp_client
if mcp_client is None:
session = await ClientSession.create(server_params)
await session.initialize()
mcp_client = session
return mcp_client
# LangGraph 节点里直接用
async def agent_node(state):
client = await get_mcp_client()
result = await client.call_tool("search", {"query": state["question"]})
return {"result": result}
但这样又引出了第二个坑…
坑二:全局变量在异步环境下的"薛定谔状态"
报错现场:
AttributeError: 'NoneType' object has no attribute 'call_tool'
当时我在干嘛:
兴冲冲地把代码改完,跑单元测试,过了。部署到 FastAPI,请求一进来,又崩了。
我花了多久发现: 这次快一点,20分钟。
问题在哪:
FastAPI 是多 worker 架构(默认用 uvicorn 多进程),每个 worker 有自己的内存空间。我在主进程里初始化的 mcp_client,子进程访问不到。
我是怎么解决的:
用 contextlib.asynccontextmanager 在 FastAPI 启动时初始化,存到 app state 里:
from contextlib import asynccontextmanager
@asynccontextmanager
async def lifespan(app: FastAPI):
# 启动时初始化 MCP Client
session = await ClientSession.create(server_params)
await session.initialize()
app.state.mcp_client = session
yield
# 关闭时清理
await session.close()
app = FastAPI(lifespan=lifespan)
# LangGraph 节点里从 app state 获取
async def agent_node(state):
from fastapi import Request
request = Request(scope={"app": app}) # 这里其实有问题...
client = request.app.state.mcp_client
...
等等,上面这段代码其实有问题。
因为 LangGraph 节点函数拿不到 FastAPI 的 request 对象。
坑三:LangGraph 节点里的"上下文困境"
当时的情况:
LangGraph 的节点函数签名是这样的:
def node_function(state: State) -> State:
...
或者异步版本:
async def async_node_function(state: State) -> State:
...
没有 request 参数,没有 app 参数,什么都没有。
那我该怎么拿到 FastAPI 的 app.state.mcp_client?
我花了多久发现: 这次比较久,大概1个小时,期间我去翻了 LangGraph 的源码。
问题在哪:
LangGraph 的设计哲学是"状态驱动",节点函数只关心输入状态和输出状态,不关心外部上下文。这本身是对的,但和 FastAPI 的依赖注入机制不太兼容。
我是怎么解决的:
用一个折中方案:自定义 State 类型,把 MCP Client 传进去:
from langgraph.graph import StateGraph
from typing import TypedDict, Any
class AgentState(TypedDict):
question: str
result: Any
mcp_client: Any # 把 client 塞进 state
# FastAPI handler 里初始化
@app.post("/chat")
async def chat(request: ChatRequest):
# 从 app state 拿 client
mcp_client = app.state.mcp_client
# 塞进 initial state
initial_state = {
"question": request.question,
"mcp_client": mcp_client
}
# 运行 graph
result = await graph.ainvoke(initial_state)
return {"answer": result["result"]}
# 节点函数里用
async def agent_node(state: AgentState):
client = state["mcp_client"]
result = await client.call_tool("search", {"query": state["question"]})
return {"result": result}
这样虽然能跑,但总觉得怪怪的——MCP Client 不该属于业务状态,它是个基础设施。
不过先这样吧,能跑就行。
坑四:MCP Tool 的序列化玄学
报错现场:
TypeError: Object of type MyCustomClass is not JSON serializable
当时我在干嘛:
MCP Server 返回的结果里,有些字段是自定义类型(比如 datetime、Pydantic 模型)。我想直接把这个结果塞进 LangGraph 的 State,然后返回给前端。
我花了多久发现: 10分钟,因为错误信息挺明显的。
问题在哪:
LangGraph 的 State 需要能被 JSON 序列化(因为它内部可能做持久化或检查点),但 MCP 返回的数据结构可能包含非标准类型。
我是怎么解决的:
写了个简单的转换函数,把 MCP 结果转成纯 dict:
from datetime import datetime
def serialize_mcp_result(result):
"""把 MCP 结果转成 JSON 可序列化的格式"""
if isinstance(result, dict):
return {k: serialize_mcp_result(v) for k, v in result.items()}
elif isinstance(result, list):
return [serialize_mcp_result(item) for item in result]
elif isinstance(result, datetime):
return result.isoformat()
elif hasattr(result, 'model_dump'): # Pydantic v2
return result.model_dump()
elif hasattr(result, 'dict'): # Pydantic v1
return result.dict()
else:
return result
# 节点函数里用
async def agent_node(state: AgentState):
client = state["mcp_client"]
raw_result = await client.call_tool("search", {"query": state["question"]})
clean_result = serialize_mcp_result(raw_result)
return {"result": clean_result}
坑五:并发请求时的"资源争抢"
报错现场:
没有报错,但响应时间越来越慢,最后直接卡死。
当时我在干嘛:
压测。我用 wrk 模拟了50个并发请求,想看看性能怎么样。
结果前几个请求还挺快(几百毫秒),后面就越来越慢,最后直接超时。
我花了多久发现: 这次没有"发现",只有"意识到出大事了"。
问题在哪:
MCP Client 和 Server 之间用的是 STDIO 传输(标准输入输出),这种传输方式不支持并发。所有请求都在排队等同一个连接。
我是怎么解决的:
换成 SSE(Server-Sent Events)传输,或者每个请求独立启动一个 MCP Server 进程。
我选了后者,因为改动小:
import asyncio
from concurrent.futures import ProcessPoolExecutor
executor = ProcessPoolExecutor(max_workers=4)
async def agent_node(state: AgentState):
# 在独立进程里运行 MCP 调用
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(
executor,
run_mcp_in_subprocess,
state["question"]
)
return {"result": result}
def run_mcp_in_subprocess(question):
"""在子进程里运行 MCP 调用"""
import subprocess
import json
result = subprocess.run(
["python", "mcp_call.py", question],
capture_output=True,
text=True
)
return json.loads(result.stdout)
当然,这个方案也有问题(进程间通信开销大),但至少能并发跑了。
现在能跑了,但…
折腾完这五个坑,我终于有了一个能跑通的版本:
- FastAPI 提供 HTTP API
- LangGraph 管理 Agent 流程
- MCP 提供外部工具能力
代码大概长这样(简化版):
from fastapi import FastAPI
from langgraph.graph import StateGraph, END
from typing import TypedDict, Any
import asyncio
class AgentState(TypedDict):
question: str
result: Any
# LangGraph 节点
async def agent_node(state: AgentState):
# 这里是 MCP 调用逻辑
result = await call_mcp_tool(state["question"])
return {"result": result}
# 构建 Graph
workflow = StateGraph(AgentState)
workflow.add_node("agent", agent_node)
workflow.set_entry_point("agent")
workflow.add_edge("agent", END)
graph = workflow.compile()
# FastAPI 应用
app = FastAPI()
@app.post("/chat")
async def chat(question: str):
result = await graph.ainvoke({"question": question})
return result
看起来挺简洁的对吧?但我知道这只是"看起来"。
真实的代码里还有各种异常处理、重试逻辑、超时控制、日志打点…
一点感想
说实话,FastAPI + LangGraph + MCP 这个组合,文档看起来都很清晰,但真拼在一起跑,坑还是不少。
主要问题我觉得是:
- 异步编程的复杂性 - 三个框架都有自己的事件循环逻辑,混在一起容易打架
- 上下文传递 - LangGraph 的"无上下文"设计和 FastAPI 的依赖注入不太兼容
- MCP 的传输限制 - STDIO 不支持并发,SSE 又增加复杂度
- 序列化问题 - 数据在各个组件之间流转时,类型转换很麻烦
但我还是觉得这套组合有前途。
如果你也在折腾这个组合,希望这篇踩坑记录能帮你省点时间。
如果你有更好的解决方案,欢迎交流。
GitHub 仓库: https://github.com/YaBoom/fastapi-langgraph-mcp-zyt
(代码是实验性的,README 里写了 TODO 和已知问题)
写作风格:踩坑实录型
- 完整记录错误代码→报错→调试→解决的过程
- 包含真实的时间线和心理活动
- 有困惑、有弯路、有不完美的妥协方案
更多推荐

所有评论(0)