JDK 24 で追加された Stream Gatherers 用ユーティリティライブラリ Gatherers4j


Stream Gatherers

JDK 24 で追加された JEP 485: Stream Gatherersは、Stream の中間操作をカスタマイズできます。

JDK で提供される Gatherers には windowFixed, windowSliding, fold, scan, mapConcurrent があり、例えば windowFixed は以下のように使うことができます。

List<List<Integer>> windows = Stream.of(1, 2, 3, 4, 5, 6, 7, 8)
        .gather(Gatherers.windowFixed(3))
        .toList();
// [[1, 2, 3], [4, 5, 6], [7, 8]]

詳細は以下を参照してください。

https://blog1.mammb.com/entry/2024/04/16/203058


Gatherers4j とは

Gatherers4j は、Stream Gatherers 用の様々な中間操作を提供するライブラリです。

以下の依存で導入できます。

implementation("com.ginsberg:gatherers4j:0.11.0")

中間操作は、Gatherers4j クラスのスタティックメソッドで提供されます。

Stream.of("A", "B", "C", "D", "E")
    .gather(Gatherers4j.takeLast(3))
    .toList(); // [ "C", "D", "E" ]

ライブラリを導入しなくても、Stream Gatherers でどんなことが出来るかを見ておくことには意味があると思うので、ここに紹介しておきます。


シーケンス操作

結合操作

いわゆる zip 操作は、zipWith() で提供されています。 自身のストリームの隣接要素間で zip するには zipWithNext() が使えます。

Stream.of("A", "B", "C")
    .gather(Gatherers4j.zipWith(List.of(1, 2, 3)))
    .toList();
//  [ Pair("A", 1), Pair("B", 2), Pair("C", 3) ]
Stream.of("A", "B", "C", "D")
    .gather(Gatherers4j.zipWithNext())
    .toList();
// [ ["A", "B"], ["B", "C"], ["C", "D"] ]

interleaveWith() は他のストリームなどから交互に要素を取得します。

final Stream<String> left = Stream.of("A", "B", "C");
final Stream<String> right = Stream.of("D", "E", "F", "G", "H");

left.gather(Gatherers4j.interleaveWith(right).appendLonger())
    .toList();
// "A", "D", "B", "E", "C", "F", "G", "H"

crossWith() では直積を取得できます。

Stream.of("A", "B", "C")
    .gather(Gatherers4j.crossWith(List.of(1, 2, 3)))
    .toList();
// [
//    Pair("A", 1), Pair("A", 2), Pair("A", 3),
//    Pair("B", 1), Pair("B", 2), Pair("B", 3),
//    Pair("C", 1), Pair("C", 2), Pair("C", 3)
// ]

インデックス付与

withIndex() ではインデックスを付与したストリームを得ることができます。

Stream.of("A", "B", "C")
    .gather(Gatherers4j.withIndex())
    .toList();
// [ WithIndex(0, "A"), WithIndex(1, "B"), WithIndex(2, "C") ]

mapIndexed() インデックス付きのマップ

Stream.of("A", "B", "C", "D")
    .gather(
        Gatherers4j.mapIndexed(
            (index, element) -> element + index
        )
     )
     .toList();
// [A0 B1 C2 D3]

foldIndexed() インデックス付きの畳み込み

Stream.of("A", "B", "C", "D")
    .gather(
        Gatherers4j.foldIndexed(
            () -> "", // initialValue
            (index, carry, next) -> carry + String.format("%s%d", next, index)
        )
     )
     .forEach(System.out::println);
// A0B1C2D3

peekIndexed() インデックス値確認

Stream.of("A", "B", "C", "D")
    .gather(
        Gatherers4j.peekIndexed(
            (index, element) -> System.out.println("Element " + element + " at index " + index)
        )
     )
     .toList();
// Returns: [A B C D]
// Prints:
//   Element A at index 0
//   Element B at index 1
//   Element C at index 2
//   Element D at index 3

scanIndexed() インデックス付きスキャン

Stream.of("A", "B", "C")
    .gather(
        Gatherers4j.scanIndexed(
            () -> "",
            (index, carry, next) -> carry + next + index
        )
    )
    .toList();
// [ "A0", "A0B1", "A0B1C2" ]

並び替え

各種並び替えの中間操作が提供されています。 ただし、並び替えは全要素を扱うため、要素を順次処理するというストリームの思想から外れるため、中間操作として行うのはやめておいた方が良いと思います。

reverse() 逆順

Stream.of("A", "B", "C")
    .gather(Gatherers4j.reverse())
    .toList();
// [ "C", "B", "A" ]

shuffle() 要素をシャッフル

Stream.of("A", "B", "C", "D", "E");
    .gather(Gatherers4j.shuffle())
    .toList();
// [ "D", "A", "B", "C", "E" ]

rotate() ローテート

Stream.of("A", "B", "C", "D", "E")
    .gather(Gatherers4j.rotate(Rotate.Left, 2))
    .toList();
// ["C", "D", "E", "A", "B"]

繰り返し

repeat() で指定回数繰り返し、repeatInfinitely() では無限繰り返しが可能です。

Stream.of("A", "B", "C")
    .gather(Gatherers4j.repeat(3))
    .toList();
// [ "A", "B", "C", "A", "B", "C", "A", "B", "C" ]

Stream.of("A", "B", "C")
    .gather(Gatherers4j.repeatInfinitely())
    .toList();
// [ "A", "B", "C", "A", "B", "C", "A", "B", "C" ...]

スロットル

throttle() はストリーム要素数を期間ごとに制限します。

以下は、100ミリ秒の間に2つの要素を通過させます。 次要素の取得はブロックされます。次要素の取得を切り捨てる場合は、後述の debounce() を使います。

IntStream.range(1, 7).boxed()
    .gather(Gatherers4j.throttle(2, Duration.ofMillis(100)))
    ...


フィルタリング/選択

debounce

debounce() 指定時間内に通過する要素を制限

以下の例では、50ミリ秒で2要素のみ通過させ、他は切り捨てます。

long start = System.currentTimeMillis();

IntStream
    .range(1, 10_000_000)
    .boxed()
    .gather(Gatherers4j.debounce(2, Duration.ofMillis(50)))
    .map(it -> new Pair<>(it, System.currentTimeMillis() - start))
    .forEach(System.out::println);
// Pair[first=1, second=12]
// Pair[first=2, second=15]
// Pair[first=8915384, second=62]
// Pair[first=8915385, second=62]

重複削除

distinctBy() 重複削除

Stream.of(
        new Person("Todd", "Ginsberg"),
        new Person("Emma", "Ginsberg"),
        new Person("Todd", "Smith")
    )
    .gather(Gatherers4j.distinctBy(Person::firstName))
    .toList();
// [Person("Todd", "Ginsberg"), Person("Emma", "Ginsberg")]

dedupeConsecutive() 連続する重複削除

Stream.of("A", "A", "A", "B", "B", "C", "C", "D", "A", "B", "C")
    .gather(Gatherers4j.dedupeConsecutive())
    .toList();
// ["A", "B", "C", "D", "A", "B", "C"]

dedupeConsecutiveBy() 評価関数を指定して連続する重複削除

record Person(String firstName, String lastName) {}

Stream.of(
        new Person("Todd", "Ginsberg"),
        new Person("Todd", "Smith"),
        new Person("Emma", "Ginsberg")
    )
    .gather(Gatherers4j.dedupeConsecutiveBy(Person::firstName))
    .toList();
// [Person("Todd", "Ginsberg"), Person("Emma", "Ginsberg")]

フィルタ

filterIndexed() インデックス付きフィルタ

Stream.of("A", "B", "C", "T")
    .gather(Gatherers4j.filterIndexed((index, element) -> index % 2 == 0 || element.equals("T")))
    .toList();
// ["A", "C", "T"]

filterInstanceOf() instanceOf フィルタ

Stream.of((byte)1, (short)2, 3, (long)4, 1.0, 1.0d)
    .gather(Gatherers4j.filterInstanceOf(Integer.class, Short.class))
    .toList();
// [2, 3]

filterOrdered() filterOrderedBy() は並び順の条件が満たされるものを通過させます。

Stream.of(1, 2, 2, 3, 2)
    .gather(Gatherers4j.filterOrdered(Order.AscendingOrEqual))
    .toList();
// [1, 2, 2, 3]

Stream.of("AAA", "AA", "AA", "AAA", "A")
    .gather(Gatherers4j.filterOrderedBy(Order.Descending, Comparator.comparingInt(String::length)))
    .toList();
// ["AAA", "AA", "A"];

takeEveryNth() n番目の要素毎に選択

Stream.of("A", "B", "C", "D", "E", "F", "G")
    .gather(Gatherers4j.takeEveryNth(3))
    .toList();
// ["A", "D", "G"]

dropEveryNth() n番目の要素毎に削除

Stream.of("A", "B", "C", "D", "E", "F", "G")
    .gather(Gatherers4j.dropEveryNth(3))
    .toList();
// ["B", "C", "E", "F"]

takeLast() 末尾のn個を選択

Stream.of("A", "B", "C", "D", "E", "F", "G")
    .gather(Gatherers4j.takeLast(3))
    .toList();
// ["E", "F", "G"]

dropLast() 末尾のn個を削除

Stream.of("A", "B", "C", "D", "E")
    .gather(Gatherers4j.dropLast(2))
    .toList();
// ["A", "B", "C"]

takeUntil() 条件を満たすまで選択

Stream.of("A", "B", "C", "D", "E", "F", "G")
    .gather(Gatherers4j.takeUntil(it -> it.equals("C")))
    .toList()
// ["A", "B", "C"]

sampleFixedSize() は、固定要素をランダムに選択します。

Stream.of("A", "B", "C", "D", "E")
    .gather(Gatherers4j.sampleFixedSize(2))
    .toList();
// Possibly: ["A", "D"]

samplePercentage() は、パーセンテージを指定してランダムに選択します。

Stream.of("A", "B", "C", "D", "E")
    .gather(Gatherers4j.samplePercentage(0.4))
    .toList();
// Possibly: ["A", "D"]
// Possibly: ["A"]
// Possibly: ["A", "D", "E"]

uniquelyOccurring() はユニークな出現を選択します。

Stream.of("A", "B", "C", "A")
    .gather(Gatherers4j.uniquelyOccurring())
    .toList();
// ["B", "C"]


Grouping と Windowing

group() は隣接する同じ要素をグルーピングします。

groupBy() では比較関数を指定できます。

Stream.of("A", "A", "BB", "BB", "CCC", "A")
    .gather(Gatherers4j.group())
    .toList();
// [ ["A", "A"], ["BB", "BB"], ["CCC"], ["A"] ]


Stream.of("A", "B", "C", "BB", "BBB", "C", "DD", "DD")
    .gather(Gatherers4j.groupBy(String::length))
    .toList();
// [ ["A", "B", "C"], ["BB"], ["BBB"], ["C"], ["DD", "DD"] ]

groupOrdered() は並び順毎にグルーピングします。

groupOrderedBy() は比較関数を指定できます。

Stream.of(3, 2, 1, 2, 2, 1)
    .gather(Gatherers4j.groupOrdered(Order.Descending))
    .toList();
// [ [3, 2, 1], [2], [2, 1] ]

Stream.of("AAA", "AA", "A", "AA", "AA", "A")
    .gather(Gatherers4j
        .groupOrderedBy(
            Order.Descending, 
            Comparator.comparingInt(String::length)
        )
    )
    .toList();
// [ ["AAA", "AA", "A"], ["AA"], ["AA", "A"] ]

window() ウインドウサイズとスライド数を指定して要素をグルーピングします。

Stream.of("A", "B", "C", "D", "E", "F", "G")
    .gather(Gatherers4j.window(2, 2, false))
    .toList();
// [ ["A", "B"], ["C", "D"], ["E", "F"] ]

Stream.of("A", "B", "C", "D", "E", "F", "G");
    .gather(Gatherers4j.window(2, 3, true))
    .toList();
// [ ["A", "B"], ["D", "E"], ["G"] ]


算術操作

算術操作を行う Gatherer には、メソッド名に runningmoving が付くものがあります。 running は要素を1つづつ累積して計算し、moving は、ウインドウサイズの範囲で計算するものになっています。

合計を行う runningSum()movingSum() の例は以下のようになります。

Stream.of("1.0", "2.0", "10.0", "2.0")
    .map(BigDecimal::new)
    .gather(Gatherers4j.runningSum())
    .toList();
// [ 
//   BigDecimal("1.0"), 
//   BigDecimal("3.0"), 
//   BigDecimal("13.0")
//   BigDecimal("15.0")
// ]
Stream.of("1.0", "2.0", "10.0", "2.0")
    .map(BigDecimal::new)
    .gather(Gatherers4j.movingSum(3))
    .toList();
// [ 
//   BigDecimal("13.0"), 
//   BigDecimal("14.0") 
// ]

また、xxxBy と付くものは、計算する値を抽出する関数を指定できるものになっています。

Stream.of(
        new NamedValue("first",  new BigDecimal("1.1")),
        new NamedValue("second", new BigDecimal("2.2")),
        new NamedValue("third",  new BigDecimal("10.3"))
    )
    .gather(Gatherers4j.runningSumBy(NamedValue::value))
    .toList();
// [ 
//   BigDecimal("1.1"), 
//   BigDecimal("3.3"),
//   BigDecimal("13.6") 
// ]

以下の関数が提供されています。

  • 合計
    • runningSum()
    • runningSumBy()
    • movingSum()
    • movingSumBy()
  • 乗算
    • runningProduct()
    • runningProductBy()
    • movingProduct()
    • movingProductBy()
  • 単純平均
    • simpleRunningAverage()
    • simpleRunningAverageBy()
    • simpleMovingAverage()
    • simpleMovingAverageBy()
  • 標準偏差
    • runningPopulationStandardDeviation()
    • runningPopulationStandardDeviationBy()
  • 標本標準偏差
    • runningSampleStandardDeviation()
    • runningSampleStandardDeviationBy()
  • 指数平滑移動平均
    • exponentialMovingAverageWithAlpha()
    • exponentialMovingAverageWithAlphaBy()
    • exponentialMovingAverageWithPeriod()
    • exponentialMovingAverageWithPeriodBy()


検証

ensureOrdered() 指定された順序になっているかを検証します(検証エラー時はIllegalStateException)

Stream.of(3, 2, 1)
    .gather(Gatherers4j.ensureOrdered(Order.Descending))
    .toList();
// [3, 2, 1]

ensureOrderedBy() 指定された順序になっているかを検証します(検証エラー時はIllegalStateException)

Stream.of("AAA", "BB", "C")
    .gather(Gatherers4j.ensureOrderedBy(Order.Descending, String::length))
    .toList();
// ["AAA", "BB", "C"]

ensureSize() 指定されたサイズになっているかを検証します(検証エラー時はIllegalStateException)

Stream.of("A", "B", "C")
    .gather(Gatherers4j.ensureSize(Size.Equals, 3))
    .toList();
// ["A", "B", "C"]