Langfuse 链路追踪在基于 Agent 的 RAG 系统中的实现原理与代码实践
摘要: Langfuse是一款专为LLM应用设计的可观测性工具,通过链路追踪(Tracing)技术监控Agent-RAG系统的全流程(Agent调度、RAG检索、LLM调用等)。其核心原理是以TraceID串联操作,形成树状调用链(Trace为根节点,Span记录子操作耗时和状态)。在RAG系统中,Langfuse可追踪用户提问→Agent解析→文档检索→LLM生成→结果返回的全链路,并通过Pyt
Langfuse 是一款专注于 LLM 应用可观测性的工具,提供链路追踪(Tracing)、日志记录、性能分析等功能,特别适合监控基于 Agent 的 RAG 问答系统中复杂的交互流程(如 Agent 调度、文档检索、LLM 调用等环节)。
一、Langfuse 链路追踪的核心原理
🕊️链路追踪(Tracing)的核心是通过追踪 ID(Trace ID) 将系统中多个关联的操作(如 Agent 调用、RAG 检索、LLM 生成)串联成一个完整的 “调用链”,从而可视化整个流程的执行路径、耗时和状态🕊️
-
核心概念
- Trace(追踪):一个完整的用户请求流程(如 “用户提问→Agent 处理→RAG 检索→LLM 生成→返回结果”)对应一个 Trace,用唯一的
trace_id
标识。 - Span(跨度):Trace 中的每个具体操作(如 “Agent 调用”“RAG 检索”)称为 Span,包含操作名称、开始 / 结束时间、耗时、状态(成功 / 失败)等信息。
- 父子关系:Span 之间通过
parent_span_id
建立层级关系(如 “Agent 调用” 是 “RAG 检索” 的父 Span),形成树状结构。 - 元数据(Metadata):每个 Span 可附加自定义信息(如 Agent 名称、检索到的文档 ID、LLM 模型名等),用于问题定位。
- Trace(追踪):一个完整的用户请求流程(如 “用户提问→Agent 处理→RAG 检索→LLM 生成→返回结果”)对应一个 Trace,用唯一的
-
实现原理
- 当用户请求进入系统时,Langfuse 生成一个全局唯一的
trace_id
。 - 系统执行的每个步骤(如 Agent 初始化、RAG 检索、LLM 调用)都会创建一个 Span,并关联到同一个
trace_id
。 - 每个 Span 记录开始时间、结束时间、耗时等数据,并通过
parent_span_id
关联到父操作,形成完整链路。 - 所有 Span 数据被发送到 Langfuse 服务,通过 UI 可视化展示整个调用链的执行情况。
- 当用户请求进入系统时,Langfuse 生成一个全局唯一的
二、在基于 Agent 的 RAG 系统中的实现逻辑
🕊️基于 Agent 的 RAG 系统通常包含以下流程:用户提问 → Agent 解析问题 → RAG 检索相关文档 → Agent 调用 LLM 生成答案 → 返回结果
🕊️Langfuse 链路追踪需要覆盖上述每个环节,具体逻辑如下:
- 初始化 Trace:用户请求进入时,创建一个根 Trace,记录全局上下文(如用户 ID、提问内容)。
- 创建 Span 追踪关键步骤:
- Agent 调度 Span:记录 Agent 接收请求、解析问题的耗时和状态。
- RAG 检索 Span:记录检索文档的数量、耗时、文档 ID 等。
- LLM 调用 Span:记录调用的模型名、输入 / 输出 Token 数、耗时、响应内容等。
- 结果组装 Span:记录 Agent 整合检索结果和 LLM 输出的耗时。
- 关联 Span 层级:通过
parent_span_id
建立父子关系(如 RAG 检索和 LLM 调用是 Agent 调度的子操作)。 - 发送数据到 Langfuse:所有 Span 数据异步发送到 Langfuse 服务,用于可视化和分析。
三、代码实现(基于 Python 的 Agent-RAG 系统)
🕊️以下是在基于 Agent 的 RAG 系统中集成 Langfuse 链路追踪的具体代码,以 langfuse-python
SDK 为例:
1.安装环境:
# 安装依赖
pip install langfuse python-dotenv # Langfuse SDK 和环境变量管理
2. 配置 Langfuse
🕊️在项目根目录创建 .env
文件,填入 Langfuse 服务的 API 密钥(从 Langfuse 控制台获取):
LANGFUSE_PUBLIC_KEY=pk-xxx # 公钥
LANGFUSE_SECRET_KEY=sk-xxx # 私钥
LANGFUSE_HOST=http://localhost:3000 # 本地部署的 Langfuse 服务地址(默认)
import os
import time
from dotenv import load_dotenv
from langfuse import Langfuse
from langfuse.model import CreateTrace, CreateSpan, UpdateSpan
# 加载环境变量并初始化 Langfuse
load_dotenv()
langfuse = Langfuse(
public_key=os.getenv("LANGFUSE_PUBLIC_KEY"),
secret_key=os.getenv("LANGFUSE_SECRET_KEY"),
host=os.getenv("LANGFUSE_HOST")
)
# 模拟 RAG 检索模块
class RAGRetriever:
@staticmethod
def retrieve(query: str, trace_id: str, parent_span_id: str) -> list:
"""模拟 RAG 检索文档,返回文档列表"""
# 创建 RAG 检索 Span(父 Span 为 Agent 调度)
span = langfuse.span(
CreateSpan(
trace_id=trace_id,
parent_span_id=parent_span_id,
name="rag_retrieval", # Span 名称:RAG 检索
input={"query": query} # 记录输入参数
)
)
try:
# 模拟检索耗时
time.sleep(0.5)
# 模拟检索结果(返回 2 个文档)
documents = [
{"id": "doc_1", "content": "Langfuse 是一款 LLM 可观测性工具"},
{"id": "doc_2", "content": "链路追踪可记录系统调用流程"}
]
# 更新 Span:记录输出和状态
span.update(
UpdateSpan(
output={"documents": [doc["id"] for doc in documents]}, # 记录文档 ID
status="success",
metadata={"doc_count": len(documents)} # 附加元数据:文档数量
)
)
return documents
except Exception as e:
# 出错时更新 Span 状态
span.update(
UpdateSpan(
status="error",
metadata={"error": str(e)}
)
)
raise e
finally:
# 结束 Span(自动计算耗时)
span.end()
# 模拟 LLM 调用模块
class LLMClient:
@staticmethod
def generate(prompt: str, trace_id: str, parent_span_id: str) -> str:
"""模拟 LLM 生成答案"""
# 创建 LLM 调用 Span(父 Span 为 Agent 调度)
span = langfuse.span(
CreateSpan(
trace_id=trace_id,
parent_span_id=parent_span_id,
name="llm_generation", # Span 名称:LLM 生成
input={"prompt": prompt}
)
)
try:
# 模拟 LLM 调用耗时
time.sleep(1.2)
# 模拟生成结果
answer = f"根据检索到的信息,回答:{prompt} 答案是...(基于文档内容)"
# 更新 Span:记录输出和 Token 用量
span.update(
UpdateSpan(
output={"answer": answer},
status="success",
metadata={"input_tokens": 120, "output_tokens": 80} # 模拟 Token 统计
)
)
return answer
except Exception as e:
span.update(
UpdateSpan(status="error", metadata={"error": str(e)})
)
raise e
finally:
span.end()
# 基于 Agent 的 RAG 系统核心逻辑
class RAGAgent:
def __init__(self, name: str):
self.name = name
self.retriever = RAGRetriever()
self.llm = LLMClient()
def run(self, user_query: str) -> str:
"""处理用户查询的完整流程"""
# 1. 创建根 Trace(全局唯一)
trace = langfuse.trace(
CreateTrace(
name="rag_agent_query", # Trace 名称:RAG Agent 处理查询
input={"user_query": user_query}, # 记录用户查询
metadata={"agent_name": self.name, "user_id": "user_123"} # 附加元数据
)
)
trace_id = trace.id # 获取全局 Trace ID
# 2. 创建 Agent 调度 Span(根 Span,无父 Span)
agent_span = langfuse.span(
CreateSpan(
trace_id=trace_id,
name="agent_execution", # Span 名称:Agent 执行
input={"user_query": user_query}
)
)
agent_span_id = agent_span.id
try:
# 3. 步骤 1:RAG 检索(子 Span,父 Span 为 agent_execution)
documents = self.retriever.retrieve(
query=user_query,
trace_id=trace_id,
parent_span_id=agent_span_id
)
# 4. 步骤 2:构建提示词(无需单独 Span,可合并到 Agent 调度)
prompt = f"基于以下文档回答问题:{user_query}\n文档:{[d['content'] for d in documents]}"
# 5. 步骤 3:调用 LLM 生成答案(子 Span,父 Span 为 agent_execution)
answer = self.llm.generate(
prompt=prompt,
trace_id=trace_id,
parent_span_id=agent_span_id
)
# 6. 更新 Agent 调度 Span 状态
agent_span.update(
UpdateSpan(
output={"final_answer": answer},
status="success"
)
)
return answer
except Exception as e:
agent_span.update(
UpdateSpan(status="error", metadata={"error": str(e)})
)
raise e
finally:
# 结束 Agent 调度 Span 和根 Trace
agent_span.end()
trace.end()
# 测试代码
if __name__ == "__main__":
# 创建 RAG Agent 实例
agent = RAGAgent(name="fact_extractor_agent")
# 模拟用户查询
user_query = "什么是 Langfuse 的链路追踪?"
# 执行 Agent 处理流程
result = agent.run(user_query)
print("最终回答:", result)
四、链路追踪的可视化与分析
-
查看追踪结果:
🕊️运行代码后,访问 Langfuse 控制台(默认http://localhost:3000
),可在 “Traces” 页面看到生成的调用链:- 根 Trace 显示整个用户请求的总耗时。
- 子 Span 按层级展示
agent_execution
(Agent 调度)、rag_retrieval
(RAG 检索)、llm_generation
(LLM 生成)的执行情况。 - 每个 Span 可查看详细信息(输入 / 输出、元数据、耗时等)。
-
关键分析维度:
- 性能瓶颈:通过 Span 耗时定位慢操作(如 LLM 调用耗时过长)。
- 错误追踪:当流程失败时,通过错误状态的 Span 快速定位问题环节(如 RAG 检索失败)。
- 资源消耗:通过 LLM 的 Token 用量统计,分析成本和效率。
致谢
⊹꙳🫧˖✧谢谢大家的阅读,很多不足支出,欢迎大家在评论区指出,如果我的内容对你有帮助,
可以点赞 , 收藏 ,大家的支持就是我坚持下去的动力!
请赐予我平静,去接受我无法改变的 :赐予我勇气,去改变我能改变的!
更多推荐
所有评论(0)