ProcessingSequenceBarrier

ProcessingSequenceBarrier 是 Disruptor 框架中的一个核心组件,它扮演着协调生产者和消费者之间,以及消费者与消费者之间处理进度的“栅栏”或“屏障”的角色。每个 EventProcessor(事件处理器,即消费者)都会有一个与之关联的 SequenceBarrier

ProcessingSequenceBarrier 实现了 SequenceBarrier 接口。它的主要职责是:

  • 等待可用序号 (Sequence)EventProcessor 在处理下一个事件之前,会调用 waitFor(sequence) 方法。该方法会阻塞,直到它所请求的序号对应的事件已经由生产者发布,并且所有它所依赖的前置消费者(如果有的话)也已经处理完毕。
  • 获取依赖的游标:提供一个 getCursor() 方法,返回它所依赖的消费者组处理到的最小序号。这个返回值对于下游的消费者至关重要,因为下游消费者需要确保其上游的进度。
  • 中断与唤醒:提供 alert 机制,用于在需要时(例如关闭 Disruptor 时)中断正在等待的 EventProcessor,使其能够优雅地退出。

简单来说,ProcessingSequenceBarrier 就是 EventProcessor 的“通行证发放处”。EventProcessor 必须拿到“通行证”(即一个可用的序号)才能继续处理事件。

内部成员变量分析

我们先来看一下它的成员变量,这有助于理解它的内部工作机制。


// ... existing code ...
final class ProcessingSequenceBarrier implements SequenceBarrier
{
    private final WaitStrategy waitStrategy;
    private final Sequence dependentSequence;
    private volatile boolean alerted = false;
    private final Sequence cursorSequence;
    private final Sequencer sequencer;
// ... existing code ...
  • waitStrategy等待策略。这是 EventProcessor 等待新事件时的具体行为策略。Disruptor 提供了多种策略,如 BlockingWaitStrategy(使用 Lock/Condition,CPU 消耗低,延迟高)、YieldingWaitStrategy(循环检查并调用 Thread.yield(),中等 CPU 消耗和延迟)、BusySpinWaitStrategy(忙等待,CPU 消耗最高,延迟最低)等。ProcessingSequenceBarrier 将具体的等待逻辑委托给了这个策略对象。

  • dependentSequence依赖的序号。这是 ProcessingSequenceBarrier 最关键的字段之一。它代表了当前 EventProcessor 所依赖的“上游”进度。

    • 如果当前 EventProcessor 没有任何前置依赖(即它直接消费生产者发布的数据),那么 dependentSequence 会被设置为 cursorSequence
    • 如果当前 EventProcessor 前置依赖(例如在一个处理流水线中,它依赖于前一个 EventProcessor),那么 dependentSequence 会是一个 FixedSequenceGroup 对象,该对象包含了所有它所依赖的 EventProcessor 的 SequenceFixedSequenceGroup 的 get() 方法会返回所有这些 Sequence 中的最小值。
  • alerted警报状态。这是一个 volatile 的布尔值,用于中断屏障。当 alert() 方法被调用时,它被设为 truewaitFor() 方法在开始等待前会检查此状态,如果为 true,则立即抛出 AlertException,使得 EventProcessor 可以中断其主循环并退出。

  • cursorSequence生产者的游标序号。这个 Sequence 对象代表了生产者(Sequencer)当前已经发布到的最大序号。任何消费者都不能超过这个序号。

  • sequencer序号生成器。这是 Disruptor 的核心,负责管理 RingBuffer 的所有序号。ProcessingSequenceBarrier 持有它的引用,主要是为了在 waitFor 的最后阶段调用 getHighestPublishedSequence 方法,这在多生产者模式下尤其重要,可以确保获取到的序号范围内的所有事件都已发布。

构造函数分析

构造函数清晰地展示了各个成员变量是如何被初始化的。

// ... existing code ...
    ProcessingSequenceBarrier(
        final Sequencer sequencer,
        final WaitStrategy waitStrategy,
        final Sequence cursorSequence,
        final Sequence[] dependentSequences)
    {
        this.sequencer = sequencer;
        this.waitStrategy = waitStrategy;
        this.cursorSequence = cursorSequence;
        if (0 == dependentSequences.length)
        {
            dependentSequence = cursorSequence;
        }
        else
        {
            dependentSequence = new FixedSequenceGroup(dependentSequences);
        }
    }
// ... existing code ...

这里的逻辑非常关键:

  • sequencerwaitStrategycursorSequence 都是从外部传入并直接赋值。
  • dependentSequences 是一个 Sequence 数组,代表了当前屏障所依赖的所有上游消费者的 Sequence
  • if (0 == dependentSequences.length): 如果这个数组为空,意味着当前消费者没有上游依赖,它直接依赖于生产者的进度。因此,dependentSequence 就被设置为生产者的游标 cursorSequence
  • else: 如果数组不为空,意味着存在上游依赖。这时会创建一个 FixedSequenceGroup 对象来包装这些依赖的 SequenceFixedSequenceGroup 的作用就是实时获取这些 Sequence 中的最小值,作为当前消费者可以处理的“水位线”。

这个设计是 Disruptor 能够构建复杂消费依赖关系图(如流水线、菱形、并行等)的基础。

 waitFor() 详解

这是 EventProcessor 与屏障交互的核心方法。

// ... existing code ...
    @Override
    public long waitFor(final long sequence)
        throws AlertException, InterruptedException, TimeoutException
    {
        checkAlert();

        long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);

        if (availableSequence < sequence)
        {
            return availableSequence;
        }

        return sequencer.getHighestPublishedSequence(sequence, availableSequence);
    }


//MultiProducerSequencer
    public long getHighestPublishedSequence(final long lowerBound, final long availableSequence)
    {
        for (long sequence = lowerBound; sequence <= availableSequence; sequence++)
        {
            if (!isAvailable(sequence))
            {
                return sequence - 1;
            }
        }

        return availableSequence;
    }
// ... existing code ...

我们来分解一下执行流程:

  1. checkAlert(): 首先检查 alerted 标志。如果为 true,直接抛出 AlertException,消费者线程捕获后即可退出。
  2. waitStrategy.waitFor(...): 这是实际的等待操作。EventProcessor 传入它想要处理的下一个序号 sequenceWaitStrategy 会一直等待,直到满足以下两个条件:
    • dependentSequence 的值 >= sequence。这意味着所有它依赖的上游消费者都已经处理到至少 sequence-1 的位置了。
    • cursorSequence 的值 >= sequence。这意味着生产者至少已经发布到 sequence 这个位置了。 waitStrategy 返回的是 dependentSequence 和 cursorSequence 中较小的值,但至少是 sequence-1。通常情况下,它会返回生产者游标的值。
  3. if (availableSequence < sequence): 在某些等待策略下(如带超时的策略),等待可能会提前返回一个小于请求 sequence 的值。在这种情况下,直接返回这个较小的值,让 EventProcessor 知道它请求的序号还不可用。
  4. sequencer.getHighestPublishedSequence(sequence, availableSequence): 这是最后一步,也是一个重要的保证。在多生产者场景下,cursorSequence 的值可能已经跳跃式地前进了(例如,生产者A申请了10-15,生产者B申请了16-20,cursor 可能更新为20),但序号为15的事件可能还未发布。这一步就是为了确保消费者拿到的最大可用序号 (availableSequence) 范围内的所有事件都已经被生产者真正“发布”(publish)了,防止消费者读到尚未初始化的数据。

其他方法

  • getCursor(): 返回 dependentSequence.get()。这个方法很有意思,它返回的不是生产者的游标,而是依赖链的游标。这对于构建下游消费者的 SequenceBarrier 至关重要。下游消费者看到的“游标”是它上游消费者的处理进度。
  • alert(): 将 alerted 设为 true,并调用 waitStrategy.signalAllWhenBlocking()。这个调用会唤醒所有通过 BlockingWaitStrategy 等策略阻塞在 Condition 上的线程,使它们能够重新检查 alerted 状态并快速退出。
  • clearAlert(): 重置 alerted 状态为 false
  • checkAlert(): 检查 alerted 状态,如果为 true 则抛出异常。

总结

ProcessingSequenceBarrier 是 Disruptor 精巧设计的体现。它通过组合 WaitStrategySequence 和 Sequencer,以一种高效且灵活的方式解决了消费者之间的同步问题。

  • 它将等待的逻辑(做什么) 与 等待的策略(怎么做) 分离。
  • 通过 dependentSequence 的设计,优雅地实现了任意复杂的消费者依赖关系。
  • 通过 alert 机制,提供了一种可靠、低延迟的线程中断方式,用于实现优雅停机。
  • 它是连接生产者、当前消费者和下游消费者的关键枢纽,确保了数据处理的顺序性和正确性。
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐