🔥 Sentinel 流控与熔断机制源码剖析

→ SlotChain 执行链模型

📋 目录

  • 🏗️ 一、Sentinel 整体架构与设计哲学
  • ⛓️ 二、SlotChain 责任链机制深度解析
  • 📊 三、流控规则与算法实现
  • ⚡ 四、熔断降级策略源码剖析
  • 🔄 五、动态规则管理与推送机制
  • 🌐 六、配置中心集成实战
  • 💡 七、生产环境最佳实践

🏗️ 一、Sentinel 整体架构与设计哲学

💡 Sentinel 核心架构图

Sentinel 分层架构设计

入口资源
SlotChain 责任链
NodeSelectorSlot
ClusterBuilderSlot
LogSlot
StatisticSlot
AuthoritySlot
SystemSlot
FlowSlot
DegradeSlot
规则管理器
流控规则
降级规则
系统规则
授权规则
指标统计
滑动窗口
指标持久化
监控告警

🎯 核心设计理念

Sentinel 设计哲学

/**
 * Sentinel 核心设计理念
 * 基于责任链模式的流量控制框架
 */
public class SentinelDesignPhilosophy {
    
    /**
     * 1. 资源为核心
     * 任何需要流量控制的地方都可以定义为资源
     */
    public interface Resource {
        String getName();
        EntryType getType();
        int getResourceType();
    }
    
    /**
     * 2. 槽位链设计
     * 通过责任链模式实现可扩展的流量控制
     */
    public interface SlotChain {
        void entry(Context context, ResourceWrapper resourceWrapper, 
                  Object obj, int count, boolean prioritized, Object... args) throws BlockException;
        void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args);
    }
    
    /**
     * 3. 上下文传播
     * 保证调用链路的上下文一致性
     */
    public class Context {
        private String name;
        private DefaultNode entranceNode;
        private String origin = "";
        private Entry curEntry;
    }
    
    /**
     * 4. 指标统计
     * 基于滑动窗口的实时指标收集
     */
    public interface Metric {
        long success();
        long block();
        long total();
        double avgRt();
    }
}

⛓️ 二、SlotChain 责任链机制深度解析

🔗 SlotChain 执行流程

SlotChain 核心执行机制

/**
 * SlotChain 责任链核心实现
 * 采用责任链模式处理流量控制逻辑
 */
@Component
@Slf4j
public class DefaultSlotChain implements ProcessorSlotChain {
    
    private AbstractLinkedProcessorSlot<?> first = new DefaultNodeSelectorSlot();
    private AbstractLinkedProcessorSlot<?> end = first;
    
    /**
     * 入口处理 - 责任链正向执行
     */
    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, 
                     Object obj, int count, boolean prioritized, Object... args) throws BlockException {
        first.transformEntry(context, resourceWrapper, obj, count, prioritized, args);
    }
    
    /**
     * 出口处理 - 责任链反向执行
     */
    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        first.exit(context, resourceWrapper, count, args);
    }
    
    /**
     * 添加槽位到链尾
     */
    @Override
    public void addFirst(AbstractLinkedProcessorSlot<?> protocolProcessor) {
        protocolProcessor.setNext(first);
        first = protocolProcessor;
    }
    
    @Override
    public void addLast(AbstractLinkedProcessorSlot<?> protocolProcessor) {
        end.setNext(protocolProcessor);
        end = protocolProcessor;
    }
    
    /**
     * 构建默认槽位链
     */
    public static ProcessorSlotChain newSlotChain() {
        ProcessorSlotChain chain = new DefaultSlotChain();
        
        // 按顺序添加槽位
        chain.addLast(new NodeSelectorSlot());
        chain.addLast(new ClusterBuilderSlot());
        chain.addLast(new LogSlot());
        chain.addLast(new StatisticSlot());
        chain.addLast(new AuthoritySlot());
        chain.addLast(new SystemSlot());
        chain.addLast(new FlowSlot());
        chain.addLast(new DegradeSlot());
        
        return chain;
    }
}

/**
 * 抽象槽位基类
 * 定义槽位链的基本执行逻辑
 */
public abstract class AbstractLinkedProcessorSlot<T> implements ProcessorSlot<T> {
    
    private AbstractLinkedProcessorSlot<?> next;
    
    /**
     * 入口处理
     */
    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, 
                     T param, int count, boolean prioritized, Object... args) throws Throwable {
        
        // 前置处理
        if (entryLocal(context, resourceWrapper, param, count, prioritized, args)) {
            // 执行下一个槽位
            if (next != null) {
                next.transformEntry(context, resourceWrapper, param, count, prioritized, args);
            }
        }
    }
    
    /**
     * 出口处理
     */
    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        // 后置处理
        exitLocal(context, resourceWrapper, count, args);
        
        // 执行下一个槽位
        if (next != null) {
            next.exit(context, resourceWrapper, count, args);
        }
    }
    
    /**
     * 具体的入口处理逻辑 - 由子类实现
     */
    protected abstract boolean entryLocal(Context context, ResourceWrapper resourceWrapper, 
                                        T param, int count, boolean prioritized, Object... args) throws Throwable;
    
    /**
     * 具体的出口处理逻辑 - 由子类实现
     */
    protected abstract void exitLocal(Context context, ResourceWrapper resourceWrapper, 
                                    int count, Object... args);
}

🎯 核心槽位深度解析

1. NodeSelectorSlot - 节点选择器
/**
 * NodeSelectorSlot - 资源节点选择器
 * 负责创建调用链路节点树
 */
@Component
@Slf4j
public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<Object> {
    
    /**
     * 节点缓存
     */
    private volatile Map<String, DefaultNode> map = new HashMap<>();
    
    @Override
    protected boolean entryLocal(Context context, ResourceWrapper resourceWrapper, 
                               Object param, int count, boolean prioritized, Object... args) throws Throwable {
        
        // 1. 获取或创建当前资源的节点
        DefaultNode node = map.get(context.getName());
        if (node == null) {
            synchronized (this) {
                node = map.get(context.getName());
                if (node == null) {
                    node = new DefaultNode(resourceWrapper, null);
                    Map<String, DefaultNode> newMap = new HashMap<>(map);
                    newMap.put(context.getName(), node);
                    map = newMap;
                }
            }
        }
        
        // 2. 设置当前入口节点
        context.setCurEntry(new Entry(resourceWrapper, node, context));
        
        // 3. 构建调用链节点关系
        buildNodeTree(context, node);
        
        return true;
    }
    
    /**
     * 构建调用链节点树
     */
    private void buildNodeTree(Context context, DefaultNode node) {
        if (context.getCurEntry().getParent() != null) {
            // 设置父子节点关系
            ((DefaultNode) context.getCurEntry().getParent().getCurNode())
                .addChild(node);
        }
    }
    
    @Override
    protected void exitLocal(Context context, ResourceWrapper resourceWrapper, 
                           int count, Object... args) {
        // 清理上下文
        if (context.getCurEntry() != null) {
            context.getCurEntry().exit(count, args);
        }
    }
}
2. ClusterBuilderSlot - 集群构建器
/**
 * ClusterBuilderSlot - 集群节点构建器
 * 负责集群流量统计和限流
 */
@Component
@Slf4j
public class ClusterBuilderSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
    
    private static volatile ClusterNode clusterNode = null;
    private final ClusterNodeManager clusterNodeManager;
    
    @Override
    protected boolean entryLocal(Context context, ResourceWrapper resourceWrapper, 
                               DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
        
        // 1. 获取或创建集群节点
        if (clusterNode == null) {
            synchronized (this) {
                if (clusterNode == null) {
                    clusterNode = clusterNodeManager.getClusterNode(resourceWrapper.getName());
                }
            }
        }
        
        // 2. 设置集群节点
        node.setClusterNode(clusterNode);
        
        // 3. 集群流量统计
        if (!"".equals(context.getOrigin())) {
            // 按调用来源统计
            Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin());
            context.getCurEntry().setOriginNode(originNode);
        }
        
        // 4. 应用集群流控规则
        applyClusterFlowRule(context, resourceWrapper, clusterNode, count, args);
        
        return true;
    }
    
    /**
     * 应用集群流控规则
     */
    private void applyClusterFlowRule(Context context, ResourceWrapper resourceWrapper,
                                     ClusterNode clusterNode, int count, Object... args) throws BlockException {
        
        // 获取集群流控规则
        List<FlowRule> rules = FlowRuleManager.getRulesForResource(resourceWrapper.getName());
        
        for (FlowRule rule : rules) {
            if (!rule.passClusterCheck(clusterNode, count)) {
                // 触发流控
                throw new FlowException(rule.getLimitApp(), rule);
            }
        }
    }
}
3. StatisticSlot - 统计槽位
/**
 * StatisticSlot - 统计指标槽位
 * 负责实时指标统计和滑动窗口管理
 */
@Component
@Slf4j
public class StatisticSlot extends AbstractLinkedProcessorSlot<Object> {
    
    private final Metric metric;
    private final int sampleCount;
    private final int intervalInMs;
    
    public StatisticSlot() {
        this.sampleCount = 2;     // 采样窗口数
        this.intervalInMs = 1000; // 统计窗口间隔
        this.metric = new ArrayMetric(sampleCount, intervalInMs);
    }
    
    @Override
    protected boolean entryLocal(Context context, ResourceWrapper resourceWrapper, 
                               Object param, int count, boolean prioritized, Object... args) throws Throwable {
        
        long startTime = System.currentTimeMillis();
        try {
            // 1. 触发下一个槽位
            boolean result = fireEntry(context, resourceWrapper, param, count, prioritized, args);
            
            // 2. 成功请求统计
            if (result) {
                metric.addSuccess(count);
                metric.addRt(System.currentTimeMillis() - startTime);
                
                // 记录通过请求
                log.debug("请求通过统计: resource={}, count={}", 
                         resourceWrapper.getName(), count);
            }
            
            return result;
            
        } catch (BlockException e) {
            // 3. 被阻塞请求统计
            metric.addBlock(count);
            throw e;
            
        } catch (Throwable e) {
            // 4. 业务异常统计
            metric.addException(count);
            throw e;
        }
    }
    
    /**
     * 触发下一个槽位执行
     */
    private boolean fireEntry(Context context, ResourceWrapper resourceWrapper, 
                            Object param, int count, boolean prioritized, Object... args) throws Throwable {
        // 实际的槽位链执行逻辑
        return true;
    }
    
    @Override
    protected void exitLocal(Context context, ResourceWrapper resourceWrapper, 
                           int count, Object... args) {
        // 出口统计
        long completeTime = System.currentTimeMillis();
        Entry entry = context.getCurEntry();
        if (entry != null) {
            long rt = completeTime - entry.getCreateTime();
            metric.addRt(rt);
        }
    }
    
    /**
     * 数组指标统计实现
     * 基于滑动窗口的实时统计
     */
    private static class ArrayMetric implements Metric {
        private final LeapArray<MetricBucket> data;
        
        public ArrayMetric(int sampleCount, int intervalInMs) {
            this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
        }
        
        @Override
        public long success() {
            data.currentWindow().value().addSuccess(1);
            return data.values().stream()
                .mapToLong(MetricBucket::success)
                .sum();
        }
        
        @Override
        public long total() {
            return success() + block() + exception();
        }
        
        @Override
        public double avgRt() {
            long successCount = success();
            if (successCount == 0) {
                return 0;
            }
            return data.values().stream()
                .mapToLong(MetricBucket::rt)
                .sum() / (double) successCount;
        }
    }
}

📊 三、流控规则与算法实现

🎯 流控规则体系

流控规则类结构

/**
 * 流控规则体系
 * 支持多种流控策略和算法
 */
public class FlowRuleManager {
    
    /**
     * 流控规则基类
     */
    @Data
    public static class FlowRule {
        private String resource;                    // 资源名称
        private double count;                       // 阈值
        private int grade = GRADE_QPS;              // 阈值类型
        private int strategy = STRATEGY_DIRECT;     // 流控策略
        private String limitApp = "default";        // 流控应用
        private int controlBehavior = CONTROL_BEHAVIOR_DEFAULT; // 控制效果
        
        // 阈值类型常量
        public static final int GRADE_QPS = 1;      // QPS 流控
        public static final int GRADE_THREAD = 0;   // 线程数流控
        
        // 流控策略常量
        public static final int STRATEGY_DIRECT = 0;        // 直接
        public static final int STRATEGY_RELATE = 1;       // 关联
        public static final int STRATEGY_CHAIN = 2;         // 链路
        
        // 控制效果常量
        public static final int CONTROL_BEHAVIOR_DEFAULT = 0;      // 快速失败
        public static final int CONTROL_BEHAVIOR_WARM_UP = 1;      // 预热
        public static final int CONTROL_BEHAVIOR_RATE_LIMITER = 2; // 排队等待
    }
    
    /**
     * 流控检查器
     */
    @Component
    @Slf4j
    public static class FlowRuleChecker {
        
        /**
         * 检查流控规则
         */
        public void checkFlow(FlowRule rule, Context context, DefaultNode node, int count) 
                throws BlockException {
            
            // 根据流控策略选择检查方法
            switch (rule.getStrategy()) {
                case FlowRule.STRATEGY_DIRECT:
                    checkDirectFlow(rule, node, count);
                    break;
                case FlowRule.STRATEGY_RELATE:
                    checkRelateFlow(rule, context, count);
                    break;
                case FlowRule.STRATEGY_CHAIN:
                    checkChainFlow(rule, context, count);
                    break;
                default:
                    checkDirectFlow(rule, node, count);
            }
        }
        
        /**
         * 直接流控检查
         */
        private void checkDirectFlow(FlowRule rule, DefaultNode node, int count) 
                throws BlockException {
            
            // 根据阈值类型检查
            if (rule.getGrade() == FlowRule.GRADE_QPS) {
                // QPS 流控
                if (!passQpsCheck(rule, node, count)) {
                    throw new FlowException(rule.getLimitApp(), rule);
                }
            } else {
                // 线程数流控
                if (!passThreadCheck(rule, node)) {
                    throw new FlowException(rule.getLimitApp(), rule);
                }
            }
        }
        
        /**
         * QPS 流控检查
         */
        private boolean passQpsCheck(FlowRule rule, DefaultNode node, int count) {
            double qps = node.avgQps();
            double threshold = rule.getCount();
            
            // 应用控制效果
            switch (rule.getControlBehavior()) {
                case FlowRule.CONTROL_BEHAVIOR_WARM_UP:
                    // 预热模式
                    threshold = calculateWarmUpThreshold(rule, node);
                    break;
                case FlowRule.CONTROL_BEHAVIOR_RATE_LIMITER:
                    // 速率限制器模式
                    return passRateLimiterCheck(rule, node, count);
                default:
                    // 快速失败模式
                    break;
            }
            
            return qps + count <= threshold;
        }
        
        /**
         * 预热算法实现
         */
        private double calculateWarmUpThreshold(FlowRule rule, DefaultNode node) {
            long uptime = System.currentTimeMillis() - node.getStartTime();
            long warmupPeriod = rule.getWarmUpPeriodSec() * 1000;
            
            if (uptime > warmupPeriod) {
                return rule.getCount();
            } else {
                // 冷启动阶段,逐步提高阈值
                double coldFactor = rule.getWarmUpColdFactor();
                if (coldFactor <= 1) {
                    coldFactor = 3; // 默认冷启动因子
                }
                
                double threshold = rule.getCount() / coldFactor;
                double slope = (rule.getCount() - threshold) / warmupPeriod;
                return threshold + slope * uptime;
            }
        }
    }
}

⚡ 滑动窗口算法

高性能滑动窗口实现

/**
 * 滑动窗口算法实现
 * 基于 LeapArray 的高性能统计窗口
 */
@Component
@Slf4j
public class LeapArray<T> {
    
    private final int windowLengthInMs;    // 窗口长度
    private final int sampleCount;        // 采样窗口数
    private final int intervalInMs;       // 统计间隔
    private final AtomicReferenceArray<WindowWrap<T>> array; // 窗口数组
    
    /**
     * 窗口包装类
     */
    @Data
    public static class WindowWrap<T> {
        private final long windowLength;  // 窗口长度
        private long windowStart;         // 窗口开始时间
        private T value;                  // 窗口值
        
        /**
         * 重置窗口
         */
        public WindowWrap<T> resetTo(long startTime) {
            this.windowStart = startTime;
            return this;
        }
        
        /**
         * 检查窗口是否过期
         */
        public boolean isTimeInWindow(long time) {
            return windowStart <= time && time < windowStart + windowLength;
        }
    }
    
    public LeapArray(int sampleCount, int intervalInMs) {
        this.windowLengthInMs = intervalInMs / sampleCount;
        this.sampleCount = sampleCount;
        this.intervalInMs = intervalInMs;
        this.array = new AtomicReferenceArray<>(sampleCount);
    }
    
    /**
     * 获取当前时间窗口
     */
    public WindowWrap<T> currentWindow() {
        return currentWindow(System.currentTimeMillis());
    }
    
    /**
     * 获取指定时间对应的窗口
     */
    public WindowWrap<T> currentWindow(long time) {
        if (time < 0) {
            return null;
        }
        
        int idx = calculateTimeIdx(time);
        long windowStart = calculateWindowStart(time);
        
        while (true) {
            WindowWrap<T> old = array.get(idx);
            if (old == null) {
                // 创建新窗口
                WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(time));
                if (array.compareAndSet(idx, null, window)) {
                    return window;
                } else {
                    Thread.yield();
                }
            } else if (windowStart == old.getWindowStart()) {
                // 返回现有窗口
                return old;
            } else if (windowStart > old.getWindowStart()) {
                // 重置过期窗口
                if (updateLock.tryLock()) {
                    try {
                        return resetWindowTo(old, windowStart);
                    } finally {
                        updateLock.unlock();
                    }
                } else {
                    Thread.yield();
                }
            } else if (windowStart < old.getWindowStart()) {
                // 不应该发生的情况
                return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(time));
            }
        }
    }
    
    /**
     * 计算时间索引
     */
    private int calculateTimeIdx(long time) {
        long timeId = time / windowLengthInMs;
        return (int) (timeId % array.length());
    }
    
    /**
     * 计算窗口开始时间
     */
    private long calculateWindowStart(long time) {
        return time - time % windowLengthInMs;
    }
    
    /**
     * 获取所有有效窗口
     */
    public List<WindowWrap<T>> list() {
        return list(System.currentTimeMillis());
    }
    
    public List<WindowWrap<T>> list(long validTime) {
        int size = array.length();
        List<WindowWrap<T>> result = new ArrayList<>(size);
        
        for (int i = 0; i < size; i++) {
            WindowWrap<T> window = array.get(i);
            if (window == null || isWindowDeprecated(validTime, window)) {
                continue;
            }
            result.add(window);
        }
        
        return result;
    }
    
    /**
     * 检查窗口是否过期
     */
    private boolean isWindowDeprecated(long time, WindowWrap<T> window) {
        return time - window.getWindowStart() > intervalInMs;
    }
}

⚡ 四、熔断降级策略源码剖析

🔄 熔断器状态机

熔断器核心实现

/**
 * 熔断器状态机
 * 基于响应时间和异常比例的熔断降级
 */
@Component
@Slf4j
public class CircuitBreaker {
    
    /**
     * 熔断器状态
     */
    public enum Status {
        CLOSED,     // 关闭状态 - 正常通行
        OPEN,       // 打开状态 - 快速失败
        HALF_OPEN   // 半开状态 - 试探性放行
    }
    
    /**
     * 熔断规则
     */
    @Data
    public static class DegradeRule {
        private String resource;                    // 资源名称
        private double count;                       // 阈值
        private int grade = GRADE_RT;               // 降级策略
        private int timeWindow;                     // 时间窗口(秒)
        private int minRequestAmount = 5;          // 最小请求数
        private double slowRatioThreshold = 1.0;   // 慢调用比例阈值
        private int statIntervalMs = 1000;         // 统计间隔
        
        // 降级策略常量
        public static final int GRADE_RT = 0;               // 响应时间
        public static final int GRADE_EXCEPTION_RATIO = 1;  // 异常比例
        public static final int GRADE_EXCEPTION_COUNT = 2;  // 异常数量
    }
    
    /**
     * 熔断器实现
     */
    @Component
    @Slf4j
    public static class DefaultCircuitBreaker {
        
        private volatile Status status = Status.CLOSED;
        private final DegradeRule rule;
        private final Metric metric;
        private long nextRetryTime = 0L;
        
        public DefaultCircuitBreaker(DegradeRule rule) {
            this.rule = rule;
            this.metric = new ArrayMetric(2, 1000);
        }
        
        /**
         * 请求是否允许通过
         */
        public boolean canPass() {
            // 检查熔断器状态
            if (status == Status.OPEN) {
                // 检查是否应该尝试半开
                if (System.currentTimeMillis() > nextRetryTime) {
                    status = Status.HALF_OPEN;
                    log.info("熔断器进入半开状态: {}", rule.getResource());
                    return true;
                }
                return false;
            }
            
            if (status == Status.HALF_OPEN) {
                // 半开状态,允许部分请求通过
                return Math.random() < 0.5; // 50% 的通过率
            }
            
            return true; // 关闭状态,允许通过
        }
        
        /**
         * 记录请求结果
         */
        public void onRequestComplete(long rt, Throwable error) {
            metric.addRt(rt);
            metric.addTotal(1);
            
            if (error != null) {
                metric.addException(1);
            }
            
            // 根据当前状态处理
            switch (status) {
                case CLOSED:
                    handleClosedState();
                    break;
                case HALF_OPEN:
                    handleHalfOpenState(rt, error);
                    break;
                case OPEN:
                    // 打开状态不处理
                    break;
            }
        }
        
        /**
         * 处理关闭状态
         */
        private void handleClosedState() {
            // 检查是否应该打开熔断器
            if (shouldOpen()) {
                status = Status.OPEN;
                nextRetryTime = System.currentTimeMillis() + rule.getTimeWindow() * 1000;
                log.warn("熔断器打开: {}", rule.getResource());
            }
        }
        
        /**
         * 处理半开状态
         */
        private void handleHalfOpenState(long rt, Throwable error) {
            if (error == null && rt < rule.getCount()) {
                // 试探请求成功
                metric.reset();
                status = Status.CLOSED;
                log.info("熔断器关闭: {}", rule.getResource());
            } else {
                // 试探请求失败,重新打开
                status = Status.OPEN;
                nextRetryTime = System.currentTimeMillis() + rule.getTimeWindow() * 1000;
                log.warn("熔断器重新打开: {}", rule.getResource());
            }
        }
        
        /**
         * 检查是否应该打开熔断器
         */
        private boolean shouldOpen() {
            // 检查最小请求数
            if (metric.total() < rule.getMinRequestAmount()) {
                return false;
            }
            
            // 根据降级策略检查
            switch (rule.getGrade()) {
                case DegradeRule.GRADE_RT:
                    return checkResponseTime();
                case DegradeRule.GRADE_EXCEPTION_RATIO:
                    return checkExceptionRatio();
                case DegradeRule.GRADE_EXCEPTION_COUNT:
                    return checkExceptionCount();
                default:
                    return false;
            }
        }
        
        /**
         * 检查响应时间
         */
        private boolean checkResponseTime() {
            double slowCount = metric.total() - metric.success();
            double ratio = slowCount / metric.total();
            
            return ratio > rule.getSlowRatioThreshold() && 
                   metric.avgRt() > rule.getCount();
        }
        
        /**
         * 检查异常比例
         */
        private boolean checkExceptionRatio() {
            if (metric.total() == 0) {
                return false;
            }
            double ratio = (double) metric.exception() / metric.total();
            return ratio > rule.getCount();
        }
    }
}

🔄 五、动态规则管理与推送机制

📡 规则管理器实现

动态规则管理

/**
 * 规则管理器
 * 负责规则的加载、更新和推送
 */
@Component
@Slf4j
public class RuleManager {
    
    private final Map<String, List<FlowRule>> flowRules = new ConcurrentHashMap<>();
    private final Map<String, List<DegradeRule>> degradeRules = new ConcurrentHashMap<>();
    private final RulePublisher rulePublisher;
    
    /**
     * 加载流控规则
     */
    public void loadRules(List<FlowRule> rules) {
        if (rules == null) {
            return;
        }
        
        // 按资源分组
        Map<String, List<FlowRule>> newRuleMap = new ConcurrentHashMap<>();
        for (FlowRule rule : rules) {
            String resource = rule.getResource();
            newRuleMap.computeIfAbsent(resource, k -> new CopyOnWriteArrayList<>())
                     .add(rule);
        }
        
        // 原子性更新规则
        flowRules.clear();
        flowRules.putAll(newRuleMap);
        
        log.info("流控规则加载完成: {} 条规则", rules.size());
    }
    
    /**
     * 获取资源的流控规则
     */
    public List<FlowRule> getRules(String resource) {
        return flowRules.getOrDefault(resource, Collections.emptyList());
    }
    
    /**
     * 添加规则监听器
     */
    public void addFlowRuleListener(PropertyListener<List<FlowRule>> listener) {
        // 添加规则变更监听
    }
}

/**
 * 规则发布器
 * 负责规则的动态推送和更新
 */
@Component
@Slf4j
public class RulePublisher {
    
    private final List<RuleListener> listeners = new CopyOnWriteArrayList<>();
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    
    /**
     * 规则监听器接口
     */
    public interface RuleListener {
        void onRuleChanged(String ruleType, List<?> rules);
    }
    
    /**
     * 发布规则变更
     */
    public void publishRules(String ruleType, List<?> rules) {
        for (RuleListener listener : listeners) {
            try {
                listener.onRuleChanged(ruleType, rules);
                log.debug("规则变更通知成功: {}", ruleType);
            } catch (Exception e) {
                log.error("规则变更通知失败", e);
            }
        }
    }
    
    /**
     * 添加规则监听器
     */
    public void addListener(RuleListener listener) {
        listeners.add(listener);
    }
    
    /**
     * 启动规则推送任务
     */
    public void startPublishTask() {
        scheduler.scheduleAtFixedRate(() -> {
            try {
                // 定期推送规则到客户端
                pushRulesToClients();
            } catch (Exception e) {
                log.error("规则推送任务执行失败", e);
            }
        }, 0, 30, TimeUnit.SECONDS); // 30秒推送一次
    }
}

🌐 六、配置中心集成实战

🔄 Nacos 集成配置

Sentinel 与 Nacos 集成

/**
 * Sentinel 与 Nacos 集成配置
 * 实现规则的动态推送和持久化
 */
@Configuration
@Slf4j
public class NacosIntegrationConfig {
    
    /**
     * Nacos 数据源配置
     */
    @Bean
    @ConditionalOnProperty(name = "spring.cloud.sentinel.datasource.nacos.enabled", havingValue = "true")
    public DataSource nacosDataSource() {
        NacosDataSource<List<FlowRule>> flowRuleDataSource = new NacosDataSource<>(
            properties.getNacos().getServerAddr(),
            properties.getNacos().getGroupId(),
            properties.getNacos().getDataId(),
            new Converter<String, List<FlowRule>>() {
                @Override
                public List<FlowRule> convert(String source) {
                    return JSON.parseArray(source, FlowRule.class);
                }
            }
        );
        
        flowRuleDataSource.getProperty().addListener(new PropertyListener<List<FlowRule>>() {
            @Override
            public void configUpdate(List<FlowRule> value) {
                // 规则更新回调
                log.info("从Nacos接收到规则更新: {} 条规则", value.size());
                FlowRuleManager.loadRules(value);
            }
            
            @Override
            public void configLoad(List<FlowRule> value) {
                // 规则加载回调
                log.info("从Nacos加载规则: {} 条规则", value.size());
                FlowRuleManager.loadRules(value);
            }
        });
        
        return flowRuleDataSource;
    }
    
    /**
     * Apollo 数据源配置
     */
    @Bean
    @ConditionalOnProperty(name = "spring.cloud.sentinel.datasource.apollo.enabled", havingValue = "true")
    public DataSource apolloDataSource() {
        ApolloDataSource<List<DegradeRule>> degradeRuleDataSource = new ApolloDataSource<>(
            properties.getApollo().getNamespace(),
            properties.getApollo().getFlowRulesKey(),
            "",
            new Converter<String, List<DegradeRule>>() {
                @Override
                public List<DegradeRule> convert(String source) {
                    return JSON.parseArray(source, DegradeRule.class);
                }
            }
        );
        
        return degradeRuleDataSource;
    }
}

/**
 * 动态数据源管理器
 */
@Component
@Slf4j
public class DynamicDataSourceManager {
    
    private final Map<String, DataSource> dataSourceMap = new ConcurrentHashMap<>();
    
    /**
     * 注册数据源
     */
    public void registerDataSource(String dataSourceName, DataSource dataSource) {
        dataSourceMap.put(dataSourceName, dataSource);
        log.info("数据源注册成功: {}", dataSourceName);
    }
    
    /**
     * 从所有数据源加载规则
     */
    public void loadRulesFromDataSources() {
        for (Map.Entry<String, DataSource> entry : dataSourceMap.entrySet()) {
            try {
                String ruleType = entry.getKey();
                DataSource dataSource = entry.getValue();
                
                // 加载规则
                Object rules = dataSource.loadRules();
                if (rules != null) {
                    publishRules(ruleType, rules);
                }
                
            } catch (Exception e) {
                log.error("从数据源加载规则失败: {}", entry.getKey(), e);
            }
        }
    }
}

💡 七、生产环境最佳实践

🔧 高可用配置

生产环境 Sentinel 配置

# application-prod.yml
spring:
  cloud:
    sentinel:
      # 基础配置
      eager: true
      transport:
        dashboard: ${SENTINEL_DASHBOARD:localhost}:8080
        port: 8719
        client-ip: ${spring.cloud.client.ip-address}
      
      # 过滤器配置
      filter:
        enabled: true
        url-patterns: /**
        order: -2147483648
      
      # 数据源配置
      datasource:
        # 流控规则数据源
        flow:
          nacos:
            server-addr: ${NACOS_SERVER:localhost}:8848
            dataId: ${spring.application.name}-flow-rules
            groupId: SENTINEL_GROUP
            rule-type: flow
            data-type: json
        # 降级规则数据源
        degrade:
          nacos:
            server-addr: ${NACOS_SERVER:localhost}:8848
            dataId: ${spring.application.name}-degrade-rules
            groupId: SENTINEL_GROUP
            rule-type: degrade
            data-type: json
        # 系统规则数据源
        system:
          nacos:
            server-addr: ${NACOS_SERVER:localhost}:8848
            dataId: ${spring.application.name}-system-rules
            groupId: SENTINEL_GROUP
            rule-type: system
            data-type: json
      
      # 流控配置
      flow:
        cold-factor: 3          # 冷启动因子
        max-queueing-time-ms: 500  # 最大排队时间
      
      # 日志配置
      log:
        dir: /logs/sentinel
        switch-pid: true
        max-file-size: 104857600  # 100MB
        max-file-count: 10

# 监控配置
management:
  endpoints:
    web:
      exposure:
        include: health,info,metrics,sentinel
  endpoint:
    sentinel:
      enabled: true
  metrics:
    export:
      sentinel:
        enabled: true
        step: 1m

# 自定义配置
sentinel:
  # 统计配置
  metric:
    file-single-size: 52428800    # 50MB
    file-total-count: 6           # 最多6个文件
  # 系统规则默认值
  system:
    max-thread: 1000
    max-qps: 5000
    avg-rt: 1000
    max-load: 8.0

🚀 性能优化建议

高性能配置优化

/**
 * Sentinel 性能优化配置
 */
@Configuration
@Slf4j
public class SentinelPerformanceConfig {
    
    /**
     * 自定义资源注解切面
     * 优化注解方式的性能开销
     */
    @Aspect
    @Component
    @Slf4j
    public class SentinelResourceAspect {
        
        @Around("@annotation(sentinelResource)")
        public Object aroundAdvice(ProceedingJoinPoint pjp, 
                                  SentinelResource sentinelResource) throws Throwable {
            
            String resourceName = getResourceName(pjp, sentinelResource);
            Entry entry = null;
            
            try {
                // 入口检查
                entry = SphU.entry(resourceName, 
                    sentinelResource.entryType(), 
                    sentinelResource.resourceType(), 
                    pjp.getArgs());
                
                // 执行原方法
                Object result = pjp.proceed();
                
                // 记录成功
                if (entry != null) {
                    entry.exit(1, pjp.getArgs());
                }
                
                return result;
                
            } catch (BlockException e) {
                // 流控异常处理
                return handleBlockException(pjp, sentinelResource, e);
            } catch (Throwable e) {
                // 业务异常处理
                if (entry != null) {
                    Tracer.trace(e);
                }
                throw e;
            }
        }
    }
    
    /**
     *  Ͳ自定义上下文管理器
     * 优化上下文传播性能
     */
    @Component
    @Slf4j
    public class CustomContextManager {
        
        private static final ThreadLocal<Context> contextThreadLocal = new ThreadLocal<>();
        
        /**
         * 入口上下文设置
         */
        public static void enter(String name, String origin) {
            if (contextThreadLocal.get() == null) {
                Context context = new Context(name, origin);
                contextThreadLocal.set(context);
            }
        }
        
        /**
         * 出口上下文清理
         */
        public static void exit() {
            Context context = contextThreadLocal.get();
            if (context != null) {
                contextThreadLocal.remove();
            }
        }
    }
    
    /**
     * 异步支持配置
     */
    @Configuration
    @Slf4j
    public class AsyncSupportConfig {
        
        @Bean
        public AsyncTaskExecutor sentinelAsyncTaskExecutor() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(20);
            executor.setMaxPoolSize(100);
            executor.setQueueCapacity(1000);
            executor.setThreadNamePrefix("sentinel-async-");
            executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
            executor.setWaitForTasksToCompleteOnShutdown(true);
            executor.setAwaitTerminationSeconds(60);
            return executor;
        }
        
        /**
         * 异步调用支持
         */
        @Async("sentinelAsyncTaskExecutor")
        public <T> CompletableFuture<T> executeWithSentinel(Supplier<T> supplier, String resourceName) {
            Entry entry = null;
            try {
                entry = SphU.entry(resourceName);
                T result = supplier.get();
                return CompletableFuture.completedFuture(result);
            } catch (BlockException e) {
                return CompletableFuture.failedFuture(e);
            } finally {
                if (entry != null) {
                    entry.exit();
                }
            }
        }
    }
}

洞察:Sentinel 的核心价值在于其灵活的 SlotChain 责任链设计和高效的滑动窗口统计机制。深入理解其内部执行流程和算法实现,是构建高可用、高性能流量控制系统的关键。


如果觉得本文对你有帮助,请点击 👍 点赞 + ⭐ 收藏 + 💬 留言支持!

讨论话题

  1. 你在生产环境中如何配置 Sentinel 的流控规则?
  2. 面对突发流量,如何设计合理的熔断降级策略?
  3. 在微服务架构中,如何实现全局流量控制?

相关资源推荐

  • 📚 https://github.com/alibaba/Sentinel/wiki
  • 🔧 https://github.com/example/sentinel-deep-dive
  • 💻 https://github.com/alibaba/Sentinel/wiki/Dashboard

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐