Java 之生产者和消费者模式详解

1. 生产者和消费者模式概述

生产者和消费者模式是一种经典的并发设计模式,用于解决生产者和消费者之间数据共享问题。它主要涉及三个角色:

  • 生产者 (Producer) :负责生产数据,例如将数据写入文件、读取数据库数据等。

  • 消费者 (Consumer) :负责消费数据,例如将数据从文件中读取出来、对数据进行处理等。

  • 缓冲区 (Buffer) :用于存放生产者生产的数据,消费者可以从缓冲区中获取数据。

2. 生产者和消费者案例

案例描述:模拟一个仓库管理系统,生产者负责生产商品,消费者负责消费商品。

代码示例:

```java
// 商品类
class Product {
    private String name;
    private int count;

    public Product(String name, int count) {
        this.name = name;
        this.count = count;
    }

    public String getName() {
        return name;
    }

    public int getCount() {
        return count;
    }
}

// 生产者
class Producer implements Runnable {
    private final List buffer; // 缓冲区
    private final int capacity; // 缓冲区容量

    public Producer(List buffer, int capacity) {
        this.buffer = buffer;
        this.capacity = capacity;
    }

    @Override
    public void run() {
        while (true) {
            // 生产商品
            Product product = new Product("商品", 10);
            // 等待缓冲区有空位
            synchronized (buffer) { // 使用同步块保证线程安全
                while (buffer.size() == capacity) {
                    try {
                        buffer.wait(); // 等待缓冲区有空位
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                // 生产商品入库
                buffer.add(product);
                System.out.println("生产者生产了商品:" + product.getName() + ", 库存数量:" + product.getCount());
                buffer.notifyAll(); // 唤醒所有等待的消费者线程
            }
            try {
                Thread.sleep(1000); // 模拟生产时间
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

// 消费者
class Consumer implements Runnable {
    private final List buffer; // 缓冲区

    public Consumer(List buffer) {
        this.buffer = buffer;
    }

    @Override
    public void run() {
        while (true) {
            // 等待缓冲区有商品
            synchronized (buffer) { // 使用同步块保证线程安全
                while (buffer.isEmpty()) {
                    try {
                        buffer.wait(); // 等待缓冲区有商品
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                // 消费商品
                Product product = buffer.remove(0);
                System.out.println("消费者消费了商品:" + product.getName() + ", 库存数量:" + product.getCount());
                buffer.notifyAll(); // 唤醒所有等待的生产者线程
            }
            try {
                Thread.sleep(1000); // 模拟消费时间
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

public class ProducerConsumer {
    public static void main(String[] args) {
        // 初始化缓冲区
        List buffer = new ArrayList<>();
        // 设置缓冲区容量
        int capacity = 5;
        // 创建生产者和消费者线程
        Producer producer = new Producer(buffer, capacity);
        Consumer consumer = new Consumer(buffer);
        // 启动线程
        new Thread(producer).start();
        new Thread(consumer).start();
    }
}
```

解释:

  • synchronized 块: synchronized 块用于保证线程安全,确保同一时刻只有一个线程可以访问共享资源 buffer。

  • wait() 方法: 当生产者发现缓冲区已满,或消费者发现缓冲区为空时,它们会调用 wait() 方法进入等待状态。 进入等待状态的线程会释放锁,允许其他线程访问共享资源。

  • notifyAll() 方法: 当生产者生产完商品,或消费者消费完商品后,它们会调用 notifyAll() 方法唤醒所有在 buffer 上等待的线程。 唤醒后,等待线程会尝试再次获取锁,并继续执行。

3. 生产者和消费者案例优化

问题: 上面的案例中,生产者和消费者都使用了 wait()notifyAll() 方法进行同步,可能会导致虚假唤醒问题。 当线程被 notifyAll() 唤醒后,它并不能保证唤醒的原因是缓冲区状态发生了变化。

解决方案:使用条件变量来解决虚假唤醒问题。 条件变量允许线程等待特定条件的满足,避免了虚假唤醒问题。

代码示例:

```java
// 生产者
class Producer {
    private final List buffer; // 缓冲区
    private final int capacity; // 缓冲区容量
    private final Condition notFull; // 缓冲区未满条件
    private final Condition notEmpty; // 缓冲区非空条件

    public Producer(List buffer, int capacity, Condition notFull, Condition notEmpty) {
        this.buffer = buffer;
        this.capacity = capacity;
        this.notFull = notFull;
        this.notEmpty = notEmpty;
    }

    public void produce() {
        // 生产商品
        Product product = new Product("商品", 10);
        // 等待缓冲区有空位
        synchronized (buffer) { // 使用同步块保证线程安全
            while (buffer.size() == capacity) {
                try {
                    notFull.await(); // 等待缓冲区有空位
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            // 生产商品入库
            buffer.add(product);
            System.out.println("生产者生产了商品:" + product.getName() + ", 库存数量:" + product.getCount());
            notEmpty.signalAll(); // 唤醒所有等待的消费者线程
        }
    }
}

// 消费者
class Consumer {
    private final List buffer; // 缓冲区
    private final Condition notFull; // 缓冲区未满条件
    private final Condition notEmpty; // 缓冲区非空条件

    public Consumer(List buffer, Condition notFull, Condition notEmpty) {
        this.buffer = buffer;
        this.notFull = notFull;
        this.notEmpty = notEmpty;
    }

    public void consume() {
        // 等待缓冲区有商品
        synchronized (buffer) { // 使用同步块保证线程安全
            while (buffer.isEmpty()) {
                try {
                    notEmpty.await(); // 等待缓冲区有商品
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            // 消费商品
            Product product = buffer.remove(0);
            System.out.println("消费者消费了商品:" + product.getName() + ", 库存数量:" + product.getCount());
            notFull.signalAll(); // 唤醒所有等待的生产者线程
        }
    }
}

public class ProducerConsumerOptimized {
    public static void main(String[] args) {
        // 初始化缓冲区
        List buffer = new ArrayList<>();
        // 设置缓冲区容量
        int capacity = 5;
        // 创建锁
        Lock lock = new ReentrantLock();
        // 创建条件变量
        Condition notFull = lock.newCondition();
        Condition notEmpty = lock.newCondition();
        // 创建生产者和消费者线程
        Producer producer = new Producer(buffer, capacity, notFull, notEmpty);
        Consumer consumer = new Consumer(buffer, notFull, notEmpty);
        // 启动线程
        new Thread(producer::produce).start();
        new Thread(consumer::consume).start();
    }
}
```

解释:

  • ReentrantLock 类: ReentrantLock 类提供了更灵活的锁机制,可以替代传统的 synchronized 块。

  • Condition 类: Condition 类用于创建条件变量,它与 ReentrantLock 关联。 每个条件变量对应一个特定的条件,线程可以通过 await() 方法等待该条件满足,并通过 signalAll() 方法唤醒所有等待该条件的线程。

4. 阻塞队列的基本使用

阻塞队列是一种线程安全的队列,它可以实现生产者和消费者之间的同步。 阻塞队列内部使用锁和条件变量来实现线程安全和等待唤醒机制。

代码示例:

```java
import java.util.concurrent.ArrayBlockingQueue;

public class BlockingQueueExample {
    public static void main(String[] args) {
        // 创建一个容量为 5 的阻塞队列
        ArrayBlockingQueue queue = new ArrayBlockingQueue<>(5);
        // 生产者线程
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    // 将元素添加到队列
                    queue.put("元素 " + i); // 如果队列已满,则会阻塞直到有空间
                    System.out.println("生产者添加了元素:" + "元素 " + i);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        // 消费者线程
        new Thread(() -> {
            while (true) {
                try {
                    // 从队列获取元素
                    String element = queue.take(); // 如果队列为空,则会阻塞直到有元素
                    System.out.println("消费者消费了元素:" + element);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}
```

解释:

  • ArrayBlockingQueue 类: ArrayBlockingQueue 类是一个基于数组实现的阻塞队列,它提供了 put() 和 take() 方法来实现生产者和消费者之间的同步。

  • put() 方法: 如果队列已满,put() 方法会阻塞生产者线程,直到队列中有空位。

  • take() 方法: 如果队列为空,take() 方法会阻塞消费者线程,直到队列中有元素。

5. 阻塞队列实现等待唤醒机制

阻塞队列内部使用锁和条件变量来实现等待唤醒机制,保证线程之间的安全同步。

  • put() 方法 : 当队列已满时,线程会调用 await() 方法,阻塞在条件变量上。当有元素被消费后,消费者会调用 signalAll() 方法,唤醒所有在条件变量上等待的生产者线程。

  • take() 方法 : 当队列为空时,线程会调用 await() 方法,阻塞在条件变量上。当有元素被生产后,生产者会调用 signalAll() 方法,唤醒所有在条件变量上等待的消费者线程。

代码示例:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// 生产者
class ProducerBlockingQueue {
    private final BlockingQueue queue; // 阻塞队列

    public ProducerBlockingQueue(BlockingQueue queue) {
        this.queue = queue;
    }

    public void produce() {
        while (true) {
            try {
                // 生产商品
                String product = "商品";
                // 将元素添加到队列
                queue.put(product); // 如果队列已满,则会阻塞生产者线程
                System.out.println("生产者生产了商品:" + product);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

// 消费者
class ConsumerBlockingQueue {
    private final BlockingQueue queue; // 阻塞队列

    public ConsumerBlockingQueue(BlockingQueue queue) {
        this.queue = queue;
    }

    public void consume() {
        while (true) {
            try {
                // 消费商品
                String product = queue.take(); // 如果队列为空,则会阻塞消费者线程
                System.out.println("消费者消费了商品:" + product);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

public class BlockingQueueWaitNotify {
    public static void main(String[] args) {
        // 创建一个阻塞队列
        BlockingQueue queue = new LinkedBlockingQueue<>();
        // 创建生产者和消费者线程
        ProducerBlockingQueue producer = new ProducerBlockingQueue(queue);
        ConsumerBlockingQueue consumer = new ConsumerBlockingQueue(queue);
        // 启动线程
        new Thread(producer::produce).start();
        new Thread(consumer::consume).start();
    }
}
```

解释:

  • LinkedBlockingQueue 类: LinkedBlockingQueue 类是一个基于链表实现的阻塞队列,它提供了 put() 和 take() 方法来实现生产者和消费者之间的同步。

  • put() 方法: 如果队列已满,put() 方法会阻塞生产者线程,直到队列中有空位。

  • take() 方法: 如果队列为空,take() 方法会阻塞消费者线程,直到队列中有元素。

总结:

生产者和消费者模式是一种常用的并发设计模式,它可以有效地解决生产者和消费者之间的数据共享问题。 阻塞队列是实现生产者和消费者模式的便捷工具,它可以简化代码编写,提高代码可读性。 通过使用阻塞队列,我们可以避免使用复杂的同步机制,从而降低开发成本和维护成本。希望对各位看官有所帮助,感谢各位看官的观看,下期见,谢谢~

文章整理自互联网,只做测试使用。发布者:Lomu,转转请注明出处:https://www.it1024doc.com/5278.html

(0)
LomuLomu
上一篇 2025 年 1 月 6 日 上午12:56
下一篇 2025 年 1 月 6 日 上午1:26

相关推荐

  • Java中的线程安全的集合类(如果想知道Java中有关线程安全的集合类的知识,那么只看这一篇就足够了!)

    前言:在多线程编程领域,确保集合类的线程安全性对于维护数据的一致性和防止并发问题至关重要。Java 提供了一系列线程安全的集合类,它们各自在不同的并发场景下展现出独特的优势和局限。 在深入探讨之前,让我们先概览本文将要覆盖的主要内容: 目录 1.线程安全的集合类概览 2.多线程环境下ArrayList的使用策略 (1)直接操作ArrayList (2)利用C…

    2024 年 12 月 28 日
    14100
  • Java Druid 面试题

    Druid连接池在项目中有哪些优势? 性能优越:Druid采用了高效的连接管理机制,可以快速地创建和回收数据库连接,减少了连接的创建和销毁带来的性能开销。 监控与统计:Druid提供了详细的监控信息,包括连接池的状态、SQL执行的统计信息等,这有助于性能调优和问题诊断。 SQL日志记录:Druid内置了SQL执行日志记录功能,可以记录所有SQL语句的执行情况…

    未分类 2025 年 1 月 11 日
    18000
  • Java 大视界 — Java 开发 Spark 应用:RDD 操作与数据转换(四)

    💖💖💖亲爱的朋友们,热烈欢迎你们来到 青云交的博客 !能与你们在此邂逅,我满心欢喜,深感无比荣幸。在这个瞬息万变的时代,我们每个人都在苦苦追寻一处能让心灵安然栖息的港湾。而 我的博客 ,正是这样一个温暖美好的所在。在这里,你们不仅能够收获既富有趣味又极为实用的内容知识,还可以毫无拘束地畅所欲言,尽情分享自己独特的见解。我真诚地期待着你们的到来,愿我们能在这片…

    2025 年 1 月 1 日
    18300
  • 微软开源!Office 文档轻松转 Markdown!

    大家好,我是 Java陈序员。 今天,给大家介绍一款微软开源的文档转 Markdown 工具。 关注微信公众号:【Java陈序员】,获取开源项目分享、AI副业分享、超200本经典计算机电子书籍等。 项目介绍 MarkItDown —— 微软开源的 Python 工具,能够将多种常见的文件格式(如 PDF、PowerPoint、Word、Excel、图像、音频…

    2025 年 1 月 14 日
    13400
  • 【2024最新版】Java JDK安装配置全攻略:图文详解

    目录 1. 引言 2. 准备工作 2.1 确定操作系统 2.2 检查系统要求 2.3 下载JDK安装包 3. 安装步骤(以Windows系统为例) 4. 配置环境变量 4.1 jdk配置验证 4.2 配置JAVA_HOME环境变量 4.3 配置Path环境变量 4.4 验证jdk是否配置成功 5. 结语 1. 引言 随着技术的不断发展和更新,Java作为世界…

    2024 年 12 月 28 日
    13900

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信