手写并发任务编排工具类

需求

并发场景可能存在的需求-每个执行结果的回调

CompleteableFuture大家都用过,里面有supply、then、combine、allOf等等方法,都可以用来接收一个任务,最终将多个任务汇总成一个结果。

但有一个问题,你supply一个任务后,这个任务就黑盒了。如果你编排了很多个任务,每一个任务的执行情况,执行到哪一步了,每一步的执行结果情况,我们是不知道的。只能等它最终执行完毕后,最后汇总结果。

一个并行框架,它最好是对每一步的执行都能监控。每一步的执行结果,无论成功与失败,它应该有个回调,才算完整。拥有回调的任务,可以监控任务的执行状况,如果执行失败、超时,可以记录异常信息或者处理个性化的默认值。

CompleteableFuture中也有一些回调方法,例如:thenAccept(),whenComplete(),handle(),exceptionally()等,这些方法也能支持任务的回调,但是前提是任务执行了,才能完成回调。在某些场景中,有些任务单元是可能被SKIP跳过不执行的,不执行的任务也应该有回调。

并发场景可能存在的需求-依赖上游的执行结果作为入参

future1.thenCompose(f2,()->xxx)

下游依赖于上游的结果

并发场景可能存在的需求-全组任务的超时

一组任务,虽然内部的各个执行单元的时间不可控,但是可以控制全组的执行时间不超过某个值。通过设置timeOut,来控制全组的执行阈值。

1
CompletableFuture.allOf(futures).get(timeout, TimeUnit.MILLISECONDS);

并发场景可能存在的需求-高性能、低线程数

复用线程

该框架全程无锁,没有一个加锁的地方。

创建线程量少。如上图④中场景。A会运行在B、C执行更慢的那个单元的线程上,而不会额外创建线程。

总结

一个并发框架可能需要具备哪些能力?

1、提供任何形式的串行、并行执行单元的组合

2、为每个执行单元提供执行成功、失败、超时、异常的回调

3、支持为单个执行单元设置异常、失败后的默认值

4、支持为整个group(多个任意组合的执行单元)设置超时时间。单个执行单元失败,不影响其他单元的回调和最终结果获取。如果自己依赖的任务失败,则自己也失败,并返回默认值。

5、整个group执行完毕或超时后,同步阻塞返回所有执行单元结果集,按添加的顺序返回list。也支持整个group的异步回调不阻塞主线程

6、支持每个group独享线程池,或所有group共享线程池(默认)

异步回调如何实现

上面我们总结了多线程的编排场景及实现,以及并发场景的一些潜在需求及实现。

该框架的难点和重点,主要有两点,分别是任务的顺序编排任务结果的回调

回调是个很有用的模式,譬如我的主线程执行过程中,要执行一个非常耗时的逻辑。为了不阻塞主线程,自然我们会想到用异步的形式去执行这个耗时逻辑,新建个线程,让这个耗时的逻辑在线程中执行,不阻塞主线程。但问题来了,异步执行没毛病,执行成功、失败后出结果了,该怎么通知主线程?

4.1、CompletableFuture的回调

CompletableFuture提供了许多回调的方法,例如:thenAccept(),whenComplete(),handle(),exceptionally()等。下面列举一些比较常用的回调方法,如下:

1
2
3
4
5
6
7
8
//1.计算结果完成,或者异常时执行给定action(当前线程执行)
CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);
//2.计算结果完成,或者异常时执行给定action(另起线程执行)
CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action);
//3.执行完成时对结果进行处理,还可以处理异常
<U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
//4.异常时,返回指定结果
CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)

CompleteableFuture提供的回调方法,这些方法也能支持任务的回调,但是前提是任务执行了,才能完成回调。在某些场景中,有些任务单元是可能被SKIP跳过不执行的,不执行的任务也应该有回调。

4.2、Netty future中的回调

Netty中的回调是非常多的。netty中的future,可以添加Listener,当异步任务执行完毕后,主动回调一下自己就可以了。

整个netty里面大量充斥着类似的回调,但是如果我们要用,仅仅是针对一个或多个异步任务,希望能有个类似的回调,netty就帮不上忙了。

Netty回调的伪代码:其中doSomething是在异步线程里,而回调是在主线程里的。(回调代码是在主线程,回调任务实际是在异步线程里执行的)

1
2
3
4
5
6
7
8
9
10
//主线程
main {
//doSomething是在异步线程里,回调是在主线程
doSomething().async().addListener(new Listener(){
@Override
public void complete() throws Exception {
//do your job
}
});
}

4.3、如何自己实现一个简单回调的异步任务

首先我们有这么一个需求,有N个耗时任务,可能是IO任务,我们希望他执行时不会阻塞主线程,而期望它执行完毕(成功\失败)后,来发起一次回调,最好还有超时、异常的回调控制。

我们可以有这样的角色:

  1. Bootstrap主线程
  2. worker 任务
  3. listener 监听器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package com.soyo.simple;

/**
* @author: lkl
* @date: 2025/9/12 22:24
*/
public class Bootstrap {

public static void main(String[] args) {
Bootstrap bootstrap = new Bootstrap();
Worker worker = bootstrap.newWorker();

Wrapper wrapper = new Wrapper();
wrapper.setWorker(worker);
wrapper.setParam("hello");

//2.回调方法,输出worker中的内容
bootstrap.doWorker(wrapper).addListener(new Listener() {
@Override
public void callback(Object result) {
System.out.println(Thread.currentThread().getName());
System.out.println(result);
}
});

//1.主线程不阻塞,打印当前线程
System.out.println(Thread.currentThread().getName());
}

private Wrapper doWorker(Wrapper wrapper) {
new Thread(() -> {
Worker worker = wrapper.getWorker();
String result = worker.action(wrapper.getParam());
wrapper.getListener().callback(result);
}).start();

return wrapper;
}

private Worker newWorker() {
return new Worker() {
@Override
public String action(String obj) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return obj + " callback!";
}
};
}

}


思考

核心线程数为0的线程池

作者在issue上推荐一个tomcat只使用一个不定长线程池。

AsyncTool关于多线程并发执行的问题。如果不指定线程池,默认使用不定长线程池。

1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

注意,newCachedThreadPool() 它的核心线程数是0,最大线程数是Integer.MAX_VALUE。可以说它的线程数是可以无限扩大的线程池。

他使用的队列是没有存储空间的,只要请求过来了,如果没有空闲线程,就会创建一个新的线程。

这种不定长线程池,适合处理执行时间比较短的任务。在高并发下,如果在耗时长的RPC\IO操作中使用该线程池,势必会造成系统 线程爆炸。

AsyncTool作者在京东使用的场景多数为低耗时(10ms)高并发,瞬间冲击的场景。这种高并发,且任务耗时较少的,适合使用不定长线程池。

但是这种低耗时的场景也不多,对于耗时较长的场景,推荐使用自定义线程池,可以避免那些耗时长的任务长时间占用线程,造成线程 ”爆炸 “,容错率更高。

总结:

  • **newCachedThreadPool**** 特性**:核心线程数 0,最大线程数无限,队列无缓存 → 任务一来没线程就新建线程。
  • 适合短耗时任务:任务执行快,线程能很快释放和复用,高并发时能快速扩容,冲击过去后线程又能被回收,不会长期暴涨。
  • 不适合长耗时任务:线程长时间被占用,新任务只能不断新建线程,最终线程数“爆炸”,导致内存和 CPU 切换压力过大,系统崩溃。
  • 结论:短平快的高并发任务用不定长线程池;耗时任务必须用自定义线程池(限制核心数、最大数和队列),避免线程无限增长。

短耗时 + 高并发场景 → 秒杀下单接口

  • 用户在 11.11 秒杀活动一开始,会有 瞬间几十万请求冲进来。
  • 每个请求要做的事情很简单:
    • 校验库存(内存中或 Redis 中)
    • 扣减库存标记
    • 返回“抢购成功/失败”结果
  • 整个处理逻辑可能只需要 5~10ms

在这种场景下:

  • 请求量短时间非常大 → 需要线程池快速扩容,防止请求被阻塞。
  • 每个任务很快执行完 → 线程会迅速释放,可以被复用,不会造成线程无限积压。
  • 高峰一过,线程池里的线程就会在 60s 内回收 → 节省资源。

👉 所以 newCachedThreadPool 就特别适合这种“短平快、高并发、瞬时冲击”的场景。

而且双11这种平台肯定是希望用户请求在机器不会崩的情况下流量越多越好,因为下单的单数越多,那么平台赚得越多

源码解析