如何理解 Java 中的阻塞队列:从基础到高级的深度解析

提到阻塞队列,许多人脑海中会浮现出 BlockingQueueArrayBlockingQueueLinkedBlockingQueueSynchronousQueue。尽管这些实现看起来复杂,实际上阻塞队列本身的概念相对简单,真正挑战在于内部的 AQS(Abstract Queuing Synchronizer)。如果你对阻塞队列感到陌生,希望下面的内容能帮助你从全新角度理解它。


文章目录


1、线程间通信

线程间通信是指多个线程对共享资源的操作和协调。在生产者-消费者模型中,生产者和消费者是不同种类的线程,他们对同一个资源(如队列)进行操作。生产者负责向队列中插入数据,消费者负责从队列中取出数据。

主要挑战在于如何在资源达到上限时让生产者等待,而在资源达到下限时让消费者等待。线程间的这种相互调度,就是线程间通信。

以现实生活为例。消费者和生产者就像两个线程,原本做着各自的事情,厂家管自己生产,消费者管自己买,一般情况下彼此互不影响。900 240

image-20240808015130401

但当物资到达某个临界点时,就需要根据供需关系适当作出调整。比如,当厂家做了一大堆东西,产能过剩时,应该暂停生产,扩大宣传,让消费者过来消费。

image-20240808015015146

同理,当消费者发现某个热销商品售罄,应该提醒厂家尽快生产。

image-20240808015356312

在上面的案例中,生产者和消费者是不同种类的线程,一个负责存入,另一个负责取出,且它们操作的是同一个资源。但最难的部分在于:资源到达上限时,生产者等待,消费者消费;资源达到下限时,生产者生产,消费者等待。

我们可以发现,原本互不打扰的两个线程之间开始了 “沟通”:

  • 生产者:做的商品太多了,应该扩大宣传,让大家来买。
  • 消费者:都卖完啦,应当提醒商家尽快补货。

这种线程间的相互调度,也就是线程间通信。


2、线程间通信的实现

实现线程间通信的方式有多种:

  • 轮询:生产者和消费者线程通过循环不断检查队列的状态。这种方法简单,但会消耗大量 CPU 资源,且无法保证原子性。
  • 等待唤醒机制(wait/notify):通过 waitnotify 机制,线程可以在队列为空或满时阻塞自己,当状态改变时由其他线程唤醒。synchronized 保证了线程的原子性,但 notify 可能导致线程竞争不均。
  • 等待唤醒机制(Condition):使用ReentrantLockCondition实现等待唤醒机制,可以更加精确地控制线程的阻塞和唤醒。通过创建不同的Condition实例,可以分别管理生产者和消费者的等待状态,避免了notify的随机唤醒问题。
2.1、轮询

设计理念:生产者和消费者线程通过循环不断检查队列的状态,队列为空时生产者才可插入数据,队列不为空时消费者才能取出数据,否则一律 sleep 等待。

image-20240808015823555

代码实现:

```java
import java.util.LinkedList;
import java.util.concurrent.TimeUnit;

/**
 * 自定义阻塞队列实现:轮询版本
 * 
 * @param  队列中存储的元素类型
 */
public class WhileQueue {
    // 用来存储元素的容器
    private final LinkedList queue = new LinkedList<>();
    // 队列的最大容量
    private final int MAX_SIZE = 1;

    /**
     * 将元素添加到队列中
     * 
     * @param resource 要插入的元素
     * @throws InterruptedException 如果当前线程被中断
     */
    public void put(T resource) throws InterruptedException {
        // 如果队列满了,生产者线程将进入轮询等待状态
        while (queue.size() >= MAX_SIZE) {
            System.out.println("生产者:队列已满,无法插入...");
            TimeUnit.MILLISECONDS.sleep(1000); // 线程等待1秒钟再重试
        }
        // 插入元素到队列的前面
        System.out.println("生产者:插入" + resource + "!!!");
        queue.addFirst(resource);
    }

    /**
     * 从队列中取出元素
     * 
     * @throws InterruptedException 如果当前线程被中断
     */
    public void take() throws InterruptedException {
        // 如果队列为空,消费者线程将进入轮询等待状态
        while (queue.size() <= 0) {
            System.out.println("消费者:队列为空,无法取出...");
            TimeUnit.MILLISECONDS.sleep(1000); // 线程等待1秒钟再重试
        }
        // 从队列的末尾取出元素
        System.out.println("消费者:取出消息!!!");
        queue.removeLast();
        TimeUnit.MILLISECONDS.sleep(5000); // 模拟消费操作需要时间
    }
}

```

测试:

```java
/**
 * 测试类:创建生产者和消费者线程来测试WhileQueue的功能
 */
public class Test {
    public static void main(String[] args) {
        // 创建一个WhileQueue实例
        WhileQueue queue = new WhileQueue<>();

        // 创建并启动生产者线程
        new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 100; i++) {
                    try {
                        queue.put("消息" + i); // 插入消息到队列
                    } catch (InterruptedException e) {
                        e.printStackTrace(); // 捕获并打印中断异常
                    }
                }
            }
        }).start();

        // 创建并启动消费者线程
        new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 100; i++) {
                    try {
                        queue.take(); // 从队列中取出消息
                    } catch (InterruptedException e) {
                        e.printStackTrace(); // 捕获并打印中断异常
                    }
                }
            }
        }).start();
    }
}

```

由于设定了队列最多只能存1个消息,所以只有当队列为空时,生产者才能插入数据。这是最简单的线程间通信:多个线程不断轮询共享资源,通过共享资源的状态判断自己下一步该做什么。

但上面的实现方式存在一些缺点:

  • 轮询的方式太耗费 CPU 资源,如果线程过多,比如几百上千个线程同时在那轮询,会给 CPU 带来较大负担
  • 无法保证原子性(代码里没有演示,但理论上确实如此,如果生产者的操作非原子性,消费者极可能获取到脏数据)
2.2、等待唤醒机制(wait/notify)

相对而言,等待唤醒机制则要优雅得多,底层维护线程队列,线程可以在队列为空或满时阻塞自己,当状态改变时由其他线程唤醒。synchronized 保证了线程的原子性,同时避免了过多线程同时自旋造成的 CPU 资源浪费,颇有点用空间换时间的味道。

当一个生产者线程无法插入数据时,就让它在队列里休眠(阻塞),此时生产者线程会释放 CPU 资源,等到消费者抢到 CPU 执行权并取出数据后,再由消费者唤醒生产者继续生产。

Java 有多种方式可以实现等待唤醒机制,最经典的就是通过 waitnotify 的方式:

```java
import java.util.LinkedList;

/**
 * 自定义阻塞队列实现:使用 wait/notify
 * 
 * @param  队列中存储的元素类型
 */
public class WaitNotifyQueue {
    // 用来存储元素的容器
    private final LinkedList queue = new LinkedList<>();
    // 队列的最大容量
    private final int MAX_SIZE = 1;

    /**
     * 将元素添加到队列中
     * 
     * @param resource 要插入的元素
     * @throws InterruptedException 如果当前线程被中断
     */
    public synchronized void put(T resource) throws InterruptedException {
        // 当队列满时,生产者线程进入等待状态
        while (queue.size() >= MAX_SIZE) {
            System.out.println("生产者:队列已满,无法插入...");
            this.wait(); // 释放锁,并进入等待状态
        }
        // 插入元素到队列的前面
        System.out.println("生产者:插入" + resource + "!!!");
        queue.addFirst(resource);
        this.notify(); // 唤醒等待的消费者线程
    }

    /**
     * 从队列中取出元素
     * 
     * @throws InterruptedException 如果当前线程被中断
     */
    public synchronized void take() throws InterruptedException {
        // 当队列为空时,消费者线程进入等待状态
        while (queue.size() <= 0) {
            System.out.println("消费者:队列为空,无法取出...");
            this.wait(); // 释放锁,并进入等待状态
        }
        // 从队列的末尾取出元素
        System.out.println("消费者:取出消息!!!");
        queue.removeLast();
        this.notify(); // 唤醒等待的生产者线程
    }
}


```

基于 waitnotify 的阻塞队列。其原理是通过同步机制和线程通信来处理生产者-消费者问题。在 put 方法中,生产者线程检查队列是否已满,如果已满,则调用 wait 使自己进入等待状态,释放锁,直到队列有空位。生产者在插入元素后调用 notify 唤醒可能等待的消费者线程。在 take 方法中,消费者线程检查队列是否为空,如果为空,则调用 wait 使自己进入等待状态,释放锁,直到队列有新元素。消费者在取出元素后调用 notify 唤醒可能等待的生产者线程。这种机制避免了忙等待,通过有效的线程通信提高了资源利用效率。

Ps:使用 notifyAll 在某些情况下可能更合适,尤其是当有多个生产者和消费者线程时。notifyAll 会唤醒所有等待的线程,而不仅仅是一个线程,这样可以保证系统中的所有线程都有机会被唤醒,避免了因线程唤醒不充分导致的潜在问题。

2.3、等待唤醒机制(Condition)

等待唤醒机制(wait/notify)版本的缺点是随机唤醒容易出现"己方唤醒己方",最终导致全部线程阻塞的乌龙事件,虽然 wait/notifyAll 能解决这个问题,但唤醒全部线程又不够精确,会造成无谓的线程竞争(实际只需要唤醒敌方线程即可)。

因此使用ReentrantLockCondition实现等待唤醒机制,可以更加精确地控制线程的阻塞和唤醒。通过创建不同的Condition实例,可以分别管理生产者和消费者的等待状态,避免了notify的随机唤醒问题。

作为改进版,可以使用 ReentrantLockCondition 替代 synchronizedwait/notify

```java
import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionQueue {
    // 容器,用来装东西
    private final LinkedList queue = new LinkedList<>();
    private final int CAPACITY = 10; // 队列容量

    // 显式锁(相对地,synchronized锁被称为隐式锁)
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition producerCondition = lock.newCondition();
    private final Condition consumerCondition = lock.newCondition();

    public void put(T resource) throws InterruptedException {
        lock.lock();
        try {
            while (queue.size() >= CAPACITY) {
                // 队列满了,不能再塞东西了,等待消费者取出数据
                System.out.println("生产者:队列已满,无法插入...");
                // 生产者阻塞
                producerCondition.await();
            }
            System.out.println("生产者:插入" + resource + "!!!");
            queue.addFirst(resource);
            // 生产完毕,唤醒消费者
            consumerCondition.signal();
        } finally {
            lock.unlock();
        }
    }

    public void take() throws InterruptedException {
        lock.lock();
        try {
            while (queue.size() <= 0) {
                // 队列空了,不能再取东西,等待生产者插入数据
                System.out.println("消费者:队列为空,无法取出...");
                // 消费者阻塞
                consumerCondition.await();
            }
            System.out.println("消费者:取出消息!!!");
            queue.removeLast();
            // 消费完毕,唤醒生产者
            producerCondition.signal();
        } finally {
            lock.unlock();
        }
    }
}


```

如何理解 Condition 呢?可以认为 lock.newCondition() 创建了一个队列,调用 producerCondition.await() 会把生产者线程放入生产者的等待队列中,当消费者调用producerCondition.signal() 时会唤醒从生产者的等待队列中唤醒一个生产者线程出来工作。

也就是说,ReentrantLockCondition 通过拆分线程等待队列,让线程的等待唤醒更加精确了,想唤醒哪一方就唤醒哪一方。


3、自定义阻塞队列

基于以上机制,我们可以自定义实现一个简单的阻塞队列。以下代码示例展示了一个基于 wait/notifyAll 实现的阻塞队列:

```java
public class BlockingQueue {
    private final LinkedList queue = new LinkedList<>();
    private int MAX_SIZE = 1;
    private int remainCount = 0;

    public BlockingQueue(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("size最小为1");
        }
        this.MAX_SIZE = capacity;
    }

    public synchronized void put(T resource) throws InterruptedException {
        while (queue.size() >= MAX_SIZE) {
            this.wait();
        }
        queue.addFirst(resource);
        remainCount++;
        this.notifyAll();
    }

    public synchronized T take() throws InterruptedException {
        while (queue.size() <= 0) {
            this.wait();
        }
        T resource = queue.removeLast();
        remainCount--;
        this.notifyAll();
        return resource;
    }
}

```

4、Java 中的 BlockingQueue

BlockingQueue 是 Java 并发包(java.util.concurrent)中的一个接口,继承自 Queue 接口。它提供了额外的阻塞操作,例如在队列为空时等待元素变得可用,或在队列已满时等待空间变得可用。

BlockingQueue 阻塞队列在 Java 中的主要实现有三个:

  1. ArrayBlockingQueue: 基于数组实现的有界阻塞队列,必须指定固定容量,支持可选的公平性策略。
  2. LinkedBlockingQueue: 基于链表实现的阻塞队列,默认无界或指定容量,有较高的插入和删除性能。
  3. SynchronousQueue: 一个没有内部容量的队列,每个插入操作必须等待一个对应的删除操作,反之亦然,适用于直接交换数据的场景。

更多实现可以参考:Java 并发集合:阻塞队列集合介绍

文章整理自互联网,只做测试使用。发布者:Lomu,转转请注明出处:https://www.it1024doc.com/6746.html

(0)
LomuLomu
上一篇 2025 年 1 月 17 日 下午10:33
下一篇 2025 年 1 月 17 日 下午11:03

相关推荐

  • 『玩转Streamlit』–集成定时任务

    学习了Streamlit了之后,可以尝试给自己的命令行小工具加一个简单的界面。 本篇总结了我改造自己的数据采集的工具时的一些经验。 1. 概要 与常规的程序相比,数据采集任务的特点很明显,比如它一般都是I/O密集型程序,涉及大量网络请求或文件读写,耗费的时间比较长;而且往往是按照一定的时间间隔周期性地执行。 这样的程序对交互性要求不高,所以我之前都是用命令行…

    2025 年 1 月 16 日
    22700
  • 《重构:改善既有代码的设计(第2版)》PDF、EPUB免费下载

    电子版仅供预览,下载后24小时内务必删除,支持正版,喜欢的请购买正版书籍 点击原文去下载 书籍信息 作者: [美] Martin Fowler出版社: 人民邮电出版社出品方: 异步图书副标题: 改善既有代码的设计原作名: Refactoring: Improving the Design of Existing Code,Second Edition译者: …

    2025 年 1 月 13 日
    13300
  • 【Java 学习】详细讲解—包和导包、Scanner类、输入源

    1. 包 1.1 包的概念 想象一下,你和你的同学们来自不同的家庭,每个家庭都有自己的生活方式和空间。如果这些家庭都住在同一个屋檐下,那么个人的习惯和空间就会相互干扰。同理,在软件开发中,不同的程序也需要有各自的“空间”以保持独立性,这就是包(Package)的作用。包可以被看作是一组文件夹,它们允许在不同的文件夹中存在同名的文件,从而实现隔离。 1.2 包…

    未分类 2024 年 12 月 27 日
    13900
  • 促销系统:促销业务详解

    大家好,我是汤师爷~ 促销活动的核心价值在于利用价格优势吸引贪便宜的消费者。许多用户会积极寻找各类优惠,看到红包或折扣时容易产生购买冲动。 对商家而言,促销是快速清理库存的有效工具。特别是对于季节性商品或临期产品,促销能加快出货速度。同时,促销也能提升销售额,当顾客对商品感兴趣,但因价格犹豫不决时,适当的优惠往往能促使其下单购买。 促销业务概述 什么是促销?…

    2025 年 1 月 10 日
    17100
  • Intellij IDEA 永久激活破解问题汇总

    IDEA最新永久激活破解教程:https://www.it1024doc.com/4100.html 1. 输入激活码后提示:“key is invalid”(无效key),也就是激活码无效 出现上面的情况后,一般都是工具没有生效,造成激活失败,请仔细看教程,是否遗漏了什么步骤。也有小伙伴在尝试重启自己电脑后,激活成功。总之,没有激活成功问题比较多,大家可以…

    未分类 2024 年 6 月 22 日
    26100

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信