一、前言

        现在我们做AI应用、大模型接口开发,基本都会碰到两个特别头疼的问题:一个是响应太慢,用户点一下要等好几秒,体验特别差;另一个就是调用成本太高,同样的问题反复问,每次都要重新调用大模型,Token哗啦哗啦的像流水一样就这么消耗了。尤其在客服问答、智能助手、内容生成、知识库查询这些真实业务里,大量用户的问题其实是重复、相似的,完全没必要每次都让大模型从头计算。

        缓存这个早就被验证过的经典思路,放到大模型场景里,就特别对症,把已经生成过的回答存起来,下次同样或类似的请求过来,直接从缓存里取,不用再麻烦大模型。既能秒级响应,又能大幅省成本,还能减轻大模型服务压力,提升系统稳定性。在实际应用落地过程中,缓存就是最直接、最有效、性价比最高的优化手段。我们前期通过《大模型数据缓存复用方案:从API请求数据累积到智能融合.50》讲解过基于向量化结合本地数据库的实现方案,今天我们换个思路,基于缓存装饰器的角度来实现。

二、缓存技术基础

1. 缓存的本质与价值

        缓存(Cache)本质上是一种数据暂存机制,将高频访问的数据存储在读写速度更快的介质中,从而减少对低速数据源的重复访问。想象一下:我们每天上班都会路过便利店买早餐,如果每次都要重新确认早餐价格、口味,会浪费大量时间;但如果你把常用的早餐信息记在脑子里,相当于缓存,就能快速决策。这就是缓存的核心价值:用空间换时间。

在大模型应用场景中,缓存的价值体现在三个维度:

  • 响应速度:大模型生成文本通常需要数百毫秒甚至数秒,缓存命中可将响应时间降至微秒级
  • 成本控制:大模型 API 调用按 token 计费,重复请求相同 prompt 会产生不必要的费用
  • 系统稳定性:减少对大模型服务的请求量,降低接口限流、服务熔断的风险

2. 大模型缓存的特殊要求

与普通缓存不同,大模型缓存需要满足以下特性:

  • TTL(生存时间):大模型的回答存在时效性问题,可能随时间失效,需设置过期时间
  • 键值设计:prompt 通常包含大量文本,需生成唯一且高效的缓存键
  • 线程安全:生产环境中多线程调用需保证缓存操作的原子性
  • 缓存失效策略:需处理缓存击穿、缓存雪崩等常见问题

3. 缓存在大模型系统中的位置

以下流程图反映了缓存在大模型系统中的核心位置,用户请求首先经过缓存层,命中则直接返回,未命中才调用大模型并更新缓存;

大模型缓存命中率随请求次数变化:

三、缓存装饰器基础

1. 装饰器基础原理

装饰器是 Python 的核心特性之一,用于在不修改函数源代码的前提下增强函数功能。其核心原理基于:

1.1 函数是一等对象

  • 在Python中,函数被视为“一等公民”。这意味着函数可以像整数或字符串一样被赋值给变量、作为参数传递给其他函数,甚至作为返回值从函数中返回。
  • 这种特性是构建高阶函数(如map、filter)和实现回调机制的基础,极大提升了代码的灵活性与复用性。

1.2 闭包

  • 闭包是指嵌套函数中,内部函数引用了外部函数的局部变量,即使外部函数已执行完毕,这些变量仍被内部函数“封闭”保存。
  • 闭包让函数拥有了“记忆”状态的能力,常用于实现装饰器、工厂函数或数据封装,无需使用类即可创建带有私有状态的 callable 对象。

1.3 functools.wraps

  • functools.wraps 是一个装饰器,用于在编写自定义装饰器时,将被包装函数的元数据(如 __name__、__doc__、__module__ 等)复制到包装函数上。
  • 若不使用它,调试或被装饰函数在查看帮助文档时会显示包装器的信息,导致混淆。它是编写规范、透明装饰器的必备工具。

基础装饰器结构:

import functools

def my_decorator(func):
    @functools.wraps(func)  # 保留原函数信息
    def wrapper(*args, **kwargs):
        # 执行前增强逻辑
        result = func(*args, **kwargs)
        # 执行后增强逻辑
        return result
    return wrapper

# 使用装饰器
@my_decorator
def say_hello(name):
    return f"Hello, {name}!"

2. 基础缓存装饰器实现

我们先实现一个最简单的无过期时间的缓存装饰器,理解核心逻辑:

import functools

def simple_cache(func):
    """基础缓存装饰器(无过期时间)"""
    # 缓存存储:键=参数组合,值=函数返回结果
    cache_storage = {}
    
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # 1. 生成缓存键:将参数转换为字符串作为唯一标识
        cache_key = (args, frozenset(kwargs.items()))  # 使用不可变类型作为键
        
        # 2. 检查缓存是否存在
        if cache_key in cache_storage:
            print(f"✅ 缓存命中:{cache_key}")
            return cache_storage[cache_key]
        
        # 3. 缓存未命中,执行原函数
        print(f"❌ 缓存未命中,执行函数:{cache_key}")
        result = func(*args, **kwargs)
        
        # 4. 将结果存入缓存
        cache_storage[cache_key] = result
        
        return result
    return wrapper

# 测试基础缓存
@simple_cache
def calculate_sum(a, b):
    """简单的求和函数,模拟耗时操作"""
    # 模拟计算耗时
    import time
    time.sleep(1)
    return a + b

# 第一次调用(缓存未命中)
print(calculate_sum(1, 2))  
# 第二次调用(缓存命中)
print(calculate_sum(1, 2))  
# 不同参数调用(缓存未命中)
print(calculate_sum(2, 3))  

代码关键说明:

  • cache_storage:字典结构,用于存储缓存数据,生命周期与装饰器绑定
  • cache_key:使用(args, frozenset(kwargs.items()))生成键,避免 kwargs 顺序影响,如func(a=1,b=2)和func(b=2,a=1)应视为同一键
  • functools.wraps:确保被装饰函数的__name__、__doc__等元信息不丢失

输出结果:

❌ 缓存未命中,执行函数:((1, 2), frozenset())
3
✅ 缓存命中:((1, 2), frozenset())
3
❌ 缓存未命中,执行函数:((2, 3), frozenset())
5

装饰器缓存机制的工作过程:

================== 三次调用的执行情况:==================

第一次调用 calculate_sum(1, 2)

  • 缓存中没有这个参数组合,所以显示 ❌ 缓存未命中
  • 执行函数计算(等待1秒),得到结果 3
  • 将结果存入缓存

第二次调用 calculate_sum(1, 2)

  • 缓存中已有 (1, 2) 的结果,显示 ✅ 缓存命中
  • 直接从缓存返回 3,无需重新计算(立即返回)

第三次调用 calculate_sum(2, 3)

  • 参数不同,缓存中没有这个组合
  • 显示 ❌ 缓存未命中,执行计算得到 5
  • 将新结果存入缓存

3. 增加 TTL 过期机制

基础缓存没有过期时间,无法满足大模型场景的时效性要求。我们在基础版本上增加 TTL(Time-To-Live)特性:

import functools
import time

def ttl_cache(ttl=3600):
    """带过期时间的缓存装饰器
    
    Args:
        ttl: 缓存生存时间(秒),默认1小时
    """
    cache_storage = {}
    
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # 1. 生成稳定的缓存键
            args_key = tuple(args)
            kwargs_key = frozenset(kwargs.items())
            cache_key = (args_key, kwargs_key)
            
            # 2. 检查缓存是否有效
            current_time = time.time()
            if cache_key in cache_storage:
                cached_time, cached_result = cache_storage[cache_key]
                # 判断是否过期
                if current_time - cached_time < ttl:
                    print(f"✅ 缓存命中(剩余有效期:{ttl - (current_time - cached_time):.1f}秒)")
                    return cached_result
                else:
                    print(f"⏰ 缓存过期,清除旧数据")
                    del cache_storage[cache_key]
            
            # 3. 执行原函数
            print(f"❌ 缓存未命中,执行函数")
            result = func(*args, **kwargs)
            
            # 4. 存储缓存(记录时间戳+结果)
            cache_storage[cache_key] = (current_time, result)
            
            return result
        return wrapper
    return decorator

# 测试TTL缓存
@ttl_cache(ttl=5)  # 缓存有效期5秒
def generate_greeting(name):
    """模拟生成个性化问候语的大模型函数"""
    time.sleep(1)  # 模拟大模型处理耗时
    from datetime import datetime
    now = datetime.now()
    return f"你好 {name}! 今天是 {now.strftime('%Y-%m-%d %H:%M:%S')}"

# 第一次调用(未命中)
print(generate_greeting("小王"))
# 立即第二次调用(命中)
print(generate_greeting("小王"))
# 等待6秒后调用(过期)
time.sleep(6)
print(generate_greeting("小王"))

代码关键说明:

  • ttl参数:通过外层函数传递,实现缓存过期时间的自定义
  • 缓存值结构:从单纯存储结果改为存储(时间戳, 结果)元组
  • 过期判断:current_time - cached_time < ttl 检查缓存是否在有效期内
  • 过期清理:主动删除过期缓存,释放内存空间

输出结果:

你好 小王! 今天是 2026-03-09 21:51:45
✅ 缓存命中(剩余有效期:4.0秒)
你好 小王! 今天是 2026-03-09 21:51:45
⏰ 缓存过期,清除旧数据
❌ 缓存未命中,执行函数
你好 小王! 今天是 2026-03-09 21:51:52

四、大模型专用缓存装饰器实现

        在基础缓存装饰器我们实现了基础的 TTL 缓存功能,并使用functools.wraps保留函数元信息,支持参数化 TTL 配置,但还是存在一些问题:

  • 1. 缓存键生成方式不健壮:str(args) + str(kwargs) 可能产生重复键,如args=(1,2)和args=(12,)
  • 2. 无线程安全保护:多线程环境下可能出现竞态条件
  • 3. 无缓存容量限制:可能导致内存泄漏
  • 4. 无异常处理:原函数报错时也会缓存错误结果
  • 5. 缓存键未做哈希优化:长 prompt 会导致键过长

基与以上的分析优化,我们实现一个更健壮的大模型专用缓存装饰器:

import functools
import time
import hashlib
import threading
from typing import Any, Callable, Optional

class LLMResponseCache:
    """大模型响应缓存类(生产级实现)"""
    
    def __init__(self, default_ttl: int = 3600, max_size: int = 1000):
        """
        初始化缓存
        
        Args:
            default_ttl: 默认缓存有效期(秒)
            max_size: 缓存最大容量,防止内存溢出
        """
        # 核心缓存存储
        self._cache = {}
        # 缓存访问时间(用于LRU淘汰)
        self._access_times = {}
        # 默认TTL
        self._default_ttl = default_ttl
        # 缓存最大容量
        self._max_size = max_size
        # 线程锁(保证线程安全)
        self._lock = threading.Lock()
    
    def _generate_key(self, args: tuple, kwargs: dict) -> str:
        """生成稳定且高效的缓存键
        
        步骤:
        1. 将参数转换为有序的字符串
        2. 使用MD5哈希生成固定长度的键
        """
        # 处理位置参数
        args_str = "|".join(str(arg) for arg in args)
        # 处理关键字参数(按key排序保证顺序一致)
        kwargs_str = "|".join(f"{k}={v}" for k, v in sorted(kwargs.items()))
        # 组合并哈希
        raw_key = f"{args_str}||{kwargs_str}"
        # 使用MD5生成16进制哈希值(固定长度)
        return hashlib.md5(raw_key.encode('utf-8')).hexdigest()
    
    def _clean_expired(self) -> None:
        """清理所有过期的缓存项(需要在已获取锁的情况下调用)"""
        current_time = time.time()
        expired_keys = []
        
        for key, (timestamp, _) in self._cache.items():
            if current_time - timestamp > self._default_ttl:
                expired_keys.append(key)
        
        # 删除过期项
        for key in expired_keys:
            del self._cache[key]
            if key in self._access_times:
                del self._access_times[key]
    
    def _evict_lru(self) -> None:
        """LRU(最近最少使用)淘汰策略,当缓存满时删除最久未使用的项(需要在已获取锁的情况下调用)"""
        if len(self._cache) <= self._max_size:
            return
        
        if self._access_times:
            # 找到访问时间最早的键
            oldest_key = min(self._access_times.items(), key=lambda x: x[1])[0]
            # 删除最久未使用的项
            del self._cache[oldest_key]
            del self._access_times[oldest_key]
    
    def get(self, args: tuple, kwargs: dict) -> Optional[Any]:
        """获取缓存值(如果存在且有效)"""
        key = self._generate_key(args, kwargs)
        current_time = time.time()
        
        with self._lock:
            # 检查键是否存在
            if key not in self._cache:
                return None
            
            # 检查是否过期
            timestamp, result = self._cache[key]
            if current_time - timestamp > self._default_ttl:
                del self._cache[key]
                if key in self._access_times:
                    del self._access_times[key]
                return None
            
            # 更新访问时间(用于LRU)
            self._access_times[key] = current_time
            return result
    
    def set(self, args: tuple, kwargs: dict, result: Any) -> None:
        """设置缓存值"""
        key = self._generate_key(args, kwargs)
        current_time = time.time()
        
        with self._lock:
            # 清理过期项
            self._clean_expired()
            # 检查容量,必要时淘汰
            self._evict_lru()
            # 设置缓存
            self._cache[key] = (current_time, result)
            self._access_times[key] = current_time
    
    def clear(self) -> None:
        """清空所有缓存"""
        with self._lock:
            self._cache.clear()
            self._access_times.clear()

# 定义缓存装饰器工厂函数
def cache_llm_response(ttl: int = 3600, max_size: int = 1000):
    """大模型响应缓存装饰器(生产级)
    
    Args:
        ttl: 缓存有效期(秒)
        max_size: 缓存最大容量
    """
    # 创建缓存实例
    cache = LLMResponseCache(default_ttl=ttl, max_size=max_size)
    
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # 尝试获取缓存
            cached_result = cache.get(args, kwargs)
            if cached_result is not None:
                print(f"✅ 缓存命中,直接返回结果")
                return cached_result
            
            # 缓存未命中,执行原函数(捕获异常避免缓存错误)
            try:
                print(f"❌ 缓存未命中,调用大模型")
                result = func(*args, **kwargs)
            except Exception as e:
                print(f"❌ 函数执行出错:{e}")
                raise  # 抛出异常,不缓存错误结果
            
            # 存入缓存
            cache.set(args, kwargs, result)
            return result
        
        # 为包装函数添加清理缓存的方法
        wrapper.clear_cache = cache.clear
        return wrapper
    
    return decorator

# ===================== 测试代码 =====================
from datetime import datetime

# 模拟大模型生成函数
def generate_text(prompt: str) -> str:
    """模拟大模型文本生成(带耗时)"""
    time.sleep(0.5)  # 模拟大模型处理耗时
    now = datetime.now()
    return f"大模型响应:{prompt} - {now.strftime('%Y-%m-%d %H:%M:%S')}"

# 使用缓存装饰器
@cache_llm_response(ttl=10, max_size=5)
def generate_text_with_cache(prompt: str) -> str:
    return generate_text(prompt)

# 测试流程
if __name__ == "__main__":
    # 第一次调用(未命中)
    print("=" * 50)
    print("测试1: 第一次调用(预期:缓存未命中)")
    print("=" * 50)
    print(generate_text_with_cache("你好"))
    print()
    
    # 第二次调用(命中)
    print("=" * 50)
    print("测试2: 第二次调用相同内容(预期:缓存命中)")
    print("=" * 50)
    print(generate_text_with_cache("你好"))
    print()
    
    # 调用不同prompt(未命中)
    print("=" * 50)
    print("测试3: 调用不同内容(预期:缓存未命中)")
    print("=" * 50)
    print(generate_text_with_cache("今天天气怎么样"))
    print()
    
    # 清空缓存
    print("=" * 50)
    print("测试4: 清空缓存后再次调用(预期:缓存未命中)")
    print("=" * 50)
    generate_text_with_cache.clear_cache()
    print(generate_text_with_cache("你好"))

核心优化说明:

- 1. 缓存键优化

  • 问题:原始代码使用str(args) + str(kwargs)生成键,存在两个问题:
    • 1. 键长度不可控,长 prompt 会导致键过长)
    • 2. 易产生冲突,如args=(1,2)和args=(12,)的字符串拼接结果可能相似
  • 解决方案:使用 MD5 哈希生成固定长度(32 位)的键,同时对 kwargs 排序保证顺序一致性

- 2. 线程安全保障

  • 问题:多线程环境下,同时读写缓存可能导致:
    • 1. 缓存数据损坏
    • 2. 重复调用大模型,缓存被击穿
  • 解决方案:使用threading.Lock()实现互斥锁,确保缓存操作的原子性

- 3. 缓存淘汰策略

  • LRU(最近最少使用):当缓存达到最大容量时,删除最久未使用的项,避免内存溢出
  • 主动过期清理:定期清理过期缓存,释放内存空间

- 4. 异常处理

  • 问题:如果大模型因网络错误调用失败,原始代码会缓存异常结果
  • 解决方案:使用 try-except 捕获异常,仅缓存成功的响应结果

输出结果:

==================================================
测试1: 第一次调用(预期:缓存未命中)
==================================================
❌ 缓存未命中,调用大模型
大模型响应:你好 - 2026-03-09 22:18:06

==================================================
测试2: 第二次调用相同内容(预期:缓存命中)
==================================================
✅ 缓存命中,直接返回结果
大模型响应:你好 - 2026-03-09 22:18:06

==================================================
测试3: 调用不同内容(预期:缓存未命中)
==================================================
❌ 缓存未命中,调用大模型
大模型响应:今天天气怎么样 - 2026-03-09 22:18:07

==================================================
测试4: 清空缓存后再次调用(预期:缓存未命中)
==================================================
❌ 缓存未命中,调用大模型
大模型响应:你好 - 2026-03-09 22:18:07

五、缓存装饰器执行流程

我们以上示例中的generate_text_with_cache("你好")调用过程为例,详细解析每一步执行过程:

步骤 1:装饰器初始化

@cache_llm_response(ttl=10, max_size=5)
def generate_text_with_cache(prompt: str) -> str:
    return generate_text(prompt)
  • 执行cache_llm_response(ttl=10, max_size=5),创建LLMResponseCache实例
  • 返回decorator函数,装饰generate_text_with_cache
  • generate_text_with_cache被替换为wrapper函数

步骤 2:第一次调用(缓存未命中)

  • 1. 调用generate_text_with_cache("你好"),实际执行wrapper("你好")
  • 2. wrapper调用cache.get(("你好",), {})生成缓存键:
    • args_str = "你好"
    • kwargs_str = ""
    • raw_key = "你好 ||"
    • MD5 哈希后得到固定长度的键,如:5eb63bbbe01eeed093cb22bb8f5acdc3
  • 3. 检查缓存中无此键,返回None
  • 4. 执行原函数generate_text("你好"),模拟耗时 0.5 秒后返回结果
  • 5. 调用cache.set(("你好",), {}, 结果):
    • 清理过期缓存(此时为空)
    • 检查缓存容量(未满)
    • 存储缓存项:{键: (时间戳, 结果)}
    • 记录访问时间:{键: 当前时间}
  • 6. 返回大模型响应结果

步骤 3:第二次调用(缓存命中)

  • 1. 再次调用generate_text_with_cache("你好")
  • 2. 生成相同的缓存键
  • 3. 检查缓存存在且未过期
  • 4. 更新访问时间,用于 LRU
  • 5. 直接返回缓存结果,无耗时

步骤 4:缓存过期后调用

  • 1. 等待 10 秒后调用generate_text_with_cache("你好")
  • 2. 生成相同的缓存键
  • 3. 检查缓存存在但已过期
  • 4. 删除过期缓存项
  • 5. 重新执行原函数并更新缓存

核心原理总结:

  • 闭包原理:装饰器通过闭包保存缓存实例,使其生命周期与被装饰函数一致
  • 哈希算法:将可变长度的 prompt 转换为固定长度的哈希值,提高缓存效率
  • 锁机制:保证多线程环境下缓存操作的原子性
  • TTL 机制:通过时间戳对比实现缓存自动过期
  • LRU 算法:基于访问时间的缓存淘汰策略,防止内存溢出

六、大模型缓存最佳实践

1. TTL 设置策略

  • 时效性内容(如新闻、天气):TTL 设置为 5-10 分钟
  • 通用知识内容(如常识、教程):TTL 设置为 1-24 小时
  • 静态内容(如固定模板):TTL 设置为 7 天或更长

2. 缓存键优化

  • 对超长 prompt 进行截断或哈希,避免键过长
  • 忽略无意义的参数,如请求 ID、trace ID
  • 对相似 prompt 进行归一化,如去除空格、大小写统一

3. 缓存失效策略

  • 主动失效:当大模型知识更新时,主动清空相关缓存
  • 被动失效:设置合理的 TTL,让缓存自然过期
  • 增量更新:只更新变化的缓存项,避免全量清空

七、总结

        今天结合缓存装饰器原理讲解大模型应用开发里的缓存技术,主要目的是为了解决大模型响应慢、调用贵这两个老大难问题。首先得明确,缓存的核心逻辑很简单,就是用空间换时间,把用户常问的、重复的请求结果存起来,下次再有人问一样的问题,直接从缓存里拿,不用再重新调用大模型,既省时间又省 Token 成本。

        基于Python 装饰器的基本原理,这是实现缓存的基础,然后从简单的无过期缓存,升级到带 TTL生存时间的缓存,解决了大模型回答时效性的问题,总的来说,缓存就是大模型应用落地的必备优化手段,操作不复杂,但能解决大问题,不管是单机还是分布式部署,都能用得上。

附录:完整代码落地实践

"""
大模型缓存装饰器 - 生产级实现
包含:TTL过期、线程安全、LRU淘汰、异常处理、哈希键生成
"""
import functools
import time
import hashlib
import threading
from typing import Any, Callable, Optional

class LLMResponseCache:
    """大模型响应缓存核心类"""
    
    def __init__(self, default_ttl: int = 3600, max_size: int = 1000):
        self._cache = {}
        self._access_times = {}
        self._default_ttl = default_ttl
        self._max_size = max_size
        self._lock = threading.Lock()
    
    def _generate_key(self, args: tuple, kwargs: dict) -> str:
        """生成稳定的哈希缓存键"""
        args_str = "|".join(str(arg) for arg in args)
        kwargs_str = "|".join(f"{k}={v}" for k, v in sorted(kwargs.items()))
        raw_key = f"{args_str}||{kwargs_str}"
        return hashlib.md5(raw_key.encode('utf-8')).hexdigest()
    
    def _clean_expired(self) -> None:
        """清理过期缓存(需要在已获取锁的情况下调用)"""
        current_time = time.time()
        expired_keys = []

        for key in self._cache:
            if current_time - self._cache[key][0] > self._default_ttl:
                expired_keys.append(key)

        for key in expired_keys:
            del self._cache[key]
            if key in self._access_times:
                del self._access_times[key]

    def _evict_lru(self) -> None:
        """LRU淘汰策略(需要在已获取锁的情况下调用)"""
        if len(self._cache) <= self._max_size:
            return

        if self._access_times:
            oldest_key = min(self._access_times.items(), key=lambda x: x[1])[0]
            del self._cache[oldest_key]
            del self._access_times[oldest_key]
    
    def get(self, args: tuple, kwargs: dict) -> Optional[Any]:
        """获取缓存值"""
        key = self._generate_key(args, kwargs)
        current_time = time.time()
        
        with self._lock:
            if key not in self._cache:
                return None
            
            timestamp, result = self._cache[key]
            if current_time - timestamp > self._default_ttl:
                del self._cache[key]
                if key in self._access_times:
                    del self._access_times[key]
                return None
            
            self._access_times[key] = current_time
            return result
    
    def set(self, args: tuple, kwargs: dict, result: Any) -> None:
        """设置缓存值"""
        key = self._generate_key(args, kwargs)
        current_time = time.time()
        
        with self._lock:
            self._clean_expired()
            self._evict_lru()
            self._cache[key] = (current_time, result)
            self._access_times[key] = current_time
    
    def clear(self) -> None:
        """清空缓存"""
        with self._lock:
            self._cache.clear()
            self._access_times.clear()
    
    def stats(self) -> dict:
        """获取缓存统计信息"""
        with self._lock:
            return {
                "total_items": len(self._cache),
                "max_size": self._max_size,
                "default_ttl": self._default_ttl,
                "cache_hit_rate": self._calculate_hit_rate()
            }
    
    def _calculate_hit_rate(self) -> float:
        """计算缓存命中率(简单实现)"""
        # 实际应用中需记录命中/未命中次数
        return 0.0

def cache_llm_response(ttl: int = 3600, max_size: int = 1000):
    """大模型缓存装饰器
    
    Args:
        ttl: 缓存有效期(秒)
        max_size: 缓存最大容量
    """
    cache = LLMResponseCache(default_ttl=ttl, max_size=max_size)
    
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # 尝试获取缓存
            cached_result = cache.get(args, kwargs)
            if cached_result is not None:
                return cached_result
            
            # 执行原函数
            try:
                result = func(*args, **kwargs)
            except Exception as e:
                raise e  # 不缓存异常
            
            # 存入缓存
            cache.set(args, kwargs, result)
            return result
        
        # 暴露缓存操作方法
        wrapper.clear_cache = cache.clear
        wrapper.cache_stats = cache.stats
        return wrapper
    
    return decorator

# ===================== 测试与使用示例 =====================
from datetime import datetime

if __name__ == "__main__":
    # 模拟大模型函数
    def generate_text(prompt: str) -> str:
        """模拟大模型文本生成"""
        time.sleep(0.5)  # 模拟处理耗时
        now = datetime.now()
        return f"Response to: {prompt} (generated at {now.strftime('%Y-%m-%d %H:%M:%S')})"

    # 使用缓存装饰器
    @cache_llm_response(ttl=10, max_size=5)
    def generate_text_with_cache(prompt: str) -> str:
        return generate_text(prompt)

    # 测试流程
    print("=== 第一次调用(未命中) ===")
    start_time = time.time()
    print("执行函数:generate_text_with_cache('你好')")
    print(generate_text_with_cache("你好"))
    print(f"耗时:{time.time() - start_time:.2f}秒")

    print("\n=== 第二次调用(命中) ===")
    start_time = time.time()
    print("执行函数:generate_text_with_cache('你好')")
    print(generate_text_with_cache("你好"))
    print(f"耗时:{time.time() - start_time:.2f}秒")

    print("\n=== 缓存统计信息 ===")
    print("执行函数:generate_text_with_cache.cache_stats()")
    print(generate_text_with_cache.cache_stats())

    print("\n=== 清空缓存后调用(未命中) ===")
    print("执行函数:generate_text_with_cache.clear_cache()")
    generate_text_with_cache.clear_cache()
    start_time = time.time()
    print("执行函数:generate_text_with_cache('你好')")
    print(generate_text_with_cache("你好"))
    print(f"耗时:{time.time() - start_time:.2f}秒")

输出参考:

=== 第一次调用(未命中) ===
执行函数:generate_text_with_cache('你好')
Response to: 你好 (generated at 2026-03-09 23:11:38)
耗时:0.53秒

=== 第二次调用(命中) ===
执行函数:generate_text_with_cache('你好')
Response to: 你好 (generated at 2026-03-09 23:11:38)
耗时:0.00秒

=== 缓存统计信息 ===
执行函数:generate_text_with_cache.cache_stats()
{'total_items': 1, 'max_size': 5, 'default_ttl': 10, 'cache_hit_rate': 0.0}

=== 清空缓存后调用(未命中) ===
执行函数:generate_text_with_cache.clear_cache()
执行函数:generate_text_with_cache('你好')
Response to: 你好 (generated at 2026-03-09 23:11:39)
耗时:0.50秒

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐