发布于 

CompletableFuture

JAVA1.5开始有个Future类,允许异步返回结果,但是对于Future的Consumer来说,他在获取结果(Future.get())时,还是需要阻塞等待的。JAVA8提供了一个
新的类CompletableFuture,更好的支持了异步特性,使用类似于回调方法的方式,使得consumer不需要再阻塞等待。

提取/修改包装的值

创建一个CompletableFuture对象,返回给你的客户端,只要当你的结果可用时,调用complete()方法,就会给所有等待这个future值的客户端解锁。

1
2
3
4
public CompletableFuture<String> ask() {
final CompletableFuture<String> future = new CompletableFuture<>();
return future;
}

所有调用ask().get()的线程将一直阻塞,直到future.complete("result")发生。

future.complete()只能调用一次,后续的重复调用会失效。

CompletableFuture.obtrudeValue(…)可以覆盖Future之前的值,小心使用

1
2
3
4
5
6
7
8
9
 /**
* If not already completed, causes invocations of {@link #get()}
* and related methods to throw the given exception.
*
* @param ex the exception
* @return {@code true} if this invocation caused this CompletableFuture
* to transition to a completed state, else {@code false}
*/
public boolean completeExceptionally(Throwable ex)

如果future还没有complete,那么调用此方法可以使get()方法抛出指定异常。

创建和获取CompletableFuture

除了手动创建,还可以通过下面的静态方法创建CompletableFuture:

1
2
3
4
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);
static CompletableFuture<Void> runAsync(Runnable runnable);
static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);

注意runAsync中的runnable不会返回任何值,所以得到的是Completable对象,如果想要异步处理并且得到返回结果,应该使用supplyAsync(…):

1
2
3
4
5
6
7
final CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
//...long running...
return "42";
}
}, executor);

CompletableFuture上的转换和操作(thenApply)

CompletableFuture类似于Scala和JavaScript的Future,允许future completed的时候注册异步回调。我们不需要等待和阻塞。

1
2
3
<U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn);
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn);
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor);

thenApply会在complete future的那个线程调用方法,而…ApplyAsync会在另一个线程池异步调用该方法。
例子:

1
2
3
CompletableFuture<String> f1 = //...
CompletableFuture<Double> f3 =
f1.thenApply(Integer::parseInt).thenApply(r -> r * r * Math.PI);

f1 complete的时候,后面的thenApply就会被执行,从String转为Integer再转为Double。

完成时运行代码(Running code on completion)(thenAccept/thenRun)

1
2
CompletableFuture<Void> thenAccept(Consumer<? super T> block);
CompletableFuture<Void> thenRun(Runnable action);

thenAccept()/thenRun()方法不阻塞。把它们当成添加到Futured的事件监听器,在未来某个时间会被执行。

单个CompletableFuture的错误处理

1
2
CompletableFuture<String> safe =
future.exceptionally(ex -> "We have a problem: " + ex.getMessage());

两个CompletableFuture组合在一起

组合(连接)两个Future(thenCompose())

有时候我们想在future的值上执行某些方法,但是这个方法返回的也是一个Future.我们期待返回的是一个顶级的Future,而不是一个CompletableFuture<CompletableFuture>这样的东西。thenCompose()类似于Scala中的flatMap,thenApply类似于Scala中的map。

1
2
3
4
5
6
7
CompletableFuture<Document> docFuture = //...
CompletableFuture<CompletableFuture<Double>> f =
docFuture.thenApply(this::calculateRelevance);
CompletableFuture<Double> relevanceFuture =
docFuture.thenCompose(this::calculateRelevance);
//...
private CompletableFuture<Double> calculateRelevance(Document doc) //...

注意这里thenApply和thenCompose返回结果的区别。

两个Future的值的变换(thenCombine())

thenCompse用来连接一个依赖于另一个Future的Future,而thenCombine把两个独立的Future组合起来,当它们都可用的时候。

1
<U,V> CompletableFuture<V> thenCombine(CompletableFuture<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)

例子:

1
2
3
4
5
6
CompletableFuture<Customer> customerFuture = loadCustomerDetails(123);
CompletableFuture<Shop> shopFuture = closestShop();
CompletableFuture<Route> routeFuture =
customerFuture.thenCombine(shopFuture, (cust, shop) -> findRoute(cust, shop));
//...
private Route findRoute(Customer customer, Shop shop) //...
等待两个CompletableFuture完成

这次不是想组合两个CompletableFuture,而仅仅是想当他们都完成时被通知,我们可以使用thenAcceptBoth()/runAfterBoth() 系列的方法。他们类似于thenAccept/thenRun,但是等待两个Future完成。

1
2
<U> CompletableFuture<Void> thenAcceptBoth(CompletableFuture<? extends U> other, BiConsumer<? super T,? super U> block)
CompletableFuture<Void> runAfterBoth(CompletableFuture<?> other, Runnable action)

有人会问为什么不直接阻塞等待两个Future完成?如下:

1
2
3
Future<Customer> customerFuture = loadCustomerDetails(123);
Future<Shop> shopFuture = closestShop();
findRoute(customerFuture.get(), shopFuture.get());

当然你也可以这么做,但是CompletableFuture的宗旨是允许异步,事件驱动编程模型而不是阻塞等待结果。所以功能上两种用法是一样的,但是后面一种会没必要地占用一条线程来等待。

等待第一个CompletableFuture完成

当你有两个产生相同类型结果的任务时,你可能只关心返回时间,而不关系哪个任务先完成。

1
2
CompletableFuture<Void> acceptEither(CompletableFuture<? extends T> other, Consumer<? super T> block)
CompletableFuture<Void> runAfterEither(CompletableFuture<?> other, Runnable action)

例子:

1
2
3
4
5
CompletableFuture<String> fast = fetchFast();
CompletableFuture<String> predictable = fetchPredictably();
fast.acceptEither(predictable, s -> {
System.out.println("Result: " + s);
});

fast返回更快但是不准确的结果,predictable返回更准确但更慢的结果,为了性能和准确性的平衡,我们可能同时调用两个系统,并且只拿第一个返回的继续下面的操作,并不关心它到底是哪个返回的。

转换第一个完成的

applyToEither() 是acceptEither的老兄弟。applyToEither会返回一个Future,而acceptEither只是当两个Future完成时执行一些代码。这个Future会在它依赖的两个Future中的最快完成的那个完成时完成。

1
<U> CompletableFuture<U> applyToEither(CompletableFuture<? extends T> other, Function<? super T,U> fn)

组合多个CompletableFuture

上面教了我们怎么等待两个Future完成(thenCombine())和等待第一个完成(applyToEither())。使用static帮助方法可以扩展到任意个Future。

1
2
static CompletableFuture<Void< allOf(CompletableFuture<?<... cfs)
static CompletableFuture<Object< anyOf(CompletableFuture<?<... cfs)

allOf会等待所有Future完成,anyOf会等待最快的Future完成。

参考和翻译于:Java 8: Definitive guide to CompletableFuture