Architecture: Circuit Breaking

1. Definition

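The Circuit Breaker Pattern is a design pattern that keeps a failure in one service from triggering cascading failures across a distributed system. It takes its name from the circuit breaker in electrical circuits, which trips automatically when the circuit is overloaded and so protects the rest of the circuit.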

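In software architecture, a circuit breaker monitors calls to a downstream service. When the failure rate reaches a configured threshold, the breaker "opens": subsequent requests fail fast and never reach the downstream service. This protects system resources and stops the failure from spreading.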

2. Core Principles

2.1 The Three States of a Circuit Breaker

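Figure: circuit breaker state diagram (Closed, Open, HalfOpen)
  • Closed → Open: the failure rate exceeds the threshold
  • Open → HalfOpen: the wait duration (cool-down time) expires
  • HalfOpen → Closed: the probe requests succeed (the required success count is reached)
  • HalfOpen → Open: a probe request fails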

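2.1.1 Closed State
  • Description: the initial state and the normal operating state
  • Behavior: all requests pass through; the breaker counts successful and failed requests
  • Transition: when the failure rate exceeds the configured threshold, the breaker switches to Open
2.1.2 Open State
  • Description: the breaker has tripped and blocks requests
  • Behavior: every request immediately returns a failure or a fallback response without calling the downstream service
  • Transition: after the configured wait duration (cool-down time), the breaker switches to Half-Open
2.1.3 Half-Open State
  • Description: a probing state used to check whether the downstream service has recovered
  • Behavior: a small number of requests are let through to probe whether the service is healthy again
  • Transition:
    • If the probe requests succeed: switch to Closed
    • If a probe request fails: switch back to Open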
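The transitions above can be condensed into a single pure function. The sketch below is illustrative only: the `Event` names are hypothetical labels for the conditions described in 2.1.1 to 2.1.3, not part of any library, and the counting that produces those events is left to the implementations in section 3.

```java
// Hypothetical names: State and Event model the transitions described in 2.1.
enum State { CLOSED, OPEN, HALF_OPEN }

enum Event {
    FAILURE_RATE_EXCEEDED,   // CLOSED: failure rate crossed the threshold
    WAIT_DURATION_ELAPSED,   // OPEN: cool-down time is over
    PROBE_SUCCEEDED,         // HALF_OPEN: enough probe calls succeeded
    PROBE_FAILED             // HALF_OPEN: a probe call failed
}

final class Transitions {
    private Transitions() {}

    /** Returns the next state; events that do not apply to the current state leave it unchanged. */
    static State next(State current, Event event) {
        switch (current) {
            case CLOSED:
                return event == Event.FAILURE_RATE_EXCEEDED ? State.OPEN : State.CLOSED;
            case OPEN:
                return event == Event.WAIT_DURATION_ELAPSED ? State.HALF_OPEN : State.OPEN;
            case HALF_OPEN:
                if (event == Event.PROBE_SUCCEEDED) return State.CLOSED;
                if (event == Event.PROBE_FAILED) return State.OPEN;
                return State.HALF_OPEN;
            default:
                throw new IllegalStateException("unknown state: " + current);
        }
    }
}
```

Keeping the transition rules in one place like this makes them easy to unit-test separately from the statistics that decide when each event fires.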

2.2 Key Parameters

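Parameter | Description | Typical value
Failure-rate threshold | Proportion of failed calls that trips the breaker | 50%-80%
Sliding window size | Number of recent calls used to compute the failure rate | 10-100 calls
Minimum number of calls | Calls required before the failure rate is evaluated | 5-20 calls
Wait duration (cool-down) | Time spent in Open before switching to Half-Open | 10-60 s
Half-open permitted calls | Probe calls allowed in Half-Open | 3-10 calls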
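The parameters in this table are usually grouped into one immutable settings object. A minimal sketch follows; the record name `BreakerSettings`, its validation rules and the `shouldTrip` helper are assumptions for illustration, not the API of any particular library.

```java
import java.time.Duration;

// Hypothetical settings holder for the five parameters in table 2.2.
record BreakerSettings(
        double failureRateThreshold,      // e.g. 0.5 for 50%
        int slidingWindowSize,            // number of recent calls considered
        int minimumNumberOfCalls,         // calls needed before the rate is evaluated
        Duration waitDurationInOpenState, // cool-down before switching to Half-Open
        int permittedCallsInHalfOpen) {   // probe calls allowed in Half-Open

    BreakerSettings {
        if (failureRateThreshold <= 0 || failureRateThreshold > 1)
            throw new IllegalArgumentException("failureRateThreshold must be in (0, 1]");
        if (slidingWindowSize < 1 || permittedCallsInHalfOpen < 1)
            throw new IllegalArgumentException("window size and probe count must be positive");
        if (minimumNumberOfCalls < 1 || minimumNumberOfCalls > slidingWindowSize)
            throw new IllegalArgumentException("minimumNumberOfCalls must be in [1, slidingWindowSize]");
        if (waitDurationInOpenState.isNegative() || waitDurationInOpenState.isZero())
            throw new IllegalArgumentException("waitDurationInOpenState must be positive");
    }

    /** True when enough calls were seen and the observed failure rate reaches the threshold. */
    boolean shouldTrip(int calls, int failures) {
        return calls >= minimumNumberOfCalls
                && (double) failures / calls >= failureRateThreshold;
    }
}
```

The minimum-call check matters: without it, a single failed call out of one would already be a 100% failure rate.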

3. Implementation Patterns

3.1 Counter-Based Implementation

public class CircuitBreaker {
    private enum State { CLOSED, OPEN, HALF_OPEN }
    
    private State state = State.CLOSED;
    private int failureCount = 0;                // consecutive failures while CLOSED
    private int totalRequests = 0;               // calls recorded since the breaker last closed
    private int successCount = 0;                // successful probes while HALF_OPEN
    private final int threshold = 5;             // consecutive failures that trip the breaker
    private final int minRequests = 10;          // minimum recorded calls before tripping
    private final long waitDuration = 60000;     // cool-down time in OPEN state (ms)
    private long openedAt = 0;                   // when the breaker last opened
    private final int halfOpenMaxCalls = 3;      // probe calls permitted in HALF_OPEN state
    private int halfOpenCallCount = 0;           // probe calls already admitted
    
    public synchronized void recordSuccess() {
        if (state == State.HALF_OPEN) {
            successCount++;
            if (successCount >= halfOpenMaxCalls) {
                // every probe succeeded: treat the downstream service as healthy again
                reset();
                state = State.CLOSED;
            }
        } else if (state == State.CLOSED) {
            totalRequests++;
            failureCount = 0;   // a success ends the run of consecutive failures
        }
    }
    
    public synchronized void recordFailure() {
        if (state == State.HALF_OPEN) {
            // a failed probe sends the breaker straight back to OPEN
            trip();
        } else if (state == State.CLOSED) {
            totalRequests++;
            failureCount++;
            if (failureCount >= threshold && totalRequests >= minRequests) {
                trip();
            }
        }
        // results of in-flight calls that arrive while OPEN are ignored
    }
    
    public synchronized boolean allowRequest() {
        if (state == State.CLOSED) {
            return true;
        }
        
        if (state == State.OPEN) {
            if (System.currentTimeMillis() - openedAt < waitDuration) {
                return false;   // still cooling down: fail fast
            }
            // cool-down elapsed: switch to HALF_OPEN and start probing
            state = State.HALF_OPEN;
            halfOpenCallCount = 0;
            successCount = 0;
        }
        
        // HALF_OPEN: count probes when they are admitted, not when they finish,
        // so concurrent callers cannot exceed halfOpenMaxCalls
        if (halfOpenCallCount < halfOpenMaxCalls) {
            halfOpenCallCount++;
            return true;
        }
        return false;
    }
    
    private void trip() {
        reset();
        state = State.OPEN;
        openedAt = System.currentTimeMillis();
    }
    
    private void reset() {
        failureCount = 0;
        totalRequests = 0;
        successCount = 0;
        halfOpenCallCount = 0;
    }
}

3.2 Sliding-Window Implementation

import java.util.ArrayDeque;
import java.util.Deque;

public class SlidingWindowCircuitBreaker {
    private enum State { CLOSED, OPEN, HALF_OPEN }
    
    private State state = State.CLOSED;
    private final Deque<Boolean> window = new ArrayDeque<>(); // true = success
    private int failuresInWindow = 0;             // failures currently inside the window
    private final int windowSize = 100;           // sliding window size (count-based)
    private final int minimumNumberOfCalls = 10;  // calls needed before the rate is evaluated
    private final double failureThreshold = 0.5;  // failure-rate threshold: 50%
    private final long waitDuration = 30000;      // cool-down time (ms)
    private long openedAt = 0;                    // when the breaker last opened
    private final int halfOpenMaxCalls = 5;       // probe calls permitted in HALF_OPEN
    private int halfOpenPermitted = 0;            // probes admitted so far
    private int halfOpenSucceeded = 0;            // probes that succeeded
    
    public synchronized void recordResult(boolean success) {
        if (state == State.CLOSED) {
            window.addLast(success);
            if (!success) failuresInWindow++;
            if (window.size() > windowSize) {
                // evict the oldest outcome so only the last windowSize calls count
                if (!window.removeFirst()) failuresInWindow--;
            }
            if (window.size() >= minimumNumberOfCalls
                    && getFailureRate() >= failureThreshold) {
                trip();
            }
        } else if (state == State.HALF_OPEN) {
            if (!success) {
                trip();                       // any failed probe reopens the breaker
            } else if (++halfOpenSucceeded >= halfOpenMaxCalls) {
                reset();                      // every probe succeeded: close again
                state = State.CLOSED;
            }
        }
        // results that arrive while OPEN are ignored
    }
    
    public synchronized boolean allowRequest() {
        if (state == State.CLOSED) {
            return true;
        }
        
        if (state == State.OPEN) {
            if (System.currentTimeMillis() - openedAt < waitDuration) {
                return false;                 // still cooling down: fail fast
            }
            state = State.HALF_OPEN;
            halfOpenPermitted = 0;
            halfOpenSucceeded = 0;
        }
        
        // HALF_OPEN: reserve a probe slot when the call is admitted
        if (halfOpenPermitted < halfOpenMaxCalls) {
            halfOpenPermitted++;
            return true;
        }
        return false;
    }
    
    private double getFailureRate() {
        return window.isEmpty() ? 0 : (double) failuresInWindow / window.size();
    }
    
    private void trip() {
        state = State.OPEN;
        openedAt = System.currentTimeMillis();
        halfOpenPermitted = 0;
        halfOpenSucceeded = 0;
    }
    
    private void reset() {
        window.clear();
        failuresInWindow = 0;
        halfOpenPermitted = 0;
        halfOpenSucceeded = 0;
    }
}
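The window above is count-based: it keeps the outcomes of the last 100 calls. A common alternative is a time-based window that aggregates outcomes per second into a ring of buckets, so memory stays constant no matter how much traffic arrives. The sketch below illustrates that variant under stated assumptions: `TimeBasedWindow` and its methods are hypothetical names, and the caller passes the current time explicitly so the logic can be tested deterministically.

```java
import java.util.Arrays;

// Hypothetical time-based sliding window: one bucket per second, windowSeconds buckets in a ring.
final class TimeBasedWindow {
    private final int windowSeconds;
    private final long[] bucketSecond;  // which second each bucket currently holds
    private final int[] calls;
    private final int[] failures;

    TimeBasedWindow(int windowSeconds) {
        this.windowSeconds = windowSeconds;
        this.bucketSecond = new long[windowSeconds];
        this.calls = new int[windowSeconds];
        this.failures = new int[windowSeconds];
        Arrays.fill(bucketSecond, Long.MIN_VALUE);   // marks buckets that were never used
    }

    /** Records one call outcome at the given time (milliseconds, non-negative). */
    synchronized void record(long nowMillis, boolean success) {
        long second = nowMillis / 1000;
        int i = (int) (second % windowSeconds);
        if (bucketSecond[i] != second) {   // bucket still holds an older second: recycle it
            bucketSecond[i] = second;
            calls[i] = 0;
            failures[i] = 0;
        }
        calls[i]++;
        if (!success) failures[i]++;
    }

    /** Failure rate over the buckets that fall inside the last windowSeconds seconds. */
    synchronized double failureRate(long nowMillis) {
        long second = nowMillis / 1000;
        int total = 0, failed = 0;
        for (int i = 0; i < windowSeconds; i++) {
            // skip empty buckets first so the unused-bucket sentinel is never subtracted
            if (calls[i] > 0 && second - bucketSecond[i] < windowSeconds) {
                total += calls[i];
                failed += failures[i];
            }
        }
        return total == 0 ? 0.0 : (double) failed / total;
    }
}
```

Expired buckets are recycled lazily on the next write, so no background cleanup thread is needed.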

3.3 Using Resilience4j

Resilience4j is a widely used fault-tolerance library in the Java ecosystem and provides a full-featured circuit breaker.

import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry;
import java.time.Duration;

public class Resilience4jExample {
    
    public CircuitBreaker createCircuitBreaker() {
        CircuitBreakerConfig config = CircuitBreakerConfig.custom()
            .failureRateThreshold(50)                         // failure-rate threshold: 50%
            .waitDurationInOpenState(Duration.ofSeconds(30))  // cool-down time: 30 s
            .slidingWindowSize(100)                           // sliding window size (calls)
            .minimumNumberOfCalls(10)                         // minimum calls before the rate is evaluated
            .permittedNumberOfCallsInHalfOpenState(5)         // probe calls allowed in Half-Open
            .slowCallRateThreshold(50)                        // slow-call rate threshold: 50%
            .slowCallDurationThreshold(Duration.ofSeconds(2)) // calls slower than 2 s count as slow
            .build();
        
        CircuitBreakerRegistry registry = CircuitBreakerRegistry.of(config);
        return registry.circuitBreaker("userService");
    }
    
    public String callWithCircuitBreaker(CircuitBreaker circuitBreaker) {
        // Throws CallNotPermittedException while the breaker is rejecting calls
        return CircuitBreaker.decorateSupplier(circuitBreaker, () -> {
            // The actual remote service call
            return remoteServiceCall();
        }).get();
    }
    
    private String remoteServiceCall() {
        // Simulated remote call
        return "Service Response";
    }
}

3.4 Using Hystrix (no longer maintained; for reference only)

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandProperties;

public class HystrixExample {
    
    // Static so it can be created without an outer instance:
    // new HystrixExample.RemoteServiceCommand(url).execute()
    public static class RemoteServiceCommand extends HystrixCommand<String> {
        private final String serviceUrl;
        
        public RemoteServiceCommand(String serviceUrl) {
            super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("RemoteServiceGroup"))
                .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
                    .withCircuitBreakerEnabled(true)
                    .withCircuitBreakerRequestVolumeThreshold(10)      // minimum requests in the rolling window
                    .withCircuitBreakerSleepWindowInMilliseconds(30000) // cool-down (wait) time
                    .withCircuitBreakerErrorThresholdPercentage(50)    // failure-rate threshold (%)
                    .withExecutionTimeoutEnabled(true)
                    .withExecutionTimeoutInMilliseconds(5000)));      // execution timeout
            
            this.serviceUrl = serviceUrl;
        }
        
        @Override
        protected String run() throws Exception {
            // The actual remote service call
            return callRemoteService(serviceUrl);
        }
        
        @Override
        protected String getFallback() {
            // Fallback logic
            return "Fallback Response";
        }
        
        private String callRemoteService(String url) {
            // Implement the remote call here
            return "Remote Response";
        }
    }
}

4. Use Cases

4.1 Typical Scenarios

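Scenario | Description
Microservice calls | Service A calls service B; keeps a failure in B from taking A down
Third-party API integration | Keeps an outage of an external API from affecting your own system
Database access | Fails fast when the database is down, so the connection pool is not exhausted
Cache services | Degrades gracefully when a cache such as Redis fails
Message queues | Applies a fallback strategy when the message queue fails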

4.2 Combining Circuit Breaking with Other Patterns

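Figure: request flow with a circuit breaker. Client request → API gateway → circuit breaker. If the call passes, it goes through the load balancer to service instances 1, 2 and 3; if the breaker is open, it is routed to the fallback service. The diagram also contains a rate limiter and the business logic.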

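4.2.1 Circuit Breaking + Rate Limiting
  • Rate limiting: keeps the system from being overwhelmed by excess requests
  • Circuit breaking: keeps a downstream failure from cascading
  • Together: rate-limit first, then apply the circuit breaker, for two layers of protection
4.2.2 Circuit Breaking + Fallback
  • When the breaker trips: run the fallback logic
  • Fallback options: return cached data, a default value, or a friendly message
  • Degradation levels: partial degradation, full degradation, automatic recovery
4.2.3 Circuit Breaking + Timeouts
  • Timeout: set a sensible timeout threshold
  • Timeout handling: a timeout counts as a failure in the breaker's statistics
  • Fail fast: avoid blocking on long waits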
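The timeout and fallback combinations in 4.2.2 and 4.2.3 can be seen together in a small sketch: each call is bounded by a timeout, a timeout counts as a failure, and once the breaker is open the fallback is returned without touching the downstream service. It relies only on `CompletableFuture.orTimeout` (Java 9+); the `GuardedCall` class, its consecutive-failure rule and the omission of a Half-Open state are simplifying assumptions of this sketch.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical composition of timeout + circuit breaker + fallback.
final class GuardedCall<T> {
    private final long timeoutMillis;
    private final int maxConsecutiveFailures;
    private int consecutiveFailures = 0;
    private boolean open = false;   // simplified: this sketch never recovers via Half-Open

    GuardedCall(long timeoutMillis, int maxConsecutiveFailures) {
        this.timeoutMillis = timeoutMillis;
        this.maxConsecutiveFailures = maxConsecutiveFailures;
    }

    synchronized boolean isOpen() { return open; }

    T call(Supplier<T> remote, Supplier<T> fallback) {
        if (isOpen()) {
            return fallback.get();              // fail fast: the downstream is not called
        }
        try {
            T result = CompletableFuture.supplyAsync(remote)
                    .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
                    .join();                    // a timeout surfaces as CompletionException
            onSuccess();
            return result;
        } catch (RuntimeException e) {          // timeouts and remote errors both count as failures
            onFailure();
            return fallback.get();
        }
    }

    private synchronized void onSuccess() { consecutiveFailures = 0; }

    private synchronized void onFailure() {
        if (++consecutiveFailures >= maxConsecutiveFailures) open = true;
    }
}
```

Note that `orTimeout` only stops waiting; the slow task itself keeps running on the pool, which is why the timeout should sit well below the caller's own deadline.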

5. Best Practices

5.1 Configuration Principles

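Principle | Description
Sensible thresholds | Set the failure-rate threshold to suit the workload, so the breaker is neither oversensitive nor sluggish
Wait duration | Base it on how long the service takes to recover: too short causes frequent flapping between states, too long delays recovery
Sliding window | Size it according to request volume so the statistics are meaningful
Tiered configuration | Give services of different criticality different breaker settings
Dynamic adjustment | Allow settings to be changed at runtime without a restart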
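For dynamic adjustment, one common approach is to keep the settings in an atomically swappable reference that every decision reads, so a configuration push (from a config center, an admin endpoint, and so on) takes effect on the next call without a restart. A minimal sketch; `Thresholds` and `DynamicThresholds` are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical immutable snapshot of the tunable values.
record Thresholds(double failureRate, int minimumCalls) {}

final class DynamicThresholds {
    private final AtomicReference<Thresholds> current;

    DynamicThresholds(Thresholds initial) {
        this.current = new AtomicReference<>(initial);
    }

    /** Called by the config source; readers see either the old or the new snapshot, never a mix. */
    void update(Thresholds next) {
        current.set(next);
    }

    /** The decision uses whichever snapshot is current at call time. */
    boolean shouldTrip(int calls, int failures) {
        Thresholds t = current.get();
        return calls >= t.minimumCalls() && (double) failures / calls >= t.failureRate();
    }
}
```

Swapping a whole immutable snapshot, rather than mutating individual fields, keeps related values (such as threshold and minimum calls) consistent with each other.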

5.2 Monitoring and Alerting

import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.circuitbreaker.event.*;

public class CircuitBreakerMonitoring {
    
    public void setupMonitoring(CircuitBreaker circuitBreaker) {
        // Listen for state transitions
        circuitBreaker.getEventPublisher()
            .onStateTransition(event -> {
                System.out.println("Circuit breaker state changed: " + event);
                // Send an alert notification
                sendAlert("Circuit Breaker State Changed: " + 
                         event.getStateTransition());
            });
        
        // Listen for failed calls
        circuitBreaker.getEventPublisher()
            .onError(event -> {
                System.out.println("Call failed: " + event);
                // Record the failure
                logFailure(event);
            });
        
        // Listen for calls rejected because the breaker is not admitting requests
        circuitBreaker.getEventPublisher()
            .onCallNotPermitted(event -> {
                System.out.println("Request rejected: " + event);
                // Record the rejection
                logRejection(event);
            });
        
        // Listen for successful calls
        circuitBreaker.getEventPublisher()
            .onSuccess(event -> {
                System.out.println("Call succeeded: " + event);
            });
    }
    
    private void sendAlert(String message) {
        // Implement alert delivery here
    }
    
    private void logFailure(CircuitBreakerOnErrorEvent event) {
        // Implement failure logging here
    }
    
    private void logRejection(CircuitBreakerOnCallNotPermittedEvent event) {
        // Implement rejection logging here
    }
}

5.3 Key Monitoring Metrics

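Metric | Description
Breaker state | Current state (Closed/Open/Half-Open)
Failure rate | Share of failed calls within the sliding window
Rejected calls | Number of calls rejected while the breaker is open
State transitions | How often the breaker changes state
Average response time | Mean response time of successful calls
Success rate | Overall call success rate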
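Most of these metrics are simple counters plus ratios derived from them. The sketch below keeps the counters in `LongAdder`s, which stay cheap under contention, and computes the ratios on read. `BreakerMetrics` and its methods are hypothetical; the windowed failure rate is left to the breaker itself, and in practice these values would be exported to a metrics system rather than read directly.

```java
import java.util.concurrent.atomic.LongAdder;

// Hypothetical in-process holder for the metrics listed in 5.3.
final class BreakerMetrics {
    private final LongAdder successes = new LongAdder();
    private final LongAdder failures = new LongAdder();
    private final LongAdder rejected = new LongAdder();      // calls refused while Open
    private final LongAdder transitions = new LongAdder();   // state changes
    private final LongAdder successLatencyMillis = new LongAdder();

    void onSuccess(long latencyMillis) {
        successes.increment();
        successLatencyMillis.add(latencyMillis);
    }
    void onFailure() { failures.increment(); }
    void onRejected() { rejected.increment(); }
    void onStateTransition() { transitions.increment(); }

    long rejectedCount() { return rejected.sum(); }
    long transitionCount() { return transitions.sum(); }

    /** Successful calls divided by calls that actually reached the downstream service. */
    double successRate() {
        long s = successes.sum(), total = s + failures.sum();
        return total == 0 ? 1.0 : (double) s / total;
    }

    /** Average latency of successful calls, in milliseconds. */
    double averageSuccessLatencyMillis() {
        long s = successes.sum();
        return s == 0 ? 0.0 : (double) successLatencyMillis.sum() / s;
    }
}
```

Whether rejected calls belong in the success-rate denominator is a design choice; this sketch excludes them so the rate reflects the health of the downstream service rather than the breaker's own decisions.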

5.4 Designing Fallback Strategies

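import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

public class FallbackStrategy {
    
    // Hypothetical collaborators: substitute your own cache and service clients.
    interface CacheService { String get(String key, Supplier<String> defaultValue); }
    interface RemoteService { String call(String request); }
    
    // Hypothetical response type used for partial degradation.
    public static class PartialResponse {
        private String mainData;
        private String secondaryData;
        public String getMainData() { return mainData; }
        public void setMainData(String mainData) { this.mainData = mainData; }
        public String getSecondaryData() { return secondaryData; }
        public void setSecondaryData(String secondaryData) { this.secondaryData = secondaryData; }
    }
    
    private final CacheService cacheService;
    private final RemoteService backupService;
    private final RemoteService retryService;
    
    public FallbackStrategy(CacheService cacheService, RemoteService backupService,
                            RemoteService retryService) {
        this.cacheService = cacheService;
        this.backupService = backupService;
        this.retryService = retryService;
    }
    
    // Strategy 1: return cached data
    public String getCachedResponse(String key) {
        return cacheService.get(key, () -> "Default Cached Response");
    }
    
    // Strategy 2: return a default value
    public String getDefaultValue() {
        return "Service Temporarily Unavailable";
    }
    
    // Strategy 3: call a backup service
    public String callBackupService(String request) {
        try {
            return backupService.call(request);
        } catch (Exception e) {
            return getDefaultValue();
        }
    }
    
    // Strategy 4: retry asynchronously, falling back to the default value if that fails too
    public CompletableFuture<String> asyncRetry(String request) {
        return CompletableFuture.supplyAsync(() -> retryService.call(request))
            .exceptionally(e -> getDefaultValue());
    }
    
    // Strategy 5: partial degradation
    public PartialResponse getPartialResponse(String request) {
        PartialResponse response = new PartialResponse();
        response.setMainData(getCachedResponse("main"));
        // Non-core data is returned as null or a default value
        response.setSecondaryData(null);
        return response;
    }
}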
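These strategies are often chained: try the backup service, then the cache, then a static default. A self-contained sketch of such a chain follows; `FallbackChain` is a hypothetical helper, and treating `null` as "no value" is an assumption of this sketch.

```java
import java.util.List;
import java.util.function.Supplier;

// Hypothetical helper: tries each fallback source in order and returns the first usable value.
final class FallbackChain {
    private FallbackChain() {}

    static <T> T firstAvailable(List<Supplier<T>> candidates, T defaultValue) {
        for (Supplier<T> candidate : candidates) {
            try {
                T value = candidate.get();
                if (value != null) {
                    return value;      // the first source that produced a value wins
                }
            } catch (RuntimeException e) {
                // this source is unavailable too; move on to the next one
            }
        }
        return defaultValue;           // last resort, e.g. "Service Temporarily Unavailable"
    }
}
```

Ordering the list from most to least accurate source (backup service, then cache, then default) keeps the response as fresh as the current failure allows.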

5.5 Caveats

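  1. Don't overuse circuit breakers

    • Not every service needs a circuit breaker
    • Configure breakers first for services on the critical path
    • Avoid over-engineering that adds system complexity
  2. Set timeouts sensibly

    • The call timeout should be shorter than the breaker's wait duration
    • Base it on the P95 or P99 response time
    • Account for network latency and service processing time
  3. Test breaker behavior

    • Run fault-injection tests
    • Verify the tripping and recovery logic
    • Check that the fallback strategies actually work
  4. Logging and tracing

    • Log breaker state changes
    • Correlate them with distributed trace IDs
    • This makes troubleshooting and analysis easier
  5. Graceful degradation

    • Keep core functionality available while degraded
    • Show users a friendly message
    • Make sure the degraded path itself cannot make the system unavailable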
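Caveat 2 suggests deriving the timeout from P95/P99 latency. A minimal sketch using the nearest-rank percentile method; the class name, the 1.5x headroom factor and the sample values in the usage below are assumptions for illustration, not recommendations.

```java
import java.util.Arrays;

// Hypothetical helper: derives a call timeout from observed latencies.
final class TimeoutAdvisor {
    private TimeoutAdvisor() {}

    /** Nearest-rank percentile: the smallest sample with at least p% of samples at or below it. */
    static long percentile(long[] latenciesMillis, double p) {
        if (latenciesMillis.length == 0) throw new IllegalArgumentException("no samples");
        long[] sorted = latenciesMillis.clone();
        Arrays.sort(sorted);
        int rank = (int) Math.ceil(p * sorted.length / 100.0);   // 1-based rank
        return sorted[Math.max(rank, 1) - 1];
    }

    /** Timeout = P99 plus headroom for network jitter (1.5x is an assumed factor). */
    static long suggestTimeoutMillis(long[] latenciesMillis) {
        return Math.round(percentile(latenciesMillis, 99) * 1.5);
    }
}
```

With 100 samples of 1 ms to 100 ms, P99 is 99 ms and the suggested timeout is 149 ms.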

6. Code Examples: Complete Implementations

6.1 Spring Boot Integration Example

import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry;
import io.github.resilience4j.timelimiter.TimeLimiter;
import io.github.resilience4j.timelimiter.TimeLimiterConfig;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;
import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

@Service
public class UserService {
    
    private final CircuitBreaker circuitBreaker;
    private final TimeLimiter timeLimiter;
    private final UserApiClient userApiClient;
    private final AlertService alertService;
    private final MetricsService metricsService;
    
    public UserService(UserApiClient userApiClient,
                       AlertService alertService,
                       MetricsService metricsService) {
        this.userApiClient = userApiClient;
        this.alertService = alertService;
        this.metricsService = metricsService;
        
        // Circuit breaker configuration
        CircuitBreakerConfig circuitBreakerConfig = CircuitBreakerConfig.custom()
            .failureRateThreshold(50)
            .waitDurationInOpenState(Duration.ofSeconds(30))
            .slidingWindowSize(100)
            .minimumNumberOfCalls(10)
            .permittedNumberOfCallsInHalfOpenState(5)
            .slowCallRateThreshold(50)
            .slowCallDurationThreshold(Duration.ofSeconds(2))
            .build();
        
        // Time limiter configuration: calls slower than 3 s are cut off
        TimeLimiterConfig timeLimiterConfig = TimeLimiterConfig.custom()
            .timeoutDuration(Duration.ofSeconds(3))
            .build();
        
        CircuitBreakerRegistry registry = CircuitBreakerRegistry.of(circuitBreakerConfig);
        this.circuitBreaker = registry.circuitBreaker("userService");
        this.timeLimiter = TimeLimiter.of(timeLimiterConfig);
        
        // Register monitoring listeners
        setupMonitoring();
    }
    
    public User getUserById(String userId) {
        // Run the remote call asynchronously so the time limiter can bound it
        Supplier<CompletableFuture<User>> futureSupplier = () -> 
            CompletableFuture.supplyAsync(() -> userApiClient.getUserById(userId));
        
        // Waits at most timeoutDuration, otherwise throws TimeoutException
        Callable<User> timeLimited = 
            TimeLimiter.decorateFutureSupplier(timeLimiter, futureSupplier);
        
        // The circuit breaker records timeouts and other exceptions as failures
        Callable<User> guarded = 
            CircuitBreaker.decorateCallable(circuitBreaker, timeLimited);
        
        try {
            return guarded.call();
        } catch (Exception e) {
            // CallNotPermittedException (breaker open), TimeoutException or a remote error
            return getFallbackUser(userId);
        }
    }
    
    private User getFallbackUser(String userId) {
        // Read from a cache or return a default user
        User fallbackUser = new User();
        fallbackUser.setId(userId);
        fallbackUser.setName("Unknown User (Service Unavailable)");
        return fallbackUser;
    }
    
    private void setupMonitoring() {
        circuitBreaker.getEventPublisher()
            .onStateTransition(event -> {
                System.out.println("Circuit breaker state changed: " + 
                    event.getStateTransition());
                // Send an alert
                alertService.sendAlert(
                    "Circuit Breaker " + circuitBreaker.getName() + 
                    " is now " + event.getStateTransition().getToState());
            })
            .onCallNotPermitted(event -> {
                System.out.println("Request rejected by the circuit breaker");
                metricsService.increment("circuit.breaker.rejected");
            });
    }
}

// Hypothetical alerting and metrics abstractions; provide your own beans
interface AlertService { void sendAlert(String message); }
interface MetricsService { void increment(String metricName); }

// User API client
@Service
class UserApiClient {
    private final RestTemplate restTemplate;   // assumes a RestTemplate bean is defined elsewhere
    
    UserApiClient(RestTemplate restTemplate) {
        this.restTemplate = restTemplate;
    }
    
    public User getUserById(String userId) {
        // The actual API call; {id} is expanded (and encoded) by RestTemplate
        return restTemplate.getForObject(
            "http://user-service/api/users/{id}", 
            User.class, userId);
    }
}

// User entity
class User {
    private String id;
    private String name;
    // getters and setters
    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
}

6.2 Go Implementation Example

package circuitbreaker

import (
	"errors"
	"sync"
	"time"
)

// State represents the state of the circuit breaker
type State int

const (
	StateClosed State = iota
	StateOpen
	StateHalfOpen
)

// ErrOpen is returned by Execute when the breaker rejects a call
var ErrOpen = errors.New("circuit breaker is open")

// Config holds the configuration for the circuit breaker
type Config struct {
	MaxFailures      int           // consecutive failures that trip the breaker
	WaitDuration     time.Duration // how long to stay open before probing
	HalfOpenMaxCalls int           // probe calls allowed in the half-open state
	Timeout          time.Duration // per-call timeout; not enforced here, apply it inside fn
}

// CircuitBreaker implements the circuit breaker pattern
type CircuitBreaker struct {
	mu                sync.RWMutex
	state             State
	failures          int       // consecutive failures while closed
	successCount      int       // successful probes while half-open
	halfOpenCallCount int       // probes admitted while half-open
	openedAt          time.Time // when the breaker last tripped
	config            Config
}

// New creates a new circuit breaker
func New(config Config) *CircuitBreaker {
	return &CircuitBreaker{
		state:  StateClosed,
		config: config,
	}
}

// Execute runs the given function with circuit breaker protection
func (cb *CircuitBreaker) Execute(fn func() error) error {
	if !cb.allowRequest() {
		return ErrOpen
	}

	err := fn()
	cb.recordResult(err)
	return err
}

// allowRequest determines if a request should be allowed
func (cb *CircuitBreaker) allowRequest() bool {
	cb.mu.Lock()
	defer cb.mu.Unlock()

	switch cb.state {
	case StateClosed:
		return true
	case StateOpen:
		if time.Since(cb.openedAt) < cb.config.WaitDuration {
			return false // still cooling down: fail fast
		}
		cb.state = StateHalfOpen
		cb.halfOpenCallCount = 0
		cb.successCount = 0
		fallthrough // admit this call as the first probe
	case StateHalfOpen:
		// count probes when admitted so concurrent callers cannot exceed the limit
		if cb.halfOpenCallCount < cb.config.HalfOpenMaxCalls {
			cb.halfOpenCallCount++
			return true
		}
		return false
	}
	return false
}

// recordResult records the result of a request
func (cb *CircuitBreaker) recordResult(err error) {
	cb.mu.Lock()
	defer cb.mu.Unlock()

	if err == nil {
		cb.recordSuccess()
	} else {
		cb.recordFailure()
	}
}

// recordSuccess records a successful request; cb.mu must be held
func (cb *CircuitBreaker) recordSuccess() {
	switch cb.state {
	case StateClosed:
		cb.failures = 0 // a success ends the run of consecutive failures
	case StateHalfOpen:
		cb.successCount++
		if cb.successCount >= cb.config.HalfOpenMaxCalls {
			cb.reset()
			cb.state = StateClosed
		}
	}
}

// recordFailure records a failed request; cb.mu must be held
func (cb *CircuitBreaker) recordFailure() {
	switch cb.state {
	case StateHalfOpen:
		cb.trip() // a failed probe reopens the breaker
	case StateClosed:
		cb.failures++
		if cb.failures >= cb.config.MaxFailures {
			cb.trip()
		}
	}
	// failures reported while open (in-flight calls) are ignored
}

// trip moves the breaker to the open state and starts the wait period
func (cb *CircuitBreaker) trip() {
	cb.reset()
	cb.state = StateOpen
	cb.openedAt = time.Now()
}

// reset clears all counters
func (cb *CircuitBreaker) reset() {
	cb.failures = 0
	cb.successCount = 0
	cb.halfOpenCallCount = 0
}

// GetState returns the current state of the circuit breaker
func (cb *CircuitBreaker) GetState() State {
	cb.mu.RLock()
	defer cb.mu.RUnlock()
	return cb.state
}

// ExampleUsage shows a protected call with a fallback
func ExampleUsage() error {
	config := Config{
		MaxFailures:      5,
		WaitDuration:     30 * time.Second,
		HalfOpenMaxCalls: 3,
		Timeout:          3 * time.Second,
	}

	cb := New(config)

	// Execute the protected call
	err := cb.Execute(func() error {
		// Call the remote service
		return callRemoteService()
	})

	if err != nil {
		// Handle the error by running the fallback logic
		return handleFallback()
	}
	return nil
}

func callRemoteService() error {
	// The actual remote service call
	return nil
}

func handleFallback() error {
	// Fallback logic
	return nil
}

7. Summary

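The circuit breaker pattern is an essential fault-tolerance mechanism in distributed systems: by failing fast and recovering automatically, it effectively prevents cascading failures. Implementing it correctly requires attention to the following points: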

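  1. Sensible configuration: choose thresholds and parameters that fit the business and the system load
  2. Thorough monitoring: build comprehensive monitoring so breaker events are detected and acted on promptly
  3. Graceful degradation: design fallback strategies that keep core functions available
  4. Thorough testing: use fault-injection tests to verify that the breaker works
  5. Continuous tuning: refine the breaker configuration based on production data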

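Combined with rate limiting, fallback and timeout control, circuit breaking helps build highly available, reliable distributed system architectures.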
