RxJava は ReactiveX (Reactive Extensions) の JVM 実装で、平たく言うと非同期のイベント処理を簡素に書ける、単なるライブラリです。
"Rx = Observables + LINQ + Schedulers" などと表現されます。なんですかこれ?大げさな抽象概念を掲げすぎて意味不明になる現象(モナド現象)になっていませんか?
Observable とは
Observable は発生順に並んだイベントのストリームです。
Java8 では、何らかの型Tの並びを Stream として表現し、この並びに対して関数的な操作を適用する API が用意されています。例えば Stream.map()
は関数 Function
を引数に取り、関数で変換された新しい Stream
を返します。戻り値として Stream
が得られるので、同じように操作を連結して定義していくことができます。
ReactiveX の Observable は Java で言うところの Stream です。違いは2点。
1点目:Observable は Push でイベントを通知する
Stream が Stream.collect()
などの終端処理でデータの並びに対してPull 形式で処理を行うのに対して、Observable はイベントの発生時に Push 形式でイベントを通知します。
発生したイベントを通知するには、通知のためのインターフェースが必要で、これが以下の Observer インターフェースです(観察と考えるより、単に通知と考えた方がわかりやすいです)。
public interface Observer<T> { public abstract void onNext(T t); public abstract void onCompleted(); public abstract void onError(Throwable e); }
Observable はこのインターフェースを使ってイベントの値、完了とエラーの通知を行います。
通知するイベントは、Java の Stream と同じように map したり filter したりして、最終的にイベントの通知先に届きます。イベントの通知の受信側は Observable.subscribe()
の引数として通知を受けた時の振る舞いを登録します。
2点目:Observable は 非同期のイベントを扱う
Observable は Java8 でいうところの CompletableFuture みたいなものです。
Java では非同期処理の結果を得る場合に、Future を使ってきました(js や Scala では Promise)。 Future を使うことで、時間のかかる計算処理が終了するまで画面描画がブロックするなどに対処することができました。Java8 では Future に対するサポートが強化された CompletableFuture が導入され、 Future を合成したり、CompletableFuture の合成パイプラインを作成することができます。
脱線しました。
Observable では非同期でイベントを通知(あるいは購読)したり、非同期でイベントをリスンしている Observable 同士を合成したりすることができます。
例えばイベントのリスンを非同期で行うには以下のように Schedulers
を利用します。
これにより observable は スレッドScheduler (別スレッド)で動作するようになります。
observable.subscribeOn(Schedulers.newThread())
Schedulers には各種スレッドの生成戦略が定義されており、この戦略の実態である Scheduler を Observable に渡すことでスレッド処理をカスタマイズできます。
異なるスレッド上の複数の observable から通知されたイベントを合成して、新しい observable を作成するなどが Observable のパイプラインとして定義できるのです。
ここまでをまとめると
- Observable は発生順に並んだイベントのストリーム(Observables)
- Observable には関数的な操作を適用する API が用意されている(LINQ)
- Scheduler により Observable のスレッド(非同期処理)を制御できる(Schedulers)
ということで、"Rx = Observables + LINQ + Schedulers" となります。
まずは簡単に どのように動くかを見てみる
簡単な例で RxJava を触ってみましょう。Java8 でストリームを扱うには Stream を生成しないと始まらないように、RxJava でも Observable を定義しないと何も始まりません。
まずは簡単に one, two, ・・ という文字を通知する Observable を生成してみます。
Observable<String> observable = Observable.create(observer -> { observer.onNext("one"); observer.onNext("two"); observer.onNext("three"); observer.onNext("four"); observer.onNext("five"); });
通常はクリックイベントに応じて、そのイベント値を通知するなどになると思いますが、ここでは単に文字列値を連続で通知しているだけです。
次にイベントの発生を通知してほしい人が Observer で、イベントが通知された場合に何を行うかを定義します。
Observer<String> observer = new Observer<String>() { @Override public void onNext(String o) { System.out.println(o); } @Override public void onCompleted() {} @Override public void onError(Throwable e) {} };
Gofのオブザーバパターンでは Observer は update() などのメソッドでイベントの発生通知を受け付けますが、少し拡張されていて onNext()
onCompleted()
onError()
という3つのメソッドがインターフェースに定義されています。
今回は onNext()
で通知された文字列をコンソール出力しているだけです。
最後に単なる文字列を通知するだけの observable へ、通知を受け取る observer を登録します。
observable.subscribe(observer);
subscribe メソッドにて observer を登録することで通知を受け取ります。この時コンソールには以下の出力が得られます。
one two three four five
イベントのストリームは関数型によくある map や filter などの十徳ナイフでイベントの加工やフィルタリングが行えます。
observable.map(String::toUpperCase) .filter(s -> s.contains("O")) .subscribe(observer);
UpperCase して "O" を含むものだけを取り出しています。この時コンソールには以下の出力が得られます。
ONE TWO FOUR
...以下の例と全く同じでですね。
Stream.of("one","two","three","four","five") .filter(s -> s.contains("O")) .forEach(System.out::println);
Hello World
もう少し RxJava を触っていってみましょう。
まずは build.gradle を準備します。
apply plugin: 'java' sourceCompatibility = 1.8 targetCompatibility = 1.8 repositories { mavenCentral() } dependencies { compile 'io.reactivex:rxjava:1.0.9' }
簡単な Hello World
import rx.Observable; import java.util.Arrays; public class Main { public static void main(String...args) { Observable.from(Arrays.asList("Thom", "Johnny")) .subscribe(name -> System.out.println("Hello " + name + "!")); } }
Observable.from()
は Iterable を引数に Observable を生成できます。
Observable.subscribe
には Action1 という以下の関数を渡しています。
public interface Action1<T1> extends Action { public void call(T1 t1); }
単にイベント値を受け取って何かしらのアクションを実行するだけならObserver
の代わりに Action1
が使えます。
コンソールには以下が出力されます。
Hello Thom! Hello Johnny!
非同期 Observable
Observable.interval()
は別スレッドで指定時間のインターバル毎に通知する Observable を生成します。buffer()
は指定時間内に通知されたイベントを束ねてリストにまとめ上げます。
以下の例を試してみましょう。
public class Main { public static void main(String...args) throws Exception { Observable.interval(1L, TimeUnit.SECONDS, Schedulers.computation()) .buffer(3L, TimeUnit.SECONDS) .map(list -> Arrays.deepToString(list.toArray())) .subscribe(s -> System.out.println( Thread.currentThread().getName() + " " + s)); Thread.sleep(10000L); } }
実行すると以下のようにコンソール出力されます。
RxComputationThreadPool-1 [0, 1] RxComputationThreadPool-1 [2, 3, 4] RxComputationThreadPool-1 [5, 6, 7]
interval()
により1秒毎に 0, 1, 2 ・・・ と数字が通知されます。buffer()
では3秒分のイベントをバッファして、束ねています。最終的にスレッド名とともに3秒毎に束ねたイベントがコンソール出力されます。
JavaFXのUIイベントを扱う
手っ取り早く試せる JavaFX のイベントを RxJava で扱ってみます。あくまでサンプルです。
最初に build.gradle に JavaFX プラグインを入れておきます。 前述のものに1行目を加えただけです。
apply from: 'http://dl.bintray.com/content/shemnon/javafx-gradle/0.4.0/javafx.plugin' apply plugin: 'java' sourceCompatibility = 1.8 targetCompatibility = 1.8 repositories { mavenCentral() } dependencies { compile 'io.reactivex:rxjava:1.0.9' }
手頃な例として、google の検索ワードのサジェストAPIを呼ぶメソッドを作っておきます。
private List<String> suggests(final String text) { try { URL url = new URL("http://google.co.jp/complete/search?output=toolbar&q=" + text); HttpURLConnection connection = (HttpURLConnection) url.openConnection(); connection.setRequestMethod("GET"); DocumentBuilder builder = DocumentBuilderFactory.newInstance().newDocumentBuilder(); Document document = builder.parse(connection.getInputStream()); NodeList nodeList = document.getElementsByTagName("CompleteSuggestion"); return IntStream.range(0, nodeList.getLength()) .mapToObj(nodeList::item) .map(node -> (Element) node.getFirstChild()) .map(elm -> elm.getAttribute("data")) .collect(Collectors.toList()); } catch (Exception ignore) {} return Collections.emptyList(); }
引数の文字列でAPIを呼んで、サジェスト一覧をリストで返すメソッドになります。
テキストボックス並べて入力内容に応じて先のサジェスト一覧をリスト表示します。
public class Main extends Application { @Override public void start(Stage primaryStage) throws Exception { primaryStage.setTitle("FxApp"); TextField textField = new TextField(); ListView<String> listView = new ListView<>(); listView.setMaxHeight(200); Observable<String> stream = Observable.create(observer -> textField.textProperty().addListener((observable, oldValue, newValue) -> observer.onNext(newValue))); Action1<String> action = t -> { List<String> su = suggests(t); Platform.runLater(() -> { listView.getItems().clear(); listView.getItems().addAll(FXCollections.observableList(su)); }); }; stream.observeOn(Schedulers.newThread()) .skip(300, TimeUnit.MICROSECONDS) .subscribe(action); primaryStage.setScene(new Scene(new VBox(textField, listView), 300, 250)); primaryStage.show(); } public static void main(String[] args) { launch(args); } }
Observable
はテキストフィールドに変更があったタイミングで、変更後のテキストを通知します。
テキスト値の通知を受けた場合に ListView にサジェスト結果を反映する Action1
を定義します。なお、JavaFX の UI はFxスレッド上で変更する必要があるため、リストの更新処理はPlatform.runLater
で行います。
最後にこれらを繋ぐのが以下のコードです。
stream.observeOn(Schedulers.newThread())
.skip(300, TimeUnit.MICROSECONDS)
.subscribe(action);
キー入力の度にサジェストAPIを呼ぶのではなく、300ms 待機するため skip 処理を入れています。
observeOn() にて観察処理、つまり Action1
の処理は別スレッドで非同期実行されます。
gradlew run
で実行すると以下のようになります。
文字を入力するとイベントが発生し、サジェスト結果が反映されます。
なお、JavaFX で Rx 使う場合には以下のようなポーティングを使った方がよいです。
以上簡単でしたが、なんとなく Rx のイメージが伝わったでしょうか。。