ObserveOn
1. 概要
ObserveOn は、通知(OnNext、OnErrorResume、OnCompleted)を受け取るスケジューラ(コンテキスト)を指定するオペレーターです。ソースが発行した値を、指定されたスケジューラ上で配信します。
2. シグネチャ
SynchronizationContext を指定
public static Observable<T> ObserveOn<T>(
this Observable<T> source,
SynchronizationContext synchronizationContext)指定した SynchronizationContext 上で通知を配信します。UI スレッドへの通知配信に最もよく使用されます。
TimeProvider を指定
public static Observable<T> ObserveOn<T>(
this Observable<T> source,
TimeProvider timeProvider)指定した TimeProvider のスケジューラ上で通知を配信します。
FrameProvider を指定
public static Observable<T> ObserveOn<T>(
this Observable<T> source,
FrameProvider frameProvider)指定した FrameProvider のフレームループ上で通知を配信します。ゲームエンジンなどフレームベースの環境で使用します。
オーバーロードの使い分け
- UI スレッドへの通知 →
SynchronizationContextオーバーロードを使用。 - 時間ベースのスケジューリング →
TimeProviderオーバーロードを使用。 - フレームベースの環境(Unity など) →
FrameProviderオーバーロードを使用。
3. マーブルダイアグラム
ObserveOn はソースからの通知をすべて指定されたスケジューラに転送します。値自体は変更されず、通知が配信されるコンテキストのみが変わります。
4. サンプルコード
// バックグラウンド処理の結果を UI スレッドで受け取る
observable
.ObserveOn(uiSynchronizationContext)
.Subscribe(x => UpdateUI(x));
// TimeProvider を使用
observable
.ObserveOn(TimeProvider.System)
.Subscribe(x => Console.WriteLine($"値: {x}"));
// FrameProvider を使用(Unity 等)
observable
.ObserveOn(frameProvider)
.Subscribe(x => UpdateGameObject(x));
// 典型的なパターン: 重い処理をバックグラウンドで行い UI で表示
observable
.Select(x => HeavyComputation(x))
.ObserveOn(SynchronizationContext.Current!)
.Subscribe(result => label.Text = result.ToString());5. 補足
SubscribeOn との違い
SubscribeOn との違いに注意してください。ObserveOn は通知の配信先コンテキストを変更しますが、購読処理自体のコンテキストは変更しません。
| 観点 | ObserveOn | SubscribeOn |
|---|---|---|
| 変更するもの | OnNext / OnErrorResume / OnCompleted の実行場所 | ソースへの Subscribe と Dispose の実行場所 |
| 主な用途 | UI 更新、Unity オブジェクト更新、通知処理のスレッド切り替え | 重い購読初期化、ブロッキングするソース開始処理の移動 |
Subscribe(...) 内のラムダに効くか | 効く | 直接は効かない |
| UI 更新に必要か | 多くの場合こちら | これだけでは不十分 |
初心者が特に混同しやすい点は、SubscribeOn という名前に Subscribe が含まれていても、Subscribe(x => ...) のラムダを実行する場所を指定するわけではないことです。Subscribe(x => UpdateUI(x)) の UpdateUI を UI スレッドで実行したい場合は ObserveOn を使います。
// UI スレッドで UpdateUI を実行したい場合
observable
.ObserveOn(uiContext)
.Subscribe(x => UpdateUI(x));次のように SubscribeOn だけを書いても、通知を UI スレッドに戻す意味にはなりません。
// 誤解しやすい例: Subscribe ラムダの実行場所を変える目的ではない
observable
.SubscribeOn(uiContext)
.Subscribe(x => UpdateUI(x));ObserveOn は、置いた位置より下流の通知処理に影響します。例えば、重い変換はバックグラウンドで行い、その結果だけを UI に戻したい場合は、重い処理の後に ObserveOn を置きます。
observable
.Select(x => HeavyComputation(x)) // ここは ObserveOn 前のコンテキストで実行
.ObserveOn(uiContext)
.Subscribe(result => UpdateUI(result)); // ここは uiContext で実行反対に、ObserveOn を早い位置に置くと、その後の Select なども指定コンテキストで実行されます。UI スレッドに戻した後に重い処理を書くと UI を止める原因になるため、ObserveOn を置く位置には注意してください。
observable
.ObserveOn(uiContext)
.Select(x => HeavyComputation(x)) // UI スレッド上で重い処理をしてしまう
.Subscribe(result => UpdateUI(result));購読開始処理をバックグラウンドに移し、通知だけ UI に戻したい場合は、SubscribeOn と ObserveOn を組み合わせます。
observable
.SubscribeOn(TimeProvider.System) // ソースへの購読開始をバックグラウンドへ
.Select(x => HeavyComputation(x))
.ObserveOn(uiContext) // UI 更新だけ UI スレッドへ
.Subscribe(result => UpdateUI(result));簡便なショートカットとして、ObserveOnCurrentSynchronizationContext や ObserveOnThreadPool も用意されています。