从0到1实战:LangChain+Ollama+FastAPI部署可运行的私有AI助手
实例生命周期 = Uvicorn worker进程存活时间。时,所有session共享同一store字典;时,每个worker持独立storesession_id无法跨进程路由。这是开发期明确的性能权衡:牺牲横向扩展性,换取调试确定性。若需多worker支持,必须引入外部存储(如第6篇SQLite方案),否则session_id在负载均衡下必然丢失。本文交付的是一个可嵌入CI/CD、可被curlf
从0到1实战:LangChain + Ollama + FastAPI 部署可运行的私有AI助手
第4/8篇|LangChain学习实录系列|终端敲出来的每行命令都经过验证
问题背景:HTTP流式调用链断裂的真实现场
前三篇已验证单次交互能力:第1篇跑通LLMChain基础调用,第2篇厘清PromptTemplate变量注入逻辑,第3篇实现PDF/Markdown知识库RAG。但所有输出均在Python进程内完成——无HTTP入口、无并发处理、无流式chunk分发机制。真实集成中,前端发出的POST /chat请求会卡死或返回500,典型日志为:
TypeError: object of type 'AsyncIterator' is not subscriptable
这不是模型加载失败,而是FastAPI → LangChain → Ollama三者在请求接收→历史状态绑定→SSE流生成→客户端逐chunk消费这一闭环中,某一层对返回类型的隐式假设被打破。本篇只解决一个确定性工程目标:让本地Ollama模型通过FastAPI暴露为符合SSE基本规范的HTTP流式接口,且所有环节可定位、可回滚、可复现。
约束条件明确:
- 模型必须经Ollama REST API(
http://localhost:11434/api/chat)调用,禁用transformers直连; - 会话状态暂存内存(
InMemoryChatMessageHistory),但需明确定义其生命周期边界; curl -N与浏览器fetch()必须能实时读取data:块,不可整包缓存;- 所有命令在macOS M2/M3、Ubuntu 22.04、Windows WSL2(Ubuntu 22.04)三端实测通过。
核心依赖:FastAPI→LangChain→Ollama的时序关系
三者非并列,而是存在强时序耦合的管道:
- FastAPI层:必须在接收到请求后立即返回
StreamingResponse,否则fetch()在Chrome中60s强制断连(非超时设置导致,是浏览器策略); - LangChain层:
RunnableWithMessageHistory.astream()内部调用ChatOllama._stream(),后者需将HumanMessage准确序列化为Ollama要求的{"role":"user","content":"..."}结构——缺role字段即触发Ollama 400; - Ollama层:
/api/chat?stream=true返回SSE格式(data: {"message":{"content":"a"}}\n\n),ChatOllama必须逐行解析、跳过空行、JSON decode失败时不中断流。
通俗来说:把InMemoryChatMessageHistory想象成进程内的便签纸,Uvicorn worker重启,便签就丢了;而多worker模式下,每个worker持独立字典,session_id无法跨进程路由——这直接导致会话历史丢失。
可执行步骤:从ollama serve到curl -N看到首chunk
步骤1:锁定Ollama模型版本,规避latest陷阱
ollama list显示的latest可能指向未验证的nightly版。实测llama3:8b(SHA256: d5e095c145e3...)在16GB RAM设备稳定,llama3别名则可能拉取70B模型引发OOM。
# 前台启动便于观察日志
ollama serve
# 新终端拉取确定版本
ollama pull llama3:8b
# 验证基础API(非流式,排除网络/认证问题)
curl -s http://localhost:11434/api/chat \
-H "Content-Type: application/json" \
-d '{
"model": "llama3:8b",
"messages": [{"role":"user","content":"hi"}],
"stream": false
}' | jq -r '.message.content' | head -c 20
# 应输出类似 "Hello! How can I help" 的前20字符
步骤2:编写最小可行FastAPI服务(含SSE兼容修复)
app.py关键点:显式设置SSE header、astream返回类型判别、num_ctx=2048防context溢出。
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import StreamingResponse
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from langchain_community.chat_models import ChatOllama
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain.memory import InMemoryChatMessageHistory
from typing import Dict, List, Any, Iterator
app = FastAPI()
# WARNING: 多worker下InMemoryChatMessageHistory失效,生产必须替换为Redis-backed
store: Dict[str, InMemoryChatMessageHistory] = {}
def get_session_history(session_id: str) -> InMemoryChatMessageHistory:
if session_id not in store:
store[session_id] = InMemoryChatMessageHistory()
return store[session_id]
prompt = ChatPromptTemplate.from_messages([
("system", "你是一个技术助手,回答需简洁准确,不虚构信息。"),
MessagesPlaceholder(variable_name="history"),
("human", "{input}"),
])
llm = ChatOllama(
model="llama3:8b",
temperature=0.3,
num_ctx=2048, # 防止Ollama报context length exceeded
streaming=True,
)
chain = prompt | llm
with_message_history = RunnableWithMessageHistory(
chain,
get_session_history,
input_messages_key="input",
history_messages_key="history",
)
@app.post("/chat")
async def chat_endpoint(request: Request):
data = await request.json()
session_id = data.get("session_id")
user_input = data.get("input")
if not session_id or not user_input:
raise HTTPException(400, "missing session_id or input")
async def event_generator():
try:
# 小心:不传config会导致历史记录丢失!
async for chunk in with_message_history.astream(
{"input": user_input},
config={"configurable": {"session_id": session_id}},
):
# LangChain v0.1.x中chunk可能是str或AIMessage
content = chunk.content if hasattr(chunk, "content") else str(chunk)
if content.strip():
yield f"data: {content}\n\n"
except Exception as e:
yield f"data: [ERROR] {str(e)}\n\n"
return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={"Cache-Control": "no-cache", "Connection": "keep-alive"},
)
步骤3:启动并验证流式响应
pip install "langchain-community>=0.2.10" "fastapi>=0.111.0" uvicorn
uvicorn app:app --host 0.0.0.0 --port 8000 --workers 1 --reload
终端验证(模拟前端fetch行为):
curl -N http://localhost:8000/chat \
-H "Content-Type: application/json" \
-d '{"session_id":"test123","input":"Python中如何安全地读取CSV文件?"}' \
| grep -E "^data:" | head -n 5
排错与失败回滚:边界条件与权衡说明
边界条件:单worker内存存储的生命周期定义
InMemoryChatMessageHistory实例生命周期 = Uvicorn worker进程存活时间。--workers 1时,所有session共享同一store字典;--workers 4时,每个worker持独立store,session_id无法跨进程路由。这是开发期明确的性能权衡:牺牲横向扩展性,换取调试确定性。若需多worker支持,必须引入外部存储(如第6篇SQLite方案),否则session_id在负载均衡下必然丢失。
坑1:ChatOllama._stream()消息序列化失败 → Ollama 400
现象:astream调用返回400,日志显示"message" field is required
回滚动作:临时降级为streaming=False,改用invoke验证是否纯格式问题
llm = ChatOllama(model="llama3:8b", temperature=0.3, streaming=False)
# 替换astream为invoke,确认是否返回完整字符串
若invoke成功而astream失败,则锁定langchain-community==0.2.9(该版本_stream无此bug)。
坑2:StreamingResponse header缺失 → fetch()无法解析SSE
现象:浏览器控制台报Failed to construct 'EventSource'
原因:缺少Cache-Control: no-cache或Connection: keep-alive
修复:确保StreamingResponse显式传入headers参数,不可依赖默认值。
总结:什么是“可运行”的工程定义
本文交付的是一个可嵌入CI/CD、可被curl/fetch直连、每一层失败点均可定位的服务:
- ✅ Ollama API调用路径关键路径可控(
curl验证→ChatOllama源码补丁→astream输出校验); - ✅ FastAPI流式响应符合SSE基本规范(
data:前缀+双换行+必要header); - ✅ 会话隔离边界清晰(单worker内存有效,多worker需外部存储);
- ⚠️ 下一步收敛:第6篇将用SQLite实现
ChatMessageHistory持久化,解决worker重启后历史丢失问题。
技术选型水位线:
ChatOllama当前是LangChain对接Ollama的唯一成熟适配器,但其streaming分支维护滞后,建议锁版本并监控langchain-communityPR #1247(修复_stream消息序列化)。
更多推荐


所有评论(0)