Create
1. 概要
Create は、最も柔軟なファクトリメソッドです。subscribe 関数内で Observer<T> を直接操作し、値の発行(OnNext)、完了(OnCompleted)、エラー(OnErrorResume)を手動で制御できます。
同期パターン(IDisposable を返す)と非同期パターン(ValueTask を返す)の両方をサポートしています。TState 付きオーバーロードを使うと、クロージャによるアロケーションを回避できます。
2. シグネチャ
同期(IDisposable)
public static Observable<T> Create<T>(Func<Observer<T>, IDisposable> subscribe, bool rawObserver = false)購読時にコールバックが呼ばれ、Observer<T> で値を発行できます。戻り値の IDisposable は購読解除時に破棄されます。
Observable.Create<int>(observer =>
{
observer.OnNext(1);
observer.OnNext(2);
observer.OnCompleted();
return Disposable.Empty;
})ステート付き同期
public static Observable<T> Create<T, TState>(TState state, Func<Observer<T>, TState, IDisposable> subscribe, bool rawObserver = false)state を渡すことでクロージャのキャプチャを避け、アロケーションを削減できます。
Observable.Create<int, MyState>(myState, (observer, s) =>
{
observer.OnNext(s.Value);
observer.OnCompleted();
return Disposable.Empty;
})非同期(ValueTask)
public static Observable<T> Create<T>(Func<Observer<T>, CancellationToken, ValueTask> subscribe, bool rawObserver = false)非同期の subscribe 関数です。CancellationToken で購読解除時のキャンセルが通知されます。シーケンスを完了させるには、subscribe 関数内で observer.OnCompleted() を明示的に呼び出します。
Observable.Create<int>(async (observer, ct) =>
{
for (int i = 0; i < 5; i++)
{
await Task.Delay(100, ct);
observer.OnNext(i);
}
observer.OnCompleted();
})ステート付き非同期
public static Observable<T> Create<T, TState>(TState state, Func<Observer<T>, TState, CancellationToken, ValueTask> subscribe, bool rawObserver = false)ステート付きの非同期版です。state でクロージャを回避します。
rawObserver パラメータ
rawObserver は通常 false のまま使います。true にすると observer.Wrap() による中継を省き、下流の Observer<T> を直接使います。詳しい違いは rawObserver を参照してください。
オーバーロードの使い分け
| オーバーロード | 使う場面 |
|---|---|
Create<T>(Func<Observer<T>, IDisposable>) | 同期的に値を発行する、またはリソースのクリーンアップが必要 |
Create<T, TState>(TState, ...) | アロケーション削減が必要(ステート付き) |
Create<T>(Func<Observer<T>, CancellationToken, ValueTask>) | 非同期処理内で値を発行する |
Create<T, TState>(TState, ... ValueTask) | 非同期+アロケーション削減 |
3. マーブルダイアグラム
subscribe 関数内で Observer<T> に対して任意のタイミングで値を発行できます。同期版・非同期版のどちらも、シーケンスを完了させるには OnCompleted() を呼び出します。
4. サンプルコード
using R3;
// === 同期版: 固定シーケンスの生成 ===
Observable.Create<int>(observer =>
{
observer.OnNext(10);
observer.OnNext(20);
observer.OnNext(30);
observer.OnCompleted();
return Disposable.Empty;
})
.Subscribe(x => Console.WriteLine($"値: {x}"));
// 出力: 値: 10
// 出力: 値: 20
// 出力: 値: 30
// === 非同期版: 定期的に値を発行 ===
Observable.Create<string>(async (observer, ct) =>
{
var messages = new[] { "Hello", "World", "R3" };
foreach (var msg in messages)
{
await Task.Delay(500, ct);
observer.OnNext(msg);
}
observer.OnCompleted();
})
.Subscribe(
x => Console.WriteLine($"メッセージ: {x}"),
_ => Console.WriteLine("完了"));
// 出力(500ms ごと): メッセージ: Hello
// 出力: メッセージ: World
// 出力: メッセージ: R3
// 出力: 完了
// === ステート付き: クロージャを回避 ===
var config = new { Start = 1, Count = 5 };
Observable.Create<int, (int Start, int Count)>((config.Start, config.Count), (observer, state) =>
{
for (int i = 0; i < state.Count; i++)
{
observer.OnNext(state.Start + i);
}
observer.OnCompleted();
return Disposable.Empty;
})
.Subscribe(x => Console.WriteLine(x));
// 出力: 1, 2, 3, 4, 55. 補足
CreateFrom との違い
CreateFrom は IAsyncEnumerable ベースで値を発行します。値を順番に yield return するだけの場合は CreateFrom のほうが簡潔に記述できます。Create は Observer を直接操作するため、非順次的な発行や複雑なエラーハンドリングなど、より柔軟な制御が可能です。
Defer との違い
Defer は既存の Observable の生成を購読時まで遅延させるだけであり、値の発行ロジック自体は内部の Observable に委ねます。Create は値の発行ロジックそのものを定義します。