范文健康探索娱乐情感热点
投稿投诉
热点动态
科技财经
情感日志
励志美文
娱乐时尚
游戏搞笑
探索旅游
历史星座
健康养生
美丽育儿
范文作文
教案论文

来点硬的,动态线程池解析

  理论基础
  线程池本身提供了接口动态设置参数,如下: public void setCorePoolSize(int corePoolSize); public void setMaximumPoolSize(int maximumPoolSize); public void setKeepAliveTime(long time, TimeUnit unit); public void setThreadFactory(ThreadFactory threadFactory); public void setRejectedExecutionHandler(RejectedExecutionHandler handler); public void allowCoreThreadTimeOut(boolean value);  protected void beforeExecute(Thread t, Runnable r); protected void afterExecute(Runnable r, Throwable t);
  美团技术团队有篇文章讲他们是如何应用的:
  https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html public void setCorePoolSize(int corePoolSize) {     if (corePoolSize < 0 || maximumPoolSize < corePoolSize)         throw new IllegalArgumentException();     int delta = corePoolSize - this.corePoolSize;     this.corePoolSize = corePoolSize;     if (workerCountOf(ctl.get()) > corePoolSize)         // 如果小于当前线程数,中断闲置的线程         interruptIdleWorkers();     else if (delta > 0) {         // 如果大于当前线程数则增加线程         int k = Math.min(delta, workQueue.size());         while (k-- > 0 && addWorker(null, true)) {             if (workQueue.isEmpty())                 break;         }     } }
  市面方案
  目前star数比较多的有两个:
  hippo-4j:https://hippo4j.cn/docs/getting-started/hippo4j-core-start/
  dynamic-tp:https://dynamictp.cn/。
  本文以dynamic-tp为例,原理一致可举一反三。最主要的步骤是监听配置变化将变更设置进线程池。
  hippo-4j:
  dynamic-tp:
  使用
  将动态线程池注册到Spring ioc容器: /**      * 通过{@link DynamicTp} 注解定义普通juc线程池,会享受到该框架监控功能,注解名称优先级高于方法名      *      * @return 线程池实例      */     @DynamicTp("commonExecutor")     @Bean     public ThreadPoolExecutor commonExecutor() {         return (ThreadPoolExecutor) Executors.newFixedThreadPool(1);     }      /**      * 通过{@link ThreadPoolCreator} 快速创建一些简单配置的动态线程池      * tips: 建议直接在配置中心配置就行,不用@Bean声明      *      * @return 线程池实例      */     @Bean     public DtpExecutor dtpExecutor1() {         return ThreadPoolCreator.createDynamicFast("dtpExecutor1");     }      /**      * 通过{@link ThreadPoolBuilder} 设置详细参数创建动态线程池(推荐方式),      * ioIntensive,参考tomcat线程池设计,实现了处理io密集型任务的线程池,具体参数可以看代码注释      *      * tips: 建议直接在配置中心配置就行,不用@Bean声明      * @return 线程池实例      */     @Bean     public DtpExecutor ioIntensiveExecutor() {         return ThreadPoolBuilder.newBuilder()                 .threadPoolName("ioIntensiveExecutor")                 .corePoolSize(20)                 .maximumPoolSize(50)                 .queueCapacity(2048)                 .ioIntensive(true)                 .buildDynamic();     }      /**      * tips: 建议直接在配置中心配置就行,不用@Bean声明      * @return 线程池实例      */     @Bean     public ThreadPoolExecutor dtpExecutor2() {         return ThreadPoolBuilder.newBuilder()                 .threadPoolName("dtpExecutor2")                 .corePoolSize(10)                 .maximumPoolSize(15)                 .keepAliveTime(50)                 .timeUnit(TimeUnit.MILLISECONDS)                 .workQueue(QueueTypeEnum.SYNCHRONOUS_QUEUE.getName(), null, false)                 .waitForTasksToCompleteOnShutdown(true)                 .awaitTerminationSeconds(5)                 .buildDynamic();     }
  源码实现
  通过自动配置注册进容器,BeanPostProcessor和CloudNacosRefresher,前者负责将线程池注册到 DtpRegistry,后者负责监听更新以实现动态更新线程池配置。 @Configuration @EnableConfigurationProperties({DtpProperties.class}) @ConditionalOnProperty(     name = {"spring.dynamic.tp.enabled"},     matchIfMissing = true,     havingValue = "true" ) public class BaseBeanAutoConfiguration {     public BaseBeanAutoConfiguration() {     }      @Bean     public ApplicationContextHolder dtpApplicationContextHolder() {         return new ApplicationContextHolder();     }      @Bean     @ConditionalOnMissingBean     public DtpBannerPrinter dtpBannerPrinter(DtpProperties properties) {         return new DtpBannerPrinter(properties);     }      @Bean     @DependsOn({"dtpApplicationContextHolder"})     @ConditionalOnMissingBean     public DtpPostProcessor dtpPostProcessor() {         return new DtpPostProcessor();     }      @Bean     @ConditionalOnMissingBean     public DtpRegistry dtpRegistry() {         return new DtpRegistry();     }      @Bean     @ConditionalOnMissingBean     public DtpMonitor dtpMonitor() {         return new DtpMonitor();     }      @Bean     @ConditionalOnMissingBean     @ConditionalOnAvailableEndpoint     public DtpEndpoint dtpEndpoint() {         return new DtpEndpoint();     } }@Configuration @ConditionalOnClass(NacosConfigProperties.class) @ConditionalOnProperty(value = DynamicTpConst.DTP_ENABLED_PROP, matchIfMissing = true, havingValue = "true") @ImportAutoConfiguration({BaseBeanAutoConfiguration.class}) public class DtpAutoConfiguration {      @Bean     @ConditionalOnMissingBean()     @ConditionalOnClass(NacosConfigManager.class)     public CloudNacosRefresher cloudNacosRefresher() {         return new CloudNacosRefresher();     } }
  BeanPostProcessor(DtpPostProcessor)将线程池放入DtpRegistry: @Slf4j public class DtpPostProcessor implements BeanPostProcessor {      @Override     public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {          if (!(bean instanceof ThreadPoolExecutor)) {             return bean;         }          if (bean instanceof DtpExecutor) {             DtpExecutor dtpExecutor = (DtpExecutor) bean;             if (bean instanceof EagerDtpExecutor) {                 ((TaskQueue) dtpExecutor.getQueue()).setExecutor((EagerDtpExecutor) dtpExecutor);             }             registerDtp(dtpExecutor);             return dtpExecutor;         }          ApplicationContext applicationContext = ApplicationContextHolder.getInstance();         DynamicTp dynamicTp;         try {             dynamicTp = applicationContext.findAnnotationOnBean(beanName, DynamicTp.class);             if (dynamicTp == null) {                 return bean;             }         } catch (NoSuchBeanDefinitionException e) {             log.error("There is no bean with the given name {}", beanName, e);             return bean;         }          String poolName = StringUtils.isNotBlank(dynamicTp.value()) ? dynamicTp.value() : beanName;         registerCommon(poolName, (ThreadPoolExecutor) bean);         return bean;     } // 放进去     private void registerDtp(DtpExecutor executor) {         DtpRegistry.registerDtp(executor, "beanPostProcessor");     } // 放进去     private void registerCommon(String poolName, ThreadPoolExecutor executor) {         ExecutorWrapper wrapper = new ExecutorWrapper(poolName, executor);         DtpRegistry.registerCommon(wrapper, "beanPostProcessor");     } }
  ApplicationRunner(DtpRegistry)在容器启动后统计信息,告警等等一大堆: @Override public void run(ApplicationArguments args) {     Set remoteExecutors = Collections.emptySet();     if (CollUtil.isNotEmpty(dtpProperties.getExecutors())) {         remoteExecutors = dtpProperties.getExecutors().stream()             .map(ThreadPoolProperties::getThreadPoolName)             .collect(Collectors.toSet());     }      val registeredDtpExecutors = Sets.newHashSet(DTP_REGISTRY.keySet());     val localDtpExecutors = CollUtil.subtract(registeredDtpExecutors, remoteExecutors);     val localCommonExecutors = COMMON_REGISTRY.keySet();     log.info("DtpRegistry initialization end, remote dtpExecutors: {}, local dtpExecutors: {}," +              " local commonExecutors: {}", remoteExecutors, localDtpExecutors, localCommonExecutors);      if (CollUtil.isEmpty(dtpProperties.getPlatforms())) {         log.warn("DtpRegistry initialization end, no notify platforms configured.");         DTP_REGISTRY.forEach((k, v) -> v.setNotifyItems(Collections.emptyList()));         return;     }     DTP_REGISTRY.forEach((k, v) -> {         NotifyHelper.fillPlatforms(dtpProperties.getPlatforms(), v.getNotifyItems());         v.getNotifyItems().forEach(x -> {             AlarmLimiter.initAlarmLimiter(k, x);             AlarmCounter.init(k, x.getType());         });     }); }
  上面初始化就算完成了,可以看到在run方法没有将远端的参数设置到线程池,这是个问题,如果需要生效则需要触发一次refresh。还有一个问题dynamic-tp不支持对JUC里面原生的ThreadPool不支持动态(hippo-4j也是不支持)。
  触发刷新
  Nacos(配置中心周期性拉取数据): @Override public void startInternal() throws NacosException {     executor.schedule(new Runnable() {         @Override         public void run() {             while (!executor.isShutdown() && !executor.isTerminated()) {                 try {                     listenExecutebell.poll(5L, TimeUnit.SECONDS);                     if (executor.isShutdown() || executor.isTerminated()) {                         continue;                     }                     executeConfigListen();                 } catch (Exception e) {                     LOGGER.error("[ rpc listen execute ] [rpc listen] exception", e);                 }             }         }     }, 0L, TimeUnit.MILLISECONDS);  }
  com.alibaba.nacos.client.config.impl.CacheData#safeNotifyListener将变更发布:(hippo-4j是通过添加Nacos的Listener做到刷新的,也就是说它得自己解析原始数据) ConfigResponse cr = new ConfigResponse(); cr.setDataId(dataId); cr.setGroup(group); cr.setContent(content); cr.setEncryptedDataKey(encryptedDataKey); configFilterChainManager.doFilter(null, cr); String contentTmp = cr.getContent(); listenerWrap.inNotifying = true; // 在此,默认的Listener会发布Refresh事件 listener.receiveConfigInfo(contentTmp); // compare lastContent and content if (listener instanceof AbstractConfigChangeListener) {     Map data = ConfigChangeHandler.getInstance()         .parseChangeData(listenerWrap.lastContent, content, type);     ConfigChangeEvent event = new ConfigChangeEvent(data);     ((AbstractConfigChangeListener) listener).receiveConfigChange(event);     listenerWrap.lastContent = content; }
  如下: private void registerNacosListener(final String groupKey, final String dataKey) {         String key = NacosPropertySourceRepository.getMapKey(dataKey, groupKey);         Listener listener = (Listener)this.listenerMap.computeIfAbsent(key, (lst) -> {             return new AbstractSharedListener() {                 public void innerReceive(String dataId, String group, String configInfo) {                     NacosContextRefresher.refreshCountIncrement();                     NacosContextRefresher.this.nacosRefreshHistory.addRefreshRecord(dataId, group, configInfo);                     // 发布事件                     NacosContextRefresher.this.applicationContext.publishEvent(new RefreshEvent(this, (Object)null, "Refresh Nacos config"));                     if (NacosContextRefresher.log.isDebugEnabled()) {                         NacosContextRefresher.log.debug(String.format("Refresh Nacos config group=%s,dataId=%s,configInfo=%s", group, dataId, configInfo));                     }                  }             };         });          try {             this.configService.addListener(dataKey, groupKey, listener);         } catch (NacosException var6) {             log.warn(String.format("register fail for nacos listener ,dataId=[%s],group=[%s]", dataKey, groupKey), var6);         }      }
  org.springframework.cloud.endpoint.event.RefreshEventListener处理refresh事件 public void handle(RefreshEvent event) {     if (this.ready.get()) {         log.debug("Event received " + event.getEventDesc());         Set keys = this.refresh.refresh();         log.info("Refresh keys changed: " + keys);     }  }
  在refresh之后(Apollo的动态配置也是基于这个事件的),会发布RefreshScopeRefreshedEvent事件 public synchronized Set refresh() {     Set keys = this.refreshEnvironment();     // 会发布RefreshScopeRefreshedEvent     this.scope.refreshAll();     return keys; }
  回到了com.dtp.starter.cloud.nacos.refresh.CloudNacosRefresher: @Override public void onApplicationEvent(@NonNull ApplicationEvent event) {     if (event instanceof RefreshScopeRefreshedEvent) {         doRefresh(dtpProperties);     } }
  最终会委托到
  com.dtp.core.DtpRegistry#refresh(com.dtp.common.config.DtpProperties)
  --->
  com.dtp.core.DtpRegistry#doRefresh private static void doRefresh(DtpExecutor dtpExecutor, ThreadPoolProperties properties) {          if (!Objects.equals(dtpExecutor.getCorePoolSize(), properties.getCorePoolSize())) {             dtpExecutor.setCorePoolSize(properties.getCorePoolSize());         }          if (!Objects.equals(dtpExecutor.getMaximumPoolSize(), properties.getMaximumPoolSize())) {             dtpExecutor.setMaximumPoolSize(properties.getMaximumPoolSize());         }          if (!Objects.equals(dtpExecutor.getKeepAliveTime(properties.getUnit()), properties.getKeepAliveTime())) {             dtpExecutor.setKeepAliveTime(properties.getKeepAliveTime(), properties.getUnit());         }          if (!Objects.equals(dtpExecutor.allowsCoreThreadTimeOut(), properties.isAllowCoreThreadTimeOut())) {             dtpExecutor.allowCoreThreadTimeOut(properties.isAllowCoreThreadTimeOut());         }          // update reject handler         if (!Objects.equals(dtpExecutor.getRejectHandlerName(), properties.getRejectedHandlerType())) {             dtpExecutor.setRejectedExecutionHandler(RejectHandlerGetter.getProxy(properties.getRejectedHandlerType()));             dtpExecutor.setRejectHandlerName(properties.getRejectedHandlerType());         }          // update Alias Name         if (!Objects.equals(dtpExecutor.getTheadPoolAliasName(), properties.getTheadPoolAliasName())) {             dtpExecutor.setTheadPoolAliasName(properties.getTheadPoolAliasName());         }          // update work queue capacity         if (!Objects.equals(dtpExecutor.getQueueCapacity(), properties.getQueueCapacity()) &&                 Objects.equals(properties.getQueueType(), VARIABLE_LINKED_BLOCKING_QUEUE.getName())) {             val blockingQueue = dtpExecutor.getQueue();             if (blockingQueue instanceof VariableLinkedBlockingQueue) {                 ((VariableLinkedBlockingQueue)blockingQueue).setCapacity(properties.getQueueCapacity());             } else {                 log.error("DynamicTp refresh, the blockingqueue capacity cannot be reset, dtpName: {}, queueType {}",                         dtpExecutor.getThreadPoolName(), dtpExecutor.getQueueName());             }         }         dtpExecutor.setWaitForTasksToCompleteOnShutdown(properties.isWaitForTasksToCompleteOnShutdown());         dtpExecutor.setAwaitTerminationSeconds(properties.getAwaitTerminationSeconds());         dtpExecutor.setPreStartAllCoreThreads(properties.isPreStartAllCoreThreads());         dtpExecutor.setRunTimeout(properties.getRunTimeout());         dtpExecutor.setQueueTimeout(properties.getQueueTimeout());          List taskWrappers = TaskWrappers.getInstance().getByNames(properties.getTaskWrapperNames());         dtpExecutor.setTaskWrappers(taskWrappers);          // update notify items         properties.setNotifyItems(mergeAllNotifyItems(properties.getNotifyItems()));         val items = NotifyHelper.fillNotifyItems(properties.getNotifyItems(), dtpProperties.getPlatforms());         NotifyHelper.initAlarm(dtpExecutor.getThreadPoolName(), dtpExecutor.getNotifyItems(), items);         dtpExecutor.setNotifyItems(items);     }
  大致过程就是这样。集中在DtpRegistry这个类上面。
  缺陷
  1.不够自动化
  其实我们希望可以根据当前机器的负载与机器配置可以动态在机器允许的范围内自动调整参数。这个问题可以通过梯度下降算法优化。
  2.适配不足
  未对ThreadPoolExecutor和ThreadPoolTaskExecutor做很好的适配。这个问题可以通过装饰者/委托/代理模式优化。
  注:转载请标明作者与出处

痛风发作不用慌,1种草药,1个方剂,中医给你支招痛风有多痛,只有痛过的人才懂,严重时无法行走,甚至瘫痪在床,这都不是吓唬人。根据现代研究显示,痛风与饮食有着重大的关系,常吃海鲜动物肝脏肉汁肉汤经常酗酒的人,更容易患上痛风。因为这被称之为菌中之王的松茸价比黄金,补虚益气,吃好身体少生病大家好,我是张医生,前面我们一起了解了香菇冬季吃香菇最养人,用它煲汤补中益气,健胃养脾,还能预防疾病,今天我们一起来了解另外一种菌菇松茸,松茸是一种比较难得的菌菇,因为营养价值极高中医治疗泌尿系统感染(淋证)古传良方1则黄柏苦参贝母汤泌尿系统感染属于中医学淋证范畴。淋证的病位在肾与膀胱,且与肝脾有关。其病机主要是肾虚,膀胱湿热,气化失司。肾与膀胱相表里,肾气的盛衰直接影响膀胱的气化与开合。淋证日久不愈,热伤阴,老年人喝茶应注意的那些事茶是日常生活中喜爱的饮品,含有很多有益的成分,如茶多酚多种维生素和氨基酸等,有助于延缓衰老,抑制心血管疾病,提神醒脑。尽管茶具有明显的保健功效,但在饮用中要讲求适时适量,所谓过犹不水果茶和葡萄酒的抗氧化黄酮醇可减缓记忆力下降根据新的研究,如果人们吃或喝更多含有抗氧化黄酮醇的食物(茶和酒以及几种水果和蔬菜中都含有黄酮醇),记忆力下降的速度可能会减慢。这表明做出特定的饮食可能会减缓认知能力下降速度。像多吃14年糖尿病肾病,成功逆转!只因在治疗上选对了这三个方向糖尿病肾病是糖尿病的并发症之一,糖尿病患者如果没有做好防护,病情发展为糖尿病肾病的可能性是很大的,由于糖尿病肾病造成的肾损害不可逆,所以糖尿病肾病算是一种较为严重的并发症,但也还是卡塔尔世界杯无奈禁止俄罗斯参加,欧洲国家用退赛威胁国际足联?欧洲七国的足球队为什么认怂了?前两天我在讲到世界杯开幕的新闻时就提到了,因为这届世界杯受到政治的干预很大,这个政治也包括这种社会性的议题,特别像同性恋的议题,因为卡塔尔是个伊斯兰国宏远喜讯!二飞卸任队长赵睿顶上,朱芳雨报价考神遭拒,出征诸暨CBA第二阶段即将开打,十一冠王广东宏远也已经准备就绪,正式开拔诸暨赛区。此前宏远曾经吃下大亏,缺席常规赛首战,导致球队在积分榜上处于天然劣势,朱芳雨吸取教训决定提前出发。第二阶段前国青主帅中国篮球要多打国际比赛,牵手卡特彼勒挖掘潜力今年夏天,中国U18国青男篮在亚青赛上惨遭韩国19分大逆转,中国男篮在亚洲杯上无缘四强,一片哀鸿遍野。问题在哪儿呢?前国青主帅范斌一语中的,中国篮球要让青少年多打国际比赛,培养国际丁俊晖1战3重喜!坐稳世界第3,中国7人进32强,2冠军将登场今晨,斯诺克德国大师赛资格赛结束第4日争夺,丁俊晖51淘汰乔丹布朗,1战3重喜,坐稳单赛季世界第3中国军团7人打进32强,创造佳绩,2大冠军范争一傅家俊即将在今晚出战,冲击正赛资格36氪研究院2022年中国电子竞技产业洞察报告电子竞技是指以锦标赛或联赛等组织形式呈现的专业竞技游戏项目,参与竞技的选手和团队通过在软硬件与信息技术搭建的虚拟环境中,利用各类型电子设备作为运动器械进行智力与体力相结合的运动。近
年货节数码好物推荐手表平板手机全都有,性价比超高入手不后悔春节前,无论是计划假期远行旅游还是回家过年,很多人都会提前为自己入手一些数码好物,凭借新品的独特体验可以让人在旅途或是在家的时候能够带来更好的帮助,当然每种数码产品都有自己的亮点,iPhone13价格下跌1600,是否还有必要入性能更好的iPhone14?您在阅读前请点击上面的关注二字,后续会第一时间为您提供更多有价值的相关内容,感谢您的支持。iPhone14是最新一代的苹果手机,目前iPhone13的价格跌了1600元,所以很多人随着人工智能时代的到来,人工智能对我们的生活有何影响?今天,人工智能在社会中的应用已经十分广泛,随处可见。在人们的日常生活中,我们有智能手机,它不仅具有传统手机接打电话,收发短信的功能。更是在此基础上创造出了人脸识别语音识别指纹解锁等2022科技产品年度优选TOP10一文看遍年度优秀数码产品2022年,我们经历了不平凡的一年,但是,科技还是向着以人为本的方向前行。这一年,科技产品看上去波澜不惊,但它们又按着自己的节奏更新换代。从年初到年末,当我们回头望时,不论是全新的配1。5T双电机四驱,4。3秒破百,比亚迪唐DMp究竟强在了哪里?新能源技术发展至几年,也磨过了很长一段时光,但目前仍有部分问题尚未攻克,而基于这一条件,作为混动车型,优势反倒有所被承托,本期就来看看,唐DMp2022款,28。98万32。98万最新!多国出台签证护照新规近日,多国政府发布了与签证和护照相关的政策,涉及证件费用居留方式工作许可等多个方面,小侨对此进行梳理,为大家提供参考。西班牙推出数字人才签证日前,西班牙政府近期批准实施数字人才签证CBA官方最新积分榜,吴前3分8中0爆冷门,广东辽宁只差浙江1胜场CBA联赛第25轮在1月12日进行了最后两场比赛的较量,至此,25轮比赛全部打完。本轮最大冷门出现,吴前3分8中0,领头羊浙江稠州主场不敌倒数第3名球队天津先行者。CBA官方最新积西北首座室内冰雪运动中心,来了!来源西咸新区融媒体中心热爱滑雪的小伙伴们,今年年底,就可以来西咸新区滑雪了。去年的北京冬奥会引发了全民对冰雪运动的关注,掀起了冰雪运动的新风潮。如今,西咸新区也将有自己的冰雪运动中经济一线观察贵州贵安新区抢抓新基建数据中心创新机冬雾笼罩的山城贵阳美景如画,在其临近的贵安新区,还有一番景观地势平坦视野开阔,一批超大型数据中心拔地而起,在鳞次栉比的机房内,整齐排列的服务器嗡嗡作响。2014年获批设立时,贵安新投资的常识大道至简投资是一件非常简单的事,每个人都可以进行投资但是投资也是一件非常难的事,想要通过投资赚钱并不容易。为大家推荐投资的常识这本书,作者是美国伯顿马尔基尔和查尔斯埃利斯。这两位华尔街投资除了俞敏洪,宁波人也把副业做成主业,苏州人还赚600亿,为什么文龙溪来源商业传奇当年马老师调侃俞敏洪说,教育行业永远在,但新东方可能会不在。几年之后,新东方仍在,但新东方所在的教培行业没有了一大半。2021年,好未来跟谁学等几乎销声匿迹了,股