coding……
但行好事 莫问前程

Java编程拾遗『线程中断』

之前的文章,我们已经介绍了线程的概念、使用、同步及线程协作,本篇文章来重点讲一下Java中如何取消或关闭一个线程。

1. 线程取消/关闭的场景

我们知道,通过线程的start方法启动一个线程后,线程开始执行run方法,run方法运行结束后线程退出,那为什么还需要结束一个线程呢?有多种情况,比如说:

  • 很多线程的运行模式是死循环,比如在生产者/消费者模式中,消费者主体就是一个死循环,它不停的从队列中接受任务,执行任务,在停止程序时,我们需要一种”优雅”的方法以关闭该线程。
  • 在一些图形用户界面程序中,线程是用户启动的,完成一些任务,比如从远程服务器上下载一个文件,在下载过程中,用户可能会希望取消该任务。
  • 有时,我们会启动多个线程做同一件事,比如类似抢火车票,我们可能会让多个好友帮忙从多个渠道买火车票,只要有一个渠道买到了,我们会通知取消其他渠道。

2. 取消/关闭的机制

我们都知道,Thread类中有个方法stop(),如下:

public final void stop()

stop方法停止一个线程会导致解锁其上被锁定的所有监视器(监视器以在栈顶产生ThreadDeath异常的方式被解锁)。如果之前被这些监视器保护的任何对象处于不一致状态,其它线程看到的这些对象就会处于不一致状态。这种对象被称为受损的 (damaged)。当线程在受损的对象上进行操作时,会导致任意行为。这种行为可能微妙且难以检测,也可能会比较明显。

不像其他未受检的(unchecked)异常, ThreadDeath悄无声息的杀死及其他线程。因此,用户得不到程序可能会崩溃的警告。崩溃会在真正破坏发生后的任意时刻显现,甚至在数小时或数天之后。这个方法也已经被标记为@Deprecated,简单的讲,我们不应该使用它来终止线程。

在Java中,停止一个线程的主要机制是中断,中断并不是强迫终止一个线程,它是一种协作机制,是给线程传递一个取消信号,但是由线程来决定如何以及何时退出,本节我们主要就是来理解Java的中断机制。Thread类定义了如下关于中断的方法:

public boolean isInterrupted()
public void interrupt()
public static boolean interrupted() 

这三个方法名字类似,比较容易混淆,我们解释一下。isInterrupted()和interrupt()是实例方法,调用它们需要通过线程对象,interrupted()是静态方法,实际会调用Thread.currentThread()操作当前线程。

Java中每个线程都有一个标志位,表示该线程是否被中断了。

  • isInterrupted:就是返回对应线程的中断标志位是否为true。
  • interrupted:返回当前线程的中断标志位是否为true,但它还有一个重要的副作用,就是清空中断标志位,也就是说,连续两次调用interrupted(),第一次返回的结果为true,第二次一般就是false (除非同时又发生了一次中断)
  • interrupt:表示中断对应的线程,中断标志位设置为true。

3. 线程对中断的反应

interrupt()对线程的影响与线程的状态和线程在进行的IO操作有关。先主要考虑线程的状态,Java中线程有以下几种状态:

  • RUNNABLE:线程在运行或具备运行条件只是在等待操作系统调度
  • WAITING/TIMED_WAITING:线程在等待某个条件或超时
  • BLOCKED:线程在等待锁,试图进入同步块
  • NEW/TERMINATED:线程还未启动或已结束

对以上状态不熟悉的,可以去看一下之前这篇文章:Java编程拾遗『线程的使用』

3.1 RUNNABLE

如果线程在运行中,且没有执行IO操作,interrupt()只是会设置线程的中断标志位,没有任何其它作用。线程应该在运行过程中合适的位置检查中断标志位,比如说,如果主体代码是一个循环,可以在循环开始处进行检查,如下所示:

public class InterruptRunnableDemo extends Thread {
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            // ... 单次循环代码
        }
        System.out.println("done ");
    }

    public static void main(String[] args) throws InterruptedException {
        Thread thread = new InterruptRunnableDemo();
        thread.start();
        Thread.sleep(1000);
        thread.interrupt();
    }
}

3.2 WAITING/TIMED_WAITING

线程执行如下方法会进入WAITING状态:

public final void join() throws InterruptedException
public final void wait() throws InterruptedException

执行如下方法会进入TIMED_WAITING状态:

public final native void wait(long timeout) throws InterruptedException
public static native void sleep(long millis) throws InterruptedException
public final synchronized void join(long millis) throws InterruptedException

在这些状态时,对线程对象调用interrupt()会使得该线程抛出InterruptedException,需要注意的是,抛出异常后,中断标志位会被清空,而不是被设置。比如说,执行如下代码:

Thread t = new Thread (){
    @Override
    public void run() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            System.out.println(isInterrupted());
        }
    }        
};
t.start();
try {
    Thread.sleep(100);
} catch (InterruptedException e) {
}
t.interrupt();

程序的输出为false,说明线程在WAITING/TIMED_WAITING状态下,线程对象调用interrupt方法会使线程抛出InterruptedException,同时将中断标志位清空,置为false。

sleep、join和wait方法声明都抛出了InterruptedException,InterruptedException是一个受检异常,线程必须进行处理。捕获到InterruptedException,通常表示希望结束该线程,线程大概有两种处理方式:

  • 向上传递该异常,这使得该方法也变成了一个可中断的方法,需要调用者进行处理
  • 有些情况,不能向上传递异常,比如Thread的run方法,它的声明是固定的,不能抛出任何受检异常,这时,应该捕获异常,进行合适的清理操作,清理后,一般应该调用Thread的interrupt方法设置中断标志位,使得其他代码有办法知道它发生了中断

第一种方式的示例代码如下:

public void interruptibleMethod() throws InterruptedException{
    // ... 包含wait, join 或 sleep 方法
    Thread.sleep(1000);
}

第二种方式的示例代码如下:

public class InterruptWaitingDemo extends Thread {
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                // 模拟任务代码
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                // ... 清理操作
                // 重设中断标志位
                Thread.currentThread().interrupt();
            }
        }
        System.out.println(isInterrupted());
    }

    public static void main(String[] args) {
        InterruptWaitingDemo thread = new InterruptWaitingDemo();
        thread.start();
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
        }
        thread.interrupt();
    }
}

catch到InterruptedException后,将线程中断标志位重新设置为true,以通知所有线程中断。

3.3 BLOCKED

如果线程在等待锁,对线程对象调用interrupt()只是会设置线程的中断标志位,线程依然会处于BLOCKED状态,也就是说,interrupt()并不能使一个在等待锁的线程真正”中断”。我们看段代码:

public class InterruptSynchronizedDemo {
    private static Object lock = new Object();

    private static class A extends Thread {
        @Override
        public void run() {
            synchronized (lock) {
                while (!Thread.currentThread().isInterrupted()) {
                }
            }
            System.out.println("exit");
        }
    }

    public static void test() throws InterruptedException {
        synchronized (lock) {
            A a = new A();
            a.start();
            Thread.sleep(1000);

            a.interrupt();
            a.join();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        test();
    }
}

test方法在持有锁lock的情况下启动线程a,而线程a也去尝试获得锁lock,所以会进入锁等待队列,随后test调用线程a的interrupt方法并等待线程线程a结束,线程a会结束吗?不会,interrupt方法只会设置线程的中断标志,而并不会使它从锁等待队列中出来。

微修改下代码,去掉test方法中的最后一行a.join,即变为:

public static void test() throws InterruptedException {
    synchronized (lock) {
        A a = new A();
        a.start();
        Thread.sleep(1000);

        a.interrupt();
    }
}

这时,程序就会退出。为什么呢?因为主线程不再等待线程a结束,释放锁lock后,线程a会获得锁,然后检测到发生了中断,所以会退出。

使用synchronized关键字获取锁的过程中不响应中断请求,这是synchronized的局限性。如果这对程序是一个问题,应该使用显式锁,它支持以响应中断的方式获取锁。

3.4 NEW/TERMINATE

如果线程尚未启动(NEW),或者已经结束(TERMINATED),则调用interrupt()对它没有任何效果,中断标志位也不会被设置。比如说,以下代码的输出都是false。

public class InterruptNotAliveDemo {
    private static class A extends Thread {
        @Override
        public void run() {
        }
    }

    public static void test() throws InterruptedException {
        A a = new A();
        a.interrupt();
        System.out.println(a.isInterrupted());

        a.start();
        Thread.sleep(100);
        a.interrupt();
        System.out.println(a.isInterrupted());
    }

    public static void main(String[] args) throws InterruptedException {
        test();
    }
}

3.5 IO操作

如果线程在等待IO操作,如果线程中断标志被设置,跟BLOCKED状态类似,线程也不响应中断请求。InputStream的read调用,该操作是不可中断的,如果流中没有数据,read会阻塞 (但线程状态依然是RUNNABLE),但是线程并不会响应interrupt(),与synchronized类似,调用interrupt()只会设置线程的中断标志,而不会真正”中断”它,我们看段代码:

public class InterruptReadDemo {
    private static class A extends Thread {
        @Override
        public void run() {
            while(!Thread.currentThread().isInterrupted()){
                try {
                    System.out.println(System.in.read());
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("exit");
        }
    }

    public static void main(String[] args) throws InterruptedException {
        A t = new A();
        t.start();
        Thread.sleep(100);

        t.interrupt();
    }
}

线程t启动后调用System.in.read()从标准输入读入一个字符,不要输入任何字符,我们会看到,调用interrupt()不会中断read(),线程会一直运行。

不过,有一个办法可以中断read()调用,那就是调用流的close方法,我们将代码改为:

public class InterruptReadDemo {
    private static class A extends Thread {
        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    System.out.println(System.in.read());
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("exit");
        }

        public void cancel() {
            try {
                System.in.close();
            } catch (IOException e) {
            }
            interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        A t = new A();
        t.start();
        Thread.sleep(100);

        t.cancel();
    }
}

我们给线程定义了一个cancel方法,在该方法中,调用了流的close方法,同时调用了interrupt方法,这次,程序会输出:

-1
exit

也就是说,调用close方法后,read方法会返回,返回值为-1,表示流结束。 另外这里要注意的是,System.in类型为InputStream,初始化操作是JVM反射System类的initializeSystemClass方法实现的,所以System.in的真正类型是BufferedInputStream,最终通过native方法setIn0,完成System.in赋值。所以System.in跟具体操作系统有关联,在不同操作系统下上面代码的close操作未必可以真正关闭输入流。如果上述cancel方法不能终止线程,应该就是因为在当前环境下未能关闭System.in输入流的原因。以上示例代码,在Mac下可以正常终止。

4. 如何正确地取消/关闭线程

以上,我们可以看出,interrupt方法不一定会真正”中断”线程,它只是一种协作机制,如果不明白线程在做什么,不应该贸然的调用线程的interrupt方法,以为这样就能取消线程。

对于以线程提供服务的程序模块而言,它应该封装取消/关闭操作,提供单独的取消/关闭方法给调用者,类似于InterruptReadDemo中演示的cancel方法,外部调用者应该调用这些方法而不是直接调用interrupt

如果是单机环境,我们可以单独提供一个volatile的boolean成员变量,用以通知各个线程是否终止:

public class InterruptReadDemo1 extends Thread{
    private static volatile boolean terminateFlag = false;

    @Override
    public void run() {
        while (!terminateFlag) {
            //线程执行逻辑
        }

        System.out.println("exit");
    }

    public void cancel() {
        terminateFlag = true;
    }

    public static void main(String[] args) throws InterruptedException{
        InterruptReadDemo1 thread = new InterruptReadDemo1();
        thread.start();
        Thread.sleep(1000);
        thread.cancel();
    }
}

其实原理跟interrupt差不多,使用起来也很相似。比如BLOCKED状态的线程也不会响应terminateFlag。但也有一些不同:

但是对于WAITING/TIME_WAITING状态有些不同,这两种状态的线程对象调用interrupt方法会抛出InterruptException,需要在捕获到异常之后,重设中断标志位或向上传递异常。但使用terminateFlag,只要等线程从WAITING状态恢复到运行状态,那就可以终止了,不用“重设中断标志”。

对于NEW/TERMINATE状态,也有一些区别。terminateFlag对NEW/TERMINATE状态生效,处于NEW/TERMINATE状态的线程对象改变terminateFlag,依然会影响其他线程。

当然这里terminateFlag只能用于单机环境,对于分布式环境是不适用的。分布式环境可以考虑使用Redisson提供的分布式AtomicLong(这是我自己YY的,没见过这种用法)。

其实除了上述在线程中提供终止/取消的方法,Java中还可以通过钩子程序java.lang.Runtime.addShutdownHook() ,保证java程序安全退出。addShutdownHook方法可以加入一个钩子,在程序退出时触发该钩子(这里的退出是指ctrl+c或者kill -15,但如果用kill -9 那是没办法的)。比如我们可以在线程类中添加如下静态代码快:

static {
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        //这里是程序退出后执行的逻辑,可以在这里写中断标志,也可以等待线程结束的逻辑
        terminateFlag = true;
    }));
}

关于addShutdownHook可以参考文章最后的参考链接。

参考链接:

1. 《Java编程的逻辑》

2. Java API

3. 利用 java.lang.Runtime.addShutdownHook() 钩子程序,保证java程序安全退出

赞(1) 打赏
Zhuoli's Blog » Java编程拾遗『线程中断』
分享到: 更多 (0)

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址