Skip to content

ChunkUntil

1. 概要

ChunkUntil はソース Observable の要素をバッファリングし、述語(predicate)が true を返した時点でチャンクを区切って配列 T[] として発行する演算子です。述語が true を返した要素自体もチャンクに 含まれます

2. シグネチャ

値ベースの述語 — ChunkUntil(predicate)

csharp
public static Observable<T[]> ChunkUntil<T>(this Observable<T> source, Func<T, Boolean> predicate)

各要素に対して predicate を評価し、true が返された時点でバッファを発行します。true を返した要素はそのチャンクの最後の要素になります。

csharp
source.ChunkUntil(x => x % 3 == 0)

インデックス付き述語 — ChunkUntil(predicate with index)

csharp
public static Observable<T[]> ChunkUntil<T>(this Observable<T> source, Func<T, Int32, Boolean> predicate)

述語に要素のインデックス(0 始まり)が追加で渡されます。インデックスはソース全体での通し番号です(チャンクごとにリセットされません)。

csharp
source.ChunkUntil((value, index) => index % 4 == 3)

3. マーブルダイアグラム

ChunkUntil のマーブルダイアグラム

述語が true を返した要素でチャンクが区切られます。true を返した要素はそのチャンクの末尾に含まれます。

4. サンプルコード

値に基づくチャンク区切り

csharp
// 改行文字でチャンクを区切る(行単位のバッファリング)
charObservable
    .ChunkUntil(c => c == '\n')
    .Subscribe(line =>
    {
        Console.WriteLine(new string(line));
    });

条件に基づくバッチ処理

csharp
// 区切り記号(0)が来るまでバッファリング
new[] { 1, 2, 3, 0, 4, 5, 0, 6 }.ToObservable()
    .ChunkUntil(x => x == 0)
    .Subscribe(chunk =>
    {
        Console.WriteLine($"[{string.Join(", ", chunk)}]");
    });

// 出力:
// [1, 2, 3, 0]
// [4, 5, 0]
// [6]          ← ソース完了時の残余

インデックス付き述語

csharp
// 4 要素ごとにチャンクを区切る(インデックスベース)
source
    .ChunkUntil((value, index) => index % 4 == 3)
    .Subscribe(chunk =>
    {
        Console.WriteLine($"Chunk: [{string.Join(", ", chunk)}]");
    });

5. 補足

  • Chunk(count) が固定サイズでバッファを区切るのに対し、ChunkUntil は要素の値に基づいて動的にバッファを区切ります。
  • 述語が true を返した要素はチャンクに 含まれる 点に注意してください。「その要素から次のチャンクを開始したい」場合は、述語のロジックを調整する必要があります。
  • Chunk(windowBoundaries) が外部 Observable でチャンクを区切るのに対し、ChunkUntil はソース自身の要素で区切りを判断します。