coding……
但行好事 莫问前程

Java编程拾遗『并发容器——CopyOnWrite』

从本片文章开始我们来介绍并发容器类,之前的文章介绍过普通容器类,比如ArrayList、LinkedList等。但是这些容器都有个问题,就是非线程安全,多线程并发写,会存在线程安全问题,所以Java提供了并发容器类来解决普通容器类的线程安全问题。本篇文章我们先来看一种解决容器类线程安全的思想——CopyOnWrite(写时复制)。

1. 同步容器

在讲CopyOnWrite并发容器之前,先来介绍一下Java中提供的另一种用来解决容器线程安全问题的机制——同步容器。所谓同步容器,就是java.util.Collections类提供的基于普通容器类的包装类,它将普通容器类的方法都使用synchronized关键字重新实现了一遍,进而提供线程安全的容器类。Collections类提供一下获取同步容器的方法:

public static <T> Collection<T> synchronizedCollection(Collection<T> c)
public static <T> List<T> synchronizedList(List<T> list)
public static <K,V> Map<K,V> synchronizedMap(Map<K,V> m)

下面展示一下SynchronizedCollection的部分实现:

static class SynchronizedCollection<E> implements Collection<E> {
    final Collection<E> c;  // Backing Collection
    final Object mutex;     // Object on which to synchronize

    SynchronizedCollection(Collection<E> c) {
        if (c==null)
            throw new NullPointerException();
        this.c = c;
        mutex = this;
    }
    public int size() {
        synchronized (mutex) {return c.size();}
    }
    public boolean add(E e) {
        synchronized (mutex) {return c.add(e);}
    }
    public boolean remove(Object o) {
        synchronized (mutex) {return c.remove(o);}
    }
    //....
}

这里线程安全针对的是容器对象,指的是当多个线程并发访问同一个容器对象时,不需要额外的同步操作,也不会出现错误的结果。

加了synchronized,所有方法调用变成了原子操作,客户端在调用时,是不是就绝对安全了呢?不是的,至少有以下情况需要注意:

  • 复合操作,比如先检查再更新
  • 伪同步
  • 迭代

1.1 复合操作

public class EnhancedMap <K, V> {
    Map<K, V> map;
    
    public EnhancedMap(Map<K,V> map){
        this.map = Collections.synchronizedMap(map);
    }
    
    public V putIfAbsent(K key, V value){
         V old = map.get(key);
         if(old!=null){
             return old;
         }
         map.put(key, value);
         return null;
     }
    
    public void put(K key, V value){
        map.put(key, value);
    }
    
    //... 其他代码
}

EnhancedMap是一个装饰类,接受一个Map对象,调用synchronizedMap转换为了同步容器对象map,增加了一个方法putIfAbsent,该方法只有在原Map中没有对应键的时候才添加。

map的每个方法都是安全的,但这个复合方法putIfAbsent是安全的吗?显然是否定的,这是一个检查然后再更新的复合操作,在多线程的情况下,可能有多个线程都执行完了检查这一步,都发现Map中没有对应的键,然后就会都调用put,而这就破坏了putIfAbsent方法期望保持的语义。

1.2 伪同步

那给该方法加上synchronized就能实现安全吗?如下所示:

public synchronized V putIfAbsent(K key, V value){
    V old = map.get(key);
    if(old!=null){
        return old;
    }
    map.put(key, value);
    return null;
}

答案是否定的!为什么呢?同步错对象了。putIfAbsent同步使用的是EnhancedMap对象,而其他方法(如代码中的put方法)使用的是Collections.synchronizedMap返回的对象map,两者是不同的对象。要解决这个问题,所有方法必须使用相同的锁,可以使用EnhancedMap的对象锁,也可以使用map。使用EnhancedMap对象作为锁,则EnhancedMap中的所有方法都需要加上synchronized。使用map作为锁,putIfAbsent方法可以改为:

public V putIfAbsent(K key, V value){
    synchronized(map){
        V old = map.get(key);
         if(old!=null){
             return old;
         }
         map.put(key, value);
         return null;    
    }
}

1.3 迭代

对于同步容器对象,虽然单个操作是安全的,但迭代并不是。我们看个例子,创建一个同步List对象,一个线程修改List,另一个遍历,看看会发生什么,如下:

private static void startModifyThread(final List<String> list) {
    Thread modifyThread = new Thread(new Runnable() {
        @Override
        public void run() {
            for (int i = 0; i < 100; i++) {
                list.add("item " + i);
                try {
                    Thread.sleep((int) (Math.random() * 10));
                } catch (InterruptedException e) {
                }
            }
        }
    });
    modifyThread.start();
}

private static void startIteratorThread(final List<String> list) {
    Thread iteratorThread = new Thread(new Runnable() {
        @Override
        public void run() {
            while (true) {
                for (String str : list) {
                }
            }
        }
    });
    iteratorThread.start();
}

public static void main(String[] args) {
    final List<String> list = Collections
            .synchronizedList(new ArrayList<String>());
    startIteratorThread(list);
    startModifyThread(list);
}

运行该程序,程序抛出ConcurrentModificationException。

之前介绍容器类的时候,介绍过过这个异常,如果在遍历的同时容器发生了结构性变化,就会抛出该异常,同步容器并没有解决这个问题。如果要避免这个异常,需要在遍历的时候给整个容器对象加锁,比如,上面的代码,startIteratorThread可以改为:

private static void startIteratorThread(final List<String> list) {
    Thread iteratorThread = new Thread(new Runnable() {
        @Override
        public void run() {
            while (true) {
                synchronized(list){
                    for (String str : list) {
                    }    
                }
            }
        }
    });
    iteratorThread.start();
}

除了以上这些注意事项,同步容器的性能也是比较低的,当并发访问量比较大的时候性能很差。所以Java中提供了很多并发容器类,比如CopyOnWriteArrayList、ConcurrentHashMap、ConcurrentLinkedQueue、ConcurrentSkipListSet等。本文就先来介绍一下CopyOnWrite(写时复制)并发容器。

2. CopyOnWriteArrayList

CopyOnWriteArrayList实现了List接口,它的用法与其他List如ArrayList基本是一样的,区别如下:

  • CopyOnWriteArrayList是线程安全的,可以被多个线程并发访问
  • CopyOnWriteArrayList的迭代器不支持修改操作,但也不会抛出ConcurrentModificationException,而是抛出UnsupportedOperationException
  • CopyOnWriteArrayList以原子方式支持一些复合操作

比如使用CopyOnWriteArrayList就可以解决上面同步容器一边写一边遍历时抛ConcurrentModificationException异常的问题,如下:

public static void main(String[] args) {
    final List<String> list = new CopyOnWriteArrayList<>();
    startIteratorThread(list);
    startModifyThread(list);
}

因为CopyOnWriteArrayList实现了List接口,所以可以像使用ArrayList/LinkedList那样使用CopyOnWriteArrayList,除了List接口提供的方法之外,CopyOnWriteArrayList还提供了一些其它的方法,我们重点关注一下这部分特殊的方法,List接口的方法在这里就不单独介绍了。

2.1 addIfAbsent

上面讲到同步容器不能以线程安全的方式处理复合操作,CopyOnWriteArrayList提供了线程安全的复合操作方法addIfAbsent。

public boolean addIfAbsent(E e)

如果CopyOnWriteArrayList中存在元素e,返回false。如果CopyOnWriteArrayList中不存在元素e,添加成功后返回true。

2.2 addAllAbsent

public int addAllAbsent(Collection<? extends E> c)

批量添加c中的非重复元素,不存在才添加,返回实际添加的个数。

2.3 迭代器不支持修改操作

CopyOnWriteArrayList的迭代器支持不可变数组实现,所以不支持修改操作。

public void remove() {
    throw new UnsupportedOperationException();
}

public void set(E e) {
    throw new UnsupportedOperationException();
}

public void add(E e) {
    throw new UnsupportedOperationException();
}

上述remove、set、add方法是CopyOnWriteArrayList内部类COWIterator对ListIterator接口的实现,所以CopyOnWriteArrayList迭代器不支持修改操作。所以CopyOnWriteArrayList在一些基于迭代器修改操作的方法中会抛异常,比如Java7及以前的Collections.sort方法:

public static void sort(){
    CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
    list.add("c");
    list.add("a");
    list.add("b");
    Collections.sort(list);
}

上述代码在Java7及以前的版本会抛UnsupportedOperationException,因为Java7中Collections.sort方法的实现为:

public static <T extends Comparable<? super T>> void sort(List<T> list) {
    Object[] a = list.toArray();
    Arrays.sort(a);
    ListIterator<T> i = list.listIterator();
    for (int j=0; j<a.length; j++) {
        i.next();
        i.set((T)a[j]);
    }
}

调用了迭代器的set方法,所以在Java7中CopyOnWriteArrayList对象使用Collections.srot方法排序会抛UnsupportedOperationException。但是在Java8及之后的版本就不会报错了,在Java8中Collections.srot方法实现如下:

public static <T extends Comparable<? super T>> void sort(List<T> list) {
    list.sort(null);
}

在Java8中,List接口中新增了default sort方法,实现跟上述Java7中Collections.sort方法实现一致,但是在List接口的具体实现类如CopyOnWriteArrayList中,重写了该方法,重写的排序方法实现不依赖于迭代器:

public void sort(Comparator<? super E> c) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Object[] elements = getArray();
        Object[] newElements = Arrays.copyOf(elements, elements.length);
        @SuppressWarnings("unchecked") E[] es = (E[])newElements;
        Arrays.sort(es, c);
        setArray(newElements);
    } finally {
        lock.unlock();
    }
}

2.4 实现原理

CopyOnWriteArrayList的内部是一个数组,跟ArrayList内部的数组不同的是,这个数组是以原子方式被整体更新的。每次修改操作,都会新建一个数组,复制原数组的内容到新数组,在新数组上进行需要的修改,然后以原子方式设置内部的数组引用,这就是写时拷贝。

所有的读操作,都是先拿到当前引用的数组,然后直接访问该数组,在读的过程中,可能内部的数组引用已经被修改了,但不会影响读操作,它依旧访问原数组内容。

换句话说,数组内容是只读的,写操作都是通过新建数组,然后原子性的修改数组引用来实现的。CopyOnWriteArrayList声明如下:

public class CopyOnWriteArrayList<E>
    implements List<E>, RandomAccess, Cloneable, java.io.Serializable {

    /*保护写操作线程安全的锁*/
    final transient ReentrantLock lock = new ReentrantLock();

    /*实现并发容器存储元素的数组*/
    private transient volatile Object[] array;

    /**
     * Gets the array.  Non-private so as to also be accessible
     * from CopyOnWriteArraySet class.
     */
    final Object[] getArray() {
        return array;
    }

    /**
     * Sets the array.
     */
    final void setArray(Object[] a) {
        array = a;
    }

    //构造函数

    //methods
 }

这里有几点需要注意:

  • 内部数组声明为了volatile,这是必需的,保证内存可见性,写操作更改了之后,读操作能看到。
  • 只有两个方法用来访问/设置该数组,分别位getArray和setArray,像add、set等操作,都是先复制一个数组出来,然后操作新复制的数组,再通过setArray方法将复制的数组赋值给array。
  • 读不需要锁,可以并行,读和写也可以并行,但多个线程不能同时写,每个写操作都需要先获取锁,CopyOnWriteArrayList内部使用ReentrantLock。

2.4.1 add(E e)

public boolean add(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Object[] elements = getArray();
        int len = elements.length;
        Object[] newElements = Arrays.copyOf(elements, len + 1);
        newElements[len] = e;
        setArray(newElements);
        return true;
    } finally {
        lock.unlock();
    }
}

add方法是修改操作,整个过程需要被锁保护,先拿到当前数组elements,然后复制了个长度加1的新数组newElements,在新数组中添加元素,最后调用setArray原子性的修改内部数组引用,这就是“写时复制”的概念。

2.4.2 indexOf(Object o)

public int indexOf(Object o) {
    Object[] elements = getArray();
    return indexOf(o, elements, 0, elements.length);
}
private static int indexOf(Object o, Object[] elements,
                           int index, int fence) {
    if (o == null) {
        for (int i = index; i < fence; i++)
            if (elements[i] == null)
                return i;
    } else {
        for (int i = index; i < fence; i++)
            if (o.equals(elements[i]))
                return i;
    }
    return -1;
}

indexOf方法访问的所有数据都是通过参数传递进来的,数组内容也不会被修改,不存在并发问题。

2.4.3 迭代器

CopyOnWriteArrayList迭代器是通过内部类COWIterator实现的,如下:

public Iterator<E> iterator() {
    return new COWIterator<E>(getArray(), 0);
}

COWIterator是内部类,传递给它的是不变的数组,不支持修改,所以在COWIterator内部的remove、set及add方法实现都是直接抛UnsupportedOperationException异常。其它读方法跟ArrayList内部迭代器实现一致,都是通过数组索引实现的。

每次修改都创建一个新数组,然后复制所有内容,这听上去是一个难以令人接受的方案,如果数组比较大,修改操作又比较频繁,可以想象,CopyOnWriteArrayList的性能是很低的。事实确实如此,CopyOnWriteArrayList不适用于数组很大,且修改频繁的场景。它是以优化读操作为目标的,读不需要同步,性能很高,但在优化读的同时就牺牲了写的性能。

之前我们介绍了保证线程安全的两种思路,一种是锁,使用synchronized或ReentrantLock,另外一种是循环CAS,写时拷贝体现了保证线程安全的另一种思路。对于绝大部分访问都是读,且有大量并发线程要求读,只有个别线程进行写,且只是偶尔写的场合,这种写时拷贝就是一种很好的解决方案。

3. CopyOnWriteArraySet

CopyOnWriteArraySet跟HashSet的实现思想是不同的,HashSet依赖于HashMap实现,而CopyOnWriteArraySet并没有使用这一思想,而是依赖CopyOnWriteArrayList实现,如下:

public class CopyOnWriteArraySet<E> extends AbstractSet<E>
        implements java.io.Serializable {

    private final CopyOnWriteArrayList<E> al;

    //构造函数
    public CopyOnWriteArraySet() {
        al = new CopyOnWriteArrayList<E>();
    }

    public CopyOnWriteArraySet(Collection<? extends E> c) {
        if (c.getClass() == CopyOnWriteArraySet.class) {
            @SuppressWarnings("unchecked") CopyOnWriteArraySet<E> cc =
                (CopyOnWriteArraySet<E>)c;
            al = new CopyOnWriteArrayList<E>(cc.al);
        }
        else {
            al = new CopyOnWriteArrayList<E>();
            al.addAllAbsent(c);
        }
    }

    //methods

}

CopyOnWriteArraySet实现了Set接口,持有一个CopyOnWriteArrayList的成员变量,依赖CopyOnWriteArrayList提供的addIfAbsent和addAllAbsent实现Set中元素的唯一性。

3.1 方法说明

3.1.1 add(E e)

public boolean add(E e) {
    return al.addIfAbsent(e);
}

调用了CopyOnWriteArrayList的addIfAbsent方法,保证线程安全和Set集合中元素的唯一性。

3.1.2 contains(Object o)

public boolean contains(Object o) {
    return al.contains(o);
}

调用了CopyOnWriteArrayList的contains方法。

由于CopyOnWriteArraySet是基于CopyOnWriteArrayList实现的,所以与之前介绍过的Set的实现类如HashSet/TreeSet相比,它的性能比较低,不适用于元素个数特别多的集合。如果元素个数比较多,可以考虑ConcurrentHashMap或ConcurrentSkipListSet,这两个类,会在之后的文章介绍。

ConcurrentHashMap与HashMap类似,适用于不要求排序的场景,ConcurrentSkipListSet与TreeSet类似,适用于要求排序的场景。Java并发包中没有与HashSet对应的并发容器,但可以很容易的基于ConcurrentHashMap构建一个,利用Collections.newSetFromMap方法即可。

参考链接:

1. 《Java编程的逻辑》

2. Java API

赞(1) 打赏
Zhuoli's Blog » Java编程拾遗『并发容器——CopyOnWrite』
分享到: 更多 (0)

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址