1、Disruptor简介
Disruptor它是一个开源的并发框架,并获得2011 Oracle Duke's 程序框架创新奖,该框架研发的初衷是为了解决高并发下列队锁的问题,最早由英国外汇交易公司LMAX(一种新型零售金融交易平台)开发并开源的,能够在无锁的情况下实现队列的高并发操作,这个系统是建立在JVM平台上,其核心是一个业务逻辑处理器,它能够在一个线程里每秒处理6百万订单,业务逻辑处理器完全是运行在内存中,使用事件源驱动方式,业务逻辑处理器的核心是Disruptor;
Github:https://github.com/LMAX-Exchange/disruptor`
队列的特性
先进先出(FIFO),先进入队列的元素先出队列(可以理解为我们生活中的排队情况,早办完,早结cer)往队列里发布(publish)事件,消费者(Consumer)获得事件通知并消费,如果队列中没有事件时,消费者堵塞,直到生产者发布了新的事件,消费者再继续消费;
Java中的队列
在Java中的java.util.concurrent包下有大量的队列供我们使用:
ArrayBlockingQueue:基于数组结构的队列,通过加锁的方式,来保证多线程情况下数据的安全;
LinkedBlockingQueue:基于链表结构的队列,也通过加锁的方式,来保证多线程情况下数据的安全;
ConcurrentLinkedQueue:基于链表结构的队列,通过compare and swap(简称CAS)的方式,来保证多线程情况下数据的安全,不加锁,主要使用了Java中的sun.misc.Unsafe类来实现;
LinkedTransferQueue:与上面的ConcurrentLinkedQueue一样;
Java中的队列特点
1、使用加锁实现队列的类,虽然是有界的(可以设置队列的大小),但是有锁的存在,性能上有了很大的影响,线程由于锁的竞争被挂起,直到锁的释放,才能恢复。
2、使用CAS实现队列的类,都是无界的,无法保证队列的长度,理论上来说可以是无限扩展,那么如果生产者生产过快,消费者还没来得及消费,最终可能会导致内存溢出,影响系统稳定;
3、Disruptor解决了以上的问题,实现了有界无锁队列操作,主要是使用了环形数组RingBuffer;
2、Disruptor结构
Disruptor是一个开源的框架,可以在无锁的情况下对队列进行高并发操作,那么这个队列的设计就是Disruptor的核心所在;
环形数组RingBuffer
在Disruptor中,采用了RingBuffer来作为队列的数据结构,RingBuffer就是一个环形的数组,既然是数组,我们便可对其设置大小; 在这个ringBuffer中,除了数组之外,还有一个序列号,用来指向数组中的下一个可用元素,供生产者使用或者消费者使用,也就是生产者可以生产的地方,或者消费者可以消费的地方(序列号和数组索引是两个概念); 序列号= 2的63次方-1. (30万年才能用完)
RingBuffer的优点
Disruptor使用数组作为队列的一个好处,就是可以快速定位到所需元素,通常使用取摸运算(序列号%数组大小=所需元素角标),在Disruptor中使用的是位运算,具体实现:UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT))), 效率更高,定位更快,此外,在Disruptor中数组内的元素并不会被删除,而是新数据来覆盖原有数据;
3、Disruptor极速体验及代码
1、添加maven依赖
2、声明Event来包含需要传递的事件(数据)
3、使用EventFactory来实例化Event对象
4、声明消费者事件(数据)处理器
5、生产者发送事件(数据)
6、测试验证
1、Application.class
public class Application {
public static void main(String[] args) {
//创建一个线程工厂提供线程来触发Consumer的事件处理
ThreadFactory threadFactory = Executors.defaultThreadFactory();
//创建工厂
EventFactory<StringEvent> eventEventFactory = new StringEventFactory();
//创建ringBuffer大小,一定要2的n次方
int ringBufferSize = 32;
//创建Disruptor
Disruptor<StringEvent> disruptor = new Disruptor<StringEvent>(eventEventFactory, //事件工厂
ringBufferSize, //环形数组大小
threadFactory, //线程工厂
ProducerType.SINGLE, //单个生产者
new TimeoutBlockingWaitStrategy(10,TimeUnit.MICROSECONDS)); //等待策略
// 1,BlockingWaitStrategy:通过线程阻塞的方式,等待生产者唤醒,被唤醒后,再循环检查依赖的sequence是否已经消费。
// 2,BusySpinWaitStrategy:线程一直自旋等待,可能比较耗cpu
// 3,LiteBlockingWaitStrategy:线程阻塞等待生产者唤醒,与BlockingWaitStrategy相比,区别在signalNeeded.getAndSet,如果两个线程同时访问一个访问waitfor,一个访问signalAll时,可以减少lock加锁次数.
// 4,LiteTimeoutBlockingWaitStrategy:与LiteBlockingWaitStrategy相比,设置了阻塞时间,超过时间后抛异常。
// 5,PhasedBackoffWaitStrategy:根据时间参数和传入的等待策略来决定使用哪种等待策略
// 6,TimeoutBlockingWaitStrategy:相对于BlockingWaitStrategy来说,设置了等待时间,超过后抛异常
// 7,YieldingWaitStrategy:尝试100次,然后Thread.yield()让出cpu
//连接消费端方法
disruptor.handleEventsWith(new StringEventHandler());
//启动
disruptor.start();
//开始生产数据
//拿到ringbuffer
RingBuffer<StringEvent> stringEventRingBuffer = disruptor.getRingBuffer();
StringEventProducer producer = new StringEventProducer(stringEventRingBuffer);
for (int i = 0; i < 10009; i++) {
producer.onData(String.valueOf(i));
}
disruptor.shutdown();
}
}
2、StringEvent.class
public class StringEvent {
private String value;
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
}
3、StringEventFactory.class
public class StringEventFactory implements EventFactory<StringEvent> {
@Override
public StringEvent newInstance() {
return new StringEvent();
}
}
4、StringEventHandler.clss
public class StringEventHandler implements EventHandler<StringEvent> {
@Override
public void onEvent(StringEvent stringEvent, long l, boolean b) throws Exception {
System.out.println("消费者:" + stringEvent.getValue() + "-->sequence=" + l + "-->endOfBatch=" + b);
Thread.sleep(101);
}
}
5、StringEventProducer.class
public class StringEventProducer {
public final RingBuffer<StringEvent> ringBuffer;
public StringEventProducer(RingBuffer<StringEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void onData(String data) {
//取序号
long seqence = ringBuffer.next();
try {
//获取事件
StringEvent stringEvent = ringBuffer.get(seqence);
//设置值
stringEvent.setValue(data);
} finally {
//发布事件
ringBuffer.publish(seqence);
System.out.println("生产数据:" + data);
}
}
}
4、Disruptor主要实现类
Disruptor
Disruptor的入口,主要封装了环形队列RingBuffer、消费者集合ConsumerRepository的引用,主要提供了获取环形队列、添加消费者、生产者向RingBuffer中添加事件(即生产者生产数据的操作);
RingBuffer
Disruptor中队列具体的实现,底层封装了Object[]数组,在初始化时,会使用Event事件对数组进行填充,填充的大小就是bufferSize设置的值,此外该对象内部还维护了Sequencer(序列生成器)具体的实现;
Sequencer
序列生成器,分别有MultiProducerSequencer(多生产者序列生产器) 和 SingleProducerSequencer(单生产者序列生产器)两个实现类。在Sequencer中,维护了消费者的Sequence(序列对象)和生产者自己的Sequence(序列对象);以及维护了生产者与消费者序列冲突时候的等待策略WaitStrategy;
Sequence
序列对象,内部维护了一个long型的value,这个序列指向了RingBuffer中Object[]数组具体的角标,生产者和消费者各自维护自己的Sequence,但都是指向RingBuffer的Object[]数组;
WaitStrategy
等待策略,当没有可消费的事件时,消费者根据特定的策略进行等待,当没有可生产的地方时,生产者根据特定的策略进行等待;
Event
事件对象,就是我们Ringbuffer中存在的数据,在Disruptor中用Event来定义数据,并不存在Event类,它只是一个定义,是一个概念,表示要传递的数据;
EventProcessor
事件处理器,单独在一个线程内执行,判断消费者的序列和生产者序列关系,决定是否调用自定义的事件处理器,也就是是否可以进行消费;
EventHandler
事件处理器,由用户自定义实现,也就是最终的事件消费者,需要实现EventHandler接口;
Producer
事件生产者,我们定义的发送事件的对象;
5、Disruptor的生产和消费
(1)当Disruptor框架启动: (2)此时,还没有数据进行写入 (3)准备写入数据前的准备,获取可以写入数据的最大序列; (4)写入数据完成,更新生产者序列对象的值; 以上就是单生产者写入数据的过程,要注意的是,无论是生产者还是消费者,序列的初始值都是-1; 当引入消费者后,生产者在获取可写入的序列之前,都会判断消费者所处的序列。
假设一种情况,当在我们的消费者端使用Thread.sleep (很大的值) 的时候,消费者使用被等待,无法进行消费。 那么此时,生产者会一直在对数组中的元素进行生产,当生产到7准备生产序列8的时候,通过计算序列8对应的是index = 0的元素,我们此时会判断覆盖点所对应的角标是否大于消费者的序列大小,如果大于消费者序列,那么生产者不会进行生产,直到消费者消费了此角标下的元素;
源码如下:
public long next(int n){
if (n < 1) {
throw new IllegalArgumentException("n must be > 0");
}
long nextValue = this.nextValue;
long nextSequence = nextValue + n;
long wrapPoint = nextSequence - bufferSize;
long cachedGatingSequence = this.cachedValue;
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue){
cursor.setVolatile(nextValue); // StoreLoad fence
long minSequence;
//此处进行判断,如果覆盖点的大小,超过了消费者的序列,
那么会一直while循环进行判断
while (wrapPoint > (minSequence =
Util.getMinimumSequence(gatingSequences, nextValue))) {
waitStrategy.signalAllWhenBlocking();
LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
}
this.cachedValue = minSequence;
}
this.nextValue = nextSequence;
return nextSequence;
}
6、Disruptor常用等待策略
com.lmax.disruptor.WaitStrategy
决定一个消费者如何等待生产者将Event置入Disruptor;
其所有实现都是针对消费者线程的;
主要策略有
com.lmax.disruptor.BlockingWaitStrategy
com.lmax.disruptor.SleepingWaitStrategy
com.lmax.disruptor.YieldingWaitStrategy
com.lmax.disruptor.BlockingWaitStrategy
最低效的策略,但其对CPU的消耗最小,并且在各种部署环境中能提供更加一致的性能表现;
内部维护了一个重入锁ReentrantLock和Condition;
com.lmax.disruptor.SleepingWaitStrategy
性能表现和com.lmax.disruptor.BlockingWaitStrategy差不多,对CPU的消耗也类似,但其对生产者线程的影响最小,适合用于异步日志类似的场景;
是一种无锁的方式,比如log4j2使用了Disruptor框架;
com.lmax.disruptor.YieldingWaitStrategy
性能最好,适合用于低延迟的系统,在要求极高性能且事件处理线程数小于CPU逻辑核心数的场景中,推荐使用此策略,例如CPU开启超线程的特性;
7、Disruptor为什么这么快
环形数组结构 为了避免垃圾回收,采用数组而非链表,同时,数组对处理器的缓存机制更加友好。
元素位置定位 数组长度2^n,通过位运算,加快定位的速度,下标采取递增的形式,不用担心index溢出的问题,index是long类型,即使100万QPS的处理速度,也需要30万年才能用完。
无锁设计 每个生产者或者消费者线程,会先申请可以操作的元素在数组中的位置,申请到之后,直接在该位置写入或者读取数据。
总结:
Disruptor是基于jvm的,是单jvm下的无锁队列技术,不是分布式下的,如果你是在分布式下,你需要考虑分布式相关问题;
生产者 --> 队列 --> 消费者
Disruptor生产者 生产数据 -->RingBuffer -->直接推给消费者
基于事件监听的;(观察者模式)
注意:本文归作者所有,未经作者允许,不得转载