问小白 wenxiaobai
资讯
历史
科技
环境与自然
成长
游戏
财经
文学与艺术
美食
健康
家居
文化
情感
汽车
三农
军事
旅行
运动
教育
生活
星座命理

ThreadPoolExecutor最佳实践:如何选择队列

创作时间:
作者:
@小白创作中心

ThreadPoolExecutor最佳实践:如何选择队列

引用
CSDN
1.
https://blog.csdn.net/Holmofy/article/details/81610481

ThreadPoolExecutor是Java中常用的线程池实现,其性能和稳定性在很大程度上取决于工作队列的选择。本文将详细介绍ThreadPoolExecutor的工作原理,并深入探讨JDK中提供的几种阻塞队列实现,帮助读者更好地理解和使用ThreadPoolExecutor。

ThreadPoolExecutor最核心的四点:

  1. 当有任务提交的时候,会创建核心线程去执行任务(即使有核心线程空闲);
  2. 当核心线程数达到corePoolSize时,后续提交的都会进BlockingQueue中排队;
  3. 当BlockingQueue满了(offer失败),就会创建临时线程(临时线程空闲超过一定时间后,会被销毁);
  4. 当线程总数达到maximumPoolSize时,后续提交的任务都会被RejectedExecutionHandler拒绝。

1. BlockingQueue

线程池中工作队列由BlockingQueue实现类提供功能,BlockingQueue定义了这么几组方法:

Summary of BlockingQueue methods
Throws exception
Special value
Blocks
Times out
Insert
add(e)
offer(e)
put(e)
offer(e, time, unit)
Remove
remove()
poll()
take()
poll(time, unit)
Examine
element()
peek()
not applicable
not applicable

阻塞队列是最典型的“生产者消费者”模型:

  • 生产者调用put()方法将生产的元素入队,消费者调用take()方法;
  • 当队列满了,生产者调用的put()方法会阻塞,直到队列有空间可入队;
  • 当队列为空,消费者调用的get()方法会阻塞,直到队列有元素可消费;

但是需要十分注意的是:ThreadPoolExecutor提交任务时使用offer方法(不阻塞),工作线程从队列取任务使用take方法(阻塞)。正是因为ThreadPoolExecutor使用了不阻塞的offer方法,所以当队列容量已满,线程池会去创建新的临时线程;同样因为工作线程使用take()方法取任务,所以当没有任务可取的时候线程池的线程将会空闲阻塞。

事实上,工作线程的超时销毁是调用

offer(e, time, unit)

实现的。

2. JDK提供的阻塞队列实现

JDK中提供了以下几个BlockingQueue实现类:

2.1. ArrayBlockingQueue

这是一个由数组实现容量固定的有界阻塞队列。这个队列的实现非常简单:

private void enqueue(E x) {
    final Object[] items = this.items;
    items[putIndex] = x; // 入队
    if (++putIndex == items.length) // 如果指针到了末尾
        putIndex = 0; // 下一个入队的位置变为0
    count++;
    notEmpty.signal(); // 提醒消费者线程消费
}
private E dequeue() {
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null; // 出队置空
    if (++takeIndex == items.length) // 如果指针到了末尾
        takeIndex = 0; // 下一个出队的位置变为0
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    notFull.signal(); // 提醒生产者线程生产
    return x;
}

通过简单的指针循环实现了一个环形队列:

下面有一张维基百科关于环形缓冲区的的动画,虽然动画描述内容与ArrayBlockingQueue实现有所差异,但贵在生动形象(着实找不到更好的动画了)。

ArrayBlockingQueue主要复杂在迭代,允许迭代中修改队列(删除元素时会更新迭代器),并不会抛出ConcurrentModificationException;好在大多数场景中我们不会迭代阻塞队列。

2.2. SynchronousQueue

这是一个非常有意思的集合,更准确的说它并不是一个集合容器,因为它没有容量。你可以“偷偷地”把它看作

new ArrayBlockingQueue(0)

,之所以用"偷偷地"这么龌龊的词,首先是因为

ArrayBlockingQueue

capacity<1

时会抛异常,其次

ArrayBlockingQueue(0)

并不能实现

SynchronousQueue

这么强大的功能。

正如SynchronousQueue的名字所描述一样——“同步队列”,它专门用于生产者线程与消费者线程之间的同步

  • 因为它任何时候都是空的,所以消费者线程调用take()方法的时候就会发生阻塞,直到有一个生产者线程生产了一个元素,消费者线程就可以拿到这个元素并返回。
  • 同样的,你也可以认为任何时候都是满的,所以生产者线程调用put()方法的时候就会发生阻塞,直到有一个消费者线程消费了一个元素,生产者才会返回。

另外还有几点需要注意:

  • SynchronousQueue不能遍历,因为它没有元素可以遍历;
  • 所有的阻塞队列都不允许插入null元素,因为当生产者生产了一个null的时候,消费者调用poll()返回null,无法判断是生产者生产了一个null元素,还是队列本身就是空。

CachedThreadPool使用的就是同步队列

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

因为SynchronousQueue无容量的特性,所以CachedThreadPool不会对任务进行排队,如果线程池中没有空闲线程,CachedThreadPool会立即创建一个新线程来接收这个任务。

所以使用CachedThreadPool要注意避免提交长时间阻塞的任务,可能会由于线程数过多而导致内存溢出(OutOfOutOfMemoryError)。

2.3. LinkedBlockingQueue

这是一个由单链表实现默认无界的阻塞队列。LinkedBlockingQueue提供了一个可选有界的构造函数,而在未指明容量时,容量默认为Integer.MAX_VALUE。

按照官方文档的说法LinkedBlockingQueue是一种可选有界(optionally-bounded)阻塞队列

SingleThreadPool和FixedThreadPool使用的就是LinkedBlockingQueue

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(
© 2023 北京元石科技有限公司 ◎ 京公网安备 11010802042949号