Replay
1. 概要
Replay は、ソースが発行した値をバッファリングし、新しい購読者に過去の値を再生する ConnectableObservable を返すオペレーターです。バッファのサイズ、時間ウィンドウ、またはその両方で制限できます。
内部実装としては Multicast に ReplaySubject<T> を渡す専用オペレーターです。各オーバーロードは、対応するコンストラクターで作成した ReplaySubject<T> を Multicast に渡すことと同義です。
2. シグネチャ
パラメータなし(全値バッファリング)
public static ConnectableObservable<T> Replay<T>(
this Observable<T> source)ソースが発行したすべての値をバッファリングします。バッファサイズに制限はありません。これは source.Multicast(new ReplaySubject<T>()) と同義です。
bufferSize を指定
public static ConnectableObservable<T> Replay<T>(
this Observable<T> source,
int bufferSize)最新の bufferSize 個の値のみをバッファリングします。これは source.Multicast(new ReplaySubject<T>(bufferSize)) と同義です。
window(時間ウィンドウ)を指定
public static ConnectableObservable<T> Replay<T>(
this Observable<T> source,
TimeSpan window)指定した時間ウィンドウ内の値のみをバッファリングします。これは source.Multicast(new ReplaySubject<T>(window)) と同義です。
window + TimeProvider を指定
public static ConnectableObservable<T> Replay<T>(
this Observable<T> source,
TimeSpan window,
TimeProvider timeProvider)時間ウィンドウとカスタム TimeProvider を指定します。これは source.Multicast(new ReplaySubject<T>(window, timeProvider)) と同義です。
bufferSize + window を指定
public static ConnectableObservable<T> Replay<T>(
this Observable<T> source,
int bufferSize,
TimeSpan window)バッファサイズと時間ウィンドウの両方で制限します。これは source.Multicast(new ReplaySubject<T>(bufferSize, window)) と同義です。
bufferSize + window + TimeProvider を指定
public static ConnectableObservable<T> Replay<T>(
this Observable<T> source,
int bufferSize,
TimeSpan window,
TimeProvider timeProvider)バッファサイズ、時間ウィンドウ、およびカスタム TimeProvider のすべてを指定する完全なオーバーロードです。これは source.Multicast(new ReplaySubject<T>(bufferSize, window, timeProvider)) と同義です。
オーバーロードの使い分け
- すべての過去の値が必要 → パラメータなしオーバーロードを使用。ただしメモリ使用量に注意してください。
- 最新 N 件のみ必要 →
bufferSizeオーバーロードを使用。 - 一定時間内の値のみ必要 →
windowオーバーロードを使用。 - 件数と時間の両方で制限 →
bufferSize+windowオーバーロードを使用。 - テストやカスタムタイマー →
TimeProvider付きオーバーロードを使用。
3. マーブルダイアグラム
Connect() 後にソースが値を発行し始めます。新しい購読者が追加されると、バッファされた過去の値が即座に再生され、その後はリアルタイムの値を受け取ります。
4. サンプルコード
// すべての値をリプレイ
var replayed = observable.Replay();
var connection = replayed.Connect();
// 後から購読しても過去のすべての値を受け取る
await Task.Delay(TimeSpan.FromSeconds(1));
replayed.Subscribe(x => Console.WriteLine($"後から購読: {x}"));
// 最新 3 件のみリプレイ
var replayedLimited = observable.Replay(bufferSize: 3);
replayedLimited.Connect();
// 過去 5 秒間の値のみリプレイ
var replayedWindow = observable.Replay(TimeSpan.FromSeconds(5));
replayedWindow.Connect();
// 件数と時間の両方で制限
var replayedBoth = observable.Replay(
bufferSize: 10,
window: TimeSpan.FromSeconds(30));
replayedBoth.Connect();
// RefCount と組み合わせて自動管理
var autoReplay = observable.Replay(bufferSize: 1).RefCount();5. 補足
過去の値の再生が不要な場合は Share や Publish を使用してください。これらはバッファリングを行わないため、メモリ効率が良くなります。
フレームベースのウィンドウで値を再生する場合は ReplayFrame を使用してください。
Replay は Multicast の代表的な ReplaySubject<T> 指定を簡潔に書くための API です。通常は Multicast(new ReplaySubject<T>(...)) を直接書くよりも、意図が伝わりやすい Replay(...) を使用してください。