WhereAwait
1. 概要
WhereAwait は、ソースシーケンスの各要素に対して非同期条件(async predicate)を評価し、条件を満たす要素のみを通過させるフィルタリングオペレーターです。HTTP リクエストやデータベースクエリなど、非同期処理を伴う条件判定に使用します。
2. シグネチャ
非同期条件関数によるフィルタリング
csharp
public static Observable<T> WhereAwait<T>(
this Observable<T> source,
Func<T, CancellationToken, ValueTask<Boolean>> predicate,
AwaitOperation awaitOperation = AwaitOperation.Sequential,
bool configureAwait = true,
bool cancelOnCompleted = false,
int maxConcurrent = -1)非同期条件関数 predicate が true を返す要素のみを下流に流します。AwaitOperation で非同期処理の実行方式を制御できます。
awaitOperation: 非同期処理の実行方式(Sequential、Parallel、Switch、Dropなど)configureAwait:ConfigureAwaitに渡す値(デフォルト:true)cancelOnCompleted: ソースが完了した際に実行中の非同期処理をキャンセルするか(デフォルト:false)maxConcurrent: 並行実行の最大数。-1は無制限(デフォルト:-1)
csharp
// API でバリデーションを行い、有効な要素のみ通過させる
source.WhereAwait(
async (x, ct) => await ValidateAsync(x, ct),
AwaitOperation.Sequential);3. マーブルダイアグラム
各要素に対して非同期条件が評価されます。非同期処理の完了後、条件を満たす要素のみが下流に流れます。AwaitOperation の設定によって、非同期処理の実行順序や並行性が変わります。
4. サンプルコード
csharp
using R3;
// 非同期バリデーションによるフィルタリング
var userIds = new[] { 1, 2, 3, 4, 5 };
userIds.ToObservable()
.WhereAwait(
async (id, ct) =>
{
// 外部 API でユーザーの有効性を検証
var isActive = await CheckUserIsActiveAsync(id, ct);
return isActive;
},
AwaitOperation.Sequential)
.Subscribe(id => Console.WriteLine($"有効なユーザー ID: {id}"));
// 並列実行で高速化する例
userIds.ToObservable()
.WhereAwait(
async (id, ct) =>
{
var isActive = await CheckUserIsActiveAsync(id, ct);
return isActive;
},
AwaitOperation.Parallel,
maxConcurrent: 3)
.Subscribe(id => Console.WriteLine($"有効なユーザー ID: {id}"));
// ソース完了時に実行中の非同期処理をキャンセルする例
source
.WhereAwait(
async (x, ct) => await SlowValidationAsync(x, ct),
AwaitOperation.Sequential,
cancelOnCompleted: true)
.Subscribe(x => Console.WriteLine($"値: {x}"));5. 補足
Where との違い
条件判定が同期的に行える場合は Where を使用してください。Where のほうがオーバーヘッドが小さく、シンプルです。WhereAwait は非同期処理が必要な場合にのみ使用します。
AwaitOperation の選択
| AwaitOperation | 動作 | 適した場面 |
|---|---|---|
Sequential | 前の非同期処理が完了してから次を開始 | 順序保証が必要な場合(デフォルト) |
Parallel | 複数の非同期処理を並行実行 | 独立した処理で高速化したい場合 |
Switch | 新しい要素が来たら前の処理をキャンセル | 最新の結果だけが必要な場合 |
Drop | 処理中に新しい要素が来たら破棄 | 処理が追いつかない場合にドロップしたい場合 |