/** * 我们需要什么? * 核心线程 coreThread * 非核心线程 supportThread * 任务队列 workQueue * 拒绝策略 rejectHandle */ public class MyThreadPool {
public MyThreadPool(BlockingQueue<Runnable> workQueue, int corePoolSize, int maxPoolSize, TimeUnit timeUnit, long timeout, RejectHandle rejectHandle){ this.workQueue = workQueue; this.corePoolSize=corePoolSize; this.maxPoolSize=maxPoolSize; this.timeUnit = timeUnit; this.timeout = timeout; this.rejectHandle = rejectHandle; } //存放任务的队列 private final BlockingQueue<Runnable> workQueue; private final int corePoolSize; private final int maxPoolSize; private final TimeUnit timeUnit; private final long timeout; private final RejectHandle rejectHandle;
//TODO: 当前这个execute一系列判断操作不是原子操作,所以存在线程安全问题,加锁,使用原子变量,volatile public void execute(Runnable command){ //如果核心线程数还没满,则创建核心线程去执行 if(coreThreadList.size()<corePoolSize){ //创建线程 Thread thread = new CoreThread(); //添加线程 coreThreadList.add(thread); //开启线程 thread.start(); } //任务进入任务队列 //如果可以在不违反容量限制的情况下立即将指定的元素插入到此队列中,成功时返回 true, //如果当前没有可用空间,则返回 false。当使用容量受限的队列时,此方法通常比 add 更可取, //后者可能仅通过引发异常来无法插入元素。 if(workQueue.offer(command)){ return; } //任务队列满了的话,开始创建非核心线程了 //判断线程数是否超过最大线程树 if(coreThreadList.size()+supportThreadList.size()<maxPoolSize){ Thread supportThread = new SupportThread(); supportThreadList.add(supportThread); supportThread.start(); } //阻塞队列还是放不下的话,就要执行拒绝策略了 if (!workQueue.offer(command)) { //执行拒绝策略 rejectHandle.reject(command, this); } } public void executeSafe(Runnable command){ synchronized (command){ //如果核心线程数还没满,则创建核心线程去执行 if(coreThreadList.size()<corePoolSize){ //创建线程 Thread thread = new CoreThread(); //添加线程 coreThreadList.add(thread); //开启线程 thread.start(); } //任务进入任务队列 //如果可以在不违反容量限制的情况下立即将指定的元素插入到此队列中,成功时返回 true, //如果当前没有可用空间,则返回 false。当使用容量受限的队列时,此方法通常比 add 更可取, //后者可能仅通过引发异常来无法插入元素。 if(workQueue.offer(command)){ return; } //任务队列满了的话,开始创建非核心线程了 //判断线程数是否超过最大线程树 if(coreThreadList.size()+supportThreadList.size()<maxPoolSize){ Thread supportThread = new SupportThread(); supportThreadList.add(supportThread); supportThread.start(); } //阻塞队列还是放不下的话,就要执行拒绝策略了 if (!workQueue.offer(command)){ //执行拒绝策略 rejectHandle.reject(command,this); } } } List<Thread> coreThreadList=new ArrayList<>(); List<Thread> supportThreadList=new ArrayList<>(); //核心线程:会一直执行 class CoreThread extends Thread{ @Override public void run() { //死循环,一直存活 while(true){ //阻塞从任务队列里拿任务 try { Runnable task = workQueue.take(); //运行任务 task.run(); } catch (InterruptedException e) { throw new RuntimeException(e); } } } } //辅助线程,有存活时间 class SupportThread extends Thread{ @Override public void run() { //死循环,一直存活 while(true){ //阻塞从任务队列里拿任务 try { // timeout – how long to wait before giving up Runnable task = workQueue.poll(timeout,timeUnit); //如果一直拿不到任务就返回null if (task==null){ break; } //运行任务 task.run(); } catch (InterruptedException e) { throw new RuntimeException(e); } } } }