Disruptor

1、Disruptor简介

Disruptor它是一个开源的并发框架,并获得2011 Oracle Duke's 程序框架创新奖,该框架研发的初衷是为了解决高并发下列队锁的问题,最早由英国外汇交易公司LMAX(一种新型零售金融交易平台)开发并开源的,能够在无锁的情况下实现队列的高并发操作,这个系统是建立在JVM平台上,其核心是一个业务逻辑处理器,它能够在一个线程里每秒处理6百万订单,业务逻辑处理器完全是运行在内存中,使用事件源驱动方式,业务逻辑处理器的核心是Disruptor;

Github:https://github.com/LMAX-Exchange/disruptor`

队列的特性

先进先出(FIFO),先进入队列的元素先出队列(可以理解为我们生活中的排队情况,早办完,早结cer)往队列里发布(publish)事件,消费者(Consumer)获得事件通知并消费,如果队列中没有事件时,消费者堵塞,直到生产者发布了新的事件,消费者再继续消费;

Java中的队列
在Java中的java.util.concurrent包下有大量的队列供我们使用:
ArrayBlockingQueue:基于数组结构的队列,通过加锁的方式,来保证多线程情况下数据的安全;
LinkedBlockingQueue:基于链表结构的队列,也通过加锁的方式,来保证多线程情况下数据的安全;
ConcurrentLinkedQueue:基于链表结构的队列,通过compare and swap(简称CAS)的方式,来保证多线程情况下数据的安全,不加锁,主要使用了Java中的sun.misc.Unsafe类来实现;
LinkedTransferQueue:与上面的ConcurrentLinkedQueue一样;

Java中的队列特点

1、使用加锁实现队列的类,虽然是有界的(可以设置队列的大小),但是有锁的存在,性能上有了很大的影响,线程由于锁的竞争被挂起,直到锁的释放,才能恢复。
2、使用CAS实现队列的类,都是无界的,无法保证队列的长度,理论上来说可以是无限扩展,那么如果生产者生产过快,消费者还没来得及消费,最终可能会导致内存溢出,影响系统稳定;
3、Disruptor解决了以上的问题,实现了有界无锁队列操作,主要是使用了环形数组RingBuffer;

2、Disruptor结构

Disruptor是一个开源的框架,可以在无锁的情况下对队列进行高并发操作,那么这个队列的设计就是Disruptor的核心所在;

环形数组RingBuffer

image.png 在Disruptor中,采用了RingBuffer来作为队列的数据结构,RingBuffer就是一个环形的数组,既然是数组,我们便可对其设置大小; 在这个ringBuffer中,除了数组之外,还有一个序列号,用来指向数组中的下一个可用元素,供生产者使用或者消费者使用,也就是生产者可以生产的地方,或者消费者可以消费的地方(序列号和数组索引是两个概念); 序列号= 2的63次方-1. (30万年才能用完)

RingBuffer的优点

Disruptor使用数组作为队列的一个好处,就是可以快速定位到所需元素,通常使用取摸运算(序列号%数组大小=所需元素角标),在Disruptor中使用的是位运算,具体实现:UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT))), 效率更高,定位更快,此外,在Disruptor中数组内的元素并不会被删除,而是新数据来覆盖原有数据;

3、Disruptor极速体验及代码

1、添加maven依赖
2、声明Event来包含需要传递的事件(数据)
3、使用EventFactory来实例化Event对象
4、声明消费者事件(数据)处理器
5、生产者发送事件(数据)
6、测试验证

1、Application.class

public class Application {
    public static void main(String[] args) {
        //创建一个线程工厂提供线程来触发Consumer的事件处理
        ThreadFactory threadFactory = Executors.defaultThreadFactory();

        //创建工厂
        EventFactory<StringEvent> eventEventFactory = new StringEventFactory();
        //创建ringBuffer大小,一定要2的n次方
        int ringBufferSize = 32;

        //创建Disruptor
        Disruptor<StringEvent> disruptor = new Disruptor<StringEvent>(eventEventFactory,  //事件工厂
                ringBufferSize, //环形数组大小
                threadFactory,  //线程工厂
                ProducerType.SINGLE, //单个生产者
                new TimeoutBlockingWaitStrategy(10,TimeUnit.MICROSECONDS)); //等待策略


//        1,BlockingWaitStrategy:通过线程阻塞的方式,等待生产者唤醒,被唤醒后,再循环检查依赖的sequence是否已经消费。
//        2,BusySpinWaitStrategy:线程一直自旋等待,可能比较耗cpu
//        3,LiteBlockingWaitStrategy:线程阻塞等待生产者唤醒,与BlockingWaitStrategy相比,区别在signalNeeded.getAndSet,如果两个线程同时访问一个访问waitfor,一个访问signalAll时,可以减少lock加锁次数.
//        4,LiteTimeoutBlockingWaitStrategy:与LiteBlockingWaitStrategy相比,设置了阻塞时间,超过时间后抛异常。
//        5,PhasedBackoffWaitStrategy:根据时间参数和传入的等待策略来决定使用哪种等待策略
//        6,TimeoutBlockingWaitStrategy:相对于BlockingWaitStrategy来说,设置了等待时间,超过后抛异常
//        7,YieldingWaitStrategy:尝试100次,然后Thread.yield()让出cpu

        //连接消费端方法
        disruptor.handleEventsWith(new StringEventHandler());

        //启动
        disruptor.start();

        //开始生产数据
        //拿到ringbuffer
        RingBuffer<StringEvent> stringEventRingBuffer = disruptor.getRingBuffer();

        StringEventProducer producer = new StringEventProducer(stringEventRingBuffer);
        for (int i = 0; i < 10009; i++) {
            producer.onData(String.valueOf(i));
        }

        disruptor.shutdown();
    }
}

2、StringEvent.class

public class StringEvent {
    private String value;

    public String getValue() {
        return value;
    }

    public void setValue(String value) {
        this.value = value;
    }
}

3、StringEventFactory.class

public class StringEventFactory implements EventFactory<StringEvent> {
    @Override
    public StringEvent newInstance() {
        return new StringEvent();
    }
}

4、StringEventHandler.clss

public class StringEventHandler implements EventHandler<StringEvent> {
    @Override
    public void onEvent(StringEvent stringEvent, long l, boolean b) throws Exception {
        System.out.println("消费者:" + stringEvent.getValue() + "-->sequence=" + l + "-->endOfBatch=" + b);
        Thread.sleep(101);
    }
}

5、StringEventProducer.class

public class StringEventProducer {

    public final RingBuffer<StringEvent> ringBuffer;

    public StringEventProducer(RingBuffer<StringEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void onData(String data) {
        //取序号
        long seqence = ringBuffer.next();
        try {
            //获取事件
            StringEvent stringEvent = ringBuffer.get(seqence);
            //设置值
            stringEvent.setValue(data);
        } finally {
            //发布事件
            ringBuffer.publish(seqence);
            System.out.println("生产数据:" + data);
        }
    }

}

4、Disruptor主要实现类

Disruptor
Disruptor的入口,主要封装了环形队列RingBuffer、消费者集合ConsumerRepository的引用,主要提供了获取环形队列、添加消费者、生产者向RingBuffer中添加事件(即生产者生产数据的操作);

RingBuffer
Disruptor中队列具体的实现,底层封装了Object[]数组,在初始化时,会使用Event事件对数组进行填充,填充的大小就是bufferSize设置的值,此外该对象内部还维护了Sequencer(序列生成器)具体的实现;

Sequencer
序列生成器,分别有MultiProducerSequencer(多生产者序列生产器) 和 SingleProducerSequencer(单生产者序列生产器)两个实现类。在Sequencer中,维护了消费者的Sequence(序列对象)和生产者自己的Sequence(序列对象);以及维护了生产者与消费者序列冲突时候的等待策略WaitStrategy;

Sequence
序列对象,内部维护了一个long型的value,这个序列指向了RingBuffer中Object[]数组具体的角标,生产者和消费者各自维护自己的Sequence,但都是指向RingBuffer的Object[]数组;

WaitStrategy
等待策略,当没有可消费的事件时,消费者根据特定的策略进行等待,当没有可生产的地方时,生产者根据特定的策略进行等待;

Event
事件对象,就是我们Ringbuffer中存在的数据,在Disruptor中用Event来定义数据,并不存在Event类,它只是一个定义,是一个概念,表示要传递的数据;

EventProcessor
事件处理器,单独在一个线程内执行,判断消费者的序列和生产者序列关系,决定是否调用自定义的事件处理器,也就是是否可以进行消费;

EventHandler
事件处理器,由用户自定义实现,也就是最终的事件消费者,需要实现EventHandler接口;

Producer
事件生产者,我们定义的发送事件的对象;

5、Disruptor的生产和消费

(1)当Disruptor框架启动: image (1).png (2)此时,还没有数据进行写入 image (2).png (3)准备写入数据前的准备,获取可以写入数据的最大序列; image (3).png (4)写入数据完成,更新生产者序列对象的值; image (4).png 以上就是单生产者写入数据的过程,要注意的是,无论是生产者还是消费者,序列的初始值都是-1; 当引入消费者后,生产者在获取可写入的序列之前,都会判断消费者所处的序列。

假设一种情况,当在我们的消费者端使用Thread.sleep (很大的值) 的时候,消费者使用被等待,无法进行消费。 那么此时,生产者会一直在对数组中的元素进行生产,当生产到7准备生产序列8的时候,通过计算序列8对应的是index = 0的元素,我们此时会判断覆盖点所对应的角标是否大于消费者的序列大小,如果大于消费者序列,那么生产者不会进行生产,直到消费者消费了此角标下的元素;

源码如下:

public long next(int n){
    if (n < 1) {
        throw new IllegalArgumentException("n must be > 0");
    }
    long nextValue = this.nextValue;
    long nextSequence = nextValue + n;
    long wrapPoint = nextSequence - bufferSize;
    long cachedGatingSequence = this.cachedValue;
    if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue){
        cursor.setVolatile(nextValue);  // StoreLoad fence
        long minSequence;
        //此处进行判断,如果覆盖点的大小,超过了消费者的序列,
那么会一直while循环进行判断
        while (wrapPoint > (minSequence = 
Util.getMinimumSequence(gatingSequences, nextValue))) {
            waitStrategy.signalAllWhenBlocking();
            LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
        }
        this.cachedValue = minSequence;
    }
    this.nextValue = nextSequence;
    return nextSequence;
}

6、Disruptor常用等待策略

com.lmax.disruptor.WaitStrategy
决定一个消费者如何等待生产者将Event置入Disruptor;
其所有实现都是针对消费者线程的;
主要策略有
com.lmax.disruptor.BlockingWaitStrategy
com.lmax.disruptor.SleepingWaitStrategy
com.lmax.disruptor.YieldingWaitStrategy
 
com.lmax.disruptor.BlockingWaitStrategy
最低效的策略,但其对CPU的消耗最小,并且在各种部署环境中能提供更加一致的性能表现;
内部维护了一个重入锁ReentrantLock和Condition;
 
com.lmax.disruptor.SleepingWaitStrategy
性能表现和com.lmax.disruptor.BlockingWaitStrategy差不多,对CPU的消耗也类似,但其对生产者线程的影响最小,适合用于异步日志类似的场景;
是一种无锁的方式,比如log4j2使用了Disruptor框架;
 
com.lmax.disruptor.YieldingWaitStrategy
性能最好,适合用于低延迟的系统,在要求极高性能且事件处理线程数小于CPU逻辑核心数的场景中,推荐使用此策略,例如CPU开启超线程的特性;

7、Disruptor为什么这么快

环形数组结构 为了避免垃圾回收,采用数组而非链表,同时,数组对处理器的缓存机制更加友好。

元素位置定位 数组长度2^n,通过位运算,加快定位的速度,下标采取递增的形式,不用担心index溢出的问题,index是long类型,即使100万QPS的处理速度,也需要30万年才能用完。

无锁设计 每个生产者或者消费者线程,会先申请可以操作的元素在数组中的位置,申请到之后,直接在该位置写入或者读取数据。

总结:

Disruptor是基于jvm的,是单jvm下的无锁队列技术,不是分布式下的,如果你是在分布式下,你需要考虑分布式相关问题;

生产者 -->  队列 -->  消费者
Disruptor生产者 生产数据 -->RingBuffer -->直接推给消费者
基于事件监听的;(观察者模式)

已有 0 条评论

    欢迎您,新朋友,感谢参与互动!