Java 之生产者和消费者模式详解

1. 生产者和消费者模式概述

生产者和消费者模式是一种经典的并发设计模式,用于解决生产者和消费者之间数据共享问题。它主要涉及三个角色:

  • 生产者 (Producer) :负责生产数据,例如将数据写入文件、读取数据库数据等。

  • 消费者 (Consumer) :负责消费数据,例如将数据从文件中读取出来、对数据进行处理等。

  • 缓冲区 (Buffer) :用于存放生产者生产的数据,消费者可以从缓冲区中获取数据。

2. 生产者和消费者案例

案例描述:模拟一个仓库管理系统,生产者负责生产商品,消费者负责消费商品。

代码示例:

```java
// 商品类
class Product {
    private String name;
    private int count;

    public Product(String name, int count) {
        this.name = name;
        this.count = count;
    }

    public String getName() {
        return name;
    }

    public int getCount() {
        return count;
    }
}

// 生产者
class Producer implements Runnable {
    private final List buffer; // 缓冲区
    private final int capacity; // 缓冲区容量

    public Producer(List buffer, int capacity) {
        this.buffer = buffer;
        this.capacity = capacity;
    }

    @Override
    public void run() {
        while (true) {
            // 生产商品
            Product product = new Product("商品", 10);
            // 等待缓冲区有空位
            synchronized (buffer) { // 使用同步块保证线程安全
                while (buffer.size() == capacity) {
                    try {
                        buffer.wait(); // 等待缓冲区有空位
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                // 生产商品入库
                buffer.add(product);
                System.out.println("生产者生产了商品:" + product.getName() + ", 库存数量:" + product.getCount());
                buffer.notifyAll(); // 唤醒所有等待的消费者线程
            }
            try {
                Thread.sleep(1000); // 模拟生产时间
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

// 消费者
class Consumer implements Runnable {
    private final List buffer; // 缓冲区

    public Consumer(List buffer) {
        this.buffer = buffer;
    }

    @Override
    public void run() {
        while (true) {
            // 等待缓冲区有商品
            synchronized (buffer) { // 使用同步块保证线程安全
                while (buffer.isEmpty()) {
                    try {
                        buffer.wait(); // 等待缓冲区有商品
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                // 消费商品
                Product product = buffer.remove(0);
                System.out.println("消费者消费了商品:" + product.getName() + ", 库存数量:" + product.getCount());
                buffer.notifyAll(); // 唤醒所有等待的生产者线程
            }
            try {
                Thread.sleep(1000); // 模拟消费时间
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

public class ProducerConsumer {
    public static void main(String[] args) {
        // 初始化缓冲区
        List buffer = new ArrayList<>();
        // 设置缓冲区容量
        int capacity = 5;
        // 创建生产者和消费者线程
        Producer producer = new Producer(buffer, capacity);
        Consumer consumer = new Consumer(buffer);
        // 启动线程
        new Thread(producer).start();
        new Thread(consumer).start();
    }
}
```

解释:

  • synchronized 块: synchronized 块用于保证线程安全,确保同一时刻只有一个线程可以访问共享资源 buffer。

  • wait() 方法: 当生产者发现缓冲区已满,或消费者发现缓冲区为空时,它们会调用 wait() 方法进入等待状态。 进入等待状态的线程会释放锁,允许其他线程访问共享资源。

  • notifyAll() 方法: 当生产者生产完商品,或消费者消费完商品后,它们会调用 notifyAll() 方法唤醒所有在 buffer 上等待的线程。 唤醒后,等待线程会尝试再次获取锁,并继续执行。

3. 生产者和消费者案例优化

问题: 上面的案例中,生产者和消费者都使用了 wait()notifyAll() 方法进行同步,可能会导致虚假唤醒问题。 当线程被 notifyAll() 唤醒后,它并不能保证唤醒的原因是缓冲区状态发生了变化。

解决方案:使用条件变量来解决虚假唤醒问题。 条件变量允许线程等待特定条件的满足,避免了虚假唤醒问题。

代码示例:

```java
// 生产者
class Producer {
    private final List buffer; // 缓冲区
    private final int capacity; // 缓冲区容量
    private final Condition notFull; // 缓冲区未满条件
    private final Condition notEmpty; // 缓冲区非空条件

    public Producer(List buffer, int capacity, Condition notFull, Condition notEmpty) {
        this.buffer = buffer;
        this.capacity = capacity;
        this.notFull = notFull;
        this.notEmpty = notEmpty;
    }

    public void produce() {
        // 生产商品
        Product product = new Product("商品", 10);
        // 等待缓冲区有空位
        synchronized (buffer) { // 使用同步块保证线程安全
            while (buffer.size() == capacity) {
                try {
                    notFull.await(); // 等待缓冲区有空位
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            // 生产商品入库
            buffer.add(product);
            System.out.println("生产者生产了商品:" + product.getName() + ", 库存数量:" + product.getCount());
            notEmpty.signalAll(); // 唤醒所有等待的消费者线程
        }
    }
}

// 消费者
class Consumer {
    private final List buffer; // 缓冲区
    private final Condition notFull; // 缓冲区未满条件
    private final Condition notEmpty; // 缓冲区非空条件

    public Consumer(List buffer, Condition notFull, Condition notEmpty) {
        this.buffer = buffer;
        this.notFull = notFull;
        this.notEmpty = notEmpty;
    }

    public void consume() {
        // 等待缓冲区有商品
        synchronized (buffer) { // 使用同步块保证线程安全
            while (buffer.isEmpty()) {
                try {
                    notEmpty.await(); // 等待缓冲区有商品
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            // 消费商品
            Product product = buffer.remove(0);
            System.out.println("消费者消费了商品:" + product.getName() + ", 库存数量:" + product.getCount());
            notFull.signalAll(); // 唤醒所有等待的生产者线程
        }
    }
}

public class ProducerConsumerOptimized {
    public static void main(String[] args) {
        // 初始化缓冲区
        List buffer = new ArrayList<>();
        // 设置缓冲区容量
        int capacity = 5;
        // 创建锁
        Lock lock = new ReentrantLock();
        // 创建条件变量
        Condition notFull = lock.newCondition();
        Condition notEmpty = lock.newCondition();
        // 创建生产者和消费者线程
        Producer producer = new Producer(buffer, capacity, notFull, notEmpty);
        Consumer consumer = new Consumer(buffer, notFull, notEmpty);
        // 启动线程
        new Thread(producer::produce).start();
        new Thread(consumer::consume).start();
    }
}
```

解释:

  • ReentrantLock 类: ReentrantLock 类提供了更灵活的锁机制,可以替代传统的 synchronized 块。

  • Condition 类: Condition 类用于创建条件变量,它与 ReentrantLock 关联。 每个条件变量对应一个特定的条件,线程可以通过 await() 方法等待该条件满足,并通过 signalAll() 方法唤醒所有等待该条件的线程。

4. 阻塞队列的基本使用

阻塞队列是一种线程安全的队列,它可以实现生产者和消费者之间的同步。 阻塞队列内部使用锁和条件变量来实现线程安全和等待唤醒机制。

代码示例:

```java
import java.util.concurrent.ArrayBlockingQueue;

public class BlockingQueueExample {
    public static void main(String[] args) {
        // 创建一个容量为 5 的阻塞队列
        ArrayBlockingQueue queue = new ArrayBlockingQueue<>(5);
        // 生产者线程
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    // 将元素添加到队列
                    queue.put("元素 " + i); // 如果队列已满,则会阻塞直到有空间
                    System.out.println("生产者添加了元素:" + "元素 " + i);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        // 消费者线程
        new Thread(() -> {
            while (true) {
                try {
                    // 从队列获取元素
                    String element = queue.take(); // 如果队列为空,则会阻塞直到有元素
                    System.out.println("消费者消费了元素:" + element);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}
```

解释:

  • ArrayBlockingQueue 类: ArrayBlockingQueue 类是一个基于数组实现的阻塞队列,它提供了 put() 和 take() 方法来实现生产者和消费者之间的同步。

  • put() 方法: 如果队列已满,put() 方法会阻塞生产者线程,直到队列中有空位。

  • take() 方法: 如果队列为空,take() 方法会阻塞消费者线程,直到队列中有元素。

5. 阻塞队列实现等待唤醒机制

阻塞队列内部使用锁和条件变量来实现等待唤醒机制,保证线程之间的安全同步。

  • put() 方法 : 当队列已满时,线程会调用 await() 方法,阻塞在条件变量上。当有元素被消费后,消费者会调用 signalAll() 方法,唤醒所有在条件变量上等待的生产者线程。

  • take() 方法 : 当队列为空时,线程会调用 await() 方法,阻塞在条件变量上。当有元素被生产后,生产者会调用 signalAll() 方法,唤醒所有在条件变量上等待的消费者线程。

代码示例:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// 生产者
class ProducerBlockingQueue {
    private final BlockingQueue queue; // 阻塞队列

    public ProducerBlockingQueue(BlockingQueue queue) {
        this.queue = queue;
    }

    public void produce() {
        while (true) {
            try {
                // 生产商品
                String product = "商品";
                // 将元素添加到队列
                queue.put(product); // 如果队列已满,则会阻塞生产者线程
                System.out.println("生产者生产了商品:" + product);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

// 消费者
class ConsumerBlockingQueue {
    private final BlockingQueue queue; // 阻塞队列

    public ConsumerBlockingQueue(BlockingQueue queue) {
        this.queue = queue;
    }

    public void consume() {
        while (true) {
            try {
                // 消费商品
                String product = queue.take(); // 如果队列为空,则会阻塞消费者线程
                System.out.println("消费者消费了商品:" + product);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

public class BlockingQueueWaitNotify {
    public static void main(String[] args) {
        // 创建一个阻塞队列
        BlockingQueue queue = new LinkedBlockingQueue<>();
        // 创建生产者和消费者线程
        ProducerBlockingQueue producer = new ProducerBlockingQueue(queue);
        ConsumerBlockingQueue consumer = new ConsumerBlockingQueue(queue);
        // 启动线程
        new Thread(producer::produce).start();
        new Thread(consumer::consume).start();
    }
}
```

解释:

  • LinkedBlockingQueue 类: LinkedBlockingQueue 类是一个基于链表实现的阻塞队列,它提供了 put() 和 take() 方法来实现生产者和消费者之间的同步。

  • put() 方法: 如果队列已满,put() 方法会阻塞生产者线程,直到队列中有空位。

  • take() 方法: 如果队列为空,take() 方法会阻塞消费者线程,直到队列中有元素。

总结:

生产者和消费者模式是一种常用的并发设计模式,它可以有效地解决生产者和消费者之间的数据共享问题。 阻塞队列是实现生产者和消费者模式的便捷工具,它可以简化代码编写,提高代码可读性。 通过使用阻塞队列,我们可以避免使用复杂的同步机制,从而降低开发成本和维护成本。希望对各位看官有所帮助,感谢各位看官的观看,下期见,谢谢~

文章整理自互联网,只做测试使用。发布者:Lomu,转转请注明出处:https://www.it1024doc.com/5278.html

(0)
LomuLomu
上一篇 2025 年 1 月 6 日 上午12:56
下一篇 2025 年 1 月 6 日 上午1:26

相关推荐

  • 永久有效的IDEA激活破解教程(2024亲测有效!)

    【永久启用】IDEA 2024.1.2 完备激活指南:配有验证激活码与工具 IntelliJ IDEA 是一款前沿的 Java 集成开发环境,广泛认为是顶级的 Java 工具之一。这篇文章将指导您如何利用脚本来免费激活 IDEA 和整个 Jetbrains 工具套件,适用于 2021 年及之后的版本,包括最新版。 安装 IntelliJ IDEA 您可以从 …

    未分类 2024 年 7 月 10 日
    8.8K00
  • 基于Redis有序集合实现滑动窗口限流

    滑动窗口算法是一种基于时间窗口的限流算法,它将时间划分为若干个固定大小的窗口,每个窗口内记录了该时间段内的请求次数。通过动态地滑动窗口,可以动态调整限流的速率,以应对不同的流量变化。 整个限流可以概括为两个主要步骤: 统计窗口内的请求数量 应用限流规则 Redis有序集合每个value有一个score(分数),基于score我们可以定义一个时间窗口,然后每次…

    2024 年 12 月 31 日
    44400
  • PostgreSQL 数据库的启动与停止管理

    title: PostgreSQL 数据库的启动与停止管理date: 2024/12/28updated: 2024/12/28author: cmdragon excerpt:作为一个强大的开源关系数据库管理系统,PostgreSQL在众多应用场景中发挥着关键作用。在实际使用过程中,对于数据库的启动和停止操作至关重要。这不仅关系到数据库的正常运行,也直接影…

    2025 年 1 月 1 日
    30200
  • 一文搞懂架构设计的衡量标准:功能性、可用性、性能、可扩展性、安全性、协作效率、复杂度、成本效益

    大家好,我是汤师爷~ 架构设计的首要目标是服务于业务需求。因此,我们不应该盲目追求所谓的”最厉害的”架构,而应该致力于寻找最适合当前业务环境和未来发展需求的架构方案。 衡量架构的合理性是一个复杂的过程,需要从多个角度进行全面评估。主要可以从以下视角进行分析: 功能需求视角:评估架构是否有效支撑当前业务需求,并具有充分的灵活性以适应未来业务发展。 非功能需求视…

    未分类 2025 年 1 月 15 日
    49000
  • 阿里一面:那我把线程池coreSize配置成0会怎样?

    写在前面 设想一下,在我们的项目中存在一个边缘的业务流程,它并不频繁地被触发。在设计线程池时,我回想起了线程池的常见配置原则。为了最大限度地节省资源,我将核心线程数(corePoolSize)设置为0。这样的配置是否能够顺利执行任务呢? 线程池配置原则回顾 在任务提交时,线程池会根据以下策略进行处理: 如果线程池中的线程数少于核心线程数,则创建一个新线程来执…

    2024 年 12 月 26 日
    51200

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信