目次

Dart でのストリームの作成

執筆者:Lasse Nielsen
2013年4月(2021年5月更新)

dart:async ライブラリには、多くの Dart API で重要な 2 つの型が含まれています。StreamFuture。Future が単一の計算の結果を表すのに対し、ストリームは結果のシーケンスです。ストリームをリッスンして、結果(データとエラーの両方)とストリームのシャットダウンを通知されます。また、リッスン中に一時停止したり、ストリームが完了する前にリッスンを停止することもできます。

しかし、この記事はストリームの使用方法に関するものではありません。独自のストリームを作成する方法についてです。ストリームを作成するにはいくつかの方法があります。

  • 既存のストリームを変換する。
  • async* 関数を使用してストリームをゼロから作成する。
  • StreamController を使用してストリームを作成する。

この記事では、各アプローチのコードを示し、ストリームを正しく実装するためのヒントを示します。

ストリームの使用方法については、非同期プログラミング:ストリーム を参照してください。

既存のストリームの変換

#

ストリームを作成する一般的なケースは、既にストリームがあり、元のストリームのイベントに基づいて新しいストリームを作成したい場合です。たとえば、バイトのストリームがあり、入力を UTF-8 でデコードして文字列のストリームに変換したい場合があります。最も一般的なアプローチは、元のストリームのイベントを待機してから新しいイベントを出力する新しいストリームを作成することです。例

dart
/// Splits a stream of consecutive strings into lines.
///
/// The input string is provided in smaller chunks through
/// the `source` stream.
Stream<String> lines(Stream<String> source) async* {
  // Stores any partial line from the previous chunk.
  var partial = '';
  // Wait until a new chunk is available, then process it.
  await for (final chunk in source) {
    var lines = chunk.split('\n');
    lines[0] = partial + lines[0]; // Prepend partial line.
    partial = lines.removeLast(); // Remove new partial line.
    for (final line in lines) {
      yield line; // Add lines to output stream.
    }
  }
  // Add final partial line to output stream, if any.
  if (partial.isNotEmpty) yield partial;
}

多くの一般的な変換では、map()where()expand()take() などの、Stream が提供する変換メソッドを使用できます。

たとえば、1 秒ごとに増加するカウンターを発行するストリーム counterStream があるとします。実装方法は次のとおりです。

dart
var counterStream =
    Stream<int>.periodic(const Duration(seconds: 1), (x) => x).take(15);

イベントをすばやく確認するには、次のようなコードを使用できます。

dart
counterStream.forEach(print); // Print an integer every second, 15 times.

ストリームイベントを変換するには、リッスンする前にストリームで map() などの変換メソッドを呼び出すことができます。このメソッドは新しいストリームを返します。

dart
// Double the integer in each event.
var doubleCounterStream = counterStream.map((int x) => x * 2);
doubleCounterStream.forEach(print);

map() の代わりに、次のようないずれかの変換メソッドを使用できます。

dart
.where((int x) => x.isEven) // Retain only even integer events.
.expand((var x) => [x, x]) // Duplicate each event.
.take(5) // Stop after the first five events.

多くの場合、変換メソッドだけで十分です。ただし、変換をさらに細かく制御する必要がある場合は、Streamtransform() メソッドで StreamTransformer を指定できます。プラットフォームライブラリは、多くの一般的なタスクに対してストリームトランスフォーマーを提供しています。たとえば、次のコードは、dart:convert ライブラリによって提供される utf8.decoderLineSplitter トランスフォーマーを使用しています。

dart
Stream<List<int>> content = File('someFile.txt').openRead();
List<String> lines = await content
    .transform(utf8.decoder)
    .transform(const LineSplitter())
    .toList();

ストリームからの新規作成

#

新しいストリームを作成する1つの方法は、非同期ジェネレーター(async*)関数を使用することです。ストリームは関数が呼び出されたときに作成され、関数の本体はストリームがリッスンされると実行を開始します。関数が返されると、ストリームは閉じます。関数が返されるまで、yield または yield* ステートメントを使用してストリームにイベントを発行できます。

一定の間隔で数値を発行する基本的な例を次に示します。

dart
Stream<int> timedCounter(Duration interval, [int? maxCount]) async* {
  int i = 0;
  while (true) {
    await Future.delayed(interval);
    yield i++;
    if (i == maxCount) break;
  }
}

この関数は Stream を返します。そのストリームがリッスンされると、本体の実行が開始されます。要求された間隔で繰り返し遅延した後、次の数値を生成します。maxCount パラメーターが省略されている場合、ループには停止条件がないため、ストリームはリスナーがサブスクリプションをキャンセルするまで、または無限に増加する数値を出力します。

リスナーが(listen() メソッドによって返された StreamSubscription オブジェクトで cancel() を呼び出すことで)キャンセルすると、本体が次に yield ステートメントに到達したときに、yieldreturn ステートメントとして機能します。任意の囲んでいる finally ブロックが実行され、関数が終了します。関数が終了する前に値を生成しようとすると、失敗し、return として機能します。

関数が最終的に終了すると、cancel() メソッドによって返された Future が完了します。関数がエラーで終了した場合、Future はそのエラーで完了します。そうでない場合は、null で完了します。

もう1つの、より便利な例は、Future のシーケンスをストリームに変換する関数です。

dart
Stream<T> streamFromFutures<T>(Iterable<Future<T>> futures) async* {
  for (final future in futures) {
    var result = await future;
    yield result;
  }
}

この関数は、futures イテラブルに新しい Future を要求し、その Future を待ち、結果の値を発行し、ループします。Future がエラーで完了した場合、ストリームはそのエラーで完了します。

async* 関数が何もない状態からストリームを構築することはまれです。どこかからデータを取得する必要があり、ほとんどの場合、その場所は別のストリームです。上記の Future のシーケンスのように、データが他の非同期イベントソースから取得される場合もあります。ただし、多くの場合、async* 関数は複数のデータソースを容易に処理するには単純すぎます。そこで、StreamController クラスが登場します。

StreamController の使用

#

ストリームのイベントがプログラムの異なる部分から発生し、async 関数でトラバースできるストリームや Future からではない場合、StreamController を使用してストリームを作成および設定します。

StreamController は、新しいストリームと、任意の時点、任意の場所からストリームにイベントを追加する方法を提供します。ストリームには、リスナーと一時停止を処理するために必要なすべてのロジックがあります。ストリームを返し、コントローラーを自分自身で使用します。

次の例(stream_controller_bad.dart から)は、前の例からの timedCounter() 関数を実装するために、StreamController の基本的な、ただし欠陥のある使用方法を示しています。このコードは返すストリームを作成し、タイマーイベント(Future でもストリームイベントでもない)に基づいてデータを入力します。

悪いdart
// NOTE: This implementation is FLAWED!
// It starts before it has subscribers, and it doesn't implement pause.
Stream<int> timedCounter(Duration interval, [int? maxCount]) {
  var controller = StreamController<int>();
  int counter = 0;
  void tick(Timer timer) {
    counter++;
    controller.add(counter); // Ask stream to send counter values as event.
    if (maxCount != null && counter >= maxCount) {
      timer.cancel();
      controller.close(); // Ask stream to shut down and tell listeners.
    }
  }

  Timer.periodic(interval, tick); // BAD: Starts before it has subscribers.
  return controller.stream;
}

前述のように、次のように timedCounter() によって返されたストリームを使用できます。

dart
var counterStream = timedCounter(const Duration(seconds: 1), 15);
counterStream.listen(print); // Print an integer every second, 15 times.

timedCounter() のこの実装には、いくつかの問題があります。

  • サブスクライバーがいない状態でイベントの生成を開始します。
  • サブスクライバーが一時停止を要求した場合でも、イベントの生成を続けます。

次のセクションで示すように、StreamController を作成するときに onListenonPause などのコールバックを指定することで、これらの問題の両方を修正できます。

サブスクリプションの待機

#

原則として、ストリームは、作業を開始する前にサブスクライバーを待つ必要があります。async* 関数はこれを自動的に行いますが、StreamController を使用する場合、完全に制御でき、必要のない場合でもイベントを追加できます。ストリームにサブスクライバーがいない場合、その StreamController はイベントをバッファリングし、ストリームがサブスクライバーを取得しない場合、メモリリークにつながる可能性があります。

ストリームを使用するコードを次のように変更してみてください。

dart
void listenAfterDelay() async {
  var counterStream = timedCounter(const Duration(seconds: 1), 15);
  await Future.delayed(const Duration(seconds: 5));

  // After 5 seconds, add a listener.
  await for (final n in counterStream) {
    print(n); // Print an integer every second, 15 times.
  }
}

このコードを実行すると、ストリームは動作しているにもかかわらず、最初の 5 秒間は何も出力されません。その後、リスナーが追加され、最初の 5 個程度のイベントが一度にすべて出力されます。これは、StreamController によってバッファリングされていたためです。

サブスクリプションを通知するには、StreamController を作成するときに onListen 引数を指定します。onListen コールバックは、ストリームが最初のサブスクライバーを取得したときに呼び出されます。onCancel コールバックを指定した場合、コントローラーが最後のサブスクライバーを失ったときに呼び出されます。前の例では、Timer.periodic() は次のセクションで示すように、onListen ハンドラーに移動する必要があります。

一時停止状態の尊重

#

リスナーが一時停止を要求したときにイベントを生成することは避けてください。async* 関数は、ストリームサブスクリプションが一時停止されている間、yield ステートメントで自動的に一時停止します。一方、StreamController は、一時停止中にイベントをバッファリングします。イベントを提供するコードが一時停止を尊重しない場合、バッファーのサイズは無限に大きくなる可能性があります。また、リスナーが一時停止直後にリッスンを停止した場合、バッファーの作成に費やされた作業は無駄になります。

一時停止サポートがない場合に何が起こるかを確認するには、ストリームを使用するコードを次のように変更してみてください。

dart
void listenWithPause() {
  var counterStream = timedCounter(const Duration(seconds: 1), 15);
  late StreamSubscription<int> subscription;

  subscription = counterStream.listen((int counter) {
    print(counter); // Print an integer every second.
    if (counter == 5) {
      // After 5 ticks, pause for five seconds, then resume.
      subscription.pause(Future.delayed(const Duration(seconds: 5)));
    }
  });
}

5秒間のポーズが終了すると、その間に発生したイベントがすべて一度に受信されます。これは、ストリームのソースがポーズを無視し、ストリームにイベントを追加し続けるためです。そのため、ストリームはイベントをバッファリングし、ストリームがポーズ解除されるとバッファを空にします。

以下のtimedCounter()のバージョン(stream_controller.dart参照)は、StreamControlleronListenonPauseonResumeonCancelコールバックを使用してポーズを実装します。

dart
Stream<int> timedCounter(Duration interval, [int? maxCount]) {
  late StreamController<int> controller;
  Timer? timer;
  int counter = 0;

  void tick(_) {
    counter++;
    controller.add(counter); // Ask stream to send counter values as event.
    if (counter == maxCount) {
      timer?.cancel();
      controller.close(); // Ask stream to shut down and tell listeners.
    }
  }

  void startTimer() {
    timer = Timer.periodic(interval, tick);
  }

  void stopTimer() {
    timer?.cancel();
    timer = null;
  }

  controller = StreamController<int>(
      onListen: startTimer,
      onPause: stopTimer,
      onResume: startTimer,
      onCancel: stopTimer);

  return controller.stream;
}

上記のlistenWithPause()関数を使用してこのコードを実行してください。ポーズ中はカウントが停止し、その後正常に再開されることがわかります。

ポーズ状態の変化を通知されるには、すべてのリスナー(onListenonCancelonPauseonResume)を使用する必要があります。これは、サブスクリプションとポーズの状態が同時に変更された場合、onListenまたはonCancelコールバックのみが呼び出されるためです。

最終的なヒント

#

async*関数を使用せずにストリームを作成する場合は、これらのヒントを心に留めておいてください。

  • 同期コントローラー(たとえば、StreamController(sync: true)を使用して作成されたコントローラー)を使用する際には注意が必要です。ポーズ解除された同期コントローラーでイベントを送信する場合(たとえば、EventSinkで定義されているadd()addError()close()メソッドを使用する場合)、イベントはストリーム上のすべてのリスナーにすぐに送信されます。Streamリスナーは、リスナーを追加したコードが完全に返されるまで呼び出されるべきではなく、間違ったタイミングで同期コントローラーを使用すると、この約束が破られ、正常なコードが失敗する可能性があります。同期コントローラーの使用は避けてください。

  • StreamControllerを使用する場合、onListenコールバックはlisten呼び出しがStreamSubscriptionを返す前に呼び出されます。onListenコールバックが既に存在するサブスクリプションに依存しないようにしてください。たとえば、次のコードでは、subscription変数に有効な値が割り当てられる前に、onListenイベントが発生し(そしてhandlerが呼び出されます)。

    dart
    subscription = stream.listen(handler);
  • StreamControllerで定義されたonListenonPauseonResumeonCancelコールバックは、ストリームのリスナーの状態が変更されたときにストリームによって呼び出されますが、イベントの発生中または他の状態変更ハンドラーの呼び出し中は決して呼び出されません。これらの場合、状態変更コールバックは、前のコールバックが完了するまで遅延されます。

  • 自分でStreamインターフェースを実装しようとしないでください。イベント、コールバック、リスナーの追加と削除間の相互作用を微妙に間違えるのは簡単です。新しいストリームのlisten呼び出しを実装するには、常に既存のストリーム(StreamControllerからのストリームなど)を使用してください。

  • Streamクラスを拡張し、listenメソッドと追加の機能を実装することで、より多くの機能を持つStreamを拡張するクラスを作成することは可能ですが、一般的には推奨されません。これは、ユーザーが考慮しなければならない新しい型を導入するためです。であるStream(およびそれ以上)のクラスではなく、多くの場合、持つStream(およびそれ以上)のクラスを作成できます。