ThreadPoolExecutor最佳实践:如何选择队列
ThreadPoolExecutor最佳实践:如何选择队列
ThreadPoolExecutor是Java中常用的线程池实现,其性能和稳定性在很大程度上取决于工作队列的选择。本文将详细介绍ThreadPoolExecutor的工作原理,并深入探讨JDK中提供的几种阻塞队列实现,帮助读者更好地理解和使用ThreadPoolExecutor。
ThreadPoolExecutor最核心的四点:
- 当有任务提交的时候,会创建核心线程去执行任务(即使有核心线程空闲);
- 当核心线程数达到corePoolSize时,后续提交的都会进BlockingQueue中排队;
- 当BlockingQueue满了(offer失败),就会创建临时线程(临时线程空闲超过一定时间后,会被销毁);
- 当线程总数达到maximumPoolSize时,后续提交的任务都会被RejectedExecutionHandler拒绝。
1. BlockingQueue
线程池中工作队列由BlockingQueue实现类提供功能,BlockingQueue定义了这么几组方法:
Summary of BlockingQueue methods | Throws exception | Special value | Blocks | Times out |
---|---|---|---|---|
Insert | add(e) | offer(e) | put(e) | offer(e, time, unit) |
Remove | remove() | poll() | take() | poll(time, unit) |
Examine | element() | peek() | not applicable | not applicable |
阻塞队列是最典型的“生产者消费者”模型:
- 生产者调用put()方法将生产的元素入队,消费者调用take()方法;
- 当队列满了,生产者调用的put()方法会阻塞,直到队列有空间可入队;
- 当队列为空,消费者调用的get()方法会阻塞,直到队列有元素可消费;
但是需要十分注意的是:ThreadPoolExecutor提交任务时使用offer方法(不阻塞),工作线程从队列取任务使用take方法(阻塞)。正是因为ThreadPoolExecutor使用了不阻塞的offer方法,所以当队列容量已满,线程池会去创建新的临时线程;同样因为工作线程使用take()方法取任务,所以当没有任务可取的时候线程池的线程将会空闲阻塞。
事实上,工作线程的超时销毁是调用
offer(e, time, unit)
实现的。
2. JDK提供的阻塞队列实现
JDK中提供了以下几个BlockingQueue实现类:
2.1. ArrayBlockingQueue
这是一个由数组实现的容量固定的有界阻塞队列。这个队列的实现非常简单:
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x; // 入队
if (++putIndex == items.length) // 如果指针到了末尾
putIndex = 0; // 下一个入队的位置变为0
count++;
notEmpty.signal(); // 提醒消费者线程消费
}
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null; // 出队置空
if (++takeIndex == items.length) // 如果指针到了末尾
takeIndex = 0; // 下一个出队的位置变为0
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal(); // 提醒生产者线程生产
return x;
}
通过简单的指针循环实现了一个环形队列:
下面有一张维基百科关于环形缓冲区的的动画,虽然动画描述内容与ArrayBlockingQueue实现有所差异,但贵在生动形象(着实找不到更好的动画了)。
ArrayBlockingQueue主要复杂在迭代,允许迭代中修改队列(删除元素时会更新迭代器),并不会抛出ConcurrentModificationException;好在大多数场景中我们不会迭代阻塞队列。
2.2. SynchronousQueue
这是一个非常有意思的集合,更准确的说它并不是一个集合容器,因为它没有容量。你可以“偷偷地”把它看作
new ArrayBlockingQueue(0)
,之所以用"偷偷地"这么龌龊的词,首先是因为
ArrayBlockingQueue
在
capacity<1
时会抛异常,其次
ArrayBlockingQueue(0)
并不能实现
SynchronousQueue
这么强大的功能。
正如SynchronousQueue的名字所描述一样——“同步队列”,它专门用于生产者线程与消费者线程之间的同步:
- 因为它任何时候都是空的,所以消费者线程调用take()方法的时候就会发生阻塞,直到有一个生产者线程生产了一个元素,消费者线程就可以拿到这个元素并返回。
- 同样的,你也可以认为任何时候都是满的,所以生产者线程调用put()方法的时候就会发生阻塞,直到有一个消费者线程消费了一个元素,生产者才会返回。
另外还有几点需要注意:
- SynchronousQueue不能遍历,因为它没有元素可以遍历;
- 所有的阻塞队列都不允许插入null元素,因为当生产者生产了一个null的时候,消费者调用poll()返回null,无法判断是生产者生产了一个null元素,还是队列本身就是空。
CachedThreadPool使用的就是同步队列:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
因为SynchronousQueue无容量的特性,所以CachedThreadPool不会对任务进行排队,如果线程池中没有空闲线程,CachedThreadPool会立即创建一个新线程来接收这个任务。
所以使用CachedThreadPool要注意避免提交长时间阻塞的任务,可能会由于线程数过多而导致内存溢出(OutOfOutOfMemoryError)。
2.3. LinkedBlockingQueue
这是一个由单链表实现的默认无界的阻塞队列。LinkedBlockingQueue提供了一个可选有界的构造函数,而在未指明容量时,容量默认为Integer.MAX_VALUE。
按照官方文档的说法LinkedBlockingQueue是一种可选有界(optionally-bounded)阻塞队列。
SingleThreadPool和FixedThreadPool使用的就是LinkedBlockingQueue
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(