从0到1实战:LangChain + Ollama + FastAPI 部署可运行的私有AI助手

第4/8篇|LangChain学习实录系列|终端敲出来的每行命令都经过验证


问题背景:HTTP流式调用链断裂的真实现场

前三篇已验证单次交互能力:第1篇跑通LLMChain基础调用,第2篇厘清PromptTemplate变量注入逻辑,第3篇实现PDF/Markdown知识库RAG。但所有输出均在Python进程内完成——无HTTP入口、无并发处理、无流式chunk分发机制。真实集成中,前端发出的POST /chat请求会卡死或返回500,典型日志为:

TypeError: object of type 'AsyncIterator' is not subscriptable

这不是模型加载失败,而是FastAPI → LangChain → Ollama三者在请求接收→历史状态绑定→SSE流生成→客户端逐chunk消费这一闭环中,某一层对返回类型的隐式假设被打破。本篇只解决一个确定性工程目标:让本地Ollama模型通过FastAPI暴露为符合SSE基本规范的HTTP流式接口,且所有环节可定位、可回滚、可复现。

约束条件明确:

  • 模型必须经Ollama REST API(http://localhost:11434/api/chat)调用,禁用transformers直连;
  • 会话状态暂存内存(InMemoryChatMessageHistory),但需明确定义其生命周期边界;
  • curl -N与浏览器fetch()必须能实时读取data:块,不可整包缓存;
  • 所有命令在macOS M2/M3、Ubuntu 22.04、Windows WSL2(Ubuntu 22.04)三端实测通过。

核心依赖:FastAPI→LangChain→Ollama的时序关系

三者非并列,而是存在强时序耦合的管道:

  • FastAPI层:必须在接收到请求后立即返回StreamingResponse,否则fetch()在Chrome中60s强制断连(非超时设置导致,是浏览器策略);
  • LangChain层RunnableWithMessageHistory.astream()内部调用ChatOllama._stream(),后者需将HumanMessage准确序列化为Ollama要求的{"role":"user","content":"..."}结构——缺role字段即触发Ollama 400;
  • Ollama层/api/chat?stream=true返回SSE格式(data: {"message":{"content":"a"}}\n\n),ChatOllama必须逐行解析、跳过空行、JSON decode失败时不中断流。

通俗来说:把InMemoryChatMessageHistory想象成进程内的便签纸,Uvicorn worker重启,便签就丢了;而多worker模式下,每个worker持独立字典,session_id无法跨进程路由——这直接导致会话历史丢失。


可执行步骤:从ollama servecurl -N看到首chunk

步骤1:锁定Ollama模型版本,规避latest陷阱

ollama list显示的latest可能指向未验证的nightly版。实测llama3:8b(SHA256: d5e095c145e3...)在16GB RAM设备稳定,llama3别名则可能拉取70B模型引发OOM。

# 前台启动便于观察日志
ollama serve

# 新终端拉取确定版本
ollama pull llama3:8b

# 验证基础API(非流式,排除网络/认证问题)
curl -s http://localhost:11434/api/chat \
  -H "Content-Type: application/json" \
  -d '{
    "model": "llama3:8b",
    "messages": [{"role":"user","content":"hi"}],
    "stream": false
  }' | jq -r '.message.content' | head -c 20
# 应输出类似 "Hello! How can I help" 的前20字符

步骤2:编写最小可行FastAPI服务(含SSE兼容修复)

app.py关键点:显式设置SSE header、astream返回类型判别、num_ctx=2048防context溢出。

from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import StreamingResponse
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from langchain_community.chat_models import ChatOllama
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain.memory import InMemoryChatMessageHistory
from typing import Dict, List, Any, Iterator

app = FastAPI()

# WARNING: 多worker下InMemoryChatMessageHistory失效,生产必须替换为Redis-backed
store: Dict[str, InMemoryChatMessageHistory] = {}

def get_session_history(session_id: str) -> InMemoryChatMessageHistory:
    if session_id not in store:
        store[session_id] = InMemoryChatMessageHistory()
    return store[session_id]

prompt = ChatPromptTemplate.from_messages([
    ("system", "你是一个技术助手,回答需简洁准确,不虚构信息。"),
    MessagesPlaceholder(variable_name="history"),
    ("human", "{input}"),
])

llm = ChatOllama(
    model="llama3:8b",
    temperature=0.3,
    num_ctx=2048,  # 防止Ollama报context length exceeded
    streaming=True,
)

chain = prompt | llm
with_message_history = RunnableWithMessageHistory(
    chain,
    get_session_history,
    input_messages_key="input",
    history_messages_key="history",
)

@app.post("/chat")
async def chat_endpoint(request: Request):
    data = await request.json()
    session_id = data.get("session_id")
    user_input = data.get("input")

    if not session_id or not user_input:
        raise HTTPException(400, "missing session_id or input")

    async def event_generator():
        try:
            # 小心:不传config会导致历史记录丢失!
            async for chunk in with_message_history.astream(
                {"input": user_input},
                config={"configurable": {"session_id": session_id}},
            ):
                # LangChain v0.1.x中chunk可能是str或AIMessage
                content = chunk.content if hasattr(chunk, "content") else str(chunk)
                if content.strip():
                    yield f"data: {content}\n\n"
        except Exception as e:
            yield f"data: [ERROR] {str(e)}\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "Connection": "keep-alive"},
    )

步骤3:启动并验证流式响应

pip install "langchain-community>=0.2.10" "fastapi>=0.111.0" uvicorn
uvicorn app:app --host 0.0.0.0 --port 8000 --workers 1 --reload

终端验证(模拟前端fetch行为):

curl -N http://localhost:8000/chat \
  -H "Content-Type: application/json" \
  -d '{"session_id":"test123","input":"Python中如何安全地读取CSV文件?"}' \
  | grep -E "^data:" | head -n 5

排错与失败回滚:边界条件与权衡说明

边界条件:单worker内存存储的生命周期定义

InMemoryChatMessageHistory实例生命周期 = Uvicorn worker进程存活时间。--workers 1时,所有session共享同一store字典;--workers 4时,每个worker持独立storesession_id无法跨进程路由。这是开发期明确的性能权衡:牺牲横向扩展性,换取调试确定性。若需多worker支持,必须引入外部存储(如第6篇SQLite方案),否则session_id在负载均衡下必然丢失。

坑1:ChatOllama._stream()消息序列化失败 → Ollama 400

现象astream调用返回400,日志显示"message" field is required
回滚动作:临时降级为streaming=False,改用invoke验证是否纯格式问题

llm = ChatOllama(model="llama3:8b", temperature=0.3, streaming=False)
# 替换astream为invoke,确认是否返回完整字符串

invoke成功而astream失败,则锁定langchain-community==0.2.9(该版本_stream无此bug)。

坑2:StreamingResponse header缺失 → fetch()无法解析SSE

现象:浏览器控制台报Failed to construct 'EventSource'
原因:缺少Cache-Control: no-cacheConnection: keep-alive
修复:确保StreamingResponse显式传入headers参数,不可依赖默认值。


总结:什么是“可运行”的工程定义

本文交付的是一个可嵌入CI/CD、可被curl/fetch直连、每一层失败点均可定位的服务:

  • ✅ Ollama API调用路径关键路径可控(curl验证→ChatOllama源码补丁→astream输出校验);
  • ✅ FastAPI流式响应符合SSE基本规范(data:前缀+双换行+必要header);
  • ✅ 会话隔离边界清晰(单worker内存有效,多worker需外部存储);
  • ⚠️ 下一步收敛:第6篇将用SQLite实现ChatMessageHistory持久化,解决worker重启后历史丢失问题。

技术选型水位线:ChatOllama当前是LangChain对接Ollama的唯一成熟适配器,但其streaming分支维护滞后,建议锁版本并监控langchain-community PR #1247(修复_stream消息序列化)。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐