范文健康探索娱乐情感热点
投稿投诉
热点动态
科技财经
情感日志
励志美文
娱乐时尚
游戏搞笑
探索旅游
历史星座
健康养生
美丽育儿
范文作文
教案论文

Java9特性响应式流(ReactiveStream)

  什么是流
  形象的比喻来说就是如同水一样绵绵不绝的数据形式。而抽象点来说,是有一个生产者(source)产生,由一个或者多个消费者(sink)消费的数据元素(item)序列。那从这个抽象的描述就可以看出,使用流来承担数据交互的模式就是咱们经常说的生产者/消费者模型,而这种模型也可以称之为发布者/订阅者模型(后文将使用这个名字,因为JDK中使用的是这个名字)。
  对于流数据来说,一般有两种的数据流转方式: 拉(pull)数据模式:订阅者向发布者索要数据。 推(push)数据模式:发布者向订阅者推送数据(push)。
  这两种模式都是描述的单次信息传递的方式。如果发布者产生信息的速度和订阅者消费信息的速度一致的话,那这两种方法都将是十分有效的数据流转方式。 流有什么问题
  流的问题在于当两端的速度不匹配的时候(考虑一下各种mq主要处理的问题削峰平谷)。而速度的不匹配自然存在以下两种情况:
  订阅者消费速度快
  这种情况的时候会出现订阅者有处理能力了,但是订阅者无信息可以处理的情况。如果这种时候是同步的调用模式,则订阅者将会阻塞,直到有新的信息可以进行处理。而如果这时候是异步的信息处理模式,则订阅者可以在无消息处理的时候挂起,直接切换到其他的任务处理中(对于多核CPU的多线程来说)。也就是说,对于这种情况,比较理想的是 异步推模式 。
  发布者发布速度快
  当发布者发布速度快的时候,会发生订阅者来不及处理数据的情况。如果是同步的情况下发布者会一直阻塞,而如果是异步模式则对于订阅者来说有两种处理方式(可以类比一下线程池设计)可以处理: 损失数据:丢弃数据(在有限的队列缓存已经满了的情况下) 不损失数据:加入队列缓存数据(订阅者需要有拥有无限的缓冲队列暂存数据,以确保不会溢出)
  而还有另一种需要发布者加入的处理方式叫做背压(backpressure)。背压的实现方式是:由订阅者发出信号,让发布者降低信息的发布速度,从而让信息速度之间匹配。背压的优点是同样可以处理信息流速不一致问题。而更有意思的是,这时候信息的处理策略可以由发布者来选择: 损失数据:丢弃数据(在有限的队列缓存已经满了的情况下) 不损失数据:加入队列缓存数据(发布者需要有拥有无限的缓冲队列暂存数据,以确保不会溢出)
  没错,这两种情况是和订阅者一致的,不过选择权则由订阅者变成了发布者。
  也就是说,在发布者发布速度快的时候,要么发布者直接同步阻塞,要么可以先根据消息的主要关心方(是发布者还是订阅者)来确定是否使用背压,然后再根据数据的类型判断是否接受数据丢弃(不丢弃可能会导致系统崩溃)。往往我们的发布者可以由上层的mq或者程序的应答机制保护消息的可用性。
  那么结论是什么,我们需要异步非阻塞(订阅者消费快)、以及背压(发布者发布快)。 什么是响应式流
  Reactive Streams 是一项非阻塞背压的异步流处理标准的倡议,当然,如果我这个翻译看的不清楚的话就还是看原文吧(http://www.reactive-streams.org/)。
  Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure.
  响应式流(Reactive Streams)概念被提出是在2013年,旨在处理上一小节中由于流速问题而产生的几种问题:订阅者订阅者的阻塞、由订阅者(数据下游)来选择是依赖无限队列(数据不丢)或直接丢弃数据。
  而对于一项标准而言,它的目是自然是用更少的协议来描述交互。而响应式流的模型也是十分简单: 订阅者异步的向发布者请求N个元素。 发布者一步的向订阅者发送M(0
  对他们的定义为:
  Publisher(发布者)
  是一个假定上游会产生无限数据的信息发布者。他们会向有发送请求的订阅者推送元素。
  Subscriber(订阅者)
  订阅者会从发布者那里领取令牌,然后根据令牌向发布者发送"获取请求"。同时当发布者部分准备好元素的时候,会通过令牌对订阅者进行调用,进行数据消费。
  Subscription(令牌)
  发布者和订阅者通过令牌来进行信息通信的约定。主要有:开始订阅、信息获取、信息推送、异常、结束、取消订阅。
  Processor(处理器)
  可以通过处理器连接发布者、订阅者以及其他处理器。Processor本身同时继承了Publisher与Subscriber接口,所以可以对元素进行处理转发。主要用于让数据从T转换为R。同时,由于Processor本身也可以接入Processor,所以Processor可以组成链来对数据进行处理。
  一次完整的调用流程大概可以描述为: 订阅者向发布者发送订阅请求。 发布者根据订阅请求生成令牌发送给订阅者。 订阅者根据令牌向发布者发送请求N个数据。 发送者根据订阅者的请求数量返回M(M<=N)个数据 重复3,4 数据发送完毕后由发布者发送给订阅者结束信号
  而Java API中的接口如下所示,其中所有的方法都是void,因为所有的方法都是异步执行的。 public interface Publisher {     //用于1.中订阅请求     public void subscribe(Subscriber<? super T> s); } public interface Subscriber {     //用于2.中回调发送令牌     public void onSubscribe(Subscription s);     //用于3.用于接受4中发送过来的数据     public void onNext(T t);     //用于3,4,5接收中间异常了之后的调用     public void onError(Throwable t);     //用于6.中结束信号的回调     public void onComplete(); } public interface Subscription {     //用于3.的发送请求N个数据     public void request(long n);     //用于3,4,5订阅者异步的向     public void cancel(); } public interface Processor extends Subscriber, Publisher { }JDK中的响应式流
  Java API中的流程使用方式看起来比较简单,但API背后的具体实现由于是全异步交互以及涉及具体背压处理而很困难。而JDK9中为用户提供了Publisher接口的简单实现,让开发人员可以基于此来扩展出自己的实际需求。
  JDK 9中的响应式流功能提供在java.util.concurrent包下,全响应式流的API接口被封装到 Flow接口中,其中包括需要使用的接口以及静态方法,关于上一小节中接口方法的详细描述也可以参见该接口上的方法描述。其中的静态接口为: Flow.Processor Flow.Publisher Flow.Subscriber Flow.Subscription
  除去上一小节说的4个接口外,Flow中还包含了一个默认方法defaultBufferSize(),用于返回默认的令牌中的缓冲区大小,而默认的值为DEFAULT_BUFFER_SIZE = 256。
  除去Flow外,其中还有一个刚刚说到的Publisher的简单实现类SubmissionPublisher。该接口在实现了publisher之外还实现了AutoCloseable接口,所以可以直接用try块来进行资源的管理。
  尽管JDK 9中没有提供Subscriber的简单实现,但是在SubmissionPublisher中提供了一个consume(Consumer<? super T> consumer)方法,用于让开发人员可以直接消费消息发布者的所有元素。实际上是在内部实现了简单的Subscriber为ConsumerSubscriber,但是并不是public的,所以不能直接使用。 简单的例子
  根据JDK 9中提供的SubmissionPublisher咱们来写一个小例子。     public static void main(String[] args) {         // 用于承接返回值的任务         CompletableFuture task;         // try-with-resource来控制资源         try (SubmissionPublisher publisher = new SubmissionPublisher<>()) {             System.out.println("默认缓冲容量: " + publisher.getMaxBufferCapacity());             // 传入打印方法来处理元素             task = publisher.consume(System.out::println);             // 打印数字,调用发布者进行信息处理             IntStream.range(1, 6)                     .forEach(publisher::submit);         }         if (task != null) {             try {                 // 当所有订阅者处理完毕后调用                 task.get();             } catch (InterruptedException | ExecutionException e) {                 e.printStackTrace();             }         }     }
  在这个例子里面进行了以下几件事。 声明一个CompletableFuture用于捕获后续的处理事件。 开启资源用于进行流消息订阅 设置流的订阅方法(订阅者) 进行发布者的信息发送 阻塞主方法等待处理完毕后结束
  其中pub.getMaxBufferCapacity()会打印默认的缓存空间256。在调用publisher.consume的时候,是奖传入的Consumer在内部封装成一个Subscribr的简单实现类,用于订阅信息的发送,实时上后续数据的订阅者就是在这步创建的。
  当publisher进行调用的时候,调用submit发送数据,publisher有两个方法用于发送数据,一个是submit,一个是offer。两个方法下面实际都是调用的doOffer方法,所以,offer方法提供了置顶延迟时间后丢弃的策略,而submit是offer的简单实现,是一致阻塞不丢弃。 最后
  不得不说响应式流是java中响应式编程的基础,而JDK 9中也提供了Reactive Streams的"简单"实现。之所示简单是打引号的是因为实际上还有点绕的,有兴趣的同学可以追一下SubmissionPublisher的实现,有一些思想的经典实现,比如用整数中的7位来作为状态机。在下一篇中我们再聊一下JDK 9中的数据交互顺序。

资深交易员细数交易的七宗罪文SteveBurns资深交易员没有比贪婪更好的词语了(Greed,forthelackofabetterword,isgood。)。1987年的经典电影华尔街中,戈登盖柯(Gor交易量增长100!它成为2021年最热门交易在过去的一年,澳大利亚在线差价合约交易与全球许多在线交易商一样出现激增,为零售交易者带来了便利性和可操作性。活跃交易者数量同比增长20至710,000,交易量在2021年第一季度增扎心了!为什么大多数交易者成不了百万富翁?日内交易的标签之一就是巨大的收益,这是一份非常诱人的职业。既然如此,除了索罗斯等几个老掉牙的八九十年代的交易员,为何我们现在很少听到百万富翁交易者呢?大多数交易者是将存款投入到交易全职旅居交易员分享4个最好的趋势交易策略,你都用过吗?现年33岁的资深交易专家罗尔夫(RolfSchlotmann)是一名全职交易员,也是一位纯粹的价格行为交易者,与其他交易员不同的是,罗尔夫选择了在全球各地旅居的生活,他的人生目标就券商晨会精华指数震荡整理能源转型和价值股修复奏响主旋律财联社11月2日讯,昨日市场,沪指创业板收跌,中证1000指数涨近1。新能源车产业链持续拉升,板块内约30股涨停或涨幅超10光伏风电储能等板块仍表现活跃,但个股分化明显,隆基股份宁财联社11月2日早报(周二)宏观新闻1中央军委主席习近平日前签署命令,发布军队装备订购规定,自2021年11月1日起施行。规定共8章42条,按照军委管总战区主战军种主建的总原则,规范了军队装备订购工作的管理机11月金股推荐集中度升高,10家券商推荐伊利股份,9家推荐隆基股份股价创历史新高,看好大消费新能源财联社(上海,记者卢丹)讯,截至11月1日晚间,合计26家券商公布了11月金股名单,其中,伊利股份获得10家券商联袂推荐,隆基股份获得9家券商联袂推荐,这一推荐集中度较上月有所提升11月2日投资避雷针华夏幸福新增未能如期偿还债务79。16亿元导读1大商所郑商所夜盘收盘,郑煤跌超42华夏幸福新增未能如期偿还债务79。16亿元,累计939。79亿元3河北即日起暂停跨省团队旅游及机票酒店业务4阳光城2022年1月份前到期美元海外财经媒体焦点美银料原油期货到明年年中将触及每桶120美元财联社11月2日讯,隔夜,海外市场聚焦美国经济公司动态以及大宗商品市场。美国经济方面,美国面临上世纪70年代以来最严重供应链问题,短缺成美联储年度词汇美国10月制造业成长放缓,供应英特尔终于低头或抛弃自家晶圆厂,采用台积电6nm工艺生产显卡一直以来英特尔能够坐拥世界第一大半导体厂商的地位,是依赖芯片设计和芯片制造两方面,芯片的设计水平很高,同时制造工艺也很先进,两手都很硬。但是随着自己的制造工艺不再领先,下一代10n比CPU强太多!真正的吊打英特尔,AMD最良心散热器灯效爆炸!众所知周,AMD作为一家老牌的散热器厂商,推出过众多优秀的散热器,但要说到目前最火的AMD信仰散热器,那就是伴随ZEN架构更新的全新幽灵系列散热,其中最火的就是在2700X3700
姚安娜携手摩卡,开启用户主权的智能时颠覆对汽车的所有认知!摩力降临智能汽车秀登陆上海滩秒变老司机!前所未有的试驾模式在上海激情上演智能驾驶登陆上海滩,WEY摩卡演绎智能汽车人黑科技开箱无人驾驶挑战,WEY摩卡再现智能姚安娜携手摩卡,开启用户主权的智能时颠覆对汽车的所有认知!摩力降临智能汽车秀登陆上海滩秒变老司机!前所未有的试驾模式在上海激情上演智能驾驶登陆上海滩,WEY摩卡演绎智能汽车人黑科技开箱无人驾驶挑战,WEY摩卡再现智能数据出炉,春节后抱团股幻灭的逻辑找到了节后两天的A股行情让广大投资者受伤了,股市红包没领到,反倒亏了不少。过节期间欧美股市都涨得挺好,提前开盘两天的港股也走势喜人,到了A股就变了味了,大A向来不走寻常路。今晚的文章试图日系车跑高速公路真的不如德系车稳么,差异在哪最近看了很多小伙伴说拿日系车跑高速很容易发飘,没有德系车稳定,其实这并不是飘,而是的日系车底盘的设计思路不同而使得驾驶感觉发生了改变!德系车除了发动机更偏向于性能取向,底盘的技术材还在相信手动变速箱省油?其实自动变速箱更省油手动变速箱有着很多优点,每当我们提及手动变速箱的时候总会理所当然的认为它更省油,这个说法如果是放在过去,那么无疑是成立的,但是在当今这个自动变速箱横行的时代,手动挡比自动挡省油的说赛车为何不用自动变速器,自动变速器会对赛车的性能造成影响么?现如今自动变速器迅速普及很多车型已经不再配备手动变速器,B级车或者是更高级别的车型已经不再配备手动变速器,甚至很多A级车型也取消了手动挡版本很多车友认为自动变速器性能不行,因为赛车涡轮增压发动机一定要用95号汽油么,是否可以使用92号汽油?在很多车友的思维中涡轮增压发动机是脆弱的,当然这更多是因为思维惯性所导致的判断方式(毕竟在过去涡轮增压发动机对比自然吸气的确脆弱一些),所以很多车友认为涡轮增压发动机要烧95号汽油马上就是618了,想买电脑的来看看这些性价比配件吧山外青山楼外楼,电脑问题找雨星,大家好我是雨星。由于雨星这边的自身问题,这连续好多天都没给大家更新了,今天雨星看了下马上618电商节了,相信很多粉丝都准备到这个时候来装机,那今天就6000以内给粉丝组装的游戏电脑,大家来看看效果如何?山外青山楼外楼,电脑问题找雨星,大家好我是雨星,有粉丝找雨星配置6000以内的主机,主要玩游戏用而且不需要光污染,雨星给粉丝推荐这么一套,希望能帮助到粉丝和大家。首先是CPU,由于想把主板内存条插满或者想要大内的存伙伴们春天来了,快上车!这是台电的16GDDR42400的马甲内存条,雨星觉得这款内存很不错,最起码有很多买过,而且反应良好,雨星觉得这不比那些时序和颗粒都差不多,但是没散热的普条好多了吗?而且价格只有33000左右的手机如何选择?小伙来告诉你该怎么选山外青山楼外楼,电脑问题找雨星,大家好我是雨星。今日雨星也想换一部手机,预算差不多3000块左右,想换一个拍照好点的,颜值高点,电池大点,音质好点,屏幕素质不错的手机,想来想去就只