说一说Java19中的虚拟线程
#头条创作挑战赛#
Java 19 即将发布(2022年9月20日)。虽然 Java 19 只是一个非LTS版本,但是其中的虚拟线程(Virtual Threads)和结构化并发(Structured Concurrency)是两个让人非常期待的功能。虚拟线程在 Java 19 虽然是预览功能,但是按照 Java 的发布周期,在下一个 Java LTS 版本 Java 21 中,虚拟线程就会成为正式功能。虚拟线程会为 Java 的并发编程带来颠覆性的影响。
结构化并发在 Java 19 是孵化功能。要成为正式功能还有很长的一段路要走。结构化并发会在另外的文章中介绍。
当然了,我知道不少人看到虚拟线程之后,第一反应就是:这不就是协程吗?Go 早就有了,Kotlin 也早就有了;也会有人吐槽说,虚拟线程就这?怎么不支持 ABC,也没有 DEF?比 XXX 语言的差远啦,我还不如继续回去用 XXX 呢。
其实不止是虚拟线程,Java 之前的一些新特性,被人吐槽的也非常多。Java 的泛型早就不知道被吐槽多少回了。Java 9 的模块系统,当初设想的前景很美好,但是直到现在,采用模块系统的第三方库和应用仍然非常之少。可以预期的是,虚拟线程肯定会让很多 Java 开发人员不满意。不过这是没办法的事情。作为一名 Java 程序员,这就是我们现在可以用的东西。如果它能让你的开发更简单,深入了解一下总没有坏处。
吐槽结束,言归正传。使用 Java 线程时的常见问题
作为一名 Java 程序员,在接触到多线程开发时,都会了解到这样一个重要的前提,那就是线程是稀缺资源。这个前提本身并没有什么问题,因为 Java 里面的线程封装了操作系统的线程,而线程由于其自身的开销较大, 总体的数量是存在一个上限的。在这个前提之下,或者说限制之下,Java 的多线程开发会有一些固有的模式:线程通常由线程池来管理。道理也很简单。线程是稀缺资源,创建的开销也大,就得复用。一个任务通常会在多个线程之间不断倒手。在高性能服务器端开发框架中,会有一个使用非阻塞I/O的I/O线程来接收请求。对于耗时较长的任务,这些请求会被转发给工作线程。处理完成之后再把结果通过I/O线程来返回。
由于这些固有模式,就引申出来一系列的问题,包括但不限于如下这些:Java 线程池的创建和使用。在网上你可以搜索到与线程池相关的一大堆面试八股文。在多个线程之间使用 thread local 来传递上下文对象。对于一个请求,由于整个处理流程涉及多个线程池中的线程,追踪和调试变得非常困难。
虚拟线程的出现,可以为解决这些问题提供新的思路。虚拟线程概述
在 Java 19 之前,Java 只有一种类型的线程,Java 19 中称为 平台线程 (platform thread)。平台线程与操作系统的内核线程是一一对应的。与平台线程对应的就是新增的虚拟线程。
虚拟线程是用户模式线程 ,由 Java 运行时进行调度,而不是由操作系统来调度。虚拟线程和内核线程是 M 对 N 的对应关系,也就是说, M 个虚拟线程会被映射到 N 个内核线程上 。
平台线程和虚拟线程都使用 java.lang.Thread 来表示。这就意味为开发人员不需要学习新的 API 来使用虚拟线程。
一个增加虚拟线程的重要动机是提供一种可扩展的方式来实现使用独占线程处理每个请求(thread-per-request)的并发风格。在编写服务端应用时,最自然的方式是对于每个请求,使用独占的线程来处理该请求,因为请求是相互独立的。这就是 thread-per-request 的并发风格。这种方式易于理解和编程实现,也易于调试和性能调优。
然而,thread-per-request 风格并不能简单地使用平台线程来实现。在实现上,平台线程是操作系统中线程的封装。操作系统的线程会占用资源,存在数量上限。对于一个要并发处理海量请求的服务器端应用来说,对每个请求都创建一个平台线程是不现实的。
对于这个问题,很多框架都提供了解决方案。常用的思路是依赖非阻塞 I/O 和异步编程。当某个请求在等待 I/O 操作时,它会暂时让出线程,并在 I/O 操作完成之后继续执行。通过这种方式,可以用少量的线程来同时处理大量的请求。这些框架可以提升系统的吞吐量,但是要求开发人员必须熟悉所使用的底层框架,并按照特定的风格来编写代码。
在使用虚拟线程之后,开发人员可以使用最自然的方式来编写代码,把请求的处理逻辑全部在一个虚拟线程中完成。在完成对请求的处理之后,相应的线程也会被自动销毁。这极大地降低了编写高并发服务端应用的难度。
虚拟线程是轻量级的,并不需要放入线程池中,在需要的时候创建即可。虚拟线程的调度
虚拟线程由 JDK 负责调度。JDK 把虚拟线程分配给平台线程,平台线程则由操作系统进行调度。
一个虚拟线程所分配的平台线程被称为该虚拟线程的载体。在整个生命周期过程中,一个虚拟线程可能就会被调度到多个载体上。载体的标识对于虚拟线程是不可见的。
JDK 调度虚拟线程时,使用的是一个以 FIFO 模式工作的 work-stealing ForkJoinPool。该 ForkJoinPool 的 parallelism 决定了调度时可以使用的平台线程的数量。该数量默认等于处理器的数量(通过 Runtime.availableProcessors() 获取),也可以通过系统属性 jdk.virtualThreadScheduler.parallelism 来设置。虚拟线程如何执行代码?
在执行虚拟线程的代码时,JDK 的线程调度器把虚拟线程分配到一个平台线程上执行。这个过程称为把虚拟线程绑定(mount)到平台线程。这个平台线程就成为了该虚拟线程的载体。在执行了某些代码之后,该虚拟线程可以从平台线程上解除绑定(unmount)。
当虚拟线程在等待 I/O 或是执行某些阻塞操作时,可以从平台线程上解除绑定。等阻塞操作完成之后,该虚拟线程可以被调度到新的平台线程上继续执行。虚拟线程的绑定和解除绑定操作,对于应用代码来说是透明的。
有些 JDK 中的阻塞操作并不会解除对平台线程的绑定,因此会阻塞平台线程和底层的操作系统线程。这是由于操作系统或 JDK 自身的限制,比如很多文件操作以及 Object.wait() 方法调用都会产生这个效果。这些阻塞操作的实现会在内部对此进行补偿。具体的做法是临时增加 JDK 的调度器可以使用的线程数量。因此,JDK 调度器的 ForkJoinPool 中的线程数量可能会超过 parallelism 指定的值。可以使用系统属性 jdk.virtualThreadScheduler.maxPoolSize 来指定调度器所允许的线程的最大值。
在下面两种情况下,虚拟线程在执行阻塞操作时,会被锁定(pin)在载体上而无法解除绑定:在执行 synchronized 方法或块时,在执行 native 方法或外部方法时。
虚拟线程的锁定可能会对应用的可伸缩性产生影响。当锁定发生时,调度器并不会对此进行补偿。为了避免经常出现的较长时间的锁定,可以考虑把 synchronized 方法或块替换成 java.util.concurrent.locks.ReentrantLock。不过这种替换,应该建立在进行了充分性能测试的基础上。在大多数时候,锁定的影响并没有很大。虚拟线程代码展示
介绍了这么多虚拟线程的内容之后,我们来看看到底如何在代码中使用虚拟线程。
首先是如何创建虚拟线程。创建虚拟线程的第一种方式是使用 Thread.ofVirtual() 方法。在下面的代码中,一个新的虚拟线程被创建并启动。返回值 thread 是 java.lang.Thread 类型的对象。var thread = Thread.ofVirtual().name("my virtual thread") .start(() -> System.out.println("运行中"))
第二种方式是使用 Thread.startVirtualThread(Runnable task) 方法。这个方法等同于 Thread.ofVirtual().start(task)。
第三种方式是使用 ThreadFactory,如下面的代码所示。首先创建一个 ThreadFactory,再使用 ThreadFactory 的 newThread 方法。var factory = Thread.ofVirtual().factory(); var thread = factory.newThread(() -> System.out.println("在工厂中创建"));
另外一种更常用的方式是使用 ExecutorService。ExecutorService 可以为每个任务启动一个虚拟线程。这一类的 ExecutorService 对象可以使用 Executors.newVirtualThreadPerTaskExecutor() 或 Executors.newThreadPerTaskExecutor(ThreadFactory threadFactory) 方法来创建。这一类的 Executor 对象所能创建的线程数量理论上没有上限(受限于内存)。
在下面的代码中,创建了一个使用虚拟线程的 ExecutorService 对象,并向该 ExecutorService 提交了10000个任务。每个任务会休眠 1 秒钟。运行这段代码可以发现,所需要的执行时间很短,也不需要太多的资源。try (var executor = Executors.newVirtualThreadPerTaskExecutor()) { IntStream.range(0, 10_000).forEach(i -> executor.submit(() -> { Thread.sleep(Duration.ofSeconds(1)); return i; })); }
最后展示一下 JDK 内部库对虚拟线程的支持。很多 JDK 内部库已经对虚拟线程提供了支持,主要是与 HTTP 和 TCP 相关的库,可以采用 thread-per-request 的模式。
下面的代码使用 JDK 自带的 HTTP 服务器功能来实现一个返回当前时间的服务。HTTP服务器使用的Executor 对象由 Executors.newVirtualThreadPerTaskExecutor() 方法创建,对每个请求使用虚拟线程来处理 。import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpHandler; import com.sun.net.httpserver.HttpServer; import java.io.IOException; import java.net.InetSocketAddress; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.concurrent.Executors; public class SimpleHttpServer { public static void main(String[] args) throws IOException { new SimpleHttpServer().start(); } public void start() throws IOException { var server = HttpServer.create(new InetSocketAddress(8000), 0); server.createContext("/time", new TimeHandler()); server.setExecutor( Executors.newThreadPerTaskExecutor(Thread.ofVirtual().name("time-server-", 1).factory())); server.start(); System.out.println("Time server started"); } private static class TimeHandler implements HttpHandler { @Override public void handle(HttpExchange exchange) throws IOException { var response = String.format( "%s,线程是 %s", LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME), Thread.currentThread().getName()); exchange.sendResponseHeaders(200, response.length()); try (var out = exchange.getResponseBody()) { out.write(response.getBytes()); } } } }
运行服务器之后,访问 http://localhost:8000/time 时可以看到类似 "2022-08-31T22:38:54.1251881,线程是 time-server-4" 这样的返回结果。
以上就是关于虚拟线程的基本内容,更多的内容会在后续的文章中介绍。