Subscription / Multicast カテゴリ
概要
Subscription / Multicast カテゴリには、Observable の購読タイミングを制御するオペレーター、マルチキャスト(複数の購読者でソースを共有)するためのオペレーター、通知の配信スケジューラを変更するオペレーター、および型変換・同期化ユーティリティが含まれます。Subscribe などの購読 API は 購読 API にまとめています。
なぜマルチキャスト系オペレーターが必要か
Observable は基本的に、Subscribe された時点でシーケンスを開始します。多くの Observable は、購読されるたびに新しいシーケンスを作ります。つまり同じ Observable<T> インスタンスに複数回 Subscribe しても、既に動いている 1 つのシーケンスに複数の Observer が相乗りするとは限りません。
例えば、HTTP リクエスト、ファイル読み込み、タイマー、イベント変換、状態の監視などを表す Observable に 2 人の購読者がいる場合、単純に 2 回 Subscribe すると、ソース処理も 2 回起動する可能性があります。これは「同じ結果を複数の購読者へ配りたい」場合には不要な重複になりますし、副作用を持つソースでは意図しない再実行にもなります。
マルチキャスト系オペレーターは、この問題を解決するために、ソースからの通知を Subject などを介して複数の Observer に配信します。ソースへの購読は 1 つにまとめ、その出力を複数の購読者で共有する、という役割です。
もう 1 つの重要な用途は、接続タイミングの制御です。通常の Observable は Subscribe された時点で起動するため、購読者を先にすべて登録してからシーケンスを開始することが難しい場合があります。Publish、Replay、Multicast は ConnectableObservable を返し、Connect() を呼ぶまでソースを起動しません。これにより、複数の購読者を事前に Subscribe しておき、全員が準備できたタイミングで一斉にシーケンスを開始できます。
また、Replay や ReplayFrame は、ソースを共有しながら過去の値をバッファリングします。これにより、後から参加した購読者にも直近の値や一定範囲の履歴を渡せます。単に共有したいだけなら Share、開始タイミングを手動で制御したいなら Publish、履歴も必要なら Replay / ReplayFrame、Subject を明示的に選びたいなら Multicast を使用します。
ホットシーケンスとは
このカテゴリの記事では「ホット」という表現が登場します。これは、シーケンスが購読者ごとに個別に開始されるのではなく、既に動いている 1 つのソースを複数の購読者で共有する状態を指します。
対になる考え方として「コールド」があります。コールドな Observable は、購読されるたびに新しいシーケンスを開始します。例えば Observable.Range、Observable.Create、HTTP リクエストを表す Observable などは、購読ごとに処理が最初から実行されることがあります。
一方、ホットな Observable では、ソースは購読者とは独立して動いており、購読者はその時点から流れてくる通知を受け取ります。Unity のイベント、UI のクリック、センサー入力、タイマーの共有などはホットな使い方になりやすい例です。
Share、Publish、Replay、Multicast などのマルチキャスト系オペレーターは、コールドなソースを複数購読者で共有できる形に変換し、ホットシーケンスとして扱うために使います。ただし、Publish や Replay は ConnectableObservable を返すため、Connect() するまではソースを開始しません。Share は Publish().RefCount() と同等で、最初の購読者が現れた時点で自動的に接続します。
ホット化すると、後から購読した Observer は「過去に流れた値」を通常は受け取れません。後続購読者にも過去の値を渡したい場合は Replay や ReplayFrame を使用します。
オペレーター一覧
関連概念
| 名前 | 説明 |
|---|---|
| ConnectableObservable | Connect() でソースへの接続開始を制御できる Observable |
マルチキャスト
| オペレーター | 説明 |
|---|---|
| Share | ソースを自動的にホットシーケンスとして共有する |
| Publish | ConnectableObservable に変換し、手動で接続を制御する |
| Replay | 過去の値をバッファリングして新しい購読者に再生する |
| ReplayFrame | フレームベースのウィンドウで値を再生する |
| Multicast | 任意の Subject を介してマルチキャストする |
| RefCount | ConnectableObservable を自動接続・切断する |
スケジューリング
| オペレーター | 説明 |
|---|---|
| ObserveOn | 通知を受け取るスケジューラを指定する |
| ObserveOnCurrentSynchronizationContext | 現在の SynchronizationContext で通知を受け取る |
| ObserveOnThreadPool | スレッドプールで通知を受け取る |
| SubscribeOn | 購読処理を実行するスケジューラを指定する |
| SubscribeOnCurrentSynchronizationContext | 現在の SynchronizationContext で購読する |
| SubscribeOnThreadPool | スレッドプールで購読する |
| SubscribeOnSynchronize | 購読時にロックで同期する |
同期・変換
| オペレーター | 説明 |
|---|---|
| Synchronize | 通知をスレッドセーフに直列化する |
| Trampoline | 再入を防ぎ通知をキューで直列化する |
| AsObservable | 型を Observable<T> に隠蔽する |
| AsSystemObservable | R3 の Observable を System.IObservable<T> に変換する |
比較表
マルチキャスト系の比較
| オペレーター | 自動接続 | 過去の値の再生 | 手動制御 |
|---|---|---|---|
| Share | ✅ | ❌ | ❌ |
| Publish | ❌ | ❌ | ✅ (Connect) |
| Replay | ❌ | ✅ | ✅ (Connect) |
| ReplayFrame | ❌ | ✅ (フレーム単位) | ✅ (Connect) |
| Multicast | ❌ | Subject 依存 | ✅ (Connect) |
| RefCount | ✅ | ソース依存 | ❌ |
スケジューリング系の比較
| オペレーター | 影響範囲 | スケジューラ |
|---|---|---|
| ObserveOn | 通知の配信先 | 任意 |
| ObserveOnCurrentSynchronizationContext | 通知の配信先 | 現在の SynchronizationContext |
| ObserveOnThreadPool | 通知の配信先 | スレッドプール |
| SubscribeOn | 購読処理の実行先 | 任意 |
| SubscribeOnCurrentSynchronizationContext | 購読処理の実行先 | 現在の SynchronizationContext |
| SubscribeOnThreadPool | 購読処理の実行先 | スレッドプール |
| SubscribeOnSynchronize | 購読処理の同期 | ロックベース |
同期・変換系の比較
| オペレーター | 目的 | 手法 |
|---|---|---|
| Synchronize | マルチスレッド安全性 | lock による排他制御 |
| Trampoline | 再入防止 | キューによる直列化 |
| AsObservable | 型の隠蔽 | R3 Observable<T> |
| AsSystemObservable | 相互運用 | System.IObservable<T> |
使い分けガイド
ソースを複数の購読者で共有したい
- 最もシンプルな方法 → Share: 自動的に接続・切断を管理します。
- 接続タイミングを制御したい → Publish + RefCount または手動
Connect()。 - 新しい購読者に過去の値を再生したい → Replay または ReplayFrame。
- カスタム Subject を使いたい → Multicast。
通知の実行コンテキストを変更したい
- 通知を特定のスレッドで受け取りたい → ObserveOn 系を使用。
- 購読処理自体を別スレッドで実行したい → SubscribeOn 系を使用。
- UI スレッドで通知を受け取りたい → ObserveOnCurrentSynchronizationContext。
通知の安全性を確保したい
- マルチスレッドからの通知を直列化したい → Synchronize。
- 再帰的な通知を安全に処理したい → Trampoline。
型を変換したい
- 具体的な型を隠蔽したい → AsObservable。
- System.IObservable<T> として公開したい → AsSystemObservable。
通知を購読したい
- 通常のコールバックで購読したい → Subscribe。
- async/await で OnNext を処理したい → SubscribeAwait。
- シーケンスの完了を await したい → ForEachAsync。