简介
说明
本文介绍CompletableFuture这个异步编程API。
JDK8新加CompletableFuture,实现了Future<T>, CompletionStage<T>两个接口。
其位置:java.util.concurrent.CompletableFuture;
Future与CompletableFuture对比
Future与CompletableFuture相比,主要缺点如下(下边这些功能CompletableFuture都有):
- 不支持手动完成
- 我提交了一个任务,但是执行太慢了,我通过其他路径已经获取到了任务结果,现在没法把这个任务结果,通知到正在执行的线程,所以必须主动取消或者一直等待它执行完成。
- 不支持回调函数
- 无法在获取任务之后执行额外的任务。因为Future的get方法会一直阻塞到任务完成,Future不支持回调函数,所以无法实现这个功能。
- 不支持链式调用
- 对于Future的执行结果,我们想继续传到下一个Future处理使用,从而形成一个链式的pipline调用,这在Future中是没法实现的。
- 不支持多个Future合并
- 比如我们有10个Future并行执行,我们想在所有的Future运行完毕之后,执行某些函数,是没法通过Future实现的。
- 不支持异常处理
- Future的API没有任何的异常处理的api,所以在异步运行时,如果出了问题是不好定位的。
与函数式接口的关系
CompletableFuture的方法名称与函数式接口有很大的关系,所以此处特地列出来。
接口定义 | 方法 | 说明 |
Runnable | void run(); | 参数:无。返回值:无。 |
Function< T, R > | R apply(T t); | 参数:T对象。返回值:R对象。 |
Consumer< T > | void accept(T t); | 参数:T对象。返回值:无 |
Supplier< T > | T get(); | 参数:无。返回值:T对象 |
BiConsumer<T, U> | void accept(T t, U u); | 参数:T对象。返回值:boolean |
创建
简介
CompletableFuture提供了如下四个静态方法来创建一个异步操作。
public static CompletableFuture<Void> runAsync(Runnable runnable) public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
- runAsync:不支持返回值。因为它接收Runnable接口,Runnable是没有返回值的。
- supplyAsync:支持返回值。因为它接收Supplier接口,Supplier是有返回值的。
- 若executor参数没有设置值,那么会使用ForkJoinPool.commonPool默认线程池执行任务。实际业务中我们是严谨手动创建线程的。
- ForkJoinPool 的线程数默认是 CPU 的核心数。但是,不要所有业务共用一个线程池,因为,一旦有任务执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统的性能。
实例(无Executor)
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> System.out.println("runAsync")); future.get();
执行结果:
runAsync
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { System.out.println("supplyAsync"); return "supplyAsync的返回值"; }); System.out.println(future.get());
执行结果:
supplyAsync
supplyAsync的返回值
CompletableFuture.runAsync(() -> { System.out.println(Thread.currentThread().getName() + " 执行异步任务 runAsync"); }); String result = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + " 执行异步任务 supplyAsync"); return "supplyAsync的返回值"; }).get(); System.out.println(result);
执行结果
ForkJoinPool.commonPool-worker-9 执行异步任务 runAsync ForkJoinPool.commonPool-worker-9 执行异步任务 supplyAsync supplyAsync的返回值
实例(有Executor)
package org.example.a; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Demo { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService executor = Executors.newFixedThreadPool(10); CompletableFuture.runAsync(() -> { System.out.println(Thread.currentThread().getName() + " 执行异步任务 runAsync"); }, executor); String result = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + " 执行异步任务 supplyAsync"); return "supplyAsync的返回值"; }, executor).get(); System.out.println(result); } }
执行结果
pool-1-thread-1 执行异步任务 runAsync pool-1-thread-2 执行异步任务 supplyAsync supplyAsync的返回值
Future功能
CompletableFuture实现了Future接口,所以它有Future的所有功能。
package org.example.a; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; public class Demo { public static void main(String[] args) throws NoSuchFieldException, IllegalAccessException { CompletableFuture<String> completableFuture = new CompletableFuture<String>(); Runnable runnable = new Runnable() { @Override public void run() { try { Thread.sleep(1000); System.out.println("子线程执行"); completableFuture.complete("success"); } catch (InterruptedException e) { e.printStackTrace(); } } }; Thread t1 = new Thread(runnable); t1.start(); try { //主线程阻塞,等待完成 String result = completableFuture.get(); System.out.println("主线程获得结果: " + result); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } }
执行结果
子线程执行 主线程获得结果: success
顺序执行
顺序执行都是thenXxx
thenXxx: 在本线程中等待任务结束,然后执行下一步。
thenXxxAsync:在异步线程中等待任务结束,然后执行下一步。
上边这两条的具体含义见下边“thenRun系列”的两个示例。本部分“顺序执行”的其他部分都是这样的。
CompletableFuture<Void> thenRun(Runnable action) CompletableFuture<Void> thenRunAsync(Runnable action) CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor) CompletableFuture<Void> thenAccept(Consumer<? super T> action) CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor) <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn) <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor)
thenRun
说明
不接受参数,也不返回结果,不进不出。
在本线程中等待
CompletableFuture .runAsync(() -> System.out.println(Thread.currentThread().getName() + " runAsync 执行")) .thenRun(() -> System.out.println(Thread.currentThread().getName() + " thenRun 执行"));
执行结果
ForkJoinPool.commonPool-worker-9 runAsync 执行 main thenRun 执行
在异步线程中等待
CompletableFuture .runAsync(() -> System.out.println(Thread.currentThread().getName() + " runAsync 执行")) .thenRunAsync(() -> System.out.println(Thread.currentThread().getName() + " thenRunAsync 执行"));
执行结果
ForkJoinPool.commonPool-worker-9 runAsync 执行 ForkJoinPool.commonPool-worker-9 thenRunAsync 执行
thenAccept
接受参数,但不返回结果,只进不出。
CompletableFuture .supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + " runAsync 执行"); return "supplyAsync返回值"; }) .thenAccept(i -> System.out.println(Thread.currentThread().getName() + " thenRunAsync 执行。上一步结果:" + i));
执行结果
ForkJoinPool.commonPool-worker-9 runAsync 执行 main thenRunAsync 执行。上一步结果:supplyAsync返回值
thenApply
接收参数,也输出结果。
CompletableFuture<String> completableFuture = CompletableFuture .supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + " runAsync 执行"); return "supplyAsync返回值"; }) .thenApply(i -> { System.out.println(Thread.currentThread().getName() + " thenRunAsync 执行。上一步结果:" + i); return "thenApply返回值"; }); System.out.println(completableFuture.get());
执行结果
ForkJoinPool.commonPool-worker-9 runAsync 执行 ForkJoinPool.commonPool-worker-9 thenRunAsync 执行。上一步结果:supplyAsync返回值 thenApply返回值
thenCompose
将多个CompletableFuture组合。
CompletableFuture<String> future = CompletableFuture .supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + " supplyAsync(第1个) 执行"); return "supplyAsync(第1个)返回值"; }) .thenCompose(value -> CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + " supplyAsync(第2个) 执行"); return value + "; supplyAsync(第2个)返回值"; }) ); System.out.println(future.get());
执行结果
ForkJoinPool.commonPool-worker-9 supplyAsync(第1个) 执行 ForkJoinPool.commonPool-worker-9 supplyAsync(第2个) 执行 supplyAsync(第1个)返回值; supplyAsync(第2个)返回值
合并任务
thenCombine(合并两个/有返回值)
简介
合并两个没有依赖关系的CompletableFutures任务。
public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn); public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn); public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);
实例
CompletableFuture<Integer> future1= CompletableFuture.supplyAsync(()->{ System.out.println(Thread.currentThread().getName() + " supplyAsync(第1个) 执行"); return 1; }); CompletableFuture<Integer> future2= CompletableFuture.supplyAsync(()->{ System.out.println(Thread.currentThread().getName() + " supplyAsync(第2个) 执行"); return 2; }); CompletableFuture<Integer> result= future1.thenCombine(future2,(number1,number2)->{ return number1+number2; }); System.out.println(result.get());
执行结果
ForkJoinPool.commonPool-worker-9 supplyAsync(第1个) 执行 ForkJoinPool.commonPool-worker-2 supplyAsync(第2个) 执行 3
allOf/anyOf(合并多个/有返回值)
上面说的是两个任务的合并,那么多个任务需要使用allOf或者anyOf方法。
allOf
会等待所有的任务执行完毕。
注意,allOf返回值为CompletableFuture<Void>型,所以无法获得返回值。
Random rand = new Random(); CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(rand.nextInt(1000)); System.out.println(Thread.currentThread().getName() + " supplyAsync(第1个) 睡眠结束"); } catch (InterruptedException e) { e.printStackTrace(); } return 100; }); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(rand.nextInt(1000)); System.out.println(Thread.currentThread().getName() + " supplyAsync(第2个) 睡眠结束"); } catch (InterruptedException e) { e.printStackTrace(); } return "abc"; }); CompletableFuture<Void> f = CompletableFuture.allOf(future1, future2); f.get(); //会阻塞在这里,等待所有的任务执行完毕 System.out.println("所有任务执行完毕");
执行结果(因为是随即睡眠,所以下边结果中的第一行和第二行可能会调换顺序)
ForkJoinPool.commonPool-worker-2 supplyAsync(第2个) 睡眠结束 ForkJoinPool.commonPool-worker-9 supplyAsync(第1个) 睡眠结束 所有任务执行完毕
anyOf
等待任一任务执行完毕。
Random rand = new Random(); CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(rand.nextInt(1000)); System.out.println(Thread.currentThread().getName() + " supplyAsync(第1个) 睡眠结束"); } catch (InterruptedException e) { e.printStackTrace(); } return 100; }); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(rand.nextInt(1000)); System.out.println(Thread.currentThread().getName() + " supplyAsync(第2个) 睡眠结束"); } catch (InterruptedException e) { e.printStackTrace(); } return "abc"; }); CompletableFuture<Object> f = CompletableFuture.anyOf(future1,future2); System.out.println(f.get());
执行结果(因为是随即睡眠,所以下边结果中的第一行也可以能是第1个线程)
ForkJoinPool.commonPool-worker-2 supplyAsync(第2个) 睡眠结束 abc
thenAcceptBoth(合并两个/无返回值)
简介
两个CompletionStage都执行完成后,把结果一块交给thenAcceptBoth来进行消耗。
位置是相对应的。比如下边的例子中,由于future1调用thenAccept,将future2作为参数。此时,无论哪个先执行完,输出结果总是:abcdef。
实例
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + " supplyAsync(第1个) 睡眠结束"); return "abc"; }); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + " supplyAsync(第2个) 睡眠结束"); return "def"; }); future1.thenAcceptBoth(future2, (x, y) -> System.out.println(x + y)).get();
执行结果
ForkJoinPool.commonPool-worker-9 supplyAsync(第2个) 睡眠结束 ForkJoinPool.commonPool-worker-2 supplyAsync(第1个) 睡眠结束 abcdef
acceptEither(合并两个/无返回值)
简介
谁执行返回的结果快,就用那个CompletionStage的结果进行下一步的消耗操作。
注意:所有的CompletableFuture的返回值必须是一样的。比如下边,如果future1为CompletableFuture<Integer>,future2为CompletableFuture<String>则会直接报错。
实例
Random rand = new Random(); CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(rand.nextInt(1000)); System.out.println(Thread.currentThread().getName() + " supplyAsync(第1个) 睡眠结束"); } catch (InterruptedException e) { e.printStackTrace(); } return "abc"; }); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(rand.nextInt(1000)); System.out.println(Thread.currentThread().getName() + " supplyAsync(第2个) 睡眠结束"); } catch (InterruptedException e) { e.printStackTrace(); } return "def"; }); future1.acceptEither(future2, x -> System.out.println(x)).get();
执行结果
ForkJoinPool.commonPool-worker-9 supplyAsync(第1个) 睡眠结束 abc
applyToEither(有返回值)
简介
两个CompletionStage,谁执行返回的结果快,就用那个CompletionStage的结果进行下一步的转化操作。
实例
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + " supplyAsync(第1个) 执行"); return "abc"; }); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + " supplyAsync(第2个) 执行"); return "def"; }); String result = future1.applyToEither(future2, x -> { System.out.println(Thread.currentThread().getName() + " applyToEither 执行"); return "此返回值所在的线程执行的快:" + x; }).get(); System.out.println(Thread.currentThread().getName() + " 结果:" + result);
执行结果
ForkJoinPool.commonPool-worker-9 supplyAsync(第1个) 执行 ForkJoinPool.commonPool-worker-2 supplyAsync(第2个) 执行 main applyToEither 执行 main 结果:此返回值所在的线程执行的快:abc
回调/异常
CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action) CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action) CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor) <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn) <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn) <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn) CompletableFuture<T> exceptionallyAsync(Function<Throwable, ? extends T> fn) CompletableFuture<T> exceptionallyAsync(Function<Throwable, ? extends T> fn, Executor executor)
whenComplete(无返回值)
简介
接受2个参数,第一个参数为上一次任务的返回结果,第二个参数表示异常对象,该方法是只消费的,不会有返回结果。
whenComplete
异步子线程执行完之后,会在外部线程中执行接下来的操作。
CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + " supplyAsync 执行"); return "abc"; }).whenComplete((s, throwable) -> { System.out.println(Thread.currentThread().getName() + " whenComplete 执行"); System.out.println("上一步的结果:" + s); System.out.println("上一步的异常:" + throwable.getMessage()); });
执行结果
ForkJoinPool.commonPool-worker-9 supplyAsync 执行 main whenComplete 执行 上一步的结果:abc
CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + " supplyAsync 执行"); int i = 1 / 0; return "abc"; }).whenComplete((s, throwable) -> { System.out.println(Thread.currentThread().getName() + " whenComplete 执行"); System.out.println("上一步的结果:" + s); System.out.println("上一步的异常:" + throwable.getMessage()); });
执行结果
ForkJoinPool.commonPool-worker-9 supplyAsync 执行 ForkJoinPool.commonPool-worker-9 whenComplete 执行 上一步的结果:null 上一步的异常:java.lang.ArithmeticException: / by zero
whenCompleteAsync
异步子线程执行完之后,在这个异步子线程中执行接下来的操作。
CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + " supplyAsync 执行"); return "abc"; }).whenCompleteAsync((s, throwable) -> { System.out.println(Thread.currentThread().getName() + " whenComplete 执行"); System.out.println("上一步的结果:" + s); System.out.println("上一步的异常:" + throwable.getMessage()); });
执行结果
ForkJoinPool.commonPool-worker-9 supplyAsync 执行 ForkJoinPool.commonPool-worker-9 whenComplete 执行 上一步的结果:abc
handle
和whenComplete方法对比
相同点:此方法也是接受2个参数,第一个参数为上一次任务的返回结果,第二个参数表示异常对象,
不同点:handle需要有返回结果,所以这个方法的使用场景可以是:尽管之前的执行的任务异常,仍然需要有默认返回值。
String result = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + " supplyAsync 执行"); int i = 1 / 0; return "abc"; }).handle((s, throwable) -> { System.out.println(Thread.currentThread().getName() + " whenComplete 执行"); System.out.println("上一步的结果:" + s); if (throwable != null) { System.out.println("上一步的异常:" + throwable.getMessage()); return "产生异常了。"; } return "上一步的返回值:" + s; }).get(); System.out.println(result);
执行结果
ForkJoinPool.commonPool-worker-9 supplyAsync 执行 ForkJoinPool.commonPool-worker-9 whenComplete 执行 上一步的结果:null 上一步的异常:java.lang.ArithmeticException: / by zero 产生异常了。
exceptionally
简介
此方法和handle类似,不过它只接受一个参数即异常对象,且也是需要返回一个结果。
实例
String result = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + " supplyAsync 执行"); int i = 1 / 0; return "abc"; }).exceptionally(throwable -> { System.out.println(Thread.currentThread().getName() + " exceptionally 执行"); return "产生异常了"; }).get(); System.out.println(result);
执行结果
ForkJoinPool.commonPool-worker-9 supplyAsync 执行 ForkJoinPool.commonPool-worker-9 exceptionally 执行 产生异常了
线程阻塞
get()
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + " supplyAsync 执行"); return "abc"; }); String s = future.get(); System.out.println(s);
执行结果
ForkJoinPool.commonPool-worker-9 supplyAsync 执行 abc
join()
和get用法一样,都可以用来堵塞主线程,且可以获取到future的值,不同的是,当他们抛出异常时会有所区别。
请先
!