如何理解 Java 中的阻塞队列:从基础到高级的深度解析

提到阻塞队列,许多人脑海中会浮现出 BlockingQueueArrayBlockingQueueLinkedBlockingQueueSynchronousQueue。尽管这些实现看起来复杂,实际上阻塞队列本身的概念相对简单,真正挑战在于内部的 AQS(Abstract Queuing Synchronizer)。如果你对阻塞队列感到陌生,希望下面的内容能帮助你从全新角度理解它。


文章目录


1、线程间通信

线程间通信是指多个线程对共享资源的操作和协调。在生产者-消费者模型中,生产者和消费者是不同种类的线程,他们对同一个资源(如队列)进行操作。生产者负责向队列中插入数据,消费者负责从队列中取出数据。

主要挑战在于如何在资源达到上限时让生产者等待,而在资源达到下限时让消费者等待。线程间的这种相互调度,就是线程间通信。

以现实生活为例。消费者和生产者就像两个线程,原本做着各自的事情,厂家管自己生产,消费者管自己买,一般情况下彼此互不影响。900 240

image-20240808015130401

但当物资到达某个临界点时,就需要根据供需关系适当作出调整。比如,当厂家做了一大堆东西,产能过剩时,应该暂停生产,扩大宣传,让消费者过来消费。

image-20240808015015146

同理,当消费者发现某个热销商品售罄,应该提醒厂家尽快生产。

image-20240808015356312

在上面的案例中,生产者和消费者是不同种类的线程,一个负责存入,另一个负责取出,且它们操作的是同一个资源。但最难的部分在于:资源到达上限时,生产者等待,消费者消费;资源达到下限时,生产者生产,消费者等待。

我们可以发现,原本互不打扰的两个线程之间开始了 “沟通”:

  • 生产者:做的商品太多了,应该扩大宣传,让大家来买。
  • 消费者:都卖完啦,应当提醒商家尽快补货。

这种线程间的相互调度,也就是线程间通信。


2、线程间通信的实现

实现线程间通信的方式有多种:

  • 轮询:生产者和消费者线程通过循环不断检查队列的状态。这种方法简单,但会消耗大量 CPU 资源,且无法保证原子性。
  • 等待唤醒机制(wait/notify):通过 waitnotify 机制,线程可以在队列为空或满时阻塞自己,当状态改变时由其他线程唤醒。synchronized 保证了线程的原子性,但 notify 可能导致线程竞争不均。
  • 等待唤醒机制(Condition):使用ReentrantLockCondition实现等待唤醒机制,可以更加精确地控制线程的阻塞和唤醒。通过创建不同的Condition实例,可以分别管理生产者和消费者的等待状态,避免了notify的随机唤醒问题。
2.1、轮询

设计理念:生产者和消费者线程通过循环不断检查队列的状态,队列为空时生产者才可插入数据,队列不为空时消费者才能取出数据,否则一律 sleep 等待。

image-20240808015823555

代码实现:

```java
import java.util.LinkedList;
import java.util.concurrent.TimeUnit;

/**
 * 自定义阻塞队列实现:轮询版本
 * 
 * @param  队列中存储的元素类型
 */
public class WhileQueue {
    // 用来存储元素的容器
    private final LinkedList queue = new LinkedList<>();
    // 队列的最大容量
    private final int MAX_SIZE = 1;

    /**
     * 将元素添加到队列中
     * 
     * @param resource 要插入的元素
     * @throws InterruptedException 如果当前线程被中断
     */
    public void put(T resource) throws InterruptedException {
        // 如果队列满了,生产者线程将进入轮询等待状态
        while (queue.size() >= MAX_SIZE) {
            System.out.println("生产者:队列已满,无法插入...");
            TimeUnit.MILLISECONDS.sleep(1000); // 线程等待1秒钟再重试
        }
        // 插入元素到队列的前面
        System.out.println("生产者:插入" + resource + "!!!");
        queue.addFirst(resource);
    }

    /**
     * 从队列中取出元素
     * 
     * @throws InterruptedException 如果当前线程被中断
     */
    public void take() throws InterruptedException {
        // 如果队列为空,消费者线程将进入轮询等待状态
        while (queue.size() <= 0) {
            System.out.println("消费者:队列为空,无法取出...");
            TimeUnit.MILLISECONDS.sleep(1000); // 线程等待1秒钟再重试
        }
        // 从队列的末尾取出元素
        System.out.println("消费者:取出消息!!!");
        queue.removeLast();
        TimeUnit.MILLISECONDS.sleep(5000); // 模拟消费操作需要时间
    }
}

```

测试:

```java
/**
 * 测试类:创建生产者和消费者线程来测试WhileQueue的功能
 */
public class Test {
    public static void main(String[] args) {
        // 创建一个WhileQueue实例
        WhileQueue queue = new WhileQueue<>();

        // 创建并启动生产者线程
        new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 100; i++) {
                    try {
                        queue.put("消息" + i); // 插入消息到队列
                    } catch (InterruptedException e) {
                        e.printStackTrace(); // 捕获并打印中断异常
                    }
                }
            }
        }).start();

        // 创建并启动消费者线程
        new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 100; i++) {
                    try {
                        queue.take(); // 从队列中取出消息
                    } catch (InterruptedException e) {
                        e.printStackTrace(); // 捕获并打印中断异常
                    }
                }
            }
        }).start();
    }
}

```

由于设定了队列最多只能存1个消息,所以只有当队列为空时,生产者才能插入数据。这是最简单的线程间通信:多个线程不断轮询共享资源,通过共享资源的状态判断自己下一步该做什么。

但上面的实现方式存在一些缺点:

  • 轮询的方式太耗费 CPU 资源,如果线程过多,比如几百上千个线程同时在那轮询,会给 CPU 带来较大负担
  • 无法保证原子性(代码里没有演示,但理论上确实如此,如果生产者的操作非原子性,消费者极可能获取到脏数据)
2.2、等待唤醒机制(wait/notify)

相对而言,等待唤醒机制则要优雅得多,底层维护线程队列,线程可以在队列为空或满时阻塞自己,当状态改变时由其他线程唤醒。synchronized 保证了线程的原子性,同时避免了过多线程同时自旋造成的 CPU 资源浪费,颇有点用空间换时间的味道。

当一个生产者线程无法插入数据时,就让它在队列里休眠(阻塞),此时生产者线程会释放 CPU 资源,等到消费者抢到 CPU 执行权并取出数据后,再由消费者唤醒生产者继续生产。

Java 有多种方式可以实现等待唤醒机制,最经典的就是通过 waitnotify 的方式:

```java
import java.util.LinkedList;

/**
 * 自定义阻塞队列实现:使用 wait/notify
 * 
 * @param  队列中存储的元素类型
 */
public class WaitNotifyQueue {
    // 用来存储元素的容器
    private final LinkedList queue = new LinkedList<>();
    // 队列的最大容量
    private final int MAX_SIZE = 1;

    /**
     * 将元素添加到队列中
     * 
     * @param resource 要插入的元素
     * @throws InterruptedException 如果当前线程被中断
     */
    public synchronized void put(T resource) throws InterruptedException {
        // 当队列满时,生产者线程进入等待状态
        while (queue.size() >= MAX_SIZE) {
            System.out.println("生产者:队列已满,无法插入...");
            this.wait(); // 释放锁,并进入等待状态
        }
        // 插入元素到队列的前面
        System.out.println("生产者:插入" + resource + "!!!");
        queue.addFirst(resource);
        this.notify(); // 唤醒等待的消费者线程
    }

    /**
     * 从队列中取出元素
     * 
     * @throws InterruptedException 如果当前线程被中断
     */
    public synchronized void take() throws InterruptedException {
        // 当队列为空时,消费者线程进入等待状态
        while (queue.size() <= 0) {
            System.out.println("消费者:队列为空,无法取出...");
            this.wait(); // 释放锁,并进入等待状态
        }
        // 从队列的末尾取出元素
        System.out.println("消费者:取出消息!!!");
        queue.removeLast();
        this.notify(); // 唤醒等待的生产者线程
    }
}


```

基于 waitnotify 的阻塞队列。其原理是通过同步机制和线程通信来处理生产者-消费者问题。在 put 方法中,生产者线程检查队列是否已满,如果已满,则调用 wait 使自己进入等待状态,释放锁,直到队列有空位。生产者在插入元素后调用 notify 唤醒可能等待的消费者线程。在 take 方法中,消费者线程检查队列是否为空,如果为空,则调用 wait 使自己进入等待状态,释放锁,直到队列有新元素。消费者在取出元素后调用 notify 唤醒可能等待的生产者线程。这种机制避免了忙等待,通过有效的线程通信提高了资源利用效率。

Ps:使用 notifyAll 在某些情况下可能更合适,尤其是当有多个生产者和消费者线程时。notifyAll 会唤醒所有等待的线程,而不仅仅是一个线程,这样可以保证系统中的所有线程都有机会被唤醒,避免了因线程唤醒不充分导致的潜在问题。

2.3、等待唤醒机制(Condition)

等待唤醒机制(wait/notify)版本的缺点是随机唤醒容易出现"己方唤醒己方",最终导致全部线程阻塞的乌龙事件,虽然 wait/notifyAll 能解决这个问题,但唤醒全部线程又不够精确,会造成无谓的线程竞争(实际只需要唤醒敌方线程即可)。

因此使用ReentrantLockCondition实现等待唤醒机制,可以更加精确地控制线程的阻塞和唤醒。通过创建不同的Condition实例,可以分别管理生产者和消费者的等待状态,避免了notify的随机唤醒问题。

作为改进版,可以使用 ReentrantLockCondition 替代 synchronizedwait/notify

```java
import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionQueue {
    // 容器,用来装东西
    private final LinkedList queue = new LinkedList<>();
    private final int CAPACITY = 10; // 队列容量

    // 显式锁(相对地,synchronized锁被称为隐式锁)
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition producerCondition = lock.newCondition();
    private final Condition consumerCondition = lock.newCondition();

    public void put(T resource) throws InterruptedException {
        lock.lock();
        try {
            while (queue.size() >= CAPACITY) {
                // 队列满了,不能再塞东西了,等待消费者取出数据
                System.out.println("生产者:队列已满,无法插入...");
                // 生产者阻塞
                producerCondition.await();
            }
            System.out.println("生产者:插入" + resource + "!!!");
            queue.addFirst(resource);
            // 生产完毕,唤醒消费者
            consumerCondition.signal();
        } finally {
            lock.unlock();
        }
    }

    public void take() throws InterruptedException {
        lock.lock();
        try {
            while (queue.size() <= 0) {
                // 队列空了,不能再取东西,等待生产者插入数据
                System.out.println("消费者:队列为空,无法取出...");
                // 消费者阻塞
                consumerCondition.await();
            }
            System.out.println("消费者:取出消息!!!");
            queue.removeLast();
            // 消费完毕,唤醒生产者
            producerCondition.signal();
        } finally {
            lock.unlock();
        }
    }
}


```

如何理解 Condition 呢?可以认为 lock.newCondition() 创建了一个队列,调用 producerCondition.await() 会把生产者线程放入生产者的等待队列中,当消费者调用producerCondition.signal() 时会唤醒从生产者的等待队列中唤醒一个生产者线程出来工作。

也就是说,ReentrantLockCondition 通过拆分线程等待队列,让线程的等待唤醒更加精确了,想唤醒哪一方就唤醒哪一方。


3、自定义阻塞队列

基于以上机制,我们可以自定义实现一个简单的阻塞队列。以下代码示例展示了一个基于 wait/notifyAll 实现的阻塞队列:

```java
public class BlockingQueue {
    private final LinkedList queue = new LinkedList<>();
    private int MAX_SIZE = 1;
    private int remainCount = 0;

    public BlockingQueue(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("size最小为1");
        }
        this.MAX_SIZE = capacity;
    }

    public synchronized void put(T resource) throws InterruptedException {
        while (queue.size() >= MAX_SIZE) {
            this.wait();
        }
        queue.addFirst(resource);
        remainCount++;
        this.notifyAll();
    }

    public synchronized T take() throws InterruptedException {
        while (queue.size() <= 0) {
            this.wait();
        }
        T resource = queue.removeLast();
        remainCount--;
        this.notifyAll();
        return resource;
    }
}

```

4、Java 中的 BlockingQueue

BlockingQueue 是 Java 并发包(java.util.concurrent)中的一个接口,继承自 Queue 接口。它提供了额外的阻塞操作,例如在队列为空时等待元素变得可用,或在队列已满时等待空间变得可用。

BlockingQueue 阻塞队列在 Java 中的主要实现有三个:

  1. ArrayBlockingQueue: 基于数组实现的有界阻塞队列,必须指定固定容量,支持可选的公平性策略。
  2. LinkedBlockingQueue: 基于链表实现的阻塞队列,默认无界或指定容量,有较高的插入和删除性能。
  3. SynchronousQueue: 一个没有内部容量的队列,每个插入操作必须等待一个对应的删除操作,反之亦然,适用于直接交换数据的场景。

更多实现可以参考:Java 并发集合:阻塞队列集合介绍

文章整理自互联网,只做测试使用。发布者:Lomu,转转请注明出处:https://www.it1024doc.com/6746.html

(0)
LomuLomu
上一篇 2025 年 1 月 17 日 下午10:33
下一篇 2025 年 1 月 17 日 下午11:03

相关推荐

  • Nginx HttpHeader增加几个关键的安全选项

    在面对德勤等专业渗透测试(Pentest)的场景时,为了确保网站的安全性并顺利通过严格的安全审查,对这些安全头部配置进行精细化和专业化的调整是至关重要的。 以下是对每个选项的详细建议以及设置值的说明: 1. Strict-Transport-Security (HSTS) 这一策略确保所有通信都通过HTTPS进行,并防止降级攻击。 推荐值: add_head…

    未分类 2024 年 12 月 24 日
    32200
  • Java MDC技术详解:实现高效日志链路追踪的实战指南

    一、MDC技术概述 MDC(映射诊断上下文)是SLF4J日志框架提供的一项线程级数据存储功能。作为线程安全的容器,它能够在特定线程范围内保存多组键值数据,并自动将这些信息嵌入日志记录中,从而实现日志信息的上下文关联。 二、MDC的核心价值 功能 描述 应用实例 请求追踪 完整记录请求处理路径 微服务调用链路分析 参数传递 跨方法共享通用数据 机构编码、用户标…

    未分类 2025 年 5 月 19 日
    7300
  • Java 同步锁性能的最佳实践:从理论到实践的完整指南

    目录 一、同步锁性能分析 (一)性能验证说明 1. 使用同步锁的代码示例 2. 不使用同步锁的代码示例 3. 结果与讨论 (二)案例初步优化分析说明 1. 使用AtomicInteger原子类尝试优化分析 2. 对AtomicInteger原子类进一步优化 3. 结论说明(LongAdder原理理解体会) 二、回顾Java锁优化 (一)synchronize…

    2025 年 1 月 11 日
    43500
  • Java刷题训练第一期

    个人主页:手握风云 专栏:Java刷题训练营 1. 字符转ASCII码 问题描述:BoBo教KiKi字符常量或字符变量表示的字符在内存中以ASCII码形式存储。BoBo出了一个问题给KiKi,输入一个字符,输出该字符相应的ASCII码。 输入描述:一行,一个字符。 输出描述:一行,输出输入字符对应的ASCII码。 算法分析:在Java当中,没有针对字符的输入…

    2024 年 12 月 31 日
    27100
  • Java刷题常见的集合类,各种函数的使用以及常见的类型转化等等

    目录 前言 集合类 ArrayList 1. 创建和初始化 ArrayList 2.添加元素 add 3.获取元素 get 4.删除元素 remove 5.检查元素 6.遍历 ArrayList LinkedList Stack 1. 创建Stack对象 2. 压入元素 (push) 3. 弹出元素 (pop) 4. 查看栈顶元素 (peek) 5. 检查栈…

    2025 年 1 月 6 日
    27200

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信