Java におけるタスクのキャンセル処理


タスクのキャンセル

Java にはスレッドを強制的に停止する安全な方法は存在しません。

Thread.stopThread.suspend などはこの用途に適したものではありますが、Java非推奨スレッド・プリミティブ にある通り、これらは非推奨であり@Deprecated(since="1.2") とマークされているため使うべきではありません。

あるタスクをキャンセルするには協力的な方法を取るしかありあせん。

つまり、タスクやタスクを実行しているスレッドにキャンセルを依頼し、タスクやタスクを実行しているスレッドが安全に終了するのを待つことしかできません。


単純なキャンセル可能タスク

単純なタスクであれば、フラグを経由してキャンセルを行うことができます。

public class CancelableTask implements Runnable {

    private volatile boolean cancelled;

    @Override
    public void run() {
        while (!cancelled) {
            System.out.println(this.toString());
        }
    }

    public void cancel() {
        cancelled = true;
    }
}

注意点としては、フラグは volatile 宣言しなければならない点です。 volatile 宣言を忘れた場合には、メモリの可視性に起因して、タスクが永遠にキャンセルされない可能性があります。

タスクはスレッドのオーナー側から以下のようにキャンセルを依頼することができます。

CancelableTask task = new CancelableTask();
new Thread(task).start();
// ...
task.cancel();

この例では、単純に println() しているだけですが、通常はより大きな(時間のかかる)処理を行うため、キャンセルを依頼したとしても直ちにキャンセルされるとは限りません。

加えて、処理の中で Socket I/O や BlockingQueue への追加といった、ブロックメソッドのコールが行われていた場合、キャンセルフラグのチェックに永遠に到達せずにキャンセルが行われない可能性があります。


スレッドのインタラプト

Thread にはスレッドへの割り込み(interrupt)を扱うステータスがあります。キャンセル処理には通常この割り込みを使うのが妥当です。

スレッドのインスタンスに対して interrupt() を呼び出すことで、Thread の割り込みステータスを変更することができます。

このメソッドはスレッドのステータスを変更するだけで、それに対してどのように振る舞うかは実装で定義する必要があります。何も行わない ということも出来ます。

インタラプトは以下のように実現できます。

public class CancelableTaskThread extends Thread {

    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                System.out.println(this.toString());
                Thread.sleep(3000);
            }
        } catch (InterruptedException e) {
            System.out.println(e);
            Thread.currentThread().interrupt();
        }
    }

    public void cancel() {
        interrupt();
    }
}

処理の先頭で isInterrupted() で割り込みのステータスをチェックしています。この時点で interrupt() されていた場合は処理がキャンセルされたものとして早期脱出します。

Thread.sleep() のようなブロックメソッドは概ね以下のようなチェックを行っています(Thread.sleep() は native メソッドですが概念的には同じです)。

if (Thread.interrupted()) throw new InterruptedException();

Thread.interrupted() は現在の割り込みステータスを返すとともに、割り込みステータスをクリアします。 InterruptedException がブロックメソッド側で発生するためブロックメソッドの処理途中でキャンセルとして扱うことができます。

InterruptedException を catch した場合、そのまま例外をスローするか、割り込みステータスを設定して呼び出し側でそれを適切に処理できるようにすることができます(前述の通りInterruptedException がスローされた時点で割り込みステータスはクリアされるため)。

ここでの例では例外を catch して握りつぶす代わりに、割り込みステータスを復元しています。


Future によるキャンセル

Java 1.5 からは、タスクのキャンセルは Future を使います。

Callable<Void> task = new Callable<>() {
    @Override
    public Void call() throws Exception {
        while (true) {
            if (Thread.interrupted())
                throw new InterruptedException();
            System.out.println(this.toString());
            Thread.sleep(3000);
        }
    }
};

ExecutorService executor = Executors.newSingleThreadExecutor();
Future<Void> future = executor.submit(task);
// ...
future.cancel(true);

ExecutorServicesubmit することで Future が得られるため、タスクのキャンセルは Future.cancel() で行います。

Future.cancel() は引数にブーリアンの mayInterruptIfRunning を受け取ります。これが true の場合は、どこかのスレッドで実行中のタスクがインタラプトされます。false とした場合は、タスクがスタートしていない場合に限りタスクの実行がキャンセルされます。

なお上記例では Callable を使っていますが、InterruptedException をスローする必要がなければ(catch して割り込みステータスを復元するなどを行うなど) Runnable を使うこともできます( Runnable はチェック例外を投げることができない )。


Future による実行時間の制限

Future を使えばタイムアウトが必要な処理も簡単に実現できます。

public class TimedRun {

    private static final ExecutorService executor = Executors.newCachedThreadPool();

    public static void submit(
            Runnable r, long timeout, TimeUnit unit) throws InterruptedException {
        Future<?> task = executor.submit(r);
        try {
            task.get(timeout, unit);
        } catch (TimeoutException e) {
            // logger.warning("Timeout:" + e.getMessage());
        } catch (ExecutionException e) {
            throw launderThrowable(e.getCause());
        } finally {
            task.cancel(true);
        }
    }

    private static RuntimeException launderThrowable(Throwable t) {
        if (t instanceof RuntimeException) {
            return (RuntimeException) t;
        } else if (t instanceof Error) {
            throw (Error) t;
        } else {
            throw new IllegalStateException("Not unchecked", t);
        }
    }
}

Future.get(long timeout, TimeUnit unit) で結果取得のタイムアウトを指定できます。指定時間内に結果取得が出来ない場合には TimeoutException がとなり finally にて処理がキャンセルされます(処理が時間内に完了していたとしても cancel の呼び出しは無害です )。

ExecutionException は処理の過程で発生したアプリケーション例外となるため、getCause() にて取り出した結果を再度スローしています。


インタラプトできないブロッキング

Java API のブロック処理は、たいていの場合インタラプトに応じて、InterruptedException をスローしますが、ソケットの同期I/Oなど、インタラプトに応答しないものも存在します。

インタラプトに応答しないブロックをキャンセル処理するには、ブロックしている原因に応じた操作が必要になります。

ブロックするメソッドは概ね、 JavaDoc に割り込み方法が明記されています。

その一例を以下に示します。

操作 インタラプト方法
java.io のソケット同期I/O ソケットをクローズすれば read/write でブロックしているスレットが SocketException をスローする
java.nio の同期I/O InterruptibleChannel インターフェースを実装したチャネル上でブロックされたスレッドがある場合にはブロックされたスレッドの interrupt を呼び出すことでチャネルがクローズし、ブロックされたスレッドが ClosedByInterruptException を受け取る
このチャネルの close メソッドを呼び出すことで、ブロックされたスレットが AsynchronousCloseException を受け取る
Selector による非同期I/O select() によってブロックされたスレッドは、セレクタの wakeup メソッドの呼び出しか、セレクタの close メソッドの呼び出し、またはブロックされたスレッドの interrupt の呼び出しにより他のスレッドからの割り込みを受け付ける。
synchronized による固有ロック synchronized でブロックしている場合は、このスレッドをキャンセルすることはできない。
明示的にロックを行うLock クラスのlockInterruptibly を使えばロックを待機しながらインタラプトに応答することができる。

一例として、Socket のI/O を扱う以下のタスクは割り込みに応答せず、ブロックから抜け出すことができません。

ServerSocket serverSocket = new ServerSocket(9090);
Callable<Integer> task = () -> {
    Socket socket = serverSocket.accept();
    return socket.getInputStream().read();
};
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<?> future = executor.submit(task);

// ...
Socket client = new Socket("localhost", 9090);
// ...
future.cancel(true);

serverSocket はクライアントからの接続があると、serverSocket.accept() から Socket を返します。 その後、socket.getInputStream().read() でブロックすると、割り込みに応答することはありません。

以下で Socket のI/Oを割り込みに応答させる例を示します。


Socket の I/O ブロックに割り込みする

前述したように、Socket の同期I/Oはインタラプトに応答しません。I/O 待ちとなっているスレッドへインタラプトするには Socket をクローズする必要があります。


AbstractExecutorService には以下のような newTaskFor() という protected なメソッドがあり、このメソッドはExecutorService.submit() 時に返却される Future のファクトリメソッドになっています。

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}

ExecutorService を継承し、このメソッドをオーバーライドして独自の割り込み処理を定義した Future を生成することができます。


特別なキャンセル処理可能なタスクを表すインターフェースを以下のように作成します。

public interface CancellableTask<T> extends Callable<T> {
    void cancel();
    RunnableFuture<T> newTask();
}


作成したインターフェースの実装を以下のように作成します。

public abstract class SocketUsingTask<T> implements CancellableTask<T> {

    private Socket socket;

    protected synchronized void setSocket(Socket s) {
        socket = s;
    }

    public synchronized void cancel() {
        try {
            if (socket != null)
                socket.close();
        } catch (IOException ignored) {
        }
    }

    public RunnableFuture<T> newTask() {
        return new FutureTask<>(this) {
            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                try {
                    SocketUsingTask.this.cancel();
                } finally {
                    return super.cancel(mayInterruptIfRunning);
                }
            }
        };
    }
}

キャンセルにてソケットをクローズするように cancel() を実装しています。

そして newTask() で新しい Furure を生成する際に、Future.cancel() の前処理として、ソケットをクローズする close() を呼び出しています。その後、finally にて通常の Future.cancel() を処理します。


最後に ExecutornewTaskFor() をオーバライドして上で定義した Furure のファクトリを呼ぶようにします。

public class CancellingExecutor extends ThreadPoolExecutor {
    
    public CancellingExecutor() {
        super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    }
    // ...

    @Override
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        if (callable instanceof CancellableTask) {
            return ((CancellableTask<T>) callable).newTask();
        } else {
            return super.newTaskFor(callable);
        }
    }
}

これで、Future.cancel() の割り込みに応答することができます。


以下のようにすれば、future.cancel() にてブロックしていたストリームからの読み込みが SocketException で脱出し、割り込みに応じたキャンセルが実現できます。

ServerSocket serverSocket = new ServerSocket(9090);

CancellableTask<Integer> task = new SocketUsingTask<>() {
    @Override
    public Integer call() throws Exception {
        try {
            Socket socket = serverSocket.accept();
            setSocket(socket);
            return socket.getInputStream().read();
        } catch (Exception e) {
            e.printStackTrace();
            throw e;
        }
    }
};

ExecutorService executor = new CancellingExecutor();
Future<?> future = executor.submit(task);
// ...
Socket client = new Socket("localhost", 9090);
// ...
future.cancel(true);


まとめ

  • Java にはスレッドを安全に強制終了する仕組みがなく、割り込み要求による協力的な方法を取る必要がある
  • タスクのキャンセルはExecutorService を経由して得た Future で割り込みを使うのがベター
  • ブロックするメソッド全てが割り込みに応答するとは限らず、特殊なキャンセルが必要となるケースがある