【从零开始的JUC并发第四章】:JUC常用工具类
🔥你好我是fengxin_rou这是我的个人主页fengxin_rou的主页
❄️欢迎查看我的专栏我的专栏
《Java后端学习》、《JAVASE基础》、《JUC并发》、《redis》、《JVM虚拟机》、《MYSQL》、《黑马点评》、《rabbitmq》、《JavaWeb+AI的talis学习系统》、《苍穹外卖》
目录
前言
正文
一、显式锁 ReentrantLock 深度解析
1.1 ReentrantLock 与 synchronized 核心区别
1.2 可重入性实现原理
1.3 可中断锁实现原理
1.4 可超时获取锁实现原理
1.5 公平锁与非公平锁实现原理
1.6 实战代码示例
二、ConcurrentHashMap 底层原理与演进
2.1 JDK7 分段锁设计原理
2.2 JDK8 架构演进:CAS + synchronized
2.3 放弃分段锁的深层原因
2.4 扩容机制深度解析
2.5 完整工作流程梳理
三、BlockingQueue 阻塞队列实战
3.1 阻塞队列核心作用
3.2 ArrayBlockingQueue 详解
3.3 LinkedBlockingQueue 详解
3.4 SynchronousQueue 详解
3.5 DelayQueue 原理与延时任务应用
3.6 四者对比与选型建议
四、CompletableFuture 异步编程
4.1 异步编程背景
4.2 核心 API 分类详解
4.3 实战示例:并行调用优化
结语
前言
在多核 CPU 成为主流的今天,并发编程已成为后端开发者必备的核心技能。本文系统梳理 Java 并发编程中的锁机制、并发容器、阻塞队列与异步编程四大核心模块,深入解析底层实现原理,结合实战代码帮助读者掌握高并发场景下的技术选型与避坑策略。
正文
一、显式锁 ReentrantLock 深度解析
1.1 ReentrantLock 与 synchronized 核心区别
synchronized是 Java 原生的隐式锁,基于 JVM 实现,自动完成锁的获取与释放;而ReentrantLock是 JDK 提供的显式锁,基于 AQS(AbstractQueuedSynchronizer)框架实现,需要手动控制加锁与释放。
对比维度 | synchronized | ReentrantLock |
实现层面 | JVM 层面 | JDK API 层面 |
锁释放 | 自动释放 | 必须手动 unlock () |
可中断 | 不支持 | 支持 lockInterruptibly () |
可超时 | 不支持 | 支持 tryLock (timeout) |
公平锁 | 仅非公平 | 支持公平 / 非公平 |
条件变量 | 仅 1 个等待队列 | 支持多个 Condition |
synchronized在 JDK6 后进行了大量优化,包括偏向锁、轻量级锁、自旋锁等,性能已大幅提升。但在需要灵活控制锁行为的场景下,ReentrantLock 仍是首选。
1.2 可重入性实现原理
可重入锁指同一个线程可以多次获取同一把锁而不会产生死锁。ReentrantLock 通过 AQS 的 state 状态变量和 exclusiveOwnerThread 实现可重入。
当线程首次获取锁时,state 从 0 变为 1,并记录当前持有线程;当同一线程再次获取锁时,state 进行累加;释放锁时 state 递减,直到 state 归 0 才真正释放锁。
// ReentrantLock.NonfairSync.tryAcquire()核心逻辑 final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 锁空闲,CAS尝试获取 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { // 同一线程重入,state累加 int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }可重入性避免了同一线程反复获取锁导致的死锁,是递归调用场景的必备特性。
1.3 可中断锁实现原理
synchronized获取锁时不可中断,线程会一直阻塞直到获取锁;ReentrantLock 通过lockInterruptibly()方法支持中断响应。
当调用lockInterruptibly()时,如果线程在等待队列中被中断,会直接抛出 InterruptedException,不再继续等待锁。这为取消阻塞操作提供了可能,是实现超时获取锁的基础。
public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } // AQS.acquireInterruptibly() public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); // 可中断的等待逻辑 }1.4 可超时获取锁实现原理
tryLock(long timeout, TimeUnit unit)方法支持在指定时间内尝试获取锁,超时则返回 false。其实现基于 LockSupport.parkNanos () 进行限时等待,在等待过程中同时检测中断和超时。
// 超时获取锁使用示例 public boolean tryLockWithTimeout(ReentrantLock lock, long timeoutMs) { try { return lock.tryLock(timeoutMs, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return false; } }可超时特性在分布式系统中尤为重要,可以有效避免因网络波动导致的线程永久阻塞。
1.5 公平锁与非公平锁实现原理
ReentrantLock 默认采用非公平锁,可通过构造函数参数指定为公平锁:
ReentrantLock fairLock = new ReentrantLock(true); // 公平锁 ReentrantLock unfairLock = new ReentrantLock(false); // 非公平锁(默认)公平锁严格按照线程请求顺序分配锁,新线程必须加入等待队列尾部;非公平锁允许新线程在锁释放时直接尝试抢占,不考虑队列中等待的线程。
公平锁的tryAcquire()会额外检查hasQueuedPredecessors(),确保只有等待队列中没有前驱节点时才尝试获取锁。
非公平锁性能通常优于公平锁(吞吐量高约 30%),但可能产生线程饥饿。公平锁保证了顺序性,但增加了上下文切换开销。
1.6 实战代码示例
/** * ReentrantLock完整使用示例 */ public class ReentrantLockDemo { private final ReentrantLock lock = new ReentrantLock(true); private final Condition notFull = lock.newCondition(); private final Condition notEmpty = lock.newCondition(); private final Queue<Integer> queue = new LinkedList<>(); private static final int CAPACITY = 10; public void produce(int data) throws InterruptedException { lock.lock(); try { while (queue.size() == CAPACITY) { notFull.await(); // 队列满,生产者等待 } queue.offer(data); notEmpty.signal(); // 唤醒消费者 } finally { lock.unlock(); // 必须在finally中释放锁 } } public int consume() throws InterruptedException { lock.lockInterruptibly(); // 可中断方式获取锁 try { while (queue.isEmpty()) { notEmpty.await(); } int data = queue.poll(); notFull.signal(); return data; } finally { lock.unlock(); } } }注意事项:
unlock()必须放在 finally 块中,防止异常导致锁泄漏使用多个 Condition 可以精确控制唤醒条件,这是 synchronized 不具备的能力
二、ConcurrentHashMap 底层原理与演进
2.1 JDK7 分段锁设计原理
JDK7 的 ConcurrentHashMap 采用 \\分段锁(Segment)\\设计,整个哈希表被拆分为 16 个 Segment 数组,每个 Segment 独立加锁。
核心数据结构:
ConcurrentHashMap └── Segment[] (默认16个) └── HashEntry[] (每个Segment独立的哈希表) └── HashEntry链表Segment 继承自 ReentrantLock,每次 put 操作只锁定对应的 Segment,其他 Segment 的读写不受影响。理论上最大并发度等于 Segment 数量(默认 16)。
但分段锁存在明显缺陷:
并发度固定,无法随数组扩容动态提升
空间浪费,每个 Segment 都需要独立的锁和数据结构
跨段操作(如 size ())需要加锁所有 Segment,性能较差
2.2 JDK8 架构演进:CAS + synchronized
JDK8 彻底放弃分段锁,采用数组 + 链表 + 红黑树结构,与 HashMap 保持一致。锁的粒度从 Segment 级别细化到每个哈希桶(Node)级别。
核心改进:
使用
synchronized替代 ReentrantLock,JVM 对 synchronized 的优化更成熟无锁竞争时使用 CAS 进行无锁化更新
仅在哈希桶发生哈希冲突时才对首节点加锁
链表长度超过 8 时自动转为红黑树,解决哈希碰撞攻击
// JDK8 ConcurrentHashMap.putVal()核心逻辑 final V putVal(K key, V value, boolean onlyIfAbsent) { int hash = spread(key.hashCode()); for (Node<K,V>[] tab = table;;) { int i = (n - 1) & hash; Node<K,V> f = tabAt(tab, i); if (f == null) { // 桶为空,CAS直接插入 if (casTabAt(tab, i, null, new Node(hash, key, value))) break; } else { synchronized (f) { // 仅锁定桶的首节点 if (tabAt(tab, i) == f) { // 双重检查 // 链表或红黑树插入逻辑 } } } } }2.3 放弃分段锁的深层原因
JDK8 放弃分段锁主要基于以下考量:
1. 锁粒度更细
分段锁最小粒度是 Segment,JDK8 最小粒度是哈希桶
并发度随数组容量动态提升,理论最大并发度等于数组长度
2. synchronized 性能优化
JDK6 后 synchronized 引入偏向锁、轻量级锁、自适应自旋
无锁竞争时偏向锁性能优于 ReentrantLock
JVM 可以对 synchronized 进行逃逸分析等深度优化
3. 减少内存开销
消除 Segment 对象的内存占用
数据结构与 HashMap 统一,代码复用性更高
4. 红黑树引入
解决哈希碰撞导致的链表过长问题
极端情况下时间复杂度从 O (n) 降至 O (logn)
2.4 扩容机制深度解析
ConcurrentHashMap 的扩容是并发协作式的,支持多线程共同参与扩容,这是其核心亮点。
扩容触发条件:
元素数量达到阈值(容量 × 加载因子)
单桶链表长度超过 8 但数组容量小于 64
扩容核心流程:
扩容准备:创建 nextTable,容量为原数组 2 倍
扩容标记:sizeCtl 设为负数,标记正在扩容
任务分配:每个线程负责连续的 16 个桶的迁移
并发迁移:
处理完的桶设置为 ForwardingNode
遇到 ForwardingNode 自动跳过或协助扩容
扩容完成:table 指向 nextTable,重置 sizeCtl
// 扩容时的ForwardingNode标记 static final class ForwardingNode<K,V> extends Node<K,V> { final Node<K,V>[] nextTable; ForwardingNode(Node<K,V>[] tab) { super(MOVED, null, null, null); this.nextTable = tab; } }并发扩容的精妙之处:
读操作遇到 ForwardingNode 会转发到新数组,不阻塞
写操作遇到 ForwardingNode 会主动协助扩容,实现 "多线程帮忙"
迁移过程采用 "复制 + 清除" 方式,保证数据一致性
2.5 完整工作流程梳理
以 put 操作为例,完整执行流程:
计算 key 的哈希值,定位哈希桶位置
如果数组未初始化,CAS 触发初始化
如果目标桶为空,CAS 直接插入新节点
如果遇到 ForwardingNode,协助扩容后重试
否则,对桶首节点加 synchronized 锁
遍历链表或红黑树,key 存在则更新,不存在则追加
链表长度超过 8,触发树化或扩容
释放锁,CAS 更新元素计数,检查是否需要扩容
整个过程中,锁的持有时间极短,大部分操作都是无锁的 CAS 操作,这是 ConcurrentHashMap 高性能的根本原因。
三、BlockingQueue 阻塞队列实战
3.1 阻塞队列核心作用
BlockingQueue是 Java 并发包中最重要的数据结构之一,专门解决生产者 - 消费者模式的线程协作问题。
核心特性:
队列满时,生产者线程自动阻塞,直到有消费者消费
队列空时,消费者线程自动阻塞,直到有生产者生产
所有操作都是线程安全的,内部通过锁和条件变量实现
四种核心操作模式:
操作方式 | 抛出异常 | 返回特殊值 | 阻塞等待 | 超时等待 |
插入 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
移除 | remove() | poll() | take() | poll(time, unit) |
检查 | element() | peek() | - | - |
阻塞队列是线程池的核心组件,也是解耦生产消费速率不匹配的标准解决方案。
3.2 ArrayBlockingQueue 详解
ArrayBlockingQueue是基于数组实现的有界阻塞队列,创建时必须指定容量。
核心特点:
有界队列,容量固定不可扩容
单锁双 Condition 机制(notEmpty + notFull)
支持公平 / 非公平模式
读写共用同一把锁,并发度较低
// ArrayBlockingQueue核心结构 public class ArrayBlockingQueue<E> { final Object[] items; // 存储数组 int takeIndex; // 取元素位置 int putIndex; // 放元素位置 int count; // 元素数量 final ReentrantLock lock; // 单锁 private final Condition notEmpty; private final Condition notFull; }适用场景:队列大小可预估、对内存占用敏感的场景。
3.3 LinkedBlockingQueue 详解
LinkedBlockingQueue是基于链表实现的阻塞队列,默认容量为 Integer.MAX_VALUE。
核心特点:
双锁分离设计(takeLock + putLock),读写互不阻塞
默认无界(实际最大 2^31-1),也可指定容量
吞吐量高于 ArrayBlockingQueue
内存占用相对较高
// LinkedBlockingQueue双锁设计 private final ReentrantLock takeLock = new ReentrantLock(); private final Condition notEmpty = takeLock.newCondition(); private final ReentrantLock putLock = new ReentrantLock(); private final Condition notFull = putLock.newCondition();注意:无界模式下如果生产速度远大于消费速度,可能导致 OOM。
适用场景:生产消费速率差异较大、追求高吞吐量的场景。
3.4 SynchronousQueue 详解
SynchronousQueue是一个不存储元素的阻塞队列,每个插入操作必须等待另一个线程的移除操作。
核心特点:
内部没有缓冲区,队列容量始终为 0
支持公平 / 非公平模式
直接传递,不存储元素
是 Executors.newCachedThreadPool () 的默认队列
// SynchronousQueue典型用法 SynchronousQueue<Integer> queue = new SynchronousQueue<>(); // 生产者线程 new Thread(() -> { queue.put(1); // 会阻塞直到有消费者take }).start(); // 消费者线程 new Thread(() -> { queue.take(); // 会阻塞直到有生产者put }).start();适用场景:任务必须立即处理、不允许排队的场景,如 CachedThreadPool。
3.5 DelayQueue 原理与延时任务应用
DelayQueue是支持延时获取元素的无界阻塞队列,元素必须实现 Delayed 接口。
核心原理:
内部基于 PriorityQueue 实现,按过期时间排序
只有当元素的延迟时间到期后才能被取出
队首元素永远是最早过期的元素
// 延时任务元素定义 public class DelayedTask implements Delayed { private final long executeTime; private final Runnable task; public DelayedTask(long delayMs, Runnable task) { this.executeTime = System.currentTimeMillis() + delayMs; this.task = task; } @Override public long getDelay(TimeUnit unit) { return unit.convert(executeTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } @Override public int compareTo(Delayed other) { return Long.compare(this.executeTime, ((DelayedTask)other).executeTime); } }典型应用场景:
订单超时自动取消
会话超时清理
定时任务调度
缓存过期失效
// 延时队列使用示例 DelayQueue<DelayedTask> delayQueue = new DelayQueue<>(); delayQueue.put(new DelayedTask(30000, () -> System.out.println("30秒后执行"))); delayQueue.put(new DelayedTask(60000, () -> System.out.println("60秒后执行"))); // 消费者线程 while (true) { DelayedTask task = delayQueue.take(); // 阻塞直到任务到期 task.run(); }3.6 四者对比与选型建议
特性 | ArrayBlockingQueue | LinkedBlockingQueue | SynchronousQueue | DelayQueue |
容量 | 有界 | 可选有界 / 无界 | 0 | 无界 |
数据结构 | 数组 | 链表 | 直接传递 | 优先级队列 |
锁机制 | 单锁 | 双锁分离 | CAS | 单锁 |
公平性 | 支持 | 不支持 | 支持 | 不支持 |
吞吐量 | 中等 | 高 | 极高 | 低 |
典型应用 | 固定大小线程池 | 固定大小线程池 | 缓存线程池 | 延时任务 |
选型建议:
需要控制队列大小 → ArrayBlockingQueue
追求高吞吐量 → LinkedBlockingQueue
任务必须立即执行 → SynchronousQueue
需要延时执行 → DelayQueue
四、CompletableFuture 异步编程
4.1 异步编程背景
传统 Future 接口的局限性:
无法手动完成
不支持链式调用
不支持异常处理
无法组合多个 Future
CompletableFuture在 Java8 中引入,实现了 CompletionStage 接口,提供了丰富的异步编程能力,支持函数式编程风格。
4.2 核心 API 分类详解
1. 创建类 API
// 使用默认线程池 CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> System.out.println("异步任务")); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "返回结果"); // 使用自定义线程池(推荐) ExecutorService executor = Executors.newFixedThreadPool(10); CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "自定义线程池", executor); // 手动完成 CompletableFuture<String> future4 = new CompletableFuture<>(); future4.complete("手动设置结果"); future4.completeExceptionally(new RuntimeException("手动异常"));注意:默认使用 ForkJoinPool.commonPool (),所有 CompletableFuture 共享,CPU 密集型任务建议使用自定义线程池。
2. 链式转换类 API
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello") .thenApply(s -> s + " World") // 同步转换 .thenApplyAsync(s -> s.toUpperCase()) // 异步转换 .thenAccept(s -> System.out.println(s)) // 消费结果 .thenRun(() -> System.out.println("执行完成")); // 仅执行,不消费结果thenApply:输入 T,输出 U,类似 mapthenAccept:输入 T,无输出,消费型thenRun:无输入无输出,仅执行动作
3. 组合类 API
// AND组合:两个都完成才执行 CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "World"); f1.thenCombine(f2, (s1, s2) -> s1 + " " + s2) .thenAccept(System.out::println); // 输出 Hello World // OR组合:任意一个完成就执行 CompletableFuture<String> fast = f1.applyToEither(f2, s -> s + " faster");4. 异常处理 API
CompletableFuture.supplyAsync(() -> { if (true) throw new RuntimeException("出错了"); return "正常"; }) .exceptionally(ex -> { System.out.println("捕获异常: " + ex.getMessage()); return "默认值"; // 异常时返回默认值 }) .handle((result, ex) -> { if (ex != null) { return "处理异常"; } return result; }) .whenComplete((result, ex) -> { // 无论成功失败都会执行,不改变结果 System.out.println("执行完成"); });5. 多任务组合
// 所有任务都完成 CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2, f3); // 任意一个任务完成 CompletableFuture<Object> any = CompletableFuture.anyOf(f1, f2, f3);4.3 实战示例:并行调用优化
/** * 并行调用多个服务,聚合结果 */ public class CompletableFutureDemo { public UserInfo getUserInfo(Long userId) { // 并行调用三个接口 CompletableFuture<User> userFuture = CompletableFuture.supplyAsync(() -> userService.getUser(userId)); CompletableFuture<List<Order>> orderFuture = CompletableFuture.supplyAsync(() -> orderService.getOrders(userId)); CompletableFuture<List<Address>> addrFuture = CompletableFuture.supplyAsync(() -> addressService.getAddresses(userId)); // 等待所有完成,聚合结果 return CompletableFuture.allOf(userFuture, orderFuture, addrFuture) .thenApply(v -> { UserInfo info = new UserInfo(); info.setUser(userFuture.join()); info.setOrders(orderFuture.join()); info.setAddresses(addrFuture.join()); return info; }) .exceptionally(ex -> { log.error("获取用户信息失败", ex); return null; }) .join(); } }通过 CompletableFuture,原本串行 3 秒的调用可以优化到 1 秒完成,这是微服务架构下的常用优化手段。
结语
本文系统解析了 Java 并发编程的四大核心模块:ReentrantLock 的灵活锁机制、ConcurrentHashMap 的高性能并发设计、BlockingQueue 的生产者 - 消费者模式、CompletableFuture 的异步编程能力。
这些技术是构建高并发系统的基石。建议结合实际项目深入实践,重点关注各组件的适用场景与性能特性,避免在生产环境中出现并发安全问题。进阶可深入研究 AQS 框架、JMM 内存模型与无锁算法。
