Dart でのストリームの作成
執筆者:Lasse Nielsen
2013年4月(2021年5月更新)
dart:async ライブラリには、多くの Dart API で重要な 2 つの型が含まれています。Stream と Future。Future が単一の計算の結果を表すのに対し、ストリームは結果のシーケンスです。ストリームをリッスンして、結果(データとエラーの両方)とストリームのシャットダウンを通知されます。また、リッスン中に一時停止したり、ストリームが完了する前にリッスンを停止することもできます。
しかし、この記事はストリームの使用方法に関するものではありません。独自のストリームを作成する方法についてです。ストリームを作成するにはいくつかの方法があります。
- 既存のストリームを変換する。
async*
関数を使用してストリームをゼロから作成する。StreamController
を使用してストリームを作成する。
この記事では、各アプローチのコードを示し、ストリームを正しく実装するためのヒントを示します。
ストリームの使用方法については、非同期プログラミング:ストリーム を参照してください。
既存のストリームの変換
#ストリームを作成する一般的なケースは、既にストリームがあり、元のストリームのイベントに基づいて新しいストリームを作成したい場合です。たとえば、バイトのストリームがあり、入力を UTF-8 でデコードして文字列のストリームに変換したい場合があります。最も一般的なアプローチは、元のストリームのイベントを待機してから新しいイベントを出力する新しいストリームを作成することです。例
/// Splits a stream of consecutive strings into lines.
///
/// The input string is provided in smaller chunks through
/// the `source` stream.
Stream<String> lines(Stream<String> source) async* {
// Stores any partial line from the previous chunk.
var partial = '';
// Wait until a new chunk is available, then process it.
await for (final chunk in source) {
var lines = chunk.split('\n');
lines[0] = partial + lines[0]; // Prepend partial line.
partial = lines.removeLast(); // Remove new partial line.
for (final line in lines) {
yield line; // Add lines to output stream.
}
}
// Add final partial line to output stream, if any.
if (partial.isNotEmpty) yield partial;
}
多くの一般的な変換では、map()
、where()
、expand()
、take()
などの、Stream
が提供する変換メソッドを使用できます。
たとえば、1 秒ごとに増加するカウンターを発行するストリーム counterStream
があるとします。実装方法は次のとおりです。
var counterStream =
Stream<int>.periodic(const Duration(seconds: 1), (x) => x).take(15);
イベントをすばやく確認するには、次のようなコードを使用できます。
counterStream.forEach(print); // Print an integer every second, 15 times.
ストリームイベントを変換するには、リッスンする前にストリームで map()
などの変換メソッドを呼び出すことができます。このメソッドは新しいストリームを返します。
// Double the integer in each event.
var doubleCounterStream = counterStream.map((int x) => x * 2);
doubleCounterStream.forEach(print);
map()
の代わりに、次のようないずれかの変換メソッドを使用できます。
.where((int x) => x.isEven) // Retain only even integer events.
.expand((var x) => [x, x]) // Duplicate each event.
.take(5) // Stop after the first five events.
多くの場合、変換メソッドだけで十分です。ただし、変換をさらに細かく制御する必要がある場合は、Stream
の transform()
メソッドで StreamTransformer を指定できます。プラットフォームライブラリは、多くの一般的なタスクに対してストリームトランスフォーマーを提供しています。たとえば、次のコードは、dart:convert ライブラリによって提供される utf8.decoder
と LineSplitter
トランスフォーマーを使用しています。
Stream<List<int>> content = File('someFile.txt').openRead();
List<String> lines = await content
.transform(utf8.decoder)
.transform(const LineSplitter())
.toList();
ストリームからの新規作成
#新しいストリームを作成する1つの方法は、非同期ジェネレーター(async*
)関数を使用することです。ストリームは関数が呼び出されたときに作成され、関数の本体はストリームがリッスンされると実行を開始します。関数が返されると、ストリームは閉じます。関数が返されるまで、yield
または yield*
ステートメントを使用してストリームにイベントを発行できます。
一定の間隔で数値を発行する基本的な例を次に示します。
Stream<int> timedCounter(Duration interval, [int? maxCount]) async* {
int i = 0;
while (true) {
await Future.delayed(interval);
yield i++;
if (i == maxCount) break;
}
}
この関数は Stream
を返します。そのストリームがリッスンされると、本体の実行が開始されます。要求された間隔で繰り返し遅延した後、次の数値を生成します。maxCount
パラメーターが省略されている場合、ループには停止条件がないため、ストリームはリスナーがサブスクリプションをキャンセルするまで、または無限に増加する数値を出力します。
リスナーが(listen()
メソッドによって返された StreamSubscription
オブジェクトで cancel()
を呼び出すことで)キャンセルすると、本体が次に yield
ステートメントに到達したときに、yield
は return
ステートメントとして機能します。任意の囲んでいる finally
ブロックが実行され、関数が終了します。関数が終了する前に値を生成しようとすると、失敗し、return として機能します。
関数が最終的に終了すると、cancel()
メソッドによって返された Future が完了します。関数がエラーで終了した場合、Future はそのエラーで完了します。そうでない場合は、null
で完了します。
もう1つの、より便利な例は、Future のシーケンスをストリームに変換する関数です。
Stream<T> streamFromFutures<T>(Iterable<Future<T>> futures) async* {
for (final future in futures) {
var result = await future;
yield result;
}
}
この関数は、futures
イテラブルに新しい Future を要求し、その Future を待ち、結果の値を発行し、ループします。Future がエラーで完了した場合、ストリームはそのエラーで完了します。
async*
関数が何もない状態からストリームを構築することはまれです。どこかからデータを取得する必要があり、ほとんどの場合、その場所は別のストリームです。上記の Future のシーケンスのように、データが他の非同期イベントソースから取得される場合もあります。ただし、多くの場合、async*
関数は複数のデータソースを容易に処理するには単純すぎます。そこで、StreamController
クラスが登場します。
StreamController の使用
#ストリームのイベントがプログラムの異なる部分から発生し、async
関数でトラバースできるストリームや Future からではない場合、StreamController を使用してストリームを作成および設定します。
StreamController
は、新しいストリームと、任意の時点、任意の場所からストリームにイベントを追加する方法を提供します。ストリームには、リスナーと一時停止を処理するために必要なすべてのロジックがあります。ストリームを返し、コントローラーを自分自身で使用します。
次の例(stream_controller_bad.dart から)は、前の例からの timedCounter()
関数を実装するために、StreamController
の基本的な、ただし欠陥のある使用方法を示しています。このコードは返すストリームを作成し、タイマーイベント(Future でもストリームイベントでもない)に基づいてデータを入力します。
// NOTE: This implementation is FLAWED!
// It starts before it has subscribers, and it doesn't implement pause.
Stream<int> timedCounter(Duration interval, [int? maxCount]) {
var controller = StreamController<int>();
int counter = 0;
void tick(Timer timer) {
counter++;
controller.add(counter); // Ask stream to send counter values as event.
if (maxCount != null && counter >= maxCount) {
timer.cancel();
controller.close(); // Ask stream to shut down and tell listeners.
}
}
Timer.periodic(interval, tick); // BAD: Starts before it has subscribers.
return controller.stream;
}
前述のように、次のように timedCounter()
によって返されたストリームを使用できます。
var counterStream = timedCounter(const Duration(seconds: 1), 15);
counterStream.listen(print); // Print an integer every second, 15 times.
timedCounter()
のこの実装には、いくつかの問題があります。
- サブスクライバーがいない状態でイベントの生成を開始します。
- サブスクライバーが一時停止を要求した場合でも、イベントの生成を続けます。
次のセクションで示すように、StreamController
を作成するときに onListen
や onPause
などのコールバックを指定することで、これらの問題の両方を修正できます。
サブスクリプションの待機
#原則として、ストリームは、作業を開始する前にサブスクライバーを待つ必要があります。async*
関数はこれを自動的に行いますが、StreamController
を使用する場合、完全に制御でき、必要のない場合でもイベントを追加できます。ストリームにサブスクライバーがいない場合、その StreamController
はイベントをバッファリングし、ストリームがサブスクライバーを取得しない場合、メモリリークにつながる可能性があります。
ストリームを使用するコードを次のように変更してみてください。
void listenAfterDelay() async {
var counterStream = timedCounter(const Duration(seconds: 1), 15);
await Future.delayed(const Duration(seconds: 5));
// After 5 seconds, add a listener.
await for (final n in counterStream) {
print(n); // Print an integer every second, 15 times.
}
}
このコードを実行すると、ストリームは動作しているにもかかわらず、最初の 5 秒間は何も出力されません。その後、リスナーが追加され、最初の 5 個程度のイベントが一度にすべて出力されます。これは、StreamController
によってバッファリングされていたためです。
サブスクリプションを通知するには、StreamController
を作成するときに onListen
引数を指定します。onListen
コールバックは、ストリームが最初のサブスクライバーを取得したときに呼び出されます。onCancel
コールバックを指定した場合、コントローラーが最後のサブスクライバーを失ったときに呼び出されます。前の例では、Timer.periodic()
は次のセクションで示すように、onListen
ハンドラーに移動する必要があります。
一時停止状態の尊重
#リスナーが一時停止を要求したときにイベントを生成することは避けてください。async*
関数は、ストリームサブスクリプションが一時停止されている間、yield
ステートメントで自動的に一時停止します。一方、StreamController
は、一時停止中にイベントをバッファリングします。イベントを提供するコードが一時停止を尊重しない場合、バッファーのサイズは無限に大きくなる可能性があります。また、リスナーが一時停止直後にリッスンを停止した場合、バッファーの作成に費やされた作業は無駄になります。
一時停止サポートがない場合に何が起こるかを確認するには、ストリームを使用するコードを次のように変更してみてください。
void listenWithPause() {
var counterStream = timedCounter(const Duration(seconds: 1), 15);
late StreamSubscription<int> subscription;
subscription = counterStream.listen((int counter) {
print(counter); // Print an integer every second.
if (counter == 5) {
// After 5 ticks, pause for five seconds, then resume.
subscription.pause(Future.delayed(const Duration(seconds: 5)));
}
});
}
5秒間のポーズが終了すると、その間に発生したイベントがすべて一度に受信されます。これは、ストリームのソースがポーズを無視し、ストリームにイベントを追加し続けるためです。そのため、ストリームはイベントをバッファリングし、ストリームがポーズ解除されるとバッファを空にします。
以下のtimedCounter()
のバージョン(stream_controller.dart参照)は、StreamController
のonListen
、onPause
、onResume
、onCancel
コールバックを使用してポーズを実装します。
Stream<int> timedCounter(Duration interval, [int? maxCount]) {
late StreamController<int> controller;
Timer? timer;
int counter = 0;
void tick(_) {
counter++;
controller.add(counter); // Ask stream to send counter values as event.
if (counter == maxCount) {
timer?.cancel();
controller.close(); // Ask stream to shut down and tell listeners.
}
}
void startTimer() {
timer = Timer.periodic(interval, tick);
}
void stopTimer() {
timer?.cancel();
timer = null;
}
controller = StreamController<int>(
onListen: startTimer,
onPause: stopTimer,
onResume: startTimer,
onCancel: stopTimer);
return controller.stream;
}
上記のlistenWithPause()
関数を使用してこのコードを実行してください。ポーズ中はカウントが停止し、その後正常に再開されることがわかります。
ポーズ状態の変化を通知されるには、すべてのリスナー(onListen
、onCancel
、onPause
、onResume
)を使用する必要があります。これは、サブスクリプションとポーズの状態が同時に変更された場合、onListen
またはonCancel
コールバックのみが呼び出されるためです。
最終的なヒント
#async*関数を使用せずにストリームを作成する場合は、これらのヒントを心に留めておいてください。
同期コントローラー(たとえば、
StreamController(sync: true)
を使用して作成されたコントローラー)を使用する際には注意が必要です。ポーズ解除された同期コントローラーでイベントを送信する場合(たとえば、EventSinkで定義されているadd()
、addError()
、close()
メソッドを使用する場合)、イベントはストリーム上のすべてのリスナーにすぐに送信されます。Stream
リスナーは、リスナーを追加したコードが完全に返されるまで呼び出されるべきではなく、間違ったタイミングで同期コントローラーを使用すると、この約束が破られ、正常なコードが失敗する可能性があります。同期コントローラーの使用は避けてください。StreamController
を使用する場合、onListen
コールバックはlisten
呼び出しがStreamSubscription
を返す前に呼び出されます。onListen
コールバックが既に存在するサブスクリプションに依存しないようにしてください。たとえば、次のコードでは、subscription
変数に有効な値が割り当てられる前に、onListen
イベントが発生し(そしてhandler
が呼び出されます)。dartsubscription = stream.listen(handler);
StreamController
で定義されたonListen
、onPause
、onResume
、onCancel
コールバックは、ストリームのリスナーの状態が変更されたときにストリームによって呼び出されますが、イベントの発生中または他の状態変更ハンドラーの呼び出し中は決して呼び出されません。これらの場合、状態変更コールバックは、前のコールバックが完了するまで遅延されます。自分で
Stream
インターフェースを実装しようとしないでください。イベント、コールバック、リスナーの追加と削除間の相互作用を微妙に間違えるのは簡単です。新しいストリームのlisten
呼び出しを実装するには、常に既存のストリーム(StreamController
からのストリームなど)を使用してください。Stream
クラスを拡張し、listen
メソッドと追加の機能を実装することで、より多くの機能を持つStream
を拡張するクラスを作成することは可能ですが、一般的には推奨されません。これは、ユーザーが考慮しなければならない新しい型を導入するためです。であるStream
(およびそれ以上)のクラスではなく、多くの場合、持つStream
(およびそれ以上)のクラスを作成できます。
特に記載がない限り、このサイトのドキュメントはDart 3.5.3を反映しています。ページは2021年5月16日に最終更新されました。 ソースコードを見る または 問題を報告する。