Skip to content

TakeUntil

1. 概要

TakeUntil は、外部のシグナル(別の Observable、CancellationToken、Task)や条件(predicate)が満たされるまでソースシーケンスの要素を取得するオペレーターです。シグナルが発生した時点でシーケンスを完了させます。多様なトリガーに対応する複数のオーバーロードを提供します。

2. シグネチャ

別の Observable がトリガー

csharp
public static Observable<T> TakeUntil<T, TOther>(
    this Observable<T> source,
    Observable<TOther> other)

other が最初の要素を発行した時点でソースシーケンスを完了させます。

csharp
// stopSignal が発行されるまで要素を取得
source.TakeUntil(stopSignal);

CancellationToken がトリガー

csharp
public static Observable<T> TakeUntil<T>(
    this Observable<T> source,
    CancellationToken cancellationToken)

cancellationToken がキャンセルされた時点でシーケンスを完了させます。

csharp
// キャンセルトークンでシーケンスを制御
source.TakeUntil(cts.Token);

Task がトリガー

csharp
public static Observable<T> TakeUntil<T>(
    this Observable<T> source,
    Task task,
    Boolean configureAwait = true)

task が完了した時点でシーケンスを完了させます。

csharp
// Task の完了をトリガーにする
source.TakeUntil(someAsyncTask);

非同期関数がトリガー

csharp
public static Observable<T> TakeUntil<T>(
    this Observable<T> source,
    Func<T, CancellationToken, ValueTask> asyncFunc,
    Boolean configureAwait = true)

各要素に対して非同期関数を実行し、関数が完了した時点でシーケンスを完了させます。

csharp
// 非同期処理の完了をトリガーにする
source.TakeUntil(async (x, ct) => await ProcessAsync(x, ct));

条件関数がトリガー

csharp
public static Observable<T> TakeUntil<T>(
    this Observable<T> source,
    Func<T, Boolean> predicate)

predicatetrue を返す要素まで(その要素を含めて)取得し、シーケンスを完了させます。

csharp
// 値が 10 以上になった時点で完了(その要素を含む)
source.TakeUntil(x => x >= 10);

インデックス付き条件関数がトリガー

csharp
public static Observable<T> TakeUntil<T>(
    this Observable<T> source,
    Func<T, Int32, Boolean> predicate)

条件関数の第 2 引数に 0 始まりのインデックスが渡されます。

csharp
// 5 番目の要素まで、または値が負になるまで
source.TakeUntil((x, index) => index >= 5 || x < 0);

overload の使い分け

overload使う場面
Observable<TOther>別のイベントストリームをトリガーにしたい場合
CancellationTokenキャンセルトークンでライフサイクルを管理している場合
Task非同期タスクの完了をトリガーにしたい場合
Func<T, CancellationToken, ValueTask>要素ごとの非同期処理完了をトリガーにしたい場合
Func<T, Boolean>要素の値に基づいてシーケンスを終了したい場合
Func<T, Int32, Boolean>要素の値とインデックスで終了を判定したい場合

3. マーブルダイアグラム

TakeUntil のマーブルダイアグラム

Observable トリガーの場合、トリガーシーケンスが最初の要素を発行した時点でソースシーケンスが完了します。predicate の場合、条件が true になった要素まで(その要素を含めて)通過させ、完了します。

4. サンプルコード

csharp
using R3;

// 別の Observable をトリガーにする
var source = Observable.Interval(TimeSpan.FromMilliseconds(200));
var stopper = Observable.Timer(TimeSpan.FromSeconds(1));

source
    .TakeUntil(stopper)
    .Subscribe(
        x => Console.WriteLine($"値: {x}"),
        () => Console.WriteLine("完了"));
// 出力(おおよそ):
// 値: 0
// 値: 1
// 値: 2
// 値: 3
// 値: 4
// 完了

// CancellationToken を使った例
var cts = new CancellationTokenSource();
cts.CancelAfter(TimeSpan.FromSeconds(2));

Observable.Interval(TimeSpan.FromMilliseconds(500))
    .TakeUntil(cts.Token)
    .Subscribe(x => Console.WriteLine($"値: {x}"));

// 条件関数を使った例(条件を満たす要素を含めて取得)
var values = new[] { 1, 3, 5, 8, 12, 4 };

values.ToObservable()
    .TakeUntil(x => x >= 10)
    .Subscribe(x => Console.WriteLine($"値: {x}"));
// 出力:
// 値: 1
// 値: 3
// 値: 5
// 値: 8
// 値: 12  ← 条件を満たした要素も含まれる

5. 補足

TakeWhile との違い

  • TakeUntil(predicate): 条件が true になる要素まで取得します。条件を満たした要素も含まれます
  • TakeWhile: 条件が true取得します。条件が false になった要素は含まれません
csharp
var values = new[] { 1, 3, 5, 8, 12, 4 };

// TakeUntil: x >= 10 になるまで(12 を含む)
source.TakeUntil(x => x >= 10);  // → 1, 3, 5, 8, 12

// TakeWhile: x < 10 の間
source.TakeWhile(x => x < 10);    // → 1, 3, 5, 8

SkipUntil との関係

SkipUntilTakeUntil の逆で、シグナルが発生するまでの要素をスキップし、シグナル後の要素を通過させます。