Sentinel 流控与熔断机制源码剖析
摘要:本文深入剖析了Sentinel的流控与熔断机制源码,重点解析其核心架构设计。Sentinel采用分层架构,基于SlotChain责任链模型实现流控功能,包含NodeSelectorSlot、StatisticSlot等8个核心槽位。其设计哲学强调资源为核心、槽位链可扩展、上下文传播一致性及滑动窗口指标统计。通过DefaultSlotChain实现责任链机制,采用双向处理流程(正向entry/
·
文章目录
🔥 Sentinel 流控与熔断机制源码剖析
→ SlotChain 执行链模型
📋 目录
- 🏗️ 一、Sentinel 整体架构与设计哲学
- ⛓️ 二、SlotChain 责任链机制深度解析
- 📊 三、流控规则与算法实现
- ⚡ 四、熔断降级策略源码剖析
- 🔄 五、动态规则管理与推送机制
- 🌐 六、配置中心集成实战
- 💡 七、生产环境最佳实践
🏗️ 一、Sentinel 整体架构与设计哲学
💡 Sentinel 核心架构图
Sentinel 分层架构设计:
🎯 核心设计理念
Sentinel 设计哲学:
/**
* Sentinel 核心设计理念
* 基于责任链模式的流量控制框架
*/
public class SentinelDesignPhilosophy {
/**
* 1. 资源为核心
* 任何需要流量控制的地方都可以定义为资源
*/
public interface Resource {
String getName();
EntryType getType();
int getResourceType();
}
/**
* 2. 槽位链设计
* 通过责任链模式实现可扩展的流量控制
*/
public interface SlotChain {
void entry(Context context, ResourceWrapper resourceWrapper,
Object obj, int count, boolean prioritized, Object... args) throws BlockException;
void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args);
}
/**
* 3. 上下文传播
* 保证调用链路的上下文一致性
*/
public class Context {
private String name;
private DefaultNode entranceNode;
private String origin = "";
private Entry curEntry;
}
/**
* 4. 指标统计
* 基于滑动窗口的实时指标收集
*/
public interface Metric {
long success();
long block();
long total();
double avgRt();
}
}
⛓️ 二、SlotChain 责任链机制深度解析
🔗 SlotChain 执行流程
SlotChain 核心执行机制:
/**
* SlotChain 责任链核心实现
* 采用责任链模式处理流量控制逻辑
*/
@Component
@Slf4j
public class DefaultSlotChain implements ProcessorSlotChain {
private AbstractLinkedProcessorSlot<?> first = new DefaultNodeSelectorSlot();
private AbstractLinkedProcessorSlot<?> end = first;
/**
* 入口处理 - 责任链正向执行
*/
@Override
public void entry(Context context, ResourceWrapper resourceWrapper,
Object obj, int count, boolean prioritized, Object... args) throws BlockException {
first.transformEntry(context, resourceWrapper, obj, count, prioritized, args);
}
/**
* 出口处理 - 责任链反向执行
*/
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
first.exit(context, resourceWrapper, count, args);
}
/**
* 添加槽位到链尾
*/
@Override
public void addFirst(AbstractLinkedProcessorSlot<?> protocolProcessor) {
protocolProcessor.setNext(first);
first = protocolProcessor;
}
@Override
public void addLast(AbstractLinkedProcessorSlot<?> protocolProcessor) {
end.setNext(protocolProcessor);
end = protocolProcessor;
}
/**
* 构建默认槽位链
*/
public static ProcessorSlotChain newSlotChain() {
ProcessorSlotChain chain = new DefaultSlotChain();
// 按顺序添加槽位
chain.addLast(new NodeSelectorSlot());
chain.addLast(new ClusterBuilderSlot());
chain.addLast(new LogSlot());
chain.addLast(new StatisticSlot());
chain.addLast(new AuthoritySlot());
chain.addLast(new SystemSlot());
chain.addLast(new FlowSlot());
chain.addLast(new DegradeSlot());
return chain;
}
}
/**
* 抽象槽位基类
* 定义槽位链的基本执行逻辑
*/
public abstract class AbstractLinkedProcessorSlot<T> implements ProcessorSlot<T> {
private AbstractLinkedProcessorSlot<?> next;
/**
* 入口处理
*/
@Override
public void entry(Context context, ResourceWrapper resourceWrapper,
T param, int count, boolean prioritized, Object... args) throws Throwable {
// 前置处理
if (entryLocal(context, resourceWrapper, param, count, prioritized, args)) {
// 执行下一个槽位
if (next != null) {
next.transformEntry(context, resourceWrapper, param, count, prioritized, args);
}
}
}
/**
* 出口处理
*/
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
// 后置处理
exitLocal(context, resourceWrapper, count, args);
// 执行下一个槽位
if (next != null) {
next.exit(context, resourceWrapper, count, args);
}
}
/**
* 具体的入口处理逻辑 - 由子类实现
*/
protected abstract boolean entryLocal(Context context, ResourceWrapper resourceWrapper,
T param, int count, boolean prioritized, Object... args) throws Throwable;
/**
* 具体的出口处理逻辑 - 由子类实现
*/
protected abstract void exitLocal(Context context, ResourceWrapper resourceWrapper,
int count, Object... args);
}
🎯 核心槽位深度解析
1. NodeSelectorSlot - 节点选择器
/**
* NodeSelectorSlot - 资源节点选择器
* 负责创建调用链路节点树
*/
@Component
@Slf4j
public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<Object> {
/**
* 节点缓存
*/
private volatile Map<String, DefaultNode> map = new HashMap<>();
@Override
protected boolean entryLocal(Context context, ResourceWrapper resourceWrapper,
Object param, int count, boolean prioritized, Object... args) throws Throwable {
// 1. 获取或创建当前资源的节点
DefaultNode node = map.get(context.getName());
if (node == null) {
synchronized (this) {
node = map.get(context.getName());
if (node == null) {
node = new DefaultNode(resourceWrapper, null);
Map<String, DefaultNode> newMap = new HashMap<>(map);
newMap.put(context.getName(), node);
map = newMap;
}
}
}
// 2. 设置当前入口节点
context.setCurEntry(new Entry(resourceWrapper, node, context));
// 3. 构建调用链节点关系
buildNodeTree(context, node);
return true;
}
/**
* 构建调用链节点树
*/
private void buildNodeTree(Context context, DefaultNode node) {
if (context.getCurEntry().getParent() != null) {
// 设置父子节点关系
((DefaultNode) context.getCurEntry().getParent().getCurNode())
.addChild(node);
}
}
@Override
protected void exitLocal(Context context, ResourceWrapper resourceWrapper,
int count, Object... args) {
// 清理上下文
if (context.getCurEntry() != null) {
context.getCurEntry().exit(count, args);
}
}
}
2. ClusterBuilderSlot - 集群构建器
/**
* ClusterBuilderSlot - 集群节点构建器
* 负责集群流量统计和限流
*/
@Component
@Slf4j
public class ClusterBuilderSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
private static volatile ClusterNode clusterNode = null;
private final ClusterNodeManager clusterNodeManager;
@Override
protected boolean entryLocal(Context context, ResourceWrapper resourceWrapper,
DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
// 1. 获取或创建集群节点
if (clusterNode == null) {
synchronized (this) {
if (clusterNode == null) {
clusterNode = clusterNodeManager.getClusterNode(resourceWrapper.getName());
}
}
}
// 2. 设置集群节点
node.setClusterNode(clusterNode);
// 3. 集群流量统计
if (!"".equals(context.getOrigin())) {
// 按调用来源统计
Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin());
context.getCurEntry().setOriginNode(originNode);
}
// 4. 应用集群流控规则
applyClusterFlowRule(context, resourceWrapper, clusterNode, count, args);
return true;
}
/**
* 应用集群流控规则
*/
private void applyClusterFlowRule(Context context, ResourceWrapper resourceWrapper,
ClusterNode clusterNode, int count, Object... args) throws BlockException {
// 获取集群流控规则
List<FlowRule> rules = FlowRuleManager.getRulesForResource(resourceWrapper.getName());
for (FlowRule rule : rules) {
if (!rule.passClusterCheck(clusterNode, count)) {
// 触发流控
throw new FlowException(rule.getLimitApp(), rule);
}
}
}
}
3. StatisticSlot - 统计槽位
/**
* StatisticSlot - 统计指标槽位
* 负责实时指标统计和滑动窗口管理
*/
@Component
@Slf4j
public class StatisticSlot extends AbstractLinkedProcessorSlot<Object> {
private final Metric metric;
private final int sampleCount;
private final int intervalInMs;
public StatisticSlot() {
this.sampleCount = 2; // 采样窗口数
this.intervalInMs = 1000; // 统计窗口间隔
this.metric = new ArrayMetric(sampleCount, intervalInMs);
}
@Override
protected boolean entryLocal(Context context, ResourceWrapper resourceWrapper,
Object param, int count, boolean prioritized, Object... args) throws Throwable {
long startTime = System.currentTimeMillis();
try {
// 1. 触发下一个槽位
boolean result = fireEntry(context, resourceWrapper, param, count, prioritized, args);
// 2. 成功请求统计
if (result) {
metric.addSuccess(count);
metric.addRt(System.currentTimeMillis() - startTime);
// 记录通过请求
log.debug("请求通过统计: resource={}, count={}",
resourceWrapper.getName(), count);
}
return result;
} catch (BlockException e) {
// 3. 被阻塞请求统计
metric.addBlock(count);
throw e;
} catch (Throwable e) {
// 4. 业务异常统计
metric.addException(count);
throw e;
}
}
/**
* 触发下一个槽位执行
*/
private boolean fireEntry(Context context, ResourceWrapper resourceWrapper,
Object param, int count, boolean prioritized, Object... args) throws Throwable {
// 实际的槽位链执行逻辑
return true;
}
@Override
protected void exitLocal(Context context, ResourceWrapper resourceWrapper,
int count, Object... args) {
// 出口统计
long completeTime = System.currentTimeMillis();
Entry entry = context.getCurEntry();
if (entry != null) {
long rt = completeTime - entry.getCreateTime();
metric.addRt(rt);
}
}
/**
* 数组指标统计实现
* 基于滑动窗口的实时统计
*/
private static class ArrayMetric implements Metric {
private final LeapArray<MetricBucket> data;
public ArrayMetric(int sampleCount, int intervalInMs) {
this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
}
@Override
public long success() {
data.currentWindow().value().addSuccess(1);
return data.values().stream()
.mapToLong(MetricBucket::success)
.sum();
}
@Override
public long total() {
return success() + block() + exception();
}
@Override
public double avgRt() {
long successCount = success();
if (successCount == 0) {
return 0;
}
return data.values().stream()
.mapToLong(MetricBucket::rt)
.sum() / (double) successCount;
}
}
}
📊 三、流控规则与算法实现
🎯 流控规则体系
流控规则类结构:
/**
* 流控规则体系
* 支持多种流控策略和算法
*/
public class FlowRuleManager {
/**
* 流控规则基类
*/
@Data
public static class FlowRule {
private String resource; // 资源名称
private double count; // 阈值
private int grade = GRADE_QPS; // 阈值类型
private int strategy = STRATEGY_DIRECT; // 流控策略
private String limitApp = "default"; // 流控应用
private int controlBehavior = CONTROL_BEHAVIOR_DEFAULT; // 控制效果
// 阈值类型常量
public static final int GRADE_QPS = 1; // QPS 流控
public static final int GRADE_THREAD = 0; // 线程数流控
// 流控策略常量
public static final int STRATEGY_DIRECT = 0; // 直接
public static final int STRATEGY_RELATE = 1; // 关联
public static final int STRATEGY_CHAIN = 2; // 链路
// 控制效果常量
public static final int CONTROL_BEHAVIOR_DEFAULT = 0; // 快速失败
public static final int CONTROL_BEHAVIOR_WARM_UP = 1; // 预热
public static final int CONTROL_BEHAVIOR_RATE_LIMITER = 2; // 排队等待
}
/**
* 流控检查器
*/
@Component
@Slf4j
public static class FlowRuleChecker {
/**
* 检查流控规则
*/
public void checkFlow(FlowRule rule, Context context, DefaultNode node, int count)
throws BlockException {
// 根据流控策略选择检查方法
switch (rule.getStrategy()) {
case FlowRule.STRATEGY_DIRECT:
checkDirectFlow(rule, node, count);
break;
case FlowRule.STRATEGY_RELATE:
checkRelateFlow(rule, context, count);
break;
case FlowRule.STRATEGY_CHAIN:
checkChainFlow(rule, context, count);
break;
default:
checkDirectFlow(rule, node, count);
}
}
/**
* 直接流控检查
*/
private void checkDirectFlow(FlowRule rule, DefaultNode node, int count)
throws BlockException {
// 根据阈值类型检查
if (rule.getGrade() == FlowRule.GRADE_QPS) {
// QPS 流控
if (!passQpsCheck(rule, node, count)) {
throw new FlowException(rule.getLimitApp(), rule);
}
} else {
// 线程数流控
if (!passThreadCheck(rule, node)) {
throw new FlowException(rule.getLimitApp(), rule);
}
}
}
/**
* QPS 流控检查
*/
private boolean passQpsCheck(FlowRule rule, DefaultNode node, int count) {
double qps = node.avgQps();
double threshold = rule.getCount();
// 应用控制效果
switch (rule.getControlBehavior()) {
case FlowRule.CONTROL_BEHAVIOR_WARM_UP:
// 预热模式
threshold = calculateWarmUpThreshold(rule, node);
break;
case FlowRule.CONTROL_BEHAVIOR_RATE_LIMITER:
// 速率限制器模式
return passRateLimiterCheck(rule, node, count);
default:
// 快速失败模式
break;
}
return qps + count <= threshold;
}
/**
* 预热算法实现
*/
private double calculateWarmUpThreshold(FlowRule rule, DefaultNode node) {
long uptime = System.currentTimeMillis() - node.getStartTime();
long warmupPeriod = rule.getWarmUpPeriodSec() * 1000;
if (uptime > warmupPeriod) {
return rule.getCount();
} else {
// 冷启动阶段,逐步提高阈值
double coldFactor = rule.getWarmUpColdFactor();
if (coldFactor <= 1) {
coldFactor = 3; // 默认冷启动因子
}
double threshold = rule.getCount() / coldFactor;
double slope = (rule.getCount() - threshold) / warmupPeriod;
return threshold + slope * uptime;
}
}
}
}
⚡ 滑动窗口算法
高性能滑动窗口实现:
/**
* 滑动窗口算法实现
* 基于 LeapArray 的高性能统计窗口
*/
@Component
@Slf4j
public class LeapArray<T> {
private final int windowLengthInMs; // 窗口长度
private final int sampleCount; // 采样窗口数
private final int intervalInMs; // 统计间隔
private final AtomicReferenceArray<WindowWrap<T>> array; // 窗口数组
/**
* 窗口包装类
*/
@Data
public static class WindowWrap<T> {
private final long windowLength; // 窗口长度
private long windowStart; // 窗口开始时间
private T value; // 窗口值
/**
* 重置窗口
*/
public WindowWrap<T> resetTo(long startTime) {
this.windowStart = startTime;
return this;
}
/**
* 检查窗口是否过期
*/
public boolean isTimeInWindow(long time) {
return windowStart <= time && time < windowStart + windowLength;
}
}
public LeapArray(int sampleCount, int intervalInMs) {
this.windowLengthInMs = intervalInMs / sampleCount;
this.sampleCount = sampleCount;
this.intervalInMs = intervalInMs;
this.array = new AtomicReferenceArray<>(sampleCount);
}
/**
* 获取当前时间窗口
*/
public WindowWrap<T> currentWindow() {
return currentWindow(System.currentTimeMillis());
}
/**
* 获取指定时间对应的窗口
*/
public WindowWrap<T> currentWindow(long time) {
if (time < 0) {
return null;
}
int idx = calculateTimeIdx(time);
long windowStart = calculateWindowStart(time);
while (true) {
WindowWrap<T> old = array.get(idx);
if (old == null) {
// 创建新窗口
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(time));
if (array.compareAndSet(idx, null, window)) {
return window;
} else {
Thread.yield();
}
} else if (windowStart == old.getWindowStart()) {
// 返回现有窗口
return old;
} else if (windowStart > old.getWindowStart()) {
// 重置过期窗口
if (updateLock.tryLock()) {
try {
return resetWindowTo(old, windowStart);
} finally {
updateLock.unlock();
}
} else {
Thread.yield();
}
} else if (windowStart < old.getWindowStart()) {
// 不应该发生的情况
return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(time));
}
}
}
/**
* 计算时间索引
*/
private int calculateTimeIdx(long time) {
long timeId = time / windowLengthInMs;
return (int) (timeId % array.length());
}
/**
* 计算窗口开始时间
*/
private long calculateWindowStart(long time) {
return time - time % windowLengthInMs;
}
/**
* 获取所有有效窗口
*/
public List<WindowWrap<T>> list() {
return list(System.currentTimeMillis());
}
public List<WindowWrap<T>> list(long validTime) {
int size = array.length();
List<WindowWrap<T>> result = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
WindowWrap<T> window = array.get(i);
if (window == null || isWindowDeprecated(validTime, window)) {
continue;
}
result.add(window);
}
return result;
}
/**
* 检查窗口是否过期
*/
private boolean isWindowDeprecated(long time, WindowWrap<T> window) {
return time - window.getWindowStart() > intervalInMs;
}
}
⚡ 四、熔断降级策略源码剖析
🔄 熔断器状态机
熔断器核心实现:
/**
* 熔断器状态机
* 基于响应时间和异常比例的熔断降级
*/
@Component
@Slf4j
public class CircuitBreaker {
/**
* 熔断器状态
*/
public enum Status {
CLOSED, // 关闭状态 - 正常通行
OPEN, // 打开状态 - 快速失败
HALF_OPEN // 半开状态 - 试探性放行
}
/**
* 熔断规则
*/
@Data
public static class DegradeRule {
private String resource; // 资源名称
private double count; // 阈值
private int grade = GRADE_RT; // 降级策略
private int timeWindow; // 时间窗口(秒)
private int minRequestAmount = 5; // 最小请求数
private double slowRatioThreshold = 1.0; // 慢调用比例阈值
private int statIntervalMs = 1000; // 统计间隔
// 降级策略常量
public static final int GRADE_RT = 0; // 响应时间
public static final int GRADE_EXCEPTION_RATIO = 1; // 异常比例
public static final int GRADE_EXCEPTION_COUNT = 2; // 异常数量
}
/**
* 熔断器实现
*/
@Component
@Slf4j
public static class DefaultCircuitBreaker {
private volatile Status status = Status.CLOSED;
private final DegradeRule rule;
private final Metric metric;
private long nextRetryTime = 0L;
public DefaultCircuitBreaker(DegradeRule rule) {
this.rule = rule;
this.metric = new ArrayMetric(2, 1000);
}
/**
* 请求是否允许通过
*/
public boolean canPass() {
// 检查熔断器状态
if (status == Status.OPEN) {
// 检查是否应该尝试半开
if (System.currentTimeMillis() > nextRetryTime) {
status = Status.HALF_OPEN;
log.info("熔断器进入半开状态: {}", rule.getResource());
return true;
}
return false;
}
if (status == Status.HALF_OPEN) {
// 半开状态,允许部分请求通过
return Math.random() < 0.5; // 50% 的通过率
}
return true; // 关闭状态,允许通过
}
/**
* 记录请求结果
*/
public void onRequestComplete(long rt, Throwable error) {
metric.addRt(rt);
metric.addTotal(1);
if (error != null) {
metric.addException(1);
}
// 根据当前状态处理
switch (status) {
case CLOSED:
handleClosedState();
break;
case HALF_OPEN:
handleHalfOpenState(rt, error);
break;
case OPEN:
// 打开状态不处理
break;
}
}
/**
* 处理关闭状态
*/
private void handleClosedState() {
// 检查是否应该打开熔断器
if (shouldOpen()) {
status = Status.OPEN;
nextRetryTime = System.currentTimeMillis() + rule.getTimeWindow() * 1000;
log.warn("熔断器打开: {}", rule.getResource());
}
}
/**
* 处理半开状态
*/
private void handleHalfOpenState(long rt, Throwable error) {
if (error == null && rt < rule.getCount()) {
// 试探请求成功
metric.reset();
status = Status.CLOSED;
log.info("熔断器关闭: {}", rule.getResource());
} else {
// 试探请求失败,重新打开
status = Status.OPEN;
nextRetryTime = System.currentTimeMillis() + rule.getTimeWindow() * 1000;
log.warn("熔断器重新打开: {}", rule.getResource());
}
}
/**
* 检查是否应该打开熔断器
*/
private boolean shouldOpen() {
// 检查最小请求数
if (metric.total() < rule.getMinRequestAmount()) {
return false;
}
// 根据降级策略检查
switch (rule.getGrade()) {
case DegradeRule.GRADE_RT:
return checkResponseTime();
case DegradeRule.GRADE_EXCEPTION_RATIO:
return checkExceptionRatio();
case DegradeRule.GRADE_EXCEPTION_COUNT:
return checkExceptionCount();
default:
return false;
}
}
/**
* 检查响应时间
*/
private boolean checkResponseTime() {
double slowCount = metric.total() - metric.success();
double ratio = slowCount / metric.total();
return ratio > rule.getSlowRatioThreshold() &&
metric.avgRt() > rule.getCount();
}
/**
* 检查异常比例
*/
private boolean checkExceptionRatio() {
if (metric.total() == 0) {
return false;
}
double ratio = (double) metric.exception() / metric.total();
return ratio > rule.getCount();
}
}
}
🔄 五、动态规则管理与推送机制
📡 规则管理器实现
动态规则管理:
/**
* 规则管理器
* 负责规则的加载、更新和推送
*/
@Component
@Slf4j
public class RuleManager {
private final Map<String, List<FlowRule>> flowRules = new ConcurrentHashMap<>();
private final Map<String, List<DegradeRule>> degradeRules = new ConcurrentHashMap<>();
private final RulePublisher rulePublisher;
/**
* 加载流控规则
*/
public void loadRules(List<FlowRule> rules) {
if (rules == null) {
return;
}
// 按资源分组
Map<String, List<FlowRule>> newRuleMap = new ConcurrentHashMap<>();
for (FlowRule rule : rules) {
String resource = rule.getResource();
newRuleMap.computeIfAbsent(resource, k -> new CopyOnWriteArrayList<>())
.add(rule);
}
// 原子性更新规则
flowRules.clear();
flowRules.putAll(newRuleMap);
log.info("流控规则加载完成: {} 条规则", rules.size());
}
/**
* 获取资源的流控规则
*/
public List<FlowRule> getRules(String resource) {
return flowRules.getOrDefault(resource, Collections.emptyList());
}
/**
* 添加规则监听器
*/
public void addFlowRuleListener(PropertyListener<List<FlowRule>> listener) {
// 添加规则变更监听
}
}
/**
* 规则发布器
* 负责规则的动态推送和更新
*/
@Component
@Slf4j
public class RulePublisher {
private final List<RuleListener> listeners = new CopyOnWriteArrayList<>();
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
/**
* 规则监听器接口
*/
public interface RuleListener {
void onRuleChanged(String ruleType, List<?> rules);
}
/**
* 发布规则变更
*/
public void publishRules(String ruleType, List<?> rules) {
for (RuleListener listener : listeners) {
try {
listener.onRuleChanged(ruleType, rules);
log.debug("规则变更通知成功: {}", ruleType);
} catch (Exception e) {
log.error("规则变更通知失败", e);
}
}
}
/**
* 添加规则监听器
*/
public void addListener(RuleListener listener) {
listeners.add(listener);
}
/**
* 启动规则推送任务
*/
public void startPublishTask() {
scheduler.scheduleAtFixedRate(() -> {
try {
// 定期推送规则到客户端
pushRulesToClients();
} catch (Exception e) {
log.error("规则推送任务执行失败", e);
}
}, 0, 30, TimeUnit.SECONDS); // 30秒推送一次
}
}
🌐 六、配置中心集成实战
🔄 Nacos 集成配置
Sentinel 与 Nacos 集成:
/**
* Sentinel 与 Nacos 集成配置
* 实现规则的动态推送和持久化
*/
@Configuration
@Slf4j
public class NacosIntegrationConfig {
/**
* Nacos 数据源配置
*/
@Bean
@ConditionalOnProperty(name = "spring.cloud.sentinel.datasource.nacos.enabled", havingValue = "true")
public DataSource nacosDataSource() {
NacosDataSource<List<FlowRule>> flowRuleDataSource = new NacosDataSource<>(
properties.getNacos().getServerAddr(),
properties.getNacos().getGroupId(),
properties.getNacos().getDataId(),
new Converter<String, List<FlowRule>>() {
@Override
public List<FlowRule> convert(String source) {
return JSON.parseArray(source, FlowRule.class);
}
}
);
flowRuleDataSource.getProperty().addListener(new PropertyListener<List<FlowRule>>() {
@Override
public void configUpdate(List<FlowRule> value) {
// 规则更新回调
log.info("从Nacos接收到规则更新: {} 条规则", value.size());
FlowRuleManager.loadRules(value);
}
@Override
public void configLoad(List<FlowRule> value) {
// 规则加载回调
log.info("从Nacos加载规则: {} 条规则", value.size());
FlowRuleManager.loadRules(value);
}
});
return flowRuleDataSource;
}
/**
* Apollo 数据源配置
*/
@Bean
@ConditionalOnProperty(name = "spring.cloud.sentinel.datasource.apollo.enabled", havingValue = "true")
public DataSource apolloDataSource() {
ApolloDataSource<List<DegradeRule>> degradeRuleDataSource = new ApolloDataSource<>(
properties.getApollo().getNamespace(),
properties.getApollo().getFlowRulesKey(),
"",
new Converter<String, List<DegradeRule>>() {
@Override
public List<DegradeRule> convert(String source) {
return JSON.parseArray(source, DegradeRule.class);
}
}
);
return degradeRuleDataSource;
}
}
/**
* 动态数据源管理器
*/
@Component
@Slf4j
public class DynamicDataSourceManager {
private final Map<String, DataSource> dataSourceMap = new ConcurrentHashMap<>();
/**
* 注册数据源
*/
public void registerDataSource(String dataSourceName, DataSource dataSource) {
dataSourceMap.put(dataSourceName, dataSource);
log.info("数据源注册成功: {}", dataSourceName);
}
/**
* 从所有数据源加载规则
*/
public void loadRulesFromDataSources() {
for (Map.Entry<String, DataSource> entry : dataSourceMap.entrySet()) {
try {
String ruleType = entry.getKey();
DataSource dataSource = entry.getValue();
// 加载规则
Object rules = dataSource.loadRules();
if (rules != null) {
publishRules(ruleType, rules);
}
} catch (Exception e) {
log.error("从数据源加载规则失败: {}", entry.getKey(), e);
}
}
}
}
💡 七、生产环境最佳实践
🔧 高可用配置
生产环境 Sentinel 配置:
# application-prod.yml
spring:
cloud:
sentinel:
# 基础配置
eager: true
transport:
dashboard: ${SENTINEL_DASHBOARD:localhost}:8080
port: 8719
client-ip: ${spring.cloud.client.ip-address}
# 过滤器配置
filter:
enabled: true
url-patterns: /**
order: -2147483648
# 数据源配置
datasource:
# 流控规则数据源
flow:
nacos:
server-addr: ${NACOS_SERVER:localhost}:8848
dataId: ${spring.application.name}-flow-rules
groupId: SENTINEL_GROUP
rule-type: flow
data-type: json
# 降级规则数据源
degrade:
nacos:
server-addr: ${NACOS_SERVER:localhost}:8848
dataId: ${spring.application.name}-degrade-rules
groupId: SENTINEL_GROUP
rule-type: degrade
data-type: json
# 系统规则数据源
system:
nacos:
server-addr: ${NACOS_SERVER:localhost}:8848
dataId: ${spring.application.name}-system-rules
groupId: SENTINEL_GROUP
rule-type: system
data-type: json
# 流控配置
flow:
cold-factor: 3 # 冷启动因子
max-queueing-time-ms: 500 # 最大排队时间
# 日志配置
log:
dir: /logs/sentinel
switch-pid: true
max-file-size: 104857600 # 100MB
max-file-count: 10
# 监控配置
management:
endpoints:
web:
exposure:
include: health,info,metrics,sentinel
endpoint:
sentinel:
enabled: true
metrics:
export:
sentinel:
enabled: true
step: 1m
# 自定义配置
sentinel:
# 统计配置
metric:
file-single-size: 52428800 # 50MB
file-total-count: 6 # 最多6个文件
# 系统规则默认值
system:
max-thread: 1000
max-qps: 5000
avg-rt: 1000
max-load: 8.0
🚀 性能优化建议
高性能配置优化:
/**
* Sentinel 性能优化配置
*/
@Configuration
@Slf4j
public class SentinelPerformanceConfig {
/**
* 自定义资源注解切面
* 优化注解方式的性能开销
*/
@Aspect
@Component
@Slf4j
public class SentinelResourceAspect {
@Around("@annotation(sentinelResource)")
public Object aroundAdvice(ProceedingJoinPoint pjp,
SentinelResource sentinelResource) throws Throwable {
String resourceName = getResourceName(pjp, sentinelResource);
Entry entry = null;
try {
// 入口检查
entry = SphU.entry(resourceName,
sentinelResource.entryType(),
sentinelResource.resourceType(),
pjp.getArgs());
// 执行原方法
Object result = pjp.proceed();
// 记录成功
if (entry != null) {
entry.exit(1, pjp.getArgs());
}
return result;
} catch (BlockException e) {
// 流控异常处理
return handleBlockException(pjp, sentinelResource, e);
} catch (Throwable e) {
// 业务异常处理
if (entry != null) {
Tracer.trace(e);
}
throw e;
}
}
}
/**
* Ͳ自定义上下文管理器
* 优化上下文传播性能
*/
@Component
@Slf4j
public class CustomContextManager {
private static final ThreadLocal<Context> contextThreadLocal = new ThreadLocal<>();
/**
* 入口上下文设置
*/
public static void enter(String name, String origin) {
if (contextThreadLocal.get() == null) {
Context context = new Context(name, origin);
contextThreadLocal.set(context);
}
}
/**
* 出口上下文清理
*/
public static void exit() {
Context context = contextThreadLocal.get();
if (context != null) {
contextThreadLocal.remove();
}
}
}
/**
* 异步支持配置
*/
@Configuration
@Slf4j
public class AsyncSupportConfig {
@Bean
public AsyncTaskExecutor sentinelAsyncTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(20);
executor.setMaxPoolSize(100);
executor.setQueueCapacity(1000);
executor.setThreadNamePrefix("sentinel-async-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(60);
return executor;
}
/**
* 异步调用支持
*/
@Async("sentinelAsyncTaskExecutor")
public <T> CompletableFuture<T> executeWithSentinel(Supplier<T> supplier, String resourceName) {
Entry entry = null;
try {
entry = SphU.entry(resourceName);
T result = supplier.get();
return CompletableFuture.completedFuture(result);
} catch (BlockException e) {
return CompletableFuture.failedFuture(e);
} finally {
if (entry != null) {
entry.exit();
}
}
}
}
}
洞察:Sentinel 的核心价值在于其灵活的 SlotChain 责任链设计和高效的滑动窗口统计机制。深入理解其内部执行流程和算法实现,是构建高可用、高性能流量控制系统的关键。
如果觉得本文对你有帮助,请点击 👍 点赞 + ⭐ 收藏 + 💬 留言支持!
讨论话题:
- 你在生产环境中如何配置 Sentinel 的流控规则?
- 面对突发流量,如何设计合理的熔断降级策略?
- 在微服务架构中,如何实现全局流量控制?
相关资源推荐:
- 📚 https://github.com/alibaba/Sentinel/wiki
- 🔧 https://github.com/example/sentinel-deep-dive
- 💻 https://github.com/alibaba/Sentinel/wiki/Dashboard
。
更多推荐

所有评论(0)