AggregateByAsync
1. 概要
AggregateByAsync は、Observable シーケンスの要素をキーごとにグループ化し、各グループの累積結果を Task として返すオペレーターです。各グループでは、要素を受け取るたびに「前回までの累積結果」と「今回の要素」から新しい累積結果を作ります。結果は IEnumerable<KeyValuePair<TKey, TAccumulate>> として返され、各キーに対応する累積値を取得できます。LINQ の AggregateBy に相当する非同期版です。
2. シグネチャ
固定シード値
csharp
public static Task<IEnumerable<KeyValuePair<TKey, TAccumulate>>> AggregateByAsync<T, TKey, TAccumulate>(
this Observable<T> source,
Func<T, TKey> keySelector,
TAccumulate seed,
Func<TAccumulate, T, TAccumulate> func,
IEqualityComparer<TKey>? comparer = null,
CancellationToken cancellationToken = default)すべてのキーに対して同一のシード値を使用します。keySelector で要素からキーを抽出し、同じキーの要素に対して func を順次適用して累積します。
シードセレクター
csharp
public static Task<IEnumerable<KeyValuePair<TKey, TAccumulate>>> AggregateByAsync<T, TKey, TAccumulate>(
this Observable<T> source,
Func<T, TKey> keySelector,
Func<TKey, TAccumulate> seedSelector,
Func<TAccumulate, T, TAccumulate> func,
IEqualityComparer<TKey>? comparer = null,
CancellationToken cancellationToken = default)キーに応じて異なるシード値を生成する seedSelector を指定します。各グループの初期値をキーに基づいて動的に決定したい場合に使用します。
オーバーロードの使い分け
| オーバーロード | 用途 |
|---|---|
| 固定シード値 | 全グループで同じ初期値を使う場合(例:すべてのカウントを 0 から開始) |
| シードセレクター | キーに応じて初期値を変えたい場合 |
3. マーブルダイアグラム
ソースシーケンスの各要素がキーごとにグループ化され、各グループで個別に累積されます。OnCompleted 後に全グループの累積結果がキーと値のペアとして返されます。
4. サンプルコード
csharp
// 部門ごとの売上合計を計算
var sales = Observable.Create<(string Department, int Amount)>(observer =>
{
observer.OnNext(("営業", 100));
observer.OnNext(("開発", 200));
observer.OnNext(("営業", 150));
observer.OnNext(("開発", 300));
observer.OnNext(("営業", 80));
observer.OnCompleted();
return Disposable.Empty;
});
var result = await sales.AggregateByAsync(
keySelector: x => x.Department,
seed: 0,
func: (acc, x) => acc + x.Amount);
foreach (var kv in result)
{
Console.WriteLine($"{kv.Key}: {kv.Value}");
}
// 営業: 330
// 開発: 500csharp
// シードセレクターを使用:キーに応じた初期値
var items = Observable.Create<(string Category, int Value)>(observer =>
{
observer.OnNext(("A", 10));
observer.OnNext(("B", 20));
observer.OnNext(("A", 30));
observer.OnCompleted();
return Disposable.Empty;
});
var result = await items.AggregateByAsync(
keySelector: x => x.Category,
seedSelector: key => key == "A" ? 100 : 0,
func: (acc, x) => acc + x.Value);
foreach (var kv in result)
{
Console.WriteLine($"{kv.Key}: {kv.Value}");
}
// A: 140 (100 + 10 + 30)
// B: 20 (0 + 20)