dify 本地源码启动
Ollama 安装部署
dify 智能体实践

dify 源码分析(一)功能概述
dify 源码分析(二)源码结构
dify 源码分析(三)agent
dify 源码分析(四)tools
dify 源码分析(五)chatflow
dify 源码分析(六)event
dify 源码分析(七)ratelimiter

1. 平滑限流策略(RateLimiter )

1.1. 令牌桶算法 (Token Bucket)

  • python
import time
import threading
from typing import Optional

class TokenBucketRateLimiter:
    def __init__(self, capacity: int, fill_rate: float):
        """
        令牌桶限流器
        
        Args:
            capacity: 桶容量
            fill_rate: 每秒填充的令牌数
        """
        self.capacity           = float(capacity)
        self.available_tokens   = float(capacity)
        self.fill_rate          = fill_rate
        self.last_time          = time.time()
        self.lock               = threading.Lock()
    
    def consume(self, request_tokens: int = 1) -> bool:
        """
        消费令牌
        
        Args:
            tokens: 需要的令牌数
            
        Returns:
            bool: 是否成功获取令牌
        """
        with self.lock:
            current_now = time.time()
            # 计算时间间隔内应该添加的令牌
            # 计算时间间隔,单位秒
            elapsed = current_now - self.last_time
            # 令牌桶容量 和 计算的桶容量,取最小值
            self.available_tokens = min(self.capacity, self.available_tokens + elapsed*self.fill_rate)
            self.last_time = current_now
            # print(f"Available tokens:{self.available_tokens} request tokens:{request_tokens}")
            
            if self.available_tokens >= request_tokens:
                self.available_tokens -= request_tokens
                return True
            return False
    
    def wait_until_ready(self, tokens: int = 1):
        """等待直到有足够的令牌"""
        while not self.consume(tokens):
            time.sleep(0.01)  # 短暂等待后重试

if __name__ == "__main__":
    # 使用示例
    limiter = TokenBucketRateLimiter(capacity=10, fill_rate=2)  # 最大10个令牌,每秒补充2个

    # 测试
    for i in range(20):
        if limiter.consume():
            print(f"{time.strftime('%H:%M:%S')} - 请求通过")
        else:
            print(f"{time.strftime('%H:%M:%S')} - 请求被限流")
        time.sleep(0.1)

运行结果:

> python.exe .\bucket.py
15:53:22 - 请求通过
15:53:22 - 请求通过
15:53:22 - 请求通过
15:53:22 - 请求通过
15:53:22 - 请求通过
15:53:22 - 请求通过
15:53:22 - 请求通过
15:53:22 - 请求通过
15:53:23 - 请求通过
15:53:23 - 请求通过
15:53:23 - 请求通过
15:53:23 - 请求通过
15:53:23 - 请求被限流
15:53:23 - 请求被限流
15:53:23 - 请求通过
15:53:23 - 请求被限流
15:53:23 - 请求被限流
15:53:24 - 请求被限流
15:53:24 - 请求被限流
15:53:24 - 请求通过
  • java
public class TokenBucketRateLimiter {
    private final int capacity;          // 桶容量
    private final double refillRate;     // 令牌填充速率 (令牌/毫秒)
    private double tokens;               // 当前令牌数量
    private long lastRefillTimestamp;    // 上次填充时间戳
    
    public TokenBucketRateLimiter(int capacity, int refillsPerSecond) {
        this.capacity = capacity;
        this.refillRate = refillsPerSecond / 1000.0;
        this.tokens = capacity;
        this.lastRefillTimestamp = System.currentTimeMillis();
    }
    
    public synchronized boolean tryAcquire(int permits) {
        refillTokens();
        if (tokens >= permits) {
            tokens -= permits;
            return true;
        }
        return false;
    }
    
    public synchronized long acquire(int permits) {
        long waitTime = 0;
        while (true) {
            refillTokens();
            if (tokens >= permits) {
                tokens -= permits;
                return waitTime;
            }
            
            // 计算需要等待的时间
            double missingTokens = permits - tokens;
            long requiredWait = (long) (missingTokens / refillRate);
            
            try {
                Thread.sleep(requiredWait);
                waitTime += requiredWait;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return -1;
            }
        }
    }
    
    private void refillTokens() {
        long currentTime = System.currentTimeMillis();
        if (currentTime > lastRefillTimestamp) {
            long timeElapsed = currentTime - lastRefillTimestamp;
            double tokensToAdd = timeElapsed * refillRate;
            tokens = Math.min(capacity, tokens + tokensToAdd);
            lastRefillTimestamp = currentTime;
        }
    }
    
    // 获取当前状态
    public synchronized double getAvailableTokens() {
        refillTokens();
        return tokens;
    }
}

1.2. 漏桶算法 (Leaky Bucket)

漏桶算法(Leaky Bucket Algorithm)是一种常用的流量整形和速率限制算法。它的工作原理是:

有一个固定容量的"桶",用于存储请求
请求以任意速率进入桶中
桶以固定速率"漏水"(处理请求)
如果桶满了,新的请求会被丢弃或拒绝

  • python
import time
import threading

class LeakyBucket:
    def __init__(self, capacity, leak_rate: float):
        """
        漏桶算法 (Leaky Bucket)

        capacity: 桶的容量
        leak_rate: 漏水速率(请求/秒)
        """
        self.capacity   = capacity
        self.leak_rate  = leak_rate
        self.water      = 0  # 当前桶中的水量(请求数量)
        self.last_time  = time.time()
        self._lock      = threading.Lock()
    
    def _leak(self):
        """漏水操作"""
        with self._lock:
            current_now = time.time()
            time_passed = current_now - self.last_time
            # 需要漏多少水
            leaked_amount = time_passed * self.leak_rate
            
            if leaked_amount > 0:
                self.water = max(0, self.water - leaked_amount)
                self.last_time = current_now
    
    def allow_request(self, weight: float = 1.0):
        """
        检查是否允许权重为 weight 的请求通过
        weight: 请求的权重
        """
        self._leak()  # 先执行漏水(消费需求)
        
        with self._lock:
            if self.water + weight < self.capacity:
                # 新增一个需求,水位加一
                self.water += weight
                return True
            return False

if __name__ == "__main__":
    # 使用示例
    bucket = LeakyBucket(capacity=10, leak_rate=2)  # 容量10,每秒处理2个请求

    for i in range(20):
        weight = 1
        if i == 5:
            weight = 10
        if bucket.allow_request(weight):
            print(f"请求 {i} 通过")
        else:
            print(f"请求 {i} 被限制")
        time.sleep(0.1)  # 模拟请求间隔

运行结果

> python.exe .\leaky.py 
请求 0 通过
请求 1 通过
请求 2 通过
请求 3 通过
请求 4 通过
请求 5 被限制 (如果是正常权重,此处应该是通过;这里权重提高为10,则不通过)
请求 6 通过
请求 7 通过
请求 8 通过
请求 9 通过
请求 10 通过
请求 11 通过
请求 12 通过
请求 13 被限制
请求 14 通过
请求 15 被限制
请求 16 被限制
请求 17 被限制
请求 18 被限制
请求 19 通过
  • java
public class LeakyBucketRateLimiter {
    private final int capacity;          // 桶容量
    private final long leakInterval;     // 漏水间隔 (毫秒)
    private double waterLevel;           // 当前水位
    private long lastLeakTimestamp;      // 上次漏水时间
    
    public LeakyBucketRateLimiter(int capacity, int leaksPerSecond) {
        this.capacity = capacity;
        this.leakInterval = 1000 / leaksPerSecond;
        this.waterLevel = 0;
        this.lastLeakTimestamp = System.currentTimeMillis();
    }
    
    public synchronized boolean tryAcquire(int permits) {
        leakWater();
        if (waterLevel + permits <= capacity) {
            waterLevel += permits;
            return true;
        }
        return false;
    }
    
    public synchronized long acquire(int permits) {
        leakWater();
        
        if (waterLevel + permits <= capacity) {
            waterLevel += permits;
            return 0;
        }
        
        // 计算需要等待的时间
        double overflow = (waterLevel + permits) - capacity;
        long waitTime = (long) (overflow * leakInterval);
        
        try {
            Thread.sleep(waitTime);
            waterLevel += permits - (overflow / leakInterval);
            return waitTime;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        }
    }
    
    private void leakWater() {
        long currentTime = System.currentTimeMillis();
        long timeElapsed = currentTime - lastLeakTimestamp;
        
        if (timeElapsed >= leakInterval) {
            long leaks = timeElapsed / leakInterval;
            waterLevel = Math.max(0, waterLevel - leaks);
            lastLeakTimestamp = currentTime;
        }
    }
}

1.3. 滑动窗口限流

时间片滑动窗口算法是一种常用的限流算法,它将时间划分为固定大小的窗口,并在每个窗口内维护请求计数。

1.3.1. 时间片滑动窗口

  • python
import time
import threading
from typing import Dict, List

class SlidingWindow:
    def __init__(self, window_size: int, max_requests: int):
        """
        滑动窗口 (Sliding Window)

        window_size: 窗口大小(秒)
        max_requests: 窗口内最大请求数
        """
        self.window_size    = window_size
        self.max_requests   = max_requests
        self.windows: Dict[int, int] = {}  # 时间片 -> 计数
        self._lock          = threading.Lock()
    
    def _get_current_window(self) -> int:
        """获取当前时间片"""
        return int(time.time() // self.window_size)
    
    def _clean_old_windows(self):
        """清理过期的窗口"""
        current_window = self._get_current_window()
        expired_windows = [w for w in self.windows.keys() if w < current_window]
        for w in expired_windows:
            del self.windows[w]
    
    def allow_request(self) -> bool:
        """检查是否允许请求"""
        with self._lock:
            current_window = self._get_current_window()
            print(current_window, self.windows)
            self._clean_old_windows()
            
            # 计算当前窗口内的总请求数
            total_requests = sum(self.windows.values())
            if total_requests < self.max_requests:
                # 增加当前窗口计数
                self.windows[current_window] = self.windows.get(current_window, 0) + 1
                return True
            return False
    
    def get_current_count(self) -> int:
        """获取当前窗口内的请求总数"""
        with self._lock:
            self._clean_old_windows()
            return sum(self.windows.values())

# 使用示例
if __name__ == "__main__":
    window = SlidingWindow(window_size=10, max_requests=5)  # 10秒内最多5个请求
    
    for i in range(10):
        if window.allow_request():
            print(f"请求 {i} 通过,当前计数: {window.get_current_count()}")
        else:
            print(f"请求 {i} 被限制,当前计数: {window.get_current_count()}")
        time.sleep(1)

运行结果:

> python.exe .\sliding.py
176190177 {}
请求 0 通过,当前计数: 1
176190177 {176190177: 1}
请求 1 通过,当前计数: 2
176190178 {176190177: 2}
请求 2 通过,当前计数: 1
176190178 {176190178: 1}
请求 3 通过,当前计数: 2
176190178 {176190178: 2}
请求 4 通过,当前计数: 3
176190178 {176190178: 3}
请求 5 通过,当前计数: 4
176190178 {176190178: 4}
请求 6 通过,当前计数: 5
176190178 {176190178: 5}
请求 7 被限制,当前计数: 5
176190178 {176190178: 5}
请求 8 被限制,当前计数: 5
176190178 {176190178: 5}
请求 9 被限制,当前计数: 5
  • java
public class SlidingWindowRateLimiter {
    private final int maxRequests;       // 时间窗口内最大请求数
    private final long windowSizeInMillis; // 时间窗口大小(毫秒)
    private final int segments;          // 窗口分段数
    
    private final long[] timestamps;     // 时间戳数组
    private final int[] counters;        // 计数器数组
    private int currentSegment;          // 当前段索引
    
    public SlidingWindowRateLimiter(int maxRequests, long windowSizeInMillis, int segments) {
        this.maxRequests = maxRequests;
        this.windowSizeInMillis = windowSizeInMillis;
        this.segments = segments;
        this.timestamps = new long[segments];
        this.counters = new int[segments];
        this.currentSegment = 0;
        
        long currentTime = System.currentTimeMillis();
        for (int i = 0; i < segments; i++) {
            timestamps[i] = currentTime;
        }
    }
    
    public synchronized boolean tryAcquire(int permits) {
        long currentTime = System.currentTimeMillis();
        updateWindow(currentTime);
        
        // 计算当前窗口内的总请求数
        int totalRequests = 0;
        for (int i = 0; i < segments; i++) {
            totalRequests += counters[i];
        }
        
        if (totalRequests + permits <= maxRequests) {
            counters[currentSegment] += permits;
            return true;
        }
        return false;
    }
    
    private void updateWindow(long currentTime) {
        long segmentSize = windowSizeInMillis / segments;
        long currentSegmentStart = currentTime - (currentTime % segmentSize);
        
        // 如果当前时间已经进入新的时间段
        if (currentSegmentStart > timestamps[currentSegment]) {
            int segmentsToAdvance = (int) ((currentSegmentStart - timestamps[currentSegment]) / segmentSize);
            
            for (int i = 1; i <= segmentsToAdvance; i++) {
                int newSegment = (currentSegment + i) % segments;
                timestamps[newSegment] = currentSegmentStart - (segmentsToAdvance - i) * segmentSize;
                counters[newSegment] = 0;
            }
            
            currentSegment = (currentSegment + segmentsToAdvance) % segments;
        }
    }
    
    public synchronized int getCurrentRequests() {
        updateWindow(System.currentTimeMillis());
        int total = 0;
        for (int counter : counters) {
            total += counter;
        }
        return total;
    }
}

1.3.2. 分布式滑动窗口 (Redis实现)

@Component
public class RedisSlidingWindowRateLimiter {
    private final RedisTemplate<String, String> redisTemplate;
    private final StringRedisTemplate stringRedisTemplate;
    
    private static final String LUA_SCRIPT = 
        "local key = KEYS[1]\n" +
        "local now = tonumber(ARGV[1])\n" +
        "local window = tonumber(ARGV[2])\n" +
        "local limit = tonumber(ARGV[3])\n" +
        "local clearBefore = now - window\nn" +
        "\n" +
        "redis.call('ZREMRANGEBYSCORE', key, 0, clearBefore)\n" +
        "local current = redis.call('ZCARD', key)\n" +
        "\n" +
        "if current < limit then\n" +
        "    redis.call('ZADD', key, now, now)\n" +
        "    redis.call('EXPIRE', key, window/1000)\n" +
        "    return 1\n" +
        "else\n" +
        "    return 0\n" +
        "end";
    
    public RedisSlidingWindowRateLimiter(RedisTemplate<String, String> redisTemplate,
                                       StringRedisTemplate stringRedisTemplate) {
        this.redisTemplate = redisTemplate;
        this.stringRedisTemplate = stringRedisTemplate;
    }
    
    public boolean tryAcquire(String key, int maxRequests, long windowInMillis) {
        long now = System.currentTimeMillis();
        
        Long result = stringRedisTemplate.execute(
            new DefaultRedisScript<>(LUA_SCRIPT, Long.class),
            Collections.singletonList(key),
            String.valueOf(now),
            String.valueOf(windowInMillis),
            String.valueOf(maxRequests)
        );
        
        return result != null && result == 1;
    }
}

1.4. 自适应限流策略

1.4.1. 基于系统负载的自适应限流

  • python
    安装依赖
pip install psutil
import time
import threading
import psutil
import numpy as np
from collections import deque

class LoadBasedAdaptiveLimiter:
    def __init__(self, 
                 initial_rate: int = 100,
                 min_rate: int = 10,
                 max_rate: int = 1000,
                 window_size: int = 60):
        """
        基于系统负载的自适应限流器
        """
        self.current_rate   = initial_rate
        self.min_rate       = min_rate
        self.max_rate       = max_rate
        self.window_size    = window_size
        
        # 监控数据
        self.cpu_usages     = deque(maxlen=window_size)
        self.memory_usages  = deque(maxlen=window_size)
        self.request_counts = deque(maxlen=window_size)
        
        self.lock           = threading.Lock()
        self.last_adjust_time = time.time()
        
    def _collect_metrics(self):
        """收集系统指标"""
        cpu_percent = psutil.cpu_percent(interval=0.1)
        memory_percent = psutil.virtual_memory().percent
        
        self.cpu_usages.append(cpu_percent)
        self.memory_usages.append(memory_percent)
    
    def _calculate_system_health(self) -> float:
        """计算系统健康度 (0-1, 1表示最健康)"""
        if not self.cpu_usages or not self.memory_usages:
            return 1.0
        
        avg_cpu = np.mean(list(self.cpu_usages))
        avg_memory = np.mean(list(self.memory_usages))
        print("cpu:", avg_cpu, "memory:", avg_memory)
        
        # 健康度计算(CPU和内存使用率越低,健康度越高)
        cpu_health = max(0, 1 - avg_cpu / 100)
        memory_health = max(0, 1 - avg_memory / 100)
        
        # 加权平均
        system_health = 0.6 * cpu_health + 0.4 * memory_health
        return system_health
    
    def _adjust_rate(self):
        """根据系统健康度调整限流速率"""
        system_health = self._calculate_system_health()
        
        if system_health > 0.8:
            # 系统健康,适当增加速率
            new_rate = min(self.max_rate, int(self.current_rate * 1.2))
        elif system_health > 0.6:
            # 系统正常,保持当前速率
            new_rate = self.current_rate
        elif system_health > 0.4:
            # 系统压力较大,适当降低速率
            new_rate = max(self.min_rate, int(self.current_rate * 0.8))
        else:
            # 系统压力很大,大幅降低速率
            new_rate = max(self.min_rate, int(self.current_rate * 0.5))
        
        self.current_rate = new_rate
        print(f"系统健康度: {system_health:.2f}, 调整限流速率: {self.current_rate}")
    
    def allow_request(self) -> bool:
        """检查是否允许请求"""
        with self.lock:
            # 收集系统指标
            self._collect_metrics()
            
            # 每5秒调整一次速率
            current_time = time.time()
            if current_time - self.last_adjust_time >= 5:
                # 根据系统健康度调整限流速率
                self._adjust_rate()
                self.last_adjust_time = current_time
            
            # 简单的令牌桶实现
            if len(self.request_counts) < self.current_rate:
                self.request_counts.append(current_time)
                return True
            return False
    
    def get_current_rate(self) -> int:
        """获取当前限流速率"""
        return self.current_rate

# 使用示例
if __name__ == "__main__":
    limiter = LoadBasedAdaptiveLimiter(initial_rate=50, min_rate=10, max_rate=200)
    
    # 模拟不同负载情况
    for i in range(100):
        if limiter.allow_request():
            print(f"请求 {i:2} 通过, 当前速率: {limiter.get_current_rate()}")
        else:
            print(f"请求 {i:2} 被限制, 当前速率: {limiter.get_current_rate()}")
        time.sleep(0.1)

运行结果:

> python.exe .\adjust.py
请求  0 通过, 当前速率: 50
请求  1 通过, 当前速率: 50
请求  2 通过, 当前速率: 50
请求  3 通过, 当前速率: 50
请求  4 通过, 当前速率: 50
请求  5 通过, 当前速率: 50
请求  6 通过, 当前速率: 50
请求  7 通过, 当前速率: 50
请求  8 通过, 当前速率: 50
请求  9 通过, 当前速率: 50
请求 10 通过, 当前速率: 50
请求 11 通过, 当前速率: 50
请求 12 通过, 当前速率: 50
请求 13 通过, 当前速率: 50
请求 14 通过, 当前速率: 50
请求 15 通过, 当前速率: 50
请求 16 通过, 当前速率: 50
请求 17 通过, 当前速率: 50
请求 18 通过, 当前速率: 50
请求 19 通过, 当前速率: 50
请求 20 通过, 当前速率: 50
请求 21 通过, 当前速率: 50
请求 22 通过, 当前速率: 50
cpu: 10.1125 memory: 64.25833333333334
系统健康度: 0.68, 调整限流速率: 50
请求 23 通过, 当前速率: 50
请求 24 通过, 当前速率: 50
请求 25 通过, 当前速率: 50
请求 26 通过, 当前速率: 50
请求 27 通过, 当前速率: 50
请求 28 通过, 当前速率: 50
请求 29 通过, 当前速率: 50
请求 30 通过, 当前速率: 50
请求 31 通过, 当前速率: 50
请求 32 通过, 当前速率: 50
请求 33 通过, 当前速率: 50
请求 34 通过, 当前速率: 50
请求 35 通过, 当前速率: 50
请求 36 通过, 当前速率: 50
请求 37 通过, 当前速率: 50
请求 38 通过, 当前速率: 50
请求 39 通过, 当前速率: 50
请求 40 通过, 当前速率: 50
请求 41 通过, 当前速率: 50
请求 42 通过, 当前速率: 50
请求 43 通过, 当前速率: 50
请求 44 通过, 当前速率: 50
请求 45 通过, 当前速率: 50
cpu: 10.172340425531914 memory: 64.25744680851062
系统健康度: 0.68, 调整限流速率: 50
请求 46 通过, 当前速率: 50
请求 47 通过, 当前速率: 50
请求 48 通过, 当前速率: 50
请求 49 通过, 当前速率: 50
请求 50 被限制, 当前速率: 50
请求 51 被限制, 当前速率: 50
请求 52 被限制, 当前速率: 50
请求 53 被限制, 当前速率: 50
请求 54 被限制, 当前速率: 50
请求 55 被限制, 当前速率: 50
请求 56 被限制, 当前速率: 50
请求 57 被限制, 当前速率: 50
请求 58 被限制, 当前速率: 50
请求 59 被限制, 当前速率: 50
请求 60 被限制, 当前速率: 50
请求 61 被限制, 当前速率: 50
请求 62 被限制, 当前速率: 50
请求 63 被限制, 当前速率: 50
请求 64 被限制, 当前速率: 50
请求 65 被限制, 当前速率: 50
请求 66 被限制, 当前速率: 50
请求 67 被限制, 当前速率: 50
请求 68 被限制, 当前速率: 50
cpu: 9.268333333333334 memory: 64.22666666666666
系统健康度: 0.69, 调整限流速率: 50
请求 69 被限制, 当前速率: 50
请求 70 被限制, 当前速率: 50
请求 71 被限制, 当前速率: 50
请求 72 被限制, 当前速率: 50
请求 73 被限制, 当前速率: 50
请求 74 被限制, 当前速率: 50
请求 75 被限制, 当前速率: 50
请求 76 被限制, 当前速率: 50
请求 77 被限制, 当前速率: 50
请求 78 被限制, 当前速率: 50
请求 79 被限制, 当前速率: 50
请求 80 被限制, 当前速率: 50
请求 81 被限制, 当前速率: 50
请求 82 被限制, 当前速率: 50
请求 83 被限制, 当前速率: 50
请求 84 被限制, 当前速率: 50
请求 85 被限制, 当前速率: 50
请求 86 被限制, 当前速率: 50
请求 87 被限制, 当前速率: 50
请求 88 被限制, 当前速率: 50
请求 89 被限制, 当前速率: 50
请求 90 被限制, 当前速率: 50
请求 91 被限制, 当前速率: 50
cpu: 6.281666666666667 memory: 64.16
系统健康度: 0.71, 调整限流速率: 50
请求 92 被限制, 当前速率: 50
请求 93 被限制, 当前速率: 50
请求 94 被限制, 当前速率: 50
请求 95 被限制, 当前速率: 50
请求 96 被限制, 当前速率: 50
请求 97 被限制, 当前速率: 50
请求 98 被限制, 当前速率: 50
请求 99 被限制, 当前速率: 50
  • java
@Component
public class AdaptiveRateLimiter {
    private final TokenBucketRateLimiter baseLimiter;
    private final int baseRate;          // 基础限流速率
    private final int minRate;           // 最小限流速率
    private final int maxRate;           // 最大限流速率
    
    // 系统指标监控
    private double systemLoad = 0.0;
    private long lastUpdateTime = System.currentTimeMillis();
    private final double loadThreshold = 0.8; // 系统负载阈值
    
    public AdaptiveRateLimiter(int baseRate, int minRate, int maxRate) {
        this.baseRate = baseRate;
        this.minRate = minRate;
        this.maxRate = maxRate;
        this.baseLimiter = new TokenBucketRateLimiter(baseRate, baseRate);
    }
    
    public synchronized boolean tryAcquire(int permits) {
        updateRateBasedOnSystemLoad();
        return baseLimiter.tryAcquire(permits);
    }
    
    private void updateRateBasedOnSystemLoad() {
        long currentTime = System.currentTimeMillis();
        if (currentTime - lastUpdateTime < 1000) { // 每秒更新一次
            return;
        }
        
        // 获取系统负载 (需要根据具体环境实现)
        double currentLoad = getSystemLoad();
        this.systemLoad = 0.7 * this.systemLoad + 0.3 * currentLoad; // 平滑处理
        
        // 根据系统负载调整限流速率
        int newRate;
        if (systemLoad > loadThreshold) {
            // 系统负载高,降低限流速率
            double reductionFactor = 1.0 - (systemLoad - loadThreshold) / (1.0 - loadThreshold);
            newRate = (int) (baseRate * Math.max(0.1, reductionFactor));
            newRate = Math.max(minRate, newRate);
        } else {
            // 系统负载低,可以适当提高限流速率
            double increaseFactor = 1.0 + (loadThreshold - systemLoad) / loadThreshold;
            newRate = (int) (baseRate * Math.min(2.0, increaseFactor));
            newRate = Math.min(maxRate, newRate);
        }
        
        // 更新限流器 (需要TokenBucketRateLimiter支持动态调整)
        updateLimiterRate(newRate);
        lastUpdateTime = currentTime;
    }
    
    private double getSystemLoad() {
        // 实现获取系统负载的逻辑
        // 这里使用CPU负载作为示例
        OperatingSystemMXBean osBean = ManagementFactory.getOperatingSystemMXBean();
        if (osBean instanceof com.sun.management.OperatingSystemMXBean) {
            return ((com.sun.management.OperatingSystemMXBean) osBean).getSystemCpuLoad();
        }
        return 0.5; // 默认值
    }
    
    private void updateLimiterRate(int newRate) {
        // 动态更新限流器速率
        // 需要TokenBucketRateLimiter支持速率调整
    }
    
    // 基于响应时间的自适应调整
    public void recordResponseTime(long responseTime, long threshold) {
        if (responseTime > threshold) {
            // 响应时间过长,触发限流调整
            adjustRateForSlowResponse();
        }
    }
    
    private synchronized void adjustRateForSlowResponse() {
        // 根据响应时间调整限流策略
    }
}

1.4.2. 基于响应时间的自适应限流

import time
import threading
from collections import deque
import numpy as np

class ResponseTimeAdaptiveLimiter:
    def __init__(self,
                 initial_rate: int = 100,
                 min_rate: int = 10,
                 max_rate: int = 1000,
                 target_response_time: float = 1.0,
                 window_size: int = 100):
        """
        基于响应时间的自适应限流器
        """
        self.current_rate       = initial_rate
        self.min_rate           = min_rate
        self.max_rate           = max_rate
        self.target_response_time = target_response_time
        self.window_size        = window_size
        
        self.response_times = deque(maxlen=window_size)
        self.request_times  = deque(maxlen=window_size)
        self.lock = threading.Lock()
        
    def record_response_time(self, response_time: float):
        """记录响应时间"""
        with self.lock:
            self.response_times.append(response_time)
    
    def _calculate_percentile_response_time(self, percentile: float = 95.0) -> float:
        """计算百分位响应时间"""
        if not self.response_times:
            return 0.0
        
        return np.percentile(list(self.response_times), percentile)
    
    def _adjust_rate_based_on_response_time(self):
        """基于响应时间调整限流速率"""
        if len(self.response_times) < 10:  # 样本不足
            return
        
        p95_response_time = self._calculate_percentile_response_time(95)
        p50_response_time = self._calculate_percentile_response_time(50)
        print(f"P50响应时间: {p50_response_time:.3f}s, P95响应时间: {p95_response_time:.3f}s")
        
        if p95_response_time > self.target_response_time * 2:
            # 响应时间严重超标,大幅降级
            new_rate = max(self.min_rate, int(self.current_rate * 0.6))
        elif p95_response_time > self.target_response_time * 1.5:
            # 响应时间超标,适度降级
            new_rate = max(self.min_rate, int(self.current_rate * 0.8))
        elif p95_response_time < self.target_response_time * 0.7:
            # 响应时间很好,适度提升
            new_rate = min(self.max_rate, int(self.current_rate * 1.2))
        elif p95_response_time < self.target_response_time * 0.5:
            # 响应时间非常好,大幅提升
            new_rate = min(self.max_rate, int(self.current_rate * 1.5))
        else:
            # 响应时间在可接受范围,保持当前速率
            new_rate = self.current_rate
        
        if new_rate != self.current_rate:
            print(f"响应时间自适应调整: {self.current_rate} -> {new_rate}")
            self.current_rate = new_rate
    
    def allow_request(self) -> bool:
        """检查是否允许请求"""
        with self.lock:
            current_time = time.time()
            
            # 每处理10个请求调整一次
            if len(self.response_times) % 10 == 0:
                self._adjust_rate_based_on_response_time()
            
            # 滑动窗口限流
            window_start = current_time - 1.0  # 1秒窗口
            recent_requests = [t for t in self.request_times if t > window_start]
            
            if len(recent_requests) < self.current_rate:
                self.request_times.append(current_time)
                return True
            return False
    
    def get_current_rate(self) -> int:
        return self.current_rate

# 使用示例
if __name__ == "__main__":
    limiter = ResponseTimeAdaptiveLimiter(initial_rate=50, target_response_time=0.2)

    base_delay = 0.1
    for i in range(200):
        if limiter.allow_request():
            # 模拟响应时间变化
            if i < 50:
                # 低负载阶段
                response_time = base_delay + np.random.normal(0.05, 0.02)
            elif i < 100:
                # 高负载阶段
                response_time = base_delay * 3 + np.random.normal(0.1, 0.05)
            else:
                # 恢复正常
                response_time = base_delay + np.random.normal(0.05, 0.02)
            
            limiter.record_response_time(response_time)
            print(f"请求 {i:3} 完成, 响应时间: {response_time:.3f}s, 当前速率: {limiter.get_current_rate()}")
        else:
            print(f"请求 {i:3} 被限流, 当前速率: {limiter.get_current_rate()}")
        
        time.sleep(0.05)

运行结果:

> python.exe .\adjust2.py
请求   0 完成, 响应时间: 0.139s, 当前速率: 50
请求   1 完成, 响应时间: 0.178s, 当前速率: 50
请求   2 完成, 响应时间: 0.167s, 当前速率: 50
请求   3 完成, 响应时间: 0.163s, 当前速率: 50
请求   4 完成, 响应时间: 0.166s, 当前速率: 50
请求   5 完成, 响应时间: 0.125s, 当前速率: 50
请求   6 完成, 响应时间: 0.163s, 当前速率: 50
请求   7 完成, 响应时间: 0.118s, 当前速率: 50
请求   8 完成, 响应时间: 0.147s, 当前速率: 50
请求   9 完成, 响应时间: 0.132s, 当前速率: 50
P50响应时间: 0.155s, P95响应时间: 0.173s
请求  10 完成, 响应时间: 0.164s, 当前速率: 50
请求  11 完成, 响应时间: 0.129s, 当前速率: 50
请求  12 完成, 响应时间: 0.124s, 当前速率: 50
请求  13 完成, 响应时间: 0.135s, 当前速率: 50
请求  14 完成, 响应时间: 0.140s, 当前速率: 50
请求  15 完成, 响应时间: 0.162s, 当前速率: 50
请求  16 完成, 响应时间: 0.149s, 当前速率: 50
请求  17 完成, 响应时间: 0.161s, 当前速率: 50
请求  18 完成, 响应时间: 0.144s, 当前速率: 50
请求  19 完成, 响应时间: 0.139s, 当前速率: 50
P50响应时间: 0.145s, P95响应时间: 0.168s
请求  20 完成, 响应时间: 0.101s, 当前速率: 50
请求  21 完成, 响应时间: 0.141s, 当前速率: 50
请求  22 完成, 响应时间: 0.135s, 当前速率: 50
请求  23 完成, 响应时间: 0.156s, 当前速率: 50
请求  24 完成, 响应时间: 0.167s, 当前速率: 50
请求  25 完成, 响应时间: 0.172s, 当前速率: 50
请求  26 完成, 响应时间: 0.162s, 当前速率: 50
请求  27 完成, 响应时间: 0.169s, 当前速率: 50
请求  28 完成, 响应时间: 0.134s, 当前速率: 50
请求  29 完成, 响应时间: 0.135s, 当前速率: 50
P50响应时间: 0.145s, P95响应时间: 0.171s
请求  30 完成, 响应时间: 0.148s, 当前速率: 50
请求  31 完成, 响应时间: 0.166s, 当前速率: 50
请求  32 完成, 响应时间: 0.145s, 当前速率: 50
请求  33 完成, 响应时间: 0.144s, 当前速率: 50
请求  34 完成, 响应时间: 0.159s, 当前速率: 50
请求  35 完成, 响应时间: 0.153s, 当前速率: 50
请求  36 完成, 响应时间: 0.144s, 当前速率: 50
请求  37 完成, 响应时间: 0.127s, 当前速率: 50
请求  38 完成, 响应时间: 0.153s, 当前速率: 50
请求  39 完成, 响应时间: 0.148s, 当前速率: 50
P50响应时间: 0.147s, P95响应时间: 0.170s
请求  40 完成, 响应时间: 0.155s, 当前速率: 50
请求  41 完成, 响应时间: 0.124s, 当前速率: 50
请求  42 完成, 响应时间: 0.159s, 当前速率: 50
请求  43 完成, 响应时间: 0.152s, 当前速率: 50
请求  44 完成, 响应时间: 0.167s, 当前速率: 50
请求  45 完成, 响应时间: 0.161s, 当前速率: 50
请求  46 完成, 响应时间: 0.152s, 当前速率: 50
请求  47 完成, 响应时间: 0.151s, 当前速率: 50
请求  48 完成, 响应时间: 0.155s, 当前速率: 50
请求  49 完成, 响应时间: 0.165s, 当前速率: 50
P50响应时间: 0.152s, P95响应时间: 0.168s
请求  50 完成, 响应时间: 0.359s, 当前速率: 50
请求  51 完成, 响应时间: 0.396s, 当前速率: 50
请求  52 完成, 响应时间: 0.400s, 当前速率: 50
请求  53 完成, 响应时间: 0.305s, 当前速率: 50
请求  54 完成, 响应时间: 0.356s, 当前速率: 50
请求  55 完成, 响应时间: 0.267s, 当前速率: 50
请求  56 完成, 响应时间: 0.426s, 当前速率: 50
请求  57 完成, 响应时间: 0.361s, 当前速率: 50
请求  58 完成, 响应时间: 0.475s, 当前速率: 50
请求  59 完成, 响应时间: 0.449s, 当前速率: 50
P50响应时间: 0.155s, P95响应时间: 0.402s
响应时间自适应调整: 50 -> 30
请求  60 完成, 响应时间: 0.370s, 当前速率: 30
请求  61 完成, 响应时间: 0.408s, 当前速率: 30
请求  62 完成, 响应时间: 0.336s, 当前速率: 30
请求  63 完成, 响应时间: 0.313s, 当前速率: 30
请求  64 完成, 响应时间: 0.452s, 当前速率: 30
请求  65 完成, 响应时间: 0.453s, 当前速率: 30
请求  66 完成, 响应时间: 0.401s, 当前速率: 30
请求  67 完成, 响应时间: 0.430s, 当前速率: 30
请求  68 完成, 响应时间: 0.421s, 当前速率: 30
请求  69 完成, 响应时间: 0.348s, 当前速率: 30
P50响应时间: 0.161s, P95响应时间: 0.440s
响应时间自适应调整: 30 -> 18
请求  70 完成, 响应时间: 0.284s, 当前速率: 18
请求  71 完成, 响应时间: 0.440s, 当前速率: 18
请求  72 完成, 响应时间: 0.466s, 当前速率: 18
请求  73 完成, 响应时间: 0.486s, 当前速率: 18
请求  74 完成, 响应时间: 0.403s, 当前速率: 18
请求  75 完成, 响应时间: 0.525s, 当前速率: 18
请求  76 完成, 响应时间: 0.443s, 当前速率: 18
请求  77 完成, 响应时间: 0.460s, 当前速率: 18
请求  78 完成, 响应时间: 0.430s, 当前速率: 18
请求  79 完成, 响应时间: 0.401s, 当前速率: 18
P50响应时间: 0.164s, P95响应时间: 0.461s
响应时间自适应调整: 18 -> 10
请求  80 被限流, 当前速率: 10
P50响应时间: 0.164s, P95响应时间: 0.461s
请求  81 被限流, 当前速率: 10
P50响应时间: 0.164s, P95响应时间: 0.461s
请求  82 被限流, 当前速率: 10
P50响应时间: 0.164s, P95响应时间: 0.461s
请求  83 被限流, 当前速率: 10
P50响应时间: 0.164s, P95响应时间: 0.461s
请求  84 被限流, 当前速率: 10
P50响应时间: 0.164s, P95响应时间: 0.461s
请求  85 被限流, 当前速率: 10
P50响应时间: 0.164s, P95响应时间: 0.461s
请求  86 被限流, 当前速率: 10
P50响应时间: 0.164s, P95响应时间: 0.461s
请求  87 完成, 响应时间: 0.443s, 当前速率: 10
请求  88 完成, 响应时间: 0.368s, 当前速率: 10
请求  89 完成, 响应时间: 0.398s, 当前速率: 10
请求  90 完成, 响应时间: 0.338s, 当前速率: 10
请求  91 完成, 响应时间: 0.385s, 当前速率: 10
请求  92 完成, 响应时间: 0.314s, 当前速率: 10
请求  93 完成, 响应时间: 0.400s, 当前速率: 10
请求  94 完成, 响应时间: 0.430s, 当前速率: 10
请求  95 完成, 响应时间: 0.441s, 当前速率: 10
请求  96 完成, 响应时间: 0.422s, 当前速率: 10
P50响应时间: 0.167s, P95响应时间: 0.457s
请求  97 被限流, 当前速率: 10
P50响应时间: 0.167s, P95响应时间: 0.457s
请求  98 被限流, 当前速率: 10
P50响应时间: 0.167s, P95响应时间: 0.457s
请求  99 被限流, 当前速率: 10
P50响应时间: 0.167s, P95响应时间: 0.457s
请求 100 被限流, 当前速率: 10
P50响应时间: 0.167s, P95响应时间: 0.457s
请求 101 被限流, 当前速率: 10
P50响应时间: 0.167s, P95响应时间: 0.457s
请求 102 被限流, 当前速率: 10
P50响应时间: 0.167s, P95响应时间: 0.457s
请求 103 被限流, 当前速率: 10
P50响应时间: 0.167s, P95响应时间: 0.457s
请求 104 完成, 响应时间: 0.106s, 当前速率: 10
请求 105 完成, 响应时间: 0.164s, 当前速率: 10
请求 106 完成, 响应时间: 0.149s, 当前速率: 10
请求 107 完成, 响应时间: 0.144s, 当前速率: 10
请求 108 完成, 响应时间: 0.112s, 当前速率: 10
请求 109 完成, 响应时间: 0.156s, 当前速率: 10
请求 110 完成, 响应时间: 0.172s, 当前速率: 10
请求 111 完成, 响应时间: 0.165s, 当前速率: 10
请求 112 完成, 响应时间: 0.133s, 当前速率: 10
请求 113 完成, 响应时间: 0.163s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 114 被限流, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 115 被限流, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 116 被限流, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 117 被限流, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 118 被限流, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 119 被限流, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 120 被限流, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 121 完成, 响应时间: 0.204s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 122 完成, 响应时间: 0.145s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 123 完成, 响应时间: 0.153s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 124 完成, 响应时间: 0.149s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 125 完成, 响应时间: 0.136s, 当前速率: 10
P50响应时间: 0.164s, P95响应时间: 0.454s
请求 126 完成, 响应时间: 0.134s, 当前速率: 10
P50响应时间: 0.164s, P95响应时间: 0.454s
请求 127 完成, 响应时间: 0.165s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 128 完成, 响应时间: 0.161s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 129 完成, 响应时间: 0.141s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 130 完成, 响应时间: 0.137s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 131 被限流, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 132 被限流, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 133 被限流, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 134 被限流, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 135 被限流, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 136 被限流, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 137 被限流, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 138 完成, 响应时间: 0.123s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 139 完成, 响应时间: 0.107s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 140 完成, 响应时间: 0.138s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 141 完成, 响应时间: 0.175s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 142 完成, 响应时间: 0.160s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 143 完成, 响应时间: 0.150s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 144 完成, 响应时间: 0.135s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 145 完成, 响应时间: 0.145s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 146 完成, 响应时间: 0.154s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 147 完成, 响应时间: 0.165s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 148 被限流, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 149 被限流, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 150 被限流, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 151 被限流, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 152 被限流, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 153 被限流, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 154 被限流, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 155 完成, 响应时间: 0.148s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 156 完成, 响应时间: 0.189s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 157 完成, 响应时间: 0.146s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 158 完成, 响应时间: 0.131s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 159 完成, 响应时间: 0.178s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 160 完成, 响应时间: 0.159s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 161 完成, 响应时间: 0.159s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 162 完成, 响应时间: 0.138s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 163 完成, 响应时间: 0.163s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 164 完成, 响应时间: 0.162s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 165 被限流, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 166 被限流, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 167 被限流, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 168 被限流, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 169 被限流, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 170 被限流, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 171 被限流, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 172 完成, 响应时间: 0.174s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 173 完成, 响应时间: 0.137s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 174 完成, 响应时间: 0.143s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 175 完成, 响应时间: 0.169s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 176 完成, 响应时间: 0.138s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 177 完成, 响应时间: 0.173s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 178 完成, 响应时间: 0.136s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 179 完成, 响应时间: 0.141s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 180 完成, 响应时间: 0.131s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 181 完成, 响应时间: 0.164s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 182 被限流, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 183 被限流, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 184 被限流, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 185 被限流, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 186 被限流, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 187 被限流, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 188 被限流, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 189 完成, 响应时间: 0.170s, 当前速率: 10
P50响应时间: 0.166s, P95响应时间: 0.454s
请求 190 完成, 响应时间: 0.161s, 当前速率: 10
P50响应时间: 0.166s, P95响应时间: 0.454s
请求 191 完成, 响应时间: 0.134s, 当前速率: 10
P50响应时间: 0.166s, P95响应时间: 0.454s
请求 192 完成, 响应时间: 0.149s, 当前速率: 10
P50响应时间: 0.166s, P95响应时间: 0.454s
请求 193 完成, 响应时间: 0.143s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 194 完成, 响应时间: 0.166s, 当前速率: 10
P50响应时间: 0.166s, P95响应时间: 0.454s
请求 195 完成, 响应时间: 0.126s, 当前速率: 10
P50响应时间: 0.166s, P95响应时间: 0.454s
请求 196 完成, 响应时间: 0.117s, 当前速率: 10
P50响应时间: 0.166s, P95响应时间: 0.454s
请求 197 完成, 响应时间: 0.186s, 当前速率: 10
P50响应时间: 0.167s, P95响应时间: 0.454s
请求 198 完成, 响应时间: 0.123s, 当前速率: 10
P50响应时间: 0.167s, P95响应时间: 0.454s
请求 199 被限流, 当前速率: 10

1.4.3. AIMD (加性增/乘性减) 算法

import time
import threading
from collections import deque

class AIMDLimiter:
    def __init__(self,
                 initial_rate: int = 10,
                 min_rate: int = 1,
                 max_rate: int = 1000,
                 ai_factor: float = 1.0,    # 加性增加
                 md_factor: float = 0.5,    # 乘性减少
                 window_size: int = 10):
        """
        AIMD (Additive Increase Multiplicative Decrease) 限流器
        """
        self.current_rate   = initial_rate
        self.min_rate       = min_rate
        self.max_rate       = max_rate
        self.ai_factor      = ai_factor
        self.md_factor      = md_factor
        self.window_size    = window_size
        
        self.error_rates    = deque(maxlen=window_size)
        self.request_count  = 0
        self.error_count    = 0
        self.lock = threading.Lock()
        
    def record_result(self, success: bool):
        """记录请求结果"""
        with self.lock:
            self.request_count += 1
            if not success:
                self.error_count += 1
            
            # 每window_size个请求计算一次错误率
            if self.request_count >= self.window_size:
                error_rate = self.error_count / self.request_count
                self.error_rates.append(error_rate)
                self._adjust_rate(error_rate)
                
                # 重置计数器
                self.request_count = 0
                self.error_count = 0
    
    def _adjust_rate(self, error_rate: float):
        """根据错误率调整速率"""
        error_threshold = 0.1  # 10%错误率阈值
        
        if error_rate > error_threshold:
            # 乘性减少
            new_rate = max(self.min_rate, int(self.current_rate * self.md_factor))
            print(f"错误率 {error_rate:.2%} 过高,乘性减少: {self.current_rate} -> {new_rate}")
        else:
            # 加性增加
            new_rate = min(self.max_rate, self.current_rate + self.ai_factor)
            print(f"错误率 {error_rate:.2%} 正常,加性增加: {self.current_rate} -> {new_rate}")
        
        self.current_rate = new_rate
    
    def allow_request(self) -> bool:
        """检查是否允许请求"""
        with self.lock:
            # 简单的计数器限流
            if len(self.error_rates) * self.window_size < self.current_rate:
                return True
            return False
    
    def get_current_rate(self) -> int:
        return self.current_rate

# 使用示例
if __name__ == "__main__":
    limiter = AIMDLimiter(initial_rate=10, ai_factor=2, md_factor=0.7)
    
    # 模拟不同错误率的场景
    scenarios = [
        [True] * 8 + [False] * 2,  # 20%错误率
        [True] * 9 + [False] * 1,  # 10%错误率
        [True] * 10,               # 0%错误率
        [True] * 5 + [False] * 5,  # 50%错误率
    ]
    
    for i, scenario in enumerate(scenarios):
        print(f"\n=== 场景 {i+1} ===")
        for success in scenario:
            if limiter.allow_request():
                limiter.record_result(success)
                status = "成功" if success else "失败"
                print(f"请求 {status}, 当前速率: {limiter.get_current_rate()}")
            else:
                print(f"请求被限流, 当前速率: {limiter.get_current_rate()}")
            time.sleep(0.1)

运行结果:

> python.exe .\adjust3.py

=== 场景 1 ===
请求 成功, 当前速率: 10
请求 成功, 当前速率: 10
请求 成功, 当前速率: 10
请求 成功, 当前速率: 10
请求 成功, 当前速率: 10
请求 成功, 当前速率: 10
请求 成功, 当前速率: 10
请求 成功, 当前速率: 10
请求 失败, 当前速率: 10
错误率 20.00% 过高,乘性减少: 10 -> 7
请求 失败, 当前速率: 7

=== 场景 2 ===
请求被限流, 当前速率: 7
请求被限流, 当前速率: 7
请求被限流, 当前速率: 7
请求被限流, 当前速率: 7
请求被限流, 当前速率: 7
请求被限流, 当前速率: 7
请求被限流, 当前速率: 7
请求被限流, 当前速率: 7
请求被限流, 当前速率: 7
请求被限流, 当前速率: 7

=== 场景 3 ===
请求被限流, 当前速率: 7
请求被限流, 当前速率: 7
请求被限流, 当前速率: 7
请求被限流, 当前速率: 7
请求被限流, 当前速率: 7
请求被限流, 当前速率: 7
请求被限流, 当前速率: 7
请求被限流, 当前速率: 7
请求被限流, 当前速率: 7
请求被限流, 当前速率: 7

=== 场景 4 ===
请求被限流, 当前速率: 7
请求被限流, 当前速率: 7
请求被限流, 当前速率: 7
请求被限流, 当前速率: 7
请求被限流, 当前速率: 7
请求被限流, 当前速率: 7
请求被限流, 当前速率: 7
请求被限流, 当前速率: 7
请求被限流, 当前速率: 7
请求被限流, 当前速率: 7

1.4.4. 基于机器学习的自适应限流

环境依赖:

pip install scikit-learn -i https://pypi.tuna.tsinghua.edu.cn/simple
import numpy as np
from sklearn.ensemble import IsolationForest
from collections import deque
import threading
import time

class MLAdaptiveLimiter:
    def __init__(self,
                 initial_rate: int = 100,
                 min_rate: int = 10,
                 max_rate: int = 1000,
                 features_window: int = 100):
        """
        基于机器学习的自适应限流器
        """
        self.current_rate   = initial_rate
        self.min_rate       = min_rate
        self.max_rate       = max_rate
        self.features_window = features_window
        
        # 特征存储
        self.features   = deque(maxlen=features_window)
        self.labels     = deque(maxlen=features_window)  # 1正常, -1异常
        
        # 异常检测模型
        self.model = IsolationForest(contamination=0.1, random_state=42)
        self.is_model_trained = False
        
        self.lock = threading.Lock()
        self.metrics_window = deque(maxlen=50)
        
    def extract_features(self) -> np.ndarray:
        """提取当前系统特征"""
        # 这里简化实现,实际中可以从监控系统获取更多指标
        if not self.metrics_window:
            return np.array([[0, 0, 0]])
        
        recent_metrics = list(self.metrics_window)
        
        # 计算统计特征
        request_rates = [m['request_rate'] for m in recent_metrics]
        response_times = [m['response_time'] for m in recent_metrics]
        error_rates = [m['error_rate'] for m in recent_metrics]
        
        features = np.array([[
            np.mean(request_rates),      # 平均请求率
            np.std(request_rates),       # 请求率标准差
            np.mean(response_times),     # 平均响应时间
            np.std(response_times),      # 响应时间标准差
            np.mean(error_rates),        # 平均错误率
        ]])
        
        return features
    
    def record_metrics(self, request_rate: float, response_time: float, error_rate: float):
        """记录系统指标"""
        with self.lock:
            self.metrics_window.append({
                'request_rate': request_rate,
                'response_time': response_time,
                'error_rate': error_rate
            })
            
            # 提取特征并判断是否异常
            features = self.extract_features()
            
            if len(self.features) >= 10:  # 有足够训练数据
                if not self.is_model_trained:
                    # 训练模型
                    X_train = np.array(list(self.features))
                    self.model.fit(X_train)
                    self.is_model_trained = True
                
                # 预测当前状态
                prediction = self.model.predict(features)[0]
                
                # 根据预测结果调整限流
                if prediction == -1:  # 异常
                    self.current_rate = max(self.min_rate, int(self.current_rate * 0.7))
                    print(f"检测到异常状态,降低限流速率: {self.current_rate}")
                else:  # 正常
                    self.current_rate = min(self.max_rate, int(self.current_rate * 1.1))
                    print(f"系统状态正常,提高限流速率: {self.current_rate}")
            
            # 存储特征和标签(简化标签生成)
            label = 1 if error_rate < 0.1 and response_time < 1.0 else -1
            self.features.append(features[0])
            self.labels.append(label)
    
    def allow_request(self) -> bool:
        """检查是否允许请求"""
        with self.lock:
            # 简单的计数器限流
            current_count = len([m for m in self.metrics_window 
                               if time.time() - m.get('timestamp', 0) < 1.0])
            return current_count < self.current_rate
    
    def get_current_rate(self) -> int:
        return self.current_rate

# 使用示例
if __name__ == "__main__":
    limiter = MLAdaptiveLimiter(initial_rate=50)
    
    # 模拟指标数据
    for i in range(100):
        # 模拟变化的系统指标
        if i < 30:
            # 正常阶段
            request_rate = 50 + np.random.normal(0, 5)
            response_time = 0.1 + np.random.normal(0, 0.02)
            error_rate = 0.02
        elif i < 60:
            # 异常阶段
            request_rate = 80 + np.random.normal(0, 10)
            response_time = 0.5 + np.random.normal(0, 0.1)
            error_rate = 0.15
        else:
            # 恢复阶段
            request_rate = 40 + np.random.normal(0, 5)
            response_time = 0.1 + np.random.normal(0, 0.02)
            error_rate = 0.03
        
        limiter.record_metrics(request_rate, response_time, error_rate)
        
        if limiter.allow_request():
            print(f"请求 {i:2} 通过, 速率: {limiter.get_current_rate()}")
        else:
            print(f"请求 {i:2} 限流, 速率: {limiter.get_current_rate()}")
        
        time.sleep(0.1)

运行结果:

> python.exe .\adjust4.py
请求  0 通过, 速率: 50
请求  1 通过, 速率: 50
请求  2 通过, 速率: 50
请求  3 通过, 速率: 50
请求  4 通过, 速率: 50
请求  5 通过, 速率: 50
请求  6 通过, 速率: 50
请求  7 通过, 速率: 50
请求  8 通过, 速率: 50
请求  9 通过, 速率: 50
系统状态正常,提高限流速率: 55
请求 10 通过, 速率: 55
系统状态正常,提高限流速率: 60
请求 11 通过, 速率: 60
系统状态正常,提高限流速率: 66
请求 12 通过, 速率: 66
系统状态正常,提高限流速率: 72
请求 13 通过, 速率: 72
系统状态正常,提高限流速率: 79
请求 14 通过, 速率: 79
系统状态正常,提高限流速率: 86
请求 15 通过, 速率: 86
系统状态正常,提高限流速率: 94
请求 16 通过, 速率: 94
系统状态正常,提高限流速率: 103
请求 17 通过, 速率: 103
系统状态正常,提高限流速率: 113
请求 18 通过, 速率: 113
系统状态正常,提高限流速率: 124
请求 19 通过, 速率: 124
系统状态正常,提高限流速率: 136
请求 20 通过, 速率: 136
系统状态正常,提高限流速率: 149
请求 21 通过, 速率: 149
系统状态正常,提高限流速率: 163
请求 22 通过, 速率: 163
系统状态正常,提高限流速率: 179
请求 23 通过, 速率: 179
系统状态正常,提高限流速率: 196
请求 24 通过, 速率: 196
系统状态正常,提高限流速率: 215
请求 25 通过, 速率: 215
系统状态正常,提高限流速率: 236
请求 26 通过, 速率: 236
系统状态正常,提高限流速率: 259
请求 27 通过, 速率: 259
系统状态正常,提高限流速率: 284
请求 28 通过, 速率: 284
系统状态正常,提高限流速率: 312
请求 29 通过, 速率: 312
系统状态正常,提高限流速率: 343
请求 30 通过, 速率: 343
系统状态正常,提高限流速率: 377
请求 31 通过, 速率: 377
系统状态正常,提高限流速率: 414
请求 32 通过, 速率: 414
系统状态正常,提高限流速率: 455
请求 33 通过, 速率: 455
系统状态正常,提高限流速率: 500
请求 34 通过, 速率: 500
系统状态正常,提高限流速率: 550
请求 35 通过, 速率: 550
系统状态正常,提高限流速率: 605
请求 36 通过, 速率: 605
系统状态正常,提高限流速率: 665
请求 37 通过, 速率: 665
系统状态正常,提高限流速率: 731
请求 38 通过, 速率: 731
系统状态正常,提高限流速率: 804
请求 39 通过, 速率: 804
系统状态正常,提高限流速率: 884
请求 40 通过, 速率: 884
系统状态正常,提高限流速率: 972
请求 41 通过, 速率: 972
系统状态正常,提高限流速率: 1000
请求 42 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 43 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 44 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 45 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 46 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 47 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 48 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 49 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 50 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 51 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 52 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 53 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 54 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 55 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 56 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 57 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 58 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 59 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 60 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 61 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 62 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 63 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 64 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 65 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 66 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 67 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 68 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 69 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 70 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 71 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 72 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 73 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 74 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 75 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 76 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 77 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 78 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 79 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 80 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 81 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 82 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 83 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 84 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 85 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 86 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 87 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 88 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 89 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 90 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 91 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 92 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 93 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 94 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 95 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 96 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 97 通过, 速率: 1000
检测到异常状态,降低限流速率: 700
请求 98 通过, 速率: 700
检测到异常状态,降低限流速率: 489
请求 99 通过, 速率: 489

1.4.5. 混合自适应限流策略

import time
import threading
from collections import deque
import numpy as np

class HybridAdaptiveLimiter:
    def __init__(self,
                 initial_rate: int = 100,
                 min_rate: int = 10,
                 max_rate: int = 1000):
        """
        混合自适应限流器 - 结合多种策略
        """
        self.current_rate   = initial_rate
        self.min_rate       = min_rate
        self.max_rate       = max_rate
        
        # 多个监控窗口
        # 响应时间
        self.response_times = deque(maxlen=100)
        # 错误率
        self.error_rates    = deque(maxlen=50)
        # 系统负载
        self.system_metrics = deque(maxlen=30)
        
        self.lock = threading.Lock()
        self.last_adjustment = time.time()
        
        # 权重配置
        self.weights = {
            'response_time': 0.4,
            'error_rate': 0.3,
            'system_load': 0.3
        }
    
    def _calculate_adjustment_factors(self) -> dict:
        """计算各个维度的调整因子"""
        factors = {}
        
        # 响应时间因子
        if self.response_times:
            # mean 是 NumPy 库中用于计算平均值的函数,可沿指定轴计算数组元素的算术平均值。
            avg_response_time = np.mean(list(self.response_times))
            print("avg_response_time", avg_response_time, len(self.response_times))
            # 假设目标响应时间为0.2s
            factors['response_time'] = max(0.5, min(2.0, 0.2 / avg_response_time))
        else:
            factors['response_time'] = 1.0
        
        # 错误率因子
        if self.error_rates:
            avg_error_rate = np.mean(list(self.error_rates))
            print("avg_error_rate", avg_error_rate, len(self.error_rates))
            # 目标错误率5%
            factors['error_rate'] = max(0.3, min(2.0, 0.05 / max(avg_error_rate, 0.001)))
        else:
            factors['error_rate'] = 1.0
        
        # 系统负载因子(简化)
        factors['system_load'] = 1.0  # 实际中可以从系统监控获取
        
        return factors
    
    def _calculate_new_rate(self) -> int:
        """计算新的限流速率"""
        factors = self._calculate_adjustment_factors()
        
        # 加权计算总调整因子
        total_factor = 0
        for factor_name, weight in self.weights.items():
            total_factor += factors[factor_name] * weight
        
        # 应用调整
        new_rate = int(self.current_rate * total_factor)
        new_rate = max(self.min_rate, min(self.max_rate, new_rate))
        print(f"速率调整: {self.current_rate} -> {new_rate}")
        print(f"调整因子: {factors}, 总因子: {total_factor:.3f}")
        
        return new_rate
    
    def record_metrics(self, response_time: float = None, success: bool = None, system_load: float = None):
        """记录监控指标"""
        with self.lock:
            if response_time is not None:
                self.response_times.append(response_time)
            
            if success is not None:
                self.error_rates.append(0 if success else 1)
            
            if system_load is not None:
                self.system_metrics.append(system_load)
            
            # 每5秒调整一次
            current_time = time.time()
            if current_time - self.last_adjustment >= 5:
                self.current_rate = self._calculate_new_rate()
                self.last_adjustment = current_time
    
    def allow_request(self) -> bool:
        """检查是否允许请求"""
        with self.lock:
            # 滑动窗口限流
            current_time = time.time()
            recent_requests = [t for t in self.response_times if current_time - t < 1.0]  # 1秒窗口
            
            return len(recent_requests) < self.current_rate
    
    def get_current_rate(self) -> int:
        return self.current_rate

# 使用示例
if __name__ == "__main__":
    limiter = HybridAdaptiveLimiter(initial_rate=50)
    
    # 模拟不同场景
    scenarios = [
        {'response_time': 0.1, 'success': True, 'load': 0.3},    # 良好状态
        {'response_time': 0.3, 'success': False, 'load': 0.8},   # 压力状态
        {'response_time': 0.5, 'success': False, 'load': 0.9},   # 过载状态
        {'response_time': 0.15, 'success': True, 'load': 0.4},   # 恢复状态
    ]
    
    for i in range(100):
        scenario = scenarios[i % len(scenarios)]
        
        limiter.record_metrics(
            response_time=scenario['response_time'] + np.random.normal(0, 0.02),
            success=scenario['success'],
            system_load=scenario['load'] + np.random.normal(0, 0.1)
        )
        
        if limiter.allow_request():
            print(f"请求 {i:2} 通过, 速率: {limiter.get_current_rate()}")
        else:
            print(f"请求 {i:2} 限流, 速率: {limiter.get_current_rate()}")
        
        time.sleep(0.1)

运行结果:

> python.exe .\adjust5.py
请求  0 通过, 速率: 50
请求  1 通过, 速率: 50
请求  2 通过, 速率: 50
请求  3 通过, 速率: 50
请求  4 通过, 速率: 50
请求  5 通过, 速率: 50
请求  6 通过, 速率: 50
请求  7 通过, 速率: 50
请求  8 通过, 速率: 50
请求  9 通过, 速率: 50
请求 10 通过, 速率: 50
请求 11 通过, 速率: 50
请求 12 通过, 速率: 50
请求 13 通过, 速率: 50
请求 14 通过, 速率: 50
请求 15 通过, 速率: 50
请求 16 通过, 速率: 50
请求 17 通过, 速率: 50
请求 18 通过, 速率: 50
请求 19 通过, 速率: 50
请求 20 通过, 速率: 50
请求 21 通过, 速率: 50
请求 22 通过, 速率: 50
请求 23 通过, 速率: 50
请求 24 通过, 速率: 50
请求 25 通过, 速率: 50
请求 26 通过, 速率: 50
请求 27 通过, 速率: 50
请求 28 通过, 速率: 50
请求 29 通过, 速率: 50
请求 30 通过, 速率: 50
请求 31 通过, 速率: 50
请求 32 通过, 速率: 50
请求 33 通过, 速率: 50
请求 34 通过, 速率: 50
请求 35 通过, 速率: 50
请求 36 通过, 速率: 50
请求 37 通过, 速率: 50
请求 38 通过, 速率: 50
请求 39 通过, 速率: 50
请求 40 通过, 速率: 50
请求 41 通过, 速率: 50
请求 42 通过, 速率: 50
请求 43 通过, 速率: 50
请求 44 通过, 速率: 50
请求 45 通过, 速率: 50
avg_response_time 0.2667061164917195 47
avg_error_rate 0.5106382978723404 47
速率调整: 50 -> 34
调整因子: {'response_time': np.float64(0.7498890637785934), 'error_rate': 0.3, 'system_load': 1.0}, 总因子: 0.690
请求 46 通过, 速率: 34
请求 47 通过, 速率: 34
请求 48 通过, 速率: 34
请求 49 通过, 速率: 34
请求 50 通过, 速率: 34
请求 51 通过, 速率: 34
请求 52 通过, 速率: 34
请求 53 通过, 速率: 34
请求 54 通过, 速率: 34
请求 55 通过, 速率: 34
请求 56 通过, 速率: 34
请求 57 通过, 速率: 34
请求 58 通过, 速率: 34
请求 59 通过, 速率: 34
请求 60 通过, 速率: 34
请求 61 通过, 速率: 34
请求 62 通过, 速率: 34
请求 63 通过, 速率: 34
请求 64 通过, 速率: 34
请求 65 通过, 速率: 34
请求 66 通过, 速率: 34
请求 67 通过, 速率: 34
请求 68 通过, 速率: 34
请求 69 通过, 速率: 34
请求 70 通过, 速率: 34
请求 71 通过, 速率: 34
请求 72 通过, 速率: 34
请求 73 通过, 速率: 34
请求 74 通过, 速率: 34
请求 75 通过, 速率: 34
请求 76 通过, 速率: 34
请求 77 通过, 速率: 34
请求 78 通过, 速率: 34
请求 79 通过, 速率: 34
请求 80 通过, 速率: 34
请求 81 通过, 速率: 34
请求 82 通过, 速率: 34
请求 83 通过, 速率: 34
请求 84 通过, 速率: 34
请求 85 通过, 速率: 34
请求 86 通过, 速率: 34
请求 87 通过, 速率: 34
请求 88 通过, 速率: 34
请求 89 通过, 速率: 34
请求 90 通过, 速率: 34
请求 91 通过, 速率: 34
请求 92 通过, 速率: 34
avg_response_time 0.26144230542232283 94
avg_error_rate 0.5 50
速率调整: 34 -> 23
调整因子: {'response_time': np.float64(0.7649871342624848), 'error_rate': 0.3, 'system_load': 1.0}, 总因子: 0.696
请求 93 通过, 速率: 23
请求 94 通过, 速率: 23
请求 95 通过, 速率: 23
请求 96 通过, 速率: 23
请求 97 通过, 速率: 23
请求 98 通过, 速率: 23
请求 99 通过, 速率: 23

1.5. 流式输出专用限流器

1.5.1. 字符级平滑限流

public class StreamingOutputRateLimiter {
    private final RateLimiter characterLimiter;     // 字符输出限流
    private final RateLimiter chunkLimiter;         // 数据块限流
    private final ScheduledExecutorService scheduler;
    
    // 限流配置
    private final int maxCharsPerSecond;
    private final int maxChunksPerSecond;
    private final int minChunkSize;
    private final int maxChunkSize;
    
    public StreamingOutputRateLimiter(int maxCharsPerSecond, int maxChunksPerSecond) {
        this.maxCharsPerSecond = maxCharsPerSecond;
        this.maxChunksPerSecond = maxChunksPerSecond;
        this.minChunkSize = 1;
        this.maxChunkSize = Math.max(1, maxCharsPerSecond / maxChunksPerSecond);
        
        this.characterLimiter = new TokenBucketRateLimiter(maxCharsPerSecond, maxCharsPerSecond);
        this.chunkLimiter = new TokenBucketRateLimiter(maxChunksPerSecond, maxChunksPerSecond);
        this.scheduler = Executors.newScheduledThreadPool(1);
    }
    
    public CompletableFuture<Void> streamOutput(String content, Consumer<String> outputConsumer) {
        return CompletableFuture.runAsync(() -> {
            try {
                streamContentSmoothly(content, outputConsumer);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("流式输出被中断", e);
            }
        });
    }
    
    private void streamContentSmoothly(String content, Consumer<String> outputConsumer) 
            throws InterruptedException {
        char[] characters = content.toCharArray();
        int position = 0;
        
        while (position < characters.length) {
            // 获取数据块许可
            chunkLimiter.acquire(1);
            
            // 确定当前块的大小
            int chunkSize = calculateChunkSize(characters.length - position);
            
            // 获取字符输出许可
            if (!characterLimiter.tryAcquire(chunkSize)) {
                // 令牌不足,等待并重试
                Thread.sleep(calculateWaitTime(chunkSize));
                continue;
            }
            
            // 输出数据块
            String chunk = new String(characters, position, chunkSize);
            outputConsumer.accept(chunk);
            
            position += chunkSize;
            
            // 添加微小延迟,使输出更平滑
            if (position < characters.length) {
                Thread.sleep(10);
            }
        }
    }
    
    private int calculateChunkSize(int remainingChars) {
        // 动态调整块大小,保持输出平滑
        int idealChunkSize = maxCharsPerSecond / maxChunksPerSecond;
        return Math.min(Math.min(idealChunkSize, maxChunkSize), remainingChars);
    }
    
    private long calculateWaitTime(int requiredTokens) {
        double missingTokens = requiredTokens - characterLimiter.getAvailableTokens();
        return (long) (missingTokens / (maxCharsPerSecond / 1000.0));
    }
    
    // 动态调整限流参数
    public void adjustRate(int newCharsPerSecond, int newChunksPerSecond) {
        // 实现动态调整逻辑
    }
    
    public void shutdown() {
        scheduler.shutdown();
    }
}

1.5.2. 优先级限流策略

public class PriorityRateLimiter {
    private final Map<Integer, RateLimiter> priorityLimiters;
    private final int baseRate;
    
    public PriorityRateLimiter(int baseRate) {
        this.baseRate = baseRate;
        this.priorityLimiters = new ConcurrentHashMap<>();
        
        // 初始化不同优先级的限流器
        // 优先级越高,限流越宽松
        priorityLimiters.put(1, new TokenBucketRateLimiter(baseRate / 4, baseRate / 4)); // 低优先级
        priorityLimiters.put(2, new TokenBucketRateLimiter(baseRate / 2, baseRate / 2)); // 中优先级
        priorityLimiters.put(3, new TokenBucketRateLimiter(baseRate, baseRate));         // 高优先级
        priorityLimiters.put(4, new TokenBucketRateLimiter(baseRate * 2, baseRate * 2)); // 最高优先级
    }
    
    public boolean tryAcquire(int priority) {
        RateLimiter limiter = priorityLimiters.get(priority);
        if (limiter == null) {
            limiter = priorityLimiters.get(2); // 默认中优先级
        }
        return limiter.tryAcquire(1);
    }
    
    public long acquire(int priority) {
        RateLimiter limiter = priorityLimiters.get(priority);
        if (limiter == null) {
            limiter = priorityLimiters.get(2);
        }
        return limiter.acquire(1);
    }
    
    // 为重要数据设置高优先级
    public boolean tryAcquireForImportantData() {
        return tryAcquire(4);
    }
    
    // 为普通数据设置中优先级
    public boolean tryAcquireForNormalData() {
        return tryAcquire(2);
    }
}

1.6. 监控和统计

@Component
public class RateLimitMonitor {
    private final MeterRegistry meterRegistry;
    private final Map<String, Counter> limitCounters = new ConcurrentHashMap<>();
    private final Map<String, Timer> requestTimers = new ConcurrentHashMap<>();
    
    public RateLimitMonitor(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
    }
    
    public void recordRequest(String endpoint, boolean limited, long duration) {
        // 记录请求指标
        String status = limited ? "limited" : "allowed";
        Counter counter = limitCounters.computeIfAbsent(
            endpoint + "." + status,
            key -> Counter.builder("ratelimit.requests")
                .tag("endpoint", endpoint)
                .tag("status", status)
                .register(meterRegistry)
        );
        counter.increment();
        
        // 记录响应时间
        Timer timer = requestTimers.computeIfAbsent(
            endpoint,
            key -> Timer.builder("ratelimit.duration")
                .tag("endpoint", endpoint)
                .register(meterRegistry)
        );
        timer.record(duration, TimeUnit.MILLISECONDS);
    }
    
    public void recordSystemLoad(double load) {
        Gauge.builder("ratelimit.system.load", () -> load)
            .register(meterRegistry);
    }
    
    public Map<String, Object> getRateLimitStats() {
        Map<String, Object> stats = new HashMap<>();
        // 实现统计信息收集
        return stats;
    }
}

1.7. 配置管理

@Configuration
@ConfigurationProperties(prefix = "ratelimit")
@Data
public class RateLimitConfig {
    private boolean enabled = true;
    private int defaultRate = 100; // 默认每秒100个请求
    private int burstCapacity = 150; // 突发容量
    private long timeoutMillis = 1000; // 超时时间
    private Map<String, Integer> endpoints = new HashMap<>();
    
    // 流式输出特殊配置
    private Streaming streaming = new Streaming();
    
    @Data
    public static class Streaming {
        private int charsPerSecond = 50;  // 每秒字符数
        private int chunksPerSecond = 10; // 每秒数据块数
        private int minChunkSize = 1;     // 最小块大小
        private int maxChunkSize = 10;    // 最大块大小
    }
    
    public int getRateForEndpoint(String endpoint) {
        return endpoints.getOrDefault(endpoint, defaultRate);
    }
}

2. 代码分析

class AppGenerateService:
    system_rate_limiter = RateLimiter("app_daily_rate_limiter", dify_config.APP_DAILY_RATE_LIMIT, 86400)

    @classmethod
    def generate(
        cls,
        app_model: App,
        user: Union[Account, EndUser],
        args: Mapping[str, Any],
        invoke_from: InvokeFrom,
        streaming: bool = True,
    ):
        """
        App Content Generate
        :param app_model: app model
        :param user: user
        :param args: args
        :param invoke_from: invoke from
        :param streaming: streaming
        :return:
        """
        # system level rate limiter
        # 默认 BILLING_ENABLED 为false,计费功能不开启
        if dify_config.BILLING_ENABLED:
            # check if it's free plan
            limit_info = BillingService.get_info(app_model.tenant_id)
            if limit_info["subscription"]["plan"] == "sandbox":
                if cls.system_rate_limiter.is_rate_limited(app_model.tenant_id):
                    raise InvokeRateLimitError(
                        "Rate limit exceeded, please upgrade your plan "
                        f"or your RPD was {dify_config.APP_DAILY_RATE_LIMIT} requests/day"
                    )
                cls.system_rate_limiter.increment_rate_limit(app_model.tenant_id)

        # app level rate limiter
        # 获取平滑逻辑配置,最大请求数
        max_active_request = cls._get_max_active_requests(app_model)
        rate_limit = RateLimit(app_model.id, max_active_request)
        # staticmethod 静态方法。生成uuid
        request_id = RateLimit.gen_request_key()
        try:
            # 执行平滑限流策略
            request_id = rate_limit.enter(request_id)
            if app_model.mode == AppMode.COMPLETION.value:
                # 平滑流式输出
                return rate_limit.generate(
                    CompletionAppGenerator.convert_to_event_stream(
                        CompletionAppGenerator().generate(
                            app_model=app_model, user=user, args=args, invoke_from=invoke_from, streaming=streaming
                        ),
                    ),
                    request_id=request_id,
                )
            elif app_model.mode == AppMode.AGENT_CHAT.value or app_model.is_agent:
                # 平滑流式输出
                return rate_limit.generate(
                    AgentChatAppGenerator.convert_to_event_stream(
                        AgentChatAppGenerator().generate(
                            app_model=app_model, user=user, args=args, invoke_from=invoke_from, streaming=streaming
                        ),
                    ),
                    request_id,
                )
            elif app_model.mode == AppMode.CHAT.value:
                # 平滑流式输出
                return rate_limit.generate(
                    ChatAppGenerator.convert_to_event_stream(
                        ChatAppGenerator().generate(
                            app_model=app_model, user=user, args=args, invoke_from=invoke_from, streaming=streaming
                        ),
                    ),
                    request_id=request_id,
                )
            elif app_model.mode == AppMode.ADVANCED_CHAT.value:
                workflow_id = args.get("workflow_id")
                workflow = cls._get_workflow(app_model, invoke_from, workflow_id)
                # 平滑流式输出
                return rate_limit.generate(
                    AdvancedChatAppGenerator.convert_to_event_stream(
                        AdvancedChatAppGenerator().generate(
                            app_model=app_model,
                            workflow=workflow,
                            user=user,
                            args=args,
                            invoke_from=invoke_from,
                            streaming=streaming,
                        ),
                    ),
                    request_id=request_id,
                )
            elif app_model.mode == AppMode.WORKFLOW.value:
                workflow_id = args.get("workflow_id")
                workflow = cls._get_workflow(app_model, invoke_from, workflow_id)
                # 平滑流式输出
                return rate_limit.generate(
                    WorkflowAppGenerator.convert_to_event_stream(
                        WorkflowAppGenerator().generate(
                            app_model=app_model,
                            workflow=workflow,
                            user=user,
                            args=args,
                            invoke_from=invoke_from,
                            streaming=streaming,
                            call_depth=0,
                            workflow_thread_pool_id=None,
                        ),
                    ),
                    request_id,
                )
            else:
                raise ValueError(f"Invalid app mode {app_model.mode}")
        except RateLimitError as e:
            raise InvokeRateLimitError(str(e))
        except Exception:
            rate_limit.exit(request_id)
            raise
        finally:
            if not streaming:
                rate_limit.exit(request_id)

2.1. 计费方式

  1. 云服务版 (Dify Cloud)
    如果使用的是官方的 Dify Cloud 服务:
    计费功能默认开启
    在 工作空间设置 → 计费与套餐 中管理
    需要绑定支付方式

  2. 自部署版本
    社区版 (Community Edition)
    社区版默认没有计费功能,需要企业版或自行开发。
    企业版 (Enterprise Edition)
    在企业版中,计费配置通常在以下位置:

  • 环境变量配置
# 启用计费功能
BILLING_ENABLED=true

# Stripe 支付配置(国际)
STRIPE_API_KEY=sk_test_...
STRIPE_WEBHOOK_SECRET=whsec_...
STRIPE_PRICE_ID=price_...

# 或 支付宝/微信支付配置(国内)
ALIPAY_APP_ID=...
ALIPAY_PRIVATE_KEY=...
WECHATPAY_MCH_ID=...
  • 配置文件
    在 docker-compose.yml 或环境配置文件中:
environment:
  - BILLING_ENABLED=true
  - STRIPE_API_KEY=your_stripe_key
  - STRIPE_WEBHOOK_SECRET=your_webhook_secret

2.2. 获取限流配置

class AppGenerateService:
    system_rate_limiter = RateLimiter("app_daily_rate_limiter", dify_config.APP_DAILY_RATE_LIMIT, 86400)

    ......
    @staticmethod
    def _get_max_active_requests(app: App) -> int:
        """
        Get the maximum number of active requests allowed for an app.

        Returns the smaller value between app's custom limit and global config limit.
        A value of 0 means infinite (no limit).

        Args:
            app: The App model instance

        Returns:
            The maximum number of active requests allowed
        """
        app_limit = app.max_active_requests or 0
        config_limit = dify_config.APP_MAX_ACTIVE_REQUESTS

        # Filter out infinite (0) values and return the minimum, or 0 if both are infinite
        limits = [limit for limit in [app_limit, config_limit] if limit > 0]
        return min(limits) if limits else 0

2.3. 平滑限流逻辑

2.3.1. redis 数据存储

$ redis-cli -p 6379 -a difyai123456
Warning: Using a password with '-a' or '-u' option on the command line interface may not be safe.
127.0.0.1:6379> keys *
1) "dify:rate_limit:448e6473-433e-464c-a852-75efd5688eab:max_active_requests"
2) "provider_model_credentials:tenant_id:2e475914-d5b3-44eb-9843-91cdc3763371:id:78c45ce2-78fe-4cb2-9937-c0baecdc3144"
3) "plugin_daemon:cluster-nodes-status-hash-map"
4) "plugin_daemon:plugin_state"
5) "account_refresh_token:626b102f-03c9-485d-b557-724fac2cc527"
6) "refresh_token:967aca877a93cf31b5990e2c365b2ec0c57352e5505f00aad5043b2e97cb246a60c91fc7ed9a1e7a05bb733ac43a9d9634303de69e921cf1eb0ed63cd7396ee6"
7) "plugin_daemon:cluster-master-preemption-lock"
127.0.0.1:6379>

2.3.2. 逻辑分析

class RateLimit:
    _MAX_ACTIVE_REQUESTS_KEY = "dify:rate_limit:{}:max_active_requests"
    _ACTIVE_REQUESTS_KEY = "dify:rate_limit:{}:active_requests"
    _UNLIMITED_REQUEST_ID = "unlimited_request_id"
    _REQUEST_MAX_ALIVE_TIME = 10 * 60  # 10 minutes
    _ACTIVE_REQUESTS_COUNT_FLUSH_INTERVAL = 5 * 60  # recalculate request_count from request_detail every 5 minutes
    _instance_dict: dict[str, "RateLimit"] = {}

    def __new__(cls: type["RateLimit"], client_id: str, max_active_requests: int):
        if client_id not in cls._instance_dict:
            instance = super().__new__(cls)
            cls._instance_dict[client_id] = instance
        return cls._instance_dict[client_id]

    def __init__(self, client_id: str, max_active_requests: int):
        self.max_active_requests = max_active_requests
        # must be called after max_active_requests is set
        if self.disabled():
            return
        if hasattr(self, "initialized"):
            return
        self.initialized = True
        self.client_id = client_id
        self.active_requests_key = self._ACTIVE_REQUESTS_KEY.format(client_id)
        self.max_active_requests_key = self._MAX_ACTIVE_REQUESTS_KEY.format(client_id)
        self.last_recalculate_time = float("-inf")
        self.flush_cache(use_local_value=True)

    def flush_cache(self, use_local_value=False):
        if self.disabled():
            return
        self.last_recalculate_time = time.time()
        # flush max active requests
        if use_local_value or not redis_client.exists(self.max_active_requests_key):
            redis_client.setex(self.max_active_requests_key, timedelta(days=1), self.max_active_requests)
        else:
            self.max_active_requests = int(redis_client.get(self.max_active_requests_key).decode("utf-8"))
            redis_client.expire(self.max_active_requests_key, timedelta(days=1))

        # flush max active requests (in-transit request list)
        if not redis_client.exists(self.active_requests_key):
            return
        request_details = redis_client.hgetall(self.active_requests_key)
        redis_client.expire(self.active_requests_key, timedelta(days=1))
        timeout_requests = [
            k
            for k, v in request_details.items()
            if time.time() - float(v.decode("utf-8")) > RateLimit._REQUEST_MAX_ALIVE_TIME
        ]
        if timeout_requests:
            redis_client.hdel(self.active_requests_key, *timeout_requests)

    # 执行平滑限流策略
    def enter(self, request_id: Optional[str] = None) -> str:
        if self.disabled():
            return RateLimit._UNLIMITED_REQUEST_ID
        if time.time() - self.last_recalculate_time > RateLimit._ACTIVE_REQUESTS_COUNT_FLUSH_INTERVAL:
            self.flush_cache()
        if not request_id:
            request_id = RateLimit.gen_request_key()

        # dify:rate_limit:448e6473-433e-464c-a852-75efd5688eab:active_requests 
        active_requests_count = redis_client.hlen(self.active_requests_key)
        if active_requests_count >= self.max_active_requests:
            raise AppInvokeQuotaExceededError(
                f"Too many requests. Please try again later. The current maximum concurrent requests allowed "
                f"for {self.client_id} is {self.max_active_requests}."
            )
        redis_client.hset(self.active_requests_key, request_id, str(time.time()))
        return request_id

    def exit(self, request_id: str):
        if request_id == RateLimit._UNLIMITED_REQUEST_ID:
            return
        redis_client.hdel(self.active_requests_key, request_id)

    def disabled(self):
        return self.max_active_requests <= 0

    # 生成uuid
    @staticmethod
    def gen_request_key() -> str:
        return str(uuid.uuid4())

    # 平滑流式输出
    def generate(self, generator: Union[Generator[str, None, None], Mapping[str, Any]], request_id: str):
        if isinstance(generator, Mapping):
            return generator
        else:
            return RateLimitGenerator(
                rate_limit=self,
                generator=generator,  # ty: ignore [invalid-argument-type]
                request_id=request_id,
            )


class RateLimitGenerator:
    def __init__(self, rate_limit: RateLimit, generator: Generator[str, None, None], request_id: str):
        self.rate_limit = rate_limit
        self.generator = generator
        self.request_id = request_id
        self.closed = False

    def __iter__(self):
        return self

    def __next__(self):
        if self.closed:
            raise StopIteration
        try:
            return next(self.generator)
        except Exception:
            self.close()
            raise

    def close(self):
        if not self.closed:
            self.closed = True
            self.rate_limit.exit(self.request_id)
            if self.generator is not None and hasattr(self.generator, "close"):
                self.generator.close()
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐