Skip to content

ForEachAsync

1. 概要

ForEachAsync は、Observable シーケンスの各要素に対して指定したアクションを実行し、完了を Task として待機できる終端操作です。await 可能な Subscribe() のように使えるため、シーケンスの完了や失敗を通常の async/await の制御フローに乗せたい場合に便利です。シーケンスが完了するまで全要素を消費し、すべてのアクションが実行された後に Task が完了します。

2. シグネチャ

アクションのみ

csharp
public static Task ForEachAsync<T>(
    this Observable<T> source,
    Action<T> action,
    CancellationToken cancellationToken = default)

各要素に対して action を実行します。戻り値は Task(値なし)で、シーケンスの完了を待機するために使用します。

インデックス付きアクション

csharp
public static Task ForEachAsync<T>(
    this Observable<T> source,
    Action<T, Int32> action,
    CancellationToken cancellationToken = default)

各要素とそのゼロベースインデックスを受け取る action を実行します。

重要な動作

ForEachAsync は内部でソース Observable を購読し、シーケンスが正常に完了すると返された Task も完了します。OnCompleted(Result.Failure(exception)) で失敗完了した場合は、その例外で Task が faulted になるため、呼び出し側の try-catch で受け取れます。OnErrorResume が通知された場合も同様に、返された Task の例外として扱われます。

一方、Subscribe() は購読を開始して IDisposable を返すため、完了を await したり、失敗完了を try-catch で直接扱ったりする用途には向きません。処理の完了まで待ちたい場合や、エラーを外側の async メソッドでまとめて扱いたい場合は ForEachAsync が適しています。

オーバーロードの使い分け

オーバーロード用途
Action<T>要素の値のみが必要な場合
Action<T, Int32>要素の順番(インデックス)も必要な場合

3. サンプルコード

csharp
// 各要素をコンソールに出力
var source = new[] { "Alpha", "Bravo", "Charlie" }.ToObservable();

await source.ForEachAsync(item =>
{
    Console.WriteLine(item);
});
// Alpha
// Bravo
// Charlie
csharp
// インデックス付きで処理
var source = new[] { "りんご", "バナナ", "みかん" }.ToObservable();

await source.ForEachAsync((item, index) =>
{
    Console.WriteLine($"{index}: {item}");
});
// 0: りんご
// 1: バナナ
// 2: みかん
csharp
// 失敗完了を try-catch で受け取る
var source = Observable.Create<int>(observer =>
{
    observer.OnNext(1);
    observer.OnNext(2);
    observer.OnCompleted(Result.Failure(new InvalidOperationException("読み込みに失敗しました")));
    return Disposable.Empty;
});

try
{
    await source.ForEachAsync(x => Console.WriteLine(x));
}
catch (InvalidOperationException ex)
{
    Console.WriteLine(ex.Message); // 読み込みに失敗しました
}