# 线程池
# JDK 线程池
# 5 种常用线程池
Executors 工厂方法 Executors.newCachedThreadPool() 提供了无界线程池,可以进行自动线程回收;Executors.newFixedThreadPool(int) 提供了固定大小线程池,内部使用无界队列;Executors.newSingleThreadExecutor() 提供了单个后台线程。
# 1、newCachedThreadPool
//有缓冲的线程池,线程数 JVM 控制
ExecutorService threadPool = Executors.newCachedThreadPool();
2
底层实现
在 newCachedThreadPool 中如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。 初看该构造函数时我有这样的疑惑:核心线程池为 0,那按照前面所讲的线程池策略新任务来临时无法进入核心线程池,只能进入 SynchronousQueue 中进行等待,而 SynchronousQueue 的大小为 1,那岂不是第一个任务到达时只能等待在队列中,直到第二个任务到达发现无法进入队列才能创建第一个线程? 这个问题的答案在上面讲 SynchronousQueue 时其实已经给出了,要将一个元素放入 SynchronousQueue 中,必须有另一个线程正在等待接收这个元素。因此即便 SynchronousQueue 一开始为空且大小为 1,第一个任务也无法放入其中,因为没有线程在等待从 SynchronousQueue 中取走元素。因此第一个任务到达时便会创建一个新线程执行该任务。
由于队列缓冲区为空,每来一个任务时,都会在必要时新建线程执行任务直到到达 Integer.MAX_VALUE,这就有可能导致大量的线程被创建,进而造成 oom
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
2
3
4
5
# 2、newFixedThreadPool
//固定大小的线程池
ExecutorService threadPool = Executors.newFixedThreadPool(3);
2
底层实现
线程数量固定,使用无限大的队列。
可能因为阻塞队列被撑爆造成 oom
public static ExecutorService newFixedThreadPool(int nThreads) {
//超过nThreads个线程后会无限制的往阻塞队列放入线程
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
2
3
4
5
6
# 3、newSingleThreadExecutor
ExecutorService threadPool = Executors.newSingleThreadExecutor();
底层实现
创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
单线程的线程池,只有一个线程在工作
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
//超过1个线程后会无限制的往阻塞队列放入线程
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
2
3
4
5
6
7
8
# 4、newScheduledThreadPool
ExecutorService threadPool4 = Executors.newScheduledThreadPool(2);
底层实现
ScheduledThreadPoolExecutor 的父类即 ThreadPoolExecutor,因此这里各参数含义和上面一样。值得关心的是 DelayedWorkQueue 这个阻塞对列,在上面没有介绍,它作为静态内部类就在 ScheduledThreadPoolExecutor 中进行了实现。简单的说,DelayedWorkQueue 是一个无界队列,它能按一定的顺序对工作队列中的元素进行排列。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
2
3
4
# 5、ThreadPoolExecutor
默认线程池,可控制参数比较多
ExecutorService threadPool = new ThreadPoolExecutor();
# 6、newWorkStealingPool
这是 jdk1.8 中新增加的一种线程池实现,创建一个拥有多个任务队列(以便减少连接数)的线程池。
无参构造
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
2
3
4
5
6
# 线程池常用阻塞队列
BlockingQueue<Runnable> workQueue = null;
workQueue = new ArrayBlockingQueue<>(5);//基于数组的先进先出队列,有界
workQueue = new LinkedBlockingQueue<>();//基于链表的先进先出队列,无界
workQueue = new LinkedBlockingQueue<>(5);//当传入队列容量时有界
workQueue = new SynchronousQueue<>();//无缓冲的等待队列,无界
2
3
4
5
# 1、SynchronousQueue
private static ExecutorService cachedThreadPool = new ThreadPoolExecutor(4, Runtime.getRuntime().availableProcessors() * 2, 0, TimeUnit.MILLISECONDS, new SynchronousQueue<>(), r -> new Thread(r, "ThreadTest"));
SynchronousQueue 没有容量,是无缓冲等待队列,是一个不存储元素的阻塞队列,会直接将任务交给消费者,必须等队列中的添加元素被消费后才能继续添加新的元素。
拥有公平(FIFO)和非公平(LIFO)策略,非公平侧罗会导致一些数据永远无法被消费的情况
使用 SynchronousQueue 阻塞队列一般用于构造 newCachedThreadPool
# 2、LinkedBlockingQueue
private static ExecutorService cachedThreadPool = new ThreadPoolExecutor(4, Runtime.getRuntime().availableProcessors() * 2, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), r -> new Thread(r, "ThreadTest"));
LinkedBlockingQueue 是一个无界缓存等待队列。当前执行的线程数量达到 corePoolSize 的数量时,剩余的元素会在阻塞队列里等待。(所以在使用此阻塞队列时 maximumPoolSizes 就相当于无效了),每个线程完全独立于其他线程。生产者和消费者使用独立的锁来控制数据的同步,即在高并发的情况下可以并行操作队列中的数据。
# 3、ArrayBlockingQueue
private static ExecutorService cachedThreadPool = new ThreadPoolExecutor(4, Runtime.getRuntime().availableProcessors() * 2, 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(32), r -> new Thread(r, "ThreadTest"));
ArrayBlockingQueue 是一个有界缓存等待队列,可以指定缓存队列的大小,当正在执行的线程数等于 corePoolSize 时,多余的元素缓存在 ArrayBlockingQueue 队列中等待有空闲的线程时继续执行,当 ArrayBlockingQueue 已满时,加入 ArrayBlockingQueue 失败,会开启新的线程去执行,当线程数已经达到最大的 maximumPoolSizes 时,再有新的元素尝试加入 ArrayBlockingQueue 时会报错。
# 四种拒绝策略
RejectedExecutionHandler rejected = null;
//默认,队列满了丢任务抛出异常
rejected = new ThreadPoolExecutor.AbortPolicy();
//队列满了丢任务不异常
rejected = new ThreadPoolExecutor.DiscardPolicy();
//将最早进入队列的任务删,之后再尝试加入队列
rejected = new ThreadPoolExecutor.DiscardOldestPolicy();
//如果添加到线程池失败,那么主线程会自己去执行该任务
rejected = new ThreadPoolExecutor.CallerRunsPolicy();
2
3
4
5
6
7
8
9
# 1、CallerRunsPolicy:
线程调用运行该任务的 execute 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); }} 这个策略显然不想放弃执行任务。但是由于池中已经没有任何资源了,那么就直接使用调用该 execute 的线程本身来执行。(开始我总不想丢弃任务的执行,但是对某些应用场景来讲, 很有可能造成当前线程也被阻塞。如果所有线程都是不能执行的,很可能导致程序没法继续跑了。需要视业务情景而定吧。)
# 2、AbortPolicy:
处理程序遭到拒绝将抛出运行时 RejectedExecutionException public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {throw new RejectedExecutionException();} 这种策略直接抛出异常,丢弃任务。(jdk 默认策略,队列满并线程满时直接拒绝添加新任务,并抛出异常,所以说有时候放弃也是一种勇气,为了保证后续任务的正常进行,丢弃一些也是可以接收的,记得做好记录)
# 3、DiscardPolicy:
不能执行的任务将被删除 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {} 这种策略和 AbortPolicy 几乎一样,也是丢弃任务,只不过他不抛出异常。
# 4、DiscardOldestPolicy:
如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程) public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) {e.getQueue().poll();e.execute(r); }} 该策略就稍微复杂一些,在 pool 没有关闭的前提下首先丢掉缓存在队