Skip to content

SelectAwait

1. 概要

SelectAwait は、上流から流れてくる各値に 非同期関数 を適用し、その結果を下流へ流すオペレーターです。

同期版の Select と同じ「値の変換」を行いますが、変換処理が ValueTask<TResult> を返す非同期メソッドである点が異なります。AwaitOperation パラメーターによって、非同期処理が重なった場合の振る舞い(順次実行・ドロップ・スイッチ・並列など)を細かく制御できます。

2. シグネチャ

非同期変換

csharp
public static Observable<TResult> SelectAwait<T, TResult>(
    this Observable<T> source,
    Func<T, CancellationToken, ValueTask<TResult>> selector,
    AwaitOperation awaitOperation = AwaitOperation.Sequential,
    bool configureAwait = true,
    bool cancelOnCompleted = false,
    int maxConcurrent = -1)

各値に対して非同期の selector を実行し、結果を下流へ流します。awaitOperation で非同期処理の並行制御を指定します。

csharp
source.SelectAwait(async (x, ct) => await httpClient.GetStringAsync($"/api/{x}", ct))

AwaitOperation の種類

AwaitOperation動作
Sequential(既定)値をキューに入れ、前の非同期処理の完了を待ってから次を実行する
Drop非同期処理の実行中に届いた値は破棄する
Switch新しい値が届くと実行中の非同期処理をキャンセルし、最新の値で再実行する
Parallelすべての値を即座に非同期処理へ送る(完了順で下流へ流れる)
SequentialParallelすべての値を即座に非同期処理へ送るが、結果は元の順序を保って下流へ流す
ThrottleFirstLast実行中は最初と最後の値のみ保持し、スロットリングする

maxConcurrentParallel および SequentialParallel でのみ有効で、同時実行数を制限します。-1(既定)の場合は制限なしです。

3. マーブルダイアグラム

SelectAwait のマーブルダイアグラム

上流の各値に対して非同期処理が実行され、完了した結果が下流へ流れます。AwaitOperation の設定により、処理中に届いた値の扱いが変わります。

4. サンプルコード

csharp
using R3;

var source = Observable.Interval(TimeSpan.FromSeconds(1)).Take(5);

// Sequential: 前の処理が終わるまで次を待つ
source.SelectAwait(async (x, ct) =>
    {
        await Task.Delay(TimeSpan.FromMilliseconds(500), ct);
        return x * 10;
    }, AwaitOperation.Sequential)
    .Subscribe(x => Console.WriteLine($"Sequential: {x}"));

// Parallel: すべて同時に実行し、完了順で流れる
source.SelectAwait(async (x, ct) =>
    {
        await Task.Delay(TimeSpan.FromMilliseconds(500), ct);
        return x * 10;
    }, AwaitOperation.Parallel)
    .Subscribe(x => Console.WriteLine($"Parallel: {x}"));

// Switch: 新しい値が届くと前の処理をキャンセル
source.SelectAwait(async (x, ct) =>
    {
        await Task.Delay(TimeSpan.FromSeconds(2), ct);
        return x * 10;
    }, AwaitOperation.Switch)
    .Subscribe(x => Console.WriteLine($"Switch: {x}"));

5. 補足

  • 同期的な変換には Select を使用してください。SelectAwait は非同期処理が必要な場合にのみ使います。
  • SelectMany も非同期的な展開に使えますが、SelectMany は各値から Observable を生成して合流 するのに対し、SelectAwait は各値に対して 1 つの非同期結果 を返す点が異なります。
  • cancelOnCompletedtrue にすると、上流が完了した時点で実行中の非同期処理がキャンセルされます。