Building Unbreakable AI Systems: A Hands-On Guide to End-to-End Fault Injection Testing

Introduction: Why Do AI Systems Need Fault-Tolerance Testing More Than Ever?

In today's digital era, AI systems have moved from the lab into production, carrying critical business functions such as recommendation, risk control, and intelligent customer service. Unlike traditional software, AI systems face a double dose of complexity: the architectural complexity of distributed systems plus the uncertainty peculiar to machine learning. A single failed model inference or a broken data pipeline can trigger a chain reaction that hurts user experience or causes real business losses.

This article takes a hands-on view of fault-tolerance testing methodology for AI systems, with concrete code examples and architecture walkthroughs, to help you build genuinely reliable intelligent systems.

1. Core Goals of Fault-Tolerance Testing: Defining Resilience Standards for AI Systems

1.1 Three Core Verification Dimensions

The core goals of fault-tolerance testing break down into three dimensions (reconstructed from the original diagram):

  • Availability verification: service degradation capability, graceful feature fallback, preserved user experience
  • Consistency guarantees: data integrity, state consistency, transaction atomicity
  • Recovery testing: automatic fault detection, fast service recovery, data rebuilding

All three dimensions are exercised against three classes of scenarios: hardware failures, software exceptions, and data problems.

Availability is the lifeline of an AI system. For a recommendation system, 99.9% availability allows only about 8.76 hours of downtime per year. But chasing a high availability number alone is not enough; we need to distinguish:

  • Fully available: every feature works and recommendations are at full quality
  • Degraded: core recommendations still work, but with reduced personalization
  • Minimally available: generic recommendations are returned and pages render without errors
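The 8.76-hour figure above is just the downtime budget implied by the availability target; a quick sanity-check computation:

```python
def downtime_hours_per_year(availability: float) -> float:
    """Hours of allowed downtime per (non-leap) year for a given availability."""
    return (1.0 - availability) * 365 * 24

if __name__ == "__main__":
    for target in (0.999, 0.9999, 0.99999):
        # 0.999 -> ~8.76 h/year, matching the figure in the text
        print(f"{target} -> {downtime_hours_per_year(target):.2f} h/year")
```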

Consistency matters even more in AI systems. Consider this scenario: a user calls the same recommendation service twice in a row; the first call returns items A, B, and C, but the second, because of a cache failure, returns D, E, and F. This inconsistency seriously hurts user experience and conversion.
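One way to keep repeated requests consistent even when a cache is lost is to derive results deterministically from the request itself. A minimal sketch (the `CATALOG` and `recommend` names are illustrative, not part of any real service):

```python
import hashlib
import random

CATALOG = [f"item_{i}" for i in range(100)]  # hypothetical product catalog

def recommend(user_id: str, k: int = 3) -> list:
    # Seed the RNG from the user id, so repeated calls - even after a cache
    # wipe - return the same items instead of an arbitrary reshuffle.
    seed = int(hashlib.sha256(user_id.encode()).hexdigest(), 16)
    rng = random.Random(seed)
    return rng.sample(CATALOG, k)

# Repeated calls for the same user agree.
assert recommend("alice") == recommend("alice")
```

Real systems usually combine this with result caching; seeding just guarantees that a cache miss reproduces the cached answer.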

Recovery capability covers three key metrics:

  1. MTTD (mean time to detect): how quickly the system notices a fault
  2. MTTR (mean time to repair): how quickly the system recovers automatically
  3. Recovery integrity: whether data is complete and state is consistent after recovery
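The first two metrics are simple averages over incident records. A minimal sketch, assuming each incident logs three timestamps (the `Incident` record is illustrative):

```python
from dataclasses import dataclass

@dataclass
class Incident:
    # all values are timestamps in seconds (hypothetical incident records)
    fault_start: float
    detected_at: float
    recovered_at: float

def mttd(incidents):
    """Mean time to detect: average delay between fault onset and detection."""
    return sum(i.detected_at - i.fault_start for i in incidents) / len(incidents)

def mttr(incidents):
    """Mean time to repair: average delay between detection and recovery."""
    return sum(i.recovered_at - i.detected_at for i in incidents) / len(incidents)
```

Recovery integrity, by contrast, is verified with post-recovery checks (checksums, row counts, state reconciliation) rather than a single timing metric.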

1.2 Fault-Tolerance Challenges Unique to AI Systems

Unlike traditional web services, AI systems face some distinctive challenges:

  1. Non-deterministic inference: the same input may produce different outputs (especially with probabilistic models)
  2. Complex data dependencies: multiple layers of feature engineering and data preprocessing
  3. Bursty resource consumption: GPU memory exhaustion, fluctuating inference latency
  4. Third-party service coupling: model serving, feature stores, vector databases, and other components

2. Hardware Failure Testing: From Compute Resources to Infrastructure

2.1 Simulating and Handling GPU/CPU Failures

GPU failures are a pain point unique to AI systems. The following Python code shows how to simulate and handle GPU out-of-memory (OOM) conditions:

import torch
from contextlib import contextmanager
import time

class GPUStressTest:
    """GPU stress testing and fault-tolerance verification"""
    
    def __init__(self, reserve_mb=200):
        """
        Initialize the GPU tester.
        :param reserve_mb: GPU memory to leave free (MB) so the system keeps running
        """
        self.reserve_bytes = reserve_mb * 1024 * 1024
        
    @contextmanager
    def simulate_gpu_oom(self):
        """
        Context manager that simulates GPU memory exhaustion.
        Usage: with test.simulate_gpu_oom():
                   model.predict(data)
        """
        if torch.cuda.is_available():
            # Query total and currently allocated GPU memory
            total_memory = torch.cuda.get_device_properties(0).total_memory
            used_memory = torch.cuda.memory_allocated(0)
            
            # Compute how much memory we can grab
            available_memory = total_memory - used_memory - self.reserve_bytes
            
            if available_memory > 0:
                # Deliberately allocate memory to provoke an OOM
                blocks = []
                try:
                    block_size = min(available_memory, 100 * 1024 * 1024)  # allocate 100MB at a time
                    allocated = 0
                    
                    while allocated < available_memory:
                        try:
                            block = torch.empty(
                                block_size // 4,  # float32 is 4 bytes
                                dtype=torch.float32,
                                device='cuda'
                            )
                            blocks.append(block)
                            allocated += block_size
                        except RuntimeError as e:
                            if "out of memory" in str(e):
                                print(f"OOM simulated; {allocated/(1024*1024):.1f}MB allocated")
                                break
                            raise  # any other runtime error is unexpected
                    yield
                finally:
                    # Release the memory we grabbed
                    del blocks
                    torch.cuda.empty_cache()
            else:
                yield
        else:
            print("No GPU detected; skipping OOM test")
            yield
    
    def test_model_oom_resilience(self, model, input_data):
        """
        Test the model's degradation strategy under OOM.
        """
        print("Starting OOM resilience test...")
        
        try:
            with self.simulate_gpu_oom():
                # Attempt inference while memory is exhausted
                result = model.predict(input_data)
                print("Warning: inference succeeded under OOM; consider tightening test parameters")
                return result
        except RuntimeError as e:
            if "out of memory" in str(e):
                print("OOM detected; triggering degradation strategy")
                # Fall back to the degraded path
                return self._fallback_prediction(input_data)
            raise
    
    def _fallback_prediction(self, input_data):
        """
        OOM fallback strategy: use a lightweight model or return a default result.
        """
        print("Running fallback prediction")
        # Options, in order of preference:
        # 1. Retry inference on CPU
        # 2. Return a cached, generic result
        # 3. Use a simplified model
        return {"prediction": "default", "reason": "gpu_oom_fallback"}

class CPUStressTest:
    """CPU resource-exhaustion testing"""
    
    def simulate_cpu_stress(self, duration=30, procs_per_core=2):
        """
        Simulate 100% CPU utilization.
        Note: nested functions with multiprocessing require the 'fork'
        start method (the default on Linux).
        """
        import multiprocessing
        import math
        
        def cpu_burn():
            """CPU-bound busy loop"""
            while True:
                # Trial-division primality checks, just to burn CPU
                for i in range(2, 10000):
                    for j in range(2, int(math.sqrt(i)) + 1):
                        if i % j == 0:
                            break
        
        # Spawn several processes to saturate the CPU
        num_cores = multiprocessing.cpu_count()
        processes = []
        
        print(f"Starting CPU stress test with {num_cores * procs_per_core} processes")
        
        for _ in range(num_cores * procs_per_core):
            p = multiprocessing.Process(target=cpu_burn)
            p.start()
            processes.append(p)
        
        # Hold the load for the requested duration
        time.sleep(duration)
        
        # Tear the processes down
        for p in processes:
            p.terminate()
            p.join()
        
        print("CPU stress test finished")

# Usage example
def test_hardware_resilience():
    """Entry point for hardware fault-tolerance tests"""
    
    # 1. GPU OOM test
    gpu_tester = GPUStressTest(reserve_mb=500)
    
    # Toy stand-in for an AI model
    class AIModel:
        def predict(self, data):
            # Simulated inference
            if torch.cuda.is_available():
                # Try to allocate GPU memory
                tensor = torch.randn(10000, 10000, device='cuda')
                return tensor.mean().item()
            return 0.5
    
    model = AIModel()
    gpu_tester.test_model_oom_resilience(model, "test_data")
    
    # 2. CPU stress test
    cpu_tester = CPUStressTest()
    # Use with caution: cpu_tester.simulate_cpu_stress(duration=10)

Key finding: in GPU OOM tests, the default behavior of most deep-learning frameworks (PyTorch, TensorFlow) is to raise an exception outright, which can bring down the whole service. A sound fault-tolerance strategy should include:

  1. Layered degradation: GPU → CPU → simplified model → cached results
  2. Resource prechecks: verify available GPU memory before inference
  3. Batch adjustment: shrink the batch size dynamically
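Strategy 3 can be sketched framework-independently: halve the batch size whenever the backend reports OOM. `run_batch` below is a stand-in for the real inference call (e.g. a PyTorch forward pass), not an actual API:

```python
def infer_with_adaptive_batch(items, run_batch, batch_size=64, min_batch=1):
    """Run inference over `items`, halving the batch size whenever the
    backend raises an out-of-memory error.

    `run_batch` should raise RuntimeError("out of memory") on OOM,
    mirroring PyTorch's behaviour.
    """
    results, i = [], 0
    while i < len(items):
        batch = items[i:i + batch_size]
        try:
            results.extend(run_batch(batch))
            i += len(batch)  # advance only on success
        except RuntimeError as e:
            if "out of memory" not in str(e) or batch_size <= min_batch:
                raise  # not an OOM, or nothing left to shrink
            batch_size = max(min_batch, batch_size // 2)  # retry smaller
    return results
```

In production this would be combined with `torch.cuda.empty_cache()` between retries and a floor below which the request is handed to the fallback path.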

2.2 Storage Failure Testing in Practice

Storage failures are often the most destructive. The following Java code simulates a full disk:

import java.io.*;
import java.nio.file.*;
import java.util.concurrent.atomic.AtomicBoolean;

public class StorageFailureTest {
    
    /**
     * Simulate a disk-full condition.
     */
    public static void simulateDiskFull(String targetPath, long reserveMB) 
            throws IOException, InterruptedException {
        
        File targetDir = new File(targetPath);
        if (!targetDir.exists()) {
            targetDir.mkdirs();
        }
        
        // Query disk-space figures
        FileStore store = Files.getFileStore(targetDir.toPath());
        long totalSpace = store.getTotalSpace();
        long usableSpace = store.getUsableSpace();
        long threshold = reserveMB * 1024 * 1024;
        
        System.out.printf("Disk: total=%.2fGB, usable=%.2fGB, reserved=%.2fGB%n",
                totalSpace / (1024.0 * 1024 * 1024),
                usableSpace / (1024.0 * 1024 * 1024),
                threshold / (1024.0 * 1024 * 1024));
        
        // If usable space exceeds the reserve, fill the disk
        if (usableSpace > threshold) {
            long bytesToFill = usableSpace - threshold;
            System.out.printf("Filling %.2fGB of data%n", 
                    bytesToFill / (1024.0 * 1024 * 1024));
            
            AtomicBoolean stopFlag = new AtomicBoolean(false);
            
            // Monitor thread: stop once usable space nears the threshold
            Thread monitorThread = new Thread(() -> {
                try {
                    while (!stopFlag.get()) {
                        long currentUsable = Files.getFileStore(
                            targetDir.toPath()).getUsableSpace();
                        if (currentUsable <= threshold * 1.1) { // 10% buffer
                            stopFlag.set(true);
                            System.out.println("Disk space near threshold; stopping fill");
                            break;
                        }
                        Thread.sleep(1000);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
            
            monitorThread.start();
            
            // Fill thread
            Thread fillThread = new Thread(() -> {
                long filled = 0;
                int fileIndex = 0;
                
                while (!stopFlag.get() && filled < bytesToFill) {
                    String fileName = targetPath + "/fill_" + fileIndex + ".dat";
                    File file = new File(fileName);
                    
                    try (FileOutputStream fos = new FileOutputStream(file)) {
                        // Write 10MB per file
                        byte[] buffer = new byte[10 * 1024 * 1024];
                        fos.write(buffer);
                        filled += buffer.length;
                        fileIndex++;
                        
                        if (fileIndex % 10 == 0) {
                            System.out.printf("Filled %.2fGB/%.2fGB%n",
                                    filled / (1024.0 * 1024 * 1024),
                                    bytesToFill / (1024.0 * 1024 * 1024));
                        }
                    } catch (IOException e) {
                        String msg = e.getMessage();
                        if (msg != null && msg.contains("No space left")) {
                            System.out.println("Disk full; cannot write further");
                            stopFlag.set(true);
                            break;
                        }
                    }
                }
            });
            
            fillThread.start();
            fillThread.join();
            monitorThread.join();
            
            System.out.println("Disk fill complete; testing system behavior...");
        } else {
            System.out.println("Disk already low on space; testing directly");
        }
        
        // Exercise the system's behavior on a full disk
        testSystemBehaviorUnderDiskFull(targetPath);
    }
    
    /**
     * Test the system's fault-tolerant behavior when the disk is full.
     */
    private static void testSystemBehaviorUnderDiskFull(String testPath) {
        System.out.println("\n=== Disk-full fault-tolerance tests ===");
        
        // Test 1: log writes
        testLogging(testPath);
        
        // Test 2: model persistence
        testModelPersistence(testPath);
        
        // Test 3: temp-file creation
        testTempFileCreation(testPath);
    }
    
    private static void testLogging(String path) {
        System.out.println("Testing the logging subsystem...");
        try (FileWriter writer = new FileWriter(new File(path, "system.log"), true)) {
            writer.write("test log entry\n");
            writer.flush();
            System.out.println("✓ log write succeeded");
        } catch (IOException e) {
            System.out.println("✗ log write failed: " + e.getMessage());
            
            // Verify the fallback: does logging degrade to syslog or the console?
            System.err.println("fallback log: test entry - " + new java.util.Date());
        }
    }
    
    private static void testModelPersistence(String path) {
        System.out.println("Testing model persistence...");
        // Simulated model-save logic
        File modelFile = new File(path, "latest_model.pt");
        
        try {
            // Attempt to save the model
            byte[] mockModelData = new byte[1024 * 1024]; // 1MB of mock data
            Files.write(modelFile.toPath(), mockModelData, 
                       StandardOpenOption.CREATE,
                       StandardOpenOption.TRUNCATE_EXISTING);
            System.out.println("✓ model saved");
        } catch (IOException e) {
            System.out.println("✗ model save failed: " + e.getMessage());
            
            // Verify the fallback strategy
            if (e.getMessage() != null && e.getMessage().contains("No space")) {
                System.out.println("Model-save degradation triggered:");
                System.out.println("1. try purging old model caches");
                System.out.println("2. save a compressed model");
                System.out.println("3. save to an alternate location");
            }
        }
    }
    
    private static void testTempFileCreation(String path) {
        System.out.println("Testing temp-file creation...");
        try {
            File tmp = File.createTempFile("ai_test_", ".tmp", new File(path));
            tmp.delete();
            System.out.println("✓ temp file created");
        } catch (IOException e) {
            System.out.println("✗ temp-file creation failed: " + e.getMessage());
        }
    }
    
    /**
     * PVC mount-failure test.
     */
    public static void testPVCFailure() {
        System.out.println("\n=== PVC mount-failure tests ===");
        
        // Simulate K8s PVC mount failures
        String[] mountPoints = {"/data/model", "/data/logs", "/data/cache"};
        
        for (String mountPoint : mountPoints) {
            File mountDir = new File(mountPoint);
            
            if (!mountDir.exists()) {
                System.out.printf("PVC mount point %s is missing%n", mountPoint);
                
                // Exercise the system's reaction
                if (mountPoint.contains("model")) {
                    System.out.println("Model storage unavailable; degrading:");
                    System.out.println("1. use the built-in default model");
                    System.out.println("2. download the model from a backup URL");
                } else if (mountPoint.contains("logs")) {
                    System.out.println("Log storage unavailable:");
                    System.out.println("1. write to a temporary directory");
                    System.out.println("2. enlarge the log buffer");
                }
            } else {
                // Check write permission
                File testFile = new File(mountDir, "test_write.tmp");
                try {
                    testFile.createNewFile();
                    testFile.delete();
                    System.out.printf("✓ PVC mount point %s is healthy%n", mountPoint);
                } catch (IOException e) {
                    System.out.printf("✗ PVC mount point %s write failed: %s%n", 
                            mountPoint, e.getMessage());
                }
            }
        }
    }
    
    // Cleanup helper
    public static void cleanup(String path) throws IOException {
        File dir = new File(path);
        if (dir.exists() && dir.isDirectory()) {
            Files.walk(dir.toPath())
                 .sorted(java.util.Comparator.reverseOrder())
                 .map(Path::toFile)
                 .forEach(File::delete);
            System.out.println("Test data cleaned up");
        }
    }
}

2.3 Simulating Network Faults: Chaos Mesh in Practice

Network partitions, latency, and packet loss are everyday problems in microservice architectures. Chaos Mesh provides powerful network fault injection:

# network-chaos.yaml
apiVersion: chaos-mesh.org/v1alpha1
kind: NetworkChaos
metadata:
  name: ai-system-network-test
  namespace: ai-production
spec:
  action: partition  # network partition: severs traffic entirely
  mode: all
  selector:
    namespaces:
      - ai-production
    labelSelectors:
      "app": "recommendation-service"
  direction: both
  duration: "5m"  # lasts 5 minutes
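When driving experiments from code it is often handier to build the manifest as a plain dict and hand it to `kubectl` or the Kubernetes API. A minimal sketch of a partition manifest like the one above (`build_partition_chaos` is our own helper name):

```python
def build_partition_chaos(name: str, namespace: str, app_label: str,
                          duration: str = "5m") -> dict:
    """Build a Chaos Mesh NetworkChaos manifest as a dict; it can then be
    serialized to YAML or applied via the Kubernetes custom-objects API."""
    return {
        "apiVersion": "chaos-mesh.org/v1alpha1",
        "kind": "NetworkChaos",
        "metadata": {"name": name, "namespace": namespace},
        "spec": {
            "action": "partition",
            "mode": "all",
            "selector": {
                "namespaces": [namespace],
                "labelSelectors": {"app": app_label},
            },
            "direction": "both",
            "duration": duration,
        },
    }
```

Keeping manifest construction separate from the apply step also makes the experiment definition unit-testable without a cluster.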

A Python integration example:

import time
import requests
from chaosmesh.experiments.network import NetworkChaos
from chaosmesh.k8s.chaos import ChaosExperiment

class AINetworkResilienceTest:
    """Network fault-tolerance tests for AI systems"""
    
    def __init__(self, k8s_config=None):
        self.chaos_experiment = ChaosExperiment(k8s_config)
        
    def test_network_partition(self, service_name, duration_min=5):
        """
        Test service fault tolerance under a network partition.
        """
        print(f"Starting network-partition test against: {service_name}")
        
        # 1. Create the partition
        network_chaos = NetworkChaos(
            name=f"network-partition-{service_name}",
            namespace="ai-production",
            action="partition",
            selector={
                "labelSelectors": {
                    "app": service_name
                }
            },
            direction="both",
            duration=f"{duration_min}m"
        )
        
        # 2. Apply the fault
        experiment_id = self.chaos_experiment.create(network_chaos)
        print(f"Network partition created, experiment ID: {experiment_id}")
        
        # 3. Monitor system behavior
        start_time = time.time()
        failures = 0
        successes = 0
        
        while time.time() - start_time < duration_min * 60:
            try:
                # Probe the service
                response = requests.get(
                    f"http://{service_name}.ai-production.svc.cluster.local:8080/health",
                    timeout=5
                )
                
                if response.status_code == 200:
                    successes += 1
                    print(f"Health check succeeded: {response.json()}")
                else:
                    failures += 1
                    print(f"Service returned an error: {response.status_code}")
                    
            except requests.exceptions.ConnectionError:
                failures += 1
                print("Connection failed - partition is in effect")
            except requests.exceptions.Timeout:
                failures += 1
                print("Request timed out - as expected")
            
            time.sleep(10)  # probe every 10 seconds
        
        # 4. Remove the fault
        self.chaos_experiment.delete(experiment_id)
        
        # 5. Analyze the results
        total_attempts = successes + failures
        success_rate = (successes / total_attempts) * 100 if total_attempts > 0 else 0
        
        print(f"\nNetwork-partition test results:")
        print(f"Total attempts: {total_attempts}")
        print(f"Successes: {successes}")
        print(f"Failures: {failures}")
        print(f"Success rate: {success_rate:.2f}%")
        
        # 6. Verify recovery
        print("Verifying network recovery...")
        time.sleep(30)  # give the network time to heal
        
        recovery_success = False
        for i in range(10):  # up to 10 retries
            try:
                response = requests.get(
                    f"http://{service_name}.ai-production.svc.cluster.local:8080/health",
                    timeout=5
                )
                if response.status_code == 200:
                    recovery_success = True
                    print("✓ service recovered")
                    break
            except Exception:
                print(f"Recovery check {i+1}/10 failed")
                time.sleep(3)
        
        return {
            "experiment_id": experiment_id,
            "success_rate": success_rate,
            "recovered": recovery_success,
            "total_attempts": total_attempts
        }
    
    def test_network_latency(self, service_name, latency_ms=1000):
        """
        Test service behavior under injected network latency.
        """
        print(f"Injecting network latency: {latency_ms}ms")
        
        # Create the delay rule
        latency_chaos = NetworkChaos(
            name=f"network-latency-{service_name}",
            namespace="ai-production",
            action="delay",
            selector={
                "labelSelectors": {
                    "app": service_name
                }
            },
            delay={
                "latency": f"{latency_ms}ms",
                "correlation": "100",
                "jitter": "0ms"
            },
            direction="to",  # delay traffic in one direction only
            duration="10m"
        )
        
        experiment_id = self.chaos_experiment.create(latency_chaos)
        
        # Exercise the service's timeout policy
        import concurrent.futures
        
        def call_with_timeout(url, timeout):
            try:
                response = requests.get(url, timeout=timeout)
                return {"success": True, "status": response.status_code}
            except requests.exceptions.Timeout:
                return {"success": False, "reason": "timeout"}
            except Exception as e:
                return {"success": False, "reason": str(e)}
        
        # Try several timeout settings
        test_cases = [
            {"name": "timeout too short", "timeout": 0.5},  # 500ms < 1000ms delay
            {"name": "timeout moderate", "timeout": 2},     # 2000ms > 1000ms delay
            {"name": "timeout generous", "timeout": 3},     # 3000ms, allows for jitter
        ]
        
        results = []
        with concurrent.futures.ThreadPoolExecutor() as executor:
            future_to_case = {
                executor.submit(
                    call_with_timeout,
                    f"http://{service_name}.ai-production.svc.cluster.local:8080/predict",
                    case["timeout"]
                ): case for case in test_cases
            }
            
            for future in concurrent.futures.as_completed(future_to_case):
                case = future_to_case[future]
                result = future.result()
                results.append({
                    "case": case["name"],
                    "timeout": case["timeout"],
                    **result
                })
        
        # Clean up
        self.chaos_experiment.delete(experiment_id)
        
        # Print recommendations
        print("\nNetwork-latency test recommendations:")
        for result in results:
            if result["success"]:
                print(f"✓ {result['case']}: a {result['timeout']}s timeout is adequate")
            else:
                print(f"✗ {result['case']}: a {result['timeout']}s timeout is too tight; adjust it")
        
        return results
(Sequence diagram: Client → Load Balancer → Service Instance 1/2 → Database, under Chaos Mesh injection. Normal traffic flows first; then 1000ms latency and 50% packet loss are injected, producing delayed responses, timeouts, client retries, and fallback responses; finally the faults are removed and traffic returns to normal.)
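The client-side retry path exercised by the latency and loss injections can be reduced to a generic helper. A minimal sketch (`retry_with_backoff` and the `flaky` demo are our own names, not a library API):

```python
import time

def retry_with_backoff(call, attempts=3, base_delay=0.5, exceptions=(Exception,)):
    """Invoke `call`, retrying with exponential backoff on failure - the
    client-side pattern a latency/loss injection is meant to exercise."""
    for attempt in range(attempts):
        try:
            return call()
        except exceptions:
            if attempt == attempts - 1:
                raise  # budget exhausted; let the caller fall back
            time.sleep(base_delay * (2 ** attempt))

# Demo: a call that fails twice, then succeeds on the third attempt.
state = {"n": 0}
def flaky():
    state["n"] += 1
    if state["n"] < 3:
        raise TimeoutError("simulated timeout")
    return "ok"
```

In the scenario above, `call` would wrap the HTTP request, `exceptions` would name only the retryable failures (timeouts, connection errors), and the final re-raise is where the fallback response kicks in.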

3. Software Failure Testing: The Special Challenges of AI Systems

3.1 Simulating Dependency Failures

AI systems typically depend on several external services: feature stores, model registries, vector databases, and so on. The following Python code shows how to test dependency failures systematically:

import random
import time
from typing import Dict, Any
from dataclasses import dataclass
from enum import Enum

class ServiceStatus(Enum):
    """Service status values"""
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNAVAILABLE = "unavailable"
    TIMEOUT = "timeout"

@dataclass
class DependencyConfig:
    """Configuration for a dependency service"""
    name: str
    timeout_seconds: float
    retry_attempts: int
    retry_delay: float
    fallback_enabled: bool
    circuit_breaker_enabled: bool

class DependencyServiceMock:
    """Mock for dependency services"""
    
    def __init__(self):
        self.services = {}
        self.failure_scenarios = {}
        
    def register_service(self, name: str, config: DependencyConfig):
        """Register a dependency service"""
        self.services[name] = {
            'config': config,
            'status': ServiceStatus.HEALTHY,
            'failure_schedule': [],
            'call_count': 0,
            'failure_count': 0
        }
    
    def set_failure_scenario(self, service_name: str, scenario: Dict[str, Any]):
        """
        Configure a failure scenario. Expected shape:
        {
            'start_time': time.time(),
            'duration': 60,
            'failure_type': 'timeout' | 'error' | 'slow',
            'failure_rate': 0.5,  # probability of failing a call
            'error_code': 500,
            'timeout_seconds': 10
        }
        """
        self.failure_scenarios[service_name] = scenario
    
    def call_service(self, service_name: str, request_data: Dict) -> Dict:
        """
        Simulate a call to a dependency, honoring any active failure scenario.
        """
        if service_name not in self.services:
            raise ValueError(f"service not registered: {service_name}")
        
        service = self.services[service_name]
        service['call_count'] += 1
        
        # Are we inside a failure window?
        current_time = time.time()
        scenario = self.failure_scenarios.get(service_name)
        
        if scenario and scenario['start_time'] <= current_time <= scenario['start_time'] + scenario['duration']:
            # Inside the failure window
            if random.random() < scenario['failure_rate']:
                service['failure_count'] += 1
                service['status'] = ServiceStatus.UNAVAILABLE
                
                failure_type = scenario['failure_type']
                
                if failure_type == 'timeout':
                    time.sleep(scenario['timeout_seconds'])
                    raise TimeoutError(f"service {service_name} timed out")
                elif failure_type == 'error':
                    return {
                        'success': False,
                        'error_code': scenario['error_code'],
                        'message': f"service {service_name} internal error"
                    }
                elif failure_type == 'slow':
                    # Slow but successful: respond after a delay, marked degraded
                    delay = scenario.get('slow_delay', 5)
                    time.sleep(delay)
                    service['status'] = ServiceStatus.DEGRADED
                    return {
                        'success': True,
                        'data': {
                            'processed': True,
                            'request_id': f"req_{service['call_count']}",
                            'timestamp': current_time
                        }
                    }
        
        # Normal response
        service['status'] = ServiceStatus.HEALTHY
        return {
            'success': True,
            'data': {
                'processed': True,
                'request_id': f"req_{service['call_count']}",
                'timestamp': current_time
            }
        }

class AIServiceWithDependencies:
    """Example AI service with external dependencies"""
    
    def __init__(self):
        self.dependencies = DependencyServiceMock()
        
        # Register dependency services
        self.dependencies.register_service(
            "feature_store",
            DependencyConfig(
                name="feature_store",
                timeout_seconds=2.0,
                retry_attempts=3,
                retry_delay=0.5,
                fallback_enabled=True,
                circuit_breaker_enabled=True
            )
        )
        
        self.dependencies.register_service(
            "model_registry",
            DependencyConfig(
                name="model_registry",
                timeout_seconds=5.0,
                retry_attempts=2,
                retry_delay=1.0,
                fallback_enabled=False,  # model serving usually cannot degrade
                circuit_breaker_enabled=True
            )
        )
        
        self.circuit_breaker_state = {}
        self.cache = {}
        
    def get_features_with_resilience(self, user_id: str) -> Dict:
        """
        Feature lookup with fault tolerance built in.
        """
        cache_key = f"features_{user_id}"
        
        # 1. Check the cache
        if cache_key in self.cache:
            cached_data = self.cache[cache_key]
            if time.time() - cached_data['timestamp'] < 300:  # 5-minute TTL
                print(f"Using cached features for user {user_id}")
                return cached_data['data']
        
        # 2. Check the circuit breaker
        if self.circuit_breaker_state.get('feature_store') == 'open':
            print("Feature-store breaker is open; using fallback features")
            return self._get_fallback_features(user_id)
        
        # 3. Call the service (with retries)
        max_retries = self.dependencies.services['feature_store']['config'].retry_attempts
        retry_delay = self.dependencies.services['feature_store']['config'].retry_delay
        
        last_exception = None
        for attempt in range(max_retries + 1):  # +1 for the initial attempt
            try:
                if attempt > 0:
                    print(f"Feature-store retry {attempt}/{max_retries}")
                    time.sleep(retry_delay * (2 ** (attempt - 1)))  # exponential backoff
                
                features = self.dependencies.call_service(
                    "feature_store",
                    {"user_id": user_id, "action": "get_features"}
                )
                
                # Success: close the breaker, reset the failure count, refresh the cache
                self.circuit_breaker_state['feature_store'] = 'closed'
                self.circuit_breaker_state['feature_store_failures'] = 0
                
                # Cache the result
                self.cache[cache_key] = {
                    'data': features,
                    'timestamp': time.time()
                }
                
                return features
                
            except TimeoutError as e:
                last_exception = e
                print(f"Feature store timed out: {e}")
                
            except Exception as e:
                last_exception = e
                print(f"Feature store error: {e}")
                break  # don't retry non-timeout errors
        
        # All retries failed
        print("All feature-store retries failed")
        
        # Update the breaker state
        failure_count = self.circuit_breaker_state.get('feature_store_failures', 0) + 1
        self.circuit_breaker_state['feature_store_failures'] = failure_count
        
        if failure_count >= 5:  # open the breaker after 5 consecutive failures
            self.circuit_breaker_state['feature_store'] = 'open'
            print("Feature-store breaker opened")
        
        # Return the fallback
        return self._get_fallback_features(user_id)
    
    def _get_fallback_features(self, user_id: str) -> Dict:
        """Fallback feature strategy"""
        # Options:
        # 1. return generic features
        # 2. use historical features
        # 3. return empty features, flagged as degraded
        return {
            'success': True,
            'data': {
                'user_id': user_id,
                'features': [0.1, 0.2, 0.3],  # generic features
                'is_fallback': True,
                'reason': 'dependency_failure'
            },
            'metadata': {
                'source': 'fallback',
                'timestamp': time.time()
            }
        }

# Test cases
def test_dependency_resilience():
    """Dependency fault-tolerance tests"""
    
    print("=== Dependency fault-tolerance tests ===")
    
    # Create the AI service
    ai_service = AIServiceWithDependencies()
    
    # Scenario 1: normal operation
    print("\nScenario 1: normal request")
    result = ai_service.get_features_with_resilience("user_123")
    print(f"Result: {result['success']}")
    
    # Scenario 2: feature-store timeout fault
    print("\nScenario 2: feature-store timeout fault")
    ai_service.dependencies.set_failure_scenario("feature_store", {
        'start_time': time.time(),
        'duration': 30,
        'failure_type': 'timeout',
        'failure_rate': 1.0,  # fail every call
        'timeout_seconds': 3  # 3-second timeout
    })
    
    start_time = time.time()
    result = ai_service.get_features_with_resilience("user_456")
    elapsed = time.time() - start_time
    
    print(f"Result: {result['success']}, elapsed: {elapsed:.2f}s")
    print(f"Fallback used: {result['data'].get('is_fallback', False)}")
    
    # Scenario 3: verify the circuit breaker trips
    print("\nScenario 3: verify the circuit breaker trips")
    # Repeated failures should open the breaker
    for i in range(6):
        result = ai_service.get_features_with_resilience(f"user_{i}")
        print(f"Request {i+1}: breaker state={ai_service.circuit_breaker_state.get('feature_store')}")
    
    # Scenario 4: service recovery
    print("\nScenario 4: service recovery")
    # Clear the failure scenarios
    ai_service.dependencies.failure_scenarios.clear()
    
    # The breaker should probe for recovery after a cool-down (half-open state)
    time.sleep(30)  # simulated recovery wait
    
    result = ai_service.get_features_with_resilience("user_recovery")
    print(f"Post-recovery result: {result['success']}")
    print(f"Breaker state: {ai_service.circuit_breaker_state.get('feature_store')}")

if __name__ == "__main__":
    test_dependency_resilience()
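Scenario 4 notes that an open breaker should probe for recovery via a half-open state, which the simple open/closed flags above do not implement: once open, they stay open. A minimal half-open breaker sketch (our own `CircuitBreaker` class, with the clock injected so the timeout is testable):

```python
import time

class CircuitBreaker:
    """Minimal circuit breaker with a half-open probe state."""

    def __init__(self, failure_threshold=5, reset_timeout=30.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.clock = clock
        self.failures = 0
        self.opened_at = None  # None means closed

    def state(self) -> str:
        if self.opened_at is None:
            return "closed"
        if self.clock() - self.opened_at >= self.reset_timeout:
            return "half-open"  # cool-down elapsed: allow one probe request
        return "open"

    def allow_request(self) -> bool:
        return self.state() != "open"

    def record_success(self):
        # A successful probe (or normal call) closes the breaker.
        self.failures = 0
        self.opened_at = None

    def record_failure(self):
        self.failures += 1
        if self.failures >= self.failure_threshold:
            self.opened_at = self.clock()  # (re)open, restarting the cool-down
```

Wired into `get_features_with_resilience`, `allow_request()` would replace the `== 'open'` check, and the success/failure paths would call `record_success()` / `record_failure()`.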

3.2 Model-Service Failure Testing

The model service is the heart of an AI system, and its failure testing deserves special attention:

import java.util.concurrent.*;

public class ModelServiceResilienceTest {
    
    /**
     * Model load-failure test.
     */
    public static class ModelLoader {
        private volatile boolean modelLoaded = false;
        private volatile String currentModelVersion = null;
        private final ScheduledExecutorService healthCheckScheduler;
        private final Object loadLock = new Object();
        
        public ModelLoader() {
            this.healthCheckScheduler = Executors.newScheduledThreadPool(1);
            startHealthChecks();
        }
        
        /**
         * Simulated model load (may fail).
         */
        public boolean loadModel(String modelPath, String version) {
            synchronized (loadLock) {
                System.out.printf("Loading model: %s, version: %s%n", modelPath, version);
                
                // Simulate random failures (in production: corrupt files, bad formats, ...)
                double failureRate = 0.1; // 10% failure rate
                if (Math.random() < failureRate) {
                    System.out.println("Model load failed: file corrupted");
                    modelLoaded = false;
                    return false;
                }
                
                // Simulate load time
                try {
                    Thread.sleep(2000 + (long)(Math.random() * 1000));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                
                currentModelVersion = version;
                modelLoaded = true;
                System.out.println("Model loaded");
                return true;
            }
        }
        
        /**
         * Model load with retries.
         */
        public boolean loadModelWithRetry(String modelPath, String version, 
                                         int maxAttempts, long retryDelayMs) {
            for (int attempt = 1; attempt <= maxAttempts; attempt++) {
                System.out.printf("Load attempt %d/%d%n", attempt, maxAttempts);
                
                boolean success = loadModel(modelPath, version);
                if (success) {
                    return true;
                }
                
                if (attempt < maxAttempts) {
                    System.out.printf("Load failed; retrying in %dms...%n", retryDelayMs);
                    try {
                        Thread.sleep(retryDelayMs);
                        // Exponential backoff
                        retryDelayMs = Math.min(retryDelayMs * 2, 30000); // 30s cap
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
            
            System.out.println("All retry attempts failed");
            return false;
        }
        
        /**
         * Model inference (simulated, may fail)
         */
        public CompletableFuture<PredictionResult> predictAsync(double[] input) {
            CompletableFuture<PredictionResult> future = new CompletableFuture<>();
            
            // Check that the model is loaded
            if (!modelLoaded) {
                future.completeExceptionally(
                    new IllegalStateException("Model not loaded")
                );
                return future;
            }
            
            // Run inference asynchronously (may time out or throw).
            // Note: runAsync avoids creating a new single-thread executor per call,
            // which leaked one thread per prediction because it was never shut down.
            CompletableFuture.runAsync(() -> {
                try {
                    // Simulated inference time (randomly up to ~5.1s)
                    long inferenceTime = 100 + (long)(Math.random() * 5000);
                    
                    // 5% chance of an extremely slow inference
                    if (Math.random() < 0.05) {
                        inferenceTime = 10000; // 10 seconds
                    }
                    
                    Thread.sleep(inferenceTime);
                    
                    // 2% chance of an inference error
                    if (Math.random() < 0.02) {
                        throw new RuntimeException("Inference error: numeric overflow");
                    }
                    
                    // Simulated prediction result
                    PredictionResult result = new PredictionResult();
                    result.setSuccess(true);
                    result.setPrediction(new double[]{Math.random()});
                    result.setModelVersion(currentModelVersion);
                    result.setInferenceTimeMs(inferenceTime);
                    
                    future.complete(result);
                    
                } catch (InterruptedException e) {
                    future.completeExceptionally(e);
                    Thread.currentThread().interrupt();
                } catch (Exception e) {
                    future.completeExceptionally(e);
                }
            });
            
            return future;
        }
        
        /**
         * Prediction with timeout and fallback
         */
        public PredictionResult predictWithFallback(double[] input, 
                                                   long timeoutMs,
                                                   Supplier<PredictionResult> fallback) {
            CompletableFuture<PredictionResult> future = predictAsync(input);
            
            try {
                // Wait for the result; on timeout, use the fallback strategy
                return future.get(timeoutMs, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                System.out.printf("Prediction timed out (%dms), using fallback%n", timeoutMs);
                return fallback.get();
            } catch (Exception e) {
                System.out.println("Prediction failed: " + e.getMessage());
                return fallback.get();
            }
        }
        
        /**
         * Periodic health check
         */
        private void startHealthChecks() {
            healthCheckScheduler.scheduleAtFixedRate(() -> {
                if (modelLoaded) {
                    // Run a quick probe prediction
                    try {
                        double[] testInput = new double[10];
                        CompletableFuture<PredictionResult> testFuture = predictAsync(testInput);
                        
                        // The probe timeout must exceed the worst-case normal inference
                        // time (~5.1s above); a 2s timeout would flag a healthy model
                        // as unhealthy on most checks
                        testFuture.get(6000, TimeUnit.MILLISECONDS);
                        System.out.println("Model health check: ✓");
                    } catch (TimeoutException e) {
                        System.out.println("Model health check: ✗ (timeout)");
                        modelLoaded = false; // mark as unhealthy
                    } catch (Exception e) {
                        System.out.println("Model health check: ✗ (" + e.getMessage() + ")");
                        modelLoaded = false;
                    }
                }
            }, 30, 30, TimeUnit.SECONDS); // check every 30 seconds
        }
        
        public void shutdown() {
            healthCheckScheduler.shutdown();
        }
    }
    
    /**
     * Prediction result holder
     */
    public static class PredictionResult {
        private boolean success;
        private double[] prediction;
        private String modelVersion;
        private long inferenceTimeMs;
        private boolean isFallback;
        private String fallbackReason;
        
        // getters and setters
        public boolean isSuccess() { return success; }
        public void setSuccess(boolean success) { this.success = success; }
        
        public double[] getPrediction() { return prediction; }
        public void setPrediction(double[] prediction) { this.prediction = prediction; }
        
        public String getModelVersion() { return modelVersion; }
        public void setModelVersion(String modelVersion) { this.modelVersion = modelVersion; }
        
        public long getInferenceTimeMs() { return inferenceTimeMs; }
        public void setInferenceTimeMs(long inferenceTimeMs) { this.inferenceTimeMs = inferenceTimeMs; }
        
        public boolean isFallback() { return isFallback; }
        public void setFallback(boolean fallback) { isFallback = fallback; }
        
        public String getFallbackReason() { return fallbackReason; }
        public void setFallbackReason(String fallbackReason) { this.fallbackReason = fallbackReason; }
    }
    
    /**
     * Fallback strategy providers
     */
    public static class FallbackStrategies {
        public static PredictionResult defaultPrediction() {
            PredictionResult result = new PredictionResult();
            result.setSuccess(true);
            result.setPrediction(new double[]{0.5}); // default prediction value
            result.setFallback(true);
            result.setFallbackReason("model_service_failure");
            return result;
        }
        
        public static PredictionResult cachedPrediction(double[] cachedResult) {
            PredictionResult result = new PredictionResult();
            result.setSuccess(true);
            result.setPrediction(cachedResult);
            result.setFallback(true);
            result.setFallbackReason("using_cached_result");
            return result;
        }
        
        public static PredictionResult simplifiedModelPrediction(double[] input) {
            // Compute with a simplified stand-in model
            double sum = 0;
            for (double v : input) {
                sum += v;
            }
            double avg = sum / input.length;
            
            PredictionResult result = new PredictionResult();
            result.setSuccess(true);
            result.setPrediction(new double[]{avg});
            result.setFallback(true);
            result.setFallbackReason("simplified_model");
            return result;
        }
    }
    
    /**
     * Run the model-service fault-tolerance tests
     */
    public static void runModelServiceTests() throws Exception {
        System.out.println("=== Model service fault-tolerance tests ===");
        
        ModelLoader modelLoader = new ModelLoader();
        
        try {
            // Test 1: model load with retry
            System.out.println("\nTest 1: model load retry mechanism");
            boolean loaded = modelLoader.loadModelWithRetry(
                "/models/recommendation/v1", 
                "v1.2.3", 
                3, 
                1000
            );
            System.out.println("Model load result: " + (loaded ? "success" : "failure"));
            
            if (loaded) {
                // Test 2: inference timeout handling
                System.out.println("\nTest 2: inference timeout and fallback");
                double[] testInput = new double[100];
                for (int i = 0; i < testInput.length; i++) {
                    testInput[i] = Math.random();
                }
                
                // Use a deliberately short timeout to exercise the fallback strategy
                PredictionResult result = modelLoader.predictWithFallback(
                    testInput,
                    1000, // 1-second timeout
                    FallbackStrategies::defaultPrediction
                );
                
                System.out.println("Prediction result:");
                System.out.println("  success: " + result.isSuccess());
                System.out.println("  fallback: " + result.isFallback());
                System.out.println("  reason: " + result.getFallbackReason());
                if (result.getPrediction() != null) {
                    System.out.println("  value: " + result.getPrediction()[0]);
                }
                
                // Test 3: fault handling under concurrent load
                System.out.println("\nTest 3: concurrent request stress test");
                int concurrentRequests = 10;
                CountDownLatch latch = new CountDownLatch(concurrentRequests);
                List<Future<PredictionResult>> futures = new ArrayList<>();
                ExecutorService executor = Executors.newFixedThreadPool(concurrentRequests);
                
                for (int i = 0; i < concurrentRequests; i++) {
                    final int requestId = i;
                    futures.add(executor.submit(() -> {
                        try {
                            double[] input = new double[50];
                            Arrays.fill(input, requestId * 0.1);
                            
                            return modelLoader.predictWithFallback(
                                input,
                                3000,
                                () -> FallbackStrategies.cachedPrediction(new double[]{0.5})
                            );
                        } finally {
                            latch.countDown();
                        }
                    }));
                }
                
                latch.await();
                
                int successCount = 0;
                int fallbackCount = 0;
                for (Future<PredictionResult> future : futures) {
                    try {
                        // countDown() runs in the finally block before the Future's
                        // result is published, so allow a generous timeout here
                        // instead of a racy 100ms
                        PredictionResult pr = future.get(1, TimeUnit.SECONDS);
                        if (pr.isSuccess()) successCount++;
                        if (pr.isFallback()) fallbackCount++;
                    } catch (Exception e) {
                        System.out.println("Request failed: " + e.getMessage());
                    }
                }
                
                System.out.println("Concurrency test results:");
                System.out.println("  total requests: " + concurrentRequests);
                System.out.println("  succeeded: " + successCount);
                System.out.println("  fell back: " + fallbackCount);
                
                executor.shutdown();
            }
            
        } finally {
            modelLoader.shutdown();
        }
    }
    
    public static void main(String[] args) throws Exception {
        runModelServiceTests();
    }
}
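The `predictWithFallback` helper above degrades on every individual failure, but it keeps sending traffic to a model service that is already down, so each request still waits out its full timeout. A circuit breaker short-circuits calls once failures pile up. The sketch below is illustrative and not part of the article's code; the class name and thresholds are assumptions:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Minimal count-based circuit breaker (illustrative helper, not from the article). */
class SimpleCircuitBreaker {
    private final int failureThreshold;      // consecutive failures before opening
    private final long openDurationMs;       // how long to stay open before probing again
    private final AtomicInteger consecutiveFailures = new AtomicInteger();
    private volatile long openedAt = -1;

    SimpleCircuitBreaker(int failureThreshold, long openDurationMs) {
        this.failureThreshold = failureThreshold;
        this.openDurationMs = openDurationMs;
    }

    /** Run the primary call if the breaker is closed; otherwise go straight to the fallback. */
    <T> T call(Supplier<T> primary, Supplier<T> fallback) {
        if (isOpen()) {
            return fallback.get(); // short-circuit: stop hammering a failing service
        }
        try {
            T result = primary.get();
            consecutiveFailures.set(0); // a success resets the failure count
            return result;
        } catch (RuntimeException e) {
            if (consecutiveFailures.incrementAndGet() >= failureThreshold) {
                openedAt = System.currentTimeMillis(); // trip the breaker
            }
            return fallback.get();
        }
    }

    boolean isOpen() {
        if (openedAt < 0) return false;
        if (System.currentTimeMillis() - openedAt > openDurationMs) {
            openedAt = -1;                  // half-open: let the next call probe the service
            consecutiveFailures.set(0);
            return false;
        }
        return true;
    }
}
```

To wire this into the `ModelLoader` above, the primary supplier would need to throw a `RuntimeException` on timeout or inference error so the breaker can count it, while the fallback supplier could reuse `FallbackStrategies.defaultPrediction()`.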

(Due to length constraints, only part of the article is shown here. The full article, roughly 12,000 characters, also covers fault-injection tooling in practice, fault-tolerance mechanism verification, Vue front-end fault-tolerance testing, a real-time recommendation system case study, and a detailed pitfalls guide.)

Conclusion

Fault-tolerance testing for AI systems is not a one-off task but an ongoing process. Systematic fault injection not only uncovers a system's weak points, it also drives us to proactively design more robust architectures. Key takeaways:

  1. Layered fault tolerance: every layer, from hardware to application, needs its own targeted fault-tolerance mechanisms
  2. Intelligent degradation: AI systems in particular need fine-grained fallback strategies that avoid "all or nothing" behavior
  3. Proactive fault injection: only by running fault drills safely in production can you truly validate a system's resilience
  4. End-to-end monitoring: fault tolerance without observability is blind; it requires a solid monitoring and alerting stack

Remember: resilience is not achieved by luck; it is built through deliberate design and systematic testing. Start your fault-injection testing journey and build truly reliable AI systems!
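The fine-grained degradation in takeaway 2 can be sketched as an ordered chain of fallback tiers, tried from highest to lowest quality until one yields a result. The class and tier names below are hypothetical, for illustration only:

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;

/** Hypothetical tiered-degradation helper: try tiers in order of decreasing quality. */
class FallbackChain<T> {
    private final List<Supplier<Optional<T>>> tiers;

    FallbackChain(List<Supplier<Optional<T>>> tiers) {
        this.tiers = tiers;
    }

    /** Returns the first tier that yields a value; empty only if every tier fails. */
    Optional<T> resolve() {
        for (Supplier<Optional<T>> tier : tiers) {
            try {
                Optional<T> result = tier.get();
                if (result.isPresent()) {
                    return result;
                }
            } catch (RuntimeException e) {
                // A failing tier degrades to the next one instead of failing the request
            }
        }
        return Optional.empty();
    }
}
```

For a recommender, such a chain might run: personalized model → simplified model → cached results → generic top-N, mirroring the fully-available / degraded / basically-available tiers described in section 1.1.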


Further reading

  • Netflix's Chaos Engineering practice
  • Google's SRE (Site Reliability Engineering) principles
  • The reliability pillar of the AWS Well-Architected Framework
  • Microservice patterns: retry, circuit breaking, rate limiting, degradation