从零到一:用LangChain搭建房产RAG系统
这是我的第一个完整RAG系统Demo。用一周时间从零开始,搭建了一个房产推荐AI系统。系统涉及向量表示学习、检索增强生成(RAG)、提示工程、对话管理、LCEL编程范式等核心知识。核心流程:用户提问 → 向量化 → 检索相关房源 → LLM生成答案 → 保存对话历史。技术栈:LangChain + Chroma + DashScope + FastAPI + Streamlit。
从零到一:用LangChain搭建房产RAG系统
我用一周时间从零开始搭建了一个完整的房产推荐AI系统。这不仅是一个Demo,更是一个展示RAG核心原理的实战项目。
前言
这个项目涉及的知识点包括:向量表示学习、检索增强生成(RAG)、LCEL编程范式、多轮对话管理、流式输出优化等。
本文不仅记录实现过程,更重要的是解释每个环节背后的大模型原理,以及为什么这样设计。
第一部分:RAG基础
1. 大模型时代的新范式:RAG
在大模型出现之前,我们用搜索引擎解决信息检索问题。大模型改变了这一切。
三种方式的对比:
传统方式:
用户查询 → 搜索引擎 → 返回链接 → 用户自己阅读
大模型方式(容易幻觉):
用户查询 → LLM直接生成答案 ❌ 可能不准确
RAG方式(最佳实践):
用户查询 → 检索相关文档 → 结合文档让LLM生成答案 ✅ 准确可靠
为什么RAG更好?
- 解决知识过时:LLM的训练数据有截止日期,RAG可以检索最新信息
- 减少幻觉:有文档支撑,LLM不会凭空编造
- 可解释性:用户可以看到检索到的房源,知道答案从何而来
- 无需重训:只需更新知识库,不用重新训练模型
这个系统就是RAG的一个实践案例。核心思想是:让LLM基于真实数据生成答案,而不是凭空幻觉。
2. 系统架构设计
┌─────────────────────────────────────────────────────────┐
│ 用户交互层 │
│ FastAPI 后端 + Streamlit 前端 │
│ (处理HTTP请求,管理会话,支持流式输出) │
├─────────────────────────────────────────────────────────┤
│ 业务逻辑层 │
│ LangChain LCEL 链 + 对话记忆管理 │
│ (编排LLM、检索器、提示词的执行流程) │
│ (支持两种模式:RAG模式 + 普通对话模式) │
├─────────────────────────────────────────────────────────┤
│ 数据层 │
│ 向量存储(FAISS) + 嵌入模型(DashScope) │
│ (存储房产数据的向量表示,支持高效语义搜索) │
└─────────────────────────────────────────────────────────┘
核心数据流:
用户提问(带user_id)
↓
获取用户对话历史
↓
选择模式:
├─ RAG模式:检索房源数据库 + 历史上下文
└─ 普通对话模式:直接调用LLM + 历史记忆
↓
[向量化] 将问题转换为向量表示
↓
[检索] 在向量空间中找最相似的房源(Top-5)
↓
[增强] 将检索结果 + 对话历史作为上下文
↓
[生成] LLM基于上下文生成答案
↓
[流式/一次性] 返回答案给用户
↓
[记忆] 保存对话到用户历史
第二部分:核心实现
3. 向量存储与检索
为什么用FAISS?
- Meta开源的向量搜索库
- 性能优秀,支持大规模数据
- 支持多种距离度量(余弦相似度、欧氏距离等)
- 可以持久化到磁盘
检索器实现(retriever.py):
from langchain_community.vectorstores import FAISS
from langchain_community.embeddings import DashScopeEmbeddings
from typing import List, Dict
import os
from dotenv import load_dotenv
load_dotenv()
class HouseRetriever:
"""房产检索器 - 从向量数据库检索相关房源"""
def __init__(self, persist_directory: str = "data/embeddings/faiss"):
print(f"初始化嵌入模型...")
# 初始化DashScope的Embedding模型
self.embeddings = DashScopeEmbeddings(
model="text-embedding-v1",
dashscope_api_key=os.getenv("DASHSCOPE_API_KEY")
)
print(f"加载向量数据库: {persist_directory}")
# 加载已有的FAISS向量数据库
self.vectorstore = FAISS.load_local(
persist_directory,
self.embeddings,
allow_dangerous_deserialization=True
)
print(f"创建检索器...")
# 创建检索器,默认返回Top-5
self.retriever = self.vectorstore.as_retriever(
search_kwargs={"k": 5}
)
print(f"检索器初始化完成")
def retrieve(self, query: str, k: int = 5) -> List[Dict]:
"""
检索相关房源
Args:
query: 用户查询(可能包含对话历史)
k: 返回的结果数量
Returns:
检索结果列表,每个结果包含content和metadata
"""
# 更新k值
self.retriever.search_kwargs["k"] = k
# 执行检索
docs = self.retriever.invoke(query)
# 格式化结果
results = []
for doc in docs:
results.append({
"content": doc.page_content,
"metadata": doc.metadata
})
return results
def format_results(self, results: List[Dict]) -> str:
"""
格式化检索结果为文本
这一步很关键:将结构化的检索结果转换为自然语言文本,
这样LLM才能理解并基于这些信息生成答案。
"""
formatted = []
for i, result in enumerate(results, 1):
meta = result['metadata']
formatted.append(
f"{i}. {meta['title']}\n"
f" 价格: {meta['price']}万\n"
f" 面积: {meta['area']}平米\n"
f" 位置: {meta['location']}"
)
return "\n\n".join(formatted)
关键点解析:
- DashScope Embedding:将文本转换为768维向量
- FAISS.load_local():加载已有的向量数据库(第一次需要初始化)
- as_retriever():将向量存储转换为检索器
- format_results():这一步很关键,将结构化数据转换为自然语言
4. RAG链的构建
为什么用LCEL?
LCEL(LangChain Expression Language)是LangChain的现代语法。相比传统的LLMChain:
- 代码更简洁(用
|连接组件) - 支持流式输出(
astream()) - 支持异步调用(
ainvoke()) - 支持批量处理(
batch())
RAG链实现(rag_chain.py):
from langchain_community.llms import Tongyi
from langchain.prompts import ChatPromptTemplate
from langchain.schema.output_parser import StrOutputParser
from langchain.schema.runnable import RunnablePassthrough
from .retriever import HouseRetriever
import os
from dotenv import load_dotenv
load_dotenv()
class HouseRAGChain:
"""房产RAG链 - 完整的检索增强生成流程"""
def __init__(self):
print("初始化RAG链...")
# 初始化LLM(使用DashScope的通义千问)
self.llm = Tongyi(
model="qwen-plus",
temperature=0.7,
dashscope_api_key=os.getenv("DASHSCOPE_API_KEY")
)
# 初始化检索器
self.retriever = HouseRetriever()
# 创建Prompt模板
# 注意:{context} 和 {question} 是占位符,会被动态替换
self.prompt = ChatPromptTemplate.from_template("""
你是一个专业的房产推荐助手。基于以下检索到的房源信息,回答用户的问题。
检索到的房源:
{context}
用户问题:{question}
请给出专业、友好的回答,如果检索结果中有合适的房源,请推荐给用户。
""")
# 创建RAG链 - 这是LCEL的核心
# 流程:
# 1. 接收 {"question": "..."} 输入
# 2. 执行 lambda 函数检索相关房源
# 3. 将 context 和 question 传入 prompt
# 4. 发送给 LLM 生成答案
# 5. 用 StrOutputParser 解析输出
self.chain = (
{
# 这一步是RAG的核心:根据问题检索相关文档
"context": lambda x: self._format_context(x["question"]),
# 问题直接传递
"question": RunnablePassthrough()
}
| self.prompt # 将context和question填入prompt
| self.llm # 发送给LLM
| StrOutputParser() # 解析输出为字符串
)
print("RAG链初始化完成")
def _format_context(self, question: str) -> str:
"""
检索并格式化上下文
这一步执行RAG的"检索"和"增强"步骤:
1. 根据问题检索相关房源
2. 格式化为自然语言文本
"""
results = self.retriever.retrieve(question, k=3)
return self.retriever.format_results(results)
def ask(self, question: str) -> str:
"""
问答接口
Args:
question: 用户问题(可能包含对话历史)
Returns:
LLM生成的答案
"""
return self.chain.invoke({"question": question})
LCEL链的工作流程:
输入: {"question": "我想在朝阳区找一个三居室"}
↓
lambda x: self._format_context(x["question"])
↓
执行检索:
- 向量化问题
- 在FAISS中搜索
- 返回Top-3房源
↓
格式化为文本:
"1. 朝阳区常营三居室 - 580万 - 地铁6号线
2. 朝阳区建国路三居室 - 620万 - 地铁1号线
..."
↓
填入Prompt:
"你是一个专业的房产推荐助手。基于以下检索到的房源信息...
检索到的房源:
1. 朝阳区常营三居室...
用户问题:我想在朝阳区找一个三居室"
↓
发送给LLM生成答案
↓
输出: "根据您的需求,我为您找到了以下房源..."
与简单链的对比:
# ❌ 简单链(没有检索)
simple_chain = prompt | llm | output_parser
# 问题:LLM凭空生成答案,容易幻觉
# ✅ RAG链(有检索)
rag_chain = (
{"context": retriever, "question": RunnablePassthrough()}
| prompt
| llm
| output_parser
)
# 优势:LLM基于真实数据生成答案,准确可靠
5. 对话记忆管理
为什么需要记忆?
多轮对话需要记住历史。例如:
- 用户第1轮:“我想在朝阳区找房子”
- 用户第2轮:“那个房子的具体位置在哪里?”
如果没有记忆,系统不知道"那个房子"指的是什么。
记忆管理实现(memory_manager.py):
from langchain.schema import HumanMessage, AIMessage
from typing import Dict, List
class MemoryManager:
"""对话记忆管理器 - 支持多用户并发"""
def __init__(self):
# 用字典存储每个用户的对话历史
# key: user_id, value: 对话消息列表
self.user_memories: Dict[str, List] = {}
def get_history(self, user_id: str) -> List:
"""
获取用户的对话历史
Args:
user_id: 用户ID
Returns:
对话历史列表(HumanMessage和AIMessage交替)
"""
if user_id not in self.user_memories:
self.user_memories[user_id] = []
return self.user_memories[user_id]
def add_message(self, user_id: str, human_msg: str, ai_msg: str):
"""
添加一轮对话
Args:
user_id: 用户ID
human_msg: 用户消息
ai_msg: AI回复
"""
history = self.get_history(user_id)
history.append(HumanMessage(content=human_msg))
history.append(AIMessage(content=ai_msg))
# 只保留最近20条消息(10轮对话)
# 这是WindowMemory的实现:节省内存,保持上下文
if len(history) > 20:
self.user_memories[user_id] = history[-20:]
def clear_history(self, user_id: str):
"""清空用户历史"""
if user_id in self.user_memories:
del self.user_memories[user_id]
# 全局实例
memory_manager = MemoryManager()
为什么这样设计?
- 多用户支持:用字典管理不同用户的历史
- 内存优化:只保留最近10轮对话(20条消息)
- 简单高效:不需要额外的LLM调用或数据库
- 生产就绪:已在项目中验证
对比不同的Memory类型:
BufferMemory:保存所有对话
优点:完整历史
缺点:占内存
适用:短对话(<10轮)
WindowMemory(我们的实现):保留最近K轮
优点:节省内存,保持上下文
缺点:丢失早期信息
适用:长对话(10-50轮)
SummaryMemory:LLM总结历史
优点:最省内存
缺点:需要额外LLM调用
适用:超长对话(>50轮)
第三部分:API设计与两种模式
6. 两种模式:RAG vs 普通对话
这个系统的创新之处在于支持两种模式,可以根据场景灵活选择。
模式1:RAG模式(use_rag=true)
- 检索房源数据库
- 基于真实数据回答
- 准确性高,实时性好
- 适合房源查询
模式2:普通对话模式(use_rag=false)
- 直接调用LLM
- 基于LLM知识回答
- 灵活性高,响应快
- 适合通用咨询
对比表:
| 特性 | RAG模式 | 普通对话模式 |
|------|--------|-----------|
| 数据来源 | 房源数据库 | LLM知识库 |
| 准确性 | ⭐⭐⭐⭐⭐ 高 | ⭐⭐⭐ 中 |
| 实时性 | ⭐⭐⭐⭐⭐ 高 | ⭐⭐ 低 |
| 灵活性 | ⭐⭐ 低 | ⭐⭐⭐⭐⭐ 高 |
| 响应速度 | 中等 | 快 |
| 适用场景 | 房源查询 | 通用咨询 |
| 示例问题 | "朝阳区三居室" | "房产投资建议" |
使用场景:
场景1:用户查询房源
用户问题:"我想在朝阳区找一个三居室,预算600万"
选择:use_rag=true(RAG模式)
原因:
- 需要准确的房源信息
- 需要实时的房价数据
- 用户关心具体房源
流程:
1. 检索房源数据库 → 找到符合条件的房源
2. 格式化房源信息 → 拼接到Prompt
3. LLM生成回答 → 推荐具体房源
4. 保存到记忆 → 下次对话可以参考
场景2:用户咨询房产知识
用户问题:"房产投资有什么建议?"
选择:use_rag=false(普通对话模式)
原因:
- 不需要查询数据库
- 需要灵活的回答
- 用户要求通用建议
流程:
1. 直接调用LLM
2. LLM基于知识库回答
3. 保存到记忆
4. 下次对话可以参考
7. 完整的API实现
FastAPI实现(main.py):
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from src.rag.rag_chain import HouseRAGChain
from src.utils.memory_manager import memory_manager
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_community.llms import Tongyi
from langchain.schema.output_parser import StrOutputParser
import os
from dotenv import load_dotenv
load_dotenv()
app = FastAPI(title="房产AI助手 v1.0")
# 初始化RAG链
rag_chain = HouseRAGChain()
# 初始化普通对话链(带Memory)
llm = Tongyi(
model="qwen-plus",
temperature=0.7,
dashscope_api_key=os.getenv("DASHSCOPE_API_KEY")
)
# 创建带历史的对话模板
chat_prompt = ChatPromptTemplate.from_messages([
("system", "你是一个专业的房产推荐助手。"),
MessagesPlaceholder(variable_name="history"), # 对话历史
("human", "{question}")
])
# LCEL链:prompt | llm | output_parser
chat_chain = chat_prompt | llm | StrOutputParser()
# 数据模型
class Question(BaseModel):
question: str
user_id: str = "default"
use_rag: bool = True # 是否使用RAG
class Answer(BaseModel):
answer: str
source: str # "rag" 或 "chat"
@app.post("/chat", response_model=Answer)
async def chat(question: Question):
"""
智能问答接口(自动选择RAG或普通对话)
Args:
question: 用户问题
user_id: 用户ID(用于多用户管理)
use_rag: 是否使用RAG模式
Returns:
answer: AI回答
source: 回答来源("rag"或"chat")
"""
try:
# 获取用户历史记忆
history = memory_manager.get_history(question.user_id)
if question.use_rag:
# 模式1:RAG模式
# 构建包含历史的prompt
history_text = "\n".join([
f"用户: {msg.content}" if msg.__class__.__name__ == 'HumanMessage'
else f"助手: {msg.content}"
for msg in history[-4:] # 只用最近2轮对话
])
# 增强问题:加入对话历史
enhanced_question = question.question
if history_text:
enhanced_question = f"对话历史:\n{history_text}\n\n当前问题:{question.question}"
# 调用RAG链
answer = rag_chain.ask(enhanced_question)
source = "rag"
else:
# 模式2:普通对话模式
# 直接调用LLM(带历史)
answer = chat_chain.invoke({
"question": question.question,
"history": history
})
source = "chat"
# 保存到记忆
memory_manager.add_message(
question.user_id,
question.question,
answer
)
return Answer(answer=answer, source=source)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post("/clear/{user_id}")
async def clear_history(user_id: str):
"""清空用户对话历史"""
memory_manager.clear_history(user_id)
return {"message": f"已清空用户 {user_id} 的对话历史"}
@app.get("/")
async def root():
return {
"message": "房产AI助手API v1.0",
"features": ["RAG检索", "多轮对话", "智能推荐"]
}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
关键点解析:
- 两种模式的选择:通过
use_rag参数灵活切换 - 对话历史增强:将历史信息加入到问题中,让LLM理解上下文
- 多用户支持:通过
user_id管理不同用户的历史 - 错误处理:完整的异常捕获和HTTP错误返回
8. 流式输出:提升用户体验
为什么需要流式输出?
- 用户体验好:看到实时输出,不觉得卡
- 降低延迟感:即使总时间相同,感觉更快
- 可以中断:用户不满意可以立即停止
- 节省内存:不需要一次性加载全部
流式 vs 一次性返回的对比:
场景:用户问"北京房价怎么样?",AI需要5秒生成答案
一次性返回:
时间轴:
0s -------- 5s
等待... 返回完整答案
用户感受:感觉很慢,像卡住了
流式返回:
时间轴:
0s - 1s - 2s - 3s - 4s - 5s
北 京 房 价 近 年 来 呈 现 稳 定 态 势...
用户感受:看到实时输出,感觉很快
虽然总时间相同(都是5秒),但用户体验完全不同!
性能对比:
| 指标 | 一次性返回 | 流式返回 |
|------|----------|--------|
| 首字延迟 | 5秒 | 0秒(立即显示) |
| 用户感受 | 慢 | 快 |
| 可中断性 | 不能 | 可以 |
| 内存占用 | 高 | 低 |
| 实现复杂度 | 简单 | 中等 |
| 适用场景 | 简单查询 | Web应用 |
流式输出实现:
from fastapi.responses import StreamingResponse
@app.post("/chat/stream")
async def chat_stream(question: Question):
"""
流式问答接口(推荐用于Web应用)
优势:
1. 用户立即看到输出
2. 可以中断
3. 内存占用少
"""
async def generate():
try:
# 获取用户历史
history = memory_manager.get_history(question.user_id)
# 异步流式输出
async for chunk in chat_chain.astream({
"question": question.question,
"history": history
}):
yield chunk
except Exception as e:
yield f"Error: {str(e)}"
return StreamingResponse(generate(), media_type="text/event-stream")
关键点:
- astream():异步流式输出
- yield chunk:逐块返回数据
- StreamingResponse:FastAPI的流式响应
- text/event-stream:标准的流式传输格式
客户端测试:
import requests
# 一次性返回
response = requests.post(
"http://localhost:8000/chat",
json={"question": "朝阳区有什么好房子?", "user_id": "user_001"}
)
print(response.json())
# 流式返回
response = requests.post(
"http://localhost:8000/chat/stream",
json={"question": "朝阳区有什么好房子?", "user_id": "user_001"},
stream=True
)
for chunk in response.iter_content(decode_unicode=True):
print(chunk, end="", flush=True)
第四部分:实战指南
9. 快速开始
环境配置:
# 1. 创建虚拟环境
python -m venv venv
# 2. 激活虚拟环境
# Windows
venv\Scripts\activate
# 3. 安装依赖
pip install -r requirements.txt
# 4. 配置API密钥
cp .env.example .env
# 编辑.env,添加DASHSCOPE_API_KEY
初始化向量存储:
# 第一次运行需要初始化FAISS向量数据库
python src/utils/data_generator.py # 生成房产数据
python src/utils/data_cleaner.py # 清洗数据
python src/rag/vector_store.py # 创建向量存储
启动服务:
# 启动API
python src/api/main.py
# 新开终端,启动前端
streamlit run src/frontend/app.py
测试API:
# 测试RAG模式
curl -X POST "http://localhost:8000/chat" \
-H "Content-Type: application/json" \
-d '{
"question": "我想在朝阳区找一个三居室,预算600万",
"user_id": "user_001",
"use_rag": true
}'
# 测试普通对话模式
curl -X POST "http://localhost:8000/chat" \
-H "Content-Type: application/json" \
-d '{
"question": "房产投资有什么建议?",
"user_id": "user_001",
"use_rag": false
}'
# 测试流式输出
curl -X POST "http://localhost:8000/chat/stream" \
-H "Content-Type: application/json" \
-d '{
"question": "朝阳区房价怎么样?",
"user_id": "user_001"
}'
10. 常见问题解答
Q1: 向量检索不准确怎么办?
A: 可以尝试以下方案:
- 调整检索的k值(返回更多结果)
- 优化数据的search_text字段
- 使用重排序(Reranking)提高准确性
- 使用混合搜索(BM25 + 向量)
Q2: 如何处理长对话?
A: 当前实现保留最近10轮对话。如果需要更长的对话,可以:
- 增加保留的轮数(修改memory_manager.py中的20)
- 使用摘要记忆(SummaryMemory)
- 使用数据库存储历史
Q3: 如何切换LLM?
A: 只需修改一行代码:
# 从通义千问切换到OpenAI
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-3.5-turbo")
Q4: 如何部署到生产环境?
A: 建议方案:
- 使用Docker容器化
- 使用Redis存储对话历史
- 使用PostgreSQL存储向量
- 使用Nginx做负载均衡
- 使用LangSmith监控
Q5: 如何评估RAG效果?
A: 可以使用以下指标:
- 检索准确率(Recall@K)
- 生成质量(BLEU、ROUGE)
- 用户满意度(人工评估)
- 系统性能(响应时间、吞吐量)
总结与展望
核心知识点回顾
这个系统虽然简单,但完整展示了RAG的核心流程:
- 数据向量化(DashScope嵌入)
- 相似度检索(FAISS向量库)
- 上下文增强(LCEL链式调用)
- LLM生成(通义千问)
- 对话管理(内存记忆)
- 两种模式(RAG vs 普通对话)
- 流式输出(提升用户体验)
技术选型说明
| 组件 | 选择 | 原因 |
|---|---|---|
| LLM | 通义千问 | 国内API,延迟低,成本低 |
| 向量数据库 | FAISS | Meta开源,性能优秀,支持大规模 |
| 嵌入模型 | DashScope | 与LLM同源,集成方便 |
| Web框架 | FastAPI | 现代化,性能好,支持异步 |
| 前端 | Streamlit | 快速原型,无需前端知识 |
下一步学习计划
- Week 2:LlamaIndex框架与Transformer原理
- Week 3:RAG优化与向量数据库进阶
- Week 4:RAG评估与数据处理
- Week 5:LangGraph与Agent开发
- Week 6:AutoGen与CrewAI实战
- Week 7:模型微调与LoRA技术
- Week 8:Prompt工程与优化
- Week 9:LangServe部署与监控
- Week 10:DSPy框架与系统优化
感谢阅读! 如果有任何问题或建议,欢迎在评论区讨论。
更多推荐



所有评论(0)