从字面意思上来看,Semaphore 就是信号量的意思,可以用来做并发控制。其实我们可以把它理解成一个资源池,类似java的线程池。Semaphore是基于AQS的共享模式来实现的,所有线程共用同一个资源池。
在构造方法中可以初始化资源的数量,例如new Semaphore(N),这里传入的N,其实就是将AQS中的state初始化为N,每个线程来了之后,通过acquire()方法获取到资源后,才能继续往下执行,获取资源其实就是对state进行-1 ,如果state为0了,说明没有资源可用了,需要等待别的线程释放资源。最后调用release()方法释放资源,释放资源对应的操作就是对state进行+1。
老规矩,先来看看他的构造方法。
// 这里传入的permits其实就是给state赋初始值
public Semaphore(int permits) {
// 默认使用非公平公平策略,当然还有个重载的构造方法可以指定公平策略
sync = new NonfairSync(permits);
}
Semaphore关键的方法就两个: acquire()和release(),先来看看acquire()方法
// 这里传入的arg为1,代表占用一个资源。
// 如果想获取超过一个以上的资源,可以调用带参数的aquire(int permits)方法
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
// 响应中断
if (Thread.interrupted())
throw new InterruptedException();
// tryAcquiredShared这个方法尝试获取资源,然后返回state-1(arg)后的值,代表可用的资源。
// 如果返回值<0,说明没有获取到资源,那么执行下面的doAcquireSharedInterruptibly
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {
// 看到这里我们知道,tryAcquiredShared最终采用CAS自璇的方式尝试获取资源,
// 也就是执行state=state-1,如果CAS成功且state-1=>0,代表获取到了资源。
// 如果state-1<0代表没有资源了,尝试获取资源失败。
for (;;) {
int available = getState();
int remaining = available - acquires;
// 要么没资源,要么成功获取到资源,这个方法才能够返回
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
再回到acquireSharedInterruptibly这里,如果tryAcquireShared返回小于0,说明没获取到资源,这个时候执行doAcquireSharedInterruptibly,进入AQS等待(同步)队列中然后线程挂起,等待被唤醒。
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 这里会通过自璇尝试将当前线程加入到AQS队列中,直到成功为止。
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
//注意:这里是自璇操作
for (;;) {
// 获取当前node的前一个节点
final Node p = node.predecessor();
// 当前节点的前一个节点是head,说明当前节点在等待队列中排在第一个
if (p == head) {
// 这里再次尝试获取资源,如果有线程释放了资源,就有机会获取到资源
// 只有获取资源成功,r >= 0 才会成立。
int r = tryAcquireShared(arg);
if (r >= 0) {
// 将当时线程设置为head,然后唤醒队列中后面的线程
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
// 到这里未获取到资源的线程就挂起了,
// 等待别的线程release释放资源,等到线程被唤醒后,继续往下执行
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
到这一步为获取到资源的线程就挂起了,需要其他线程执行release释放资源来唤醒它们,下面再来看一下release
public void release() {
// 对应的释放资源这里的arg也是1,代表释放一个资源
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
// tryReleaseShared这里通过CAS自璇的方式尝试对state进行+1,
// CAS成功才会正常返回,代表释放了资源,这个方法返回true
if (tryReleaseShared(arg)) {
// 接着执行doReleaseShared,唤醒挂起的线程
doReleaseShared();
return true;
}
return false;
}
// 对state进行+1操作,代表释放资源
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
// 这个方法会唤醒AQS等待(同步)队列中head的下一个节点
private void doReleaseShared() {
/*
* 这里需要自璇,因为在唤醒的过程中可能又有新的线程加入进来,
* 而且CAS可能会失败
*/
for (;;) {
Node h = head;
// 判断队列是否为空
if (h != null && h != tail) {
// 获取头节点的状态
int ws = h.waitStatus;
// 我们前面说过线程在入队的时候会将前节点的状态设置为SIGNAL(-1)
// 所以正常来说会进入这个分支
if (ws == Node.SIGNAL) {
// 将head的waitStatus设置为0
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 如果head的waitStatus成功修改为0,唤醒head的下一个节点
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 如果执行到这里,head有变化说明线程已经被唤醒
// 那么进入下一轮循环,否则说明head没变化,退出循环
if (h == head) // loop if head changed
break;
}
}
//这个方法就是在唤醒head的下一个节点
private void unparkSuccessor(Node node) {
//获得head的状态
int ws = node.waitStatus;
//如果head的waitStatus小于0,那么这个时候把它置为0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
//取出head的下一个节点
Node s = node.next;
//本来是要唤醒head的下一个节点,但是下一个节点可能已经取消了等待(waitStatus=1),
//所以需要找到阻塞队列(不包括head)中waitStatus<0的第一个节点
if (s == null || s.waitStatus > 0) {
s = null;
//从tail开始往前遍历,直到找出waitStatus<0排在等待队列最前面的那个节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
//就是在这里唤醒的线程
LockSupport.unpark(s.thread);
}
前面我们说获取不到资源的线程在AQS同步队列中等待,到这里head的下一个节点被唤醒,唤醒后继续往下执行,我们再回到线程挂起的地方:parkAndCheckInterrupt(),线程就挂起在这里。
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
//注意:这里是自璇操作
for (;;) {
// 获取当前node的前一个节点
final Node p = node.predecessor();
// 当前节点的前一个节点是head,说明当前节点在等待队列中排在第一个
if (p == head) {
// 这里再次尝试获取资源,如果有线程释放了资源,就有机会获取到资源
// 只有获取资源成功,r >= 0 才会成立。
int r = tryAcquireShared(arg);
// 获取到资源后,这个方法就返回了
if (r >= 0) {
// 将当时线程设置为head,然后唤醒队列中后面的线程
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
// 未获取到资源的线程就是在这里被挂起,
// 现在被唤醒后,继续往下执行,进入下一轮循环,再次获取资源
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
最后来做个总结:
因篇幅问题不能全部显示,请点此查看更多更全内容