Synchronize
1. 概要
Synchronize は、Observable の通知(OnNext、OnErrorResume、OnCompleted)を lock ベースの排他制御で直列化するオペレーターです。複数のスレッドから同時に通知が発行される可能性がある場合に、スレッドセーフな通知配信を保証します。
2. シグネチャ
パラメータなし(内部 gate を使用)
public static Observable<T> Synchronize<T>(
this Observable<T> source)内部的に新しい object を gate として生成し、すべての通知を排他制御します。
gate を指定
public static Observable<T> Synchronize<T>(
this Observable<T> source,
object gate)指定した gate オブジェクトを使用して通知を排他制御します。同じ gate を共有することで、複数の Observable 間で同期を取ることができます。
オーバーロードの使い分け
- 単一の Observable を保護する → パラメータなしオーバーロードを使用。
- 複数の Observable 間で同期を取る → 同じ
gateオブジェクトを指定します。
3. マーブルダイアグラム
複数のスレッドから同時に発行された通知が、lock により直列化されて一つずつ配信されます。
4. サンプルコード
// 内部 gate で同期
var synchronized = observable.Synchronize();
synchronized.Subscribe(x => Console.WriteLine($"値: {x}"));
// 外部 gate を使用して複数の Observable を同期
var gate = new object();
var syncA = observableA.Synchronize(gate);
var syncB = observableB.Synchronize(gate);
syncA.Subscribe(x => Console.WriteLine($"A: {x}"));
syncB.Subscribe(x => Console.WriteLine($"B: {x}"));
// A と B の通知は同じ lock で排他制御される
// マルチスレッド環境での使用例
var subject = new Subject<int>();
var safe = subject.AsObservable().Synchronize();
// 複数のスレッドから安全に通知を送信可能
Parallel.For(0, 100, i =>
{
subject.OnNext(i);
});5. 補足
Trampoline との違いに注意してください。Synchronize は lock を使用してマルチスレッドからの通知を排他制御します。一方、Trampoline はキューを使用して再入(同じスレッド内での再帰的な通知)を防止します。
使用する場面が異なります:
- 複数スレッドから通知が発行される場合 →
Synchronizeを使用。 - OnNext ハンドラ内から同じ Observable に通知を発行する場合 →
Trampolineを使用。
デッドロックへの注意
Synchronize は lock を取得した状態で下流の OnNext、OnErrorResume、OnCompleted を呼び出します。そのため、購読ハンドラ内で別のロックを取得したり、Task.Wait() や .Result などで同期的に待機したり、別の Subject / Observable へ同期的に通知を戻したりすると、ロック取得順序によってデッドロックする可能性があります。
とくに Merge、CombineLatest、Zip、ZipLatest、Switch、Concat、SelectMany、Delay、Debounce、Throttle 系、Timeout など、複数ソースや時間制御を扱うために内部で排他制御を行うオペレーターと組み合わせる場合は、通知中にブロッキング処理や複数ロックの取得を行わないよう注意してください。
たとえば次のコードは、2 つのスレッドが gateA と gateB を逆順に取得しようとするため、タイミングによってデッドロックします。
using R3;
var gateA = new object();
var gateB = new object();
var subjectA = new Subject<int>();
var subjectB = new Subject<int>();
subjectA
.Synchronize(gateA)
.Subscribe(_ =>
{
// gateA を保持したまま gateB を取得しようとする
lock (gateB)
{
Console.WriteLine("A");
}
});
subjectB
.Synchronize(gateB)
.Subscribe(_ =>
{
// gateB を保持したまま gateA を取得しようとする
lock (gateA)
{
Console.WriteLine("B");
}
});
Parallel.Invoke(
() => subjectA.OnNext(1),
() => subjectB.OnNext(2));Synchronize を使う場合でも、OnNext 内ではできるだけ短時間で処理を終え、外部ロックの取得や同期的な待機を避けるのが安全です。通知を別スレッドへ逃がしたい場合は、用途に応じて ObserveOn や SubscribeOn などのスケジューリング系オペレーターも検討してください。