手写并发任务编排工具类
需求
并发场景可能存在的需求-每个执行结果的回调
CompleteableFuture大家都用过,里面有supply、then、combine、allOf等等方法,都可以用来接收一个任务,最终将多个任务汇总成一个结果。
但有一个问题,你supply一个任务后,这个任务就黑盒了。如果你编排了很多个任务,每一个任务的执行情况,执行到哪一步了,每一步的执行结果情况,我们是不知道的。只能等它最终执行完毕后,最后汇总结果。
一个并行框架,它最好是对每一步的执行都能监控。每一步的执行结果,无论成功与失败,它应该有个回调,才算完整。拥有回调的任务,可以监控任务的执行状况,如果执行失败、超时,可以记录异常信息或者处理个性化的默认值。
CompleteableFuture中也有一些回调方法,例如:thenAccept(),whenComplete(),handle(),exceptionally()等,这些方法也能支持任务的回调,但是前提是任务执行了,才能完成回调。在某些场景中,有些任务单元是可能被SKIP跳过不执行的,不执行的任务也应该有回调。
并发场景可能存在的需求-依赖上游的执行结果作为入参
future1.thenCompose(f2,()->xxx)
下游依赖于上游的结果
并发场景可能存在的需求-全组任务的超时
一组任务,虽然内部的各个执行单元的时间不可控,但是可以控制全组的执行时间不超过某个值。通过设置timeOut,来控制全组的执行阈值。
1 | CompletableFuture.allOf(futures).get(timeout, TimeUnit.MILLISECONDS); |
并发场景可能存在的需求-高性能、低线程数
复用线程

该框架全程无锁,没有一个加锁的地方。
创建线程量少。如上图④中场景。A会运行在B、C执行更慢的那个单元的线程上,而不会额外创建线程。
总结
一个并发框架可能需要具备哪些能力?
1、提供任何形式的串行、并行执行单元的组合
2、为每个执行单元提供执行成功、失败、超时、异常的回调
3、支持为单个执行单元设置异常、失败后的默认值
4、支持为整个group(多个任意组合的执行单元)设置超时时间。单个执行单元失败,不影响其他单元的回调和最终结果获取。如果自己依赖的任务失败,则自己也失败,并返回默认值。
5、整个group执行完毕或超时后,同步阻塞返回所有执行单元结果集,按添加的顺序返回list。也支持整个group的异步回调不阻塞主线程
6、支持每个group独享线程池,或所有group共享线程池(默认)
异步回调如何实现
上面我们总结了多线程的编排场景及实现,以及并发场景的一些潜在需求及实现。
该框架的难点和重点,主要有两点,分别是任务的顺序编排和任务结果的回调。
回调是个很有用的模式,譬如我的主线程执行过程中,要执行一个非常耗时的逻辑。为了不阻塞主线程,自然我们会想到用异步的形式去执行这个耗时逻辑,新建个线程,让这个耗时的逻辑在线程中执行,不阻塞主线程。但问题来了,异步执行没毛病,执行成功、失败后出结果了,该怎么通知主线程?
4.1、CompletableFuture的回调
CompletableFuture提供了许多回调的方法,例如:thenAccept(),whenComplete(),handle(),exceptionally()等。下面列举一些比较常用的回调方法,如下:
1 | //1.计算结果完成,或者异常时执行给定action(当前线程执行) |
CompleteableFuture提供的回调方法,这些方法也能支持任务的回调,但是前提是任务执行了,才能完成回调。在某些场景中,有些任务单元是可能被SKIP跳过不执行的,不执行的任务也应该有回调。
4.2、Netty future中的回调
Netty中的回调是非常多的。netty中的future,可以添加Listener,当异步任务执行完毕后,主动回调一下自己就可以了。
整个netty里面大量充斥着类似的回调,但是如果我们要用,仅仅是针对一个或多个异步任务,希望能有个类似的回调,netty就帮不上忙了。
Netty回调的伪代码:其中doSomething是在异步线程里,而回调是在主线程里的。(回调代码是在主线程,回调任务实际是在异步线程里执行的)
1 | //主线程 |
4.3、如何自己实现一个简单回调的异步任务
首先我们有这么一个需求,有N个耗时任务,可能是IO任务,我们希望他执行时不会阻塞主线程,而期望它执行完毕(成功\失败)后,来发起一次回调,最好还有超时、异常的回调控制。
我们可以有这样的角色:
- Bootstrap主线程
- worker 任务
- listener 监听器
1 | package com.soyo.simple; |
思考
核心线程数为0的线程池
作者在issue上推荐一个tomcat只使用一个不定长线程池。
AsyncTool关于多线程并发执行的问题。如果不指定线程池,默认使用不定长线程池。
1 | public static ExecutorService newCachedThreadPool() { |
注意,newCachedThreadPool() 它的核心线程数是0,最大线程数是Integer.MAX_VALUE。可以说它的线程数是可以无限扩大的线程池。
他使用的队列是没有存储空间的,只要请求过来了,如果没有空闲线程,就会创建一个新的线程。
这种不定长线程池,适合处理执行时间比较短的任务。在高并发下,如果在耗时长的RPC\IO操作中使用该线程池,势必会造成系统 线程爆炸。
AsyncTool作者在京东使用的场景多数为低耗时(10ms)高并发,瞬间冲击的场景。这种高并发,且任务耗时较少的,适合使用不定长线程池。
但是这种低耗时的场景也不多,对于耗时较长的场景,推荐使用自定义线程池,可以避免那些耗时长的任务长时间占用线程,造成线程 ”爆炸 “,容错率更高。
总结:
**newCachedThreadPool**** 特性**:核心线程数 0,最大线程数无限,队列无缓存 → 任务一来没线程就新建线程。- 适合短耗时任务:任务执行快,线程能很快释放和复用,高并发时能快速扩容,冲击过去后线程又能被回收,不会长期暴涨。
- 不适合长耗时任务:线程长时间被占用,新任务只能不断新建线程,最终线程数“爆炸”,导致内存和 CPU 切换压力过大,系统崩溃。
- 结论:短平快的高并发任务用不定长线程池;耗时任务必须用自定义线程池(限制核心数、最大数和队列),避免线程无限增长。
短耗时 + 高并发场景 → 秒杀下单接口
- 用户在 11.11 秒杀活动一开始,会有 瞬间几十万请求冲进来。
- 每个请求要做的事情很简单:
- 校验库存(内存中或 Redis 中)
- 扣减库存标记
- 返回“抢购成功/失败”结果
- 整个处理逻辑可能只需要 5~10ms。
在这种场景下:
- 请求量短时间非常大 → 需要线程池快速扩容,防止请求被阻塞。
- 每个任务很快执行完 → 线程会迅速释放,可以被复用,不会造成线程无限积压。
- 高峰一过,线程池里的线程就会在 60s 内回收 → 节省资源。
👉 所以 newCachedThreadPool 就特别适合这种“短平快、高并发、瞬时冲击”的场景。
而且双11这种平台肯定是希望用户请求在机器不会崩的情况下流量越多越好,因为下单的单数越多,那么平台赚得越多