coding……
但行好事 莫问前程

Java编程拾遗『AQS』

上篇文章讲了Java中显式锁Lock和ReadWriteLock的概念及简单使用示例,本篇文章重点来看一下Java中显式锁的底层实现。在看这部分代码时,我任务比较好理解的方式是带着问题去看源码。对于显式锁,我们可能会有以下问题:

  • 问题1:显式锁底层是基于CAS实现,细节是怎样的?
  • 问题2:显式锁跟synchronized类似,无法获取锁时,会阻塞当前线程,如何实现阻塞的?
  • 问题3:显式锁提供还可以提供tryLock的操作,获取不到锁立即返回,如何实现?
  • 问题4:显式锁在等待时,可以响应中断,如何实现?
  • 问题5:显式锁可以提供公平锁和非公平锁,如何实现?
  • 问6:读写锁底层又是怎么实现的?

带着这些问题,我们来看Java显式锁的实现源码。

1. LockSupport

显式锁在无法获取到锁时,需要阻塞当前线程,synchronized中是依赖于底层操作系统函数实现的,在显式锁中必然也要有类似的机制,可以阻塞申请不到锁的线程。在显式锁中是依赖包java.util.concurrent.locks下的LockSupport实现的,它的基本方法有:

public static void park()
public static void parkNanos(long nanos)
public static void parkUntil(long deadline)
public static void unpark(Thread thread)

park使得当前线程放弃CPU,进入等待状态(WAITING),操作系统不再对它进行调度,什么时候再调度呢?有其他线程对它调用了unpark,unpark需要指定一个线程,unpark会使之恢复可运行状态。看个例子:

public static void main(String[] args) throws InterruptedException {
    Thread t = new Thread (){
        public void run(){
            LockSupport.park();
            System.out.println("exit");
        }
    };
    t.start();    
    Thread.sleep(1000);
    LockSupport.unpark(t);
}

线程t启动后调用park,会放弃CPU,主线程睡眠1秒钟后,调用unpark,线程t恢复运行,输出exit。

park不同于Thread.yield(),yield只是告诉操作系统可以先让其他线程运行,但自己依然是可运行状态,而park会放弃调度资格,使线程进入WAITING状态。

需要说明的是,park是响应中断的,当有中断发生时,park会返回,线程的中断状态会被设置

park有两个变体:

  • parkNanos:可以指定等待的最长时间,参数是相对于当前时间的纳秒数。
  • parkUntil:可以指定最长等到什么时候,参数是绝对时间,是相对于纪元时的毫秒数。 

当等待超时的时候,它们也会返回。

park方法还有一些变体,可以指定一个对象,表示是因为该对象进行等待的,以便于调试,通常传递的值是this,如下:

public static void park(Object blocker)
public static void parkNanos(Object blocker, long nanos)
public static void parkUntil(Object blocker, long deadline)

LockSupport有一个方法,可以返回一个线程的blocker对象:

public static Object getBlocker(Thread t)

这些park/unpark方法是怎么实现的呢?与CAS方法一样,它们也调用了Unsafe类中的对应方法,Unsafe类最终调用了操作系统的API,从程序员的角度,我们可以认为LockSupport中的这些方法就是基本操作。

2. AbstractQueuedSynchronizer(AQS)

利用CAS和LockSupport提供的基本方法,肯定就可以实现ReentrantLock了。但Java中还有很多其他并发工具,如ReentrantReadWriteLock、Semaphore、CountDownLatch,它们的实现有很多类似的地方,为了复用代码,Java提供了一个抽象类AbstractQueuedSynchronizer,我们简称为AQS,它简化了并发工具的实现。像ReentrantReadWriteLock、Semaphore、CountDownLatch都是基于AQS实现的。

AQS解决了实现同步器时涉及当的大量细节问题,例如获取同步状态、FIFO同步队列等。基于AQS来构建同步器可以带来很多好处,它不仅能够极大地减少实现工作,而且也不必处理在多个位置上发生的竞争问题。 同时AQS在设计上充分考虑了可伸缩行,因此J.U.C中所有基于AQS构建的同步器均可以获得这个优势。

AQS的主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态

AQS使用一个int类型的成员变量state来表示同步状态,当state>0时表示已经获取了锁,当state = 0时表示释放了锁。它提供了三个方法(getState()、setState(int newState)、compareAndSetState(int expect,int update))来对同步状态state进行操作,当然AQS可以确保对state的操作是安全的。

AQS通过内置的FIFO同步队列来完成线程获取锁的排队工作,如果当前线程获取同步状态失败(锁)时,AQS则会将当前线程以及等待状态等信息构造成一个节点(Node)并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,则会把节点中的线程唤醒,使其再次尝试获取同步状态。AQS类声明如下:

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {

    private transient volatile Node head;

    private transient volatile Node tail;

    private volatile int state;

    //methods
}

上述head和tail,就是内部的双向同步队列,state用来表示同步状态。另外,AQS继承了AbstractOwnableSynchronizer类,AbstractOwnableSynchronizer类中有一个Thread类型的成员变量,用于存储当前持有锁的线程。AbstractOwnableSynchronizer类提供getExclusiveOwnerThread方法,可以获取当前持有锁的线程。

2.1 方法说明

AQS主要提供了如下一些方法:

  • protected final int getState()

返回同步状态的当前值

  • protected final void setState(int newState)

设置当前同步状态

  • protected final boolean compareAndSetState(int expect, int update)

CAS修改同步状态,如果设置成功返回true,否则返回false,该方法能够保证状态设置的原子性

  • protected boolean tryAcquire(int arg)

独占式获取同步状态,获取同步状态成功后,返回true,其他线程需要等待该线程释放同步状态才能获取同步状态。获取同步状态失败,立即返回false

  • public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException

在指定超时时间内获取独占锁同步状态,如果当前线程在nanos时间内没有获取到同步状态,那么将会返回false,已经获取则返回true

  • protected boolean tryRelease(int arg)

独占式释放同步状态

  • protected int tryAcquireShared(int arg)

共享式获取同步状态,返回值大于等于0则表示获取成功,否则获取失败

  • public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException

在指定超时时间内共享锁同步状态,如果当前线程在nanos时间内没有获取到同步状态,那么将会返回false,已经获取则返回true

  • protected boolean tryReleaseShared(int arg)

共享式释放同步状态

  • protected boolean isHeldExclusively()

当前同步器是否在独占式模式下被线程占用,即表示锁是否被当前线程所独占

  • public final void acquire(int arg)

独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回。否则,将会进入同步队列等待,直至获取锁

  • public final void acquireInterruptibly(int arg) throws InterruptedException

与acquire(int arg)相同,但是该方法响应中断,当前线程为获取到同步状态而进入到同步队列中,如果当前线程被中断,则该方法会抛出InterruptedException异常并返回,不会继续等待

  • public final void acquireShared(int arg)

共享式获取同步状态,如果当前线程未获取到同步状态,将会进入同步队列等待,与独占式的主要区别是在同一时刻可以有多个线程获取到同步状态

  • public final void acquireSharedInterruptibly(int arg) throws InterruptedException

共享式获取同步状态,响应中断

  • public final boolean release(int arg)

释放独占式同步状态,该方法会在释放同步状态之后,将同步队列中第一个节点包含的线程唤醒

  • public final boolean releaseShared(int arg)

共享式释放同步状态

2.2 同步队列

AQS内部维护了一个同步队列,该队列是一个FIFO双向队列,AQS依赖它来完成同步状态的管理。当前线程获取同步状态失败时,AQS则会将当前线程和等待状态等信息构造成一个节点(Node)并将其加入到同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点唤醒(公平锁),使其再次尝试获取同步状态(如果是非公平锁,当锁被释放时,不一定被同步队列首节点获取到锁,也有可能被正在申请锁的线程获取到,这就是非公平性的体现)。

同步队列中,一个节点表示一个正在等待的线程,它保存着线程的引用(thread)、状态(waitStatus)、前驱节点(prev)、后继节点(next),其定义如下:

static final class Node {
	volatile int waitStatus;

	volatile Node prev;

	volatile Node next;

	volatile Thread thread;

	Node nextWaiter;

	//methods
}

其中nextWaiter是显示条件(Condition)用来组织所有等待(await)的线程节点的,这点在下面讲显示条件实现时在介绍。先来看一下同步队列的组织形式:

head指向的节点比较特殊,假如当前只有一个线程等待锁,那么同步队列也会有两个节点。可以理解为head节点是当前正在使用锁的节点,该节点初始化时,prev域为null,thread域为null,waitStatus为0。head节点的后继节点才是等待锁的第一个线程对应的节点

3. 从AQS解释ReentrantLock实现

3.1 ReentrantLock定义

public class ReentrantLock implements Lock, java.io.Serializable {
	
	//用于实现显式锁的同步对象(sync其实就是队列同步器AQS)
	private final Sync sync;

	//AQS实现类
	abstract static class Sync extends AbstractQueuedSynchronizer {}

	//非公平锁
	static final class NonfairSync extends Sync {}

	//公平锁
	static final class FairSync extends Sync{}

	//默认构造函数
	public ReentrantLock() {
        sync = new NonfairSync();
    }

    //构造函数
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

    //methods
}

可以看到,ReentrantLock为了复用AQS的代码,在内部通过继承于AQS的同步类Sync来使用AQS提供的同步队列等特性,这也是之前讲的AQS的主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态的体现。

Sync类是个抽象类,并没有完全实现AQS的抽象方法,这是因为ReentrantLock提供公平锁和非公平锁,这两种锁获取锁的策略是不同的,不同的逻辑分别在NonfairSync和FairSync中实现。 另外,通过构造函数可以看出,ReentrantLock的默认实现是非公平锁

先来看一下Sync、NoFairSync和FairSync的实现(这里先省略跟获取锁/解锁无关的逻辑):

abstract static class Sync extends AbstractQueuedSynchronizer {

    /**
     * 抽象方法,用于公平锁/非公平锁子类实现不同的获取锁的策略
     */
    abstract void lock();

    /**
     * 非公平尝试获取独占锁,获取不到立即返回false
     */
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        //如果锁没有被占有,立即CAS尝试占有锁,这就是非公平的体现
        //因为同步队列中有可能有之前申请锁阻塞的线程却没有优先获取锁
        if (c == 0) {
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        //如果锁已经被占有,并且占有锁的线程是当前线程,直接修改锁的重入次数state
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            //此时线程是持有锁的,直接调用set方法设置同步状态即可
            setState(nextc);
            return true;
        }
        //除以上两种情况,都认为获取锁失败
        return false;
    }

    /**
     * 释放独占锁同步状态
     */
    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        //如果当前线程非持有锁的线程,抛异常
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        //此时线程是持有锁的,直接调用set方法设置同步状态即可
        setState(c);
        return free;
    }
}
static final class NonfairSync extends Sync {

    /**
     * 非公平获取独占锁,获取不到会阻塞
     */
    final void lock() {
    	//直接CAS修改同步状态,非公平就体现在这,新申请锁的线程有可能优先于同步队列的线程获取到锁
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
        	//获取独占锁,如果获取不到会阻塞线程,直至获取到锁,acquire方法是AQS中定义的方法
            acquire(1);
    }

    /**
     * 非公平尝试获取独占锁,该方法是AQS的抽象方法,方法实现直接调用Sync的nonfairTryAcquire方法
     */
    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}
static final class FairSync extends Sync {

	/**
	* 公平获取独占锁
	*/
    final void lock() {
    	//获取独占锁,跟非公平锁相比,少了CAS尝试,acquire方法是AQS中定义的方法
        acquire(1);
    }

    /**
     * 公平尝试获取独占锁
     */
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
        	//这里跟给公平锁相比多了个条件,同步队列中没有等待锁的线程,才会通过CAS修改同步状态
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}

这里可以看出,Sync类中定义了抽象方法lock(),在子类NoFairSync和FairSync中都给出了实现。在子类NoFairSync和FairSync中,除了实现了Sync类的lock方法,还覆盖了父类AQS中的tryAcquire方法,这一点也好理解,tryAcquire方法用于尝试获取独占锁,在公平锁和非公平锁模式下肯定是不同的,需要各自实现。

3.2 lock()

这里以非公平锁为例,来分析一下非公平锁加锁方法lock的实现细节。ReentrantLock中lock方法实现如下:

public void lock() {
    sync.lock();
}

上面我们知道,通过ReentrantLock的构造函数,会对成员变量sync赋值,并且不同的构造函数,得到的sync对象是不同的(NoFairSync/FairSync)。所以当sync是非公平锁对象时,这里调用的肯定是NoFairSync类的lock方法:

/**
 * 非公平获取独占锁,获取不到会阻塞
 */
final void lock() {
	//直接CAS修改同步状态,修改成功,就成功获取了锁
    //非公平就体现在这,新申请锁的线程有可能优先于同步队列的线程获取到锁
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
    	//获取独占锁,如果获取不到会阻塞线程,直至获取到锁,acquire方法是AQS中定义的方法
        acquire(1);
}

上述代码的逻辑很简单,当线程申请锁时,默认会让当前线程CAS尝试修改同步状态,如果修改成功,当前线程就获取了锁,并调用AbstractOwnableSynchronizer类的setExclusiveOwnerThread方法将持有锁的线程设置为当前线程。因为上述这个直接CAS的过程,即使同步队列已经有线程在等待锁,锁也有可能被后来申请锁的线程拿到,这就是非公平的体现。当CAS修改同步状态失败时,就调用AQS的acquire方法获取锁:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

acquire方法实现中,首先会调用tryAcquire方法尝试获取锁,关于tryAcquire方法,上面讲过NoFairSync和FairSync都实现了该方法,对于非公平锁肯定调用的是NoFairSync类的tryAcquire方法:

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}    

/**
 * 非公平尝试获取独占锁,获取不到立即返回false
 */
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    //如果锁没有被占有,立即CAS尝试占有锁,这也是非公平的体现
    //因为同步队列中有可能有之前申请锁阻塞的线程却没有优先获取锁
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    //如果锁已经被占有,并且占有锁的线程是当前线程,直接修改锁的重入次数state
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        //此时线程是持有锁的,直接调用set方法设置同步状态即可
        setState(nextc);
        return true;
    }
    //除以上两种情况,都认为获取锁失败
    return false;
}

tryAcquire方法实现比较简单,这里不多讲了,关键点都在上面方法的注释中。回到AQS的acquire方法,如果tryAcquire方法尝试获取锁失败,则会调用acquireQueued方法获取锁,acquireQueued方法的第一个参数是通过addWaiter方法获取的:

/**
* 向同步队列中插入一个节点
*/
private Node addWaiter(Node mode) {
	//新生成一个节点node,表示当前等待锁的线程
    Node node = new Node(Thread.currentThread(), mode);

    //获取前驱节点tail(node要插入到tail后面)
    Node pred = tail;
    //tail非null,说明同步队列中已经有节点,只要把node插入到当前tail节点后,并将tail设置为node即可
    if (pred != null) {
        node.prev = pred;
        //CAS原子方式设置tail指向
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    //tail为null(同步队列为空)或上述CAS设置tail指向失败,调用enq方法入队
    enq(node);
    return node;
}
/**
* 同步队列为空或上述CAS设置tail指向失败时,node入队
*/
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) {
        //tail为null,表示同步队列中无节点,这时候需要初始化
            if (compareAndSetHead(new Node()))
            	//head指向一个空节点,该节点Thread域为null,waitStatus为0
            	//理解这点很重要,第一个等待锁的线程节点并不是同步队列的head节点,head节点是一个空节点
            	//head节点的后继节点是第一个等待锁的节点
                tail = head;
        } else {
        	//node入队
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

也就是讲,通过addWaiter方法,返回了新入队的等待锁的节点引用。这里要重点强调一下,同步队列的head节点其实指向的是个空节点,第一个等待锁的线程节点是head的后继节点。下面来看acquireQueued方法实现:

/**
* 排队获取锁对象
* 走到这,说明线程尝试获取锁失败,只能排队获取锁
*/
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        //这里是个死循环,调出循环的条件是当前节点是head的后继节点,并且尝试获取锁成功
        for (;;) {
        	//获取node的前驱节点p
            final Node p = node.predecessor();
            //如果node是p的后继节点,并尝试获取锁成功
            if (p == head && tryAcquire(arg)) {
            	//node已经获取到锁,则node节点修改为同步队列的head,并断开node的prev连接
                setHead(node);
                //setHead方法已经断开了node的prev连接,这里在断开之前head的next连接,之前的head节点就可以被GC回收了
                p.next = null; // help GC

                failed = false;
                //返回interrupted标志,interrupted表示线程在等待锁期间是否被中断
                return interrupted;
            }
            //如果节点尝试获取锁失败,就要阻塞当前线程
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

acquireQueued方法简单的讲就是让同步队列中的node节点获取锁,获取锁的过程是个死循环,只有等node节点获取到锁,才能从循环中跳出。所以node获取锁的过程中可能会进行多次尝试,每次尝试失败时,都要阻塞自己的线程

acquireQueued方法核心就是如下三点:

  1. 循环获取锁,可以获取锁的前提是node节点是head的后继节点,并且尝试获取锁成功
  2. 如果尝试获取锁失败就要检查是否需要阻塞当前线程,如果node的前驱节点waitStatus是SIGNAL,则需要阻塞
  3. 则塞线程调用LockSupport.park方法,该方法是个响应中断的方法,线程可以从阻塞中返回的前提是别的线程调用了unpark方法唤醒了当前线程

检查等待锁的线程是否需要阻塞的方法shouldParkAfterFailedAcquire实现:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        //前驱节点为SIGNAL(-1),表示前面节点的线程释放锁会通过unpark唤醒node,
   		//因此当node获取锁失败时,需要调用park堵塞阻塞线程,等待被前面的节点唤醒
        return true;
    if (ws > 0) {
        //前驱节点状态为CANCLE(1),表示节点代表的线程已经不需要获取锁,删除前驱节点
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node; //更新next指针,删除取消状态的节点
    } else {
    	//前驱节点状态未0(初始状态),则CAS将前驱节点状态修改为SIGNAL
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    //走到这说明前驱节点waitStatus状态不是SIGNAL,当前线程不需要park,
    //在下次检查是该方法会返回true,当前线程会park阻塞
    return false;
}

总结起来就是,如果node的前驱节点waitStatus状态是SIGNAL(-1),那么node代表的节点就需要阻塞等待。然后来看一下重点逻辑,当线程获取不到锁时,是如何实现阻塞的,就是parkAndCheckInterrupt方法:

/**
* 阻塞线程,返回线程在阻塞期间是否被中断
*/
private final boolean parkAndCheckInterrupt() {
    //阻塞线程,调用该方法后,线程会释放CPU,进入阻塞状态
    LockSupport.park(this);

    //走到这里,说明线程已经从阻塞中返回了
    //前提是其他线程调用了unpark方法,唤醒了当前线程
    return Thread.interrupted(); //返回线程中断状态,并清除中断状态
}

如果该方法返回true,说明线程在阻塞等待锁期间被中断过。但是从acquireQueued方法实现可以看出,即使线程在等待期间被中断了,也只是设置了中断标记局部变量interrupted,并不会从等待中返回,所以acquireQueued是不响应中断的,锁是否相应中断,其实区别就在parkAndCheckInterrupt方法返回true时的处理上

到这里加锁过程基本就讲完了,回到最开始的几个问题,我们至少可以回答前两个了:

  • 问题1:显式锁底层是基于CAS实现,细节是怎样的?

显示锁是基于AQS实现的,AQS维护锁的状态是通过内部int类型的成员变量state来表示的,当state>0时表示已经获取了锁,当state = 0时表示释放了锁。加锁的过程就是通过CAS修改state成员变量的过程。

  • 问题2:显式锁跟synchronized类似,无法获取锁时,会阻塞当前线程,如何实现阻塞的?

显示锁中,当尝试获取锁失败时,会调用acquireQueued方法排队获取锁,这个队列就是AQS内部维护的双向同步队列,只有当前线程对应的节点是双向队列head节点的后继节点并且尝试获取锁成功时,才表示线程最终获取锁成功,否则就需要继续阻塞等待,直至满足上述条件。阻塞等待是通过LockSupport.park方法实现的。

3.3 tryLock()

之前的文章中讲过,显示锁中提供tryLock方法,可以避免死锁。在持有一个锁,获取另一个锁,获取不到的时候,可以释放已持有的锁,给其他线程机会获取锁。也就是讲,在tryLock方法中,如果尝试获取锁失败时,并不会去排队获取锁并阻塞线程,而是会直接返回果false(获取锁失败)。这里以非公平锁的tryLock方法为示例:

public boolean tryLock() {
    return sync.nonfairTryAcquire(1);
}

可以看到tryLock就是直接通过调用Sync类的nofairTryAcquire方法实现的,nofairTryAcquire方法的实现之前已经讲过,其实就是非公平方式尝试获取锁:

/**
 * 非公平尝试获取独占锁,获取不到立即返回false
 */
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    //如果锁没有被占有,立即CAS尝试占有锁,这也是非公平的体现
    //因为同步队列中有可能有之前申请锁阻塞的线程却没有优先获取锁
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    //如果锁已经被占有,并且占有锁的线程是当前线程,直接修改锁的重入次数state
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        //此时线程是持有锁的,直接调用set方法设置同步状态即可
        setState(nextc);
        return true;
    }
    //除以上两种情况,都认为获取锁失败
    return false;
}

也就是讲noFairTryAcquire的前提条件是(以下条件满足一个即可):

  • 线程在申请锁时,锁没有被其它线程持有,并且当前线程成功通过CAS修改了同步状态
  • 有线程持有锁,但是持有锁的线程是当前线程,此次加锁只是一次重入,只需要将同步状态加1即可

除了上述两种情况,都视为获取锁失败,直接返回false。

回到最开始的问题3:显式锁提供还可以提供tryLock的操作,获取不到锁立即返回,如何实现?

显示锁tryLock与lock方法相比,方法最开始都会通过通过noFairTryAcquire方法尝试获取锁,但是对于尝试获取锁失败的情况,tryLock直接返回false,并不会阻塞当前线程,lock方法会调用acquireQueued方法,排队获取锁,如果获取不到锁就阻塞线程,直到成功获取锁。

3.4 lockInterruptibly

上面讲lock方法的时候,我们知道lock方法获取锁是不响应中断的。也就是讲在lock方法在获取不到锁阻塞等待的过程中,如果线程被中断,线程并不会从中断中返回,还是会继续等待获取锁。但是显示锁提供了另一个加锁方法lockInterruptibly,可以响应中断,如下:

public void lockInterruptibly() throws InterruptedException {
	//直接调用AQS的acquireInterruptibly方法
	sync.acquireInterruptibly(1);
}
/**
 * 响应中断获取独占锁
 */
public final void acquireInterruptibly(int arg) throws InterruptedException {
	//获取中断状态,并清除中断标识,如果线程被中断,直接抛异常
    if (Thread.interrupted())
        throw new InterruptedException();
    //先尝试获取独占锁,如果获取锁成功,方法直接结束
    //否则调用doAcquireInterruptibly方法,以响应中断的方式获取独占锁
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}
/**
 * 以响应中断模式获取独占锁
 * 这里的实现跟acquireQueued基本一致,唯一的区别在于park阻塞等待过程中,线程被中断的处理上
 */
private void doAcquireInterruptibly(int arg) throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                //如果线程阻塞等待锁的过程中被中断,直接抛异常
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

这里可以看到,响应中断获取锁和普通获取锁的方法实现上基本是一致的,唯一的区别就在park阻塞等待过程中,如果线程被中断的处理上。lock方法调用的acquireQueued方法,在发生中断时,只是记了中断标记,并没有中断线程,还是会继续等待锁。但是lockInterruptibly方法调用的doAcquireInterruptibly方法,在发生中断时,直接抛InterruptedException,放弃继续等待锁,所以可以响应中断。这也是最开始的问题4的答案。

3.5 unLock

unLock方法就是解锁,按照之前加锁的过程,我们大概能猜出解锁操作需要做的事情:修改state同步变量、unpark唤醒在同步队列等待锁的线程。下面来看一下unLock方法实现:

public void unlock() {
    //直接调用ReentrantLock类的内部类Sync类的release方法释放锁
    sync.release(1);
}
/**
 * 释放线程持有的锁
 */
public final boolean release(int arg) {
    //尝试释放锁(修改同步状态state)
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
/**
 * 尝试释放锁
 */
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    //如果当前线程并没有持有锁,抛异常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    //锁是否被完全释放(因为可以重入)
    boolean free = false;
    if (c == 0) {
    	//如果锁完全释放,将持有锁的线程设置为null
        free = true;
        setExclusiveOwnerThread(null);
    }
    //此刻线程还持有锁,所以直接调用set方法修改state即可,不用CAS
    setState(c);
    return free;
}
/**
 * 锁被完全释放后,unpark唤醒等待队列中等待的线程
 */
private void unparkSuccessor(Node node) {
	//显式独占锁解锁时,这里的node为双向同步队列的head指向的节点
    int ws = node.waitStatus;
    //如果head节点的waitStatus状态为SIGNAL(-1),说明后面的节点需要被unpark唤醒竞争锁
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    //寻找node后第一个没有被取消的节点
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    //唤醒node后第一个没有被取消的节点
    if (s != null)
        LockSupport.unpark(s.thread);
}

释放锁,也就是修改AQS同步对象state,并unpark唤醒同步队列中等待锁的线程的过程。

3.6 公平和非公平性

关于公平和非公平的实现,看似一个非常复杂的问题,但ReentrantLock实现方式是很简单的。根本区别就在ReentrantLock类的内部类FairSync和NoFairSync方法tryAcquire实现上,对于非公平锁,允许后请求锁的线程直接获取锁,即使同步队列中已经有线程在等待。而对于公平锁而言,如果同步队列中有等待锁的线程,必须入队等待,等队头的节点一一获取锁并释放后,才能获取锁。代码之前都解释过:

  • 非公平锁
/**
 * 非公平尝试获取独占锁,该方法是AQS的抽象方法,方法实现直接调用Sync的nonfairTryAcquire方法
 */
protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

/**
 * 非公平尝试获取独占锁,获取不到立即返回false
 */
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    //如果锁没有被占有,立即CAS尝试占有锁,这就是非公平的体现
    //因为同步队列中有可能有之前申请锁阻塞的线程却没有优先获取锁
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    //如果锁已经被占有,并且占有锁的线程是当前线程,直接修改锁的重入次数state
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        //此时线程是持有锁的,直接调用set方法设置同步状态即可
        setState(nextc);
        return true;
    }
    //除以上两种情况,都认为获取锁失败
    return false;
}
  • 公平锁
/**
 * 公平尝试获取独占锁
 */
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
    	//这里跟给公平锁相比多了个条件,同步队列中没有等待锁的线程,才会通过CAS修改同步状态
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

这里问题5也能解释了。

4. 从AQS解释显示条件Condition实现

上篇文章介绍过显示条件Condition的使用,其实显示条件的实现也是基于AQS实现的,这里我们分析显示条件是如何实现的。在看源码时,我自己有一个技巧,在真正看之前,先彻底搞明白这个东西到底是干嘛的,需要做哪些工作才能完成这些功能。然后带着疑问去看源码,这样往往比较好理解。

回到显示条件,我们肯定知道显示条件的await方法的作用就是让线程释放锁并进入WAITING状态等待,直至其他线程调用signal唤醒重新去竞争锁。能调用await和signal的前提是,当前线程是持有锁的。

显示条件的实现依赖于一个单向队列,队列的每个节点复用了上文同步队列的Node节点(为了区别于之前的同步队列,我们之后管这个单向队列叫Condition队列)。Node类中有一个Node类型的成员变量nextWaiter就是Condition队列中使用的(在同步操作中并没有使用nextWaiter)。在JUC中,不止一个类要使用到显式锁,所以AQS中提供了一个内部类ConditionObject,在ConditionObject类中实现了await、signal的逻辑。ConditionObject类声明如下:

public class ConditionObject implements Condition, java.io.Serializable {
    /** First node of condition queue. */
    private transient Node firstWaiter;

    /** Last node of condition queue. */
    private transient Node lastWaiter;

    //构造函数
    public ConditionObject() { }

    //methods,包括await、signal等方法
}  

4.1 await

public final void await() throws InterruptedException {
    // 如果等待前中断标志位已被设置,直接抛异常
    if (Thread.interrupted())
        throw new InterruptedException();
    // 1.为当前线程创建节点,加入条件等待队列
    Node node = addConditionWaiter();
    // 2.释放持有的锁
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // 3.放弃CPU,进行等待,直到被中断或isOnSyncQueue变为true
    // isOnSyncQueue为true表示节点被其他线程从Condition队列
    // 移到了同步队列,等待的条件已满足
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 4.重新获取锁
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    //清除Condition队列中所有CANCELLED状态的节点
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    // 5.处理中断,抛出异常或设置中断标志位
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

可以看到,显示条件的实现是通过一个单向Condition队列实现的,await方法首先会创建一个Node节点并插入到Condition队列尾部,然后释放对锁的持有,park释放CPU资源进入WAITING状态,直至当前节点从Condition队列转移至同步队列(其他线程调用了signal方法,将node添加到同步队列,并唤醒node对应的线程)。线程会唤醒后会调用acquireQueued方法排队获取锁,获取锁成功后才能从await方法返回

4.2 awaitNanos

public final long awaitNanos(long nanosTimeout) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    long lastTime = System.nanoTime();
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        if (nanosTimeout <= 0L) {
            //等待超时,将节点从条件等待队列移到外部的锁等待队列
            transferAfterCancelledWait(node);
            break;
        }
        //限定等待的最长时间
        LockSupport.parkNanos(this, nanosTimeout);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;

        long now = System.nanoTime();
        //计算下次等待的最长时间
        nanosTimeout -= now - lastTime;
        lastTime = now;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
    return nanosTimeout - (System.nanoTime() - lastTime);
}

实现跟await基本一致,不同的是,添加了一个超时时间控制。

4.3 signal

public final void signal() {
    //验证当前线程持有锁
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    //调用doSignal将Condition队列中第一个节点添加到同步队列,并unpark唤醒节点对应的线程
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

调用signal方法后,会将Condition队列中等待的第一个节点添加到同步队列并unpark唤醒线程,这时候await方法会从park中返回去竞争锁。

5. 从AQS解释ReentrantReadWriteLock实现

前面一篇文章介绍了读写锁ReentrantReadWriteLock的使用细节,我们了解到ReentrantReadWriteLock相比于ReentrantLock是一种更加细粒度的锁,将读取与写入分开处理,在读取数据之前必须获取用来读取的锁定,而写入的时候必须获取用来写入的锁定。并且允许多个线程同时获取读锁,但是同时只允许一个线程获取写锁。从而达到“读读”不互斥、“读写”互斥、“写写”互斥的效果。

ReentrantReadWriteLock在实现上也是通过AQS实现的,通过上面介绍的ReentrantLock源码分析,我们知道AQS通过sate状态进行锁的获取与释放,同时构造了双向FIFO同步队列进行线程节点的等待,线程节点通过waitStatus来判断自己需要挂起还是唤醒去获取锁。如果已经掌握了AQS的原理,那么ReentrantReadWriteLock的实现也是非常好理解的。先看一下ReentrantReadWriteLock类声明:

public class ReentrantReadWriteLock
        implements ReadWriteLock, java.io.Serializable {
    
    /** Inner class providing readlock */
    private final ReentrantReadWriteLock.ReadLock readerLock;

    /** Inner class providing writelock */
    private final ReentrantReadWriteLock.WriteLock writerLock;

    /** Performs all synchronization mechanics */
    final Sync sync;

    abstract static class Sync extends AbstractQueuedSynchronizer {}

    static final class NonfairSync extends Sync {}

    static final class FairSync extends Sync {}

    public static class ReadLock implements Lock, java.io.Serializable {}

    public static class WriteLock implements Lock, java.io.Serializable {}

    //other methods

}

简单介绍一下ReentrantReadWriteLock内部类的作用:

  • Sync:继承AQS,锁功能的主要实现者
  • FairSync:继承Sync,实现公平锁
  • NofairSync:继承Sync,主要实现非公平锁
  • ReadLock:读锁,通过成员变量sync实现读锁功能
  • WriteLock:写锁,通过成员变量sync实现写锁功能
abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 6317671515068378041L;    

    /*
     * Read vs write count extraction constants and functions.
     * Lock state is logically divided into two unsigned shorts:
     * The lower one representing the exclusive (writer) lock hold count,
     * and the upper the shared (reader) hold count.
     */    static final int SHARED_SHIFT   = 16;
    
    static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
    static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;    

    /** Returns the number of shared holds represented in count  */
    static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }

    /** Returns the number of exclusive holds represented in count  */
    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

    //……
}

ReentrantReadWriteLock的内部类Sync继承了AQS,主要提供了锁功能的实现,但是跟ReentrantLock的内部类Sync不同的是上面这几个final类型的int型成员变量,除此之外,方法定义基本一致(因为都继承了AQS),这里提前介绍一下这几个成员变量的作用。我们知道AQS是通过int型成员变量state来维护同步状态的,当state>0时表示已经获取了锁,当state = 0时表示释放了锁。但是在读写锁中,读锁和写锁是分开的,如何使用一个变量state表示两种锁的同步状态?答案是通过位运算,将int型成员变量的32为分为高16位和低16位,高16位表示读锁,低16位表示写锁。所以再回头看上述Sync类的方法sharedCount,其实就是获取c低16位的值,即读锁被持有的数量。exclusiveCount,其实就是获取c高16位的值,即写锁被持有的数量。这里再简单回顾一下上篇文章讲的读锁和写锁的进入条件:

  • 线程进入读锁的条件(满足其中一个即可):
    • 没有任何线程持有写锁
    • 有线程持有写锁,但是持有写锁的线程是当前线程
  • 线程进入写锁的条件(满足其中一个即可)
    • 没有任何线程持有读锁或写锁
    • 有线程持有写锁,但是持有写锁的线程是当前线程

5.1 写锁lock

/**
* 阻塞获取写锁
*/
public void lock() {
    sync.acquire(1);
}

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

看过上面讲的ReentrantLock获取锁,写锁lock的实现代码一定非常熟悉,实现跟ReentrantLock的lock方法完全一致。也是调用了Sync类的acquire方法(继承自AQS),然后在acquire方法中通过回调子类的tryAcquire方法尝试获取锁,其他的逻辑都不再单独介绍了,这里重点介绍ReentrantReadWriteLock内部类Sync的tryAcquire逻辑实现,tryAcquire其实就是非阻塞尝试获取写锁,如下:

protected final boolean tryAcquire(int acquires) {
    //获取当前线程
    Thread current = Thread.currentThread();
    
    //获取同步状态
    int c = getState();

    //获取写锁持有的数量
    int w = exclusiveCount(c);

    //同步状态不等于0,说明已经锁已经被获取过了(可以是读锁也可以是写锁)
    if (c != 0) {
        //1. c!=0说明是有锁被获取的,那么w==0,
        //说明写锁是没有被获取,也肯定是读锁被获取了,
        //由于写锁和读锁的互斥,所以获取获取写锁失败,返回false
        //2. w!=0,说明写锁被获取了,但是current != getExclusiveOwnerThread() ,
        // 说明是被别的线程获取了,return false;
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        //走到这说明w!=0 && current == getExclusiveOwnerThread()
        //当前持有写锁的线程跟请求写锁的线程是同一线程,本次加锁是写锁重入
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        //写锁重入,直接调用setState方法修改同步状态
        setState(c + acquires);
        return true;
    }
    // c == 0,说明没有线程持有锁,CAS尝试获取锁
    //writerShouldBlock方法在FairSync和NoFairSync中各自实现,分别保证公平和非公平性
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;
    setExclusiveOwnerThread(current);
    return true;
}

关键点都在注释中说明了,比较好理解。我们回顾一下ReentrantLock中tryAcquire的实现,ReentrantLock中内部类FairSync和NoFairSync都重写了AQS的tryAcquire方法,但是实现逻辑都是首先处理c==0(锁没有被获取)直接CAS(公平锁需要另外判断同步队列是否有排队的线程),然后处理c!=0,判断是否为锁重入。ReentrantReadWriteLock中,只在Sync类中重写了tryAcquire逻辑,FairSync和NoFairSync中并没有重写该方法,而是提供了writerShouldBlock和readerShouldBlock方法供Sync类的tryAcquire方法调用,判断是否可以直接CAS竞争锁,从而体现公平性和非公平性。在实现逻辑上,跟ReentrantLock中是一致的,只不过在ReentrantReadWriteLock中先判断的c!=0的情况,然后判断c==0的情况,跟ReentrantLock中处理顺序有点差别。

static final class NonfairSync extends Sync {

    /**
    * 非公平锁模式下请求写锁的线程是否需要排队获取锁
    */
    final boolean writerShouldBlock() {
        //直接竞争,不保证公平性
        return false;
    }

    /**
    * 非公平锁模式下请求读锁的线程是否需要排队获取锁
    */
    final boolean readerShouldBlock() {
        //理论上也可以直接返回false,但是为了避免同步队列中等待的写锁饥饿
        //所以做了额外的判断,这里牺牲了一定的不公平性
        return apparentlyFirstQueuedIsExclusive();
    }
}

static final class FairSync extends Sync {
    
    /**
    * 公平锁模式下请求写锁的线程是否需要排队获取锁
    */
    final boolean writerShouldBlock() {
        //如果同步队列中有排队的线程,当前线程请求锁就需要排队
        return hasQueuedPredecessors();
    }

    /**
    * 公平锁模式下请求读锁的线程是否需要排队获取锁
    */
    final boolean readerShouldBlock() {
        //如果同步队列中有排队的线程,当前线程请求锁就需要排队
        return hasQueuedPredecessors();
    }
}

5.2 读锁lock

/**
* 阻塞式获取共享读锁
*/
public void lock() {
    sync.acquireShared(1);
}

阻塞式获取共享读锁的实现是通过调用ReentrantReadWriteLock的内部类Sync的acquireShared方法(继承自AQS)实现的。

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

ReentrantReadWriteLock的内部类Sync重写了AQS的tryAcquireShared方法,所以最终会调用Sync类的tryAcquireShared:

/**
* 非阻塞尝试获取读锁
*/
protected final int tryAcquireShared(int unused) {
    //获取当前线程
    Thread current = Thread.currentThread();

    //获取同步状态
    int c = getState();

    //exclusiveCount写锁持有数目!=0,并且持有写锁的线程非当前线程,
    //读写互斥,return -1,表示尝试获取读锁失败
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;

    //走到这里说明写锁没有被持有,或者写锁被持有写锁被持有但是持有写锁的线程是当前线程
    //这两种情况,都说明允许获取读锁       
    int r = sharedCount(c); //获取读锁持有数目

    //CAS获取读锁,获取成功return 1
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {
        if (r == 0) {
            firstReader = current;
            firstReaderHoldCount = 1;
        } else if (firstReader == current) {
            firstReaderHoldCount++;
        } else {
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;
        }
        return 1;
    }
    //处理CAS失败和重入读锁的情况,不阻塞
    return fullTryAcquireShared(current);
}

如果tryAcquireShared方法尝试获取锁失败,就会调用AQS的diAcquiredShared方法排队获取共享读锁,逻辑跟之前讲的acquiredShared方法非常相似:

/**
* 尝试获取共享读锁
*/
private void doAcquireShared(int arg) {
    //等待读锁节点入同步队列
    final Node node = addWaiter(Node.SHARED);

    boolean failed = true;
    try {
        boolean interrupted = false;
        //死循环等待获取锁,跳出循环的前提是,当前节点是head的后继节点,并且尝试获取锁成功
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            //如果没有获取到锁,就要park阻塞当前请求锁的线程
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

跟acquireQueued的区别就在于尝试获取锁的逻辑和获取到锁的后续操作。

5.3 写锁释放unlock

/**
* 释放独占写锁
*/
public void unlock() {
    //调用AQS的release方法
    sync.release(1);
}
/**
* AQS释放独占锁,该方法在ReentrantLock和ReentrantReadWriteLock都有使用
*/
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
/**
* ReentrantReadWriteLock内部类Sync重写tryRelease方法
*/
protected final boolean tryRelease(int releases) {
    //如果当前请求释放写锁的线程不是持有写锁的线程,抛异常
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    //释放后锁的持有数量,因为写锁在低16位,所以直接减就可以,不用移位
    int nextc = getState() - releases;

    boolean free = exclusiveCount(nextc) == 0;
    //如果写锁持有数量为0,说明写锁已经被完全释放
    //讲写锁持有线程设置为null
    if (free)
        setExclusiveOwnerThread(null);
    //这里锁还没有释放,直接通过set方法设置同步状态
    setState(nextc);
    return free;
}

5.3 读锁释放unlock

/**
* ReentrantReadWriteLock释放共享读锁
*/
public void unlock() {
    //直接调用AQS的releaseShared方法释放共享锁
    sync.releaseShared(1);
}
/**
* 释放共享锁
*/
protected final boolean tryReleaseShared(int unused) {
   Thread current = Thread.currentThread();
   if (firstReader == current) {
       // assert firstReaderHoldCount > 0;
       if (firstReaderHoldCount == 1)//如果是首次获取读锁,那么第一次获取读锁释放后就为空了
           firstReader = null;
       else
           firstReaderHoldCount--;
   } else {
       HoldCounter rh = cachedHoldCounter;
       if (rh == null || rh.tid != getThreadId(current))
           rh = readHolds.get();
       int count = rh.count;
       if (count <= 1) { //表示全部释放完毕
           readHolds.remove();  //释放完毕,那么久把保存的记录次数remove掉
           if (count <= 0)
               throw unmatchedUnlockException();
       }
       --rh.count;
   }
   for (;;) {
       int c = getState();
        // nextc 是 state 高 16 位减 1 后的值
       int nextc = c - SHARED_UNIT;
       if (compareAndSetState(c, nextc)) //CAS设置状态
           return nextc == 0; //这个判断如果高16位减1后的值==0,那么就是读状态和写状态都释放了
   }
}

关于ReentrantReadWriteLock实现分析,就先讲到这里。其它比如tryLock、lockInterruptibly就不一一讲解了,实现思想跟ReentrantLock类似,只不过ReentrantReadWriteLock对同步状态分为高16位和低16位分别处理。同时通过代码分析可以知道,无论是ReentrantLock还是ReentrantReadWriteLock都最大程度上复用了AQS提供的方法,可以说设计非常精巧。除了ReentrantLock和ReentrantReadWriteLock,像之前文章介绍的同步工具Semaphore也使用了AQS,可以讲AQS是JUC的基础。

5.4 读写锁同步状态分析

上面讲到读写锁ReentrantReadWriteLock中,将同步状态分为高16位和低16位,分别表示读锁和写锁,读锁位共享锁写锁位独占锁,从而实现读写锁分离。这里我们就来介绍一下ReentrantReadWriteLock是如何实现实现分别维护读写锁状态的。

static final int SHARED_SHIFT   = 16;

//65536
static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
        
//65535
static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
        
//65535
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

/** 获取读的状态  */
static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
        
/** 获取写锁的获取状态 */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

我们按照图示内容的数据进行运算,图示的32位二进制数据为:00000000000000100000000000000011

  • 读状态获取

00000000000000100000000000000011 >>> 16,无符号右移16位(获取高16位值),结果如下:00000000000000000000000000000010,换算成10进制数等于2,说明读状态为:2。

  • 写状态获取

00000000000000100000000000000011 & 65535,转换成2进制运算为00000000000000100000000000000011 & 00000000000000001111111111111111

这里其实就是获取低16位值,运算结果为: 00000000000000100000000000000011 ,换算成10进制为3。

这里的设计非常巧妙,在不修改AQS的代码前提下,仅仅通过原来的State变量就满足了读锁和写锁的分离。

参考链接:

1. Java API

2. 《Java编程的逻辑》

3. 【死磕Java并发】—–J.U.C之AQS(一篇就够了)

4. Java锁之ReentrantReadWriteLock

赞(1) 打赏
Zhuoli's Blog » Java编程拾遗『AQS』
分享到: 更多 (0)

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址