当前位置: 首页 > news >正文

[JAVA探索之路]带你手写多线程实现生产者-消费者模型

目录

引言

一、什么是生产者-消费者模型

二、为什么多线程下会有问题

1. 数据不安全

2. 仓库满了还继续放

3. 仓库空了还继续取

三、实现思路

1. 定义一个仓库类 Buffer

2. 提供两个方法

3. 加锁保证线程安全

4. 条件不满足时让线程等待

5. 条件变化后唤醒其他线程

四、代码实现

五、代码详细解析

1. 为什么要有 Buffer

2. 为什么 put() 要加 synchronized

3. 为什么仓库满了要 wait()

4. 为什么仓库空了也要 wait()

5. 为什么要用 while,不能用 if

6. 为什么要用 notifyAll(),而不是 notify()

六、总结


引言

在 Java 并发编程里,生产者-消费者模型是一个非常经典的问题。很多人第一次接触它时,会觉得概念有点绕:什么是生产者,什么是消费者,为什么还要加一个缓冲区,为什么要 wait()、notifyAll()?

其实把它想简单一点,它就是一个“放东西”和“拿东西”的过程。

比如:

  • 生产者负责往仓库里放货
  • 消费者负责从仓库里拿货
  • 仓库有容量限制,不能无限放
  • 如果仓库空了,消费者就得等
  • 如果仓库满了,生产者就得等

这就是生产者-消费者模型的核心思想。

这篇文章我们不使用 BlockingQueue 这样的现成工具,而是用最基础的synchronized + wait() + notifyAll()手写一个多线程版的生产者-消费者模型。这样做的目的是为了真正理解 Java 线程通信的底层思路。


一、什么是生产者-消费者模型

生产者-消费者模型,简单来说,就是把“生产数据”和“消费数据”这两件事拆开,中间通过一个共享缓冲区来连接。

它通常包含 3 个角色:

  • 生产者:负责生成数据,放到缓冲区里
  • 消费者:负责从缓冲区中取出数据并处理
  • 缓冲区:用于存放生产者产生的数据

这样设计的好处是“解耦”。

也就是说,生产者不需要关心消费者处理得快不快,消费者也不需要关心生产者什么时候生成数据。双方只需要和缓冲区打交道就行。

现实开发中,这种模型非常常见,比如:

  • 消息队列
  • 日志异步写入
  • 任务调度系统
  • 订单处理系统
  • 请求削峰填谷

二、为什么多线程下会有问题

如果只有一个线程,这件事非常简单。

但是一旦变成多个线程,问题就来了。

假设有多个生产者线程和多个消费者线程同时操作同一个仓库,如果不加任何控制,就可能出现下面这些问题:

1. 数据不安全

多个线程同时修改同一个集合,可能会导致数据错乱。

2. 仓库满了还继续放

如果仓库容量只有 5,但生产者不检查容量,就可能一直往里塞数据。

3. 仓库空了还继续取

如果仓库里已经没有数据了,消费者还去取,就会报错或者取到错误结果。

所以,多线程环境下我们必须解决两个关键问题:

  • 线程同步:保证同一时刻只有一个线程能操作共享资源
  • 线程通信:当条件不满足时,线程要等待;条件满足后,线程要被唤醒

三、实现思路

我们可以把这个问题拆成下面几步:

1. 定义一个仓库类 Buffer

仓库内部维护一个有界队列,用来保存数据。

2. 提供两个方法

  • put():生产者往仓库放数据
  • take():消费者从仓库取数据

3. 加锁保证线程安全

使用 synchronized 保证同一时刻只能有一个线程进入 put() 或 take()。

4. 条件不满足时让线程等待

  • 仓库满了,生产者调用 wait()
  • 仓库空了,消费者调用 wait()

5. 条件变化后唤醒其他线程

每次生产或消费完成后,调用 notifyAll() 唤醒其他正在等待的线程。


四、代码实现

下面我们先来看完整代码,然后再一段一段解释。

import java.util.LinkedList; import java.util.Queue; class Buffer { private final Queue<Integer> queue = new LinkedList<>(); private final int capacity; public Buffer(int capacity) { this.capacity = capacity; } public synchronized void put(int value) throws InterruptedException { while (queue.size() == capacity) { System.out.println(Thread.currentThread().getName() + " 发现仓库已满,进入等待"); wait(); } queue.offer(value); System.out.println(Thread.currentThread().getName() + " 生产了:" + value + ",当前库存:" + queue.size()); notifyAll(); } public synchronized int take() throws InterruptedException { while (queue.isEmpty()) { System.out.println(Thread.currentThread().getName() + " 发现仓库为空,进入等待"); wait(); } int value = queue.poll(); System.out.println(Thread.currentThread().getName() + " 消费了:" + value + ",当前库存:" + queue.size()); notifyAll(); return value; } } class Producer implements Runnable { private final Buffer buffer; public Producer(Buffer buffer) { this.buffer = buffer; } @Override public void run() { int value = 1; while (true) { try { buffer.put(value++); Thread.sleep(500); } catch (InterruptedException e) { Thread.currentThread().interrupt(); break; } } } } class Consumer implements Runnable { private final Buffer buffer; public Consumer(Buffer buffer) { this.buffer = buffer; } @Override public void run() { while (true) { try { buffer.take(); Thread.sleep(800); } catch (InterruptedException e) { Thread.currentThread().interrupt(); break; } } } } public class ProducerConsumerDemo { public static void main(String[] args) { Buffer buffer = new Buffer(5); Thread producer1 = new Thread(new Producer(buffer), "生产者1"); Thread producer2 = new Thread(new Producer(buffer), "生产者2"); Thread consumer1 = new Thread(new Consumer(buffer), "消费者1"); Thread consumer2 = new Thread(new Consumer(buffer), "消费者2"); producer1.start(); producer2.start(); consumer1.start(); consumer2.start(); } }

五、代码详细解析

1. 为什么要有 Buffer

Buffer 就是共享仓库,所有生产者和消费者都操作它。

它里面最关键的两个成员是:

private final Queue<Integer> queue = new LinkedList<>(); private final int capacity;
  • queue 用来存放数据
  • capacity 表示仓库最大容量

这里我们用了 LinkedList 来模拟队列,因为它支持先进先出,比较符合“先生产先消费”的场景。


2. 为什么 put() 要加 synchronized

public synchronized void put(int value) throws InterruptedException

加上 synchronized 后,表示同一时刻只能有一个线程进入这个方法。

如果不加锁,多个生产者线程可能同时往队列里加数据,这样很容易出现线程安全问题。

同理,take() 也必须加锁。


3. 为什么仓库满了要 wait()

来看这段代码:

while (queue.size() == capacity) { System.out.println(Thread.currentThread().getName() + " 发现仓库已满,进入等待"); wait(); }

意思是:

  • 如果仓库已经满了
  • 当前生产者就不要再生产了
  • 直接进入等待状态

等到消费者消费掉一些数据后,仓库有空间了,再继续生产。

这里的 wait() 不是“傻等”,而是把当前线程挂起,同时释放锁,让其他线程有机会进入同步方法。

这一点非常重要。

如果线程等待时不释放锁,那消费者就永远进不来,仓库也永远腾不出空间,程序就卡死了。


4. 为什么仓库空了也要 wait()

消费者的逻辑是一样的:

while (queue.isEmpty()) { System.out.println(Thread.currentThread().getName() + " 发现仓库为空,进入等待"); wait(); }

如果仓库里没有数据,消费者就不能硬取,只能等生产者先放进去。


5. 为什么要用 while,不能用 if

这是面试里特别爱问的一点。

很多人第一次写会这样写:

if (queue.size() == capacity) { wait(); }

看起来好像没问题,但实际上不够安全。

原因是:线程被唤醒之后,不代表条件一定满足。

比如:

  • 某个生产者被唤醒了
  • 但它重新拿到锁时,仓库可能又已经满了

所以,线程醒来后必须再次检查条件。

这就是为什么要用 while:

一句话总结:

if 只检查一次,while 会反复检查,更安全。


6. 为什么要用 notifyAll(),而不是 notify()

每次生产或消费完成后,我们都调用了:

notifyAll();

这表示唤醒所有在这个对象上等待的线程。

为什么不用 notify()?

因为 notify() 只随机唤醒一个线程,可能会唤醒“错误的人”。

举个例子:

  • 仓库已经满了
  • 现在有多个生产者在等待,也有多个消费者在等待
  • 如果这时你用 notify(),结果唤醒了另一个生产者
  • 但生产者醒来后发现仓库还是满的,又继续等待
  • 真正应该被唤醒的消费者却没醒

这样程序效率会很差,甚至可能出现“假死”现象。

所以在这种场景下,notifyAll() 更稳妥。

虽然它的唤醒范围更大一些,但逻辑更安全,也更容易写对。


六、总结

生产者-消费者模型本质上并不复杂,它就是一个“仓库调度问题”:

  • 生产者往仓库里放数据
  • 消费者从仓库里拿数据
  • 仓库满了,生产者等待
  • 仓库空了,消费者等待

在 Java 中,我们可以通过 synchronized + wait() + notifyAll() 手写实现这一套机制。

你只要真正理解下面这几句话,这个模型就算掌握了:

  • synchronized 负责加锁,保证线程安全
  • wait() 负责等待,并释放锁
  • notifyAll() 负责唤醒等待线程
  • 条件判断要用 while,不能只用 if

制作不易,如果对你有帮助请点赞评论收藏感谢大家的支持

http://www.jsqmd.com/news/588272/

相关文章:

  • C++的std--ranges算法并行执行数据竞争检测
  • 第06章langchain之向量化和向量数据库
  • 实战指南:基于快马AI构建企业级域名监控与故障切换管理平台
  • 找到一种方法:用LM Studio 和 llmster 可以把qwen3.5改成nothinking版本装载来提高响应速度
  • 别再找了,这应该是目前最好用的翻译插件了。
  • TongWeb8.0支持JBoss Weld‌
  • 基于单片机的水产养殖饲料自动投喂系统
  • NCMDump解密指南:三步解锁网易云音乐加密文件的终极方案
  • 嘿,今天来跟大家聊聊基于Copula多元互相关的随机场边坡模型。这模型可有意思啦,它在边坡稳定性研究这块有着独特的魅力
  • 第6章 Mosquitto用户认证与访问控制
  • 【自动驾驶技术解析】端到端架构与感知规控演进全景(2025–2026)
  • Node.js 类
  • Java 小白必看:MySQL 主从延迟是什么?怎么排查?怎么彻底解决?
  • 全球GPU算力荒背景下,主流算力平台价格与服务对比分析
  • Ace Data Cloud:使用 SERP API 获取 Google 搜索结果
  • Go语言的context.WithCancel中的协调资源
  • 面对 AI 热潮,企业最值得优先落地的5个业务场景
  • 国密GB35114+国标GB28181平台EasyGBS双重加持筑牢雪亮工程坚实安全底座
  • 我做了一个能连微信、家电、汽车和 AI 的超级管家:Wanny
  • 25、CSP、SRI、HttpOnly、SameSite、Secure 一次讲透
  • 基于Matlab的Dijkstra算法与蚁群优化算法路径规划
  • 快马AI助力:十分钟用openclaw搭建你的第一个网页爬虫原型
  • 测评 ASR 歌词生成模型
  • ComfyUI-VideoHelperSuite视频工作流技术指南:从基础操作到专业应用
  • COMSOL随机裂隙双重介质注浆数值模拟代做
  • 在线监测助力变电站隐蔽沉降灾害“早发现、早处置”
  • 题解:[JOI Final 2026] JOI 之旅 2 / JOI Tour 2
  • DirectX Repair增强版:免安装便携设计的系统维护利器
  • 快马平台十分钟速成:基于yolov8的目标检测web应用原型搭建
  • WarcraftHelper:让经典魔兽争霸3在现代电脑上完美运行的终极解决方案