
期待你加入Java人的提桶跑路群:共同富裕的Java人
Condition,Condition为AQS“家族”提供了等待与唤醒的能力,使AQS"家族"具备了像synchronized一样暂停与唤醒线程的能力。我们先来看两道关于Condition的面试题目:
- 什么是
Condition队列?
Condition和Object的等待与唤醒有什么区别?
Condition的面纱吧。
Condition是什么
Condition是Java中的接口,提供了与Object#wait和Object#notify相同的功能。Doug Lea在Condition接口的描述中提到了这点:
Conditions (also known as condition queues or condition variables provide a means for one thread to suspend execution (to "wait" until notified by another thread that some state condition may now be true。
Condition接口中提供了哪些方法:
public interface Condition {
void await( throws InterruptedException;
void awaitUninterruptibly(;
long awaitNanos(long nanosTimeout throws InterruptedException;
boolean await(long time, TimeUnit unit throws InterruptedException;
boolean awaitUntil(Date deadline throws InterruptedException;
void signal(;
void signalAll(;
}
Condition只提供了两个功能:等待(await)和唤醒(signal),与Object提供的等待与唤醒时相似的:public final void wait( throws InterruptedException; public final void wait(long timeoutMillis, int nanos throws InterruptedException; public final native void wait(long timeoutMillis throws InterruptedException; @HotSpotIntrinsicCandidate public final native void notify(; @HotSpotIntrinsicCandidate public final native void notifyAll(;唤醒功能上,
Condition与Object的差异并不大:
-
Condition#signalAll\(=\
Object#notifyAll
Condition#signal\(\approx\Object#notify
多个线程处于等待状态时,Object#notify(是“随机”唤醒线程,而Condition#signal则由具体实现决定如何唤醒线程,如:ConditionObject唤醒的是最早进入等待的线程,但两个方法均只唤醒一个线程。
Condition与Object的共同点是:都会释放持有的资源,Condition释放锁,Object释放Monitor,即进入等待状态后允许其他线程获取锁/监视器。主要的差异体现在Condition支持了更加丰富的场景,通过一张表格来对比下:
Condition方法 |
Object方法 |
解释 |
|---|---|---|
Condition#await( |
Object#wait( |
暂停线程,抛出线程中断异常 |
Condition#awaitUninterruptibly( |
/ | 暂停线程,不抛出线程中断异常 |
Condition#await(time, unit |
Object#wait(timeoutMillis, nanos |
暂停线程,直到被唤醒或等待指定时间后,超时后自动唤醒返回false,否则返回true |
Condition#awaitUntil(deadline |
/ | 暂停线程,直到被唤醒或到达指定时间点,超时后自动唤醒返回false,否则返回true |
Condition#awaitNanos(nanosTimeout |
/ | 暂停线程,直到被唤醒或等待指定时间后,返回值表示被唤醒时的剩余时间(nanosTimeout-耗时),结果为负数表示超时 |
Condition还支持创建多个等待队列,即同一把锁拥有多个等待队列,线程在不同队列中等待,而Object只有一个等待队列。《Java并发编程的艺术》中也有一张类似的表格,放在这里供大家参考:
Tips:
- 实际上signal翻译为唤醒并不恰当~~
- 涉及到
Condition的实现部分,下文通过AQS中的ConditionObject详细解释。
Condition怎么用
Condition与Object提供的等待与唤醒功能相同,那么它们的用法是不是也很相似呢?
Object#wait和Object#notifyAll必须处于synchronized修饰的代码中一样(获取Monitor),调用Condition#await和Condition#signalAll的前提是要先获取锁。但不同的是,使用Condition前,需要先通过锁去创建Condition。
ReentrantLock中提供的Condition为例,首先是创建Condition对象:
ReentrantLock lock = new ReentrantLock(;
Condition condition = lock.newCondition(;
然后是获取锁并调用await方法:
new Thread(( -> {
lock.lock(;
try {
condition.await(;
} catch (InterruptedException e {
throw new RuntimeException(e;
}
lock.unlock(;
}
最后,通过调用singalAll唤醒全部阻塞中的线程:
new Thread(( -> {
lock.lock(;
condition.signalAll(;
lock.unlock(;
}
ConditionObject的源码分析
作为接口Condition非常惨,因为在Java中只有AQS中的内部类ConditionObject实现了Condition接口:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
public class ConditionObject implements Condition, java.io.Serializable {
private transient Node firstWaiter;
private transient Node lastWaiter;
}
static final class Node {
// 省略
}
}
ConditionObject只有两个Node类型的字段,分别是链式结构中的头尾节点,ConditionObject就是通过它们实现的等待队列。那么ConditionObject的等待队列起到了怎样的作用呢?是类似于AQS中的排队机制吗?带着这两个问题,我们正是开始源码的分析。await方法的实现
Condition接口中定义了4个线程等待的方法:
void awaitUninterruptibly(;long awaitNanos(long nanosTimeout throws InterruptedException;boolean await(long time, TimeUnit unit throws InterruptedException;boolean awaitUntil(Date deadline throws InterruptedException;
void await( throws InterruptedException
public final void await( throws InterruptedException {
// 线程中断,抛出异常
if (Thread.interrupted( {
throw new InterruptedException(;
}
// 注释1:加入到Condition的等待队列中
Node node = addConditionWaiter(;
// 注释2:释放持有锁(调用AQS的release)
int savedState = fullyRelease(node;
int interruptMode = 0;
// 注释3:判断是否在AQS的等待队列中
while (!isOnSyncQueue(node {
LockSupport.park(this;
// 中断时退出方法
if ((interruptMode = checkInterruptWhileWaiting(node != 0 {
break;
}
}
// 加入到AQS的等待队列中,调用AQS的acquireQueued方法
if (acquireQueued(node, savedState && interruptMode != THROW_IE {
interruptMode = REINTERRUPT;
}
// 断开与Condition队列的联系
if (node.nextWaiter != null {
unlinkCancelledWaiters(;
}
if (interruptMode != 0 {
reportInterruptAfterWait(interruptMode;
}
}
注释1的部分,调用addConditionWaiter方法添加到Condition队列中:
private Node addConditionWaiter( {
// 判断当前线程是否为持有锁的线程
if (!isHeldExclusively( {
throw new IllegalMonitorStateException(;
}
// 获取Condition队列的尾节点
Node t = lastWaiter;
// 断开不再位于Condition队列的节点
if (t != null && t.waitStatus != Node.CONDITION {
unlinkCancelledWaiters(;
t = lastWaiter;
}
// 创建Node.CONDITION模式的Node节点
Node node = new Node(Node.CONDITION;
if (t == null {
// 队列为空的场景,将node设置为头节点
firstWaiter = node;
} else {
// 队列不为空的场景,将node添加到尾节点的后继节点上
t.nextWaiter = node;
}
// 更新尾节点
lastWaiter = node;
return node;
}
可以看到,Condition的队列是一个朴实无华的双向链表,每次调用addConditionWaiter方法,都会加入到Condition队列的尾部。
释放线程持有的锁,同时移出AQS的队列,内部调用了AQS的release方法:
=final int fullyRelease(Node node {
try {
int savedState = getState(;
if (release(savedState {
return savedState;
}
throw new IllegalMonitorStateException(;
} catch (Throwable t {
node.waitStatus = Node.CANCELLED;
throw t;
}
}
因为已经分析过AQS的release方法和ReentrantLock实现的tryRelease方法,这里我们就不过多赘述了。
isOnSyncQueue判断当前线程是否在AQS的等待队列中,我们来看此时存在的情况:
- 如果
- 如果
isOnSyncQueue返回true,即线程在AQS的队列中,不进入自旋,执行后续逻辑。
isOnSyncQueue返回false,即线程不在AQS的队列中,进入自旋,调用LockSupport#park暂停线程;
Condition#await的实现原理了就很清晰了:
- 当调用
Condition#await时,将线程添加到Condition的队列中(注释1),同时从AQS队列中移出(注释2); - 接着判断线程位于的队列:
- 位于
Condition队列中,该线程需要被暂停,调用LockSupport#park; - 位于AQS队列中,该线程正在等待获取锁。
- 位于
Condition与AQS分别维护了一个等待队列,而且是互斥的,即同一个节点只会出现在一个队列中;
Condition#signalAll的原理了:
- 将线程从
- 调用
LockSupport.unpark唤醒线程。
Condition队列中移出,并添加到AQS的队列中;
Tips:如果忘记了AQS中相关方法是如何实现的,可以回顾下《AQS的今生,构建出JUC的基础》。
signal和signalAll方法的实现
signal和signalAll的源码:
// 唤醒一个处于等待中的线程
public final void signal( {
if (!isHeldExclusively( {
throw new IllegalMonitorStateException(;
}
// 获取Condition队列中的第一个节点
Node first = firstWaiter;
if (first != null {
// 唤醒第一个节点
doSignal(first;
}
}
// 唤醒全部处于等待中的线程
public final void signalAll( {
if (!isHeldExclusively({
throw new IllegalMonitorStateException(;
}
Node first = firstWaiter;
if (first != null {
// 唤醒所有节点
doSignalAll(first;
}
}
两个方法唯一的差别在于头节点不为空的场景下,是调用doSignal唤醒一个线程还是调用doSignalAll唤醒所有线程:
private void doSignal(Node first {
do {
// 更新头节点
if ( (firstWaiter = first.nextWaiter == null {
// 无后继节点的场景
lastWaiter = null;
}
// 断开节点的连接
first.nextWaiter = null;
// 唤醒头节点
} while (!transferForSignal(first && (first = firstWaiter != null;
}
private void doSignalAll(Node first {
// 将Condition的队列置为空
lastWaiter = firstWaiter = null;
do {
// 断开链接
Node next = first.nextWaiter;
first.nextWaiter = null;
// 唤醒当前头节点
transferForSignal(first;
// 更新头节点
first = next;
} while (first != null;
}
可以看到,无论是doSignal还是doSignalAll都只是将节点移出Condition队列,而真正起到唤醒作用的是transferForSignal方法,从方法名可以看到该方法是通过“转移”进行唤醒的,我们来看源码:
final boolean transferForSignal(Node node {
// 通过CAS替换node的状态
// 如果替换失败,说明node不处于Node.CONDITION状态,不需要唤醒
if (!node.compareAndSetWaitStatus(Node.CONDITION, 0 {
return false;
}
// 将节点添加到AQS的队列的队尾
// 并返回老队尾节点,即node的前驱节点
Node p = enq(node;
int ws = p.waitStatus;
// 对前驱节点状态的判断
if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL {
LockSupport.unpark(node.thread;
}
return true;
}
transferForSignal方法中,调用enq方法将node重新添加到AQS的队列中,并返回node的前驱节点,随后对前驱节点的状态进行判断:
- 当\(ws > 0\时,前驱节点处于
- 当\(ws \leq 0\时,通过CAS修改前驱节点的状态为
Node.SIGNAL,设置失败时,直接唤醒node。
Node.CANCELLED状态,前驱节点退出锁的争抢,node可以直接被唤醒;
waitStatus的5种状态,其中Node.SIGNAL状态表示需要唤醒后继节点。另外,在分析shouldParkAfterFailedAcquire方法的源码时,我们知道在进入AQS的等待队列时,需要将前驱节点的状态更新为Node.SIGNAL。
enq的实现:
private Node enq(Node node {
for (;; {
// 获取尾节点
Node oldTail = tail;
if (oldTail != null {
// 更新当前节点的前驱节点
node.setPrevRelaxed(oldTail;
// 更新尾节点
if (compareAndSetTail(oldTail, node {
oldTail.next = node;
// 返回当前节点的前驱节点(即老尾节点)
return oldTail;
}
} else {
initializeSyncQueue(;
}
}
}
enq的实现就非常简单了,通过CAS更新AQS的队列尾节点,相当于添加到AQS的队列中,并返回尾节点的前驱节点。好了,唤醒方法的源码到这里就结束了,是不是和我们当初的猜想一模一样呢?图解ConditionObject原理
Condition实现了AQS版
Object#wait和Object#notify,用法上也与之相似,需要先获取锁,即需要在lock与unlock之间调用。原理上,简单来说就是线程在AQS的队列和Condition的队列之间的转移。线程t持有锁
ReentrantLock,线程t1,t2和t3正在AQS的队列中等待,我们可以得到这样的结构:
线程t执行Condition#await
Condition#await方法,线程t进入
Condition的等待队列中,线程t1获取ReentrantLock,并从AQS的队列中移出,结构如下:线程t1执行Condition#await
Condition#await方法,同样线程t1进入
Condition队列中,线程t2获取到ReentrantLock,结构如下:线程t2执行Condition#signal
Condition#signal,唤醒
Condition队列中的第一个线程,此时结构如下:Condition队列与AQS队列中转移的:
结语
Condition的内容到这里就结束了,无论是理解,使用还是剖析原理,
Condition的难度并不高,只不过大家可能平时用得比较少,因此多少有些陌生。好了,今天就到这里了,Bye~~