メインコンテンツにスキップ

非同期プログラミング:ストリーム

Dart の非同期プログラミングは、Future クラスと Stream クラスによって特徴付けられます。

Future は、すぐに完了しない計算を表します。通常の関数が結果を返すのに対し、非同期関数は Future を返します。これは最終的に結果を含むようになります。Future は、結果がいつ準備できたかを教えてくれます。

ストリームは、非同期イベントのシーケンスです。非同期の Iterable のようなものです。要求したときに次のイベントを取得するのではなく、ストリームはイベントが準備できたときにそれを通知します。

ストリームイベントの受信

#

ストリームはさまざまな方法で作成できますが、これは別の記事のトピックです。しかし、それらはすべて同じ方法で使用できます。非同期 for ループ(一般に単に await for と呼ばれる)は、for ループIterable を反復処理するのと同じように、ストリームのイベントを反復処理します。たとえば、

dart
Future<int> sumStream(Stream<int> stream) async {
  var sum = 0;
  await for (final value in stream) {
    sum += value;
  }
  return sum;
}

このコードは、整数イベントのストリームの各イベントを受信し、それらを合計して、合計(の Future)を返します。ループ本体が終了すると、次のイベントが到着するかストリームが終了するまで、関数は一時停止します。

関数は async キーワードでマークされています。これは、await for ループを使用する場合に必要です。

次の例は、async* 関数を使用して整数の単純なストリームを生成することにより、前のコードをテストします。

Future<int> sumStream(Stream<int> stream) async {
  var sum = 0;
  await for (final value in stream) {
    sum += value;
  }
  return sum;
}

Stream<int> countStream(int to) async* {
  for (int i = 1; i <= to; i++) {
    yield i;
  }
}

void main() async {
  var stream = countStream(10);
  var sum = await sumStream(stream);
  print(sum); // 55
}

エラーイベント

#

ストリームには、それ以上イベントがないときに終了します。イベントを受信するコードは、新しいイベントが到着したときと同じように、これを知らされます。await for ループを使用してイベントを読み取る場合、ストリームが終了するとループは停止します。

場合によっては、ストリームが終了する前にエラーが発生します。たとえば、リモートサーバーからファイルをフェッチ中にネットワークが失敗したか、イベントを作成するコードにバグがある可能性がありますが、誰かがそれを知る必要があります。

ストリームは、データイベントを配信するのと同じように、エラーイベントを配信することもできます。ほとんどのストリームは最初のエラーの後に停止しますが、複数のエラーを配信するストリームや、エラーイベントの後にさらにデータを配信するストリームも可能です。このドキュメントでは、最大1つのエラーを配信するストリームのみを議論します。

await for を使用してストリームを読み取るとき、ループステートメントによってエラーがスローされます。これにより、ループも終了します。try-catch を使用してエラーをキャッチできます。次の例では、ループイテレータが4に等しいときにエラーをスローします。

Future<int> sumStream(Stream<int> stream) async {
  var sum = 0;
  try {
    await for (final value in stream) {
      sum += value;
    }
  } catch (e) {
    return -1;
  }
  return sum;
}

Stream<int> countStream(int to) async* {
  for (int i = 1; i <= to; i++) {
    if (i == 4) {
      throw Exception('Intentional exception');
    } else {
      yield i;
    }
  }
}

void main() async {
  var stream = countStream(10);
  var sum = await sumStream(stream);
  print(sum); // -1
}

ストリームの操作

#

Stream クラスには、Iterable のメソッドと同様に、ストリームに対する一般的な操作を実行できる多くのヘルパーメソッドが含まれています。たとえば、Stream API の lastWhere() を使用して、ストリーム内の最後の正の整数を見つけることができます。

dart
Future<int> lastPositive(Stream<int> stream) =>
    stream.lastWhere((x) => x >= 0);

2種類のストリーム

#

ストリームには2種類あります。

シングルサブスクリプションストリーム

#

最も一般的な種類のストリームは、より大きな全体の一部であるイベントのシーケンスを含みます。イベントは、正しい順序で、欠落することなく配信される必要があります。これは、ファイルを読み取ったり、Webリクエストを受信したりするときに得られる種類のストリームです。

そのようなストリームは一度しかリッスンできません。後でもう一度リッスンすると、最初のイベントを見逃してしまう可能性があり、残りのストリームは意味をなさなくなります。リッスンを開始すると、データはチャンクで取得され、提供されます。

ブロードキャストストリーム

#

もう1種類のストリームは、個々のメッセージを一度に処理できるようにすることを目的としています。この種のストリームは、たとえばブラウザのマウスイベントに使用できます。

そのようなストリームはいつでもリッスンを開始でき、リッスン中に発生したイベントを取得できます。同時に複数のリスナーがリッスンでき、以前のサブスクリプションをキャンセルした後、後で再度リッスンすることもできます。

ストリームを処理するメソッド

#

Stream<T> の次のメソッドは、ストリームを処理し、結果を返します。

dart
Future<T> get first;
Future<bool> get isEmpty;
Future<T> get last;
Future<int> get length;
Future<T> get single;
Future<bool> any(bool Function(T element) test);
Future<bool> contains(Object? needle);
Future<E> drain<E>([E? futureValue]);
Future<T> elementAt(int index);
Future<bool> every(bool Function(T element) test);
Future<T> firstWhere(bool Function(T element) test, {T Function()? orElse});
Future<S> fold<S>(S initialValue, S Function(S previous, T element) combine);
Future forEach(void Function(T element) action);
Future<String> join([String separator = '']);
Future<T> lastWhere(bool Function(T element) test, {T Function()? orElse});
Future pipe(StreamConsumer<T> streamConsumer);
Future<T> reduce(T Function(T previous, T element) combine);
Future<T> singleWhere(bool Function(T element) test, {T Function()? orElse});
Future<List<T>> toList();
Future<Set<T>> toSet();

これらの関数はすべて、drain()pipe() を除き、Iterable の同様の関数に対応しています。それぞれは、await for ループ(または他のメソッドのいずれかを使用)を使用した async 関数を使用して簡単に記述できます。たとえば、いくつかの実装は次のようになります。

dart
Future<bool> contains(Object? needle) async {
  await for (final event in this) {
    if (event == needle) return true;
  }
  return false;
}

Future forEach(void Function(T element) action) async {
  await for (final event in this) {
    action(event);
  }
}

Future<List<T>> toList() async {
  final result = <T>[];
  await forEach(result.add);
  return result;
}

Future<String> join([String separator = '']) async =>
    (await toList()).join(separator);

(実際の実装はわずかに複雑ですが、主に歴史的な理由によるものです。)

ストリームを変更するメソッド

#

Stream の次のメソッドは、元のストリームに基づいた新しいストリームを返します。それぞれが、新しいストリームがリッスンするまで、元のストリームをリッスンするのを待ちます。

dart
Stream<R> cast<R>();
Stream<S> expand<S>(Iterable<S> Function(T element) convert);
Stream<S> map<S>(S Function(T event) convert);
Stream<T> skip(int count);
Stream<T> skipWhile(bool Function(T element) test);
Stream<T> take(int count);
Stream<T> takeWhile(bool Function(T element) test);
Stream<T> where(bool Function(T event) test);

前のメソッドは、Iterable の同様のメソッドに対応しており、Iterable を別の Iterable に変換します。これらはすべて、await for ループを使用した async 関数を使用して簡単に記述できます。

dart
Stream<E> asyncExpand<E>(Stream<E>? Function(T event) convert);
Stream<E> asyncMap<E>(FutureOr<E> Function(T event) convert);
Stream<T> distinct([bool Function(T previous, T next)? equals]);

asyncExpand() および asyncMap() 関数は、expand() および map() と似ていますが、関数引数を非同期関数にすることができます。distinct() 関数は Iterable には存在しませんが、存在し得ました。

dart
Stream<T> handleError(Function onError, {bool Function(dynamic error)? test});
Stream<T> timeout(
  Duration timeLimit, {
  void Function(EventSink<T> sink)? onTimeout,
});
Stream<S> transform<S>(StreamTransformer<T, S> streamTransformer);

最後の3つの関数はより特殊化されています。これらは、await for ループが直接管理できないエラー処理を伴います。最初に見つかったエラーは、ループとそのストリームサブスクリプションを終了します。回復のための組み込みメカニズムはありません。

次のコードは、handleError() を使用して、await for ループで消費される前にストリームからエラーをフィルタリングする方法を示しています。

dart
Stream<S> mapLogErrors<S, T>(
  Stream<T> stream,
  S Function(T event) convert,
) async* {
  var streamWithoutErrors = stream.handleError((e) => log(e));

  await for (final event in streamWithoutErrors) {
    yield convert(event);
  }
}

前の例では、ストリームからイベントが発行されない場合、await for ループは決して返されません。これを回避するには、timeout() 関数を使用して新しいストリームを作成します。timeout() を使用すると、時間制限を設定し、返されたストリームでイベントの発行を続けることができます。

次のコードは、前の例を変更したものです。2秒のタイムアウトを追加し、2秒以上イベントが発生しない場合は関連するエラーを生成します。

dart
Stream<S> mapLogErrors<S, T>(
  Stream<T> stream,
  S Function(T event) convert,
) async* {
  var streamWithoutErrors = stream.handleError((e) => log(e));
  var streamWithTimeout = streamWithoutErrors.timeout(
    const Duration(seconds: 2),
    onTimeout: (eventSink) {
      eventSink.addError('Timed out after 2 seconds');
      eventSink.close();
    },
  );

  await for (final event in streamWithTimeout) {
    yield convert(event);
  }
}

transform() 関数

#

transform() 関数はエラー処理のためだけではありません。ストリームのより一般的な「マップ」です。通常のマップは、受信するイベントごとに1つの値が必要です。しかし、特に I/O ストリームの場合、出力イベントを生成するために複数の受信イベントが必要になる場合があります。 StreamTransformer はそれを行うことができます。たとえば、Utf8Decoder のようなデコーダーはトランスフォーマーです。トランスフォーマーは1つの関数、bind() のみ必要とし、これは async 関数で簡単に実装できます。

ファイルの読み込みとデコード

#

次のコードは、ファイルを読み取り、ストリームに対して2つの変換を実行します。まず、データをUTF8から変換し、次に LineSplitter を通します。ハッシュタグ「#」で始まる行を除き、すべての行が印刷されます。

dart
import 'dart:convert';
import 'dart:io';

void main(List<String> args) async {
  var file = File(args[0]);
  var lines = utf8.decoder
      .bind(file.openRead())
      .transform(const LineSplitter());
  await for (final line in lines) {
    if (!line.startsWith('#')) print(line);
  }
}

listen() メソッド

#

Stream の最後のメソッドは listen() です。これは「低レベル」メソッドであり、他のすべてのストリーム関数は listen() で定義されています。

dart
StreamSubscription<T> listen(
  void Function(T event)? onData, {
  Function? onError,
  void Function()? onDone,
  bool? cancelOnError,
});

新しい Stream タイプを作成するには、Stream クラスを拡張し、listen() メソッドを実装するだけでかまいません。Stream の他のすべてのメソッドは、機能するために listen() を呼び出します。

listen() メソッドを使用すると、ストリームをリッスンできます。そうするまで、ストリームは表示したいイベントを記述する不活性なオブジェクトです。リッスンすると、イベントを生成するアクティブなストリームを表す StreamSubscription オブジェクトが返されます。これは、Iterable が単なるオブジェクトのコレクションであり、イテレータが実際のイテレーションを実行するのと同じようなものです。

ストリームサブスクリプションを使用すると、サブスクリプションを一時停止したり、一時停止後に再開したり、完全にキャンセルしたりできます。各データイベントまたはエラーイベント、およびストリームが閉じられたときに呼び出されるコールバックを設定できます。

その他のリソース

#

Dart でストリームと非同期プログラミングを使用することの詳細については、次のドキュメントをお読みください。