SelectAwait
1. 概要
SelectAwait は、上流から流れてくる各値に 非同期関数 を適用し、その結果を下流へ流すオペレーターです。
同期版の Select と同じ「値の変換」を行いますが、変換処理が ValueTask<TResult> を返す非同期メソッドである点が異なります。AwaitOperation パラメーターによって、非同期処理が重なった場合の振る舞い(順次実行・ドロップ・スイッチ・並列など)を細かく制御できます。
2. シグネチャ
非同期変換
csharp
public static Observable<TResult> SelectAwait<T, TResult>(
this Observable<T> source,
Func<T, CancellationToken, ValueTask<TResult>> selector,
AwaitOperation awaitOperation = AwaitOperation.Sequential,
bool configureAwait = true,
bool cancelOnCompleted = false,
int maxConcurrent = -1)各値に対して非同期の selector を実行し、結果を下流へ流します。awaitOperation で非同期処理の並行制御を指定します。
csharp
source.SelectAwait(async (x, ct) => await httpClient.GetStringAsync($"/api/{x}", ct))AwaitOperation の種類
| AwaitOperation | 動作 |
|---|---|
Sequential(既定) | 値をキューに入れ、前の非同期処理の完了を待ってから次を実行する |
Drop | 非同期処理の実行中に届いた値は破棄する |
Switch | 新しい値が届くと実行中の非同期処理をキャンセルし、最新の値で再実行する |
Parallel | すべての値を即座に非同期処理へ送る(完了順で下流へ流れる) |
SequentialParallel | すべての値を即座に非同期処理へ送るが、結果は元の順序を保って下流へ流す |
ThrottleFirstLast | 実行中は最初と最後の値のみ保持し、スロットリングする |
maxConcurrent は Parallel および SequentialParallel でのみ有効で、同時実行数を制限します。-1(既定)の場合は制限なしです。
3. マーブルダイアグラム
上流の各値に対して非同期処理が実行され、完了した結果が下流へ流れます。AwaitOperation の設定により、処理中に届いた値の扱いが変わります。
4. サンプルコード
csharp
using R3;
var source = Observable.Interval(TimeSpan.FromSeconds(1)).Take(5);
// Sequential: 前の処理が終わるまで次を待つ
source.SelectAwait(async (x, ct) =>
{
await Task.Delay(TimeSpan.FromMilliseconds(500), ct);
return x * 10;
}, AwaitOperation.Sequential)
.Subscribe(x => Console.WriteLine($"Sequential: {x}"));
// Parallel: すべて同時に実行し、完了順で流れる
source.SelectAwait(async (x, ct) =>
{
await Task.Delay(TimeSpan.FromMilliseconds(500), ct);
return x * 10;
}, AwaitOperation.Parallel)
.Subscribe(x => Console.WriteLine($"Parallel: {x}"));
// Switch: 新しい値が届くと前の処理をキャンセル
source.SelectAwait(async (x, ct) =>
{
await Task.Delay(TimeSpan.FromSeconds(2), ct);
return x * 10;
}, AwaitOperation.Switch)
.Subscribe(x => Console.WriteLine($"Switch: {x}"));5. 補足
- 同期的な変換には Select を使用してください。
SelectAwaitは非同期処理が必要な場合にのみ使います。 - SelectMany も非同期的な展開に使えますが、
SelectManyは各値から Observable を生成して合流 するのに対し、SelectAwaitは各値に対して 1 つの非同期結果 を返す点が異なります。 cancelOnCompletedをtrueにすると、上流が完了した時点で実行中の非同期処理がキャンセルされます。