0%

Guava学习之异步编程

前言

其实Java里的异步的情况不是特别多,之前处处都是同步的处理的。
但是感觉自从Node火起来之后,Java中的异步也开始火了起来。
比如Vertx这个被称为Java版的Node库。

之前一直用的是JDK8自带的CompletableFuture,在Presto的代码里经常看到Guava的异步的Listener,所以这里也来学习一番。


ListeningExecutorService

Guava为了支持自己的Listener模式,新建了一种ExecutorService,叫做ListeningExecutorService
我们可以使用MoreExecutor去创建它。

1
2
3
4
//创建一个由invode线程去运行的线程池
ListeningExecutorService executorService = MoreExecutors.newDirectExecutorService();
//装饰一个自己的线程池返回
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());

创建完毕之后我们就可以创建自己的ListenableFuture

1
2
3
ListenableFuture<?> listenableFuture = executorService.submit(() -> {
System.out.println("Hello");
});


addListener

上面创建完成自己的ListenableFuture之后,可以为他添加Listener

1
2
3
4
5
6
7
ListenableFuture<?> listenableFuture = executorService.submit(() -> {
System.out.println("Hello");
});

listenableFuture.addListener(() -> {
System.out.println("world");
}, executorService);

上面这是没有返回值的情况,如果我们的是有返回值的呢。
就是调用下面的CallBack


addCallBack

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
ListenableFuture<String> listenableFuture = executorService.submit(() -> {
System.out.println("Hello");
return "Hello";
});

Futures.addCallback(listenableFuture, new FutureCallback<String>() {
@Override
public void onSuccess(@Nullable String result) {
System.out.println("get " + result);
}

@Override
public void onFailure(Throwable t) {
t.printStackTrace();
}
}, executorService);


Futures.allAsList

这个方法用来把多个ListenableFuture组合成一个。
当其中一个Future失败或者取消的时候,将会进入失败或者取消。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
ListenableFuture<String> future1 = executorService.submit(() -> "Hello");

ListenableFuture<Integer> future2 = executorService.submit(() -> 2);

ListenableFuture<List<Object>> future = Futures.allAsList(future1, future2);

future.addListener(() -> {
System.out.println("Done!");
}, executorService);

Futures.addCallback(future, new FutureCallback<List<Object>>() {
@Override
public void onSuccess(@Nullable List<Object> result) {
System.out.println(result);
}

@Override
public void onFailure(Throwable t) {
t.printStackTrace();
}
}, executorService);


Futures.transform[Async]

这个方法用于转换返回值

1
2
3
4
5
ListenableFuture<String> future1 = executorService.submit(() -> "Hello");

ListenableFuture<Integer> listenableFuture = Futures.transform(future1, String::length, executorService);

System.out.println(listenableFuture.get());

这个是同步的方法,如果需要异步的执行

1
2
3
4
5
ListenableFuture<String> future1 = executorService.submit(() -> "Hello");

ListenableFuture<Integer> listenableFuture = Futures.transformAsync(future1, input -> Futures.immediateFuture(input.length()), executorService);

System.out.println(listenableFuture.get());


Futures.successfulAsList

和allAsList相似,唯一差别是对于失败或取消的Future返回值用null代替。不会进入失败或者取消流程。


immediateFuture和immediateCancelledFuture

这个两个类主要就是包装同步结果返回一个Future的。
其实内部结果已经确定了。
这两个的isDone的返回值不同。
immediateFuture是True而immediateCancelledFuture是false


SettableFuture

感觉这是个异步执行,同步获取的方法,只是用起来很方便。
如果我们在一个线程中需要等待另外一个线程的异步任务。
那么我们就可以去设置一个SettableFuture
但是在进行get获取的时候,是同步阻塞的。

1
2
3
4
5
6
7
8
SettableFuture<String> settableFuture = SettableFuture.create();

executorService.submit(() -> {
settableFuture.set("Hello");
});

System.out.println(settableFuture.isDone());
System.out.println(settableFuture.get()); //blocked


JdkFutureAdapters

一个适配器的类,把JDKFuture转化成ListenableFuture

1
2
3
ExecutorService executorService = Executors.newCachedThreadPool();
Future<String> stringFuture = executorService.submit(() -> "hello,world");
ListenableFuture<String> listenableFuture = JdkFutureAdapters.listenInPoolThread(stringFuture);