在这里插入图片描述

👋 大家好,欢迎来到我的技术博客!
💻 作为一名热爱 Java 与软件开发的程序员,我始终相信:清晰的逻辑 + 持续的积累 = 稳健的成长
📚 在这里,我会分享学习笔记、实战经验与技术思考,力求用简单的方式讲清楚复杂的问题。
🎯 本文将围绕Sentinel这个话题展开,希望能为你带来一些启发或实用的参考。
🌱 无论你是刚入门的新手,还是正在进阶的开发者,希望你都能有所收获!


文章目录

Sentinel - 核心原理深度解析:限流熔断的底层实现与设计思想 🧠

引言 📝

在当今这个分布式系统和微服务架构盛行的时代,系统的稳定性和可靠性变得尤为重要。任何微小的服务故障都可能引发连锁反应,导致整个系统雪崩。为了应对这种挑战,流量控制(Rate Limiting)、熔断降级(Circuit Breaking)等容错机制应运而生。作为阿里巴巴开源的流量控制组件,Sentinel 以其强大的功能和灵活的设计,成为了众多企业保障系统稳定运行的利器。

然而,仅仅知道如何使用 Sentinel 是远远不够的。要想真正掌握它的精髓,深入理解其核心原理和底层实现机制是至关重要的。这不仅能帮助我们更好地进行系统设计和调优,也能在遇到问题时快速定位和解决。

本文将带你深入 Sentinel 的心脏,从其核心数据结构、核心组件、限流和熔断的具体实现机制,再到设计思想的剖析,层层递进,为你揭开 Sentinel 的神秘面纱。我们将通过大量的 Java 代码示例和图表,带你领略 Sentinel 在流量治理方面的精妙设计。

Sentinel 核心架构概览 🏗️

在开始深入细节之前,让我们先从宏观角度看看 Sentinel 的整体架构。Sentinel 的核心目标是提供一个高性能、可扩展、易于使用的流量控制平台。它的架构设计体现了高度的模块化和解耦思想。

1. 核心组件说明

  • Entry/Exit API (SphU, SphO):这是 Sentinel 提供给用户的最基础 API。用户通过 SphU.entry(resource) 进入资源,通过 entry.exit() 退出资源。这是所有限流、熔断等逻辑的起点。
  • Slot Chain (SlotChain):一个责任链模式的实现。当用户调用 SphU.entry() 时,会依次执行链上的各个 Slot(插槽)。每个 Slot 负责不同的功能,比如流量控制、熔断、系统保护等。
  • Slots (FlowSlot, CircuitBreakerSlot, SystemSlot):这些是具体的功能实现单元。每个 Slot 都负责处理特定类型的规则(如流量规则、熔断规则等),并决定是否放行请求。
  • Rule Managers (FlowRuleManager, CircuitBreakerRuleManager, SystemRuleManager):这些是规则的管理者,负责加载、更新和维护各种规则。它们通常会监听规则数据源的变化,并将最新的规则应用到相应的 Slot 中。
  • Rule Storage (RuleStorage):规则的持久化存储。规则可以存储在内存中,也可以存储在外部的数据源(如 Nacos, Zookeeper, Consul)中。Sentinel 支持多种数据源,方便动态更新规则。
  • Dashboard:提供可视化的管理界面,方便用户查看监控数据、配置规则。
  • Metrics Exporter:将 Sentinel 的内部指标导出,供外部监控系统(如 Prometheus)消费。

Sentinel 核心数据结构详解 📦

了解 Sentinel 的核心数据结构是理解其工作原理的第一步。这些结构构成了 Sentinel 的数据基础,支撑着其强大的功能。

1. 资源 (Resource)

资源是 Sentinel 进行流量控制的基本单位。它可以是 URL、方法名、服务名、数据库表名等等。在 Sentinel 中,资源被抽象为一个字符串,称为 resourceName

示例代码:资源标识
// 在 Sentinel 中,资源通常通过字符串标识
String resourceName = "com.example.service.UserService.getUserById";
// 或者更简单的形式
String resourceName2 = "/api/users/{id}";

2. 资源上下文 (Context)

资源上下文(Context)是 Sentinel 中的一个重要概念。它代表了当前请求所处的上下文环境,包含了资源、入口、线程信息等。

示例代码:Context 结构
// Context 是一个抽象类,实际实现是 DefaultContext
public abstract class Context {
    // 当前资源名称
    public abstract String getName();
    // 当前入口节点
    public abstract Node getEntranceNode();
    // 当前线程编号
    public abstract long getThreadId();
    // 当前线程名
    public abstract String getThreadName();
    // 当前资源的调用链路信息
    public abstract List<Node> getCurNodeList();
}

// 默认上下文实现
class DefaultContext extends Context {
    private final String name;
    private final Node entranceNode;
    private final long threadId;
    private final String threadName;
    private final List<Node> curNodeList;

    public DefaultContext(String name, Node entranceNode, long threadId, String threadName) {
        this.name = name;
        this.entranceNode = entranceNode;
        this.threadId = threadId;
        this.threadName = threadName;
        this.curNodeList = new ArrayList<>();
    }

    @Override
    public String getName() {
        return name;
    }

    @Override
    public Node getEntranceNode() {
        return entranceNode;
    }

    @Override
    public long getThreadId() {
        return threadId;
    }

    @Override
    public String getThreadName() {
        return threadName;
    }

    @Override
    public List<Node> getCurNodeList() {
        return curNodeList;
    }
}

3. 节点 (Node)

节点(Node)是 Sentinel 中用于记录资源运行时统计数据的核心数据结构。它保存了资源的调用次数、响应时间、错误数等信息。Sentinel 中主要有两种节点类型:DefaultNodeClusterNode

3.1 DefaultNode

DefaultNode 是一个基本的节点,用于记录单个资源的统计信息。它通常作为调用链路中的叶子节点存在。

3.2 ClusterNode

ClusterNode 是集群节点,用于记录资源在集群环境下的聚合统计信息。它通常作为调用链路中的中间节点存在,汇总其子节点的数据。

示例代码:Node 结构
// Node 接口定义
public interface Node {
    // 获取资源名称
    String getName();
    // 获取当前节点的统计信息
    StatisticNode getStatisticNode();
    // 获取父节点
    Node getParent();
    // 获取子节点列表
    Map<String, Node> getChildren();
    // 添加子节点
    void addChild(Node child);
    // 移除子节点
    void removeChild(Node child);
    // 获取调用链路中的入口节点
    Node getEntranceNode();
}

// DefaultNode 实现
public class DefaultNode implements Node {
    private final String name;
    private final StatisticNode statisticNode;
    private Node parent;
    private final Map<String, Node> children = new HashMap<>();

    public DefaultNode(String name, StatisticNode statisticNode) {
        this.name = name;
        this.statisticNode = statisticNode;
    }

    @Override
    public String getName() {
        return name;
    }

    @Override
    public StatisticNode getStatisticNode() {
        return statisticNode;
    }

    @Override
    public Node getParent() {
        return parent;
    }

    @Override
    public void setParent(Node parent) {
        this.parent = parent;
    }

    @Override
    public Map<String, Node> getChildren() {
        return children;
    }

    @Override
    public void addChild(Node child) {
        children.put(child.getName(), child);
    }

    @Override
    public void removeChild(Node child) {
        children.remove(child.getName());
    }

    @Override
    public Node getEntranceNode() {
        Node node = this;
        while (node.getParent() != null) {
            node = node.getParent();
        }
        return node;
    }
}

// StatisticNode 接口定义
public interface StatisticNode {
    // 获取最近的统计信息
    Snapshot getSnapshot(long timeMillis);
    // 记录一次请求
    void addRequest(int count, long rt, int status);
    // 获取最近的 QPS
    double getQps();
    // 获取最近的平均响应时间
    long getAvgRt();
    // 获取最近的错误数
    long getErrorCount();
    // 获取最近的线程数
    int getActiveThreads();
}

// DefaultStatisticNode 实现
public class DefaultStatisticNode implements StatisticNode {
    // 用于存储统计信息的滑动窗口
    private final SlidingWindow slidingWindow;
    // 用于存储线程数的滑动窗口
    private final SlidingWindow threadSlidingWindow;

    public DefaultStatisticNode(int windowLengthInMs, int sampleCount) {
        this.slidingWindow = new SlidingWindow(windowLengthInMs, sampleCount);
        this.threadSlidingWindow = new SlidingWindow(windowLengthInMs, sampleCount);
    }

    @Override
    public Snapshot getSnapshot(long timeMillis) {
        return slidingWindow.getSnapshot(timeMillis);
    }

    @Override
    public void addRequest(int count, long rt, int status) {
        slidingWindow.addRequest(count, rt, status);
        if (status == 0) { // 成功
            threadSlidingWindow.addRequest(1, 0, 0); // 增加线程数
        } else { // 失败
            // 不增加线程数,但可以记录错误
        }
    }

    @Override
    public double getQps() {
        return slidingWindow.getQps();
    }

    @Override
    public long getAvgRt() {
        return slidingWindow.getAvgRt();
    }

    @Override
    public long getErrorCount() {
        return slidingWindow.getErrorCount();
    }

    @Override
    public int getActiveThreads() {
        return threadSlidingWindow.getActiveThreads();
    }
}

4. 滑动窗口 (Sliding Window)

滑动窗口是 Sentinel 实现统计信息的核心数据结构。它能够高效地维护一段时间内的统计数据,如 QPS、平均响应时间、错误数等。

示例代码:滑动窗口实现
// 滑动窗口实现
public class SlidingWindow {
    // 窗口长度(毫秒)
    private final int windowLengthInMs;
    // 滑动窗口样本数
    private final int sampleCount;
    // 每个采样周期的毫秒数
    private final int intervalInMs;

    // 存储采样点的数组
    private final Sample[] samples;
    // 当前窗口的起始时间戳
    private volatile long windowStart;

    public SlidingWindow(int windowLengthInMs, int sampleCount) {
        this.windowLengthInMs = windowLengthInMs;
        this.sampleCount = sampleCount;
        this.intervalInMs = windowLengthInMs / sampleCount;
        this.samples = new Sample[sampleCount];
        this.windowStart = getCurrentTimeMillis();
        // 初始化采样点
        for (int i = 0; i < sampleCount; i++) {
            samples[i] = new Sample();
        }
    }

    // 获取当前时间戳
    private long getCurrentTimeMillis() {
        return System.currentTimeMillis();
    }

    // 获取当前时间所在的采样点索引
    private int getCurrentSampleIndex(long currentTimeMillis) {
        long elapsed = currentTimeMillis - windowStart;
        int index = (int) (elapsed / intervalInMs);
        return index % sampleCount;
    }

    // 获取当前时间的采样点
    private Sample getCurrentSample(long currentTimeMillis) {
        int index = getCurrentSampleIndex(currentTimeMillis);
        return samples[index];
    }

    // 记录请求
    public void addRequest(int count, long rt, int status) {
        long currentTimeMillis = getCurrentTimeMillis();
        // 如果窗口已经滚动,则更新窗口起始时间
        if (currentTimeMillis >= windowStart + windowLengthInMs) {
            windowStart = currentTimeMillis;
            // 重置旧的采样点
            for (Sample sample : samples) {
                sample.reset();
            }
        }

        Sample currentSample = getCurrentSample(currentTimeMillis);
        currentSample.addRequest(count, rt, status);
    }

    // 获取快照
    public Snapshot getSnapshot(long timeMillis) {
        Snapshot snapshot = new Snapshot();
        long currentWindowStart = timeMillis - (timeMillis % windowLengthInMs);
        long windowEnd = currentWindowStart + windowLengthInMs;

        // 遍历所有采样点,计算总和
        for (Sample sample : samples) {
            if (sample.getTimeWindowStart() >= currentWindowStart && sample.getTimeWindowStart() < windowEnd) {
                snapshot.merge(sample);
            }
        }
        return snapshot;
    }

    // 获取 QPS
    public double getQps() {
        long now = getCurrentTimeMillis();
        Snapshot snapshot = getSnapshot(now);
        return snapshot.getTotalCount() * 1000.0 / windowLengthInMs;
    }

    // 获取平均响应时间
    public long getAvgRt() {
        long now = getCurrentTimeMillis();
        Snapshot snapshot = getSnapshot(now);
        if (snapshot.getTotalCount() == 0) {
            return 0;
        }
        return snapshot.getTotalRt() / snapshot.getTotalCount();
    }

    // 获取错误数
    public long getErrorCount() {
        long now = getCurrentTimeMillis();
        Snapshot snapshot = getSnapshot(now);
        return snapshot.getErrorCount();
    }

    // 获取活动线程数
    public int getActiveThreads() {
        long now = getCurrentTimeMillis();
        Snapshot snapshot = getSnapshot(now);
        return snapshot.getActiveThreads();
    }
}

// 采样点
class Sample {
    private long timeWindowStart;
    private int totalCount;
    private long totalRt;
    private int errorCount;
    private int activeThreads;

    public Sample() {
        reset();
    }

    public void reset() {
        this.timeWindowStart = System.currentTimeMillis();
        this.totalCount = 0;
        this.totalRt = 0;
        this.errorCount = 0;
        this.activeThreads = 0;
    }

    public void addRequest(int count, long rt, int status) {
        this.totalCount += count;
        this.totalRt += rt * count;
        if (status != 0) { // 非成功状态
            this.errorCount += count;
        }
        // 这里简化处理,实际中可能需要更复杂的线程计数逻辑
    }

    public long getTimeWindowStart() {
        return timeWindowStart;
    }

    public int getTotalCount() {
        return totalCount;
    }

    public long getTotalRt() {
        return totalRt;
    }

    public int getErrorCount() {
        return errorCount;
    }

    public int getActiveThreads() {
        return activeThreads;
    }

    // 合并另一个采样点的数据
    public void merge(Sample other) {
        this.totalCount += other.totalCount;
        this.totalRt += other.totalRt;
        this.errorCount += other.errorCount;
        this.activeThreads += other.activeThreads;
    }
}

// 快照
class Snapshot {
    private int totalCount;
    private long totalRt;
    private int errorCount;
    private int activeThreads;

    public void merge(Sample sample) {
        this.totalCount += sample.getTotalCount();
        this.totalRt += sample.getTotalRt();
        this.errorCount += sample.getErrorCount();
        this.activeThreads += sample.getActiveThreads();
    }

    public int getTotalCount() {
        return totalCount;
    }

    public long getTotalRt() {
        return totalRt;
    }

    public int getErrorCount() {
        return errorCount;
    }

    public int getActiveThreads() {
        return activeThreads;
    }
}

5. 规则 (Rule)

规则是 Sentinel 实现流量控制和熔断降级的核心。Sentinel 支持多种规则,如流量规则、熔断规则、系统规则等。每种规则都有其特定的属性和含义。

5.1 流量规则 (FlowRule)

流量规则用于控制资源的流量,防止系统过载。它定义了资源的限流阈值、限流模式(QPS、线程数等)等。

示例代码:FlowRule 结构
// 流量规则
public class FlowRule {
    // 资源名称
    private String resource;
    // 限流阈值
    private double count;
    // 限流模式
    private int grade = 1; // 1 表示 QPS 模式,0 表示线程数模式
    // 限流策略
    private int strategy = 0; // 0 表示直接限流,1 表示关联限流,2 表示链路限流
    // 关联资源(用于关联限流)
    private String refResource;
    // 预热时间(毫秒)
    private int warmUpPeriodSec;
    // 是否开启匀速排队
    private boolean controlBehavior = false;
    // 排队等待时间(毫秒)
    private int maxQueueingTimeMs = 0;
    // 隔离级别
    private int clusterMode = 0;
    // 集群限流阈值
    private double clusterThreshold = 0;
    // 是否启用
    private boolean enable = true;

    // Getters and Setters
    public String getResource() {
        return resource;
    }

    public void setResource(String resource) {
        this.resource = resource;
    }

    public double getCount() {
        return count;
    }

    public void setCount(double count) {
        this.count = count;
    }

    public int getGrade() {
        return grade;
    }

    public void setGrade(int grade) {
        this.grade = grade;
    }

    public int getStrategy() {
        return strategy;
    }

    public void setStrategy(int strategy) {
        this.strategy = strategy;
    }

    public String getRefResource() {
        return refResource;
    }

    public void setRefResource(String refResource) {
        this.refResource = refResource;
    }

    public int getWarmUpPeriodSec() {
        return warmUpPeriodSec;
    }

    public void setWarmUpPeriodSec(int warmUpPeriodSec) {
        this.warmUpPeriodSec = warmUpPeriodSec;
    }

    public boolean isControlBehavior() {
        return controlBehavior;
    }

    public void setControlBehavior(boolean controlBehavior) {
        this.controlBehavior = controlBehavior;
    }

    public int getMaxQueueingTimeMs() {
        return maxQueueingTimeMs;
    }

    public void setMaxQueueingTimeMs(int maxQueueingTimeMs) {
        this.maxQueueingTimeMs = maxQueueingTimeMs;
    }

    public int getClusterMode() {
        return clusterMode;
    }

    public void setClusterMode(int clusterMode) {
        this.clusterMode = clusterMode;
    }

    public double getClusterThreshold() {
        return clusterThreshold;
    }

    public void setClusterThreshold(double clusterThreshold) {
        this.clusterThreshold = clusterThreshold;
    }

    public boolean isEnable() {
        return enable;
    }

    public void setEnable(boolean enable) {
        this.enable = enable;
    }
}
5.2 熔断规则 (DegradeRule)

熔断规则用于控制服务的熔断降级。当服务出现故障时,自动触发熔断,防止雪崩效应。

示例代码:DegradeRule 结构
// 熔断规则
public class DegradeRule {
    // 资源名称
    private String resource;
    // 熔断策略
    private int grade = 0; // 0 表示基于 RT 熔断,1 表示基于异常比例,2 表示基于异常数
    // 熔断阈值
    private double count;
    // 熔断时间窗口(秒)
    private int timeWindow;
    // 慢调用比例阈值(仅在 RT 熔断时有效)
    private double slowRatioThreshold = 0.0;
    // 最小请求数(仅在 RT 熔断时有效)
    private int minRequestAmount = 5;
    // 统计窗口大小(秒)
    private int statIntervalMs = 1000;
    // 是否启用
    private boolean enable = true;

    // Getters and Setters
    public String getResource() {
        return resource;
    }

    public void setResource(String resource) {
        this.resource = resource;
    }

    public int getGrade() {
        return grade;
    }

    public void setGrade(int grade) {
        this.grade = grade;
    }

    public double getCount() {
        return count;
    }

    public void setCount(double count) {
        this.count = count;
    }

    public int getTimeWindow() {
        return timeWindow;
    }

    public void setTimeWindow(int timeWindow) {
        this.timeWindow = timeWindow;
    }

    public double getSlowRatioThreshold() {
        return slowRatioThreshold;
    }

    public void setSlowRatioThreshold(double slowRatioThreshold) {
        this.slowRatioThreshold = slowRatioThreshold;
    }

    public int getMinRequestAmount() {
        return minRequestAmount;
    }

    public void setMinRequestAmount(int minRequestAmount) {
        this.minRequestAmount = minRequestAmount;
    }

    public int getStatIntervalMs() {
        return statIntervalMs;
    }

    public void setStatIntervalMs(int statIntervalMs) {
        this.statIntervalMs = statIntervalMs;
    }

    public boolean isEnable() {
        return enable;
    }

    public void setEnable(boolean enable) {
        this.enable = enable;
    }
}
5.3 系统规则 (SystemRule)

系统规则用于保护整个系统,防止因资源不足导致系统崩溃。

示例代码:SystemRule 结构
// 系统规则
public class SystemRule {
    // 系统保护模式
    private int mode = 0; // 0 表示基于负载保护,1 表示基于 QPS 保护
    // 系统保护阈值
    private double threshold = 0;
    // 是否启用
    private boolean enable = true;

    // Getters and Setters
    public int getMode() {
        return mode;
    }

    public void setMode(int mode) {
        this.mode = mode;
    }

    public double getThreshold() {
        return threshold;
    }

    public void setThreshold(double threshold) {
        this.threshold = threshold;
    }

    public boolean isEnable() {
        return enable;
    }

    public void setEnable(boolean enable) {
        this.enable = enable;
    }
}

Sentinel 核心组件详解 🔧

1. Slot Chain (责任链)

Slot Chain 是 Sentinel 核心设计思想的体现。它通过责任链模式,将不同的功能(如限流、熔断、系统保护等)模块化地组织起来,每个 Slot 负责处理特定类型的规则。

示例代码:Slot Chain 实现
// Slot 接口
public interface Slot {
    // 处理请求
    void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable;
    // 退出请求
    void exit(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args);
}

// Slot Chain 实现
public class DefaultSlotChain implements SlotChain {
    // Slot 列表
    private final List<Slot> slots = new ArrayList<>();

    public DefaultSlotChain() {
        // 初始化 Slot 列表
        slots.add(new FlowSlot()); // 流量控制
        slots.add(new CircuitBreakerSlot()); // 熔断降级
        slots.add(new SystemSlot()); // 系统保护
        // ... 可以继续添加其他 Slot
    }

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable {
        // 顺序执行每个 Slot 的 entry 方法
        for (Slot slot : slots) {
            slot.entry(context, resourceWrapper, obj, count, prioritized, args);
        }
    }

    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) {
        // 逆序执行每个 Slot 的 exit 方法
        for (int i = slots.size() - 1; i >= 0; i--) {
            slots.get(i).exit(context, resourceWrapper, obj, count, prioritized, args);
        }
    }
}

2. FlowSlot (流量控制)

FlowSlot 是负责执行流量控制逻辑的核心 Slot。它会根据预设的流量规则,判断是否应该允许请求通过。

示例代码:FlowSlot 实现
// FlowSlot 实现
public class FlowSlot implements Slot {
    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable {
        // 获取资源名称
        String resource = resourceWrapper.getName();
        // 获取流量规则
        List<FlowRule> rules = FlowRuleManager.getRulesForResource(resource);

        // 遍历所有规则
        for (FlowRule rule : rules) {
            if (!rule.isEnable()) continue; // 如果规则未启用,则跳过

            // 检查是否满足限流条件
            if (shouldBlock(context, resourceWrapper, rule, count)) {
                // 如果被限流,则抛出 BlockException
                throw new FlowException(resource, rule);
            }
        }
    }

    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) {
        // 通常不需要特殊处理,这里只是占位符
    }

    // 判断是否应该阻塞请求
    private boolean shouldBlock(Context context, ResourceWrapper resourceWrapper, FlowRule rule, int count) {
        // 获取当前资源的统计信息
        StatisticNode node = context.getCurNodeList().get(0); // 假设只有一个节点
        if (node == null) {
            return false;
        }

        // 根据不同规则类型进行判断
        switch (rule.getGrade()) {
            case 0: // 线程数模式
                return node.getActiveThreads() >= rule.getCount();
            case 1: // QPS 模式
                return node.getQps() >= rule.getCount();
            default:
                return false;
        }
    }
}

// 流量异常类
public class FlowException extends BlockException {
    public FlowException(String resource, FlowRule rule) {
        super("Flow control triggered for resource: " + resource + ", rule: " + rule.toString());
    }
}

3. CircuitBreakerSlot (熔断降级)

CircuitBreakerSlot 负责执行熔断降级逻辑。它会根据预设的熔断规则,判断服务是否处于熔断状态,如果是,则直接拒绝请求。

示例代码:CircuitBreakerSlot 实现
// CircuitBreakerSlot 实现
public class CircuitBreakerSlot implements Slot {
    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable {
        // 获取资源名称
        String resource = resourceWrapper.getName();
        // 获取熔断规则
        List<DegradeRule> rules = DegradeRuleManager.getRulesForResource(resource);

        // 遍历所有规则
        for (DegradeRule rule : rules) {
            if (!rule.isEnable()) continue; // 如果规则未启用,则跳过

            // 检查是否需要触发熔断
            if (shouldTrip(context, resourceWrapper, rule)) {
                // 如果触发熔断,则抛出 DegradeException
                throw new DegradeException(resource, rule);
            }
        }
    }

    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) {
        // 通常不需要特殊处理,这里只是占位符
    }

    // 判断是否应该触发熔断
    private boolean shouldTrip(Context context, ResourceWrapper resourceWrapper, DegradeRule rule) {
        // 获取当前资源的统计信息
        StatisticNode node = context.getCurNodeList().get(0); // 假设只有一个节点
        if (node == null) {
            return false;
        }

        // 根据不同熔断策略进行判断
        switch (rule.getGrade()) {
            case 0: // 基于 RT 熔断
                // 检查平均响应时间是否超过阈值
                return node.getAvgRt() > rule.getCount();
            case 1: // 基于异常比例熔断
                // 检查错误比例是否超过阈值
                double errorRate = (double) node.getErrorCount() / node.getTotalCount();
                return errorRate > rule.getCount();
            case 2: // 基于异常数熔断
                // 检查错误数是否超过阈值
                return node.getErrorCount() > rule.getCount();
            default:
                return false;
        }
    }
}

// 熔断异常类
public class DegradeException extends BlockException {
    public DegradeException(String resource, DegradeRule rule) {
        super("Circuit breaker triggered for resource: " + resource + ", rule: " + rule.toString());
    }
}

4. SystemSlot (系统保护)

SystemSlot 负责执行系统保护逻辑。它会根据系统规则,判断当前系统负载是否过高,如果过高,则拒绝部分请求,以保护系统。

示例代码:SystemSlot 实现
// SystemSlot 实现
public class SystemSlot implements Slot {
    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable {
        // 获取系统规则
        List<SystemRule> rules = SystemRuleManager.getRules();

        // 遍历所有规则
        for (SystemRule rule : rules) {
            if (!rule.isEnable()) continue; // 如果规则未启用,则跳过

            // 检查是否满足系统保护条件
            if (shouldBlock(context, rule)) {
                // 如果被系统保护,则抛出 SystemBlockException
                throw new SystemBlockException(rule);
            }
        }
    }

    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) {
        // 通常不需要特殊处理,这里只是占位符
    }

    // 判断是否应该阻塞请求
    private boolean shouldBlock(Context context, SystemRule rule) {
        // 根据不同系统保护模式进行判断
        switch (rule.getMode()) {
            case 0: // 基于负载保护
                // 检查系统负载是否超过阈值
                return getCurrentSystemLoad() > rule.getThreshold();
            case 1: // 基于 QPS 保护
                // 检查系统 QPS 是否超过阈值
                return getCurrentSystemQps() > rule.getThreshold();
            default:
                return false;
        }
    }

    // 获取当前系统负载
    private double getCurrentSystemLoad() {
        // 实际实现中,这里会调用操作系统 API 获取负载信息
        // 为了演示,返回一个模拟值
        return 0.8; // 模拟负载为 80%
    }

    // 获取当前系统 QPS
    private double getCurrentSystemQps() {
        // 实际实现中,这里会计算整个系统的 QPS
        // 为了演示,返回一个模拟值
        return 150.0; // 模拟 QPS 为 150
    }
}

// 系统阻塞异常类
public class SystemBlockException extends BlockException {
    public SystemBlockException(SystemRule rule) {
        super("System protection triggered, rule: " + rule.toString());
    }
}

5. Rule Manager (规则管理器)

Rule Manager 负责加载、更新和管理各种规则。它会监听规则数据源的变化,并将最新的规则应用到相应的 Slot 中。

示例代码:Rule Manager 实现
// 规则管理器基类
public abstract class AbstractRuleManager<T extends Rule> {
    // 规则缓存
    protected final Map<String, List<T>> rulesCache = new ConcurrentHashMap<>();

    // 获取指定资源的规则
    public List<T> getRulesForResource(String resource) {
        return rulesCache.getOrDefault(resource, Collections.emptyList());
    }

    // 更新规则
    public void updateRules(List<T> rules) {
        // 清空旧缓存
        rulesCache.clear();
        // 更新缓存
        for (T rule : rules) {
            String resource = rule.getResource();
            rulesCache.computeIfAbsent(resource, k -> new ArrayList<>()).add(rule);
        }
    }

    // 加载规则
    public abstract void loadRules();
}

// 流量规则管理器
public class FlowRuleManager extends AbstractRuleManager<FlowRule> {
    // 单例模式
    private static volatile FlowRuleManager instance;

    private FlowRuleManager() {}

    public static FlowRuleManager getInstance() {
        if (instance == null) {
            synchronized (FlowRuleManager.class) {
                if (instance == null) {
                    instance = new FlowRuleManager();
                }
            }
        }
        return instance;
    }

    @Override
    public void loadRules() {
        // 从配置中心加载规则
        // 这里只是一个示例,实际实现会从 Nacos, Zookeeper 等加载
        List<FlowRule> rules = loadFromConfigCenter();
        updateRules(rules);
    }

    private List<FlowRule> loadFromConfigCenter() {
        // 模拟从配置中心加载规则
        FlowRule rule1 = new FlowRule();
        rule1.setResource("test-resource");
        rule1.setGrade(1); // QPS 模式
        rule1.setCount(5); // 限流阈值为 5 QPS

        FlowRule rule2 = new FlowRule();
        rule2.setResource("test-resource");
        rule2.setGrade(0); // 线程数模式
        rule2.setCount(2); // 限流阈值为 2 线程

        return Arrays.asList(rule1, rule2);
    }
}

// 熔断规则管理器
public class DegradeRuleManager extends AbstractRuleManager<DegradeRule> {
    // 单例模式
    private static volatile DegradeRuleManager instance;

    private DegradeRuleManager() {}

    public static DegradeRuleManager getInstance() {
        if (instance == null) {
            synchronized (DegradeRuleManager.class) {
                if (instance == null) {
                    instance = new DegradeRuleManager();
                }
            }
        }
        return instance;
    }

    @Override
    public void loadRules() {
        // 从配置中心加载规则
        // 这里只是一个示例,实际实现会从 Nacos, Zookeeper 等加载
        List<DegradeRule> rules = loadFromConfigCenter();
        updateRules(rules);
    }

    private List<DegradeRule> loadFromConfigCenter() {
        // 模拟从配置中心加载规则
        DegradeRule rule1 = new DegradeRule();
        rule1.setResource("slow-resource");
        rule1.setGrade(0); // 基于 RT 熔断
        rule1.setCount(1000); // RT 超过 1000ms
        rule1.setTimeWindow(10); // 熔断时间窗口 10 秒

        DegradeRule rule2 = new DegradeRule();
        rule2.setResource("error-resource");
        rule2.setGrade(1); // 基于异常比例熔断
        rule2.setCount(0.5); // 异常比例超过 50%
        rule2.setTimeWindow(5); // 熔断时间窗口 5 秒

        return Arrays.asList(rule1, rule2);
    }
}

// 系统规则管理器
public class SystemRuleManager extends AbstractRuleManager<SystemRule> {
    // 单例模式
    private static volatile SystemRuleManager instance;

    private SystemRuleManager() {}

    public static SystemRuleManager getInstance() {
        if (instance == null) {
            synchronized (SystemRuleManager.class) {
                if (instance == null) {
                    instance = new SystemRuleManager();
                }
            }
        }
        return instance;
    }

    @Override
    public void loadRules() {
        // 从配置中心加载规则
        // 这里只是一个示例,实际实现会从 Nacos, Zookeeper 等加载
        List<SystemRule> rules = loadFromConfigCenter();
        updateRules(rules);
    }

    private List<SystemRule> loadFromConfigCenter() {
        // 模拟从配置中心加载规则
        SystemRule rule1 = new SystemRule();
        rule1.setMode(0); // 基于负载保护
        rule1.setThreshold(1.0); // 负载阈值为 1.0

        SystemRule rule2 = new SystemRule();
        rule2.setMode(1); // 基于 QPS 保护
        rule2.setThreshold(100); // QPS 阈值为 100

        return Arrays.asList(rule1, rule2);
    }
}

限流机制深度剖析 🚦

1. 限流原理

限流是一种控制流量的技术,目的是防止系统过载。Sentinel 支持多种限流模式,主要包括:

  • QPS 限流:限制单位时间内通过的请求数量。
  • 线程数限流:限制同时处理的线程数量。
  • 关联限流:根据关联资源的指标进行限流。
  • 链路限流:根据调用链路的指标进行限流。
  • 匀速排队:控制请求以恒定速率通过。

2. QPS 限流实现

QPS 限流是最常见的限流模式。Sentinel 通过滑动窗口来计算单位时间内的请求数量,并与预设阈值进行比较。

示例代码:QPS 限流逻辑
// QPS 限流实现
public class QpsLimiter {
    // 滑动窗口
    private final SlidingWindow slidingWindow;
    // 限流阈值
    private final double threshold;

    public QpsLimiter(int windowLengthInMs, int sampleCount, double threshold) {
        this.slidingWindow = new SlidingWindow(windowLengthInMs, sampleCount);
        this.threshold = threshold;
    }

    // 尝试获取许可
    public boolean tryAcquire() {
        // 获取当前时间戳
        long now = System.currentTimeMillis();
        // 获取当前 QPS
        double currentQps = slidingWindow.getQps();
        // 如果当前 QPS 超过了阈值,则拒绝
        if (currentQps >= threshold) {
            return false;
        }
        // 如果没有超过阈值,则允许并记录请求
        slidingWindow.addRequest(1, 0, 0); // 假设请求成功
        return true;
    }

    // 获取当前 QPS
    public double getCurrentQps() {
        return slidingWindow.getQps();
    }
}

3. 线程数限流实现

线程数限流则是限制同时处理的线程数量。它通过原子变量或锁来控制并发数。

示例代码:线程数限流逻辑
// 线程数限流实现
public class ThreadLimiter {
    // 当前活跃线程数
    private final AtomicInteger currentThreads = new AtomicInteger(0);
    // 限流阈值
    private final int threshold;

    public ThreadLimiter(int threshold) {
        this.threshold = threshold;
    }

    // 尝试获取许可
    public boolean tryAcquire() {
        int current = currentThreads.get();
        while (current < threshold) {
            if (currentThreads.compareAndSet(current, current + 1)) {
                return true;
            }
            current = currentThreads.get();
        }
        return false;
    }

    // 释放许可
    public void release() {
        currentThreads.decrementAndGet();
    }

    // 获取当前线程数
    public int getCurrentThreads() {
        return currentThreads.get();
    }
}

4. 关联限流实现

关联限流是指根据关联资源的指标来限制当前资源的流量。例如,当某个商品的购买请求过多时,限制该商品详情页的访问。

示例代码:关联限流逻辑
// 关联限流实现
public class AssociatedLimiter {
    // 关联资源的 QPS 限流器
    private final Map<String, QpsLimiter> associatedLimiters = new ConcurrentHashMap<>();

    // 注册关联资源
    public void registerAssociatedResource(String resource, int windowLengthInMs, int sampleCount, double threshold) {
        associatedLimiters.put(resource, new QpsLimiter(windowLengthInMs, sampleCount, threshold));
    }

    // 检查是否应该限流
    public boolean shouldBlock(String targetResource, String associatedResource, double threshold) {
        QpsLimiter limiter = associatedLimiters.get(associatedResource);
        if (limiter == null) {
            return false; // 如果没有关联资源的限流器,则不进行限流
        }
        // 检查关联资源的 QPS
        double associatedQps = limiter.getCurrentQps();
        // 如果关联资源 QPS 超过阈值,则对目标资源进行限流
        return associatedQps >= threshold;
    }
}

5. 匀速排队实现

匀速排队是一种平滑限流策略,它将请求放入一个队列中,按照固定的时间间隔进行处理,从而避免瞬时流量高峰。

示例代码:匀速排队逻辑
// 匀速排队实现
public class RateLimiter {
    // 请求队列
    private final BlockingQueue<Request> queue = new LinkedBlockingQueue<>();
    // 每个请求的处理间隔(毫秒)
    private final long intervalMs;
    // 线程池
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    public RateLimiter(long intervalMs) {
        this.intervalMs = intervalMs;
        // 启动处理线程
        startProcessing();
    }

    // 启动处理线程
    private void startProcessing() {
        executor.submit(() -> {
            while (true) {
                try {
                    Request request = queue.take(); // 从队列中取出请求
                    // 处理请求
                    processRequest(request);
                    // 等待下一个请求
                    Thread.sleep(intervalMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        });
    }

    // 尝试添加请求到队列
    public boolean tryAddRequest(Request request) {
        return queue.offer(request);
    }

    // 处理请求
    private void processRequest(Request request) {
        // 实际处理逻辑
        System.out.println("Processing request: " + request.getId());
    }

    // 请求类
    static class Request {
        private final String id;
        private final long timestamp;

        public Request(String id) {
            this.id = id;
            this.timestamp = System.currentTimeMillis();
        }

        public String getId() {
            return id;
        }

        public long getTimestamp() {
            return timestamp;
        }
    }
}

熔断机制深度剖析 ⚡

1. 熔断原理

熔断是服务容错的一种机制,当服务出现故障时,自动切断对该服务的调用,防止故障扩散,给服务一个恢复的机会。Sentinel 支持多种熔断策略:

  • 基于 RT(响应时间):当平均响应时间超过设定阈值时熔断。
  • 基于异常比例:当异常请求的比例超过设定阈值时熔断。
  • 基于异常数:当异常请求数超过设定阈值时熔断。

2. 熔断状态机

熔断器的状态机是熔断机制的核心。它包含三个状态:

  • CLOSED(关闭):服务正常运行,允许请求通过。
  • OPEN(打开):服务故障,直接拒绝所有请求。
  • HALF-OPEN(半开):在熔断后的一段时间内,允许部分请求通过,以检测服务是否恢复正常。
示例代码:熔断器状态机
// 熔断器状态
public enum CircuitBreakerState {
    CLOSED, // 关闭
    OPEN,   // 打开
    HALF_OPEN // 半开
}

// 熔断器状态机
public class CircuitBreakerStateMachine {
    // 当前状态
    private volatile CircuitBreakerState state = CircuitBreakerState.CLOSED;
    // 熔断阈值
    private final double failureThreshold;
    // 熔断时间窗口(秒)
    private final int timeWindowSeconds;
    // 最近一次失败时间戳
    private volatile long lastFailureTime = 0;
    // 连续失败次数
    private final AtomicInteger consecutiveFailures = new AtomicInteger(0);
    // 半开状态下允许的请求数
    private final AtomicInteger allowedRequests = new AtomicInteger(0);

    public CircuitBreakerStateMachine(double failureThreshold, int timeWindowSeconds) {
        this.failureThreshold = failureThreshold;
        this.timeWindowSeconds = timeWindowSeconds;
    }

    // 判断是否应该熔断
    public boolean shouldTrip(long currentTimeMillis, boolean isSuccess) {
        if (isSuccess) {
            // 成功,重置失败计数
            consecutiveFailures.set(0);
            return false;
        } else {
            // 失败,增加失败计数
            int failures = consecutiveFailures.incrementAndGet();
            lastFailureTime = currentTimeMillis;
            // 计算失败率
            double failureRate = (double) failures / (failures + 1); // 简化计算
            return failureRate >= failureThreshold;
        }
    }

    // 判断是否应该允许请求通过
    public boolean allowRequest(long currentTimeMillis) {
        switch (state) {
            case CLOSED:
                return true;
            case OPEN:
                // 检查是否到了熔断时间窗口
                if (currentTimeMillis - lastFailureTime > timeWindowSeconds * 1000) {
                    // 切换到半开状态
                    state = CircuitBreakerState.HALF_OPEN;
                    allowedRequests.set(1); // 允许一个请求尝试
                    return true;
                }
                return false;
            case HALF_OPEN:
                // 半开状态下只允许一个请求通过
                if (allowedRequests.get() > 0) {
                    allowedRequests.decrementAndGet();
                    return true;
                }
                return false;
            default:
                return false;
        }
    }

    // 处理请求结果
    public void handleResult(long currentTimeMillis, boolean isSuccess) {
        if (isSuccess) {
            // 成功,重置状态
            state = CircuitBreakerState.CLOSED;
            consecutiveFailures.set(0);
        } else {
            // 失败,根据当前状态处理
            if (state == CircuitBreakerState.CLOSED) {
                // 如果是关闭状态,检查是否应该熔断
                if (shouldTrip(currentTimeMillis, isSuccess)) {
                    state = CircuitBreakerState.OPEN;
                }
            } else if (state == CircuitBreakerState.HALF_OPEN) {
                // 如果是半开状态,失败则重新熔断
                state = CircuitBreakerState.OPEN;
            }
        }
    }

    // 获取当前状态
    public CircuitBreakerState getState() {
        return state;
    }
}

3. 基于 RT 的熔断实现

基于 RT 的熔断策略关注的是服务的响应时间。当平均响应时间超过设定阈值时,认为服务出现问题。

示例代码:基于 RT 的熔断实现
// 基于 RT 的熔断器
public class RtCircuitBreaker {
    // 滑动窗口,用于统计响应时间
    private final SlidingWindow slidingWindow;
    // RT 阈值(毫秒)
    private final long rtThreshold;
    // 熔断状态机
    private final CircuitBreakerStateMachine stateMachine;

    public RtCircuitBreaker(int windowLengthInMs, int sampleCount, long rtThreshold, int timeWindowSeconds) {
        this.slidingWindow = new SlidingWindow(windowLengthInMs, sampleCount);
        this.rtThreshold = rtThreshold;
        this.stateMachine = new CircuitBreakerStateMachine(0.5, timeWindowSeconds); // 简化设置失败率阈值
    }

    // 尝试执行请求
    public boolean tryExecute(Runnable task) {
        long startTime = System.currentTimeMillis();
        try {
            task.run();
            // 记录成功
            long rt = System.currentTimeMillis() - startTime;
            slidingWindow.addRequest(1, rt, 0); // 0 表示成功
            return true;
        } catch (Exception e) {
            // 记录失败
            long rt = System.currentTimeMillis() - startTime;
            slidingWindow.addRequest(1, rt, 1); // 1 表示失败
            return false;
        } finally {
            // 更新熔断状态
            long currentTime = System.currentTimeMillis();
            // 检查平均 RT
            long avgRt = slidingWindow.getAvgRt();
            if (avgRt > rtThreshold) {
                // 平均 RT 超过阈值,可能需要熔断
                // 这里简化处理,实际会更复杂
            }
        }
    }

    // 获取当前平均 RT
    public long getAvgRt() {
        return slidingWindow.getAvgRt();
    }

    // 获取当前状态
    public CircuitBreakerState getState() {
        return stateMachine.getState();
    }
}

4. 基于异常比例的熔断实现

基于异常比例的熔断策略关注的是服务的错误率。当错误请求的比例超过设定阈值时,认为服务出现问题。

示例代码:基于异常比例的熔断实现
// 基于异常比例的熔断器
public class ExceptionRatioCircuitBreaker {
    // 滑动窗口,用于统计请求和错误
    private final SlidingWindow slidingWindow;
    // 异常比例阈值
    private final double exceptionRatioThreshold;
    // 熔断状态机
    private final CircuitBreakerStateMachine stateMachine;

    public ExceptionRatioCircuitBreaker(int windowLengthInMs, int sampleCount, double exceptionRatioThreshold, int timeWindowSeconds) {
        this.slidingWindow = new SlidingWindow(windowLengthInMs, sampleCount);
        this.exceptionRatioThreshold = exceptionRatioThreshold;
        this.stateMachine = new CircuitBreakerStateMachine(exceptionRatioThreshold, timeWindowSeconds);
    }

    // 尝试执行请求
    public boolean tryExecute(Runnable task) {
        long startTime = System.currentTimeMillis();
        try {
            task.run();
            // 记录成功
            slidingWindow.addRequest(1, 0, 0); // 0 表示成功
            return true;
        } catch (Exception e) {
            // 记录失败
            slidingWindow.addRequest(1, 0, 1); // 1 表示失败
            return false;
        } finally {
            // 更新熔断状态
            long currentTime = System.currentTimeMillis();
            // 计算异常比例
            long total = slidingWindow.getTotalCount();
            long errors = slidingWindow.getErrorCount();
            double ratio = (double) errors / total;
            if (ratio >= exceptionRatioThreshold) {
                // 异常比例超过阈值,可能需要熔断
                // 这里简化处理,实际会更复杂
            }
        }
    }

    // 获取当前异常比例
    public double getExceptionRatio() {
        long total = slidingWindow.getTotalCount();
        long errors = slidingWindow.getErrorCount();
        if (total == 0) return 0.0;
        return (double) errors / total;
    }

    // 获取当前状态
    public CircuitBreakerState getState() {
        return stateMachine.getState();
    }
}

5. 基于异常数的熔断实现

基于异常数的熔断策略关注的是服务的错误总数。当错误请求的数量超过设定阈值时,认为服务出现问题。

示例代码:基于异常数的熔断实现
// 基于异常数的熔断器
public class ExceptionCountCircuitBreaker {
    // 滑动窗口,用于统计错误数
    private final SlidingWindow slidingWindow;
    // 异常数阈值
    private final long exceptionCountThreshold;
    // 熔断状态机
    private final CircuitBreakerStateMachine stateMachine;

    public ExceptionCountCircuitBreaker(int windowLengthInMs, int sampleCount, long exceptionCountThreshold, int timeWindowSeconds) {
        this.slidingWindow = new SlidingWindow(windowLengthInMs, sampleCount);
        this.exceptionCountThreshold = exceptionCountThreshold;
        this.stateMachine = new CircuitBreakerStateMachine(0.5, timeWindowSeconds); // 简化设置失败率阈值
    }

    // 尝试执行请求
    public boolean tryExecute(Runnable task) {
        long startTime = System.currentTimeMillis();
        try {
            task.run();
            // 记录成功
            slidingWindow.addRequest(1, 0, 0); // 0 表示成功
            return true;
        } catch (Exception e) {
            // 记录失败
            slidingWindow.addRequest(1, 0, 1); // 1 表示失败
            return false;
        } finally {
            // 更新熔断状态
            long currentTime = System.currentTimeMillis();
            // 获取错误数
            long errors = slidingWindow.getErrorCount();
            if (errors >= exceptionCountThreshold) {
                // 错误数超过阈值,可能需要熔断
                // 这里简化处理,实际会更复杂
            }
        }
    }

    // 获取当前错误数
    public long getExceptionCount() {
        return slidingWindow.getErrorCount();
    }

    // 获取当前状态
    public CircuitBreakerState getState() {
        return stateMachine.getState();
    }
}

Sentinel 的设计思想与架构哲学 🧠

1. 责任链模式 (Chain of Responsibility Pattern)

责任链模式是 Sentinel 设计思想的核心之一。通过将不同的功能模块(Slot)组织成一个链,每个模块负责处理特定类型的规则,使得系统具有很高的灵活性和可扩展性。

示例代码:责任链模式实现
// 责任链模式示例
interface Handler {
    void handle(Request request);
}

class ConcreteHandlerA implements Handler {
    private Handler next;

    public void setNext(Handler handler) {
        this.next = handler;
    }

    @Override
    public void handle(Request request) {
        if (request.getType() == 1) {
            System.out.println("Handler A processing request: " + request.getContent());
        } else if (next != null) {
            next.handle(request);
        }
    }
}

class ConcreteHandlerB implements Handler {
    private Handler next;

    public void setNext(Handler handler) {
        this.next = handler;
    }

    @Override
    public void handle(Request request) {
        if (request.getType() == 2) {
            System.out.println("Handler B processing request: " + request.getContent());
        } else if (next != null) {
            next.handle(request);
        }
    }
}

class Request {
    private int type;
    private String content;

    public Request(int type, String content) {
        this.type = type;
        this.content = content;
    }

    public int getType() {
        return type;
    }

    public String getContent() {
        return content;
    }
}

// 使用责任链
public class ChainOfResponsibilityDemo {
    public static void main(String[] args) {
        Handler handlerA = new ConcreteHandlerA();
        Handler handlerB = new ConcreteHandlerB();

        handlerA.setNext(handlerB);

        Request req1 = new Request(1, "Request 1");
        Request req2 = new Request(2, "Request 2");
        Request req3 = new Request(3, "Request 3");

        handlerA.handle(req1); // Handler A 处理
        handlerA.handle(req2); // Handler B 处理
        handlerA.handle(req3); // 无处理
    }
}

2. 观察者模式 (Observer Pattern)

观察者模式在 Sentinel 中用于实现规则的动态更新。当规则发生变化时,通知相关的监听器进行更新。

示例代码:观察者模式实现
// 观察者接口
interface Observer {
    void update(String data);
}

// 主题接口
interface Subject {
    void addObserver(Observer observer);
    void removeObserver(Observer observer);
    void notifyObservers(String data);
}

// 具体主题
class RuleSubject implements Subject {
    private List<Observer> observers = new ArrayList<>();

    @Override
    public void addObserver(Observer observer) {
        observers.add(observer);
    }

    @Override
    public void removeObserver(Observer observer) {
        observers.remove(observer);
    }

    @Override
    public void notifyObservers(String data) {
        for (Observer observer : observers) {
            observer.update(data);
        }
    }

    // 规则变更通知
    public void ruleChanged(String newData) {
        notifyObservers(newData);
    }
}

// 具体观察者
class RuleListener implements Observer {
    private String name;

    public RuleListener(String name) {
        this.name = name;
    }

    @Override
    public void update(String data) {
        System.out.println(name + " received rule update: " + data);
    }
}

// 使用观察者模式
public class ObserverPatternDemo {
    public static void main(String[] args) {
        RuleSubject subject = new RuleSubject();
        RuleListener listener1 = new RuleListener("Listener 1");
        RuleListener listener2 = new RuleListener("Listener 2");

        subject.addObserver(listener1);
        subject.addObserver(listener2);

        subject.ruleChanged("New rule applied");
    }
}

3. 单例模式 (Singleton Pattern)

单例模式在 Sentinel 中被广泛应用于规则管理器、统计节点等核心组件,确保全局只有一个实例,避免重复创建和资源浪费。

示例代码:单例模式实现
// 单例模式示例
class Singleton {
    // 私有静态实例
    private static volatile Singleton instance;

    // 私有构造函数
    private Singleton() {}

    // 获取实例(双重检查锁定)
    public static Singleton getInstance() {
        if (instance == null) {
            synchronized (Singleton.class) {
                if (instance == null) {
                    instance = new Singleton();
                }
            }
        }
        return instance;
    }

    public void doSomething() {
        System.out.println("Singleton doing something...");
    }
}

// 使用单例
public class SingletonDemo {
    public static void main(String[] args) {
        Singleton s1 = Singleton.getInstance();
        Singleton s2 = Singleton.getInstance();
        System.out.println(s1 == s2); // true
        s1.doSomething();
    }
}

4. 模板方法模式 (Template Method Pattern)

模板方法模式在 Sentinel 中用于定义算法骨架,具体的步骤由子类实现。例如,不同的 Slot 可以继承基类并实现自己的 entryexit 方法。

示例代码:模板方法模式实现
// 抽象模板类
abstract class AbstractProcessor {
    // 模板方法
    public final void process() {
        preProcess();     // 步骤 1: 预处理
        doProcess();      // 步骤 2: 核心处理 (子类实现)
        postProcess();    // 步骤 3: 后处理
    }

    // 预处理
    protected void preProcess() {
        System.out.println("Pre-processing...");
    }

    // 核心处理 (抽象方法)
    protected abstract void doProcess();

    // 后处理
    protected void postProcess() {
        System.out.println("Post-processing...");
    }
}

// 具体处理器
class FlowProcessor extends AbstractProcessor {
    @Override
    protected void doProcess() {
        System.out.println("Processing flow control logic...");
    }
}

class CircuitBreakerProcessor extends AbstractProcessor {
    @Override
    protected void doProcess() {
        System.out.println("Processing circuit breaker logic...");
    }
}

// 使用模板方法
public class TemplateMethodDemo {
    public static void main(String[] args) {
        AbstractProcessor flowProcessor = new FlowProcessor();
        AbstractProcessor circuitBreakerProcessor = new CircuitBreakerProcessor();

        flowProcessor.process();
        System.out.println("---");
        circuitBreakerProcessor.process();
    }
}

5. 工厂模式 (Factory Pattern)

工厂模式在 Sentinel 中用于创建不同类型的资源包装器、节点等对象,隐藏了对象创建的复杂性。

示例代码:工厂模式实现
// 资源包装器接口
interface ResourceWrapper {
    String getName();
}

// 具体资源包装器
class DefaultResourceWrapper implements ResourceWrapper {
    private String name;

    public DefaultResourceWrapper(String name) {
        this.name = name;
    }

    @Override
    public String getName() {
        return name;
    }
}

class ClusterResourceWrapper implements ResourceWrapper {
    private String name;

    public ClusterResourceWrapper(String name) {
        this.name = name;
    }

    @Override
    public String getName() {
        return name;
    }
}

// 资源包装器工厂
class ResourceWrapperFactory {
    public static ResourceWrapper createResourceWrapper(String name, boolean isCluster) {
        if (isCluster) {
            return new ClusterResourceWrapper(name);
        } else {
            return new DefaultResourceWrapper(name);
        }
    }
}

// 使用工厂模式
public class FactoryPatternDemo {
    public static void main(String[] args) {
        ResourceWrapper wrapper1 = ResourceWrapperFactory.createResourceWrapper("test-resource", false);
        ResourceWrapper wrapper2 = ResourceWrapperFactory.createResourceWrapper("cluster-resource", true);

        System.out.println(wrapper1.getName()); // test-resource
        System.out.println(wrapper2.getName()); // cluster-resource
    }
}

性能优化与最佳实践 🚀

1. 数据结构优化

Sentinel 通过精心设计的数据结构来优化性能,例如使用高效的滑动窗口来统计指标,使用原子变量来减少锁竞争。

1.1 滑动窗口优化

滑动窗口是 Sentinel 中用于统计指标的核心数据结构。为了提高性能,Sentinel 采用了多级缓存和懒加载等技术。

1.2 原子变量优化

在并发环境中,使用 AtomicIntegerAtomicLong 等原子变量可以避免锁的竞争,提高并发性能。

2. 缓存与预热

Sentinel 通过缓存规则和统计信息来减少不必要的计算和查询,提升响应速度。

2.1 规则缓存

将规则缓存在内存中,避免每次请求都去查询外部数据源。

2.2 统计缓存

缓存最近的统计信息,避免重复计算。

3. 异步处理

对于一些非关键的统计和上报操作,Sentinel 采用异步处理的方式,避免阻塞主线程。

4. 监控与日志优化

合理的监控和日志策略有助于及时发现和解决问题,但过度的日志输出会影响性能。

4.1 日志级别控制

通过配置日志级别,只记录必要的信息。

4.2 异步日志

使用异步日志框架,减少日志写入对主线程的影响。

5. 配置优化

合理配置 Sentinel 的各项参数,可以显著提升其性能。

5.1 窗口长度与采样数

根据实际业务场景调整滑动窗口的长度和采样数。

5.2 线程池配置

根据系统负载调整线程池的大小。

Sentinel 在实际项目中的应用 🧪

1. 微服务架构中的应用

在微服务架构中,Sentinel 可以部署在每个服务实例中,作为服务治理的一部分,实现服务间的流量控制和熔断。

示例代码:Spring Boot 应用中的 Sentinel 集成
// 在 Spring Boot 应用中集成 Sentinel
@RestController
public class UserController {

    @Autowired
    private UserService userService;

    // 使用 Sentinel 注解进行限流
    @GetMapping("/user/{id}")
    @SentinelResource(value = "getUserById", blockHandler = "handleGetUserBlock")
    public User getUserById(@PathVariable Long id) {
        return userService.findById(id);
    }

    // 限流回调方法
    public User handleGetUserBlock(BlockException ex) {
        // 返回默认值或错误信息
        return new User(-1L, "Blocked by Sentinel");
    }
}

// 配置类
@Configuration
public class SentinelConfig {

    @PostConstruct
    public void init() {
        // 初始化 Sentinel
        InitUtil.init();
        // 加载规则
        FlowRuleManager.loadRules(loadFlowRules());
        DegradeRuleManager.loadRules(loadDegradeRules());
    }

    private List<FlowRule> loadFlowRules() {
        // 加载流量规则
        FlowRule rule = new FlowRule();
        rule.setResource("getUserById");
        rule.setGrade(1); // QPS 模式
        rule.setCount(10); // 限流阈值 10 QPS
        return Collections.singletonList(rule);
    }

    private List<DegradeRule> loadDegradeRules() {
        // 加载熔断规则
        DegradeRule rule = new DegradeRule();
        rule.setResource("getUserById");
        rule.setGrade(0); // 基于 RT 熔断
        rule.setCount(1000); // RT 超过 1000ms
        rule.setTimeWindow(10); // 熔断时间窗口 10 秒
        return Collections.singletonList(rule);
    }
}

2. API 网关中的应用

在 API 网关中,Sentinel 可以用来控制 API 的访问频率和并发数,防止恶意请求和过载攻击。

示例代码:API 网关中的 Sentinel 应用
// API 网关过滤器
@Component
public class SentinelGatewayFilter implements WebFilter {

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
        ServerHttpRequest request = exchange.getRequest();
        String path = request.getPath().value();
        String method = request.getMethodValue();

        // 对每个 API 路径应用限流规则
        try {
            Entry entry = SphU.entry(path);
            return chain.filter(exchange).doFinally(signalType -> entry.exit());
        } catch (BlockException e) {
            // 如果被限流,返回 429 Too Many Requests
            ServerHttpResponse response = exchange.getResponse();
            response.setStatusCode(HttpStatus.TOO_MANY_REQUESTS);
            response.getHeaders().add("Content-Type", "application/json");
            return response.writeWith(Mono.just(response.bufferFactory().wrap("{\"error\":\"Too Many Requests\"}".getBytes())));
        }
    }
}

3. 服务网格中的应用

在服务网格环境中,Sentinel 可以作为服务代理的一部分,实现服务间的流量治理。

总结与展望 🌟

通过本文的深度解析,我们从宏观到微观,从理论到实践,全面地了解了 Sentinel 的核心原理和底层实现机制。从其核心数据结构到关键组件,从限流熔断的实现细节到设计思想,每一部分都展现了 Sentinel 的精妙之处。

Sentinel 的成功在于其模块化设计责任链模式的应用、高性能的数据结构以及灵活的规则管理。这些特性使其不仅能够满足复杂多变的业务需求,还能在高并发场景下保持优异的性能。

未来,随着云原生技术的发展和微服务架构的进一步普及,Sentinel 也将在以下几个方面持续演进:

  • 更强的智能化:通过引入机器学习算法,实现更智能的流量预测和动态调整。
  • 更广泛的生态集成:与更多的云原生工具和平台深度集成,提供更无缝的体验。
  • 更精细化的控制:提供更细粒度的控制选项,满足不同场景下的需求。
  • 更完善的监控体系:提供更丰富的监控指标和更直观的可视化界面。

无论你是刚接触 Sentinel 的新手,还是已经熟练运用它的老手,理解其核心原理都将帮助你更好地利用这个强大的工具,构建更稳定、更可靠的分布式系统。


参考链接:


🙌 感谢你读到这里!
🔍 技术之路没有捷径,但每一次阅读、思考和实践,都在悄悄拉近你与目标的距离。
💡 如果本文对你有帮助,不妨 👍 点赞、📌 收藏、📤 分享 给更多需要的朋友!
💬 欢迎在评论区留下你的想法、疑问或建议,我会一一回复,我们一起交流、共同成长 🌿
🔔 关注我,不错过下一篇干货!我们下期再见!✨

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐