Skip to content

AggregateAsync

1. 概要

AggregateAsync は、Observable シーケンスの全要素を 1 つの値にまとめ、その累積結果を Task として返すオペレーターです。各要素を受け取るたびに「前回までの累積結果」と「今回の要素」から新しい累積結果を作る関数を呼び出します。たとえば合計なら、これまでの合計 acc に次の値 x を足して、新しい合計を返します。

LINQ の Aggregate に相当する非同期版であり、シード値の有無や結果セレクターの指定に応じて 3 つのオーバーロードが用意されています。

2. シグネチャ

シードなし(要素同士の累積)

csharp
public static Task<T> AggregateAsync<T>(
    this Observable<T> source,
    Func<T, T, T> func,
    CancellationToken cancellationToken = default)

シード値を指定せず、最初の要素を初期値として使います。2 番目以降の要素については、前回までの累積結果と今回の要素から新しい累積結果を作ります。シーケンスが空の場合は InvalidOperationException をスローします。

シード値あり

csharp
public static Task<TAccumulate> AggregateAsync<T, TAccumulate>(
    this Observable<T> source,
    TAccumulate seed,
    Func<TAccumulate, T, TAccumulate> func,
    CancellationToken cancellationToken = default)

指定したシード値を初期状態として使います。各要素について、前回までの累積結果と今回の要素から新しい累積結果を作ります。シーケンスが空の場合はシード値がそのまま返されます。

シード値 + 結果セレクター

csharp
public static Task<TResult> AggregateAsync<T, TAccumulate, TResult>(
    this Observable<T> source,
    TAccumulate seed,
    Func<TAccumulate, T, TAccumulate> func,
    Func<TAccumulate, TResult> resultSelector,
    CancellationToken cancellationToken = default)

シード値と累積処理に加え、最終累積値を別の型に変換する resultSelector を指定できます。

オーバーロードの使い分け

オーバーロード用途
シードなし要素同士を直接累積する場合(例:最大値、文字列連結)
シード値あり累積の初期値を明示する場合(例:合計を 0 から開始)
シード値 + 結果セレクター累積後に型変換や整形が必要な場合

3. マーブルダイアグラム

AggregateAsync のマーブルダイアグラム

ソースシーケンスの各要素を順番に累積し、OnCompleted 後に最終累積値が Task の結果として返されます。

4. サンプルコード

csharp
// シードなし:要素同士を連結
var source = Observable.Create<string>(observer =>
{
    observer.OnNext("Hello");
    observer.OnNext(" ");
    observer.OnNext("World");
    observer.OnCompleted();
    return Disposable.Empty;
});

string result = await source.AggregateAsync((acc, x) => acc + x);
Console.WriteLine(result); // "Hello World"
csharp
// シード値あり:合計を計算
var numbers = new[] { 1, 2, 3, 4, 5 };
var source = numbers.ToObservable();

int sum = await source.AggregateAsync(0, (acc, x) => acc + x);
Console.WriteLine(sum); // 15
csharp
// シード値 + 結果セレクター:平均を計算
var source = new[] { 10, 20, 30 }.ToObservable();

double average = await source.AggregateAsync(
    seed: (Sum: 0, Count: 0),
    func: (acc, x) => (acc.Sum + x, acc.Count + 1),
    resultSelector: acc => (double)acc.Sum / acc.Count);

Console.WriteLine(average); // 20