一、核心原理与架构

Disruptor 的核心思想是:用环形数组(RingBuffer)作为队列,生产者和消费者通过序号(Sequence)推进,无锁/低锁地通信。

  • RingBuffer:环形队列,核心数据结构。
  • Sequence:序号,记录读写进度。
  • Event:队列元素(自定义事件对象)。
  • Producer:生产者,写入事件。
  • Consumer/EventHandler:消费者,读取并处理事件。
  • WaitStrategy:等待策略,决定消费者如何等待新事件(如自旋、阻塞等)。

Disruptor 支持多生产者/多消费者,并通过 CAS、内存屏障、缓存行填充等技术优化并发性能。


二、源码结构与关键类

主要源码包:

  • com.lmax.disruptor.RingBuffer:环形缓冲区
  • com.lmax.disruptor.Sequence:序号管理
  • com.lmax.disruptor.EventProcessorEventHandler:事件处理器
  • com.lmax.disruptor.WaitStrategy:等待策略
  • com.lmax.disruptor.dsl.Disruptor:DSL入口,简化配置

三、核心实现细节与深度源码解析

3.1 RingBuffer

RingBuffer 是一个数组,容量必须为2的幂。每个元素是一个 Event 对象。

public final class RingBuffer<E> {
    private final Object[] entries;
    private final int bufferSize;
    private final long indexMask;
    private final Sequencer sequencer;

    public long next() { return sequencer.next(); }
    public void publish(long sequence) { sequencer.publish(sequence); }
    public E get(long sequence) { return (E)entries[(int)(sequence & indexMask)]; }
}
  • next() 获取下一个可写序号
  • publish(sequence) 发布事件
  • get(sequence) 获取事件对象

3.2 Sequence 与 Sequencer

Sequence 是一个 long 类型的序号,表示数据的位置。

Sequencer 管理序号分配和发布,分为单生产者(SingleProducerSequencer)和多生产者(MultiProducerSequencer)。

关键实现(以单生产者为例):

public final class SingleProducerSequencer extends AbstractSequencer {
    private volatile long nextValue = Sequence.INITIAL_VALUE;

    public long next() {
        long nextSeq = nextValue + 1;
        // 检查是否有足够空间
        nextValue = nextSeq;
        return nextSeq;
    }

    public void publish(long sequence) {
        cursor.set(sequence);
        // 唤醒消费者
        waitStrategy.signalAllWhenBlocking();
    }
}
  • 多生产者用 CAS 竞争序号,保证无锁安全。

3.3 EventHandler 与 EventProcessor

EventHandler 是消费者的业务回调接口:

public interface EventHandler<T> {
    void onEvent(T event, long sequence, boolean endOfBatch) throws Exception;
}

EventProcessor 管理消费者线程,循环读取 RingBuffer:

public void run() {
    while (running) {
        long nextSequence = sequence.get() + 1;
        if (availableSequence >= nextSequence) {
            eventHandler.onEvent(ringBuffer.get(nextSequence), nextSequence, ...);
            sequence.set(nextSequence);
        } else {
            waitStrategy.waitFor(nextSequence, ...);
        }
    }
}

3.4 WaitStrategy

决定消费者如何等待新事件:

  • BusySpinWaitStrategy:自旋等待,低延迟但高CPU
  • BlockingWaitStrategy:阻塞等待,低CPU但延迟略高
  • YieldingWaitStrategy:自旋+让出CPU,适合多消费者

源码片段(BlockingWaitStrategy):

public long waitFor(long sequence, Sequence cursor, Sequence dependentSequence, SequenceBarrier barrier) throws AlertException, InterruptedException {
    lock.lock();
    try {
        while (cursor.get() < sequence) {
            processorNotifyCondition.await();
        }
    } finally {
        lock.unlock();
    }
    return cursor.get();
}

四、Disruptor 实践案例

4.1 定义事件类

public class LongEvent {
    private long value;
    public void set(long value) { this.value = value; }
    public long getValue() { return value; }
}

4.2 事件工厂

public class LongEventFactory implements EventFactory<LongEvent> {
    public LongEvent newInstance() {
        return new LongEvent();
    }
}

4.3 事件处理器

public class LongEventHandler implements EventHandler<LongEvent> {
    public void onEvent(LongEvent event, long sequence, boolean endOfBatch) {
        System.out.println("Event: " + event.getValue());
    }
}

4.4 构建 Disruptor 并启动

int bufferSize = 1024;

Disruptor<LongEvent> disruptor = new Disruptor<>(
    new LongEventFactory(),
    bufferSize,
    Executors.defaultThreadFactory(),
    ProducerType.SINGLE,
    new BusySpinWaitStrategy()
);

// 注册事件处理器
disruptor.handleEventsWith(new LongEventHandler());

// 启动
disruptor.start();

// 获取 RingBuffer
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

// 发布事件
long sequence = ringBuffer.next();
try {
    LongEvent event = ringBuffer.get(sequence);
    event.set(123L);
} finally {
    ringBuffer.publish(sequence);
}

五、性能与应用场景

  • 单机百万级 TPS,延迟微秒级
  • 适合高频交易、日志收集、实时监控等场景
  • Netty、RocketMQ、Elasticsearch 等都借鉴了 Disruptor 的思想

六、源码阅读建议

  1. RingBuffer:数据结构和序号管理
  2. Sequencer:单/多生产者序号分配
  3. WaitStrategy:不同等待策略源码
  4. EventProcessor:消费者线程循环
  5. Disruptor DSL:整体组装与启动流程

七、总结亮点

  • 无锁/低锁,极致性能
  • 内存屏障、缓存行填充,减少伪共享
  • 灵活的等待策略,适应不同场景
  • 丰富的事件处理链(支持多阶段流水线)

八、 官方基准测试数据(LMAX White Paper)

LMAX 官方论文中对比了 Disruptor、Java BlockingQueue 以及 ArrayBlockingQueue 的性能:

队列类型 单线程吞吐量(TPS) 多线程吞吐量(TPS)
Disruptor 25,000,000+ 45,000,000+
ArrayBlockingQueue 8,000,000 10,000,000
LinkedBlockingQueue 2,000,000 2,000,000

结论:

  • 单线程下 Disruptor 性能约为 ArrayBlockingQueue 的 3 倍,LinkedBlockingQueue 的 12 倍。
  • 多线程下 Disruptor 性能提升更为明显,远超标准 JDK 队列。

九、 总结

  • 吞吐量:单机 2500万~4500万 TPS,远超 JDK 标准队列
  • 延迟:微秒级(1~10μs),极低
  • 场景:高频交易、日志收集、实时监控等

知识传递,互通有无,创作不易,点点关注!

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐