Skip to content

SubscribeAwait

1. 概要

SubscribeAwait は、非同期の OnNext ハンドラで Observable を購読する購読 API です。AwaitOperation パラメータにより、複数の非同期操作が同時に発生した場合の振る舞いを制御できます。通常の Subscribe では同期的なハンドラしか指定できませんが、SubscribeAwait を使うことで async/await パターンを活用した非同期処理を購読内で実行できます。

2. シグネチャ

基本形(onNextAsync のみ)

csharp
public static IDisposable SubscribeAwait<T>(
    this Observable<T> source,
    Func<T, CancellationToken, ValueTask> onNextAsync,
    AwaitOperation awaitOperation = AwaitOperation.Sequential,
    bool configureAwait = true,
    bool cancelOnCompleted = false,
    int maxConcurrent = -1)

最もシンプルなオーバーロードです。非同期の OnNext ハンドラのみを指定します。エラーは ObservableSystem.GetUnhandledExceptionHandler() に委譲されます。

onErrorResume 付き

csharp
public static IDisposable SubscribeAwait<T>(
    this Observable<T> source,
    Func<T, CancellationToken, ValueTask> onNextAsync,
    Action<Exception> onErrorResume,
    AwaitOperation awaitOperation = AwaitOperation.Sequential,
    bool configureAwait = true,
    bool cancelOnCompleted = false,
    int maxConcurrent = -1)

エラー発生時のコールバックを追加で指定します。エラーが発生してもシーケンスの購読を継続します。

onErrorResume + onCompleted 付き

csharp
public static IDisposable SubscribeAwait<T>(
    this Observable<T> source,
    Func<T, CancellationToken, ValueTask> onNextAsync,
    Action<Exception> onErrorResume,
    Action<Result> onCompleted,
    AwaitOperation awaitOperation = AwaitOperation.Sequential,
    bool configureAwait = true,
    bool cancelOnCompleted = false,
    int maxConcurrent = -1)

エラーと完了の両方のコールバックを指定する完全なオーバーロードです。

AwaitOperation の種類

AwaitOperation 列挙型は、非同期操作の同時実行制御を定義します。

説明
Sequential前の非同期操作が完了するまで次を待機する(デフォルト)
Drop前の操作が実行中の場合、新しい値を破棄する
Switch新しい値が来たら前の操作をキャンセルして切り替える
Parallelすべての非同期操作を並列に実行する
SequentialParallelmaxConcurrent で制限された並列実行
ThrottleFirstLast最初と最後の値のみを処理する

オーバーロードの使い分け

  • シンプルな非同期処理 → 基本形を使用。エラーハンドリングはグローバルハンドラに任せます。
  • エラーを個別に処理したいonErrorResume 付きオーバーロードを使用。
  • 完了通知も必要 → 完全なオーバーロードを使用。

3. サンプルコード

csharp
// 基本的な非同期購読(Sequential モード)
var subscription = observable.SubscribeAwait(async (value, ct) =>
{
    await ProcessAsync(value, ct);
}, AwaitOperation.Sequential);

// Drop モード: 処理中の値がある場合、新しい値を破棄
var subscription2 = observable.SubscribeAwait(async (value, ct) =>
{
    await Task.Delay(TimeSpan.FromSeconds(1), ct);
    Console.WriteLine($"処理完了: {value}");
}, AwaitOperation.Drop);

// Switch モード: 新しい値が来たら前の処理をキャンセル
var subscription3 = observable.SubscribeAwait(async (value, ct) =>
{
    await Task.Delay(TimeSpan.FromSeconds(1), ct);
    Console.WriteLine($"処理完了: {value}");
}, AwaitOperation.Switch);

// エラーハンドリング付き
var subscription4 = observable.SubscribeAwait(
    async (value, ct) =>
    {
        await ProcessAsync(value, ct);
    },
    onErrorResume: ex => Console.WriteLine($"エラー: {ex.Message}"),
    onCompleted: result => Console.WriteLine($"完了: {result}"),
    awaitOperation: AwaitOperation.Sequential
);

// Parallel モードで並列処理
var subscription5 = observable.SubscribeAwait(async (value, ct) =>
{
    await ProcessAsync(value, ct);
}, AwaitOperation.Parallel);

// SequentialParallel モードで同時実行数を制限
var subscription6 = observable.SubscribeAwait(async (value, ct) =>
{
    await ProcessAsync(value, ct);
}, AwaitOperation.SequentialParallel, maxConcurrent: 3);