Java ThreadPoolExecutor 学习笔记(二)_JAVA_编程开发_程序员俱乐部

中国优秀的程序员网站程序员频道CXYCLUB技术地图
热搜:
更多>>
 
您所在的位置: 程序员俱乐部 > 编程开发 > JAVA > Java ThreadPoolExecutor 学习笔记(二)

Java ThreadPoolExecutor 学习笔记(二)

 2015/5/15 13:04:42  Vampiredx  程序员俱乐部  我要评论(0)
  • 摘要:ThreadPoolExecutor实现了ExecutorService接口,从常用的方法来看一下ThreadPoolExecutor内部实现的大致流程.Futuresubmit(Callabletask)这个方法的实现在父类AbstractExecutorService里面:1.用Callabletask去创建FutureTask实例。FutureTask是RunnableFuture接口的实现类,即实现了Runnable接口,也实现了Future接口。2
  • 标签:笔记 学习 thread Java 学习笔记
ThreadPoolExecutor 实现了 ExecutorService 接口,从常用的方法来看一下 ThreadPoolExecutor 内部实现的大致流程.

Future submit(Callable task)
这个方法的实现在父类 AbstractExecutorService 里面:
1. 用 Callable task去创建 FutureTask 实例。FutureTask 是 RunnableFuture 接口的实现类,即实现了 Runnable 接口,也实现了 Future 接口。
2. 用 FutureTask 实例调用子类 execute(Runnable command)。
3. 把 FutureTask 实例返回

Future submit(Runnable task, T result)
这个方法的实现也在父类 AbstractExecutorService 里面:
1. 用 Runnable task 和 result 去创建 FutureTask 实例(成功执行 task 后,返回 result)。
2. 用 FutureTask 实例调用子类 execute(Runnable command)。
3. 把 FutureTask 实例返回

void execute(Runnable command)
1. 获取当前线程数量。
2. 如果当前线程数量少于 corePoolSize,调用 addWorker(command, true)去增加新的核心线程。
3. 如果添加核心线程失败(因为并发原因不能再添加核心线程),或当前线程数量大于等于 corePoolSize,把 command 放进 workQueue。
4. 再次检查线程池状态,如果被关闭,从 workQueue 移除 command ,调用 handler.rejectedExecution(command)。如果第2次检查查线程池没有被关闭,并且没有 worker 在工作,则调用 addWorker(null, false) 新增线程。
5. 如果 command 无法放进 workQueue,调用 addWorker(command, false) 增加新的 线程。
6. 新增线程失败(失败可能是因为线程池被关闭或者线程数量已经达到 maximumPoolSize ),则调用 handler.rejectedExecution(command) 。(源代码往 workQueue 增加task调用的是 offer 方法,也就是说 workQueue 如果已经满了,当前线程不会被blocking。在默认 AbortPolicy 策略下,抛出 RejectedExecutionException。 但是有的时候,我们并不希望得到一个 RejectedExecutionException,而是希望阻塞当前以及后续的线程提交新的 task 。可以实现 customized RejectedExecutionHandler,调用workQueue的put方法以达到这个目的。)

从上面的流程可以看出,只有在 workQueue 满了以后,才会增加非核心的线程。如果 workQueue 的 size 很大,那么很可能一直工作的只有 corePoolSize 数量的线程。

boolean addWorker(Runnable firstTask, boolean core)
1. 获取当前的线程数量,线程池状态
2. 如果状态显示线程池已经被关闭,或者正在被关闭,workQueue 已经空了,则直接返回false(创建新的线程失败)
3. 如果当前是添加核心线程,线程池数量大于等于 corePoolSize,返回false。
4. 如果当前是添加非核心线程,线程池数量大于等于 maximumPoolSize,则直接返回false
5. 使用 CAS(compareAndSet) 去增加当前的线程数量+1,CAS失败则从step 3开始重试。成功则再次获得线程池状态,如果线程池状态已经改变,则从step 1开始重试
6. 使用 firstTask创建一个worker,这个过程会调用 ThreadFactory 去创建新的 Worker 线程
7. 设置 workerStarted = false
8. 获取线程池的 mainLock,再次检查线程池状态,如果还是 RUNNING,或者是 SHUTDOWN 但是 firstTask 是空的(处于正在关闭中,需要新的线程把已有任务处理完毕),则继续
9. 把新的 worker 保存在 workers,更新 largestPoolSize,释放mainLock
10. 启动新线程,设置workerStarted = true。(如果线程池状态已经在step 8改变,或者启动新线程报错(OOM),workerStarted还是false) 新的线程会调用 Worker.run(),代理调用到 runWorker() 方法。
11. 再次检查 workerStarted,如果是 false,则 call addWorkerFailed(worker)去移除 worker,减少当前的线程数量
12. 返回当前 workerStarted

从这里可以看到,对于当前的线程数量增加,主要还是通过 CAS 操作去更新。但是对于 workers,largestPoolSize 的维护,则是通过mainLock完成的。由于线程池状态的更新没有使用 mainLock,即便在获取 mainLock 的情况下,也需要反复 check。另外一点要指出的是,如果 step 4 CAS 操作成功的话,当前的线程数量+1。在step 8获取 mainLock 后,由于线程池状态的改变也是可能失败的,则在后面 addWorkerFailed(worker) 中会-1。

void addWorkerFailed(Worker w)
1. 获取线程池的mainLock
2. CAS操作减去线程数量-1
3. call tryTerminate() 尝试关闭线程池
4. 释放mainLock

void tryTerminate()
1. 获取当前的线程数量,线程池状态
2. 如果线程池状态是 RUNNING、 TIDYING、 TERMINATED 直接返回
3. 如果线程池是 SHUTDOWN,但是 workQueue 还有 task,直接返回
4. 到这步,线程池状态应该是 SHUTDOWN 并且 workQueue 已经没有 task,或者是 STOP。如果线程池里面还有线程存活,call interruptIdleWorkers(true) 从 workers 里面去关闭一个闲置线程 (为什么只关闭一个?),return
5. 到这步,线程池状态应该是 SHUTDOWN 或者 STOP,workQueue 已经没有 task,线程池里面没有线程存活。获取 mainLock, CAS 操作设置状态为 TIDYING 。如果 CAS 成功,则再次设置状态为 TERMINATED, call termination.signalAll()唤醒等待线程池关闭的线程(外部线程可能 call awaitTermination(long timeout, TimeUnit unit)等待线程池彻底关闭)
6. 释放mainLock


这里的疑惑为 step 4,当线程池里面还有线程存活的时候,尝试去关闭最多一个线程。为什么是只关闭最多一个?而不是全部?注意 step 5,CAS操作设置状态为 TIDYING 。这是因为前面检查状态都是在没获取 mainLock 情况下完成的,在获取 mainlock 后重复检查成功,之后才设置 TERMINATED 状态并唤醒等待线程池关闭的线程

class Worker
Worker 类是 ThreadPoolExecutor 内部私有类,继承于 AbstractQueuedSynchronizer,并且实现了 Runnable 接口。在前面 ThreadPoolExecutor.addWorker(Runnable firstTask, boolean core) 中step 10,work 的线程被启动,将调用 Worker.run()方法执行task。在Worker.run()里面,代理调用ThreadPoolExecutor.runWorker(Worker w)。所以 Worker 主要还是实现了 AbstractQueuedSynchronizer 的 isHeldExclusively(), tryAcquire(int i), tryRelease(int i)方法,并提供了 lock() , isLocked() , tryLock() , unlock() 给 ThreadPoolExecutor 调用(Adapter模式),以及保持了 volatile long completedTasks 属性为统计使用

void runWorker(Worker w)
1. 从 Worker 里获取 firstTask(第一次)
2. 或者调用 getTask() 从 workQueue 里面获取任务
3. 调用 w.lock() 获取锁
4. 检查线程池状态,如果是 STOP,TIDYING,TERMINATED,则直接 call Thread.interrupt() worker 线程
5. 调用 task.run() 执行 task 。final阶段会把调用 completedTasks++ 统计完成任务数量。这里保留了 beforeExecute(wt, task) 和 afterExecute(task, thrown) 2个方法没有实现(模板模式),以便子类扩展
6. 2-5重复执行,如果 step 2 从 workQueue 获取null,则跳出循环,completedAbruptly = false。如果执行task过程中遇到任何Error或者Exception, completedAbruptly = true
7. 调用 processWorkerExit(w, completedAbruptly);

Runnable getTask()
1. 获取当前的线程池状态,如果是STOP,TIDYING或TERMINATED,调用decrementWorkerCount()减去worker数量,返回null
2. 获取当前works数量
3. 如果数量已经大于 maximumPoolSize,并且数量大于1或者 workQueue 已经空了,调用 compareAndDecrementWorkerCount(c) CAS操作减少works数量1,返回null
4. 如果线程池允许核心线程超时并且上次poll已经超时,并且数量大于1或者 workQueue 已经空了,调用 compareAndDecrementWorkerCount(c) CAS 操作减少 works 数量1,返回null
5. CAS操作失败则重新从step 1开始
6. 如果线程池允许核心线程超时,或者当前是非核心线程,调用 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) 从 workQueue 获取任务。反之则调用 workQueue.take() 获取任务。获得任务后直接返回,超时则标记timedOut = true,重新从step 1开始
7. 期间如果线程被打断(一般是被调用shutdown()、shutdownNow()),则标记timedOut = false,重新从step 1开始

当线程池idle的时候,核心线程调用 workQueue.take() 在这里被阻塞。非核心线程则调用 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS),超时后线程退出。


void processWorkerExit(Worker w, boolean completedAbruptly)
1. 如果是异常退出 completedAbruptly = true,调用 decrementWorkerCount() 减去 worker 数量
2. 获取 mainLock,把 worker 完成任务数量统计到线程池维护的计数器里,completedTaskCount += w.completedTasks
3. 从 workers 删除 worker,释放 mainLock
4. 调用 tryTerminate() 尝试关闭线程池
5. 如果当前的线程池状态是 RUNNING 或者 SHUTDOWN,继续
6. 如果 worker 是正常退出,并且线程池是允许核心线程超时的或者 worker 数量已经大于等于 corePoolSize,直接返回。否则调用 addWorker(null, false) 创建一个新的线程
发表评论
用户名: 匿名