Skip to content

Chunk

1. 概要

Chunk はソース Observable の要素をバッファリングし、配列 T[] としてまとめて発行する演算子です。バッファの区切り方に応じて多数の overload が用意されています。

  • 個数ベース — 指定した個数に達するたびに発行
  • スライディングウィンドウcountskip を指定して重複やギャップのあるウィンドウを構成
  • 時間ベース — 一定時間ごとに発行
  • 時間+個数 — 時間または個数の どちらか先 に到達した時点で発行
  • 境界 Observable — 別の Observable が発火するたびに発行
  • 非同期ウィンドウ — 非同期タスクが完了するたびに発行

いずれの overload でも、空の配列は発行されません。ソースが完了した時点でバッファに残っている要素がある場合は、その残余が発行されます。

2. シグネチャ

個数ベース — Chunk(count)

csharp
public static Observable<T[]> Chunk<T>(this Observable<T> source, Int32 count)

要素を count 個ずつまとめて配列として発行します。ソース完了時にバッファに残った要素がある場合は、count 未満でも発行されます(空配列は発行されません)。

csharp
source.Chunk(3)

Chunk(count) のマーブルダイアグラム

ソースが 1, 2, 3, 4, 5 を発行し完了した場合、[1,2,3][4,5] が発行されます。


スライディングウィンドウ — Chunk(count, skip)

csharp
public static Observable<T[]> Chunk<T>(this Observable<T> source, Int32 count, Int32 skip)

skip 個ごとにウィンドウを開始し、count 個の要素を含む配列を発行します。

  • skip < count — ウィンドウが重複(オーバーラップ)
  • skip == countChunk(count) と同じ
  • skip > count — ウィンドウ間にギャップが生じ、一部の要素がスキップされる
csharp
source.Chunk(count: 3, skip: 1)   // 2要素ずつ重複
source.Chunk(count: 3, skip: 2)   // 1要素ずつ重複
source.Chunk(count: 2, skip: 3)   // 1要素ずつスキップ

Chunk(count, skip) のマーブルダイアグラム

skip < count の場合、ウィンドウが重複し同じ要素が複数の配列に含まれます。


時間ベース — Chunk(timeSpan)

csharp
public static Observable<T[]> Chunk<T>(this Observable<T> source, TimeSpan timeSpan)
public static Observable<T[]> Chunk<T>(this Observable<T> source, TimeSpan timeSpan, TimeProvider timeProvider)

timeSpan ごとにバッファを区切り、蓄積された要素を配列として発行します。時間のカウントは購読開始時ではなく、チャンクに含まれる最初の要素が到着した時点から開始されます。区間内に要素がなかった場合、空配列は発行されません。TimeProvider を指定するとタイマーの実装を差し替えられます(テスト時に便利)。

csharp
source.Chunk(TimeSpan.FromSeconds(1))

Chunk(timeSpan) のマーブルダイアグラム

一定時間ごとにバッファが区切られ、溜まった要素がまとめて発行されます。


時間+個数 — Chunk(timeSpan, count)

csharp
public static Observable<T[]> Chunk<T>(this Observable<T> source, TimeSpan timeSpan, Int32 count)
public static Observable<T[]> Chunk<T>(this Observable<T> source, TimeSpan timeSpan, Int32 count, TimeProvider timeProvider)

時間と個数の どちらか先に到達した条件 でバッファを発行します。時間のカウントは、現在のチャンクに含まれる最初の要素が到着した時点から開始されます。高頻度のイベントに対して「最大でも N 個」「最長でも T 秒」という制約を同時に掛けたい場合に有用です。

csharp
source.Chunk(TimeSpan.FromSeconds(1), count: 5)

Chunk(timeSpan, count) のマーブルダイアグラム

時間が先に来れば時間で区切られ、個数が先に達すれば個数で区切られます。


境界 Observable — Chunk(windowBoundaries)

csharp
public static Observable<TSource[]> Chunk<TSource, TWindowBoundary>(
    this Observable<TSource> source,
    Observable<TWindowBoundary> windowBoundaries)

windowBoundaries が要素を発行するたびにバッファを区切り、蓄積された要素を配列として発行します。外部のイベント(ボタンクリック、タイマーなど)でバッファの区切りを制御できます。

csharp
source.Chunk(boundaryObservable)

Chunk(windowBoundaries) のマーブルダイアグラム

境界 Observable が発火するたびに、それまでに溜まった要素がまとめて発行されます。


非同期ウィンドウ — Chunk(asyncWindow)

csharp
public static Observable<T[]> Chunk<T>(
    this Observable<T> source,
    Func<T, CancellationToken, ValueTask> asyncWindow,
    Boolean configureAwait = true)

最初の要素が到着するとバッファを開始し、同時に asyncWindow を呼び出します。asyncWindow が完了した時点でバッファを発行します。次の要素が到着すると再びバッファを開始します。

csharp
source.Chunk(async (value, ct) =>
{
    await Task.Delay(TimeSpan.FromSeconds(1), ct);
})

Chunk(asyncWindow) のマーブルダイアグラム

非同期処理の完了をトリガーとしてバッファが区切られます。


overload の使い分け

overload使う場面
Chunk(count)固定サイズのバッチ処理。ログを N 行ずつ処理するなど
Chunk(count, skip)移動平均やスライディングウィンドウ解析
Chunk(timeSpan)一定間隔でまとめて UI 更新やネットワーク送信を行いたい場合
Chunk(timeSpan, count)レート制限付きバッチ処理。高頻度イベントの制御
Chunk(windowBoundaries)ユーザー操作(ボタン押下など)でバッファを区切りたい場合
Chunk(asyncWindow)非同期 I/O の完了をトリガーにバッファを区切りたい場合

3. サンプルコード

個数ベース

csharp
// 3 要素ずつバッチ処理
Observable.Range(1, 10)
    .Chunk(3)
    .Subscribe(batch =>
    {
        Console.WriteLine($"Batch: [{string.Join(", ", batch)}]");
    });

// 出力:
// Batch: [1, 2, 3]
// Batch: [4, 5, 6]
// Batch: [7, 8, 9]
// Batch: [10]

スライディングウィンドウ(移動平均)

csharp
// 3 要素の移動平均を 1 要素ずつスライド
Observable.Range(1, 5)
    .Chunk(count: 3, skip: 1)
    .Subscribe(window =>
    {
        var avg = window.Average();
        Console.WriteLine($"Window: [{string.Join(", ", window)}] -> Average: {avg}");
    });

// 出力:
// Window: [1, 2, 3] -> Average: 2
// Window: [2, 3, 4] -> Average: 3
// Window: [3, 4, 5] -> Average: 4

時間ベース

csharp
// 1 秒ごとにバッファを発行
observable
    .Chunk(TimeSpan.FromSeconds(1))
    .Subscribe(batch =>
    {
        Console.WriteLine($"[{DateTime.Now:HH:mm:ss}] Count: {batch.Length}");
    });

時間+個数

csharp
// 最大 100 件または 5 秒ごとに送信
sensorDataStream
    .Chunk(TimeSpan.FromSeconds(5), count: 100)
    .SubscribeAwait(async (batch, ct) =>
    {
        await SendToServerAsync(batch, ct);
    });

境界 Observable

csharp
// ボタンクリックでバッファを区切る
inputStream
    .Chunk(buttonClickObservable)
    .Subscribe(batch =>
    {
        ProcessBatch(batch);
    });

非同期ウィンドウ

csharp
// 非同期処理の完了でバッファを区切る
source
    .Chunk(async (value, ct) =>
    {
        await Task.Delay(TimeSpan.FromMilliseconds(500), ct);
    })
    .Subscribe(batch =>
    {
        Console.WriteLine($"Batch: [{string.Join(", ", batch)}]");
    });

4. 補足

  • Rx の Buffer に相当する演算子ですが、R3 では Chunk という名前に変更されています。
  • 隣接する 2 要素だけを扱いたい場合、Chunk(count: 2, skip: 1)Pairwise と似た結果になります。ただし Chunk(2, 1)T[] を発行し、ソース完了時に残った 1 要素のチャンクも発行します。常に 2 要素揃った前回値・今回値のタプルが欲しい場合は Pairwise を使用してください。
  • 関連演算子: ChunkFrame(フレームベース版)、ChunkUntil(述語ベース版)、Pairwise(隣接ペア)