coding……
但行好事 莫问前程

Java编程拾遗『显式锁』

在之前讲线程同步时,介绍了synchronized的锁的使用及底层原理,也介绍了synchronized锁的一些使用局限,本篇文章来介绍一下Java中提供的另一种线程同步机制——显式锁。Java并发包中的显式锁接口和类位于包java.util.concurrent.locks下,主要接口和类有:

  • 锁接口Lock,主要实现类是ReentrantLock
  • 读写锁接口ReadWriteLock,主要实现类是ReentrantReadWriteLock

1. Lock接口

S.N.方法说明
1void lock()获取锁,获取锁不成功会阻塞当前线程
2void unlock()释放锁
3void lockInterruptibly() throws InterruptedException获取锁,获取锁不成功会阻塞等待,如果等待期间被其他线程中断了,抛出InterruptedException
4boolean tryLock()尝试获取锁,立即返回,不阻塞,如果获取成功,返回true,否则返回false
5boolean tryLock(long time, TimeUnit unit) throws InterruptedException先尝试获取锁,如果能成功则立即返回true,否则阻塞等待,但等待的最长时间为指定的参数,在等待的同时响应中断,如果发生了中断,抛出InterruptedException,如果在等待的时间内获得了锁,返回true,否则返回false
6Condition newCondition()新建一个条件,用于显式锁的协作,使用Condition条件,可以起到和synchronized锁使用wait/signal同样的效果

从上述方法可以看出,相比synchronized,显式锁支持以非阻塞方式获取锁、可以响应中断、可以限时,比synchronized灵活的多。

2. ReentrantLock

Lock接口的主要实现类是ReentrantLock,它的基本用法lock/unlock实现了与synchronized一样的语义,包括:

  • 可重入,一个线程在持有一个锁的前提下,可以继续获得该锁
  • 可以解决竞态条件问题
  • 可以保证内存可见性

2.1 方法说明

S.N.
方法说明
1public ReentrantLock()构造函数,获取非公平所对象
2public ReentrantLock(boolean fair)构造函数,获取锁对象,fair参数用于控制是否为公平锁
3public boolean isLocked()判断锁是否被持有,只要有线程持有就返回true(不一定是当前线程持有)
4public boolean isHeldByCurrentThread()判断锁是否被当前线程持有
5public int getHoldCount()锁被当前线程持有的数量,如果锁不被当前线程持有返回0
6public final boolean isFair()判断锁是否公平
7public final boolean hasQueuedThreads()判断是否有线程在等待该锁
8public final boolean hasQueuedThread(Thread thread)判断指定的线程thread是否在等待该锁
9public final int getQueueLength()获取在等待该锁的线程个数

ReentrantLock实现了Lock接口,所以除了上述方法,还实现了上述Lock的所有方法。

2.2 使用示例

还用之前的计数器做示例,之前为了安全地改变计数,使用了synchronized同步,这里展示一下如何使用显式锁同步,如下:

public class Counter {
    private final Lock lock = new ReentrantLock();
    private volatile int count;

    public void incr() {
        lock.lock();
        try {
            count++;
        } finally {
            lock.unlock();
        }
    }

    public int getCount() {
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }
}

需要注意的是,使用显式锁,一定要记得调用unlock,一般而言,应该将lock之后的代码包装到try语句内,在finally语句内释放锁。上述示例代码是显式锁最基本的用法,使用起来跟synchronized非常类似,下面分别介绍一下Lock接口提供的lockInterruptibly和tryLock。

2.2.1 lockInterruptibly响应中断

之前在Java编程拾遗『线程中断』一文中讲到,
使用synchronized关键字获取锁的过程中不响应中断请求,这是synchronized的局限性。也就是讲使用synchronized锁,处于BLOCKED状态的线程对象调用interrupt()方法,线程是不会响应的,所以线程自然也不会终止。但是显式锁中提供了lockInterruptibly()方法,可以在等待锁的过程中响应interrupt()方法。先来看一下之前使用synchronized锁的情况:

public class InterruptSynchronizedDemo {
    private static Object lock = new Object();

    private static class A extends Thread {
        @Override
        public void run() {
            synchronized (lock) {
                while (!Thread.currentThread().isInterrupted()) {
                }
            }
            System.out.println("exit");
        }
    }

    public static void test() throws InterruptedException {
        synchronized (lock) {
            A a = new A();
            a.start();
            Thread.sleep(1000);

            a.interrupt();
            a.join();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        test();
    }
}

test方法在持有锁lock的情况下启动线程a,而线程a也去尝试获得锁lock,所以会进入锁等待队列,随后test调用线程a的interrupt方法并等待线程线程a结束。但事实上,test方法会一直运行下去,无法终止。说明synchronized锁是无法响应中断的。下面我们换成显式锁:

public class InterruptSynchronizedDemo {
    private static Lock lock = new ReentrantLock();

    private static class A extends Thread {
        @Override
        public void run() {
            try {
                lock.lockInterruptibly();
            } catch (InterruptedException e) {
                System.out.println("获取锁期间线程被中断");
                return;
            }

            try {
                while (!Thread.currentThread().isInterrupted()) {
                }
            } finally {
                lock.unlock();
            }

            System.out.println("exit");
        }
    }

    public static void test() throws InterruptedException {
        lock.lockInterruptibly();

        try {
            A a = new A();
            a.start();
            Thread.sleep(1000);

            a.interrupt();
            a.join();

        } finally {
            lock.unlock();
        }

    }

    public static void main(String[] args) throws InterruptedException {
        test();
    }
}

运行结果:

获取锁期间线程被中断

说明显式锁的lockInterruptibly方法获取锁,实可以响应中断的。使用lockInterruptibly获取锁等待期间,如果线程被终止,线程将抛出InterruptedException。

2.2.2 tryLock避免死锁

使用lock方法获取锁时,获取不到锁就会阻塞当前线程,之后重新获取锁。所以如果出现相互等待锁的情况,lock获取锁的方式就会发生死锁,如下例子,银行账户之间转账,用类Account表示账户:

public class Account {
    private Lock lock = new ReentrantLock();
    private volatile double money;

    public Account(double initialMoney) {
        this.money = initialMoney;
    }

    public void add(double money) {
        lock.lock();
        try {
            this.money += money;
        } finally {
            lock.unlock();
        }
    }

    public void reduce(double money) {
        lock.lock();
        try {
            this.money -= money;
        } finally {
            lock.unlock();
        }
    }

    public double getMoney() {
        return money;
    }

    void lock() {
        lock.lock();
    }

    void unlock() {
        lock.unlock();
    }

    boolean tryLock() {
        return lock.tryLock();
    }
}

Account里的money表示当前余额,add/reduce用于修改余额。在账户之间转账,需要两个账户都锁定,如果不使用tryLock,直接使用lock,代码看上去可以这样:

public class AccountMgr {
    public static class NoEnoughMoneyException extends Exception {}

    public static void transfer(Account from, Account to, double money)
            throws NoEnoughMoneyException {
        from.lock();
        try {
            to.lock();
            try {
                if (from.getMoney() >= money) {
                    from.reduce(money);
                    to.add(money);
                } else {
                    throw new NoEnoughMoneyException();
                }
            } finally {
                to.unlock();
            }
        } finally {
            from.unlock();
        }
    }
}

但这么写是有问题的,如果两个账户同时给对方转账,都先获取了第一个锁,则会发生死锁。我们写段代码来模拟这个过程:

public static void simulateDeadLock() {
    final int accountNum = 10;
    final Account[] accounts = new Account[accountNum];
    final Random rnd = new Random();
    for (int i = 0; i < accountNum; i++) {
        accounts[i] = new Account(rnd.nextInt(10000));
    }

    int threadNum = 100;
    Thread[] threads = new Thread[threadNum];
    for (int i = 0; i < threadNum; i++) {
        threads[i] = new Thread() {
            public void run() {
                int loopNum = 100;
                for (int k = 0; k < loopNum; k++) {
                    int i = rnd.nextInt(accountNum);
                    int j = rnd.nextInt(accountNum);
                    int money = rnd.nextInt(10);
                    if (i != j) {
                        try {
                            transfer(accounts[i], accounts[j], money);
                        } catch (NoEnoughMoneyException e) {
                        }
                    }
                }
            }
        };
        threads[i].start();
    }
}

以上创建了10个账户,100个线程,每个线程执行100次循环,在每次循环中,随机挑选两个账户进行转账。基本上每次都会发生死锁。

下面使用tryLock来进行修改,先定义一个tryTransfer方法:

public static boolean tryTransfer(Account from, Account to, double money)
        throws NoEnoughMoneyException {
    if (from.tryLock()) {
        try {
            if (to.tryLock()) {
                try {
                    if (from.getMoney() >= money) {
                        from.reduce(money);
                        to.add(money);
                    } else {
                        throw new NoEnoughMoneyException();
                    }
                    return true;
                } finally {
                    to.unlock();
                }
            }
        } finally {
            from.unlock();
        }
    }
    return false;
}

如果两个锁都能够获得,且转账成功,则返回true,否则返回false,不管怎样,结束都会释放所有锁。transfer方法可以循环调用该方法以避免死锁,代码如下:

public static void transfer(Account from, Account to, double money)
        throws NoEnoughMoneyException {
    boolean success = false;
    do {
        success = tryTransfer(from, to, money);
        if (!success) {
            Thread.yield();
        }
    } while (!success);
}

上述代码,使用tryLock(),可以避免死锁。在持有一个锁,获取另一个锁,获取不到的时候,可以释放已持有的锁,给其他线程机会获取锁,然后再重试获取所有锁

2.2.3 显式条件

锁用于解决竞态条件问题,条件是线程间的协作机制。显式锁与synchronzied相对应,而显式条件与wait/notify相对应。wait/notify与synchronized配合使用,显式条件与显式锁配合使用。

条件与锁相关联,创建条件变量需要通过显式锁,Lock接口定义了创建方法:

Condition newCondition()

Condition表示条件变量,是一个接口,定义了线程之间协作的各种方法,如下:

S.N.方法说明
1void await() throws InterruptedException对应Object的wait(),使当前线程等待,又RUNNING进入WAITTING状态
boolean await(long time, TimeUnit unit) throws InterruptedException等待时间是相对时间,如果由于等待超时返回,返回值为false,否则为true,等待期间线程被中断会抛异常
long awaitNanos(long nanosTimeout) throws InterruptedException等待时间是相对时间,参数单位是纳秒,返回值是nanosTimeout减去实际等待的时间
boolean awaitUntil(Date deadline) throws InterruptedException等待时间是绝对时间,如果由于等待超时返回,返回值为false,否则为true
void awaitUninterruptibly()该方法不会由于中断结束,但当它返回时,如果等待过程中发生了中断,中断标志位会被设置
void signal()唤醒一个等待的线程
void signalAll()唤醒所有等待的线程

一般而言,与Object的wait方法一样,调用await方法前需要先获取锁,如果没有锁,会抛出异常IllegalMonitorStateException。await在进入等待队列后,会释放锁,释放CPU,当其他线程将它唤醒后,或等待超时后,或发生中断异常后,它都需要重新获取锁,获取锁后,才会从await方法中退出

另外,与Object的wait方法一样,await返回后,不代表其等待的条件就一定满足了,通常要将await的调用放到一个循环内,只有条件满足后才退出

一般而言,signal/signalAll与notify/notifyAll一样,调用它们需要先获取锁,如果没有锁,会抛出异常IllegalMonitorStateException。signal与notify一样,挑选一个线程进行唤醒,signalAll与notifyAll一样,唤醒所有等待的线程,但这些线程被唤醒后都需要重新竞争锁,获取锁后才会从await调用中返回。

2.2.3.1 使用示例

之前讲线程协作的文章Java编程拾遗『线程协作』中讲到同时开始(发令枪)的协作场景,使用Object的wait()合notifyAll()方法实现。这里来看一下如何使用显示条件来实现:

public class WaitThread extends Thread {
    private volatile boolean fire = false;
    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    @Override
    public void run() {
        try {
            lock.lock();
            try {
                while (!fire) {
                    condition.await();
                }
            } finally {
                lock.unlock();
            }
            System.out.println("fired");
        } catch (InterruptedException e) {
            Thread.interrupted();
        }
    }

    public void fire() {
        lock.lock();
        try {
            this.fire = true;
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        WaitThread waitThread = new WaitThread();
        waitThread.start();
        Thread.sleep(1000);
        System.out.println("fire");
        waitThread.fire();
    }
}

需要特别注意的是,不要将signal/signalAll与notify/notifyAll混淆,notify/notifyAll是Object中定义的方法,Condition对象也有,稍不注意就会误用,比如,对上面例子中的fire方法,可能会写为:

public void fire() {
    lock.lock();
    try {
        this.fire = true;
        condition.notify();
    } finally {
        lock.unlock();
    }
}

写成这样,编译器不会报错,但运行时会抛出IllegalMonitorStateException,因为notify的调用不在synchronized语句内。

同样,避免将锁与synchronzied混用,那样非常令人混淆,比如:

public void fire() {
    synchronized(lock){
        this.fire = true;
        condition.signal();
    }
}

总之,要记住一个规则:显式条件与显式锁配合使用,wait/notify与synchronized配合使用

2.2.3.1 使用显示条件实现消费者生产者模式
public class MyBlockingQueue<E> {
    private Queue<E> queue = null;
    private int limit;
    private Lock lock = new ReentrantLock();
    private Condition notFull  = lock.newCondition();
    private Condition notEmpty = lock.newCondition();


    public MyBlockingQueue(int limit) {
        this.limit = limit;
        queue = new ArrayDeque<>(limit);
    }

    public void put(E e) throws InterruptedException {
        lock.lockInterruptibly();
        try{
            while (queue.size() == limit) {
                notFull.await();
            }
            queue.add(e);
            notEmpty.signal();    
        }finally{
            lock.unlock();
        }
    }

    public E take() throws InterruptedException {
        lock.lockInterruptibly();
        try{
            while (queue.isEmpty()) {
                notEmpty.await();
            }
            E e = queue.poll();
            notFull.signal();
            return e;    
        }finally{
            lock.unlock();
        }
    }
}

定义了两个等待条件:不满(notFull)、不空(notEmpty),在put方法中,如果队列满,则在noFull上等待,在take方法中,如果队列空,则在notEmpty上等待,put操作后通知notEmpty,take操作后通知notFull。而在之前Java编程拾遗『线程协作』那篇文章介绍的生产者消费者实现,使通过notifyAll通知所有等待线程实现的,唤醒了本不必要唤醒的线程。而使用显示条件避免了不必要的唤醒和检查。

3. ReadWriteLock

上面介绍了java.util.concurrent.locks包下的的显式锁Lock,接下来介绍该包下另一个锁——读写锁接口ReadWriteLock,及其主要实现类ReentrantReadWriteLock。

读写锁模式将读取与写入分开处理,在读取数据之前必须获取用来读取的锁定,而写入的时候必须获取用来写入的锁定。因为读取时实例的状态不会改变,所以多个线程可以同时读取;但是,写入会改变实例的状态,所以当有一个线程写入的时候,其它线程既不能读取与不能写入。也就是说,读写锁要遵守以下三个原则:

  • 允许多个线程同时读共享变量
  • 只允许一个线程写共享变量
  • 如果一个写线程正常执行写操作,此时禁止读线程读取共享变量

ReentrantReadWriteLock提供一把读锁、一把写锁实现了读写锁模式,本篇文章仅简单介绍以下ReentrantReadWriteLock读写锁的使用,关于实现原理会在下篇文章中介绍。

  • 线程进入读锁的条件(满足其中一个即可):
    • 没有任何线程持有写锁
    • 有线程持有写锁,但是持有写锁的线程是当前线程
  • 线程进入写锁的条件(满足其中一个即可)
    • 没有任何线程持有读锁或写锁
    • 有线程持有写锁,但是持有写锁的线程是当前线程

从上述条件我们可以看出,如果一个线程在持有读锁的前提下,是不能获取写锁的(读写锁不支持升级)。但是如果一个线程在持有写锁的前提下,实可以获取读锁的(读写锁支持降级)。并且单独看读锁合写锁,都是可重入的

如下代码会产生死锁,因为同一个线程中,在没有释放读锁的情况下,就去申请写锁,这属于锁升级,ReentrantReadWriteLock是不支持的:

 ReadWriteLock rtLock = new ReentrantReadWriteLock();
 rtLock.readLock().lock();
 System.out.println("get readLock.");
 rtLock.writeLock().lock();
 System.out.println("blocking");

ReentrantReadWriteLock支持锁降级,如下代码不会产生死锁:

ReadWriteLock rtLock = new ReentrantReadWriteLock();
rtLock.writeLock().lock();
System.out.println("writeLock");

rtLock.readLock().lock();
System.out.println("get read lock");

但是这段锁降级的代码虽然不会导致死锁,但没有正确的释放锁。从写锁降级成读锁,并不会自动释放当前线程获取的写锁,仍然需要显示的释放,否则别的线程永远也获取不到写锁

下面通过缓存的示例,来看一下读写锁的使用:

public class Cache {
    private Map<String, Object> map = new HashMap<>(128);
    private ReadWriteLock rwl = new ReentrantReadWriteLock();

    public Object get(String id) {
        Object value = null;
        rwl.readLock().lock();//首先开启读锁,从缓存中去取
        try {
            if (map.get(id) == null) {  //如果缓存中没有请求的数据,释放读锁,上写锁
                rwl.readLock().unlock();
                rwl.writeLock().lock();
                try {
                    if (map.get(id) == null) { //防止多写线程重复查询赋值
                        value = getFromDB();  //此时可以去数据库中查找,这里简单的模拟一下
                        map.put(id, value); //持有写锁,写缓存
                    }
                    rwl.readLock().lock(); //加读锁降级写锁
                } finally {
                    rwl.writeLock().unlock(); //释放写锁
                }
            } else {
                value = map.get(id);
                System.out.println("命中缓存");
            }
        } finally {
            rwl.readLock().unlock(); //最后释放读锁
        }
        return value;
    }

    private String getFromDB() {
        String value = String.valueOf(new Random().nextInt(100));
        System.out.println("数据库查询");
        return value;
    }
}
public class CacheTest {
    private static Cache cache = new Cache();

    public static void main(String[] args) {
        Random random = new Random();

        Thread[] threads = new Thread[10];

        /**
         * 这里启动10个线程,每个线程进行50次查询,所有线程每次查询的key分布比较集中(1 ~ 10)
         * 缓存命中的概率会很高
         */
        for (int i = 0; i < threads.length; i++) {

            Runnable runnable = () -> {
                for (int j = 0; j < 50; j ++) {
                    String key = String.valueOf(random.nextInt(10));
                    Object value = cache.get(key);
                }
            };

            threads[i] = new Thread(runnable);
            threads[i].start();
        }
    }
}

从运行结果可以看出,最开始会进行几次数据库查询操作,之后都是从缓存中获取的。

在多线程的环境下,对同一份数据进行读写,会涉及到线程安全的问题。比如在一个线程读取数据的时候,另外一个线程在写数据,而导致前后数据的不一致性;一个线程在写数据的时候,另一个线程也在写,同样也会导致线程前后看到的数据的不一致性。这时候可以在读写方法中加入互斥锁(synchronized、显式锁),任何时候只能允许一个线程的一个读或写操作,而不允许其他线程的读或写操作,这样是可以解决这样以上的问题,但是效率却大打折扣了。因为在真实的业务场景中,一份数据,读取数据的操作次数通常高于写入数据的操作,而线程与线程间的读读操作是不涉及到线程安全的问题,没有必要加入互斥锁,只要在读-写,写-写场景下互斥就行了,对于读读操作场景,多个线程可以共享读锁。总的来说,ReentrantReadWriteLock相比于ReentrantLock,是一种更细粒度的锁,对于读多写少的场景,可以获得更高的效率

最后总结一下显式锁和synchronized的区别:

  • synchronized在发生异常时,会自动释放线程占有的锁,因此不会导致死锁现象发生;而显式锁在发生异常时,如果没有主动通过unLock()去释放锁,则很可能造成死锁现象,因此使用显式锁时需要在finally块中释放锁
  • 显式锁可以让等待锁的线程响应中断,而synchronized不行,使用synchronized时,等待的线程会一直等待下去,不能够响应中断
  • 显式锁语义更丰富,可以提供公平锁/非公平锁,但是synchronized只能提供非公平锁
  • 通过显式锁可以获取锁的状态(锁是否被持有、是否被当前线程持有等),而synchronized却无法办到
  • 显式锁可以提供更细粒度的锁——读写锁,可以提高多个线程进行读操作的效率,synchronized只能做到所有操作都阻塞
  • 性能上来说,在资源竞争不激烈的情形下,Lock性能稍微比synchronized差点(JDK对synchronized进行了一些列优化)。但是当同步非常激烈的时候,ReentrantLock性能要高于synchronized的性能

参考链接:

1. 《Java编程的逻辑》

2. Java API

3. 【死磕Java并发】—–J.U.C之AQS(一篇就够了)

4. Java锁之ReentrantReadWriteLock

5. 并发库应用之五 & ReadWriteLock场景应用

赞(1) 打赏
Zhuoli's Blog » Java编程拾遗『显式锁』
分享到: 更多 (0)

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址