上篇文章讲了Java中显式锁Lock和ReadWriteLock的概念及简单使用示例,本篇文章重点来看一下Java中显式锁的底层实现。在看这部分代码时,我任务比较好理解的方式是带着问题去看源码。对于显式锁,我们可能会有以下问题:
- 问题1:显式锁底层是基于CAS实现,细节是怎样的?
- 问题2:显式锁跟synchronized类似,无法获取锁时,会阻塞当前线程,如何实现阻塞的?
- 问题3:显式锁提供还可以提供tryLock的操作,获取不到锁立即返回,如何实现?
- 问题4:显式锁在等待时,可以响应中断,如何实现?
- 问题5:显式锁可以提供公平锁和非公平锁,如何实现?
- 问6:读写锁底层又是怎么实现的?
带着这些问题,我们来看Java显式锁的实现源码。
1. LockSupport
显式锁在无法获取到锁时,需要阻塞当前线程,synchronized中是依赖于底层操作系统函数实现的,在显式锁中必然也要有类似的机制,可以阻塞申请不到锁的线程。在显式锁中是依赖包java.util.concurrent.locks下的LockSupport实现的,它的基本方法有:
public static void park()
public static void parkNanos(long nanos)
public static void parkUntil(long deadline)
public static void unpark(Thread thread)
park使得当前线程放弃CPU,进入等待状态(WAITING),操作系统不再对它进行调度,什么时候再调度呢?有其他线程对它调用了unpark,unpark需要指定一个线程,unpark会使之恢复可运行状态。看个例子:
public static void main(String[] args) throws InterruptedException {
Thread t = new Thread (){
public void run(){
LockSupport.park();
System.out.println("exit");
}
};
t.start();
Thread.sleep(1000);
LockSupport.unpark(t);
}
线程t启动后调用park,会放弃CPU,主线程睡眠1秒钟后,调用unpark,线程t恢复运行,输出exit。
park不同于Thread.yield(),yield只是告诉操作系统可以先让其他线程运行,但自己依然是可运行状态,而park会放弃调度资格,使线程进入WAITING状态。
需要说明的是,park是响应中断的,当有中断发生时,park会返回,线程的中断状态会被设置。
park有两个变体:
- parkNanos:可以指定等待的最长时间,参数是相对于当前时间的纳秒数。
- parkUntil:可以指定最长等到什么时候,参数是绝对时间,是相对于纪元时的毫秒数。
当等待超时的时候,它们也会返回。
park方法还有一些变体,可以指定一个对象,表示是因为该对象进行等待的,以便于调试,通常传递的值是this,如下:
public static void park(Object blocker)
public static void parkNanos(Object blocker, long nanos)
public static void parkUntil(Object blocker, long deadline)
LockSupport有一个方法,可以返回一个线程的blocker对象:
public static Object getBlocker(Thread t)
这些park/unpark方法是怎么实现的呢?与CAS方法一样,它们也调用了Unsafe类中的对应方法,Unsafe类最终调用了操作系统的API,从程序员的角度,我们可以认为LockSupport中的这些方法就是基本操作。
2. AbstractQueuedSynchronizer(AQS)
利用CAS和LockSupport提供的基本方法,肯定就可以实现ReentrantLock了。但Java中还有很多其他并发工具,如ReentrantReadWriteLock、Semaphore、CountDownLatch,它们的实现有很多类似的地方,为了复用代码,Java提供了一个抽象类AbstractQueuedSynchronizer,我们简称为AQS,它简化了并发工具的实现。像ReentrantReadWriteLock、Semaphore、CountDownLatch都是基于AQS实现的。
AQS解决了实现同步器时涉及当的大量细节问题,例如获取同步状态、FIFO同步队列等。基于AQS来构建同步器可以带来很多好处,它不仅能够极大地减少实现工作,而且也不必处理在多个位置上发生的竞争问题。 同时AQS在设计上充分考虑了可伸缩行,因此J.U.C中所有基于AQS构建的同步器均可以获得这个优势。
AQS的主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态。
AQS使用一个int类型的成员变量state来表示同步状态,当state>0时表示已经获取了锁,当state = 0时表示释放了锁。它提供了三个方法(getState()、setState(int newState)、compareAndSetState(int expect,int update))来对同步状态state进行操作,当然AQS可以确保对state的操作是安全的。
AQS通过内置的FIFO同步队列来完成线程获取锁的排队工作,如果当前线程获取同步状态失败(锁)时,AQS则会将当前线程以及等待状态等信息构造成一个节点(Node)并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,则会把节点中的线程唤醒,使其再次尝试获取同步状态。AQS类声明如下:
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
private transient volatile Node head;
private transient volatile Node tail;
private volatile int state;
//methods
}
上述head和tail,就是内部的双向同步队列,state用来表示同步状态。另外,AQS继承了AbstractOwnableSynchronizer类,AbstractOwnableSynchronizer类中有一个Thread类型的成员变量,用于存储当前持有锁的线程。AbstractOwnableSynchronizer类提供getExclusiveOwnerThread方法,可以获取当前持有锁的线程。
2.1 方法说明
AQS主要提供了如下一些方法:
- protected final int getState()
返回同步状态的当前值
- protected final void setState(int newState)
设置当前同步状态
- protected final boolean compareAndSetState(int expect, int update)
CAS修改同步状态,如果设置成功返回true,否则返回false,该方法能够保证状态设置的原子性
- protected boolean tryAcquire(int arg)
独占式获取同步状态,获取同步状态成功后,返回true,其他线程需要等待该线程释放同步状态才能获取同步状态。获取同步状态失败,立即返回false
- public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException
在指定超时时间内获取独占锁同步状态,如果当前线程在nanos时间内没有获取到同步状态,那么将会返回false,已经获取则返回true
- protected boolean tryRelease(int arg)
独占式释放同步状态
- protected int tryAcquireShared(int arg)
共享式获取同步状态,返回值大于等于0则表示获取成功,否则获取失败
- public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException
在指定超时时间内共享锁同步状态,如果当前线程在nanos时间内没有获取到同步状态,那么将会返回false,已经获取则返回true
- protected boolean tryReleaseShared(int arg)
共享式释放同步状态
- protected boolean isHeldExclusively()
当前同步器是否在独占式模式下被线程占用,即表示锁是否被当前线程所独占
- public final void acquire(int arg)
独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回。否则,将会进入同步队列等待,直至获取锁
- public final void acquireInterruptibly(int arg) throws InterruptedException
与acquire(int arg)相同,但是该方法响应中断,当前线程为获取到同步状态而进入到同步队列中,如果当前线程被中断,则该方法会抛出InterruptedException异常并返回,不会继续等待
- public final void acquireShared(int arg)
共享式获取同步状态,如果当前线程未获取到同步状态,将会进入同步队列等待,与独占式的主要区别是在同一时刻可以有多个线程获取到同步状态
- public final void acquireSharedInterruptibly(int arg) throws InterruptedException
共享式获取同步状态,响应中断
- public final boolean release(int arg)
释放独占式同步状态,该方法会在释放同步状态之后,将同步队列中第一个节点包含的线程唤醒
- public final boolean releaseShared(int arg)
共享式释放同步状态
2.2 同步队列
AQS内部维护了一个同步队列,该队列是一个FIFO双向队列,AQS依赖它来完成同步状态的管理。当前线程获取同步状态失败时,AQS则会将当前线程和等待状态等信息构造成一个节点(Node)并将其加入到同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点唤醒(公平锁),使其再次尝试获取同步状态(如果是非公平锁,当锁被释放时,不一定被同步队列首节点获取到锁,也有可能被正在申请锁的线程获取到,这就是非公平性的体现)。
同步队列中,一个节点表示一个正在等待的线程,它保存着线程的引用(thread)、状态(waitStatus)、前驱节点(prev)、后继节点(next),其定义如下:
static final class Node {
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
//methods
}
其中nextWaiter是显示条件(Condition)用来组织所有等待(await)的线程节点的,这点在下面讲显示条件实现时在介绍。先来看一下同步队列的组织形式:
head指向的节点比较特殊,假如当前只有一个线程等待锁,那么同步队列也会有两个节点。可以理解为head节点是当前正在使用锁的节点,该节点初始化时,prev域为null,thread域为null,waitStatus为0。head节点的后继节点才是等待锁的第一个线程对应的节点。
3. 从AQS解释ReentrantLock实现
3.1 ReentrantLock定义
public class ReentrantLock implements Lock, java.io.Serializable {
//用于实现显式锁的同步对象(sync其实就是队列同步器AQS)
private final Sync sync;
//AQS实现类
abstract static class Sync extends AbstractQueuedSynchronizer {}
//非公平锁
static final class NonfairSync extends Sync {}
//公平锁
static final class FairSync extends Sync{}
//默认构造函数
public ReentrantLock() {
sync = new NonfairSync();
}
//构造函数
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
//methods
}
可以看到,ReentrantLock为了复用AQS的代码,在内部通过继承于AQS的同步类Sync来使用AQS提供的同步队列等特性,这也是之前讲的AQS的主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态的体现。
Sync类是个抽象类,并没有完全实现AQS的抽象方法,这是因为ReentrantLock提供公平锁和非公平锁,这两种锁获取锁的策略是不同的,不同的逻辑分别在NonfairSync和FairSync中实现。 另外,通过构造函数可以看出,ReentrantLock的默认实现是非公平锁。
先来看一下Sync、NoFairSync和FairSync的实现(这里先省略跟获取锁/解锁无关的逻辑):
abstract static class Sync extends AbstractQueuedSynchronizer {
/**
* 抽象方法,用于公平锁/非公平锁子类实现不同的获取锁的策略
*/
abstract void lock();
/**
* 非公平尝试获取独占锁,获取不到立即返回false
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//如果锁没有被占有,立即CAS尝试占有锁,这就是非公平的体现
//因为同步队列中有可能有之前申请锁阻塞的线程却没有优先获取锁
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//如果锁已经被占有,并且占有锁的线程是当前线程,直接修改锁的重入次数state
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
//此时线程是持有锁的,直接调用set方法设置同步状态即可
setState(nextc);
return true;
}
//除以上两种情况,都认为获取锁失败
return false;
}
/**
* 释放独占锁同步状态
*/
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
//如果当前线程非持有锁的线程,抛异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
//此时线程是持有锁的,直接调用set方法设置同步状态即可
setState(c);
return free;
}
}
static final class NonfairSync extends Sync {
/**
* 非公平获取独占锁,获取不到会阻塞
*/
final void lock() {
//直接CAS修改同步状态,非公平就体现在这,新申请锁的线程有可能优先于同步队列的线程获取到锁
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
//获取独占锁,如果获取不到会阻塞线程,直至获取到锁,acquire方法是AQS中定义的方法
acquire(1);
}
/**
* 非公平尝试获取独占锁,该方法是AQS的抽象方法,方法实现直接调用Sync的nonfairTryAcquire方法
*/
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
static final class FairSync extends Sync {
/**
* 公平获取独占锁
*/
final void lock() {
//获取独占锁,跟非公平锁相比,少了CAS尝试,acquire方法是AQS中定义的方法
acquire(1);
}
/**
* 公平尝试获取独占锁
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//这里跟给公平锁相比多了个条件,同步队列中没有等待锁的线程,才会通过CAS修改同步状态
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
这里可以看出,Sync类中定义了抽象方法lock(),在子类NoFairSync和FairSync中都给出了实现。在子类NoFairSync和FairSync中,除了实现了Sync类的lock方法,还覆盖了父类AQS中的tryAcquire方法,这一点也好理解,tryAcquire方法用于尝试获取独占锁,在公平锁和非公平锁模式下肯定是不同的,需要各自实现。
3.2 lock()
这里以非公平锁为例,来分析一下非公平锁加锁方法lock的实现细节。ReentrantLock中lock方法实现如下:
public void lock() {
sync.lock();
}
上面我们知道,通过ReentrantLock的构造函数,会对成员变量sync赋值,并且不同的构造函数,得到的sync对象是不同的(NoFairSync/FairSync)。所以当sync是非公平锁对象时,这里调用的肯定是NoFairSync类的lock方法:
/**
* 非公平获取独占锁,获取不到会阻塞
*/
final void lock() {
//直接CAS修改同步状态,修改成功,就成功获取了锁
//非公平就体现在这,新申请锁的线程有可能优先于同步队列的线程获取到锁
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
//获取独占锁,如果获取不到会阻塞线程,直至获取到锁,acquire方法是AQS中定义的方法
acquire(1);
}
上述代码的逻辑很简单,当线程申请锁时,默认会让当前线程CAS尝试修改同步状态,如果修改成功,当前线程就获取了锁,并调用AbstractOwnableSynchronizer类的setExclusiveOwnerThread方法将持有锁的线程设置为当前线程。因为上述这个直接CAS的过程,即使同步队列已经有线程在等待锁,锁也有可能被后来申请锁的线程拿到,这就是非公平的体现。当CAS修改同步状态失败时,就调用AQS的acquire方法获取锁:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
acquire方法实现中,首先会调用tryAcquire方法尝试获取锁,关于tryAcquire方法,上面讲过NoFairSync和FairSync都实现了该方法,对于非公平锁肯定调用的是NoFairSync类的tryAcquire方法:
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
/**
* 非公平尝试获取独占锁,获取不到立即返回false
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//如果锁没有被占有,立即CAS尝试占有锁,这也是非公平的体现
//因为同步队列中有可能有之前申请锁阻塞的线程却没有优先获取锁
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//如果锁已经被占有,并且占有锁的线程是当前线程,直接修改锁的重入次数state
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
//此时线程是持有锁的,直接调用set方法设置同步状态即可
setState(nextc);
return true;
}
//除以上两种情况,都认为获取锁失败
return false;
}
tryAcquire方法实现比较简单,这里不多讲了,关键点都在上面方法的注释中。回到AQS的acquire方法,如果tryAcquire方法尝试获取锁失败,则会调用acquireQueued方法获取锁,acquireQueued方法的第一个参数是通过addWaiter方法获取的:
/**
* 向同步队列中插入一个节点
*/
private Node addWaiter(Node mode) {
//新生成一个节点node,表示当前等待锁的线程
Node node = new Node(Thread.currentThread(), mode);
//获取前驱节点tail(node要插入到tail后面)
Node pred = tail;
//tail非null,说明同步队列中已经有节点,只要把node插入到当前tail节点后,并将tail设置为node即可
if (pred != null) {
node.prev = pred;
//CAS原子方式设置tail指向
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//tail为null(同步队列为空)或上述CAS设置tail指向失败,调用enq方法入队
enq(node);
return node;
}
/**
* 同步队列为空或上述CAS设置tail指向失败时,node入队
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
//tail为null,表示同步队列中无节点,这时候需要初始化
if (compareAndSetHead(new Node()))
//head指向一个空节点,该节点Thread域为null,waitStatus为0
//理解这点很重要,第一个等待锁的线程节点并不是同步队列的head节点,head节点是一个空节点
//head节点的后继节点是第一个等待锁的节点
tail = head;
} else {
//node入队
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
也就是讲,通过addWaiter方法,返回了新入队的等待锁的节点引用。这里要重点强调一下,同步队列的head节点其实指向的是个空节点,第一个等待锁的线程节点是head的后继节点。下面来看acquireQueued方法实现:
/**
* 排队获取锁对象
* 走到这,说明线程尝试获取锁失败,只能排队获取锁
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
//这里是个死循环,调出循环的条件是当前节点是head的后继节点,并且尝试获取锁成功
for (;;) {
//获取node的前驱节点p
final Node p = node.predecessor();
//如果node是p的后继节点,并尝试获取锁成功
if (p == head && tryAcquire(arg)) {
//node已经获取到锁,则node节点修改为同步队列的head,并断开node的prev连接
setHead(node);
//setHead方法已经断开了node的prev连接,这里在断开之前head的next连接,之前的head节点就可以被GC回收了
p.next = null; // help GC
failed = false;
//返回interrupted标志,interrupted表示线程在等待锁期间是否被中断
return interrupted;
}
//如果节点尝试获取锁失败,就要阻塞当前线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
acquireQueued方法简单的讲就是让同步队列中的node节点获取锁,获取锁的过程是个死循环,只有等node节点获取到锁,才能从循环中跳出。所以node获取锁的过程中可能会进行多次尝试,每次尝试失败时,都要阻塞自己的线程。
acquireQueued方法核心就是如下三点:
- 循环获取锁,可以获取锁的前提是node节点是head的后继节点,并且尝试获取锁成功
- 如果尝试获取锁失败就要检查是否需要阻塞当前线程,如果node的前驱节点waitStatus是SIGNAL,则需要阻塞
- 则塞线程调用LockSupport.park方法,该方法是个响应中断的方法,线程可以从阻塞中返回的前提是别的线程调用了unpark方法唤醒了当前线程
检查等待锁的线程是否需要阻塞的方法shouldParkAfterFailedAcquire实现:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
//前驱节点为SIGNAL(-1),表示前面节点的线程释放锁会通过unpark唤醒node,
//因此当node获取锁失败时,需要调用park堵塞阻塞线程,等待被前面的节点唤醒
return true;
if (ws > 0) {
//前驱节点状态为CANCLE(1),表示节点代表的线程已经不需要获取锁,删除前驱节点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node; //更新next指针,删除取消状态的节点
} else {
//前驱节点状态未0(初始状态),则CAS将前驱节点状态修改为SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
//走到这说明前驱节点waitStatus状态不是SIGNAL,当前线程不需要park,
//在下次检查是该方法会返回true,当前线程会park阻塞
return false;
}
总结起来就是,如果node的前驱节点waitStatus状态是SIGNAL(-1),那么node代表的节点就需要阻塞等待。然后来看一下重点逻辑,当线程获取不到锁时,是如何实现阻塞的,就是parkAndCheckInterrupt方法:
/**
* 阻塞线程,返回线程在阻塞期间是否被中断
*/
private final boolean parkAndCheckInterrupt() {
//阻塞线程,调用该方法后,线程会释放CPU,进入阻塞状态
LockSupport.park(this);
//走到这里,说明线程已经从阻塞中返回了
//前提是其他线程调用了unpark方法,唤醒了当前线程
return Thread.interrupted(); //返回线程中断状态,并清除中断状态
}
如果该方法返回true,说明线程在阻塞等待锁期间被中断过。但是从acquireQueued方法实现可以看出,即使线程在等待期间被中断了,也只是设置了中断标记局部变量interrupted,并不会从等待中返回,所以acquireQueued是不响应中断的,锁是否相应中断,其实区别就在parkAndCheckInterrupt方法返回true时的处理上。
到这里加锁过程基本就讲完了,回到最开始的几个问题,我们至少可以回答前两个了:
- 问题1:显式锁底层是基于CAS实现,细节是怎样的?
显示锁是基于AQS实现的,AQS维护锁的状态是通过内部int类型的成员变量state来表示的,当state>0时表示已经获取了锁,当state = 0时表示释放了锁。加锁的过程就是通过CAS修改state成员变量的过程。
- 问题2:显式锁跟synchronized类似,无法获取锁时,会阻塞当前线程,如何实现阻塞的?
显示锁中,当尝试获取锁失败时,会调用acquireQueued方法排队获取锁,这个队列就是AQS内部维护的双向同步队列,只有当前线程对应的节点是双向队列head节点的后继节点并且尝试获取锁成功时,才表示线程最终获取锁成功,否则就需要继续阻塞等待,直至满足上述条件。阻塞等待是通过LockSupport.park方法实现的。
3.3 tryLock()
之前的文章中讲过,显示锁中提供tryLock方法,可以避免死锁。在持有一个锁,获取另一个锁,获取不到的时候,可以释放已持有的锁,给其他线程机会获取锁。也就是讲,在tryLock方法中,如果尝试获取锁失败时,并不会去排队获取锁并阻塞线程,而是会直接返回果false(获取锁失败)。这里以非公平锁的tryLock方法为示例:
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
可以看到tryLock就是直接通过调用Sync类的nofairTryAcquire方法实现的,nofairTryAcquire方法的实现之前已经讲过,其实就是非公平方式尝试获取锁:
/**
* 非公平尝试获取独占锁,获取不到立即返回false
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//如果锁没有被占有,立即CAS尝试占有锁,这也是非公平的体现
//因为同步队列中有可能有之前申请锁阻塞的线程却没有优先获取锁
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//如果锁已经被占有,并且占有锁的线程是当前线程,直接修改锁的重入次数state
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
//此时线程是持有锁的,直接调用set方法设置同步状态即可
setState(nextc);
return true;
}
//除以上两种情况,都认为获取锁失败
return false;
}
也就是讲noFairTryAcquire的前提条件是(以下条件满足一个即可):
- 线程在申请锁时,锁没有被其它线程持有,并且当前线程成功通过CAS修改了同步状态
- 有线程持有锁,但是持有锁的线程是当前线程,此次加锁只是一次重入,只需要将同步状态加1即可
除了上述两种情况,都视为获取锁失败,直接返回false。
回到最开始的问题3:显式锁提供还可以提供tryLock的操作,获取不到锁立即返回,如何实现?
显示锁tryLock与lock方法相比,方法最开始都会通过通过noFairTryAcquire方法尝试获取锁,但是对于尝试获取锁失败的情况,tryLock直接返回false,并不会阻塞当前线程,lock方法会调用acquireQueued方法,排队获取锁,如果获取不到锁就阻塞线程,直到成功获取锁。
3.4 lockInterruptibly
上面讲lock方法的时候,我们知道lock方法获取锁是不响应中断的。也就是讲在lock方法在获取不到锁阻塞等待的过程中,如果线程被中断,线程并不会从中断中返回,还是会继续等待获取锁。但是显示锁提供了另一个加锁方法lockInterruptibly,可以响应中断,如下:
public void lockInterruptibly() throws InterruptedException {
//直接调用AQS的acquireInterruptibly方法
sync.acquireInterruptibly(1);
}
/**
* 响应中断获取独占锁
*/
public final void acquireInterruptibly(int arg) throws InterruptedException {
//获取中断状态,并清除中断标识,如果线程被中断,直接抛异常
if (Thread.interrupted())
throw new InterruptedException();
//先尝试获取独占锁,如果获取锁成功,方法直接结束
//否则调用doAcquireInterruptibly方法,以响应中断的方式获取独占锁
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
/**
* 以响应中断模式获取独占锁
* 这里的实现跟acquireQueued基本一致,唯一的区别在于park阻塞等待过程中,线程被中断的处理上
*/
private void doAcquireInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//如果线程阻塞等待锁的过程中被中断,直接抛异常
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
这里可以看到,响应中断获取锁和普通获取锁的方法实现上基本是一致的,唯一的区别就在park阻塞等待过程中,如果线程被中断的处理上。lock方法调用的acquireQueued方法,在发生中断时,只是记了中断标记,并没有中断线程,还是会继续等待锁。但是lockInterruptibly方法调用的doAcquireInterruptibly方法,在发生中断时,直接抛InterruptedException,放弃继续等待锁,所以可以响应中断。这也是最开始的问题4的答案。
3.5 unLock
unLock方法就是解锁,按照之前加锁的过程,我们大概能猜出解锁操作需要做的事情:修改state同步变量、unpark唤醒在同步队列等待锁的线程。下面来看一下unLock方法实现:
public void unlock() {
//直接调用ReentrantLock类的内部类Sync类的release方法释放锁
sync.release(1);
}
/**
* 释放线程持有的锁
*/
public final boolean release(int arg) {
//尝试释放锁(修改同步状态state)
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
/**
* 尝试释放锁
*/
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
//如果当前线程并没有持有锁,抛异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
//锁是否被完全释放(因为可以重入)
boolean free = false;
if (c == 0) {
//如果锁完全释放,将持有锁的线程设置为null
free = true;
setExclusiveOwnerThread(null);
}
//此刻线程还持有锁,所以直接调用set方法修改state即可,不用CAS
setState(c);
return free;
}
/**
* 锁被完全释放后,unpark唤醒等待队列中等待的线程
*/
private void unparkSuccessor(Node node) {
//显式独占锁解锁时,这里的node为双向同步队列的head指向的节点
int ws = node.waitStatus;
//如果head节点的waitStatus状态为SIGNAL(-1),说明后面的节点需要被unpark唤醒竞争锁
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
//寻找node后第一个没有被取消的节点
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//唤醒node后第一个没有被取消的节点
if (s != null)
LockSupport.unpark(s.thread);
}
释放锁,也就是修改AQS同步对象state,并unpark唤醒同步队列中等待锁的线程的过程。
3.6 公平和非公平性
关于公平和非公平的实现,看似一个非常复杂的问题,但ReentrantLock实现方式是很简单的。根本区别就在ReentrantLock类的内部类FairSync和NoFairSync方法tryAcquire实现上,对于非公平锁,允许后请求锁的线程直接获取锁,即使同步队列中已经有线程在等待。而对于公平锁而言,如果同步队列中有等待锁的线程,必须入队等待,等队头的节点一一获取锁并释放后,才能获取锁。代码之前都解释过:
- 非公平锁
/**
* 非公平尝试获取独占锁,该方法是AQS的抽象方法,方法实现直接调用Sync的nonfairTryAcquire方法
*/
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
/**
* 非公平尝试获取独占锁,获取不到立即返回false
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//如果锁没有被占有,立即CAS尝试占有锁,这就是非公平的体现
//因为同步队列中有可能有之前申请锁阻塞的线程却没有优先获取锁
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//如果锁已经被占有,并且占有锁的线程是当前线程,直接修改锁的重入次数state
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
//此时线程是持有锁的,直接调用set方法设置同步状态即可
setState(nextc);
return true;
}
//除以上两种情况,都认为获取锁失败
return false;
}
- 公平锁
/**
* 公平尝试获取独占锁
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//这里跟给公平锁相比多了个条件,同步队列中没有等待锁的线程,才会通过CAS修改同步状态
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
这里问题5也能解释了。
4. 从AQS解释显示条件Condition实现
上篇文章介绍过显示条件Condition的使用,其实显示条件的实现也是基于AQS实现的,这里我们分析显示条件是如何实现的。在看源码时,我自己有一个技巧,在真正看之前,先彻底搞明白这个东西到底是干嘛的,需要做哪些工作才能完成这些功能。然后带着疑问去看源码,这样往往比较好理解。
回到显示条件,我们肯定知道显示条件的await方法的作用就是让线程释放锁并进入WAITING状态等待,直至其他线程调用signal唤醒重新去竞争锁。能调用await和signal的前提是,当前线程是持有锁的。
显示条件的实现依赖于一个单向队列,队列的每个节点复用了上文同步队列的Node节点(为了区别于之前的同步队列,我们之后管这个单向队列叫Condition队列)。Node类中有一个Node类型的成员变量nextWaiter就是Condition队列中使用的(在同步操作中并没有使用nextWaiter)。在JUC中,不止一个类要使用到显式锁,所以AQS中提供了一个内部类ConditionObject,在ConditionObject类中实现了await、signal的逻辑。ConditionObject类声明如下:
public class ConditionObject implements Condition, java.io.Serializable {
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
//构造函数
public ConditionObject() { }
//methods,包括await、signal等方法
}
4.1 await
public final void await() throws InterruptedException {
// 如果等待前中断标志位已被设置,直接抛异常
if (Thread.interrupted())
throw new InterruptedException();
// 1.为当前线程创建节点,加入条件等待队列
Node node = addConditionWaiter();
// 2.释放持有的锁
int savedState = fullyRelease(node);
int interruptMode = 0;
// 3.放弃CPU,进行等待,直到被中断或isOnSyncQueue变为true
// isOnSyncQueue为true表示节点被其他线程从Condition队列
// 移到了同步队列,等待的条件已满足
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 4.重新获取锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
//清除Condition队列中所有CANCELLED状态的节点
if (node.nextWaiter != null)
unlinkCancelledWaiters();
// 5.处理中断,抛出异常或设置中断标志位
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
可以看到,显示条件的实现是通过一个单向Condition队列实现的,await方法首先会创建一个Node节点并插入到Condition队列尾部,然后释放对锁的持有,park释放CPU资源进入WAITING状态,直至当前节点从Condition队列转移至同步队列(其他线程调用了signal方法,将node添加到同步队列,并唤醒node对应的线程)。线程会唤醒后会调用acquireQueued方法排队获取锁,获取锁成功后才能从await方法返回。
4.2 awaitNanos
public final long awaitNanos(long nanosTimeout) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
long lastTime = System.nanoTime();
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
//等待超时,将节点从条件等待队列移到外部的锁等待队列
transferAfterCancelledWait(node);
break;
}
//限定等待的最长时间
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
long now = System.nanoTime();
//计算下次等待的最长时间
nanosTimeout -= now - lastTime;
lastTime = now;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return nanosTimeout - (System.nanoTime() - lastTime);
}
实现跟await基本一致,不同的是,添加了一个超时时间控制。
4.3 signal
public final void signal() {
//验证当前线程持有锁
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//调用doSignal将Condition队列中第一个节点添加到同步队列,并unpark唤醒节点对应的线程
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
调用signal方法后,会将Condition队列中等待的第一个节点添加到同步队列并unpark唤醒线程,这时候await方法会从park中返回去竞争锁。
5. 从AQS解释ReentrantReadWriteLock实现
前面一篇文章介绍了读写锁ReentrantReadWriteLock的使用细节,我们了解到ReentrantReadWriteLock相比于ReentrantLock是一种更加细粒度的锁,将读取与写入分开处理,在读取数据之前必须获取用来读取的锁定,而写入的时候必须获取用来写入的锁定。并且允许多个线程同时获取读锁,但是同时只允许一个线程获取写锁。从而达到“读读”不互斥、“读写”互斥、“写写”互斥的效果。
ReentrantReadWriteLock在实现上也是通过AQS实现的,通过上面介绍的ReentrantLock源码分析,我们知道AQS通过sate状态进行锁的获取与释放,同时构造了双向FIFO同步队列进行线程节点的等待,线程节点通过waitStatus来判断自己需要挂起还是唤醒去获取锁。如果已经掌握了AQS的原理,那么ReentrantReadWriteLock的实现也是非常好理解的。先看一下ReentrantReadWriteLock类声明:
public class ReentrantReadWriteLock
implements ReadWriteLock, java.io.Serializable {
/** Inner class providing readlock */
private final ReentrantReadWriteLock.ReadLock readerLock;
/** Inner class providing writelock */
private final ReentrantReadWriteLock.WriteLock writerLock;
/** Performs all synchronization mechanics */
final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {}
static final class NonfairSync extends Sync {}
static final class FairSync extends Sync {}
public static class ReadLock implements Lock, java.io.Serializable {}
public static class WriteLock implements Lock, java.io.Serializable {}
//other methods
}
简单介绍一下ReentrantReadWriteLock内部类的作用:
- Sync:继承AQS,锁功能的主要实现者
- FairSync:继承Sync,实现公平锁
- NofairSync:继承Sync,主要实现非公平锁
- ReadLock:读锁,通过成员变量sync实现读锁功能
- WriteLock:写锁,通过成员变量sync实现写锁功能
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 6317671515068378041L;
/*
* Read vs write count extraction constants and functions.
* Lock state is logically divided into two unsigned shorts:
* The lower one representing the exclusive (writer) lock hold count,
* and the upper the shared (reader) hold count.
*/ static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/** Returns the number of shared holds represented in count */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** Returns the number of exclusive holds represented in count */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
//……
}
ReentrantReadWriteLock的内部类Sync继承了AQS,主要提供了锁功能的实现,但是跟ReentrantLock的内部类Sync不同的是上面这几个final类型的int型成员变量,除此之外,方法定义基本一致(因为都继承了AQS),这里提前介绍一下这几个成员变量的作用。我们知道AQS是通过int型成员变量state来维护同步状态的,当state>0时表示已经获取了锁,当state = 0时表示释放了锁。但是在读写锁中,读锁和写锁是分开的,如何使用一个变量state表示两种锁的同步状态?答案是通过位运算,将int型成员变量的32为分为高16位和低16位,高16位表示读锁,低16位表示写锁。所以再回头看上述Sync类的方法sharedCount,其实就是获取c低16位的值,即读锁被持有的数量。exclusiveCount,其实就是获取c高16位的值,即写锁被持有的数量。这里再简单回顾一下上篇文章讲的读锁和写锁的进入条件:
- 线程进入读锁的条件(满足其中一个即可):
- 没有任何线程持有写锁
- 有线程持有写锁,但是持有写锁的线程是当前线程
- 线程进入写锁的条件(满足其中一个即可)
- 没有任何线程持有读锁或写锁
- 有线程持有写锁,但是持有写锁的线程是当前线程
5.1 写锁lock
/**
* 阻塞获取写锁
*/
public void lock() {
sync.acquire(1);
}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
看过上面讲的ReentrantLock获取锁,写锁lock的实现代码一定非常熟悉,实现跟ReentrantLock的lock方法完全一致。也是调用了Sync类的acquire方法(继承自AQS),然后在acquire方法中通过回调子类的tryAcquire方法尝试获取锁,其他的逻辑都不再单独介绍了,这里重点介绍ReentrantReadWriteLock内部类Sync的tryAcquire逻辑实现,tryAcquire其实就是非阻塞尝试获取写锁,如下:
protected final boolean tryAcquire(int acquires) {
//获取当前线程
Thread current = Thread.currentThread();
//获取同步状态
int c = getState();
//获取写锁持有的数量
int w = exclusiveCount(c);
//同步状态不等于0,说明已经锁已经被获取过了(可以是读锁也可以是写锁)
if (c != 0) {
//1. c!=0说明是有锁被获取的,那么w==0,
//说明写锁是没有被获取,也肯定是读锁被获取了,
//由于写锁和读锁的互斥,所以获取获取写锁失败,返回false
//2. w!=0,说明写锁被获取了,但是current != getExclusiveOwnerThread() ,
// 说明是被别的线程获取了,return false;
if (w == 0 || current != getExclusiveOwnerThread())
return false;
//走到这说明w!=0 && current == getExclusiveOwnerThread()
//当前持有写锁的线程跟请求写锁的线程是同一线程,本次加锁是写锁重入
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
//写锁重入,直接调用setState方法修改同步状态
setState(c + acquires);
return true;
}
// c == 0,说明没有线程持有锁,CAS尝试获取锁
//writerShouldBlock方法在FairSync和NoFairSync中各自实现,分别保证公平和非公平性
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
关键点都在注释中说明了,比较好理解。我们回顾一下ReentrantLock中tryAcquire的实现,ReentrantLock中内部类FairSync和NoFairSync都重写了AQS的tryAcquire方法,但是实现逻辑都是首先处理c==0(锁没有被获取)直接CAS(公平锁需要另外判断同步队列是否有排队的线程),然后处理c!=0,判断是否为锁重入。ReentrantReadWriteLock中,只在Sync类中重写了tryAcquire逻辑,FairSync和NoFairSync中并没有重写该方法,而是提供了writerShouldBlock和readerShouldBlock方法供Sync类的tryAcquire方法调用,判断是否可以直接CAS竞争锁,从而体现公平性和非公平性。在实现逻辑上,跟ReentrantLock中是一致的,只不过在ReentrantReadWriteLock中先判断的c!=0的情况,然后判断c==0的情况,跟ReentrantLock中处理顺序有点差别。
static final class NonfairSync extends Sync {
/**
* 非公平锁模式下请求写锁的线程是否需要排队获取锁
*/
final boolean writerShouldBlock() {
//直接竞争,不保证公平性
return false;
}
/**
* 非公平锁模式下请求读锁的线程是否需要排队获取锁
*/
final boolean readerShouldBlock() {
//理论上也可以直接返回false,但是为了避免同步队列中等待的写锁饥饿
//所以做了额外的判断,这里牺牲了一定的不公平性
return apparentlyFirstQueuedIsExclusive();
}
}
static final class FairSync extends Sync {
/**
* 公平锁模式下请求写锁的线程是否需要排队获取锁
*/
final boolean writerShouldBlock() {
//如果同步队列中有排队的线程,当前线程请求锁就需要排队
return hasQueuedPredecessors();
}
/**
* 公平锁模式下请求读锁的线程是否需要排队获取锁
*/
final boolean readerShouldBlock() {
//如果同步队列中有排队的线程,当前线程请求锁就需要排队
return hasQueuedPredecessors();
}
}
5.2 读锁lock
/**
* 阻塞式获取共享读锁
*/
public void lock() {
sync.acquireShared(1);
}
阻塞式获取共享读锁的实现是通过调用ReentrantReadWriteLock的内部类Sync的acquireShared方法(继承自AQS)实现的。
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
ReentrantReadWriteLock的内部类Sync重写了AQS的tryAcquireShared方法,所以最终会调用Sync类的tryAcquireShared:
/**
* 非阻塞尝试获取读锁
*/
protected final int tryAcquireShared(int unused) {
//获取当前线程
Thread current = Thread.currentThread();
//获取同步状态
int c = getState();
//exclusiveCount写锁持有数目!=0,并且持有写锁的线程非当前线程,
//读写互斥,return -1,表示尝试获取读锁失败
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
//走到这里说明写锁没有被持有,或者写锁被持有写锁被持有但是持有写锁的线程是当前线程
//这两种情况,都说明允许获取读锁
int r = sharedCount(c); //获取读锁持有数目
//CAS获取读锁,获取成功return 1
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
//处理CAS失败和重入读锁的情况,不阻塞
return fullTryAcquireShared(current);
}
如果tryAcquireShared方法尝试获取锁失败,就会调用AQS的diAcquiredShared方法排队获取共享读锁,逻辑跟之前讲的acquiredShared方法非常相似:
/**
* 尝试获取共享读锁
*/
private void doAcquireShared(int arg) {
//等待读锁节点入同步队列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
//死循环等待获取锁,跳出循环的前提是,当前节点是head的后继节点,并且尝试获取锁成功
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
//如果没有获取到锁,就要park阻塞当前请求锁的线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
跟acquireQueued的区别就在于尝试获取锁的逻辑和获取到锁的后续操作。
5.3 写锁释放unlock
/**
* 释放独占写锁
*/
public void unlock() {
//调用AQS的release方法
sync.release(1);
}
/**
* AQS释放独占锁,该方法在ReentrantLock和ReentrantReadWriteLock都有使用
*/
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
/**
* ReentrantReadWriteLock内部类Sync重写tryRelease方法
*/
protected final boolean tryRelease(int releases) {
//如果当前请求释放写锁的线程不是持有写锁的线程,抛异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//释放后锁的持有数量,因为写锁在低16位,所以直接减就可以,不用移位
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
//如果写锁持有数量为0,说明写锁已经被完全释放
//讲写锁持有线程设置为null
if (free)
setExclusiveOwnerThread(null);
//这里锁还没有释放,直接通过set方法设置同步状态
setState(nextc);
return free;
}
5.3 读锁释放unlock
/**
* ReentrantReadWriteLock释放共享读锁
*/
public void unlock() {
//直接调用AQS的releaseShared方法释放共享锁
sync.releaseShared(1);
}
/**
* 释放共享锁
*/
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)//如果是首次获取读锁,那么第一次获取读锁释放后就为空了
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) { //表示全部释放完毕
readHolds.remove(); //释放完毕,那么久把保存的记录次数remove掉
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState();
// nextc 是 state 高 16 位减 1 后的值
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc)) //CAS设置状态
return nextc == 0; //这个判断如果高16位减1后的值==0,那么就是读状态和写状态都释放了
}
}
关于ReentrantReadWriteLock实现分析,就先讲到这里。其它比如tryLock、lockInterruptibly就不一一讲解了,实现思想跟ReentrantLock类似,只不过ReentrantReadWriteLock对同步状态分为高16位和低16位分别处理。同时通过代码分析可以知道,无论是ReentrantLock还是ReentrantReadWriteLock都最大程度上复用了AQS提供的方法,可以说设计非常精巧。除了ReentrantLock和ReentrantReadWriteLock,像之前文章介绍的同步工具Semaphore也使用了AQS,可以讲AQS是JUC的基础。
5.4 读写锁同步状态分析
上面讲到读写锁ReentrantReadWriteLock中,将同步状态分为高16位和低16位,分别表示读锁和写锁,读锁位共享锁写锁位独占锁,从而实现读写锁分离。这里我们就来介绍一下ReentrantReadWriteLock是如何实现实现分别维护读写锁状态的。
static final int SHARED_SHIFT = 16;
//65536
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
//65535
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
//65535
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/** 获取读的状态 */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** 获取写锁的获取状态 */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
我们按照图示内容的数据进行运算,图示的32位二进制数据为:00000000000000100000000000000011
- 读状态获取
00000000000000100000000000000011 >>> 16
,无符号右移16位(获取高16位值),结果如下:00000000000000000000000000000010
,换算成10进制数等于2,说明读状态为:2。
- 写状态获取
00000000000000100000000000000011 & 65535
,转换成2进制运算为00000000000000100000000000000011 & 00000000000000001111111111111111
这里其实就是获取低16位值,运算结果为: 00000000000000100000000000000011
,换算成10进制为3。
这里的设计非常巧妙,在不修改AQS的代码前提下,仅仅通过原来的State变量就满足了读锁和写锁的分离。
参考链接:
1. Java API
2. 《Java编程的逻辑》