目次

非同期プログラミング:ストリーム

Dartの非同期プログラミングは、FutureStreamクラスを特徴としています。

Futureは、すぐに完了しない計算を表します。通常の関数が結果を返すのに対し、非同期関数は結果を最終的に含むFutureを返します。Futureは、結果の準備ができたことを通知します。

ストリームは、非同期イベントのシーケンスです。非同期Iterableのようなもので、要求したときに次のイベントを取得する代わりに、ストリームは準備ができたときにイベントがあることを通知します。

ストリームイベントの受信

#

ストリームは多くの方法で作成できますが(これは別の記事のトピックです)、すべて同じ方法で使用できます。*非同期forループ*(一般的に**await for**と呼ばれます)は、**forループ**がIterableを反復処理するのと同じように、ストリームのイベントを反復処理します。例:

dart
Future<int> sumStream(Stream<int> stream) async {
  var sum = 0;
  await for (final value in stream) {
    sum += value;
  }
  return sum;
}

このコードは、整数イベントのストリームの各イベントを受け取り、それらを合計し、合計(Future)を返します。ループ本体が終了すると、次のイベントが到着するか、ストリームが完了するまで関数は一時停止します。

関数は`async`キーワードでマークされています。これは**await for**ループを使用する場合に必要です。

次の例では、`async*`関数を使用して単純な整数ストリームを生成することにより、前のコードをテストします。

Future<int> sumStream(Stream<int> stream) async {
  var sum = 0;
  await for (final value in stream) {
    sum += value;
  }
  return sum;
}

Stream<int> countStream(int to) async* {
  for (int i = 1; i <= to; i++) {
    yield i;
  }
}

void main() async {
  var stream = countStream(10);
  var sum = await sumStream(stream);
  print(sum); // 55
}

エラーイベント

#

ストリームは、イベントがなくなると完了し、イベントを受信するコードには、新しいイベントが到着したときと同じように、これが通知されます。**await for**ループを使用してイベントを読み取ると、ストリームが完了するとループが停止します。

場合によっては、ストリームが完了する前にエラーが発生することがあります。たとえば、リモートサーバーからファイルを取得中にネットワークエラーが発生したか、イベントを作成するコードにバグがある場合などです。

ストリームは、データイベントと同様にエラーイベントも配信できます。ほとんどのストリームは最初のエラー後に停止しますが、複数のエラーを配信するストリームや、エラーイベント後にさらにデータを提供するストリームも可能です。このドキュメントでは、最大で1つのエラーを配信するストリームのみについて説明します。

**await for**を使用してストリームを読み取ると、エラーはループステートメントによってスローされます。これによりループも終了します。**try-catch**を使用してエラーをキャッチできます。次の例では、ループイテレーターが4に等しいときにエラーをスローします。

Future<int> sumStream(Stream<int> stream) async {
  var sum = 0;
  try {
    await for (final value in stream) {
      sum += value;
    }
  } catch (e) {
    return -1;
  }
  return sum;
}

Stream<int> countStream(int to) async* {
  for (int i = 1; i <= to; i++) {
    if (i == 4) {
      throw Exception('Intentional exception');
    } else {
      yield i;
    }
  }
}

void main() async {
  var stream = countStream(10);
  var sum = await sumStream(stream);
  print(sum); // -1
}

ストリームの操作

#

Streamクラスには、Iterableのメソッドと同様に、ストリームで一般的な操作を実行できる多くのヘルパーメソッドが含まれています。たとえば、Stream APIの`lastWhere()`を使用して、ストリーム内の最後の正の整数を検索できます。

dart
Future<int> lastPositive(Stream<int> stream) =>
    stream.lastWhere((x) => x >= 0);

2種類のストリーム

#

ストリームには2種類あります。

シングルサブスクリプションストリーム

#

最も一般的な種類のストリームには、より大きな全体の一部であるイベントのシーケンスが含まれています。イベントは正しい順序で、欠落することなく配信される必要があります。これは、ファイルを読み取ったり、Webリクエストを受け取ったりする場合に取得するストリームの種類です。

このようなストリームは、一度だけリスンできます。後で再びリスンすると、最初のイベントを見逃す可能性があり、ストリームの残りの部分は意味をなさなくなります。リスンを開始すると、データはチャンクで取得および提供されます。

ブロードキャストストリーム

#

もう1つの種類のストリームは、個々のメッセージを一度に1つずつ処理するために意図されています。たとえば、ブラウザーのマウスイベントにこの種類のストリームを使用できます。

いつでもこのようなストリームのリスンを開始でき、リスンしている間に発生したイベントを取得します。複数のリスナーが同時にリスンでき、以前のサブスクリプションをキャンセルした後でも、後で再びリスンできます。

ストリームを処理するメソッド

#

Stream<T> の次のメソッドはストリームを処理し、結果を返します。

dart
Future<T> get first;
Future<bool> get isEmpty;
Future<T> get last;
Future<int> get length;
Future<T> get single;
Future<bool> any(bool Function(T element) test);
Future<bool> contains(Object? needle);
Future<E> drain<E>([E? futureValue]);
Future<T> elementAt(int index);
Future<bool> every(bool Function(T element) test);
Future<T> firstWhere(bool Function(T element) test, {T Function()? orElse});
Future<S> fold<S>(S initialValue, S Function(S previous, T element) combine);
Future forEach(void Function(T element) action);
Future<String> join([String separator = '']);
Future<T> lastWhere(bool Function(T element) test, {T Function()? orElse});
Future pipe(StreamConsumer<T> streamConsumer);
Future<T> reduce(T Function(T previous, T element) combine);
Future<T> singleWhere(bool Function(T element) test, {T Function()? orElse});
Future<List<T>> toList();
Future<Set<T>> toSet();

`drain()`と`pipe()`を除くこれらの関数はすべて、Iterableの同様の関数に対応しています。それぞれ、`async`関数と**await for**ループを使用して簡単に記述できます(または他のメソッドの1つを使用するだけで済みます)。たとえば、いくつかの実装は次のようになります。

dart
Future<bool> contains(Object? needle) async {
  await for (final event in this) {
    if (event == needle) return true;
  }
  return false;
}

Future forEach(void Function(T element) action) async {
  await for (final event in this) {
    action(event);
  }
}

Future<List<T>> toList() async {
  final result = <T>[];
  await forEach(result.add);
  return result;
}

Future<String> join([String separator = '']) async =>
    (await toList()).join(separator);

(実際の実装は少し複雑ですが、主に歴史的な理由によるものです。)

ストリームを変更するメソッド

#

Streamの次のメソッドは、元のストリームに基づいて新しいストリームを返します。新しいストリームでリスンする人がいるまで、それぞれが元のストリームをリスンしません。

dart
Stream<R> cast<R>();
Stream<S> expand<S>(Iterable<S> Function(T element) convert);
Stream<S> map<S>(S Function(T event) convert);
Stream<T> skip(int count);
Stream<T> skipWhile(bool Function(T element) test);
Stream<T> take(int count);
Stream<T> takeWhile(bool Function(T element) test);
Stream<T> where(bool Function(T event) test);

上記のメソッドは、Iterableを変換して別のIterableにするIterableの同様のメソッドに対応しています。これらはすべて、`async`関数と**await for**ループを使用して簡単に記述できます。

dart
Stream<E> asyncExpand<E>(Stream<E>? Function(T event) convert);
Stream<E> asyncMap<E>(FutureOr<E> Function(T event) convert);
Stream<T> distinct([bool Function(T previous, T next)? equals]);

`asyncExpand()`と`asyncMap()`関数は`expand()`と`map()`に似ていますが、関数引数を非同期関数にすることができます。`distinct()`関数は`Iterable`には存在しませんが、存在することもできました。

dart
Stream<T> handleError(Function onError, {bool Function(dynamic error)? test});
Stream<T> timeout(Duration timeLimit,
    {void Function(EventSink<T> sink)? onTimeout});
Stream<S> transform<S>(StreamTransformer<T, S> streamTransformer);

最後の3つの関数は、より特殊です。これらはエラー処理を伴いますが、await forループはエラー処理を行うことができません。ループに到達した最初のエラーによってループとストリームへのサブスクリプションが終了します。そこから回復することはできません。次のコードは、await forループで使用する前に、handleError()を使用してストリームからエラーを削除する方法を示しています。

dart
Stream<S> mapLogErrors<S, T>(
  Stream<T> stream,
  S Function(T event) convert,
) async* {
  var streamWithoutErrors = stream.handleError((e) => log(e));
  await for (final event in streamWithoutErrors) {
    yield convert(event);
  }
}

transform()関数

#

transform()関数は、エラー処理のためだけではありません。ストリームのためのより一般的な「map」です。通常のmapは、受信する各イベントに対して1つの値を必要とします。しかし、特にI/Oストリームでは、出力イベントを生成するために複数の受信イベントが必要になる場合があります。StreamTransformerはこのような状況に対応できます。例えば、Utf8Decoderのようなデコーダーはトランスフォーマーです。トランスフォーマーは、bind()という1つの関数のみを必要とし、これはasync関数によって簡単に実装できます。

ファイルの読み取りとデコード

#

次のコードはファイルを読み取り、ストリームに対して2つの変換を実行します。まずデータをUTF8から変換し、次にLineSplitterに通します。ハッシュタグ#で始まる行を除くすべての行が出力されます。

dart
import 'dart:convert';
import 'dart:io';

void main(List<String> args) async {
  var file = File(args[0]);
  var lines = utf8.decoder
      .bind(file.openRead())
      .transform(const LineSplitter());
  await for (final line in lines) {
    if (!line.startsWith('#')) print(line);
  }
}

listen()メソッド

#

Streamの最終的なメソッドはlisten()です。これは「低レベル」のメソッドであり、他のすべてのストリーム関数はlisten()に基づいて定義されています。

dart
StreamSubscription<T> listen(void Function(T event)? onData,
    {Function? onError, void Function()? onDone, bool? cancelOnError});

新しいStream型を作成するには、Streamクラスを拡張し、listen()メソッドを実装するだけです。Streamの他のすべてのメソッドは、動作するためにlisten()を呼び出します。

listen()メソッドを使用すると、ストリームをリスンし始めることができます。リスンするまで、ストリームは、表示したいイベントを記述する不活性なオブジェクトです。リスンすると、イベントを生成するアクティブなストリームを表すStreamSubscriptionオブジェクトが返されます。これは、Iterableがオブジェクトのコレクションに過ぎず、イテレータが実際の反復処理を行う方法に似ています。

ストリームサブスクリプションを使用すると、サブスクリプションの一時停止、一時停止後の再開、完全なキャンセルを行うことができます。各データイベントまたはエラーイベント、およびストリームが閉じられたときに呼び出されるコールバックを設定できます。

その他のリソース

#

Dartでのストリームと非同期プログラミングの詳細については、次のドキュメントを参照してください。