Skip to content

AggregateByAsync

1. 概要

AggregateByAsync は、Observable シーケンスの要素をキーごとにグループ化し、各グループの累積結果を Task として返すオペレーターです。各グループでは、要素を受け取るたびに「前回までの累積結果」と「今回の要素」から新しい累積結果を作ります。結果は IEnumerable<KeyValuePair<TKey, TAccumulate>> として返され、各キーに対応する累積値を取得できます。LINQ の AggregateBy に相当する非同期版です。

2. シグネチャ

固定シード値

csharp
public static Task<IEnumerable<KeyValuePair<TKey, TAccumulate>>> AggregateByAsync<T, TKey, TAccumulate>(
    this Observable<T> source,
    Func<T, TKey> keySelector,
    TAccumulate seed,
    Func<TAccumulate, T, TAccumulate> func,
    IEqualityComparer<TKey>? comparer = null,
    CancellationToken cancellationToken = default)

すべてのキーに対して同一のシード値を使用します。keySelector で要素からキーを抽出し、同じキーの要素に対して func を順次適用して累積します。

シードセレクター

csharp
public static Task<IEnumerable<KeyValuePair<TKey, TAccumulate>>> AggregateByAsync<T, TKey, TAccumulate>(
    this Observable<T> source,
    Func<T, TKey> keySelector,
    Func<TKey, TAccumulate> seedSelector,
    Func<TAccumulate, T, TAccumulate> func,
    IEqualityComparer<TKey>? comparer = null,
    CancellationToken cancellationToken = default)

キーに応じて異なるシード値を生成する seedSelector を指定します。各グループの初期値をキーに基づいて動的に決定したい場合に使用します。

オーバーロードの使い分け

オーバーロード用途
固定シード値全グループで同じ初期値を使う場合(例:すべてのカウントを 0 から開始)
シードセレクターキーに応じて初期値を変えたい場合

3. マーブルダイアグラム

AggregateByAsync のマーブルダイアグラム

ソースシーケンスの各要素がキーごとにグループ化され、各グループで個別に累積されます。OnCompleted 後に全グループの累積結果がキーと値のペアとして返されます。

4. サンプルコード

csharp
// 部門ごとの売上合計を計算
var sales = Observable.Create<(string Department, int Amount)>(observer =>
{
    observer.OnNext(("営業", 100));
    observer.OnNext(("開発", 200));
    observer.OnNext(("営業", 150));
    observer.OnNext(("開発", 300));
    observer.OnNext(("営業", 80));
    observer.OnCompleted();
    return Disposable.Empty;
});

var result = await sales.AggregateByAsync(
    keySelector: x => x.Department,
    seed: 0,
    func: (acc, x) => acc + x.Amount);

foreach (var kv in result)
{
    Console.WriteLine($"{kv.Key}: {kv.Value}");
}
// 営業: 330
// 開発: 500
csharp
// シードセレクターを使用:キーに応じた初期値
var items = Observable.Create<(string Category, int Value)>(observer =>
{
    observer.OnNext(("A", 10));
    observer.OnNext(("B", 20));
    observer.OnNext(("A", 30));
    observer.OnCompleted();
    return Disposable.Empty;
});

var result = await items.AggregateByAsync(
    keySelector: x => x.Category,
    seedSelector: key => key == "A" ? 100 : 0,
    func: (acc, x) => acc + x.Value);

foreach (var kv in result)
{
    Console.WriteLine($"{kv.Key}: {kv.Value}");
}
// A: 140  (100 + 10 + 30)
// B: 20   (0 + 20)