ConnectableObservable
1. 概要
ConnectableObservable<T> は、通常の Observable<T> と同じように購読できますが、ソースへの接続開始を Connect() で明示的に制御できる Observable です。
厳密には ConnectableObservable<T> 自体はオペレーターではなく、Publish、Replay、ReplayFrame、Multicast などのオペレーターが返す型です。
2. 基本的な動作
通常の Observable は、多くの場合 Subscribe された時点でソース処理を開始します。一方、ConnectableObservable<T> は、購読者を登録しただけではソースへ接続しません。
var connectable = source.Publish();
// ここではまだ source は開始されない
var sub1 = connectable.Subscribe(x => Console.WriteLine($"A: {x}"));
var sub2 = connectable.Subscribe(x => Console.WriteLine($"B: {x}"));
// ここで初めて source への購読が開始される
var connection = connectable.Connect();Connect() 後は、ソースからの通知が内部の Subject などを通じて複数の購読者へ配信されます。つまり、複数の購読者が同じソースシーケンスを共有できます。
3. Connect の戻り値
Connect() は IDisposable を返します。この IDisposable は、ソースとの接続そのものを表します。
var connection = connectable.Connect();
// ソースとの接続を切断する
connection.Dispose();購読者ごとの IDisposable と、Connect() が返す IDisposable は役割が異なります。
| IDisposable | 役割 |
|---|---|
Subscribe(...) の戻り値 | その購読者だけを解除する |
Connect() の戻り値 | ソースとの接続自体を解除する |
ある購読者を解除しても、他の購読者やソース接続が残っていれば配信は続きます。一方、Connect() の戻り値を Dispose すると、ソースへの接続が切断されます。
4. 主な生成元
| オペレーター | 内部 Subject | 主な用途 |
|---|---|---|
| Publish | Subject<T> | 値を再生せず、複数購読者で共有する |
| Publish(initialValue) | BehaviorSubject<T> | 初期値または最新値を持つ共有 |
| Replay | ReplaySubject<T> | 過去の値を新しい購読者に再生する |
| ReplayFrame | ReplayFrameSubject<T> | フレーム範囲内の過去値を再生する |
| Multicast | 任意の ISubject<T> | Subject を明示的に選んで共有する |
5. RefCount / Share との関係
ConnectableObservable<T> は手動で Connect() するため、開始タイミングを細かく制御できます。一方で、単に「複数購読者で共有したい」だけなら、毎回 Connect() と接続破棄を手で管理するのは面倒です。
RefCount を使うと、最初の購読者が購読したタイミングで自動的に Connect() し、最後の購読者が解除されたタイミングで接続を破棄できます。
Observable<T> shared = source.Publish().RefCount();Share は Publish().RefCount() のショートカットです。
Observable<T> shared = source.Share();開始タイミングを自分で揃えたい場合は Publish() や Replay() が返す ConnectableObservable<T> をそのまま使い、購読者数に応じて自動管理したい場合は RefCount() または Share() を使います。
6. 使いどころ
- 複数の購読者を先に登録してから、同じタイミングで配信を開始したい。
- HTTP リクエスト、センサー、タイマー、イベント変換などのソース処理を複数回起動したくない。
- 後から購読する Observer に過去の値を再生したい。
Connect()の戻り値を保持し、明示的に接続の開始と終了を制御したい。
7. 注意点
ConnectableObservable<T> に Subscribe しただけではソースは開始されません。値が流れない場合は、Connect() を呼んでいるかを確認してください。
また、Connect() を複数回呼んだ場合の扱いは実装に依存します。R3 の Multicast ベースの実装では、接続中に再度 Connect() しても同じ接続が返され、接続が破棄された後に再度 Connect() すると新しい接続が開始されます。