什么是BlockingQueue?
什么是BlockingQueue?
BlockingQueue是Java并发编程中的重要工具类,用于解决生产者-消费者问题。本文将详细介绍BlockingQueue的基本概念、不同类型、工作原理以及实际应用示例。
一、队列类型
- 无限队列(unbounded queue):几乎可以无限增长
- 有限队列(bounded queue):定义了最大容量
二、队列数据结构
队列实质就是一种存储数据的结构,通常用链表或者数组实现。一般而言队列具备FIFO(先进先出)的特性,当然也有双端队列(Deque)和优先级队列。
主要操作包括入队(EnQueue)与出队(Dequeue)。
三、常见的4种阻塞队列
- ArrayBlockingQueue:由数组支持的有界队列
- LinkedBlockingQueue:由链接节点支持的可选有界队列
- PriorityBlockingQueue:由优先级堆支持的无界优先级队列
- DelayQueue:由优先级堆支持的、基于时间的调度队列
1. ArrayBlockingQueue
队列基于数组实现,容量大小在创建ArrayBlockingQueue对象时已定义好。
队列创建:
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>();
应用场景:在线程池中有比较多的应用,生产者消费者场景
工作原理:基于ReentrantLock保证线程安全,根据Condition实现队列满时的阻塞
2. LinkedBlockingQueue
是一个基于链表的无界队列(理论上有界)。
BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>();
上面这段代码中,blockingQueue 的容量将设置为 Integer.MAX_VALUE。向无限队列添加元素的所有操作都将永远不会阻塞,但需要注意内存可能会填满,然后就会得到一个 OutOfMemory 异常。
3. DelayQueue
由优先级堆支持的、基于时间的调度队列,内部基于无界队列PriorityQueue实现,而无界队列基于数组的扩容实现。
队列创建:
BlockingQueue<String> blockingQueue = new DelayQueue();
要求:入队的对象必须要实现Delayed接口,而Delayed集成自Comparable接口
应用场景:电影票
工作原理:队列内部会根据时间优先级进行排序。延迟类线程池周期执行。
四、BlockingQueue API
BlockingQueue 接口的所有方法可以分为两大类:负责向队列添加元素的方法和检索这些元素的方法。在队列满/空的情况下,来自这两个组的每个方法的行为都不同。
添加元素
方法 | 说明 |
---|---|
add() | 如果插入成功则返回 true,否则抛出 IllegalStateException 异常 |
put() | 将指定的元素插入队列,如果队列满了,那么会阻塞直到有空间插入 |
offer() | 如果插入成功则返回 true,否则返回 false |
offer(E e, long timeout, TimeUnit unit) | 尝试将元素插入队列,如果队列已满,那么会阻塞直到有空间插入 |
检索元素
方法 | 说明 |
---|---|
take() | 获取队列的头部元素并将其删除,如果队列为空,则阻塞并等待元素变为可用 |
poll(long timeout, TimeUnit unit) | 检索并删除队列的头部,如有必要,等待指定的等待时间以使元素可用,如果超时,则返回 null |
五、多线程生产者-消费者示例
接下来我们创建一个由两部分组成的程序 - 生产者 ( Producer ) 和消费者 ( Consumer ) 。
生产者将生成一个 0 到 100 的随机数,并将该数字放在 BlockingQueue 中。我们将创建 16 个线程用于生成随机数并使用 put() 方法阻塞,直到队列中有可用空间。
生产者代码
@Slf4j
public class NumbersProducer implements Runnable {
private final int poisonPill;
private final int poisonPillPerProducer;
private BlockingQueue<Integer> numbersQueue;
public NumbersProducer(BlockingQueue<Integer> numbersQueue,
int poisonPill,
int poisonPillPerProducer) {
this.numbersQueue = numbersQueue;
this.poisonPill = poisonPill;
this.poisonPillPerProducer = poisonPillPerProducer;
}
@Override
public void run() {
try {
generateNumbers();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private void generateNumbers() throws InterruptedException {
for (int i = 0; i < 100; i++) {
numbersQueue.put(ThreadLocalRandom.current().nextInt(100));
log.info("生产者-{}号,生成编号:{}", Thread.currentThread().getId());
}
for (int j = 0; j < poisonPillPerProducer; j++) {
numbersQueue.put(poisonPill);
log.info("生产者-{}号,放入第{}颗毒丸!", Thread.currentThread().getId(), j + 1);
}
}
}
消费者代码
@Slf4j
public class NumbersConsumer implements Runnable {
private final int poisonPill;
private BlockingQueue<Integer> queue;
public NumbersConsumer(BlockingQueue<Integer> queue, int poisonPill) {
this.queue = queue;
this.poisonPill = poisonPill;
}
@Override
public void run() {
try {
while (true) {
Integer number = queue.take();
if (number.equals(poisonPill)) {
return;
}
log.info("消费者-{}号,处理编号:{}", Thread.currentThread().getId(), number);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
主程序
public class Main {
public static void main(String[] args) {
int BOUND = 10;
int N_PRODUCERS = 16;
int N_CONSUMERS = Runtime.getRuntime().availableProcessors();
int poisonPill = Integer.MAX_VALUE;
int poisonPillPerProducer = N_CONSUMERS / N_PRODUCERS;
int mod = N_CONSUMERS % N_PRODUCERS;
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(BOUND);
// 创建生产者线程
for (int i = 1; i < N_PRODUCERS; i++) {
new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer)).start();
}
// 创建消费者线程
for (int j = 0; j < N_CONSUMERS; j++) {
new Thread(new NumbersConsumer(queue, poisonPill)).start();
}
// 最后一个生产者线程
new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer + mod)).start();
}
}
BlockingQueue 是使用具有容量的构造创建的。我们正在创造 4 个生产者和 N 个消费者。我们将我们的毒 ( poison ) 丸 ( pill )消息指定为 Integer.MAX_VALUE,因为我们的生产者在正常工作条件下永远不会发送这样的值。BlockingQueue 用于协调它们之间的工作。