Disruptor-源码解读

科技资讯 投稿 5200 0 评论

Disruptor-源码解读

前言

    锁和CAS
  • 伪共享和缓存行
  • volatile和内存屏障

原理

此节结合demo来看更容易理解:传送门

    Event

    LongEvent

  • 环形数据缓冲区:这是一个首尾相接的环,用于存放 Event,用于生产者往其存入数据和消费者从其拉取数据

  • 序列:用于跟踪进度(生产进度、消费进度)

  • Disruptor的核心,用于在生产者和消费者之间传递数据,有单生产者和多生产者两种实现。

  • 序列屏障,消费者之间的依赖关系就靠序列屏障实现

  • 等待策略,消费者等待生产者将发布的策略

  • 事件处理器,循环从 RingBuffer 获取 Event 并执行 EventHandler。

  • 事件处理程序,也就是消费者

  • 生产者

Ring Buffer

Object[]。Disruptor生产者发布分两步

    步骤一:申请写入 n 个元素,如果可以写入,这返回最大序列号
  • 步骤二:根据序列号去 RingBuffer 中获取 Event,修改并发布
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(;
// 获取下一个可用位置的下标(步骤1
long sequence = ringBuffer.next(;
try {
    // 返回可用位置的元素
    LongEvent event = ringBuffer.get(sequence;
    // 设置该位置元素的值
    event.set(l;
} finally {
    // 发布
    ringBuffer.publish(sequence;
}

这两个步骤由 Sequencer 完成,分为单生产者和多生产者实现

Sequencer

单生产者

// 一般不会有以下写法,这里为了讲解源码才使用next(2
// 向RingBuffer申请两个元素
long sequence = ringBuffer.next(2;
for (long i =  sequence-1; i <= sequence; i++ {
    try {
        // 返回可用位置的元素
        LongEvent event = ringBuffer.get(i;
        // 设置该位置元素的值
        event.set(1;
    } finally {
        ringBuffer.publish(i;
    }
}

申请相当于占位置,发布需要一个一个按顺序发布

接下来结合代码看,单生产者的 Sequencer 实现为 SingleProducerSequencer,先看看构造方法

abstract class SingleProducerSequencerPad extends AbstractSequencer
{
    protected long p1, p2, p3, p4, p5, p6, p7;

    SingleProducerSequencerPad(int bufferSize, WaitStrategy waitStrategy
    {
        super(bufferSize, waitStrategy;
    }
}

abstract class SingleProducerSequencerFields extends SingleProducerSequencerPad
{
    SingleProducerSequencerFields(int bufferSize, WaitStrategy waitStrategy
    {
        super(bufferSize, waitStrategy;
    }

    long nextValue = Sequence.INITIAL_VALUE;
    long cachedValue = Sequence.INITIAL_VALUE;
}

public final class SingleProducerSequencer extends SingleProducerSequencerFields
{
    protected long p1, p2, p3, p4, p5, p6, p7;

    public SingleProducerSequencer(int bufferSize, WaitStrategy waitStrategy
    {
        super(bufferSize, waitStrategy;
    }
}

这是 Disruptor 高性能的技巧之一,SingleProducerSequencer 需要的类变量只有 nextValue 和cachedValue,p1 ~ p7 的作用是填充缓存行,这能保证 nextValue 和cachedValue 必定在独立的缓存行,我们可以用ClassLayout打印内存布局看看

// 调用路径
// RingBuffer#next(
// SingleProducerSequencer#next(
public long next(int n
{
    if (n < 1
    {
        throw new IllegalArgumentException("n must be > 0";
    }

    long nextValue = this.nextValue;

    //生产者当前序号值+期望获取的序号数量后达到的序号值
    long nextSequence = nextValue + n;
    //减掉RingBuffer的总的buffer值,用于判断是否出现‘覆盖’
    long wrapPoint = nextSequence - bufferSize;
    //从后面代码分析可得:cachedValue就是缓存的消费者中最小序号值,他不是当前最新的‘消费者中最小序号值’,而是上次程序进入到下面的if判定代码段时,被赋值的当时的‘消费者中最小序号值’
    //这样做的好处在于:在判定是否出现覆盖的时候,不用每次都调用getMininumSequence计算‘消费者中的最小序号值’,从而节约开销。只要确保当生产者的节奏大于了缓存的cachedGateingSequence一个bufferSize时,从新获取一下 getMinimumSequence(即可。
    long cachedGatingSequence = this.cachedValue;

    //(wrapPoint > cachedGatingSequence : 当生产者已经超过上一次缓存的‘消费者中最小序号值’(cachedGatingSequence)一个‘Ring’大小(bufferSize),需要重新获取cachedGatingSequence,避免当生产者一直在生产,但是消费者不再消费的情况下,出现‘覆盖’
    //(cachedGatingSequence > nextValue : https://github.com/LMAX-Exchange/disruptor/issues/76
    // 这里判断就是生产者生产的填满BingBUffer,需要等待消费者消费
    if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue
    {
        cursor.setVolatile(nextValue;  // StoreLoad fence

        //gatingSequences就是消费者队列末尾的序列,也就是消费者消费到哪里了
        //实际上就是获得处理的队尾,如果队尾是current的话,说明所有的消费者都执行完成任务在等待新的事件了
        long minSequence;
        while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue
        {
            // 等待1纳秒
            LockSupport.parkNanos(1L; // TODO: Use waitStrategy to spin?
        }

        this.cachedValue = minSequence;
    }

    this.nextValue = nextSequence;

    return nextSequence;
}

public void publish(long sequence
{
    // 更新序列号
    cursor.set(sequence;
    // 等待策略的唤醒
    waitStrategy.signalAllWhenBlocking(;
}

要解释的都在注释里了,gatingSequences 是消费者队列末尾的序列,对应着就是下图中的 ApplicationConsumer 的 Sequence

多生产者

如果有A、B两个消费者都来申请 2 个元素

getHighestPublishedSequence 方法的返回值

MultiProducerSequencer的availableBuffer 来维护。

public final class MultiProducerSequencer extends AbstractSequencer
{
    // 缓存的消费者中最小序号值,相当于SingleProducerSequencerFields的cachedValue
    private final Sequence gatingSequenceCache = new Sequence(Sequencer.INITIAL_CURSOR_VALUE;
    // 标记元素是否可用
    private final int[] availableBuffer;

    public long next(int n
    {
        if (n < 1
        {
            throw new IllegalArgumentException("n must be > 0";
        }

        long current;
        long next;

        do
        {
            current = cursor.get(;
            next = current + n;

            //减掉RingBuffer的总的buffer值,用于判断是否出现‘覆盖’
            long wrapPoint = next - bufferSize;
            //从后面代码分析可得:cachedValue就是缓存的消费者中最小序号值,他不是当前最新的‘消费者中最小序号值’,而是上次程序进入到下面的if判定代码段时,被赋值的当时的‘消费者中最小序号值’
            //这样做的好处在于:在判定是否出现覆盖的时候,不用每次都调用getMininumSequence计算‘消费者中的最小序号值’,从而节约开销。只要确保当生产者的节奏大于了缓存的cachedGateingSequence一个bufferSize时,从新获取一下 getMinimumSequence(即可。
            long cachedGatingSequence = gatingSequenceCache.get(;

            //(wrapPoint > cachedGatingSequence : 当生产者已经超过上一次缓存的‘消费者中最小序号值’(cachedGatingSequence)一个‘Ring’大小(bufferSize),需要重新获取cachedGatingSequence,避免当生产者一直在生产,但是消费者不再消费的情况下,出现‘覆盖’
            //(cachedGatingSequence > nextValue : https://github.com/LMAX-Exchange/disruptor/issues/76
            // 这里判断就是生产者生产的填满BingBUffer,需要等待消费者消费
            if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current
            {
                long gatingSequence = Util.getMinimumSequence(gatingSequences, current;

                if (wrapPoint > gatingSequence
                {
                    LockSupport.parkNanos(1; // TODO, should we spin based on the wait strategy?
                    continue;
                }

                gatingSequenceCache.set(gatingSequence;
            }
            // 使用cas保证只有一个生产者能拿到next
            else if (cursor.compareAndSet(current, next
            {
                break;
            }
        }
        while (true;

        return next;
    }
......
}
MultiProducerSequencerSingleProducerSequencer的 next(方法逻辑大致一样,只是多了CAS的步骤来保证并发的正确性。接着看发布方法
public void publish(final long sequence
{
    // 记录发布情况
    setAvailable(sequence;
    // 等待策略的唤醒
    waitStrategy.signalAllWhenBlocking(;
}

private void setAvailable(final long sequence
{
    // calculateIndex(sequence:获取序号
    // calculateAvailabilityFlag(sequence:RingBuffer的圈数
    setAvailableBufferValue(calculateIndex(sequence, calculateAvailabilityFlag(sequence;
}

private void setAvailableBufferValue(int index, int flag
{
    long bufferAddress = (index * SCALE + BASE;
    UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag;
    // 上面相当于 availableBuffer[index] = flag 的高性能版
}

记录发布情况,其实相当于 availableBuffer[sequence] = 圈数,前面说了,availableBuffer是用来标记元素是否可用的,如果消费者的圈数 ≠ availableBuffer中的圈数,则表示元素不可用

public boolean isAvailable(long sequence
{
    int index = calculateIndex(sequence;
    // 计算圈数
    int flag = calculateAvailabilityFlag(sequence;
    long bufferAddress = (index * SCALE + BASE;
    // UNSAFE.getIntVolatile(availableBuffer, bufferAddress:相当于availableBuffer[sequence] 的高性能版
    return UNSAFE.getIntVolatile(availableBuffer, bufferAddress == flag;
}

private int calculateAvailabilityFlag(final long sequence
{
    // 相当于 sequence % bufferSize,但是位操作更快
    return (int (sequence >>> indexShift;
}

isAvailable( 方法判断元素是否可用,此方法的调用堆栈看完消费者就清楚了。

消费者

消费者的依赖关系实现

下面看源码,这是 disruptor 配置消费者的代码。

EventHandler journalConsumer = xxx;
EventHandler replicaionConsumer = xxx;
EventHandler applicationConsumer = xxx;

disruptor.handleEventsWith(journalConsumer, replicaionConsumer
        .then(applicationConsumer;

// 下面两行等同于上面这行
// disruptor.handleEventsWith(journalConsumer, replicaionConsumer;
// disruptor.after(journalConsumer, replicaionConsumer.then(applicationConsumer;

先看ReplicaionConsumer 和 JournalConsumer 的配置 disruptor.handleEventsWith(journalConsumer, replicaionConsumer

/** 代码都在Disruptor类 **/

public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers
{
    // 没有依赖的消费者就创建新的Sequence
    return createEventProcessors(new Sequence[0], handlers;
}

/**
 * 创建消费者
 * @param barrierSequences 当前消费者组的屏障序列数组,如果当前消费者组是第一组,则取一个空的序列数组;否则,barrierSequences就是上一组消费者组的序列数组
 * @param eventHandlers 事件消费逻辑的EventHandler数组
 */
EventHandlerGroup<T> createEventProcessors(
    final Sequence[] barrierSequences,
    final EventHandler<? super T>[] eventHandlers
{
    checkNotStarted(;

    // 对应此事件处理器组的序列组
    final Sequence[] processorSequences = new Sequence[eventHandlers.length];
    final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences;

    for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++
    {
        final EventHandler<? super T> eventHandler = eventHandlers[i];

        // 创建消费者,注意这里传入了SequenceBarrier
        final BatchEventProcessor<T> batchEventProcessor =
            new BatchEventProcessor<>(ringBuffer, barrier, eventHandler;

        if (exceptionHandler != null
        {
            batchEventProcessor.setExceptionHandler(exceptionHandler;
        }

        consumerRepository.add(batchEventProcessor, eventHandler, barrier;
        processorSequences[i] = batchEventProcessor.getSequence(;
    }

    // 每次添加完事件处理器后,更新门控序列,以便后续调用链的添加
    // 所谓门控,就是RingBuffer要知道在消费链末尾的那组消费者(也是最慢的)的进度,避免消息未消费就被写入覆盖
    updateGatingSequencesForNextInChain(barrierSequences, processorSequences;

    return new EventHandlerGroup<>(this, consumerRepository, processorSequences;
}

createEventProcessors( 方法主要做了3件事,创建消费者、保存eventHandler和消费者的映射关系、更新 gatingSequences

    EventProcessor 是消费者
  • SequenceBarrier 是消费者屏障,保证了消费者的依赖关系
  • consumerRepository 保存了eventHandler和消费者的映射关系
// 为消费链下一组消费者,更新门控序列
// barrierSequences是上一组事件处理器组的序列(如果本次是第一次,则为空数组),本组不能超过上组序列值
// processorSequences是本次要设置的事件处理器组的序列
private void updateGatingSequencesForNextInChain(final Sequence[] barrierSequences, final Sequence[] processorSequences
{
    if (processorSequences.length > 0
    {
        // 将本组序列添加到Sequencer中的gatingSequences中
        ringBuffer.addGatingSequences(processorSequences;
        // 将上组消费者的序列从gatingSequences中移除
        for (final Sequence barrierSequence : barrierSequences
        {
            ringBuffer.removeGatingSequence(barrierSequence;
        }
        // 取消标记上一组消费者为消费链末端
        consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences;
    }
}

让我们把视线再回到消费者的设置方法

disruptor.handleEventsWith(journalConsumer, replicaionConsumer
        .then(applicationConsumer;

journalConsumer 和 replicaionConsumer 已经设置了,接下来是 applicationConsumer

/** 代码在EventHandlerGroup类 **/

public final EventHandlerGroup<T> then(final EventHandler<? super T>... handlers
{
    return handleEventsWith(handlers;
}

public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers
{
    return disruptor.createEventProcessors(sequences, handlers;
}

可以看到,设置 applicationConsumer 最终调用的也是 createEventProcessors( 方法,区别就在于 createEventProcessors( 方法的第一个参数,这里的 sequences 就是 journalConsumer 和 replicaionConsumer 这两个消费者的 Sequence

消费者的消费逻辑

EventProcessor#run(方法中,下面以BatchEventProcessor举例

// BatchEventProcessor#run(
// BatchEventProcessor#processEvents(
private void processEvents(
{
    T event = null;
    long nextSequence = sequence.get( + 1L;

    while (true
    {
        try
        {
            // 获取最大可用序列
            final long availableSequence = sequenceBarrier.waitFor(nextSequence;
            ...

            // 执行消费逻辑
            while (nextSequence <= availableSequence
            {
                // dataProvider就是RingBuffer
                event = dataProvider.get(nextSequence;
                eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence;
                nextSequence++;
            }

            sequence.set(availableSequence;
        }
        catch (
        {
            // 异常处理
        }
    }
}

方法简洁明了,在死循环中通过 sequenceBarrier 获取最大可用序列,然后从 RingBuffer 中获取 Event 并调用 EventHandler 进行消费。重点在 sequenceBarrier.waitFor(nextSequence; 中

public long waitFor(final long sequence
        throws AlertException, InterruptedException, TimeoutException
{
    checkAlert(;
    // 获取可用的序列,这里返回的是Sequencer#next方法设置成功的可用下标,不是Sequencer#publish
    // cursorSequence:生产者的最大可用序列
    // dependentSequence:依赖的消费者的最大可用序列
    long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this;

    if (availableSequence < sequence
    {
        return availableSequence;
    }
    // 获取最大的已发布成功的序号(对于发布是否成功的校验在此方法中)
    return sequencer.getHighestPublishedSequence(sequence, availableSequence;
}

熟悉的 getHighestPublishedSequence( 方法,忘了就回去看看生产者小节。waitStrategy.waitFor( 对应着图片中的 waitFor( 。

消费者的启动

disruptor.start(;

// Disruptor#start(
public RingBuffer<T> start(
{
    checkOnlyStartedOnce(;
    for (final ConsumerInfo consumerInfo : consumerRepository
    {
        consumerInfo.start(executor;
    }

    return ringBuffer;
}

class EventProcessorInfo<T> implements ConsumerInfo
{
    public void start(final Executor executor
    {
        // eventprocessor就是消费者
        executor.execute(eventprocessor;
    }
}

还记得 consumerRepository吗,没有就往上翻翻设置消费者那里的 disruptor.handleEventsWith( 方法。

disruptor#start( → ConsumerInfo#start( → Executor#execute( → EventProcessor#run(

总结

本文讲了 Disruptor 大体逻辑和源码,当然其高性能的秘诀不止文中描述的那些。还有不同的等待策略,Sequence 中使用Unsafe而不是JDK中的 Atomic 原子类等等。

编程笔记 » Disruptor-源码解读

赞同 (21) or 分享 (0)
游客 发表我的评论   换个身份
取消评论

表情
(0)个小伙伴在吐槽