当前位置: 首页 > news >正文

AQS与ReentrantLock:从排队抢锁到公平与非公平的工程实践——JUC锁机制的基石

大家好,我是程序员小策。

先来几个灵魂拷问热热身:

  • AQS 的全称是什么?它到底是个队列还是个锁?
  • ReentrantLock 和 synchronized 都能加锁,为什么要有两个?
  • 公平锁和非公平锁,差一行代码,性能差几倍?
  • CountDownLatch、Semaphore、ReentrantLock,它们底层居然是同一套代码?
  • AQS 的等待队列是双向链表还是单向链表?入队操作为什么不是原子的?

大部分人能回答前两个,到第三个开始犹豫,到第五个就卡住了。

今天这篇文章就是要把这五个问题一个一个拆开。而且不只是讲 JDK 源码——我会从真实项目出发,让你看到 AQS 在生产代码里到底长什么样。

问题定义:synchronized 够用了,为什么还要 AQS?

Java 已经有synchronized了——关键字一加,锁就有了。那 Doug Lea 为什么还要设计一整套java.util.concurrent.locks包?

因为synchronized有三个硬伤:

  1. 不可中断:线程拿到锁之后,其他等待的线程只能死等,不能被 interrupt 唤醒
  2. 不可超时:没有tryLock(timeout)这种"等一会就放弃"的机制
  3. 不可扩展:你没法基于synchronized做出 CountDownLatch、Semaphore、ReadWriteLock 这些东西

朴素方案:每个功能都从零实现一套等待队列 + 阻塞/唤醒逻辑。

问题:CountDownLatch 需要队列,Semaphore 需要队列,ReentrantLock 也需要队列——它们的排队逻辑几乎一模一样,只是"什么时候算拿到锁"的判断条件不同。

能不能把排队逻辑抽出来,让子类只关心"能不能拿到锁"?

这就是 AQS 的设计动机。

核心概念:AQS 是什么?

AQS(AbstractQueuedSynchronizer):一个用于构建锁和同步器的框架,内部维护一个 volatile int state 和一个 CLH 变体的双向 FIFO 等待队列。子类只需实现"尝试获取/释放 state"的逻辑,排队和阻塞由 AQS 负责。

想象一个高速 ETC 收费站。

收费站有一个核心状态:栏杆是抬起还是放下(对应 AQS 的state)。车辆(线程)到达时,先看栏杆——抬着就直接过(CAS 抢锁成功),放下就排队等候。

排队区是一条有序的车道(CLH 双向队列),先来的车排在前面。当前面的车通过后,栏杆抬起,广播通知下一辆车(unpark唤醒后继节点)。

但这里有个细节:非公平模式下,新来的车可以直接插队抢栏杆——如果刚好栏杆抬着,新到的车可以直接冲过去,不用管后面排了多长的队。公平模式下,新来的车必须先看队列里有没有人在等,有人等就老老实实排到队尾。

翻译回技术语言:

收费站AQS
栏杆状态(抬起/放下)volatile int state
排队车道CLH 双向 FIFO 队列(Node 双向链表)
车辆通过CAS 修改 state 成功
排队等候LockSupport.park()阻塞线程
通知下一辆车LockSupport.unpark()唤醒后继节点
插队抢栏杆非公平锁的 barging 机制

代码实现:从 JDK 源码看 AQS + ReentrantLock

第一层:AQS 的核心骨架

以下代码截取自 OpenJDK AbstractQueuedSynchronizer.java,是 Doug Lea 的原版实现:

publicabstractclassAbstractQueuedSynchronizerextendsAbstractOwnableSynchronizerimplementsjava.io.Serializable{privatevolatileintstate;protectedfinalintgetState(){returnstate;}protectedfinalvoidsetState(intnewState){state=newState;}protectedfinalbooleancompareAndSetState(intexpect,intupdate){returnunsafe.compareAndSwapInt(this,stateOffset,expect,update);}staticfinalclassNode{volatileNodeprev;volatileNodenext;volatileThreadthread;volatileintwaitStatus;}privatetransientvolatileNodehead;privatetransientvolatileNodetail;publicfinalvoidacquire(intarg){if(!tryAcquire(arg)&&acquireQueued(addWaiter(Node.EXCLUSIVE),arg))selfInterrupt();}publicfinalbooleanrelease(intarg){if(tryRelease(arg)){Nodeh=head;if(h!=null&&h.waitStatus!=0)unparkSuccessor(h);returntrue;}returnfalse;}protectedbooleantryAcquire(intarg){thrownewUnsupportedOperationException();}protectedbooleantryRelease(intarg){thrownewUnsupportedOperationException();}}

逐段解释:

state是 AQS 的灵魂。对 ReentrantLock 来说,state=0表示无锁,state=1表示被锁定,state>1表示重入次数。对 CountDownLatch 来说,state表示剩余计数。对 Semaphore 来说,state表示剩余许可数。同一个字段,不同语义——这就是模板方法模式的威力。

acquire()是获取锁的入口。它的执行逻辑是:先调tryAcquire()尝试抢锁(子类实现),抢不到就调addWaiter()入队,然后调acquireQueued()在队列里自旋等待。整个流程就是:抢 → 排队 → 等 → 被唤醒 → 再抢

tryAcquire()tryRelease()是留给子类的"钩子"。AQS 不关心"什么算拿到锁",它只负责排队和阻塞。子类决定抢锁逻辑。

第二层:ReentrantLock 的公平与非公平

以下代码截取自 OpenJDK ReentrantLock.java:

publicclassReentrantLockimplementsLock,java.io.Serializable{privatefinalSyncsync;abstractstaticclassSyncextendsAbstractQueuedSynchronizer{abstractbooleaninitialTryLock();finalbooleantryLock(){Threadcurrent=Thread.currentThread();intc=getState();if(c==0){if(compareAndSetState(0,1)){setExclusiveOwnerThread(current);returntrue;}}elseif(getExclusiveOwnerThread()==current){if(++c<0)thrownewError("Maximum lock count exceeded");setState(c);returntrue;}returnfalse;}protectedfinalbooleantryRelease(intreleases){intc=getState()-releases;if(getExclusiveOwnerThread()!=Thread.currentThread())thrownewIllegalMonitorStateException();booleanfree=(c==0);if(free)setExclusiveOwnerThread(null);setState(c);returnfree;}}staticfinalclassNonfairSyncextendsSync{finalbooleaninitialTryLock(){Threadcurrent=Thread.currentThread();if(compareAndSetState(0,1)){// 直接抢,不看队列setExclusiveOwnerThread(current);returntrue;}elseif(getExclusiveOwnerThread()==current){intc=getState()+1;if(c<0)thrownewError("Maximum lock count exceeded");setState(c);returntrue;}returnfalse;}}staticfinalclassFairSyncextendsSync{finalbooleaninitialTryLock(){Threadcurrent=Thread.currentThread();intc=getState();if(c==0){if(!hasQueuedThreads()&&compareAndSetState(0,1)){// 先看队列有没有人setExclusiveOwnerThread(current);returntrue;}}elseif(getExclusiveOwnerThread()==current){intc2=getState()+1;if(c2<0)thrownewError("Maximum lock count exceeded");setState(c2);returntrue;}returnfalse;}}publicReentrantLock(){sync=newNonfairSync();}publicReentrantLock(booleanfair){sync=fair?newFairSync():newNonfairSync();}publicvoidlock(){sync.lock();}publicvoidunlock(){sync.release(1);}}

逐段解释:

公平和非公平的区别,就一行代码。NonfairSync.initialTryLock()直接compareAndSetState(0, 1)抢锁,不管队列里有没有人在等。FairSync.initialTryLock()多了一个!hasQueuedThreads()的判断——队列里有人?对不起,请排队。

重入的实现靠getExclusiveOwnerThread() == current如果当前线程已经持有锁,state直接 +1,不需要再 CAS。释放时state - 1,减到 0 才真正释放锁。这就是"可重入"的全部秘密。

默认构造器创建的是非公平锁。new ReentrantLock()等价于new ReentrantLock(false)。为什么默认非公平?因为吞吐量差距巨大——非公平锁允许新线程"插队"抢锁,减少了线程上下文切换的开销。

第三层:真实项目中的 AQS 子类

AQS 不只是 ReentrantLock 的底层。你日常用的这些类,全都是 AQS 的子类:

AQS state 语义模式
ReentrantLock0=无锁,1+=重入次数独占
ReentrantReadWriteLock高16位=读锁持有数,低16位=写锁重入数共享+独占
CountDownLatch剩余计数,countDown 减到 0 放行共享
Semaphore剩余许可数共享

来看一个真实项目中的例子。以下代码来自 AI-Meeting 项目 的UniversalAiChatHandler.java

importjava.util.concurrent.CountDownLatch;publicvoidstreamToSink(AiPropertiesDOaiProperties,StringuserMessage,List<AiMessageHistoryRespDTO>historyMessages,FluxSink<String>sink,AIContentAccumulatoraccumulator)throwsException{ChatClientchatClient=createChatClient(aiProperties);List<Message>messages=buildMessages(aiProperties,userMessage,historyMessages);CountDownLatchlatch=newCountDownLatch(1);finalThrowable[]streamError=newThrowable[1];chatClient.prompt().messages(messages).stream().chatResponse().subscribe(response->{/* 处理流式响应 */},error->{streamError[0]=error;latch.countDown();},()->{latch.countDown();});latch.await(60,TimeUnit.SECONDS);}

这里CountDownLatch(1)的 state 初始值就是 1。流式响应完成或出错时调countDown(),state 减到 0,AQS 释放所有等待线程。await()内部调的是 AQS 的acquireSharedInterruptibly()

你可能没写过 AQS 的子类,但你每天都在用 AQS。

边界与陷阱:AQS 的三个大坑

看起来很优雅对吧?但 AQS 的坑,比你想的深。

陷阱一:tryAcquire()实现不当导致死锁。AQS 的acquire()方法在tryAcquire()返回 false 后才会入队等待。如果你的tryAcquire()实现有 bug——比如永远返回 false,或者条件判断写反了——线程会永远阻塞在队列里。

后果:死锁,且线程堆栈显示在LockSupport.park()上,你根本看不出是哪里出了问题。

解法:实现tryAcquire()时,确保"能拿到锁"的路径一定存在。对于独占锁,state == 0时必须允许获取。

陷阱二:非公平锁的"饥饿"问题。非公平锁允许新线程插队,在高并发场景下,队列中的线程可能永远抢不到锁——因为总有新线程插队成功。

后果:某些线程长时间得不到执行,响应时间 P99 飙升。

解法:如果业务对响应时间敏感,用公平锁。虽然吞吐量低 10%-30%,但保证了先来先服务。

陷阱三:state溢出。ReentrantLock 的state是 int 类型,最大重入次数是Integer.MAX_VALUE(2147483647)。虽然正常代码不可能重入 21 亿次,但如果你在循环里lock()忘了unlock()……

后果:++c < 0触发Error("Maximum lock count exceeded")

解法:永远在 try-finally 里释放锁。这是铁律,没有例外。

高级考量:从单机 AQS 到分布式锁

AQS 解决的是单机内的线程同步问题。但当你从单机走向分布式,AQS 的等待队列就不够用了——它只能阻塞本 JVM 内的线程,跨进程的锁竞争它管不了。

Redisson 的分布式锁就是 AQS 思想的分布式延伸。以下代码截取自 Redisson RedissonLock.java:

publicclassRedissonLockextendsRedissonBaseLock{<T>RFuture<T>tryLockInnerAsync(longwaitTime,longleaseTime,TimeUnitunit,longthreadId,RedisStrictCommand<T>command){returnevalWriteSyncedNoRetryAsync(getRawName(),LongCodec.INSTANCE,command,"if ((redis.call('exists', KEYS[1]) == 0) "+"or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then "+"redis.call('hincrby', KEYS[1], ARGV[2], 1); "+"redis.call('pexpire', KEYS[1], ARGV[1]); "+"return nil; "+"end; "+"return redis.call('pttl', KEYS[1]);",Collections.singletonList(getRawName()),unit.toMillis(leaseTime),getLockName(threadId));}}

注意看这段 Lua 脚本的逻辑:

  1. exists检查锁是否存在 → 对应 AQS 的state == 0
  2. hexists检查是否当前线程持有 → 对应 AQS 的getExclusiveOwnerThread() == current
  3. hincrby重入计数 +1 → 对应 AQS 的setState(c + 1)
  4. pexpire设置过期时间 →这是 AQS 没有的,分布式锁的看门狗机制

Redisson 把 AQS 的 state 从 JVM 内存搬到了 Redis Hash,把 CLH 队列从 JVM 线程队列换成了 Redis Pub/Sub + Semaphore。思想一样,存储介质变了。

但分布式锁引入了 AQS 不存在的问题:锁过期(看门狗续期)、网络分区(脑裂)、Redis 主从切换(锁丢失)。这些是单机 AQS 永远不需要考虑的。

对比表格

特性synchronizedReentrantLock(非公平)ReentrantLock(公平)
实现层面JVM 内置(monitorenter 字节码)AQS + CAS + CLH 队列AQS + CAS + CLH 队列
可中断不可lockInterruptibly()lockInterruptibly()
超时获取不可tryLock(timeout)tryLock(timeout)
公平性非公平非公平(默认)公平(!hasQueuedThreads()
条件变量一个(wait/notify)多个(Condition)多个(Condition)
可重入是(state 递增)是(state 递增)
吞吐量低(比非公平低 10%-30%)
适用场景简单同步、代码块级需要超时/中断/多条件严格先来先服务

一句话:简单同步用 synchronized,需要高级特性用 ReentrantLock,对公平性有要求用公平锁。

面试追问

面试追问 1:AQS 的 CLH 队列入队操作为什么不是原子的?addWaiter()里先 CAS 设置 tail,再设置 prev.next,中间如果挂了怎么办?
→ 回答方向:入队分两步——CAS 设置 tail 指向新节点,然后pred.next = node设置前驱的 next。第二步不是原子的,但即使失败了,其他线程可以从 tail 向前遍历 prev 链找到所有节点。AQS 的unparkSuccessor()在找不到 next 时,就是从 tail 反向遍历的。

面试追问 2:ReentrantLock 的tryLock()不遵守公平性设置,为什么?这是 bug 吗?
→ 回答方向:不是 bug,是有意设计。JDK 文档明确写了:tryLock()会"插队"——只要锁空闲就立即获取,不管队列里有没有线程在等。理由是tryLock()通常用于避免死锁的试探性获取,如果强制排队反而可能导致活锁。如果需要遵守公平性,用tryLock(0, TimeUnit.SECONDS)

面试追问 3:AQS 的 state 为什么是 int 而不是 long?如果需要 64 位的 state 怎么办?
→ 回答方向:Doug Lea 在设计时选择了 int,因为 CAS 操作在 32 位和 64 位 JVM 上都对 int 有原生支持,且对于锁计数、信号量许可数等场景,int 的范围足够。如果需要 64 位 state,可以参考java.util.concurrent.locks.StampedLock的实现——它用额外的 long 字段配合 Unsafe 的 CAS 操作。

面试追问 4:CountDownLatch 和 CyclicBarrier 都能实现"等待多个线程完成",它们在 AQS 层面的本质区别是什么?
→ 回答方向:CountDownLatch 基于 AQS 的共享模式,state是计数器,只能减不能增,一次性使用。CyclicBarrier 不基于 AQS,它内部用ReentrantLock + Condition实现循环等待,可以重复使用。本质区别:CountDownLatch 是"一个线程等 N 个事件",CyclicBarrier 是"N 个线程互相等"。

总结

AQS 不是锁,是造锁的模具——state 是锁芯,CLH 队列是锁体,tryAcquire/tryRelease 是钥匙孔的形状。

读完这篇你应该能:画出 AQS 的 acquire/release 完整流程图、解释 ReentrantLock 公平锁和非公平锁的那一行代码差异、说出 CountDownLatch 和 Semaphore 在 AQS 层面的 state 语义区别、理解 Redisson 分布式锁是对 AQS 思想的分布式延伸。

下次面试官问"AQS 是什么",别只说"一个队列"——告诉他:AQS 是 Doug Lea 用一个 volatile int 和一个双向链表,造出了整个 JUC 锁生态的基石。

http://www.jsqmd.com/news/886274/

相关文章:

  • 2026年抖音视频去水印最新方法:6种方案实测,这4款小程序一步到位 - 科技热点发布
  • Unity体素雾效VFM2:原理、性能与交互式雾气实现
  • 【DeepSeek注释生成优化实战指南】:20年AI工程师权威拆解3大瓶颈与5步提效法
  • 别再死磕USB HID了!用ESP32的Arduino框架手把手教你实现蓝牙鼠标键盘(附完整代码)
  • 【仅限首批内测开发者访问】Sora 2.1 Beta MOV导出API密钥激活路径曝光:3天后关闭权限窗口
  • 小红书视频怎么下载到手机里?实测6种方法,这4款小程序2026年依然免费好用 - 科技热点发布
  • 6款实用AI智能降重工具 合规程度拉满
  • Java开发转型AI大模型工程师:收藏这份心法+实战项目,轻松上手!
  • 北光恒电:安捷伦N5182B信号源 开机异常、自检报错、输出异常故障排查
  • 【限时解密】Midjourney内部模糊权重矩阵(.json配置文件级干预),仅剩最后83个白名单访问名额
  • Hindsight测试策略:单元测试、集成测试和端到端测试
  • Dramatron终极指南:如何用AI快速创作专业剧本的3种简单方法
  • 收藏干货|2026 版企业 AI 落地实操指南,程序员小白入门避坑必备
  • 2026实测:视频号保存视频到相册最全攻略,这4款微信小程序一步到位 - 科技热点发布
  • 二值响应假设检验:临界值精确构造与多重检验控制方法
  • 利用Cursor AI编程 两小时实现 基于Spring AI 2.0的带智能客服的商城系统(带在线支付功能)
  • 如何快速上手CANdevStudio:10分钟完成CAN总线仿真环境搭建
  • C#一维数组
  • 终极Chrome画中画扩展:如何在浏览器中实现高效视频多任务处理
  • 猫抓浏览器扩展:构建高效流媒体资源嗅探与下载的终极解决方案
  • 13-3 节点流(或文件流)
  • 单片机毕业设计——基于STM32智能温室控制系统设计与实现 要怎么设计与实现呢(全程可免费指导)
  • 为什么你的Claude集成测试总在凌晨报警?揭秘3类隐性上下文泄漏缺陷及4种防御型断言设计
  • 智慧树课程自动化脚本终极指南:从零到精通的全方位解析
  • 基于遗传算法-支持向量机的粗糙度加工工艺参数选择附Matlab代码
  • 【独家首发】Midjourney噪点强度量化模型(NOISE-Index™ v1.2):基于12,847组测试图谱建立的PSNR/SSIM/Perceptual Noise三维评估体系
  • MoveIt2完整指南:从零开始掌握ROS 2机器人运动规划的终极教程
  • 微信聊天记录取证与备份:从EnMicroMsg.db解密到完整导出实战指南
  • 漏洞修复窗口正在关闭,DeepSeek辅助扫描的72小时响应黄金法则,你掌握了吗?
  • Unity战斗角色资源包深度解析:动画事件与状态机工程实践