Disruptor核心组件ProcessingSequenceBarrier解析
ProcessingSequenceBarrier是Disruptor框架的核心组件,负责协调生产者与消费者以及消费者之间的处理进度。它通过waitFor()方法确保事件处理器(EventProcessor)在获取可用序号前等待依赖条件满足,并提供alert机制实现优雅中断。该组件整合了等待策略(WaitStrategy)、依赖序号(dependentSequence)和生产者游标(cursorS
ProcessingSequenceBarrier
ProcessingSequenceBarrier
是 Disruptor 框架中的一个核心组件,它扮演着协调生产者和消费者之间,以及消费者与消费者之间处理进度的“栅栏”或“屏障”的角色。每个 EventProcessor
(事件处理器,即消费者)都会有一个与之关联的 SequenceBarrier
。
ProcessingSequenceBarrier
实现了 SequenceBarrier
接口。它的主要职责是:
- 等待可用序号 (Sequence):
EventProcessor
在处理下一个事件之前,会调用waitFor(sequence)
方法。该方法会阻塞,直到它所请求的序号对应的事件已经由生产者发布,并且所有它所依赖的前置消费者(如果有的话)也已经处理完毕。 - 获取依赖的游标:提供一个
getCursor()
方法,返回它所依赖的消费者组处理到的最小序号。这个返回值对于下游的消费者至关重要,因为下游消费者需要确保其上游的进度。 - 中断与唤醒:提供
alert
机制,用于在需要时(例如关闭 Disruptor 时)中断正在等待的EventProcessor
,使其能够优雅地退出。
简单来说,ProcessingSequenceBarrier
就是 EventProcessor
的“通行证发放处”。EventProcessor
必须拿到“通行证”(即一个可用的序号)才能继续处理事件。
内部成员变量分析
我们先来看一下它的成员变量,这有助于理解它的内部工作机制。
// ... existing code ...
final class ProcessingSequenceBarrier implements SequenceBarrier
{
private final WaitStrategy waitStrategy;
private final Sequence dependentSequence;
private volatile boolean alerted = false;
private final Sequence cursorSequence;
private final Sequencer sequencer;
// ... existing code ...
-
waitStrategy
: 等待策略。这是EventProcessor
等待新事件时的具体行为策略。Disruptor 提供了多种策略,如BlockingWaitStrategy
(使用 Lock/Condition,CPU 消耗低,延迟高)、YieldingWaitStrategy
(循环检查并调用Thread.yield()
,中等 CPU 消耗和延迟)、BusySpinWaitStrategy
(忙等待,CPU 消耗最高,延迟最低)等。ProcessingSequenceBarrier
将具体的等待逻辑委托给了这个策略对象。 -
dependentSequence
: 依赖的序号。这是ProcessingSequenceBarrier
最关键的字段之一。它代表了当前EventProcessor
所依赖的“上游”进度。- 如果当前
EventProcessor
没有任何前置依赖(即它直接消费生产者发布的数据),那么dependentSequence
会被设置为cursorSequence
。 - 如果当前
EventProcessor
有前置依赖(例如在一个处理流水线中,它依赖于前一个EventProcessor
),那么dependentSequence
会是一个FixedSequenceGroup
对象,该对象包含了所有它所依赖的EventProcessor
的Sequence
。FixedSequenceGroup
的get()
方法会返回所有这些Sequence
中的最小值。
- 如果当前
-
alerted
: 警报状态。这是一个volatile
的布尔值,用于中断屏障。当alert()
方法被调用时,它被设为true
。waitFor()
方法在开始等待前会检查此状态,如果为true
,则立即抛出AlertException
,使得EventProcessor
可以中断其主循环并退出。 -
cursorSequence
: 生产者的游标序号。这个Sequence
对象代表了生产者(Sequencer
)当前已经发布到的最大序号。任何消费者都不能超过这个序号。 -
sequencer
: 序号生成器。这是 Disruptor 的核心,负责管理RingBuffer
的所有序号。ProcessingSequenceBarrier
持有它的引用,主要是为了在waitFor
的最后阶段调用getHighestPublishedSequence
方法,这在多生产者模式下尤其重要,可以确保获取到的序号范围内的所有事件都已发布。
构造函数分析
构造函数清晰地展示了各个成员变量是如何被初始化的。
// ... existing code ...
ProcessingSequenceBarrier(
final Sequencer sequencer,
final WaitStrategy waitStrategy,
final Sequence cursorSequence,
final Sequence[] dependentSequences)
{
this.sequencer = sequencer;
this.waitStrategy = waitStrategy;
this.cursorSequence = cursorSequence;
if (0 == dependentSequences.length)
{
dependentSequence = cursorSequence;
}
else
{
dependentSequence = new FixedSequenceGroup(dependentSequences);
}
}
// ... existing code ...
这里的逻辑非常关键:
sequencer
,waitStrategy
,cursorSequence
都是从外部传入并直接赋值。dependentSequences
是一个Sequence
数组,代表了当前屏障所依赖的所有上游消费者的Sequence
。if (0 == dependentSequences.length)
: 如果这个数组为空,意味着当前消费者没有上游依赖,它直接依赖于生产者的进度。因此,dependentSequence
就被设置为生产者的游标cursorSequence
。else
: 如果数组不为空,意味着存在上游依赖。这时会创建一个FixedSequenceGroup
对象来包装这些依赖的Sequence
。FixedSequenceGroup
的作用就是实时获取这些Sequence
中的最小值,作为当前消费者可以处理的“水位线”。
这个设计是 Disruptor 能够构建复杂消费依赖关系图(如流水线、菱形、并行等)的基础。
waitFor()
详解
这是 EventProcessor
与屏障交互的核心方法。
// ... existing code ...
@Override
public long waitFor(final long sequence)
throws AlertException, InterruptedException, TimeoutException
{
checkAlert();
long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);
if (availableSequence < sequence)
{
return availableSequence;
}
return sequencer.getHighestPublishedSequence(sequence, availableSequence);
}
//MultiProducerSequencer
public long getHighestPublishedSequence(final long lowerBound, final long availableSequence)
{
for (long sequence = lowerBound; sequence <= availableSequence; sequence++)
{
if (!isAvailable(sequence))
{
return sequence - 1;
}
}
return availableSequence;
}
// ... existing code ...
我们来分解一下执行流程:
checkAlert()
: 首先检查alerted
标志。如果为true
,直接抛出AlertException
,消费者线程捕获后即可退出。waitStrategy.waitFor(...)
: 这是实际的等待操作。EventProcessor
传入它想要处理的下一个序号sequence
。WaitStrategy
会一直等待,直到满足以下两个条件:dependentSequence
的值 >=sequence
。这意味着所有它依赖的上游消费者都已经处理到至少sequence-1
的位置了。cursorSequence
的值 >=sequence
。这意味着生产者至少已经发布到sequence
这个位置了。waitStrategy
返回的是dependentSequence
和cursorSequence
中较小的值,但至少是sequence-1
。通常情况下,它会返回生产者游标的值。
if (availableSequence < sequence)
: 在某些等待策略下(如带超时的策略),等待可能会提前返回一个小于请求sequence
的值。在这种情况下,直接返回这个较小的值,让EventProcessor
知道它请求的序号还不可用。sequencer.getHighestPublishedSequence(sequence, availableSequence)
: 这是最后一步,也是一个重要的保证。在多生产者场景下,cursorSequence
的值可能已经跳跃式地前进了(例如,生产者A申请了10-15,生产者B申请了16-20,cursor
可能更新为20),但序号为15的事件可能还未发布。这一步就是为了确保消费者拿到的最大可用序号 (availableSequence
) 范围内的所有事件都已经被生产者真正“发布”(publish
)了,防止消费者读到尚未初始化的数据。
其他方法
getCursor()
: 返回dependentSequence.get()
。这个方法很有意思,它返回的不是生产者的游标,而是依赖链的游标。这对于构建下游消费者的SequenceBarrier
至关重要。下游消费者看到的“游标”是它上游消费者的处理进度。alert()
: 将alerted
设为true
,并调用waitStrategy.signalAllWhenBlocking()
。这个调用会唤醒所有通过BlockingWaitStrategy
等策略阻塞在Condition
上的线程,使它们能够重新检查alerted
状态并快速退出。clearAlert()
: 重置alerted
状态为false
。checkAlert()
: 检查alerted
状态,如果为true
则抛出异常。
总结
ProcessingSequenceBarrier
是 Disruptor 精巧设计的体现。它通过组合 WaitStrategy
、Sequence
和 Sequencer
,以一种高效且灵活的方式解决了消费者之间的同步问题。
- 它将等待的逻辑(做什么) 与 等待的策略(怎么做) 分离。
- 通过
dependentSequence
的设计,优雅地实现了任意复杂的消费者依赖关系。 - 通过
alert
机制,提供了一种可靠、低延迟的线程中断方式,用于实现优雅停机。 - 它是连接生产者、当前消费者和下游消费者的关键枢纽,确保了数据处理的顺序性和正确性。
更多推荐
所有评论(0)