Disruptor深度源码解析以及实践案例
Disruptor是一种高性能无锁消息队列框架,其核心采用环形数组(RingBuffer)作为数据结构,通过序号(Sequence)管理实现生产者和消费者的高效通信。关键技术包括:CAS操作、内存屏障和缓存行填充,支持单/多生产者和消费者模式。主要组件包括:RingBuffer(环形队列)、Sequencer(序号管理)、EventHandler(事件处理器)和WaitStrategy(等待策略)
·
一、核心原理与架构
Disruptor 的核心思想是:用环形数组(RingBuffer)作为队列,生产者和消费者通过序号(Sequence)推进,无锁/低锁地通信。
- RingBuffer:环形队列,核心数据结构。
- Sequence:序号,记录读写进度。
- Event:队列元素(自定义事件对象)。
- Producer:生产者,写入事件。
- Consumer/EventHandler:消费者,读取并处理事件。
- WaitStrategy:等待策略,决定消费者如何等待新事件(如自旋、阻塞等)。
Disruptor 支持多生产者/多消费者,并通过 CAS、内存屏障、缓存行填充等技术优化并发性能。
二、源码结构与关键类
主要源码包:
com.lmax.disruptor.RingBuffer
:环形缓冲区com.lmax.disruptor.Sequence
:序号管理com.lmax.disruptor.EventProcessor
、EventHandler
:事件处理器com.lmax.disruptor.WaitStrategy
:等待策略com.lmax.disruptor.dsl.Disruptor
:DSL入口,简化配置
三、核心实现细节与深度源码解析
3.1 RingBuffer
RingBuffer 是一个数组,容量必须为2的幂。每个元素是一个 Event 对象。
public final class RingBuffer<E> {
private final Object[] entries;
private final int bufferSize;
private final long indexMask;
private final Sequencer sequencer;
public long next() { return sequencer.next(); }
public void publish(long sequence) { sequencer.publish(sequence); }
public E get(long sequence) { return (E)entries[(int)(sequence & indexMask)]; }
}
next()
获取下一个可写序号publish(sequence)
发布事件get(sequence)
获取事件对象
3.2 Sequence 与 Sequencer
Sequence 是一个 long 类型的序号,表示数据的位置。
Sequencer 管理序号分配和发布,分为单生产者(SingleProducerSequencer)和多生产者(MultiProducerSequencer)。
关键实现(以单生产者为例):
public final class SingleProducerSequencer extends AbstractSequencer {
private volatile long nextValue = Sequence.INITIAL_VALUE;
public long next() {
long nextSeq = nextValue + 1;
// 检查是否有足够空间
nextValue = nextSeq;
return nextSeq;
}
public void publish(long sequence) {
cursor.set(sequence);
// 唤醒消费者
waitStrategy.signalAllWhenBlocking();
}
}
- 多生产者用 CAS 竞争序号,保证无锁安全。
3.3 EventHandler 与 EventProcessor
EventHandler 是消费者的业务回调接口:
public interface EventHandler<T> {
void onEvent(T event, long sequence, boolean endOfBatch) throws Exception;
}
EventProcessor 管理消费者线程,循环读取 RingBuffer:
public void run() {
while (running) {
long nextSequence = sequence.get() + 1;
if (availableSequence >= nextSequence) {
eventHandler.onEvent(ringBuffer.get(nextSequence), nextSequence, ...);
sequence.set(nextSequence);
} else {
waitStrategy.waitFor(nextSequence, ...);
}
}
}
3.4 WaitStrategy
决定消费者如何等待新事件:
- BusySpinWaitStrategy:自旋等待,低延迟但高CPU
- BlockingWaitStrategy:阻塞等待,低CPU但延迟略高
- YieldingWaitStrategy:自旋+让出CPU,适合多消费者
源码片段(BlockingWaitStrategy):
public long waitFor(long sequence, Sequence cursor, Sequence dependentSequence, SequenceBarrier barrier) throws AlertException, InterruptedException {
lock.lock();
try {
while (cursor.get() < sequence) {
processorNotifyCondition.await();
}
} finally {
lock.unlock();
}
return cursor.get();
}
四、Disruptor 实践案例
4.1 定义事件类
public class LongEvent {
private long value;
public void set(long value) { this.value = value; }
public long getValue() { return value; }
}
4.2 事件工厂
public class LongEventFactory implements EventFactory<LongEvent> {
public LongEvent newInstance() {
return new LongEvent();
}
}
4.3 事件处理器
public class LongEventHandler implements EventHandler<LongEvent> {
public void onEvent(LongEvent event, long sequence, boolean endOfBatch) {
System.out.println("Event: " + event.getValue());
}
}
4.4 构建 Disruptor 并启动
int bufferSize = 1024;
Disruptor<LongEvent> disruptor = new Disruptor<>(
new LongEventFactory(),
bufferSize,
Executors.defaultThreadFactory(),
ProducerType.SINGLE,
new BusySpinWaitStrategy()
);
// 注册事件处理器
disruptor.handleEventsWith(new LongEventHandler());
// 启动
disruptor.start();
// 获取 RingBuffer
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
// 发布事件
long sequence = ringBuffer.next();
try {
LongEvent event = ringBuffer.get(sequence);
event.set(123L);
} finally {
ringBuffer.publish(sequence);
}
五、性能与应用场景
- 单机百万级 TPS,延迟微秒级
- 适合高频交易、日志收集、实时监控等场景
- Netty、RocketMQ、Elasticsearch 等都借鉴了 Disruptor 的思想
六、源码阅读建议
- RingBuffer:数据结构和序号管理
- Sequencer:单/多生产者序号分配
- WaitStrategy:不同等待策略源码
- EventProcessor:消费者线程循环
- Disruptor DSL:整体组装与启动流程
七、总结亮点
- 无锁/低锁,极致性能
- 内存屏障、缓存行填充,减少伪共享
- 灵活的等待策略,适应不同场景
- 丰富的事件处理链(支持多阶段流水线)
八、 官方基准测试数据(LMAX White Paper)
LMAX 官方论文中对比了 Disruptor、Java BlockingQueue 以及 ArrayBlockingQueue 的性能:
队列类型 | 单线程吞吐量(TPS) | 多线程吞吐量(TPS) |
---|---|---|
Disruptor | 25,000,000+ | 45,000,000+ |
ArrayBlockingQueue | 8,000,000 | 10,000,000 |
LinkedBlockingQueue | 2,000,000 | 2,000,000 |
结论:
- 单线程下 Disruptor 性能约为 ArrayBlockingQueue 的 3 倍,LinkedBlockingQueue 的 12 倍。
- 多线程下 Disruptor 性能提升更为明显,远超标准 JDK 队列。
九、 总结
- 吞吐量:单机 2500万~4500万 TPS,远超 JDK 标准队列
- 延迟:微秒级(1~10μs),极低
- 场景:高频交易、日志收集、实时监控等
知识传递,互通有无,创作不易,点点关注!
更多推荐
所有评论(0)