dify 源码分析(七)ratelimiter
dify 本地源码启动Ollama 安装部署dify 智能体实践dify 源码分析(一)功能概述dify 源码分析(二)源码结构dify 源码分析(三)agentdify 源码分析(四)toolsdify 源码分析(五)chatflowdify 源码分析(六)eventdify 源码分析(七)ratelimiter运行结果:java1.2. 漏桶算法 (Leaky Bucket)漏桶算法(Leak
dify 本地源码启动
Ollama 安装部署
dify 智能体实践
dify 源码分析(一)功能概述
dify 源码分析(二)源码结构
dify 源码分析(三)agent
dify 源码分析(四)tools
dify 源码分析(五)chatflow
dify 源码分析(六)event
dify 源码分析(七)ratelimiter
1. 平滑限流策略(RateLimiter )
1.1. 令牌桶算法 (Token Bucket)
- python
import time
import threading
from typing import Optional
class TokenBucketRateLimiter:
def __init__(self, capacity: int, fill_rate: float):
"""
令牌桶限流器
Args:
capacity: 桶容量
fill_rate: 每秒填充的令牌数
"""
self.capacity = float(capacity)
self.available_tokens = float(capacity)
self.fill_rate = fill_rate
self.last_time = time.time()
self.lock = threading.Lock()
def consume(self, request_tokens: int = 1) -> bool:
"""
消费令牌
Args:
tokens: 需要的令牌数
Returns:
bool: 是否成功获取令牌
"""
with self.lock:
current_now = time.time()
# 计算时间间隔内应该添加的令牌
# 计算时间间隔,单位秒
elapsed = current_now - self.last_time
# 令牌桶容量 和 计算的桶容量,取最小值
self.available_tokens = min(self.capacity, self.available_tokens + elapsed*self.fill_rate)
self.last_time = current_now
# print(f"Available tokens:{self.available_tokens} request tokens:{request_tokens}")
if self.available_tokens >= request_tokens:
self.available_tokens -= request_tokens
return True
return False
def wait_until_ready(self, tokens: int = 1):
"""等待直到有足够的令牌"""
while not self.consume(tokens):
time.sleep(0.01) # 短暂等待后重试
if __name__ == "__main__":
# 使用示例
limiter = TokenBucketRateLimiter(capacity=10, fill_rate=2) # 最大10个令牌,每秒补充2个
# 测试
for i in range(20):
if limiter.consume():
print(f"{time.strftime('%H:%M:%S')} - 请求通过")
else:
print(f"{time.strftime('%H:%M:%S')} - 请求被限流")
time.sleep(0.1)
运行结果:
> python.exe .\bucket.py
15:53:22 - 请求通过
15:53:22 - 请求通过
15:53:22 - 请求通过
15:53:22 - 请求通过
15:53:22 - 请求通过
15:53:22 - 请求通过
15:53:22 - 请求通过
15:53:22 - 请求通过
15:53:23 - 请求通过
15:53:23 - 请求通过
15:53:23 - 请求通过
15:53:23 - 请求通过
15:53:23 - 请求被限流
15:53:23 - 请求被限流
15:53:23 - 请求通过
15:53:23 - 请求被限流
15:53:23 - 请求被限流
15:53:24 - 请求被限流
15:53:24 - 请求被限流
15:53:24 - 请求通过
- java
public class TokenBucketRateLimiter {
private final int capacity; // 桶容量
private final double refillRate; // 令牌填充速率 (令牌/毫秒)
private double tokens; // 当前令牌数量
private long lastRefillTimestamp; // 上次填充时间戳
public TokenBucketRateLimiter(int capacity, int refillsPerSecond) {
this.capacity = capacity;
this.refillRate = refillsPerSecond / 1000.0;
this.tokens = capacity;
this.lastRefillTimestamp = System.currentTimeMillis();
}
public synchronized boolean tryAcquire(int permits) {
refillTokens();
if (tokens >= permits) {
tokens -= permits;
return true;
}
return false;
}
public synchronized long acquire(int permits) {
long waitTime = 0;
while (true) {
refillTokens();
if (tokens >= permits) {
tokens -= permits;
return waitTime;
}
// 计算需要等待的时间
double missingTokens = permits - tokens;
long requiredWait = (long) (missingTokens / refillRate);
try {
Thread.sleep(requiredWait);
waitTime += requiredWait;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return -1;
}
}
}
private void refillTokens() {
long currentTime = System.currentTimeMillis();
if (currentTime > lastRefillTimestamp) {
long timeElapsed = currentTime - lastRefillTimestamp;
double tokensToAdd = timeElapsed * refillRate;
tokens = Math.min(capacity, tokens + tokensToAdd);
lastRefillTimestamp = currentTime;
}
}
// 获取当前状态
public synchronized double getAvailableTokens() {
refillTokens();
return tokens;
}
}
1.2. 漏桶算法 (Leaky Bucket)
漏桶算法(Leaky Bucket Algorithm)是一种常用的流量整形和速率限制算法。它的工作原理是:
有一个固定容量的"桶",用于存储请求
请求以任意速率进入桶中
桶以固定速率"漏水"(处理请求)
如果桶满了,新的请求会被丢弃或拒绝
- python
import time
import threading
class LeakyBucket:
def __init__(self, capacity, leak_rate: float):
"""
漏桶算法 (Leaky Bucket)
capacity: 桶的容量
leak_rate: 漏水速率(请求/秒)
"""
self.capacity = capacity
self.leak_rate = leak_rate
self.water = 0 # 当前桶中的水量(请求数量)
self.last_time = time.time()
self._lock = threading.Lock()
def _leak(self):
"""漏水操作"""
with self._lock:
current_now = time.time()
time_passed = current_now - self.last_time
# 需要漏多少水
leaked_amount = time_passed * self.leak_rate
if leaked_amount > 0:
self.water = max(0, self.water - leaked_amount)
self.last_time = current_now
def allow_request(self, weight: float = 1.0):
"""
检查是否允许权重为 weight 的请求通过
weight: 请求的权重
"""
self._leak() # 先执行漏水(消费需求)
with self._lock:
if self.water + weight < self.capacity:
# 新增一个需求,水位加一
self.water += weight
return True
return False
if __name__ == "__main__":
# 使用示例
bucket = LeakyBucket(capacity=10, leak_rate=2) # 容量10,每秒处理2个请求
for i in range(20):
weight = 1
if i == 5:
weight = 10
if bucket.allow_request(weight):
print(f"请求 {i} 通过")
else:
print(f"请求 {i} 被限制")
time.sleep(0.1) # 模拟请求间隔
运行结果
> python.exe .\leaky.py
请求 0 通过
请求 1 通过
请求 2 通过
请求 3 通过
请求 4 通过
请求 5 被限制 (如果是正常权重,此处应该是通过;这里权重提高为10,则不通过)
请求 6 通过
请求 7 通过
请求 8 通过
请求 9 通过
请求 10 通过
请求 11 通过
请求 12 通过
请求 13 被限制
请求 14 通过
请求 15 被限制
请求 16 被限制
请求 17 被限制
请求 18 被限制
请求 19 通过
- java
public class LeakyBucketRateLimiter {
private final int capacity; // 桶容量
private final long leakInterval; // 漏水间隔 (毫秒)
private double waterLevel; // 当前水位
private long lastLeakTimestamp; // 上次漏水时间
public LeakyBucketRateLimiter(int capacity, int leaksPerSecond) {
this.capacity = capacity;
this.leakInterval = 1000 / leaksPerSecond;
this.waterLevel = 0;
this.lastLeakTimestamp = System.currentTimeMillis();
}
public synchronized boolean tryAcquire(int permits) {
leakWater();
if (waterLevel + permits <= capacity) {
waterLevel += permits;
return true;
}
return false;
}
public synchronized long acquire(int permits) {
leakWater();
if (waterLevel + permits <= capacity) {
waterLevel += permits;
return 0;
}
// 计算需要等待的时间
double overflow = (waterLevel + permits) - capacity;
long waitTime = (long) (overflow * leakInterval);
try {
Thread.sleep(waitTime);
waterLevel += permits - (overflow / leakInterval);
return waitTime;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return -1;
}
}
private void leakWater() {
long currentTime = System.currentTimeMillis();
long timeElapsed = currentTime - lastLeakTimestamp;
if (timeElapsed >= leakInterval) {
long leaks = timeElapsed / leakInterval;
waterLevel = Math.max(0, waterLevel - leaks);
lastLeakTimestamp = currentTime;
}
}
}
1.3. 滑动窗口限流
时间片滑动窗口算法是一种常用的限流算法,它将时间划分为固定大小的窗口,并在每个窗口内维护请求计数。
1.3.1. 时间片滑动窗口
- python
import time
import threading
from typing import Dict, List
class SlidingWindow:
def __init__(self, window_size: int, max_requests: int):
"""
滑动窗口 (Sliding Window)
window_size: 窗口大小(秒)
max_requests: 窗口内最大请求数
"""
self.window_size = window_size
self.max_requests = max_requests
self.windows: Dict[int, int] = {} # 时间片 -> 计数
self._lock = threading.Lock()
def _get_current_window(self) -> int:
"""获取当前时间片"""
return int(time.time() // self.window_size)
def _clean_old_windows(self):
"""清理过期的窗口"""
current_window = self._get_current_window()
expired_windows = [w for w in self.windows.keys() if w < current_window]
for w in expired_windows:
del self.windows[w]
def allow_request(self) -> bool:
"""检查是否允许请求"""
with self._lock:
current_window = self._get_current_window()
print(current_window, self.windows)
self._clean_old_windows()
# 计算当前窗口内的总请求数
total_requests = sum(self.windows.values())
if total_requests < self.max_requests:
# 增加当前窗口计数
self.windows[current_window] = self.windows.get(current_window, 0) + 1
return True
return False
def get_current_count(self) -> int:
"""获取当前窗口内的请求总数"""
with self._lock:
self._clean_old_windows()
return sum(self.windows.values())
# 使用示例
if __name__ == "__main__":
window = SlidingWindow(window_size=10, max_requests=5) # 10秒内最多5个请求
for i in range(10):
if window.allow_request():
print(f"请求 {i} 通过,当前计数: {window.get_current_count()}")
else:
print(f"请求 {i} 被限制,当前计数: {window.get_current_count()}")
time.sleep(1)
运行结果:
> python.exe .\sliding.py
176190177 {}
请求 0 通过,当前计数: 1
176190177 {176190177: 1}
请求 1 通过,当前计数: 2
176190178 {176190177: 2}
请求 2 通过,当前计数: 1
176190178 {176190178: 1}
请求 3 通过,当前计数: 2
176190178 {176190178: 2}
请求 4 通过,当前计数: 3
176190178 {176190178: 3}
请求 5 通过,当前计数: 4
176190178 {176190178: 4}
请求 6 通过,当前计数: 5
176190178 {176190178: 5}
请求 7 被限制,当前计数: 5
176190178 {176190178: 5}
请求 8 被限制,当前计数: 5
176190178 {176190178: 5}
请求 9 被限制,当前计数: 5
- java
public class SlidingWindowRateLimiter {
private final int maxRequests; // 时间窗口内最大请求数
private final long windowSizeInMillis; // 时间窗口大小(毫秒)
private final int segments; // 窗口分段数
private final long[] timestamps; // 时间戳数组
private final int[] counters; // 计数器数组
private int currentSegment; // 当前段索引
public SlidingWindowRateLimiter(int maxRequests, long windowSizeInMillis, int segments) {
this.maxRequests = maxRequests;
this.windowSizeInMillis = windowSizeInMillis;
this.segments = segments;
this.timestamps = new long[segments];
this.counters = new int[segments];
this.currentSegment = 0;
long currentTime = System.currentTimeMillis();
for (int i = 0; i < segments; i++) {
timestamps[i] = currentTime;
}
}
public synchronized boolean tryAcquire(int permits) {
long currentTime = System.currentTimeMillis();
updateWindow(currentTime);
// 计算当前窗口内的总请求数
int totalRequests = 0;
for (int i = 0; i < segments; i++) {
totalRequests += counters[i];
}
if (totalRequests + permits <= maxRequests) {
counters[currentSegment] += permits;
return true;
}
return false;
}
private void updateWindow(long currentTime) {
long segmentSize = windowSizeInMillis / segments;
long currentSegmentStart = currentTime - (currentTime % segmentSize);
// 如果当前时间已经进入新的时间段
if (currentSegmentStart > timestamps[currentSegment]) {
int segmentsToAdvance = (int) ((currentSegmentStart - timestamps[currentSegment]) / segmentSize);
for (int i = 1; i <= segmentsToAdvance; i++) {
int newSegment = (currentSegment + i) % segments;
timestamps[newSegment] = currentSegmentStart - (segmentsToAdvance - i) * segmentSize;
counters[newSegment] = 0;
}
currentSegment = (currentSegment + segmentsToAdvance) % segments;
}
}
public synchronized int getCurrentRequests() {
updateWindow(System.currentTimeMillis());
int total = 0;
for (int counter : counters) {
total += counter;
}
return total;
}
}
1.3.2. 分布式滑动窗口 (Redis实现)
@Component
public class RedisSlidingWindowRateLimiter {
private final RedisTemplate<String, String> redisTemplate;
private final StringRedisTemplate stringRedisTemplate;
private static final String LUA_SCRIPT =
"local key = KEYS[1]\n" +
"local now = tonumber(ARGV[1])\n" +
"local window = tonumber(ARGV[2])\n" +
"local limit = tonumber(ARGV[3])\n" +
"local clearBefore = now - window\nn" +
"\n" +
"redis.call('ZREMRANGEBYSCORE', key, 0, clearBefore)\n" +
"local current = redis.call('ZCARD', key)\n" +
"\n" +
"if current < limit then\n" +
" redis.call('ZADD', key, now, now)\n" +
" redis.call('EXPIRE', key, window/1000)\n" +
" return 1\n" +
"else\n" +
" return 0\n" +
"end";
public RedisSlidingWindowRateLimiter(RedisTemplate<String, String> redisTemplate,
StringRedisTemplate stringRedisTemplate) {
this.redisTemplate = redisTemplate;
this.stringRedisTemplate = stringRedisTemplate;
}
public boolean tryAcquire(String key, int maxRequests, long windowInMillis) {
long now = System.currentTimeMillis();
Long result = stringRedisTemplate.execute(
new DefaultRedisScript<>(LUA_SCRIPT, Long.class),
Collections.singletonList(key),
String.valueOf(now),
String.valueOf(windowInMillis),
String.valueOf(maxRequests)
);
return result != null && result == 1;
}
}
1.4. 自适应限流策略
1.4.1. 基于系统负载的自适应限流
- python
安装依赖
pip install psutil
import time
import threading
import psutil
import numpy as np
from collections import deque
class LoadBasedAdaptiveLimiter:
def __init__(self,
initial_rate: int = 100,
min_rate: int = 10,
max_rate: int = 1000,
window_size: int = 60):
"""
基于系统负载的自适应限流器
"""
self.current_rate = initial_rate
self.min_rate = min_rate
self.max_rate = max_rate
self.window_size = window_size
# 监控数据
self.cpu_usages = deque(maxlen=window_size)
self.memory_usages = deque(maxlen=window_size)
self.request_counts = deque(maxlen=window_size)
self.lock = threading.Lock()
self.last_adjust_time = time.time()
def _collect_metrics(self):
"""收集系统指标"""
cpu_percent = psutil.cpu_percent(interval=0.1)
memory_percent = psutil.virtual_memory().percent
self.cpu_usages.append(cpu_percent)
self.memory_usages.append(memory_percent)
def _calculate_system_health(self) -> float:
"""计算系统健康度 (0-1, 1表示最健康)"""
if not self.cpu_usages or not self.memory_usages:
return 1.0
avg_cpu = np.mean(list(self.cpu_usages))
avg_memory = np.mean(list(self.memory_usages))
print("cpu:", avg_cpu, "memory:", avg_memory)
# 健康度计算(CPU和内存使用率越低,健康度越高)
cpu_health = max(0, 1 - avg_cpu / 100)
memory_health = max(0, 1 - avg_memory / 100)
# 加权平均
system_health = 0.6 * cpu_health + 0.4 * memory_health
return system_health
def _adjust_rate(self):
"""根据系统健康度调整限流速率"""
system_health = self._calculate_system_health()
if system_health > 0.8:
# 系统健康,适当增加速率
new_rate = min(self.max_rate, int(self.current_rate * 1.2))
elif system_health > 0.6:
# 系统正常,保持当前速率
new_rate = self.current_rate
elif system_health > 0.4:
# 系统压力较大,适当降低速率
new_rate = max(self.min_rate, int(self.current_rate * 0.8))
else:
# 系统压力很大,大幅降低速率
new_rate = max(self.min_rate, int(self.current_rate * 0.5))
self.current_rate = new_rate
print(f"系统健康度: {system_health:.2f}, 调整限流速率: {self.current_rate}")
def allow_request(self) -> bool:
"""检查是否允许请求"""
with self.lock:
# 收集系统指标
self._collect_metrics()
# 每5秒调整一次速率
current_time = time.time()
if current_time - self.last_adjust_time >= 5:
# 根据系统健康度调整限流速率
self._adjust_rate()
self.last_adjust_time = current_time
# 简单的令牌桶实现
if len(self.request_counts) < self.current_rate:
self.request_counts.append(current_time)
return True
return False
def get_current_rate(self) -> int:
"""获取当前限流速率"""
return self.current_rate
# 使用示例
if __name__ == "__main__":
limiter = LoadBasedAdaptiveLimiter(initial_rate=50, min_rate=10, max_rate=200)
# 模拟不同负载情况
for i in range(100):
if limiter.allow_request():
print(f"请求 {i:2} 通过, 当前速率: {limiter.get_current_rate()}")
else:
print(f"请求 {i:2} 被限制, 当前速率: {limiter.get_current_rate()}")
time.sleep(0.1)
运行结果:
> python.exe .\adjust.py
请求 0 通过, 当前速率: 50
请求 1 通过, 当前速率: 50
请求 2 通过, 当前速率: 50
请求 3 通过, 当前速率: 50
请求 4 通过, 当前速率: 50
请求 5 通过, 当前速率: 50
请求 6 通过, 当前速率: 50
请求 7 通过, 当前速率: 50
请求 8 通过, 当前速率: 50
请求 9 通过, 当前速率: 50
请求 10 通过, 当前速率: 50
请求 11 通过, 当前速率: 50
请求 12 通过, 当前速率: 50
请求 13 通过, 当前速率: 50
请求 14 通过, 当前速率: 50
请求 15 通过, 当前速率: 50
请求 16 通过, 当前速率: 50
请求 17 通过, 当前速率: 50
请求 18 通过, 当前速率: 50
请求 19 通过, 当前速率: 50
请求 20 通过, 当前速率: 50
请求 21 通过, 当前速率: 50
请求 22 通过, 当前速率: 50
cpu: 10.1125 memory: 64.25833333333334
系统健康度: 0.68, 调整限流速率: 50
请求 23 通过, 当前速率: 50
请求 24 通过, 当前速率: 50
请求 25 通过, 当前速率: 50
请求 26 通过, 当前速率: 50
请求 27 通过, 当前速率: 50
请求 28 通过, 当前速率: 50
请求 29 通过, 当前速率: 50
请求 30 通过, 当前速率: 50
请求 31 通过, 当前速率: 50
请求 32 通过, 当前速率: 50
请求 33 通过, 当前速率: 50
请求 34 通过, 当前速率: 50
请求 35 通过, 当前速率: 50
请求 36 通过, 当前速率: 50
请求 37 通过, 当前速率: 50
请求 38 通过, 当前速率: 50
请求 39 通过, 当前速率: 50
请求 40 通过, 当前速率: 50
请求 41 通过, 当前速率: 50
请求 42 通过, 当前速率: 50
请求 43 通过, 当前速率: 50
请求 44 通过, 当前速率: 50
请求 45 通过, 当前速率: 50
cpu: 10.172340425531914 memory: 64.25744680851062
系统健康度: 0.68, 调整限流速率: 50
请求 46 通过, 当前速率: 50
请求 47 通过, 当前速率: 50
请求 48 通过, 当前速率: 50
请求 49 通过, 当前速率: 50
请求 50 被限制, 当前速率: 50
请求 51 被限制, 当前速率: 50
请求 52 被限制, 当前速率: 50
请求 53 被限制, 当前速率: 50
请求 54 被限制, 当前速率: 50
请求 55 被限制, 当前速率: 50
请求 56 被限制, 当前速率: 50
请求 57 被限制, 当前速率: 50
请求 58 被限制, 当前速率: 50
请求 59 被限制, 当前速率: 50
请求 60 被限制, 当前速率: 50
请求 61 被限制, 当前速率: 50
请求 62 被限制, 当前速率: 50
请求 63 被限制, 当前速率: 50
请求 64 被限制, 当前速率: 50
请求 65 被限制, 当前速率: 50
请求 66 被限制, 当前速率: 50
请求 67 被限制, 当前速率: 50
请求 68 被限制, 当前速率: 50
cpu: 9.268333333333334 memory: 64.22666666666666
系统健康度: 0.69, 调整限流速率: 50
请求 69 被限制, 当前速率: 50
请求 70 被限制, 当前速率: 50
请求 71 被限制, 当前速率: 50
请求 72 被限制, 当前速率: 50
请求 73 被限制, 当前速率: 50
请求 74 被限制, 当前速率: 50
请求 75 被限制, 当前速率: 50
请求 76 被限制, 当前速率: 50
请求 77 被限制, 当前速率: 50
请求 78 被限制, 当前速率: 50
请求 79 被限制, 当前速率: 50
请求 80 被限制, 当前速率: 50
请求 81 被限制, 当前速率: 50
请求 82 被限制, 当前速率: 50
请求 83 被限制, 当前速率: 50
请求 84 被限制, 当前速率: 50
请求 85 被限制, 当前速率: 50
请求 86 被限制, 当前速率: 50
请求 87 被限制, 当前速率: 50
请求 88 被限制, 当前速率: 50
请求 89 被限制, 当前速率: 50
请求 90 被限制, 当前速率: 50
请求 91 被限制, 当前速率: 50
cpu: 6.281666666666667 memory: 64.16
系统健康度: 0.71, 调整限流速率: 50
请求 92 被限制, 当前速率: 50
请求 93 被限制, 当前速率: 50
请求 94 被限制, 当前速率: 50
请求 95 被限制, 当前速率: 50
请求 96 被限制, 当前速率: 50
请求 97 被限制, 当前速率: 50
请求 98 被限制, 当前速率: 50
请求 99 被限制, 当前速率: 50
- java
@Component
public class AdaptiveRateLimiter {
private final TokenBucketRateLimiter baseLimiter;
private final int baseRate; // 基础限流速率
private final int minRate; // 最小限流速率
private final int maxRate; // 最大限流速率
// 系统指标监控
private double systemLoad = 0.0;
private long lastUpdateTime = System.currentTimeMillis();
private final double loadThreshold = 0.8; // 系统负载阈值
public AdaptiveRateLimiter(int baseRate, int minRate, int maxRate) {
this.baseRate = baseRate;
this.minRate = minRate;
this.maxRate = maxRate;
this.baseLimiter = new TokenBucketRateLimiter(baseRate, baseRate);
}
public synchronized boolean tryAcquire(int permits) {
updateRateBasedOnSystemLoad();
return baseLimiter.tryAcquire(permits);
}
private void updateRateBasedOnSystemLoad() {
long currentTime = System.currentTimeMillis();
if (currentTime - lastUpdateTime < 1000) { // 每秒更新一次
return;
}
// 获取系统负载 (需要根据具体环境实现)
double currentLoad = getSystemLoad();
this.systemLoad = 0.7 * this.systemLoad + 0.3 * currentLoad; // 平滑处理
// 根据系统负载调整限流速率
int newRate;
if (systemLoad > loadThreshold) {
// 系统负载高,降低限流速率
double reductionFactor = 1.0 - (systemLoad - loadThreshold) / (1.0 - loadThreshold);
newRate = (int) (baseRate * Math.max(0.1, reductionFactor));
newRate = Math.max(minRate, newRate);
} else {
// 系统负载低,可以适当提高限流速率
double increaseFactor = 1.0 + (loadThreshold - systemLoad) / loadThreshold;
newRate = (int) (baseRate * Math.min(2.0, increaseFactor));
newRate = Math.min(maxRate, newRate);
}
// 更新限流器 (需要TokenBucketRateLimiter支持动态调整)
updateLimiterRate(newRate);
lastUpdateTime = currentTime;
}
private double getSystemLoad() {
// 实现获取系统负载的逻辑
// 这里使用CPU负载作为示例
OperatingSystemMXBean osBean = ManagementFactory.getOperatingSystemMXBean();
if (osBean instanceof com.sun.management.OperatingSystemMXBean) {
return ((com.sun.management.OperatingSystemMXBean) osBean).getSystemCpuLoad();
}
return 0.5; // 默认值
}
private void updateLimiterRate(int newRate) {
// 动态更新限流器速率
// 需要TokenBucketRateLimiter支持速率调整
}
// 基于响应时间的自适应调整
public void recordResponseTime(long responseTime, long threshold) {
if (responseTime > threshold) {
// 响应时间过长,触发限流调整
adjustRateForSlowResponse();
}
}
private synchronized void adjustRateForSlowResponse() {
// 根据响应时间调整限流策略
}
}
1.4.2. 基于响应时间的自适应限流
import time
import threading
from collections import deque
import numpy as np
class ResponseTimeAdaptiveLimiter:
def __init__(self,
initial_rate: int = 100,
min_rate: int = 10,
max_rate: int = 1000,
target_response_time: float = 1.0,
window_size: int = 100):
"""
基于响应时间的自适应限流器
"""
self.current_rate = initial_rate
self.min_rate = min_rate
self.max_rate = max_rate
self.target_response_time = target_response_time
self.window_size = window_size
self.response_times = deque(maxlen=window_size)
self.request_times = deque(maxlen=window_size)
self.lock = threading.Lock()
def record_response_time(self, response_time: float):
"""记录响应时间"""
with self.lock:
self.response_times.append(response_time)
def _calculate_percentile_response_time(self, percentile: float = 95.0) -> float:
"""计算百分位响应时间"""
if not self.response_times:
return 0.0
return np.percentile(list(self.response_times), percentile)
def _adjust_rate_based_on_response_time(self):
"""基于响应时间调整限流速率"""
if len(self.response_times) < 10: # 样本不足
return
p95_response_time = self._calculate_percentile_response_time(95)
p50_response_time = self._calculate_percentile_response_time(50)
print(f"P50响应时间: {p50_response_time:.3f}s, P95响应时间: {p95_response_time:.3f}s")
if p95_response_time > self.target_response_time * 2:
# 响应时间严重超标,大幅降级
new_rate = max(self.min_rate, int(self.current_rate * 0.6))
elif p95_response_time > self.target_response_time * 1.5:
# 响应时间超标,适度降级
new_rate = max(self.min_rate, int(self.current_rate * 0.8))
elif p95_response_time < self.target_response_time * 0.7:
# 响应时间很好,适度提升
new_rate = min(self.max_rate, int(self.current_rate * 1.2))
elif p95_response_time < self.target_response_time * 0.5:
# 响应时间非常好,大幅提升
new_rate = min(self.max_rate, int(self.current_rate * 1.5))
else:
# 响应时间在可接受范围,保持当前速率
new_rate = self.current_rate
if new_rate != self.current_rate:
print(f"响应时间自适应调整: {self.current_rate} -> {new_rate}")
self.current_rate = new_rate
def allow_request(self) -> bool:
"""检查是否允许请求"""
with self.lock:
current_time = time.time()
# 每处理10个请求调整一次
if len(self.response_times) % 10 == 0:
self._adjust_rate_based_on_response_time()
# 滑动窗口限流
window_start = current_time - 1.0 # 1秒窗口
recent_requests = [t for t in self.request_times if t > window_start]
if len(recent_requests) < self.current_rate:
self.request_times.append(current_time)
return True
return False
def get_current_rate(self) -> int:
return self.current_rate
# 使用示例
if __name__ == "__main__":
limiter = ResponseTimeAdaptiveLimiter(initial_rate=50, target_response_time=0.2)
base_delay = 0.1
for i in range(200):
if limiter.allow_request():
# 模拟响应时间变化
if i < 50:
# 低负载阶段
response_time = base_delay + np.random.normal(0.05, 0.02)
elif i < 100:
# 高负载阶段
response_time = base_delay * 3 + np.random.normal(0.1, 0.05)
else:
# 恢复正常
response_time = base_delay + np.random.normal(0.05, 0.02)
limiter.record_response_time(response_time)
print(f"请求 {i:3} 完成, 响应时间: {response_time:.3f}s, 当前速率: {limiter.get_current_rate()}")
else:
print(f"请求 {i:3} 被限流, 当前速率: {limiter.get_current_rate()}")
time.sleep(0.05)
运行结果:
> python.exe .\adjust2.py
请求 0 完成, 响应时间: 0.139s, 当前速率: 50
请求 1 完成, 响应时间: 0.178s, 当前速率: 50
请求 2 完成, 响应时间: 0.167s, 当前速率: 50
请求 3 完成, 响应时间: 0.163s, 当前速率: 50
请求 4 完成, 响应时间: 0.166s, 当前速率: 50
请求 5 完成, 响应时间: 0.125s, 当前速率: 50
请求 6 完成, 响应时间: 0.163s, 当前速率: 50
请求 7 完成, 响应时间: 0.118s, 当前速率: 50
请求 8 完成, 响应时间: 0.147s, 当前速率: 50
请求 9 完成, 响应时间: 0.132s, 当前速率: 50
P50响应时间: 0.155s, P95响应时间: 0.173s
请求 10 完成, 响应时间: 0.164s, 当前速率: 50
请求 11 完成, 响应时间: 0.129s, 当前速率: 50
请求 12 完成, 响应时间: 0.124s, 当前速率: 50
请求 13 完成, 响应时间: 0.135s, 当前速率: 50
请求 14 完成, 响应时间: 0.140s, 当前速率: 50
请求 15 完成, 响应时间: 0.162s, 当前速率: 50
请求 16 完成, 响应时间: 0.149s, 当前速率: 50
请求 17 完成, 响应时间: 0.161s, 当前速率: 50
请求 18 完成, 响应时间: 0.144s, 当前速率: 50
请求 19 完成, 响应时间: 0.139s, 当前速率: 50
P50响应时间: 0.145s, P95响应时间: 0.168s
请求 20 完成, 响应时间: 0.101s, 当前速率: 50
请求 21 完成, 响应时间: 0.141s, 当前速率: 50
请求 22 完成, 响应时间: 0.135s, 当前速率: 50
请求 23 完成, 响应时间: 0.156s, 当前速率: 50
请求 24 完成, 响应时间: 0.167s, 当前速率: 50
请求 25 完成, 响应时间: 0.172s, 当前速率: 50
请求 26 完成, 响应时间: 0.162s, 当前速率: 50
请求 27 完成, 响应时间: 0.169s, 当前速率: 50
请求 28 完成, 响应时间: 0.134s, 当前速率: 50
请求 29 完成, 响应时间: 0.135s, 当前速率: 50
P50响应时间: 0.145s, P95响应时间: 0.171s
请求 30 完成, 响应时间: 0.148s, 当前速率: 50
请求 31 完成, 响应时间: 0.166s, 当前速率: 50
请求 32 完成, 响应时间: 0.145s, 当前速率: 50
请求 33 完成, 响应时间: 0.144s, 当前速率: 50
请求 34 完成, 响应时间: 0.159s, 当前速率: 50
请求 35 完成, 响应时间: 0.153s, 当前速率: 50
请求 36 完成, 响应时间: 0.144s, 当前速率: 50
请求 37 完成, 响应时间: 0.127s, 当前速率: 50
请求 38 完成, 响应时间: 0.153s, 当前速率: 50
请求 39 完成, 响应时间: 0.148s, 当前速率: 50
P50响应时间: 0.147s, P95响应时间: 0.170s
请求 40 完成, 响应时间: 0.155s, 当前速率: 50
请求 41 完成, 响应时间: 0.124s, 当前速率: 50
请求 42 完成, 响应时间: 0.159s, 当前速率: 50
请求 43 完成, 响应时间: 0.152s, 当前速率: 50
请求 44 完成, 响应时间: 0.167s, 当前速率: 50
请求 45 完成, 响应时间: 0.161s, 当前速率: 50
请求 46 完成, 响应时间: 0.152s, 当前速率: 50
请求 47 完成, 响应时间: 0.151s, 当前速率: 50
请求 48 完成, 响应时间: 0.155s, 当前速率: 50
请求 49 完成, 响应时间: 0.165s, 当前速率: 50
P50响应时间: 0.152s, P95响应时间: 0.168s
请求 50 完成, 响应时间: 0.359s, 当前速率: 50
请求 51 完成, 响应时间: 0.396s, 当前速率: 50
请求 52 完成, 响应时间: 0.400s, 当前速率: 50
请求 53 完成, 响应时间: 0.305s, 当前速率: 50
请求 54 完成, 响应时间: 0.356s, 当前速率: 50
请求 55 完成, 响应时间: 0.267s, 当前速率: 50
请求 56 完成, 响应时间: 0.426s, 当前速率: 50
请求 57 完成, 响应时间: 0.361s, 当前速率: 50
请求 58 完成, 响应时间: 0.475s, 当前速率: 50
请求 59 完成, 响应时间: 0.449s, 当前速率: 50
P50响应时间: 0.155s, P95响应时间: 0.402s
响应时间自适应调整: 50 -> 30
请求 60 完成, 响应时间: 0.370s, 当前速率: 30
请求 61 完成, 响应时间: 0.408s, 当前速率: 30
请求 62 完成, 响应时间: 0.336s, 当前速率: 30
请求 63 完成, 响应时间: 0.313s, 当前速率: 30
请求 64 完成, 响应时间: 0.452s, 当前速率: 30
请求 65 完成, 响应时间: 0.453s, 当前速率: 30
请求 66 完成, 响应时间: 0.401s, 当前速率: 30
请求 67 完成, 响应时间: 0.430s, 当前速率: 30
请求 68 完成, 响应时间: 0.421s, 当前速率: 30
请求 69 完成, 响应时间: 0.348s, 当前速率: 30
P50响应时间: 0.161s, P95响应时间: 0.440s
响应时间自适应调整: 30 -> 18
请求 70 完成, 响应时间: 0.284s, 当前速率: 18
请求 71 完成, 响应时间: 0.440s, 当前速率: 18
请求 72 完成, 响应时间: 0.466s, 当前速率: 18
请求 73 完成, 响应时间: 0.486s, 当前速率: 18
请求 74 完成, 响应时间: 0.403s, 当前速率: 18
请求 75 完成, 响应时间: 0.525s, 当前速率: 18
请求 76 完成, 响应时间: 0.443s, 当前速率: 18
请求 77 完成, 响应时间: 0.460s, 当前速率: 18
请求 78 完成, 响应时间: 0.430s, 当前速率: 18
请求 79 完成, 响应时间: 0.401s, 当前速率: 18
P50响应时间: 0.164s, P95响应时间: 0.461s
响应时间自适应调整: 18 -> 10
请求 80 被限流, 当前速率: 10
P50响应时间: 0.164s, P95响应时间: 0.461s
请求 81 被限流, 当前速率: 10
P50响应时间: 0.164s, P95响应时间: 0.461s
请求 82 被限流, 当前速率: 10
P50响应时间: 0.164s, P95响应时间: 0.461s
请求 83 被限流, 当前速率: 10
P50响应时间: 0.164s, P95响应时间: 0.461s
请求 84 被限流, 当前速率: 10
P50响应时间: 0.164s, P95响应时间: 0.461s
请求 85 被限流, 当前速率: 10
P50响应时间: 0.164s, P95响应时间: 0.461s
请求 86 被限流, 当前速率: 10
P50响应时间: 0.164s, P95响应时间: 0.461s
请求 87 完成, 响应时间: 0.443s, 当前速率: 10
请求 88 完成, 响应时间: 0.368s, 当前速率: 10
请求 89 完成, 响应时间: 0.398s, 当前速率: 10
请求 90 完成, 响应时间: 0.338s, 当前速率: 10
请求 91 完成, 响应时间: 0.385s, 当前速率: 10
请求 92 完成, 响应时间: 0.314s, 当前速率: 10
请求 93 完成, 响应时间: 0.400s, 当前速率: 10
请求 94 完成, 响应时间: 0.430s, 当前速率: 10
请求 95 完成, 响应时间: 0.441s, 当前速率: 10
请求 96 完成, 响应时间: 0.422s, 当前速率: 10
P50响应时间: 0.167s, P95响应时间: 0.457s
请求 97 被限流, 当前速率: 10
P50响应时间: 0.167s, P95响应时间: 0.457s
请求 98 被限流, 当前速率: 10
P50响应时间: 0.167s, P95响应时间: 0.457s
请求 99 被限流, 当前速率: 10
P50响应时间: 0.167s, P95响应时间: 0.457s
请求 100 被限流, 当前速率: 10
P50响应时间: 0.167s, P95响应时间: 0.457s
请求 101 被限流, 当前速率: 10
P50响应时间: 0.167s, P95响应时间: 0.457s
请求 102 被限流, 当前速率: 10
P50响应时间: 0.167s, P95响应时间: 0.457s
请求 103 被限流, 当前速率: 10
P50响应时间: 0.167s, P95响应时间: 0.457s
请求 104 完成, 响应时间: 0.106s, 当前速率: 10
请求 105 完成, 响应时间: 0.164s, 当前速率: 10
请求 106 完成, 响应时间: 0.149s, 当前速率: 10
请求 107 完成, 响应时间: 0.144s, 当前速率: 10
请求 108 完成, 响应时间: 0.112s, 当前速率: 10
请求 109 完成, 响应时间: 0.156s, 当前速率: 10
请求 110 完成, 响应时间: 0.172s, 当前速率: 10
请求 111 完成, 响应时间: 0.165s, 当前速率: 10
请求 112 完成, 响应时间: 0.133s, 当前速率: 10
请求 113 完成, 响应时间: 0.163s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 114 被限流, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 115 被限流, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 116 被限流, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 117 被限流, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 118 被限流, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 119 被限流, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 120 被限流, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 121 完成, 响应时间: 0.204s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 122 完成, 响应时间: 0.145s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 123 完成, 响应时间: 0.153s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 124 完成, 响应时间: 0.149s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 125 完成, 响应时间: 0.136s, 当前速率: 10
P50响应时间: 0.164s, P95响应时间: 0.454s
请求 126 完成, 响应时间: 0.134s, 当前速率: 10
P50响应时间: 0.164s, P95响应时间: 0.454s
请求 127 完成, 响应时间: 0.165s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 128 完成, 响应时间: 0.161s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 129 完成, 响应时间: 0.141s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 130 完成, 响应时间: 0.137s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 131 被限流, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 132 被限流, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 133 被限流, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 134 被限流, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 135 被限流, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 136 被限流, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 137 被限流, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 138 完成, 响应时间: 0.123s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 139 完成, 响应时间: 0.107s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 140 完成, 响应时间: 0.138s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 141 完成, 响应时间: 0.175s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 142 完成, 响应时间: 0.160s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 143 完成, 响应时间: 0.150s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 144 完成, 响应时间: 0.135s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 145 完成, 响应时间: 0.145s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 146 完成, 响应时间: 0.154s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 147 完成, 响应时间: 0.165s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 148 被限流, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 149 被限流, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 150 被限流, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 151 被限流, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 152 被限流, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 153 被限流, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 154 被限流, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 155 完成, 响应时间: 0.148s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 156 完成, 响应时间: 0.189s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 157 完成, 响应时间: 0.146s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 158 完成, 响应时间: 0.131s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 159 完成, 响应时间: 0.178s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 160 完成, 响应时间: 0.159s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 161 完成, 响应时间: 0.159s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 162 完成, 响应时间: 0.138s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 163 完成, 响应时间: 0.163s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 164 完成, 响应时间: 0.162s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 165 被限流, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 166 被限流, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 167 被限流, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 168 被限流, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 169 被限流, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 170 被限流, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 171 被限流, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 172 完成, 响应时间: 0.174s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 173 完成, 响应时间: 0.137s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 174 完成, 响应时间: 0.143s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 175 完成, 响应时间: 0.169s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 176 完成, 响应时间: 0.138s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 177 完成, 响应时间: 0.173s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 178 完成, 响应时间: 0.136s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 179 完成, 响应时间: 0.141s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 180 完成, 响应时间: 0.131s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 181 完成, 响应时间: 0.164s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 182 被限流, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 183 被限流, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 184 被限流, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 185 被限流, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 186 被限流, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 187 被限流, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 188 被限流, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 189 完成, 响应时间: 0.170s, 当前速率: 10
P50响应时间: 0.166s, P95响应时间: 0.454s
请求 190 完成, 响应时间: 0.161s, 当前速率: 10
P50响应时间: 0.166s, P95响应时间: 0.454s
请求 191 完成, 响应时间: 0.134s, 当前速率: 10
P50响应时间: 0.166s, P95响应时间: 0.454s
请求 192 完成, 响应时间: 0.149s, 当前速率: 10
P50响应时间: 0.166s, P95响应时间: 0.454s
请求 193 完成, 响应时间: 0.143s, 当前速率: 10
P50响应时间: 0.165s, P95响应时间: 0.454s
请求 194 完成, 响应时间: 0.166s, 当前速率: 10
P50响应时间: 0.166s, P95响应时间: 0.454s
请求 195 完成, 响应时间: 0.126s, 当前速率: 10
P50响应时间: 0.166s, P95响应时间: 0.454s
请求 196 完成, 响应时间: 0.117s, 当前速率: 10
P50响应时间: 0.166s, P95响应时间: 0.454s
请求 197 完成, 响应时间: 0.186s, 当前速率: 10
P50响应时间: 0.167s, P95响应时间: 0.454s
请求 198 完成, 响应时间: 0.123s, 当前速率: 10
P50响应时间: 0.167s, P95响应时间: 0.454s
请求 199 被限流, 当前速率: 10
1.4.3. AIMD (加性增/乘性减) 算法
import time
import threading
from collections import deque
class AIMDLimiter:
def __init__(self,
initial_rate: int = 10,
min_rate: int = 1,
max_rate: int = 1000,
ai_factor: float = 1.0, # 加性增加
md_factor: float = 0.5, # 乘性减少
window_size: int = 10):
"""
AIMD (Additive Increase Multiplicative Decrease) 限流器
"""
self.current_rate = initial_rate
self.min_rate = min_rate
self.max_rate = max_rate
self.ai_factor = ai_factor
self.md_factor = md_factor
self.window_size = window_size
self.error_rates = deque(maxlen=window_size)
self.request_count = 0
self.error_count = 0
self.lock = threading.Lock()
def record_result(self, success: bool):
"""记录请求结果"""
with self.lock:
self.request_count += 1
if not success:
self.error_count += 1
# 每window_size个请求计算一次错误率
if self.request_count >= self.window_size:
error_rate = self.error_count / self.request_count
self.error_rates.append(error_rate)
self._adjust_rate(error_rate)
# 重置计数器
self.request_count = 0
self.error_count = 0
def _adjust_rate(self, error_rate: float):
"""根据错误率调整速率"""
error_threshold = 0.1 # 10%错误率阈值
if error_rate > error_threshold:
# 乘性减少
new_rate = max(self.min_rate, int(self.current_rate * self.md_factor))
print(f"错误率 {error_rate:.2%} 过高,乘性减少: {self.current_rate} -> {new_rate}")
else:
# 加性增加
new_rate = min(self.max_rate, self.current_rate + self.ai_factor)
print(f"错误率 {error_rate:.2%} 正常,加性增加: {self.current_rate} -> {new_rate}")
self.current_rate = new_rate
def allow_request(self) -> bool:
"""检查是否允许请求"""
with self.lock:
# 简单的计数器限流
if len(self.error_rates) * self.window_size < self.current_rate:
return True
return False
def get_current_rate(self) -> int:
return self.current_rate
# 使用示例
if __name__ == "__main__":
limiter = AIMDLimiter(initial_rate=10, ai_factor=2, md_factor=0.7)
# 模拟不同错误率的场景
scenarios = [
[True] * 8 + [False] * 2, # 20%错误率
[True] * 9 + [False] * 1, # 10%错误率
[True] * 10, # 0%错误率
[True] * 5 + [False] * 5, # 50%错误率
]
for i, scenario in enumerate(scenarios):
print(f"\n=== 场景 {i+1} ===")
for success in scenario:
if limiter.allow_request():
limiter.record_result(success)
status = "成功" if success else "失败"
print(f"请求 {status}, 当前速率: {limiter.get_current_rate()}")
else:
print(f"请求被限流, 当前速率: {limiter.get_current_rate()}")
time.sleep(0.1)
运行结果:
> python.exe .\adjust3.py
=== 场景 1 ===
请求 成功, 当前速率: 10
请求 成功, 当前速率: 10
请求 成功, 当前速率: 10
请求 成功, 当前速率: 10
请求 成功, 当前速率: 10
请求 成功, 当前速率: 10
请求 成功, 当前速率: 10
请求 成功, 当前速率: 10
请求 失败, 当前速率: 10
错误率 20.00% 过高,乘性减少: 10 -> 7
请求 失败, 当前速率: 7
=== 场景 2 ===
请求被限流, 当前速率: 7
请求被限流, 当前速率: 7
请求被限流, 当前速率: 7
请求被限流, 当前速率: 7
请求被限流, 当前速率: 7
请求被限流, 当前速率: 7
请求被限流, 当前速率: 7
请求被限流, 当前速率: 7
请求被限流, 当前速率: 7
请求被限流, 当前速率: 7
=== 场景 3 ===
请求被限流, 当前速率: 7
请求被限流, 当前速率: 7
请求被限流, 当前速率: 7
请求被限流, 当前速率: 7
请求被限流, 当前速率: 7
请求被限流, 当前速率: 7
请求被限流, 当前速率: 7
请求被限流, 当前速率: 7
请求被限流, 当前速率: 7
请求被限流, 当前速率: 7
=== 场景 4 ===
请求被限流, 当前速率: 7
请求被限流, 当前速率: 7
请求被限流, 当前速率: 7
请求被限流, 当前速率: 7
请求被限流, 当前速率: 7
请求被限流, 当前速率: 7
请求被限流, 当前速率: 7
请求被限流, 当前速率: 7
请求被限流, 当前速率: 7
请求被限流, 当前速率: 7
1.4.4. 基于机器学习的自适应限流
环境依赖:
pip install scikit-learn -i https://pypi.tuna.tsinghua.edu.cn/simple
import numpy as np
from sklearn.ensemble import IsolationForest
from collections import deque
import threading
import time
class MLAdaptiveLimiter:
def __init__(self,
initial_rate: int = 100,
min_rate: int = 10,
max_rate: int = 1000,
features_window: int = 100):
"""
基于机器学习的自适应限流器
"""
self.current_rate = initial_rate
self.min_rate = min_rate
self.max_rate = max_rate
self.features_window = features_window
# 特征存储
self.features = deque(maxlen=features_window)
self.labels = deque(maxlen=features_window) # 1正常, -1异常
# 异常检测模型
self.model = IsolationForest(contamination=0.1, random_state=42)
self.is_model_trained = False
self.lock = threading.Lock()
self.metrics_window = deque(maxlen=50)
def extract_features(self) -> np.ndarray:
"""提取当前系统特征"""
# 这里简化实现,实际中可以从监控系统获取更多指标
if not self.metrics_window:
return np.array([[0, 0, 0]])
recent_metrics = list(self.metrics_window)
# 计算统计特征
request_rates = [m['request_rate'] for m in recent_metrics]
response_times = [m['response_time'] for m in recent_metrics]
error_rates = [m['error_rate'] for m in recent_metrics]
features = np.array([[
np.mean(request_rates), # 平均请求率
np.std(request_rates), # 请求率标准差
np.mean(response_times), # 平均响应时间
np.std(response_times), # 响应时间标准差
np.mean(error_rates), # 平均错误率
]])
return features
def record_metrics(self, request_rate: float, response_time: float, error_rate: float):
"""记录系统指标"""
with self.lock:
self.metrics_window.append({
'request_rate': request_rate,
'response_time': response_time,
'error_rate': error_rate
})
# 提取特征并判断是否异常
features = self.extract_features()
if len(self.features) >= 10: # 有足够训练数据
if not self.is_model_trained:
# 训练模型
X_train = np.array(list(self.features))
self.model.fit(X_train)
self.is_model_trained = True
# 预测当前状态
prediction = self.model.predict(features)[0]
# 根据预测结果调整限流
if prediction == -1: # 异常
self.current_rate = max(self.min_rate, int(self.current_rate * 0.7))
print(f"检测到异常状态,降低限流速率: {self.current_rate}")
else: # 正常
self.current_rate = min(self.max_rate, int(self.current_rate * 1.1))
print(f"系统状态正常,提高限流速率: {self.current_rate}")
# 存储特征和标签(简化标签生成)
label = 1 if error_rate < 0.1 and response_time < 1.0 else -1
self.features.append(features[0])
self.labels.append(label)
def allow_request(self) -> bool:
"""检查是否允许请求"""
with self.lock:
# 简单的计数器限流
current_count = len([m for m in self.metrics_window
if time.time() - m.get('timestamp', 0) < 1.0])
return current_count < self.current_rate
def get_current_rate(self) -> int:
return self.current_rate
# 使用示例
if __name__ == "__main__":
limiter = MLAdaptiveLimiter(initial_rate=50)
# 模拟指标数据
for i in range(100):
# 模拟变化的系统指标
if i < 30:
# 正常阶段
request_rate = 50 + np.random.normal(0, 5)
response_time = 0.1 + np.random.normal(0, 0.02)
error_rate = 0.02
elif i < 60:
# 异常阶段
request_rate = 80 + np.random.normal(0, 10)
response_time = 0.5 + np.random.normal(0, 0.1)
error_rate = 0.15
else:
# 恢复阶段
request_rate = 40 + np.random.normal(0, 5)
response_time = 0.1 + np.random.normal(0, 0.02)
error_rate = 0.03
limiter.record_metrics(request_rate, response_time, error_rate)
if limiter.allow_request():
print(f"请求 {i:2} 通过, 速率: {limiter.get_current_rate()}")
else:
print(f"请求 {i:2} 限流, 速率: {limiter.get_current_rate()}")
time.sleep(0.1)
运行结果:
> python.exe .\adjust4.py
请求 0 通过, 速率: 50
请求 1 通过, 速率: 50
请求 2 通过, 速率: 50
请求 3 通过, 速率: 50
请求 4 通过, 速率: 50
请求 5 通过, 速率: 50
请求 6 通过, 速率: 50
请求 7 通过, 速率: 50
请求 8 通过, 速率: 50
请求 9 通过, 速率: 50
系统状态正常,提高限流速率: 55
请求 10 通过, 速率: 55
系统状态正常,提高限流速率: 60
请求 11 通过, 速率: 60
系统状态正常,提高限流速率: 66
请求 12 通过, 速率: 66
系统状态正常,提高限流速率: 72
请求 13 通过, 速率: 72
系统状态正常,提高限流速率: 79
请求 14 通过, 速率: 79
系统状态正常,提高限流速率: 86
请求 15 通过, 速率: 86
系统状态正常,提高限流速率: 94
请求 16 通过, 速率: 94
系统状态正常,提高限流速率: 103
请求 17 通过, 速率: 103
系统状态正常,提高限流速率: 113
请求 18 通过, 速率: 113
系统状态正常,提高限流速率: 124
请求 19 通过, 速率: 124
系统状态正常,提高限流速率: 136
请求 20 通过, 速率: 136
系统状态正常,提高限流速率: 149
请求 21 通过, 速率: 149
系统状态正常,提高限流速率: 163
请求 22 通过, 速率: 163
系统状态正常,提高限流速率: 179
请求 23 通过, 速率: 179
系统状态正常,提高限流速率: 196
请求 24 通过, 速率: 196
系统状态正常,提高限流速率: 215
请求 25 通过, 速率: 215
系统状态正常,提高限流速率: 236
请求 26 通过, 速率: 236
系统状态正常,提高限流速率: 259
请求 27 通过, 速率: 259
系统状态正常,提高限流速率: 284
请求 28 通过, 速率: 284
系统状态正常,提高限流速率: 312
请求 29 通过, 速率: 312
系统状态正常,提高限流速率: 343
请求 30 通过, 速率: 343
系统状态正常,提高限流速率: 377
请求 31 通过, 速率: 377
系统状态正常,提高限流速率: 414
请求 32 通过, 速率: 414
系统状态正常,提高限流速率: 455
请求 33 通过, 速率: 455
系统状态正常,提高限流速率: 500
请求 34 通过, 速率: 500
系统状态正常,提高限流速率: 550
请求 35 通过, 速率: 550
系统状态正常,提高限流速率: 605
请求 36 通过, 速率: 605
系统状态正常,提高限流速率: 665
请求 37 通过, 速率: 665
系统状态正常,提高限流速率: 731
请求 38 通过, 速率: 731
系统状态正常,提高限流速率: 804
请求 39 通过, 速率: 804
系统状态正常,提高限流速率: 884
请求 40 通过, 速率: 884
系统状态正常,提高限流速率: 972
请求 41 通过, 速率: 972
系统状态正常,提高限流速率: 1000
请求 42 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 43 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 44 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 45 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 46 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 47 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 48 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 49 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 50 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 51 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 52 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 53 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 54 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 55 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 56 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 57 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 58 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 59 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 60 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 61 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 62 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 63 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 64 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 65 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 66 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 67 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 68 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 69 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 70 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 71 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 72 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 73 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 74 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 75 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 76 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 77 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 78 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 79 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 80 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 81 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 82 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 83 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 84 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 85 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 86 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 87 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 88 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 89 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 90 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 91 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 92 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 93 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 94 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 95 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 96 通过, 速率: 1000
系统状态正常,提高限流速率: 1000
请求 97 通过, 速率: 1000
检测到异常状态,降低限流速率: 700
请求 98 通过, 速率: 700
检测到异常状态,降低限流速率: 489
请求 99 通过, 速率: 489
1.4.5. 混合自适应限流策略
import time
import threading
from collections import deque
import numpy as np
class HybridAdaptiveLimiter:
def __init__(self,
initial_rate: int = 100,
min_rate: int = 10,
max_rate: int = 1000):
"""
混合自适应限流器 - 结合多种策略
"""
self.current_rate = initial_rate
self.min_rate = min_rate
self.max_rate = max_rate
# 多个监控窗口
# 响应时间
self.response_times = deque(maxlen=100)
# 错误率
self.error_rates = deque(maxlen=50)
# 系统负载
self.system_metrics = deque(maxlen=30)
self.lock = threading.Lock()
self.last_adjustment = time.time()
# 权重配置
self.weights = {
'response_time': 0.4,
'error_rate': 0.3,
'system_load': 0.3
}
def _calculate_adjustment_factors(self) -> dict:
"""计算各个维度的调整因子"""
factors = {}
# 响应时间因子
if self.response_times:
# mean 是 NumPy 库中用于计算平均值的函数,可沿指定轴计算数组元素的算术平均值。
avg_response_time = np.mean(list(self.response_times))
print("avg_response_time", avg_response_time, len(self.response_times))
# 假设目标响应时间为0.2s
factors['response_time'] = max(0.5, min(2.0, 0.2 / avg_response_time))
else:
factors['response_time'] = 1.0
# 错误率因子
if self.error_rates:
avg_error_rate = np.mean(list(self.error_rates))
print("avg_error_rate", avg_error_rate, len(self.error_rates))
# 目标错误率5%
factors['error_rate'] = max(0.3, min(2.0, 0.05 / max(avg_error_rate, 0.001)))
else:
factors['error_rate'] = 1.0
# 系统负载因子(简化)
factors['system_load'] = 1.0 # 实际中可以从系统监控获取
return factors
def _calculate_new_rate(self) -> int:
"""计算新的限流速率"""
factors = self._calculate_adjustment_factors()
# 加权计算总调整因子
total_factor = 0
for factor_name, weight in self.weights.items():
total_factor += factors[factor_name] * weight
# 应用调整
new_rate = int(self.current_rate * total_factor)
new_rate = max(self.min_rate, min(self.max_rate, new_rate))
print(f"速率调整: {self.current_rate} -> {new_rate}")
print(f"调整因子: {factors}, 总因子: {total_factor:.3f}")
return new_rate
def record_metrics(self, response_time: float = None, success: bool = None, system_load: float = None):
"""记录监控指标"""
with self.lock:
if response_time is not None:
self.response_times.append(response_time)
if success is not None:
self.error_rates.append(0 if success else 1)
if system_load is not None:
self.system_metrics.append(system_load)
# 每5秒调整一次
current_time = time.time()
if current_time - self.last_adjustment >= 5:
self.current_rate = self._calculate_new_rate()
self.last_adjustment = current_time
def allow_request(self) -> bool:
"""检查是否允许请求"""
with self.lock:
# 滑动窗口限流
current_time = time.time()
recent_requests = [t for t in self.response_times if current_time - t < 1.0] # 1秒窗口
return len(recent_requests) < self.current_rate
def get_current_rate(self) -> int:
return self.current_rate
# 使用示例
if __name__ == "__main__":
limiter = HybridAdaptiveLimiter(initial_rate=50)
# 模拟不同场景
scenarios = [
{'response_time': 0.1, 'success': True, 'load': 0.3}, # 良好状态
{'response_time': 0.3, 'success': False, 'load': 0.8}, # 压力状态
{'response_time': 0.5, 'success': False, 'load': 0.9}, # 过载状态
{'response_time': 0.15, 'success': True, 'load': 0.4}, # 恢复状态
]
for i in range(100):
scenario = scenarios[i % len(scenarios)]
limiter.record_metrics(
response_time=scenario['response_time'] + np.random.normal(0, 0.02),
success=scenario['success'],
system_load=scenario['load'] + np.random.normal(0, 0.1)
)
if limiter.allow_request():
print(f"请求 {i:2} 通过, 速率: {limiter.get_current_rate()}")
else:
print(f"请求 {i:2} 限流, 速率: {limiter.get_current_rate()}")
time.sleep(0.1)
运行结果:
> python.exe .\adjust5.py
请求 0 通过, 速率: 50
请求 1 通过, 速率: 50
请求 2 通过, 速率: 50
请求 3 通过, 速率: 50
请求 4 通过, 速率: 50
请求 5 通过, 速率: 50
请求 6 通过, 速率: 50
请求 7 通过, 速率: 50
请求 8 通过, 速率: 50
请求 9 通过, 速率: 50
请求 10 通过, 速率: 50
请求 11 通过, 速率: 50
请求 12 通过, 速率: 50
请求 13 通过, 速率: 50
请求 14 通过, 速率: 50
请求 15 通过, 速率: 50
请求 16 通过, 速率: 50
请求 17 通过, 速率: 50
请求 18 通过, 速率: 50
请求 19 通过, 速率: 50
请求 20 通过, 速率: 50
请求 21 通过, 速率: 50
请求 22 通过, 速率: 50
请求 23 通过, 速率: 50
请求 24 通过, 速率: 50
请求 25 通过, 速率: 50
请求 26 通过, 速率: 50
请求 27 通过, 速率: 50
请求 28 通过, 速率: 50
请求 29 通过, 速率: 50
请求 30 通过, 速率: 50
请求 31 通过, 速率: 50
请求 32 通过, 速率: 50
请求 33 通过, 速率: 50
请求 34 通过, 速率: 50
请求 35 通过, 速率: 50
请求 36 通过, 速率: 50
请求 37 通过, 速率: 50
请求 38 通过, 速率: 50
请求 39 通过, 速率: 50
请求 40 通过, 速率: 50
请求 41 通过, 速率: 50
请求 42 通过, 速率: 50
请求 43 通过, 速率: 50
请求 44 通过, 速率: 50
请求 45 通过, 速率: 50
avg_response_time 0.2667061164917195 47
avg_error_rate 0.5106382978723404 47
速率调整: 50 -> 34
调整因子: {'response_time': np.float64(0.7498890637785934), 'error_rate': 0.3, 'system_load': 1.0}, 总因子: 0.690
请求 46 通过, 速率: 34
请求 47 通过, 速率: 34
请求 48 通过, 速率: 34
请求 49 通过, 速率: 34
请求 50 通过, 速率: 34
请求 51 通过, 速率: 34
请求 52 通过, 速率: 34
请求 53 通过, 速率: 34
请求 54 通过, 速率: 34
请求 55 通过, 速率: 34
请求 56 通过, 速率: 34
请求 57 通过, 速率: 34
请求 58 通过, 速率: 34
请求 59 通过, 速率: 34
请求 60 通过, 速率: 34
请求 61 通过, 速率: 34
请求 62 通过, 速率: 34
请求 63 通过, 速率: 34
请求 64 通过, 速率: 34
请求 65 通过, 速率: 34
请求 66 通过, 速率: 34
请求 67 通过, 速率: 34
请求 68 通过, 速率: 34
请求 69 通过, 速率: 34
请求 70 通过, 速率: 34
请求 71 通过, 速率: 34
请求 72 通过, 速率: 34
请求 73 通过, 速率: 34
请求 74 通过, 速率: 34
请求 75 通过, 速率: 34
请求 76 通过, 速率: 34
请求 77 通过, 速率: 34
请求 78 通过, 速率: 34
请求 79 通过, 速率: 34
请求 80 通过, 速率: 34
请求 81 通过, 速率: 34
请求 82 通过, 速率: 34
请求 83 通过, 速率: 34
请求 84 通过, 速率: 34
请求 85 通过, 速率: 34
请求 86 通过, 速率: 34
请求 87 通过, 速率: 34
请求 88 通过, 速率: 34
请求 89 通过, 速率: 34
请求 90 通过, 速率: 34
请求 91 通过, 速率: 34
请求 92 通过, 速率: 34
avg_response_time 0.26144230542232283 94
avg_error_rate 0.5 50
速率调整: 34 -> 23
调整因子: {'response_time': np.float64(0.7649871342624848), 'error_rate': 0.3, 'system_load': 1.0}, 总因子: 0.696
请求 93 通过, 速率: 23
请求 94 通过, 速率: 23
请求 95 通过, 速率: 23
请求 96 通过, 速率: 23
请求 97 通过, 速率: 23
请求 98 通过, 速率: 23
请求 99 通过, 速率: 23
1.5. 流式输出专用限流器
1.5.1. 字符级平滑限流
public class StreamingOutputRateLimiter {
private final RateLimiter characterLimiter; // 字符输出限流
private final RateLimiter chunkLimiter; // 数据块限流
private final ScheduledExecutorService scheduler;
// 限流配置
private final int maxCharsPerSecond;
private final int maxChunksPerSecond;
private final int minChunkSize;
private final int maxChunkSize;
public StreamingOutputRateLimiter(int maxCharsPerSecond, int maxChunksPerSecond) {
this.maxCharsPerSecond = maxCharsPerSecond;
this.maxChunksPerSecond = maxChunksPerSecond;
this.minChunkSize = 1;
this.maxChunkSize = Math.max(1, maxCharsPerSecond / maxChunksPerSecond);
this.characterLimiter = new TokenBucketRateLimiter(maxCharsPerSecond, maxCharsPerSecond);
this.chunkLimiter = new TokenBucketRateLimiter(maxChunksPerSecond, maxChunksPerSecond);
this.scheduler = Executors.newScheduledThreadPool(1);
}
public CompletableFuture<Void> streamOutput(String content, Consumer<String> outputConsumer) {
return CompletableFuture.runAsync(() -> {
try {
streamContentSmoothly(content, outputConsumer);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("流式输出被中断", e);
}
});
}
private void streamContentSmoothly(String content, Consumer<String> outputConsumer)
throws InterruptedException {
char[] characters = content.toCharArray();
int position = 0;
while (position < characters.length) {
// 获取数据块许可
chunkLimiter.acquire(1);
// 确定当前块的大小
int chunkSize = calculateChunkSize(characters.length - position);
// 获取字符输出许可
if (!characterLimiter.tryAcquire(chunkSize)) {
// 令牌不足,等待并重试
Thread.sleep(calculateWaitTime(chunkSize));
continue;
}
// 输出数据块
String chunk = new String(characters, position, chunkSize);
outputConsumer.accept(chunk);
position += chunkSize;
// 添加微小延迟,使输出更平滑
if (position < characters.length) {
Thread.sleep(10);
}
}
}
private int calculateChunkSize(int remainingChars) {
// 动态调整块大小,保持输出平滑
int idealChunkSize = maxCharsPerSecond / maxChunksPerSecond;
return Math.min(Math.min(idealChunkSize, maxChunkSize), remainingChars);
}
private long calculateWaitTime(int requiredTokens) {
double missingTokens = requiredTokens - characterLimiter.getAvailableTokens();
return (long) (missingTokens / (maxCharsPerSecond / 1000.0));
}
// 动态调整限流参数
public void adjustRate(int newCharsPerSecond, int newChunksPerSecond) {
// 实现动态调整逻辑
}
public void shutdown() {
scheduler.shutdown();
}
}
1.5.2. 优先级限流策略
public class PriorityRateLimiter {
private final Map<Integer, RateLimiter> priorityLimiters;
private final int baseRate;
public PriorityRateLimiter(int baseRate) {
this.baseRate = baseRate;
this.priorityLimiters = new ConcurrentHashMap<>();
// 初始化不同优先级的限流器
// 优先级越高,限流越宽松
priorityLimiters.put(1, new TokenBucketRateLimiter(baseRate / 4, baseRate / 4)); // 低优先级
priorityLimiters.put(2, new TokenBucketRateLimiter(baseRate / 2, baseRate / 2)); // 中优先级
priorityLimiters.put(3, new TokenBucketRateLimiter(baseRate, baseRate)); // 高优先级
priorityLimiters.put(4, new TokenBucketRateLimiter(baseRate * 2, baseRate * 2)); // 最高优先级
}
public boolean tryAcquire(int priority) {
RateLimiter limiter = priorityLimiters.get(priority);
if (limiter == null) {
limiter = priorityLimiters.get(2); // 默认中优先级
}
return limiter.tryAcquire(1);
}
public long acquire(int priority) {
RateLimiter limiter = priorityLimiters.get(priority);
if (limiter == null) {
limiter = priorityLimiters.get(2);
}
return limiter.acquire(1);
}
// 为重要数据设置高优先级
public boolean tryAcquireForImportantData() {
return tryAcquire(4);
}
// 为普通数据设置中优先级
public boolean tryAcquireForNormalData() {
return tryAcquire(2);
}
}
1.6. 监控和统计
@Component
public class RateLimitMonitor {
private final MeterRegistry meterRegistry;
private final Map<String, Counter> limitCounters = new ConcurrentHashMap<>();
private final Map<String, Timer> requestTimers = new ConcurrentHashMap<>();
public RateLimitMonitor(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
}
public void recordRequest(String endpoint, boolean limited, long duration) {
// 记录请求指标
String status = limited ? "limited" : "allowed";
Counter counter = limitCounters.computeIfAbsent(
endpoint + "." + status,
key -> Counter.builder("ratelimit.requests")
.tag("endpoint", endpoint)
.tag("status", status)
.register(meterRegistry)
);
counter.increment();
// 记录响应时间
Timer timer = requestTimers.computeIfAbsent(
endpoint,
key -> Timer.builder("ratelimit.duration")
.tag("endpoint", endpoint)
.register(meterRegistry)
);
timer.record(duration, TimeUnit.MILLISECONDS);
}
public void recordSystemLoad(double load) {
Gauge.builder("ratelimit.system.load", () -> load)
.register(meterRegistry);
}
public Map<String, Object> getRateLimitStats() {
Map<String, Object> stats = new HashMap<>();
// 实现统计信息收集
return stats;
}
}
1.7. 配置管理
@Configuration
@ConfigurationProperties(prefix = "ratelimit")
@Data
public class RateLimitConfig {
private boolean enabled = true;
private int defaultRate = 100; // 默认每秒100个请求
private int burstCapacity = 150; // 突发容量
private long timeoutMillis = 1000; // 超时时间
private Map<String, Integer> endpoints = new HashMap<>();
// 流式输出特殊配置
private Streaming streaming = new Streaming();
@Data
public static class Streaming {
private int charsPerSecond = 50; // 每秒字符数
private int chunksPerSecond = 10; // 每秒数据块数
private int minChunkSize = 1; // 最小块大小
private int maxChunkSize = 10; // 最大块大小
}
public int getRateForEndpoint(String endpoint) {
return endpoints.getOrDefault(endpoint, defaultRate);
}
}
2. 代码分析
class AppGenerateService:
system_rate_limiter = RateLimiter("app_daily_rate_limiter", dify_config.APP_DAILY_RATE_LIMIT, 86400)
@classmethod
def generate(
cls,
app_model: App,
user: Union[Account, EndUser],
args: Mapping[str, Any],
invoke_from: InvokeFrom,
streaming: bool = True,
):
"""
App Content Generate
:param app_model: app model
:param user: user
:param args: args
:param invoke_from: invoke from
:param streaming: streaming
:return:
"""
# system level rate limiter
# 默认 BILLING_ENABLED 为false,计费功能不开启
if dify_config.BILLING_ENABLED:
# check if it's free plan
limit_info = BillingService.get_info(app_model.tenant_id)
if limit_info["subscription"]["plan"] == "sandbox":
if cls.system_rate_limiter.is_rate_limited(app_model.tenant_id):
raise InvokeRateLimitError(
"Rate limit exceeded, please upgrade your plan "
f"or your RPD was {dify_config.APP_DAILY_RATE_LIMIT} requests/day"
)
cls.system_rate_limiter.increment_rate_limit(app_model.tenant_id)
# app level rate limiter
# 获取平滑逻辑配置,最大请求数
max_active_request = cls._get_max_active_requests(app_model)
rate_limit = RateLimit(app_model.id, max_active_request)
# staticmethod 静态方法。生成uuid
request_id = RateLimit.gen_request_key()
try:
# 执行平滑限流策略
request_id = rate_limit.enter(request_id)
if app_model.mode == AppMode.COMPLETION.value:
# 平滑流式输出
return rate_limit.generate(
CompletionAppGenerator.convert_to_event_stream(
CompletionAppGenerator().generate(
app_model=app_model, user=user, args=args, invoke_from=invoke_from, streaming=streaming
),
),
request_id=request_id,
)
elif app_model.mode == AppMode.AGENT_CHAT.value or app_model.is_agent:
# 平滑流式输出
return rate_limit.generate(
AgentChatAppGenerator.convert_to_event_stream(
AgentChatAppGenerator().generate(
app_model=app_model, user=user, args=args, invoke_from=invoke_from, streaming=streaming
),
),
request_id,
)
elif app_model.mode == AppMode.CHAT.value:
# 平滑流式输出
return rate_limit.generate(
ChatAppGenerator.convert_to_event_stream(
ChatAppGenerator().generate(
app_model=app_model, user=user, args=args, invoke_from=invoke_from, streaming=streaming
),
),
request_id=request_id,
)
elif app_model.mode == AppMode.ADVANCED_CHAT.value:
workflow_id = args.get("workflow_id")
workflow = cls._get_workflow(app_model, invoke_from, workflow_id)
# 平滑流式输出
return rate_limit.generate(
AdvancedChatAppGenerator.convert_to_event_stream(
AdvancedChatAppGenerator().generate(
app_model=app_model,
workflow=workflow,
user=user,
args=args,
invoke_from=invoke_from,
streaming=streaming,
),
),
request_id=request_id,
)
elif app_model.mode == AppMode.WORKFLOW.value:
workflow_id = args.get("workflow_id")
workflow = cls._get_workflow(app_model, invoke_from, workflow_id)
# 平滑流式输出
return rate_limit.generate(
WorkflowAppGenerator.convert_to_event_stream(
WorkflowAppGenerator().generate(
app_model=app_model,
workflow=workflow,
user=user,
args=args,
invoke_from=invoke_from,
streaming=streaming,
call_depth=0,
workflow_thread_pool_id=None,
),
),
request_id,
)
else:
raise ValueError(f"Invalid app mode {app_model.mode}")
except RateLimitError as e:
raise InvokeRateLimitError(str(e))
except Exception:
rate_limit.exit(request_id)
raise
finally:
if not streaming:
rate_limit.exit(request_id)
2.1. 计费方式
-
云服务版 (Dify Cloud)
如果使用的是官方的 Dify Cloud 服务:
计费功能默认开启
在 工作空间设置 → 计费与套餐 中管理
需要绑定支付方式 -
自部署版本
社区版 (Community Edition)
社区版默认没有计费功能,需要企业版或自行开发。
企业版 (Enterprise Edition)
在企业版中,计费配置通常在以下位置:
- 环境变量配置
# 启用计费功能
BILLING_ENABLED=true
# Stripe 支付配置(国际)
STRIPE_API_KEY=sk_test_...
STRIPE_WEBHOOK_SECRET=whsec_...
STRIPE_PRICE_ID=price_...
# 或 支付宝/微信支付配置(国内)
ALIPAY_APP_ID=...
ALIPAY_PRIVATE_KEY=...
WECHATPAY_MCH_ID=...
- 配置文件
在 docker-compose.yml 或环境配置文件中:
environment:
- BILLING_ENABLED=true
- STRIPE_API_KEY=your_stripe_key
- STRIPE_WEBHOOK_SECRET=your_webhook_secret
2.2. 获取限流配置
class AppGenerateService:
system_rate_limiter = RateLimiter("app_daily_rate_limiter", dify_config.APP_DAILY_RATE_LIMIT, 86400)
......
@staticmethod
def _get_max_active_requests(app: App) -> int:
"""
Get the maximum number of active requests allowed for an app.
Returns the smaller value between app's custom limit and global config limit.
A value of 0 means infinite (no limit).
Args:
app: The App model instance
Returns:
The maximum number of active requests allowed
"""
app_limit = app.max_active_requests or 0
config_limit = dify_config.APP_MAX_ACTIVE_REQUESTS
# Filter out infinite (0) values and return the minimum, or 0 if both are infinite
limits = [limit for limit in [app_limit, config_limit] if limit > 0]
return min(limits) if limits else 0
2.3. 平滑限流逻辑
2.3.1. redis 数据存储
$ redis-cli -p 6379 -a difyai123456
Warning: Using a password with '-a' or '-u' option on the command line interface may not be safe.
127.0.0.1:6379> keys *
1) "dify:rate_limit:448e6473-433e-464c-a852-75efd5688eab:max_active_requests"
2) "provider_model_credentials:tenant_id:2e475914-d5b3-44eb-9843-91cdc3763371:id:78c45ce2-78fe-4cb2-9937-c0baecdc3144"
3) "plugin_daemon:cluster-nodes-status-hash-map"
4) "plugin_daemon:plugin_state"
5) "account_refresh_token:626b102f-03c9-485d-b557-724fac2cc527"
6) "refresh_token:967aca877a93cf31b5990e2c365b2ec0c57352e5505f00aad5043b2e97cb246a60c91fc7ed9a1e7a05bb733ac43a9d9634303de69e921cf1eb0ed63cd7396ee6"
7) "plugin_daemon:cluster-master-preemption-lock"
127.0.0.1:6379>
2.3.2. 逻辑分析
class RateLimit:
_MAX_ACTIVE_REQUESTS_KEY = "dify:rate_limit:{}:max_active_requests"
_ACTIVE_REQUESTS_KEY = "dify:rate_limit:{}:active_requests"
_UNLIMITED_REQUEST_ID = "unlimited_request_id"
_REQUEST_MAX_ALIVE_TIME = 10 * 60 # 10 minutes
_ACTIVE_REQUESTS_COUNT_FLUSH_INTERVAL = 5 * 60 # recalculate request_count from request_detail every 5 minutes
_instance_dict: dict[str, "RateLimit"] = {}
def __new__(cls: type["RateLimit"], client_id: str, max_active_requests: int):
if client_id not in cls._instance_dict:
instance = super().__new__(cls)
cls._instance_dict[client_id] = instance
return cls._instance_dict[client_id]
def __init__(self, client_id: str, max_active_requests: int):
self.max_active_requests = max_active_requests
# must be called after max_active_requests is set
if self.disabled():
return
if hasattr(self, "initialized"):
return
self.initialized = True
self.client_id = client_id
self.active_requests_key = self._ACTIVE_REQUESTS_KEY.format(client_id)
self.max_active_requests_key = self._MAX_ACTIVE_REQUESTS_KEY.format(client_id)
self.last_recalculate_time = float("-inf")
self.flush_cache(use_local_value=True)
def flush_cache(self, use_local_value=False):
if self.disabled():
return
self.last_recalculate_time = time.time()
# flush max active requests
if use_local_value or not redis_client.exists(self.max_active_requests_key):
redis_client.setex(self.max_active_requests_key, timedelta(days=1), self.max_active_requests)
else:
self.max_active_requests = int(redis_client.get(self.max_active_requests_key).decode("utf-8"))
redis_client.expire(self.max_active_requests_key, timedelta(days=1))
# flush max active requests (in-transit request list)
if not redis_client.exists(self.active_requests_key):
return
request_details = redis_client.hgetall(self.active_requests_key)
redis_client.expire(self.active_requests_key, timedelta(days=1))
timeout_requests = [
k
for k, v in request_details.items()
if time.time() - float(v.decode("utf-8")) > RateLimit._REQUEST_MAX_ALIVE_TIME
]
if timeout_requests:
redis_client.hdel(self.active_requests_key, *timeout_requests)
# 执行平滑限流策略
def enter(self, request_id: Optional[str] = None) -> str:
if self.disabled():
return RateLimit._UNLIMITED_REQUEST_ID
if time.time() - self.last_recalculate_time > RateLimit._ACTIVE_REQUESTS_COUNT_FLUSH_INTERVAL:
self.flush_cache()
if not request_id:
request_id = RateLimit.gen_request_key()
# dify:rate_limit:448e6473-433e-464c-a852-75efd5688eab:active_requests
active_requests_count = redis_client.hlen(self.active_requests_key)
if active_requests_count >= self.max_active_requests:
raise AppInvokeQuotaExceededError(
f"Too many requests. Please try again later. The current maximum concurrent requests allowed "
f"for {self.client_id} is {self.max_active_requests}."
)
redis_client.hset(self.active_requests_key, request_id, str(time.time()))
return request_id
def exit(self, request_id: str):
if request_id == RateLimit._UNLIMITED_REQUEST_ID:
return
redis_client.hdel(self.active_requests_key, request_id)
def disabled(self):
return self.max_active_requests <= 0
# 生成uuid
@staticmethod
def gen_request_key() -> str:
return str(uuid.uuid4())
# 平滑流式输出
def generate(self, generator: Union[Generator[str, None, None], Mapping[str, Any]], request_id: str):
if isinstance(generator, Mapping):
return generator
else:
return RateLimitGenerator(
rate_limit=self,
generator=generator, # ty: ignore [invalid-argument-type]
request_id=request_id,
)
class RateLimitGenerator:
def __init__(self, rate_limit: RateLimit, generator: Generator[str, None, None], request_id: str):
self.rate_limit = rate_limit
self.generator = generator
self.request_id = request_id
self.closed = False
def __iter__(self):
return self
def __next__(self):
if self.closed:
raise StopIteration
try:
return next(self.generator)
except Exception:
self.close()
raise
def close(self):
if not self.closed:
self.closed = True
self.rate_limit.exit(self.request_id)
if self.generator is not None and hasattr(self.generator, "close"):
self.generator.close()
更多推荐



所有评论(0)