AggregateAsync
1. 概要
AggregateAsync は、Observable シーケンスの全要素を 1 つの値にまとめ、その累積結果を Task として返すオペレーターです。各要素を受け取るたびに「前回までの累積結果」と「今回の要素」から新しい累積結果を作る関数を呼び出します。たとえば合計なら、これまでの合計 acc に次の値 x を足して、新しい合計を返します。
LINQ の Aggregate に相当する非同期版であり、シード値の有無や結果セレクターの指定に応じて 3 つのオーバーロードが用意されています。
2. シグネチャ
シードなし(要素同士の累積)
csharp
public static Task<T> AggregateAsync<T>(
this Observable<T> source,
Func<T, T, T> func,
CancellationToken cancellationToken = default)シード値を指定せず、最初の要素を初期値として使います。2 番目以降の要素については、前回までの累積結果と今回の要素から新しい累積結果を作ります。シーケンスが空の場合は InvalidOperationException をスローします。
シード値あり
csharp
public static Task<TAccumulate> AggregateAsync<T, TAccumulate>(
this Observable<T> source,
TAccumulate seed,
Func<TAccumulate, T, TAccumulate> func,
CancellationToken cancellationToken = default)指定したシード値を初期状態として使います。各要素について、前回までの累積結果と今回の要素から新しい累積結果を作ります。シーケンスが空の場合はシード値がそのまま返されます。
シード値 + 結果セレクター
csharp
public static Task<TResult> AggregateAsync<T, TAccumulate, TResult>(
this Observable<T> source,
TAccumulate seed,
Func<TAccumulate, T, TAccumulate> func,
Func<TAccumulate, TResult> resultSelector,
CancellationToken cancellationToken = default)シード値と累積処理に加え、最終累積値を別の型に変換する resultSelector を指定できます。
オーバーロードの使い分け
| オーバーロード | 用途 |
|---|---|
| シードなし | 要素同士を直接累積する場合(例:最大値、文字列連結) |
| シード値あり | 累積の初期値を明示する場合(例:合計を 0 から開始) |
| シード値 + 結果セレクター | 累積後に型変換や整形が必要な場合 |
3. マーブルダイアグラム
ソースシーケンスの各要素を順番に累積し、OnCompleted 後に最終累積値が Task の結果として返されます。
4. サンプルコード
csharp
// シードなし:要素同士を連結
var source = Observable.Create<string>(observer =>
{
observer.OnNext("Hello");
observer.OnNext(" ");
observer.OnNext("World");
observer.OnCompleted();
return Disposable.Empty;
});
string result = await source.AggregateAsync((acc, x) => acc + x);
Console.WriteLine(result); // "Hello World"csharp
// シード値あり:合計を計算
var numbers = new[] { 1, 2, 3, 4, 5 };
var source = numbers.ToObservable();
int sum = await source.AggregateAsync(0, (acc, x) => acc + x);
Console.WriteLine(sum); // 15csharp
// シード値 + 結果セレクター:平均を計算
var source = new[] { 10, 20, 30 }.ToObservable();
double average = await source.AggregateAsync(
seed: (Sum: 0, Count: 0),
func: (acc, x) => (acc.Sum + x, acc.Count + 1),
resultSelector: acc => (double)acc.Sum / acc.Count);
Console.WriteLine(average); // 20