Langfuse 是一款专注于 LLM 应用可观测性的工具,提供链路追踪(Tracing)、日志记录、性能分析等功能,特别适合监控基于 Agent 的 RAG 问答系统中复杂的交互流程(如 Agent 调度、文档检索、LLM 调用等环节)。

一、Langfuse 链路追踪的核心原理

🕊️链路追踪(Tracing)的核心是通过追踪 ID(Trace ID) 将系统中多个关联的操作(如 Agent 调用、RAG 检索、LLM 生成)串联成一个完整的 “调用链”,从而可视化整个流程的执行路径、耗时和状态🕊️

  1. 核心概念

    • Trace(追踪):一个完整的用户请求流程(如 “用户提问→Agent 处理→RAG 检索→LLM 生成→返回结果”)对应一个 Trace,用唯一的 trace_id 标识。
    • Span(跨度):Trace 中的每个具体操作(如 “Agent 调用”“RAG 检索”)称为 Span,包含操作名称、开始 / 结束时间、耗时、状态(成功 / 失败)等信息。
    • 父子关系:Span 之间通过 parent_span_id 建立层级关系(如 “Agent 调用” 是 “RAG 检索” 的父 Span),形成树状结构。
    • 元数据(Metadata):每个 Span 可附加自定义信息(如 Agent 名称、检索到的文档 ID、LLM 模型名等),用于问题定位。
  2. 实现原理

    • 当用户请求进入系统时,Langfuse 生成一个全局唯一的 trace_id
    • 系统执行的每个步骤(如 Agent 初始化、RAG 检索、LLM 调用)都会创建一个 Span,并关联到同一个 trace_id
    • 每个 Span 记录开始时间、结束时间、耗时等数据,并通过 parent_span_id 关联到父操作,形成完整链路。
    • 所有 Span 数据被发送到 Langfuse 服务,通过 UI 可视化展示整个调用链的执行情况。
二、在基于 Agent 的 RAG 系统中的实现逻辑

🕊️基于 Agent 的 RAG 系统通常包含以下流程:
用户提问 → Agent 解析问题 → RAG 检索相关文档 → Agent 调用 LLM 生成答案 → 返回结果

🕊️Langfuse 链路追踪需要覆盖上述每个环节,具体逻辑如下:

  1. 初始化 Trace:用户请求进入时,创建一个根 Trace,记录全局上下文(如用户 ID、提问内容)。
  2. 创建 Span 追踪关键步骤
    • Agent 调度 Span:记录 Agent 接收请求、解析问题的耗时和状态。
    • RAG 检索 Span:记录检索文档的数量、耗时、文档 ID 等。
    • LLM 调用 Span:记录调用的模型名、输入 / 输出 Token 数、耗时、响应内容等。
    • 结果组装 Span:记录 Agent 整合检索结果和 LLM 输出的耗时。
  3. 关联 Span 层级:通过 parent_span_id 建立父子关系(如 RAG 检索和 LLM 调用是 Agent 调度的子操作)。
  4. 发送数据到 Langfuse:所有 Span 数据异步发送到 Langfuse 服务,用于可视化和分析。
三、代码实现(基于 Python 的 Agent-RAG 系统)

🕊️以下是在基于 Agent 的 RAG 系统中集成 Langfuse 链路追踪的具体代码,以 langfuse-python SDK 为例:

1.安装环境:

# 安装依赖
pip install langfuse python-dotenv  # Langfuse SDK 和环境变量管理

2. 配置 Langfuse

🕊️在项目根目录创建 .env 文件,填入 Langfuse 服务的 API 密钥(从 Langfuse 控制台获取):

LANGFUSE_PUBLIC_KEY=pk-xxx  # 公钥
LANGFUSE_SECRET_KEY=sk-xxx  # 私钥
LANGFUSE_HOST=http://localhost:3000  # 本地部署的 Langfuse 服务地址(默认)
import os
import time
from dotenv import load_dotenv
from langfuse import Langfuse
from langfuse.model import CreateTrace, CreateSpan, UpdateSpan

# 加载环境变量并初始化 Langfuse
load_dotenv()
langfuse = Langfuse(
    public_key=os.getenv("LANGFUSE_PUBLIC_KEY"),
    secret_key=os.getenv("LANGFUSE_SECRET_KEY"),
    host=os.getenv("LANGFUSE_HOST")
)


# 模拟 RAG 检索模块
class RAGRetriever:
    @staticmethod
    def retrieve(query: str, trace_id: str, parent_span_id: str) -> list:
        """模拟 RAG 检索文档,返回文档列表"""
        # 创建 RAG 检索 Span(父 Span 为 Agent 调度)
        span = langfuse.span(
            CreateSpan(
                trace_id=trace_id,
                parent_span_id=parent_span_id,
                name="rag_retrieval",  # Span 名称:RAG 检索
                input={"query": query}  # 记录输入参数
            )
        )
        
        try:
            # 模拟检索耗时
            time.sleep(0.5)
            # 模拟检索结果(返回 2 个文档)
            documents = [
                {"id": "doc_1", "content": "Langfuse 是一款 LLM 可观测性工具"},
                {"id": "doc_2", "content": "链路追踪可记录系统调用流程"}
            ]
            # 更新 Span:记录输出和状态
            span.update(
                UpdateSpan(
                    output={"documents": [doc["id"] for doc in documents]},  # 记录文档 ID
                    status="success",
                    metadata={"doc_count": len(documents)}  # 附加元数据:文档数量
                )
            )
            return documents
        except Exception as e:
            # 出错时更新 Span 状态
            span.update(
                UpdateSpan(
                    status="error",
                    metadata={"error": str(e)}
                )
            )
            raise e
        finally:
            # 结束 Span(自动计算耗时)
            span.end()


# 模拟 LLM 调用模块
class LLMClient:
    @staticmethod
    def generate(prompt: str, trace_id: str, parent_span_id: str) -> str:
        """模拟 LLM 生成答案"""
        # 创建 LLM 调用 Span(父 Span 为 Agent 调度)
        span = langfuse.span(
            CreateSpan(
                trace_id=trace_id,
                parent_span_id=parent_span_id,
                name="llm_generation",  # Span 名称:LLM 生成
                input={"prompt": prompt}
            )
        )
        
        try:
            # 模拟 LLM 调用耗时
            time.sleep(1.2)
            # 模拟生成结果
            answer = f"根据检索到的信息,回答:{prompt} 答案是...(基于文档内容)"
            # 更新 Span:记录输出和 Token 用量
            span.update(
                UpdateSpan(
                    output={"answer": answer},
                    status="success",
                    metadata={"input_tokens": 120, "output_tokens": 80}  # 模拟 Token 统计
                )
            )
            return answer
        except Exception as e:
            span.update(
                UpdateSpan(status="error", metadata={"error": str(e)})
            )
            raise e
        finally:
            span.end()


# 基于 Agent 的 RAG 系统核心逻辑
class RAGAgent:
    def __init__(self, name: str):
        self.name = name
        self.retriever = RAGRetriever()
        self.llm = LLMClient()

    def run(self, user_query: str) -> str:
        """处理用户查询的完整流程"""
        # 1. 创建根 Trace(全局唯一)
        trace = langfuse.trace(
            CreateTrace(
                name="rag_agent_query",  # Trace 名称:RAG Agent 处理查询
                input={"user_query": user_query},  # 记录用户查询
                metadata={"agent_name": self.name, "user_id": "user_123"}  # 附加元数据
            )
        )
        trace_id = trace.id  # 获取全局 Trace ID

        # 2. 创建 Agent 调度 Span(根 Span,无父 Span)
        agent_span = langfuse.span(
            CreateSpan(
                trace_id=trace_id,
                name="agent_execution",  # Span 名称:Agent 执行
                input={"user_query": user_query}
            )
        )
        agent_span_id = agent_span.id

        try:
            # 3. 步骤 1:RAG 检索(子 Span,父 Span 为 agent_execution)
            documents = self.retriever.retrieve(
                query=user_query,
                trace_id=trace_id,
                parent_span_id=agent_span_id
            )

            # 4. 步骤 2:构建提示词(无需单独 Span,可合并到 Agent 调度)
            prompt = f"基于以下文档回答问题:{user_query}\n文档:{[d['content'] for d in documents]}"

            # 5. 步骤 3:调用 LLM 生成答案(子 Span,父 Span 为 agent_execution)
            answer = self.llm.generate(
                prompt=prompt,
                trace_id=trace_id,
                parent_span_id=agent_span_id
            )

            # 6. 更新 Agent 调度 Span 状态
            agent_span.update(
                UpdateSpan(
                    output={"final_answer": answer},
                    status="success"
                )
            )
            return answer
        except Exception as e:
            agent_span.update(
                UpdateSpan(status="error", metadata={"error": str(e)})
            )
            raise e
        finally:
            # 结束 Agent 调度 Span 和根 Trace
            agent_span.end()
            trace.end()


# 测试代码
if __name__ == "__main__":
    # 创建 RAG Agent 实例
    agent = RAGAgent(name="fact_extractor_agent")
    # 模拟用户查询
    user_query = "什么是 Langfuse 的链路追踪?"
    # 执行 Agent 处理流程
    result = agent.run(user_query)
    print("最终回答:", result)
四、链路追踪的可视化与分析
  1. 查看追踪结果
    🕊️运行代码后,访问 Langfuse 控制台(默认 http://localhost:3000),可在 “Traces” 页面看到生成的调用链:

    • 根 Trace 显示整个用户请求的总耗时。
    • 子 Span 按层级展示 agent_execution(Agent 调度)、rag_retrieval(RAG 检索)、llm_generation(LLM 生成)的执行情况。
    • 每个 Span 可查看详细信息(输入 / 输出、元数据、耗时等)。
  2. 关键分析维度

    • 性能瓶颈:通过 Span 耗时定位慢操作(如 LLM 调用耗时过长)。
    • 错误追踪:当流程失败时,通过错误状态的 Span 快速定位问题环节(如 RAG 检索失败)。
    • 资源消耗:通过 LLM 的 Token 用量统计,分析成本和效率。

致谢

⊹꙳🫧˖✧谢谢大家的阅读,很多不足支出,欢迎大家在评论区指出,如果我的内容对你有帮助,

可以点赞 , 收藏 ,大家的支持就是我坚持下去的动力!

请赐予我平静,去接受我无法改变的 :赐予我勇气,去改变我能改变的!

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐