从本片文章开始我们来介绍并发容器类,之前的文章介绍过普通容器类,比如ArrayList、LinkedList等。但是这些容器都有个问题,就是非线程安全,多线程并发写,会存在线程安全问题,所以Java提供了并发容器类来解决普通容器类的线程安全问题。本篇文章我们先来看一种解决容器类线程安全的思想——CopyOnWrite(写时复制)。
1. 同步容器
在讲CopyOnWrite并发容器之前,先来介绍一下Java中提供的另一种用来解决容器线程安全问题的机制——同步容器。所谓同步容器,就是java.util.Collections类提供的基于普通容器类的包装类,它将普通容器类的方法都使用synchronized关键字重新实现了一遍,进而提供线程安全的容器类。Collections类提供一下获取同步容器的方法:
public static <T> Collection<T> synchronizedCollection(Collection<T> c)
public static <T> List<T> synchronizedList(List<T> list)
public static <K,V> Map<K,V> synchronizedMap(Map<K,V> m)
下面展示一下SynchronizedCollection的部分实现:
static class SynchronizedCollection<E> implements Collection<E> {
final Collection<E> c; // Backing Collection
final Object mutex; // Object on which to synchronize
SynchronizedCollection(Collection<E> c) {
if (c==null)
throw new NullPointerException();
this.c = c;
mutex = this;
}
public int size() {
synchronized (mutex) {return c.size();}
}
public boolean add(E e) {
synchronized (mutex) {return c.add(e);}
}
public boolean remove(Object o) {
synchronized (mutex) {return c.remove(o);}
}
//....
}
这里线程安全针对的是容器对象,指的是当多个线程并发访问同一个容器对象时,不需要额外的同步操作,也不会出现错误的结果。
加了synchronized,所有方法调用变成了原子操作,客户端在调用时,是不是就绝对安全了呢?不是的,至少有以下情况需要注意:
- 复合操作,比如先检查再更新
- 伪同步
- 迭代
1.1 复合操作
public class EnhancedMap <K, V> {
Map<K, V> map;
public EnhancedMap(Map<K,V> map){
this.map = Collections.synchronizedMap(map);
}
public V putIfAbsent(K key, V value){
V old = map.get(key);
if(old!=null){
return old;
}
map.put(key, value);
return null;
}
public void put(K key, V value){
map.put(key, value);
}
//... 其他代码
}
EnhancedMap是一个装饰类,接受一个Map对象,调用synchronizedMap转换为了同步容器对象map,增加了一个方法putIfAbsent,该方法只有在原Map中没有对应键的时候才添加。
map的每个方法都是安全的,但这个复合方法putIfAbsent是安全的吗?显然是否定的,这是一个检查然后再更新的复合操作,在多线程的情况下,可能有多个线程都执行完了检查这一步,都发现Map中没有对应的键,然后就会都调用put,而这就破坏了putIfAbsent方法期望保持的语义。
1.2 伪同步
那给该方法加上synchronized就能实现安全吗?如下所示:
public synchronized V putIfAbsent(K key, V value){
V old = map.get(key);
if(old!=null){
return old;
}
map.put(key, value);
return null;
}
答案是否定的!为什么呢?同步错对象了。putIfAbsent同步使用的是EnhancedMap对象,而其他方法(如代码中的put方法)使用的是Collections.synchronizedMap返回的对象map,两者是不同的对象。要解决这个问题,所有方法必须使用相同的锁,可以使用EnhancedMap的对象锁,也可以使用map。使用EnhancedMap对象作为锁,则EnhancedMap中的所有方法都需要加上synchronized。使用map作为锁,putIfAbsent方法可以改为:
public V putIfAbsent(K key, V value){
synchronized(map){
V old = map.get(key);
if(old!=null){
return old;
}
map.put(key, value);
return null;
}
}
1.3 迭代
对于同步容器对象,虽然单个操作是安全的,但迭代并不是。我们看个例子,创建一个同步List对象,一个线程修改List,另一个遍历,看看会发生什么,如下:
private static void startModifyThread(final List<String> list) {
Thread modifyThread = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 100; i++) {
list.add("item " + i);
try {
Thread.sleep((int) (Math.random() * 10));
} catch (InterruptedException e) {
}
}
}
});
modifyThread.start();
}
private static void startIteratorThread(final List<String> list) {
Thread iteratorThread = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
for (String str : list) {
}
}
}
});
iteratorThread.start();
}
public static void main(String[] args) {
final List<String> list = Collections
.synchronizedList(new ArrayList<String>());
startIteratorThread(list);
startModifyThread(list);
}
运行该程序,程序抛出ConcurrentModificationException。
之前介绍容器类的时候,介绍过过这个异常,如果在遍历的同时容器发生了结构性变化,就会抛出该异常,同步容器并没有解决这个问题。如果要避免这个异常,需要在遍历的时候给整个容器对象加锁,比如,上面的代码,startIteratorThread可以改为:
private static void startIteratorThread(final List<String> list) {
Thread iteratorThread = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
synchronized(list){
for (String str : list) {
}
}
}
}
});
iteratorThread.start();
}
除了以上这些注意事项,同步容器的性能也是比较低的,当并发访问量比较大的时候性能很差。所以Java中提供了很多并发容器类,比如CopyOnWriteArrayList、ConcurrentHashMap、ConcurrentLinkedQueue、ConcurrentSkipListSet等。本文就先来介绍一下CopyOnWrite(写时复制)并发容器。
2. CopyOnWriteArrayList
CopyOnWriteArrayList实现了List接口,它的用法与其他List如ArrayList基本是一样的,区别如下:
- CopyOnWriteArrayList是线程安全的,可以被多个线程并发访问
- CopyOnWriteArrayList的迭代器不支持修改操作,但也不会抛出ConcurrentModificationException,而是抛出UnsupportedOperationException
- CopyOnWriteArrayList以原子方式支持一些复合操作
比如使用CopyOnWriteArrayList就可以解决上面同步容器一边写一边遍历时抛ConcurrentModificationException异常的问题,如下:
public static void main(String[] args) {
final List<String> list = new CopyOnWriteArrayList<>();
startIteratorThread(list);
startModifyThread(list);
}
因为CopyOnWriteArrayList实现了List接口,所以可以像使用ArrayList/LinkedList那样使用CopyOnWriteArrayList,除了List接口提供的方法之外,CopyOnWriteArrayList还提供了一些其它的方法,我们重点关注一下这部分特殊的方法,List接口的方法在这里就不单独介绍了。
2.1 addIfAbsent
上面讲到同步容器不能以线程安全的方式处理复合操作,CopyOnWriteArrayList提供了线程安全的复合操作方法addIfAbsent。
public boolean addIfAbsent(E e)
如果CopyOnWriteArrayList中存在元素e,返回false。如果CopyOnWriteArrayList中不存在元素e,添加成功后返回true。
2.2 addAllAbsent
public int addAllAbsent(Collection<? extends E> c)
批量添加c中的非重复元素,不存在才添加,返回实际添加的个数。
2.3 迭代器不支持修改操作
CopyOnWriteArrayList的迭代器支持不可变数组实现,所以不支持修改操作。
public void remove() {
throw new UnsupportedOperationException();
}
public void set(E e) {
throw new UnsupportedOperationException();
}
public void add(E e) {
throw new UnsupportedOperationException();
}
上述remove、set、add方法是CopyOnWriteArrayList内部类COWIterator对ListIterator接口的实现,所以CopyOnWriteArrayList迭代器不支持修改操作。所以CopyOnWriteArrayList在一些基于迭代器修改操作的方法中会抛异常,比如Java7及以前的Collections.sort方法:
public static void sort(){
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
list.add("c");
list.add("a");
list.add("b");
Collections.sort(list);
}
上述代码在Java7及以前的版本会抛UnsupportedOperationException,因为Java7中Collections.sort方法的实现为:
public static <T extends Comparable<? super T>> void sort(List<T> list) {
Object[] a = list.toArray();
Arrays.sort(a);
ListIterator<T> i = list.listIterator();
for (int j=0; j<a.length; j++) {
i.next();
i.set((T)a[j]);
}
}
调用了迭代器的set方法,所以在Java7中CopyOnWriteArrayList对象使用Collections.srot方法排序会抛UnsupportedOperationException。但是在Java8及之后的版本就不会报错了,在Java8中Collections.srot方法实现如下:
public static <T extends Comparable<? super T>> void sort(List<T> list) {
list.sort(null);
}
在Java8中,List接口中新增了default sort方法,实现跟上述Java7中Collections.sort方法实现一致,但是在List接口的具体实现类如CopyOnWriteArrayList中,重写了该方法,重写的排序方法实现不依赖于迭代器:
public void sort(Comparator<? super E> c) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
Object[] newElements = Arrays.copyOf(elements, elements.length);
@SuppressWarnings("unchecked") E[] es = (E[])newElements;
Arrays.sort(es, c);
setArray(newElements);
} finally {
lock.unlock();
}
}
2.4 实现原理
CopyOnWriteArrayList的内部是一个数组,跟ArrayList内部的数组不同的是,这个数组是以原子方式被整体更新的。每次修改操作,都会新建一个数组,复制原数组的内容到新数组,在新数组上进行需要的修改,然后以原子方式设置内部的数组引用,这就是写时拷贝。
所有的读操作,都是先拿到当前引用的数组,然后直接访问该数组,在读的过程中,可能内部的数组引用已经被修改了,但不会影响读操作,它依旧访问原数组内容。
换句话说,数组内容是只读的,写操作都是通过新建数组,然后原子性的修改数组引用来实现的。CopyOnWriteArrayList声明如下:
public class CopyOnWriteArrayList<E>
implements List<E>, RandomAccess, Cloneable, java.io.Serializable {
/*保护写操作线程安全的锁*/
final transient ReentrantLock lock = new ReentrantLock();
/*实现并发容器存储元素的数组*/
private transient volatile Object[] array;
/**
* Gets the array. Non-private so as to also be accessible
* from CopyOnWriteArraySet class.
*/
final Object[] getArray() {
return array;
}
/**
* Sets the array.
*/
final void setArray(Object[] a) {
array = a;
}
//构造函数
//methods
}
这里有几点需要注意:
- 内部数组声明为了volatile,这是必需的,保证内存可见性,写操作更改了之后,读操作能看到。
- 只有两个方法用来访问/设置该数组,分别位getArray和setArray,像add、set等操作,都是先复制一个数组出来,然后操作新复制的数组,再通过setArray方法将复制的数组赋值给array。
- 读不需要锁,可以并行,读和写也可以并行,但多个线程不能同时写,每个写操作都需要先获取锁,CopyOnWriteArrayList内部使用ReentrantLock。
2.4.1 add(E e)
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
add方法是修改操作,整个过程需要被锁保护,先拿到当前数组elements,然后复制了个长度加1的新数组newElements,在新数组中添加元素,最后调用setArray原子性的修改内部数组引用,这就是“写时复制”的概念。
2.4.2 indexOf(Object o)
public int indexOf(Object o) {
Object[] elements = getArray();
return indexOf(o, elements, 0, elements.length);
}
private static int indexOf(Object o, Object[] elements,
int index, int fence) {
if (o == null) {
for (int i = index; i < fence; i++)
if (elements[i] == null)
return i;
} else {
for (int i = index; i < fence; i++)
if (o.equals(elements[i]))
return i;
}
return -1;
}
indexOf方法访问的所有数据都是通过参数传递进来的,数组内容也不会被修改,不存在并发问题。
2.4.3 迭代器
CopyOnWriteArrayList迭代器是通过内部类COWIterator实现的,如下:
public Iterator<E> iterator() {
return new COWIterator<E>(getArray(), 0);
}
COWIterator是内部类,传递给它的是不变的数组,不支持修改,所以在COWIterator内部的remove、set及add方法实现都是直接抛UnsupportedOperationException异常。其它读方法跟ArrayList内部迭代器实现一致,都是通过数组索引实现的。
每次修改都创建一个新数组,然后复制所有内容,这听上去是一个难以令人接受的方案,如果数组比较大,修改操作又比较频繁,可以想象,CopyOnWriteArrayList的性能是很低的。事实确实如此,CopyOnWriteArrayList不适用于数组很大,且修改频繁的场景。它是以优化读操作为目标的,读不需要同步,性能很高,但在优化读的同时就牺牲了写的性能。
之前我们介绍了保证线程安全的两种思路,一种是锁,使用synchronized或ReentrantLock,另外一种是循环CAS,写时拷贝体现了保证线程安全的另一种思路。对于绝大部分访问都是读,且有大量并发线程要求读,只有个别线程进行写,且只是偶尔写的场合,这种写时拷贝就是一种很好的解决方案。
3. CopyOnWriteArraySet
CopyOnWriteArraySet跟HashSet的实现思想是不同的,HashSet依赖于HashMap实现,而CopyOnWriteArraySet并没有使用这一思想,而是依赖CopyOnWriteArrayList实现,如下:
public class CopyOnWriteArraySet<E> extends AbstractSet<E>
implements java.io.Serializable {
private final CopyOnWriteArrayList<E> al;
//构造函数
public CopyOnWriteArraySet() {
al = new CopyOnWriteArrayList<E>();
}
public CopyOnWriteArraySet(Collection<? extends E> c) {
if (c.getClass() == CopyOnWriteArraySet.class) {
@SuppressWarnings("unchecked") CopyOnWriteArraySet<E> cc =
(CopyOnWriteArraySet<E>)c;
al = new CopyOnWriteArrayList<E>(cc.al);
}
else {
al = new CopyOnWriteArrayList<E>();
al.addAllAbsent(c);
}
}
//methods
}
CopyOnWriteArraySet实现了Set接口,持有一个CopyOnWriteArrayList的成员变量,依赖CopyOnWriteArrayList提供的addIfAbsent和addAllAbsent实现Set中元素的唯一性。
3.1 方法说明
3.1.1 add(E e)
public boolean add(E e) {
return al.addIfAbsent(e);
}
调用了CopyOnWriteArrayList的addIfAbsent方法,保证线程安全和Set集合中元素的唯一性。
3.1.2 contains(Object o)
public boolean contains(Object o) {
return al.contains(o);
}
调用了CopyOnWriteArrayList的contains方法。
由于CopyOnWriteArraySet是基于CopyOnWriteArrayList实现的,所以与之前介绍过的Set的实现类如HashSet/TreeSet相比,它的性能比较低,不适用于元素个数特别多的集合。如果元素个数比较多,可以考虑ConcurrentHashMap或ConcurrentSkipListSet,这两个类,会在之后的文章介绍。
ConcurrentHashMap与HashMap类似,适用于不要求排序的场景,ConcurrentSkipListSet与TreeSet类似,适用于要求排序的场景。Java并发包中没有与HashSet对应的并发容器,但可以很容易的基于ConcurrentHashMap构建一个,利用Collections.newSetFromMap方法即可。
参考链接:
1. 《Java编程的逻辑》
2. Java API