AQS与ReentrantLock:从排队抢锁到公平与非公平的工程实践——JUC锁机制的基石
大家好,我是程序员小策。
先来几个灵魂拷问热热身:
- AQS 的全称是什么?它到底是个队列还是个锁?
- ReentrantLock 和 synchronized 都能加锁,为什么要有两个?
- 公平锁和非公平锁,差一行代码,性能差几倍?
- CountDownLatch、Semaphore、ReentrantLock,它们底层居然是同一套代码?
- AQS 的等待队列是双向链表还是单向链表?入队操作为什么不是原子的?
大部分人能回答前两个,到第三个开始犹豫,到第五个就卡住了。
今天这篇文章就是要把这五个问题一个一个拆开。而且不只是讲 JDK 源码——我会从真实项目出发,让你看到 AQS 在生产代码里到底长什么样。
问题定义:synchronized 够用了,为什么还要 AQS?
Java 已经有synchronized了——关键字一加,锁就有了。那 Doug Lea 为什么还要设计一整套java.util.concurrent.locks包?
因为synchronized有三个硬伤:
- 不可中断:线程拿到锁之后,其他等待的线程只能死等,不能被 interrupt 唤醒
- 不可超时:没有
tryLock(timeout)这种"等一会就放弃"的机制 - 不可扩展:你没法基于
synchronized做出 CountDownLatch、Semaphore、ReadWriteLock 这些东西
朴素方案:每个功能都从零实现一套等待队列 + 阻塞/唤醒逻辑。
问题:CountDownLatch 需要队列,Semaphore 需要队列,ReentrantLock 也需要队列——它们的排队逻辑几乎一模一样,只是"什么时候算拿到锁"的判断条件不同。
能不能把排队逻辑抽出来,让子类只关心"能不能拿到锁"?
这就是 AQS 的设计动机。
核心概念:AQS 是什么?
AQS(AbstractQueuedSynchronizer):一个用于构建锁和同步器的框架,内部维护一个 volatile int state 和一个 CLH 变体的双向 FIFO 等待队列。子类只需实现"尝试获取/释放 state"的逻辑,排队和阻塞由 AQS 负责。
想象一个高速 ETC 收费站。
收费站有一个核心状态:栏杆是抬起还是放下(对应 AQS 的state)。车辆(线程)到达时,先看栏杆——抬着就直接过(CAS 抢锁成功),放下就排队等候。
排队区是一条有序的车道(CLH 双向队列),先来的车排在前面。当前面的车通过后,栏杆抬起,广播通知下一辆车(unpark唤醒后继节点)。
但这里有个细节:非公平模式下,新来的车可以直接插队抢栏杆——如果刚好栏杆抬着,新到的车可以直接冲过去,不用管后面排了多长的队。公平模式下,新来的车必须先看队列里有没有人在等,有人等就老老实实排到队尾。
翻译回技术语言:
| 收费站 | AQS |
|---|---|
| 栏杆状态(抬起/放下) | volatile int state |
| 排队车道 | CLH 双向 FIFO 队列(Node 双向链表) |
| 车辆通过 | CAS 修改 state 成功 |
| 排队等候 | LockSupport.park()阻塞线程 |
| 通知下一辆车 | LockSupport.unpark()唤醒后继节点 |
| 插队抢栏杆 | 非公平锁的 barging 机制 |
代码实现:从 JDK 源码看 AQS + ReentrantLock
第一层:AQS 的核心骨架
以下代码截取自 OpenJDK AbstractQueuedSynchronizer.java,是 Doug Lea 的原版实现:
publicabstractclassAbstractQueuedSynchronizerextendsAbstractOwnableSynchronizerimplementsjava.io.Serializable{privatevolatileintstate;protectedfinalintgetState(){returnstate;}protectedfinalvoidsetState(intnewState){state=newState;}protectedfinalbooleancompareAndSetState(intexpect,intupdate){returnunsafe.compareAndSwapInt(this,stateOffset,expect,update);}staticfinalclassNode{volatileNodeprev;volatileNodenext;volatileThreadthread;volatileintwaitStatus;}privatetransientvolatileNodehead;privatetransientvolatileNodetail;publicfinalvoidacquire(intarg){if(!tryAcquire(arg)&&acquireQueued(addWaiter(Node.EXCLUSIVE),arg))selfInterrupt();}publicfinalbooleanrelease(intarg){if(tryRelease(arg)){Nodeh=head;if(h!=null&&h.waitStatus!=0)unparkSuccessor(h);returntrue;}returnfalse;}protectedbooleantryAcquire(intarg){thrownewUnsupportedOperationException();}protectedbooleantryRelease(intarg){thrownewUnsupportedOperationException();}}逐段解释:
state是 AQS 的灵魂。对 ReentrantLock 来说,state=0表示无锁,state=1表示被锁定,state>1表示重入次数。对 CountDownLatch 来说,state表示剩余计数。对 Semaphore 来说,state表示剩余许可数。同一个字段,不同语义——这就是模板方法模式的威力。
acquire()是获取锁的入口。它的执行逻辑是:先调tryAcquire()尝试抢锁(子类实现),抢不到就调addWaiter()入队,然后调acquireQueued()在队列里自旋等待。整个流程就是:抢 → 排队 → 等 → 被唤醒 → 再抢。
tryAcquire()和tryRelease()是留给子类的"钩子"。AQS 不关心"什么算拿到锁",它只负责排队和阻塞。子类决定抢锁逻辑。
第二层:ReentrantLock 的公平与非公平
以下代码截取自 OpenJDK ReentrantLock.java:
publicclassReentrantLockimplementsLock,java.io.Serializable{privatefinalSyncsync;abstractstaticclassSyncextendsAbstractQueuedSynchronizer{abstractbooleaninitialTryLock();finalbooleantryLock(){Threadcurrent=Thread.currentThread();intc=getState();if(c==0){if(compareAndSetState(0,1)){setExclusiveOwnerThread(current);returntrue;}}elseif(getExclusiveOwnerThread()==current){if(++c<0)thrownewError("Maximum lock count exceeded");setState(c);returntrue;}returnfalse;}protectedfinalbooleantryRelease(intreleases){intc=getState()-releases;if(getExclusiveOwnerThread()!=Thread.currentThread())thrownewIllegalMonitorStateException();booleanfree=(c==0);if(free)setExclusiveOwnerThread(null);setState(c);returnfree;}}staticfinalclassNonfairSyncextendsSync{finalbooleaninitialTryLock(){Threadcurrent=Thread.currentThread();if(compareAndSetState(0,1)){// 直接抢,不看队列setExclusiveOwnerThread(current);returntrue;}elseif(getExclusiveOwnerThread()==current){intc=getState()+1;if(c<0)thrownewError("Maximum lock count exceeded");setState(c);returntrue;}returnfalse;}}staticfinalclassFairSyncextendsSync{finalbooleaninitialTryLock(){Threadcurrent=Thread.currentThread();intc=getState();if(c==0){if(!hasQueuedThreads()&&compareAndSetState(0,1)){// 先看队列有没有人setExclusiveOwnerThread(current);returntrue;}}elseif(getExclusiveOwnerThread()==current){intc2=getState()+1;if(c2<0)thrownewError("Maximum lock count exceeded");setState(c2);returntrue;}returnfalse;}}publicReentrantLock(){sync=newNonfairSync();}publicReentrantLock(booleanfair){sync=fair?newFairSync():newNonfairSync();}publicvoidlock(){sync.lock();}publicvoidunlock(){sync.release(1);}}逐段解释:
公平和非公平的区别,就一行代码。NonfairSync.initialTryLock()直接compareAndSetState(0, 1)抢锁,不管队列里有没有人在等。FairSync.initialTryLock()多了一个!hasQueuedThreads()的判断——队列里有人?对不起,请排队。
重入的实现靠getExclusiveOwnerThread() == current。如果当前线程已经持有锁,state直接 +1,不需要再 CAS。释放时state - 1,减到 0 才真正释放锁。这就是"可重入"的全部秘密。
默认构造器创建的是非公平锁。new ReentrantLock()等价于new ReentrantLock(false)。为什么默认非公平?因为吞吐量差距巨大——非公平锁允许新线程"插队"抢锁,减少了线程上下文切换的开销。
第三层:真实项目中的 AQS 子类
AQS 不只是 ReentrantLock 的底层。你日常用的这些类,全都是 AQS 的子类:
| 类 | AQS state 语义 | 模式 |
|---|---|---|
| ReentrantLock | 0=无锁,1+=重入次数 | 独占 |
| ReentrantReadWriteLock | 高16位=读锁持有数,低16位=写锁重入数 | 共享+独占 |
| CountDownLatch | 剩余计数,countDown 减到 0 放行 | 共享 |
| Semaphore | 剩余许可数 | 共享 |
来看一个真实项目中的例子。以下代码来自 AI-Meeting 项目 的UniversalAiChatHandler.java:
importjava.util.concurrent.CountDownLatch;publicvoidstreamToSink(AiPropertiesDOaiProperties,StringuserMessage,List<AiMessageHistoryRespDTO>historyMessages,FluxSink<String>sink,AIContentAccumulatoraccumulator)throwsException{ChatClientchatClient=createChatClient(aiProperties);List<Message>messages=buildMessages(aiProperties,userMessage,historyMessages);CountDownLatchlatch=newCountDownLatch(1);finalThrowable[]streamError=newThrowable[1];chatClient.prompt().messages(messages).stream().chatResponse().subscribe(response->{/* 处理流式响应 */},error->{streamError[0]=error;latch.countDown();},()->{latch.countDown();});latch.await(60,TimeUnit.SECONDS);}这里CountDownLatch(1)的 state 初始值就是 1。流式响应完成或出错时调countDown(),state 减到 0,AQS 释放所有等待线程。await()内部调的是 AQS 的acquireSharedInterruptibly()。
你可能没写过 AQS 的子类,但你每天都在用 AQS。
边界与陷阱:AQS 的三个大坑
看起来很优雅对吧?但 AQS 的坑,比你想的深。
陷阱一:tryAcquire()实现不当导致死锁。AQS 的acquire()方法在tryAcquire()返回 false 后才会入队等待。如果你的tryAcquire()实现有 bug——比如永远返回 false,或者条件判断写反了——线程会永远阻塞在队列里。
后果:死锁,且线程堆栈显示在LockSupport.park()上,你根本看不出是哪里出了问题。
解法:实现tryAcquire()时,确保"能拿到锁"的路径一定存在。对于独占锁,state == 0时必须允许获取。
陷阱二:非公平锁的"饥饿"问题。非公平锁允许新线程插队,在高并发场景下,队列中的线程可能永远抢不到锁——因为总有新线程插队成功。
后果:某些线程长时间得不到执行,响应时间 P99 飙升。
解法:如果业务对响应时间敏感,用公平锁。虽然吞吐量低 10%-30%,但保证了先来先服务。
陷阱三:state溢出。ReentrantLock 的state是 int 类型,最大重入次数是Integer.MAX_VALUE(2147483647)。虽然正常代码不可能重入 21 亿次,但如果你在循环里lock()忘了unlock()……
后果:++c < 0触发Error("Maximum lock count exceeded")。
解法:永远在 try-finally 里释放锁。这是铁律,没有例外。
高级考量:从单机 AQS 到分布式锁
AQS 解决的是单机内的线程同步问题。但当你从单机走向分布式,AQS 的等待队列就不够用了——它只能阻塞本 JVM 内的线程,跨进程的锁竞争它管不了。
Redisson 的分布式锁就是 AQS 思想的分布式延伸。以下代码截取自 Redisson RedissonLock.java:
publicclassRedissonLockextendsRedissonBaseLock{<T>RFuture<T>tryLockInnerAsync(longwaitTime,longleaseTime,TimeUnitunit,longthreadId,RedisStrictCommand<T>command){returnevalWriteSyncedNoRetryAsync(getRawName(),LongCodec.INSTANCE,command,"if ((redis.call('exists', KEYS[1]) == 0) "+"or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then "+"redis.call('hincrby', KEYS[1], ARGV[2], 1); "+"redis.call('pexpire', KEYS[1], ARGV[1]); "+"return nil; "+"end; "+"return redis.call('pttl', KEYS[1]);",Collections.singletonList(getRawName()),unit.toMillis(leaseTime),getLockName(threadId));}}注意看这段 Lua 脚本的逻辑:
exists检查锁是否存在 → 对应 AQS 的state == 0hexists检查是否当前线程持有 → 对应 AQS 的getExclusiveOwnerThread() == currenthincrby重入计数 +1 → 对应 AQS 的setState(c + 1)pexpire设置过期时间 →这是 AQS 没有的,分布式锁的看门狗机制
Redisson 把 AQS 的 state 从 JVM 内存搬到了 Redis Hash,把 CLH 队列从 JVM 线程队列换成了 Redis Pub/Sub + Semaphore。思想一样,存储介质变了。
但分布式锁引入了 AQS 不存在的问题:锁过期(看门狗续期)、网络分区(脑裂)、Redis 主从切换(锁丢失)。这些是单机 AQS 永远不需要考虑的。
对比表格
| 特性 | synchronized | ReentrantLock(非公平) | ReentrantLock(公平) |
|---|---|---|---|
| 实现层面 | JVM 内置(monitorenter 字节码) | AQS + CAS + CLH 队列 | AQS + CAS + CLH 队列 |
| 可中断 | 不可 | lockInterruptibly()可 | lockInterruptibly()可 |
| 超时获取 | 不可 | tryLock(timeout)可 | tryLock(timeout)可 |
| 公平性 | 非公平 | 非公平(默认) | 公平(!hasQueuedThreads()) |
| 条件变量 | 一个(wait/notify) | 多个(Condition) | 多个(Condition) |
| 可重入 | 是 | 是(state 递增) | 是(state 递增) |
| 吞吐量 | 中 | 高 | 低(比非公平低 10%-30%) |
| 适用场景 | 简单同步、代码块级 | 需要超时/中断/多条件 | 严格先来先服务 |
一句话:简单同步用 synchronized,需要高级特性用 ReentrantLock,对公平性有要求用公平锁。
面试追问
面试追问 1:AQS 的 CLH 队列入队操作为什么不是原子的?addWaiter()里先 CAS 设置 tail,再设置 prev.next,中间如果挂了怎么办?
→ 回答方向:入队分两步——CAS 设置 tail 指向新节点,然后pred.next = node设置前驱的 next。第二步不是原子的,但即使失败了,其他线程可以从 tail 向前遍历 prev 链找到所有节点。AQS 的unparkSuccessor()在找不到 next 时,就是从 tail 反向遍历的。
面试追问 2:ReentrantLock 的tryLock()不遵守公平性设置,为什么?这是 bug 吗?
→ 回答方向:不是 bug,是有意设计。JDK 文档明确写了:tryLock()会"插队"——只要锁空闲就立即获取,不管队列里有没有线程在等。理由是tryLock()通常用于避免死锁的试探性获取,如果强制排队反而可能导致活锁。如果需要遵守公平性,用tryLock(0, TimeUnit.SECONDS)。
面试追问 3:AQS 的 state 为什么是 int 而不是 long?如果需要 64 位的 state 怎么办?
→ 回答方向:Doug Lea 在设计时选择了 int,因为 CAS 操作在 32 位和 64 位 JVM 上都对 int 有原生支持,且对于锁计数、信号量许可数等场景,int 的范围足够。如果需要 64 位 state,可以参考java.util.concurrent.locks.StampedLock的实现——它用额外的 long 字段配合 Unsafe 的 CAS 操作。
面试追问 4:CountDownLatch 和 CyclicBarrier 都能实现"等待多个线程完成",它们在 AQS 层面的本质区别是什么?
→ 回答方向:CountDownLatch 基于 AQS 的共享模式,state是计数器,只能减不能增,一次性使用。CyclicBarrier 不基于 AQS,它内部用ReentrantLock + Condition实现循环等待,可以重复使用。本质区别:CountDownLatch 是"一个线程等 N 个事件",CyclicBarrier 是"N 个线程互相等"。
总结
AQS 不是锁,是造锁的模具——state 是锁芯,CLH 队列是锁体,tryAcquire/tryRelease 是钥匙孔的形状。
读完这篇你应该能:画出 AQS 的 acquire/release 完整流程图、解释 ReentrantLock 公平锁和非公平锁的那一行代码差异、说出 CountDownLatch 和 Semaphore 在 AQS 层面的 state 语义区别、理解 Redisson 分布式锁是对 AQS 思想的分布式延伸。
下次面试官问"AQS 是什么",别只说"一个队列"——告诉他:AQS 是 Doug Lea 用一个 volatile int 和一个双向链表,造出了整个 JUC 锁生态的基石。
