SubscribeOn
1. 概要
SubscribeOn は、Observable の購読(Subscribe)および破棄(Dispose)処理を実行するスケジューラを指定するオペレーターです。ソースへの購読がどのスレッド・コンテキストで行われるかを制御します。
2. シグネチャ
SynchronizationContext を指定
public static Observable<T> SubscribeOn<T>(
this Observable<T> source,
SynchronizationContext synchronizationContext)指定した SynchronizationContext 上で購読処理を実行します。UI スレッドなど特定のコンテキストでの購読に使用します。
TimeProvider を指定
public static Observable<T> SubscribeOn<T>(
this Observable<T> source,
TimeProvider timeProvider)指定した TimeProvider のスケジューラ上で購読処理を実行します。
FrameProvider を指定
public static Observable<T> SubscribeOn<T>(
this Observable<T> source,
FrameProvider frameProvider)指定した FrameProvider のフレームループ上で購読処理を実行します。ゲームエンジンなどフレームベースの環境で使用します。
オーバーロードの使い分け
- UI スレッドやカスタムコンテキスト →
SynchronizationContextオーバーロードを使用。 - 時間ベースのスケジューリング →
TimeProviderオーバーロードを使用。 - フレームベースの環境(Unity など) →
FrameProviderオーバーロードを使用。
3. マーブルダイアグラム
SubscribeOn はソースへの購読呼び出し自体を指定されたスケジューラ上で実行します。通知の配信先は変更されません。
4. サンプルコード
// SynchronizationContext 上で購読
observable
.SubscribeOn(SynchronizationContext.Current!)
.Subscribe(x => Console.WriteLine($"値: {x}"));
// TimeProvider を使用して購読
observable
.SubscribeOn(TimeProvider.System)
.Subscribe(x => Console.WriteLine($"値: {x}"));
// FrameProvider を使用して購読(Unity 等)
observable
.SubscribeOn(frameProvider)
.Subscribe(x => Console.WriteLine($"値: {x}"));
// ObserveOn と組み合わせる典型的なパターン
observable
.SubscribeOn(TimeProvider.System) // バックグラウンドで購読
.ObserveOn(uiContext) // UI スレッドで通知を受け取る
.Subscribe(x => UpdateUI(x));5. 補足
ObserveOn との違いに注意してください。SubscribeOn は購読処理(Subscribe の呼び出しと Dispose の実行)がどこで行われるかを制御します。一方、ObserveOn は通知(OnNext、OnErrorResume、OnCompleted)がどこで配信されるかを制御します。
| 観点 | SubscribeOn | ObserveOn |
|---|---|---|
| 変更するもの | ソースへの Subscribe と Dispose の実行場所 | OnNext / OnErrorResume / OnCompleted の実行場所 |
| 主な用途 | 重い購読初期化、ブロッキングするソース開始処理の移動 | UI 更新、Unity オブジェクト更新、通知処理のスレッド切り替え |
Subscribe(...) 内のラムダに効くか | 直接は効かない | 効く |
| UI 更新に必要か | これだけでは不十分 | 多くの場合こちら |
SubscribeOn は、名前から「Subscribe(x => ...) の中身を指定コンテキストで実行する」と誤解されやすいですが、そうではありません。SubscribeOn が動かすのは、上流ソースに対する購読開始処理です。OnNext を受け取る場所や、Subscribe に渡したラムダの実行場所を変えたい場合は ObserveOn を使います。
// 誤解しやすい例: UpdateUI が UI スレッドで実行される保証にはならない
observable
.SubscribeOn(uiContext)
.Subscribe(x => UpdateUI(x));UI 更新をしたい場合は、通知を UI スレッドへ戻します。
observable
.ObserveOn(uiContext)
.Subscribe(x => UpdateUI(x));SubscribeOn が有効なのは、購読時点で重い処理やブロッキング処理が走る Observable です。たとえば購読開始時にファイルを開く、ネットワーク接続を開始する、センサーやイベントハンドラを登録する、といった処理を呼び出し元スレッドから逃がしたい場合に使います。
Observable.Create<string>(observer =>
{
// この初期化処理を SubscribeOn で別コンテキストへ移せる
var text = File.ReadAllText(path);
observer.OnNext(text);
observer.OnCompleted();
return Disposable.Empty;
})
.SubscribeOn(TimeProvider.System)
.ObserveOn(uiContext)
.Subscribe(text => label.Text = text);SubscribeOn は通常、チェーン内の位置より「ソースへの購読開始」に効きます。一方で、ObserveOn は置いた位置より下流の通知処理に効きます。この違いが、両者を使い分けるうえで最も重要です。
一般的なパターンとして、SubscribeOn でバックグラウンドスレッドに購読を移し、ObserveOn で UI スレッドに通知を戻すという組み合わせが使われます。