JDK 22 でプレビュー公開(https://openjdk.org/jeps/461)、JDK 23 でセカンドプレビュー公開(https://openjdk.org/jeps/473) となる Stream Gatherers を学び、正式公開に備えましょう。
はじめに
Stream Gatherers とは、Stream パイプラインの中間操作に対する拡張ポイントを提供するものです。
Stream パイプラインは以下のように構築されます。
long numberOfWords = Stream.of("the", "", "fox", "jumps", "over", "the", "", "dog") // (1) .filter(Predicate.not(String::isEmpty)) // (2) .collect(Collectors.counting()); // (3)
(1)
ストリームを作成。評価は行わない(2)
中間操作を設定。評価は行わない(3)
終端操作を設定しパイプライン全体を評価
Stream Gatherers は、この中間操作に対して適用するものです。 終端操作に Collectors があるのと同じように、中間操作に Gatherers があります。
旧来の中間操作では、ストリームの要素1つ1つに対してしか操作ができませんでしたが Gatherers を使うことでストリームの要素に対する集約操作が可能になります(1対1、1対多、多対1、多対多に要素を変換できます)。
例えば、Gatherers.windowFixed()
では、要素を指定個数にグルーピングする中間操作が定義できます。
List<List<Integer>> windows = Stream.of(1, 2, 3, 4, 5, 6, 7, 8) .gather(Gatherers.windowFixed(3)) .toList(); // [[1, 2, 3], [4, 5, 6], [7, 8]]
Gatherer
Stream
インターフェースに Gatherer
を引数に取る gather()
が追加されます。
public interface Stream<T> extends BaseStream<T, Stream<T>> { default <R> Stream<R> gather(Gatherer<? super T, ?, R> gatherer) { return StreamSupport.stream(spliterator(), isParallel()) .gather(gatherer) .onClose(this::close); }
Gatherer
は以下のようなインターフェース定義となっています。
public interface Gatherer<T, A, R> { default Supplier<A> initializer() { return defaultInitializer(); }; Integrator<A, T, R> integrator(); default BinaryOperator<A> combiner() { return defaultCombiner(); } default BiConsumer<A, Downstream<? super R>> finisher() { return defaultFinisher(); } // ... }
initializer()
ストリーム要素の処理中にプライベート状態を維持するオブジェクトintegrator()
入力ストリームから新しい要素を統合(場合によってはプライベート状態オブジェクトを検査)(場合によっては要素を出力ストリームに出力)combiner()
入力ストリームが並列で評価された場合の各並列値の結合関数finisher()
消費する入力要素がなくなったときに呼び出されるフィニッシャー(この関数はプライベート状態オブジェクトを検査し、場合によっては追加の出力要素を出力できる)
Gatherer
は以下のように積み重ねることもできますし、
source.gather(a).gather(b).gather(c).collect(...)
andThen()
により結合することもできます。
source.gather(a.andThen(b).andThen(c)).collect(...)
組み込み Gatherer
collect
に対する Collectors
と同様に、gather
に対する Gatherers
が提供されています。
Gatherers.windowFixed
入力要素を指定されたサイズのリストにグループ化し、ウィンドウがいっぱいになったときに下流にウィンドウを発行するステートフルな多対多ギャザラーです。
List<List<Integer>> windows = Stream.of(1, 2, 3, 4, 5, 6, 7, 8) .gather(Gatherers.windowFixed(3)) .toList(); // [[1, 2, 3], [4, 5, 6], [7, 8]]
Gatherers.windowSliding
入力要素を指定されたサイズのリストにグループ化するステートフルな多対多のギャザラーです。 最初のウィンドウの後、後続の各ウィンドウは、最初の要素を削除し、入力ストリームから次の要素を追加することによって、前のウィンドウのコピーから作成されます。
List<List<Integer>> windows = Stream.of(1, 2, 3, 4, 5, 6, 7, 8) .gather(Gatherers.windowSliding(2)) .toList(); // [[1, 2], [2, 3], [3, 4], [4, 5], [5, 6], [6, 7], [7, 8]]
List<List<Integer>> windows = Stream.of(1, 2, 3, 4, 5, 6, 7, 8) .gather(Gatherers.windowSliding(6)) .toList(); // [[1, 2, 3, 4, 5, 6], [2, 3, 4, 5, 6, 7], [3, 4, 5, 6, 7, 8]]
Gatherers.fold
集約を段階的に構築し、入力要素が存在しなくなったときにその集約を発行するステートフルな多対 1 ギャザラーです。
Optional<String> numberString = Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9) .gather( Gatherers.fold(() -> "", (string, number) -> string + number) ) .findFirst(); // Optional["123456789"]
第一引数には初期値を提供するサプライヤーを指定します。
Gatherers.scan
指定された関数を現在の状態と現在の要素に適用して次の要素を生成し、それを下流に渡すステートフルな1対1 ギャザラーです。
List<String> numberStrings = Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9) .gather( Gatherers.scan(() -> "", (string, number) -> string + number) ) .toList(); // ["1", "12", "123", "1234", "12345", "123456", "1234567", "12345678", "123456789"]
第一引数には初期値を提供するサプライヤーを指定します。
Gatherers.mapConcurrent
Virtual Thread を使用して、最大同時実行数の固定ウィンドウで操作を同時に実行するステートフルな 1 対 1 ギャザラーです(ストリームの順序が保証されます)。
List<Integer> numbers = Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9) .gather(Gatherers.mapConcurrent(4, a -> a * 2)) .toList(); // [2, 4, 6, 8, 10, 12, 14, 16, 18]
第一引数には最大並列数を指定します。
Gatherer.ofSequential
Gatherer.ofSequential()
により独自にギャザラーを定義できます。
例えば、Gatherers.windowFixed()
と同じようなギャザラーは以下のように生成できます。
static <TR> Gatherer<TR, ?, List<TR>> fixedWindow(int windowSize) { return Gatherer.ofSequential( // The initializer () -> new ArrayList<TR>(windowSize), // The integrator Gatherer.Integrator.ofGreedy((window, element, downstream) -> { window.add(element); if (window.size() < windowSize) return true; var result = new ArrayList<TR>(window); window.clear(); return downstream.push(result); }), // The finisher (window, downstream) -> { if (!downstream.isRejecting() && !window.isEmpty()) { downstream.push(new ArrayList<TR>(window)); window.clear(); } } ); }
まとめ
プレビュー公開の Stream Gatherer について紹介しました。
JDK 24 で正式公開になると思うので、キャッチアップしておきましょう。