如何把基于 LangChain / LlamaIndex 的智能工作流,接入企业监控与告警体系


在前五篇里,我们已经把企业智能助手一路从「能答题」进化到「能跑业务流程」:

  1. ​(一)​ DeepSeek + RAG:企业知识库问答
  2. ​(二)​ 接入钉钉 / 企业微信:在 IM 里 @ 机器人就能问
  3. ​(三)​ 多轮对话 + 工具调用:会澄清、会查系统、会办事
  4. ​(四)​ 反馈与日志闭环:能根据真实使用持续优化
  5. ​(五)​ LangChain / LlamaIndex 工作流编排:把复杂业务拆成多节点智能流程

但只要真正上线到生产一段时间,一定会遇到这些问题:

  • 某个流程节点偶尔卡住,用户反馈「又卡死了」,你却不知道卡在哪一步
  • 某天 DeepSeek 账单突然暴涨,事后才发现有人用机器人批量生成大段内容,成本报警为时已晚
  • 用户说「这个回答完全错」,你翻日志发现只看到几条简单记录,根本复现不了当时的链路
  • 外部依赖系统(审批、CRM、采购)偶发慢请求 / 超时,导致整条工作流表现不稳定,但你没有统一视角去观察整条链路

这些问题都指向同一件事:

你的 LangChain / LlamaIndex 工作流,需要被纳入一个企业级的监控与告警体系,而不是「黑盒脚本」。

这一篇,就带你从实战角度落地这件事。


一、先搞清楚:到底要监控什么?

针对「基于 LangChain / LlamaIndex 的智能工作流」,建议把监控拆成三层来看:

1. 基础设施层(Infra)

这是传统服务已有的一层,避免遗忘:

  • 服务存活:工作流服务 / RAG 服务 / IM 网关是否正常
  • 资源利用:CPU、内存、磁盘、网络
  • 依赖组件:Redis、数据库、消息队列等

一般用 Prometheus + Grafana、云监控(阿里云、腾讯云等)就能搞定。

2. 应用层(AI / 工作流)

这是这篇文章的重点,也是传统监控体系最容易缺失的一层,需要面向:

  1. LLM 调用维度

    • 单次调用耗时(latency)
    • 请求 / 响应 token 数
    • 不同模型 / 不同 Prompt 的调用次数
    • 错误率(超时、429 限流、5xx、SDK 异常等)
  2. RAG / LlamaIndex 检索维度

    • 检索延迟(embedding + 向量查询)
    • Top-K 相似度分布(命中质量高不高)
    • 命中文档数量、索引中断 / 失败
  3. LangChain / LangGraph 工作流维度

    • 单个工作流实例总耗时
    • 每个节点(Node / Chain / Agent)的耗时
    • 节点成功率 / 失败率 / 重试次数
    • 工具调用(Tool Call)成功率、耗时、错误原因
  4. 对话 / 用户维度

    • 每次任务涉及的对话轮数
    • 同一用户并发任务数量(防滥用)
    • 用户侧可感知的延迟(从发起到拿到结果的时间)

3. 业务层(流程与价值)

这是「给老板看」的一层:

  • 流程成功率(如采购流程从发起到完成的成功率)
  • 流程平均用时(人力+AI 完成一次流程的平均时间)
  • 人工介入率(多少流程需要人工兜底)
  • 用户满意度、投诉率(来自第(四)篇的反馈闭环)

原则:监控不只是「系统活着没」,而是要回答三件事:

  1. 系统跑得稳不稳?
  2. 钱花得合不合理(LLM 调用成本)?
  3. 业务上到底有没有帮到人?

二、技术路线:LangSmith + OpenTelemetry + 你现有的监控栈

1. 为什么推荐用 OpenTelemetry

现在主流 AI 监控/观测方案,基本都围绕 OpenTelemetry(OTel)展开:

  • 标准化 Trace / Metric / Log 协议
  • 支持多种导出端(LangSmith、Jaeger、Tempo、Elastic 等)
  • 有专门针对 LangChain 的 instrumentation,可以自动把 Chain、Agent、Tool 的调用转成 Trace

对你来说的意义:

  • 保持与现有 Prometheus / Grafana、APM 体系兼容
  • 可以先把数据打到 LangSmith 用来调试和评估,未来想自建也可以迁移
  • 不需要在代码里写一堆“自研 tracer”,只要在关键位置打少量埋点即可

2. LangSmith 适合干什么?

LangSmith 是 LangChain 官方的观测与评估平台,优势在于:

  • 开箱可视化 Chain / Agent / Tool 调用链路
  • 和 LangChain / LangGraph 默认兼容,改一行环境变量就能开始追踪
  • 支持记录 Prompt 版本、评估 LLM 输出质量、对比不同版本链路

结合 OpenTelemetry,你可以:

  • 用 LangSmith 看「AI 视角」的 Trace(Prompt、工具调用、RAG 命中情况)
  • 用 Prometheus / Grafana 看「系统视角」的 Metric(QPS、延迟、错误率)

对于中小团队,非常适合先用 LangSmith + 自家监控栈双轮驱动。


三、落地步骤:给你的工作流上“眼睛”和“神经”

假设你已经有了第(五)篇中的:

  • core/workflow_graph.py:用 LangGraph 定义的采购工作流
  • agents/*:各个节点用的 Agent(采购解析、审批、供应商选择等)
  • agents/document_agent.py:封装了 LlamaIndex 的政策查询
  • api/workflow_entry.py:对外暴露 /start_procurement 等 HTTP 接口

下面按「最小可行改造」给出接入方案。

步骤 1:初始化 OpenTelemetry(一次性)

新建一个 config/telemetry.py,负责在应用启动时初始化:

# config/telemetry.py
from opentelemetry import trace, metrics
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter

from opentelemetry.instrumentation.langchain import LangChainInstrumentor

import os

def init_telemetry():
    # LangSmith 环境变量(如不使用 LangSmith,可改为你自建 OTEL Collector 地址)
    os.environ.setdefault("LANGCHAIN_TRACING_V2", "true")
    os.environ.setdefault("LANGCHAIN_ENDPOINT", "https://api.smith.langchain.com")
    os.environ.setdefault("LANGCHAIN_API_KEY", "YOUR_LANGSMITH_KEY")
    os.environ.setdefault("LANGCHAIN_PROJECT", "enterprise-ai-workflow")

    # Trace Provider
    trace_provider = TracerProvider()
    span_exporter = OTLPSpanExporter(  # 可指向 LangSmith 或 OTEL Collector
        endpoint=os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT", "https://api.smith.langchain.com/traces")
    )
    span_processor = BatchSpanProcessor(span_exporter)
    trace_provider.add_span_processor(span_processor)
    trace.set_tracer_provider(trace_provider)

    # Metrics Provider(可选)
    metric_exporter = OTLPMetricExporter(
        endpoint=os.getenv("OTEL_EXPORTER_OTLP_METRICS_ENDPOINT", "http://localhost:4318/v1/metrics")
    )
    metric_reader = PeriodicExportingMetricReader(metric_exporter)
    meter_provider = MeterProvider(metric_readers=[metric_reader])
    metrics.set_meter_provider(meter_provider)

    # LangChain 自动埋点
    LangChainInstrumentor().instrument()

    print("Telemetry initialized.")

在 FastAPI 启动时调用:

# api/workflow_entry.py
from fastapi import FastAPI
from config.telemetry import init_telemetry

init_telemetry()
app = FastAPI()

到这一步,只要你的工作流节点里用了 LangChain 的 Chain / Agent / Tools,大部分 Trace 都会自动被捕获,包括:

  • 每个 LLM 调用的输入 / 输出
  • Chain 串联关系
  • 工具调用和返回结果(截断后)

步骤 2:为工作流入口加上「业务 Trace」

我们希望每次发起采购流程,都能在 Trace 中看到完整的「业务上下文」。

# api/workflow_entry.py
from fastapi import FastAPI, Request
from opentelemetry import trace
from core.workflow_graph import create_procurement_workflow
from core.state import ProcurementState

tracer = trace.get_tracer(__name__)
app = FastAPI()

@app.post("/start_procurement")
async def start_procurement(request: Request):
    payload = await request.json()
    user_id = payload.get("user_id")
    platform = payload.get("platform", "unknown")
    user_input = payload.get("query", "")

    # 创建根 Span
    with tracer.start_as_current_span("procurement_workflow") as span:
        span.set_attribute("user.id", user_id)
        span.set_attribute("platform", platform)
        span.set_attribute("workflow.type", "procurement")

        # 初始化状态对象
        state = ProcurementState(applicant=user_id)
        state.logs.append({"type": "user_input", "content": user_input})

        # 创建工作流(LangGraph)
        workflow = create_procurement_workflow()
        final_state = workflow.invoke(state)

        span.set_attribute("workflow.status", final_state.approval_status)
        span.set_attribute("procurement.id", final_state.procurement_id)

        return {
            "procurement_id": final_state.procurement_id,
            "approval_status": final_state.approval_status
        }

这样,LangSmith / OTel 里就会看到一条完整的 Trace:

  • 根 Span:procurement_workflow
  • 子 Span:
    • parse_request(LLM 解析金额 / 用途)
    • run_approval(查规则 + 调审批系统)
    • select_supplier(供应商 Agent)
    • generate_contract(合同生成)
  • 每个 Span 下还有 LLM 调用、工具调用的细节

步骤 3:对业务关键节点手动打点(Metric)

例如在第(五)篇里的 run_approval_agent 节点里,我们加上业务指标:

# agents/approval_agent.py
from core.state import ProcurementState
from opentelemetry import trace, metrics

tracer = trace.get_tracer(__name__)
meter = metrics.get_meter(__name__)

# 采购审批次数
approval_counter = meter.create_counter(
    "workflow.procurement.approval.count",
    description="Number of procurement approval decisions"
)

def run_approval_agent(state: ProcurementState) -> ProcurementState:
    with tracer.start_as_current_span("approval_agent") as span:
        # ... 原有审批逻辑,查询 LlamaIndex、决定 approver 等 ...

        # 打业务指标 & Trace 属性
        approval_counter.add(1, {
            "status": state.approval_status,
            "amount_range": _amount_range_label(state.amount)
        })
        span.set_attribute("approval.status", state.approval_status)
        span.set_attribute("approval.approver", state.approver)
        span.set_attribute("amount", state.amount)

        return state

def _amount_range_label(amount: float) -> str:
    if amount < 5000:
        return "<5k"
    elif amount < 20000:
        return "5k-20k"
    else:
        return ">20k"

此时,在 Prometheus / Grafana 里,你就可以直接画图:

  • 不同金额段的审批量、失败率
  • 某段时间内「pending_ceo」状态的趋势

步骤 4:把 LlamaIndex 的检索效果纳入观测

agents/document_agent.py 里封装 LlamaIndex RAG 时,建议:

  1. 使用统一的 Query Engine
  2. 在每次查询前后加 Span + 部分指标
# agents/document_agent.py
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.llms.openai import OpenAI
from opentelemetry import trace, metrics

tracer = trace.get_tracer(__name__)
meter = metrics.get_meter(__name__)

retrieval_counter = meter.create_counter(
    "rag.query.count",
    description="RAG query count"
)
retrieval_latency = meter.create_histogram(
    "rag.query.latency",
    description="RAG query latency (ms)"
)

_documents = SimpleDirectoryReader("policies/").load_data()
_embed_model = OpenAIEmbedding(model_name="text-embedding-3-small")
_index = VectorStoreIndex.from_documents(_documents, embed_model=_embed_model)
_llm = OpenAI(model="deepseek-reasoner-671b")

def query_policy(query: str) -> str:
    import time
    with tracer.start_as_current_span("rag_query") as span:
        span.set_attribute("rag.query", query)
        start = time.time()

        engine = _index.as_query_engine(
            similarity_top_k=3,
            llm=_llm
        )
        resp = engine.query(query)

        elapsed_ms = (time.time() - start) * 1000
        retrieval_counter.add(1)
        retrieval_latency.record(elapsed_ms)

        span.set_attribute("rag.latency_ms", elapsed_ms)
        # 可以适当记录命中文档信息(注意长度和隐私)
        span.set_attribute("rag.source_count", len(resp.source_nodes))

        return resp.response

这样,当 RAG 效果变差 / 延迟升高时,你可以:

  • 快速定位是 Embedding、向量库、还是 LLM 回复的问题
  • 对比不同索引版本的表现(方便做「索引升级 A/B 测试」)

四、告警体系:出问题要第一时间“叫人”

有了 Metric/Trace 之后,下一步就是定义 告警规则通知通道

1. 建议重点告警的几类场景

  1. 工作流执行异常

    • 单个流程失败率在 5 分钟内 > 5%
    • 某个节点(比如 run_approval)错误率 > 2%
  2. 延迟异常

    • LLM 调用 P95 延迟 > 30s
    • 工作流总耗时 P95 > 300s
  3. 成本异常

    • 1 小时内总 token 消耗比近 7 日平均值高出 50%
    • 单次请求 token 数 > 某个阈值(如 8k tokens)
  4. 检索效果异常

    • RAG 查询平均相似度低于某个阈值(需要你在代码里记录)
    • 某个索引版本上线后,流程成功率明显下降
  5. 业务 SLA 异常

    • 采购流程 24 小时内未完成的占比 > 10%
    • 客服投诉流程人工介入率 > 30%

2. 告警通知接到哪里?

结合你前面已经接入的钉钉 / 企业微信,其实可以直接复用机器人通道,专门创建一个「AI 系统告警群」。

示例:封装一个简单的告警通知工具(钉钉 / 企微任选四一个):

# tools/alert_notifier.py
import os
import requests

ALERT_WEBHOOK = os.getenv("ALERT_WEBHOOK")  # 钉钉/企微机器人地址

def send_alert(title: str, content: str, level: str = "warning"):
    if not ALERT_WEBHOOK:
        return

    markdown_text = f"""**[{level.upper()}] {title}**  

{content}
"""
    payload = {
        "msgtype": "markdown",
        "markdown": {
            "title": title,
            "text": markdown_text
        }
    }
    try:
        requests.post(ALERT_WEBHOOK, json=payload, timeout=5)
    except Exception:
        # 生产环境可以写到本地日志
        pass

在工作流关键节点中触发:

from tools.alert_notifier import send_alert

# 比如:发现审批状态长时间为 pending_ceo
if state.approval_status == "pending_ceo" and waiting_seconds > 7200:
    send_alert(
        "采购审批长时间未处理",
        f"采购单 {state.procurement_id} 已等待 CEO 审批超过 {waiting_seconds/3600:.1f} 小时",
        level="critical"
    )

五、和第(四)篇的「反馈闭环」怎么融合?

第(四)篇我们已经把:

  • 对话日志
  • 用户评分 / 投诉
  • 成功 / 失败 / 转人工标记

串成了一个反馈分析体系。

现在接入监控之后,你可以做一件非常有价值的事情:

把“主观反馈”与“客观监控指标”关联起来。​

举个例子:

  • 某一周采购流程的用户评分突然下降,你在日志里看到大量「太慢」「等太久了」的反馈
  • 打开监控仪表盘,发现:
    • LLM 延迟其实还好
    • 真正变慢的是「供应商比价节点」调用的外部接口

这样,研发团队知道要去优化 / 扩容外部服务,而不是盲目怀疑「是不是模型变笨了」。

再比如:

  • 某天开始,客服场景的「人工介入率」猛增
  • 监控显示:
    • RAG 检索命中率正常
    • 但某个「价格策略」相关的工作流节点错误率升高
  • 一查 LlamaIndex 索引版本,发现前一天刚把「价格策略」文档替换成了测试版本

这种问题,如果没有统一的监控 + Trace,很容易变成“口水仗”;
有了之后,可以迅速定位「哪个版本的索引 / Prompt / 工作流节点」导致问题。


六、从“能看”走向“能自愈”(进阶方向)

当你把监控 + 告警打通之后,可以考虑做一点自动化恢复 / 自我保护逻辑,比如:

  1. LLM 延迟 / 错误率飙升时

    • 自动降低调用频率(限流)
    • 临时切换到 fallback 模型(如由大模型切换小一点的模型)
    • 对非关键流程返回「系统繁忙」提示,避免堆积
  2. RAG 命中率长期偏低

    • 自动降级为模板化知识回答(例如只返回固定 FAQ)
    • 标记当前索引版本为「疑似异常」,阻止进一步索引更新
    • 触发「索引健康检查」任务(检查文档数量、embedding 模型是否变更等)
  3. 某个工作流节点出错率高

    • 自动把该节点标记为「观望状态」,暂时走简化流程
    • 把相关信息推送给负责人(工程师 / 业务 owner)进行核查
    • 在 LangSmith 上自动汇总该节点的失败 Trace 供排查

这些逻辑不用一上来就全部实现,但你需要在设计时预留扩展点:让每个工作流节点对异常情况有「明确的枚举输出」,而不是简单 raise Exception 抛给最外层。


七、总结:让 LangChain / LlamaIndex 工作流成为“可观测的企业基础设施”

通过这一篇,你应该已经有一个清晰的升级路线:

  1. 把 Telemetry 接入到 LangChain / LlamaIndex 工作流中

    • 使用 OpenTelemetry 统一收集 Trace / Metric
    • 使用 LangSmith 可视化 AI 链路,结合现有监控栈
  2. 在关键业务节点打点

    • 工作流入口:记录 user_id、platform、workflow_type 等业务上下文
    • 每个节点:记录状态、耗时、错误、业务字段(amount、approver 等)
    • RAG 层:记录检索耗时、命中文档数量/相似度
  3. 建立一套可执行的告警策略

    • LLM 质量与成本
    • 工作流的成功率、耗时
    • 业务 SLA(比如 24 小时审批完成率)
    • 将告警统一汇总到「AI 系统告警群」
  4. 和反馈闭环融合

    • 把主观评分 / 投诉与监控指标对齐
    • 发现「反复被投诉的流程节点」,进行针对性优化
  5. 为未来的“自愈 / 自动化治理”打基础

    • 节点错误时可以自动降级 / 切模型 / 切索引
    • 通过指标驱动版本回滚 / A/B 测试决策

至此,你的 DeepSeek + LangChain / LlamaIndex 企业智能系统,已经从:

「能跑起来的 Demo」

真正升级成一个:

“有监控、有告警、可观测、可审计、可持续优化”的企业级 AI 工作流平台。​

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐