LangChain 向量存储与集成完全指南

深入讲解 LangChain 的向量存储架构、支持的向量数据库、RAG 工作流、文档加载处理、嵌入模型和第三方集成

向量存储与集成


概览

LangChain 本身是一个 SDK/框架,不维护传统的关系型数据库。它通过 向量存储(Vector Stores)第三方集成 来处理数据持久化和检索。

支持的存储类型

  • 向量数据库(用于语义搜索和 RAG)
  • 文档存储(通过集成)
  • 消息存储(通过记忆组件)

向量存储架构

使用

实现

支持

支持

支持

支持

支持

支持

支持

向量化

添加

用户代码

VectorStore

具体向量库

ChromaDB

Pinecone

FAISS

Weaviate

Qdrant

PostgreSQL+PGVector

Elasticsearch

嵌入模型

文档

向量存储接口

基础抽象

from langchain_core.vectorstores import VectorStore, VectorStoreRetriever

class VectorStore(ABC):
    """向量存储基类"""

    @abstractmethod
    def add_documents(self, documents: List[Document]) -> List[str]:
        """添加文档到向量存储"""

    @abstractmethod
    def similarity_search(
        self,
        query: str,
        k: int = 4
    ) -> List[Document]:
        """相似度搜索"""

    @abstractmethod
    def similarity_search_with_score(
        self,
        query: str,
        k: int = 4
    ) -> List[Tuple[Document, float]]:
        """带分数的相似度搜索"""

支持的向量存储

ChromaDB

本地优先的向量数据库,易于使用:

from langchain_chroma import Chroma
from langchain_openai import OpenAIEmbeddings

# 创建本地向量存储
vectorstore = Chroma(
    collection_name="documents",
    embedding_function=OpenAIEmbeddings(),
    persist_directory="./chroma_db"
)

# 添加文档
vectorstore.add_documents(documents)

# 相似度搜索
results = vectorstore.similarity_search("query text", k=3)

Pinecone

托管的向量数据库服务:

from langchain_pinecone import PineconeVectorStore
import os

os.environ["PINECONE_API_KEY"] = "your-api-key"

vectorstore = PineconeVectorStore(
    index_name="my-index",
    embedding=OpenAIEmbeddings()
)

# 添加文档
vectorstore.add_documents(documents)

# 搜索
results = vectorstore.similarity_search("query", k=3)

FAISS

Meta 开发的本地向量索引:

from langchain_community.vectorstores import FAISS
from langchain_openai import OpenAIEmbeddings

# 创建 FAISS 索引
vectorstore = FAISS.from_documents(
    documents=documents,
    embedding=OpenAIEmbeddings()
)

# 保存索引
vectorstore.save_local("faiss_index")

# 加载索引
vectorstore = FAISS.load_local(
    "faiss_index",
    OpenAIEmbeddings()
)

PGVector

PostgreSQL 的向量扩展:

from langchain_community.vectorstores import PGVector
from langchain_openai import OpenAIEmbeddings

# 连接配置
CONNECTION_STRING = "postgresql://user:***@localhost:5432/dbname"

vectorstore = PGVector(
    connection_string=CONNECTION_STRING,
    embedding_function=OpenAIEmbeddings(),
    collection_name="documents"
)

# 添加文档
vectorstore.add_documents(documents)

# 使用过滤器搜索
results = vectorstore.similarity_search_with_score(
    "query",
    k=3,
    filter={"category": "tech"}
)

Qdrant

高性能向量搜索引擎:

from langchain_qdrant import QdrantVectorStore
from qdrant_client import QdrantClient

# 创建客户端
client = QdrantClient(url="http://localhost:6333")

vectorstore = QdrantVectorStore(
    client=client,
    collection_name="documents",
    embedding_function=OpenAIEmbeddings()
)

# 添加文档
vectorstore.add_documents(documents)

检索器 (Retrievers)

基础检索器

from langchain_core.vectorstores import VectorStoreRetriever

# 将向量存储转为检索器
retriever = vectorstore.as_retriever(
    search_type="similarity",
    search_kwargs={"k": 4}
)

# 使用检索器
docs = retriever.invoke("query text")

高级检索器

1. 最大边界相关性 (MMR) 检索
retriever = vectorstore.as_retriever(
    search_type="mmr",
    search_kwargs={
        "k": 5,
        "fetch_k": 20,
        "lambda_mult": 0.5
    }
)
2. 相似度阈值检索
retriever = vectorstore.as_retriever(
    search_type="similarity_score_threshold",
    search_kwargs={
        "score_threshold": 0.7,
        "k": 5
    }
)

多向量检索器

合并检索器
from langchain.retrieversers import MergerRetriever

# 合并多个检索器的结果
retriever = MergerRetriever(
    retrievers=[retriever1, retriever2, retriever3]
)
父文档检索器
from langchain.retrievers import ParentDocumentRetriever

# 小块搜索,返回大文档
retriever = ParentDocumentRetriever(
    vectorstore=vectorstore,
    docstore=docstore,
    child_splitter=child_splitter,
    parent_splitter=parent_splitter
)
上下文压缩检索器
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import LLMChainExtractor

# 压缩检索到的文档
compressor = LLMChainExtractor.from_llm(llm)
compression_retriever = ContextualCompressionRetriever(
    base_compressor=compressor,
    base_retriever=retriever
)

RAG 工作流

基础 RAG 流程

检索

documents

query

格式化

生成

用户查询

检索器

提示模板

LLM

答案

代码示例

from langchain.chains import RetrievalQA
from langchain_openai import ChatOpenAI

# 创建 RAG 链
qa_chain = RetrievalQA.from_chain_type(
    llm=ChatOpenAI(model="gpt-4"),
    chain_type="stuff",
    retriever=retriever,
    return_source_documents=True
)

# 执行查询
result = qa_chain.invoke({"query": "What is RAG?"})
print(result["result"])
print(result["source_documents"])

高级 RAG 配置

自定义提示模板
from langchain_core.prompts import PromptTemplate

prompt_template = """
Use the following pieces of context to answer the question at the end.
If you don't know the answer, just say that you don't know, don't try to make up an answer.

Context: {context}

Question: {question}
Answer:"""

prompt = PromptTemplate(
    template=prompt_template,
    input_variables=["context", "question"]
)

qa_chain = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="stuff",
    retriever=retriever,
    chain_type_kwargs={"prompt": prompt}
)
返回源文档
from langchain.chains import RetrievalQAWithSourcesChain

qa_chain = RetrievalQAWithSourcesChain.from_chain_type(
    llm=ChatOpenAI(temperature=0),
    chain_type="stuff",
    retriever=retriever
)

result = qa_chain({"question": "What is RAG?"})
print(result["answer"])
print(result["sources"])

文档加载与处理

文档加载器

from langchain_community.document_loaders import (
    TextLoader,
    PyPDFLoader,
    DirectoryLoader,
    WebBaseLoader
)

# 加载文本文件
loader = TextLoader("file.txt")
documents = loader.load()

# 加载 PDF
loader = PyPDFLoader("document.pdf")
documents = loader.load()

# 加载整个目录
loader = DirectoryLoader(
    "./data",
    glob="**/*.txt",
    loader_cls=TextLoader
)
documents = loader.load()

# 加载网页
loader = WebBaseLoader("https://example.com")
documents = loader.load()

文档分割器

from langchain_text_splitters import (
    RecursiveCharacterTextSplitter,
    CharacterTextSplitter,
    TokenTextSplitter
)

# 递归字符分割(推荐)
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=200,
    length_function=len,
)

# 分割文档
splits = text_splitter.split_documents(documents)

# 添加到向量存储
vectorstore.add_documents(splits)

嵌入模型

OpenAI 嵌入

from langchain_openai import OpenAIEmbeddings

embeddings = OpenAIEmbeddings(
    model="text-embedding-3-small",
    openai_api_key="***"
)

# 嵌入文本
vector = embeddings.embed_query("Your text here")

# 嵌入文档
vectors = embeddings.embed_documents([
    "text 1",
    "text 2"
])

本地嵌入模型

from langchain_community.embeddings import HuggingFaceEmbeddings

embeddings = HuggingFaceEmbeddings(
    model_name="sentence-transformers/all-MiniLM-L6-v2"
)

其他嵌入提供商

提供商 模型
OpenAI text-embedding-3-small/large
Cohere embed-english-v3.0
HuggingFace sentence-transformers/*
Google embedding-gecko-001
Anthropic Claude 嵌入

消息存储(记忆)

ChatMessageHistory

from langchain_core.chat_history import ChatMessageHistory

# 创建消息历史
history = ChatMessageHistory()

# 添加消息
history.add_user_message("Hello")
history.add_ai_message("Hi there!")

# 获取消息
messages = history.messages

持久化存储

from langchain_community.chat_message_histories import (
    SQLChatMessageHistory,
    RedisChatMessageHistory,
    MongoDBChatMessageHistory
)

# PostgreSQL 存储
history = SQLChatMessageHistory(
    session_id="user-123",
    connection_string="postgresql://..."
)

# Redis 存储
history = RedisChatMessageHistory(
    session_id="user-123",
    url="redis://localhost:6379/0"
)

# MongoDB 存储
history = MongoDBChatMessageHistory(
    session_id="user-123",
    connection_string="mongodb://...",
    database_name="chat_history",
    collection_name="messages"
)

第三方集成

LLM 提供商集成

集成

集成

集成

集成

集成

集成

LangChain

OpenAI

Anthropic

Google

Cohere

Groq

Ollama 本地

工具集成

  • serpapi: Google 搜索
  • wikipedia: 维基百科搜索
  • wolfram-alpha: 数学计算
  • zapier: 自动化集成
  • slack: Slack 集成
  • github: GitHub 操作

数据源集成

  • 文件系统:本地文件
  • 云存储:S3, GCS, Azure Blob
  • 数据库:PostgreSQL, MongoDB, Snowflake
  • API:Notion, Confluence, SharePoint
  • 网站:通用网页爬取

性能优化

1. 批量嵌入

# 批量处理提高效率
texts = ["text 1", "text 2", "text 3"]
vectors = embeddings.embed_documents(texts)

2. 异步操作

# 异步添加文档
await vectorstore.aadd_documents(documents)

# 异步搜索
docs = await vectorstore.asimilarity_search("query")

3. 缓存

from langchain_core.caches import InMemoryCache

# 缓存嵌入结果
from langchain.globals import set_llm_cache
set_llm_cache(InMemoryCache())

# 或使用 Redis 缓存
from langchain_community.cache import RedisCache
set_llm_cache(RedisCache(redis_url="redis://..."))

最佳实践

1. 选择合适的向量存储

  • 本地开发:FAISS, ChromaDB
  • 生产环境:Pinecone, Qdrant, Weaviate
  • 现有 PostgreSQL:PGVector
  • 大规模:Elasticsearch, OpenSearch

2. 优化分割策略

# 根据内容类型选择
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,  # 适中大小
    chunk_overlap=200,  # 保持上下文
    separators=["\n\n", "\n", ".", " "]  # 优先段落
)

3. 使用适当的嵌入模型

  • 英文通用:text-embedding-3-small
  • 多语言:multilingual-e5-large
  • 本地部署:all-MiniLM-L6-v2

4. 添加元数据过滤

# 添加文档时包含元数据
document = Document(
    page_content="...",
    metadata={"category": "tech", "date": "2024-01-01"}
)

# 搜索时使用过滤
results = vectorstore.similarity_search(
    "query",
    k=3,
    filter={"category": "tech"}
)

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐