Java SE 7 で追加された NIO2 の非同期ソケットチャネル

Java7にて NIO2 として不完全だった NIO 系ライブラリが拡張されました。非同期 SocketChannel を使って簡単なサーバのサンプルをば。
Java6までの ServerSocketChannel と SocketChannel に対応する Asynchronous 系のクラスが追加されました。

  • AsynchronousServerSocketChannel
  • AsynchronousSocketChannel

Asynchronous 系はこのほかに以下もあります。

  • AsynchronousFileChannel
  • AsynchronousDatagramChannel


blog1.mammb.com

の NIO によるHTTPサーバのサンプルを NIO2 で書いてみます。

Asynchronous 系 Channel と Future

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.Date;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class Nio2HttpServer {
    
    private static final int DEFAULT_PORT = 9000;
    private static final int TIMEOUT = 10;
    
    private final AsynchronousServerSocketChannel server;
    
    public Nio2HttpServer() throws IOException {
        server = AsynchronousServerSocketChannel.open();
        server.setOption(StandardSocketOptions.SO_REUSEADDR, true);
        server.bind(new InetSocketAddress(DEFAULT_PORT));
    }
    
    public void start() throws Exception {
        while(true) {
            Future<AsynchronousSocketChannel> acceptFuture = server.accept();
            handleRequest(acceptFuture.get());
        }
    }
    
    private void handleRequest(AsynchronousSocketChannel channel) {
        
        try (AsynchronousSocketChannel acceptedChannel = channel) {
            
            ByteBuffer buff = ByteBuffer.allocateDirect(8192);
            
            acceptedChannel.read(buff).get(TIMEOUT, TimeUnit.SECONDS);

            printRequest(buff);
            mkResponse(buff);
            buff.flip();
            
            acceptedChannel.write(buff).get(TIMEOUT, TimeUnit.SECONDS);
            
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            e.printStackTrace();
        }
    }
    
    private void printRequest(ByteBuffer buff){
        buff.flip();
        byte[] bytes = new byte[buff.limit()];
        buff.get(bytes);
        buff.compact();
        System.out.println(new String(bytes));
    }
    
    private void mkResponse(ByteBuffer buff) {
        buff.clear();
        String text = "HTTP/1.1 200 OK\n\n"
            + new Date() + " (" + Thread.currentThread() + ")";
        buff.put(text.getBytes());
    }
    
    public static void main(String[] args) throws Exception {
        new Nio2HttpServer().start();
    }
}

Asynchronous 系のクラスでは、API構成はほとんど変わりませんが、戻りとして Future を返却することで、非同期のタスクを細かく制御できるようになっています。
あと、SocketChannel 系では、今まで直接 socket に設定していたオプションを setOption() getOption() にて扱うようになったぐらいでしょうか。

CompletionHandler

AsynchronousServerSocketChannel#accept には上記の Future を返却するものに加え、CompletionHandler によりコールバックを指定することもできます。CompletionHandler を使うと start() は以下のように書くことができます(CompletionHandler 無いで再帰的にserver.accept(null, this)している点に注意してください)。

    public void start() throws Exception {
        server.accept(null, 
            new CompletionHandler<AsynchronousSocketChannel, Void>() {
            
                public void completed(AsynchronousSocketChannel ch, Void att) {
                    server.accept(null, this);
                    handleRequest(ch);
                }

                public void failed(Throwable exc, Void att) {
                    exc.printStackTrace();
                }
            });
    }

上記は、AsynchronousServerSocketChannel#accept の例ですが、AsynchronousSocketChannel#read や AsynchronousSocketChannel#write についても同じように CompletionHandler を指定することができます。

AsynchronousChannelGroup

AsynchronousServerSocketChannel はスレッドプールにて非同期 I/O を処理するには AsynchronousChannelGroup が提供されています。スレッドプールで対象にスレッドを処理するのであれば、非同期I/Oを使う意味があまりないですが。。
AsynchronousChannelGroup の以下の3つのstaticメソッドにてスレッドグループを作成します。

  • withCachedThreadPool(ExecutorService executor, int initialSize)
  • withFixedThreadPool(int nThreads, ThreadFactory threadFactory)
  • withThreadPool(ExecutorService executor)


以下のように AsynchronousChannelGroup を作成してAsynchronousServerSocketChannel.open() に引数として渡します。

AsynchronousChannelGroup threadGroup = 
    AsynchronousChannelGroup.withFixedThreadPool(2, Executors.defaultThreadFactory());
AsynchronousServerSocketChannel channel =
    AsynchronousServerSocketChannel.open(threadGroup);


AsynchronousChannelGroup には以下のメソッドがあり、これらにより AsynchronousServerSocketChannel で利用しているスレッドの Shutdown と Termination の処理を制御できます。

  • awaitTermination(long timeout, TimeUnit unit)
  • isShutdown()
  • isTerminated()
  • shutdown()
  • shutdownNow()