深入理解Java线程池,这一篇就够了
【死记硬背】 1 理解
线程是稀缺资源,它的创建和销毁是一个相对偏重且耗资源的操作,而Java线程依赖于内核线程,创建线程需要进行操作系统状态切换,为避免资源过度消耗需要设法重用线程执行多个任务。线程池就是一个线程缓存,负责对线程进行统一分配、调度和监控。2 线程池的作用
* 限定线程的个数,不会导致由于线程过多导致系统运行缓慢或崩溃。
* 线程池不需要每次都去创建和销毁,节约了资源,并且响应速度快。3 什么时候用线程池
* 单个任务处理的时间比较短。
* 需要处理的任务数量很大。4 如何设置线程池的大小
* CPU密集型任务:
CPU个数+1的线程数
* IO密集型任务:
两倍CPU个数+1的线程数5 线程池的组成
一般的线程池主要由4部分组成:
5.1 线程池管理器:用于创建并管理线程池。
5.2 工作线程:线程池中的线程。
5.3 任务接口:每个任务必须实现的接口,用于工作线程调度其运行。
5.4 任务队列:用于存放待处理的任务,提供一种缓冲机制。6 线程池的实现方式
线程池的主要实现方式有五种:newCachedThreadPool、newFixedThreadPool、newScheduledThreadPool、newSingleThreadExecutor和newWorkStealingPool。是通过Executor框架实现的,主要用到了Executor、Executors、ExecutorService、ThreadPoolExecutor、Callable、Future和FutureTask这几个类。7 线程池涉及到的主要参数
7.1 corePoolSize:指定了线程池中的线程数量。
7.2 maximumPoolSize:指定了线程池中最大线程数量。
7.3 keepAliveTime:当前线程池数量超过corePoolSize时,多余的空闲线程的存活时间,即多少时间内会被销毁。
7.4 unit:keepAliveTime的时间单位。
7.5 workQueue:任务队列,被提交但尚未被执行的任务。
7.6 threadFactory:线程工厂,用于创建线程,一般用默认的即可。
7.7 handler:拒绝策略,当任务太多来不及处理,如何拒绝任务。8 拒绝策略
JDK内置的拒绝策略如下:
8.1 AbortPolicy:直接抛出异常,阻止系统正常运行。
8.2 CallerRunsPolicy:只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务。显然这样做不会真的丢弃任务,但是,任务提交线程的性能极有可能会急剧下降。
8.3 DiscardOldestPolicy:丢弃最老的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前任务。
8.4 DiscardPolicy:该策略默默地丢弃无法处理的任务,不予任何处理。如果允许任务丢失,这是最好的一种方案。
以上内置拒绝策略均实现了RejectedExecutionHandler接口,若以上策略扔无法满足实际需要,完全可以自己扩展RejectedExecutionHandler接口。9 Java线程池的工作过程
9.1 线程池刚创建时,里面没有一个线程。任务队列是作为参数传进来的。不过,就算队列里面有任务,线程池也不会马上执行它们。
9.2 当调用 execute() 方法添加一个任务时,线程池会做如下判断:
a) 如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务;
b) 如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列;
c) 如果这时候队列满了,而且正在运行的线程数量小于 maximumPoolSize,那么还是要创建非核心线程立刻运行这个任务;
d) 如果队列满了,而且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池
会抛出异常 RejectExecutionException。
9.3 当一个线程完成任务时,它会从队列中取下一个任务来执行。
9.4 当一个线程无事可做,超过一定的时间(keepAliveTime)时,线程池会判断,如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉。所以线程池的所有任务完成后,它最终会收缩到 corePoolSize 的大小。
【答案解析】
五种常用线程池的实现方式:newCachedThreadPoolimport java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * 它是一个可以无限扩大的线程池; * 它比较适合处理执行时间比较小的任务; * corePoolSize为0,maximumPoolSize为无限大,意味着线程数量可以无限大; * keepAliveTime为60S,意味着线程空闲时间超过60S就会被杀死; * 采用SynchronousQueue装等待的任务,这个阻塞队列没有存储空间,这意味着只要有请求到来, * 就必须要找到一条工作线程处理他,如果当前没有空闲的线程,那么就会再创建一条新的线程。 */ public class CachedThreadPoolTest { public static void main(String[] args) { ExecutorService cachedThreadPool = Executors.newCachedThreadPool(); for (int i=0;i<1000;i++){ /*try{ Thread.sleep(2000); }catch (InterruptedException e){ e.printStackTrace(); }*/ cachedThreadPool.execute(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName()+"正在被执行"); } }); System.out.println("执行完毕"); } } }newFixedThreadPoolimport java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * 它是一种固定大小的线程池; * corePoolSize和maximunPoolSize都为用户设定的线程数量nThreads; * keepAliveTime为0,意味着一旦有多余的空闲线程,就会被立即停止掉; * 但这里keepAliveTime无效; * 阻塞队列采用了LinkedBlockingQueue,它是一个无界队列; * 由于阻塞队列是一个无界队列,因此永远不可能拒绝任务; * 由于采用了无界队列,实际线程数量将永远维持在nThreads,因此maximumPoolSize和keepAliveTime将无效 */ public class FixedThreadPoolTest { public static void main(String[] args) { ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3); for (int i=0;i<10;i++){ final int index = i; fixedThreadPool.execute(new Runnable() { @Override public void run() { try{ System.out.println(Thread.currentThread().getName()+"正在执行"+index); Thread.sleep(2000); }catch (InterruptedException e){ e.printStackTrace(); } } }); } } }newScheduledThreadPoolimport java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; /** * 它接收SchduledFutureTask类型的任务,有两种提交任务的方式: * 1 scheduledAtFixedRate * 2 scheduledWithFixedDelay * SchduledFutureTask接收的参数: * time:任务开始的时间 * sequenceNumber:任务的序号 * period:任务执行的时间间隔 * 它采用DelayQueue存储等待的任务 * DelayQueue内部封装了一个PriorityQueue,它会根据time的先后时间排序,若time相同则根据sequenceNumber排序; * DelayQueue也是一个无界队列; * 工作线程的执行过程: * 工作线程会从DelayQueue取已经到期的任务去执行; * 执行结束后重新设置任务的到期时间,再次放回DelayQueue */ public class ScheduledThreadPoolTest { public static void main(String[] args) { ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5); /**/ scheduledThreadPool.schedule(new Runnable(){ public void run(){ System.out.println("延迟1秒执行"); } }, 1, TimeUnit.SECONDS); scheduledThreadPool.scheduleWithFixedDelay(new Runnable() { @Override public void run() { System.out.println("执行"); } },1,1,TimeUnit.SECONDS); scheduledThreadPool.scheduleAtFixedRate(new Runnable() { @Override public void run() { System.out.println("执行3"); } },1,3,TimeUnit.SECONDS); } }newSingleThreadExecutorimport java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * 它只会创建一条工作线程处理任务; * 采用的阻塞队列为LinkedBlockingQueue; */ public class SingleThreadExecutorTest { public static void main(String[] args) { ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor(); for (int i = 0; i < 10; i++) { final int index = i; singleThreadExecutor.execute(new Runnable() { public void run() { try { System.out.println(Thread.currentThread().getName() + "正在被执行,打印的值是:" + index); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }); } } }newWorkStealingPoolimport java.util.concurrent.*; public class WorkStealingPoolTest { // 线程数 private static final int threads = 10; // 用于计数线程是否执行完成 static CountDownLatch countDownLatch = new CountDownLatch(threads); public static void main(String[] args) throws Exception{ test3(); } public static void test1() throws Exception{ System.out.println("---- start ----"); ExecutorService executorService = Executors.newWorkStealingPool(); for (int i = 0; i < threads; i++) { executorService.execute(() -> { try { System.out.println(Thread.currentThread().getName()); } catch (Exception e) { System.out.println(e); } finally { countDownLatch.countDown(); } }); } countDownLatch.await(); System.out.println("---- end ----"); } public static void test2() throws InterruptedException{ System.out.println("---- start ----"); ExecutorService executorService = Executors.newWorkStealingPool(); for (int i = 0; i < threads; i++) { // Callable 带返回值 executorService.submit(new Thread(new Runnable() { @Override public void run() { try { System.out.println(Thread.currentThread().getName()); } catch (Exception e) { e.printStackTrace(); } finally { countDownLatch.countDown(); } } })); } countDownLatch.await(); System.out.println("---- end ----"); } public static void test3() throws Exception{ System.out.println("---- start ----"); ExecutorService executorService = Executors.newWorkStealingPool(); for (int i = 0; i < threads; i++) { // Runnable 带返回值 FutureTask<?> futureTask = new FutureTask<>(new Callable() { /** * call * @return currentThreadName */ @Override public String call() { return Thread.currentThread().getName(); } }); executorService.submit(new Thread(futureTask)); System.out.println(futureTask.get()); } System.out.println("---- end ----"); } }
【温馨提示】
点赞+收藏文章,关注我并私信回复【面试题解析】,即可100%免费领取楼主的所有面试题资料!