Skip to content

AsSystemObservable

1. 概要

AsSystemObservable は、R3 の Observable<T> を標準 .NET の System.IObservable<T> に変換するオペレーターです。System.IObservable<T> を期待するライブラリや API との相互運用が必要な場合に使用します。

R3 の Observable プロトコルから標準 .NET の IObservable プロトコルに変換されます。標準 .NET の IObserver<T> には OnErrorResumeResult 付きの OnCompleted がないため、エラー系の通知は OnError に変換されます。

R3 の通知System.IObserver<T> への通知備考
OnNext(value)OnNext(value)値はそのまま通知されます。
OnErrorResume(error)OnError(error)R3 では非致命的エラーですが、System.IObserver<T>.OnError は終端通知として扱われます。
OnCompleted(Result.Success) / OnCompleted()OnCompleted()正常完了として通知されます。
OnCompleted(Result.Failure(error))OnError(error)R3 の失敗完了は、標準 .NET ではエラー終端として通知されます。

特に OnErrorResume は、R3 ではシーケンスを継続できる非致命的エラー通知ですが、AsSystemObservable で変換すると System.IObserver<T>.OnError になります。標準 .NET の OnError は終端通知のため、購読側から見るとその時点で Observable は終了します。

OnErrorResume でシーケンスを終了させたくない場合は、AsSystemObservable の前に IgnoreOnErrorResume を併用して非致命的エラー通知を抑制します。エラー内容をログなどに残したい場合は、コールバック付きのオーバーロードを使用します。

csharp
IObservable<int> systemObservable = r3Observable
    .IgnoreOnErrorResume(ex => logger.LogWarning(ex, "Non-fatal error"))
    .AsSystemObservable();

OnErrorResume の情報自体を値として下流へ渡したい場合は、Materialize で通知を Notification<T> に変換してから AsSystemObservable する方法もあります。

2. シグネチャ

デフォルト

csharp
public static IObservable<T> AsSystemObservable<T>(
    this Observable<T> source)

R3 の Observable<T>System.IObservable<T> に変換します。返された IObservable<T> を通じて標準の IObserver<T> で購読できます。

3. マーブルダイアグラム

AsSystemObservable のマーブルダイアグラム

図の err は R3 の OnErrorResume を表しており、変換後は System.IObserver<T>.OnError として通知されます。

4. サンプルコード

csharp
// R3 の Observable を IObservable<T> に変換
var r3Observable = Observable.Range(1, 10);
IObservable<int> systemObservable = r3Observable.AsSystemObservable();

// System.IObservable<T> を期待する API に渡す
systemObservable.Subscribe(new MySystemObserver());

// System.Reactive (Rx.NET) との相互運用
var r3Stream = Observable.Interval(TimeSpan.FromSeconds(1));
IObservable<long> rxCompatible = r3Stream.AsSystemObservable();

// LINQ to Objects の拡張メソッドなど、IObservable を受け取る API と連携
ExternalLibrary.ProcessStream(rxCompatible);

5. 補足

R3 の型システム内で型を隠蔽するだけの場合は AsObservable を使用してください。AsSystemObservable は R3 の型システムから離れ、標準 .NET の IObservable<T> インターフェースに変換します。

逆方向の変換(IObservable<T> → R3 Observable<T>)には ToObservable 拡張メソッドを使用します。