范文健康探索娱乐情感热点
投稿投诉
热点动态
科技财经
情感日志
励志美文
娱乐时尚
游戏搞笑
探索旅游
历史星座
健康养生
美丽育儿
范文作文
教案论文

JDK9响应式流使用详解

  上文中咱们简单提到了JDK9中Flow接口中的静态内部类实现了响应式流的JAVA API,并且提供了一个一个Publisher的实现类SubmissionPublisher。本文将先梳理一下接口中具体的处理流程,然后再以几个调用者的例子来帮助大家理解。 JDK9中的实现
  再放上一下上文中的响应式流的交互流程: 订阅者向发布者发送订阅请求。 发布者根据订阅请求生成令牌发送给订阅者。 订阅者根据令牌向发布者发送请求N个数据。 发送者根据订阅者的请求数量返回M(M<=N)个数据 重复3,4 数据发送完毕后由发布者发送给订阅者结束信号
  该流程的角度是以接口调用的交互来说的,而考虑实际的coding工作中,我们的调用流程其实为: 创建发布者 创建订阅者 订阅令牌交互 发送信息
  接下来我们按照这个流程来梳理一下代码细节。 创建发布者
  对于实现响应流的最开始的步骤,便是创建一个发布者。之前提到在JDK9中提供了一个发布者的简单实现SubmissionPublisher。SubmissionPublisher继承自Flow.Publisher,他有三种构造函数:     public SubmissionPublisher() {         this(ASYNC_POOL, Flow.defaultBufferSize(), null);     }          public SubmissionPublisher(Executor executor, int maxBufferCapacity) {         this(executor, maxBufferCapacity, null);     }      public SubmissionPublisher(Executor executor, int maxBufferCapacity,                                BiConsumer<? super Subscriber<? super T>, ? super Throwable> handler)
  SubmissionPublisher将使用Executor作为"线程池"向订阅者发送信息。如果需要需要设置线程池的话可以自己传入,否则的话再无参的构造函数中将默认使用ForkJoinPool类的commonPool()方法获取,即无餐构造方法中的ASYNC_POOL静态变量。
  SubmissionPublisher会为 每一个订阅者 单独的建立一个缓冲空间,其大小由入参maxBufferCapacity决定。默认情况下直接使用Flow.defaultBufferSize()来设置,默认为256。如果缓冲区满了之后会根据发送信息时候的策略确定是阻塞等待还是抛弃数据。
  SubmissionPublisher会在订阅者发生异常的时候(onNext处理中),会调用最后一个参数handler方法,然后才会取消订阅。默认的时候为null,也就是不会处理异常。
  最简单的创建SubmissionPublisher的方法就是直接使用无参构造方法: SubmissionPublisher publisher = new SubmissionPublisher<>();
  上文书说到,因为SubmissionPublisher实现了AutoCloseable接口,所以可以用try来进行资源回收可以省略close()的调用: try (SubmissionPublisher publisher = new SubmissionPublisher<>()){ }
  但是也可以手动的调用close()方法来显示的关闭发布者,关闭后再发送数据就会抛出异常: if (complete)     throw new IllegalStateException("Closed");创建订阅者
  上文中咱们没有手动创建订阅者,而是直接调用SubmissionPublisher中的consume方法使用其内部的订阅者来消费消息。在本节可以实现接口Flow.Subscriber创建一个SimpleSubscriber类: public class SimpleSubscriber implements Flow.Subscriber {     private Flow.Subscription subscription;     /**      * 订阅者名称      */     private String name;     /**      * 定义最大消费数量      */     private final long maxCount;     /**      * 计数器      */     private long counter;     public SimpleSubscriber(String name, long maxCount) {         this.name = name;         this.maxCount = maxCount <= 0 ? 1 : maxCount;     }     @Override     public void onSubscribe(Flow.Subscription subscription) {         this.subscription = subscription;         System.out.printf("订阅者:%s,最大消费数据: %d。%n", name, maxCount);         // 实际上是等于消费全部数据         subscription.request(maxCount);     }     @Override     public void onNext(Integer item) {         counter++;         System.out.printf("订阅者:%s 接收到数据:%d.%n", name, item);         if (counter >= maxCount) {             System.out.printf("准备取消订阅者: %s。已处理数据个数:%d。%n", name, counter);             // 处理完毕,取消订阅             subscription.cancel();         }     }     @Override     public void onError(Throwable t) {         System.out.printf("订阅者: %s,出现异常: %s。%n", name, t.getMessage());     }     @Override     public void onComplete() {         System.out.printf("订阅者: %s 处理完成。%n", name);     } }
  SimpleSubscriber是一个简单订阅者类,其逻辑是根据构造参数可以定义其名称name与最大处理数据值maxCount,最少处理一个数据。
  当发布者进行一个订阅的时候会生成一个令牌Subscription作为参数调用onSubscribe方法。在订阅者需要捕获该令牌作为后续与发布者交互的纽带。一般来说在onSubscribe中至少调用一次request且参数需要>0,否则发布者将无法向订阅者发送任何信息,这也是为什么maxCount需要大于0。
  当发布者开始发送数据后,会异步的调用onNext方法并将数据传入。该类中使用了一个计数器对数据数量进行校验,当达到最大值的时候,则会通过令牌(subscription)异步通知发布者订阅结束,然后发送者再异步的调用发订阅者的onComplete方法,以处理完成流程。
  其中的onError和onComplete方法只进行打印,这里就不再说了。
  以上的这个订阅者可以看作是一个push模型的实现,因为当开始订阅时订阅者就约定了需要接受的数量,然后在后续的处理(onNext)中不再请求新数据。
  我们可以用以下的代码创建一个名称为S1,消费2个元素的订阅者: SimpleSubscriber sub1 = new SimpleSubscriber("S1", 2);订阅令牌交互
  当我们可以创建了发送者和订阅者之后,我们需要确认一下进行交互的顺序,由于响应流的处理就是对于事件的处理,所以事件的顺序十分重要,具体顺序如下: 我们创建一个发布者publisher一个订阅者subscriber 订阅者subscriber通过调用发布者的subscribe()方法进行信息订阅。如果订阅成功,则发布者将生成一个令牌(Subscription)并作为入参调用订阅者的订阅事件方法onSubscribe()。如果调用异常则会直接调用订阅者的onError错误处理方法,并抛出IllegalStateException异常然后结束订阅。 在onSubscribe()中,订阅者需要通过调用令牌(Subscription)的请求方法request(long)来异步的向发布者请求数据。 当发布者有数据可以发布的时候,则会异步的调用订阅者的onNext()方法,直到所有消息的总数已经满足了订阅者调用request的数据请求上限。所以当订阅者请求订阅的消息数为Long.MAX_VALUE时,实际上是消费所有数据,即push模式。如果发布者没有数据要发布了,则可以会调用发布者自己的close()方法并异步的调用所有订阅者的onComplete()方法来通知订阅结束。 发布者可以随时向发布者请求更多的元素请求(一般在onNext里),而不用等到之前的处理完毕,一般是与之前的数据数量进行累加。 放发布者遇到异常的时候会调用订阅者的onError()方法。
  上面的描述中是只使用的一个订阅者来进行描述的,后面的例子中将说明发布者可以拥有多个订阅者(甚至0个订阅者)。 发送信息
  当发布者需要推送消息的时候会调用submit方法或者offer方法,上文中我们提到submit实际上是offer的一种简单实现,本节咱们自己比较一下。
  首先他们的方法签名为: int offer(T item, long timeout, TimeUnit unit, BiPredicate,? super T> onDrop) int offer(T item, BiPredicate,? super T> onDrop) int submit(T item)
  而submit 和 offer的直接方法为:     public int submit(T item) {         return doOffer(item, Long.MAX_VALUE, null);     }          public int offer(T item,                      BiPredicate, ? super T> onDrop) {     return doOffer(item, 0L, onDrop);
  可以看到他们的底层调用的都是 doOffer 方法,而doOffer的方法签名为:     private int doOffer(T item, long nanos,                         BiPredicate, ? super T> onDrop)
  所以我们可以直接看doOffer()方法。doOffer()方法是可选阻塞时长的,而时长根据入参数nanos来决定。而onDrop()是一个删除判断器,如果调用BiPredicate的test()方法结果为true则会再次重试(根据令牌中的nextRetry属性与发布器中的retryOffer()方法组合判断,但是具体实现还没梳理明白);如果结果为flase则直接删除内容。doOffer()返回的结果为正负两种,正数的结果为发送了数据,但是订阅者还未消费的数据(估计值,因为是异步多线程的);如果为负数,则返回的是重拾次数。
  所以,根据submit()的参数我们可以发现,submit会一直阻塞直到数据可以被消费(因为不会阻塞超时,所以不需要传入onDrop()方法)。而我们可以根据需要配置offer()选择器。如果必须要求数据都要被消费的话,那就可以直接选择submit(),如果要设置重试次数的话就可以选择使用offer() 异步调用的例子
  下面看一个具体的程序例子,程序将以3秒为周期进行数据发布: public class PeriodicPublisher {      public static final int WAIT_TIME = 2;     public static final int SLEEP_TIME = 3;      public static void main(String[] args) {         SubmissionPublisher publisher = new SubmissionPublisher<>();         // 创建4订阅者         SimpleSubscriber subscriber1 = new SimpleSubscriber("S1", 2);         SimpleSubscriber subscriber2 = new SimpleSubscriber("S2", 4);         SimpleSubscriber subscriber3 = new SimpleSubscriber("S3", 6);         SimpleSubscriber subscriber4 = new SimpleSubscriber("S4", 10);         // 前三个订阅者直接进行订阅         publisher.subscribe(subscriber1);         publisher.subscribe(subscriber2);         publisher.subscribe(subscriber3);         // 第四个方法延迟订阅         delaySubscribeWithWaitTime(publisher, subscriber4);         // 开始发送消息         Thread pubThread = publish(publisher, 5);         try {             // 等待处理完成             pubThread.join();         } catch (InterruptedException e) {             e.printStackTrace();         }     }     public static Thread publish(SubmissionPublisher publisher, int count) {         Thread t = new Thread(() -> {             IntStream.range(1,count)                     .forEach(item ->{                         publisher.submit(item);                         sleep(item);                     });             publisher.close();         });         t.start();         return t;     }               private static void sleep(Integer item) {         try {             System.out.printf("推送数据:%d。休眠 3 秒。%n", item);             TimeUnit.SECONDS.sleep(SLEEP_TIME);         } catch (InterruptedException e) {             e.printStackTrace();         }     }     private static void delaySubscribeWithWaitTime(SubmissionPublisher publisher, Flow.Subscriber sub) {         new Thread(() -> {             try {                 TimeUnit.SECONDS.sleep(WAIT_TIME);                 publisher.subscribe(sub);             } catch (InterruptedException e) {                 e.printStackTrace();             }         }).start();     }  }
  代码后是运行结果如下: 订阅者:S1,最大消费数据: 2。 推送数据:1。休眠 3 秒。 订阅者:S3,最大消费数据: 6。 订阅者:S2,最大消费数据: 4。 订阅者:S2 接收到数据:1. 订阅者:S3 接收到数据:1. 订阅者:S1 接收到数据:1. 订阅者:S4,最大消费数据: 10。 推送数据:2。休眠 3 秒。 订阅者:S2 接收到数据:2. 订阅者:S3 接收到数据:2. 订阅者:S1 接收到数据:2. 订阅者:S4 接收到数据:2. 准备取消订阅者: S1。已处理数据个数:2。 推送数据:3。休眠 3 秒。 订阅者:S4 接收到数据:3. 订阅者:S2 接收到数据:3. 订阅者:S3 接收到数据:3. 推送数据:4。休眠 3 秒。 订阅者:S4 接收到数据:4. 订阅者:S3 接收到数据:4. 订阅者:S2 接收到数据:4. 准备取消订阅者: S2。已处理数据个数:4。 推送数据:5。休眠 3 秒。 订阅者:S3 接收到数据:5. 订阅者:S4 接收到数据:5. 订阅者: S3 处理完成。 订阅者: S4 处理完成。
  由于是异步执行,所以在"接收数据"部分的顺序可能不同。
  我们分析一下程序的执行流程。 创建一个发布者实例 创建四个订阅者实例S1、S2、S3、S4,可以接收数据的数量分别为:2、4、6、10。 前三个订阅者立即订阅消息。 S4的订阅者单独创建一个线程等待WAIT_TIME秒(2秒)之后进行数据的订阅。 新建一个线程来以SLEEP_TIME秒(3秒)为间隔发布5个数据。 将publish线程join()住等待流程结束。
  执行的日志满足上述流程而针对一些关键点为: S4在发送者推送数据"1"的时候还未订阅,所以S4没有接收到数据"1"。 当发送数据"2"的时候S1已经接收够了预期数据2个,所以取消了订阅。之后只剩下S2、S3、S4。 当发送数据"4"的时候S2已经接收够了预期数据4个,所以取消了订阅。之后只剩下S3、S4。 当发送数据"5"的时候只剩下S3、S4,当发送完毕后publisher调用close()方法,通知S3、S4数据处理完成。
  需要注意的是,如果在最后submit完毕之后直接close()然后结束进行的话可能订阅者并不能执行完毕。但是由于在任意一次submit()之后都有一次3秒的等待,所以本程序是可以执行完毕的。 最后
  本文中的例子是是简单的实现,可以通过调整订阅者中的request的参数,与在onNext中添加request调用来测试背压的效果,还可以将submit调整为offer并添加onDrop方法以观察抛弃信息时的流程。同时本文没有提供Processor的例子,各位也可以自行学习。
  总结一下流程: 订阅者向发布者进行订阅,然后发布者向订阅者发送令牌。订阅者使用令牌请求消息,发送者根据请求消息的数量推送消息。订阅者可以随时异步追加需要的更多信息。
  JDK9中在Flow接口中实现了Java API的4个接口,并提供了SubmissionPublisher作为Publisher接口的简单实现。

年底手机芯片杀疯了!联发科天玑下一代旗舰多项黑科技释出最近,联发科大招不断,秀出了一系列天玑旗舰技术,展示了在5GAI游戏天玑5G开放架构四个领域的技术实力。不出意外,这些旗舰技术都会被用到下一代天玑旗舰中,这些技术究竟有多厉害,大家299元,华为荣耀HonorxSport运动蓝牙耳机值不值如果在跑步的时候不听音乐,那跟咸鱼有什么区别。音乐可以让枯燥无味的跑步变得轻松惬意,分散跑者的注意力从而缓解疲劳。当然运动耳机很多,有线的听诊器效果太强果断放弃,无线的蓝牙运动耳机加强科技创新,德邦快递全面构建智慧物流网络体系一个行业要达到降本增效的终极目标,缺不了科技的加持,快递行业同样如此。作为快递行业的龙头企业,德邦快递抓住科技浪潮的风口,积极构建智慧物流网络体系,让整个物流业务的运作更智能,让消元宇宙火出圈,不仅是区块链与科技圈,小说里也出现了提到最近区块链圈与科技圈里最火热概念,非元宇宙莫属,这个概念出圈的终极一击,便是扎克伯格将公司名从Facebook改为Meta,宣布正式进军元宇宙。扎克伯格表示随着时间的推移,我希QuattroRobotics(墨西哥)应用中望3D进行自动化解决方案设计夸特罗机器人公司(QuattroRobotics)成立于墨西哥圣路易斯波托西市,致力于在各级教育机构和工业行业内推广自动化解决方案,并开设系列自动化课程培训。随着工业4。0时代背景如何借助中望CAD快速完成电气系统图的绘制及输出电气系统图是由各种电子元件或电气设备符号按一定方式连接起来的原理布局图,如电源电压电流电阻导线等,主要用来表明设备电气的工作原理各电气元件的作用及相互之间的关系。一张完整准确的电气阿根廷学生应用中望3D,技术重现达芬奇飞碟坦克原型阿根廷学生艾伦巴特(AlanBat)主修机械工程专业,最近他用中望3D将列奥纳多达芬奇(LeonardodaVinci)武装装甲车手稿灵感带进现实,技术重现最早的坦克原型。艾伦巴特如何使用中望3D2022的CAM方案加工塑胶模具CNC加工在模具行业主要用于加工模具零部件,如塑胶模具型腔和型芯滑块等。为进一步提升用户在产品加工生产流程中的效率和体验,最新版本的中望3D在CAM加工功能上进行了增强与优化。本文两个CAD小知识,搞懂就赚了CAD小知识,搞懂就赚了还记得刚开始学习CAD的时候,很多小伙伴都觉得,掌握了一些功能命令的用法后会有点小确幸,恨不得在日常绘图中得到展示。为了让大家继续保留这点小确幸,今天带来几超强升级表格功能,尽在全新中望CAD2022无论是建筑行业机械行业或者是其他行业,设计师使用CAD软件进行设计后常常会用到表格功能。常见的表格有明细表工序卡图样目录等形式,将项目重要数据以表格的形式插入到图纸当中,才能最终完2分钟掌握的命令我用了120秒才学会听君一席话,如听一席话有人问,你什么时候转修废话文学了?大家别误会,标题部分我希望通过反复强调,来让大家明白今天的CAD技巧真的用2分钟就可以掌握。但如果不行的话,请用下一个2分钟
公司装机究竟怎么选路由器?(本文开始前,首先声明是穷鬼向,有钱人左转网络公司)路由器是网络的核心,对于企业来说也是重中之重。传统企业多采用企业级路由器配合acap模式,达到自己的使用需求。但是这套配置轻则大2021不谈信仰的路由器推荐路由器是家庭网络的核心,一个家网络上的爽不爽一个好的路由器十分关键。今天不谈信仰,只谈体验。说说我在2021年推荐的800元以内的路由器。在此强调,以下推荐路由器含有部分魔改路由,vivoampampquot最惨ampampquot5G手机,44W快冲,降价一千多无人问津在当今的手机市场中,4G手机正在悄然退出市场,而5G手机已成为主要制造商推出的流行机型。在短短的一年内,市场上的5G手机层出不穷,vivo首次发布了该型号。5G手机已被遗忘,而正是郭世英之死重估郭沫若郭世英的中学时代是在北京有名的干部子弟学校101中度过的。在中学里他一直是班上的三好学生,模范共青团员,还被誉为高干子女的表率。那时,郭世英不仅成绩优秀,还是一个体育爱好者,是学校网站SEO优化做好能抵销售?杭州石炭纪用网站来干了销售的活在数字化时代该怎么做生意?首先你得有切实的产品,接着你需要搭建好自己的宣传矩阵,然后就需要做好营销推广,把自己的名头打响出去,让更多客户知道你,最后才是收获客户线索,达成生意。这其这样就能把新客户变成回头客?你的官网是否支持会员注册呢?客户对于一个企业来说就是运转下去的动力,所以如何获客成为了所有老板们最关心的事情,但仅仅是获得新客户就够了吗?当然不是,企业要做的绝对不仅仅是开拓新用户,想办法留住老客户,让客户愿如何用好在线表单功能获客?在使用在线表单时还遇到过哪些问题?在线表单这一功能在如今的各种类型网站中都很常见,有时候点开一家企业的官网,就能够直接在网站首页填写表单,注册网站会员。就算网站首页没有设置表单,往下翻一翻,往往也能够在网页底部看到为啥不推荐使用黑帽SEO?其实用LTD也能快速做好SEO优化想要让自己的网站被更多用户浏览,自然就需要提升网站在搜索引擎中的排名,这一过程被称为SEO优化。现如今绝大多数企业的网站都会进行SEO优化,但效果却各不相同。有些官网的排名上升速度营销物料多次上传太麻烦?没事,我们可以批量解决前不久有用户向我们反映我们现在要做一个新网站,可原来网站上的文章内容要一篇一篇手动上传到后台,真的太费时间了!的确,我们可以想象这么一个场景当你搭建好了自己的新官网,看着空荡荡的内使用官微中心文章编辑器时碰上了这几个问题?别担心工欲善其事,必先利其器。企业想要做好数字化营销,一个强大的企业官网后台是必不可少的。而其中,企业官网后台具备的营销物料的上传和分享功能则是会直接影响到企业官网数字化营销的结果。因此等等党的福利!这几款机型非常合适,网友不会再错过了RedmiK30至尊纪念版价格为2000元价位的产品,首先想到的肯定是刚刚发布的RedmiK30至尊纪念版,这部手机应该是这个价格下无脑购买的代表。总而言之一句话,放心大胆的购买就