Skip to content

Create

1. 概要

Create は、最も柔軟なファクトリメソッドです。subscribe 関数内で Observer<T> を直接操作し、値の発行(OnNext)、完了(OnCompleted)、エラー(OnErrorResume)を手動で制御できます。

同期パターン(IDisposable を返す)と非同期パターン(ValueTask を返す)の両方をサポートしています。TState 付きオーバーロードを使うと、クロージャによるアロケーションを回避できます。

2. シグネチャ

同期(IDisposable)

csharp
public static Observable<T> Create<T>(Func<Observer<T>, IDisposable> subscribe, bool rawObserver = false)

購読時にコールバックが呼ばれ、Observer<T> で値を発行できます。戻り値の IDisposable は購読解除時に破棄されます。

csharp
Observable.Create<int>(observer =>
{
    observer.OnNext(1);
    observer.OnNext(2);
    observer.OnCompleted();
    return Disposable.Empty;
})

ステート付き同期

csharp
public static Observable<T> Create<T, TState>(TState state, Func<Observer<T>, TState, IDisposable> subscribe, bool rawObserver = false)

state を渡すことでクロージャのキャプチャを避け、アロケーションを削減できます。

csharp
Observable.Create<int, MyState>(myState, (observer, s) =>
{
    observer.OnNext(s.Value);
    observer.OnCompleted();
    return Disposable.Empty;
})

非同期(ValueTask)

csharp
public static Observable<T> Create<T>(Func<Observer<T>, CancellationToken, ValueTask> subscribe, bool rawObserver = false)

非同期の subscribe 関数です。CancellationToken で購読解除時のキャンセルが通知されます。シーケンスを完了させるには、subscribe 関数内で observer.OnCompleted() を明示的に呼び出します。

csharp
Observable.Create<int>(async (observer, ct) =>
{
    for (int i = 0; i < 5; i++)
    {
        await Task.Delay(100, ct);
        observer.OnNext(i);
    }
    observer.OnCompleted();
})

ステート付き非同期

csharp
public static Observable<T> Create<T, TState>(TState state, Func<Observer<T>, TState, CancellationToken, ValueTask> subscribe, bool rawObserver = false)

ステート付きの非同期版です。state でクロージャを回避します。

rawObserver パラメータ

rawObserver は通常 false のまま使います。true にすると observer.Wrap() による中継を省き、下流の Observer<T> を直接使います。詳しい違いは rawObserver を参照してください。

オーバーロードの使い分け

オーバーロード使う場面
Create<T>(Func<Observer<T>, IDisposable>)同期的に値を発行する、またはリソースのクリーンアップが必要
Create<T, TState>(TState, ...)アロケーション削減が必要(ステート付き)
Create<T>(Func<Observer<T>, CancellationToken, ValueTask>)非同期処理内で値を発行する
Create<T, TState>(TState, ... ValueTask)非同期+アロケーション削減

3. マーブルダイアグラム

Create のマーブルダイアグラム

subscribe 関数内で Observer<T> に対して任意のタイミングで値を発行できます。同期版・非同期版のどちらも、シーケンスを完了させるには OnCompleted() を呼び出します。

4. サンプルコード

csharp
using R3;

// === 同期版: 固定シーケンスの生成 ===
Observable.Create<int>(observer =>
    {
        observer.OnNext(10);
        observer.OnNext(20);
        observer.OnNext(30);
        observer.OnCompleted();
        return Disposable.Empty;
    })
    .Subscribe(x => Console.WriteLine($"値: {x}"));
// 出力: 値: 10
// 出力: 値: 20
// 出力: 値: 30

// === 非同期版: 定期的に値を発行 ===
Observable.Create<string>(async (observer, ct) =>
    {
        var messages = new[] { "Hello", "World", "R3" };
        foreach (var msg in messages)
        {
            await Task.Delay(500, ct);
            observer.OnNext(msg);
        }
        observer.OnCompleted();
    })
    .Subscribe(
        x => Console.WriteLine($"メッセージ: {x}"),
        _ => Console.WriteLine("完了"));
// 出力(500ms ごと): メッセージ: Hello
// 出力: メッセージ: World
// 出力: メッセージ: R3
// 出力: 完了

// === ステート付き: クロージャを回避 ===
var config = new { Start = 1, Count = 5 };

Observable.Create<int, (int Start, int Count)>((config.Start, config.Count), (observer, state) =>
    {
        for (int i = 0; i < state.Count; i++)
        {
            observer.OnNext(state.Start + i);
        }
        observer.OnCompleted();
        return Disposable.Empty;
    })
    .Subscribe(x => Console.WriteLine(x));
// 出力: 1, 2, 3, 4, 5

5. 補足

CreateFrom との違い

CreateFromIAsyncEnumerable ベースで値を発行します。値を順番に yield return するだけの場合は CreateFrom のほうが簡潔に記述できます。CreateObserver を直接操作するため、非順次的な発行や複雑なエラーハンドリングなど、より柔軟な制御が可能です。

Defer との違い

Defer は既存の Observable の生成を購読時まで遅延させるだけであり、値の発行ロジック自体は内部の Observable に委ねます。Create は値の発行ロジックそのものを定義します。