本文重点介绍Condition的实现原理,Condition是基于ReentrantLok来实现的,要使用Condition必须先持有锁,所以Condition和ReentrantLock是配对使用的。Condition主要有两个方法await等待和signal唤醒,适用于生产者-消费者使用场景。
下面是一个Condition的使用示例:
public class AQSReentrantLock {
private final Lock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
private final Object[] objects;
private int currSize,putIndex, takeIndex;
public AQSReentrantLock(int size) {
this.objects = new Object[size];
}
public void put(Object item) throws InterruptedException {
lock.lock();
try {
while (currSize == objects.length) {
System.out.println("队列已经满啦......");
notFull.await();
}
objects[putIndex] = item;
if (++ putIndex == objects.length)
putIndex = 0;
currSize++;
System.out.println(Thread.currentThread().getName()+"生产了消息:"+ putIndex);
notEmpty.signal();
} finally {
lock.unlock();
}
}
public Object take() throws InterruptedException {
lock.lock();
try {
while (currSize <= 0) {
System.out.println("队列是空的,没有可消费的数据......");
notEmpty.await();
}
Object item = objects[takeIndex];
objects[takeIndex] = null;
if (++ takeIndex == objects.length)
takeIndex = 0;
currSize--;
System.out.println(Thread.currentThread().getName()+"消费了消息:" + takeIndex);
notFull.signal();
return item;
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
final AQSReentrantLock aqs = new AQSReentrantLock(100);
new Thread(new Runnable(){
@Override
public void run() {
try {
while (true) {
aqs.put(1);
TimeUnit.MILLISECONDS.sleep(100);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"线程1").start();
new Thread(new Runnable(){
@Override
public void run() {
try {
while (true){
aqs.take();
TimeUnit.SECONDS.sleep(1);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"线程2").start();
}
}
Condition是通过ReentrantLock new出来的。老规矩,先来看下Condition的构造方法
public Condition newCondition() {
//咦,这里居然是调用的sync.newCondition
return sync.newCondition();
}
//返回的是一个 ConditionObject对象,它是在AQS中的,Condition的实现居然就是AQS中的一个内部类
final ConditionObject newCondition() {
return new ConditionObject();
}
看下Condition这个类中有哪些重要属性
//这里就只有两个关键属性
public class ConditionObject implements Condition, java.io.Serializable {
/** 看名字就知道,condition队列的第一个节点 */
private transient Node firstWaiter;
/** condition队列的最后一个节点 */
private transient Node lastWaiter;
......
}
中介绍过AQS有一个等待队列,所有获 取不到锁的线程会阻塞,等待被唤醒。
AQS本身是一个双向链表,而使用Condition的时候,就会用到AQS中的condition队列,AQS node中有一个属性nextWaiter就是专门用于condition队列的。
static final class Node {
/** 标记节点处于共享模式 */
static final Node SHARED = new Node();
/** 标记节点处于独占模式 */
static final Node EXCLUSIVE = null;
/** waitStatus 为1时,表明线程取消了获取锁 */
static final int CANCELLED = 1;
/** waitStatus 为-1时,表明当前节点的下一个节点对应的线程需要被唤醒 */
static final int SIGNAL = -1;
/** waitStatus 为-2时在condition的时候才使用,表名节点在等待某种条件*/
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
//取值为上面的1,-1,-2,-3和0
volatile int waitStatus;
//前一个节点
volatile Node prev;
//下一个节点
volatile Node next;
//当前节点代表的线程
volatile Thread thread;
//condition条件队列中的下一个节点
Node nextWaiter;
......
}
这样的话,ReentrantLock+Condition组合的结构就显而易见了,由一个等待队列和N个condition队列组成。等待队列是双向链表,condition队列是单向链表,其结构图大致如下:
那么我们先来看一下Condition的await()方法
// 这个方法首先会将当前线程加入到condition队列中
// 然后需要完全释放掉独占锁,完全释放是因为锁是可重入的
// 之后当前线程就阻塞在这里,直到被signal唤醒或者interrupted中断
public final void await() throws InterruptedException {
// 判断中断状态,响应中断
if (Thread.interrupted())
throw new InterruptedException();
// 将当前线程加入到condition队列中
Node node = addConditionWaiter();
// 注意:当前线程池有锁。才能执行await()方法
// 这里要完全释放掉当前线程持有的锁,其他线程才能持有锁,返回值为之前的state
// 由于ReentrantLock锁可重入,所以这里返回的savedState可能大于1
int savedState = fullyRelease(node);
// 定义中断状态
int interruptMode = 0;
// 这里的isOnSyncQueue()返回true就可以退出while循环。
// 代表当前节点已经从condition队列转移到等待队列的尾部
while (!isOnSyncQueue(node)) {
// 返回false代表当前节点还在condition队列中排队
// 这个时候通过park将当前线程挂起
// 等待signal唤醒后,转移到等待队列的最后,就可以跳出循环继续往下执行
LockSupport.park(this);
// checkInterruptWhileWaiting()不为0,代表发生了线程中断,这个时候break跳出循环
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 我们先不要管这个interruptMode
// 线程进入等待队列后,等待获取独占锁。由于是可重入锁。acquireQueue传入了saveState
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
// 从condition队列中将所有已取消等待的节点清楚出去
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
await方法中有几个比较关键的步骤:addConditionWaiter(),fullyRelease(),isOnSyncQueue(),aquireQueued()
那就一个个来看一下。
// 将当前线程封装成node加入到condition队列中
private Node addConditionWaiter() {
Node t = lastWaiter;
// 如果condition队列中的最后一个节点取消了等待,把它清除出去
// condition队列中的节点初始化的时候,waitStatus=Node.CONDITION
if (t != null && t.waitStatus != Node.CONDITION) {
// 遍历condition队列,将取消等待的节点清除出去
unlinkCancelledWaiters();
t = lastWaiter;
}
// 初始化condition队列中的node,并且给waitStatus赋值为Node.CONDITION
Node node = new Node(Thread.currentThread(), Node.CONDITION);
// 如果condition队列为空。直接放到condition队列的最前面
if (t == null)
firstWaiter = node;
// 否则放到最后面(lastWaiter)
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
// 遍历链表,清除已取消节点
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
// 从头节点开始遍历
while (t != null) {
Node next = t.nextWaiter;
// 如果取消了排队,从condition队列中清除出去
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
加入到condition条件队列后,执行fullRelease完全释放锁
// 这个方法释放当前线程持有的锁。返回之前的state
// 如果释放锁失败,将节点waitStatus设置为取消状态
final int fullyRelease(Node node) {
boolean failed = true;
try {
// 获取之前的state,返回的是重入的次数
int savedState = getState();
// 重入了n次,那么在这里执行release(n)将state置为0,代表完全释放锁
if (release(savedState)) {
failed = false;
return savedState;
} else {
// 释放锁失败会抛出异常
throw new IllegalMonitorStateException();
}
} finally {
// 释放锁失败,取消节点
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
完全释放锁之后,不断地自璇去判断当前线程是否已经加入到AQS等待队列中
// 循环判断,如果不在等待队列中,那么挂起线程。加入到等待队列后,退出循环
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// Signal唤醒操作通常由其他线程来完成,会将condition队列中的节点转移到等待队列中。
// 所以是在循环中执行isOnSyncQueue
final boolean isOnSyncQueue(Node node) {
// 如果waitStatus=Node.CONDITION,说明还在condition队列中
// 如果node的前节点为null,那肯定说明它没在等待队列中
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
// 如果node的下一个节点next不为空,说明它肯定已经进入到等待队列,直接返回。
if (node.next != null) // If has successor, it must be on queue
return true;
/*
* 这里会遍历等待队列,从后往前查找,如果找到相等的node,说明确实已经进入到等待队列。
* 为什么不通过node.prev==null来判断是否进入到等待队列中?
* 在node前节点为空的情况下,可能还是没有进入到等待队列,因为CAS入队可能失败(有竞争)
*/
return findNodeFromTail(node);
}
// 通过从tail往前查找的方式搜索node,如果node存在于等待队列中,返回true
private boolean findNodeFromTail(Node node) {
Node t = tail;
// 循环查找
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
按照await()的逻辑,node如果没有进入到等待队列,那么将会执行LockSupport.park(this)将当前线程挂起,等待signal唤醒。下面我们就看一下signal的整个流程
// 唤醒Codition队列的头结点,
// signal唤醒其实就是将condition的firstWaiter转移到等待队列的最后
public final void signal() {
// 如果没有持有锁,抛出异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
// 这里传入的就是condition队列的头结点,对其进行唤醒
doSignal(first);
}
// 将node从condition队列中移除,转移到等待队列中
// 这里传入的first其实就是condition队列的头结点(fristWaiter)
private void doSignal(Node first) {
do {
// 将condition队列的firstWaiter指向first的nextWaiter
// 也就是将原本的firstWaiter从condition队列中移除
if ( (firstWaiter = first.nextWaiter) == null)
// 如果first的下一个节点为null,说明后面没有节点了,那么将lastWaiter置为null
lastWaiter = null;
first.nextWaiter = null;
// 这里的do while 循环从condition队列的头节点往后遍历,
// 将第一个未取消等待的节点转移到等待队列
// 因为condition队列中的节点可能已经取消了等待,
// 所以需要往后遍历,根据waitStatus判断节点是否已经取消了等待。
// 如果转移firstWaiter不成功,
// 那么下一次循环会转移firstWaiter的下一个节点,直到转移成功退出循环
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
// 这个方法将node从condition队列转移到等待队列
// 返回是否转移成功
final boolean transferForSignal(Node node) {
/*这里使用CAS将waitStatus从Node.CONDITION修改为0
* 如果CAS失败,说明waitStatus已经不是Node.CONDITION,代表这个节点已经取消了等待
* 返回false
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* 这里使用自旋的方式将node添加到等待队列的最后
* 返回值P是node转移到等待队列后的前节点
*/
Node p = enq(node);
// 前节点的状态
int ws = p.waitStatus;
// 如果前节点的waiterStatus>0,说明前节点可能取消了等待,这个时候直接唤醒node对应的线程
// 如果前节点的waiterStatus<=0,node入队后,
//需要调用compareAndSetWaitStatus把它前节点的waiterStatus修改为NODE.SIGNAL
// 如果CAS失败,也直接唤醒node对应的线程
// 一般情况下这里不会唤醒线程,前节点不会取消等待,修改前节点状态也是能够成功的
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
// node前节点取消等待,或者CAS失败,唤醒线程
LockSupport.unpark(node.thread);
// 转移成功,返回true
return true;
}
// 进行入队操作,有必要的话对head进行初始化
private Node enq(final Node node) {
//使用CAS自旋插入队列的最后面,即设置当前线程为tail
for (;;) {
Node t = tail;
// 如果队列是空的
if (t == null) { // Must initialize
//对head进行初始化,这个时候head节点的waitStatus还是0
if (compareAndSetHead(new Node()))
//head初始化好了,将tail指向head
tail = head;
} else {// 将当前node添加到阻塞队列的最后
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
//只有入队成功,这个方法才会return
return t;
}
}
}
}
之前我们说当前线程执行condition.await()之后,就阻塞在那里了。等到另一个线程执行condition.signal()之后,将当前线程由condition队列成功转移到等待队列中,之后就是在等待队列中准备获取锁。只要等待队列中当前线程的前一个节点释放了锁后,就会唤醒当前线程,当前线程获取到锁后,就可以继续往下执行。
讲到这里,其实Condition从await到signal的整个流程已经非常清晰了。剩下的是一些个中断处理(interruptMode)。
最后总结一下condition的大致流程:
因篇幅问题不能全部显示,请点此查看更多更全内容