RAG上下文压缩技术深度剖析-多场景案例实战

简介

在大模型应用日益广泛的今天,RAG(Retrieval-Augmented Generation)作为连接外部知识库与生成能力的桥梁,已经成为许多智能应用的躲不开的技术。然而,随着知识库规模的不断扩大和检索精度要求的提高,一个关键问题逐渐浮现:如何在有限的上下文窗口中最大化信息密度,提升模型的理解和生成质量?

传统的检索系统通常不知道具体的查询需求,这意味着与查询最相关的信息可能被埋藏在包含大量无关文本的文档中。将这些"噪音"直接输入大模型,不仅会增加计算成本,还会稀释关键信息,导致生成质量下降。

上下文压缩技术应运而生,其核心理念是"少即是多"。通过智能化的信息提炼和去噪处理,我们可以在保证信息完整性的同时,大幅提升上下文利用效率。

传统RAG的局限性分析

上下文稀释问题

在传统RAG架构中,检索器通常会返回Top-K个文档块,这些文档块往往包含与查询语义无关的冗余信息。当这些信息被直接填入上下文窗口时,会出现以下问题:

  1. 信息密度低:有效信息被大量噪音稀释
  2. 计算资源浪费:处理无关信息消耗额外的计算资源
  3. 生成质量下降:模型难以聚焦于关键信息
  4. 上下文长度限制:有限的token预算被低价值信息占用

传统RAG流程示意

# 传统RAG的简化流程
def traditional_rag(query: str, retriever, generator):
    # 1. 检索相关文档
    documents = retriever.retrieve(query, top_k=5)
    
    # 2. 直接拼接所有检索到的文档
    context = "\n".join([doc.content for doc in documents])
    
    # 3. 构建提示词并生成答案
    prompt = f"基于以下上下文回答问题:\n{context}\n\n问题:{query}"
    answer = generator.generate(prompt)
    
    return answer

这种方式的问题显而易见:所有检索到的内容都被无差别地输入模型,导致噪音与信号同时传递。

上下文压缩技术原理

核心思想

**上下文压缩的核心思想是:**不立即返回检索到的文档,而是使用给定查询的上下文来压缩这些文档,以便只返回相关信息。这里的"压缩"包含两个维度:

  1. 内容压缩:减少单个文档的冗余内容
  2. 文档过滤:移除完全不相关的文档

技术架构

上下文压缩RAG采用"两阶段处理"架构:

查询 → 检索器 → 原始文档集 → 压缩器 → 精炼文档集 → 生成器 → 最终答案

这种架构让大模型分工明确:

  • 第一阶段:专注于信息提炼,
  • 第二阶段:专注于答案生成。

三种压缩策略深度解析

在这里插入图片描述
在这里插入图片描述

1. 信息过滤策略

信息过滤是最直接的压缩方式,通过语义相似度计算或关键词匹配来移除无关内容。

实现原理
import numpy as np
from sentence_transformers import SentenceTransformer
from typing import List, Dict, Tuple

class InformationFilter:
    def __init__(self, model_name: str = "all-MiniLM-L6-v2", threshold: float = 0.5):
        """
        信息过滤器初始化
        
        Args:
            model_name: 用于编码的句子变换器模型
            threshold: 相似度阈值,低于此值的内容将被过滤
        """
        self.encoder = SentenceTransformer(model_name)
        self.threshold = threshold
    
    def filter_sentences(self, query: str, document: str) -> str:
        """
        基于语义相似度过滤文档中的句子
        
        Args:
            query: 用户查询
            document: 待过滤的文档内容
            
        Returns:
            过滤后的文档内容
        """
        # 将文档分割为句子
        sentences = self._split_into_sentences(document)
        
        # 计算查询和每个句子的嵌入向量
        query_embedding = self.encoder.encode([query])
        sentence_embeddings = self.encoder.encode(sentences)
        
        # 计算相似度
        similarities = np.dot(query_embedding, sentence_embeddings.T)[0]
        
        # 过滤低相似度的句子
        filtered_sentences = [
            sentence for sentence, similarity in zip(sentences, similarities)
            if similarity > self.threshold
        ]
        
        return " ".join(filtered_sentences)
    
    def _split_into_sentences(self, text: str) -> List[str]:
        """
        简单的句子分割实现
        """
        import re
        # 使用正则表达式进行句子分割
        sentences = re.split(r'[.!?]+', text)
        return [s.strip() for s in sentences if s.strip()]

# 使用示例
filter_compressor = InformationFilter(threshold=0.3)

def filter_based_compression(query: str, documents: List[str]) -> List[str]:
    """
    使用信息过滤策略压缩文档集合
    """
    compressed_docs = []
    for doc in documents:
        filtered_content = filter_compressor.filter_sentences(query, doc)
        if filtered_content:  # 只保留非空的过滤结果
            compressed_docs.append(filtered_content)
    
    return compressed_docs
优势与局限

优势:

  • 计算效率高,适合大规模文档处理
  • 实现简单,易于理解和调试
  • 对硬件要求相对较低

局限:

  • 可能会丢失上下文依赖的重要信息
  • 语义理解能力有限,依赖预训练模型质量
  • 阈值设置需要经验调优

2. 生成摘要策略

生成摘要策略利用大模型的理解能力,对检索到的文档进行智能化概括。

实现方案
from transformers import pipeline, AutoTokenizer, AutoModelForSeq2SeqLM
import torch

class SummaryCompressor:
    def __init__(self, model_name: str = "facebook/bart-large-cnn", max_length: int = 150):
        """
        摘要压缩器初始化
        
        Args:
            model_name: 用于生成摘要的模型名称
            max_length: 摘要的最大长度
        """
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModelForSeq2SeqLM.from_pretrained(model_name).to(self.device)
        self.max_length = max_length
    
    def compress_document(self, query: str, document: str) -> str:
        """
        基于查询生成文档的相关摘要
        
        Args:
            query: 用户查询,用于引导摘要生成
            document: 待压缩的文档
            
        Returns:
            生成的摘要内容
        """
        # 构建摘要生成的提示词
        prompt = f"""
        请基于以下查询生成相关的摘要:
        查询:{query}
        
        文档内容:
        {document}
        
        摘要:
        """
        
        # Token化输入
        inputs = self.tokenizer.encode(
            prompt, 
            return_tensors="pt", 
            max_length=1024, 
            truncation=True
        ).to(self.device)
        
        # 生成摘要
        with torch.no_grad():
            summary_ids = self.model.generate(
                inputs,
                max_length=self.max_length,
                min_length=30,
                length_penalty=2.0,
                num_beams=4,
                early_stopping=True
            )
        
        # 解码生成的摘要
        summary = self.tokenizer.decode(summary_ids[0], skip_special_tokens=True)
        return summary
    
    def batch_compress(self, query: str, documents: List[str]) -> List[str]:
        """
        批量压缩文档集合
        """
        compressed_summaries = []
        
        for doc in documents:
            try:
                summary = self.compress_document(query, doc)
                if len(summary.strip()) > 10:  # 确保摘要有意义
                    compressed_summaries.append(summary)
            except Exception as e:
                print(f"摘要生成失败: {e}")
                continue
        
        return compressed_summaries

# 使用示例
summary_compressor = SummaryCompressor()

def summary_based_compression(query: str, documents: List[str]) -> List[str]:
    """
    使用摘要生成策略压缩文档
    """
    return summary_compressor.batch_compress(query, documents)
高级摘要技术

对于更高质量的摘要生成,我们可以使用指令调优的大模型:

import openai
from typing import List, Optional

class AdvancedSummaryCompressor:
    def __init__(self, api_key: str, model: str = "gpt-3.5-turbo"):
        """
        高级摘要压缩器,使用GPT模型
        """
        openai.api_key = api_key
        self.model = model
    
    def generate_contextual_summary(self, query: str, document: str) -> Optional[str]:
        """
        生成上下文相关的摘要
        """
        prompt = f"""
        作为一个专业的信息提取专家,请根据用户查询从以下文档中提取最相关的信息并生成简洁的摘要。

        用户查询:{query}

        文档内容:
        {document}

        要求:
        1. 只提取与查询直接相关的信息
        2. 保持原文的准确性,不要添加推测
        3. 摘要长度控制在100-200字
        4. 如果文档与查询完全无关,请回复"无相关信息"

        摘要:
        """
        
        try:
            response = openai.ChatCompletion.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": "你是一个专业的信息提取和摘要生成助手。"},
                    {"role": "user", "content": prompt}
                ],
                max_tokens=300,
                temperature=0.1
            )
            
            summary = response.choices[0].message.content.strip()
            return summary if summary != "无相关信息" else None
            
        except Exception as e:
            print(f"API调用失败: {e}")
            return None

3. 精准信息提取策略

精准信息提取是最具挑战性的压缩策略,它要求模型能够准确识别并提取与查询直接相关的具体信息片段。

实现框架
import spacy
import re
from collections import defaultdict
from typing import Dict, List, Tuple, Set

class PrecisionExtractor:
    def __init__(self, spacy_model: str = "zh_core_web_sm"):
        """
        精准信息提取器
        
        Args:
            spacy_model: SpaCy语言模型名称
        """
        try:
            self.nlp = spacy.load(spacy_model)
        except OSError:
            print(f"未找到模型 {spacy_model},使用基础版本")
            self.nlp = spacy.load("zh_core_web_sm")
        
    def extract_entities_and_relations(self, query: str, document: str) -> Dict[str, List[str]]:
        """
        提取实体和关系信息
        
        Args:
            query: 用户查询
            document: 待处理文档
            
        Returns:
            包含实体和关系的字典
        """
        # 处理查询和文档
        query_doc = self.nlp(query)
        doc_processed = self.nlp(document)
        
        # 提取查询中的关键实体
        query_entities = {ent.text.lower() for ent in query_doc.ents}
        query_keywords = {token.lemma_.lower() for token in query_doc 
                         if not token.is_stop and not token.is_punct and len(token.text) > 1}
        
        extracted_info = defaultdict(list)
        
        # 查找包含查询实体的句子
        for sent in doc_processed.sents:
            sent_text = sent.text.strip()
            sent_lower = sent_text.lower()
            
            # 检查是否包含查询实体
            entity_match = any(entity in sent_lower for entity in query_entities)
            keyword_match = any(keyword in sent_lower for keyword in query_keywords)
            
            if entity_match or keyword_match:
                # 提取句子中的实体
                sentence_entities = [(ent.text, ent.label_) for ent in sent.ents]
                
                if sentence_entities:
                    extracted_info["relevant_sentences"].append(sent_text)
                    extracted_info["entities"].extend(sentence_entities)
        
        return dict(extracted_info)
    
    def extract_key_value_pairs(self, query: str, document: str) -> List[Tuple[str, str]]:
        """
        提取关键值对信息
        """
        # 定义常见的键值对模式
        patterns = [
            r'([^::]+)[::]([^。,,\n]+)',  # 冒号分割
            r'([^是]+)是([^。,,\n]+)',      # "是"连接
            r'([^为]+)为([^。,,\n]+)',      # "为"连接
            r'(\w+)[::](\d+[%元万亿]?)',   # 数字值
        ]
        
        key_value_pairs = []
        
        for pattern in patterns:
            matches = re.findall(pattern, document)
            for key, value in matches:
                key = key.strip()
                value = value.strip()
                
                # 检查是否与查询相关
                if any(word in key.lower() or word in value.lower() 
                       for word in query.lower().split()):
                    key_value_pairs.append((key, value))
        
        return key_value_pairs
    
    def extract_precise_information(self, query: str, document: str) -> str:
        """
        精准提取与查询相关的信息
        """
        # 提取实体和关系
        entity_info = self.extract_entities_and_relations(query, document)
        
        # 提取键值对
        kv_pairs = self.extract_key_value_pairs(query, document)
        
        # 构建精炼的信息摘要
        extracted_content = []
        
        # 添加相关句子(限制数量)
        if entity_info.get("relevant_sentences"):
            extracted_content.extend(entity_info["relevant_sentences"][:3])
        
        # 添加键值对信息
        if kv_pairs:
            kv_text = "; ".join([f"{k}{v}" for k, v in kv_pairs[:5]])
            extracted_content.append(f"关键信息:{kv_text}")
        
        return " ".join(extracted_content) if extracted_content else ""

# 使用示例
precision_extractor = PrecisionExtractor()

def precision_based_compression(query: str, documents: List[str]) -> List[str]:
    """
    使用精准信息提取策略压缩文档
    """
    compressed_info = []
    
    for doc in documents:
        extracted = precision_extractor.extract_precise_information(query, doc)
        if extracted and len(extracted.strip()) > 20:
            compressed_info.append(extracted)
    
    return compressed_info

完整的上下文压缩RAG实现

现在让我们将三种策略整合到一个完整的RAG系统中:

import numpy as np
from typing import List, Dict, Optional, Callable
from dataclasses import dataclass
from enum import Enum

class CompressionStrategy(Enum):
    """压缩策略枚举类:用于定义不同的上下文压缩思路。
    
    - FILTER:过滤策略。基于规则/相似度/关键字等筛掉与查询无关或价值较低的句子/段落。
    - SUMMARY:摘要策略。对较长文本做抽取式/生成式摘要,保留核心语义,丢弃细节。
    - PRECISION:精度压缩策略。对结构化数据或可量化信息降低精度(例如小数位截断、离散化)。
    - HYBRID:混合策略。按流程组合多种策略(例如先过滤→再摘要→再做精度压缩)。
    
    选型建议(经验法则):
    - 实时问答、只需要关键信息:优先 FILTER 或 PRECISION
    - 需要对长文档快速把握大意:优先 SUMMARY
    - 数据来源复杂、长度不定:优先 HYBRID
    """
    FILTER = "filter"
    SUMMARY = "summary" 
    PRECISION = "precision"
    HYBRID = "hybrid"

@dataclass
class Document:
    """文档数据结构:
    - content:文本内容
    - metadata:可选的元信息,如来源、时间戳、文档ID、得分等。用于生成可追溯答案。
    - score:检索得分(可用于二次排序或可信度展示)
    """
    content: str
    metadata: Dict = None
    score: float = 0.0

class ContextCompressionRAG:
    def __init__(self, 
                 retriever,
                 generator, 
                 compression_strategy: CompressionStrategy = CompressionStrategy.HYBRID):

        """
        上下文压缩 RAG 系统入口。
        
        Args:
            retriever: 文档检索器(需实现 RetrieverProtocol)
            generator: 答案生成器(需实现 GeneratorProtocol)
            compression_strategy: 压缩策略(默认 HYBRID,实战中更稳妥)
        
        内部成员:
           - filter_compressor:信息过滤器,需具备 filter_sentences(query, doc_text) -> str
           - summary_compressor:摘要器,需具备 compress_document(query, doc_text) -> str
            - precision_extractor:精键信息提取/精度压缩器,需具备 extract_precise_information(query, doc_text) -> str
        """
        self.retriever = retriever
        self.generator = generator
        self.compression_strategy = compression_strategy
        
        # 初始化各种压缩器
        self.filter_compressor = InformationFilter()
        self.summary_compressor = SummaryCompressor()
        self.precision_extractor = PrecisionExtractor()
    
    def retrieve_and_compress(self, query: str, top_k: int = 5) -> List[str]:
        """
        检索并压缩文档。
        
        处理流程:
        1) 调用 retriever 检索与 query 相关的原始文档(raw_documents)
        2) 仅取其 content 字段形成列表(document_contents)
        3) 根据预设压缩策略对文档列表做压缩,得到压缩后的文本列表
        
        Args:
            query: 用户查询
            top_k: 检索文档数量(默认5;建议根据平均文档长度/模型上下文大小进行动态调整)
            
        Returns:
            压缩后的文档内容列表(每个元素为 str)
            
        注意:
            - 该函数返回的是“压缩后”的上下文片段,不含原文长度。
            - 若需要准确压缩比,建议在此函数中同时返回原始总长度或原文列表。
        """
        # 1. 检索相关文档
        raw_documents = self.retriever.retrieve(query, top_k=top_k)
        document_contents = [doc.content for doc in raw_documents]
        
        # 2. 根据策略进行压缩
        if self.compression_strategy == CompressionStrategy.FILTER:
            return filter_based_compression(query, document_contents)
        
        elif self.compression_strategy == CompressionStrategy.SUMMARY:
            return summary_based_compression(query, document_contents)
        
        elif self.compression_strategy == CompressionStrategy.PRECISION:
            return precision_based_compression(query, document_contents)
        
        elif self.compression_strategy == CompressionStrategy.HYBRID:
            return self._hybrid_compression(query, document_contents)
        
        else:
            return document_contents  # 不压缩
    
    def _hybrid_compression(self, query: str, documents: List[str]) -> List[str]:
        """
        混合压缩策略:结合多种方法的优势。
        
        推荐流程(可按需调整):
            对每个文档 doc:
              1) 先做“信息过滤”,去掉与 query 不相关的句子
              2) 若过滤后仍然较长(阈值可配置,如 500 字),尝试“摘要”
              3) 摘要失败(或不适用)时,退化为“精准提取/精度压缩”
              4) 若内容已足够短,则直接做“精准提取”,保证信息密度
        
        返回:
            每个文档对应一个压缩后的片段(str),过滤后为空的文档会被跳过。
        """
        compressed_results = []
        
        for doc in documents:
            # 首先进行信息过滤
            filtered_content = self.filter_compressor.filter_sentences(query, doc)
            
            if not filtered_content:
                continue
            
            # 如果过滤后内容仍然很长,进行摘要
            if len(filtered_content) > 500:
                try:
                    summary = self.summary_compressor.compress_document(query, filtered_content)
                    compressed_results.append(summary)
                except:
                    # 摘要失败时使用精准提取
                    extracted = self.precision_extractor.extract_precise_information(query, filtered_content)
                    if extracted:
                        compressed_results.append(extracted)
            else:
                # 内容较短时使用精准提取
                extracted = self.precision_extractor.extract_precise_information(query, filtered_content)
                compressed_results.append(extracted if extracted else filtered_content)
        
        return compressed_results
    
    def generate_answer(self, query: str) -> Dict[str, any]:
        """
        基于压缩后的上下文生成最终答案,并返回附带的元信息。
        
        Args:
            query: 用户查询
        
        Returns:
            {
                "answer": str,                # 生成的答案
                "context_used": List[str],    # 使用的压缩上下文片段(便于可解释性/调试)
                "compression_ratio": float,   # 压缩比(示例计算,详见下方“注意”)
                "context_length": int         # 拼接后上下文长度(字符数)
            }
        
        注意(关于 compression_ratio 的计算):
            - 这里的示例实现并不严格:original_length 来自“压缩后的片段求和”,与 context_text 仅差换行符。
              因此比值接近 1,并不能真实反映“压缩前后”的差异。
            - 若要得到真实压缩比,应在 retrieve_and_compress 中同时返回“未压缩前的原始总长度”,
              或者在检索后先缓存原始文档长度,再与压缩后长度比较。
        """
        # 检索并压缩上下文
        compressed_context = self.retrieve_and_compress(query)
        
        if not compressed_context:
            return {
                "answer": "抱歉,没有找到相关信息。",
                "context_used": [],
                "compression_ratio": 0
            }
        
        # 构建最终提示词
        context_text = "\n".join(compressed_context)
        prompt = f"""
        基于以下上下文信息回答用户问题。请确保答案准确、简洁。

        上下文:
        {context_text}

        问题:{query}

        答案:
        """
        
        # 生成答案
        answer = self.generator.generate(prompt)
        
        # 计算压缩比(简化计算)
        original_length = sum(len(doc) for doc in compressed_context)
        compressed_length = len(context_text)
        compression_ratio = compressed_length / max(original_length, 1)
        
        return {
            "answer": answer,
            "context_used": compressed_context,
            "compression_ratio": compression_ratio,
            "context_length": len(context_text)
        }

# 使用示例
def example_usage():
    """
    使用示例
    """
    # 假设已有检索器和生成器
    # retriever = YourRetriever()
    # generator = YourGenerator()
    
    # 创建压缩RAG系统
    # rag_system = ContextCompressionRAG(
    #     retriever=retriever,
    #     generator=generator,
    #     compression_strategy=CompressionStrategy.HYBRID
    # )
    
    # 查询示例
    # query = "什么是上下文压缩技术?"
    # result = rag_system.generate_answer(query)
    
    # print(f"答案:{result['answer']}")
    # print(f"上下文长度:{result['context_length']}")
    # print(f"压缩比:{result['compression_ratio']:.2f}")
    
    pass

性能评估与对比分析

评估指标设计

为了客观评估不同压缩策略的效果,我们设计了多维度的评估框架:

import time
from typing import List, Dict, Tuple
import matplotlib.pyplot as plt
import pandas as pd

class CompressionEvaluator:
    def __init__(self):
        """压缩策略评估器"""
        self.results = []
    
    def evaluate_compression_quality(self, 
                                   query: str,
                                   original_docs: List[str],
                                   compressed_docs: List[str],
                                   ground_truth: str = None) -> Dict[str, float]:
        """
        评估压缩质量
        
        Args:
            query: 原始查询
            original_docs: 原始文档列表
            compressed_docs: 压缩后文档列表
            ground_truth: 标准答案(可选)
            
        Returns:
            评估指标字典
        """
        metrics = {}
        
        # 1. 压缩比计算
        original_length = sum(len(doc) for doc in original_docs)
        compressed_length = sum(len(doc) for doc in compressed_docs)
        compression_ratio = compressed_length / max(original_length, 1)
        metrics['compression_ratio'] = compression_ratio
        
        # 2. 信息保留率(基于关键词)
        query_keywords = set(query.lower().split())
        
        original_keywords = set()
        for doc in original_docs:
            original_keywords.update(doc.lower().split())
        
        compressed_keywords = set()
        for doc in compressed_docs:
            compressed_keywords.update(doc.lower().split())
        
        # 计算与查询相关的关键词保留率
        query_related_original = original_keywords.intersection(query_keywords)
        query_related_compressed = compressed_keywords.intersection(query_keywords)
        
        if query_related_original:
            keyword_retention = len(query_related_compressed) / len(query_related_original)
        else:
            keyword_retention = 0
        
        metrics['keyword_retention'] = keyword_retention
        
        # 3. 语义相似度保留(使用预训练模型)
        try:
            from sentence_transformers import SentenceTransformer
            model = SentenceTransformer('all-MiniLM-L6-v2')
            
            original_embeddings = model.encode([' '.join(original_docs)])
            compressed_embeddings = model.encode([' '.join(compressed_docs)])
            
            semantic_similarity = np.dot(original_embeddings, compressed_embeddings.T)[0][0]
            metrics['semantic_similarity'] = float(semantic_similarity)
        except:
            metrics['semantic_similarity'] = 0.0
        
        return metrics
    
    def benchmark_strategies(self, 
                           test_queries: List[str],
                           test_documents: List[List[str]]) -> pd.DataFrame:
        """
        对比不同压缩策略的性能
        
        Args:
            test_queries: 测试查询列表
            test_documents: 对应的文档列表
            
        Returns:
            性能对比结果DataFrame
        """
        strategies = {
            'Filter': filter_based_compression,
            'Summary': summary_based_compression,
            'Precision': precision_based_compression
        }
        
        results = []
        
        for i, (query, docs) in enumerate(zip(test_queries, test_documents)):
            for strategy_name, strategy_func in strategies.items():
                start_time = time.time()
                
                try:
                    # 执行压缩
                    compressed = strategy_func(query, docs)
                    processing_time = time.time() - start_time
                    
                    # 评估质量
                    metrics = self.evaluate_compression_quality(query, docs, compressed)
                    
                    # 记录结果
                    result = {
                        'query_id': i,
                        'strategy': strategy_name,
                        'processing_time': processing_time,
                        'original_length': sum(len(doc) for doc in docs),
                        'compressed_length': sum(len(doc) for doc in compressed),
                        **metrics
                    }
                    results.append(result)
                    
                except Exception as e:
                    print(f"策略 {strategy_name} 处理查询 {i} 时出错: {e}")
                    continue
        
        return pd.DataFrame(results)
    
    def generate_comparison_report(self, results_df: pd.DataFrame) -> str:
        """
        生成对比分析报告
        """
        report = []
        report.append("# 压缩策略性能对比报告\n")
        
        # 按策略分组统计
        strategy_stats = results_df.groupby('strategy').agg({
            'compression_ratio': ['mean', 'std'],
            'keyword_retention': ['mean', 'std'],
            'semantic_similarity': ['mean', 'std'],
            'processing_time': ['mean', 'std']
        }).round(4)
        
        report.append("## 各策略平均性能指标\n")
        report.append(strategy_stats.to_string())
        report.append("\n")
        
        # 最优策略分析
        best_compression = results_df.loc[results_df['compression_ratio'].idxmin(), 'strategy']
        best_retention = results_df.loc[results_df['keyword_retention'].idxmax(), 'strategy']
        best_semantic = results_df.loc[results_df['semantic_similarity'].idxmax(), 'strategy']
        best_speed = results_df.loc[results_df['processing_time'].idxmin(), 'strategy']
        
        report.append("## 最优策略分析\n")
        report.append(f"- 最高压缩率: {best_compression}\n")
        report.append(f"- 最佳信息保留: {best_retention}\n")
        report.append(f"- 最佳语义保持: {best_semantic}\n")
        report.append(f"- 最快处理速度: {best_speed}\n")
        
        return "\n".join(report)

# 实际测试示例
def run_compression_benchmark():
    """
    运行压缩策略基准测试
    """
    evaluator = CompressionEvaluator()
    
    # 准备测试数据
    test_queries = [
        "什么是机器学习?",
        "深度学习的应用领域有哪些?",
        "如何优化神经网络的性能?",
        "自然语言处理的主要技术是什么?",
        "推荐系统的工作原理是什么?"
    ]
    
    test_documents = [
        [
            "机器学习是人工智能的一个分支,它使计算机能够在没有明确编程的情况下学习。机器学习算法构建数学模型,基于训练数据进行预测或决策。机器学习被广泛应用于各个领域,包括图像识别、语音识别、推荐系统等。常见的机器学习类型包括监督学习、无监督学习和强化学习。",
            "在当今数字化时代,数据是新的石油。机器学习作为数据科学的核心技术,能够从大量数据中发现模式和规律。它不仅改变了我们处理信息的方式,也为商业智能、医疗诊断、金融风险评估等领域带来了革命性的变化。"
        ],
        [
            "深度学习是机器学习的一个子领域,它基于人工神经网络,特别是深度神经网络。深度学习在计算机视觉领域表现出色,能够进行图像分类、目标检测和人脸识别。在自然语言处理方面,深度学习推动了机器翻译、文本生成和情感分析的发展。此外,深度学习还在语音识别、游戏AI、自动驾驶等领域有着广泛应用。",
            "随着GPU计算能力的提升和大数据的普及,深度学习迎来了黄金时代。从AlexNet到Transformer,从卷积神经网络到注意力机制,深度学习的发展日新月异。"
        ],
        [
            "神经网络性能优化是一个复杂的过程,涉及多个方面。首先是网络架构的优化,包括选择合适的层数、神经元数量和激活函数。其次是训练过程的优化,如学习率调整、批次大小选择、正则化技术应用。数据预处理也很重要,包括归一化、数据增强和特征工程。此外,硬件优化如GPU并行计算、模型压缩和量化也能显著提升性能。",
            "现代深度学习框架如TensorFlow和PyTorch提供了许多优化工具。自动混合精度训练可以加速训练过程,梯度累积技术能够处理更大的批次,而分布式训练则能利用多台机器的计算资源。"
        ],
        [
            "自然语言处理(NLP)是人工智能的重要分支,主要技术包括词汇分析、句法分析、语义分析和语用分析。词汇分析涉及分词、词性标注和命名实体识别。句法分析处理语法结构和依存关系。语义分析关注词义消歧和语义角色标注。现代NLP还广泛使用词向量、循环神经网络和Transformer等技术。",
            "近年来,预训练语言模型如BERT、GPT等在NLP领域取得了突破性进展。这些模型通过在大规模语料库上进行预训练,学习到了丰富的语言表示,然后可以在各种下游任务上进行微调。"
        ],
        [
            "推荐系统是信息过滤系统,旨在向用户推荐他们可能感兴趣的项目。主要工作原理包括协同过滤、基于内容的推荐和混合推荐。协同过滤基于用户行为数据,分为用户协同过滤和物品协同过滤。基于内容的推荐分析项目特征和用户偏好。现代推荐系统还融合了深度学习技术,如深度协同过滤、神经协同过滤等。",
            "推荐系统面临的主要挑战包括数据稀疏性、冷启动问题和推荐多样性。为了解决这些问题,研究者们提出了各种创新方法,如矩阵分解、因子分解机、图神经网络等。"
        ]
    ]
    
    # 运行基准测试
    results = evaluator.benchmark_strategies(test_queries, test_documents)
    
    # 生成报告
    report = evaluator.generate_comparison_report(results)
    print(report)
    
    return results

# 可视化分析工具
def visualize_compression_results(results_df: pd.DataFrame):
    """
    可视化压缩结果
    """
    import matplotlib.pyplot as plt
    import seaborn as sns
    
    # 设置中文字体
    plt.rcParams['font.sans-serif'] = ['SimHei']
    plt.rcParams['axes.unicode_minus'] = False
    
    fig, axes = plt.subplots(2, 2, figsize=(15, 12))
    
    # 1. 压缩比对比
    ax1 = axes[0, 0]
    compression_data = results_df.groupby('strategy')['compression_ratio'].mean()
    compression_data.plot(kind='bar', ax=ax1, color=['skyblue', 'lightgreen', 'orange'])
    ax1.set_title('各策略平均压缩比')
    ax1.set_ylabel('压缩比')
    ax1.tick_params(axis='x', rotation=45)
    
    # 2. 关键词保留率对比
    ax2 = axes[0, 1]
    retention_data = results_df.groupby('strategy')['keyword_retention'].mean()
    retention_data.plot(kind='bar', ax=ax2, color=['coral', 'lightblue', 'gold'])
    ax2.set_title('各策略关键词保留率')
    ax2.set_ylabel('保留率')
    ax2.tick_params(axis='x', rotation=45)
    
    # 3. 处理时间对比
    ax3 = axes[1, 0]
    time_data = results_df.groupby('strategy')['processing_time'].mean()
    time_data.plot(kind='bar', ax=ax3, color=['lightpink', 'lightcyan', 'wheat'])
    ax3.set_title('各策略平均处理时间')
    ax3.set_ylabel('时间(秒)')
    ax3.tick_params(axis='x', rotation=45)
    
    # 4. 语义相似度对比
    ax4 = axes[1, 1]
    semantic_data = results_df.groupby('strategy')['semantic_similarity'].mean()
    semantic_data.plot(kind='bar', ax=ax4, color=['plum', 'palegreen', 'peachpuff'])
    ax4.set_title('各策略语义相似度保持')
    ax4.set_ylabel('相似度')
    ax4.tick_params(axis='x', rotation=45)
    
    plt.tight_layout()
    plt.show()
    
    return fig

实际应用场景与案例分析

场景一:技术文档问答系统

在企业技术文档问答系统中,文档通常包含大量的背景信息、详细说明和示例代码。用户的查询往往只关注特定的技术细节。

class TechDocumentRAG(ContextCompressionRAG):
    """
    专门针对技术文档的RAG系统
    """
    def __init__(self, retriever, generator):
        super().__init__(retriever, generator, CompressionStrategy.PRECISION)
        
        # 技术文档特有的提取模式
        self.tech_patterns = {
            'api_pattern': r'(API|接口|方法)[::]([^。\n]+)',
            'param_pattern': r'参数[::]([^。\n]+)',
            'example_pattern': r'(示例|例子)[::]([^。\n]+)',
            'error_pattern': r'(错误|异常|Error)[::]([^。\n]+)'
        }
    
    def extract_technical_info(self, query: str, document: str) -> str:
        """
        提取技术文档中的关键信息
        """
        extracted_parts = []
        
        # 基于查询类型进行针对性提取
        if any(keyword in query.lower() for keyword in ['api', '接口', '方法']):
            # 提取API相关信息
            api_matches = re.findall(self.tech_patterns['api_pattern'], document)
            if api_matches:
                extracted_parts.extend([f"{match[0]}{match[1]}" for match in api_matches])
        
        if any(keyword in query.lower() for keyword in ['参数', 'parameter', 'param']):
            # 提取参数信息
            param_matches = re.findall(self.tech_patterns['param_pattern'], document)
            if param_matches:
                extracted_parts.extend([f"参数:{match}" for match in param_matches])
        
        if any(keyword in query.lower() for keyword in ['示例', '例子', 'example']):
            # 提取示例信息
            example_matches = re.findall(self.tech_patterns['example_pattern'], document)
            if example_matches:
                extracted_parts.extend([f"{match[0]}{match[1]}" for match in example_matches])
        
        return " ".join(extracted_parts) if extracted_parts else ""

# 使用示例
tech_rag = TechDocumentRAG(retriever=None, generator=None)  # 实际使用时需要传入真实的retriever和generator

场景二:法律文档分析系统

法律文档具有结构化程度高、条款明确的特点,适合使用精准信息提取策略:

class LegalDocumentCompressor:
    """
    法律文档压缩器
    """
    def __init__(self):
        self.legal_patterns = {
            'article_pattern': r'第([一二三四五六七八九十\d]+)条[::]([^第]+)(?=第|$)',
            'clause_pattern': r'[((]([一二三四五六七八九十\d]+)[))]([^((]+)',
            'definition_pattern': r'([^,。]+)是指([^。]+)',
            'penalty_pattern': r'(罚款|处罚|刑期|有期徒刑)([^。]+)',
        }
    
    def extract_legal_provisions(self, query: str, document: str) -> List[str]:
        """
        提取相关法律条款
        """
        extracted_provisions = []
        
        # 提取条文
        articles = re.findall(self.legal_patterns['article_pattern'], document)
        for article_num, article_content in articles:
            if any(keyword in article_content for keyword in query.split()):
                extracted_provisions.append(f"第{article_num}条:{article_content.strip()}")
        
        # 提取定义
        definitions = re.findall(self.legal_patterns['definition_pattern'], document)
        for term, definition in definitions:
            if term.strip() in query or any(keyword in definition for keyword in query.split()):
                extracted_provisions.append(f"{term.strip()}的定义:{definition}")
        
        return extracted_provisions

场景三:医疗知识库系统

医疗领域对准确性要求极高,需要在保证信息完整性的前提下进行压缩:

class MedicalDocumentCompressor:
    """
    医疗文档压缩器
    """
    def __init__(self):
        self.medical_keywords = {
            'symptoms': ['症状', '表现', '临床表现', 'symptom'],
            'diagnosis': ['诊断', '确诊', 'diagnosis'],
            'treatment': ['治疗', '疗法', 'treatment'],
            'medication': ['药物', '用药', 'medication'],
            'prevention': ['预防', 'prevention']
        }
    
    def safe_medical_compression(self, query: str, document: str) -> str:
        """
        安全的医疗文档压缩,确保关键信息不丢失
        """
        # 识别查询类型
        query_type = self.identify_medical_query_type(query)
        
        # 基于查询类型提取相关段落
        relevant_paragraphs = []
        paragraphs = document.split('\n\n')
        
        for paragraph in paragraphs:
            if query_type and any(keyword in paragraph for keyword in self.medical_keywords[query_type]):
                relevant_paragraphs.append(paragraph)
            elif any(word in paragraph.lower() for word in query.lower().split()):
                relevant_paragraphs.append(paragraph)
        
        return '\n'.join(relevant_paragraphs) if relevant_paragraphs else document[:500]  # 保底策略
    
    def identify_medical_query_type(self, query: str) -> str:
        """
        识别医疗查询类型
        """
        query_lower = query.lower()
        for category, keywords in self.medical_keywords.items():
            if any(keyword in query_lower for keyword in keywords):
                return category
        return None

写在最后

在这里插入图片描述

上下文压缩技术作为RAG系统的重要优化手段,通过"少即是多"的理念有效解决了上下文稀释问题。我们详细分析了三种主要的压缩策略:

  1. 信息过滤策略:计算效率高,适合大规模部署,但可能丢失上下文相关信息。

  2. 生成摘要策略:能够保持语义连贯性,适合复杂文档的概括,但计算成本较高。

  3. 精准信息提取策略:准确性最高,适合结构化文档,但实现复杂度较大。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐