问小白 wenxiaobai
资讯
历史
科技
环境与自然
成长
游戏
财经
文学与艺术
美食
健康
家居
文化
情感
汽车
三农
军事
旅行
运动
教育
生活
星座命理

什么是BlockingQueue?

创作时间:
作者:
@小白创作中心

什么是BlockingQueue?

引用
1
来源
1.
https://www.coonote.com/java-note/blockingqueue.html

BlockingQueue是Java并发编程中的重要工具类,用于解决生产者-消费者问题。本文将详细介绍BlockingQueue的基本概念、不同类型、工作原理以及实际应用示例。

一、队列类型

  1. 无限队列(unbounded queue):几乎可以无限增长
  2. 有限队列(bounded queue):定义了最大容量

二、队列数据结构

队列实质就是一种存储数据的结构,通常用链表或者数组实现。一般而言队列具备FIFO(先进先出)的特性,当然也有双端队列(Deque)和优先级队列。

主要操作包括入队(EnQueue)与出队(Dequeue)。

三、常见的4种阻塞队列

  1. ArrayBlockingQueue:由数组支持的有界队列
  2. LinkedBlockingQueue:由链接节点支持的可选有界队列
  3. PriorityBlockingQueue:由优先级堆支持的无界优先级队列
  4. DelayQueue:由优先级堆支持的、基于时间的调度队列

1. ArrayBlockingQueue

队列基于数组实现,容量大小在创建ArrayBlockingQueue对象时已定义好。

队列创建:

BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>();

应用场景:在线程池中有比较多的应用,生产者消费者场景

工作原理:基于ReentrantLock保证线程安全,根据Condition实现队列满时的阻塞

2. LinkedBlockingQueue

是一个基于链表的无界队列(理论上有界)。

BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>();

上面这段代码中,blockingQueue 的容量将设置为 Integer.MAX_VALUE。向无限队列添加元素的所有操作都将永远不会阻塞,但需要注意内存可能会填满,然后就会得到一个 OutOfMemory 异常。

3. DelayQueue

由优先级堆支持的、基于时间的调度队列,内部基于无界队列PriorityQueue实现,而无界队列基于数组的扩容实现。

队列创建:

BlockingQueue<String> blockingQueue = new DelayQueue();

要求:入队的对象必须要实现Delayed接口,而Delayed集成自Comparable接口

应用场景:电影票

工作原理:队列内部会根据时间优先级进行排序。延迟类线程池周期执行。

四、BlockingQueue API

BlockingQueue 接口的所有方法可以分为两大类:负责向队列添加元素的方法和检索这些元素的方法。在队列满/空的情况下,来自这两个组的每个方法的行为都不同。

添加元素

方法
说明
add()
如果插入成功则返回 true,否则抛出 IllegalStateException 异常
put()
将指定的元素插入队列,如果队列满了,那么会阻塞直到有空间插入
offer()
如果插入成功则返回 true,否则返回 false
offer(E e, long timeout, TimeUnit unit)
尝试将元素插入队列,如果队列已满,那么会阻塞直到有空间插入

检索元素

方法
说明
take()
获取队列的头部元素并将其删除,如果队列为空,则阻塞并等待元素变为可用
poll(long timeout, TimeUnit unit)
检索并删除队列的头部,如有必要,等待指定的等待时间以使元素可用,如果超时,则返回 null

五、多线程生产者-消费者示例

接下来我们创建一个由两部分组成的程序 - 生产者 ( Producer ) 和消费者 ( Consumer ) 。

生产者将生成一个 0 到 100 的随机数,并将该数字放在 BlockingQueue 中。我们将创建 16 个线程用于生成随机数并使用 put() 方法阻塞,直到队列中有可用空间。

生产者代码

@Slf4j
public class NumbersProducer implements Runnable {
    private final int poisonPill;
    private final int poisonPillPerProducer;
    private BlockingQueue<Integer> numbersQueue;

    public NumbersProducer(BlockingQueue<Integer> numbersQueue,
                           int poisonPill,
                           int poisonPillPerProducer) {
        this.numbersQueue = numbersQueue;
        this.poisonPill = poisonPill;
        this.poisonPillPerProducer = poisonPillPerProducer;
    }

    @Override
    public void run() {
        try {
            generateNumbers();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void generateNumbers() throws InterruptedException {
        for (int i = 0; i < 100; i++) {
            numbersQueue.put(ThreadLocalRandom.current().nextInt(100));
            log.info("生产者-{}号,生成编号:{}", Thread.currentThread().getId());
        }
        for (int j = 0; j < poisonPillPerProducer; j++) {
            numbersQueue.put(poisonPill);
            log.info("生产者-{}号,放入第{}颗毒丸!", Thread.currentThread().getId(), j + 1);
        }
    }
}

消费者代码

@Slf4j
public class NumbersConsumer implements Runnable {
    private final int poisonPill;
    private BlockingQueue<Integer> queue;

    public NumbersConsumer(BlockingQueue<Integer> queue, int poisonPill) {
        this.queue = queue;
        this.poisonPill = poisonPill;
    }

    @Override
    public void run() {
        try {
            while (true) {
                Integer number = queue.take();
                if (number.equals(poisonPill)) {
                    return;
                }
                log.info("消费者-{}号,处理编号:{}", Thread.currentThread().getId(), number);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

主程序

public class Main {
    public static void main(String[] args) {
        int BOUND = 10;
        int N_PRODUCERS = 16;
        int N_CONSUMERS = Runtime.getRuntime().availableProcessors();
        int poisonPill = Integer.MAX_VALUE;
        int poisonPillPerProducer = N_CONSUMERS / N_PRODUCERS;
        int mod = N_CONSUMERS % N_PRODUCERS;
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(BOUND);

        // 创建生产者线程
        for (int i = 1; i < N_PRODUCERS; i++) {
            new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer)).start();
        }

        // 创建消费者线程
        for (int j = 0; j < N_CONSUMERS; j++) {
            new Thread(new NumbersConsumer(queue, poisonPill)).start();
        }

        // 最后一个生产者线程
        new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer + mod)).start();
    }
}

BlockingQueue 是使用具有容量的构造创建的。我们正在创造 4 个生产者和 N 个消费者。我们将我们的毒 ( poison ) 丸 ( pill )消息指定为 Integer.MAX_VALUE,因为我们的生产者在正常工作条件下永远不会发送这样的值。BlockingQueue 用于协调它们之间的工作。

© 2023 北京元石科技有限公司 ◎ 京公网安备 11010802042949号