BlockingQueue
是一个先进先出的队列(Queue),为什么说是阻塞(Blocking)的呢?是因为 BlockingQueue 支持当获取队列元素但是队列为空时,会阻塞等待队列中有元素再返回;也支持添加元素时,如果队列已满,那么等到队列可以放入新元素时再放入。
- BlockingQueue 对插入操作、移除操作、获取元素操作提供了四种不同的方法用于不同的场景中使用,我们的关注点应该在 put(e) 和 take() 这两个方法,因为这两个方法是带阻塞的。
- BlockingQueue 不接受 null 值的插入
- 可以是有界的,也可以是无界的(容量为Integer.MAX_VALUE)
- BlockingQueue 的实现都是线程安全的(除批量操作),常用于生产者-消费者的场景中。
ArrayBlockingQueue
底层是数组,有界队列,如果我们要使用生产者-消费者模式,这是非常好的选择。
LinkedBlockingQueue
底层是单项链表,可以当做无界和有界队列来使用,使用两个锁来实现线程安全。takeLock 和 notEmpty 搭配 – putLock 和 notFull 搭配 (唤醒对方的设计比较特别)
SynchronousQueue
读线程和写线程需要同步,本身不带有空间来存储任何元素,使用上可以选择公平模式和非公平模式。
transfer,其基本算法如下:
我们可以假设出一个男女配对的场景:一个男的过来,如果一个人都没有,那么他需要等待;如果发现有一堆男的在等待,那么他需要排到队列后面;如果发现是一堆女的在排队,那么他直接牵走队头的那个女的。
PriorityBlockingQueue
是无界队列,带排序功能,数据结构为二叉堆,数组第一个也是树的根节点总是最小值;要求元素可比较大小;它的插入操作 put 方法不会 block,因为它是无界队列(take 方法在队列为空的时候会阻塞)。
并发设计 volatile int allocationSpinLock //锁,用于数组扩容时的 CAS 操作 设计巧妙,提高了吞吐量
数据结构的设计也值得参考
ThreadPoolExecutor
java 线程池几个相关类的继承结构:Executor,ExecutorService,AbstractExecutorService,ThreadPoolExecutor
相关类有:FutureTask(Runnable),BlockingQueue,Executors
ExecutorService
定义了一个简单的线程池的主要功能,能提交任务,能获取结果,能关闭线程池。
AbstractExecutorService
实现的主要方法: invokeAny 方法和 invokeAll 方法;两个 newTaskFor 方法,用于将任务包装成 FutureTask。
ThreadPoolExecutor
是 JDK 中的线程池实现,它实现了任务提交、线程管理、监控等等方法。
调用方式:需要获取结果(FutureTask),用 submit 方法,不需要获取结果,可以用 execute 方法。
线程池需要考虑的问题:
队列是否有界,提交任务时队列满了怎么办?什么情况下会创建新的线程?提交任务时线程池满了怎么办?空闲线程怎么关掉?
最需要关心的几个属性:
corePoolSize –核心线程数
maximumPoolSize –最大线程数,线程池允许创建的最大线程数。
workQueue –任务队列,BlockingQueue 接口的某个实现
keepAliveTime –空闲线程(大于corePoolSize 时)的保活时间,如果某线程的空闲时间超过这个值都没有任务给它做,那么可以被关闭了。
threadFactory –用于生成线程,
handler –拒绝策略,默认为AbortPolicy,不管怎样,直接抛出 RejectedExecutionException 异常
其他属性:
largestPoolSize –线程池的大小曾经达到的最大值
AtomicInteger ctl–一个 32 位的整数来存放 线程池的状态 和 当前池中的线程数
线程池的状态:
RUNNING:这个没什么好说的,这是最正常的状态:接受新的任务,处理等待队列中的任务
SHUTDOWN:不接受新的任务提交,但是会继续处理等待队列中的任务
STOP:不接受新的任务提交,不再处理等待队列中的任务,中断正在执行任务的线程
TIDYING:所有的任务都销毁了,workCount 为 0。线程池的状态在转换为 TIDYING 状态时,会执行钩子方法 terminated()
TERMINATED:terminated() 方法结束后,线程池的状态就会变成这个
设计1:任务是 Runnable(内部变量名叫 task 或 command),线程是 Worker
内部类 Worker 继承抽象类 AbstractQueuedSynchronizer,方便实现同步 ,volatile long completedTasks;//用于存放此线程完成的任务数
关键方法:
runStateOf() 位运算得到了 线程池的状态
workerCountOf() 位运算得到了 当前池中的线程数
execute(Runnable command)
addWorker(Runnable firstTask, boolean core) 创建新的线程
runWorker(Worker w) 线程执行任务
getTask() 从队列中获取任务
reject(Runnable command) 执行拒绝策略,有两种情况会调用该方法
Executors 工具类
- newFixedThreadPool 最大线程数设置为与核心线程数相等,keepAliveTime 无效,任务队列采用 LinkedBlockingQueue,无界队列
过程分析:刚开始,每提交一个任务都创建一个 worker,当 worker 的数量达到 nThreads 后,不再创建新的线程,而是把任务提交到 LinkedBlockingQueue 中,而且之后线程数始终为 nThreads。 - newSingleThreadExecutor nThreads 为1,其他的和上面一样。
- newCachedThreadPool 核心线程数为 0,最大线程数为 Integer.MAX_VALUE,keepAliveTime 为 60 秒,任务队列采用 SynchronousQueue ;这种线程池对于任务可以比较快速地完成的情况有比较好的性能。
过程分析:鉴于 corePoolSize 是 0,那么提交任务的时候,直接将任务提交到队列中,由于采用了 SynchronousQueue,所以如果是第一个任务提交的时候,offer 方法肯定会返回 false,因为此时没有任何 worker 对这个任务进行接收,那么将进入到最后一个分支来创建第一个 worker。之后再提交任务的话,取决于是否有空闲下来的线程对任务进行接收,如果有,会进入到第二个 if 语句块中,否则就是和第一个任务一样,进到最后的 else if 分支创建新线程。
面试考察点
问题2:说说线程池中的线程创建时机?
- 如果当前线程数少于 corePoolSize,那么提交任务的时候创建一个新的线程,并由这个线程执行这个任务;
- 如果当前线程数已经达到 corePoolSize,那么将提交的任务添加到队列中,等待线程池中的线程去队列中取任务;
- 如果队列已满,那么创建新的线程来执行任务,需要保证池中的线程数不会超过 maximumPoolSize,如果此时线程数超过了 maximumPoolSize,那么执行拒绝策略。
- 注意:如果将队列设置为无界队列,那么线程数达到 corePoolSize 后,其实线程数就不会再增长了。因为后面的任务直接往队列塞就行了,此时 maximumPoolSize 参数就没有什么意义。
问题4:任务执行过程中发生异常怎么处理?
如果某个任务执行出现异常,那么执行任务的线程会被关闭,而不是继续接收其他任务。然后会启动一个新的线程来代替它。
问题5:什么时候会执行拒绝策略?
workers 的数量达到了 corePoolSize(任务此时需要进入任务队列),任务入队成功,与此同时线程池被关闭了,而且关闭线程池并没有将这个任务出队,那么执行拒绝策略。这里说的是非常边界的问题,入队和关闭线程池并发执行,读者仔细看看 execute 方法是怎么进到第一个 reject(command) 里面的。
workers 的数量大于等于 corePoolSize,将任务加入到任务队列,可是队列满了,任务入队失败,那么准备开启新的线程,可是线程数已经达到 maximumPoolSize,那么执行拒绝策略。
多线程的风险:
创建大量同类线程而导致消耗完内存,而且大量线程会造成“过度切换”的问题
异步编程
通常,我们会有一个线程池用于执行异步任务,提交任务的线程将任务提交到线程池就可以立马返回,不必等到任务真正完成。
如果想要知道任务的执行结果,通常是通过传递一个回调函数的方式,任务结束后去调用这个函数;
或者通过 Future 实例,future.get() 阻塞等待获取执行结果(但可以决定什么时候阻塞获取结果,还可以设置什么时候执行成功)。