使用Locust设计并发场景
注册自定义指标pay_success_count = stats.Counter("pay_success", "薪资支付成功数")pay_failure_count = stats.Counter("pay_failure", "薪资支付失败数")@taskelse:先明确业务场景(递增/峰值/稳定/分布式),匹配HR支付的真实流量特征;用HttpUser@task定义用户行为,通过权重、思考时
Locust并发场景设计全指南(结合HR支付模块实战)
Locust是基于Python的分布式性能测试框架,其核心优势是用代码灵活定义用户行为、支持任意复杂度的并发场景。设计并发场景的本质是“模拟真实用户的操作模式+控制并发用户数/请求频率”,结合HR支付模块(薪资计算、支付、查询)的业务特点,以下从核心基础、典型并发场景设计、进阶技巧、验证与调优四方面拆解落地方法。
一、Locust并发场景设计核心基础
在设计并发场景前,需掌握Locust的核心概念(这些是场景设计的“积木”):
| 核心概念 | 作用 | 示例(HR支付场景) |
|---|---|---|
HttpUser类 |
定义单个并发用户的行为模板(所有并发用户都基于此类实例化) | class SalaryPayUser(HttpUser): |
@task装饰器 |
定义用户的核心操作(可设置权重,模拟不同操作的调用频率) | @task(3) 标记查询接口(调用频率最高) |
wait_time |
定义用户两次操作的“思考时间”(模拟真实用户停顿) | wait_time = between(1, 3)(1-3秒间隔) |
on_start/on_stop |
用户启动/停止时的钩子(如初始化数据、清理资源) | on_start中为每个用户分配唯一emp_id |
| 并发核心参数 | 控制整体并发规模: - users(总并发用户数)- spawn_rate(每秒启动用户数)- run_time(压测时长) |
启动命令:locust -f xxx.py -u 300 -r 10 -t 2m(300用户、每秒启10个、持续2分钟) |
二、HR支付模块典型并发场景设计(附完整脚本)
场景1:递增并发(爬坡)—— 定位性能拐点
设计思路
从0开始逐步增加并发用户数(如0→50→100→200→300),观察响应时间/错误率的变化,找到接口的“性能拐点”(如200用户时响应时间突增)。适合新接口性能摸底、瓶颈定位(如HR薪资计算接口)。
核心配置
- 总用户数(
users):300 - 每秒启动用户数(
spawn_rate):10(30秒内启动完所有用户) - 持续时间:启动完成后稳定运行5分钟
完整脚本(HR薪资计算接口)
# locustfile_salary_calc_ramp.py
from locust import HttpUser, task, between, events, LoadTestShape
import os
from utils.api_utils import PerfAPIClient # 复用之前的请求封装
from utils.log_utils import log
# 自定义递增负载模型(更精细控制爬坡速度)
class StepLoadShape(LoadTestShape):
"""阶梯递增负载:每10分钟增加50用户,共300用户"""
step_time = 600 # 每个阶梯持续时间(秒)
step_load = 50 # 每个阶梯增加的用户数
spawn_rate = 10 # 每秒启动用户数
time_limit = 3600 # 总压测时长(秒)
def tick(self):
run_time = self.get_run_time()
# 超过总时长则停止
if run_time > self.time_limit:
return None
# 计算当前阶梯
current_step = int(run_time // self.step_time) + 1
# 计算当前用户数
current_users = current_step * self.step_load
# 不超过最大用户数300
current_users = min(current_users, 300)
return (current_users, self.spawn_rate)
# 压测结束后校验性能指标
@events.test_stop.add_listener
def validate_metrics(environment):
stats = environment.stats.get("/salary/calc")
p95 = stats.response_time.get_percentile(95)
error_rate = stats.num_failures / stats.num_requests if stats.num_requests > 0 else 0
# 断言HR薪资计算接口核心指标
assert p95 <= 500, f"薪资计算接口P95响应时间{p95}ms > 500ms"
assert error_rate <= 0.005, f"薪资计算接口错误率{error_rate:.2%} > 0.5%"
log.info(f"递增并发场景校验通过:P95={p95}ms,错误率={error_rate:.2%}")
# 并发用户行为定义
class SalaryCalcRampUser(HttpUser):
wait_time = between(1, 3) # 真实用户思考时间
host = os.getenv("PERF_BASE_URL") # 从环境变量获取HR支付环境地址
def on_start(self):
"""每个用户启动时初始化:分配唯一测试员工ID"""
self.api_client = PerfAPIClient(locust_client=self.client)
self.emp_id = f"perf_ramp_{self.user_id + 1000}" # 避免用户间数据冲突
@task(1) # 唯一任务,权重1
def test_salary_calc(self):
"""模拟用户调用薪资计算接口"""
try:
self.api_client.send_request(
method="POST",
endpoint="/salary/calc",
json={
"emp_id": self.emp_id,
"pay_month": "2024-10",
"gross_salary": 12000.0, # 模拟真实薪资范围
"social_insurance": 960.0,
"special_deduction": 1500.0
}
)
except Exception as e:
log.error(f"用户{self.user_id}薪资计算请求失败:{str(e)}")
raise # 标记为失败请求,计入错误率
执行命令
# Web UI模式(可视化控制爬坡过程)
locust -f locustfile_salary_calc_ramp.py --web-port 8089
# 无界面模式(按自定义阶梯负载执行)
locust -f locustfile_salary_calc_ramp.py --headless -t 3600s --csv=reports/ramp_perf
场景2:峰值并发(突发)—— 模拟发薪日峰值
设计思路
瞬间启动大量并发用户(如300用户),持续短时间(2-5分钟),模拟HR支付模块“发薪日9点集中支付”的突发流量。适合验证核心接口的峰值承载能力(如薪资支付接口)。
核心配置
- 总用户数:300
- 每秒启动用户数:50(6秒内启动完所有用户,模拟突发)
- 持续时间:2分钟(发薪日核心峰值时段)
完整脚本(HR薪资支付接口)
# locustfile_salary_pay_peak.py
from locust import HttpUser, task, between, events
import os
import random
from utils.api_utils import PerfAPIClient
from utils.db_utils import check_pay_duplicate, count_pay_records
# 峰值并发后校验数据一致性(HR支付核心:无重复支付)
@events.test_stop.add_listener
def validate_pay_data(environment):
# 1. 性能指标校验
stats = environment.stats.get("/salary/pay")
p95 = stats.response_time.get_percentile(95)
error_rate = stats.num_failures / stats.num_requests if stats.num_requests > 0 else 0
assert p95 <= 800, f"薪资支付接口P95={p95}ms > 800ms"
assert error_rate <= 0.01, f"薪资支付接口错误率={error_rate:.2%} > 1%"
# 2. 数据一致性校验(HR支付财务安全底线)
total_success = stats.num_requests - stats.num_failures
db_count = count_pay_records(pay_month="2024-10")
assert db_count == total_success, f"支付记录数不一致:DB={db_count},请求={total_success}"
assert not check_pay_duplicate(), "薪资支付接口存在重复支付记录"
class SalaryPayPeakUser(HttpUser):
wait_time = between(0.5, 1.5) # 峰值时用户操作间隔更短(模拟紧急发薪)
host = os.getenv("PERF_BASE_URL")
def on_start(self):
self.api_client = PerfAPIClient(locust_client=self.client)
# 随机分配测试员工ID(避免并发操作同一数据)
self.emp_id = f"perf_peak_{random.randint(1000, 4000)}"
@task(1)
def test_salary_pay(self):
"""模拟用户调用薪资支付接口"""
self.api_client.send_request(
method="POST",
endpoint="/salary/pay",
json={
"emp_id": self.emp_id,
"pay_month": "2024-10",
"bank_card": f"622202{random.randint(10000000, 99999999)}" # 随机银行卡
}
)
执行命令
# 无界面模式(峰值并发)
locust -f locustfile_salary_pay_peak.py --headless -u 300 -r 50 -t 2m \
--html=reports/peak_pay_report.html
场景3:稳定并发(长时间)—— 验证服务稳定性
设计思路
保持固定并发用户数(如150),持续长时间运行(30分钟+),模拟HR支付模块“日常业务负载”,验证服务是否存在内存泄漏、连接池耗尽等问题。适合核心接口的长期稳定性验证(混合接口调用)。
核心配置
- 总用户数:150
- 每秒启动用户数:5(缓慢启动,避免突发)
- 持续时间:30分钟
完整脚本(HR混合接口:计算+支付+查询)
# locustfile_mixed_stable.py
from locust import HttpUser, task, between, events
import os
import random
class MixedStableUser(HttpUser):
wait_time = between(1, 4) # 日常操作间隔
host = os.getenv("PERF_BASE_URL")
def on_start(self):
self.api_client = PerfAPIClient(locust_client=self.client)
self.emp_id = f"perf_mixed_{random.randint(1000, 5000)}"
self.pay_month = "2024-10"
# 任务权重:查询(3) > 计算(2) > 支付(1)(贴合真实业务比例)
@task(3)
def test_pay_query(self):
"""模拟用户查询支付结果(高频操作)"""
self.api_client.send_request(
method="GET",
endpoint="/salary/pay/query",
params={"emp_id": self.emp_id, "pay_month": self.pay_month}
)
@task(2)
def test_salary_calc(self):
"""模拟用户计算薪资(中频操作)"""
self.api_client.send_request(
method="POST",
endpoint="/salary/calc",
json={
"emp_id": self.emp_id,
"pay_month": self.pay_month,
"gross_salary": random.uniform(5000, 20000), # 随机薪资
"social_insurance": random.uniform(500, 2000),
"special_deduction": random.uniform(0, 3000)
}
)
@task(1)
def test_salary_pay(self):
"""模拟用户发起支付(低频操作)"""
self.api_client.send_request(
method="POST",
endpoint="/salary/pay",
json={
"emp_id": self.emp_id,
"pay_month": self.pay_month,
"bank_card": f"622202{random.randint(10000000, 99999999)}"
}
)
# 长时间运行后校验资源与指标
@events.test_stop.add_listener
def validate_stability(environment):
# 1. 整体错误率校验
total_stats = environment.stats.total
total_error_rate = total_stats.num_failures / total_stats.num_requests if total_stats.num_requests > 0 else 0
assert total_error_rate <= 0.005, f"混合接口整体错误率={total_error_rate:.2%} > 0.5%"
# 2. 各接口P95校验
calc_p95 = environment.stats.get("/salary/calc").response_time.get_percentile(95)
pay_p95 = environment.stats.get("/salary/pay").response_time.get_percentile(95)
query_p95 = environment.stats.get("/salary/pay/query").response_time.get_percentile(95)
assert calc_p95 <= 500 and pay_p95 <= 800 and query_p95 <= 300, "部分接口P95超标"
执行命令
locust -f locustfile_mixed_stable.py --headless -u 150 -r 5 -t 30m \
--csv=reports/stable_mixed_perf --logfile=logs/stable_perf.log
场景4:分布式并发—— 支撑超大规模并发
设计思路
当单台Locust节点无法模拟足够并发(如1000+用户)时,启动多个Locust节点(slave),由一个主节点(master)统一控制,适合生产级超大并发验证(如全公司发薪场景)。
执行步骤
- 启动主节点(Master):
locust -f locustfile_salary_pay_peak.py --master --web-port 8089
- 启动多个从节点(Slave,每台机器执行):
# 从节点1
locust -f locustfile_salary_pay_peak.py --slave --master-host=192.168.1.100
# 从节点2
locust -f locustfile_salary_pay_peak.py --slave --master-host=192.168.1.100
- 主节点Web UI中设置总用户数(如1000)、生成速率(20),启动压测(所有从节点协同模拟并发)。
三、并发场景设计进阶技巧
1. 参数化请求数据—— 避免“假并发”
若所有并发用户使用相同请求参数(如同一emp_id),会触发缓存/数据库锁,导致性能指标失真。解决方案:
# 方法1:基于用户ID生成唯一参数
def on_start(self):
self.emp_id = f"perf_{self.user_id}" # self.user_id是Locust内置唯一ID
# 方法2:从文件加载参数池,随机选取
import csv
def load_emp_data():
emp_list = []
with open("test_data/emp_list.csv", "r") as f:
reader = csv.DictReader(f)
for row in reader:
emp_list.append(row)
return emp_list
emp_pool = load_emp_data()
class MyUser(HttpUser):
def on_start(self):
self.emp_data = random.choice(emp_pool) # 随机选员工数据
2. 模拟异常场景并发—— 验证容错能力
在并发场景中注入异常(如银行接口超时、数据库慢查询),验证HR支付接口的容错性:
@task(1)
def test_pay_with_exception(self):
"""模拟10%的请求触发银行接口超时"""
if random.random() < 0.1: # 10%概率
# 手动构造超时请求(或修改请求参数触发后端超时)
self.api_client.send_request(
method="POST",
endpoint="/salary/pay",
json={
"emp_id": self.emp_id,
"pay_month": self.pay_month,
"bank_card": "timeout_123456" # 后端识别该卡号触发超时
}
)
else:
# 正常请求
self.api_client.send_request(
method="POST",
endpoint="/salary/pay",
json={...}
)
3. 自定义并发指标—— 监控业务维度
除Locust默认指标(响应时间、RPS),可自定义监控HR支付的业务指标(如支付成功率、计算准确率):
from locust import stats
# 注册自定义指标
pay_success_count = stats.Counter("pay_success", "薪资支付成功数")
pay_failure_count = stats.Counter("pay_failure", "薪资支付失败数")
@task
def test_salary_pay(self):
response = self.api_client.send_request(...)
if response.json()["code"] == 0:
pay_success_count.inc()
else:
pay_failure_count.inc()
四、并发场景设计与执行注意事项
1. 保障接口幂等性
HR支付接口(尤其是支付类)必须实现幂等性(如基于emp_id+pay_month做唯一索引),避免并发场景下重复支付、数据错乱。
2. 梯度压测,逐步提升并发
不要直接启动峰值并发(如300用户),先从50用户开始,验证指标正常后再逐步提升,避免直接压垮服务。
3. 环境与数据隔离
- 压测环境必须与生产/测试环境隔离,避免影响真实业务;
- 每次压测前清理历史数据(如测试员工的支付记录),保证测试基线一致。
4. 实时监控资源
压测时同步监控:
- 应用服务器:CPU、内存、线程数、GC(Python则监控进程内存);
- 数据库:QPS、慢查询、连接数、锁等待;
- 网络:带宽、延迟。
5. 控制思考时间(wait_time)
思考时间直接影响真实RPS:
- 峰值场景:思考时间短(0.5-1.5秒),模拟用户集中操作;
- 日常场景:思考时间长(1-4秒),贴合真实用户操作节奏。
五、总结
Locust设计并发场景的核心是**“业务行为建模+并发参数控制”**:
- 先明确业务场景(递增/峰值/稳定/分布式),匹配HR支付的真实流量特征;
- 用
HttpUser+@task定义用户行为,通过权重、思考时间模拟真实操作; - 用
users/spawn_rate/run_time控制并发规模,复杂场景用LoadTestShape自定义负载模型; - 并发场景不仅要验证响应时间/吞吐量,还要校验HR支付的核心诉求——数据一致性;
- 超大规模并发通过分布式Locust实现,同时做好环境隔离和资源监控。
通过以上方法,可灵活设计覆盖HR支付模块全场景的并发测试,既验证接口的高并发承载能力,又保障财务数据的安全性。
更多推荐




所有评论(0)