Spring AI, LangChain和LangGraph(二)
RAG(Retrieval-Augmented Generation,检索增强生成)已经成为当前大模型应用中最主流的工程模式之一。
它通过「向量检索 + 大模型生成」的方式,让 LLM 能够基于外部知识进行回答,从而解决:
-
大模型幻觉问题
-
私有数据无法直接训练的问题
-
实时知识更新的问题
本文将使用 LangChain,LangChain, LangGraph+ 智谱 LLM,从零构建一个:
✅ 可运行
✅ 有向量检索
✅ 有对话记忆
✅ 有结构化输出
的简单 RAG Demo。
Spring AI
Maven 项目结构
zhipu-springai-demo/
├── pom.xml
└── src
└── main
├── java
│ └── com/example/zhipu/ZhipuSpringAIRag.java
└── resources
└── application.properties
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>zhipu-springai-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<java.version>17</java.version>
<spring-ai.version>0.0.1</spring-ai.version> <!-- 请替换为最新版本 -->
</properties>
<dependencies>
<!-- Spring AI Chat -->
<dependency>
<groupId>org.springframework.ai</groupId>
<artifactId>spring-ai-chat</artifactId>
<version>${spring-ai.version}</version>
</dependency>
<!-- Spring AI Embeddings -->
<dependency>
<groupId>org.springframework.ai</groupId>
<artifactId>spring-ai-embeddings</artifactId>
<version>${spring-ai.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Java 编译插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.11.0</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
<!-- 使用 exec 插件运行 main 方法 -->
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>3.1.0</version>
<configuration>
<mainClass>com.example.zhipu.ZhipuSpringAIRag</mainClass>
</configuration>
</plugin>
</plugins>
</build>
</project>
src/main/resources/application.properties
# 你的智谱(Zhipu)API Key
spring.ai.zhipuai.api-key=YOUR_ZHIPU_API_KEY
src/main/java/com/example/zhipu/ZhipuSpringAIRag.java
package com.example.zhipu;
import org.springframework.ai.chat.ChatClient;
import org.springframework.ai.chat.ChatModel;
import org.springframework.ai.chat.advisor.MessageChatMemoryAdvisor;
import org.springframework.ai.chat.advisor.RetrievalAdvisor;
import org.springframework.ai.chat.memory.MessageChatMemory;
import org.springframework.ai.embedding.EmbeddingClient;
import org.springframework.ai.embedding.EmbeddingVectorStore;
import org.springframework.ai.embedding.InMemoryVectorStore;
import java.util.List;
public class ZhipuSpringAIRag {
public record RagAnswer(String answer, List<String> sources) {}
public static void main(String[] args) {
// 1️⃣ 创建 Zhipu ChatModel
ChatModel chatModel = ChatModel.ofZhipuai();
// 2️⃣ 内存对话 + 向量存储
MessageChatMemory memory = new MessageChatMemory();
EmbeddingClient embeddingClient = EmbeddingClient.ofZhipuai();
EmbeddingVectorStore vectorStore = new InMemoryVectorStore(embeddingClient);
// 3️⃣ 添加一个示例文档
vectorStore.addDocument(
"RAG = Retrieval-Augmented Generation. LLMs use external documents to generate better answers."
);
// 4️⃣ 构建带 Advisor 的 ChatClient
ChatClient chatClient = ChatClient.builder(chatModel)
.advisors(
new MessageChatMemoryAdvisor(memory),
new RetrievalAdvisor(vectorStore)
)
.build();
// 5️⃣ 发送请求
RagAnswer result = chatClient.prompt()
.system("You are a helpful assistant explaining RAG.")
.user("Explain RAG in simple terms.")
.call()
.entity(RagAnswer.class);
// 6️⃣ 输出结果
System.out.println("Answer: " + result.answer());
System.out.println("Sources: " + result.sources());
}
}
✅ 如何运行
-
在
application.properties中配置你的 Zhipu API Key -
打开终端,进入项目根目录
-
执行以下命令:
mvn clean compile exec:java
该命令将:
-
编译项目
-
执行
ZhipuSpringAIRag的main方法 -
输出带有 RAG(检索增强生成)能力 的回答结果
LangChain:
本示例包含以下核心组件:
-
Chat 模型:智谱 GLM-4
-
向量数据库:FAISS(内存级)
-
Embedding 模型:示例中使用HuggingFaceEmbeddings
-
Memory:Python List 实现的内存对话
-
Structured Output:Pydantic 结构化输出
整体流程如下:
用户问题
↓
向量检索(FAISS)
↓
拼接上下文(Context)
↓
合并历史对话(Memory)
↓
调用 GLM-4
↓
结构化解析结果
项目结构
zhipu-langchain-rag/
├── requirements.txt
└── main.py
依赖说明(requirements.txt)
langchain>=0.1.0
faiss-cpu
pydantic
langchain-community
依赖解释:
-
langchain:核心编排框架
-
langchain-community:社区模型与向量库(包含 Zhipu)
-
faiss-cpu:本地向量相似度搜索
-
pydantic:结构化输出与数据校验
代码(main.py)
import os
from typing import List
from pydantic import BaseModel
# 智谱 AI 的 LangChain Chat 模型
from langchain_community.chat_models import ChatZhipuAI
# 向量模型(示例使用 HuggingFaceEmbeddings)
from langchain.embeddings import HuggingFaceEmbeddings
# FAISS 向量数据库
from langchain.vectorstores import FAISS
# LangChain 消息类型
from langchain.schema import HumanMessage, SystemMessage
# --------------------------
# 1️⃣ 设置智谱 API Key
# --------------------------
# 建议使用环境变量方式设置,避免在代码中硬编码
os.environ["ZHIPUAI_API_KEY"] = "YOUR_ZHIPU_API_KEY"
# --------------------------
# 2️⃣ 初始化 Chat 模型(GLM-4)
# --------------------------
# temperature 控制生成结果的随机性
llm = ChatZhipuAI(
model="glm-4",
temperature=0.7
)
# --------------------------
# 3️⃣ 构建内存级向量数据库(RAG 核心)
# --------------------------
# 使用 FAISS 作为向量存储
# HuggingFaceEmbeddings 仅作为示例
embeddings = HuggingFaceEmbeddings()
# 初始化向量库并写入示例文档
vector_store = FAISS.from_texts(
[
"RAG = Retrieval-Augmented Generation. "
"LLMs use external documents for better answers."
],
embedding=embeddings
)
# --------------------------
# 4️⃣ 定义结构化输出模型
# --------------------------
class RagAnswer(BaseModel):
"""
RAG 返回结果结构:
- answer: 模型生成的回答
- sources: 参考来源
"""
answer: str
sources: List[str]
# --------------------------
# 5️⃣ 内存对话记录(In-memory Memory)
# --------------------------
# 使用 List 保存历史问答
chat_memory: List[str] = []
# --------------------------
# 6️⃣ RAG 主逻辑函数
# --------------------------
def ask_rag(question: str) -> RagAnswer:
"""
RAG 问答流程:
1. 向量检索
2. 构造 Prompt
3. 调用大模型
4. 解析结构化结果
"""
# ① 从向量数据库中检索最相似的文档
docs = vector_store.similarity_search(question, k=1)
# ② 拼接检索到的上下文
context = "\n".join([doc.page_content for doc in docs])
# ③ 系统提示词
system_prompt = "You are a helpful assistant explaining RAG."
# ④ 用户提示词(包含上下文)
user_prompt = (
f"Context:\n{context}\n\n"
f"Question:\n{question}\n"
"Please answer in JSON: {'answer':'', 'sources':[]}"
)
# ⑤ 构造消息列表
messages = [SystemMessage(content=system_prompt)]
# 加入历史对话
for m in chat_memory:
messages.append(HumanMessage(content=m))
# 加入当前问题
messages.append(HumanMessage(content=user_prompt))
# ⑥ 调用智谱 GLM-4
response = llm.invoke(messages)
reply = response.content
# ⑦ 保存到内存
chat_memory.append(f"Q: {question}\nA: {reply}")
# ⑧ 解析 JSON 结构化输出
import json
try:
parsed = json.loads(reply)
return RagAnswer(
answer=parsed["answer"],
sources=parsed["sources"]
)
except Exception:
# JSON 解析失败时的兜底方案
return RagAnswer(
answer=reply,
sources=[]
)
# --------------------------
# 7️⃣ 示例运行入口
# --------------------------
if __name__ == "__main__":
result = ask_rag("Explain RAG in simple terms.")
print("Answer:", result.answer)
print("Sources:", result.sources)
关键设计点解析
1️⃣ 为什么要用 FAISS?
-
本地运行、零依赖服务
-
非常适合 Demo / PoC
-
与 Milvus、Qdrant 的接口一致
2️⃣ Memory 的作用是什么?
-
让模型具备「上下文感知能力」
-
类似:
-
LangChain
ConversationBufferMemory -
Spring AI
ChatMemory
-
3️⃣ 为什么要用 Pydantic?
-
强约束输出结构
-
方便前端 / API / 测试
-
与 Spring AI 的
@StructuredOutput理念一致
LangGraph:
下面示例展示如何利用 LangChain、智谱 AI 和 LangGraph 构建一个带多轮记忆的 RAG 系统。系统核心包括三个模块:**检索节点(Retrieve Node)**用于向量数据库中检索相关文档,**LLM 节点(LLM Node)**将文档上下文和历史问答作为提示发送给智谱 AI 大模型生成回答,**解析节点(Parse Node)**将模型输出解析成结构化 JSON,并更新聊天历史,实现多轮对话记忆。通过使用 LangGraph 状态图(StateGraph),整个问答流程从检索到生成再到解析可形成可控、可扩展的流程,方便实现 RAG 问答的多轮会话。示例中向量数据库使用 FAISS,向量嵌入使用 HuggingFaceEmbeddings(可替换为智谱向量接口),每轮问答都将历史问题和回答保存到 chat_history,保证下一轮问题可以利用上下文生成更连贯的回答。该系统不仅可以处理单轮问题,也可以支持多轮问答,使模型输出更贴合用户连续提问的逻辑。
import os
import json
from typing import List, TypedDict, Optional
from pydantic import BaseModel
from langchain_community.chat_models import ChatZhipuAI
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import FAISS
from langchain.schema import SystemMessage, HumanMessage
from langgraph.graph import StateGraph, END
# =========================================================
# 1️⃣ 设置 API Key
# =========================================================
os.environ["ZHIPUAI_API_KEY"] = "YOUR_ZHIPU_API_KEY" # 替换为你的智谱 AI Key
# =========================================================
# 2️⃣ 初始化聊天模型
# =========================================================
llm = ChatZhipuAI(
model="glm-4",
temperature=0.7 # 控制回答随机性,值越大回答越多样
)
# =========================================================
# 3️⃣ 构建向量存储(RAG)
# =========================================================
embeddings = HuggingFaceEmbeddings() # 嵌入
vector_store = FAISS.from_texts(
[
"RAG stands for Retrieval-Augmented Generation.",
"RAG allows LLMs to use external documents to improve answers.",
"RAG combines vector search with language models."
],
embedding=embeddings
)
# =========================================================
# 4️⃣ 定义结构化输出模型
# =========================================================
class RagAnswer(BaseModel):
answer: str # 模型生成的回答
sources: List[str] # 引用的文档来源
# =========================================================
# 5️⃣ LangGraph 状态(用于多轮记忆)
# =========================================================
class GraphState(TypedDict):
question: str # 当前问题
context: str # 检索到的文档上下文
chat_history: List[str] # 聊天历史
raw_response: str # 原始模型输出
result: Optional[RagAnswer] # 解析后的结果
# =========================================================
# 6️⃣ 构建 Graph 节点
# =========================================================
# --- 检索节点(RAG) ---
def retrieve_node(state: GraphState) -> GraphState:
docs = vector_store.similarity_search(state["question"], k=2) # 检索前两条相关文档
context = "\n".join(doc.page_content for doc in docs)
return {
**state,
"context": context
}
# --- LLM 节点(结合历史记忆) ---
def llm_node(state: GraphState) -> GraphState:
system_message = SystemMessage(
content="You are a helpful assistant explaining RAG clearly." # 系统角色提示
)
messages = [system_message]
# 添加聊天历史到提示中
for m in state["chat_history"]:
messages.append(HumanMessage(content=m))
user_message = HumanMessage(
content=f"""
Context:
{state['context']}
Question:
{state['question']}
Please answer strictly in JSON:
{{
"answer": "...",
"sources": ["..."]
}}
"""
)
messages.append(user_message)
response = llm.invoke(messages)
return {
**state,
"raw_response": response.content
}
# --- 解析节点(结构化输出 + 内存更新) ---
def parse_node(state: GraphState) -> GraphState:
try:
parsed = json.loads(state["raw_response"])
result = RagAnswer(
answer=parsed.get("answer", ""),
sources=parsed.get("sources", [])
)
except Exception:
result = RagAnswer(
answer=state["raw_response"],
sources=[]
)
# 更新聊天历史
new_history = state["chat_history"] + [
f"Q: {state['question']}\nA: {result.answer}"
]
return {
**state,
"chat_history": new_history,
"result": result
}
# =========================================================
# 7️⃣ 构建 LangGraph 流程
# =========================================================
graph = StateGraph(GraphState)
graph.add_node("retrieve", retrieve_node)
graph.add_node("llm", llm_node)
graph.add_node("parse", parse_node)
graph.set_entry_point("retrieve")
graph.add_edge("retrieve", "llm")
graph.add_edge("llm", "parse")
graph.add_edge("parse", END)
app = graph.compile()
# =========================================================
# 8️⃣ 演示(多轮对话示例)
# =========================================================
if __name__ == "__main__":
# ---- 第一轮 ----
state: GraphState = {
"question": "Explain RAG in simple terms.",
"context": "",
"chat_history": [],
"raw_response": "",
"result": None
}
output = app.invoke(state)
print("=== Turn 1 ===")
print("Answer:", output["result"].answer)
print("Sources:", output["result"].sources)
# ---- 第二轮(利用历史记忆) ----
state = {
"question": "Why is it useful?",
"context": "",
"chat_history": output["chat_history"],
"raw_response": "",
"result": None
}
output = app.invoke(state)
print("\n=== Turn 2 ===")
print("Answer:", output["result"].answer)
print("Sources:", output["result"].sources)
解释与流程说明
该示例展示了一个完整的 多轮 RAG 系统:
-
检索节点:使用 FAISS 向量检索找到与当前问题最相关的文档,生成上下文。
-
LLM 节点:将上下文和聊天历史整合进 Prompt,调用智谱 AI 大模型生成回答,并要求严格返回 JSON 格式。
-
解析节点:解析模型输出 JSON,转换成结构化的
RagAnswer对象,并将问答对添加到聊天历史,保证下一轮对话可以使用历史信息。 -
多轮对话:通过 LangGraph 状态图(StateGraph)控制流程,每轮都将历史问答作为上下文,模型可以生成更连贯、贴合逻辑的回答。
-
结构化输出:保证返回内容可直接程序化处理,包括回答文本和引用来源。
。
更多推荐



所有评论(0)