coding……
但行好事 莫问前程

Java编程拾遗『原子变量』

本篇文章我们来讲java.util.concurrent包下的原子变量,原子变量的引入主要是为了解决普通变量(int、Integer、Long等)修改操作不是原子的,进而导致必须使用同步机制才能保证安全更新的问题。举个例子:

public class Counter {
    private int count;

    public void incr(){
        count ++;
    }
    
    public int getCount() {
        return count;
    }
}

Counter类有两个方法,incr方法用于对成员变量count自增加1,getCount方法用于获取count值,通过如下方式使用:

public class Test {

    public static class Counter {
        private int count;

        public void incr() {
            count++;
        }

        public int getCount() {
            return count;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Counter counter = new Counter();

        Thread[] threads = new Thread[5];
        for (int i = 0; i < 5; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 10; j++) {
                    counter.incr();
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
            threads[i].start();
        }

        for (int i = 0; i < 5; i++) {
            threads[i].join();
        }

        System.out.println(counter.getCount());
    }
}

main方法中,开启五个线程,每个线程的调用counter的incr方法10次,那么理论上最后counter调用getCount()方法获取的结果是50。但是通过多次执行发现,结果不一定是50,有时候可能是49,有时候可能是48。原因在之前讲synchronized使用的那篇文章讲过,因为++操作不是原子的。一个++操作涉及三个动作:读取、加1、回写。那么就有可能导致多线程执行时,多个线程读取到同一个值,导致最终count的值小于50。

解决办法之前也讲过,就是把incr和getCount方法声明为synchronized方法。但是对于count++这种操作来说,使用synchronzied成本太高了(虽然synchronized在Java6之后进行了一系列优化),获取锁、释放锁以及相关的上下文切换等,这些都需要成本。

对于这种情况,完全可以使用原子变量代替,Java并发包中的基本原子变量类型有:

  • AtomicBoolean:原子Boolean类型
  • AtomicInteger:原子Integer类型
  • AtomicLong:原子Long类型
  • AtomicReference:原子引用类型

针对Integer,Long和Reference类型,还有对应的数组类型:

  • AtomicIntegerArray
  • AtomicLongArray
  • AtomicReferenceArray

原子方式修改对象内部成员变量类型:

  • AtomicReferenceFieldUpdater

AtomicReference还有两个类似的类,在某些情况下更为易用:

  • AtomicMarkableReference
  • AtomicStampedReference

可以发现,没有针对char,short,float,double类型的原子变量。有可能是因为这几种类型使用的相对较少吧。如果需要,可以转换为int/long,然后使用AtomicInteger或AtomicLong。比如,对于float,可以使用Float类如下方法和int相互转换:

public static int floatToIntBits(float value)
public static float intBitsToFloat(int bits)

1. AtomicInteger

1.1 基本方法说明

S.N.方法说明
1public AtomicInteger(int initialValue)构造函数,给定初始值
2public AtomicInteger()构造函数,初始值为0
3public final int get()获取AtomicInteger中的值
4public final void set(int newValue)设置AtomicInteger中的值
5public final boolean compareAndSet(int expect, int update)CAS,expect为期望的值,update为修改的值
6public final int getAndSet(int newValue)以原子方式设置新值,并返回旧值
7public final int getAndIncrement()以原子方式将当前值加1,并返回旧值
8public final int getAndDecrement()以原子方式将当前值减1,并返回旧值
9public final int getAndAdd(int delta)以原子方式将当前值加delta,并返回旧值
10public final int incrementAndGet()以原子方式将当前值加1,并返回新值
11public final int decrementAndGet()以原子方式将当前值减1,并返回新值
12public final int addAndGet(int delta)以原子方式将当前值加delta,并返回新值
13public final int getAndUpdate(IntUnaryOperator updateFunction)Java8新方法,以原子方式将当前值更新为函数结果,并返回旧值
14public final int updateAndGet(IntUnaryOperator updateFunction)Java8新方法,以原子方式将当前值更新为函数结果,并返回新值
15public final int getAndAccumulate(int x, IntBinaryOperator accumulatorFunction)Java8新方法,以原子方式将当前值更新为函数结果,并返回旧值
16public final int accumulateAndGet(int x, IntBinaryOperator accumulatorFunction)Java8新方法,以原子方式将当前值更新为函数结果,并返回新值

这里重点提一下compareAndSet方法,该方法以原子方式实现了如下功能:如果当前值等于expect,则更新为update,否则不更新,如果更新成功,返回true,否则返回false。(依赖于Unsafe类,Unsafe类的方法都试native方法)

AtomicInteger可以在程序中用作一个计数器,多个线程并发更新,也总能实现正确性,比如上面的例子改为:

public class Test1 {
    public static class Counter {
        private AtomicInteger count;

        public Counter(AtomicInteger count) {
            this.count = count;
        }

        public void incr() {
            count.incrementAndGet();
        }

        public int getCount() {
            return count.get();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Counter counter = new Counter(new AtomicInteger());

        Thread[] threads = new Thread[5];
        for (int i = 0; i < 5; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 10; j++) {
                    counter.incr();
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
            threads[i].start();
        }

        for (int i = 0; i < 5; i++) {
            threads[i].join();
        }

        System.out.println(counter.getCount());
    }
}

因为incrementAndGet()方法是原子的,所以程序总能输出正确的结果50,虽然没有使用synchronized同步。

1.2 AtomicInteger基本原理

AtomicInteger类声明为:

public class AtomicInteger extends Number implements java.io.Serializable {

    // setup to use Unsafe.compareAndSwapInt for updates
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long valueOffset;

    static {
        try {
            valueOffset = unsafe.objectFieldOffset
                (AtomicInteger.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }

    private volatile int value;

    //Methods
}

Unsafe类提供Java直接访问系统内存资源、自主管理内存的入口,提供了硬件级别的原子操作,使Java语言拥有了类似C语言指针一样操作内存空间的能力。AtomicInteger类中的Unsafe成员变量,就是用来操作成员变量value的内存空间的(实现原子方式改变value值)。

成员变量value就是用来存储值的,并且声明为volatile,所以可以保证可见性。而value访问的的原子性就是通过上述unsafe成员变量实现的。

1.2.1 incrementAndGet

AtomicInteger类中大部分更新方法实现都很类似,我们先来分析一下incrementAndGet实现:

public final int incrementAndGet() {
    return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}

public final int getAndAddInt(Object var1, long var2, int var4) {
    int var5;
    do {
    	//var5为当前值
        var5 = this.getIntVolatile(var1, var2);
    } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

    return var5;
}

Unsafe类的getAndAddInt方法,就是一直执行CAS操作,将当前值var5改为var5 + var4,并返回更改前的旧值var5。

与synchronized锁相比,这种原子更新方式代表一种不同的思维方式。

  • synchronized是悲观的,它假定更新很可能冲突,所以先获取锁,得到锁后才更新。原子变量的更新逻辑是乐观的,它假定冲突比较少,但使用CAS更新,也就是进行冲突检测,如果确实冲突了,那就继续尝试使用CAS更新
  • synchronized代表一种阻塞式算法,得不到锁的时候,进入锁等待队列,等待其他线程唤醒,有上下文切换开销。原子变量的更新逻辑是非阻塞式的,更新冲突的时候,它就重试,不会阻塞,不会有上下文切换开销

对于大部分比较简单的操作,无论是在低并发还是高并发情况下,这种乐观非阻塞方式的性能都要远高于悲观阻塞式方式。

1.2.2 compareAndSet

public final boolean compareAndSet(int expect, int update) {
    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

Unsafe类中compareAndSwapInt方法是native方法:

public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5)

一般的计算机系统都在硬件层次上直接支持CAS指令,compareAndSwapInt肯定也是通过这些指令实现的CAS。关于Unsafe类,美团技术博客有篇文章讲述的很清楚,Java魔法类:Unsafe应用解析,感兴趣的同学可以去了解一下。

1.2.3 getAndUpdate(IntUnaryOperator updateFunction)

public final int getAndUpdate(IntUnaryOperator updateFunction) {
    int prev, next;
    do {
        prev = get();
        //将prev应用于函数式表达式,获取新值,并循环通过CAS将当前值改为新值
        next = updateFunction.applyAsInt(prev);
    } while (!compareAndSet(prev, next));

    //返回旧值
    return prev;
}

1.2.4 updateAndGet(IntUnaryOperator updateFunction)

public final int updateAndGet(IntUnaryOperator updateFunction) {
    int prev, next;
    do {
        prev = get();
        next = updateFunction.applyAsInt(prev);
    } while (!compareAndSet(prev, next));
    return next;
}

updateAndGet方法实现跟getAndUpdate一致,唯一不同的是updateAndGet方法返回的是修改后的值。

1.2.5 getAndAccumulate

public final int getAndAccumulate(int x,
                                  IntBinaryOperator accumulatorFunction) {
    int prev, next;
    do {
        prev = get();
        //将prev和参数x应用于函数式表达式,获取新值,并循环通过CAS将当前值改为新值
        next = accumulatorFunction.applyAsInt(prev, x);
    } while (!compareAndSet(prev, next));

    //返回旧值
    return prev;
}

1.2.6 accumulateAndGet

public final int accumulateAndGet(int x,
                                  IntBinaryOperator accumulatorFunction) {
    int prev, next;
    do {
        prev = get();
        //将prev和参数x应用于函数式表达式,获取新值,并循环通过CAS将当前值改为新值
        next = accumulatorFunction.applyAsInt(prev, x);
    } while (!compareAndSet(prev, next));

    //返回新值
    return next;
}

1.3 AtomicInteger应用

AtomicInteger最直观的应用肯定是计数器,其实AtomicInteger的CAS属性,除了可以实现乐观非阻塞算法,它也可以用来实现悲观阻塞式算法,比如锁。实际上,Java并发包中的所有阻塞式工具、容器、算法也都是基于CAS的。下面代码展示使用AtomicInteger实现一个锁,如下:

public class MyLock {
    private AtomicInteger status = new AtomicInteger(0);

    public void lock() {
        while (!status.compareAndSet(0, 1)) {
            Thread.yield();
        }
    }

    public void unlock() {
        status.compareAndSet(1, 0);
    }
}

在MyLock中,使用status表示锁的状态,0表示未锁定,1表示锁定,lock()/unlock()使用CAS方法更新,lock()只有在更新成功后才退出,实现了阻塞的效果,不过一般而言,这种阻塞方式过于消耗CPU。MyLock只是用于演示基本概念,实际开发中应该使用Java并发包中的类如ReentrantLock。

2. AtomicBoolean/AtomicLong/AtomicReference

AtomicBoolean/AtomicLong/AtomicReference的用法和原理与AtomicInteger非常类似,下面简单介绍一下。

2.1 AtomicBoolean

AtomicBoolean类声明如下:

public class AtomicBoolean implements java.io.Serializable {

    // setup to use Unsafe.compareAndSwapInt for updates
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long valueOffset;

    static {
        try {
            valueOffset = unsafe.objectFieldOffset
                (AtomicBoolean.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }

    private volatile int value;

    //Methods

}   

可以看到AtomicBoolean类内部的值value并不是boolean类型的,而实int类型,用1表示true, 0表示false,比如,其CAS方法实现为:

public final boolean compareAndSet(boolean expect, boolean update) {
    int e = expect ? 1 : 0;
    int u = update ? 1 : 0;
    return unsafe.compareAndSwapInt(this, valueOffset, e, u);
}

就是把boolean值转化为int值,然后通过Unsafe类的compareAndSwapInt方法,以原子方式替换value值。

2.2 AtomicLong

AtomicLong可以用来在程序中生成唯一序列号,它的方法与AtomicInteger类似,就不赘述了。它的CAS方法调用的是unsafe的另一个方法,如:

public final boolean compareAndSet(long expect, long update) {
    return unsafe.compareAndSwapLong(this, valueOffset, expect, update);
}

原理是同AtomicInteger一样的,使用方式也一致,这里不多讲了。

2.3 AtomicReference

AtomicReference用来以原子方式更新复杂类型,它有一个类型参数,使用时需要指定引用的类型。以下代码演示了其基本用法:

public class AtomicReferenceDemo {
    static class Pair {
        final private int first;
        final private int second;

        public Pair(int first, int second) {
            this.first = first;
            this.second = second;
        }

        public int getFirst() {
            return first;
        }

        public int getSecond() {
            return second;
        }
    }

    public static void main(String[] args) {
        Pair p = new Pair(100, 200);
        AtomicReference<Pair> pairRef = new AtomicReference<>(p);
        pairRef.compareAndSet(p, new Pair(200, 200));

        System.out.println(pairRef.get().getFirst());
    }
}

AtomicReference的CAS方法调用的是unsafe的另一个方法:

public final boolean compareAndSet(V expect, V update) {
    return unsafe.compareAndSwapObject(this, valueOffset, expect, update);
}

3. 原子数组

原子数组方便以原子的方式更新数组中的每个元素,下面以AtomicIntegerArray为例来简要介绍下,AtomicLongArray、AtomicReferenceArray类似。

3.1 构造方法

public AtomicIntegerArray(int length)
public AtomicIntegerArray(int[] array) 

第一个会创建一个长度为length的空数组;第二个构造函数接受一个已有的数组,但不会直接操作该数组,而是会创建一个新数组,只是拷贝参数数组中的内容到新数组,对AtomicIntegerArray内容的修改不会影响原数组。

3.2 方法说明

AtomicIntegerArray中的原子更新方法大多带有数组索引参数,如下:

public final boolean compareAndSet(int i, int expect, int update)
public final int getAndIncrement(int i)
public final int getAndAdd(int i, int delta)

含义跟AtomicInteger相同,不同的是AtomicIntegerArray中的方法要上送数组的index,以原子方式修改数组特定index位置的元素。

3.3 使用示例

public class AtomicArrayDemo {
    public static void main(String[] args) {
        int[] arr = { 1, 2, 3, 4 };
        AtomicIntegerArray atomicArr = new AtomicIntegerArray(arr);
        atomicArr.compareAndSet(1, 2, 100);
        System.out.println(atomicArr.get(1));
        System.out.println(arr[1]);
    }
}

运行结果:

100
2

4. AtomicReferenceFieldUpdater

AtomicReferenceFieldUpdater方便以原子方式更新对象中的成员变量,成员变量不需要声明为原子变量,AtomicReferenceFieldUpdater是基于反射机制实现的。下面看一个简单的使用示例:

public class FieldUpdaterDemo {
    static class DemoObject {
        private volatile int num;
        private volatile Object ref;

        private static final AtomicIntegerFieldUpdater<DemoObject> numUpdater
            = AtomicIntegerFieldUpdater.newUpdater(DemoObject.class, "num");
        private static final AtomicReferenceFieldUpdater<DemoObject, Object>
            refUpdater = AtomicReferenceFieldUpdater.newUpdater(
                    DemoObject.class, Object.class, "ref");

        public boolean compareAndSetNum(int expect, int update) {
            return numUpdater.compareAndSet(this, expect, update);
        }

        public int getNum() {
            return num;
        }

        public Object compareAndSetRef(Object expect, Object update) {
            return refUpdater.compareAndSet(this, expect, update);
        }

        public Object getRef() {
            return ref;
        }
    }

    public static void main(String[] args) {
        DemoObject obj = new DemoObject();
        obj.compareAndSetNum(0, 100);
        obj.compareAndSetRef(null, new String("hello"));
        System.out.println(obj.getNum());
        System.out.println(obj.getRef());
    }
}

5. AtomicStampedReference

使用CAS方式更新有一个ABA问题,一个线程开始看到的值是A,随后使用CAS进行更新,它的实际期望是没有其他线程修改过才更新,但普通的CAS做不到,因为可能在这个过程中,已经有其他线程修改过了,比如先改为了B,然后又改回为了A。

ABA是不是一个问题与程序的逻辑有关,如果是一个问题,一个解决方法是使用AtomicStampedReference,在修改值的同时附加一个时间戳,只有值和时间戳都相同才进行修改,其CAS方法实现为:

public boolean compareAndSet(V   expectedReference,
                             V   newReference,
                             int expectedStamp,
                             int newStamp) {
    Pair<V> current = pair;
    return
        expectedReference == current.reference &&
        expectedStamp == current.stamp &&
        ((newReference == current.reference &&
          newStamp == current.stamp) ||
         casPair(current, Pair.of(newReference, newStamp)));
}

private boolean casPair(Pair<V> cmp, Pair<V> val) {
    return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val);
}

AtomicStampedReference在compareAndSet中要同时修改两个值,一个是引用,另一个是时间戳,通过上述代码,可以看到,AtomicStampedReference内部会将这两个值封装为Pair对象,并通过UNSAFE.compareAndSwapObject以原子方式更新。

使用示例:

public static void main(String[] args) {
    Pair<Integer, Integer> pair = new Pair<>(100, 200);
    int stamp = 1;
    AtomicStampedReference<Pair> pairRef = new AtomicStampedReference<Pair>(pair, stamp);
    int newStamp = 2;
    pairRef.compareAndSet(pair, new Pair<>(200, 200), stamp, newStamp);
}

6. AtomicMarkableReference

AtomicMarkableReference是另一个AtomicReference的增强类,与AtomicStampedReference类似,它也是给引用关联了一个字段,只是这次是一个boolean类型的标志位,只有引用值和标志位都相同的情况下才进行修改,同样可以解决ABA问题。来看一下CAS实现:

public boolean compareAndSet(V       expectedReference,
                             V       newReference,
                             boolean expectedMark,
                             boolean newMark) {
    Pair<V> current = pair;
    return
        expectedReference == current.reference &&
        expectedMark == current.mark &&
        ((newReference == current.reference &&
          newMark == current.mark) ||
         casPair(current, Pair.of(newReference, newMark)));
}

private boolean casPair(Pair<V> cmp, Pair<V> val) {
    return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val);
}

可以看到跟AtomicStampedReference实现方式一致,唯一的区别是标志位不同,AtomicStampedReference中使用的是int型变量,AtomicMarkableReference中使用的是boolean变量。

以上就是原子变量的所有内容,原子变量可以保证在无锁的环境下原子的改变变量,但是也存在一些局限性,如下:

  • ABA问题:普通的CAS操作检查值有没有发生变化,如果没发生变化则更新,但是如果一个值一开始是A,变成了B,又变成了A,那么使用CAS进行检查时会发现它的值没有发生变化,但是实际上却变化了。ABA问题的解决思路就是使用版本号,在变量前追加上版本号,每次更新的时候把版本号加1,那么A -> B -> A就会编程1A -> 2B -> 3A。Java API atomic包内的AtomicStampedReference和AtomicMarkableReference都使用类似的思路解决了ABA问题。
  • 循环时间长开销大:类似于incrementAndGet方法都使用了自旋CAS,如果自旋长时间不成功,会给CPU带来非常大的执行开销。如果JVM能支持处理器提供的pause指令,那么效率会有一定的提升。pause指令有两个作用:第一,它可以延迟流水线执行指令(de-pipeline),使CPU不会消耗过多的执行资源,延迟的时间取决于具体实现的版本,有些处理器上延迟时间是零;第二,他可以避免在退出循环的时候因内存顺序冲突(Memory Order Violantion)而引起CPU流水线被清空(CPU Pipeline Flush),从而提高CPU执行效率。
  • 只能保证一个共享变量的原子操作:当对一个共享变量执行操作时,我们可以使用CAS方式保证原子性,但是对多个共享变量操作时,就无法保证操作的原子性了,这时候还是需要使用锁。还有一个解决办法就是,把多个共享变量合成一个共享变量,然后通过AtomicReference类来原子更新合并的共享变量。

参考链接:

1. 《Java编程的逻辑》

2. Java API

3. 《Java并发编程的艺术》

赞(1) 打赏
Zhuoli's Blog » Java编程拾遗『原子变量』
分享到: 更多 (0)

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址