LangSmith 进阶实践:基于 OpenTelemetry 构建分布式全链路追踪体系

摘要: 随着 LLM 应用架构的日益复杂,从单链路发展到多服务协作,传统的日志监控已难以满足需求。OpenTelemetry 作为云原生的可观测性标准,能够无缝集成 LangChain,实现细粒度的 Tracing(追踪)。本文将深入讲解如何配置 LangChain 的 OTEL 导出器,如何将追踪数据同时发送至 LangSmith 与其他平台,以及如何在微服务架构中实现跨服务的上下文传播。

1. 前言:为什么 LangChain 需要 OpenTelemetry?

LangChain 提供了内置的 LangSmith 追踪,但这通常局限于 LangChain 自身的生态系统。在企业级应用中,我们往往面临以下需求:

  • 统一可观测性:希望 LLM 调用的链路数据能与应用的其他性能指标(APM)聚合在同一个平台(如 Datadog, Jaeger)。
  • 多端备份:希望数据同时发送到 LangSmith 进行 LLM 专用分析,并发送到内部 Prometheus/Grafana 栈进行统一监控。
  • 分布式追踪:当应用拆分为多个服务(如一个网关服务调用 LLM 服务)时,需要一条链路贯穿始终。
    OpenTelemetry (OTEL) 正是解决这些问题的标准方案。

2. 基础篇:启用 OTEL 并将数据发送至 LangSmith

LangChain 内置了对 OpenTelemetry 的支持。最简单的使用方式是通过环境变量启用,LangChain 会自动配置一个将数据发送到 LangSmith 的导出器。

2.1 环境变量配置

无需编写任何 Python 代码,只需设置以下环境变量,即可开启 OTEL 集成:


export LANGSMITH_OTEL_ENABLED="true"
export LANGSMITH_TRACING="true"
# 确保 LANGCHAIN_API_KEY 已设置
2.2 代码示例

你的 LangChain 代码无需任何修改即可生成追踪数据。

import os
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate

# 确保 API Key 已设置
os.environ["LANGCHAIN_API_KEY"] = "lsv2_sk_..."
os.environ["LANGSMITH_OTEL_ENABLED"] = "true"

# 创建一个简单的链
prompt = ChatPromptTemplate.from_template("Tell me a joke about {topic}")
model = ChatOpenAI()
chain = prompt | model

# 运行链
# 此操作会自动生成 OTEL Span 并发送至 LangSmith
result = chain.invoke({"topic": "programming"})
print(result.content)

3. 进阶篇:配置备用 OTLP 端点(双写或多写)

如果你的企业使用了自己的可观测性平台(如 Grafana Tempo, Jaeger),你可能希望数据同时发送到 LangSmith 和你的平台。这可以通过自定义 TracerProvider 来实现。

3.1 禁用混合模式

默认情况下,LangChain 会同时向 LangSmith 和自定义端点发送数据。如果你只想发送到自定义端点,设置环境变量:

export LANGSMITH_OTEL_ONLY = "true"
3.2 代码实现:配置自定义导出器

下面的代码展示了如何配置一个将数据发送到自定义 OTLP 端点的导出器。

import os
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate

# 1. 设置 LangChain 环境变量
os.environ["LANGSMITH_OTEL_ENABLED"] = "true"
os.environ["LANGSMITH_TRACING"] = "true"

# 2. 配置 OTLP 导出器,指向你的提供商
provider = TracerProvider()
otlp_exporter = OTLPSpanExporter(
    # 替换为你的提供商端点
    endpoint="https://otel.your-provider.com/v1/traces" ,
    # 添加认证头
    headers={"api-key": "your-api-key"}
)

# 3. 将导出器注册到 Provider
processor = BatchSpanProcessor(otlp_exporter)
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)

# 4. 运行 LangChain 应用
prompt = ChatPromptTemplate.from_template("Tell me a joke about {topic}")
model = ChatOpenAI()
chain = prompt | model

result = chain.invoke({"topic": "programming"})
print(result.content)

4. 专家篇:使用 Collector 实现扇出架构

在代码中硬编码多个导出器并不是最佳实践,缺乏灵活性。OpenTelemetry Collector 是一个中间件,可以接收应用发出的追踪数据,并根据配置将其扇出(Fan-out) 到多个后端。

4.1 Collector 配置文件 (otel-collector-config.yaml)

创建一个 YAML 文件,定义接收器、处理器和导出器。


receivers:
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317
      http:
        endpoint: 0.0.0.0:4318

processors:
  batch:

exporters:
  # 导出器 1: 发送到 LangSmith
  otlphttp/langsmith:
    endpoint: https://api.smith.langchain.com/otel/v1/traces
    headers:
      x-api-key: ${env:LANGSMITH_API_KEY}
      Langsmith-Project: my_project

  # 导出器 2: 发送到其他提供商
  otlphttp/other_provider:
    endpoint: https://otel.your-provider.com/v1/traces
    headers:
      api-key: ${env:OTHER_PROVIDER_API_KEY}

service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [trace_idnormalizer, batch]
      exporters: [otlphttp/langsmith, otlphttp/other_provider]
4.2 应用端配置:指向本地 Collector

此时,你的应用只需将数据发送到本地运行的 Collector(通常监听在 4318 端口),由 Collector 负责分发。

import os
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate

# 指向本地 OpenTelemetry Collector
otlp_exporter = OTLPSpanExporter(
    endpoint="http://localhost:4318/v1/traces"
)

provider = TracerProvider()
processor = BatchSpanProcessor(otlp_exporter)
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)

# 启用 LangChain OTEL
os.environ["LANGSMITH_OTEL_ENABLED"] = "true"
os.environ["LANGSMITH_TRACING"] = "true"

# ... 运行你的链 ...

5. 巅峰篇:微服务下的分布式追踪

当你的 LLM 应用跨越多个服务(例如,Service A 调用 LLM,然后将结果传递给 Service B 进行后处理)时,上下文传播变得至关重要。我们需要确保 Trace ID 在服务间流转,从而在后台看到一条完整的链路。

5.1 核心概念:Inject 和 Extract
  • Inject(注入):在服务 A 中,将当前的 Trace 上下文信息注入到 HTTP Headers 中。
  • Extract(提取):在服务 B 中,从 HTTP Headers 中提取 Trace 上下文,并关联到新的 Span 上。
5.2 完整代码示例

Service A:发起请求并注入上下文

import os
import requests
from opentelemetry import trace
from opentelemetry.propagate import inject
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate

# 初始化 Tracer
tracer = trace.get_tracer(__name__)

def service_a():
    with tracer.start_as_current_span("service_a_operation") as span:
        # 1. 执行 LangChain 逻辑
        prompt = ChatPromptTemplate.from_template("Summarize: {text}")
        model = ChatOpenAI()
        chain = prompt | model
        result = chain.invoke({"text": "OpenTelemetry is an observability framework"})

        # 2. 注入上下文到 Headers
        headers = {}
        inject(headers) # 关键步骤:将 trace 信息写入 headers

        # 3. 发送请求给 Service B,携带 headers
        response = requests.post(
            "http://localhost:5000/process" ,
            headers=headers,
            json={"summary": result.content}
        )
        return response.json()

if __name__ == "__main__":
    print(service_a())
Service B:接收请求并提取上下文


from flask import Flask, request, jsonify
from opentelemetry import trace
from opentelemetry.propagate import extract
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate

app = Flask(__name__)
tracer = trace.get_tracer(__name__)

@app.route("/process", methods=["POST"])
def service_b_endpoint():
    # 1. 从请求头中提取上下文
    context = extract(request.headers)

    # 2. 使用提取的上下文创建新的 Span
    with tracer.start_as_current_span("service_b_operation", context=context) as span:
        data = request.json
        summary = data.get("summary", "")

        # 3. 执行 LangChain 逻辑
        prompt = ChatPromptTemplate.from_template("Analyze the sentiment of: {text}")
        model = ChatOpenAI()
        chain = prompt | model
        result = chain.invoke({"text": summary})

        return jsonify({"analysis": result.content})

if __name__ == "__main__":
    app.run(port=5000)
    

6. 总结

通过 OpenTelemetry,LangChain 的可观测性能力得到了极大的扩展。

  • 开发阶段:直接使用 LangSmith + OTEL,快速调试链路。
  • 测试阶段:使用 Collector 扇出,同时发送到测试环境和监控平台。
  • 生产环境:在微服务架构中利用上下文传播,实现端到端的分布式追踪,快速定位跨服务故障。
    掌握这套组合拳,将使你的 LLM 应用具备企业级的可维护性与可靠性。🚀

参考资源:

LangSmith OpenTelemetry Docs
OpenTelemetry Python Documentation
OpenTelemetry Collector Configuration

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐