Skip to content

Replay

1. 概要

Replay は、ソースが発行した値をバッファリングし、新しい購読者に過去の値を再生する ConnectableObservable を返すオペレーターです。バッファのサイズ、時間ウィンドウ、またはその両方で制限できます。

内部実装としては MulticastReplaySubject<T> を渡す専用オペレーターです。各オーバーロードは、対応するコンストラクターで作成した ReplaySubject<T>Multicast に渡すことと同義です。

2. シグネチャ

パラメータなし(全値バッファリング)

csharp
public static ConnectableObservable<T> Replay<T>(
    this Observable<T> source)

ソースが発行したすべての値をバッファリングします。バッファサイズに制限はありません。これは source.Multicast(new ReplaySubject<T>()) と同義です。

bufferSize を指定

csharp
public static ConnectableObservable<T> Replay<T>(
    this Observable<T> source,
    int bufferSize)

最新の bufferSize 個の値のみをバッファリングします。これは source.Multicast(new ReplaySubject<T>(bufferSize)) と同義です。

window(時間ウィンドウ)を指定

csharp
public static ConnectableObservable<T> Replay<T>(
    this Observable<T> source,
    TimeSpan window)

指定した時間ウィンドウ内の値のみをバッファリングします。これは source.Multicast(new ReplaySubject<T>(window)) と同義です。

window + TimeProvider を指定

csharp
public static ConnectableObservable<T> Replay<T>(
    this Observable<T> source,
    TimeSpan window,
    TimeProvider timeProvider)

時間ウィンドウとカスタム TimeProvider を指定します。これは source.Multicast(new ReplaySubject<T>(window, timeProvider)) と同義です。

bufferSize + window を指定

csharp
public static ConnectableObservable<T> Replay<T>(
    this Observable<T> source,
    int bufferSize,
    TimeSpan window)

バッファサイズと時間ウィンドウの両方で制限します。これは source.Multicast(new ReplaySubject<T>(bufferSize, window)) と同義です。

bufferSize + window + TimeProvider を指定

csharp
public static ConnectableObservable<T> Replay<T>(
    this Observable<T> source,
    int bufferSize,
    TimeSpan window,
    TimeProvider timeProvider)

バッファサイズ、時間ウィンドウ、およびカスタム TimeProvider のすべてを指定する完全なオーバーロードです。これは source.Multicast(new ReplaySubject<T>(bufferSize, window, timeProvider)) と同義です。

オーバーロードの使い分け

  • すべての過去の値が必要 → パラメータなしオーバーロードを使用。ただしメモリ使用量に注意してください。
  • 最新 N 件のみ必要bufferSize オーバーロードを使用。
  • 一定時間内の値のみ必要window オーバーロードを使用。
  • 件数と時間の両方で制限bufferSize + window オーバーロードを使用。
  • テストやカスタムタイマーTimeProvider 付きオーバーロードを使用。

3. マーブルダイアグラム

Replay のマーブルダイアグラム

Connect() 後にソースが値を発行し始めます。新しい購読者が追加されると、バッファされた過去の値が即座に再生され、その後はリアルタイムの値を受け取ります。

4. サンプルコード

csharp
// すべての値をリプレイ
var replayed = observable.Replay();
var connection = replayed.Connect();

// 後から購読しても過去のすべての値を受け取る
await Task.Delay(TimeSpan.FromSeconds(1));
replayed.Subscribe(x => Console.WriteLine($"後から購読: {x}"));

// 最新 3 件のみリプレイ
var replayedLimited = observable.Replay(bufferSize: 3);
replayedLimited.Connect();

// 過去 5 秒間の値のみリプレイ
var replayedWindow = observable.Replay(TimeSpan.FromSeconds(5));
replayedWindow.Connect();

// 件数と時間の両方で制限
var replayedBoth = observable.Replay(
    bufferSize: 10,
    window: TimeSpan.FromSeconds(30));
replayedBoth.Connect();

// RefCount と組み合わせて自動管理
var autoReplay = observable.Replay(bufferSize: 1).RefCount();

5. 補足

過去の値の再生が不要な場合は SharePublish を使用してください。これらはバッファリングを行わないため、メモリ効率が良くなります。

フレームベースのウィンドウで値を再生する場合は ReplayFrame を使用してください。

ReplayMulticast の代表的な ReplaySubject<T> 指定を簡潔に書くための API です。通常は Multicast(new ReplaySubject<T>(...)) を直接書くよりも、意図が伝わりやすい Replay(...) を使用してください。