Something hardcore: dissecting dynamic thread pools
Theoretical basis
ThreadPoolExecutor itself exposes setters for changing its parameters at runtime, plus two protected hook methods:

    public void setCorePoolSize(int corePoolSize);
    public void setMaximumPoolSize(int maximumPoolSize);
    public void setKeepAliveTime(long time, TimeUnit unit);
    public void setThreadFactory(ThreadFactory threadFactory);
    public void setRejectedExecutionHandler(RejectedExecutionHandler handler);
    public void allowCoreThreadTimeOut(boolean value);
    protected void beforeExecute(Thread t, Runnable r);
    protected void afterExecute(Runnable r, Throwable t);
The Meituan tech team has an article on how they put this into practice:
https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html

The JDK implementation of setCorePoolSize looks like this:

    public void setCorePoolSize(int corePoolSize) {
        if (corePoolSize < 0 || maximumPoolSize < corePoolSize)
            throw new IllegalArgumentException();
        int delta = corePoolSize - this.corePoolSize;
        this.corePoolSize = corePoolSize;
        if (workerCountOf(ctl.get()) > corePoolSize)
            // More workers are alive than the new core size allows: interrupt idle workers
            interruptIdleWorkers();
        else if (delta > 0) {
            // Core size was raised: start up to min(delta, queued tasks) new workers
            int k = Math.min(delta, workQueue.size());
            while (k-- > 0 && addWorker(null, true)) {
                if (workQueue.isEmpty())
                    break;
            }
        }
    }
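Because the setter shown above rejects a core size larger than the current maximum (and setMaximumPoolSize rejects a maximum smaller than the current core size), the order of the two calls matters when both values change. Below is a minimal sketch of an order-safe helper; PoolResizer and resize are hypothetical names used only for illustration and are not part of the JDK or of dynamic-tp.

```java
import java.util.concurrent.ThreadPoolExecutor;

// Hypothetical helper, for illustration only.
final class PoolResizer {
    private PoolResizer() {}

    /** Applies a new core/max pair in an order that never leaves core > max. */
    static void resize(ThreadPoolExecutor pool, int newCore, int newMax) {
        if (newCore < 0 || newMax <= 0 || newMax < newCore) {
            throw new IllegalArgumentException(
                    "invalid sizes: core=" + newCore + ", max=" + newMax);
        }
        if (newMax >= pool.getMaximumPoolSize()) {
            // Max grows or stays the same: raise the ceiling first, then core.
            pool.setMaximumPoolSize(newMax);
            pool.setCorePoolSize(newCore);
        } else {
            // Max shrinks: lower core first so that core never exceeds max.
            pool.setCorePoolSize(newCore);
            pool.setMaximumPoolSize(newMax);
        }
    }
}
```

Calling PoolResizer.resize(pool, 8, 16) on a pool created with core 2 and max 4 raises the maximum before the core size, so neither setter sees an invalid combination.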
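Existing solutions
Two projects currently have the most stars:
hippo-4j: https://hippo4j.cn/docs/getting-started/hippo4j-core-start/
dynamic-tp: https://dynamictp.cn/
This article uses dynamic-tp as the example; the principle is the same for both, so what you learn here carries over. The key step is to listen for configuration changes and apply them to the thread pool.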
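Usage
Register the dynamic thread pools in the Spring IoC container:

    /**
     * Defines a plain JUC thread pool via the {@link DynamicTp} annotation. It benefits from
     * the framework's monitoring features. The annotation value takes precedence over the method name.
     *
     * @return thread pool instance
     */
    @DynamicTp("commonExecutor")
    @Bean
    public ThreadPoolExecutor commonExecutor() {
        return (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
    }

    /**
     * Quickly creates a dynamic thread pool with a simple configuration via {@link ThreadPoolCreator}.
     * tips: configuring it directly in the config center is recommended; no @Bean declaration is needed.
     *
     * @return thread pool instance
     */
    @Bean
    public DtpExecutor dtpExecutor1() {
        return ThreadPoolCreator.createDynamicFast("dtpExecutor1");
    }

    /**
     * Creates a dynamic thread pool with detailed parameters via {@link ThreadPoolBuilder}
     * (the recommended approach).
     * ioIntensive: modeled on Tomcat's thread pool design, this is a pool for IO-intensive tasks;
     * see the code comments for the specific parameters.
     *
     * tips: configuring it directly in the config center is recommended; no @Bean declaration is needed.
     * @return thread pool instance
     */
    @Bean
    public DtpExecutor ioIntensiveExecutor() {
        return ThreadPoolBuilder.newBuilder()
                .threadPoolName("ioIntensiveExecutor")
                .corePoolSize(20)
                .maximumPoolSize(50)
                .queueCapacity(2048)
                .ioIntensive(true)
                .buildDynamic();
    }

    /**
     * tips: configuring it directly in the config center is recommended; no @Bean declaration is needed.
     * @return thread pool instance
     */
    @Bean
    public ThreadPoolExecutor dtpExecutor2() {
        return ThreadPoolBuilder.newBuilder()
                .threadPoolName("dtpExecutor2")
                .corePoolSize(10)
                .maximumPoolSize(15)
                .keepAliveTime(50)
                .timeUnit(TimeUnit.MILLISECONDS)
                .workQueue(QueueTypeEnum.SYNCHRONOUS_QUEUE.getName(), null, false)
                .waitForTasksToCompleteOnShutdown(true)
                .awaitTerminationSeconds(5)
                .buildDynamic();
    }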
Source implementation
Auto-configuration registers two key beans in the container: a BeanPostProcessor (DtpPostProcessor) and CloudNacosRefresher. The former registers thread pools with DtpRegistry; the latter listens for configuration updates and applies them to the pools at runtime.

    @Configuration
    @EnableConfigurationProperties({DtpProperties.class})
    @ConditionalOnProperty(
        name = {"spring.dynamic.tp.enabled"},
        matchIfMissing = true,
        havingValue = "true"
    )
    public class BaseBeanAutoConfiguration {
        public BaseBeanAutoConfiguration() {
        }

        @Bean
        public ApplicationContextHolder dtpApplicationContextHolder() {
            return new ApplicationContextHolder();
        }

        @Bean
        @ConditionalOnMissingBean
        public DtpBannerPrinter dtpBannerPrinter(DtpProperties properties) {
            return new DtpBannerPrinter(properties);
        }

        @Bean
        @DependsOn({"dtpApplicationContextHolder"})
        @ConditionalOnMissingBean
        public DtpPostProcessor dtpPostProcessor() {
            return new DtpPostProcessor();
        }

        @Bean
        @ConditionalOnMissingBean
        public DtpRegistry dtpRegistry() {
            return new DtpRegistry();
        }

        @Bean
        @ConditionalOnMissingBean
        public DtpMonitor dtpMonitor() {
            return new DtpMonitor();
        }

        @Bean
        @ConditionalOnMissingBean
        @ConditionalOnAvailableEndpoint
        public DtpEndpoint dtpEndpoint() {
            return new DtpEndpoint();
        }
    }

    @Configuration
    @ConditionalOnClass(NacosConfigProperties.class)
    @ConditionalOnProperty(value = DynamicTpConst.DTP_ENABLED_PROP, matchIfMissing = true, havingValue = "true")
    @ImportAutoConfiguration({BaseBeanAutoConfiguration.class})
    public class DtpAutoConfiguration {

        @Bean
        @ConditionalOnMissingBean()
        @ConditionalOnClass(NacosConfigManager.class)
        public CloudNacosRefresher cloudNacosRefresher() {
            return new CloudNacosRefresher();
        }
    }
The BeanPostProcessor (DtpPostProcessor) puts the thread pools into DtpRegistry:

    @Slf4j
    public class DtpPostProcessor implements BeanPostProcessor {

        @Override
        public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
            if (!(bean instanceof ThreadPoolExecutor)) {
                return bean;
            }

            if (bean instanceof DtpExecutor) {
                DtpExecutor dtpExecutor = (DtpExecutor) bean;
                if (bean instanceof EagerDtpExecutor) {
                    ((TaskQueue) dtpExecutor.getQueue()).setExecutor((EagerDtpExecutor) dtpExecutor);
                }
                registerDtp(dtpExecutor);
                return dtpExecutor;
            }

            ApplicationContext applicationContext = ApplicationContextHolder.getInstance();
            DynamicTp dynamicTp;
            try {
                dynamicTp = applicationContext.findAnnotationOnBean(beanName, DynamicTp.class);
                if (dynamicTp == null) {
                    return bean;
                }
            } catch (NoSuchBeanDefinitionException e) {
                log.error("There is no bean with the given name {}", beanName, e);
                return bean;
            }

            String poolName = StringUtils.isNotBlank(dynamicTp.value()) ? dynamicTp.value() : beanName;
            registerCommon(poolName, (ThreadPoolExecutor) bean);
            return bean;
        }

        // Register a DtpExecutor
        private void registerDtp(DtpExecutor executor) {
            DtpRegistry.registerDtp(executor, "beanPostProcessor");
        }

        // Register a plain ThreadPoolExecutor, wrapped in an ExecutorWrapper
        private void registerCommon(String poolName, ThreadPoolExecutor executor) {
            ExecutorWrapper wrapper = new ExecutorWrapper(poolName, executor);
            DtpRegistry.registerCommon(wrapper, "beanPostProcessor");
        }
    }
DtpRegistry also acts as an ApplicationRunner: once the container has started, it logs which pools are registered, sets up alarms, and does a fair amount of other housekeeping:

    @Override
    public void run(ApplicationArguments args) {
        Set<String> remoteExecutors = Collections.emptySet();
        if (CollUtil.isNotEmpty(dtpProperties.getExecutors())) {
            remoteExecutors = dtpProperties.getExecutors().stream()
                    .map(ThreadPoolProperties::getThreadPoolName)
                    .collect(Collectors.toSet());
        }

        val registeredDtpExecutors = Sets.newHashSet(DTP_REGISTRY.keySet());
        val localDtpExecutors = CollUtil.subtract(registeredDtpExecutors, remoteExecutors);
        val localCommonExecutors = COMMON_REGISTRY.keySet();
        log.info("DtpRegistry initialization end, remote dtpExecutors: {}, local dtpExecutors: {}," +
                " local commonExecutors: {}", remoteExecutors, localDtpExecutors, localCommonExecutors);

        if (CollUtil.isEmpty(dtpProperties.getPlatforms())) {
            log.warn("DtpRegistry initialization end, no notify platforms configured.");
            DTP_REGISTRY.forEach((k, v) -> v.setNotifyItems(Collections.emptyList()));
            return;
        }

        DTP_REGISTRY.forEach((k, v) -> {
            NotifyHelper.fillPlatforms(dtpProperties.getPlatforms(), v.getNotifyItems());
            v.getNotifyItems().forEach(x -> {
                AlarmLimiter.initAlarmLimiter(k, x);
                AlarmCounter.init(k, x.getType());
            });
        });
    }
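That completes initialization. Note that the run method does not apply the remote parameters to the thread pools. This is a problem: for the remote configuration to take effect, a refresh has to be triggered once. A second problem is that dynamic-tp does not support dynamic adjustment of the native JUC thread pools, which are only registered for monitoring (hippo-4j does not support it either).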
Triggering a refresh
Nacos (configuration data is pulled periodically):

    @Override
    public void startInternal() throws NacosException {
        executor.schedule(new Runnable() {
            @Override
            public void run() {
                while (!executor.isShutdown() && !executor.isTerminated()) {
                    try {
                        // Wait up to 5 seconds for a wake-up signal, then check for changes
                        listenExecutebell.poll(5L, TimeUnit.SECONDS);
                        if (executor.isShutdown() || executor.isTerminated()) {
                            continue;
                        }
                        executeConfigListen();
                    } catch (Exception e) {
                        LOGGER.error("[ rpc listen execute ] [rpc listen] exception", e);
                    }
                }
            }
        }, 0L, TimeUnit.MILLISECONDS);
    }
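The loop above waits on a queue with a timeout and then checks for changes whether or not anything arrived, so the check runs at least every five seconds and can also be woken early. Here is a minimal sketch of that "bell" pattern in plain Java. BellLoop, ring and awaitAndCheck are hypothetical names for illustration, not Nacos API, and the assumption that other code wakes the loop by offering an item to the queue is mine.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of a timeout-or-signal polling loop.
final class BellLoop {
    // Capacity 1: several rings before the next check collapse into one wake-up.
    private final BlockingQueue<Boolean> bell = new ArrayBlockingQueue<>(1);
    private final Runnable check;

    BellLoop(Runnable check) {
        this.check = check;
    }

    /** Wakes the loop early; does nothing if a wake-up is already pending. */
    void ring() {
        bell.offer(Boolean.TRUE);
    }

    /**
     * Waits up to the timeout for a ring, then runs the check either way.
     * Returns true if a ring (rather than the timeout) ended the wait.
     */
    boolean awaitAndCheck(long timeout, TimeUnit unit) {
        boolean rung;
        try {
            rung = bell.poll(timeout, unit) != null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt and skip this round
            return false;
        }
        check.run();
        return rung;
    }
}
```

A real client would call awaitAndCheck in a loop on a background thread, as the Nacos code does with executeConfigListen.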
com.alibaba.nacos.client.config.impl.CacheData#safeNotifyListener publishes the change. (hippo-4j refreshes by adding its own Nacos Listener, which means it has to parse the raw data itself.)

    ConfigResponse cr = new ConfigResponse();
    cr.setDataId(dataId);
    cr.setGroup(group);
    cr.setContent(content);
    cr.setEncryptedDataKey(encryptedDataKey);
    configFilterChainManager.doFilter(null, cr);
    String contentTmp = cr.getContent();
    listenerWrap.inNotifying = true;
    // Here, the default Listener publishes the Refresh event
    listener.receiveConfigInfo(contentTmp);
    // compare lastContent and content
    if (listener instanceof AbstractConfigChangeListener) {
        Map<String, ConfigChangeItem> data = ConfigChangeHandler.getInstance()
                .parseChangeData(listenerWrap.lastContent, content, type);
        ConfigChangeEvent event = new ConfigChangeEvent(data);
        ((AbstractConfigChangeListener) listener).receiveConfigChange(event);
        listenerWrap.lastContent = content;
    }
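The "compare lastContent and content" step boils down to diffing the previous configuration against the new one. The sketch below shows that idea on already-parsed key/value maps. ConfigDiff and ChangeType are hypothetical names, and this is not how Nacos implements parseChangeData, only an illustration of the comparison.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical illustration of diffing two configuration snapshots.
final class ConfigDiff {
    enum ChangeType { ADDED, MODIFIED, DELETED }

    private ConfigDiff() {}

    /** Returns one entry per key whose presence or value differs between the snapshots. */
    static Map<String, ChangeType> diff(Map<String, String> oldCfg, Map<String, String> newCfg) {
        Map<String, ChangeType> changes = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : newCfg.entrySet()) {
            String key = e.getKey();
            if (!oldCfg.containsKey(key)) {
                changes.put(key, ChangeType.ADDED);
            } else if (!Objects.equals(oldCfg.get(key), e.getValue())) {
                changes.put(key, ChangeType.MODIFIED);
            }
        }
        for (String key : oldCfg.keySet()) {
            if (!newCfg.containsKey(key)) {
                changes.put(key, ChangeType.DELETED);
            }
        }
        return changes;
    }
}
```

A listener that only cares about thread pool settings could use such a diff to skip refreshes when none of its keys changed.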
The default listener is registered by NacosContextRefresher as follows:

    private void registerNacosListener(final String groupKey, final String dataKey) {
        String key = NacosPropertySourceRepository.getMapKey(dataKey, groupKey);
        Listener listener = (Listener)this.listenerMap.computeIfAbsent(key, (lst) -> {
            return new AbstractSharedListener() {
                public void innerReceive(String dataId, String group, String configInfo) {
                    NacosContextRefresher.refreshCountIncrement();
                    NacosContextRefresher.this.nacosRefreshHistory.addRefreshRecord(dataId, group, configInfo);
                    // Publish the event
                    NacosContextRefresher.this.applicationContext.publishEvent(
                            new RefreshEvent(this, (Object)null, "Refresh Nacos config"));
                    if (NacosContextRefresher.log.isDebugEnabled()) {
                        NacosContextRefresher.log.debug(String.format(
                                "Refresh Nacos config group=%s,dataId=%s,configInfo=%s", group, dataId, configInfo));
                    }
                }
            };
        });

        try {
            this.configService.addListener(dataKey, groupKey, listener);
        } catch (NacosException var6) {
            log.warn(String.format("register fail for nacos listener ,dataId=[%s],group=[%s]", dataKey, groupKey), var6);
        }
    }
org.springframework.cloud.endpoint.event.RefreshEventListener handles the refresh event:

    public void handle(RefreshEvent event) {
        if (this.ready.get()) {
            log.debug("Event received " + event.getEventDesc());
            Set<String> keys = this.refresh.refresh();
            log.info("Refresh keys changed: " + keys);
        }
    }

After the refresh, a RefreshScopeRefreshedEvent is published (Apollo's dynamic configuration is also based on this event):

    public synchronized Set<String> refresh() {
        Set<String> keys = this.refreshEnvironment();
        // Publishes RefreshScopeRefreshedEvent
        this.scope.refreshAll();
        return keys;
    }
This brings us back to com.dtp.starter.cloud.nacos.refresh.CloudNacosRefresher:

    @Override
    public void onApplicationEvent(@NonNull ApplicationEvent event) {
        if (event instanceof RefreshScopeRefreshedEvent) {
            doRefresh(dtpProperties);
        }
    }
The call is ultimately delegated to
com.dtp.core.DtpRegistry#refresh(com.dtp.common.config.DtpProperties)
--->
com.dtp.core.DtpRegistry#doRefresh

    private static void doRefresh(DtpExecutor dtpExecutor, ThreadPoolProperties properties) {
        if (!Objects.equals(dtpExecutor.getCorePoolSize(), properties.getCorePoolSize())) {
            dtpExecutor.setCorePoolSize(properties.getCorePoolSize());
        }

        if (!Objects.equals(dtpExecutor.getMaximumPoolSize(), properties.getMaximumPoolSize())) {
            dtpExecutor.setMaximumPoolSize(properties.getMaximumPoolSize());
        }

        if (!Objects.equals(dtpExecutor.getKeepAliveTime(properties.getUnit()), properties.getKeepAliveTime())) {
            dtpExecutor.setKeepAliveTime(properties.getKeepAliveTime(), properties.getUnit());
        }

        if (!Objects.equals(dtpExecutor.allowsCoreThreadTimeOut(), properties.isAllowCoreThreadTimeOut())) {
            dtpExecutor.allowCoreThreadTimeOut(properties.isAllowCoreThreadTimeOut());
        }

        // update reject handler
        if (!Objects.equals(dtpExecutor.getRejectHandlerName(), properties.getRejectedHandlerType())) {
            dtpExecutor.setRejectedExecutionHandler(RejectHandlerGetter.getProxy(properties.getRejectedHandlerType()));
            dtpExecutor.setRejectHandlerName(properties.getRejectedHandlerType());
        }

        // update Alias Name
        if (!Objects.equals(dtpExecutor.getTheadPoolAliasName(), properties.getTheadPoolAliasName())) {
            dtpExecutor.setTheadPoolAliasName(properties.getTheadPoolAliasName());
        }

        // update work queue capacity
        if (!Objects.equals(dtpExecutor.getQueueCapacity(), properties.getQueueCapacity()) &&
                Objects.equals(properties.getQueueType(), VARIABLE_LINKED_BLOCKING_QUEUE.getName())) {
            val blockingQueue = dtpExecutor.getQueue();
            if (blockingQueue instanceof VariableLinkedBlockingQueue) {
                ((VariableLinkedBlockingQueue)blockingQueue).setCapacity(properties.getQueueCapacity());
            } else {
                log.error("DynamicTp refresh, the blockingqueue capacity cannot be reset, dtpName: {}, queueType {}",
                        dtpExecutor.getThreadPoolName(), dtpExecutor.getQueueName());
            }
        }

        dtpExecutor.setWaitForTasksToCompleteOnShutdown(properties.isWaitForTasksToCompleteOnShutdown());
        dtpExecutor.setAwaitTerminationSeconds(properties.getAwaitTerminationSeconds());
        dtpExecutor.setPreStartAllCoreThreads(properties.isPreStartAllCoreThreads());
        dtpExecutor.setRunTimeout(properties.getRunTimeout());
        dtpExecutor.setQueueTimeout(properties.getQueueTimeout());

        List<TaskWrapper> taskWrappers = TaskWrappers.getInstance().getByNames(properties.getTaskWrapperNames());
        dtpExecutor.setTaskWrappers(taskWrappers);

        // update notify items
        properties.setNotifyItems(mergeAllNotifyItems(properties.getNotifyItems()));
        val items = NotifyHelper.fillNotifyItems(properties.getNotifyItems(), dtpProperties.getPlatforms());
        NotifyHelper.initAlarm(dtpExecutor.getThreadPoolName(), dtpExecutor.getNotifyItems(), items);
        dtpExecutor.setNotifyItems(items);
    }
That is roughly the whole process. Most of it is concentrated in the DtpRegistry class.
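To make the overall flow concrete, here is a stripped-down sketch of the registry idea: pools are registered by name, and a refresh looks each one up and applies only the values that differ. MiniRegistry and PoolProps are hypothetical names; this is a simplification for illustration and not dynamic-tp's DtpRegistry. It assumes each PoolProps satisfies 0 <= core <= max, max > 0 and keepAliveSeconds > 0.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified registry for illustration only.
final class MiniRegistry {
    /** Desired settings for one pool, e.g. parsed from a config center. */
    static final class PoolProps {
        final String name;
        final int core;
        final int max;
        final long keepAliveSeconds;

        PoolProps(String name, int core, int max, long keepAliveSeconds) {
            this.name = name;
            this.core = core;
            this.max = max;
            this.keepAliveSeconds = keepAliveSeconds;
        }
    }

    private final Map<String, ThreadPoolExecutor> pools = new ConcurrentHashMap<>();

    void register(String name, ThreadPoolExecutor pool) {
        pools.put(name, pool);
    }

    /** Applies the given settings to registered pools; returns how many pools changed. */
    int refresh(List<PoolProps> all) {
        int changed = 0;
        for (PoolProps p : all) {
            ThreadPoolExecutor pool = pools.get(p.name);
            if (pool != null && apply(pool, p)) { // unknown names are ignored
                changed++;
            }
        }
        return changed;
    }

    private static boolean apply(ThreadPoolExecutor pool, PoolProps p) {
        boolean changed = false;
        if (pool.getCorePoolSize() != p.core || pool.getMaximumPoolSize() != p.max) {
            if (p.max >= pool.getMaximumPoolSize()) {
                pool.setMaximumPoolSize(p.max); // raise the ceiling before core
                pool.setCorePoolSize(p.core);
            } else {
                pool.setCorePoolSize(p.core);   // lower core before the ceiling
                pool.setMaximumPoolSize(p.max);
            }
            changed = true;
        }
        if (pool.getKeepAliveTime(TimeUnit.SECONDS) != p.keepAliveSeconds) {
            pool.setKeepAliveTime(p.keepAliveSeconds, TimeUnit.SECONDS);
            changed = true;
        }
        return changed;
    }
}
```

In a Spring application, refresh would be called from the listener that receives the configuration-changed event, with the list built from the freshly bound properties.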
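Shortcomings
1. Not automated enough
Ideally, the parameters would adjust themselves automatically, within the limits the machine allows, based on the machine's current load and hardware configuration. A gradient descent algorithm could be used to address this.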
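As a rough idea of what automatic tuning could look like, the sketch below takes one fixed-size step in the direction that improved a measured score (for example, throughput) between the two most recent pool sizes. It is a crude finite-difference hill climb standing in for the gradient-based approach, not a full gradient descent implementation. AutoTuner, nextSize and the choice of score are all assumptions for illustration.

```java
// Hypothetical illustration: one step of a finite-difference hill climb on pool size.
final class AutoTuner {
    private AutoTuner() {}

    /**
     * Suggests the next pool size from the two most recent (size, score) observations.
     * A higher score is better. The result is clamped to [min, max].
     */
    static int nextSize(int prevSize, double prevScore,
                        int curSize, double curScore,
                        int min, int max) {
        int direction;
        if (curSize == prevSize) {
            direction = 1; // no slope information yet: probe upward
        } else {
            // Estimated slope of score with respect to pool size
            double slope = (curScore - prevScore) / (curSize - prevSize);
            direction = slope > 0 ? 1 : (slope < 0 ? -1 : 0);
        }
        return Math.max(min, Math.min(max, curSize + direction));
    }
}
```

A tuner thread could sample the score periodically, call nextSize, and apply the result through setCorePoolSize and setMaximumPoolSize, with min and max derived from the machine's CPU and memory limits.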
2. Insufficient adaptation
ThreadPoolExecutor and ThreadPoolTaskExecutor are not well adapted. This could be improved with the decorator, delegate, or proxy pattern.
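One way to picture the delegate approach: define a small interface with just the tunable operations, and put an adapter in front of each executor type. The sketch below only covers the plain JUC ThreadPoolExecutor so that it stays self-contained. TunablePool and JucPoolAdapter are hypothetical names, not part of dynamic-tp; an adapter for Spring's ThreadPoolTaskExecutor could delegate in the same way, which is left as an assumption here.

```java
import java.util.concurrent.ThreadPoolExecutor;

// Hypothetical abstraction over "anything whose pool sizes can be tuned".
interface TunablePool {
    int getCorePoolSize();
    int getMaximumPoolSize();
    void setCorePoolSize(int corePoolSize);
    void setMaximumPoolSize(int maximumPoolSize);
}

// Adapter that delegates every call to a plain JUC ThreadPoolExecutor.
final class JucPoolAdapter implements TunablePool {
    private final ThreadPoolExecutor delegate;

    JucPoolAdapter(ThreadPoolExecutor delegate) {
        this.delegate = delegate;
    }

    @Override
    public int getCorePoolSize() {
        return delegate.getCorePoolSize();
    }

    @Override
    public int getMaximumPoolSize() {
        return delegate.getMaximumPoolSize();
    }

    @Override
    public void setCorePoolSize(int corePoolSize) {
        delegate.setCorePoolSize(corePoolSize);
    }

    @Override
    public void setMaximumPoolSize(int maximumPoolSize) {
        delegate.setMaximumPoolSize(maximumPoolSize);
    }
}
```

With such an interface, the refresh logic only needs to know about TunablePool, and supporting a new executor type means writing one more adapter.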
Note: please credit the author and source when reposting.