04. LangChain Formatted Output: Source Code Analysis

Architecture Overview of Formatted Output

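The formatted-output system is LangChain's result processor: it converts the raw text returned by an LLM into structured data that a program can consume directly. We start with the overall architecture: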

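# Formatted-output architecture diagram
output_formatting_architecture = '''
Raw LLM output → Output parser → Format validation → Data conversion → Structured result → Application
      ↓               ↓                 ↓                  ↓                  ↓                ↓
 Text string   Parsing algorithm   Type checking   Object conversion    Python object    Business logic
'''

# Core component relationships
component_relationships = '''
BaseOutputParser (abstract base class)
        ↑
StrOutputParser ←→ JsonOutputParser ←→ PydanticOutputParser
        ↑               ↑                    ↑
 concrete class    concrete class      concrete class
        ↑               ↑                    ↑
PandasOutputParser ←→ DatetimeOutputParser ←→ EnumOutputParser
'''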
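
The flow from raw text to a structured result can be illustrated with a minimal standard-library sketch. The names `Answer` and `run_pipeline` are hypothetical and are not part of LangChain; the sketch only mirrors the three middle stages (parse, validate, convert) under the assumption that the model replies with a JSON object.

```python
import json
from dataclasses import dataclass


@dataclass
class Answer:
    """Hypothetical structured result handed to the application."""
    text: str
    confidence: float


def run_pipeline(raw_output: str) -> Answer:
    # 1. Output parser: turn the raw LLM text into Python data
    data = json.loads(raw_output)
    # 2. Format validation: check that the required keys are present
    missing = {"text", "confidence"} - data.keys()
    if missing:
        raise ValueError(f"missing fields: {sorted(missing)}")
    # 3. Data conversion: coerce values to the expected types
    return Answer(text=str(data["text"]), confidence=float(data["confidence"]))


result = run_pipeline('{"text": "LangChain is a framework", "confidence": "0.9"}')
print(result)
```

Keeping validation separate from conversion means a missing field is reported by name instead of surfacing later as a `KeyError` in business logic.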

Architecture Diagram and Core Component Relationship Diagram

Formatted-Output Architecture Diagram


graph TD
    subgraph "Overall Formatted-Output Architecture"
        LLMRawOutput["Raw LLM Output<br/>Raw Text"]
        OutputParser["Output Parser"]
        FormatValidation["Format Validation"]
        DataTransformation["Data Transformation"]
        StructuredResult["Structured Result"]
        Application["Application"]
    end
    
    subgraph "Processing Flow Layer"
        TextString["Text string"]
        ParsingAlgorithm["Parsing algorithm"]
        TypeChecking["Type checking"]
        ObjectConversion["Object conversion"]
        PythonObject["Python object"]
        BusinessLogic["Business logic"]
    end
    
    subgraph "Parser Type Layer"
        StrOutputParser["StrOutputParser<br/>String parser"]
        JsonOutputParser["JsonOutputParser<br/>JSON parser"]
        PydanticOutputParser["PydanticOutputParser<br/>Pydantic parser"]
        StructuredOutputParser["StructuredOutputParser<br/>Structured parser"]
        DatetimeOutputParser["DatetimeOutputParser<br/>Date/time parser"]
        EnumOutputParser["EnumOutputParser<br/>Enum parser"]
    end
    
    subgraph "Advanced Feature Layer"
        JsonCleaning["JSON cleaning"]
        MarkdownExtraction["Markdown extraction"]
        PydanticValidation["Pydantic validation"]
        RegexPatternMatching["Regex pattern matching"]
        DateTimeParsing["Date/time parsing"]
        TypeConversion["Type conversion"]
    end
    
    subgraph "Composition and Optimization Layer"
        CombiningParser["CombiningParser<br/>Combining parser"]
        SequentialParser["SequentialParser<br/>Sequential parser"]
        ConditionalParser["ConditionalParser<br/>Conditional parser"]
        CachedParser["CachedParser<br/>Cached parser"]
        AsyncParser["AsyncParser<br/>Async parser"]
        BatchParser["BatchParser<br/>Batch parser"]
    end
    
    subgraph "Performance and Monitoring Layer"
        ParserCache["Parser cache"]
        PerformanceMetrics["Performance metrics"]
        ErrorHandling["Error handling"]
        ValidationResults["Validation results"]
        AsyncProcessing["Async processing"]
        BatchProcessing["Batch processing"]
    end
    
    %% Main flow
    LLMRawOutput --> OutputParser
    OutputParser --> FormatValidation
    FormatValidation --> DataTransformation
    DataTransformation --> StructuredResult
    StructuredResult --> Application
    
    %% Processing flow
    TextString --> ParsingAlgorithm
    ParsingAlgorithm --> TypeChecking
    TypeChecking --> ObjectConversion
    ObjectConversion --> PythonObject
    PythonObject --> BusinessLogic
    
    %% Parser type selection
    OutputParser --> StrOutputParser
    OutputParser --> JsonOutputParser
    OutputParser --> PydanticOutputParser
    OutputParser --> StructuredOutputParser
    OutputParser --> DatetimeOutputParser
    OutputParser --> EnumOutputParser
    
    %% Advanced feature handling
    JsonOutputParser --> JsonCleaning
    JsonOutputParser --> MarkdownExtraction
    PydanticOutputParser --> PydanticValidation
    StructuredOutputParser --> RegexPatternMatching
    DatetimeOutputParser --> DateTimeParsing
    EnumOutputParser --> TypeConversion
    
    %% Composition and optimization
    OutputParser --> CombiningParser
    OutputParser --> SequentialParser
    OutputParser --> ConditionalParser
    OutputParser --> CachedParser
    OutputParser --> AsyncParser
    OutputParser --> BatchParser
    
    %% Performance monitoring links
    OutputParser --> ParserCache
    FormatValidation --> PerformanceMetrics
    DataTransformation --> ErrorHandling
    StructuredResult --> ValidationResults
    OutputParser --> AsyncProcessing
    OutputParser --> BatchProcessing
    
    %% Bidirectional relationships
    ParserCache -.-> OutputParser
    PerformanceMetrics -.-> FormatValidation
    ErrorHandling -.-> DataTransformation
    ValidationResults -.-> StructuredResult
    AsyncProcessing -.-> OutputParser
    BatchProcessing -.-> OutputParser
    
    %% Style definitions
    classDef mainFlow fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
    classDef processFlow fill:#f3e5f5,stroke:#7b1fa2,stroke-width:2px
    classDef parserType fill:#fff3e0,stroke:#f57c00,stroke-width:2px
    classDef advancedFeature fill:#e8f5e8,stroke:#388e3c,stroke-width:2px
    classDef compositionLayer fill:#fce4ec,stroke:#c2185b,stroke-width:2px
    classDef performanceLayer fill:#e0f2f1,stroke:#00695c,stroke-width:2px
    
    class LLMRawOutput,OutputParser,FormatValidation,DataTransformation,StructuredResult,Application mainFlow
    class TextString,ParsingAlgorithm,TypeChecking,ObjectConversion,PythonObject,BusinessLogic processFlow
    class StrOutputParser,JsonOutputParser,PydanticOutputParser,StructuredOutputParser,DatetimeOutputParser,EnumOutputParser parserType
    class JsonCleaning,MarkdownExtraction,PydanticValidation,RegexPatternMatching,DateTimeParsing,TypeConversion advancedFeature
    class CombiningParser,SequentialParser,ConditionalParser,CachedParser,AsyncParser,BatchParser compositionLayer
    class ParserCache,PerformanceMetrics,ErrorHandling,ValidationResults,AsyncProcessing,BatchProcessing performanceLayer

Formatted-Output Core Component Relationship Diagram


graph TD
    subgraph "Abstract Base Class Layer"
        BaseOutputParser["BaseOutputParser<br/>Abstract base class"]
    end
    
    subgraph "Basic Parsers"
        StrOutputParser["StrOutputParser<br/>String parser"]
        CommaSeparatedListOutputParser["CommaSeparatedListOutputParser<br/>Comma-separated list parser"]
    end
    
    subgraph "JSON Parser Family"
        JsonOutputParser["JsonOutputParser<br/>JSON parser"]
        JsonOutputParserAdvanced["JsonOutputParserAdvanced<br/>Advanced JSON parser"]
        PydanticOutputParser["PydanticOutputParser<br/>Pydantic parser"]
        PydanticOutputParserAdvanced["PydanticOutputParserAdvanced<br/>Advanced Pydantic parser"]
    end
    
    subgraph "Structured Parsers"
        StructuredOutputParser["StructuredOutputParser<br/>Structured parser"]
        ResponseSchema["ResponseSchema<br/>Response schema"]
        DatetimeOutputParser["DatetimeOutputParser<br/>Date/time parser"]
        DatetimeOutputParserAdvanced["DatetimeOutputParserAdvanced<br/>Advanced date/time parser"]
    end
    
    subgraph "Combining Parsers"
        CombiningOutputParser["CombiningOutputParser<br/>Combining parser"]
        SequentialOutputParser["SequentialOutputParser<br/>Sequential parser"]
        ConditionalOutputParser["ConditionalOutputParser<br/>Conditional parser"]
    end
    
    subgraph "Caching and Optimization"
        OutputParserCache["OutputParserCache<br/>Parser cache"]
        CachedOutputParser["CachedOutputParser<br/>Cached parser"]
        AsyncOutputParser["AsyncOutputParser<br/>Async parser"]
        AsyncBatchOutputParser["AsyncBatchOutputParser<br/>Async batch parser"]
    end
    
    subgraph "Core Dependencies"
        BaseModel["BaseModel<br/>Pydantic model"]
        ValidationError["ValidationError<br/>Validation error"]
        PromptValue["PromptValue<br/>Prompt value"]
        CallbackManagerForChainRun["CallbackManagerForChainRun<br/>Callback manager"]
    end
    
    subgraph "Real-World Application Systems"
        IntelligentDataAnalysisSystem["IntelligentDataAnalysisSystem<br/>Intelligent data analysis system"]
        MultilingualContentParser["MultilingualContentParser<br/>Multilingual content parser"]
    end
    
    %% Inheritance
    BaseOutputParser --> StrOutputParser
    BaseOutputParser --> JsonOutputParser
    BaseOutputParser --> StructuredOutputParser
    BaseOutputParser --> DatetimeOutputParser
    
    %% JSON parser hierarchy
    JsonOutputParser --> JsonOutputParserAdvanced
    JsonOutputParser --> PydanticOutputParser
    PydanticOutputParser --> PydanticOutputParserAdvanced
    
    %% Structured parser hierarchy
    StructuredOutputParser --> ResponseSchema
    DatetimeOutputParser --> DatetimeOutputParserAdvanced
    
    %% Combining parser relationships
    CombiningOutputParser --> JsonOutputParser
    CombiningOutputParser --> StrOutputParser
    SequentialOutputParser --> JsonOutputParser
    ConditionalOutputParser --> JsonOutputParser
    
    %% Caching and optimization relationships
    CachedOutputParser --> OutputParserCache
    CachedOutputParser --> JsonOutputParser
    AsyncBatchOutputParser --> AsyncOutputParser
    AsyncBatchOutputParser --> JsonOutputParser
    
    %% Core dependency relationships
    PydanticOutputParser -.-> BaseModel
    PydanticOutputParser -.-> ValidationError
    JsonOutputParser -.-> PromptValue
    BaseOutputParser -.-> CallbackManagerForChainRun
    
    %% Real-world application relationships
    IntelligentDataAnalysisSystem --> JsonOutputParser
    IntelligentDataAnalysisSystem --> StructuredOutputParser
    MultilingualContentParser --> JsonOutputParser
    MultilingualContentParser --> StructuredOutputParser
    
    %% Style definitions
    classDef abstractClass fill:#e1f5fe,stroke:#01579b,stroke-width:2px
    classDef basicParser fill:#f3e5f5,stroke:#7b1fa2,stroke-width:2px
    classDef jsonParser fill:#fff3e0,stroke:#f57c00,stroke-width:2px
    classDef structuredParser fill:#e8f5e8,stroke:#388e3c,stroke-width:2px
    classDef combiningParser fill:#fce4ec,stroke:#c2185b,stroke-width:2px
    classDef cacheOptimization fill:#e0f2f1,stroke:#00695c,stroke-width:2px
    classDef coreDependency fill:#f1f8e9,stroke:#827717,stroke-width:2px
    classDef realApplication fill:#fff8e1,stroke:#f57c00,stroke-width:2px
    
    class BaseOutputParser abstractClass
    class StrOutputParser,CommaSeparatedListOutputParser basicParser
    class JsonOutputParser,JsonOutputParserAdvanced,PydanticOutputParser,PydanticOutputParserAdvanced jsonParser
    class StructuredOutputParser,ResponseSchema,DatetimeOutputParser,DatetimeOutputParserAdvanced structuredParser
    class CombiningOutputParser,SequentialOutputParser,ConditionalOutputParser combiningParser
    class OutputParserCache,CachedOutputParser,AsyncOutputParser,AsyncBatchOutputParser cacheOptimization
    class BaseModel,ValidationError,PromptValue,CallbackManagerForChainRun coreDependency
    class IntelligentDataAnalysisSystem,MultilingualContentParser realApplication

Analysis of the Core Abstract Base Class

1. BaseOutputParser: the root interface for output parsers

# File: libs/langchain-core/output_parsers/base.py

from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Type, Union

from langchain_core.prompt_values import PromptValue  # needed by parse_with_prompt
from pydantic import BaseModel, Field

class BaseOutputParser(BaseModel, ABC):
    """Abstract base class for all output parsers."""
    
    @abstractmethod
    def parse(self, text: str) -> Any:
        """Parse the text output. Subclasses must implement this."""
    
    def parse_with_prompt(self, completion: str, prompt: PromptValue) -> Any:
        """Parse the output with access to the prompt (optional override)."""
        return self.parse(completion)
    
    def get_format_instructions(self) -> str:
        """Return format instructions for the LLM (optional override)."""
        return ""
    
    @property
    def _type(self) -> str:
        """Return the parser type used in serialization."""
        return "base"
    
    def dict(self, **kwargs: Any) -> Dict[str, Any]:
        """Serialize to a dict, adding the parser type under "_type"."""
        output_parser_dict = super().dict(**kwargs)
        output_parser_dict["_type"] = self._type
        return output_parser_dict

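# Design analysis
base_design_analysis = '''
1. Minimal interface:
   - Only the core parse() method is abstract
   - Every other method has a default implementation
   - The interface stays small and flexible

2. Extensibility:
   - Custom format instructions are supported
   - Parsing can take the prompt into account
   - New parser types are easy to add

3. Type safety:
   - The abstract base class forces subclasses to implement parse()
   - Generic type annotations are supported
   - Types are checked at runtime

4. Serialization support:
   - Inheriting from BaseModel brings Pydantic's capabilities
   - JSON serialization is supported
   - Parsers are easy to configure and store
'''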
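
To make the "minimal interface" point concrete, here is a small sketch that uses only the standard library. `MiniOutputParser` and `YesNoParser` are hypothetical names introduced for illustration; they imitate the shape of the base class above (one abstract method, optional hooks with defaults) without depending on LangChain or Pydantic.

```python
from abc import ABC, abstractmethod
from typing import Any


class MiniOutputParser(ABC):
    """Hypothetical stand-in for the base class: one required method."""

    @abstractmethod
    def parse(self, text: str) -> Any:
        ...

    def get_format_instructions(self) -> str:
        # Optional hook with a default, so subclasses may ignore it
        return ""


class YesNoParser(MiniOutputParser):
    """Maps a yes/no style answer to a bool."""

    def parse(self, text: str) -> bool:
        answer = text.strip().lower()
        if answer in ("yes", "y", "true"):
            return True
        if answer in ("no", "n", "false"):
            return False
        raise ValueError(f"expected yes or no, got: {text!r}")

    def get_format_instructions(self) -> str:
        return "Answer with 'yes' or 'no'."


parser = YesNoParser()
print(parser.parse(" Yes "))

try:
    MiniOutputParser()  # abstract class: instantiation raises TypeError
except TypeError as exc:
    print("TypeError:", exc)
```

A new parser type needs only `parse()`; the format-instruction hook is there when the prompt should tell the model what shape to produce.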

2. Basic Output Parser Implementations

# File: libs/langchain-core/output_parsers/string.py

class StrOutputParser(BaseOutputParser):
    """The simplest parser: returns the output as a string."""
    
    def parse(self, text: str) -> str:
        """Return the text as-is, with leading and trailing whitespace removed."""
        return text.strip()
    
    @property
    def _type(self) -> str:
        return "str"

# File: libs/langchain-core/output_parsers/list.py

class CommaSeparatedListOutputParser(BaseOutputParser):
    """Parses a comma-separated list."""
    
    def parse(self, text: str) -> List[str]:
        """Split the text on commas and return the non-empty items."""
        # Split on commas and strip whitespace around each item
        items = [item.strip() for item in text.split(",")]
        
        # Drop empty items
        return [item for item in items if item]
    
    def get_format_instructions(self) -> str:
        """Return the format instructions."""
        return "Your answer should be a comma-separated list, for example: item1, item2, item3"
    
    @property
    def _type(self) -> str:
        return "comma_separated_list"

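# Characteristics of the basic parsers
basic_parser_features = '''
1. Simple and direct:
   - StrOutputParser: near-zero processing cost
   - List parser: a plain string split
   - A good fit for basic formatting needs

2. Robust:
   - Whitespace is handled
   - Empty items are filtered out
   - High tolerance for malformed input

3. Fast:
   - Minimal processing overhead
   - No complex computation
   - Memory-efficient

4. Predictable:
   - Behavior is simple and explicit
   - Easy to test and verify
   - Results are stable
'''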
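
One limit of the plain `split(",")` strategy is worth seeing in code: an item that itself contains a comma gets cut in two. The sketch below compares the naive split with a quote-aware variant built on the standard `csv` module. Both function names are hypothetical, and the quote-aware variant is an alternative technique for illustration, not what the parser above does.

```python
import csv
from typing import List


def split_naive(text: str) -> List[str]:
    # Same strategy as the list parser above: split, strip, drop empties
    return [item.strip() for item in text.split(",") if item.strip()]


def split_quoted(text: str) -> List[str]:
    # csv.reader respects double quotes, so a quoted comma stays inside one item
    row = next(csv.reader([text], skipinitialspace=True), [])
    return [item.strip() for item in row if item.strip()]


sample = 'red, "green, light", blue,, '
print(split_naive(sample))   # the quoted item is split at its inner comma
print(split_quoted(sample))  # the quoted item survives as one entry
```

The naive version remains the cheaper choice when the prompt guarantees that items never contain commas.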

In-Depth Analysis of the JSON Output Parser

1. Complete JsonOutputParser Implementation

# File: libs/langchain-core/output_parsers/json.py

import json
import re
from typing import Any, Dict, List, Optional, Type

class JsonOutputParser(BaseOutputParser):
    """Output parser for JSON-formatted text."""
    
    # Declared as a Pydantic field; assigning an undeclared attribute on a
    # BaseModel subclass would raise an error
    pydantic_object: Optional[Type[BaseModel]] = None
    
    def __init__(self, pydantic_object: Optional[Type[BaseModel]] = None, **kwargs: Any):
        """Initialize the JSON parser."""
        super().__init__(pydantic_object=pydantic_object, **kwargs)
    
    def parse(self, text: str) -> Any:
        """Parse JSON-formatted text output."""
        try:
            # Clean the text and extract the JSON payload
            cleaned_text = self._clean_json_text(text)
            
            # Decode the JSON
            json_data = json.loads(cleaned_text)
            
            # If a Pydantic model was supplied, validate against it
            if self.pydantic_object:
                return self.pydantic_object(**json_data)
            
            return json_data
            
        except json.JSONDecodeError as e:
            raise ValueError(f"Invalid JSON: {e}\nOriginal text: {text}") from e
        except Exception as e:
            raise ValueError(f"JSON parsing failed: {e}\nOriginal text: {text}") from e
    
    def _clean_json_text(self, text: str) -> str:
        """Strip Markdown fences and extract the outermost JSON value."""
        # Remove Markdown code-fence markers, with or without the "json" tag
        text = re.sub(r'```(?:json)?\s*', '', text, flags=re.IGNORECASE)
        
        # Greedy match from the first opening bracket to the last matching
        # closing bracket, so nested objects and arrays are kept whole
        match = re.search(r'\{.*\}|\[.*\]', text, re.DOTALL)
        if match:
            return match.group(0)
        
        # Nothing found: return the cleaned text
        return text.strip()
    
    def get_format_instructions(self) -> str:
        """Return the JSON format instructions."""
        if self.pydantic_object:
            # With a Pydantic model, include its JSON schema
            # (.schema() is the Pydantic v1 API; v2 names it model_json_schema())
            schema = self.pydantic_object.schema()
            return f"Your answer must be valid JSON that conforms to the following schema:\n{json.dumps(schema, indent=2, ensure_ascii=False)}"
        else:
            return "Your answer must be valid JSON"
    
    @property
    def _type(self) -> str:
        return "json"

# Advanced JSON parser features
class JsonOutputParserAdvanced(JsonOutputParser):
    """JSON output parser with best-effort repair."""
    
    # Declared as Pydantic fields so they can be assigned in __init__
    validation_mode: str = "strict"
    repair_json: bool = True
    
    def __init__(self, pydantic_object: Optional[Type[BaseModel]] = None, 
                 validation_mode: str = "strict",
                 repair_json: bool = True):
        super().__init__(pydantic_object)
        self.validation_mode = validation_mode
        self.repair_json = repair_json
    
    def parse(self, text: str) -> Any:
        """Parse JSON, attempting a repair if the first pass fails."""
        try:
            # Try a direct parse first
            return super().parse(text)
        except ValueError:
            if self.repair_json:
                # Try to repair the JSON
                repaired_text = self._repair_json(text)
                if repaired_text:
                    return super().parse(repaired_text)
            
            # Repair failed: re-raise the original error
            raise
    
    def _repair_json(self, text: str) -> Optional[str]:
        """Try to repair malformed JSON with a few heuristics."""
        # 1. Replace single-quoted strings with double-quoted ones
        #    (heuristic: breaks on apostrophes inside values)
        text = re.sub(r"'([^']*)'", r'"\1"', text)
        
        # 2. Quote bare keys. Anchoring on "{" or "," skips keys that are
        #    already quoted, which a plain (\w+): pattern would quote twice
        text = re.sub(r'([{,]\s*)([A-Za-z_]\w*)\s*:', r'\1"\2":', text)
        
        # 3. Insert missing commas
        text = re.sub(r'}\s*{', r'}, {', text)  # between adjacent objects
        text = re.sub(r']\s*\[', r'], [', text)  # between adjacent arrays
        
        # 4. Extract the JSON payload if other text surrounds it
        if not text.strip().startswith(('{', '[')):
            json_match = re.search(r'(\{.*\}|\[.*\])', text, re.DOTALL)
            if json_match:
                text = json_match.group(1)
        
        # Verify the repaired JSON
        try:
            json.loads(text)
            return text
        except json.JSONDecodeError:
            return None

# JSON parser usage examples
json_parser_examples = '''
# Basic JSON parsing
json_parser = JsonOutputParser()
result = json_parser.parse('{"name": "John", "age": 30}')
print(result)  # {'name': 'John', 'age': 30}

# JSON wrapped in a Markdown code block
markdown_json = """
    ```json
    {
        "product": "LangChain",
        "features": ["prompts", "chains", "agents"],
        "rating": 4.8
    }
    ```
"""
result = json_parser.parse(markdown_json)
print(result)  # parses successfully

# Using a Pydantic model
from typing import List
from pydantic import BaseModel

class Product(BaseModel):
    name: str
    features: List[str]
    rating: float

pydantic_parser = JsonOutputParser(pydantic_object=Product)
result = pydantic_parser.parse('{"name": "LangChain", "features": ["RAG"], "rating": 4.9}')
print(result)  # Product(name='LangChain', features=['RAG'], rating=4.9)
'''
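
Regex-based extraction has trouble with nested braces and with stray braces in the surrounding prose. An alternative is to let the JSON decoder find the end of the value itself. The sketch below uses `json.JSONDecoder.raw_decode` from the standard library; the helper name `extract_first_json` is hypothetical and is not a LangChain function.

```python
import json
from typing import Any


def extract_first_json(text: str) -> Any:
    """Return the first JSON object or array embedded in text."""
    decoder = json.JSONDecoder()
    for index, char in enumerate(text):
        if char in "{[":
            try:
                # raw_decode parses one JSON value starting at index and
                # ignores whatever follows it
                value, _end = decoder.raw_decode(text, index)
                return value
            except json.JSONDecodeError:
                continue  # not valid JSON from here; try the next bracket
    raise ValueError("no JSON object or array found")


reply = 'Sure! Here it is: {"name": "LangChain", "meta": {"tags": ["a", "b"]}} Hope that helps {smile}.'
print(extract_first_json(reply))
```

Because the decoder tracks nesting and string escapes, the trailing `{smile}` never becomes part of the payload, which a greedy `\{.*\}` match would include.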

2. Pydantic Model Integration

# File: libs/langchain-core/output_parsers/pydantic.py

import json
from typing import Any, Dict, Optional, Type

from pydantic import BaseModel, ValidationError

class PydanticOutputParser(JsonOutputParser):
    """Output parser that validates the result against a Pydantic model."""
    
    def __init__(self, pydantic_object: Type[BaseModel]):
        """Initialize the Pydantic parser."""
        super().__init__(pydantic_object=pydantic_object)
    
    def parse(self, text: str) -> BaseModel:
        """Parse the text and validate it against the Pydantic model."""
        try:
            # Decode the JSON here instead of calling super().parse(): the
            # parent would already instantiate the model and wrap any
            # ValidationError in a plain ValueError
            json_data = json.loads(self._clean_json_text(text))
            
            # Validate with the Pydantic model
            return self.pydantic_object(**json_data)
            
        except ValidationError as e:
            # Report each validation error with its field path
            error_details = []
            for error in e.errors():
                field = " -> ".join(str(loc) for loc in error["loc"])
                error_details.append(f"Field '{field}': {error['msg']}")
            
            raise ValueError("Pydantic validation failed:\n" + "\n".join(error_details)) from e
        
        except Exception as e:
            raise ValueError(f"Pydantic output parsing failed: {e}") from e
    
    def get_format_instructions(self) -> str:
        """Return format instructions derived from the Pydantic model."""
        schema = self.pydantic_object.schema()
        
        # Build the instructions line by line to avoid stray indentation
        instructions = (
            "Your answer must be valid JSON that conforms to the following Pydantic model schema:\n\n"
            f"Model name: {self.pydantic_object.__name__}\n\n"
            "Fields:"
        )
        
        # Add one line per field
        if "properties" in schema:
            for field_name, field_info in schema["properties"].items():
                field_type = field_info.get("type", "unknown")
                field_description = field_info.get("description", "")
                is_required = field_name in schema.get("required", [])
                
                instructions += f"\n{field_name} ({field_type})"
                if field_description:
                    instructions += f": {field_description}"
                if not is_required:
                    instructions += " (optional)"
        
        instructions += "\n\nMake sure every required field has a value and every value has the correct type."
        
        return instructions
    
    @property
    def _type(self) -> str:
        return f"pydantic_{self.pydantic_object.__name__}"

# Advanced Pydantic parser features
class PydanticOutputParserAdvanced(PydanticOutputParser):
    """Pydantic output parser with validation modes and best-effort repair."""
    
    # Declared as Pydantic fields so they can be assigned in __init__
    validation_mode: str = "strict"
    field_descriptions: Dict[str, str] = {}
    
    def __init__(self, pydantic_object: Type[BaseModel], 
                 validation_mode: str = "strict",
                 field_descriptions: Optional[Dict[str, str]] = None):
        super().__init__(pydantic_object)
        self.validation_mode = validation_mode
        self.field_descriptions = field_descriptions or {}
    
    def parse(self, text: str) -> BaseModel:
        """Parse with extra validation that depends on validation_mode."""
        try:
            # Basic parse
            result = super().parse(text)
            
            # Extra validation according to the mode
            if self.validation_mode == "strict":
                self._strict_validation(result)
            elif self.validation_mode == "lenient":
                self._lenient_validation(result)
            
            return result
            
        except Exception:
            # In "repair" mode, try to repair and validate again
            if self.validation_mode == "repair":
                return self._repair_and_validate(text)
            
            raise
    
    def _strict_validation(self, result: BaseModel) -> None:
        """Strict mode: required fields must not be empty."""
        schema = self.pydantic_object.schema()
        for field_name in schema.get("required", []):
            value = getattr(result, field_name)
            if value is None or (isinstance(value, str) and not value.strip()):
                raise ValueError(f"Required field '{field_name}' must not be empty")
    
    def _lenient_validation(self, result: BaseModel) -> None:
        """Lenient mode: placeholder, no extra checks beyond Pydantic's own."""
        # Empty or loosely formatted fields are allowed; only the data types
        # and basic format enforced by the model itself are checked
        pass
    
    def _repair_and_validate(self, text: str) -> BaseModel:
        """Repair the data, then validate again."""
        repaired_data = self._repair_json_data(text)
        
        if repaired_data:
            try:
                return self.pydantic_object(**repaired_data)
            except ValidationError:
                pass
        
        # The original fell back to an undefined _partial_validation();
        # fail explicitly instead
        raise ValueError(f"Could not repair output for {self.pydantic_object.__name__}: {text}")
    
    def _repair_json_data(self, text: str) -> Optional[Dict[str, Any]]:
        """Coerce each known field to the type declared in the schema."""
        try:
            json_data = json.loads(self._clean_json_text(text))
            
            # Get the model schema
            schema = self.pydantic_object.schema()
            
            # Repair each field that is present
            repaired_data = {}
            for field_name, field_info in schema["properties"].items():
                if field_name in json_data:
                    value = json_data[field_name]
                    repaired_value = self._repair_field_value(field_name, value, field_info)
                    repaired_data[field_name] = repaired_value
            
            return repaired_data
            
        except Exception:
            return None
    
    def _repair_field_value(self, field_name: str, value: Any, field_info: Dict[str, Any]) -> Any:
        """Coerce one value to its schema type, with a default on failure."""
        field_type = field_info.get("type", "string")
        
        if field_type == "string":
            return str(value) if value is not None else ""
        elif field_type == "integer":
            try:
                return int(value)
            except (ValueError, TypeError):
                return 0
        elif field_type == "number":
            try:
                return float(value)
            except (ValueError, TypeError):
                return 0.0
        elif field_type == "boolean":
            # bool("false") is True, so map common string spellings explicitly
            if isinstance(value, str):
                return value.strip().lower() in ("true", "1", "yes")
            return bool(value)
        elif field_type == "array":
            return list(value) if isinstance(value, (list, tuple)) else []
        else:
            return value

# Pydantic parser usage examples
pydantic_parser_examples = '''
# Define the Pydantic model
from pydantic import BaseModel, Field
from typing import List, Optional

class Product(BaseModel):
    name: str = Field(description="Product name")
    features: List[str] = Field(description="Product features")
    price: float = Field(description="Price")
    in_stock: bool = Field(description="Whether the product is in stock")
    tags: Optional[List[str]] = Field(None, description="Tags")

# Create the parser
parser = PydanticOutputParser(pydantic_object=Product)

# Test parsing
json_text = """
{
    "name": "LangChain Pro",
    "features": ["RAG", "Agents", "Memory"],
    "price": 99.99,
    "in_stock": true,
    "tags": ["AI", "NLP"]
}
"""

result = parser.parse(json_text)
print(result)  # a Product object with validated fields

# Get the format instructions
instructions = parser.get_format_instructions()
print(instructions)  # detailed JSON format requirements
'''
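
The repair step relies on coercing each value to the type named in the model's JSON schema. A standard-library sketch of that idea follows; `coerce` and `coerce_record` are hypothetical helpers, and unlike the repair code above they raise on bad input instead of substituting a default, which is a deliberate design difference.

```python
from typing import Any, Dict

TRUE_STRINGS = {"true", "1", "yes"}
FALSE_STRINGS = {"false", "0", "no"}


def coerce(value: Any, json_type: str) -> Any:
    """Coerce value to a JSON Schema primitive type; raise if impossible."""
    if json_type == "string":
        return str(value)
    if json_type == "integer":
        return int(value)
    if json_type == "number":
        return float(value)
    if json_type == "boolean":
        if isinstance(value, bool):
            return value
        lowered = str(value).strip().lower()
        if lowered in TRUE_STRINGS:
            return True
        if lowered in FALSE_STRINGS:
            return False
        raise ValueError(f"not a boolean: {value!r}")
    return value  # arrays, objects and unknown types pass through unchanged


def coerce_record(data: Dict[str, Any], properties: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
    # Only fields named in the schema are kept; unknown keys are dropped
    return {
        name: coerce(data[name], info.get("type", "string"))
        for name, info in properties.items()
        if name in data
    }


properties = {"price": {"type": "number"}, "in_stock": {"type": "boolean"}, "name": {"type": "string"}}
raw = {"price": "99.99", "in_stock": "false", "name": "LangChain Pro", "extra": 1}
print(coerce_record(raw, properties))
```

Silent defaults such as `0` or `""` keep a pipeline running but can hide a bad model response; raising makes the failure visible to a retry layer.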

Analysis of Advanced Output Parsers

1. Structured Output Parser

# File: libs/langchain-core/output_parsers/structured.py

import json
import re

# ResponseSchema definition (declared first because the parser's
# annotations refer to it)
class ResponseSchema(BaseModel):
    """Describes one field of the expected response."""
    
    name: str = Field(description="Field name")
    description: str = Field(description="Field description")
    type: str = Field(default="string", description="Field type")
    
    class Config:
        schema_extra = {
            "example": {
                "name": "answer",
                "description": "The answer to the question",
                "type": "string"
            }
        }

class StructuredOutputParser(BaseOutputParser):
    """Structured output parser for line-based "name: value" text."""
    
    # Declared as a Pydantic field so it can be set on the model
    response_schemas: List[ResponseSchema]
    
    def __init__(self, response_schemas: List[ResponseSchema]):
        """Initialize the structured parser."""
        super().__init__(response_schemas=response_schemas)
    
    def parse(self, text: str) -> Dict[str, Any]:
        """Parse the structured output into a dict."""
        # Build the regular expression and extract the fields
        pattern = self._build_parsing_pattern()
        match = re.search(pattern, text)
        
        if not match:
            raise ValueError(f"Could not extract structured data from the text. Text: {text}")
        
        result = {}
        for schema in self.response_schemas:
            field_value = match.group(schema.name)
            
            # Convert the captured string to the declared type
            if schema.type == "integer":
                field_value = int(field_value)
            elif schema.type == "number":
                field_value = float(field_value)
            elif schema.type == "boolean":
                field_value = field_value.lower() in ("true", "1", "yes")
            elif schema.type == "array":
                field_value = json.loads(field_value)
            
            result[schema.name] = field_value
        
        return result
    
    def _build_parsing_pattern(self) -> str:
        """Build one pattern that expects the fields on consecutive lines."""
        patterns = [self._build_field_pattern(schema) for schema in self.response_schemas]
        
        # Fields must appear in schema order, one per line
        return r'\s*\n'.join(patterns)
    
    def _build_field_pattern(self, schema: ResponseSchema) -> str:
        """Build the pattern for one field; the name must be a valid group name."""
        label = re.escape(schema.name)
        
        # Choose the value pattern by field type. The default captures the
        # rest of the line; a lazy ".*?" would capture nothing for the last field
        if schema.type == "integer":
            value = r"-?\d+"
        elif schema.type == "number":
            value = r"-?\d+\.?\d*"
        elif schema.type == "boolean":
            value = r"(?i:true|false|yes|no|1|0)"
        else:
            value = r"[^\n]*"
        
        return rf"{label}:\s*(?P<{schema.name}>{value})"
    
    def get_format_instructions(self) -> str:
        """Return the format instructions."""
        instructions = "Your answer must strictly follow this format:\n\n"
        
        for schema in self.response_schemas:
            instructions += f"{schema.name}: {schema.description}\n"
        
        return instructions

# Structured parser usage examples
structured_parser_examples = '''
# Define the response schemas
response_schemas = [
    ResponseSchema(name="answer", description="The answer to the question"),
    ResponseSchema(name="confidence", description="Confidence in the answer", type="number"),
    ResponseSchema(name="sources", description="List of information sources", type="array")
]

# Create the parser
parser = StructuredOutputParser(response_schemas=response_schemas)

# Test parsing
structured_text = """
answer: LangChain是一个用于构建LLM应用的框架
confidence: 0.95
sources: ["官方文档", "GitHub仓库", "社区教程"]
"""

result = parser.parse(structured_text)
print(result)  # {'answer': 'LangChain是一个用于构建LLM应用的框架', 'confidence': 0.95, 'sources': ["官方文档", "GitHub仓库", "社区教程"]}

# Get the format instructions
instructions = parser.get_format_instructions()
print(instructions)
'''
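
A single joined pattern requires every field to appear in schema order on consecutive lines. When the model adds an extra line or reorders fields, a line-by-line scan is more forgiving. The sketch below shows that alternative with the standard library only; `LINE_PATTERN` and `parse_fields` are hypothetical names, and values are returned as strings without type conversion.

```python
import re
from typing import Dict, List

# One "name: value" pair per line; [ \t] keeps the match on a single line
LINE_PATTERN = re.compile(r"^[ \t]*(?P<key>\w+)[ \t]*:[ \t]*(?P<value>.*?)[ \t]*$", re.MULTILINE)


def parse_fields(text: str, expected: List[str]) -> Dict[str, str]:
    """Collect the expected fields from text, ignoring order and extra lines."""
    found = {m.group("key"): m.group("value") for m in LINE_PATTERN.finditer(text)}
    missing = [name for name in expected if name not in found]
    if missing:
        raise ValueError(f"missing fields: {missing}")
    return {name: found[name] for name in expected}


text = "confidence: 0.95\nnote: this line is not in the schema\nanswer: LangChain is a framework\n"
print(parse_fields(text, ["answer", "confidence"]))
```

The returned dict follows the order of `expected`, so downstream code sees a stable layout even when the model's line order varies.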

2. Date/Time Output Parser

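# File: libs/langchain-core/output_parsers/datetime.py

from datetime import datetime
import dateutil.parser as parser

class DatetimeOutputParser(BaseOutputParser):
    """Date/time output parser."""
    
    # Declared as Pydantic fields so they can be set on the model
    format: str = "%Y-%m-%d %H:%M:%S"
    timezone: Optional[str] = None
    
    def __init__(self, format: str = "%Y-%m-%d %H:%M:%S", 
                 timezone: Optional[str] = None, **kwargs: Any):
        """Initialize the date/time parser."""
        super().__init__(format=format, timezone=timezone, **kwargs)
    
    def parse(self, text: str) -> datetime:
        """Parse date/time text."""
        try:
            # dateutil accepts many common date formats
            dt = parser.parse(text)
            
            # Apply the time zone, if one was configured
            if self.timezone:
                import pytz
                tz = pytz.timezone(self.timezone)
                if dt.tzinfo is None:
                    dt = tz.localize(dt)      # naive value: attach the zone
                else:
                    dt = dt.astimezone(tz)    # aware value: convert to the zone
            
            return dt
            
        except (ValueError, TypeError) as e:
            raise ValueError(f"Could not parse date/time: '{text}'. Error: {e}") from e
    
    def get_format_instructions(self) -> str:
        """Return the date/time format instructions."""
        if self.format:
            return f"Your answer must be a valid date/time in the format: {self.format}"
        else:
            return "Your answer must be a valid date/time; most common formats are accepted"
    
    @property
    def _type(self) -> str:
        return "datetime"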

# Advanced date/time parser
import re
from datetime import timedelta

class DatetimeOutputParserAdvanced(DatetimeOutputParser):
    """Date/time parser that also understands relative expressions."""
    
    # Declared as a Pydantic field so it can be assigned in __init__
    relative_base: Optional[datetime] = None
    
    def __init__(self, format: str = "%Y-%m-%d %H:%M:%S", 
                 timezone: Optional[str] = None,
                 relative_base: Optional[datetime] = None):
        super().__init__(format, timezone)
        # Note: the default base is fixed when the parser is constructed
        self.relative_base = relative_base or datetime.now()
    
    def parse(self, text: str) -> datetime:
        """Try an absolute parse first, then fall back to relative time."""
        try:
            return super().parse(text)
        except ValueError:
            return self._parse_relative_time(text)
    
    def _parse_relative_time(self, text: str) -> datetime:
        """Parse expressions such as "2 hours ago" or "in 3 days"."""
        text_lower = text.lower().strip()
        units = ("seconds", "minutes", "hours", "days", "weeks")
        
        # "<n> <unit> ago" is tried first, then "in <n> <unit>".
        # Only the first matching unit is applied.
        for sign, template in ((-1, r'(\d+)\s*{}s?\s*ago'), (1, r'in\s*(\d+)\s*{}s?')):
            for unit in units:
                match = re.search(template.format(unit[:-1]), text_lower)
                if match:
                    delta = timedelta(**{unit: int(match.group(1))})
                    return self.relative_base + sign * delta
        
        # Otherwise try natural-language dates
        return self._parse_natural_language_date(text)
    
    def _parse_natural_language_date(self, text: str) -> datetime:
        """Parse keywords such as "today" or "next week"."""
        text_lower = text.lower().strip()
        
        def midnight(dt: datetime) -> datetime:
            return dt.replace(hour=0, minute=0, second=0, microsecond=0)
        
        # Common natural-language keywords
        natural_patterns = {
            'today': lambda: midnight(self.relative_base),
            'tomorrow': lambda: midnight(self.relative_base + timedelta(days=1)),
            'yesterday': lambda: midnight(self.relative_base - timedelta(days=1)),
            'now': lambda: self.relative_base,
            'next week': lambda: midnight(self.relative_base + timedelta(weeks=1)),
            'last week': lambda: midnight(self.relative_base - timedelta(weeks=1)),
        }
        
        if text_lower in natural_patterns:
            return natural_patterns[text_lower]()
        
        # Nothing matched: fall back to the base parser, which raises ValueError
        return super().parse(text)

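# Usage examples for the datetime parsers
datetime_parser_examples = '''
# Basic datetime parsing (assumes the parser's format is "%Y-%m-%d %H:%M:%S")
datetime_parser = DatetimeOutputParser()
result = datetime_parser.parse("2024-01-15 14:30:00")
print(result)  # 2024-01-15 14:30:00 (a datetime object)

# Relative time
advanced_parser = DatetimeOutputParserAdvanced()
result = advanced_parser.parse("2 hours ago")
print(result)  # relative_base minus 2 hours

# Natural language
result = advanced_parser.parse("tomorrow")
print(result)  # tomorrow's date at 00:00:00

# Compound expressions are not summed: the first matching pattern wins
result = advanced_parser.parse("in 3 days and 2 hours")
print(result)  # relative_base plus 3 days only; "and 2 hours" is ignored
'''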
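
The relative-time loop above returns on the first pattern that matches, so a compound phrase such as "in 3 days and 2 hours" keeps only its first unit. The sketch below shows one way to sum every unit instead. It is not part of LangChain or of the classes above: `parse_compound_offset` and its regular expression are hypothetical names introduced for illustration, and it only understands the English units listed in the pattern.

```python
import re
from datetime import datetime, timedelta

# Hypothetical helper: matches every "<number> <unit>" pair in the text.
_UNIT_PATTERN = re.compile(r"(\d+)\s*(second|minute|hour|day|week)s?", re.IGNORECASE)


def parse_compound_offset(text: str, base: datetime) -> datetime:
    """Sum all "<number> <unit>" pairs and apply them to `base`.

    The offset is subtracted when the text contains the word "ago",
    and added otherwise.
    """
    matches = _UNIT_PATTERN.findall(text)
    if not matches:
        raise ValueError(f"no relative time units found in {text!r}")

    total = timedelta()
    for amount, unit in matches:
        # timedelta accepts plural keyword names: seconds, minutes, hours, days, weeks
        total += timedelta(**{unit.lower() + "s": int(amount)})

    if re.search(r"\bago\b", text, re.IGNORECASE):
        return base - total
    return base + total


base = datetime(2024, 1, 15, 12, 0, 0)
print(parse_compound_offset("in 3 days and 2 hours", base))  # 2024-01-18 14:00:00
print(parse_compound_offset("1 week and 2 days ago", base))  # 2024-01-06 12:00:00
```

Passing the base time explicitly keeps the function deterministic, which makes it easier to test than code that calls `datetime.now()` internally.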

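Output Parser Chains and Composition

1. Combining Output Parsers

# Illustrative, simplified implementation. LangChain's own CombiningOutputParser
# lives in langchain/output_parsers/combining.py (the langchain package, not
# langchain-core) and differs in detail.

class CombiningOutputParser(BaseOutputParser):
    """Run several output parsers over the same text and collect their results."""
    
    def __init__(self, parsers: List[BaseOutputParser]):
        """Store the parsers and check that their input variables do not clash."""
        # If BaseOutputParser is the pydantic-based class from langchain_core,
        # declare `parsers` as a model field instead of assigning it here.
        self.parsers = parsers
        
        # Collect input variables from the parsers that declare them
        all_input_variables = []
        for parser in parsers:
            if hasattr(parser, 'input_variables'):
                all_input_variables.extend(parser.input_variables)
        
        # Reject duplicated variable names
        if len(all_input_variables) != len(set(all_input_variables)):
            raise ValueError("Duplicate input variables across the combined parsers")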
    
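    def parse(self, text: str) -> Dict[str, Any]:
        """Parse the full text with every parser and key the results by parser index."""
        results = {}
        
        for i, parser in enumerate(self.parsers):
            try:
                # Every parser receives the same, unsplit text
                result = parser.parse(text)
                results[f"parser_{i}"] = result
            except Exception as e:
                # Record the failure and continue with the remaining parsers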
                results[f"parser_{i}_error"] = str(e)
        
        return results
    
    def get_format_instructions(self) -> str:
        """Return the format instructions of all parsers, one block per parser."""
        instructions = []
        
        for i, parser in enumerate(self.parsers):
            parser_instructions = parser.get_format_instructions()
            if parser_instructions:
                instructions.append(f"Parser {i+1}: {parser_instructions}")
        
        return "\n\n".join(instructions) if instructions else ""

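# Sequential output parser
class SequentialOutputParser(BaseOutputParser):
    """Apply several output parsers in order, each to the same original text.

    The results are collected in a list; one parser's output is not fed into the next.
    """
    
    def __init__(self, parsers: List[BaseOutputParser], stop_on_error: bool = True):
        """Store the parsers and the error-handling policy."""
        self.parsers = parsers
        self.stop_on_error = stop_on_error
    
    def parse(self, text: str) -> List[Any]:
        """Run each parser in order and return the list of results."""
        results = []
        
        for parser in self.parsers:
            try:
                result = parser.parse(text)
                results.append(result)
                
            except Exception as e:
                if self.stop_on_error:
                    # A bare raise preserves the original traceback
                    raise
                else:
                    # Record the error and continue with the next parser
                    results.append({"error": str(e)})
        
        return results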

# Conditional output parser
class ConditionalOutputParser(BaseOutputParser):
    """Choose between two parsers based on a predicate over the raw text."""
    
    def __init__(self, condition_func: Callable[[str], bool], 
                 true_parser: BaseOutputParser,
                 false_parser: BaseOutputParser):
        """Store the predicate and the two candidate parsers."""
        self.condition_func = condition_func
        self.true_parser = true_parser
        self.false_parser = false_parser
    
    def parse(self, text: str) -> Any:
        """Parse with true_parser if the predicate holds, otherwise with false_parser."""
        if self.condition_func(text):
            return self.true_parser.parse(text)
        else:
            return self.false_parser.parse(text)
    
    def get_format_instructions(self) -> str:
        """Return the format instructions of both branches."""
        true_instructions = self.true_parser.get_format_instructions()
        false_instructions = self.false_parser.get_format_instructions()
        
        return f"If the condition holds: {true_instructions}\nOtherwise: {false_instructions}"

# Usage examples for the composite parsers
combining_parser_examples = '''
# Combine several parsers
parsers = [
    JsonOutputParser(),
    StrOutputParser(),
    CommaSeparatedListOutputParser()
]

combining_parser = CombiningOutputParser(parsers=parsers)

# Try the combined parse
text = '{"name": "test", "value": 123}, item1, item2, item3'
result = combining_parser.parse(text)
print(result)
# Every parser receives the full text, so the entries overlap:
#   parser_0: the JSON object, but only if the JSON parser tolerates the trailing
#             ", item1, ..." text; otherwise a 'parser_0_error' entry is recorded
#   parser_1: the whole input string, unchanged
#   parser_2: the whole input split at commas, including the comma inside the JSON object

# Sequential parser
sequential_parser = SequentialOutputParser(parsers=[
    StrOutputParser(),
    JsonOutputParser()
])

result = sequential_parser.parse('{"test": "value"}')
print(result)  # ['{"test": "value"}', {'test': 'value'}]

# Conditional parser
def is_json(text: str) -> bool:
    return text.strip().startswith('{')

conditional_parser = ConditionalOutputParser(
    condition_func=is_json,
    true_parser=JsonOutputParser(),
    false_parser=StrOutputParser()
)

result1 = conditional_parser.parse('{"key": "value"}')  # handled by the JSON parser
result2 = conditional_parser.parse('plain text')        # handled by the string parser
'''
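
The SequentialOutputParser above hands the same original text to every parser. When each stage should instead consume the previous stage's output, a small pipeline is enough. The following is a minimal sketch using only the standard library; `PipelineParser` and `strip_code_fence` are hypothetical names for illustration and are not LangChain APIs.

```python
import json
from typing import Any, Callable, List

FENCE = "`" * 3  # Markdown code-fence marker, built this way to keep the example readable


class PipelineParser:
    """Hypothetical helper: feeds each stage's output into the next stage."""

    def __init__(self, stages: List[Callable[[Any], Any]]):
        self.stages = stages

    def parse(self, text: str) -> Any:
        value: Any = text
        for stage in self.stages:
            value = stage(value)
        return value


def strip_code_fence(text: str) -> str:
    """Drop a leading and trailing Markdown fence line, if present."""
    lines = text.strip().splitlines()
    if lines and lines[0].startswith(FENCE):
        lines = lines[1:]
    if lines and lines[-1].startswith(FENCE):
        lines = lines[:-1]
    return "\n".join(lines)


# Stage 1 removes the fence, stage 2 decodes JSON, stage 3 picks one field.
pipeline = PipelineParser([strip_code_fence, json.loads, lambda d: d["items"]])

raw = FENCE + 'json\n{"items": [1, 2, 3]}\n' + FENCE
result = pipeline.parse(raw)
print(result)  # [1, 2, 3]
```

Because every stage is a plain callable, an existing parser's `parse` method can be dropped into the list without any adapter.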

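Performance Optimization for the Output Formatting System

1. Caching and Reuse

# Illustrative module (hypothetical path: langchain-core does not ship output_parsers/cache.py)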

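class OutputParserCache:
    """Cache for parse results with a TTL and access-count-based eviction."""
    
    def __init__(self, max_size: int = 1000, ttl: int = 3600):
        self.max_size = max_size
        self.ttl = ttl  # time to live, in seconds
        self.cache: Dict[str, tuple[Any, float]] = {}  # key -> (result, timestamp)
        self.access_count: Dict[str, int] = {}
    
    def cache_result(self, key: str, result: Any) -> None:
        """Store a parse result, evicting one entry first if the cache is full."""
        # Overwriting an existing key does not grow the cache, so skip eviction then
        if key not in self.cache and len(self.cache) >= self.max_size:
            self._evict_lru()
        
        import time
        self.cache[key] = (result, time.time())
        self.access_count[key] = 0
    
    def get_cached_result(self, key: str) -> Optional[Any]:
        """Return the cached result, or None if it is missing or expired."""
        if key in self.cache:
            result, timestamp = self.cache[key]
            
            # Drop the entry once it is older than the TTL
            import time
            if time.time() - timestamp > self.ttl:
                del self.cache[key]
                del self.access_count[key]
                return None
            
            self.access_count[key] += 1
            return result
        
        return None
    
    def _evict_lru(self) -> None:
        """Evict the entry with the fewest hits.

        Despite the name, this is least-frequently-used (LFU) eviction:
        it compares access counts, not how recently an entry was used.
        """
        if not self.access_count:
            return
        
        # Find the entry with the lowest access count
        lru_key = min(self.access_count.keys(), key=lambda k: self.access_count[k])
        
        # Remove it from both dictionaries
        del self.cache[lru_key]
        del self.access_count[lru_key]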

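# Output parser with caching
class CachedOutputParser(BaseOutputParser):
    """Decorator-style wrapper that adds caching to another output parser."""
    
    def __init__(self, base_parser: BaseOutputParser, cache: Optional[OutputParserCache] = None):
        self.base_parser = base_parser
        self.cache = cache or OutputParserCache()
    
    def parse(self, text: str) -> Any:
        """Parse the text, reusing a cached result when one exists."""
        # Key on a content digest: the built-in hash() can collide, which would
        # return another text's result, and it differs between interpreter runs
        import hashlib
        digest = hashlib.sha256(text.encode("utf-8")).hexdigest()
        cache_key = f"{self.base_parser._type}_{digest}"
        
        # Cache lookup (a cached None looks like a miss, so such results are re-parsed)
        cached_result = self.cache.get_cached_result(cache_key)
        if cached_result is not None:
            return cached_result
        
        # Cache miss: run the wrapped parser
        result = self.base_parser.parse(text)
        
        # Store the result for later calls
        self.cache.cache_result(cache_key, result)
        
        return result
    
    def get_format_instructions(self) -> str:
        """Return the wrapped parser's format instructions."""
        return self.base_parser.get_format_instructions()
    
    @property
    def _type(self) -> str:
        return f"cached_{self.base_parser._type}"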
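
The cache above evicts by access count, which is least-frequently-used behavior. If true least-recently-used eviction is wanted, `collections.OrderedDict` keeps entries in recency order with little code. The class below is a sketch under that assumption; `LRUTTLCache` and its injectable `clock` parameter are hypothetical and exist only to make the example testable.

```python
import time
from collections import OrderedDict
from typing import Any, Callable, Tuple


class LRUTTLCache:
    """Hypothetical LRU cache with per-entry expiry."""

    def __init__(self, max_size: int = 1000, ttl: float = 3600.0,
                 clock: Callable[[], float] = time.monotonic):
        self.max_size = max_size
        self.ttl = ttl
        self._clock = clock
        # Oldest entry first, most recently used entry last
        self._data: "OrderedDict[str, Tuple[Any, float]]" = OrderedDict()

    def get(self, key: str, default: Any = None) -> Any:
        entry = self._data.get(key)
        if entry is None:
            return default
        value, stored_at = entry
        if self._clock() - stored_at > self.ttl:
            del self._data[key]          # expired: drop it and report a miss
            return default
        self._data.move_to_end(key)      # mark as most recently used
        return value

    def put(self, key: str, value: Any) -> None:
        if key in self._data:
            self._data.move_to_end(key)
        elif len(self._data) >= self.max_size:
            self._data.popitem(last=False)  # evict the least recently used entry
        self._data[key] = (value, self._clock())


cache = LRUTTLCache(max_size=2, ttl=60)
cache.put("a", 1)
cache.put("b", 2)
cache.get("a")       # "a" is now the most recently used entry
cache.put("c", 3)    # evicts "b", the least recently used entry
print(cache.get("b"), cache.get("a"), cache.get("c"))  # None 1 3
```

Using `time.monotonic` as the default clock avoids surprises when the system clock is adjusted while entries are cached.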

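2. Asynchronous Parsing

# Illustrative module (hypothetical path: langchain-core does not ship output_parsers/async_utils.py)

import asyncio
from typing import AsyncIterator, List, Any

class AsyncOutputParser(BaseOutputParser):
    """Base class for asynchronous output parsers."""
    
    async def aparse(self, text: str) -> Any:
        """Parse text asynchronously."""
        # Default: run the synchronous parse() in the default thread-pool executor
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self.parse, text)
    
    async def astream_parse(self, text: str) -> AsyncIterator[Any]:
        """Streaming variant of aparse."""
        # Basic implementation: parse everything at once, then yield a single result
        result = await self.aparse(text)
        yield result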

class AsyncBatchOutputParser:
    """Parse a batch of texts concurrently with a bounded number of workers."""
    
    def __init__(self, base_parser: BaseOutputParser, max_concurrent: int = 10):
        self.base_parser = base_parser
        self.max_concurrent = max_concurrent
        self.semaphore = asyncio.Semaphore(max_concurrent)
    
    async def parse_batch_async(self, texts: List[str]) -> List[Any]:
        """Parse all texts concurrently; results keep the input order."""
        tasks = []
        
        for text in texts:
            task = self._parse_single_async(text)
            tasks.append(task)
        
        # Run all tasks concurrently; the first exception propagates to the caller
        results = await asyncio.gather(*tasks)
        
        return results
    
    async def _parse_single_async(self, text: str) -> Any:
        """Parse one text while holding a semaphore slot."""
        async with self.semaphore:
            if isinstance(self.base_parser, AsyncOutputParser):
                return await self.base_parser.aparse(text)
            else:
                # Synchronous parsers run in the default thread-pool executor
                loop = asyncio.get_running_loop()
                return await loop.run_in_executor(None, self.base_parser.parse, text)
    
    async def stream_parse_batch_async(self, texts: List[str]) -> AsyncIterator[Any]:
        """Yield results one text at a time (sequential, not concurrent)."""
        for text in texts:
            if isinstance(self.base_parser, AsyncOutputParser):
                async for result in self.base_parser.astream_parse(text):
                    yield result
            else:
                result = await self._parse_single_async(text)
                yield result

# Asynchronous usage example
async_usage_example = '''
# Define an asynchronous parser
class AsyncJsonOutputParser(JsonOutputParser, AsyncOutputParser):
    """Asynchronous JSON output parser."""
    
    async def aparse(self, text: str) -> Any:
        """Parse JSON asynchronously."""
        # Simulate asynchronous work
        await asyncio.sleep(0.01)
        return super().parse(text)

# Parse a batch asynchronously
async def batch_parse_async():
    parser = AsyncJsonOutputParser()
    batch_parser = AsyncBatchOutputParser(parser, max_concurrent=5)
    
    texts = [
        '{"name": "test1", "value": 1}',
        '{"name": "test2", "value": 2}',
        '{"name": "test3", "value": 3}'
    ]
    
    results = await batch_parser.parse_batch_async(texts)
    
    for i, result in enumerate(results):
        print(f"Result {i+1}: {result}")

# Run the asynchronous test
# asyncio.run(batch_parse_async())
'''
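
In the batch parser above, one malformed input makes `asyncio.gather` raise and the whole batch is lost. A common alternative is `return_exceptions=True`, which returns each failure in place of its result. The sketch below uses `json.loads` as a stand-in parser and `asyncio.to_thread` (Python 3.9+); the function name `parse_batch` is hypothetical.

```python
import asyncio
import json
from typing import Any, List


async def parse_batch(texts: List[str], max_concurrent: int = 5) -> List[Any]:
    """Parse texts concurrently; failures are returned as exception objects."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def parse_one(text: str) -> Any:
        async with semaphore:
            # Run the blocking parser in a worker thread
            return await asyncio.to_thread(json.loads, text)

    # return_exceptions=True keeps one bad input from cancelling the rest
    return await asyncio.gather(*(parse_one(t) for t in texts), return_exceptions=True)


results = asyncio.run(parse_batch(['{"a": 1}', "not json", '{"b": 2}']))
for text_result in results:
    if isinstance(text_result, Exception):
        print("failed:", type(text_result).__name__)
    else:
        print("ok:", text_result)
```

The results keep the order of the inputs, so a failed entry can be matched back to the text that caused it.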

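Practical Case Studies

1. Intelligent Data Analysis System

# Standard-library modules used by the case-study code below
import json
import re
from datetime import datetime

class IntelligentDataAnalysisSystem:
    """Intelligent data analysis system built from several output parsers."""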
    
    def __init__(self):
        self.setup_parsers()
    
    def setup_parsers(self):
        """Create one output parser per kind of analysis result."""
        
        # Parser for statistical results
        self.stats_parser = JsonOutputParser()
        
        # Parser for chart data
        self.chart_data_parser = StructuredOutputParser(
            response_schemas=[
                ResponseSchema(name="chart_type", description="Chart type", type="string"),
                ResponseSchema(name="data", description="Chart data", type="array"),
                ResponseSchema(name="labels", description="Data labels", type="array"),
                ResponseSchema(name="title", description="Chart title", type="string")
            ]
        )
        
        # Parser for extracted insights
        self.insight_parser = JsonOutputParser()
        
        # Parser for generated reports
        self.report_parser = StructuredOutputParser(
            response_schemas=[
                ResponseSchema(name="executive_summary", description="Executive summary", type="string"),
                ResponseSchema(name="key_findings", description="Key findings", type="array"),
                ResponseSchema(name="recommendations", description="Recommendations", type="array"),
                ResponseSchema(name="technical_details", description="Technical details", type="string")
            ]
        )
        
        # Parser for time series timestamps
        self.time_series_parser = DatetimeOutputParserAdvanced(
            format="%Y-%m-%d %H:%M:%S",
            timezone="UTC"
        )
    
    def parse_statistical_results(self, raw_output: str) -> dict:
        """Parse statistical analysis results."""
        try:
            result = self.stats_parser.parse(raw_output)
            return {
                "type": "statistics",
                "data": result,
                "parsed_at": datetime.now().isoformat()
            }
        except Exception as e:
            return {
                "type": "error",
                "error": str(e),
                "raw_output": raw_output
            }
    
    def parse_chart_data(self, raw_output: str) -> dict:
        """Parse chart data."""
        try:
            result = self.chart_data_parser.parse(raw_output)
            return {
                "type": "chart_data",
                "chart_type": result["chart_type"],
                "data": result["data"],
                "labels": result["labels"],
                "title": result["title"],
                "parsed_at": datetime.now().isoformat()
            }
        except Exception as e:
            return {
                "type": "error",
                "error": str(e),
                "raw_output": raw_output
            }
    
    def parse_insights(self, raw_output: str) -> dict:
        """Parse data insights."""
        try:
            result = self.insight_parser.parse(raw_output)
            return {
                "type": "insights",
                "insights": result.get("insights", []),
                "confidence": result.get("confidence", 0.0),
                "parsed_at": datetime.now().isoformat()
            }
        except Exception as e:
            return {
                "type": "error",
                "error": str(e),
                "raw_output": raw_output
            }
    
    def parse_time_series_data(self, raw_output: str) -> dict:
        """Parse time series data, one timestamp/value record per line."""
        try:
            # Expected line format: "时间: 2024-01-15 10:30:00, 数值: 123.45"
            # (the 时间/数值 labels are matched literally, so they stay in Chinese)
            lines = raw_output.strip().split('\n')
            time_series_data = []
            
            for line in lines:
                if '时间:' in line and '数值:' in line:
                    # Extract the timestamp and the value
                    time_match = re.search(r'时间:\s*([^\n,]+)', line)
                    value_match = re.search(r'数值:\s*([^\n,]+)', line)
                    
                    if time_match and value_match:
                        time_str = time_match.group(1).strip()
                        value_str = value_match.group(1).strip()
                        
                        # Parse the timestamp and convert the value
                        try:
                            timestamp = self.time_series_parser.parse(time_str)
                            value = float(value_str)
                            
                            time_series_data.append({
                                "timestamp": timestamp.isoformat(),
                                "value": value
                            })
                        except Exception as e:
                            # Skip malformed records instead of failing the whole batch
                            print(f"Failed to parse time series record: {e}")
                            continue
            
            return {
                "type": "time_series",
                "data": time_series_data,
                "parsed_at": datetime.now().isoformat()
            }
            
        except Exception as e:
            return {
                "type": "error",
                "error": str(e),
                "raw_output": raw_output
            }

# Usage example
analysis_system = IntelligentDataAnalysisSystem()

# Exercise each parsing method
test_outputs = {
    "statistics": '{"mean": 25.5, "std": 5.2, "count": 1000}',
    "chart_data": "chart_type: bar\ndata: [10, 20, 30, 40, 50]\nlabels: ['A', 'B', 'C', 'D', 'E']\ntitle: 销售数据分析",
    "insights": '{"insights": ["销售呈上升趋势", "季节性波动明显"], "confidence": 0.85}',
    "time_series": "时间: 2024-01-15 10:30:00, 数值: 123.45\n时间: 2024-01-15 11:30:00, 数值: 145.67"
}

for output_type, raw_output in test_outputs.items():
    if output_type == "statistics":
        result = analysis_system.parse_statistical_results(raw_output)
    elif output_type == "chart_data":
        result = analysis_system.parse_chart_data(raw_output)
    elif output_type == "insights":
        result = analysis_system.parse_insights(raw_output)
    elif output_type == "time_series":
        result = analysis_system.parse_time_series_data(raw_output)
    else:
        continue  # unknown output type: nothing to parse
    
    print(f"\n{output_type.upper()} parse result:")
    print(json.dumps(result, indent=2, ensure_ascii=False, default=str))
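
Every `parse_*` method in the class above repeats the same try/except envelope, and the usage loop selects a method with an if/elif chain. Both can be factored out with a small helper and a dispatch table. The sketch below uses standard-library stand-ins for the parsers; `safe_parse`, `HANDLERS`, and `dispatch` are hypothetical names, not part of the original system.

```python
import json
from datetime import datetime
from typing import Any, Callable, Dict


def safe_parse(kind: str, parse: Callable[[str], Any], raw_output: str) -> Dict[str, Any]:
    """Run `parse` and wrap the outcome in the success/error envelope used above."""
    try:
        data = parse(raw_output)
    except Exception as exc:
        return {"type": "error", "error": str(exc), "raw_output": raw_output}
    return {"type": kind, "data": data, "parsed_at": datetime.now().isoformat()}


# Dispatch table: output type -> parsing callable (stand-ins for real parsers)
HANDLERS: Dict[str, Callable[[str], Any]] = {
    "statistics": json.loads,
    "keywords": lambda s: [part.strip() for part in s.split(",")],
}


def dispatch(kind: str, raw_output: str) -> Dict[str, Any]:
    """Look up the handler for `kind`; unknown kinds produce an error envelope."""
    handler = HANDLERS.get(kind)
    if handler is None:
        return {"type": "error", "error": f"no parser registered for {kind!r}",
                "raw_output": raw_output}
    return safe_parse(kind, handler, raw_output)


print(dispatch("statistics", '{"mean": 25.5}')["data"])  # {'mean': 25.5}
print(dispatch("keywords", "a, b, c")["data"])           # ['a', 'b', 'c']
print(dispatch("statistics", "oops")["type"])            # error
```

Adding a new output type then means adding one entry to the table rather than a new method and a new elif branch.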

2. Multilingual Content Parsing System

class MultilingualContentParser:
    """Multilingual content parsing system."""
    
    def __init__(self):
        self.setup_multilingual_parsers()
    
    def setup_multilingual_parsers(self):
        """Create one output parser per analysis task."""
        
        # Parser for language detection
        self.language_detection_parser = JsonOutputParser()
        
        # Parser for entity extraction
        self.entity_extraction_parser = StructuredOutputParser(
            response_schemas=[
                ResponseSchema(name="entities", description="List of extracted entities", type="array"),
                ResponseSchema(name="entity_types", description="Entity types", type="array"),
                ResponseSchema(name="confidence_scores", description="Confidence scores", type="array")
            ]
        )
        
        # Parser for sentiment analysis
        self.sentiment_analysis_parser = JsonOutputParser()
        
        # Parser for keyword extraction
        self.keyword_extraction_parser = CommaSeparatedListOutputParser()
        
        # Parser for summary generation
        self.summary_parser = JsonOutputParser()
    
    def parse_language_detection(self, raw_output: str) -> dict:
        """Parse language detection results."""
        try:
            result = self.language_detection_parser.parse(raw_output)
            return {
                "type": "language_detection",
                "detected_languages": result.get("languages", []),
                "confidence_scores": result.get("confidence", {}),
                "primary_language": result.get("primary_language", "unknown"),
                "parsed_at": datetime.now().isoformat()
            }
        except Exception as e:
            return {
                "type": "error",
                "error": str(e),
                "raw_output": raw_output
            }
    
    def parse_entity_extraction(self, raw_output: str) -> dict:
        """Parse entity extraction results."""
        try:
            result = self.entity_extraction_parser.parse(raw_output)
            return {
                "type": "entity_extraction",
                "entities": result["entities"],
                "entity_types": result["entity_types"],
                "confidence_scores": result["confidence_scores"],
                "parsed_at": datetime.now().isoformat()
            }
        except Exception as e:
            return {
                "type": "error",
                "error": str(e),
                "raw_output": raw_output
            }
    
    def parse_sentiment_analysis(self, raw_output: str) -> dict:
        """Parse sentiment analysis results."""
        try:
            result = self.sentiment_analysis_parser.parse(raw_output)
            return {
                "type": "sentiment_analysis",
                "overall_sentiment": result.get("overall_sentiment", "neutral"),
                "sentiment_scores": result.get("sentiment_scores", {}),
                "language_specific": result.get("language_specific", {}),
                "parsed_at": datetime.now().isoformat()
            }
        except Exception as e:
            return {
                "type": "error",
                "error": str(e),
                "raw_output": raw_output
            }
    
    def parse_keywords(self, raw_output: str) -> dict:
        """Parse keyword extraction results."""
        try:
            keywords = self.keyword_extraction_parser.parse(raw_output)
            return {
                "type": "keyword_extraction",
                "keywords": keywords,
                "keyword_count": len(keywords),
                "parsed_at": datetime.now().isoformat()
            }
        except Exception as e:
            return {
                "type": "error",
                "error": str(e),
                "raw_output": raw_output
            }
    
    def parse_summary(self, raw_output: str) -> dict:
        """Parse summary generation results."""
        try:
            result = self.summary_parser.parse(raw_output)
            return {
                "type": "summary",
                "summary": result.get("summary", ""),
                "key_points": result.get("key_points", []),
                "length": result.get("length", 0),
                "language": result.get("language", "unknown"),
                "parsed_at": datetime.now().isoformat()
            }
        except Exception as e:
            return {
                "type": "error",
                "error": str(e),
                "raw_output": raw_output
            }

# Usage example
multilingual_parser = MultilingualContentParser()

# Exercise each multilingual parsing method
multilingual_outputs = {
    "language_detection": '{"languages": ["zh", "en"], "confidence": {"zh": 0.8, "en": 0.6}, "primary_language": "zh"}',
    "entity_extraction": "entities: [\"北京\", \"上海\", \"深圳\"]\nentity_types: [\"城市\", \"城市\", \"城市\"]\nconfidence_scores: [0.95, 0.92, 0.88]",
    "sentiment_analysis": '{"overall_sentiment": "positive", "sentiment_scores": {"positive": 0.7, "neutral": 0.2, "negative": 0.1}, "language_specific": {"zh": "positive", "en": "neutral"}}',
    "keywords": "人工智能, 机器学习, 深度学习, 自然语言处理",
    "summary": '{"summary": "这是一篇关于人工智能发展的文章", "key_points": ["AI技术快速发展", "应用场景不断扩大", "面临挑战和机遇"], "length": 150, "language": "zh"}'
}

for output_type, raw_output in multilingual_outputs.items():
    if output_type == "language_detection":
        result = multilingual_parser.parse_language_detection(raw_output)
    elif output_type == "entity_extraction":
        result = multilingual_parser.parse_entity_extraction(raw_output)
    elif output_type == "sentiment_analysis":
        result = multilingual_parser.parse_sentiment_analysis(raw_output)
    elif output_type == "keywords":
        result = multilingual_parser.parse_keywords(raw_output)
    elif output_type == "summary":
        result = multilingual_parser.parse_summary(raw_output)
    else:
        continue  # unknown output type: nothing to parse
    
    print(f"\n{output_type.upper()} multilingual parse result:")
    print(json.dumps(result, indent=2, ensure_ascii=False, default=str))

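Best Practices for the Output Formatting System

1. Design Principles

output_formatting_design_principles = '''
1. Robustness:
   - Tolerate the common kinds of malformed output
   - Report detailed error information
   - Support degradation and fallback paths

2. Flexibility:
   - Support multiple output formats
   - Make parsing options configurable
   - Keep the parsers easy to extend and customize

3. Performance:
   - Minimize parsing overhead
   - Support caching and reuse
   - Support asynchronous processing

4. Type safety:
   - Use explicit type annotations
   - Check types at runtime
   - Run a static type checker (Python has no compile-time type validation)

5. Maintainability:
   - Keep the design modular
   - Handle errors in a clear, consistent way
   - Maintain thorough documentation and tests
'''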
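
The robustness principle (tolerate malformed output, fall back step by step) can be illustrated with a small fallback chain. The sketch below tries strict JSON first, then JSON inside a Markdown code fence, and finally returns the stripped text. The function `parse_with_fallback` and the strategy labels are hypothetical, and the approach is a simplification, not how any LangChain parser is implemented.

```python
import json
import re
from typing import Any, Tuple

FENCE = "`" * 3  # Markdown code-fence marker
# Captures the body of the first fenced block, with or without a "json" tag
_FENCED = re.compile(FENCE + r"(?:json)?\s*(.*?)" + FENCE, re.DOTALL)


def parse_with_fallback(text: str) -> Tuple[Any, str]:
    """Return (value, strategy), where strategy names the step that succeeded."""
    # Step 1: the whole text is valid JSON
    try:
        return json.loads(text), "strict"
    except json.JSONDecodeError:
        pass

    # Step 2: JSON wrapped in a fenced block, possibly surrounded by prose
    match = _FENCED.search(text)
    if match:
        try:
            return json.loads(match.group(1)), "fenced"
        except json.JSONDecodeError:
            pass

    # Step 3: give up on structure and return the raw text
    return text.strip(), "raw"


fenced_reply = "Here is the result:\n" + FENCE + 'json\n{"a": 2}\n' + FENCE
print(parse_with_fallback('{"a": 1}'))    # ({'a': 1}, 'strict')
print(parse_with_fallback(fenced_reply))  # ({'a': 2}, 'fenced')
print(parse_with_fallback("  plain  "))   # ('plain', 'raw')
```

Returning the strategy name alongside the value lets callers log how often the fallbacks fire, which is a useful signal that the prompt's format instructions need tightening.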

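2. Performance Optimization Tips

output_formatting_performance_tips = '''
1. Parsing:
   - Use efficient parsing algorithms
   - Avoid parsing the same text twice
   - Support incremental parsing

2. Caching:
   - Cache parse results
   - Cache format instructions
   - Use a sensible eviction policy

3. Asynchronous processing:
   - Support asynchronous parsing
   - Process batches asynchronously
   - Support streaming parsing

4. Memory:
   - Release large objects promptly
   - Use generators
   - Avoid memory leaks

5. Error handling:
   - Fail fast
   - Define error recovery strategies
   - Log errors in detail
'''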
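
The tips on incremental parsing and generators can be combined: a generator can consume text chunks as they arrive and yield each record as soon as it is complete, without holding the full output in memory. The sketch below assumes newline-delimited JSON, which is an assumption of this example and not a format the chapter prescribes; `iter_json_lines` is a hypothetical name.

```python
import json
from typing import Any, Iterable, Iterator


def iter_json_lines(chunks: Iterable[str]) -> Iterator[Any]:
    """Yield one parsed object per complete line of newline-delimited JSON."""
    buffer = ""
    for chunk in chunks:
        buffer += chunk
        # Emit every complete line; keep the unfinished tail in the buffer
        while "\n" in buffer:
            line, buffer = buffer.split("\n", 1)
            if line.strip():
                yield json.loads(line)
    # The stream may end without a trailing newline
    if buffer.strip():
        yield json.loads(buffer)


# Chunk boundaries fall in the middle of records, as they would in a token stream
stream = ['{"a"', ': 1}\n{"b": 2}\n', '{"c": 3}']
records = list(iter_json_lines(stream))
print(records)  # [{'a': 1}, {'b': 2}, {'c': 3}]
```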

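Summary

In this chapter we worked through the source-level implementation of LangChain's output formatting system:

Core architecture

  1. Layered abstraction: BaseOutputParser at the top, concrete parser implementations below it
  2. Parser type hierarchy: from simple string parsing to complex structured data parsing
  3. Supported formats: JSON, structured output, datetime, enum, and other output formats

Implementation details

  1. Basic parsers: complete implementations of StrOutputParser, the list parsers, and other basic functionality
  2. JSON parser in depth: the full pipeline of cleaning, extraction, validation, and repair
  3. Pydantic integration: type safety, model validation, and error handling

Advanced features

  1. Structured parsing: regular-expression pattern matching, field type conversion, flexible format support
  2. Datetime parsing: multiple formats, relative time, natural-language expressions
  3. Combined and conditional parsing: cooperation between parsers, conditional selection, chained processing

Practical skills

  1. System design: designing complex output formatting systems
  2. Performance optimization: tuning parser performance and diagnosing problems
  3. Extension: extending functionality and improving the architecture based on the source code

Project experience

  1. Intelligent data analysis system: a complete structured data parsing and processing flow
  2. Multilingual content parsing: a parsing system for complex multilingual scenarios
  3. Performance practice: caching, asynchronous, and batch processing techniques

The output formatting system is the "result translator" of an AI application. Understanding its source code lets us turn an LLM's raw text output into structured, machine-processable data, which is the foundation for building intelligent data processing pipelines. 🎯✨

In the next chapter we will analyze the source code of LangChain Chains and explore how to build complex workflows and task processing systems.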
