Publish
1. 概要
Publish は、Observable を ConnectableObservable に変換するオペレーターです。Connect() を手動で呼び出すまでソースへの購読を開始しないため、すべての購読者が準備できてから一斉にデータの配信を開始できます。
内部実装としては Multicast に Subject を渡す専用オペレーターで、Publish() は Multicast(new Subject<T>()) と同義です。initialValue を指定するオーバーロードは Multicast(new BehaviorSubject<T>(initialValue)) と同義です。
2. シグネチャ
パラメータなし
public static ConnectableObservable<T> Publish<T>(
this Observable<T> source)内部的に Subject<T> を使用して、ソースをマルチキャストする ConnectableObservable を返します。これは source.Multicast(new Subject<T>()) と同義です。
initialValue を指定
public static ConnectableObservable<T> Publish<T>(
this Observable<T> source,
T initialValue)内部的に BehaviorSubject<T> を使用します。これは source.Multicast(new BehaviorSubject<T>(initialValue)) と同義です。購読者は Connect() 前でも initialValue を即座に受け取ります。
オーバーロードの使い分け
- 通常のマルチキャスト → パラメータなしオーバーロードを使用。
- 初期値が必要な場合 →
initialValueオーバーロードを使用。新しい購読者は常に最新の値(または初期値)を受け取ります。
3. マーブルダイアグラム
Publish で作成された ConnectableObservable は、Connect() が呼び出されるまでソースに購読しません。Connect() 後、すべての購読者が同じストリームを受信します。
図は Connect() 後に共有ストリームとして値が配信される様子を簡略化して示しています。Publish() 単体では、購読者を登録しただけではソースは開始されません。実際に値を流すには、返された ConnectableObservable に対して Connect() を呼び出します。
4. サンプルコード
// 基本的な Publish の使い方
var connectable = observable.Publish();
// 複数の購読者を先に登録
var sub1 = connectable.Subscribe(x => Console.WriteLine($"購読者1: {x}"));
var sub2 = connectable.Subscribe(x => Console.WriteLine($"購読者2: {x}"));
// Connect() でソースへの購読を開始
var connection = connectable.Connect();
// 不要になったら接続を破棄
connection.Dispose();
// initialValue を使用
var connectableWithInit = observable.Publish(initialValue: 0);
connectableWithInit.Subscribe(x => Console.WriteLine($"値: {x}")); // すぐに 0 を受け取る
// RefCount と組み合わせて自動管理
var autoConnect = observable.Publish().RefCount();
// Publish().RefCount() と同じ動作をするショートカット
var shared = observable.Share();5. 補足
Publish は単体では手動 Connect() が必要ですが、RefCount と組み合わせると、最初の購読者が購読したときに自動で接続し、最後の購読者が解除されたときに自動で切断できます。そのため、ソースを共有したいが接続ライフタイムは購読者数に任せたい場合は、Publish().RefCount() が便利です。
Share は、この Publish().RefCount() をまとめたショートカットです。単にソースを複数購読者で共有したいだけなら Share() を使うと簡潔です。Connect() のタイミングを自分で制御したい場合は Publish()、接続を自動管理したい場合は Publish().RefCount() または Share() を使用します。
過去の値を再生する必要がある場合は Replay を使用してください。
手動で Connect() を呼び出すパターンは、すべての購読者がセットアップされた後に一斉にデータを受信したい場合に有用です。
Publish / Publish(initialValue) は Multicast の代表的な Subject 指定を簡潔に書くための API です。通常は Multicast(new Subject<T>()) を直接書くよりも、意図が伝わりやすい Publish() を使用してください。