用 FastAPI + LangGraph + MCP 搭 AI Agent,我踩了哪些坑

从"看起来很简单"到"这都能报错?"的真实记录


说实话,一开始我以为这事挺简单的。

FastAPI 写 API 我熟,LangGraph 做过几个 Demo,MCP 协议也看过文档。把这三拼在一起搭个 AI Agent,能有多难?

结果断断续续折腾了两天,我才把第一个能跑通的版本搞出来。

以下是我踩过的坑,以及我是怎么(暂时)爬出来的。


坑一:MCP Server 和 LangGraph 的"时差"

报错现场:

RuntimeError: Event loop is closed

当时我在干嘛:

按照文档,我用 FastMCP 启动了一个本地 MCP Server,然后在 LangGraph 的 Agent 里想调用它。

代码大概长这样:

from mcp import ClientSession, StdioServerParameters

# 启动 MCP Server
server_params = StdioServerParameters(
    command="python",
    args=["mcp_server.py"]
)

# 在 LangGraph 节点里用
async def agent_node(state):
    async with ClientSession(server_params) as session:
        result = await session.call_tool("search", {"query": state["question"]})
        return {"result": result}

看起来没问题对吧?但一跑就报 Event loop is closed

我花了多久发现: 大概40分钟,加两杯咖啡。

问题在哪:

MCP 的 ClientSession 是基于异步上下文管理器的,每次 async with 都会新建连接。但 LangGraph 的节点函数可能被多次调用,而且 FastAPI 本身有自己的事件循环管理机制。

简单说就是:MCP 想独占一个事件循环,但 LangGraph 和 FastAPI 不想给。

我是怎么解决的:

把 MCP Client 的初始化提到 Graph 外部,作为全局变量复用:

# 全局初始化,只建立一次连接
mcp_client = None

async def get_mcp_client():
    global mcp_client
    if mcp_client is None:
        session = await ClientSession.create(server_params)
        await session.initialize()
        mcp_client = session
    return mcp_client

# LangGraph 节点里直接用
async def agent_node(state):
    client = await get_mcp_client()
    result = await client.call_tool("search", {"query": state["question"]})
    return {"result": result}

但这样又引出了第二个坑…


坑二:全局变量在异步环境下的"薛定谔状态"

报错现场:

AttributeError: 'NoneType' object has no attribute 'call_tool'

当时我在干嘛:

兴冲冲地把代码改完,跑单元测试,过了。部署到 FastAPI,请求一进来,又崩了。

我花了多久发现: 这次快一点,20分钟。

问题在哪:

FastAPI 是多 worker 架构(默认用 uvicorn 多进程),每个 worker 有自己的内存空间。我在主进程里初始化的 mcp_client,子进程访问不到。

我是怎么解决的:

contextlib.asynccontextmanager 在 FastAPI 启动时初始化,存到 app state 里:

from contextlib import asynccontextmanager

@asynccontextmanager
async def lifespan(app: FastAPI):
    # 启动时初始化 MCP Client
    session = await ClientSession.create(server_params)
    await session.initialize()
    app.state.mcp_client = session
    yield
    # 关闭时清理
    await session.close()

app = FastAPI(lifespan=lifespan)

# LangGraph 节点里从 app state 获取
async def agent_node(state):
    from fastapi import Request
    request = Request(scope={"app": app})  # 这里其实有问题...
    client = request.app.state.mcp_client
    ...

等等,上面这段代码其实有问题。

因为 LangGraph 节点函数拿不到 FastAPI 的 request 对象。


坑三:LangGraph 节点里的"上下文困境"

当时的情况:

LangGraph 的节点函数签名是这样的:

def node_function(state: State) -> State:
    ...

或者异步版本:

async def async_node_function(state: State) -> State:
    ...

没有 request 参数,没有 app 参数,什么都没有。

那我该怎么拿到 FastAPI 的 app.state.mcp_client

我花了多久发现: 这次比较久,大概1个小时,期间我去翻了 LangGraph 的源码。

问题在哪:

LangGraph 的设计哲学是"状态驱动",节点函数只关心输入状态和输出状态,不关心外部上下文。这本身是对的,但和 FastAPI 的依赖注入机制不太兼容。

我是怎么解决的:

用一个折中方案:自定义 State 类型,把 MCP Client 传进去:

from langgraph.graph import StateGraph
from typing import TypedDict, Any

class AgentState(TypedDict):
    question: str
    result: Any
    mcp_client: Any  # 把 client 塞进 state

# FastAPI handler 里初始化
@app.post("/chat")
async def chat(request: ChatRequest):
    # 从 app state 拿 client
    mcp_client = app.state.mcp_client
    
    # 塞进 initial state
    initial_state = {
        "question": request.question,
        "mcp_client": mcp_client
    }
    
    # 运行 graph
    result = await graph.ainvoke(initial_state)
    return {"answer": result["result"]}

# 节点函数里用
async def agent_node(state: AgentState):
    client = state["mcp_client"]
    result = await client.call_tool("search", {"query": state["question"]})
    return {"result": result}

这样虽然能跑,但总觉得怪怪的——MCP Client 不该属于业务状态,它是个基础设施。

不过先这样吧,能跑就行。


坑四:MCP Tool 的序列化玄学

报错现场:

TypeError: Object of type MyCustomClass is not JSON serializable

当时我在干嘛:

MCP Server 返回的结果里,有些字段是自定义类型(比如 datetime、Pydantic 模型)。我想直接把这个结果塞进 LangGraph 的 State,然后返回给前端。

我花了多久发现: 10分钟,因为错误信息挺明显的。

问题在哪:

LangGraph 的 State 需要能被 JSON 序列化(因为它内部可能做持久化或检查点),但 MCP 返回的数据结构可能包含非标准类型。

我是怎么解决的:

写了个简单的转换函数,把 MCP 结果转成纯 dict:

from datetime import datetime

def serialize_mcp_result(result):
    """把 MCP 结果转成 JSON 可序列化的格式"""
    if isinstance(result, dict):
        return {k: serialize_mcp_result(v) for k, v in result.items()}
    elif isinstance(result, list):
        return [serialize_mcp_result(item) for item in result]
    elif isinstance(result, datetime):
        return result.isoformat()
    elif hasattr(result, 'model_dump'):  # Pydantic v2
        return result.model_dump()
    elif hasattr(result, 'dict'):  # Pydantic v1
        return result.dict()
    else:
        return result

# 节点函数里用
async def agent_node(state: AgentState):
    client = state["mcp_client"]
    raw_result = await client.call_tool("search", {"query": state["question"]})
    clean_result = serialize_mcp_result(raw_result)
    return {"result": clean_result}

坑五:并发请求时的"资源争抢"

报错现场:

没有报错,但响应时间越来越慢,最后直接卡死。

当时我在干嘛:

压测。我用 wrk 模拟了50个并发请求,想看看性能怎么样。

结果前几个请求还挺快(几百毫秒),后面就越来越慢,最后直接超时。

我花了多久发现: 这次没有"发现",只有"意识到出大事了"。

问题在哪:

MCP Client 和 Server 之间用的是 STDIO 传输(标准输入输出),这种传输方式不支持并发。所有请求都在排队等同一个连接。

我是怎么解决的:

换成 SSE(Server-Sent Events)传输,或者每个请求独立启动一个 MCP Server 进程。

我选了后者,因为改动小:

import asyncio
from concurrent.futures import ProcessPoolExecutor

executor = ProcessPoolExecutor(max_workers=4)

async def agent_node(state: AgentState):
    # 在独立进程里运行 MCP 调用
    loop = asyncio.get_event_loop()
    result = await loop.run_in_executor(
        executor,
        run_mcp_in_subprocess,
        state["question"]
    )
    return {"result": result}

def run_mcp_in_subprocess(question):
    """在子进程里运行 MCP 调用"""
    import subprocess
    import json
    
    result = subprocess.run(
        ["python", "mcp_call.py", question],
        capture_output=True,
        text=True
    )
    return json.loads(result.stdout)

当然,这个方案也有问题(进程间通信开销大),但至少能并发跑了。


现在能跑了,但…

折腾完这五个坑,我终于有了一个能跑通的版本:

  • FastAPI 提供 HTTP API
  • LangGraph 管理 Agent 流程
  • MCP 提供外部工具能力

代码大概长这样(简化版):

from fastapi import FastAPI
from langgraph.graph import StateGraph, END
from typing import TypedDict, Any
import asyncio

class AgentState(TypedDict):
    question: str
    result: Any

# LangGraph 节点
async def agent_node(state: AgentState):
    # 这里是 MCP 调用逻辑
    result = await call_mcp_tool(state["question"])
    return {"result": result}

# 构建 Graph
workflow = StateGraph(AgentState)
workflow.add_node("agent", agent_node)
workflow.set_entry_point("agent")
workflow.add_edge("agent", END)
graph = workflow.compile()

# FastAPI 应用
app = FastAPI()

@app.post("/chat")
async def chat(question: str):
    result = await graph.ainvoke({"question": question})
    return result

看起来挺简洁的对吧?但我知道这只是"看起来"。

真实的代码里还有各种异常处理、重试逻辑、超时控制、日志打点…


一点感想

说实话,FastAPI + LangGraph + MCP 这个组合,文档看起来都很清晰,但真拼在一起跑,坑还是不少。

主要问题我觉得是:

  1. 异步编程的复杂性 - 三个框架都有自己的事件循环逻辑,混在一起容易打架
  2. 上下文传递 - LangGraph 的"无上下文"设计和 FastAPI 的依赖注入不太兼容
  3. MCP 的传输限制 - STDIO 不支持并发,SSE 又增加复杂度
  4. 序列化问题 - 数据在各个组件之间流转时,类型转换很麻烦

但我还是觉得这套组合有前途。

如果你也在折腾这个组合,希望这篇踩坑记录能帮你省点时间。

如果你有更好的解决方案,欢迎交流。


GitHub 仓库: https://github.com/YaBoom/fastapi-langgraph-mcp-zyt

(代码是实验性的,README 里写了 TODO 和已知问题)

写作风格:踩坑实录型

  • 完整记录错误代码→报错→调试→解决的过程
  • 包含真实的时间线和心理活动
  • 有困惑、有弯路、有不完美的妥协方案
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐