Skip to content

ConnectableObservable

1. 概要

ConnectableObservable<T> は、通常の Observable<T> と同じように購読できますが、ソースへの接続開始を Connect() で明示的に制御できる Observable です。

厳密には ConnectableObservable<T> 自体はオペレーターではなく、PublishReplayReplayFrameMulticast などのオペレーターが返す型です。

2. 基本的な動作

通常の Observable は、多くの場合 Subscribe された時点でソース処理を開始します。一方、ConnectableObservable<T> は、購読者を登録しただけではソースへ接続しません。

csharp
var connectable = source.Publish();

// ここではまだ source は開始されない
var sub1 = connectable.Subscribe(x => Console.WriteLine($"A: {x}"));
var sub2 = connectable.Subscribe(x => Console.WriteLine($"B: {x}"));

// ここで初めて source への購読が開始される
var connection = connectable.Connect();

Connect() 後は、ソースからの通知が内部の Subject などを通じて複数の購読者へ配信されます。つまり、複数の購読者が同じソースシーケンスを共有できます。

3. Connect の戻り値

Connect()IDisposable を返します。この IDisposable は、ソースとの接続そのものを表します。

csharp
var connection = connectable.Connect();

// ソースとの接続を切断する
connection.Dispose();

購読者ごとの IDisposable と、Connect() が返す IDisposable は役割が異なります。

IDisposable役割
Subscribe(...) の戻り値その購読者だけを解除する
Connect() の戻り値ソースとの接続自体を解除する

ある購読者を解除しても、他の購読者やソース接続が残っていれば配信は続きます。一方、Connect() の戻り値を Dispose すると、ソースへの接続が切断されます。

4. 主な生成元

オペレーター内部 Subject主な用途
PublishSubject<T>値を再生せず、複数購読者で共有する
Publish(initialValue)BehaviorSubject<T>初期値または最新値を持つ共有
ReplayReplaySubject<T>過去の値を新しい購読者に再生する
ReplayFrameReplayFrameSubject<T>フレーム範囲内の過去値を再生する
Multicast任意の ISubject<T>Subject を明示的に選んで共有する

5. RefCount / Share との関係

ConnectableObservable<T> は手動で Connect() するため、開始タイミングを細かく制御できます。一方で、単に「複数購読者で共有したい」だけなら、毎回 Connect() と接続破棄を手で管理するのは面倒です。

RefCount を使うと、最初の購読者が購読したタイミングで自動的に Connect() し、最後の購読者が解除されたタイミングで接続を破棄できます。

csharp
Observable<T> shared = source.Publish().RefCount();

SharePublish().RefCount() のショートカットです。

csharp
Observable<T> shared = source.Share();

開始タイミングを自分で揃えたい場合は Publish()Replay() が返す ConnectableObservable<T> をそのまま使い、購読者数に応じて自動管理したい場合は RefCount() または Share() を使います。

6. 使いどころ

  • 複数の購読者を先に登録してから、同じタイミングで配信を開始したい。
  • HTTP リクエスト、センサー、タイマー、イベント変換などのソース処理を複数回起動したくない。
  • 後から購読する Observer に過去の値を再生したい。
  • Connect() の戻り値を保持し、明示的に接続の開始と終了を制御したい。

7. 注意点

ConnectableObservable<T>Subscribe しただけではソースは開始されません。値が流れない場合は、Connect() を呼んでいるかを確認してください。

また、Connect() を複数回呼んだ場合の扱いは実装に依存します。R3 の Multicast ベースの実装では、接続中に再度 Connect() しても同じ接続が返され、接続が破棄された後に再度 Connect() すると新しい接続が開始されます。