Java によるネットワークプログラミングの基礎知識




はじめに

Java でネットワークプログラミングを行うのであれば Netty を使えば良いのですが、Netty に至るまでの標準APIの変遷についての話題は知っておいて損はない内容だと思います。 いまさら書く必要のある内容では無いかもしれませんが、とりまとまった情報源も乏しいため、Java標準APIを利用したネットワークプログラミングについて説明していきます。


プログラミングモデルの変遷

Java でネットワークプログラミングを行うには、JDK1.4 以前では Socket を使ったシンプルなプログラミングモデルが提供されていました。

JDK1.4 では JSR 51: New I/O APIs for the Java Platform として java.nio パッケージが導入され、スケーラブルな I/O を行う API が整備されました。旧来のプログラミングモデルでは I/O 操作がブロックするブロッキング I/O でしたが、NIOの導入によりノンブロッキングI/Oが使えるようになりました。NIO に比較して、旧来の I/O は Old I/O (OIO) と呼ばれます。

そして、JDK1.7 では JSR 203: More New I/O APIs for the Java Platform として、java.nio パッケージにI/Oを非同期に扱うAPIが整備されました。これは NIO2 と呼ばれます。NIO2 では、I/O 操作の結果を Future で得るプログラミングモデルと、CompletionHandler によるコールバックでI/O操作の結果を扱うプログラミングモデルが提供されました。

このような経緯から、標準APIでネットワークプログラミングを行う場合には、以下のような選択肢が生まれます。

  • OIO のブロッキングI/O
  • NIO のノンブロッキングI/OとI/O多重化
  • NIO2 の非同期チャネルと Future
  • NIO2 の非同期チャネルと 非同期ハンドラ

厄介なのは、これらは異なるプログラミングモデルでありAPIにも互換性が無い点で、パフォーマンス特性の変化に応じて実装を切り替えることが非常に困難な点です。

実装に取り掛かる前に、必要なパフォーマンス特性を想定してプログラミングモデルを選択する必要があります。 さらに、選択したプログラミングモデルに応じて異なった実装上で考慮しなければならない事項があります。


ここでは簡単なサーバ実装を題材にして、それぞれのプログラミングモデルについて眺めていきたいと思います。


ソケット

OIO でネットワークプログラミングを行う場合の中心人物はソケットになります。

ソケットは、通信サービスと外部を結ぶ端点(endpoint)を表す抽象概念であり、ホスト自身のIPアドレスとサービスを表すポート番号が結びついています。

Java では java.net.Socketjava.net.ServerSocket といったクラスでソケットを扱います。


TCP接続を行うにはクライアント側で Socket を作成します。

Socket sock = new Socket();
sock.connect(new InetSocketAddress(host, port));

Socket をインスタンス化し、対象サーバへの接続(connect)を行います。


一方サーバ側では ServerSocket を作成し、accept() で接続を待ち受けます。

ServerSocket serverSocket = new ServerSocket(8080);
Socket socket = serverSocket.accept();

accept() は、クライアントからの接続シーケンスが完了するまで処理をブロックします。 accept() により接続が確立されれば Socket を介してクライアントとの会話を行うことができます。具体的には、 Socket から取得したストリームを介することでTCP上でのメッセージのやり取りを行います。

InputStream in = socket.getInputStream();
OutputStream out = socket.getOutputStream();


クライアントからの接続は、以下のようなシーケンスでTCPセグメントの交換が行われます。

  1. SYN(クライアント->サーバ)
  2. SYN/ACK(サーバ->クライアント)
  3. ACK(クライアント->サーバ)

2 の時点でクライアントの connect() がリターンし、3 の時点でサーバの accept() がリターンします。


ソケットによるサーバ実装

ServerSocket.accept() で接続を受付して、Socket 経由で取得したストリームを介してやり取りを行えばよいため、単純なサーバは以下のように実装することができます。

ServerSocket serverSocket = new ServerSocket(9000);
for (;;) {
    Socket socket = serverSocket.accept();
    handle(soket);
}

Socket の処理は、クライアントからの送信内容をそのままエコーする例としては以下のような実装が考えられます。

private void handle(Socket socket) {
    try (socket) {
        InputStream in = socket.getInputStream();
        OutputStream out = socket.getOutputStream();
        byte[] bytes = new byte[8192];
        int count;
        while ((count = in.read(bytes)) >= 0) {
            out.write(bytes, 0, count);
        }
        out.flush();
    } catch (IOException e) { }
}

クライアントとの接続が確立された Socket からストリームを取得することで、読み込みと書き込みをブロッキングI/Oで行っています。

ここでは Socket をクローズしていますが、InputStreamOutputStream のいずれかをクローズするだけでも十分です。


上記例では、サーバの処理がシングルスレットで動くため、1つのクライアントとの会話が終了しない限り他のクライアントの要求を処理することができません。

よって通常は、クライアントとの会話を処理するワーカースレッドを用意して複数のクライアントの処理を並列で扱います。

例えば以下のようになります。

ServerSocket serverSocket = new ServerSocket(9000);
for (;;) {
    Socket socket = serverSocket.accept();
    new Thread(() -> handle(socket)).start();
}


スレッドプールを利用したサーバ実装

リクエストを並列に処理するサーバの実装には、ExecutorService を利用するのが簡単です。

例えば以下のようになります。

ServerSocket serverSocket = new ServerSocket(9000);
ExecutorService executor = Executors.newWorkStealingPool();
for (;;) {
    Socket socket = serverSocket.accept();
    executor.execute(() -> handleConversation(socket));
}

Executors.newWorkStealingPool() により(デフォルトでは)使用可能なプロセッサ数に応じたスレッドプールが作成され、タスクが競合回避のために複数の(スレッド別の)キューにより管理されるようになります。

Stealing という名前の通り、もし自身のキューのタスクが無くなれば、他のキューからタスクを盗んで実行します。この仕組により、スレッドの数を抑えながら良好な並列性を実現することができます。

クライアントの数が多く、接続が長期間に渡る場合は newCachedThreadPool() を使うことで必要なスレッドが生成され、処理が終われば(60秒以内であれば)プールにキャッシュされ、次の処理でスレッドが使い回されます。

ServerSocket serverSocket = new ServerSocket(9000);
ExecutorService executor = Executors.newCachedThreadPool();
for (;;) {
    Socket socket = serverSocket.accept();
    executor.execute(() -> handleConversation(socket));
}


全体としては以下のような実装が考えられます。

public class OioEchoServer {

    private static final int BUFFER_SIZE = 128 * 1024;
    private static final int SO_TIMEOUT = 60 * 1000;

    private final ServerSocket serverSocket;
    private final ExecutorService executorService;

    public OioEchoServer(int port) {
        try {
            this.serverSocket = new ServerSocket(port);
            this.serverSocket.setReceiveBufferSize(BUFFER_SIZE);
            this.executorService = Executors.newCachedThreadPool();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public void start() {
        for (;;) {
            if (serverSocket.isClosed()) break;
            handleAccept();
        }
    }

    private void handleAccept() {
        try {
            Socket socket = serverSocket.accept();
            System.out.println("accept connection " + socket);
            socket.setSendBufferSize(BUFFER_SIZE);
            socket.setSoTimeout(SO_TIMEOUT);
            executorService.execute(() -> handleConversation(socket));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private void handleConversation(Socket socket) {
        try (socket) {
            InputStream is = socket.getInputStream();
            OutputStream os = socket.getOutputStream();
            byte[] bytes = new byte[8192];
            int count;
            while ((count = is.read(bytes)) >= 0) {
                os.write(bytes, 0, count);
                os.flush();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String... args) {
        new OioEchoServer(9000).start();
    }

}

この例では、リクエストをそのままレスポンスとして返却するエコーサーバです。 Telnet などで接続して文字列を送信すれば、その列をそのままエコーとしてレスポンスします。

実装はとても素直でシンプルで、これが Socket を使った場合の利点になります。


スレッドプールとして newWorkStealingPool() を使った場合は、CPU数に応じたスレッドが上限となるため、ネットワーク回線が細くリクエストの到着に時間がかかったり、レスポンスのための処理自体に時間がかかるような場合には、スレッドが専有されて新しいリクエストが待機してしまうなどの欠点があります。

スレッドプールとして newCachedThreadPool() を使った場合は、大量のスレッドで接続を同時に処理できますが、スレッドのメモリ消費やコンテキストスイッチにより全体的なスループットが低下してしまう欠点があります。


ブロッキングI/Oの課題

Java における通常のI/O操作はブロッキングI/Oです。先程のサーバ実装もブロッキングI/Oが使われています。

ブロッキングI/Oでは、I/O操作で扱う対象のデータが得られない場合、または出力できない場合はI/O操作はリターンしません。つまりこれはスレッドがブロックされることを意味します。

ネットワークのストリームからの読み込みはソケットの受信バッファにデータが無い間ブロックします。ソケットの受信バッファにわずかでもデータが入ったタイミングで読み込みのブロックがリターンしてスレッドの実行が再開します。

ネットワークのチャネルに対する書き込みはソケットの受信バッファにわずかでも空きができるまでブロックします。受信バッファにわずかでも空きができるとその大きさ分のデータが転送され、ブロックがリターンしてスレッドの実行が再開します。


先程の例では、スレッドプールから取得したスレッドで処理を行うようにしましたが、クライアントが接続して何もデータを送信しなかったり、到達に時間がかかったりした場合には、スレッドがブロックされます。また、クライアントにデータを送信する際に送信バッファが一杯だった場合にも同様にスレッドがブロックします。

スレッド数に上限を設けた場合には、全てのスレッドがブロックすると、新しいリクエストを処理するスレッドが用意できずに無応答になる可能性があります。無応答にならないまでも、モバイルなどの通信速度の遅いクライアントとの会話でスレッドが頻繁にブロックした場合にはサーバのレスポンスが全体的に低下する可能性があります。

これを防ぐためには、スレッド数の上限を外し、全ての接続要求に都度スレッドを割り当てることが考えられます。ブロックするスレッドがあったとしても、他のスレッドが他のクライアントとの会話を処理することができるようになります。しかし多数の同時接続が発生するケースでは、スレッドの生成コストやスレッドの消費メモリ、スレッドのコンテキストスイッチによるリソース消費などが問題となってきます。

これが俗に言うC10K問題で、同時接続数が1万台にもなるとハードウェアの性能には問題ないにも関わらず、サービスのレスポンスが劣化する事象です。1つのリクエストを1つのスレッドに紐付けるアプローチを取った場合には、スレッドの増加によるメモリの圧迫やコンテキストスイッチの多発によりレスポンスが劣化していきます(ファイルディスクリプタの制限による影響など原因はこれだけに限りませんが)。


C10K問題に対抗するには、少ないスレッドで効率よくクライアンを捌いていけばよいのですが、従来型のブロッキングI/Oではこれを実現することが難しいため、ノンブロッキングI/Oを使う必要がでてきます。 もちろんWebアプリケーションなどの場合はバックエンドのデータベース側がボトルネックになって性能が出ないケースの方が圧倒的に多いのですが、単なるWebサーバであったりプロキシであったり、チャットアプリケーションのような接続が長時間に及ぶようなケースではノンブロッキングI/Oを使ったサーバ実装が有用です。


ノンブロッキングI/O

ブロッキングI/Oの課題を解決するために、JDK1.4 でノンブロッキングI/Oが導入されました。

ノンブロッキングの読み込みはデータが得られなかったらゼロを返します。ノンブロッキングの書き込みは送信バッファに書き込みのスペースがなければデータの転送を行わずにゼロを返します。I/O操作がブロックしないため、スレッドはその間より有用な処理を行うことができます。


ノンブロッキング I/O は java.nio パッケージのチャネル(Channel)を使って実現します。

チャネルはソケットなどの外部のデータソースに接続されたデータパスです。読み込みや書き込み処理をノンブロッキングで行うことができますし、ブロッキングで行うこともできます(利用中にモードの切り替えを行うこともできます)。

例えば ServerSocket に該当するチャネルは以下のように取得します。

ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(new InetSocketAddress(8080));

チャネルはデフォルトでブロックモードとなっているため、configureBlocking() にて明示的にノンブロッキングを指定することでノンブロッキング I/O が可能になります。

Socket に相当するチャネルは、同様に以下のように取得できます。

SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocateDirect(8192);
socketChannel.read(buffer);


チャネルを使った I/O 操作は必ず java.nio.Buffer を介して行う必要があります。上記例では、この実装クラスである ByteBuffer を使っています。

従来のストリームを使った操作では、ストリームを組み合わせることでバッファリングなどの処理を実現できましたが、これにより内部で無駄なデータのコピーなどが発生して非効率でした。java.nio.Buffer では低レベルなAPIの扱いにくさと引き換えに、効率的なデータの入出力が可能となります。


チャネルを使った I/O として、serverSocketChannel.accept() では対象となる接続が無い場合には即座にnull を返しますし、socketChannel.read(buffer) でも読み込み対象が無い場合には即座にリターンします。

チャネル I/O でブロックしないのは良いのですが、I/O操作が完了するまで処理をループしてしまっては、そもそもスレッドが専有されてしまいスレッドがブロックしている状況となんら変わらないままです。

対象の準備が整ったタイミングで通知を受けて処理を行うことができればスレッドを有効に活用することができます。この用途で利用するのがセレクタ(Selector)で、I/Oの多重化が可能になります。


I/O多重化(multiplexing)

これからI/Oを行う複数のチャネルをセレクタに登録しておけば、いずれかのチャネルでI/O操作を実施できる状態になったタイミング(やタイムアウトとなったタイミング)で通知を受けることができます。 これを利用することで、1つのスレッドで複数の接続を扱えるようになりI/Oの多重化が実現できます。

セレクタは、SelectorProvider 実装により提供され、Linux の場合は poll ベースの実装が提供されていましたが、JDK6 からは、Linux 2.6 以降のカーネルが検知された場合、よりスケーラブルな epoll 実装が利用されます(BSD では kqueue 実装になります)。


セレクタ Selector は以下のように取得して、チャネルと紐付けます(セレクタにチャネルを登録します)。

Selector selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

register() により SelectionKey のインスタンスが生成され。Selector に登録されます。合わせてチャネル自身にも生成された SelectionKey のインスタンスが格納されます。

register() の第2引数には関心集合として通知を受けたい対象を指定します。上記ではソケットのアクセプト操作が可能となったタイミングで通知を受け取る例となっています。

関心集合として指定可能なものは以下があります。

  • OP_READ : 対象のチャネルに対する読み込みが可能となった
  • OP_WRITE : 対象のチャネルに対する書き込みが可能となった
  • OP_CONNECT : 対象のチャネルで接続が可能となった(クライアント側)
  • OP_ACCEPT : 対象のチャネルで accept が可能となった(サーバ側)

これらはint値の定数として定義されており、論理演算により複数の関心集合を表すことができます。

指定した関心集合に合致するイベントが通知されますが、各操作が可能となった場合の他、例外が投げられようとしている場合もセレクタの操作対象として通知されます。

register() によるセレクタへの登録とselector との関係は多少分かりづらいと思うので簡単に実装を紹介しておきます。 チャネルのregister() をコールした際、チャネルの保持する SelectionKey の配列から、対象とする selector に該当する SelectionKey を検索します。もし既に登録があれば、SelectionKey の関心集合と添付を更新し、無ければ 対象とする selectorSelectionKey を登録しつつチャネル自身にも SelectionKey を保持します。 つまり以下のような実装になっています。

チャネルにおける実装は以下のようになっています(簡略化しています)。

public final SelectionKey register(Selector sel, int ops, Object att) {
    SelectionKey key = findKey(sel);
    if (key != null) {
        key.attach(att);
        key.interestOps(ops);
    } else {
        key = ((AbstractSelector)sel).register(this, ops, att);
        addKey(key);
    }
}

1行目の findKey() は以下のように、対象の selector に合致するキーを検索します。

private SelectionKey[] keys;
private SelectionKey findKey(Selector sel) {
    for (int i = 0; i < keys.length; i++)
        if ((keys[i] != null) && (keys[i].selector() == sel))
            return keys[i];
    return null;
}

SelectionKey はチャネル(クライアントとの接続)とセレクタで一意となります。ですので、通常は行いませんが、チャネルに複数のセレクタを登録することもでき、その結果複数の SelectionKey が生成されることになります。


対象のチャネルで指定した操作の準備が整った場合に通知を受け取るには以下のようにします。

int count = selector.select();

select() はブロックします。

準備が整ったタイミングでリターンし、セレクトされたチャネルの数を返します(セレクト結果がゼロを返した場合には、タイムアウトの発生/登録されたキーのキャンセル/セレクト操作の中止(Selector.wakeup())によるブロックの解除 のいすれかが発生したことになります)。

セレクトされた結果は SelectionKey として取得できます。

Set<SelectionKey> selectedKeys = selector.selectedKeys();

SelectionKey から取得するチャネルは I/O 操作の準備が整った状態のチャネルということになります。

SelectionKey は以下のようなループの中で処理を行うことができます。

Iterator<SelectionKey> iterator = selectedKeys.iterator();
while (iterator.hasNext()) {
    SelectionKey key = iterator.next();
    iterator.remove();
    if (!key.isValid()) {
        continue;
    }
    if (key.isAcceptable()) {
        ServerSocketChannel channel = (ServerSocketChannel) key.channel();
        // ...
    }
    if (key.isReadable()) {
        SocketChannel channel = (SocketChannel) key.channel();
        // ...
    }
    if (key.isWritable()) {
        SocketChannel channel = (SocketChannel) key.channel();
        // ...
    }
}

注意すべきは、セレクトされた SelectionKey は取得したあとで明示的に remove() しないと selectedKeys に残り続けてしまう点です。後の利用がない場合には忘れずに削除する必要があります。

なお、セレクタ操作を複数スレッドで行う場合は synchronized(selectedKeys) のようにセレクトキーの集合に対して同期化すべきです。


SelectionKey に対して関心集合を変更できます。例えば、書き込みデータがなくなった場合には以下のように現在の関心集合から SelectionKey.OP_WRITE を削除できます。

key.interestOpsAnd(~SelectionKey.OP_WRITE);

関心集合に SelectionKey.OP_WRITE を追加する場合は以下のようにすることができます。

key.interestOpsOr(SelectionKey.OP_WRITE);


SelectionKey には任意のオブジェクトを紐付けることができます。 紐付けを行うには、channel.register() の第3引数にオブジェクトを設定します。

channel.register(key.selector(), SelectionKey.OP_READ, buffer);

その上で、セレクトされたキーのattachment() により紐付けた対象のオブジェクトを取り出すことができます。

SelectionKey key = iterator.next();
ByteBuffer buffer = (ByteBuffer) key.attachment();

attachmentSelectionKey に登録されるため、特定のチャネルの選択キー別に特定の状態を関連付けることができます。例えば特定のチャネル(つまり特定のクライアント)からのリクエストの読み込み内容を添付しておくことで、セレクト操作が複数回にわたり行われた内容を蓄積しておくことができます。


バッファ操作

チャネルに対する読み込みや書き込みは java.nio.Buffer を介して行います。バッファは読み込みモードと書き込みモードを切り替えながら処理していきます。現在このバッファは読み込みモードなのか書き込みモードなのかをプログラマが把握していなければならないので扱いが面倒です。

例えば、buffer.hasRemaining() は書き込みモードであれば、このバッファに書き込みできる容量が残っているかどうかを表し、読み込みモードであれば、このバッファに読み込みできる容量が残っているかを返却します。ですので、戻り値の意味が、現在のモードによって異なるものとなり、つねに現在のモードがどちらになっているかを意識しながら実装する必要があります。


バッファは以下のように容量を指定してインスタンスを取得します。

ByteBuffer byteBuffer = ByteBuffer.allocate(10);

バッファには position と limit というプロパティがあり、初期状態は以下のようになっています。

f:id:Naotsugu:20200927193103p:plain

チャネルからバッファへの読み込みは以下のようになります。

socketChannel.read(byteBuffer);

または以下のようにバッファに値を格納できます。

byteBuffer.put(data);

7バイトのデータがバッファに入ったとすると以下のようになります。

f:id:Naotsugu:20200927193123p:plain

バッファへの書き込みモードを読み出しモードに変更するには以下のようにします。

byteBuffer.flip();

すると以下のような状態になります。

f:id:Naotsugu:20200927193142p:plain

チャネルに対してバッファの内容を書き込むには以下のようにします。

socketChannel.write(byteBuffer);

または以下のようにバイト配列data に値を取り出すことができます。

byteBuffer.get(data);

4バイトのデータが取り出せたとすると以下のようになります。

f:id:Naotsugu:20200927193158p:plain

7バイトの値が格納され、そのうちの4バイトが取り出された状態になります。

既に取り出されたデータを切り詰める場合は以下のようにします。

byteBuffer.compact();

取り出されていないデータが先頭に移動して、以下のようになります。

f:id:Naotsugu:20200927193221p:plain

この状態でバッファに値を追記していくことができるようになります。

バッファの内容をクリアするには以下のようにします。

byteBuffer.clear();

バッファの状態は論理的に初期化された状態となります。

f:id:Naotsugu:20200927193103p:plain

このように、バッファは書き込みモードと読み込みモードを切り替えながら部分的な書き込みと読み込みを少ないメモリ領域で、不要なコピー操作無く処理できるようになっています。その代償として扱いにくいAPIとなっています。


I/O多重化を使ったサーバ実装

NIO によるサーバ実装の例を見てみましょう。先程の例と同様に、単純に送信内容をそのまま返却するエコーサーバを考えます。

セレクト結果を統一的に扱うためのハンドラインターフェースを以下のように定義しておきます。

public interface Handler {
    void handle(SelectionKey key) throws IOException;
}

ハンドラを SelectionKey のアタッチメントとして紐付け、各イベントのタイミングで取り出してコールすることにします。

セレクト処理のループは以下のようになります。

serverSocketChannel.socket().bind(new InetSocketAddress(port));
Selector selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT, new AcceptHandler());
while (selector.keys().size() > 0) {
    int count = selector.select(10 * 1000);
    Set<SelectionKey> selectedKeys = selector.selectedKeys();
    synchronized (selectedKeys) {
        Iterator<SelectionKey> iterator = selectedKeys.iterator();
        while (iterator.hasNext()) {
            SelectionKey key = iterator.next();
            iterator.remove();
            if (!key.isValid()) {
                continue;
            }
            Handler handler = (Handler) key.attachment();
            handler.handle(key);
        }
    }
}

セレクトされた結果から Handler handler = (Handler) key.attachment(); のようにハンドラを取り出して実行するようにしています。

全体としては以下のようになります。

public class NioEchoServer {

    private static final int BUFFER_SIZE = 128 * 1024;
    private final ServerSocketChannel serverSocketChannel;

    public NioEchoServer() {
        try {
            this.serverSocketChannel = ServerSocketChannel.open();
            this.serverSocketChannel.configureBlocking(false);
            this.serverSocketChannel.setOption(StandardSocketOptions.SO_RCVBUF, BUFFER_SIZE);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public void start(int port) {
        try {
            serverSocketChannel.socket().bind(new InetSocketAddress(port));
            Selector selector = Selector.open();
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT, new AcceptHandler());
            while (selector.keys().size() > 0) {
                int count = selector.select(10 * 1000);
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                synchronized (selectedKeys) {
                    Iterator<SelectionKey> iterator = selectedKeys.iterator();
                    while (iterator.hasNext()) {
                        SelectionKey key = iterator.next();
                        iterator.remove();
                        if (!key.isValid()) {
                            continue;
                        }
                        Handler handler = (Handler) key.attachment();
                        handler.handle(key);
                    }
                }
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    interface Handler {
        void handle(SelectionKey key) throws IOException;
    }

    class AcceptHandler implements Handler {

        @Override
        public void handle(SelectionKey key) throws IOException {
            if (!key.isAcceptable()) {
                return;
            }
            ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
            SocketChannel socketChannel = serverSocketChannel.accept();
            if (socketChannel == null) {
                return;
            }
            System.out.println("connection accepted: " + socketChannel);
            socketChannel.setOption(StandardSocketOptions.SO_SNDBUF, BUFFER_SIZE);
            socketChannel.configureBlocking(false);
            socketChannel.register(key.selector(),
                    SelectionKey.OP_READ,
                    new RequestHandler());
        }
    }

    class RequestHandler implements Handler {
        private final ByteBuffer buffer;

        public RequestHandler() {
            this.buffer = ByteBuffer.allocate(BUFFER_SIZE);
        }

        public RequestHandler(ByteBuffer buffer) {
            this.buffer = buffer;
        }

        @Override
        public void handle(SelectionKey key) throws IOException {
            if (!key.isReadable()) {
                return;
            }
            SocketChannel socketChannel = (SocketChannel) key.channel();
            if (socketChannel == null) {
                return;
            }
            int count = socketChannel.read(buffer);
            if (count == -1) {
                socketChannel.close();
            }
            if (count > 0) {
                socketChannel.register(key.selector(),
                        SelectionKey.OP_WRITE,
                        new ResponseHandler(buffer));
            }
        }
    }


    class ResponseHandler implements Handler {

        private final ByteBuffer buffer;

        public ResponseHandler(ByteBuffer buffer) {
            this.buffer = buffer;
        }

        @Override
        public void handle(SelectionKey key) throws IOException {
            if (!key.isWritable()) {
                return;
            }
            SocketChannel socketChannel = (SocketChannel) key.channel();
            if (socketChannel == null) {
                return;
            }

            buffer.flip();
            socketChannel.write(buffer);
            boolean hasRemaining = buffer.hasRemaining();
            buffer.compact();

            if (hasRemaining) {
                key.interestOpsOr(SelectionKey.OP_WRITE);
            } else {
                key.interestOps(SelectionKey.OP_READ);
                key.attach(new RequestHandler(buffer));
            }
        }
    }

    public static void main(String[] args) {
        new NioEchoServer().start(9000);
    }

}

AcceptHandler で接続時の処理を行い、RequestHandler を呼ぶことで送信データの読み込みを行います。 読み込んだ結果は ResponseHandler によりレスポンスとして書き込みを行っています。

ソケットを使った単純な構造と比べて、部分的な読み取りや書き込みが行われるため複雑で扱いにくいコードになっています。 さらに、上記では扱っていませんが、アイドルタイムアウトを扱いたい場合は、ブロッキングI/Oではブロック時間のタイムアウトを SO_TIMEOUT として扱うことができましたが、セレクタを使った場合はそもそもブロックしないため、チャネルに対する最後の操作時間からの経過を監視してセレクタの登録から取り除く必要があります。 また、通常はCPUのコア数に応じた適切な数のワーカースレッドで並列化させる必要があったりするため、コードはより扱いにくいものになります。

セレクタを使った場合に比べて抽象度を上げた、(多少)扱いやすいAPIが、JDK1.6 で提供された NIO2 です。


非同期チャネル

JDK 1.5 では Future が追加されました。Future は将来の計算結果を表します。計算処理を Future としてカプセル化されたタスクとして扱うことで、(別のスレッドで実行される)タスクの完了を待つ間に他の処理を進めることができます。

NIO2 では、I/O処理を Future として扱えるように API の拡張が行われています。 AsynchronousServerSocketChannelAsynchronousSocketChannel がそれです。

AsynchronousServerSocketChannel は以下のように扱います。

AsynchronousServerSocketChannel serverChannel = AsynchronousServerSocketChannel.open();
asyncServerSocketChannel.bind(new InetSocketAddress(9000));
Future<AsynchronousSocketChannel> acceptResult = serverChannel.accept();

serverChannel.accept() の結果として Future が即座に返るので、現在のスレッドではその他の処理を継続できます。 任意のタイミングで acceptResult.get() とすることで AsynchronousSocketChannel を得ることができます。しかし acceptResult.get() は処理が完了していなければブロックするため注意してください。


AsynchronousSocketChannel は以下のように扱います。

AsynchronousSocketChannel asyncSocketChannel = acceptResult.get();

ByteBuffer buffer = ByteBuffer.allocate(1024);
Future<Integer> readResult = asyncSocketChannel.read(buffer);

読み込みの結果もFuture が即座に返ります。write についても同様です。読み込みや書き込みは現在とは別のスレッドで処理が行われ、必要なタイミングで結果を得ることができます。


Future 自体は(CompletableFuture と違い) 計算の結果を次の計算に引き渡すといった Promise チェーンのような扱いができないため、accept -> read -> write のような順番でブロックなく非同期実行したい場合は、CompletionHandler を使ったコールバックを使うのが簡単です。

Future を返すメソッドの他に、CompletionHandler を受け取るメソッドがあります。 AsynchronousServerSocketChannel の場合は以下のようになります。

asyncServerSocketChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
    @Override
    public void completed(AsynchronousSocketChannel asyncSocketChannel, Void attachment) { }
    @Override
    public void failed(Throwable e, Void attachment) { }
});

asyncServerSocketChannel.accept() の第1引数には、セレクタの時に見たアタッチメントとして任意オブジェクトを添付できます。コールバックの結果として completed() の第2引数に渡ってきます。

completed() では接続完了後の処理を書き、failed() では失敗時の処理を記載します。

AsynchronousSocketChannel の read と write にも同じようにコールバックを受け取るメソッドが定義されています。

asyncSocketChannel.read(buffer, attachment, new CompletionHandler<>() {
    @Override
    public void completed(Integer result, Object attachment) { }
    @Override
    public void failed(Throwable e, Object attachment) { }
});

第1引数には 読み込みや書き込みで使用する ByteBuffer、第2引数にはアタッチメントとして任意オブジェクトを添付できます。

コールバックを指定する際には、(NIO では簡単に対応することが難しかった)タイミアウトも指定することができます。

asyncSocketChannel.read(buffer, 10, TimeUnit.SECONDS, attachment,
        new CompletionHandler<>() {
            // ...
        }
);


AsynchronousServerSocketChannel 作成時に、ワーカースレッドのスレッドプールを指定することもできます。

AsynchronousChannelGroup group = AsynchronousChannelGroup
        .withThreadPool(Executors.newCachedThreadPool());
AsynchronousServerSocketChannel serverSocketChannel =
        AsynchronousServerSocketChannel.open(group);

NIO の時代には自前で行う必要のあった操作がAPIとして提供され、セレクタなどの低レベルの操作が隠蔽されているため、多少扱いやすくなっています。


NIO2 によるエコーサーバ

最後に、今まで見てきたエコーサーバを NIO2 で実装してみましょう。

CompletionHandler を使って素直に実装した場合は、非同期の実装で良くあるコールバック地獄になります。

ここでは、アタッチメントとしてチャネルに添付する Context を定義します。

class Context {
    final AsynchronousSocketChannel asyncSocketChannel;
    final ByteBuffer buffer;
    public Context(AsynchronousSocketChannel asyncSocketChannel, ByteBuffer buffer) {
        this.asyncSocketChannel = asyncSocketChannel;
        this.buffer = buffer;
    }
    public void close() {
        try {
            asyncSocketChannel.close();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

コールバック指定時にこの Context を同時にアタッチすることにします。


接続を受け付ける acceptHandler は以下の実装になります。

    private final CompletionHandler<AsynchronousSocketChannel, Void> acceptHandler = new CompletionHandler<>() {

        @Override
        public void completed(AsynchronousSocketChannel asyncSocketChannel, Void attachment) {

            // accept the next connection
            asyncServerSocketChannel.accept(null, this);

            // handle this connection
            Context ctx = new Context(asyncSocketChannel, ByteBuffer.allocate(1024));
            asyncSocketChannel.read(ctx.buffer, ctx, requestHandler);
        }

        @Override
        public void failed(Throwable e, Void attachment) {
            e.printStackTrace();
        }
    };

Context をインスタンス化して asyncSocketChannel.read() のコールバック requestHandler を登録しています。

requestHandler は以下のようになります。

    private final CompletionHandler<Integer, Context> requestHandler = new CompletionHandler<>() {
        @Override
        public void completed(Integer result, Context ctx) {
            if (result == -1) {
                ctx.close();
                return;
            }
            System.out.println("read " + result + " bytes.");
            ctx.buffer.flip();
            ctx.asyncSocketChannel.write(ctx.buffer, ctx, responseHandler);
        }

        @Override
        public void failed(Throwable e, Context ctx) {
            e.printStackTrace();
            ctx.close();
        }
    };

Context のバッファに読み込み、responseHandler を登録しているだけです。

responseHandler は以下のようになります。

    private final CompletionHandler<Integer, Context> responseHandler = new CompletionHandler<>() {
        @Override
        public void completed(Integer result, Context ctx) {
            System.out.println("write " + result + " bytes.");
            boolean hasRemaining = ctx.buffer.hasRemaining();
            ctx.buffer.compact();
            if (hasRemaining) {
                ctx.asyncSocketChannel.write(ctx.buffer, ctx, responseHandler);
            } else {
                ctx.asyncSocketChannel.read(ctx.buffer, ctx, requestHandler);
            }
        }

        @Override
        public void failed(Throwable e, Context ctx) {
            e.printStackTrace();
            ctx.close();
        }
    };

サーバを以下のように開始すれば完了です。

    public void start(int port) {
        try {
            asyncServerSocketChannel.bind(new InetSocketAddress(port));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        asyncServerSocketChannel.accept(null, acceptHandler);
    }

注意点としては、asyncServerSocketChannel は別スレッドで非同期に実行されるため、現在のスレッドが終了しないようにする必要があります。

全体としては以下のような実装になります。

public class Nio2EchoServer {

    private final AsynchronousServerSocketChannel asyncServerSocketChannel;

    public Nio2EchoServer() {
        try {
            this.asyncServerSocketChannel = AsynchronousServerSocketChannel.open();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public void start(int port) {
        try {
            asyncServerSocketChannel.bind(new InetSocketAddress(port));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        asyncServerSocketChannel.accept(null, acceptHandler);
    }

    private final CompletionHandler<AsynchronousSocketChannel, Void> acceptHandler = new CompletionHandler<>() {

        @Override
        public void completed(AsynchronousSocketChannel asyncSocketChannel, Void attachment) {

            // accept the next connection
            asyncServerSocketChannel.accept(null, this);

            // handle this connection
            Context ctx = new Context(asyncSocketChannel, ByteBuffer.allocate(1024));
            asyncSocketChannel.read(ctx.buffer, ctx, requestHandler);
        }

        @Override
        public void failed(Throwable e, Void attachment) {
            e.printStackTrace();
        }
    };

    private final CompletionHandler<Integer, Context> requestHandler = new CompletionHandler<>() {
        @Override
        public void completed(Integer result, Context ctx) {
            if (result == -1) {
                ctx.close();
                return;
            }
            System.out.println("read " + result + " bytes.");
            ctx.buffer.flip();
            ctx.asyncSocketChannel.write(ctx.buffer, ctx, responseHandler);
        }

        @Override
        public void failed(Throwable e, Context ctx) {
            e.printStackTrace();
            ctx.close();
        }
    };

    private final CompletionHandler<Integer, Context> responseHandler = new CompletionHandler<>() {
        @Override
        public void completed(Integer result, Context ctx) {
            System.out.println("write " + result + " bytes.");
            boolean hasRemaining = ctx.buffer.hasRemaining();
            ctx.buffer.compact();
            if (hasRemaining) {
                ctx.asyncSocketChannel.write(ctx.buffer, ctx, responseHandler);
            } else {
                ctx.asyncSocketChannel.read(ctx.buffer, ctx, requestHandler);
            }
        }

        @Override
        public void failed(Throwable e, Context ctx) {
            e.printStackTrace();
            ctx.close();
        }
    };

    class Context {
        final AsynchronousSocketChannel asyncSocketChannel;
        final ByteBuffer buffer;
        public Context(AsynchronousSocketChannel asyncSocketChannel, ByteBuffer buffer) {
            this.asyncSocketChannel = asyncSocketChannel;
            this.buffer = buffer;
        }
        public void close() {
            try {
                asyncSocketChannel.close();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

    public static void main(String[] args) {
        new Nio2EchoServer().start(9000);
        try (Scanner scanner = new Scanner(System.in)) {
            scanner.nextLine();
        }
    }

}


まとめ

Java によるネットワークプログラミングについて、OIO によるブロッキングI/Oの実装、NIO のノンブロッキングI/OとI/O多重化の実装、NIO2 の非同期チャネルによる実装を見てきました。

OIO では全てがシンプルでしたが、ノンブロッキングな並列化によるパフォーマンス向上と引き換えに、単純なエコーサーバでさえ実装は複雑になりました。ですので通常は Netty などのライブラリを利用するのが得策です。

NIO や NIO2 のような実装が有用なケースは単純な静的コンテンツを扱うWebサーバやプロキシサーバやチャットサーバといったケースになるでしょう。

通常のWebアプリケーションでは、データベースアクセスがブロックしてしまえばメリットが活かせなくなります。何か1つでもブロックする同期箇所があれば、そこに引きずられてしまうため、全てをノンブロッキングに並列化する必要があります。ですので、Asynchronous Database Access API であったり Reactive Relational Database Connectivity (R2DBC) などデータベース周りの対応も合わせて導入しないと大きなメリットは得られない点は覚えておく必要があります。