coding……
但行好事 莫问前程

Java编程拾遗『线程协作』

之前用了两篇文章介绍了多个线程竞争资源的解决方案synchronized,但其实多个线程之间除了竞争资源之外,还有相互协作。比如:

  • 生产者/消费者模式:这是一种常见的协作模式,生产者线程和消费者线程通过共享队列进行协作,生产者将数据或任务放到队列上,而消费者从队列上取数据或任务,如果队列长度有限,在队列满的时候,生产者需要等待,而在队列为空的时候,消费者需要等待。
  • 同时开始:类似运动员比赛,在听到比赛开始枪响后同时开始,在一些程序,尤其是模拟仿真程序中,要求多个线程能同时开始。
  • 等待结束:主从协作模式也是一种常见的协作模式,主线程将任务分解为若干个子任务,为每个子任务创建一个线程,主线程在继续执行其他任务之前需要等待每个子任务执行完毕。
  • 集合点:类似于学校或公司组团旅游,在旅游过程中有若干集合点,比如出发集合点,每个人从不同地方来到集合点,所有人到齐后进行下一项活动,在一些程序,比如并行迭代计算中,每个线程负责一部分计算,然后在集合点等待其他线程完成,所有线程到齐后,交换数据和计算结果,再进行下一次迭代。
  • 控制并发数目:类似于去银行柜台取款,总共只有4个窗口,但是有10个顾客,那么同时只有4个顾客能获得服务,当某个顾客结束后,再通过一个信号通知下一个顾客。再多线程中,其实也就是控同时访问资源的线程个数。
  • 异步结果:在主从协作模式中,主线程手工创建子线程的写法往往比较麻烦,一种常见的模式是将子线程的管理封装为异步调用,异步调用马上返回,但返回的不是最终的结果,而是一个一般称为Promise或Future的对象,通过它可以在随后获得最终的结果。

本篇文章就来介绍一下上述几种协作方式。

1. 生产者/消费者模式

在生产者/消费者模式中,生产者消费者协作的共享变量是队列,生产者往队列中写数据,如果队列满了就wait。而消费者从队列中取数据,如果队列为空也wait。将队列作为单独的类进行设计,代码如下:

public class MyBlockingQueue<T>  {
    private Queue<T> queue = null;
    private int limit;

    public MyBlockingQueue(int limit) {
        this.limit = limit;
        queue = new ArrayDeque<>(limit);
    }

    public synchronized void put(T e) throws InterruptedException {
        while (queue.size() == limit) {
            wait();
        }
        queue.add(e);
        notifyAll();
    }

    public synchronized T take() throws InterruptedException {
        while (queue.isEmpty()) {
            wait();
        }
        T e = queue.poll();
        notifyAll();
        return e;
    }
}

MyBlockingQueue是一个长度有限的队列,长度通过构造方法的参数进行传递,有两个方法put和take。put是给生产者使用的,往队列上放数据,满了就wait,放完之后调用notifyAll,通知可能的消费者。take是给消费者使用的,从队列中取数据,如果为空就wait,取完之后调用notifyAll,通知可能的生产者。

生产者代码如下所示:

public class Producer extends Thread {
    private MyBlockingQueue<String> queue;

    public Producer(MyBlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        int num = 0;
        try {
            while (true) {
                String task = String.valueOf(num);
                queue.put(task);
                System.out.println("produce task " + task);
                num++;
                Thread.sleep((int) (Math.random() * 100));
            }
        } catch (InterruptedException e) {
        }
    }
}

消费者代码如下所示:

public class Consumer extends Thread {
    private MyBlockingQueue<String> queue;

    public Consumer(MyBlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                String task = queue.take();
                System.out.println("handle task " + task);
                Thread.sleep((int)(Math.random()*100));
            }
        } catch (InterruptedException e) {
        }
    }
}

调用示例:

public static void main(String[] args) {
    MyBlockingQueue<String> queue = new MyBlockingQueue<>(1);
    new Producer(queue).start();
    new Consumer(queue).start();
}

运行结果:

produce task 0
handle task 0
produce task 1
handle task 1
produce task 2
handle task 2
handle task 3
produce task 3
produce task 4
handle task 4
produce task 5
handle task 5
……

生产者和消费者交替出现,复合预期。我们实现的MyBlockingQueue主要用于演示,Java提供了专门的阻塞队列实现,包括:

  • 基于数组的实现类ArrayBlockingQueue
  • 基于链表的实现类LinkedBlockingQueue和LinkedBlockingDeque
  • 基于堆的实现类PriorityBlockingQueue

2. 同时开始

类似于运动员比赛,在听到比赛开始枪响后同时开始。下面,我们模拟下这个过程。有一个主线程和N个子线程,每个子线程模拟一个运动员,主线程模拟裁判,主线程下达开始命令后,各个子线程开始执行,代码如下所示:

public class FireFlag {
    private volatile boolean fired = false;

    public synchronized void waitForFire() throws InterruptedException {
        while (!fired) {
            wait();
        }
    }

    public synchronized void fire() {
        this.fired = true;
        notifyAll();
    }
}

子线程中调用waitForFire()等待发令枪响,而主线程调用fire()发射比赛开始信号。

public class Racer extends Thread {
    private FireFlag fireFlag;

    public Racer(FireFlag fireFlag) {
        this.fireFlag = fireFlag;
    }

    @Override
    public void run() {
        try {
            this.fireFlag.waitForFire();
            System.out.println("start run "
                    + Thread.currentThread().getName());
        } catch (InterruptedException ignored) {
        }
    }
}

子线程run方法中一开始就调用waitForFire,等待开始信号。如果开始信号没有到达,就会阻塞当前线程。

public class Test {
    public static void main(String[] args) throws Exception {
        int num = 10;
        FireFlag fireFlag = new FireFlag();
        Thread[] racers = new Thread[num];
        for (int i = 0; i < num; i++) {
            racers[i] = new Racer(fireFlag);
            racers[i].start();
        }
        Thread.sleep(1000);
        fireFlag.fire();
    }
}

运行结果:

start run Thread-9
start run Thread-6
start run Thread-4
start run Thread-5
start run Thread-1
start run Thread-0
start run Thread-8
start run Thread-7
start run Thread-2
start run Thread-3

观察运行结果可以发现,上述结果是在主线程休眠1秒后,调用了fire方法后,各个子线程才开始打印的。

3. 等待结束

等待其它线程结束,我们可以直接使用Thread对象的join方法。oin实际上就是调用了wait,其主要代码是:

while (isAlive()) {
    wait(0);
}

只要线程是活着的,isAlive()返回true,join就一直等待。谁来通知它呢?当线程运行结束的时候,Java系统调用notifyAll来通知。

但是使用join有个最大的问题是,需要主线程逐一等待每个子线程。这里我们通过线程协作来实现主线程等待各个子线程执行结束后继续执行主线程逻辑。主线程与各个子线程协作的共享变量是一个数,这个数表示未完成的线程个数,初始值为子线程个数,主线程等待该值变为0,而每个子线程结束后都将该值减1,当减为0时调用notifyAll,我们用MyLatch来表示这个协作对象,示例代码如下:

public class MyLatch {
    private int count;

    public MyLatch(int count) {
        this.count = count;
    }

    public synchronized void await() throws InterruptedException {
        while (count > 0) {
            wait();
        }
    }

    public synchronized void countDown() {
        count--;
        if (count <= 0) {
            notifyAll();
        }
    }
}

MyLatch构造方法的参数count应初始化为子线程的个数,主线程应该调用await(),而子线程在执行完后应该调用countDown()。工作子线程的示例代码如下:

public class Worker extends Thread {
    private MyLatch latch;

    public Worker(MyLatch latch) {
        this.latch = latch;
    }

    @Override
    public void run() {
        try {
            // 模拟线程运行
            Thread.sleep((int) (Math.random() * 1000));

            this.latch.countDown();
        } catch (InterruptedException ignored) {
        }
    }
}

主线程的示例代码如下:

public class Test {
    public static void main(String[] args) throws Exception{
        int workerNum = 100;
        MyLatch latch = new MyLatch(workerNum);
        Worker[] workers = new Worker[workerNum];
        for (int i = 0; i < workerNum; i++) {
            workers[i] = new Worker(latch);
            workers[i].start();
        }
        latch.await();

        System.out.println("collect worker results");
    }
}

MyLatch是一个用于同步协作的工具类,主要用于演示基本原理,在Java中有一个专门的同步类CountDownLatch,功能和用法跟MyLatch一致。MyLatch的功能是比较通用的,它也可以应用于上面”同时开始”的场景,初始值设为1,Racer类调用await(),主线程调用countDown()即可,如下所示:

public class RacerWithLatchDemo {
    static class Racer extends Thread {
        private MyLatch latch;

        public Racer(MyLatch latch) {
            this.latch = latch;
        }

        @Override
        public void run() {
            try {
                this.latch.await();
                System.out.println("start run "
                        + Thread.currentThread().getName());
            } catch (InterruptedException ignored) {
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int num = 10;
        MyLatch latch = new MyLatch(1);
        Thread[] racers = new Thread[num];
        for (int i = 0; i < num; i++) {
            racers[i] = new Racer(latch);
            racers[i].start();
        }
        Thread.sleep(2000);
        latch.countDown();
    }
}

4. 集合点

各个线程先是分头行动,然后各自到达一个集合点,在集合点需要集齐所有线程,交换数据,然后再进行下一步动作。怎么表示这种协作呢?协作的共享变量依然是一个数,这个数表示未到集合点的线程个数,初始值为子线程个数,每个线程到达集合点后将该值减一,如果不为0,表示还有别的线程未到,进行等待,如果变为0,表示自己是最后一个到的,调用notifyAll唤醒所有线程。我们用AssemblePoint类来表示这个协作对象,示例代码如下:

public class AssemblePoint {
    private int n;

    public AssemblePoint(int n) {
        this.n = n;
    }

    public synchronized void await() throws InterruptedException {
        if (n > 0) {
            n--;
            if (n == 0) {
                notifyAll();
            } else {
                while (n != 0) {
                    wait();
                }
            }
        }
    }
}

游客线程,先独立运行,然后使用该协作对象等待其他线程到达集合点,进行下一步动作,示例代码如下:

public class Tourist extends Thread {
    private int touristId;

    private AssemblePoint ap;

    public Tourist(int touristId, AssemblePoint ap) {
        this.touristId = touristId;
        this.ap = ap;
    }

    @Override
    public void run() {
        try {
            System.out.println("tourist:" + touristId + " start travel");
            // 模拟先各自独立运行
            Thread.sleep((int) (Math.random() * 1000));

            // 集合
            System.out.println( "tourist:" + touristId + " arrived, wait other tourist");
            ap.await();
            // ... 集合后执行其他操作
            System.out.println("all tourist arrived");
        } catch (InterruptedException e) {
        }
    }
}

调用示例:

public class Test {
    public static void main(String[] args) {
        int num = 5;
        Tourist[] threads = new Tourist[num];
        AssemblePoint ap = new AssemblePoint(num);
        for (int i = 0; i < num; i++) {
            threads[i] = new Tourist(i, ap);
            threads[i].start();
        }
    }
}

运行结果:

tourist:0 start travel
tourist:1 start travel
tourist:2 start travel
tourist:3 start travel
tourist:4 start travel
tourist:2 arrived, wait other tourist
tourist:0 arrived, wait other tourist
tourist:3 arrived, wait other tourist
tourist:4 arrived, wait other tourist
tourist:1 arrived, wait other tourist
all tourist arrived
all tourist arrived
all tourist arrived
all tourist arrived
all tourist arrived

这里实现的是AssemblePoint主要用于演示基本原理,Java中有一个专门的同步工具类CyclicBarrier可以替代它,功能和使用与AssemblePoint一致。

这里比较一下,会发现CyclicBarrier和CountDownLatch有些类似,都是实现线程等待。但是它们之间的侧重点不同,CountDownLatch一般用于某个线程A等待若干个其他线程执行完任务之后,它才执行。而CyclicBarrier一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行。

5. 控制并发数目

生活中当出现多人共享有限资源时(比如),就要控制好使用的先后顺序,不然就会出现混乱。还用哪个柜台取款的例子,只有4个窗口,但是有10个顾客,最多同时只能由4个顾客获得服务,一个顾客获得服务之前先要确认一下是否由可用窗口,没有的话就只能继续等待。当一个获得服务的顾客结束之后,要归还窗口,并通知其它等待的顾客,我们使用MySemaphore类来表示这个协作对象,如下:

public class MySemaphore {
    private int permits;

    public MySemaphore(int permits) {
        this.permits = permits;
    }

    //请求一个资源
    public synchronized void acquire() throws InterruptedException {
        acquire(1);
    }
    
    //请求acquire个资源
    public synchronized void acquire(int acquire) throws InterruptedException {
        while (permits - acquire < 0) {
            wait();
        }
        permits -= acquire;
    }

    //释放一个资源
    public synchronized void release() {
        release(1);
    }

    //释放acquire个资源
    public synchronized void release(int acquire) {
        permits += acquire;
        notifyAll();
    }
}

顾客线程,获得服务前要先通过acquire方法获取资源,服务结束通过release释放资源,如下:

public class Customer extends Thread {
    private int customerId;

    private MySemaphore mySemaphore;

    public Customer(int customerId, MySemaphore mySemaphore) {
        this.customerId = customerId;
        this.mySemaphore = mySemaphore;
    }

    @Override
    public void run() {
        try {
            mySemaphore.acquire();
            System.out.println("顾客" + this.customerId + "占用一个窗口...");
            Thread.sleep(2000);
            System.out.println("顾客" + this.customerId + "释放窗口");
            mySemaphore.release();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

调用示例:

public class Test {
    public static void main(String[] args) {
        int N = 10;            //顾客数
        MySemaphore semaphore = new MySemaphore(4); //窗口数目
        for (int i = 0; i < N; i++)
            new Customer(i, semaphore).start();
    }
}

运行结果:

顾客0占用一个窗口...
顾客1占用一个窗口...
顾客2占用一个窗口...
顾客3占用一个窗口...
顾客0释放窗口
顾客9占用一个窗口...
顾客1释放窗口
顾客4占用一个窗口...
顾客2释放窗口
顾客8占用一个窗口...
顾客3释放窗口
顾客5占用一个窗口...
顾客9释放窗口
顾客7占用一个窗口...
顾客4释放窗口
顾客6占用一个窗口...
顾客5释放窗口
顾客8释放窗口
顾客7释放窗口
顾客6释放窗口

可以看到最多只能有4个顾客获得服务。Java中有一个专门的同步工具类Semaphore可以替代它,功能和使用与MySemaphore一致。

上面讲的MyLatch、AssemblePoint和MySemaphore的实现都是使用的额wait/nofity阻塞实现的。对应Java API中CountDownLatch、CyclicBarrier和Semaphore都是基于AQS(AbstractQueuedSynchronizer)实现的。具体代码实现会在接下来的文章里讲解。

6. 异步结果

在主从模式中,手工创建线程往往比较麻烦,一种常见的模式是异步调用,异步调用返回一个一般称为Promise或Future的对象,通过它可以获得最终的结果。在Java中,表示子任务的接口是Callable,声明为:

public interface Callable<V> {
    V call() throws Exception;
}

为表示异步调用的结果,我们定义一个接口MyFuture,如下所示:

public interface MyFuture <V> {
    V get() throws Exception ;
}

这个接口的get方法返回真正的结果,如果结果还没有计算完成,get会阻塞直到计算完成,如果调用过程发生异常,则get方法抛出调用过程中的异常。

为方便主线程调用子任务,我们定义一个类MyExecutor,其中定义一个public方法execute,表示执行子任务并返回异步结果,声明如下:

public <V> MyFuture<V> execute(final Callable<V> task)

利用该方法,对于主线程,它就不需要创建并管理子线程了,并且可以方便地获取异步调用的结果,比如,在主线程中,可以类似这样启动异步调用并获取结果:

public class Test {
    public static void main(String[] args) {
        MyExecutor executor = new MyExecutor();
        // 子任务
        Callable<Integer> subTask = () -> {
            // ... 执行异步任务
            int millis = (int) (Math.random() * 1000);
            Thread.sleep(millis);
            return millis;
        };

        // 异步调用,返回一个MyFuture对象
        MyFuture<Integer> future = executor.execute(subTask);

        try {
            // 获取异步调用的结果
            Integer result = future.get();
            System.out.println(result);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

MyExecutor的execute方法是怎么实现的呢?它封装了创建子线程,同步获取结果的过程,它会创建一个执行子线程,该子线程的代码如下所示:

public class ExecuteThread<V> extends Thread{
    private V result = null;
    private Exception exception = null;
    private boolean done = false;
    private Callable<V> task;
    private final Object lock;

    public ExecuteThread(Callable<V> task, Object lock) {
        this.task = task;
        this.lock = lock;
    }

    @Override
    public void run() {
        try {
            result = task.call();
        } catch (Exception e) {
            exception = e;
        } finally {
            synchronized (lock) {
                done = true;
                lock.notifyAll();
            }
        }
    }

    public V getResult() {
        return result;
    }

    public boolean isDone() {
        return done;
    }

    public Exception getException() {
        return exception;
    }
}

这个子线程执行实际的子任务,记录执行结果到result变量、异常到exception变量,执行结束后设置共享状态变量done为true并调用notifyAll以唤醒可能在等待结果的主线程。

MyExecutor类实现:

public class MyExecutor {
    public <V> MyFuture<V> execute(final Callable<V> task) {
        final Object lock = new Object();
        final ExecuteThread<V> thread = new ExecuteThread<>(task, lock);
        thread.start();

        return () -> {
            synchronized (lock) {
                while (!thread.isDone()) {
                    try {
                        lock.wait();
                    } catch (InterruptedException ignored) {
                    }
                }
                if (thread.getException() != null) {
                    throw thread.getException();
                }
                return thread.getResult();
            }
        };
    }
}

execute启动一个线程,并返回MyFuture对象,MyFuture的get方法会阻塞等待直到线程运行结束。

以上的MyExecutore和MyFuture主要用于演示基本原理,实际上,Java中已经包含了一套完善的框架Executors,相关的部分接口和类有:

  • 表示异步结果的接口Future和实现类FutureTask
  • 用于执行异步任务的接口Executor、以及有更多功能的子接口ExecutorService
  • 用于创建Executor和ExecutorService的工厂方法类Executors

参考链接:

1. 《Java编程的逻辑》

2. Java API

赞(1) 打赏
Zhuoli's Blog » Java编程拾遗『线程协作』
分享到: 更多 (0)

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址