ToObservable
1. 概要
ToObservable は、既存の .NET 型を R3 の Observable<T> に変換する拡張メソッドです。
Task、Task<T>、ValueTask、ValueTask<T>、IEnumerable<T>、IAsyncEnumerable<T>、IObservable<T> に対応しており、既存の非同期処理やコレクションをリアクティブパイプラインに統合できます。
2. シグネチャ
Task 系
csharp
public static Observable<Unit> ToObservable(this Task task, bool configureAwait = true)
public static Observable<T> ToObservable<T>(this Task<T> task, bool configureAwait = true)Task は完了時に Unit を発行して完了します。Task<T> は結果の値を発行して完了します。configureAwait で継続のコンテキストキャプチャを制御できます。
csharp
var task = Task.FromResult(42);
task.ToObservable()ValueTask 系
csharp
public static Observable<Unit> ToObservable(this ValueTask task, bool configureAwait = true)
public static Observable<T> ToObservable<T>(this ValueTask<T> task, bool configureAwait = true)ValueTask 版です。動作は Task 版と同様ですが、ValueTask のアロケーション効率の良さを活かせます。
csharp
ValueTask<string> vtask = new ValueTask<string>("hello");
vtask.ToObservable()IEnumerable
csharp
public static Observable<T> ToObservable<T>(this IEnumerable<T> source, CancellationToken cancellationToken = default)同期コレクションの各要素を順に発行して完了します。
csharp
new[] { 1, 2, 3 }.ToObservable()IAsyncEnumerable
csharp
public static Observable<T> ToObservable<T>(this IAsyncEnumerable<T> source)非同期コレクションの各要素を順に発行して完了します。
csharp
asyncEnumerable.ToObservable()IObservable
csharp
public static Observable<T> ToObservable<T>(this IObservable<T> source)標準の System.IObservable<T> を R3 の Observable<T> に変換します。既存の Rx コードとの相互運用に使用します。
csharp
systemObservable.ToObservable()オーバーロードの使い分け
| overload | 使う場面 |
|---|---|
Task.ToObservable() | 非同期処理の結果をリアクティブパイプラインで扱う |
Task<T>.ToObservable() | 戻り値のある非同期処理を変換する |
ValueTask.ToObservable() / ValueTask<T>.ToObservable() | ValueTask を使い、アロケーションを抑えたい場合 |
IEnumerable<T>.ToObservable() | 同期コレクションをリアクティブに処理する |
IAsyncEnumerable<T>.ToObservable() | 非同期ストリームをリアクティブに処理する |
IObservable<T>.ToObservable() | 標準 Rx から R3 への移行・相互運用 |
3. マーブルダイアグラム
変換元の型に応じて動作が異なります。Task / ValueTask は単一の値を発行して完了し、IEnumerable / IAsyncEnumerable はコレクションの各要素を順に発行して完了します。
4. サンプルコード
csharp
using R3;
// Task<T> → Observable<T>
var task = Task.FromResult(42);
task.ToObservable()
.Subscribe(x => Console.WriteLine($"Task の結果: {x}"));
// 出力: Task の結果: 42
// IEnumerable<T> → Observable<T>
var list = new[] { "A", "B", "C" };
list.ToObservable()
.Subscribe(x => Console.WriteLine(x));
// 出力: A, B, C
// IAsyncEnumerable<T> → Observable<T>
async IAsyncEnumerable<int> GenerateAsync()
{
for (int i = 0; i < 3; i++)
{
await Task.Delay(100);
yield return i;
}
}
GenerateAsync().ToObservable()
.Subscribe(x => Console.WriteLine($"非同期: {x}"));
// 出力:
// 非同期: 0
// 非同期: 1
// 非同期: 2
// IObservable<T> → R3 Observable<T>(標準 Rx との相互運用)
IObservable<int> systemObservable = GetLegacyObservable();
systemObservable.ToObservable()
.Subscribe(x => Console.WriteLine($"R3: {x}"));