以下内容节选自我的实战课程《从0到1教你搭建一个基于微信小程序的AI智能体应用平台》,课程包含详细代码和讲解,链接如下: https://edu.csdn.net/course/detail/40753

课程目标

  • 建立 完整的单元测试、集成测试和端到端测试体系
  • 验证 所有功能模块的正确性和稳定性
  • 确保 系统在各种场景下的可靠运行

知识要点

测试类型与策略

  • 单元测试:测试最小可测试单元(函数、方法、类)
  • 集成测试:测试模块间的交互和接口
  • 端到端测试:测试完整的用户场景和业务流程
  • 测试金字塔:单元测试 > 集成测试 > E2E测试

测试覆盖策略

  • 代码覆盖率:语句覆盖、分支覆盖、路径覆盖
  • 功能覆盖率:功能点覆盖、场景覆盖、边界覆盖
  • 数据覆盖率:正常数据、异常数据、边界数据
  • 用户场景覆盖:典型场景、异常场景、边界场景

测试数据管理

  • 测试数据准备:Mock数据、测试数据库、数据工厂
  • 数据隔离:测试环境隔离、数据清理、并发安全
  • 数据验证:数据完整性、数据一致性、数据正确性

测试体系架构

测试执行流程

实战步骤详解

1. 后端单元测试

测试框架配置
import unittest
import pytest
from unittest.mock import Mock, patch, MagicMock
from app.services.user_service import UserService
from app.services.chat_service import ChatService
from app.services.payment_service import PaymentService
from app.models.user import User
from app.models.chat import Chat
from app.models.payment import Payment

class TestUserService(unittest.TestCase):
    def setUp(self):
        """测试前准备"""
        self.user_service = UserService()
        self.mock_db = Mock()
        self.user_service.db = self.mock_db
    
    def test_create_user_success(self):
        """测试用户创建成功"""
        user_data = {
            'openid': 'test_openid_123',
            'nickname': '测试用户',
            'avatar': 'https://example.com/avatar.jpg'
        }
        
        # Mock数据库操作
        self.mock_db.users.find_one.return_value = None  # 用户不存在
        self.mock_db.users.insert_one.return_value = Mock(inserted_id='user_123')
        
        result = self.user_service.create_user(user_data)
        
        # 验证结果
        self.assertTrue(result['success'])
        self.assertEqual(result['user_id'], 'user_123')
        self.assertIn('创建成功', result['message'])
        
        # 验证数据库调用
        self.mock_db.users.find_one.assert_called_once_with({'openid': 'test_openid_123'})
        self.mock_db.users.insert_one.assert_called_once()
    
    def test_create_user_duplicate(self):
        """测试用户创建重复"""
        user_data = {'openid': 'existing_openid'}
        
        # Mock数据库返回已存在用户
        self.mock_db.users.find_one.return_value = {'user_id': 'existing_user'}
        
        result = self.user_service.create_user(user_data)
        
        # 验证结果
        self.assertFalse(result['success'])
        self.assertIn('已存在', result['error'])
        
        # 验证数据库调用
        self.mock_db.users.find_one.assert_called_once_with({'openid': 'existing_openid'})
        self.mock_db.users.insert_one.assert_not_called()
    
    def test_get_user_points(self):
        """测试获取用户积分"""
        user_id = 'user_123'
        expected_points = 100
        
        # Mock数据库返回用户数据
        self.mock_db.users.find_one.return_value = {
            'user_id': user_id,
            'points': expected_points
        }
        
        points = self.user_service.get_user_points(user_id)
        
        # 验证结果
        self.assertEqual(points, expected_points)
        self.mock_db.users.find_one.assert_called_once_with({'user_id': user_id})
    
    def test_deduct_points_success(self):
        """测试扣除积分成功"""
        user_id = 'user_123'
        deduct_amount = 10
        current_points = 50
        
        # Mock数据库操作
        self.mock_db.users.find_one.return_value = {
            'user_id': user_id,
            'points': current_points
        }
        self.mock_db.users.update_one.return_value = Mock(modified_count=1)
        
        result = self.user_service.deduct_points(user_id, deduct_amount)
        
        # 验证结果
        self.assertTrue(result['success'])
        self.assertEqual(result['remaining_points'], current_points - deduct_amount)
        
        # 验证数据库调用
        self.mock_db.users.update_one.assert_called_once_with(
            {'user_id': user_id},
            {'$inc': {'points': -deduct_amount}}
        )
    
    def test_deduct_points_insufficient(self):
        """测试积分不足"""
        user_id = 'user_123'
        deduct_amount = 100
        current_points = 50
        
        # Mock数据库返回积分不足的用户
        self.mock_db.users.find_one.return_value = {
            'user_id': user_id,
            'points': current_points
        }
        
        result = self.user_service.deduct_points(user_id, deduct_amount)
        
        # 验证结果
        self.assertFalse(result['success'])
        self.assertIn('积分不足', result['error'])
        
        # 验证数据库未更新
        self.mock_db.users.update_one.assert_not_called()

class TestChatService(unittest.TestCase):
    def setUp(self):
        """测试前准备"""
        self.chat_service = ChatService()
        self.mock_db = Mock()
        self.chat_service.db = self.mock_db
    
    def test_send_message_success(self):
        """测试发送消息成功"""
        message_data = {
            'user_id': 'user_123',
            'message': '你好,AI助手!',
            'message_type': 'text',
            'agent_id': 'default_agent'
        }
        
        # Mock数据库操作
        self.mock_db.chats.insert_one.return_value = Mock(inserted_id='chat_123')
        
        # Mock AI服务调用
        with patch('app.services.chat_service.ai_service') as mock_ai:
            mock_ai.generate_response.return_value = {
                'success': True,
                'response': '你好!有什么可以帮助你的吗?'
            }
            
            result = self.chat_service.send_message(message_data)
        
        # 验证结果
        self.assertTrue(result['success'])
        self.assertEqual(result['chat_id'], 'chat_123')
        self.assertIn('response', result)
        
        # 验证数据库调用
        self.mock_db.chats.insert_one.assert_called_once()
    
    def test_get_chat_history(self):
        """测试获取聊天历史"""
        user_id = 'user_123'
        expected_chats = [
            {'chat_id': 'chat_1', 'message': '消息1', 'created_at': '2024-01-01'},
            {'chat_id': 'chat_2', 'message': '消息2', 'created_at': '2024-01-02'}
        ]
        
        # Mock数据库返回聊天历史
        self.mock_db.chats.find.return_value = expected_chats
        
        result = self.chat_service.get_chat_history(user_id)
        
        # 验证结果
        self.assertEqual(len(result), 2)
        self.assertEqual(result[0]['message'], '消息1')
        
        # 验证数据库调用
        self.mock_db.chats.find.assert_called_once_with(
            {'user_id': user_id},
            sort=[('created_at', -1)]
        )

class TestPaymentService(unittest.TestCase):
    def setUp(self):
        """测试前准备"""
        self.payment_service = PaymentService()
        self.mock_db = Mock()
        self.payment_service.db = self.mock_db
    
    def test_create_payment_order(self):
        """测试创建支付订单"""
        payment_data = {
            'user_id': 'user_123',
            'amount': 10.0,
            'payment_method': 'wechat'
        }
        
        # Mock数据库操作
        self.mock_db.payments.insert_one.return_value = Mock(inserted_id='order_123')
        
        result = self.payment_service.create_payment_order(payment_data)
        
        # 验证结果
        self.assertTrue(result['success'])
        self.assertEqual(result['order_id'], 'order_123')
        self.assertIn('payment_url', result)
        
        # 验证数据库调用
        self.mock_db.payments.insert_one.assert_called_once()
    
    def test_process_payment_callback(self):
        """测试处理支付回调"""
        callback_data = {
            'order_id': 'order_123',
            'status': 'paid',
            'transaction_id': 'txn_123'
        }
        
        # Mock数据库操作
        self.mock_db.payments.find_one.return_value = {
            'order_id': 'order_123',
            'user_id': 'user_123',
            'amount': 10.0,
            'status': 'pending'
        }
        self.mock_db.payments.update_one.return_value = Mock(modified_count=1)
        self.mock_db.users.update_one.return_value = Mock(modified_count=1)
        
        result = self.payment_service.process_payment_callback(callback_data)
        
        # 验证结果
        self.assertTrue(result['success'])
        self.assertEqual(result['status'], 'paid')
        
        # 验证数据库调用
        self.mock_db.payments.update_one.assert_called_once()
        self.mock_db.users.update_one.assert_called_once()

2. 前端单元测试

测试框架配置
// 使用Jest进行前端单元测试
describe('ChatManager', () => {
  let chatManager;
  let mockApiService;
  
  beforeEach(() => {
    // 创建Mock对象
    mockApiService = {
      sendMessage: jest.fn(),
      getHistory: jest.fn(),
      uploadImage: jest.fn()
    };
    
    chatManager = new ChatManager(mockApiService);
  });
  
  afterEach(() => {
    jest.clearAllMocks();
  });
  
  describe('sendMessage', () => {
    it('should send text message successfully', async () => {
      const message = 'Hello, AI!';
      const mockResponse = {
        success: true,
        data: {
          chat_id: 'chat_123',
          response: 'Hello! How can I help you?'
        }
      };
      
      mockApiService.sendMessage.mockResolvedValue(mockResponse);
      
      const result = await chatManager.sendMessage(message);
      
      expect(result.success).toBe(true);
      expect(result.data.response).toBe('Hello! How can I help you?');
      expect(mockApiService.sendMessage).toHaveBeenCalledWith({
        message: message,
        message_type: 'text'
      });
    });
    
    it('should handle send message failure', async () => {
      const message = 'Hello, AI!';
      const mockError = {
        success: false,
        error: 'Network error'
      };
      
      mockApiService.sendMessage.mockResolvedValue(mockError);
      
      const result = await chatManager.sendMessage(message);
      
      expect(result.success).toBe(false);
      expect(result.error).toBe('Network error');
    });
    
    it('should validate message before sending', async () => {
      const emptyMessage = '';
      
      const result = await chatManager.sendMessage(emptyMessage);
      
      expect(result.success).toBe(false);
      expect(result.error).toContain('消息不能为空');
      expect(mockApiService.sendMessage).not.toHaveBeenCalled();
    });
  });
  
  describe('getChatHistory', () => {
    it('should get chat history successfully', async () => {
      const mockHistory = [
        { chat_id: 'chat_1', message: 'Hello', created_at: '2024-01-01' },
        { chat_id: 'chat_2', message: 'Hi', created_at: '2024-01-02' }
      ];
      
      mockApiService.getHistory.mockResolvedValue({
        success: true,
        data: mockHistory
      });
      
      const result = await chatManager.getChatHistory();
      
      expect(result.success).toBe(true);
      expect(result.data).toHaveLength(2);
      expect(mockApiService.getHistory).toHaveBeenCalled();
    });
  });
  
  describe('uploadImage', () => {
    it('should upload image successfully', async () => {
      const mockFile = new File(['test'], 'test.jpg', { type: 'image/jpeg' });
      const mockResponse = {
        success: true,
        data: {
          image_url: 'https://example.com/image.jpg'
        }
      };
      
      mockApiService.uploadImage.mockResolvedValue(mockResponse);
      
      const result = await chatManager.uploadImage(mockFile);
      
      expect(result.success).toBe(true);
      expect(result.data.image_url).toBe('https://example.com/image.jpg');
      expect(mockApiService.uploadImage).toHaveBeenCalledWith(mockFile);
    });
    
    it('should validate image file type', async () => {
      const invalidFile = new File(['test'], 'test.txt', { type: 'text/plain' });
      
      const result = await chatManager.uploadImage(invalidFile);
      
      expect(result.success).toBe(false);
      expect(result.error).toContain('不支持的图片格式');
      expect(mockApiService.uploadImage).not.toHaveBeenCalled();
    });
  });
});

describe('UserManager', () => {
  let userManager;
  let mockApiService;
  
  beforeEach(() => {
    mockApiService = {
      login: jest.fn(),
      register: jest.fn(),
      getUserProfile: jest.fn(),
      updateProfile: jest.fn()
    };
    
    userManager = new UserManager(mockApiService);
  });
  
  describe('login', () => {
    it('should login successfully', async () => {
      const loginData = { openid: 'test_openid' };
      const mockResponse = {
        success: true,
        data: {
          user_id: 'user_123',
          token: 'jwt_token_123',
          user_info: { nickname: 'Test User' }
        }
      };
      
      mockApiService.login.mockResolvedValue(mockResponse);
      
      const result = await userManager.login(loginData);
      
      expect(result.success).toBe(true);
      expect(result.data.token).toBe('jwt_token_123');
      expect(mockApiService.login).toHaveBeenCalledWith(loginData);
    });
  });
  
  describe('getUserProfile', () => {
    it('should get user profile successfully', async () => {
      const mockProfile = {
        user_id: 'user_123',
        nickname: 'Test User',
        points: 100,
        avatar: 'https://example.com/avatar.jpg'
      };
      
      mockApiService.getUserProfile.mockResolvedValue({
        success: true,
        data: mockProfile
      });
      
      const result = await userManager.getUserProfile();
      
      expect(result.success).toBe(true);
      expect(result.data.points).toBe(100);
    });
  });
});

3. 端到端测试

端到端测试框架
// 使用Puppeteer进行端到端测试
const puppeteer = require('puppeteer');

describe('微信小程序端到端测试', () => {
  let browser;
  let page;
  
  beforeAll(async () => {
    browser = await puppeteer.launch({
      headless: false,
      slowMo: 100,
      args: ['--no-sandbox', '--disable-setuid-sandbox']
    });
    page = await browser.newPage();
    
    // 设置视口大小
    await page.setViewport({ width: 375, height: 667 });
  });
  
  afterAll(async () => {
    await browser.close();
  });
  
  beforeEach(async () => {
    // 每个测试前清理状态
    await page.goto('about:blank');
  });
  
  describe('用户登录流程', () => {
    it('should complete user login successfully', async () => {
      // 1. 访问登录页面
      await page.goto('http://localhost:3000/login');
      
      // 2. 等待页面加载
      await page.waitForSelector('.login-form');
      
      // 3. 模拟微信登录
      await page.click('.wechat-login-btn');
      
      // 4. 等待登录完成
      await page.waitForSelector('.user-profile', { timeout: 10000 });
      
      // 5. 验证登录成功
      const userInfo = await page.$eval('.user-profile', el => el.textContent);
      expect(userInfo).toContain('欢迎');
    });
  });
  
  describe('聊天功能流程', () => {
    it('should complete chat flow successfully', async () => {
      // 1. 登录用户
      await loginUser(page);
      
      // 2. 进入聊天页面
      await page.click('.chat-tab');
      await page.waitForSelector('.chat-container');
      
      // 3. 发送文本消息
      await page.type('.message-input', '你好,AI助手!');
      await page.click('.send-btn');
      
      // 4. 等待AI回复
      await page.waitForSelector('.ai-message', { timeout: 10000 });
      
      // 5. 验证消息显示
      const messages = await page.$$eval('.message-item', items => 
        items.map(item => item.textContent)
      );
      
      expect(messages).toContain('你好,AI助手!');
      expect(messages.some(msg => msg.includes('AI') || msg.includes('助手'))).toBe(true);
    });
    
    it('should handle image upload successfully', async () => {
      // 1. 登录用户
      await loginUser(page);
      
      // 2. 进入聊天页面
      await page.click('.chat-tab');
      await page.waitForSelector('.chat-container');
      
      // 3. 点击图片上传按钮
      await page.click('.image-upload-btn');
      
      // 4. 选择图片文件
      const fileInput = await page.$('input[type="file"]');
      await fileInput.uploadFile('./test-images/test.jpg');
      
      // 5. 等待图片上传完成
      await page.waitForSelector('.image-message', { timeout: 10000 });
      
      // 6. 验证图片显示
      const imageElement = await page.$('.image-message img');
      expect(imageElement).toBeTruthy();
    });
  });
  
  describe('支付功能流程', () => {
    it('should complete payment flow successfully', async () => {
      // 1. 登录用户
      await loginUser(page);
      
      // 2. 进入充值页面
      await page.click('.recharge-tab');
      await page.waitForSelector('.recharge-container');
      
      // 3. 选择充值金额
      await page.click('.amount-option[data-amount="10"]');
      
      // 4. 点击充值按钮
      await page.click('.recharge-btn');
      
      // 5. 等待支付页面
      await page.waitForSelector('.payment-container', { timeout: 5000 });
      
      // 6. 模拟支付成功
      await page.click('.mock-payment-success');
      
      // 7. 等待支付完成
      await page.waitForSelector('.payment-success', { timeout: 10000 });
      
      // 8. 验证积分更新
      const pointsElement = await page.$('.user-points');
      const points = await page.evaluate(el => el.textContent, pointsElement);
      expect(parseInt(points)).toBeGreaterThan(0);
    });
  });
  
  describe('错误处理流程', () => {
    it('should handle network error gracefully', async () => {
      // 1. 登录用户
      await loginUser(page);
      
      // 2. 模拟网络错误
      await page.setOfflineMode(true);
      
      // 3. 尝试发送消息
      await page.type('.message-input', '测试消息');
      await page.click('.send-btn');
      
      // 4. 验证错误提示
      await page.waitForSelector('.error-message', { timeout: 5000 });
      const errorMessage = await page.$eval('.error-message', el => el.textContent);
      expect(errorMessage).toContain('网络错误');
      
      // 5. 恢复网络
      await page.setOfflineMode(false);
    });
  });
});

// 辅助函数
async function loginUser(page) {
  await page.goto('http://localhost:3000/login');
  await page.waitForSelector('.login-form');
  await page.click('.wechat-login-btn');
  await page.waitForSelector('.user-profile', { timeout: 10000 });
}

验收标准

单元测试验收

  • 代码覆盖率 > 80%
  • 关键函数测试覆盖率 100%
  • 所有测试用例通过
  • 测试用例覆盖正常和异常场景

集成测试验收

  • 所有API接口测试通过
  • 数据一致性验证通过
  • 业务流程测试通过
  • 错误处理测试通过

端到端测试验收

  • 核心用户场景测试通过
  • 关键业务流程测试通过
  • 异常场景处理测试通过
  • 性能要求测试通过
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐