Java 之生产者和消费者模式详解

1. 生产者和消费者模式概述

生产者和消费者模式是一种经典的并发设计模式,用于解决生产者和消费者之间数据共享问题。它主要涉及三个角色:

  • 生产者 (Producer) :负责生产数据,例如将数据写入文件、读取数据库数据等。

  • 消费者 (Consumer) :负责消费数据,例如将数据从文件中读取出来、对数据进行处理等。

  • 缓冲区 (Buffer) :用于存放生产者生产的数据,消费者可以从缓冲区中获取数据。

2. 生产者和消费者案例

案例描述:模拟一个仓库管理系统,生产者负责生产商品,消费者负责消费商品。

代码示例:

```java
// 商品类
class Product {
    private String name;
    private int count;

    public Product(String name, int count) {
        this.name = name;
        this.count = count;
    }

    public String getName() {
        return name;
    }

    public int getCount() {
        return count;
    }
}

// 生产者
class Producer implements Runnable {
    private final List buffer; // 缓冲区
    private final int capacity; // 缓冲区容量

    public Producer(List buffer, int capacity) {
        this.buffer = buffer;
        this.capacity = capacity;
    }

    @Override
    public void run() {
        while (true) {
            // 生产商品
            Product product = new Product("商品", 10);
            // 等待缓冲区有空位
            synchronized (buffer) { // 使用同步块保证线程安全
                while (buffer.size() == capacity) {
                    try {
                        buffer.wait(); // 等待缓冲区有空位
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                // 生产商品入库
                buffer.add(product);
                System.out.println("生产者生产了商品:" + product.getName() + ", 库存数量:" + product.getCount());
                buffer.notifyAll(); // 唤醒所有等待的消费者线程
            }
            try {
                Thread.sleep(1000); // 模拟生产时间
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

// 消费者
class Consumer implements Runnable {
    private final List buffer; // 缓冲区

    public Consumer(List buffer) {
        this.buffer = buffer;
    }

    @Override
    public void run() {
        while (true) {
            // 等待缓冲区有商品
            synchronized (buffer) { // 使用同步块保证线程安全
                while (buffer.isEmpty()) {
                    try {
                        buffer.wait(); // 等待缓冲区有商品
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                // 消费商品
                Product product = buffer.remove(0);
                System.out.println("消费者消费了商品:" + product.getName() + ", 库存数量:" + product.getCount());
                buffer.notifyAll(); // 唤醒所有等待的生产者线程
            }
            try {
                Thread.sleep(1000); // 模拟消费时间
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

public class ProducerConsumer {
    public static void main(String[] args) {
        // 初始化缓冲区
        List buffer = new ArrayList<>();
        // 设置缓冲区容量
        int capacity = 5;
        // 创建生产者和消费者线程
        Producer producer = new Producer(buffer, capacity);
        Consumer consumer = new Consumer(buffer);
        // 启动线程
        new Thread(producer).start();
        new Thread(consumer).start();
    }
}
```

解释:

  • synchronized 块: synchronized 块用于保证线程安全,确保同一时刻只有一个线程可以访问共享资源 buffer。

  • wait() 方法: 当生产者发现缓冲区已满,或消费者发现缓冲区为空时,它们会调用 wait() 方法进入等待状态。 进入等待状态的线程会释放锁,允许其他线程访问共享资源。

  • notifyAll() 方法: 当生产者生产完商品,或消费者消费完商品后,它们会调用 notifyAll() 方法唤醒所有在 buffer 上等待的线程。 唤醒后,等待线程会尝试再次获取锁,并继续执行。

3. 生产者和消费者案例优化

问题: 上面的案例中,生产者和消费者都使用了 wait()notifyAll() 方法进行同步,可能会导致虚假唤醒问题。 当线程被 notifyAll() 唤醒后,它并不能保证唤醒的原因是缓冲区状态发生了变化。

解决方案:使用条件变量来解决虚假唤醒问题。 条件变量允许线程等待特定条件的满足,避免了虚假唤醒问题。

代码示例:

```java
// 生产者
class Producer {
    private final List buffer; // 缓冲区
    private final int capacity; // 缓冲区容量
    private final Condition notFull; // 缓冲区未满条件
    private final Condition notEmpty; // 缓冲区非空条件

    public Producer(List buffer, int capacity, Condition notFull, Condition notEmpty) {
        this.buffer = buffer;
        this.capacity = capacity;
        this.notFull = notFull;
        this.notEmpty = notEmpty;
    }

    public void produce() {
        // 生产商品
        Product product = new Product("商品", 10);
        // 等待缓冲区有空位
        synchronized (buffer) { // 使用同步块保证线程安全
            while (buffer.size() == capacity) {
                try {
                    notFull.await(); // 等待缓冲区有空位
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            // 生产商品入库
            buffer.add(product);
            System.out.println("生产者生产了商品:" + product.getName() + ", 库存数量:" + product.getCount());
            notEmpty.signalAll(); // 唤醒所有等待的消费者线程
        }
    }
}

// 消费者
class Consumer {
    private final List buffer; // 缓冲区
    private final Condition notFull; // 缓冲区未满条件
    private final Condition notEmpty; // 缓冲区非空条件

    public Consumer(List buffer, Condition notFull, Condition notEmpty) {
        this.buffer = buffer;
        this.notFull = notFull;
        this.notEmpty = notEmpty;
    }

    public void consume() {
        // 等待缓冲区有商品
        synchronized (buffer) { // 使用同步块保证线程安全
            while (buffer.isEmpty()) {
                try {
                    notEmpty.await(); // 等待缓冲区有商品
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            // 消费商品
            Product product = buffer.remove(0);
            System.out.println("消费者消费了商品:" + product.getName() + ", 库存数量:" + product.getCount());
            notFull.signalAll(); // 唤醒所有等待的生产者线程
        }
    }
}

public class ProducerConsumerOptimized {
    public static void main(String[] args) {
        // 初始化缓冲区
        List buffer = new ArrayList<>();
        // 设置缓冲区容量
        int capacity = 5;
        // 创建锁
        Lock lock = new ReentrantLock();
        // 创建条件变量
        Condition notFull = lock.newCondition();
        Condition notEmpty = lock.newCondition();
        // 创建生产者和消费者线程
        Producer producer = new Producer(buffer, capacity, notFull, notEmpty);
        Consumer consumer = new Consumer(buffer, notFull, notEmpty);
        // 启动线程
        new Thread(producer::produce).start();
        new Thread(consumer::consume).start();
    }
}
```

解释:

  • ReentrantLock 类: ReentrantLock 类提供了更灵活的锁机制,可以替代传统的 synchronized 块。

  • Condition 类: Condition 类用于创建条件变量,它与 ReentrantLock 关联。 每个条件变量对应一个特定的条件,线程可以通过 await() 方法等待该条件满足,并通过 signalAll() 方法唤醒所有等待该条件的线程。

4. 阻塞队列的基本使用

阻塞队列是一种线程安全的队列,它可以实现生产者和消费者之间的同步。 阻塞队列内部使用锁和条件变量来实现线程安全和等待唤醒机制。

代码示例:

```java
import java.util.concurrent.ArrayBlockingQueue;

public class BlockingQueueExample {
    public static void main(String[] args) {
        // 创建一个容量为 5 的阻塞队列
        ArrayBlockingQueue queue = new ArrayBlockingQueue<>(5);
        // 生产者线程
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    // 将元素添加到队列
                    queue.put("元素 " + i); // 如果队列已满,则会阻塞直到有空间
                    System.out.println("生产者添加了元素:" + "元素 " + i);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        // 消费者线程
        new Thread(() -> {
            while (true) {
                try {
                    // 从队列获取元素
                    String element = queue.take(); // 如果队列为空,则会阻塞直到有元素
                    System.out.println("消费者消费了元素:" + element);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}
```

解释:

  • ArrayBlockingQueue 类: ArrayBlockingQueue 类是一个基于数组实现的阻塞队列,它提供了 put() 和 take() 方法来实现生产者和消费者之间的同步。

  • put() 方法: 如果队列已满,put() 方法会阻塞生产者线程,直到队列中有空位。

  • take() 方法: 如果队列为空,take() 方法会阻塞消费者线程,直到队列中有元素。

5. 阻塞队列实现等待唤醒机制

阻塞队列内部使用锁和条件变量来实现等待唤醒机制,保证线程之间的安全同步。

  • put() 方法 : 当队列已满时,线程会调用 await() 方法,阻塞在条件变量上。当有元素被消费后,消费者会调用 signalAll() 方法,唤醒所有在条件变量上等待的生产者线程。

  • take() 方法 : 当队列为空时,线程会调用 await() 方法,阻塞在条件变量上。当有元素被生产后,生产者会调用 signalAll() 方法,唤醒所有在条件变量上等待的消费者线程。

代码示例:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// 生产者
class ProducerBlockingQueue {
    private final BlockingQueue queue; // 阻塞队列

    public ProducerBlockingQueue(BlockingQueue queue) {
        this.queue = queue;
    }

    public void produce() {
        while (true) {
            try {
                // 生产商品
                String product = "商品";
                // 将元素添加到队列
                queue.put(product); // 如果队列已满,则会阻塞生产者线程
                System.out.println("生产者生产了商品:" + product);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

// 消费者
class ConsumerBlockingQueue {
    private final BlockingQueue queue; // 阻塞队列

    public ConsumerBlockingQueue(BlockingQueue queue) {
        this.queue = queue;
    }

    public void consume() {
        while (true) {
            try {
                // 消费商品
                String product = queue.take(); // 如果队列为空,则会阻塞消费者线程
                System.out.println("消费者消费了商品:" + product);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

public class BlockingQueueWaitNotify {
    public static void main(String[] args) {
        // 创建一个阻塞队列
        BlockingQueue queue = new LinkedBlockingQueue<>();
        // 创建生产者和消费者线程
        ProducerBlockingQueue producer = new ProducerBlockingQueue(queue);
        ConsumerBlockingQueue consumer = new ConsumerBlockingQueue(queue);
        // 启动线程
        new Thread(producer::produce).start();
        new Thread(consumer::consume).start();
    }
}
```

解释:

  • LinkedBlockingQueue 类: LinkedBlockingQueue 类是一个基于链表实现的阻塞队列,它提供了 put() 和 take() 方法来实现生产者和消费者之间的同步。

  • put() 方法: 如果队列已满,put() 方法会阻塞生产者线程,直到队列中有空位。

  • take() 方法: 如果队列为空,take() 方法会阻塞消费者线程,直到队列中有元素。

总结:

生产者和消费者模式是一种常用的并发设计模式,它可以有效地解决生产者和消费者之间的数据共享问题。 阻塞队列是实现生产者和消费者模式的便捷工具,它可以简化代码编写,提高代码可读性。 通过使用阻塞队列,我们可以避免使用复杂的同步机制,从而降低开发成本和维护成本。希望对各位看官有所帮助,感谢各位看官的观看,下期见,谢谢~

文章整理自互联网,只做测试使用。发布者:Lomu,转转请注明出处:https://www.it1024doc.com/5278.html

(0)
LomuLomu
上一篇 2025 年 1 月 6 日 上午12:56
下一篇 2025 年 1 月 6 日 上午1:26

相关推荐

  • 【永久激活】IDEA 2024激活破解保姆级教程,附激活码+工具,亲测可用

    IntelliJ IDEA 是 Java 编程语言的集成开发环境,被公认为最好的 Java 开发工具之一。本文分享通过脚本免费激活 IDEA 等 Jetbrains 全家桶工具,支持 2021 以上的版本包括最新版本。 一、下载并安装 IDEA 大家可以直接在 JetBrains 官网下载最新版本的 IDEA。安装步骤非常简单,按照提示一步一步进行即可。 二…

    未分类 2024 年 6 月 23 日
    2.0K00
  • JavaScript 延迟加载的方法( 7种 )

    JavaScript脚本的延迟加载(也称为懒加载)是指在网页的主要内容已经加载并显示给用户之后,再加载或执行额外的JavaScript代码。这样做可以加快页面的初始加载速度,改善用户体验,并减少服务器的压力。 以下是几种常见的延迟加载JavaScript的方法: defer 属性: 使用 async 属性: async 属性告诉浏览器立即开始下载脚本,并且在…

    2025 年 1 月 5 日
    50500
  • Python 调整Excel行高、列宽

    在Excel中,默认的行高和列宽可能不足以完全显示某些单元格中的内容,特别是当内容较长时。通过调整行高和列宽,可以确保所有数据都能完整显示,避免内容被截断。合理的行高和列宽可以使表格看起来更加整洁和专业,尤其是在包含大量数据的情况下。 本文将介绍如何通过Python调整Excel的行高列宽、或设置自适应行高列宽 。 Python Excel库 要通过Pyth…

    2024 年 12 月 24 日
    42600
  • 思维导图xmind如何安装?附安装包

    前言 大家好,我是小徐啊。我们在Java开发中,有时候是需要用到思维导图的,这可以帮助我们更好的理清思路,提高开发的效率。而说到思维导图,最有名的就是xmind了,它的功能十分强大,几乎是思维导图里面最强大的那一个。但是,默认只能使用初级功能,高级功能需要额外再开通,今天小徐就来介绍下如何安装xmind以及升级,让我们可以使用pro的功能。文末附获取方式。 …

    2025 年 1 月 13 日
    51800
  • 交易系统:应用层、领域层分层架构设计

    大家好,我是汤师爷~ 线上线下交易系统的应用架构包括终端、应用层、领域层和关联系统。 应用层能力 应用层定义软件的应用功能,负责接收用户请求、协调领域层执行任务并返回结果。主要包括以下模块: 1)C端服务模块 为消费者提供完整的交易链路功能,包括加购、下单、支付、结算、拆单、确认收货和退货退款等。 2)商家后台 为商家提供全面的订单管理功能,包括订单操作、搜…

    2024 年 12 月 28 日
    54500

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信