ForEachAsync
1. 概要
ForEachAsync は、Observable シーケンスの各要素に対して指定したアクションを実行し、完了を Task として待機できる終端操作です。await 可能な Subscribe() のように使えるため、シーケンスの完了や失敗を通常の async/await の制御フローに乗せたい場合に便利です。シーケンスが完了するまで全要素を消費し、すべてのアクションが実行された後に Task が完了します。
2. シグネチャ
アクションのみ
csharp
public static Task ForEachAsync<T>(
this Observable<T> source,
Action<T> action,
CancellationToken cancellationToken = default)各要素に対して action を実行します。戻り値は Task(値なし)で、シーケンスの完了を待機するために使用します。
インデックス付きアクション
csharp
public static Task ForEachAsync<T>(
this Observable<T> source,
Action<T, Int32> action,
CancellationToken cancellationToken = default)各要素とそのゼロベースインデックスを受け取る action を実行します。
重要な動作
ForEachAsync は内部でソース Observable を購読し、シーケンスが正常に完了すると返された Task も完了します。OnCompleted(Result.Failure(exception)) で失敗完了した場合は、その例外で Task が faulted になるため、呼び出し側の try-catch で受け取れます。OnErrorResume が通知された場合も同様に、返された Task の例外として扱われます。
一方、Subscribe() は購読を開始して IDisposable を返すため、完了を await したり、失敗完了を try-catch で直接扱ったりする用途には向きません。処理の完了まで待ちたい場合や、エラーを外側の async メソッドでまとめて扱いたい場合は ForEachAsync が適しています。
オーバーロードの使い分け
| オーバーロード | 用途 |
|---|---|
Action<T> | 要素の値のみが必要な場合 |
Action<T, Int32> | 要素の順番(インデックス)も必要な場合 |
3. サンプルコード
csharp
// 各要素をコンソールに出力
var source = new[] { "Alpha", "Bravo", "Charlie" }.ToObservable();
await source.ForEachAsync(item =>
{
Console.WriteLine(item);
});
// Alpha
// Bravo
// Charliecsharp
// インデックス付きで処理
var source = new[] { "りんご", "バナナ", "みかん" }.ToObservable();
await source.ForEachAsync((item, index) =>
{
Console.WriteLine($"{index}: {item}");
});
// 0: りんご
// 1: バナナ
// 2: みかんcsharp
// 失敗完了を try-catch で受け取る
var source = Observable.Create<int>(observer =>
{
observer.OnNext(1);
observer.OnNext(2);
observer.OnCompleted(Result.Failure(new InvalidOperationException("読み込みに失敗しました")));
return Disposable.Empty;
});
try
{
await source.ForEachAsync(x => Console.WriteLine(x));
}
catch (InvalidOperationException ex)
{
Console.WriteLine(ex.Message); // 読み込みに失敗しました
}