线程池

手搓一个

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
/**
* 线程根本不存在复用这一说,在它创建,销毁这个生命周期来看,执行完一个任务后肯定是会销毁的
*/

/**
* 我们需要什么?
* 核心线程 coreThread
* 非核心线程 supportThread
* 任务队列 workQueue
* 拒绝策略 rejectHandle
*/
public class MyThreadPool {

public MyThreadPool(BlockingQueue<Runnable> workQueue, int corePoolSize, int maxPoolSize, TimeUnit timeUnit, long timeout, RejectHandle rejectHandle){
this.workQueue = workQueue;
this.corePoolSize=corePoolSize;
this.maxPoolSize=maxPoolSize;
this.timeUnit = timeUnit;
this.timeout = timeout;
this.rejectHandle = rejectHandle;
}
//存放任务的队列
private final BlockingQueue<Runnable> workQueue;
private final int corePoolSize;
private final int maxPoolSize;
private final TimeUnit timeUnit;
private final long timeout;
private final RejectHandle rejectHandle;

//TODO: 当前这个execute一系列判断操作不是原子操作,所以存在线程安全问题,加锁,使用原子变量,volatile
public void execute(Runnable command){
//如果核心线程数还没满,则创建核心线程去执行
if(coreThreadList.size()<corePoolSize){
//创建线程
Thread thread = new CoreThread();
//添加线程
coreThreadList.add(thread);
//开启线程
thread.start();
}
//任务进入任务队列
//如果可以在不违反容量限制的情况下立即将指定的元素插入到此队列中,成功时返回 true,
//如果当前没有可用空间,则返回 false。当使用容量受限的队列时,此方法通常比 add 更可取,
//后者可能仅通过引发异常来无法插入元素。
if(workQueue.offer(command)){
return;
}
//任务队列满了的话,开始创建非核心线程了
//判断线程数是否超过最大线程树
if(coreThreadList.size()+supportThreadList.size()<maxPoolSize){
Thread supportThread = new SupportThread();
supportThreadList.add(supportThread);
supportThread.start();
}
//阻塞队列还是放不下的话,就要执行拒绝策略了
if (!workQueue.offer(command)) {
//执行拒绝策略
rejectHandle.reject(command, this);
}
}
public void executeSafe(Runnable command){
synchronized (command){
//如果核心线程数还没满,则创建核心线程去执行
if(coreThreadList.size()<corePoolSize){
//创建线程
Thread thread = new CoreThread();
//添加线程
coreThreadList.add(thread);
//开启线程
thread.start();
}
//任务进入任务队列
//如果可以在不违反容量限制的情况下立即将指定的元素插入到此队列中,成功时返回 true,
//如果当前没有可用空间,则返回 false。当使用容量受限的队列时,此方法通常比 add 更可取,
//后者可能仅通过引发异常来无法插入元素。
if(workQueue.offer(command)){
return;
}
//任务队列满了的话,开始创建非核心线程了
//判断线程数是否超过最大线程树
if(coreThreadList.size()+supportThreadList.size()<maxPoolSize){
Thread supportThread = new SupportThread();
supportThreadList.add(supportThread);
supportThread.start();
}
//阻塞队列还是放不下的话,就要执行拒绝策略了
if (!workQueue.offer(command)){
//执行拒绝策略
rejectHandle.reject(command,this);
}
}
}
List<Thread> coreThreadList=new ArrayList<>();
List<Thread> supportThreadList=new ArrayList<>();
//核心线程:会一直执行
class CoreThread extends Thread{
@Override
public void run() {
//死循环,一直存活
while(true){
//阻塞从任务队列里拿任务
try {
Runnable task = workQueue.take();
//运行任务
task.run();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
//辅助线程,有存活时间
class SupportThread extends Thread{
@Override
public void run() {
//死循环,一直存活
while(true){
//阻塞从任务队列里拿任务
try {
// timeout – how long to wait before giving up
Runnable task = workQueue.poll(timeout,timeUnit);
//如果一直拿不到任务就返回null
if (task==null){
break;
}
//运行任务
task.run();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}


}

线程池八大参数

1
2
3
4
5
6
7
8
this.workQueue = workQueue;
this.corePoolSize=corePoolSize;
this.maxPoolSize=maxPoolSize;
this.timeUnit = timeUnit;
//非核心线程最大活跃时间
this.timeout = timeout;
this.rejectHandle = rejectHandle;
this.线程工厂

线程池核心流程

核心就阻塞队列的三个方法

  • Object take() 一直阻塞获取队列的元素
  • Object poll(timeOut,TimeUtil) 在规定时间内阻塞获取任务,如果获取不到就返回 null
  • boolean offer() 往队列里塞任务,如果塞不进去就返回false 塞得进去就返回true
  1. 先判断当前线程数大小是否小于corePoolSize,是的话创建核心线程数,不是的话就直接执行2
  2. 把任务塞到队列里,看任务是否塞得进去
  3. 塞得进去返回,塞不进去判断TreadSize是否< maxSize
  4. 是:创建非核心线程
  5. 再一次把任务塞进队列里,判断是否能成功,失败就触发拒绝策略

拒绝策略

  • AbortPolicy****:丢弃任务并抛出 RejectedExecutionException 异常,这是默认的策略。
  • DiscardOldestPolicy:丢弃队列最前面的任务,执行后面的任务
  • CallerRunsPolicy:由调用线程处理该任务
  • DiscardPolicy:也是丢弃任务,但是不抛出异常,相当于静默处理。

线程池参数设置?

线程池参数设置

线程池默认有线程吗

没有。如果想要先创建好一些线程,来提高性能,我们可以通过预热机制,preStartThread or

preStartAllCoreThread来预热线程,然后线程池等着接任务就行了

线程池如何保证线程安全操作的

keepAliveTime对核心线程有效吗

默认无效,但我们可以开启 allowedCoreThreadTimeOut 来生效

如果线程池里的任务出异常了,我们是否可以捕获这个异常,且这个工作线程会不会被销毁

可以捕获,但需要手动捕获。如果不捕获,线程就相当于崩溃了工作线程会被销毁,且后面会重新new一个出

来,这是线程的恢复机制

如何对线程池的内存使用情况进行一个预估

线程池的任务如何进行持久化

引入MQ,发到MQ,利用MQ消费者的ack机制,防止没执行的任务在线程池所在的机器宕机时发生丢失

线程死循环获取任务,会浪费cpu吗?

  • 如果是忙轮询(busy loop),线程会一直占用 CPU 核,确实会浪费 CPU,哪怕没有任务也会不断执行指令。
  • 如果加了sleep / wait / condition variable 机制,那么线程会在没有任务时阻塞,让 CPU 去干别的事,这样几乎不浪费 CPU。

所以关键是有没有阻塞机制
比如 BlockingQueue.take() 就会在没任务时阻塞,不会忙等

while(true){

//取不到任务就阻塞住

task = queue.take();

}

take():

public E take() throws InterruptedException {

//中断锁,即不会死等到拿到锁,期间能响应中断

lock.lockInterruptibly();

try {

    while (count == 0) { // 队列空

        notEmpty.await(); // 等待条件满足

    }

    return dequeue();

} finally {

    lock.unlock();

}

}

不断沉睡唤醒,内核态和用户态不断切换,会不会有开销?

会有,但相对 CPU 忙等来说,这个开销小很多。

  • 沉睡:线程会调用系统调用(syscall)让内核把它挂起(从用户态切到内核态)。
  • 唤醒:任务到来时,内核会切回用户态让线程运行。
    这个切换属于上下文切换,大概是微秒级的开销,除非唤醒特别频繁,否则影响不大。

线程池有什么缺陷?

常见缺陷:

  1. 资源占用:线程是重量级对象,占内存(线程栈)和内核资源。
  2. 线程数固定:线程池满了,新任务要么排队,要么拒绝,容易导致延迟变大。
  3. 任务阻塞问题:如果线程执行的任务是阻塞型(特别是 I/O 阻塞),会占住线程,导致其他任务排队等待,甚至造成线程池“假死”。
  4. 调优复杂:核心线程数、队列长度、拒绝策略等要根据业务特点调,不同场景差异大。

IO密集型任务适合放线程池吗?

注意两点:

线程的核心任务就是执行 CPU 指令,因为这是它能对世界产生效果的唯一方式。

IO操作不是 CPU 做的运算,而是 外设/设备控制器 做的工作。CPU 只负责发起操作检查状态,真正的数

据传输是硬件自己完成的。

那么

I/O 密集型任务不适合直接放传统线程池,因为线程容易被阻塞占用,导致线程池空转和任务堆积。更适合异

步 I/O 或事件驱动模型,让线程资源得到充分利用。


IO密集型任务为什么不适合放线程池?

  • 线程池的线程是稀缺资源,而 I/O 阻塞期间,线程只是“等数据”,没用 CPU 干活。
  • 阻塞 I/O 会让有限的线程数被浪费掉,新任务只能等它们完成。
  • 解决办法是异步化加大线程池线程数(但这样会增加上下文切换和内存开销)。

画板

你说IO任务堵住了,为什么会堵住?

  • 阻塞 I/O(Blocking I/O)调用,比如 read()accept()recv(),在数据没到之前,线程会停在这里等。
  • 数据没到可能是因为:
  1. 对方(客户端/服务器)没发送数据。
  2. 网络延迟高。
  3. 磁盘读写速度慢(比如从 HDD 读大文件)。
  • 在阻塞期间,这个线程既不释放线程池的线程,也不能去干别的事,导致任务堆积。

关于如何尽快释放宝贵资源,业务最佳实践,dubbo服务端线程模型

jdk线程池的一个bug,但官方没修复,把它当成了一个feature,但我认为这其实是一个设计不好的地方

netty的eventloopgroup

核心线程数为0的场景有哪些?

核心线程数和最大线程数一样,会有什么场景?

线程池优化

利用cpu亲和性优化上下文切换

什么地方用到了,netty

其他类线程池

  • forkjionpool
  • Tomcat pool
  • eventLoopGroup

forkjionpool

双端队列

头和尾都可以进行添加和删除操作的队列,本质就是stack和queue的结合

工作窃取算法

工作窃取算法是一种动态、负载均衡高效的多线程任务分配机制,空闲线程可主动“窃取”其它线程未完成任务,使系统资源利用率最大化。


举一个例子:

假设有线程A/B/C,每个线程一开始都有一份任务队列:

- <font style="color:rgb(78, 78, 78);">线程A:任务A1, A2, A3</font>
- <font style="color:rgb(78, 78, 78);">线程B:任务B1, B2</font>
- <font style="color:rgb(78, 78, 78);">线程C:任务C1</font>
- <font style="color:rgb(78, 78, 78);">线程C完成C1后,将自己任务队列清空。</font>
- <font style="color:rgb(78, 78, 78);">它随机选取别的线程(比如线程A),“偷”走A的最后一个任务A3执行。</font>
- <font style="color:rgb(78, 78, 78);">这样被称为“工作窃取”。</font>

线程安全的保证:

执行流程及原理:

什么时候用threadpool,什么时候用forkjoin?