Skip to content

Subscription / Multicast カテゴリ

概要

Subscription / Multicast カテゴリには、Observable の購読タイミングを制御するオペレーター、マルチキャスト(複数の購読者でソースを共有)するためのオペレーター、通知の配信スケジューラを変更するオペレーター、および型変換・同期化ユーティリティが含まれます。Subscribe などの購読 API は 購読 API にまとめています。

なぜマルチキャスト系オペレーターが必要か

Observable は基本的に、Subscribe された時点でシーケンスを開始します。多くの Observable は、購読されるたびに新しいシーケンスを作ります。つまり同じ Observable<T> インスタンスに複数回 Subscribe しても、既に動いている 1 つのシーケンスに複数の Observer が相乗りするとは限りません。

例えば、HTTP リクエスト、ファイル読み込み、タイマー、イベント変換、状態の監視などを表す Observable に 2 人の購読者がいる場合、単純に 2 回 Subscribe すると、ソース処理も 2 回起動する可能性があります。これは「同じ結果を複数の購読者へ配りたい」場合には不要な重複になりますし、副作用を持つソースでは意図しない再実行にもなります。

マルチキャスト系オペレーターは、この問題を解決するために、ソースからの通知を Subject などを介して複数の Observer に配信します。ソースへの購読は 1 つにまとめ、その出力を複数の購読者で共有する、という役割です。

もう 1 つの重要な用途は、接続タイミングの制御です。通常の Observable は Subscribe された時点で起動するため、購読者を先にすべて登録してからシーケンスを開始することが難しい場合があります。PublishReplayMulticastConnectableObservable を返し、Connect() を呼ぶまでソースを起動しません。これにより、複数の購読者を事前に Subscribe しておき、全員が準備できたタイミングで一斉にシーケンスを開始できます。

また、ReplayReplayFrame は、ソースを共有しながら過去の値をバッファリングします。これにより、後から参加した購読者にも直近の値や一定範囲の履歴を渡せます。単に共有したいだけなら Share、開始タイミングを手動で制御したいなら Publish、履歴も必要なら Replay / ReplayFrame、Subject を明示的に選びたいなら Multicast を使用します。

ホットシーケンスとは

このカテゴリの記事では「ホット」という表現が登場します。これは、シーケンスが購読者ごとに個別に開始されるのではなく、既に動いている 1 つのソースを複数の購読者で共有する状態を指します。

対になる考え方として「コールド」があります。コールドな Observable は、購読されるたびに新しいシーケンスを開始します。例えば Observable.RangeObservable.Create、HTTP リクエストを表す Observable などは、購読ごとに処理が最初から実行されることがあります。

一方、ホットな Observable では、ソースは購読者とは独立して動いており、購読者はその時点から流れてくる通知を受け取ります。Unity のイベント、UI のクリック、センサー入力、タイマーの共有などはホットな使い方になりやすい例です。

SharePublishReplayMulticast などのマルチキャスト系オペレーターは、コールドなソースを複数購読者で共有できる形に変換し、ホットシーケンスとして扱うために使います。ただし、PublishReplayConnectableObservable を返すため、Connect() するまではソースを開始しません。SharePublish().RefCount() と同等で、最初の購読者が現れた時点で自動的に接続します。

ホット化すると、後から購読した Observer は「過去に流れた値」を通常は受け取れません。後続購読者にも過去の値を渡したい場合は ReplayReplayFrame を使用します。

オペレーター一覧

関連概念

名前説明
ConnectableObservableConnect() でソースへの接続開始を制御できる Observable

マルチキャスト

オペレーター説明
Shareソースを自動的にホットシーケンスとして共有する
PublishConnectableObservable に変換し、手動で接続を制御する
Replay過去の値をバッファリングして新しい購読者に再生する
ReplayFrameフレームベースのウィンドウで値を再生する
Multicast任意の Subject を介してマルチキャストする
RefCountConnectableObservable を自動接続・切断する

スケジューリング

オペレーター説明
ObserveOn通知を受け取るスケジューラを指定する
ObserveOnCurrentSynchronizationContext現在の SynchronizationContext で通知を受け取る
ObserveOnThreadPoolスレッドプールで通知を受け取る
SubscribeOn購読処理を実行するスケジューラを指定する
SubscribeOnCurrentSynchronizationContext現在の SynchronizationContext で購読する
SubscribeOnThreadPoolスレッドプールで購読する
SubscribeOnSynchronize購読時にロックで同期する

同期・変換

オペレーター説明
Synchronize通知をスレッドセーフに直列化する
Trampoline再入を防ぎ通知をキューで直列化する
AsObservable型を Observable<T> に隠蔽する
AsSystemObservableR3 の Observable を System.IObservable<T> に変換する

比較表

マルチキャスト系の比較

オペレーター自動接続過去の値の再生手動制御
Share
Publish✅ (Connect)
Replay✅ (Connect)
ReplayFrame✅ (フレーム単位)✅ (Connect)
MulticastSubject 依存✅ (Connect)
RefCountソース依存

スケジューリング系の比較

オペレーター影響範囲スケジューラ
ObserveOn通知の配信先任意
ObserveOnCurrentSynchronizationContext通知の配信先現在の SynchronizationContext
ObserveOnThreadPool通知の配信先スレッドプール
SubscribeOn購読処理の実行先任意
SubscribeOnCurrentSynchronizationContext購読処理の実行先現在の SynchronizationContext
SubscribeOnThreadPool購読処理の実行先スレッドプール
SubscribeOnSynchronize購読処理の同期ロックベース

同期・変換系の比較

オペレーター目的手法
Synchronizeマルチスレッド安全性lock による排他制御
Trampoline再入防止キューによる直列化
AsObservable型の隠蔽R3 Observable<T>
AsSystemObservable相互運用System.IObservable<T>

使い分けガイド

ソースを複数の購読者で共有したい

  • 最もシンプルな方法Share: 自動的に接続・切断を管理します。
  • 接続タイミングを制御したいPublish + RefCount または手動 Connect()
  • 新しい購読者に過去の値を再生したいReplay または ReplayFrame
  • カスタム Subject を使いたいMulticast

通知の実行コンテキストを変更したい

通知の安全性を確保したい

  • マルチスレッドからの通知を直列化したいSynchronize
  • 再帰的な通知を安全に処理したいTrampoline

型を変換したい

通知を購読したい

  • 通常のコールバックで購読したいSubscribe
  • async/await で OnNext を処理したいSubscribeAwait
  • シーケンスの完了を await したいForEachAsync