メインコンテンツにスキップ

Dart でストリームを作成する

執筆者: Lasse Nielsen
2013年4月 (2021年5月更新)

dart:async ライブラリには、多くの Dart API にとって重要な 2 つの型があります: StreamFuture. Future が単一の計算結果を表すのに対し、ストリームは結果のシーケンスです。ストリームをリッスンすると、結果 (データとエラーの両方) およびストリームの終了が通知されます。リッスン中に一時停止したり、完了する前にストリームのリッスンを停止したりすることもできます。

しかし、この記事はストリームを使用することについての記事ではありません。独自のストリームを作成することについての記事です。ストリームはいくつかの方法で作成できます。

  • 既存のストリームを変換する。
  • async* 関数を使用して、ゼロからストリームを作成する。
  • StreamController を使用してストリームを作成する。

この記事では、各アプローチのコードを示し、ストリームを正しく実装するためのヒントを提供します。

ストリームの使用方法については、非同期プログラミング: ストリームを参照してください。

既存のストリームを変換する

#

ストリームを作成する一般的なケースは、すでにストリームがあり、元のストリームのイベントに基づいて新しいストリームを作成したい場合です。たとえば、UTF-8 でデコードして文字列のストリームに変換したいバイトのストリームがあるかもしれません。最も一般的なアプローチは、元のストリームのイベントを待機し、新しいイベントを出力する新しいストリームを作成することです。例

dart
/// Splits a stream of consecutive strings into lines.
///
/// The input string is provided in smaller chunks through
/// the `source` stream.
Stream<String> lines(Stream<String> source) async* {
  // Stores any partial line from the previous chunk.
  var partial = '';
  // Wait until a new chunk is available, then process it.
  await for (final chunk in source) {
    var lines = chunk.split('\n');
    lines[0] = partial + lines[0]; // Prepend partial line.
    partial = lines.removeLast(); // Remove new partial line.
    for (final line in lines) {
      yield line; // Add lines to output stream.
    }
  }
  // Add final partial line to output stream, if any.
  if (partial.isNotEmpty) yield partial;
}

多くの一般的な変換には、map()where()expand()take() などの Stream 提供の変換メソッドを使用できます。

たとえば、1 秒ごとに増加するカウンターを放出する counterStream というストリームがあるとします。実装は次のようになります。

dart
var counterStream = Stream<int>.periodic(
  const Duration(seconds: 1),
  (x) => x,
).take(15);

イベントをすばやく確認するには、次のようなコードを使用できます。

dart
counterStream.forEach(print); // Print an integer every second, 15 times.

ストリームイベントを変換するには、リッスンする前にストリームで map() などの変換メソッドを呼び出します。このメソッドは新しいストリームを返します。

dart
// Double the integer in each event.
var doubleCounterStream = counterStream.map((int x) => x * 2);
doubleCounterStream.forEach(print);

map() の代わりに、次のような他の変換メソッドを使用することもできます。

dart
.where((int x) => x.isEven) // Retain only even integer events.
.expand((var x) => [x, x]) // Duplicate each event.
.take(5) // Stop after the first five events.

多くの場合、変換メソッドがあれば十分です。しかし、変換に対してさらに制御が必要な場合は、Streamtransform() メソッドを使用して StreamTransformer を指定できます。プラットフォームライブラリは、多くの一般的なタスク用のストリームトランスフォーマーを提供します。たとえば、次のコードは、dart:convert ライブラリで提供される utf8.decoder および LineSplitter トランスフォーマーを使用しています。

dart
Stream<List<int>> content = File('someFile.txt').openRead();
List<String> lines = await content
    .transform(utf8.decoder)
    .transform(const LineSplitter())
    .toList();

ゼロからストリームを作成する

#

新しいストリームを作成する 1 つの方法は、非同期ジェネレーター (async*) 関数を使用することです。ストリームは関数が呼び出されたときに作成され、関数の本体はストリームがリッスンされたときに実行を開始します。関数が返ると、ストリームは閉じます。関数が返るまで、yield または yield* ステートメントを使用してストリームにイベントを放出できます。

これは、一定の間隔で数値を放出する基本的な例です。

dart
Stream<int> timedCounter(Duration interval, [int? maxCount]) async* {
  int i = 0;
  while (true) {
    await Future.delayed(interval);
    yield i++;
    if (i == maxCount) break;
  }
}

この関数は Stream を返します。そのストリームがリッスンされると、本体の実行が開始されます。指定された間隔で遅延し、次の数値を yield します。maxCount パラメータが省略されている場合、ループに停止条件はありません。そのため、ストリームは無限に増加する数値を放出します。または、リスナーがサブスクリプションをキャンセルするまで。

リスナーがキャンセルすると (listen() メソッドが返す StreamSubscription オブジェクトで cancel() を呼び出すことにより)、次に本体が yield ステートメントに達すると、yieldreturn ステートメントとして機能します。囲んでいる finally ブロックが実行され、関数が終了します。関数が終了する前に値を yield しようとすると、失敗して return として機能します。

関数が最終的に終了すると、cancel() メソッドが返した Future が完了します。関数がエラーで終了した場合、Future はそのエラーで完了します。それ以外の場合は、null で完了します。

別の、より有用な例は、Future のシーケンスをストリームに変換する関数です。

dart
Stream<T> streamFromFutures<T>(Iterable<Future<T>> futures) async* {
  for (final future in futures) {
    var result = await future;
    yield result;
  }
}

この関数は futures イテラブルから新しい Future を要求し、その Future を待機し、結果の値を放出し、ループします。Future がエラーで完了した場合、ストリームはそのエラーで完了します。

async* 関数がゼロからストリームを構築することはまれです。データはどこかから取得する必要があります。ほとんどの場合、その「どこか」は別のストリームです。上記のシーケンスのような場合、データは他の非同期イベントソースから来ます。しかし、多くの場合、async* 関数は単純すぎて、複数のデータソースを簡単に処理できません。そこで StreamController クラスが登場します。

StreamController を使用する

#

ストリームのイベントがプログラムのさまざまな部分から発生し、async 関数でトラバースできるストリームまたは Future からだけではない場合は、StreamController を使用してストリームを作成および入力します。

StreamController は、新しいストリームと、いつでもどこからでもストリームにイベントを追加する方法を提供します。ストリームには、リスナーと一時停止を処理するために必要なすべてのロジックが含まれています。ストリームを返し、コントローラーは自分自身に保持します。

次の例 ( stream_controller_bad.dart より) は、前の例の timedCounter() 関数を実装するために StreamController を基本的には、ただし欠陥のある方法で使用する方法を示しています。このコードは、返すストリームを作成し、タイマーイベントに基づいてデータを取り込みます。タイマーイベントは Future でもストリームイベントでもありません。

baddart
// NOTE: This implementation is FLAWED!
// It starts before it has subscribers, and it doesn't implement pause.
Stream<int> timedCounter(Duration interval, [int? maxCount]) {
  var controller = StreamController<int>();
  int counter = 0;
  void tick(Timer timer) {
    counter++;
    controller.add(counter); // Ask stream to send counter values as event.
    if (maxCount != null && counter >= maxCount) {
      timer.cancel();
      controller.close(); // Ask stream to shut down and tell listeners.
    }
  }

  Timer.periodic(interval, tick); // BAD: Starts before it has subscribers.
  return controller.stream;
}

これまでと同様に、次のようなコードで timedCounter() から返されたストリームを使用できます。

dart
var counterStream = timedCounter(const Duration(seconds: 1), 15);
counterStream.listen(print); // Print an integer every second, 15 times.

この timedCounter() の実装には、いくつかの問題があります。

  • サブスクライバーが存在する前にイベントの生成を開始します。
  • サブスクライバーが一時停止を要求しても、イベントの生成を続けます。

次のセクションで示すように、StreamController を作成するときに onListenonPause などのコールバックを指定することで、これらの問題を両方とも修正できます。

サブスクリプションを待つ

#

原則として、ストリームは作業を開始する前にサブスクライバーを待つ必要があります。async* 関数はこれを自動的に行いますが、StreamController を使用する場合、すべてを自分で制御でき、不必要にイベントを追加できます。ストリームにサブスクライバーがない場合、その StreamController はイベントをバッファリングし、ストリームにサブスクライバーがまったくない場合、メモリリークにつながる可能性があります。

ストリームを使用するコードを次のように変更してみてください。

dart
void listenAfterDelay() async {
  var counterStream = timedCounter(const Duration(seconds: 1), 15);
  await Future.delayed(const Duration(seconds: 5));

  // After 5 seconds, add a listener.
  await for (final n in counterStream) {
    print(n); // Print an integer every second, 15 times.
  }
}

このコードが実行されると、ストリームは作業していますが、最初の 5 秒間は何も表示されません。次にリスナーが追加され、バッファリングされていたため、最初の 5 つ程度のイベントがすべて一度に表示されます。

サブスクリプションの通知を受けるには、StreamController を作成するときに onListen 引数を指定します。onListen コールバックは、ストリームが最初のサブスクライバーを取得したときに呼び出されます。onCancel コールバックを指定した場合、コントローラーが最後のサブスクライバーを失ったときに呼び出されます。前の例では、Timer.periodic() は次のセクションで示すように、onListen ハンドラーに移動する必要があります。

一時停止状態を尊重する

#

リスナーが一時停止を要求したときにイベントを生成しないようにしてください。async* 関数は、ストリームサブスクリプションが一時停止している間、yield ステートメントで自動的に一時停止します。一方、StreamController は一時停止中にイベントをバッファリングします。イベントを提供するコードが一時停止を尊重しない場合、バッファのサイズは無限に増加する可能性があります。また、リスナーが一時停止後すぐにリッスンを停止した場合、バッファを作成するために費やされた作業は無駄になります。

一時停止サポートなしで何が起こるかを確認するには、ストリームを使用するコードを次のように変更してみてください。

dart
void listenWithPause() {
  var counterStream = timedCounter(const Duration(seconds: 1), 15);
  late StreamSubscription<int> subscription;

  subscription = counterStream.listen((int counter) {
    print(counter); // Print an integer every second.
    if (counter == 5) {
      // After 5 ticks, pause for five seconds, then resume.
      subscription.pause(Future.delayed(const Duration(seconds: 5)));
    }
  });
}

5 秒間の一時停止が終了すると、その間に発生したイベントがすべて一度に受信されます。これは、ストリームのソースが一時停止を尊重せず、イベントをストリームに追加し続けるためです。そのため、ストリームはイベントをバッファリングし、ストリームが一時停止解除されるとバッファを空にします。

timedCounter() の次のバージョン ( stream_controller.dart より) は、StreamControlleronListenonPauseonResume、および onCancel コールバックを使用して一時停止を実装しています。

dart
Stream<int> timedCounter(Duration interval, [int? maxCount]) {
  late StreamController<int> controller;
  Timer? timer;
  int counter = 0;

  void tick(_) {
    counter++;
    controller.add(counter); // Ask stream to send counter values as event.
    if (counter == maxCount) {
      timer?.cancel();
      controller.close(); // Ask stream to shut down and tell listeners.
    }
  }

  void startTimer() {
    timer = Timer.periodic(interval, tick);
  }

  void stopTimer() {
    timer?.cancel();
    timer = null;
  }

  controller = StreamController<int>(
    onListen: startTimer,
    onPause: stopTimer,
    onResume: startTimer,
    onCancel: stopTimer,
  );

  return controller.stream;
}

上記の listenWithPause() 関数でこのコードを実行してください。一時停止中はカウントが停止し、その後スムーズに再開されることがわかります。

一時停止状態の変更を通知するには、すべてのリスナー (onListenonCancelonPauseonResume) を使用する必要があります。その理由は、サブスクリプション状態と一時停止状態が同時に変更された場合、onListen または onCancel コールバックのみが呼び出されるためです。

最終的なヒント

#

async* 関数を使用せずにストリームを作成する場合、これらのヒントを念頭に置いてください。

  • 同期コントローラー (たとえば、StreamController(sync: true) を使用して作成されたもの) を使用する場合は注意してください。一時停止していない同期コントローラーにイベントを送信すると (たとえば、EventSink で定義されている add()addError()、または close() メソッドを使用)、イベントはストリーム上のすべてのリスナーに即座に送信されます。Stream リスナーは、リスナーを追加したコードが完全に返されるまで呼び出してはなりません。不適切なタイミングで同期コントローラーを使用すると、この約束が破られ、正常なコードが失敗する可能性があります。同期コントローラーの使用は避けてください。

  • StreamController を使用する場合、onListen コールバックは listen 呼び出しが StreamSubscription を返す前に呼び出されます。onListen コールバックが、サブスクリプションが既に存在することに依存しないようにしてください。たとえば、次のコードでは、onListen イベントが発生し (handler が呼び出される)、subscription 変数に有効な値が設定される前に、onListen イベントが発生します。

    dart
    subscription = stream.listen(handler);
  • StreamController によって定義された onListenonPauseonResume、および onCancel コールバックは、ストリームのリスナー状態が変更されたときにストリームによって呼び出されますが、イベントの発生中や他の状態変更ハンドラーの呼び出し中には呼び出されません。これらの場合、状態変更コールバックは、前のコールバックが完了するまで遅延されます。

  • Stream インターフェイスを自分で実装しようとしないでください。イベント、コールバック、リスナーの追加と削除の相互作用を微妙に間違えやすいです。常に既存のストリーム (おそらく StreamController から) を使用して、新しいストリームの listen 呼び出しを実装してください。

  • Stream クラスを拡張して、Stream クラスを拡張し、listen メソッドと追加の機能を実装することで、より多くの機能を持つクラスを作成することは可能ですが、ユーザーが考慮する必要がある新しい型が導入されるため、一般的には推奨されません。Stream (以上) であるクラスの代わりに、Stream (以上) を持つクラスを作成できることがよくあります。