Java Stream API を高みに導くStream Gatherer

JDK 22 でプレビュー公開(https://openjdk.org/jeps/461)、JDK 23 でセカンドプレビュー公開(https://openjdk.org/jeps/473) となる Stream Gatherers を学び、正式公開に備えましょう。


はじめに

Stream Gatherers とは、Stream パイプラインの中間操作に対する拡張ポイントを提供するものです。

Stream パイプラインは以下のように構築されます。

long numberOfWords =
    Stream.of("the", "", "fox", "jumps", "over", "the", "", "dog")  // (1)
          .filter(Predicate.not(String::isEmpty))                   // (2)
          .collect(Collectors.counting());                          // (3)
  • (1) ストリームを作成。評価は行わない
  • (2) 中間操作を設定。評価は行わない
  • (3) 終端操作を設定しパイプライン全体を評価

Stream Gatherers は、この中間操作に対して適用するものです。 終端操作に Collectors があるのと同じように、中間操作に Gatherers があります。

旧来の中間操作では、ストリームの要素1つ1つに対してしか操作ができませんでしたが Gatherers を使うことでストリームの要素に対する集約操作が可能になります(1対1、1対多、多対1、多対多に要素を変換できます)。

例えば、Gatherers.windowFixed() では、要素を指定個数にグルーピングする中間操作が定義できます。

List<List<Integer>> windows = Stream.of(1, 2, 3, 4, 5, 6, 7, 8)
        .gather(Gatherers.windowFixed(3))
        .toList();
// [[1, 2, 3], [4, 5, 6], [7, 8]]


Gatherer

Stream インターフェースに Gatherer を引数に取る gather() が追加されます。

public interface Stream<T> extends BaseStream<T, Stream<T>> {

    default <R> Stream<R> gather(Gatherer<? super T, ?, R> gatherer) {
        return StreamSupport.stream(spliterator(), isParallel())
                            .gather(gatherer)
                            .onClose(this::close);
    }
    

Gatherer は以下のようなインターフェース定義となっています。

public interface Gatherer<T, A, R> {
    default Supplier<A> initializer() { return defaultInitializer(); };
    Integrator<A, T, R> integrator();
    default BinaryOperator<A> combiner() { return defaultCombiner(); }
    default BiConsumer<A, Downstream<? super R>> finisher() { return defaultFinisher(); }
    // ...
}
  • initializer() ストリーム要素の処理中にプライベート状態を維持するオブジェクト
  • integrator() 入力ストリームから新しい要素を統合(場合によってはプライベート状態オブジェクトを検査)(場合によっては要素を出力ストリームに出力)
  • combiner() 入力ストリームが並列で評価された場合の各並列値の結合関数
  • finisher() 消費する入力要素がなくなったときに呼び出されるフィニッシャー(この関数はプライベート状態オブジェクトを検査し、場合によっては追加の出力要素を出力できる)

Gatherer は以下のように積み重ねることもできますし、

source.gather(a).gather(b).gather(c).collect(...)

andThen() により結合することもできます。

source.gather(a.andThen(b).andThen(c)).collect(...)


組み込み Gatherer

collect に対する Collectors と同様に、gather に対する Gatherers が提供されています。

Gatherers.windowFixed

入力要素を指定されたサイズのリストにグループ化し、ウィンドウがいっぱいになったときに下流にウィンドウを発行するステートフルな多対多ギャザラーです。

List<List<Integer>> windows = Stream.of(1, 2, 3, 4, 5, 6, 7, 8)
        .gather(Gatherers.windowFixed(3))
        .toList(); // [[1, 2, 3], [4, 5, 6], [7, 8]]

Gatherers.windowSliding

入力要素を指定されたサイズのリストにグループ化するステートフルな多対多のギャザラーです。 最初のウィンドウの後、後続の各ウィンドウは、最初の要素を削除し、入力ストリームから次の要素を追加することによって、前のウィンドウのコピーから作成されます。

List<List<Integer>> windows = Stream.of(1, 2, 3, 4, 5, 6, 7, 8)
        .gather(Gatherers.windowSliding(2))
        .toList(); // [[1, 2], [2, 3], [3, 4], [4, 5], [5, 6], [6, 7], [7, 8]]
List<List<Integer>> windows = Stream.of(1, 2, 3, 4, 5, 6, 7, 8)
        .gather(Gatherers.windowSliding(6))
        .toList(); // [[1, 2, 3, 4, 5, 6], [2, 3, 4, 5, 6, 7], [3, 4, 5, 6, 7, 8]]

Gatherers.fold

集約を段階的に構築し、入力要素が存在しなくなったときにその集約を発行するステートフルな多対 1 ギャザラーです。

Optional<String> numberString = Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9)
        .gather(
            Gatherers.fold(() -> "", (string, number) -> string + number)
        )
        .findFirst(); // Optional["123456789"]

第一引数には初期値を提供するサプライヤーを指定します。

Gatherers.scan

指定された関数を現在の状態と現在の要素に適用して次の要素を生成し、それを下流に渡すステートフルな1対1 ギャザラーです。

List<String> numberStrings = Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9)
        .gather(
              Gatherers.scan(() -> "", (string, number) -> string + number)
        )
        .toList(); // ["1", "12", "123", "1234", "12345", "123456", "1234567", "12345678", "123456789"]

第一引数には初期値を提供するサプライヤーを指定します。

Gatherers.mapConcurrent

Virtual Thread を使用して、最大同時実行数の固定ウィンドウで操作を同時に実行するステートフルな 1 対 1 ギャザラーです(ストリームの順序が保証されます)。

List<Integer> numbers = Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9)
        .gather(Gatherers.mapConcurrent(4, a -> a * 2))
        .toList(); // [2, 4, 6, 8, 10, 12, 14, 16, 18]

第一引数には最大並列数を指定します。

Gatherer.ofSequential

Gatherer.ofSequential() により独自にギャザラーを定義できます。

例えば、Gatherers.windowFixed() と同じようなギャザラーは以下のように生成できます。

static <TR> Gatherer<TR, ?, List<TR>> fixedWindow(int windowSize) {

    return Gatherer.ofSequential(

        // The initializer
        () -> new ArrayList<TR>(windowSize),

        // The integrator
        Gatherer.Integrator.ofGreedy((window, element, downstream) -> {
            window.add(element);
            if (window.size() < windowSize) return true;
            var result = new ArrayList<TR>(window);
            window.clear();
            return downstream.push(result);

        }),

        // The finisher
        (window, downstream) -> {
            if (!downstream.isRejecting() && !window.isEmpty()) {
                downstream.push(new ArrayList<TR>(window));
                window.clear();
            }
        }
    );
}


まとめ

プレビュー公開の Stream Gatherer について紹介しました。

JDK 24 で正式公開になると思うので、キャッチアップしておきましょう。