Future和CompletableFuture
Future和CompletableFuture
FutureTask
1 | /* |
Future接口在Java 5中被引入,设计初衷是对将来某个时刻会发生的结果进行建模。它建模 了一种异步计算,返回一个执行运算结果的引用,当运算结束后,这个引用被返回给调用方。
一个简单例子
1 | public static void main(String[] args) throws InterruptedException, ExecutionException { |
程序结果
1 | doSomethingElse |
上述代码让我们可以以并发得方式调用另一个线程执行耗时操作得同时,也可以去执行一些其他的任务。启动异步任务,后我们可以调用它得get()方法获取操作返回值,如果通过get获取时程序在传入规定时间还没有返回值,则会抛出异常,获取时还没有生成返回值则会进行阻塞。
理解了上述执行流程,我们看一下Future接口提供的几个基本方法
Modifier and Type | Method and Description |
---|---|
boolean |
cancel(boolean mayInterruptIfRunning ) 尝试取消执行此任务。 |
V | get() 等待计算完成,然后检索其结果。 |
V | get(long timeout, TimeUnit unit) 如果需要等待最多在给定的时间计算完成,然后检索其结果(如果可用)。 |
boolean |
isCancelled() 如果此任务在正常完成之前被取消,则返回 true 。 |
boolean |
isDone() 返回 true如果任务已完成。 |
我们看到get方法有两个重载得方法,get(long timeout,TimeUnit
unit) 和get()的区别是如果在指定时间内没有得到结果的返回值,则会抛出异常,程序会继续运行,而get()方法则会阻塞主线程进行等待,知道获取返回值,如果异步线程是一个极其耗费时间的操作,则会导致主线程进入漫长的等待
Future 接口的局限性
通过上述例子可以看到Future接口可能会导致程序进入漫长的等待或者直接抛弃获取值操作,虽然我们可以通过isDone()检测程序是否运行完毕,但是始终还是存在一定的运行阻塞与等待
如果我们有这样一个需求:我们需要完成两个计算任务,当前一个计算任务结束后,需要将改计算的结果通知到另一个长时间运行的计算任务,当两个计算任务都结束后,将两个计算任务合并后输出。 新的CompletableFuture
类 会很方便的解决这类问题。
CompletableFuture
1 | public class CompletableFuture<T> implements Future<T>, CompletionStage<T> |
查看类图可知,CompletableFuture
实现了Future接口和CompletionStage
接口,也就是说在原有Future接口的基础上提供了一些扩展,自然功能更为强大
CompletionStage
接口
CompletionStage
是 Java 8 引入的接口,用于支持异步计算和处理结果的操作。它是 Java 中的一个 Future 接口的扩展,并提供了更强大的异步编程模型
CompletionStage
代表异步计算过程中的某一个阶段,一个阶段完成后可能会触发另外一个阶段- 一个阶段的计算执行可以是一个
Funcation
、Consumer、Runnable。比如:stage.thenApply (x->square(x)).thenAccept(x->System.out.println(x)).thenRun(()->{System.out.println()});
- 一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发.有些类似Linux系统的管道分隔符传参数
示例
1 | public class CompletableFutureExample { |
结果
1 | Main thread continues to execute... |
如上所示CompletableFuture
使用静态方法构造CompletableFuture
对象,常用的静态构造方法如下所示
常用静态构造方法
Modifier and Type | Modifier and Type | describe |
---|---|---|
CompletableFuture |
runAsync (Runnablerunnable ) |
无返回值 |
CompletableFuture |
runAsync (Runnablerunnable , Executorexecutor ) |
无返回值,指定线程池 |
CompletableFuture |
supplyAsync(Suppliersupplier) | 有返回值 |
CompletableFuture |
supplyAsync(Suppliersupplier, Executor executor) | 有返回值,指定线程池 |
为什么不用构造方法呢?我们看一下jdk文档的描述
构造方法摘要
Constructor and Description |
---|
CompletableFuture () 创建一个新的不完整的CompletableFuture 。 |
默认空参构造器只能新建一个不完整的CompletableFuture
对象,原因简略看一下
1 | /** |
1 | static <U> CompletableFuture<U> asyncSupplyStage(Executor e, |
supplyAsync
静态方法在执行时,也new了一个新的ComPletableFuture
对象,不同的是,supplyAsync
方法调用asyncSupplyStage
后使用e.execute
开启了线程。所以ComPletableFuture
空参构造只是一个ComPletableFuture
对象,还需要进一步开启线程,其他静态构造方法原理一致。
线程池相关
通过上面的构造方法可以看到,相同两个方法名之间的区别就是是否指定了新的线程池,如果没有指定线程池,则会使用ComPletableFuture
内置的线程池ForkJoinPool
。
实现源码如下
1 | public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) { |
1 | /** |
可以看到,这里只是做了一个三目运算来判断是否指定了线程池。
简单实践
我们指定一个字符串列表,取出每个字符串并获得第一个字符 ,等待一秒后,与一个随机数相乘, 比如字符串”hello” ,取出h,等待一秒后,用h的ASCII码与一个随机浮点数相乘。
Stream流方式
1 | public static void main(String[] args) { |
执行时间为: 4108毫秒
并行流方式
1 | public static void main(String[] args) { |
改动之处:stream()—》parallelStream()
执行时间为:1067毫秒
CompletableFuture实现
1 | public static void main(String[] args) { |
执行时间为:1072毫秒
可以看到,使用并行流和CompletableFuture实现比原先顺序流要快了很多,而并行流和CompletableFuture好像并没有多大差别。这是因为什么呢?
并行流和CompletableFuture差别
并行流和CompletableFuture内部采用的是同样的通用线程池也就是上文说的ForkJoinPool ,默认都使用固定数目的线程,具体线程数取决于Runtime.getRuntime().availableProcessors()
的返回值。
而CompletableFuture不止可以使用默认的线程池,我们也可以根据自己的实际需求提供对应的线程池对象,尤其是线程池的大小,这是并行流API无法提供的,而CompletableFuture也并非仅仅为了实现多线程,更重要是是弥补Future接口使用带来的缺陷和不足。
执行流程图
而CompletableFuture实现则是先用顺序流返回了一个CompletableFuture对象,相当于收集了一个多线程集合,然后顺序启动多线程。
CompletableFuture 常用方法
获取结果
- public T get( ) 实现了Future接口的方法,等待线程执行获取结果,可能会阻塞线程去等待
- public T get(long timeout, TimeUnit unit) 实现了Future接口的方法,等待线程获取结果,超过指定时间抛出异常
- public T getNow(T valuelfAbsent) 相当于三元运算,需要立即获取结果,如果没有则使用valuelfAbsent
- public T join( ): join方法和get( ) 方法作用几乎,不同的是,join方法不抛出异常
简单演示一下getNow()方法
1 | public static void main(String[] args) { |
结果:
1 | now1: 3 |
对计算结果进行处理
(thenApply、handle)
**thenApply(Function<? super T,? extends U> fn)
**:
- 该方法用于在异步任务完成后对结果进行转换,返回一个新的
CompletableFuture
对象。 - 它接受一个
Function
参数,该函数将任务的结果类型T
转换为类型U
。 thenApply()
方法会等待当前任务完成,并将任务的结果作为参数传递给函数,然后返回一个新的CompletableFuture
对象,该对象表示对结果进行转换后的值。
1 | public static void main(String[] args) { |
结果
1 | 588 |
可以看到thenApply就是把上一个结果传递给下一个CompletableFuture
对象,相当于链式调用。
上述代码也可以这样实现,也就是链式调用
1 | CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> "Hello") |
**handle(BiFunction<? super T,Throwable,? extends U> fn)
**:
- 该方法用于在异步任务完成后对结果进行处理,无论任务是否发生异常,都会执行处理逻辑,并返回一个新的
CompletableFuture
对象。 - 它接受一个
BiFunction
参数,该函数接受任务的结果类型T
和可能发生的异常类型Throwable
,并返回类型U
。 handle()
方法会等待当前任务完成,并将任务的结果或异常作为参数传递给函数,然后返回一个新的CompletableFuture
对象,该对象表示处理后的结果。
handle方法和thenApply方法类似,不同的是handle在中间操作时出现异常不会停止任务,而是把异常传递给下一个Completable对象
1 | public static void main(String[] args) { |
结果
1 | 20 |
稍微改动一点
1 | CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 1/0) |
结果
1 | Exception occurred: java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero |
也就是出现了异常并捕获,但是并没有影响结果执行。
对计算结果进行消费
(thenRun、thenAccept、thenApply)
thenRun(Runnable action)
:
- 该方法在异步任务完成后执行指定的操作,不接收任务的结果。
- 它接受一个
Runnable
参数,表示要执行的操作。 thenRun()
方法会等待当前任务完成后,执行指定的操作,无论前一个任务的结果是什么。
1 | public static void main(String[] args) { |
结果
1 | Task completed |
thenAccept(Consumer<? super T> action)
:
- 该方法在异步任务完成后对结果进行消费,不返回任何结果。
- 它接受一个
Consumer
参数,表示对结果进行消费的操作。 thenAccept()
方法会等待当前任务完成后,将任务的结果作为参数传递给操作进行消费。
通过传参可以看到,该方法是一个消费型接口,对传入数据进行消费,不返回结果。
1 | public static void main(String[] args) { |
结果:
1 | Result: Hello world |
thenApply方法同上
对计算结果进行合并
(thenCombine、thenAcceptBoth、runAfterBoth)
thenCombine(CompletableFuture<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn)
:
- 该方法用于组合两个异步任务的结果,并在两个任务都完成后执行指定的操作,并返回一个新的
CompletableFuture
对象。 - 它接受另一个
CompletableFuture
对象other
和一个BiFunction
参数,该函数接受两个任务的结果类型T
和U
,并返回类型V
。 thenCombine()
方法会等待当前任务和other
任务都完成后,将两个任务的结果作为参数传递给函数,并返回一个新的CompletableFuture
对象,表示对结果进行组合后的值。
1 | public static void main(String[] args) { |
结果
1 | 200 |
thenAcceptBoth(CompletableFuture<? extends U> other, BiConsumer<? super T, ? super U> action)
:
- 该方法用于在两个异步任务都完成后执行指定的操作,不返回任何结果。
- 它接受另一个
CompletableFuture
对象other
和一个BiConsumer
参数,该函数接受两个任务的结果类型T
和U
,并执行指定的操作。 thenAcceptBoth()
方法会等待当前任务和other
任务都完成后,将两个任务的结果作为参数传递给操作进行消费。
1 | public static void main(String[] args) { |
使用方法与thenCombine类似,区别在于thenCombine有返回,而thenAcceptBoth无返回值
结果不言而喻
1 | Combined result: 30 |
runAfterBoth(CompletableFuture<?> other, Runnable action)
:
该方法用于在两个异步任务都完成后执行指定的操作,不接收任务的结果。
它接受另一个
CompletableFuture
对象other
和一个Runnable
参数,表示要执行的操作。runAfterBoth()
方法会等待当前任务和other
任务都完成后,执行指定的操作。
1 | public static void main(String[] args) { |
相较于上面两个方法,runAfterBoth 又有些不同,他既不接收返回值,也不返回,而是等待两个future执行好了之后在去执行,所以一般用于在程序结束
结果
1 | Both tasks completed |
对计算速度进行选用
(applyToEither、acceptEither、runAfterEither)
applyToEither(CompletableFuture<? extends T> other, Function<? super T, U> fn)
:
- 该方法接受另一个
CompletableFuture
对象other
和一个Function
参数,并返回一个新的CompletableFuture
对象。 applyToEither()
方法会等待当前任务和other
任务中的任意一个完成,并将首先完成的任务的结果作为参数传递给函数进行转换。- 返回的
CompletableFuture
对象的结果类型为U
,表示对首先完成任务的结果进行转换后的值。
1 | public static void main(String[] args) { |
结果
1 | -------------------------- |
很明显返回的是 future2 的20
acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
acceptEither
是 CompletableFuture
类中的一个方法,用于处理两个独立的异步任务,并且只接受其中一个操作的结果。可以这么理解:如果第一个异步任务完成了,则使用它的结果进行处理,否则使用第二个异步任务的结果
示例
1 | public static void main(String[] args) { |
结果
1 | 10 |
runAfterEither方法签名
1 | public CompletionStage<Void> runAfterEither(CompletionStage<?> other, Runnable action); |
runAfterEither
方法接收两个参数:一个是另一个 CompletionStage
对象,另一个是一个 Runnable
或者 Consumer
对象。当其中任意一个 CompletionStage
完成(无论是正常完成还是异常完成),就会执行指定的操作。
1 | CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> { |
我们创建了两个 CompletableFuture
对象 future1
和 future2
,分别代表两个异步任务。然后,我们调用 runAfterEither
方法,传递了这两个 CompletableFuture
对象和一个 Runnable
对象作为参数。当 future1
或者 future2
中的任意一个完成时,即可执行传入的操作,这里是简单地打印一条消息。
多任务组合
allOf(CompletableFuture<?>... futures)
:
- 该方法接受一个
CompletableFuture
数组作为参数,并返回一个新的CompletableFuture
对象。 allOf()
方法等待所有给定的任务都完成后,返回一个新的CompletableFuture
对象。- 返回的
CompletableFuture
对象的结果为Void
,表示所有任务都已完成。
1 | public static void main(String[] args) { |
allof方法使用类似于遍历一个CompletableFuture数组并执行,不同是的,他会等待所有结果出来后才会打印 thenRun()后得内容。而不返回任何内容。
结果不言而喻
1 | All tasks completed |
anyOf(CompletableFuture<?>... futures)
:
- 该方法接受一个
CompletableFuture
数组作为参数,并返回一个新的CompletableFuture
对象。 anyOf()
方法等待任何一个给定的任务完成后,返回一个新的CompletableFuture
对象。- 返回的
CompletableFuture
对象的结果为第一个完成的任务的结果。
1 | public static void main(String[] args) { |
我们创建了三个Completable对象,并一次加入一个新的CompletableFuture.anyof方法中,三个线程分别等待不同的时间,我们预计anyFuture接收第一个运行结束并返回得CompletableFuture对象
结果
1 | 42 |