从零到一:用LangChain搭建房产RAG系统

我用一周时间从零开始搭建了一个完整的房产推荐AI系统。这不仅是一个Demo,更是一个展示RAG核心原理的实战项目。

前言

这个项目涉及的知识点包括:向量表示学习检索增强生成(RAG)LCEL编程范式多轮对话管理流式输出优化等。

本文不仅记录实现过程,更重要的是解释每个环节背后的大模型原理,以及为什么这样设计。

第一部分:RAG基础

1. 大模型时代的新范式:RAG

在大模型出现之前,我们用搜索引擎解决信息检索问题。大模型改变了这一切。

三种方式的对比

传统方式:
用户查询 → 搜索引擎 → 返回链接 → 用户自己阅读

大模型方式(容易幻觉):
用户查询 → LLM直接生成答案 ❌ 可能不准确

RAG方式(最佳实践):
用户查询 → 检索相关文档 → 结合文档让LLM生成答案 ✅ 准确可靠

为什么RAG更好?

  1. 解决知识过时:LLM的训练数据有截止日期,RAG可以检索最新信息
  2. 减少幻觉:有文档支撑,LLM不会凭空编造
  3. 可解释性:用户可以看到检索到的房源,知道答案从何而来
  4. 无需重训:只需更新知识库,不用重新训练模型

这个系统就是RAG的一个实践案例。核心思想是:让LLM基于真实数据生成答案,而不是凭空幻觉

2. 系统架构设计

┌─────────────────────────────────────────────────────────┐
│  用户交互层                                              │
│  FastAPI 后端 + Streamlit 前端                          │
│  (处理HTTP请求,管理会话,支持流式输出)                 │
├─────────────────────────────────────────────────────────┤
│  业务逻辑层                                              │
│  LangChain LCEL 链 + 对话记忆管理                       │
│  (编排LLM、检索器、提示词的执行流程)                    │
│  (支持两种模式:RAG模式 + 普通对话模式)                 │
├─────────────────────────────────────────────────────────┤
│  数据层                                                  │
│  向量存储(FAISS) + 嵌入模型(DashScope)                  │
│  (存储房产数据的向量表示,支持高效语义搜索)             │
└─────────────────────────────────────────────────────────┘

核心数据流

用户提问(带user_id)
  ↓
获取用户对话历史
  ↓
选择模式:
  ├─ RAG模式:检索房源数据库 + 历史上下文
  └─ 普通对话模式:直接调用LLM + 历史记忆
  ↓
[向量化] 将问题转换为向量表示
  ↓
[检索] 在向量空间中找最相似的房源(Top-5)
  ↓
[增强] 将检索结果 + 对话历史作为上下文
  ↓
[生成] LLM基于上下文生成答案
  ↓
[流式/一次性] 返回答案给用户
  ↓
[记忆] 保存对话到用户历史

第二部分:核心实现

3. 向量存储与检索

为什么用FAISS?

  • Meta开源的向量搜索库
  • 性能优秀,支持大规模数据
  • 支持多种距离度量(余弦相似度、欧氏距离等)
  • 可以持久化到磁盘

检索器实现(retriever.py):

from langchain_community.vectorstores import FAISS
from langchain_community.embeddings import DashScopeEmbeddings
from typing import List, Dict
import os
from dotenv import load_dotenv

load_dotenv()

class HouseRetriever:
    """房产检索器 - 从向量数据库检索相关房源"""
    
    def __init__(self, persist_directory: str = "data/embeddings/faiss"):
        print(f"初始化嵌入模型...")
        # 初始化DashScope的Embedding模型
        self.embeddings = DashScopeEmbeddings(
            model="text-embedding-v1",
            dashscope_api_key=os.getenv("DASHSCOPE_API_KEY")
        )
        
        print(f"加载向量数据库: {persist_directory}")
        # 加载已有的FAISS向量数据库
        self.vectorstore = FAISS.load_local(
            persist_directory,
            self.embeddings,
            allow_dangerous_deserialization=True
        )
        
        print(f"创建检索器...")
        # 创建检索器,默认返回Top-5
        self.retriever = self.vectorstore.as_retriever(
            search_kwargs={"k": 5}
        )
        print(f"检索器初始化完成")
    
    def retrieve(self, query: str, k: int = 5) -> List[Dict]:
        """
        检索相关房源
        
        Args:
            query: 用户查询(可能包含对话历史)
            k: 返回的结果数量
        
        Returns:
            检索结果列表,每个结果包含content和metadata
        """
        # 更新k值
        self.retriever.search_kwargs["k"] = k
        
        # 执行检索
        docs = self.retriever.invoke(query)
        
        # 格式化结果
        results = []
        for doc in docs:
            results.append({
                "content": doc.page_content,
                "metadata": doc.metadata
            })
        
        return results
    
    def format_results(self, results: List[Dict]) -> str:
        """
        格式化检索结果为文本
        
        这一步很关键:将结构化的检索结果转换为自然语言文本,
        这样LLM才能理解并基于这些信息生成答案。
        """
        formatted = []
        for i, result in enumerate(results, 1):
            meta = result['metadata']
            formatted.append(
                f"{i}. {meta['title']}\n"
                f"   价格: {meta['price']}万\n"
                f"   面积: {meta['area']}平米\n"
                f"   位置: {meta['location']}"
            )
        return "\n\n".join(formatted)

关键点解析

  1. DashScope Embedding:将文本转换为768维向量
  2. FAISS.load_local():加载已有的向量数据库(第一次需要初始化)
  3. as_retriever():将向量存储转换为检索器
  4. format_results():这一步很关键,将结构化数据转换为自然语言

4. RAG链的构建

为什么用LCEL?

LCEL(LangChain Expression Language)是LangChain的现代语法。相比传统的LLMChain:

  • 代码更简洁(用 | 连接组件)
  • 支持流式输出(astream()
  • 支持异步调用(ainvoke()
  • 支持批量处理(batch()

RAG链实现(rag_chain.py):

from langchain_community.llms import Tongyi
from langchain.prompts import ChatPromptTemplate
from langchain.schema.output_parser import StrOutputParser
from langchain.schema.runnable import RunnablePassthrough
from .retriever import HouseRetriever
import os
from dotenv import load_dotenv

load_dotenv()

class HouseRAGChain:
    """房产RAG链 - 完整的检索增强生成流程"""
    
    def __init__(self):
        print("初始化RAG链...")
        
        # 初始化LLM(使用DashScope的通义千问)
        self.llm = Tongyi(
            model="qwen-plus",
            temperature=0.7,
            dashscope_api_key=os.getenv("DASHSCOPE_API_KEY")
        )
        
        # 初始化检索器
        self.retriever = HouseRetriever()
        
        # 创建Prompt模板
        # 注意:{context} 和 {question} 是占位符,会被动态替换
        self.prompt = ChatPromptTemplate.from_template("""
你是一个专业的房产推荐助手。基于以下检索到的房源信息,回答用户的问题。

检索到的房源:
{context}

用户问题:{question}

请给出专业、友好的回答,如果检索结果中有合适的房源,请推荐给用户。
""")
        
        # 创建RAG链 - 这是LCEL的核心
        # 流程:
        # 1. 接收 {"question": "..."} 输入
        # 2. 执行 lambda 函数检索相关房源
        # 3. 将 context 和 question 传入 prompt
        # 4. 发送给 LLM 生成答案
        # 5. 用 StrOutputParser 解析输出
        self.chain = (
            {
                # 这一步是RAG的核心:根据问题检索相关文档
                "context": lambda x: self._format_context(x["question"]),
                # 问题直接传递
                "question": RunnablePassthrough()
            }
            | self.prompt      # 将context和question填入prompt
            | self.llm         # 发送给LLM
            | StrOutputParser()  # 解析输出为字符串
        )
        
        print("RAG链初始化完成")
    
    def _format_context(self, question: str) -> str:
        """
        检索并格式化上下文
        
        这一步执行RAG的"检索"和"增强"步骤:
        1. 根据问题检索相关房源
        2. 格式化为自然语言文本
        """
        results = self.retriever.retrieve(question, k=3)
        return self.retriever.format_results(results)
    
    def ask(self, question: str) -> str:
        """
        问答接口
        
        Args:
            question: 用户问题(可能包含对话历史)
        
        Returns:
            LLM生成的答案
        """
        return self.chain.invoke({"question": question})

LCEL链的工作流程

输入: {"question": "我想在朝阳区找一个三居室"}
  ↓
lambda x: self._format_context(x["question"])
  ↓
执行检索:
  - 向量化问题
  - 在FAISS中搜索
  - 返回Top-3房源
  ↓
格式化为文本:
  "1. 朝阳区常营三居室 - 580万 - 地铁6号线
   2. 朝阳区建国路三居室 - 620万 - 地铁1号线
   ..."
  ↓
填入Prompt:
  "你是一个专业的房产推荐助手。基于以下检索到的房源信息...
   检索到的房源:
   1. 朝阳区常营三居室...
   用户问题:我想在朝阳区找一个三居室"
  ↓
发送给LLM生成答案
  ↓
输出: "根据您的需求,我为您找到了以下房源..."

与简单链的对比

# ❌ 简单链(没有检索)
simple_chain = prompt | llm | output_parser
# 问题:LLM凭空生成答案,容易幻觉

# ✅ RAG链(有检索)
rag_chain = (
    {"context": retriever, "question": RunnablePassthrough()}
    | prompt
    | llm
    | output_parser
)
# 优势:LLM基于真实数据生成答案,准确可靠

5. 对话记忆管理

为什么需要记忆?

多轮对话需要记住历史。例如:

  • 用户第1轮:“我想在朝阳区找房子”
  • 用户第2轮:“那个房子的具体位置在哪里?”

如果没有记忆,系统不知道"那个房子"指的是什么。

记忆管理实现(memory_manager.py):

from langchain.schema import HumanMessage, AIMessage
from typing import Dict, List

class MemoryManager:
    """对话记忆管理器 - 支持多用户并发"""
    
    def __init__(self):
        # 用字典存储每个用户的对话历史
        # key: user_id, value: 对话消息列表
        self.user_memories: Dict[str, List] = {}
    
    def get_history(self, user_id: str) -> List:
        """
        获取用户的对话历史
        
        Args:
            user_id: 用户ID
        
        Returns:
            对话历史列表(HumanMessage和AIMessage交替)
        """
        if user_id not in self.user_memories:
            self.user_memories[user_id] = []
        return self.user_memories[user_id]
    
    def add_message(self, user_id: str, human_msg: str, ai_msg: str):
        """
        添加一轮对话
        
        Args:
            user_id: 用户ID
            human_msg: 用户消息
            ai_msg: AI回复
        """
        history = self.get_history(user_id)
        history.append(HumanMessage(content=human_msg))
        history.append(AIMessage(content=ai_msg))
        
        # 只保留最近20条消息(10轮对话)
        # 这是WindowMemory的实现:节省内存,保持上下文
        if len(history) > 20:
            self.user_memories[user_id] = history[-20:]
    
    def clear_history(self, user_id: str):
        """清空用户历史"""
        if user_id in self.user_memories:
            del self.user_memories[user_id]

# 全局实例
memory_manager = MemoryManager()

为什么这样设计?

  1. 多用户支持:用字典管理不同用户的历史
  2. 内存优化:只保留最近10轮对话(20条消息)
  3. 简单高效:不需要额外的LLM调用或数据库
  4. 生产就绪:已在项目中验证

对比不同的Memory类型

BufferMemory:保存所有对话
  优点:完整历史
  缺点:占内存
  适用:短对话(<10轮)

WindowMemory(我们的实现):保留最近K轮
  优点:节省内存,保持上下文
  缺点:丢失早期信息
  适用:长对话(10-50轮)

SummaryMemory:LLM总结历史
  优点:最省内存
  缺点:需要额外LLM调用
  适用:超长对话(>50轮)

第三部分:API设计与两种模式

6. 两种模式:RAG vs 普通对话

这个系统的创新之处在于支持两种模式,可以根据场景灵活选择。

模式1:RAG模式(use_rag=true)

  • 检索房源数据库
  • 基于真实数据回答
  • 准确性高,实时性好
  • 适合房源查询

模式2:普通对话模式(use_rag=false)

  • 直接调用LLM
  • 基于LLM知识回答
  • 灵活性高,响应快
  • 适合通用咨询

对比表

| 特性 | RAG模式 | 普通对话模式 |
|------|--------|-----------|
| 数据来源 | 房源数据库 | LLM知识库 |
| 准确性 | ⭐⭐⭐⭐⭐ 高 | ⭐⭐⭐ 中 |
| 实时性 | ⭐⭐⭐⭐⭐ 高 | ⭐⭐ 低 |
| 灵活性 | ⭐⭐ 低 | ⭐⭐⭐⭐⭐ 高 |
| 响应速度 | 中等 | 快 |
| 适用场景 | 房源查询 | 通用咨询 |
| 示例问题 | "朝阳区三居室" | "房产投资建议" |

使用场景

场景1:用户查询房源

用户问题:"我想在朝阳区找一个三居室,预算600万"

选择:use_rag=true(RAG模式)
原因:
- 需要准确的房源信息
- 需要实时的房价数据
- 用户关心具体房源

流程:
1. 检索房源数据库 → 找到符合条件的房源
2. 格式化房源信息 → 拼接到Prompt
3. LLM生成回答 → 推荐具体房源
4. 保存到记忆 → 下次对话可以参考

场景2:用户咨询房产知识

用户问题:"房产投资有什么建议?"

选择:use_rag=false(普通对话模式)
原因:
- 不需要查询数据库
- 需要灵活的回答
- 用户要求通用建议

流程:
1. 直接调用LLM
2. LLM基于知识库回答
3. 保存到记忆
4. 下次对话可以参考

7. 完整的API实现

FastAPI实现(main.py):

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from src.rag.rag_chain import HouseRAGChain
from src.utils.memory_manager import memory_manager
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_community.llms import Tongyi
from langchain.schema.output_parser import StrOutputParser
import os
from dotenv import load_dotenv

load_dotenv()

app = FastAPI(title="房产AI助手 v1.0")

# 初始化RAG链
rag_chain = HouseRAGChain()

# 初始化普通对话链(带Memory)
llm = Tongyi(
    model="qwen-plus",
    temperature=0.7,
    dashscope_api_key=os.getenv("DASHSCOPE_API_KEY")
)

# 创建带历史的对话模板
chat_prompt = ChatPromptTemplate.from_messages([
    ("system", "你是一个专业的房产推荐助手。"),
    MessagesPlaceholder(variable_name="history"),  # 对话历史
    ("human", "{question}")
])

# LCEL链:prompt | llm | output_parser
chat_chain = chat_prompt | llm | StrOutputParser()

# 数据模型
class Question(BaseModel):
    question: str
    user_id: str = "default"
    use_rag: bool = True  # 是否使用RAG

class Answer(BaseModel):
    answer: str
    source: str  # "rag" 或 "chat"

@app.post("/chat", response_model=Answer)
async def chat(question: Question):
    """
    智能问答接口(自动选择RAG或普通对话)
    
    Args:
        question: 用户问题
        user_id: 用户ID(用于多用户管理)
        use_rag: 是否使用RAG模式
    
    Returns:
        answer: AI回答
        source: 回答来源("rag"或"chat")
    """
    try:
        # 获取用户历史记忆
        history = memory_manager.get_history(question.user_id)
        
        if question.use_rag:
            # 模式1:RAG模式
            # 构建包含历史的prompt
            history_text = "\n".join([
                f"用户: {msg.content}" if msg.__class__.__name__ == 'HumanMessage' 
                else f"助手: {msg.content}"
                for msg in history[-4:]  # 只用最近2轮对话
            ])
            
            # 增强问题:加入对话历史
            enhanced_question = question.question
            if history_text:
                enhanced_question = f"对话历史:\n{history_text}\n\n当前问题:{question.question}"
            
            # 调用RAG链
            answer = rag_chain.ask(enhanced_question)
            source = "rag"
        else:
            # 模式2:普通对话模式
            # 直接调用LLM(带历史)
            answer = chat_chain.invoke({
                "question": question.question,
                "history": history
            })
            source = "chat"
        
        # 保存到记忆
        memory_manager.add_message(
            question.user_id,
            question.question,
            answer
        )
        
        return Answer(answer=answer, source=source)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/clear/{user_id}")
async def clear_history(user_id: str):
    """清空用户对话历史"""
    memory_manager.clear_history(user_id)
    return {"message": f"已清空用户 {user_id} 的对话历史"}

@app.get("/")
async def root():
    return {
        "message": "房产AI助手API v1.0",
        "features": ["RAG检索", "多轮对话", "智能推荐"]
    }

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

关键点解析

  1. 两种模式的选择:通过 use_rag 参数灵活切换
  2. 对话历史增强:将历史信息加入到问题中,让LLM理解上下文
  3. 多用户支持:通过 user_id 管理不同用户的历史
  4. 错误处理:完整的异常捕获和HTTP错误返回

8. 流式输出:提升用户体验

为什么需要流式输出?

  1. 用户体验好:看到实时输出,不觉得卡
  2. 降低延迟感:即使总时间相同,感觉更快
  3. 可以中断:用户不满意可以立即停止
  4. 节省内存:不需要一次性加载全部

流式 vs 一次性返回的对比

场景:用户问"北京房价怎么样?",AI需要5秒生成答案

一次性返回:
时间轴:
0s -------- 5s
等待...    返回完整答案
用户感受:感觉很慢,像卡住了

流式返回:
时间轴:
0s - 1s - 2s - 3s - 4s - 5s
北 京 房 价 近 年 来 呈 现 稳 定 态 势...
用户感受:看到实时输出,感觉很快

虽然总时间相同(都是5秒),但用户体验完全不同!

性能对比

| 指标 | 一次性返回 | 流式返回 |
|------|----------|--------|
| 首字延迟 | 5秒 | 0秒(立即显示) |
| 用户感受 | 慢 | 快 |
| 可中断性 | 不能 | 可以 |
| 内存占用 | 高 | 低 |
| 实现复杂度 | 简单 | 中等 |
| 适用场景 | 简单查询 | Web应用 |

流式输出实现

from fastapi.responses import StreamingResponse

@app.post("/chat/stream")
async def chat_stream(question: Question):
    """
    流式问答接口(推荐用于Web应用)
    
    优势:
    1. 用户立即看到输出
    2. 可以中断
    3. 内存占用少
    """
    async def generate():
        try:
            # 获取用户历史
            history = memory_manager.get_history(question.user_id)
            
            # 异步流式输出
            async for chunk in chat_chain.astream({
                "question": question.question,
                "history": history
            }):
                yield chunk
        except Exception as e:
            yield f"Error: {str(e)}"
    
    return StreamingResponse(generate(), media_type="text/event-stream")

关键点

  1. astream():异步流式输出
  2. yield chunk:逐块返回数据
  3. StreamingResponse:FastAPI的流式响应
  4. text/event-stream:标准的流式传输格式

客户端测试

import requests

# 一次性返回
response = requests.post(
    "http://localhost:8000/chat",
    json={"question": "朝阳区有什么好房子?", "user_id": "user_001"}
)
print(response.json())

# 流式返回
response = requests.post(
    "http://localhost:8000/chat/stream",
    json={"question": "朝阳区有什么好房子?", "user_id": "user_001"},
    stream=True
)
for chunk in response.iter_content(decode_unicode=True):
    print(chunk, end="", flush=True)

第四部分:实战指南

9. 快速开始

环境配置

# 1. 创建虚拟环境
python -m venv venv

# 2. 激活虚拟环境
# Windows
venv\Scripts\activate

# 3. 安装依赖
pip install -r requirements.txt

# 4. 配置API密钥
cp .env.example .env
# 编辑.env,添加DASHSCOPE_API_KEY

初始化向量存储

# 第一次运行需要初始化FAISS向量数据库
python src/utils/data_generator.py  # 生成房产数据
python src/utils/data_cleaner.py    # 清洗数据
python src/rag/vector_store.py      # 创建向量存储

启动服务

# 启动API
python src/api/main.py

# 新开终端,启动前端
streamlit run src/frontend/app.py

测试API

# 测试RAG模式
curl -X POST "http://localhost:8000/chat" \
  -H "Content-Type: application/json" \
  -d '{
    "question": "我想在朝阳区找一个三居室,预算600万",
    "user_id": "user_001",
    "use_rag": true
  }'

# 测试普通对话模式
curl -X POST "http://localhost:8000/chat" \
  -H "Content-Type: application/json" \
  -d '{
    "question": "房产投资有什么建议?",
    "user_id": "user_001",
    "use_rag": false
  }'

# 测试流式输出
curl -X POST "http://localhost:8000/chat/stream" \
  -H "Content-Type: application/json" \
  -d '{
    "question": "朝阳区房价怎么样?",
    "user_id": "user_001"
  }'

10. 常见问题解答

Q1: 向量检索不准确怎么办?

A: 可以尝试以下方案:

  1. 调整检索的k值(返回更多结果)
  2. 优化数据的search_text字段
  3. 使用重排序(Reranking)提高准确性
  4. 使用混合搜索(BM25 + 向量)

Q2: 如何处理长对话?

A: 当前实现保留最近10轮对话。如果需要更长的对话,可以:

  1. 增加保留的轮数(修改memory_manager.py中的20)
  2. 使用摘要记忆(SummaryMemory)
  3. 使用数据库存储历史

Q3: 如何切换LLM?

A: 只需修改一行代码:

# 从通义千问切换到OpenAI
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-3.5-turbo")

Q4: 如何部署到生产环境?

A: 建议方案:

  1. 使用Docker容器化
  2. 使用Redis存储对话历史
  3. 使用PostgreSQL存储向量
  4. 使用Nginx做负载均衡
  5. 使用LangSmith监控

Q5: 如何评估RAG效果?

A: 可以使用以下指标:

  1. 检索准确率(Recall@K)
  2. 生成质量(BLEU、ROUGE)
  3. 用户满意度(人工评估)
  4. 系统性能(响应时间、吞吐量)

总结与展望

核心知识点回顾

这个系统虽然简单,但完整展示了RAG的核心流程:

  1. 数据向量化(DashScope嵌入)
  2. 相似度检索(FAISS向量库)
  3. 上下文增强(LCEL链式调用)
  4. LLM生成(通义千问)
  5. 对话管理(内存记忆)
  6. 两种模式(RAG vs 普通对话)
  7. 流式输出(提升用户体验)

技术选型说明

组件 选择 原因
LLM 通义千问 国内API,延迟低,成本低
向量数据库 FAISS Meta开源,性能优秀,支持大规模
嵌入模型 DashScope 与LLM同源,集成方便
Web框架 FastAPI 现代化,性能好,支持异步
前端 Streamlit 快速原型,无需前端知识

下一步学习计划

  • Week 2:LlamaIndex框架与Transformer原理
  • Week 3:RAG优化与向量数据库进阶
  • Week 4:RAG评估与数据处理
  • Week 5:LangGraph与Agent开发
  • Week 6:AutoGen与CrewAI实战
  • Week 7:模型微调与LoRA技术
  • Week 8:Prompt工程与优化
  • Week 9:LangServe部署与监控
  • Week 10:DSPy框架与系统优化

感谢阅读! 如果有任何问题或建议,欢迎在评论区讨论。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐