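LangChain Source Code 04: Formatted Output
An analysis of LangChain's formatted-output system. The system is a layered result-processing architecture that converts raw LLM text into structured data. It comprises a core processing flow (the full conversion chain from text input to structured output), the parser types (string, JSON, Pydantic, and others), an advanced feature layer (data cleaning, validation, and type conversion), a composition and optimization layer (combining parsers and tuning performance), and a monitoring layer (caching, performance metrics, and error handling).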
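04. Source Code Analysis of LangChain Formatted Output
Formatted Output Architecture Overview
The formatted-output system is LangChain's result processor: it converts the raw text that an LLM emits into structured data that a program can consume directly. We start with the overall architecture: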
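# Formatted-output architecture diagram
output_formatting_architecture = '''
Raw LLM output → Output parser → Format validation → Data conversion → Structured result → Application
      ↓               ↓                 ↓                  ↓                  ↓                ↓
 Text string    Parsing logic     Type checking    Object conversion    Python object   Business logic
'''
# Core component relationships
component_relationships = '''
                BaseOutputParser (abstract base class)
                               ↑
 StrOutputParser   ←→   JsonOutputParser   ←→   PydanticOutputParser
        ↑                      ↑                        ↑
 concrete class         concrete class           concrete class
        ↑                      ↑                        ↑
PandasOutputParser ←→ DatetimeOutputParser ←→   EnumOutputParser
'''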
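The first diagram (parse, validate, convert) can be illustrated end to end with the standard library alone. This is only a minimal sketch of the idea: `Answer` and `run_pipeline` are hypothetical names invented for this example and are not LangChain APIs.

```python
import json
from dataclasses import dataclass


@dataclass
class Answer:
    """Hypothetical structured result used only for this illustration."""
    text: str
    confidence: float


def run_pipeline(raw_output: str) -> Answer:
    # 1. Parse: raw text -> Python data
    data = json.loads(raw_output)
    # 2. Validate: make sure the expected keys are present
    missing = {"text", "confidence"} - set(data)
    if missing:
        raise ValueError(f"missing fields: {sorted(missing)}")
    # 3. Convert: coerce the values to the target types
    return Answer(text=str(data["text"]), confidence=float(data["confidence"]))


result = run_pipeline('{"text": "hello", "confidence": "0.9"}')
print(result)
```

The application only ever sees `Answer`, so a malformed reply fails at the boundary instead of deep inside business logic.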
Architecture Diagram and Core Component Relationship Diagram
Formatted Output Architecture Diagram
graph TD
subgraph "Overall formatted-output architecture"
LLMRawOutput["Raw LLM Output<br/>Raw Text"]
OutputParser["Output Parser"]
FormatValidation["Format Validation"]
DataTransformation["Data Transformation"]
StructuredResult["Structured Result"]
Application["Application"]
end
subgraph "Processing flow layer"
TextString["Text string"]
ParsingAlgorithm["Parsing algorithm"]
TypeChecking["Type checking"]
ObjectConversion["Object conversion"]
PythonObject["Python object"]
BusinessLogic["Business logic"]
end
subgraph "Parser type layer"
StrOutputParser["StrOutputParser<br/>String parser"]
JsonOutputParser["JsonOutputParser<br/>JSON parser"]
PydanticOutputParser["PydanticOutputParser<br/>Pydantic parser"]
StructuredOutputParser["StructuredOutputParser<br/>Structured parser"]
DatetimeOutputParser["DatetimeOutputParser<br/>Datetime parser"]
EnumOutputParser["EnumOutputParser<br/>Enum parser"]
end
subgraph "Advanced feature layer"
JsonCleaning["JSON cleaning"]
MarkdownExtraction["Markdown extraction"]
PydanticValidation["Pydantic validation"]
RegexPatternMatching["Regex pattern matching"]
DateTimeParsing["Datetime parsing"]
TypeConversion["Type conversion"]
end
subgraph "Composition and optimization layer"
CombiningParser["CombiningParser<br/>Combining parser"]
SequentialParser["SequentialParser<br/>Sequential parser"]
ConditionalParser["ConditionalParser<br/>Conditional parser"]
CachedParser["CachedParser<br/>Cached parser"]
AsyncParser["AsyncParser<br/>Async parser"]
BatchParser["BatchParser<br/>Batch parser"]
end
subgraph "Performance and monitoring layer"
ParserCache["Parser cache"]
PerformanceMetrics["Performance metrics"]
ErrorHandling["Error handling"]
ValidationResults["Validation results"]
AsyncProcessing["Async processing"]
BatchProcessing["Batch processing"]
end
%% Main flow
LLMRawOutput --> OutputParser
OutputParser --> FormatValidation
FormatValidation --> DataTransformation
DataTransformation --> StructuredResult
StructuredResult --> Application
%% Processing-stage flow
TextString --> ParsingAlgorithm
ParsingAlgorithm --> TypeChecking
TypeChecking --> ObjectConversion
ObjectConversion --> PythonObject
PythonObject --> BusinessLogic
%% Parser type selection
OutputParser --> StrOutputParser
OutputParser --> JsonOutputParser
OutputParser --> PydanticOutputParser
OutputParser --> StructuredOutputParser
OutputParser --> DatetimeOutputParser
OutputParser --> EnumOutputParser
%% Advanced feature handling
JsonOutputParser --> JsonCleaning
JsonOutputParser --> MarkdownExtraction
PydanticOutputParser --> PydanticValidation
StructuredOutputParser --> RegexPatternMatching
DatetimeOutputParser --> DateTimeParsing
EnumOutputParser --> TypeConversion
%% Composition and optimization
OutputParser --> CombiningParser
OutputParser --> SequentialParser
OutputParser --> ConditionalParser
OutputParser --> CachedParser
OutputParser --> AsyncParser
OutputParser --> BatchParser
%% Performance monitoring links
OutputParser --> ParserCache
FormatValidation --> PerformanceMetrics
DataTransformation --> ErrorHandling
StructuredResult --> ValidationResults
OutputParser --> AsyncProcessing
OutputParser --> BatchProcessing
%% Bidirectional relationships
ParserCache -.-> OutputParser
PerformanceMetrics -.-> FormatValidation
ErrorHandling -.-> DataTransformation
ValidationResults -.-> StructuredResult
AsyncProcessing -.-> OutputParser
BatchProcessing -.-> OutputParser
%% Style definitions
classDef mainFlow fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
classDef processFlow fill:#f3e5f5,stroke:#7b1fa2,stroke-width:2px
classDef parserType fill:#fff3e0,stroke:#f57c00,stroke-width:2px
classDef advancedFeature fill:#e8f5e8,stroke:#388e3c,stroke-width:2px
classDef compositionLayer fill:#fce4ec,stroke:#c2185b,stroke-width:2px
classDef performanceLayer fill:#e0f2f1,stroke:#00695c,stroke-width:2px
class LLMRawOutput,OutputParser,FormatValidation,DataTransformation,StructuredResult,Application mainFlow
class TextString,ParsingAlgorithm,TypeChecking,ObjectConversion,PythonObject,BusinessLogic processFlow
class StrOutputParser,JsonOutputParser,PydanticOutputParser,StructuredOutputParser,DatetimeOutputParser,EnumOutputParser parserType
class JsonCleaning,MarkdownExtraction,PydanticValidation,RegexPatternMatching,DateTimeParsing,TypeConversion advancedFeature
class CombiningParser,SequentialParser,ConditionalParser,CachedParser,AsyncParser,BatchParser compositionLayer
class ParserCache,PerformanceMetrics,ErrorHandling,ValidationResults,AsyncProcessing,BatchProcessing performanceLayer
Formatted Output Core Component Relationship Diagram
graph TD
subgraph "Abstract base class layer"
BaseOutputParser["BaseOutputParser<br/>Abstract base class"]
end
subgraph "Basic parsers"
StrOutputParser["StrOutputParser<br/>String parser"]
CommaSeparatedListOutputParser["CommaSeparatedListOutputParser<br/>Comma-separated list parser"]
end
subgraph "JSON parser family"
JsonOutputParser["JsonOutputParser<br/>JSON parser"]
JsonOutputParserAdvanced["JsonOutputParserAdvanced<br/>Advanced JSON parser"]
PydanticOutputParser["PydanticOutputParser<br/>Pydantic parser"]
PydanticOutputParserAdvanced["PydanticOutputParserAdvanced<br/>Advanced Pydantic parser"]
end
subgraph "Structured parsers"
StructuredOutputParser["StructuredOutputParser<br/>Structured parser"]
ResponseSchema["ResponseSchema<br/>Response schema"]
DatetimeOutputParser["DatetimeOutputParser<br/>Datetime parser"]
DatetimeOutputParserAdvanced["DatetimeOutputParserAdvanced<br/>Advanced datetime parser"]
end
subgraph "Combining parsers"
CombiningOutputParser["CombiningOutputParser<br/>Combining parser"]
SequentialOutputParser["SequentialOutputParser<br/>Sequential parser"]
ConditionalOutputParser["ConditionalOutputParser<br/>Conditional parser"]
end
subgraph "Caching and optimization"
OutputParserCache["OutputParserCache<br/>Parser cache"]
CachedOutputParser["CachedOutputParser<br/>Cached parser"]
AsyncOutputParser["AsyncOutputParser<br/>Async parser"]
AsyncBatchOutputParser["AsyncBatchOutputParser<br/>Async batch parser"]
end
subgraph "Core dependencies"
BaseModel["BaseModel<br/>Pydantic model"]
ValidationError["ValidationError<br/>Validation error"]
PromptValue["PromptValue<br/>Prompt value"]
CallbackManagerForChainRun["CallbackManagerForChainRun<br/>Callback manager"]
end
subgraph "Real-world application systems"
IntelligentDataAnalysisSystem["IntelligentDataAnalysisSystem<br/>Intelligent data analysis system"]
MultilingualContentParser["MultilingualContentParser<br/>Multilingual content parser"]
end
%% Inheritance
BaseOutputParser --> StrOutputParser
BaseOutputParser --> JsonOutputParser
BaseOutputParser --> StructuredOutputParser
BaseOutputParser --> DatetimeOutputParser
%% JSON parser hierarchy
JsonOutputParser --> JsonOutputParserAdvanced
JsonOutputParser --> PydanticOutputParser
PydanticOutputParser --> PydanticOutputParserAdvanced
%% Structured parser hierarchy
StructuredOutputParser --> ResponseSchema
DatetimeOutputParser --> DatetimeOutputParserAdvanced
%% Combining parser relationships
CombiningOutputParser --> JsonOutputParser
CombiningOutputParser --> StrOutputParser
SequentialOutputParser --> JsonOutputParser
ConditionalOutputParser --> JsonOutputParser
%% Caching and optimization relationships
CachedOutputParser --> OutputParserCache
CachedOutputParser --> JsonOutputParser
AsyncBatchOutputParser --> AsyncOutputParser
AsyncBatchOutputParser --> JsonOutputParser
%% Core dependencies
PydanticOutputParser -.-> BaseModel
PydanticOutputParser -.-> ValidationError
JsonOutputParser -.-> PromptValue
BaseOutputParser -.-> CallbackManagerForChainRun
%% Real-world application relationships
IntelligentDataAnalysisSystem --> JsonOutputParser
IntelligentDataAnalysisSystem --> StructuredOutputParser
MultilingualContentParser --> JsonOutputParser
MultilingualContentParser --> StructuredOutputParser
%% Style definitions
classDef abstractClass fill:#e1f5fe,stroke:#01579b,stroke-width:2px
classDef basicParser fill:#f3e5f5,stroke:#7b1fa2,stroke-width:2px
classDef jsonParser fill:#fff3e0,stroke:#f57c00,stroke-width:2px
classDef structuredParser fill:#e8f5e8,stroke:#388e3c,stroke-width:2px
classDef combiningParser fill:#fce4ec,stroke:#c2185b,stroke-width:2px
classDef cacheOptimization fill:#e0f2f1,stroke:#00695c,stroke-width:2px
classDef coreDependency fill:#f1f8e9,stroke:#827717,stroke-width:2px
classDef realApplication fill:#fff8e1,stroke:#f57c00,stroke-width:2px
class BaseOutputParser abstractClass
class StrOutputParser,CommaSeparatedListOutputParser basicParser
class JsonOutputParser,JsonOutputParserAdvanced,PydanticOutputParser,PydanticOutputParserAdvanced jsonParser
class StructuredOutputParser,ResponseSchema,DatetimeOutputParser,DatetimeOutputParserAdvanced structuredParser
class CombiningOutputParser,SequentialOutputParser,ConditionalOutputParser combiningParser
class OutputParserCache,CachedOutputParser,AsyncOutputParser,AsyncBatchOutputParser cacheOptimization
class BaseModel,ValidationError,PromptValue,CallbackManagerForChainRun coreDependency
class IntelligentDataAnalysisSystem,MultilingualContentParser realApplication
Core Abstract Base Class Analysis
1. BaseOutputParser - the root interface for output parsers
# File: libs/core/langchain_core/output_parsers/base.py (simplified sketch)
from abc import ABC, abstractmethod
from typing import Any, Dict

from langchain_core.prompt_values import PromptValue
from pydantic import BaseModel


class BaseOutputParser(BaseModel, ABC):
    """Abstract base class for all output parsers."""

    @abstractmethod
    def parse(self, text: str) -> Any:
        """Parse the text output. Subclasses must implement this."""

    def parse_with_prompt(self, completion: str, prompt: PromptValue) -> Any:
        """Parse the output with access to the prompt (optional override)."""
        return self.parse(completion)

    def get_format_instructions(self) -> str:
        """Return format instructions for the LLM (optional override)."""
        return ""

    @property
    def _type(self) -> str:
        """Return the parser type, used as a serialization key."""
        return "base"

    def dict(self, **kwargs: Any) -> Dict[str, Any]:
        """Serialize to a dictionary and record the parser type."""
        output_parser_dict = super().dict(**kwargs)
        output_parser_dict["_type"] = self._type
        return output_parser_dict
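# Design analysis
base_design_analysis = '''
1. Minimal interface:
   - Only the core parse() method is abstract
   - Every other method has a default implementation
   - The interface stays small and flexible
2. Extensibility:
   - Custom format instructions
   - Parsing with access to the prompt
   - New parser types are easy to add
3. Type safety:
   - The abstract base class forces subclasses to implement parse()
   - Generic type annotations are supported
   - Runtime type checking
4. Serialization support:
   - Inheriting from BaseModel brings Pydantic's capabilities
   - JSON serialization
   - Easy to configure and store
'''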
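To make the "minimal interface" point concrete, here is a small sketch of a custom parser that overrides only what it needs. `MiniOutputParser` is a hypothetical, dependency-free stand-in for `BaseOutputParser` (no Pydantic, no LangChain), and `YesNoParser` is likewise invented for this example.

```python
from abc import ABC, abstractmethod
from typing import Any


class MiniOutputParser(ABC):
    """Hypothetical stand-in for BaseOutputParser, kept dependency-free."""

    @abstractmethod
    def parse(self, text: str) -> Any:
        ...

    def get_format_instructions(self) -> str:
        return ""  # optional hook with an empty default


class YesNoParser(MiniOutputParser):
    """Maps a yes/no answer to a bool; only parse() is mandatory."""

    def parse(self, text: str) -> bool:
        answer = text.strip().lower().rstrip(".!")
        if answer in ("yes", "y", "true"):
            return True
        if answer in ("no", "n", "false"):
            return False
        raise ValueError(f"expected yes or no, got: {text!r}")

    def get_format_instructions(self) -> str:
        return "Answer with a single word: yes or no."


parser = YesNoParser()
print(parser.parse(" Yes. "))  # True
```

Because `parse()` is abstract, forgetting to implement it fails at instantiation time rather than at the first call.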
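2. Basic output parser implementations
# File: libs/core/langchain_core/output_parsers/string.py (simplified sketch)
class StrOutputParser(BaseOutputParser):
    """The simplest output parser: returns the text as a string."""

    def parse(self, text: str) -> str:
        """Return the text with leading and trailing whitespace removed."""
        # Note: the StrOutputParser shipped in langchain_core returns the text
        # unchanged; the strip() here belongs to this simplified sketch.
        return text.strip()

    @property
    def _type(self) -> str:
        return "str"


# File: libs/core/langchain_core/output_parsers/list.py (simplified sketch)
from typing import List


class CommaSeparatedListOutputParser(BaseOutputParser):
    """Parser for comma-separated lists."""

    def parse(self, text: str) -> List[str]:
        """Split on commas, trim whitespace, and drop empty items."""
        items = [item.strip() for item in text.split(",")]
        return [item for item in items if item]

    def get_format_instructions(self) -> str:
        """Return the format instructions."""
        return "Your answer should be a comma-separated list, for example: item1, item2, item3"

    @property
    def _type(self) -> str:
        return "comma_separated_list"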
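# Characteristics of the basic parsers
basic_parser_features = '''
1. Simple and direct:
   - StrOutputParser: near-zero processing cost
   - List parser: a plain string split
   - Suited to basic formatting needs
2. Robust:
   - Whitespace handling
   - Empty items filtered out
   - High tolerance for sloppy input
3. Fast:
   - Minimal processing overhead
   - No complex computation
   - Memory-efficient
4. Predictable:
   - Simple, well-defined behavior
   - Easy to test
   - Stable results
'''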
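One limit of a plain `split(",")` is that an item which itself contains a comma gets cut in two. A possible workaround, sketched here with the standard `csv` module, is to let quoted items keep their commas. `parse_comma_list` is a hypothetical helper for this example, not something taken from the listing above.

```python
import csv
from typing import List


def parse_comma_list(text: str) -> List[str]:
    """Split on commas, but keep commas that sit inside double quotes."""
    reader = csv.reader([text.strip()], skipinitialspace=True)
    row = next(reader, [])
    # Trim whitespace and drop empty items, as the basic parser does
    return [item.strip() for item in row if item.strip()]


print(parse_comma_list('apple, "pears, sliced", , kiwi'))
# ['apple', 'pears, sliced', 'kiwi']
```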
In-Depth Analysis of the JSON Output Parser
1. A complete JsonOutputParser implementation
# File: libs/core/langchain_core/output_parsers/json.py (simplified sketch)
import json
import re
from typing import Any, Optional, Type

from pydantic import BaseModel


class JsonOutputParser(BaseOutputParser):
    """Parser for JSON-formatted output."""

    # Declared as a field because BaseOutputParser is a Pydantic model
    pydantic_object: Optional[Type[BaseModel]] = None

    def __init__(self, pydantic_object: Optional[Type[BaseModel]] = None, **kwargs: Any):
        """Initialize the JSON parser."""
        super().__init__(pydantic_object=pydantic_object, **kwargs)

    def parse(self, text: str) -> Any:
        """Parse JSON-formatted text output."""
        try:
            # Clean the text and extract the JSON payload
            cleaned_text = self._clean_json_text(text)
            json_data = json.loads(cleaned_text)
            # Validate against the Pydantic model if one was given
            if self.pydantic_object:
                return self.pydantic_object(**json_data)
            return json_data
        except json.JSONDecodeError as e:
            raise ValueError(f"Invalid JSON: {e}\nOriginal text: {text}") from e
        except Exception as e:
            raise ValueError(f"JSON parsing failed: {e}\nOriginal text: {text}") from e

    def _clean_json_text(self, text: str) -> str:
        """Strip Markdown fences and extract the outermost JSON value."""
        text = re.sub(r'```(?:json)?\s*', '', text, flags=re.IGNORECASE).strip()
        # Start at the first opening bracket and end at the last matching
        # closer, so nested objects and arrays stay intact.
        starts = [i for i in (text.find('{'), text.find('[')) if i != -1]
        if not starts:
            return text
        start = min(starts)
        closer = '}' if text[start] == '{' else ']'
        end = text.rfind(closer)
        return text[start:end + 1] if end > start else text

    def get_format_instructions(self) -> str:
        """Return the JSON format instructions."""
        if self.pydantic_object:
            # With a Pydantic model, include its JSON schema
            schema = self.pydantic_object.schema()
            return (
                "Your answer must be valid JSON that conforms to this schema:\n"
                f"{json.dumps(schema, indent=2, ensure_ascii=False)}"
            )
        return "Your answer must be valid JSON"

    @property
    def _type(self) -> str:
        return "json"
# Advanced JSON parser features
class JsonOutputParserAdvanced(JsonOutputParser):
    """JSON output parser that tries to repair malformed input."""

    validation_mode: str = "strict"
    repair_json: bool = True

    def __init__(self, pydantic_object: Optional[Type[BaseModel]] = None,
                 validation_mode: str = "strict",
                 repair_json: bool = True):
        super().__init__(pydantic_object)
        self.validation_mode = validation_mode
        self.repair_json = repair_json

    def parse(self, text: str) -> Any:
        """Parse JSON, falling back to a repair attempt on failure."""
        try:
            return super().parse(text)
        except ValueError:
            if self.repair_json:
                repaired_text = self._repair_json(text)
                if repaired_text:
                    return super().parse(repaired_text)
            # Repair was disabled or failed: re-raise the original error
            raise

    def _repair_json(self, text: str) -> Optional[str]:
        """Apply common heuristic fixes; return None if the result is still invalid."""
        # 1. Replace single quotes with double quotes
        #    (heuristic: breaks on apostrophes inside values)
        text = re.sub(r"'([^']*)'", r'"\1"', text)
        # 2. Quote bare keys, but only directly after '{' or ','
        text = re.sub(r'([{,]\s*)([A-Za-z_]\w*)(\s*:)', r'\1"\2"\3', text)
        # 3. Insert missing commas between adjacent objects or arrays
        text = re.sub(r'}\s*{', '}, {', text)
        text = re.sub(r']\s*\[', '], [', text)
        # 4. If the text does not start with JSON, extract the JSON payload
        if not text.strip().startswith(('{', '[')):
            json_match = re.search(r'(\{.*\}|\[.*\])', text, re.DOTALL)
            if json_match:
                text = json_match.group(1)
        # Verify the repaired JSON
        try:
            json.loads(text)
            return text
        except json.JSONDecodeError:
            return None
# JSON parser usage examples
json_parser_examples = '''
# Basic JSON parsing
json_parser = JsonOutputParser()
result = json_parser.parse('{"name": "John", "age": 30}')
print(result)  # {'name': 'John', 'age': 30}

# JSON wrapped in a Markdown code block
markdown_json = """
```json
{
  "product": "LangChain",
  "features": ["prompts", "chains", "agents"],
  "rating": 4.8
}
```
"""
result = json_parser.parse(markdown_json)
print(result)  # parses successfully

# With a Pydantic model
from typing import List
from pydantic import BaseModel

class Product(BaseModel):
    name: str
    features: List[str]
    rating: float

pydantic_parser = JsonOutputParser(pydantic_object=Product)
result = pydantic_parser.parse('{"name": "LangChain", "features": ["RAG"], "rating": 4.9}')
print(result)  # a validated Product instance
'''
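Regex-based extraction struggles with nested objects and with prose around the payload. An alternative worth knowing is `json.JSONDecoder.raw_decode` from the standard library, which decodes one JSON value starting at a given index and ignores whatever follows. The helper below, `extract_first_json`, is a hypothetical sketch of that approach and is not how the listing above works.

```python
import json
from typing import Any


def extract_first_json(text: str) -> Any:
    """Return the first JSON object or array embedded in text."""
    decoder = json.JSONDecoder()
    for index, char in enumerate(text):
        if char in "{[":
            try:
                value, _end = decoder.raw_decode(text, index)
                return value
            except json.JSONDecodeError:
                continue  # not valid JSON from here; keep scanning
    raise ValueError("no JSON object or array found")


reply = 'Sure! ```json\n{"outer": {"inner": [1, 2]}}\n``` Hope that helps.'
print(extract_first_json(reply))  # {'outer': {'inner': [1, 2]}}
```

Since the decoder tracks nesting itself, no bracket counting or fence stripping is needed.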
2. Pydantic model integration
# File: libs/core/langchain_core/output_parsers/pydantic.py (simplified sketch)
import json
from typing import Any, Type

from pydantic import BaseModel, ValidationError


class PydanticOutputParser(JsonOutputParser):
    """Parser that validates output against a Pydantic model."""

    def __init__(self, pydantic_object: Type[BaseModel]):
        """Initialize the parser; the model is required here."""
        super().__init__(pydantic_object=pydantic_object)

    def parse(self, text: str) -> BaseModel:
        """Parse the JSON payload and validate it against the model."""
        try:
            # Decode the raw JSON here instead of calling super().parse():
            # the parent would already return a model instance and would hide
            # ValidationError behind a generic ValueError.
            json_data = json.loads(self._clean_json_text(text))
            return self.pydantic_object(**json_data)
        except ValidationError as e:
            # Report every failing field with its message
            error_details = []
            for error in e.errors():
                field = " -> ".join(str(loc) for loc in error["loc"])
                error_details.append(f"Field '{field}': {error['msg']}")
            raise ValueError("Pydantic validation failed:\n" + "\n".join(error_details)) from e
        except Exception as e:
            raise ValueError(f"Pydantic output parsing failed: {e}") from e

    def get_format_instructions(self) -> str:
        """Return format instructions derived from the Pydantic model."""
        schema = self.pydantic_object.schema()
        instructions = (
            "Your answer must be valid JSON that conforms to the following Pydantic model:\n"
            f"Model name: {self.pydantic_object.__name__}\n"
            "Fields:\n"
        )
        # One line per field: name, type, description, optional marker
        for field_name, field_info in schema.get("properties", {}).items():
            field_type = field_info.get("type", "unknown")
            field_description = field_info.get("description", "")
            is_required = field_name in schema.get("required", [])
            instructions += f"\n{field_name} ({field_type})"
            if field_description:
                instructions += f": {field_description}"
            if not is_required:
                instructions += " (optional)"
        instructions += "\n\nMake sure every required field has a value and that the data types are correct."
        return instructions

    @property
    def _type(self) -> str:
        return f"pydantic_{self.pydantic_object.__name__}"
# Advanced Pydantic parser features
import json
from typing import Any, Dict, Optional


class PydanticOutputParserAdvanced(PydanticOutputParser):
    """Pydantic output parser with configurable validation modes."""

    validation_mode: str = "strict"  # "strict", "lenient", or "repair"
    field_descriptions: Dict[str, str] = {}

    def __init__(self, pydantic_object: Type[BaseModel],
                 validation_mode: str = "strict",
                 field_descriptions: Optional[Dict[str, str]] = None):
        super().__init__(pydantic_object)
        self.validation_mode = validation_mode
        self.field_descriptions = field_descriptions or {}

    def parse(self, text: str) -> BaseModel:
        """Parse, then apply extra checks according to the validation mode."""
        try:
            result = super().parse(text)
            if self.validation_mode == "strict":
                self._strict_validation(result)
            elif self.validation_mode == "lenient":
                self._lenient_validation(result)
            return result
        except Exception:
            # In repair mode, try to fix the data and validate again
            if self.validation_mode == "repair":
                return self._repair_and_validate(text)
            raise

    def _strict_validation(self, result: BaseModel) -> None:
        """Strict mode: every required field must be non-empty."""
        schema = self.pydantic_object.schema()
        for field_name in schema.get("required", []):
            value = getattr(result, field_name)
            if value is None or (isinstance(value, str) and not value.strip()):
                raise ValueError(f"Required field '{field_name}' must not be empty")

    def _lenient_validation(self, result: BaseModel) -> None:
        """Lenient mode: empty or loosely formatted fields are tolerated."""
        # Only the data types and basic format are checked (by Pydantic itself)

    def _repair_and_validate(self, text: str) -> BaseModel:
        """Repair the data and validate it again."""
        repaired_data = self._repair_json_data(text)
        if repaired_data:
            try:
                return self.pydantic_object(**repaired_data)
            except ValidationError:
                pass
        # If the repair fails, fall back to partial validation.
        # NOTE: _partial_validation is referenced here but is not defined
        # anywhere in this listing.
        return self._partial_validation(text)

    def _repair_json_data(self, text: str) -> Optional[Dict[str, Any]]:
        """Coerce each known field to the type its schema declares."""
        try:
            json_data = json.loads(self._clean_json_text(text))
            schema = self.pydantic_object.schema()
            repaired_data = {}
            for field_name, field_info in schema["properties"].items():
                if field_name in json_data:
                    value = json_data[field_name]
                    repaired_data[field_name] = self._repair_field_value(field_name, value, field_info)
            return repaired_data
        except Exception:
            return None

    def _repair_field_value(self, field_name: str, value: Any, field_info: Dict[str, Any]) -> Any:
        """Coerce one value to its declared JSON-schema type, with safe defaults."""
        field_type = field_info.get("type", "string")
        if field_type == "string":
            return str(value) if value is not None else ""
        elif field_type == "integer":
            try:
                return int(value)
            except (ValueError, TypeError):
                return 0
        elif field_type == "number":
            try:
                return float(value)
            except (ValueError, TypeError):
                return 0.0
        elif field_type == "boolean":
            # bool("false") is True, so strings are compared explicitly
            if isinstance(value, str):
                return value.strip().lower() in ("true", "1", "yes")
            return bool(value)
        elif field_type == "array":
            return list(value) if isinstance(value, (list, tuple)) else []
        else:
            return value
# Pydantic parser usage examples
pydantic_parser_examples = '''
# Define the Pydantic model
from pydantic import BaseModel, Field
from typing import List, Optional

class Product(BaseModel):
    name: str = Field(description="Product name")
    features: List[str] = Field(description="Product features")
    price: float = Field(description="Price")
    in_stock: bool = Field(description="Whether the product is in stock")
    tags: Optional[List[str]] = Field(None, description="Tags")

# Create the parser
parser = PydanticOutputParser(pydantic_object=Product)

# Test parsing
json_text = """
{
  "name": "LangChain Pro",
  "features": ["RAG", "Agents", "Memory"],
  "price": 99.99,
  "in_stock": true,
  "tags": ["AI", "NLP"]
}
"""
result = parser.parse(json_text)
print(result)  # a Product object whose fields have been validated

# Get the format instructions
instructions = parser.get_format_instructions()
print(instructions)  # detailed JSON format requirements
'''
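The field-by-field loop in `get_format_instructions` only depends on the JSON-schema dictionary, so it can be tried out without Pydantic. The sketch below uses a hand-written schema; `describe_schema` and the sample `schema` are hypothetical and exist only for this illustration.

```python
from typing import Any, Dict


def describe_schema(schema: Dict[str, Any]) -> str:
    """Render one line per property: name, type, description, optional marker."""
    required = set(schema.get("required", []))
    lines = []
    for name, info in schema.get("properties", {}).items():
        line = f"{name} ({info.get('type', 'unknown')})"
        if info.get("description"):
            line += f": {info['description']}"
        if name not in required:
            line += " (optional)"
        lines.append(line)
    return "\n".join(lines)


schema = {
    "properties": {
        "name": {"type": "string", "description": "Product name"},
        "tags": {"type": "array"},
    },
    "required": ["name"],
}
print(describe_schema(schema))
```

Keeping the instructions terse like this tends to matter in practice, because the full schema dump can dominate a short prompt.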
Advanced Output Parser Analysis
1. Structured output parser
# File: libs/langchain/langchain/output_parsers/structured.py (simplified sketch)
# This sketch parses a line-based "name: value" format.
import json
import re
from typing import Any, Dict, List

from pydantic import BaseModel, Field


# ResponseSchema is defined first because the parser's field refers to it
class ResponseSchema(BaseModel):
    """Definition of one response field."""

    name: str = Field(description="Field name")
    description: str = Field(description="Field description")
    type: str = Field(default="string", description="Field type")

    class Config:
        schema_extra = {
            "example": {
                "name": "answer",
                "description": "The answer to the question",
                "type": "string",
            }
        }


class StructuredOutputParser(BaseOutputParser):
    """Structured output parser."""

    response_schemas: List[ResponseSchema]

    def __init__(self, response_schemas: List[ResponseSchema]):
        """Initialize the structured parser."""
        super().__init__(response_schemas=response_schemas)

    def parse(self, text: str) -> Dict[str, Any]:
        """Extract every declared field and convert it to its declared type."""
        match = re.search(self._build_parsing_pattern(), text)
        if not match:
            raise ValueError(f"Could not extract structured data from text: {text}")
        result: Dict[str, Any] = {}
        for schema in self.response_schemas:
            field_value: Any = match.group(schema.name).strip()
            # Apply the field's type conversion
            if schema.type == "integer":
                field_value = int(field_value)
            elif schema.type == "number":
                field_value = float(field_value)
            elif schema.type == "boolean":
                field_value = field_value.lower() in ("true", "1", "yes")
            elif schema.type == "array":
                field_value = json.loads(field_value)
            result[schema.name] = field_value
        return result

    def _build_parsing_pattern(self) -> str:
        """Join the per-field patterns: one field per line, in schema order."""
        return r'\n'.join(self._build_field_pattern(s) for s in self.response_schemas)

    def _build_field_pattern(self, schema: ResponseSchema) -> str:
        """Build the regex for one field; the group name is the field name."""
        if schema.type == "integer":
            value = r"-?\d+"
        elif schema.type == "number":
            value = r"-?\d+(?:\.\d+)?"
        elif schema.type == "boolean":
            value = r"(?i:true|false|yes|no|1|0)"
        else:
            # Strings and arrays: everything up to the end of the line.
            # (A trailing non-greedy '.*?' would match the empty string.)
            value = r"[^\n]*"
        return rf"{re.escape(schema.name)}:[ \t]*(?P<{schema.name}>{value})"

    def get_format_instructions(self) -> str:
        """Return the format instructions."""
        instructions = "Your answer must strictly follow this format:\n\n"
        for schema in self.response_schemas:
            instructions += f"{schema.name}: {schema.description}\n"
        return instructions
# Structured parser usage examples
structured_parser_examples = '''
# Define the response schemas
response_schemas = [
    ResponseSchema(name="answer", description="The answer to the question"),
    ResponseSchema(name="confidence", description="Confidence of the answer", type="number"),
    ResponseSchema(name="sources", description="List of information sources", type="array")
]

# Create the parser
parser = StructuredOutputParser(response_schemas=response_schemas)

# Test parsing
structured_text = """
answer: LangChain is a framework for building LLM applications
confidence: 0.95
sources: ["official docs", "GitHub repository", "community tutorials"]
"""
result = parser.parse(structured_text)
print(result)  # a dict with the keys 'answer', 'confidence', and 'sources'

# Get the format instructions
instructions = parser.get_format_instructions()
print(instructions)
'''
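A single combined regex requires the fields to appear in schema order. As a contrast, the sketch below reads `name: value` lines independently and converts each value by its declared type. `CONVERTERS` and `parse_fields` are hypothetical names for this example; the type labels mirror the ones used by `ResponseSchema`.

```python
import json
from typing import Any, Callable, Dict

# Hypothetical converter table keyed by the schema's type label
CONVERTERS: Dict[str, Callable[[str], Any]] = {
    "string": str,
    "integer": int,
    "number": float,
    "boolean": lambda v: v.lower() in ("true", "1", "yes"),
    "array": json.loads,
}


def parse_fields(text: str, field_types: Dict[str, str]) -> Dict[str, Any]:
    """Parse 'name: value' lines; field_types maps name -> type label."""
    result: Dict[str, Any] = {}
    for line in text.splitlines():
        name, sep, value = line.partition(":")
        name = name.strip()
        if sep and name in field_types:
            result[name] = CONVERTERS[field_types[name]](value.strip())
    missing = set(field_types) - set(result)
    if missing:
        raise ValueError(f"missing fields: {sorted(missing)}")
    return result


text = 'confidence: 0.95\nanswer: A framework for LLM apps\nsources: ["docs", "repo"]'
types = {"answer": "string", "confidence": "number", "sources": "array"}
print(parse_fields(text, types))
```

The trade-off is that values must fit on one line, which the regex version in the listing also assumes.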
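2. Datetime output parser
# File: libs/langchain/langchain/output_parsers/datetime.py (simplified sketch)
from datetime import datetime
from typing import Optional

from dateutil import parser as date_parser  # third-party: python-dateutil


class DatetimeOutputParser(BaseOutputParser):
    """Datetime output parser."""

    format: str = "%Y-%m-%d %H:%M:%S"
    timezone: Optional[str] = None

    def __init__(self, format: str = "%Y-%m-%d %H:%M:%S",
                 timezone: Optional[str] = None):
        """Initialize the datetime parser."""
        super().__init__(format=format, timezone=timezone)

    def parse(self, text: str) -> datetime:
        """Parse datetime text."""
        try:
            # dateutil accepts many common formats; self.format is only used
            # in the format instructions, not to constrain parsing
            dt = date_parser.parse(text)
            # Apply the configured time zone
            if self.timezone:
                import pytz  # third-party
                tz = pytz.timezone(self.timezone)
                if dt.tzinfo is None:
                    dt = tz.localize(dt)
                else:
                    dt = dt.astimezone(tz)
            return dt
        except (ValueError, TypeError) as e:
            raise ValueError(f"Could not parse datetime: '{text}'. Error: {e}") from e

    def get_format_instructions(self) -> str:
        """Return the datetime format instructions."""
        if self.format:
            return f"Your answer must be a valid datetime in the format: {self.format}"
        return "Your answer must be a valid datetime; several common formats are supported"

    @property
    def _type(self) -> str:
        return "datetime"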
# Advanced datetime parser
import re
from datetime import datetime, timedelta
from typing import Optional


class DatetimeOutputParserAdvanced(DatetimeOutputParser):
    """Datetime parser that also understands relative and natural-language dates."""

    relative_base: Optional[datetime] = None

    def __init__(self, format: str = "%Y-%m-%d %H:%M:%S",
                 timezone: Optional[str] = None,
                 relative_base: Optional[datetime] = None):
        super().__init__(format, timezone)
        self.relative_base = relative_base or datetime.now()

    def parse(self, text: str) -> datetime:
        """Try absolute parsing first, then relative expressions."""
        try:
            return super().parse(text)
        except ValueError:
            return self._parse_relative_time(text)

    def _parse_relative_time(self, text: str) -> datetime:
        """Parse expressions such as '2 hours ago' or 'in 3 days'."""
        text_lower = text.lower().strip()
        units = ("seconds", "minutes", "hours", "days", "weeks")
        # "<n> <unit> ago": subtract from the base time
        for unit in units:
            match = re.search(rf'(\d+)\s*{unit[:-1]}s?\s*ago', text_lower)
            if match:
                return self.relative_base - timedelta(**{unit: int(match.group(1))})
        # "in <n> <unit>": add to the base time
        for unit in units:
            match = re.search(rf'in\s*(\d+)\s*{unit[:-1]}s?', text_lower)
            if match:
                return self.relative_base + timedelta(**{unit: int(match.group(1))})
        # Otherwise try natural-language dates
        return self._parse_natural_language_date(text)

    def _parse_natural_language_date(self, text: str) -> datetime:
        """Parse words such as 'today' or 'next week'."""
        text_lower = text.lower().strip()
        midnight = self.relative_base.replace(hour=0, minute=0, second=0, microsecond=0)
        natural_patterns = {
            'today': midnight,
            'tomorrow': midnight + timedelta(days=1),
            'yesterday': midnight - timedelta(days=1),
            'now': self.relative_base,
            'next week': midnight + timedelta(weeks=1),
            'last week': midnight - timedelta(weeks=1),
        }
        if text_lower in natural_patterns:
            return natural_patterns[text_lower]
        # Nothing matched: fall back to the base parser, which raises ValueError
        return super().parse(text)
# Datetime parser usage examples
datetime_parser_examples = '''
# Basic datetime parsing
datetime_parser = DatetimeOutputParser()
result = datetime_parser.parse("2024-01-15 14:30:00")
print(result)  # 2024-01-15 14:30:00

# Relative time
advanced_parser = DatetimeOutputParserAdvanced()
result = advanced_parser.parse("2 hours ago")
print(result)  # the current time minus 2 hours

# Natural language
result = advanced_parser.parse("tomorrow")
print(result)  # tomorrow's date at 00:00:00

# Compound expressions
result = advanced_parser.parse("in 3 days and 2 hours")
print(result)  # only the first matching pattern is applied, so "and 2 hours" is ignored
'''
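Matching one pattern at a time means a compound expression such as "in 3 days and 2 hours" is only partly honored. One way to support it, sketched below with the standard library, is to sum every `<n> <unit>` term found in the text. `parse_relative` is a hypothetical helper for this example, and the fixed `base` keeps the result reproducible.

```python
import re
from datetime import datetime, timedelta

_UNIT = re.compile(r"(\d+)\s*(second|minute|hour|day|week)s?")


def parse_relative(text: str, base: datetime) -> datetime:
    """Sum every '<n> <unit>' term; a trailing 'ago' flips the sign."""
    lowered = text.lower().strip()
    terms = _UNIT.findall(lowered)
    if not terms:
        raise ValueError(f"no relative time found in {text!r}")
    delta = timedelta()
    for amount, unit in terms:
        delta += timedelta(**{unit + "s": int(amount)})
    return base - delta if lowered.endswith("ago") else base + delta


base = datetime(2024, 1, 15, 12, 0, 0)
print(parse_relative("in 3 days and 2 hours", base))  # 2024-01-18 14:00:00
print(parse_relative("90 minutes ago", base))         # 2024-01-15 10:30:00
```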
Output Parser Chains and Composition
1. Combining output parsers
# File: libs/langchain/langchain/output_parsers/combining.py (simplified sketch)
from typing import Any, Callable, Dict, List


class CombiningOutputParser(BaseOutputParser):
    """Runs several output parsers over the same text."""

    parsers: List[BaseOutputParser]

    def __init__(self, parsers: List[BaseOutputParser]):
        """Initialize the combining parser."""
        # Make sure the parsers' input variables do not collide
        all_input_variables = []
        for parser in parsers:
            if hasattr(parser, 'input_variables'):
                all_input_variables.extend(parser.input_variables)
        if len(all_input_variables) != len(set(all_input_variables)):
            raise ValueError("Duplicate input variables in the combining parser")
        super().__init__(parsers=parsers)

    def parse(self, text: str) -> Dict[str, Any]:
        """Parse the text with every parser and collect results by index."""
        results = {}
        for i, parser in enumerate(self.parsers):
            try:
                results[f"parser_{i}"] = parser.parse(text)
            except Exception as e:
                # Record the failure and keep going with the other parsers
                results[f"parser_{i}_error"] = str(e)
        return results

    def get_format_instructions(self) -> str:
        """Return the format instructions of all parsers."""
        instructions = []
        for i, parser in enumerate(self.parsers):
            parser_instructions = parser.get_format_instructions()
            if parser_instructions:
                instructions.append(f"Parser {i+1}: {parser_instructions}")
        return "\n\n".join(instructions)


# Sequential output parser
class SequentialOutputParser(BaseOutputParser):
    """Applies several parsers in order, each to the same input text."""

    parsers: List[BaseOutputParser]
    stop_on_error: bool = True

    def __init__(self, parsers: List[BaseOutputParser], stop_on_error: bool = True):
        """Initialize the sequential parser."""
        super().__init__(parsers=parsers, stop_on_error=stop_on_error)

    def parse(self, text: str) -> List[Any]:
        """Apply each parser in order and return the list of results."""
        results = []
        for parser in self.parsers:
            try:
                results.append(parser.parse(text))
            except Exception as e:
                if self.stop_on_error:
                    raise
                # Record the error and continue
                results.append({"error": str(e)})
        return results


# Conditional output parser
class ConditionalOutputParser(BaseOutputParser):
    """Chooses between two parsers based on a predicate over the text."""

    condition_func: Callable[[str], bool]
    true_parser: BaseOutputParser
    false_parser: BaseOutputParser

    def __init__(self, condition_func: Callable[[str], bool],
                 true_parser: BaseOutputParser,
                 false_parser: BaseOutputParser):
        """Initialize the conditional parser."""
        super().__init__(condition_func=condition_func,
                         true_parser=true_parser,
                         false_parser=false_parser)

    def parse(self, text: str) -> Any:
        """Pick the parser according to the condition."""
        if self.condition_func(text):
            return self.true_parser.parse(text)
        return self.false_parser.parse(text)

    def get_format_instructions(self) -> str:
        """Return the format instructions for both branches."""
        true_instructions = self.true_parser.get_format_instructions()
        false_instructions = self.false_parser.get_format_instructions()
        return f"If the condition holds: {true_instructions}\nOtherwise: {false_instructions}"
# Combining parser usage examples
combining_parser_examples = '''
# Combine several parsers
parsers = [
    JsonOutputParser(),
    StrOutputParser(),
    CommaSeparatedListOutputParser()
]
combining_parser = CombiningOutputParser(parsers=parsers)

# Test combined parsing: every parser receives the same full text
text = '{"name": "test", "value": 123}, item1, item2, item3'
result = combining_parser.parse(text)
print(result)
# parser_0: {'name': 'test', 'value': 123} (the JSON object is extracted from the text)
# parser_1: the whole input string
# parser_2: the input split on every comma, including the comma inside the JSON object

# Sequential parser
sequential_parser = SequentialOutputParser(parsers=[
    StrOutputParser(),
    JsonOutputParser()
])
result = sequential_parser.parse('{"test": "value"}')
print(result)  # ['{"test": "value"}', {'test': 'value'}]

# Conditional parser
def is_json(text: str) -> bool:
    return text.strip().startswith('{')

conditional_parser = ConditionalOutputParser(
    condition_func=is_json,
    true_parser=JsonOutputParser(),
    false_parser=StrOutputParser()
)
result1 = conditional_parser.parse('{"key": "value"}')  # uses the JSON parser
result2 = conditional_parser.parse('plain text')  # uses the string parser
'''
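A composition pattern that sits between "run all parsers" and "pick one by condition" is a fallback chain: try each parser in order and return the first success. The sketch below shows the idea with plain functions; `parse_with_fallback` and `chain` are hypothetical names for this example and do not appear in the listings above.

```python
import json
from typing import Any, Callable, List, Sequence, Tuple

ParseFn = Callable[[str], Any]


def parse_with_fallback(text: str, parsers: Sequence[Tuple[str, ParseFn]]) -> Tuple[str, Any]:
    """Try each (name, parser) pair in order; return the first success."""
    errors: List[str] = []
    for name, parse in parsers:
        try:
            return name, parse(text)
        except Exception as exc:  # any failure means "try the next one"
            errors.append(f"{name}: {exc}")
    raise ValueError("all parsers failed:\n" + "\n".join(errors))


chain = [
    ("json", json.loads),
    ("list", lambda t: [p.strip() for p in t.split(",") if p.strip()]),
]
print(parse_with_fallback('{"k": 1}', chain))  # ('json', {'k': 1})
print(parse_with_fallback("a, b", chain))      # ('list', ['a', 'b'])
```

Returning the parser name alongside the value lets the caller know which format the model actually produced.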
Performance Optimization of the Output Formatting System
1. Caching and reuse
# File: libs/langchain-core/output_parsers/cache.py (illustrative sketch)
import time
from typing import Any, Dict, Optional, Tuple


class OutputParserCache:
    """Cache for output parser results."""

    def __init__(self, max_size: int = 1000, ttl: int = 3600):
        self.max_size = max_size
        self.ttl = ttl  # time to live, in seconds
        self.cache: Dict[str, Tuple[Any, float]] = {}  # key -> (result, timestamp)
        self.access_count: Dict[str, int] = {}

    def cache_result(self, key: str, result: Any) -> None:
        """Store a parse result, evicting an entry first if the cache is full."""
        if len(self.cache) >= self.max_size:
            self._evict_least_used()
        self.cache[key] = (result, time.time())
        self.access_count[key] = 0

    def get_cached_result(self, key: str) -> Optional[Any]:
        """Return the cached result, or None if it is missing or expired."""
        if key in self.cache:
            result, timestamp = self.cache[key]
            # Drop the entry if it has expired
            if time.time() - timestamp > self.ttl:
                del self.cache[key]
                del self.access_count[key]
                return None
            self.access_count[key] += 1
            return result
        return None

    def _evict_least_used(self) -> None:
        """Evict the entry with the fewest hits.

        This is least-frequently-used eviction based on the access count,
        not a true least-recently-used (LRU) policy.
        """
        if not self.access_count:
            return
        victim = min(self.access_count, key=lambda k: self.access_count[k])
        del self.cache[victim]
        del self.access_count[victim]


# Output parser with caching
class CachedOutputParser(BaseOutputParser):
    """Decorator that adds caching to another output parser."""

    base_parser: BaseOutputParser
    cache: Any = None  # an OutputParserCache instance

    def __init__(self, base_parser: BaseOutputParser, cache: Optional[OutputParserCache] = None):
        super().__init__(base_parser=base_parser, cache=cache or OutputParserCache())

    def parse(self, text: str) -> Any:
        """Parse with caching."""
        # Build the cache key from the parser type and the text hash
        cache_key = f"{self.base_parser._type}_{hash(text)}"
        # A cached None is indistinguishable from a miss and is re-parsed
        cached_result = self.cache.get_cached_result(cache_key)
        if cached_result is not None:
            return cached_result
        result = self.base_parser.parse(text)
        self.cache.cache_result(cache_key, result)
        return result

    def get_format_instructions(self) -> str:
        """Return the wrapped parser's format instructions."""
        return self.base_parser.get_format_instructions()

    @property
    def _type(self) -> str:
        return f"cached_{self.base_parser._type}"
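Evicting by access count removes the least frequently used entry, which is not the same as least recently used. For comparison, here is a minimal sketch of a true LRU policy with per-entry expiry, built on `collections.OrderedDict`. `LruTtlCache` and its injectable `clock` parameter are hypothetical and are introduced here only to make the behavior easy to test.

```python
import time
from collections import OrderedDict
from typing import Any, Callable, Tuple


class LruTtlCache:
    """Hypothetical cache: true LRU ordering plus per-entry expiry."""

    def __init__(self, max_size: int = 1000, ttl: float = 3600.0,
                 clock: Callable[[], float] = time.monotonic):
        self.max_size = max_size
        self.ttl = ttl
        self.clock = clock
        self._data: "OrderedDict[str, Tuple[Any, float]]" = OrderedDict()

    def get(self, key: str, default: Any = None) -> Any:
        entry = self._data.get(key)
        if entry is None:
            return default
        value, stored_at = entry
        if self.clock() - stored_at > self.ttl:
            del self._data[key]              # expired
            return default
        self._data.move_to_end(key)          # mark as most recently used
        return value

    def put(self, key: str, value: Any) -> None:
        self._data[key] = (value, self.clock())
        self._data.move_to_end(key)
        while len(self._data) > self.max_size:
            self._data.popitem(last=False)   # drop the least recently used


cache = LruTtlCache(max_size=2)
cache.put("a", 1)
cache.put("b", 2)
cache.get("a")               # touch "a" so "b" becomes the eviction candidate
cache.put("c", 3)
print(sorted(cache._data))   # ['a', 'c']
```

The `default` argument also lets callers pass a sentinel, so a legitimately cached `None` can be told apart from a miss.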
2. Asynchronous parsing
# File: libs/langchain-core/output_parsers/async_utils.py (illustrative sketch)
import asyncio
from typing import Any, AsyncIterator, List


class AsyncOutputParser(BaseOutputParser):
    """Base class for asynchronous output parsers."""

    async def aparse(self, text: str) -> Any:
        """Parse text asynchronously."""
        # Default: run the synchronous parse() in the default executor
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self.parse, text)

    async def astream_parse(self, text: str) -> AsyncIterator[Any]:
        """Stream parse results asynchronously."""
        # Basic implementation: parse everything, then yield once
        result = await self.aparse(text)
        yield result


class AsyncBatchOutputParser:
    """Asynchronous batch output parser."""

    def __init__(self, base_parser: BaseOutputParser, max_concurrent: int = 10):
        self.base_parser = base_parser
        self.max_concurrent = max_concurrent
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def parse_batch_async(self, texts: List[str]) -> List[Any]:
        """Parse all texts concurrently; results keep the input order."""
        tasks = [self._parse_single_async(text) for text in texts]
        return await asyncio.gather(*tasks)

    async def _parse_single_async(self, text: str) -> Any:
        """Parse one text, limited by the semaphore."""
        async with self.semaphore:
            if isinstance(self.base_parser, AsyncOutputParser):
                return await self.base_parser.aparse(text)
            # Synchronous parsers run in the default executor
            loop = asyncio.get_running_loop()
            return await loop.run_in_executor(None, self.base_parser.parse, text)

    async def stream_parse_batch_async(self, texts: List[str]) -> AsyncIterator[Any]:
        """Yield results one text at a time, in input order."""
        for text in texts:
            if isinstance(self.base_parser, AsyncOutputParser):
                async for result in self.base_parser.astream_parse(text):
                    yield result
            else:
                yield await self._parse_single_async(text)
# Async usage example
async_usage_example = '''
# Create an async parser
class AsyncJsonOutputParser(JsonOutputParser, AsyncOutputParser):
    """Asynchronous JSON output parser."""

    async def aparse(self, text: str) -> Any:
        """Parse JSON asynchronously."""
        # Simulate an asynchronous operation
        await asyncio.sleep(0.01)
        return super().parse(text)

# Use async batch parsing
async def batch_parse_async():
    parser = AsyncJsonOutputParser()
    batch_parser = AsyncBatchOutputParser(parser, max_concurrent=5)
    texts = [
        '{"name": "test1", "value": 1}',
        '{"name": "test2", "value": 2}',
        '{"name": "test3", "value": 3}'
    ]
    results = await batch_parser.parse_batch_async(texts)
    for i, result in enumerate(results):
        print(f"Result {i+1}: {result}")

# Run the async test
# asyncio.run(batch_parse_async())
'''
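The batch parser above can be reduced to a few lines of standard-library code. The following is a minimal sketch, not LangChain's implementation: `parse_batch` is a hypothetical helper, `json.loads` stands in for any synchronous parser, and `asyncio.to_thread` requires Python 3.9 or later. It also passes `return_exceptions=True`, so one malformed input is reported in place instead of failing the whole batch.

```python
import asyncio
import json
from typing import Any, Callable, List


async def parse_batch(texts: List[str],
                      parse: Callable[[str], Any] = json.loads,
                      max_concurrent: int = 5) -> List[Any]:
    """Parse texts concurrently; a failure is returned in place as an exception object."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def parse_one(text: str) -> Any:
        async with semaphore:
            # Run the synchronous parser in a worker thread
            return await asyncio.to_thread(parse, text)

    # gather() returns results in the same order as the inputs
    return await asyncio.gather(*(parse_one(t) for t in texts),
                                return_exceptions=True)


if __name__ == "__main__":
    outcomes = asyncio.run(parse_batch(['{"value": 1}', "not json", '{"value": 3}']))
    for outcome in outcomes:
        print(repr(outcome))
```

Callers can separate successes from failures with `isinstance(outcome, Exception)`.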
Practical Case Studies
1. Intelligent data analysis system
import json
import re
from datetime import datetime


class IntelligentDataAnalysisSystem:
    """Intelligent data analysis system."""

    def __init__(self):
        self.setup_parsers()

    def setup_parsers(self):
        """Set up the output parsers."""
        # Parser for statistical results
        self.stats_parser = JsonOutputParser()
        # Parser for chart data
        self.chart_data_parser = StructuredOutputParser(
            response_schemas=[
                ResponseSchema(name="chart_type", description="chart type", type="string"),
                ResponseSchema(name="data", description="chart data", type="array"),
                ResponseSchema(name="labels", description="data labels", type="array"),
                ResponseSchema(name="title", description="chart title", type="string")
            ]
        )
        # Parser for extracted insights
        self.insight_parser = JsonOutputParser()
        # Parser for generated reports
        self.report_parser = StructuredOutputParser(
            response_schemas=[
                ResponseSchema(name="executive_summary", description="executive summary", type="string"),
                ResponseSchema(name="key_findings", description="key findings", type="array"),
                ResponseSchema(name="recommendations", description="recommendations", type="array"),
                ResponseSchema(name="technical_details", description="technical details", type="string")
            ]
        )
        # Parser for time series timestamps
        self.time_series_parser = DatetimeOutputParserAdvanced(
            format="%Y-%m-%d %H:%M:%S",
            timezone="UTC"
        )
    def parse_statistical_results(self, raw_output: str) -> dict:
        """Parse statistical analysis results."""
        try:
            result = self.stats_parser.parse(raw_output)
            return {
                "type": "statistics",
                "data": result,
                "parsed_at": datetime.now().isoformat()
            }
        except Exception as e:
            return {
                "type": "error",
                "error": str(e),
                "raw_output": raw_output
            }

    def parse_chart_data(self, raw_output: str) -> dict:
        """Parse chart data."""
        try:
            result = self.chart_data_parser.parse(raw_output)
            return {
                "type": "chart_data",
                "chart_type": result["chart_type"],
                "data": result["data"],
                "labels": result["labels"],
                "title": result["title"],
                "parsed_at": datetime.now().isoformat()
            }
        except Exception as e:
            return {
                "type": "error",
                "error": str(e),
                "raw_output": raw_output
            }

    def parse_insights(self, raw_output: str) -> dict:
        """Parse data insights."""
        try:
            result = self.insight_parser.parse(raw_output)
            return {
                "type": "insights",
                "insights": result.get("insights", []),
                "confidence": result.get("confidence", 0.0),
                "parsed_at": datetime.now().isoformat()
            }
        except Exception as e:
            return {
                "type": "error",
                "error": str(e),
                "raw_output": raw_output
            }
    def parse_time_series_data(self, raw_output: str) -> dict:
        """Parse time series data."""
        try:
            # Expected line format: "时间: 2024-01-15 10:30:00, 数值: 123.45"
            # (时间 = time, 数值 = value)
            lines = raw_output.strip().split('\n')
            time_series_data = []
            for line in lines:
                if '时间:' in line and '数值:' in line:
                    # Extract the time and value fields
                    time_match = re.search(r'时间:\s*([^\n,]+)', line)
                    value_match = re.search(r'数值:\s*([^\n,]+)', line)
                    if time_match and value_match:
                        time_str = time_match.group(1).strip()
                        value_str = value_match.group(1).strip()
                        # Convert both fields; skip the line if either conversion fails
                        try:
                            timestamp = self.time_series_parser.parse(time_str)
                            value = float(value_str)
                            time_series_data.append({
                                "timestamp": timestamp.isoformat(),
                                "value": value
                            })
                        except Exception as e:
                            print(f"Failed to parse time series line: {e}")
                            continue
            return {
                "type": "time_series",
                "data": time_series_data,
                "parsed_at": datetime.now().isoformat()
            }
        except Exception as e:
            return {
                "type": "error",
                "error": str(e),
                "raw_output": raw_output
            }
# Usage example
analysis_system = IntelligentDataAnalysisSystem()

# Exercise each parsing method with a sample LLM output
test_outputs = {
    "statistics": '{"mean": 25.5, "std": 5.2, "count": 1000}',
    "chart_data": "chart_type: bar\ndata: [10, 20, 30, 40, 50]\nlabels: ['A', 'B', 'C', 'D', 'E']\ntitle: 销售数据分析",
    "insights": '{"insights": ["销售呈上升趋势", "季节性波动明显"], "confidence": 0.85}',
    "time_series": "时间: 2024-01-15 10:30:00, 数值: 123.45\n时间: 2024-01-15 11:30:00, 数值: 145.67"
}

for output_type, raw_output in test_outputs.items():
    if output_type == "statistics":
        result = analysis_system.parse_statistical_results(raw_output)
    elif output_type == "chart_data":
        result = analysis_system.parse_chart_data(raw_output)
    elif output_type == "insights":
        result = analysis_system.parse_insights(raw_output)
    elif output_type == "time_series":
        result = analysis_system.parse_time_series_data(raw_output)
    print(f"\n{output_type.upper()} parse result:")
    print(json.dumps(result, indent=2, ensure_ascii=False, default=str))
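The time series step depends on `DatetimeOutputParserAdvanced`, which is defined elsewhere in this series. To try the line-matching logic in isolation, here is a minimal sketch that swaps in `datetime.strptime` from the standard library. The function name `parse_time_series` and the named regex groups are assumptions made for illustration; the line format is the one used in the sample data above.

```python
import re
from datetime import datetime
from typing import Dict, List

# Assumed line format (时间 = time, 数值 = value):
#   "时间: 2024-01-15 10:30:00, 数值: 123.45"
LINE_PATTERN = re.compile(r"时间:\s*(?P<time>[^,\n]+),\s*数值:\s*(?P<value>[^,\n]+)")


def parse_time_series(raw_output: str,
                      fmt: str = "%Y-%m-%d %H:%M:%S") -> List[Dict[str, object]]:
    """Extract (timestamp, value) points, skipping lines that do not match or convert."""
    points: List[Dict[str, object]] = []
    for line in raw_output.strip().splitlines():
        match = LINE_PATTERN.search(line)
        if match is None:
            continue  # not a data line
        try:
            timestamp = datetime.strptime(match.group("time").strip(), fmt)
            value = float(match.group("value").strip())
        except ValueError:
            continue  # malformed timestamp or number
        points.append({"timestamp": timestamp.isoformat(), "value": value})
    return points


if __name__ == "__main__":
    sample = "时间: 2024-01-15 10:30:00, 数值: 123.45\n时间: 2024-01-15 11:30:00, 数值: 145.67"
    for point in parse_time_series(sample):
        print(point)
```

Catching only `ValueError` rather than `Exception` keeps unrelated bugs visible while still skipping bad lines.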
2. Multilingual content parsing system
class MultilingualContentParser:
    """Multilingual content parsing system."""

    def __init__(self):
        self.setup_multilingual_parsers()

    def setup_multilingual_parsers(self):
        """Set up the multilingual parsers."""
        # Parser for language detection
        self.language_detection_parser = JsonOutputParser()
        # Parser for entity extraction
        self.entity_extraction_parser = StructuredOutputParser(
            response_schemas=[
                ResponseSchema(name="entities", description="list of extracted entities", type="array"),
                ResponseSchema(name="entity_types", description="entity types", type="array"),
                ResponseSchema(name="confidence_scores", description="confidence scores", type="array")
            ]
        )
        # Parser for sentiment analysis
        self.sentiment_analysis_parser = JsonOutputParser()
        # Parser for keyword extraction
        self.keyword_extraction_parser = CommaSeparatedListOutputParser()
        # Parser for generated summaries
        self.summary_parser = JsonOutputParser()
    def parse_language_detection(self, raw_output: str) -> dict:
        """Parse language detection results."""
        try:
            result = self.language_detection_parser.parse(raw_output)
            return {
                "type": "language_detection",
                "detected_languages": result.get("languages", []),
                "confidence_scores": result.get("confidence", {}),
                "primary_language": result.get("primary_language", "unknown"),
                "parsed_at": datetime.now().isoformat()
            }
        except Exception as e:
            return {
                "type": "error",
                "error": str(e),
                "raw_output": raw_output
            }

    def parse_entity_extraction(self, raw_output: str) -> dict:
        """Parse entity extraction results."""
        try:
            result = self.entity_extraction_parser.parse(raw_output)
            return {
                "type": "entity_extraction",
                "entities": result["entities"],
                "entity_types": result["entity_types"],
                "confidence_scores": result["confidence_scores"],
                "parsed_at": datetime.now().isoformat()
            }
        except Exception as e:
            return {
                "type": "error",
                "error": str(e),
                "raw_output": raw_output
            }

    def parse_sentiment_analysis(self, raw_output: str) -> dict:
        """Parse sentiment analysis results."""
        try:
            result = self.sentiment_analysis_parser.parse(raw_output)
            return {
                "type": "sentiment_analysis",
                "overall_sentiment": result.get("overall_sentiment", "neutral"),
                "sentiment_scores": result.get("sentiment_scores", {}),
                "language_specific": result.get("language_specific", {}),
                "parsed_at": datetime.now().isoformat()
            }
        except Exception as e:
            return {
                "type": "error",
                "error": str(e),
                "raw_output": raw_output
            }

    def parse_keywords(self, raw_output: str) -> dict:
        """Parse keyword extraction results."""
        try:
            keywords = self.keyword_extraction_parser.parse(raw_output)
            return {
                "type": "keyword_extraction",
                "keywords": keywords,
                "keyword_count": len(keywords),
                "parsed_at": datetime.now().isoformat()
            }
        except Exception as e:
            return {
                "type": "error",
                "error": str(e),
                "raw_output": raw_output
            }

    def parse_summary(self, raw_output: str) -> dict:
        """Parse summary generation results."""
        try:
            result = self.summary_parser.parse(raw_output)
            return {
                "type": "summary",
                "summary": result.get("summary", ""),
                "key_points": result.get("key_points", []),
                "length": result.get("length", 0),
                "language": result.get("language", "unknown"),
                "parsed_at": datetime.now().isoformat()
            }
        except Exception as e:
            return {
                "type": "error",
                "error": str(e),
                "raw_output": raw_output
            }
# Usage example
multilingual_parser = MultilingualContentParser()

# Exercise each multilingual parsing method with a sample LLM output
multilingual_outputs = {
    "language_detection": '{"languages": ["zh", "en"], "confidence": {"zh": 0.8, "en": 0.6}, "primary_language": "zh"}',
    "entity_extraction": "entities: [\"北京\", \"上海\", \"深圳\"]\nentity_types: [\"城市\", \"城市\", \"城市\"]\nconfidence_scores: [0.95, 0.92, 0.88]",
    "sentiment_analysis": '{"overall_sentiment": "positive", "sentiment_scores": {"positive": 0.7, "neutral": 0.2, "negative": 0.1}, "language_specific": {"zh": "positive", "en": "neutral"}}',
    "keywords": "人工智能, 机器学习, 深度学习, 自然语言处理",
    "summary": '{"summary": "这是一篇关于人工智能发展的文章", "key_points": ["AI技术快速发展", "应用场景不断扩大", "面临挑战和机遇"], "length": 150, "language": "zh"}'
}

for output_type, raw_output in multilingual_outputs.items():
    if output_type == "language_detection":
        result = multilingual_parser.parse_language_detection(raw_output)
    elif output_type == "entity_extraction":
        result = multilingual_parser.parse_entity_extraction(raw_output)
    elif output_type == "sentiment_analysis":
        result = multilingual_parser.parse_sentiment_analysis(raw_output)
    elif output_type == "keywords":
        result = multilingual_parser.parse_keywords(raw_output)
    elif output_type == "summary":
        result = multilingual_parser.parse_summary(raw_output)
    print(f"\n{output_type.upper()} multilingual parse result:")
    print(json.dumps(result, indent=2, ensure_ascii=False, default=str))
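Both case studies repeat the same `try`/`except` envelope in every method and select the method with an `if`/`elif` chain, which leaves `result` unset for an unrecognized type. One possible refactoring, sketched here under the assumption that a uniform `"data"` field is acceptable, is a shared wrapper plus a dispatch table. `safe_parse`, `parse_output`, `PARSERS`, and the stand-in `split_keywords` are hypothetical names; `json.loads` takes the place of `JsonOutputParser` so the block runs without LangChain.

```python
import json
from datetime import datetime
from typing import Any, Callable, Dict, List


def safe_parse(result_type: str, parse: Callable[[str], Any],
               raw_output: str) -> Dict[str, Any]:
    """Run `parse` and wrap the outcome in a success or error envelope."""
    try:
        data = parse(raw_output)
    except Exception as exc:
        return {"type": "error", "error": str(exc), "raw_output": raw_output}
    return {"type": result_type, "data": data,
            "parsed_at": datetime.now().isoformat()}


def split_keywords(raw_output: str) -> List[str]:
    """Stand-in for a comma-separated list parser."""
    return [item.strip() for item in raw_output.split(",") if item.strip()]


# Dispatch table: output type -> parsing callable
PARSERS: Dict[str, Callable[[str], Any]] = {
    "sentiment_analysis": json.loads,
    "keywords": split_keywords,
}


def parse_output(output_type: str, raw_output: str) -> Dict[str, Any]:
    """Look up the parser for `output_type`; unknown types yield an error envelope."""
    parser = PARSERS.get(output_type)
    if parser is None:
        return {"type": "error",
                "error": f"unknown output type: {output_type}",
                "raw_output": raw_output}
    return safe_parse(output_type, parser, raw_output)


if __name__ == "__main__":
    print(parse_output("keywords", "人工智能, 机器学习"))
    print(parse_output("sentiment_analysis", "{not valid json"))
```

Adding a new output type then means adding one entry to `PARSERS` rather than a new method and a new branch.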
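Best Practices for Output Formatting Systems
1. Design principles
output_formatting_design_principles = '''
1. Robustness:
- Tolerate malformed output of every kind
- Report detailed error information
- Support degradation and fallback paths
2. Flexibility:
- Support multiple output formats
- Expose configurable parsing options
- Stay easy to extend and customize
3. Performance:
- Minimize parsing overhead
- Support caching and reuse
- Support asynchronous processing
4. Type safety:
- Explicit type annotations
- Runtime type checking
- Static type checking before the code runs
5. Maintainability:
- Modular design
- Clear error handling
- Thorough documentation and tests
'''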
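The robustness principle (degradation and fallback) can be illustrated with a short fallback chain. This is a minimal sketch under stated assumptions, not how any LangChain parser is implemented: `parse_with_fallback` is a hypothetical function that tries strict JSON first, then JSON inside a Markdown code fence, and finally returns the stripped raw text.

```python
import json
import re
from typing import Any

# Build the three-backtick fence marker programmatically
FENCE = "`" * 3
FENCE_PATTERN = re.compile(FENCE + r"(?:json)?\s*(.*?)" + FENCE, re.DOTALL)


def parse_with_fallback(text: str) -> Any:
    """Try strict JSON, then fenced JSON, then fall back to the raw string."""
    # Level 1: the whole text is valid JSON
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        pass
    # Level 2: JSON wrapped in a Markdown code fence
    match = FENCE_PATTERN.search(text)
    if match:
        try:
            return json.loads(match.group(1))
        except json.JSONDecodeError:
            pass
    # Level 3: degrade to plain text instead of raising
    return text.strip()


if __name__ == "__main__":
    fenced = "Here is the result:\n" + FENCE + 'json\n{"score": 0.9}\n' + FENCE
    print(parse_with_fallback('{"score": 0.5}'))
    print(parse_with_fallback(fenced))
    print(parse_with_fallback("  no structure here  "))
```

Whether the last level should return text or raise depends on the caller; returning text favors availability over strictness.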
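2. Performance optimization tips
output_formatting_performance_optimization = '''
1. Parsing:
- Use efficient parsing algorithms
- Avoid parsing the same text twice
- Support incremental parsing
2. Caching:
- Cache parse results
- Cache format instructions
- Use a deliberate eviction policy
3. Asynchronous processing:
- Support async parsing
- Process batches asynchronously
- Support streaming parsing
4. Memory:
- Release large objects promptly
- Use generators
- Avoid memory leaks
5. Error handling:
- Fail fast
- Define error recovery strategies
- Log errors in detail
'''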
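Two of the tips above, caching format instructions and using generators, fit in a few lines of standard-library code. The sketch below is illustrative only: `build_format_instructions` and `iter_parsed` are hypothetical helpers, and the one-JSON-document-per-line input format is an assumption.

```python
import json
from functools import lru_cache
from typing import Any, Iterable, Iterator, Tuple


@lru_cache(maxsize=32)
def build_format_instructions(field_names: Tuple[str, ...]) -> str:
    """Build the prompt snippet once per distinct field tuple, then reuse it."""
    fields = ", ".join(f'"{name}"' for name in field_names)
    return f"Return a JSON object with the keys: {fields}"


def iter_parsed(lines: Iterable[str]) -> Iterator[Any]:
    """Lazily parse one JSON document per line, skipping blank lines.

    Only one parsed object is held at a time, so memory use stays flat
    even when `lines` is a large file or a network stream.
    """
    for line in lines:
        line = line.strip()
        if line:
            yield json.loads(line)


if __name__ == "__main__":
    print(build_format_instructions(("name", "value")))
    for record in iter_parsed(['{"name": "a", "value": 1}', "", '{"name": "b", "value": 2}']):
        print(record)
```

`lru_cache` needs hashable arguments, which is why the field names are passed as a tuple rather than a list.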
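Summary
This chapter walked through the source-level implementation of LangChain's output formatting system:
Core architecture
- Layered abstraction: a hierarchy running from BaseOutputParser down to the concrete parser implementations
- Parser family: from simple string parsing to complex structured-data parsing
- Supported formats: JSON, structured fields, dates and times, enums, and others
Implementation details
- Basic parsers: complete implementations of StrOutputParser, the list parsers, and other basics
- JSON parser in depth: the full pipeline of cleaning, extraction, validation, and repair
- Pydantic integration: type safety, model validation, and error handling
Advanced features
- Structured parsing: regex pattern matching, field type conversion, flexible format support
- Datetime parsing: multiple formats, relative times, natural-language expressions
- Combined and conditional parsing: cooperation between parsers, conditional selection, chained processing
Applied skills
- System design: designing complex output formatting systems
- Performance tuning: tuning parser performance and diagnosing problems
- Extension: extending functionality and improving the architecture on top of the source
Project examples
- Intelligent data analysis system: an end-to-end flow for parsing and processing structured data
- Multilingual content parsing: a parsing system for complex multilingual scenarios
- Performance optimization in practice: caching, async, and batch processing
The output formatting system is the "result translator" of an AI application. Understanding its source lets us turn an LLM's raw text output into structured data that programs can process, which is the basis for building data processing pipelines on top of LLMs. 🎯✨
The next chapter analyzes the source of LangChain's Chains and looks at how to build complex workflows and task-processing systems.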