Chunk
1. 概要
Chunk はソース Observable の要素をバッファリングし、配列 T[] としてまとめて発行する演算子です。バッファの区切り方に応じて多数の overload が用意されています。
- 個数ベース — 指定した個数に達するたびに発行
- スライディングウィンドウ —
countとskipを指定して重複やギャップのあるウィンドウを構成 - 時間ベース — 一定時間ごとに発行
- 時間+個数 — 時間または個数の どちらか先 に到達した時点で発行
- 境界 Observable — 別の Observable が発火するたびに発行
- 非同期ウィンドウ — 非同期タスクが完了するたびに発行
いずれの overload でも、空の配列は発行されません。ソースが完了した時点でバッファに残っている要素がある場合は、その残余が発行されます。
2. シグネチャ
個数ベース — Chunk(count)
public static Observable<T[]> Chunk<T>(this Observable<T> source, Int32 count)要素を count 個ずつまとめて配列として発行します。ソース完了時にバッファに残った要素がある場合は、count 未満でも発行されます(空配列は発行されません)。
source.Chunk(3)ソースが 1, 2, 3, 4, 5 を発行し完了した場合、[1,2,3] と [4,5] が発行されます。
スライディングウィンドウ — Chunk(count, skip)
public static Observable<T[]> Chunk<T>(this Observable<T> source, Int32 count, Int32 skip)skip 個ごとにウィンドウを開始し、count 個の要素を含む配列を発行します。
skip < count— ウィンドウが重複(オーバーラップ)skip == count—Chunk(count)と同じskip > count— ウィンドウ間にギャップが生じ、一部の要素がスキップされる
source.Chunk(count: 3, skip: 1) // 2要素ずつ重複
source.Chunk(count: 3, skip: 2) // 1要素ずつ重複
source.Chunk(count: 2, skip: 3) // 1要素ずつスキップskip < count の場合、ウィンドウが重複し同じ要素が複数の配列に含まれます。
時間ベース — Chunk(timeSpan)
public static Observable<T[]> Chunk<T>(this Observable<T> source, TimeSpan timeSpan)
public static Observable<T[]> Chunk<T>(this Observable<T> source, TimeSpan timeSpan, TimeProvider timeProvider)timeSpan ごとにバッファを区切り、蓄積された要素を配列として発行します。時間のカウントは購読開始時ではなく、チャンクに含まれる最初の要素が到着した時点から開始されます。区間内に要素がなかった場合、空配列は発行されません。TimeProvider を指定するとタイマーの実装を差し替えられます(テスト時に便利)。
source.Chunk(TimeSpan.FromSeconds(1))一定時間ごとにバッファが区切られ、溜まった要素がまとめて発行されます。
時間+個数 — Chunk(timeSpan, count)
public static Observable<T[]> Chunk<T>(this Observable<T> source, TimeSpan timeSpan, Int32 count)
public static Observable<T[]> Chunk<T>(this Observable<T> source, TimeSpan timeSpan, Int32 count, TimeProvider timeProvider)時間と個数の どちらか先に到達した条件 でバッファを発行します。時間のカウントは、現在のチャンクに含まれる最初の要素が到着した時点から開始されます。高頻度のイベントに対して「最大でも N 個」「最長でも T 秒」という制約を同時に掛けたい場合に有用です。
source.Chunk(TimeSpan.FromSeconds(1), count: 5)時間が先に来れば時間で区切られ、個数が先に達すれば個数で区切られます。
境界 Observable — Chunk(windowBoundaries)
public static Observable<TSource[]> Chunk<TSource, TWindowBoundary>(
this Observable<TSource> source,
Observable<TWindowBoundary> windowBoundaries)windowBoundaries が要素を発行するたびにバッファを区切り、蓄積された要素を配列として発行します。外部のイベント(ボタンクリック、タイマーなど)でバッファの区切りを制御できます。
source.Chunk(boundaryObservable)境界 Observable が発火するたびに、それまでに溜まった要素がまとめて発行されます。
非同期ウィンドウ — Chunk(asyncWindow)
public static Observable<T[]> Chunk<T>(
this Observable<T> source,
Func<T, CancellationToken, ValueTask> asyncWindow,
Boolean configureAwait = true)最初の要素が到着するとバッファを開始し、同時に asyncWindow を呼び出します。asyncWindow が完了した時点でバッファを発行します。次の要素が到着すると再びバッファを開始します。
source.Chunk(async (value, ct) =>
{
await Task.Delay(TimeSpan.FromSeconds(1), ct);
})非同期処理の完了をトリガーとしてバッファが区切られます。
overload の使い分け
| overload | 使う場面 |
|---|---|
Chunk(count) | 固定サイズのバッチ処理。ログを N 行ずつ処理するなど |
Chunk(count, skip) | 移動平均やスライディングウィンドウ解析 |
Chunk(timeSpan) | 一定間隔でまとめて UI 更新やネットワーク送信を行いたい場合 |
Chunk(timeSpan, count) | レート制限付きバッチ処理。高頻度イベントの制御 |
Chunk(windowBoundaries) | ユーザー操作(ボタン押下など)でバッファを区切りたい場合 |
Chunk(asyncWindow) | 非同期 I/O の完了をトリガーにバッファを区切りたい場合 |
3. サンプルコード
個数ベース
// 3 要素ずつバッチ処理
Observable.Range(1, 10)
.Chunk(3)
.Subscribe(batch =>
{
Console.WriteLine($"Batch: [{string.Join(", ", batch)}]");
});
// 出力:
// Batch: [1, 2, 3]
// Batch: [4, 5, 6]
// Batch: [7, 8, 9]
// Batch: [10]スライディングウィンドウ(移動平均)
// 3 要素の移動平均を 1 要素ずつスライド
Observable.Range(1, 5)
.Chunk(count: 3, skip: 1)
.Subscribe(window =>
{
var avg = window.Average();
Console.WriteLine($"Window: [{string.Join(", ", window)}] -> Average: {avg}");
});
// 出力:
// Window: [1, 2, 3] -> Average: 2
// Window: [2, 3, 4] -> Average: 3
// Window: [3, 4, 5] -> Average: 4時間ベース
// 1 秒ごとにバッファを発行
observable
.Chunk(TimeSpan.FromSeconds(1))
.Subscribe(batch =>
{
Console.WriteLine($"[{DateTime.Now:HH:mm:ss}] Count: {batch.Length}");
});時間+個数
// 最大 100 件または 5 秒ごとに送信
sensorDataStream
.Chunk(TimeSpan.FromSeconds(5), count: 100)
.SubscribeAwait(async (batch, ct) =>
{
await SendToServerAsync(batch, ct);
});境界 Observable
// ボタンクリックでバッファを区切る
inputStream
.Chunk(buttonClickObservable)
.Subscribe(batch =>
{
ProcessBatch(batch);
});非同期ウィンドウ
// 非同期処理の完了でバッファを区切る
source
.Chunk(async (value, ct) =>
{
await Task.Delay(TimeSpan.FromMilliseconds(500), ct);
})
.Subscribe(batch =>
{
Console.WriteLine($"Batch: [{string.Join(", ", batch)}]");
});4. 補足
- Rx の
Bufferに相当する演算子ですが、R3 ではChunkという名前に変更されています。 - 隣接する 2 要素だけを扱いたい場合、
Chunk(count: 2, skip: 1)は Pairwise と似た結果になります。ただしChunk(2, 1)はT[]を発行し、ソース完了時に残った 1 要素のチャンクも発行します。常に 2 要素揃った前回値・今回値のタプルが欲しい場合はPairwiseを使用してください。 - 関連演算子: ChunkFrame(フレームベース版)、ChunkUntil(述語ベース版)、Pairwise(隣接ペア)