Java 之生产者和消费者模式详解

1. 生产者和消费者模式概述

生产者和消费者模式是一种经典的并发设计模式,用于解决生产者和消费者之间数据共享问题。它主要涉及三个角色:

  • 生产者 (Producer) :负责生产数据,例如将数据写入文件、读取数据库数据等。

  • 消费者 (Consumer) :负责消费数据,例如将数据从文件中读取出来、对数据进行处理等。

  • 缓冲区 (Buffer) :用于存放生产者生产的数据,消费者可以从缓冲区中获取数据。

2. 生产者和消费者案例

案例描述:模拟一个仓库管理系统,生产者负责生产商品,消费者负责消费商品。

代码示例:

```java
// 商品类
class Product {
    private String name;
    private int count;

    public Product(String name, int count) {
        this.name = name;
        this.count = count;
    }

    public String getName() {
        return name;
    }

    public int getCount() {
        return count;
    }
}

// 生产者
class Producer implements Runnable {
    private final List buffer; // 缓冲区
    private final int capacity; // 缓冲区容量

    public Producer(List buffer, int capacity) {
        this.buffer = buffer;
        this.capacity = capacity;
    }

    @Override
    public void run() {
        while (true) {
            // 生产商品
            Product product = new Product("商品", 10);
            // 等待缓冲区有空位
            synchronized (buffer) { // 使用同步块保证线程安全
                while (buffer.size() == capacity) {
                    try {
                        buffer.wait(); // 等待缓冲区有空位
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                // 生产商品入库
                buffer.add(product);
                System.out.println("生产者生产了商品:" + product.getName() + ", 库存数量:" + product.getCount());
                buffer.notifyAll(); // 唤醒所有等待的消费者线程
            }
            try {
                Thread.sleep(1000); // 模拟生产时间
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

// 消费者
class Consumer implements Runnable {
    private final List buffer; // 缓冲区

    public Consumer(List buffer) {
        this.buffer = buffer;
    }

    @Override
    public void run() {
        while (true) {
            // 等待缓冲区有商品
            synchronized (buffer) { // 使用同步块保证线程安全
                while (buffer.isEmpty()) {
                    try {
                        buffer.wait(); // 等待缓冲区有商品
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                // 消费商品
                Product product = buffer.remove(0);
                System.out.println("消费者消费了商品:" + product.getName() + ", 库存数量:" + product.getCount());
                buffer.notifyAll(); // 唤醒所有等待的生产者线程
            }
            try {
                Thread.sleep(1000); // 模拟消费时间
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

public class ProducerConsumer {
    public static void main(String[] args) {
        // 初始化缓冲区
        List buffer = new ArrayList<>();
        // 设置缓冲区容量
        int capacity = 5;
        // 创建生产者和消费者线程
        Producer producer = new Producer(buffer, capacity);
        Consumer consumer = new Consumer(buffer);
        // 启动线程
        new Thread(producer).start();
        new Thread(consumer).start();
    }
}
```

解释:

  • synchronized 块: synchronized 块用于保证线程安全,确保同一时刻只有一个线程可以访问共享资源 buffer。

  • wait() 方法: 当生产者发现缓冲区已满,或消费者发现缓冲区为空时,它们会调用 wait() 方法进入等待状态。 进入等待状态的线程会释放锁,允许其他线程访问共享资源。

  • notifyAll() 方法: 当生产者生产完商品,或消费者消费完商品后,它们会调用 notifyAll() 方法唤醒所有在 buffer 上等待的线程。 唤醒后,等待线程会尝试再次获取锁,并继续执行。

3. 生产者和消费者案例优化

问题: 上面的案例中,生产者和消费者都使用了 wait()notifyAll() 方法进行同步,可能会导致虚假唤醒问题。 当线程被 notifyAll() 唤醒后,它并不能保证唤醒的原因是缓冲区状态发生了变化。

解决方案:使用条件变量来解决虚假唤醒问题。 条件变量允许线程等待特定条件的满足,避免了虚假唤醒问题。

代码示例:

```java
// 生产者
class Producer {
    private final List buffer; // 缓冲区
    private final int capacity; // 缓冲区容量
    private final Condition notFull; // 缓冲区未满条件
    private final Condition notEmpty; // 缓冲区非空条件

    public Producer(List buffer, int capacity, Condition notFull, Condition notEmpty) {
        this.buffer = buffer;
        this.capacity = capacity;
        this.notFull = notFull;
        this.notEmpty = notEmpty;
    }

    public void produce() {
        // 生产商品
        Product product = new Product("商品", 10);
        // 等待缓冲区有空位
        synchronized (buffer) { // 使用同步块保证线程安全
            while (buffer.size() == capacity) {
                try {
                    notFull.await(); // 等待缓冲区有空位
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            // 生产商品入库
            buffer.add(product);
            System.out.println("生产者生产了商品:" + product.getName() + ", 库存数量:" + product.getCount());
            notEmpty.signalAll(); // 唤醒所有等待的消费者线程
        }
    }
}

// 消费者
class Consumer {
    private final List buffer; // 缓冲区
    private final Condition notFull; // 缓冲区未满条件
    private final Condition notEmpty; // 缓冲区非空条件

    public Consumer(List buffer, Condition notFull, Condition notEmpty) {
        this.buffer = buffer;
        this.notFull = notFull;
        this.notEmpty = notEmpty;
    }

    public void consume() {
        // 等待缓冲区有商品
        synchronized (buffer) { // 使用同步块保证线程安全
            while (buffer.isEmpty()) {
                try {
                    notEmpty.await(); // 等待缓冲区有商品
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            // 消费商品
            Product product = buffer.remove(0);
            System.out.println("消费者消费了商品:" + product.getName() + ", 库存数量:" + product.getCount());
            notFull.signalAll(); // 唤醒所有等待的生产者线程
        }
    }
}

public class ProducerConsumerOptimized {
    public static void main(String[] args) {
        // 初始化缓冲区
        List buffer = new ArrayList<>();
        // 设置缓冲区容量
        int capacity = 5;
        // 创建锁
        Lock lock = new ReentrantLock();
        // 创建条件变量
        Condition notFull = lock.newCondition();
        Condition notEmpty = lock.newCondition();
        // 创建生产者和消费者线程
        Producer producer = new Producer(buffer, capacity, notFull, notEmpty);
        Consumer consumer = new Consumer(buffer, notFull, notEmpty);
        // 启动线程
        new Thread(producer::produce).start();
        new Thread(consumer::consume).start();
    }
}
```

解释:

  • ReentrantLock 类: ReentrantLock 类提供了更灵活的锁机制,可以替代传统的 synchronized 块。

  • Condition 类: Condition 类用于创建条件变量,它与 ReentrantLock 关联。 每个条件变量对应一个特定的条件,线程可以通过 await() 方法等待该条件满足,并通过 signalAll() 方法唤醒所有等待该条件的线程。

4. 阻塞队列的基本使用

阻塞队列是一种线程安全的队列,它可以实现生产者和消费者之间的同步。 阻塞队列内部使用锁和条件变量来实现线程安全和等待唤醒机制。

代码示例:

```java
import java.util.concurrent.ArrayBlockingQueue;

public class BlockingQueueExample {
    public static void main(String[] args) {
        // 创建一个容量为 5 的阻塞队列
        ArrayBlockingQueue queue = new ArrayBlockingQueue<>(5);
        // 生产者线程
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    // 将元素添加到队列
                    queue.put("元素 " + i); // 如果队列已满,则会阻塞直到有空间
                    System.out.println("生产者添加了元素:" + "元素 " + i);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        // 消费者线程
        new Thread(() -> {
            while (true) {
                try {
                    // 从队列获取元素
                    String element = queue.take(); // 如果队列为空,则会阻塞直到有元素
                    System.out.println("消费者消费了元素:" + element);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}
```

解释:

  • ArrayBlockingQueue 类: ArrayBlockingQueue 类是一个基于数组实现的阻塞队列,它提供了 put() 和 take() 方法来实现生产者和消费者之间的同步。

  • put() 方法: 如果队列已满,put() 方法会阻塞生产者线程,直到队列中有空位。

  • take() 方法: 如果队列为空,take() 方法会阻塞消费者线程,直到队列中有元素。

5. 阻塞队列实现等待唤醒机制

阻塞队列内部使用锁和条件变量来实现等待唤醒机制,保证线程之间的安全同步。

  • put() 方法 : 当队列已满时,线程会调用 await() 方法,阻塞在条件变量上。当有元素被消费后,消费者会调用 signalAll() 方法,唤醒所有在条件变量上等待的生产者线程。

  • take() 方法 : 当队列为空时,线程会调用 await() 方法,阻塞在条件变量上。当有元素被生产后,生产者会调用 signalAll() 方法,唤醒所有在条件变量上等待的消费者线程。

代码示例:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// 生产者
class ProducerBlockingQueue {
    private final BlockingQueue queue; // 阻塞队列

    public ProducerBlockingQueue(BlockingQueue queue) {
        this.queue = queue;
    }

    public void produce() {
        while (true) {
            try {
                // 生产商品
                String product = "商品";
                // 将元素添加到队列
                queue.put(product); // 如果队列已满,则会阻塞生产者线程
                System.out.println("生产者生产了商品:" + product);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

// 消费者
class ConsumerBlockingQueue {
    private final BlockingQueue queue; // 阻塞队列

    public ConsumerBlockingQueue(BlockingQueue queue) {
        this.queue = queue;
    }

    public void consume() {
        while (true) {
            try {
                // 消费商品
                String product = queue.take(); // 如果队列为空,则会阻塞消费者线程
                System.out.println("消费者消费了商品:" + product);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

public class BlockingQueueWaitNotify {
    public static void main(String[] args) {
        // 创建一个阻塞队列
        BlockingQueue queue = new LinkedBlockingQueue<>();
        // 创建生产者和消费者线程
        ProducerBlockingQueue producer = new ProducerBlockingQueue(queue);
        ConsumerBlockingQueue consumer = new ConsumerBlockingQueue(queue);
        // 启动线程
        new Thread(producer::produce).start();
        new Thread(consumer::consume).start();
    }
}
```

解释:

  • LinkedBlockingQueue 类: LinkedBlockingQueue 类是一个基于链表实现的阻塞队列,它提供了 put() 和 take() 方法来实现生产者和消费者之间的同步。

  • put() 方法: 如果队列已满,put() 方法会阻塞生产者线程,直到队列中有空位。

  • take() 方法: 如果队列为空,take() 方法会阻塞消费者线程,直到队列中有元素。

总结:

生产者和消费者模式是一种常用的并发设计模式,它可以有效地解决生产者和消费者之间的数据共享问题。 阻塞队列是实现生产者和消费者模式的便捷工具,它可以简化代码编写,提高代码可读性。 通过使用阻塞队列,我们可以避免使用复杂的同步机制,从而降低开发成本和维护成本。希望对各位看官有所帮助,感谢各位看官的观看,下期见,谢谢~

文章整理自互联网,只做测试使用。发布者:Lomu,转转请注明出处:https://www.it1024doc.com/5278.html

(0)
LomuLomu
上一篇 2025 年 1 月 6 日 上午12:56
下一篇 2025 年 1 月 6 日 上午1:26

相关推荐

  • 《深入理解Mybatis原理》Mybatis插件机制&分页机制原理

    源码分析 插件机制 首先我们看下MyBatis拦截器的接口定义: “`java public interface Interceptor { Object intercept(Invocation invocation) throws Throwable; Object plugin(Object target); void setProperties(P…

    未分类 2025 年 1 月 10 日
    36300
  • 库存系统:仓库层、调度层、销售层的库存数据模型设计

    大家好,我是汤师爷~ 让我们一起深入挖掘库存概念模型的设计精髓,这不仅是构建库存管理系统的基石,更是确保库存数据精准和一致性的核心所在。 库存的数据模型设计 下图展示了库存概念模型的设计概览。通过精心设计的概念模型,我们能够有效支撑库存管理的多元化业务需求。 仓库层 仓库层是商品库存存放和管理的实际场所,承担着具体的仓储操作任务。它涵盖了企业自建仓库、第三方…

    2024 年 12 月 24 日
    50000
  • 扣子又出新功能,支持一键部署小程序,太强了!!

    大家好,我是R哥。 作为一名程序员和技术博主,我一直关注如何使用工具提升生产力,尤其是在内容创作和应用开发领域。 拿我开发一个微信小程序为例,我需要懂前端、后端、运维 等全栈技术,开发流程和技术栈复杂,我还需要购买云服务器、云数据库 等各种基础设施,资源耗费非常多。 虽然现在有如 Cursor 这样的革命性 AI 开发工具,它突破了传统开发模式的壁垒,非开发…

    2025 年 1 月 13 日
    28400
  • 2025最新IDEA激活码免费领+永久破解教程|IDEA破解一键搞定

    本方案对 JetBrains 全家桶(IDEA、PyCharm、DataGrip、Goland 等)全部有效,亲测可用! 先放一张最新版 IDEA 的激活截图镇楼——直接飙到 2099 年,爽翻! 下面用图文手把手教学,把 IDEA 一口气激活到 2099 年;老版本同样照此操作即可。 Windows / macOS / Linux 全覆盖,步骤与文件都已打…

    未分类 2025 年 11 月 12 日
    74400
  • Java技术新视野——Java实时大数据处理赋能车联网协同驾驶的实践探索(197)

    ✨亲爱的技术爱好者们,诚挚欢迎访问【云端科技驿站】!在这个数字化浪潮奔涌的时代,我们致力于打造一个融合创新技术与深度思考的知识分享平台。这里不仅有前沿的技术解析,更期待您带来独到见解,让我们携手在科技海洋中扬帆远航!✨全网平台统一标识:云端科技驿站一、加入【技术精英圈】快速通道1:【云端技术交流圈】快速通道2:【CSDN技术创作营】二、核心专栏推荐:1. 【…

    2025 年 5 月 13 日
    29000

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信