构建高性能内存队列Disruptoryyds
Java中有哪些队列 ArrayBlockingQueue 使用ReentrantLock LinkedBlockingQueue 使用ReentrantLock ConcurrentLinkedQueue 使用CAS 等等
我们清楚使用锁的性能比较低,尽量使用无锁设计。接下来就我们来认识下Disruptor。 Disruptor简单使用
github地址:https://github.com/LMAX-Exchange/disruptor/wiki/Performance-Results
先简单介绍下: Disruptor它是一个开源的并发框架,并获得2011 Duke’s程序框架创新奖【Oracle】,能够在无锁的情况下实现网络的Queue并发操作。英国外汇交易公司LMAX开发的一个高性能队列,号称单线程能支撑每秒600万订单~ 日志框架Log4j2 异步模式采用了Disruptor来处理 局限呢,他就是个内存队列,也就是说无法支撑分布式场景。
简单使用
数据传输对象 @Data public class EventData { private Long value; }
消费者 public class EventConsumer implements WorkHandler { /** * 消费回调 * @param eventData * @throws Exception */ @Override public void onEvent(EventData eventData) throws Exception { Thread.sleep(5000); System.out.println(Thread.currentThread() + ", eventData:" + eventData.getValue()); } }
生产者 public class EventProducer { private final RingBuffer ringBuffer; public EventProducer(RingBuffer ringBuffer) { this.ringBuffer = ringBuffer; } public void sendData(Long v){ // cas展位 long next = ringBuffer.next(); try { EventData eventData = ringBuffer.get(next); eventData.setValue(v); } finally { // 通知等待的消费者 System.out.println("EventProducer send success, sequence:"+next); ringBuffer.publish(next); } } }
测试类 public class DisruptorTest { public static void main(String[] args) { // 2的n次方 int bufferSize = 8; Disruptor disruptor = new Disruptor( () -> new EventData(), // 事件工厂 bufferSize, // 环形数组大小 Executors.defaultThreadFactory(), // 线程池工厂 ProducerType.MULTI, // 支持多事件发布者 new BlockingWaitStrategy()); // 等待策略 // 设置消费者 disruptor.handleEventsWithWorkerPool( new EventConsumer(), new EventConsumer(), new EventConsumer(), new EventConsumer()); disruptor.start(); RingBuffer ringBuffer = disruptor.getRingBuffer(); EventProducer eventProducer = new EventProducer(ringBuffer); long i = 0; for(;;){ i++; eventProducer.sendData(i); try { Thread.sleep(1500); } catch (InterruptedException e) { e.printStackTrace(); } } } }核心组件
基于上面简单例子来看确实很简单,Disruptor帮我们封装好了生产消费模型的实现,接下来我们来看下他是基于哪些核心组件来支撑起一个高性能无锁队列呢?
RingBuffer: 环形数组,底层使用数组entries,在初始化时填充数组,避免不断新建对象带来的开销。后续只会对entries做更新操作
Sequencer: 核心管家 定义生产同步的实现: SingleProducerSequencer 单生产、MultiProducerSequencer 多生产 当前写的进度Sequence cursor 所有消费者进度的数组 Sequence[] gatingSequences MultiProducerSequencer 可用区availableBuffer 【利用空间换取查询效率】
Sequence: 本身就是一个序号器用来标识处理进度,也可以当做是一个atomicInteger ; 还有另外一个特点,为了解决伪共享问题而引入的:缓存行填充。这个在后面介绍。
workProcessor: 处理Event的循环,在循环中获取Disruptor的事件,然后把事件分配给各个handler
EventHandler: 负责业务逻辑的handler,自己实现。
WaitStrategy: 消费者 如何等待 事件的策略,定义了如下策略 leepingWaitStrategy :自旋 + yield + sleep BlockingWaitStrategy :加锁,适合CPU资源紧张(不需要切换线程),系统吞吐量无要求的 YieldingWaitStrategy :自旋 + yield + 自旋 BusySpinWaitStrategy :自旋,减少线程之前切换 PhasedBackoffWaitStrategy :自旋 + yield + 自定义策略 带着问题来解析代码?
1、多生产者如何保证消息生产不会相互覆盖。【如何达到互斥效果】
每个线程获取不同的一段数组空间,然后通过CAS判断这段空间是否已经分配出去。
接下来我们看下多生产类MultiProducerSequencer 中next方法【获取生产序号】 // 消费者上一次消费的最小序号 // 后续第二点会讲到 private final Sequence gatingSequenceCache = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); // 当前进度的序号 protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); // 所有消费者的序号 //后续第二点会讲到 protected volatile Sequence[] gatingSequences = new Sequence[0]; public long next(int n) { if (n < 1) { throw new IllegalArgumentException("n must be > 0"); } long current; long next; do { // 当前进度的序号,Sequence的value具有可见性,保证多线程间线程之间能感知到可申请的最新值 current = cursor.get(); // 要申请的序号空间:最大序列号 next = current + n; long wrapPoint = next - bufferSize; // 消费者最小序列号 long cachedGatingSequence = gatingSequenceCache.get(); // 大于一圈 || 最小消费序列号>当前进度 if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) { long gatingSequence = Util.getMinimumSequence(gatingSequences, current); // 说明大于1圈,并没有多余空间可以申请 if (wrapPoint > gatingSequence) { LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy? continue; } // 更新最小值到Sequence的value中 gatingSequenceCache.set(gatingSequence); } // CAS成功后更新当前Sequence的value else if (cursor.compareAndSet(current, next)) { break; } } while (true); return next; }
2、生产者向序号器申请写的序号,如序号正在被消费,Sequencer是如何知道哪些序号是可以被写入的呢?【未消费则被覆盖如何处理】
从gatingSequences中取得最小的序号,生产者最多能写到这个序号的后一位。通俗来讲就是申请的序号不能大于最小消费者序号一圈【申请到最大序列号-buffersize 要小于/等于 最小消费的序列号】的时候, 才能申请到当前写的序号
public final EventHandlerGroup handleEventsWithWorkerPool(final WorkHandler... workHandlers) { return createWorkerPool(new Sequence[0], workHandlers); } EventHandlerGroup createWorkerPool( final Sequence[] barrierSequences, final WorkHandler<? super T>[] workHandlers) { final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(barrierSequences); final WorkerPool workerPool = new WorkerPool<>(ringBuffer, sequenceBarrier, exceptionHandler, workHandlers); consumerRepository.add(workerPool, sequenceBarrier); final Sequence[] workerSequences = workerPool.getWorkerSequences(); updateGatingSequencesForNextInChain(barrierSequences, workerSequences); return new EventHandlerGroup<>(this, consumerRepository, workerSequences); } private void updateGatingSequencesForNextInChain(final Sequence[] barrierSequences, final Sequence[] processorSequences) { if (processorSequences.length > 0) { // 消费者启动后就会将所有消费者存放入AbstractSequencer中gatingSequences ringBuffer.addGatingSequences(processorSequences); for (final Sequence barrierSequence : barrierSequences) { ringBuffer.removeGatingSequence(barrierSequence); } consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences); } }
3、在多生产者情况下,生产者是申请到一段可写入的序号,然后再写入这些序号中,那么消费者是如何感知哪些序号是可以被消费的呢?【借问提1图说明】
这个前提是多生产者情况下,第一点我们说过每个线程获取不同的一段数组空间,那么现在单单通过序号已经不够用了,MultiProducerSequencer 使用了int 数组 【availableBuffer 】来标识当前序号是否可用。当生产者成功生产事件后会将availableBuffer 中当前序列号置为1标识可以读取。
如此消费者可以读取的的最大序号就是我们availableBuffer 中第一个不可用序号-1。
初始化availableBuffer 流程 public MultiProducerSequencer(int bufferSize, final WaitStrategy waitStrategy) { super(bufferSize, waitStrategy); // 初始化可用数组 availableBuffer = new int[bufferSize]; indexMask = bufferSize - 1; indexShift = Util.log2(bufferSize); initialiseAvailableBuffer(); } // 初始化默认availableBuffer为-1 private void initialiseAvailableBuffer() { for (int i = availableBuffer.length - 1; i != 0; i--) { setAvailableBufferValue(i, -1); } setAvailableBufferValue(0, -1); } // 生产者成功生产事件将可用区数组置为1 public void publish(final long sequence) { setAvailable(sequence); waitStrategy.signalAllWhenBlocking(); } private void setAvailableBufferValue(int index, int flag) { long bufferAddress = (index * SCALE) + BASE; UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag); }
消费者消费流程 WorkProcessor类中消费run方法 public void run() { boolean processedSequence = true; long cachedAvailableSequence = Long.MIN_VALUE; long nextSequence = sequence.get(); T event = null; while (true) { try { // 先通过cas获取消费事件的占有权 if (processedSequence) { processedSequence = false; do { nextSequence = workSequence.get() + 1L; sequence.set(nextSequence - 1L); } while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence)); } // 数据就绪,可以消费 if (cachedAvailableSequence >= nextSequence) { event = ringBuffer.get(nextSequence); // 触发回调函数 workHandler.onEvent(event); processedSequence = true; } else { // 获取可以被读取的下标 cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence); } } // ....省略 } notifyShutdown(); running.set(false); } public long waitFor(final long sequence) throws AlertException, InterruptedException, TimeoutException { checkAlert(); // 这个值获取的current write 下标,可以认为全局消费下标。此处与每一段的write1和write2下标区分开 long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this); if (availableSequence < sequence) { return availableSequence; } // 通过availableBuffer筛选出第一个不可用序号 -1 return sequencer.getHighestPublishedSequence(sequence, availableSequence); } public long getHighestPublishedSequence(long lowerBound, long availableSequence) { // 从current read下标开始, 循环至 current write,如果碰到availableBuffer 为-1 直接返回 for (long sequence = lowerBound; sequence <= availableSequence; sequence++) { if (!isAvailable(sequence)) { return sequence - 1; } } return availableSequence; }解决伪共享问题什么是伪共享问题呢?
为了提高CPU的速度,Cpu有高速缓存Cache,该缓存最小单位为缓存行CacheLine,他是从主内存复制的Cache的最小单位,通常是64字节。一个Java的long类型是8字节,因此在一个缓存行中可以存8个long类型的变量。如果你访问一个long数组,当数组中的一个值被加载到缓存中,它会额外加载另外7个。因此你能非常快地遍历这个数组。
伪共享问题是指,当多个线程共享某份数据时,线程1可能拉到线程2的数据在其cache line中,此时线程1修改数据,线程2取其数据时就要重新从内存中拉取,两个线程互相影响,导致数据虽然在cache line中,每次却要去内存中拉取。
Disruptor是如何解决的呢?
在value前后统一都加入7个Long类型进行填充,线程拉取时,不论如何都会占满整个缓存
回顾总结:Disuptor为何能称之为高性能的无锁队列框架呢?缓存行填充,避免缓存频繁失效。【java8中也引入 @sun.misc.Contended 注解来避免伪共享】 无锁竞争:通过CAS 【二阶段提交】 环形数组:数据都是覆盖,避免GC 底层更多的使用位运算来提升效率
36岁女排国手远嫁美国!住豪宅带大型游泳池,52岁白人老公宠溺中国女子团体项目发展良好,女排女足女篮都先后在国际赛场为祖国争光添彩,其中成就最高的当属女排,先后出现了3个时代,郎平所在的5连冠时代,2000年初,陈忠和带领的黄金一代再夺2次世
女人入秋后别乱买衣服,有这2裙2裤2外套就足够,特时髦百搭最近全国各地真的是光速入秋,天气一下冷了,但女人们变美的心是不会变化的,在这个天气凉爽的季节,大家也开始准备各式的衣服。但衣服种类太多,裙子裤子风衣西装大家挑花了眼,不知道选哪一个
美知名经济学家美联储犯下两大错误知名经济学家批美联储犯两大错误美国知名经济学家穆罕默德埃里安说,他认为美国联邦储备委员会犯下两大错误,可能引发美国经济严重衰退。埃里安现年64岁,拥有美国和埃及双重国籍。他曾在美国
瑞达利欧原则解读之三第二部分生活原则拥抱现实,应对现实1。1做一个超级现实的人创造伟大事物的人不是空想者,而是彻底地扎根于现实。做一个超级现实的人将帮助你明智地选择自己的梦想并实现它。梦想现实决心成功
文案生活不是为了赶路,而是为了感受路1。hr能开导别人是因为我们身在局外,开导不了自己,只是因我们困在局中2。hr总有一阵风会吹过我再吹过你,总有一个瞬间我们之间的距离是零3。hr都是和芝麻一样的小事,可满地的芝麻足
最好的关系,是彼此在乎,互相倾心,你不怕我打扰,我不怕你麻烦最好的关系,是彼此在乎,互相倾心,你不怕我打扰,我不怕你麻烦很喜欢这样一段话,朋友之间就算不在同一个城市,也没有时间聊天,甚至很久没有联系没有见面,但他在自己心里的位置就是谁也无法
多幸运我有个我们多幸运我有个我们文翊寒穿上新的日子我们随风景步入秋凉一双双水平的黑眼睛点染山水,波光蔓延秋水映星辰,人世如花露云起云飞,卷起千堆雪光伴随我们,叠起时间听到每一个日子的心迹聆听雨滴,
南方杂志社论向着伟大复兴阔步前进文南方杂志编辑部近代以来,中国人民一直有一个梦想,那就是彻底摆脱黑暗屈辱的历史,建设富强繁荣的国家,实现中华民族的伟大复兴。复兴,这是一个大国的希冀热望,这是一个大党的历史使命。它
28岁齐思钧,在芒果台6年披荆斩棘,有望成为何炅的接班人热热闹闹的芒果台综艺披荆斩棘进行到现在,第四次公演正式结束,刘恺威蔡珩被淘汰暂别舞台,节目中只剩下24位哥哥了。此外,公演之后的个人喜爱度榜单也有了更新,苏有朋潘玮柏吴建豪排名前三
考虑过大众,也考虑过丰田,最终被国产车所吸引对于我们普通消费者来说,买车并不是一件小事,省吃俭用攒下来的钱,该如何选择一辆适合自己的新车呢?对我们普通工薪阶层来说,买车并不是一件小事!买一辆车可能是我们几年省吃俭用,攒下来的
遭讽好莱坞贫户D咖,哈里梅根还比这些A咖富有英国哈利王子与妻子梅根坚持和皇室各唱各调,惹怒多数民众和皇室记者。最近英国女记者蒂娜布朗,嘲讽他们比起真正的好莱坞大牌,只能算D咖,还称他们的身家仅2200万美元(约合人民币1。6
人生四乐在生活中最让人舒服放松的状态莫过于四个阶段独特的气质了,一是儿童的童真无邪,让我开怀逗趣,二是少年的无所畏惧,敢于探索的精神,让人看到不屈的动力,三是青年不妥协不低头不抛弃不放弃的
董卓是底层草根,是如何逆袭到人生巅峰,却骤然失败身首异处?董卓在三国历史上不是一个有争议的人物。对于董卓的评价都是一边倒,贴在他身上的标签有权臣残暴乱臣贼子骄奢淫逸等等,都是一些贬义的词语。看到这些标签,我们不禁要问,既然董卓这么坏,也不
人生美好因失去这些而遗憾当我们呱呱坠地的那一刻起,父母便开始憧憬着我们美好的未来,希望我们能够一生一世,一帆风顺。可是,谁的一生能够这样,多多少少都会经历挫折和磨难。大到一个国家领土的沦陷,小到自己的人生
人生就是一个成长过程,习惯决定命运1早睡早起,宁吃早餐,不吃夜宵。2定期运动,人生最后拼的是健康。3不停地学习,建立起自己的读书计划,脑袋里有知识才能让你更清醒。4每天冥想一会儿,让自己脑袋放空,可以提高注意力。5
外交部维护好中英关系符合两国人民的共同利益中国青年报客户端北京10月25日电(中青报中青网见习记者袁洁)当地时间10月24日,英国前财政大臣里希苏纳克成为执政党保守党新党首。在今天下午举行的中国外交部例行记者会上,外交部发
吃苦,是优质人生的基础只有对饥肠辘辘深有体会的人,才能明白粒粒皆辛苦的含义只有面对过冷酷无情的现实,才会懂得人间冷暖只有让自己置之死地,才能体会生命的不易只有拼搏奋斗,才能改变自己的命运。俗话说吃得苦中
为何在宇宙残影空间中生存的冥王星被踢出九大行星?为何冥王星一度被称为病毒的天堂?它又是如何被天文学家们踢出九大行星的呢?自从1930年克莱德汤博发现冥王星后,它就被视为第九大行星,当时天文学家们认为它比地球还要大许多。然而仅仅过
Redis常见延迟问题排查手册!附优化建议Redis作为内存数据库,拥有非常高的性能,单个实例的QPS能够达到10W左右。但我们在使用Redis时,经常时不时会出现访问延迟很大的情况,如果你不知道Redis的内部实现原理,
AI时代什么教育模式合适家长选择?关于AI时代的教育焦虑背景,已经写了两篇文章来说明,那么家长们焦虑的根源有哪些呢?家庭教育面对的不确定性整个社会的导向问题教育焦虑是社会焦虑在家长孩子身上折射的结果那么当今社会下,
MyBatisSpringMyBatis与Spring整合前言mybatisspring的官网地址httpsgithub。commybatisspringdemo项目地址httpsgitee。comshuashuaworldshuashu
对了一眼配置,Redmi真的很强,同期最出色配置对比,是用户最关心的话题,为了选出更高的配置拍照更好的手机,用户总是在比对。RedmiNote12就是这样的存在,这机子的性能肯定不弱的,Note系列一直都是小金刚,而这次比较