单机最快的队列Disruptor解析和使用

科技资讯 投稿 7000 0 评论

单机最快的队列Disruptor解析和使用

前言

Disruptor是什么

Disruptor是外汇和加密货币交易所运营商 LMAX group 建立高性能的金融交易所的结果。用于解决生产者、消费者及其数据存储的设计问题的高性能队列实现。可以对标JDK中的ArrayBlockingQueue。是目前单机且基于内存存储的最高性能的队列实现。见 与ArrayBlockingQueue性能对比。

Disruptor高性能秘诀

使用CAS代替锁

比如多生产者模式中com.lmax.disruptor.MultiProducerSequencer就是用了Java里sun.misc.Unsafe类基于CAS实现的API。

独占缓存行

比如com.lmax.disruptor.RingBuffer中属性前后都用未赋值的long来独占。com.lmax.disruptor.SingleProducerSequencerPad也有相同处理方式。

环形队列

    使用有界队列,减少线程争用
    使用环形数组,避免生产和消费速度差异导致队列头和尾争用

Disruptor在逻辑上将数组的的头尾看成是相连的,即一个环形数组(RingBuffer)。

    Sequence

生产方只维护一个代表生产的最后一个元素的序号。代表生产的最后一个元素的序号。每次向Disruptor发布一个元素都调用Sequenced.next(来获取下个位置的写入权。

多个消费者各自维护自己的消费序列值(Sequence)保存数组中。

见com.lmax.disruptor.RingBuffer.elementAt(long sequence

预分配内存

见com.lmax.disruptor.RingBuffer.fill(EventFactory eventFactory

消费者8种等待策略

策略类名 描述
BlockingWaitStrategy(常用) 使用ReentrantLock,失败则进入等待队列等待唤醒重试。当吞吐量和低延迟不如CPU资源重要时使用。
YieldingWaitStrategy(常用) 尝试100次,全失败后调用Thread.yield(让出CPU。该策略将使用100%的CPU,如果其他线程请求CPU资源,这种策略更容易让出CPU资源。
SleepingWaitStrategy(常用) 尝试200次 。前100次直接重试,后100次每次失败后调用Thread.yield(让出CPU,全失败线程睡眠(默认100纳秒 )。
BusySpinWaitStrategy 线程一直自旋等待,比较耗CPU。最好是将线程绑定到特定的CPU核心上使用。
LiteBlockingWaitStrategy 与BlockingWaitStrategy类似,区别在增加了原子变量signalNeeded,如果两个线程同时分别访问waitFor(和signalAllWhenBlocking(,可以减少ReentrantLock加锁次数。
LiteTimeoutBlockingWaitStrategy 与LiteBlockingWaitStrategy类似,区别在于设置了阻塞时间,超过时间后抛异常。
TimeoutBlockingWaitStrategy 与BlockingWaitStrategy类似,区别在于设置了阻塞时间,超过时间后抛异常。
PhasedBackoffWaitStrategy 根据时间参数和传入的等待策略来决定使用哪种等待策略。当吞吐量和低延迟不如CPU资源重要时,可以使用此策略。

消费者序列

所有消费者的消费序列(Sequence)都放在一个数组中,见com.lmax.disruptor.AbstractSequencer,通过SEQUENCE_UPDATER来更新对应的序列值。

消费太慢队列满了怎么办

生产者线程被阻塞。生产者调用Sequenced.next(争夺写入权的时候需要判断最小的消费序列值进行比较。如果写入的位置还未消费则会进入循环不断获取最小消费序列值进行比较。

Disruptor开发步骤

    创建Event、EventFactory、EventHandler和ExceptionHandler类

另外有必要定义一个消费异常处理器ExceptionHandler,它是和EventHandler绑定的。当EventHandler.onEvent(执行抛出异常时会执行对应的异常回调方法。

    实例化Disruptor

EventFactory是上面定义的Event工厂类;

threadFactory是定义消费者线程创建方式的工厂类;

waitStrategy指明消费者等待生产时的策略。

    设置消费者

也可以指明多个WorkHandler,每个WorkHandler分配一个线程并行消费队列中的Event,一个Event只会被一个WorkHandler处理。

    创建/实例化EventTranslator
    最后用Disruptor.publishEvent( 来发布元素指明EventTranslator和参数

例子程序

    先引入Maven依赖
<dependency>
  <groupId>com.lmax</groupId>
  <artifactId>disruptor</artifactId>
  <version>3.4.4</version>
</dependency>
    Event
/**
 * 事件
 *
 * @param <T>发布的数据类型
 */
public class MyEvent<T> {

    private T data;

    public T getData( {
        return data;
    }

    public MyEvent<T> setData(T data {
        this.data = data;
        return this;
    }
}
    EventFactory
import com.lmax.disruptor.EventFactory;

/**
 * 创建事件的工厂
 *
 * @param <T>发布的数据类型
 */
public class MyEventFactory<T> implements EventFactory<MyEvent<T>> {

    @Override
    public MyEvent<T> newInstance( {
        return new MyEvent<>(;
    }
}
    EventHandler
import com.lmax.disruptor.EventHandler;

/**
 * 事件消费方法
 *
 * @param <T>发布的数据类型
 */
public class MyEventHandler<T> implements EventHandler<MyEvent<T>> {

    @Override
    public void onEvent(MyEvent<T> tMyEvent, long l, boolean b throws Exception {
        System.out.println(Thread.currentThread(.getName( + "MyEventHandler消费:" + tMyEvent.getData(;
    }
}
    ExceptionHandler
import com.lmax.disruptor.ExceptionHandler;

/**
 * 消费者异常处理器
 *
 * @param <T>发布的数据类型
 */
public class MyExceptionHandler<T> implements ExceptionHandler<MyEvent<T>> {

    @Override
    public void handleEventException(Throwable ex, long sequence, MyEvent<T> event {
        System.out.println("handleEventException";
    }

    @Override
    public void handleOnStartException(Throwable ex {
        System.out.println("handleOnStartException";
    }

    @Override
    public void handleOnShutdownException(Throwable ex {
        System.out.println("handleOnShutdownException";
    }
}

单消费者

import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

import static com.lmax.disruptor.dsl.ProducerType.SINGLE;

/**
 * 单消费者
 */
public class SingleConsumerSample {

    public static void main(String[] args {
        // 环形数组长度,必须是2的n次幂
        int ringBufferSize = 1024;
        // 创建事件(Event)对象的工厂
        MyEventFactory<String> eventFactory = new MyEventFactory<>(;
        // 创建消费者线程工厂
        ThreadFactory threadFactory = Executors.defaultThreadFactory(;
        // 等待策略
        WaitStrategy waitStrategy = new SleepingWaitStrategy(;
        Disruptor<MyEvent<String>> disruptor =
                new Disruptor<>(eventFactory, ringBufferSize, threadFactory, SINGLE, waitStrategy;

        // 指定一个处理器
        MyEventHandler<String> eventHandler = new MyEventHandler<>(;
        disruptor.handleEventsWith(eventHandler;
        // 处理器异常处理器
        ExceptionHandler<MyEvent<String>> exceptionHandler = new MyExceptionHandler<>(;
        disruptor.setDefaultExceptionHandler(exceptionHandler;

        disruptor.start(;

        // 通过事件转换器(EventTranslator)来指明如何将发布的数据转换到事件对象(Event)中
        // 这里是一个参数的转换器,另外还有两个(EventTranslatorTwoArg)、三个(EventTranslatorThreeArg)
        // 和多个(EventTranslatorVararg)参数的转换器可以使用,参数类型可以不一样
        EventTranslatorOneArg<MyEvent<String>, String> eventTranslatorOneArg =
                new EventTranslatorOneArg<MyEvent<String>, String>( {
                    @Override
                    public void translateTo(MyEvent<String> event, long sequence, String arg0 {
                        event.setData(arg0;
                    }
                };

        // 发布
        for (int i = 0; i < 10; i++ {
            disruptor.publishEvent(eventTranslatorOneArg, "One arg " + i;
        }

        disruptor.shutdown(;
    }
}

单消费者Lambda写法

这种只是迎合Java8 Lambda语法特性,代码更简洁。

import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.stream.Collectors;

import static com.lmax.disruptor.dsl.ProducerType.SINGLE;

public class LambdaSample {


    public static void main(String[] args {
        // 环形数组长度,必须是2的n次幂
        int ringBufferSize = 1024;
        // 创建消费者线程工厂
        ThreadFactory threadFactory = Executors.defaultThreadFactory(;
        // 等待策略
        WaitStrategy waitStrategy = new SleepingWaitStrategy(;
        Disruptor<MyEvent<String>> disruptor =
                new Disruptor<>(MyEvent::new, ringBufferSize, threadFactory, SINGLE, waitStrategy;

        // 指定一个处理器
        EventHandler<MyEvent<String>> eventHandler = (event, sequence, endOfBatch ->
                System.out.println(Thread.currentThread(.getName( + "MyEventHandler消费:" + event.getData(;
        disruptor.handleEventsWith(eventHandler;
        // 处理器异常处理器
        ExceptionHandler<MyEvent<String>> exceptionHandler = new MyExceptionHandler<>(;
        disruptor.setDefaultExceptionHandler(exceptionHandler;

        disruptor.start(;

        // 通过事件转换器(EventTranslator)来指明如何将发布的数据转换到事件对象(Event)中
        // 一个参数的转换器
        disruptor.publishEvent((event, sequence, param -> event.setData(param, "One arg ";
        // 两个参数的转换器
        disruptor.publishEvent((event, sequence, pA, pB -> event.setData(pA + pB, "Two arg ", 1;
        // 三个参数的转换器
        disruptor.publishEvent((event, sequence, pA, pB, pC -> event.setData(pA + pB + pC
                , "Three arg ", 1, false;
        // 多个参数的转换器
        disruptor.getRingBuffer(.publishEvent((event, sequence, params -> {
            List<String> paramList = Arrays.stream(params.map(Object::toString.collect(Collectors.toList(;
            event.setData("Var arg " + String.join(",", paramList;
        }, "param1", "param2", "param3";

        disruptor.shutdown(;
    }
}

多消费者重复消费元素

关键只在于指定多个EventHandler,并且EventHandler还可以分别绑定不同的ExceptionHandler。

import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

import static com.lmax.disruptor.dsl.ProducerType.SINGLE;

/**
 * 一个元素多个消费者重复消费
 */
public class RepetitionConsumerSample {

    public static void main(String[] args {
        // 环形数组长度,必须是2的n次幂
        int ringBufferSize = 1024;
        // 创建事件(Event)对象的工厂
        MyEventFactory<String> eventFactory = new MyEventFactory<>(;
        // 创建消费者线程工厂
        ThreadFactory threadFactory = Executors.defaultThreadFactory(;
        // 等待策略
        WaitStrategy waitStrategy = new SleepingWaitStrategy(;
        Disruptor<MyEvent<String>> disruptor =
                new Disruptor<>(eventFactory, ringBufferSize, threadFactory, SINGLE, waitStrategy;


        // 这里指定了2个消费者,那就会产生2个消费线程,一个事件会被消费2次
        EventHandler<MyEvent<String>> eventHandler = (event, sequence, endOfBatch ->
                System.out.println(Thread.currentThread(.getName( + "MyEventHandler消费:" + event.getData(;
        EventHandler<MyEvent<String>> eventHandler2 = (event, sequence, endOfBatch ->
                System.out.println(Thread.currentThread(.getName( + "MyEventHandler——2消费:" + event.getData(;
        disruptor.handleEventsWith(eventHandler, eventHandler2;
        // 分别指定异常处理器
        ExceptionHandler<MyEvent<String>> exceptionHandler = new MyExceptionHandler<>(;
        disruptor.handleExceptionsFor(eventHandler.with(exceptionHandler;
        disruptor.handleExceptionsFor(eventHandler2.with(exceptionHandler;

        disruptor.start(;

        for (int i = 0; i < 10; i++ {
            disruptor.publishEvent((event, sequence, param -> event.setData(param, "One arg " + i;
        }

        disruptor.shutdown(;
    }
}

多消费者

关键只在于定义WorkHandler,然后实例化多个来消费。

import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.WorkHandler;
import com.lmax.disruptor.dsl.Disruptor;

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

import static com.lmax.disruptor.dsl.ProducerType.SINGLE;

public class MultiConsumerSample {

    public static void main(String[] args {
        // 环形数组长度,必须是2的n次幂
        int ringBufferSize = 1024;
        // 创建事件(Event)对象的工厂
        MyEventFactory<String> eventFactory = new MyEventFactory<>(;
        // 创建消费者线程工厂
        ThreadFactory threadFactory = Executors.defaultThreadFactory(;
        // 等待策略
        WaitStrategy waitStrategy = new SleepingWaitStrategy(;
        Disruptor<MyEvent<String>> disruptor =
                new Disruptor<>(eventFactory, ringBufferSize, threadFactory, SINGLE, waitStrategy;

        // 处理器异常处理器
        ExceptionHandler<MyEvent<String>> exceptionHandler = new MyExceptionHandler<>(;
        disruptor.setDefaultExceptionHandler(exceptionHandler;

        // 设置2个消费者,2个线程,一个Event只被一个消费者消费
        WorkHandler<MyEvent<String>> workHandler = tMyEvent ->
                System.out.println(Thread.currentThread(.getName( + "WorkHandler消费:" + tMyEvent.getData(;
        disruptor.handleEventsWithWorkerPool(workHandler, workHandler2;

        disruptor.start(;

        for (int i = 0; i < 10; i++ {
            disruptor.publishEvent((event, sequence, param -> event.setData(param, "One arg " + i;
        }

        disruptor.shutdown(;
    }
}

参考链接

Disruptor 主页

GitHub Disruptor

Maven Repository Disruptor Framework

LMAX 官网

编程笔记 » 单机最快的队列Disruptor解析和使用

赞同 (34) or 分享 (0)
游客 发表我的评论   换个身份
取消评论

表情
(0)个小伙伴在吐槽