Isolate
このページでは、`Isolate` APIを使用してIsolateを実装するいくつかの例について説明します。
アプリケーションが、他の計算を一時的にブロックするほど大きな計算を処理している場合は、常にIsolateを使用する必要があります。最も一般的な例はFlutterアプリケーションで、UIが応答しなくなる可能性のある大規模な計算を実行する必要がある場合です。
Isolateを *必ず* 使用しなければならないルールはありませんが、Isolateが役立つ状況をいくつか紹介します。
- 非常に大きなJSON BLOBの解析とデコード。
- 写真、音声、ビデオの処理と圧縮。
- 音声ファイルとビデオファイルの変換。
- 大規模なリスト内またはファイルシステム内での複雑な検索とフィルタリングの実行。
- データベースとの通信などのI/Oの実行。
- 大量のネットワークリクエストの処理。
シンプルなワーカーIsolateの実装
#これらの例では、シンプルなワーカーIsolateを生成するメインIsolateを実装します。`Isolate.run()`は、ワーカーIsolateの設定と管理の手順を簡素化します。
- Isolateを生成(開始と作成)します。
- 生成されたIsolateで関数を実行します。
- 結果を取得します。
- 結果をメインIsolateに返します。
- 作業が完了したらIsolateを終了します。
- 例外とエラーをチェック、取得し、メインIsolateに送り返します。
新しいIsolateで既存のメソッドを実行する
#- `run()`を呼び出して新しいIsolate(バックグラウンドワーカー)を生成します。これは、`main()`が結果を待っている間にメインIsolateで直接行われます。
const String filename = 'with_keys.json';
void main() async {
// Read some data.
final jsonData = await Isolate.run(_readAndParseJson);
// Use that data.
print('Number of JSON keys: ${jsonData.length}');
}
- 実行させたい関数を、最初の引数としてワーカーIsolateに渡します。この例では、既存の関数`_readAndParseJson()`です。
Future<Map<String, dynamic>> _readAndParseJson() async {
final fileData = await File(filename).readAsString();
final jsonData = jsonDecode(fileData) as Map<String, dynamic>;
return jsonData;
}
`Isolate.run()`は`_readAndParseJson()`が返す結果を受け取り、その値をメインIsolateに送り返し、ワーカーIsolateをシャットダウンします。
ワーカーIsolateは、結果を保持しているメモリをメインIsolateに *転送* します。データを *コピー* しません。ワーカーIsolateは、オブジェクトが転送を許可されていることを確認するために検証パスを実行します。
`_readAndParseJson()`は、メインIsolateで直接実行することもできる既存の非同期関数です。代わりに`Isolate.run()`を使用して実行することで、並行処理が可能になります。ワーカーIsolateは`_readAndParseJson()`の計算を完全に抽象化します。メインIsolateをブロックせずに完了できます。
`Isolate.run()`の結果は常にFutureです。なぜなら、メインIsolateのコードは実行を継続するためです。ワーカーIsolateが実行する計算が同期か非同期かは、メインIsolateには影響しません。どちらの場合も、並行して実行されるためです。
完全なプログラムについては、send_and_receive.dartのサンプルをご覧ください。
Isolateでクロージャを送信する
#メインIsolateで関数リテラルまたはクロージャを使用して、`run()`でシンプルなワーカーIsolateを作成することもできます。
const String filename = 'with_keys.json';
void main() async {
// Read some data.
final jsonData = await Isolate.run(() async {
final fileData = await File(filename).readAsString();
final jsonData = jsonDecode(fileData) as Map<String, dynamic>;
return jsonData;
});
// Use that data.
print('Number of JSON keys: ${jsonData.length}');
}
この例は、前の例と同じことを行います。新しいIsolateが生成され、何かを計算し、結果を送り返します。
ただし、ここでIsolateはクロージャを送信します。クロージャは、その機能とコードへの書き込み方法の両方において、一般的な名前付き関数よりも制限が少ないです。この例では、`Isolate.run()`は、並行してローカルコードのように見えるものを実行します。その意味で、`run()`は「並列で実行する」ための制御フロー演算子のように動作すると考えることができます。
ポートを使用してIsolate間で複数のメッセージを送受信する
#短命のIsolateは使い勝手が良いですが、新しいIsolateを生成し、オブジェクトをIsolate間でコピーするためにパフォーマンスのオーバーヘッドが必要です。コードが`Isolate.run`を使用して同じ計算を繰り返し実行することに依存している場合、すぐに終了しない長命のIsolateを作成することでパフォーマンスを向上させることができます。
そのためには、`Isolate.run`が抽象化しているいくつかの低レベルIsolate APIを使用できます。
このセクションでは、新しく生成されたIsolateとメインIsolate間の双方向通信を確立するために必要な手順について説明します。最初の例である基本的なポートでは、このプロセスを高いレベルで紹介します。2番目の例である堅牢なポートでは、現実世界の機能を徐々に追加していきます。
`ReceivePort`と`SendPort`
#Isolate間の長命な通信を設定するには、`Isolate`に加えて、`ReceivePort`と`SendPort`の2つのクラスが必要です。これらのポートは、Isolateがお互いに通信できる唯一の方法です。
`ReceivePort`は、他のIsolateから送信されたメッセージを処理するオブジェクトです。これらのメッセージは`SendPort`を介して送信されます。
ポートは`Stream`オブジェクトと同様に動作します(実際、受信ポートは`Stream`を実装しています!)。`SendPort`と`ReceivePort`を、それぞれStreamの`StreamController`とリスナーのように考えることができます。`SendPort`は`StreamController`のようなものです。なぜなら、`SendPort.send()`メソッドを使用してメッセージを追加し、これらのメッセージはリスナー(この場合は`ReceivePort`)によって処理されるためです。`ReceivePort`は、提供したコールバックに引数として渡すことで、受信したメッセージを処理します。
ポートの設定
#新しく生成されたIsolateは、`Isolate.spawn`呼び出しを介して受信した情報しか持ちません。メインIsolateが生成されたIsolateの初期作成後も通信を続ける必要がある場合は、生成されたIsolateがメインIsolateにメッセージを送信できる通信チャネルを設定する必要があります。Isolateはメッセージパッシングによってのみ通信できます。お互いのメモリの中を見ることができません。これが「Isolate」という名前の由来です。
この双方向通信を設定するには、まずメインIsolateで`ReceivePort`を作成し、`Isolate.spawn`で新しいIsolateを生成するときに、その`SendPort`を引数として渡します。新しいIsolateは独自の`ReceivePort`を作成し、メインIsolateから渡された`SendPort`で *その* `SendPort`を返送します。メインIsolateはこの`SendPort`を受信し、これで両側がメッセージの送受信を行うためのオープンチャネルを持つことになります。
- メインアイソレートで
ReceivePort
を作成します。SendPort
はReceivePort
のプロパティとして自動的に作成されます。 Isolate.spawn()
を使用してワーカーアイソレートを生成します。ReceivePort.sendPort
への参照を、ワーカーアイソレートへの最初のメッセージとして渡します。- ワーカーアイソレートで別の新しい
ReceivePort
を作成します。 - ワーカーアイソレートの
ReceivePort.sendPort
への参照を、メインアイソレートに最初のメッセージとして返送します。
ポートの作成と通信の設定に加えて、各ReceivePort
がメッセージを受信したときに何をするかを指定する必要があります。これは、各ReceivePort
のlisten
メソッドを使用して行います。
- メインアイソレートのワーカーアイソレートの
SendPort
への参照を介してメッセージを送信します。 - ワーカーアイソレートの
ReceivePort
のリスナーを介してメッセージを受信し、処理します。これは、メインアイソレートから移動したい計算が実行される場所です。 - ワーカーアイソレートのメインアイソレートの
SendPort
への参照を介して返信を送信します。 - メインアイソレートの
ReceivePort
のリスナーを介してメッセージを受信します。
基本的なポートの例
#この例では、メインアイソレートとの間で双方向通信を行う長寿命のワーカーアイソレートを設定する方法を示しています。コードは、JSONテキストを新しいアイソレートに送信する例を使用しており、JSONは解析およびデコードされた後、メインアイソレートに送り返されます。
ステップ1:ワーカークラスの定義
#まず、バックグラウンドワーカーアイソレートのクラスを作成します。このクラスには、必要なすべての機能が含まれています。
- アイソレートを生成する。
- そのアイソレートにメッセージを送信する。
- アイソレートにJSONをデコードさせる。
- デコードされたJSONをメインアイソレートに送り返す。
このクラスは、ワーカーアイソレートを生成するメソッドと、そのワーカーアイソレートにメッセージを送信するメソッドという2つのパブリックメソッドを公開します。
この例の後続のセクションでは、クラスメソッドを1つずつどのように埋めていくかを示します。
class Worker {
Future<void> spawn() async {
// TODO: Add functionality to spawn a worker isolate.
}
void _handleResponsesFromIsolate(dynamic message) {
// TODO: Handle messages sent back from the worker isolate.
}
static void _startRemoteIsolate(SendPort port) {
// TODO: Define code that should be executed on the worker isolate.
}
Future<void> parseJson(String message) async {
// TODO: Define a public method that can
// be used to send messages to the worker isolate.
}
}
ステップ2:ワーカーアイソレートの生成
#Worker.spawn
メソッドは、ワーカーアイソレートの作成と、メッセージの送受信を確実にできるようにするコードをグループ化する場所です。
- まず、
ReceivePort
を作成します。これにより、メインアイソレートは、新しく生成されたワーカーアイソレートから送信されたメッセージを受信できます。 - 次に、ワーカーアイソレートが送り返すメッセージを処理するために、受信ポートにリスナーを追加します。リスナーに渡されるコールバック
_handleResponsesFromIsolate
については、ステップ4で説明します。 - 最後に、
Isolate.spawn
を使用してワーカーアイソレートを生成します。これは、ワーカーアイソレートで実行される関数(ステップ3で説明)、および受信ポートのsendPort
プロパティという2つの引数を受け取ります。
Future<void> spawn() async {
final receivePort = ReceivePort();
receivePort.listen(_handleResponsesFromIsolate);
await Isolate.spawn(_startRemoteIsolate, receivePort.sendPort);
}
receivePort.sendPort
引数は、ワーカーアイソレートで呼び出されるときに、コールバック(_startRemoteIsolate
)に引数として渡されます。これは、ワーカーアイソレートがメインアイソレートにメッセージを送り返す方法を確保する最初のステップです。
ステップ3:ワーカーアイソレートでのコードの実行
#このステップでは、ワーカーアイソレートに送信されて生成時に実行される_startRemoteIsolate
メソッドを定義します。このメソッドは、ワーカーアイソレートの「メイン」メソッドのようなものです。
- まず、別の新しい
ReceivePort
を作成します。このポートは、メインアイソレートからの将来のメッセージを受信します。 - 次に、そのポートの
SendPort
をメインアイソレートに送り返します。 - 最後に、新しい
ReceivePort
にリスナーを追加します。このリスナーは、メインアイソレートがワーカーアイソレートに送信するメッセージを処理します。
static void _startRemoteIsolate(SendPort port) {
final receivePort = ReceivePort();
port.send(receivePort.sendPort);
receivePort.listen((dynamic message) async {
if (message is String) {
final transformed = jsonDecode(message);
port.send(transformed);
}
});
}
ワーカーのReceivePort
のリスクナーは、メインアイソレートから渡されたJSONをデコードしてから、デコードされたJSONをメインアイソレートに送り返します。
このリスナーは、メインアイソレートからワーカーアイソレートに送信されるメッセージのエントリポイントです。**これは、将来ワーカーアイソレートに実行させるコードを伝える唯一の機会です。**
ステップ4:メインアイソレートでのメッセージの処理
#最後に、メインアイソレートがワーカーアイソレートからメインアイソレートに送り返されたメッセージをどのように処理するかを指定する必要があります。そのためには、_handleResponsesFromIsolate
メソッドを埋める必要があります。ステップ2で説明したように、このメソッドはreceivePort.listen
メソッドに渡されます。
Future<void> spawn() async {
final receivePort = ReceivePort();
receivePort.listen(_handleResponsesFromIsolate);
await Isolate.spawn(_startRemoteIsolate, receivePort.sendPort);
}
また、ステップ3でSendPort
をメインアイソレートに送り返したことを思い出してください。このメソッドは、そのSendPort
の受信と、将来のメッセージ(デコードされたJSONになります)の処理を処理します。
- まず、メッセージが
SendPort
かどうかを確認します。もしそうであれば、そのポートをクラスの_sendPort
プロパティに割り当てて、後でメッセージを送信できるようにします。 - 次に、メッセージが
Map<String, dynamic>
型(デコードされたJSONの予想される型)かどうかを確認します。もしそうであれば、アプリケーション固有のロジックでそのメッセージを処理します。この例では、メッセージが出力されます。
void _handleResponsesFromIsolate(dynamic message) {
if (message is SendPort) {
_sendPort = message;
_isolateReady.complete();
} else if (message is Map<String, dynamic>) {
print(message);
}
}
ステップ5:アイソレートが設定されるまでを保証するためのコンプリターの追加
#クラスを完成させるために、ワーカーアイソレートにメッセージを送信する役割を担うparseJson
というパブリックメソッドを定義します。また、アイソレートが完全に設定される前にメッセージを送信できることを保証する必要があります。これを処理するには、Completer
を使用します。
- まず、
Completer
と呼ばれるクラスレベルのプロパティを追加し、_isolateReady
という名前を付けます。 - 次に、ステップ4で作成した
_handleResponsesFromIsolate
メソッドで、メッセージがSendPort
の場合にコンプリターでcomplete()
を呼び出します。 - 最後に、
parseJson
メソッドで、_sendPort.send
を追加する前にawait _isolateReady.future
を追加します。これにより、ワーカーアイソレートが生成され、そのSendPort
をメインアイソレートに送り返すまで、ワーカーアイソレートにメッセージを送信できなくなります。
Future<void> parseJson(String message) async {
await _isolateReady.future;
_sendPort.send(message);
}
完全な例
#完全な例を見るには展開してください
import 'dart:async';
import 'dart:convert';
import 'dart:isolate';
void main() async {
final worker = Worker();
await worker.spawn();
await worker.parseJson('{"key":"value"}');
}
class Worker {
late SendPort _sendPort;
final Completer<void> _isolateReady = Completer.sync();
Future<void> spawn() async {
final receivePort = ReceivePort();
receivePort.listen(_handleResponsesFromIsolate);
await Isolate.spawn(_startRemoteIsolate, receivePort.sendPort);
}
void _handleResponsesFromIsolate(dynamic message) {
if (message is SendPort) {
_sendPort = message;
_isolateReady.complete();
} else if (message is Map<String, dynamic>) {
print(message);
}
}
static void _startRemoteIsolate(SendPort port) {
final receivePort = ReceivePort();
port.send(receivePort.sendPort);
receivePort.listen((dynamic message) async {
if (message is String) {
final transformed = jsonDecode(message);
port.send(transformed);
}
});
}
Future<void> parseJson(String message) async {
await _isolateReady.future;
_sendPort.send(message);
}
}
堅牢なポートの例
#前の例では、双方向通信を行う長寿命アイソレートを設定するために必要な基本的な構成要素について説明しました。前述のように、その例には、エラー処理、使用しなくなったポートを閉じる機能、および状況によってはメッセージの順序に関する不整合など、いくつかの重要な機能が欠けています。
この例は、これらの追加機能などを備えた長寿命のワーカーアイソレートを作成し、より良い設計パターンに従うことで、最初の例の情報に付け加えています。このコードは最初の例と類似点がありますが、その例の拡張ではありません。
ステップ1:ワーカークラスの定義
#まず、バックグラウンドワーカーアイソレートのクラスを作成します。このクラスには、必要なすべての機能が含まれています。
- アイソレートを生成する。
- そのアイソレートにメッセージを送信する。
- アイソレートにJSONをデコードさせる。
- デコードされたJSONをメインアイソレートに送り返す。
このクラスは、ワーカーアイソレートを作成するメソッド、そのワーカーアイソレートにメッセージを送信するメソッド、使用しなくなったポートをシャットダウンできるメソッドという3つのパブリックメソッドを公開します。
class Worker {
final SendPort _commands;
final ReceivePort _responses;
Future<Object?> parseJson(String message) async {
// TODO: Ensure the port is still open.
_commands.send(message);
}
static Future<Worker> spawn() async {
// TODO: Add functionality to create a new Worker object with a
// connection to a spawned isolate.
throw UnimplementedError();
}
Worker._(this._responses, this._commands) {
// TODO: Initialize main isolate receive port listener.
}
void _handleResponsesFromIsolate(dynamic message) {
// TODO: Handle messages sent back from the worker isolate.
}
static void _handleCommandsToIsolate(ReceivePort rp, SendPort sp) async {
// TODO: Handle messages sent back from the worker isolate.
}
static void _startRemoteIsolate(SendPort sp) {
// TODO: Initialize worker isolate's ports.
}
}
ステップ2:Worker.spawn
メソッドでRawReceivePort
を作成する
#アイソレートを生成する前に、より低レベルのReceivePort
であるRawReceivePort
を作成する必要があります。RawReceivePort
を使用することは推奨されるパターンであり、アイソレートの起動ロジックと、アイソレートでのメッセージパッシングを処理するロジックを分離できます。
Worker.spawn
メソッド内では
- まず、
RawReceivePort
を作成します。このReceivePort
は、ワーカーアイソレートから送信される最初のメッセージ(SendPort
になります)のみを受信する役割を担います。 - 次に、アイソレートがメッセージを受信できるようになったことを示す
Completer
を作成します。これが完了すると、ReceivePort
とSendPort
を含むレコードを返します。 - 次に、
RawReceivePort.handler
プロパティを定義します。このプロパティは、ReceivePort.listener
のように動作するFunction?
です。このポートでメッセージが受信されると、関数が呼び出されます。 - ハンドラー関数内で、
connection.complete()
を呼び出します。このメソッドは、ReceivePort
とSendPort
を含むレコードを引数として期待します。SendPort
は、ワーカーアイソレートから送信される最初のメッセージであり、次のステップで_commands
という名前のクラスレベルのSendPort
に割り当てられます。 - 次に、
ReceivePort.fromRawReceivePort
コンストラクターを使用して新しいReceivePort
を作成し、initPort
を渡します。
class Worker {
final SendPort _commands;
final ReceivePort _responses;
static Future<Worker> spawn() async {
// Create a receive port and add its initial message handler.
final initPort = RawReceivePort();
final connection = Completer<(ReceivePort, SendPort)>.sync();
initPort.handler = (initialMessage) {
final commandPort = initialMessage as SendPort;
connection.complete((
ReceivePort.fromRawReceivePort(initPort),
commandPort,
));
};
// ···
}
最初にRawReceivePort
を作成してからReceivePort
を作成することで、後でReceivePort.listen
に新しいコールバックを追加できます。逆に、ReceivePort
をすぐに作成した場合、ReceivePort
はStream
ではなくBroadcastStream
を実装するため、listener
を1つしか追加できません。
効果的に、これにより、アイソレートの起動ロジックと、通信の設定が完了した後にメッセージの受信を処理するロジックを分離できます。この利点は、他のメソッドのロジックが増えるにつれてより明白になります。
ステップ3:Isolate.spawn
を使用してワーカーアイソレートを生成する
#このステップでは、Worker.spawn
メソッドの入力を続けます。アイソレートを生成するために必要なコードを追加し、このクラスからWorker
のインスタンスを返します。この例では、Isolate.spawn
への呼び出しはtry
/catch
ブロックでラップされており、アイソレートの起動に失敗した場合、initPort
が閉じられ、Worker
オブジェクトが作成されなくなります。
- まず、
try
/catch
ブロックでワーカーアイソレートの生成を試みます。ワーカーアイソレートの生成に失敗した場合は、前のステップで作成された受信ポートを閉じます。Isolate.spawn
に渡されるメソッドについては、後のステップで説明します。 - 次に、
connection.future
を待機し、それが返すレコードから送信ポートと受信ポートを分解します。 - 最後に、プライベートコンストラクターを呼び出して、そのコンプリターからのポートを渡すことで、
Worker
のインスタンスを返します。
class Worker {
final SendPort _commands;
final ReceivePort _responses;
static Future<Worker> spawn() async {
// Create a receive port and add its initial message handler
final initPort = RawReceivePort();
final connection = Completer<(ReceivePort, SendPort)>.sync();
initPort.handler = (initialMessage) {
final commandPort = initialMessage as SendPort;
connection.complete((
ReceivePort.fromRawReceivePort(initPort),
commandPort,
));
};
// Spawn the isolate.
try {
await Isolate.spawn(_startRemoteIsolate, (initPort.sendPort));
} on Object {
initPort.close();
rethrow;
}
final (ReceivePort receivePort, SendPort sendPort) =
await connection.future;
return Worker._(receivePort, sendPort);
}
この例では(前の例と比較して)、Worker.spawn
がこのクラスの非同期静的コンストラクターとして機能し、Worker
のインスタンスを作成する唯一の方法です。これによりAPIが簡素化され、Worker
のインスタンスを作成するコードがクリーンになります。
ステップ4:アイソレートの設定プロセスの完了
#このステップでは、基本的なIsolateの設定プロセスを完了します。これは前の例とほぼ完全に一致し、新しい概念はありません。コードがより多くのメソッドに分割されているというわずかな変更点があり、これはこの例を通してより多くの機能を追加するための設計プラクティスです。Isolateの設定の基本的なプロセスの詳細な手順については、基本ポートの例を参照してください。
最初に、Worker.spawn
メソッドから返されるプライベートコンストラクタを作成します。コンストラクタ本体に、メインIsolateで使用される受信ポートへのリスナーを追加し、まだ定義されていない_handleResponsesFromIsolate
というメソッドをそのリスナーに渡します。
class Worker {
final SendPort _commands;
final ReceivePort _responses;
// ···
Worker._(this._responses, this._commands) {
_responses.listen(_handleResponsesFromIsolate);
}
次に、ワーカーIsolate上のポートを初期化する役割を担う_startRemoteIsolate
にコードを追加します。思い出してください。このメソッドはWorker.spawn
メソッドでIsolate.spawn
に渡され、メインIsolateのSendPort
を引数として渡されます。
- 新しい
ReceivePort
を作成します。 - そのポートの
SendPort
をメインIsolateに送り返します。 _handleCommandsToIsolate
という新しいメソッドを呼び出し、新しいReceivePort
とメインIsolateからのSendPort
の両方を引数として渡します。
static void _startRemoteIsolate(SendPort sendPort) {
final receivePort = ReceivePort();
sendPort.send(receivePort.sendPort);
_handleCommandsToIsolate(receivePort, sendPort);
}
次に、メインIsolateからのメッセージを受信し、ワーカーIsolateでJSONをデコードし、デコードされたJSONをレスポンスとして送信する役割を担う_handleCommandsToIsolate
メソッドを追加します。
- 最初に、ワーカーIsolateの
ReceivePort
にリスナーを宣言します。 - リスナーに追加されたコールバック内では、
try
/catch
ブロック内で、メインIsolateから渡されたJSONのデコードを試みます。デコードが成功した場合は、デコードされたJSONをメインIsolateに送り返します。 - エラーが発生した場合は、
RemoteError
を送り返します。
static void _handleCommandsToIsolate(
ReceivePort receivePort, SendPort sendPort) {
receivePort.listen((message) {
try {
final jsonData = jsonDecode(message as String);
sendPort.send(jsonData);
} catch (e) {
sendPort.send(RemoteError(e.toString(), ''));
}
});
}
次に、_handleResponsesFromIsolate
メソッドのコードを追加します。
- 最初に、メッセージが
RemoteError
かどうかを確認します。その場合は、そのエラーをthrow
する必要があります。 - それ以外の場合は、メッセージを出力します。今後のステップでは、メッセージを出力するのではなく、メッセージを返すようにこのコードを更新します。
void _handleResponsesFromIsolate(dynamic message) {
if (message is RemoteError) {
throw message;
} else {
print(message);
}
}
最後に、外部コードがデコードするためにワーカーIsolateにJSONを送信できるようにする公開メソッドであるparseJson
メソッドを追加します。
Future<Object?> parseJson(String message) async {
_commands.send(message);
}
このメソッドは次のステップで更新します。
ステップ5:同時に複数のメッセージを処理する
#現在、ワーカーIsolateにメッセージを高速に送信すると、IsolateはデコードされたJSONレスポンスを完了した順序で送信し、送信された順序ではありません。どのレスポンスがどのメッセージに対応するのかを判断する方法はありません。
このステップでは、各メッセージにIDを割り当て、Completer
オブジェクトを使用して、外部コードがparseJson
を呼び出す際に、その呼び出し元に返されるレスポンスが正しいレスポンスになるようにすることで、この問題を解決します。
最初に、Worker
に2つのクラスレベルのプロパティを追加します。
Map<int, Completer<Object?>> _activeRequests
int _idCounter
class Worker {
final SendPort _commands;
final ReceivePort _responses;
final Map<int, Completer<Object?>> _activeRequests = {};
int _idCounter = 0;
_activeRequests
マップは、ワーカーIsolateに送信されたメッセージとCompleter
を関連付けます。_activeRequests
で使用されるキーは_idCounter
から取得され、メッセージの送信が増えるにつれて増加します。
次に、ワーカーIsolateにメッセージを送信する前にCompleterを作成するようにparseJson
メソッドを更新します。
- 最初に
Completer
を作成します。 - 次に、
_idCounter
を増分して、各Completer
を一意の数に関連付けます。 - キーが現在の
_idCounter
の数で、値がCompleterである_activeRequests
マップにエントリを追加します。 - IDと一緒にメッセージをワーカーIsolateに送信します。
SendPort
を通して1つの値しか送信できないため、IDとメッセージをレコードにラップします。 - 最後に、最終的にワーカーIsolateからのレスポンスを含むCompleterのFutureを返します。
Future<Object?> parseJson(String message) async {
final completer = Completer<Object?>.sync();
final id = _idCounter++;
_activeRequests[id] = completer;
_commands.send((id, message));
return await completer.future;
}
また、このシステムを処理するために_handleResponsesFromIsolate
と_handleCommandsToIsolate
を更新する必要があります。
_handleCommandsToIsolate
では、message
がJSONテキストだけでなく、2つの値を持つレコードであることを考慮する必要があります。message
から値をデストラクチャリングすることでこれを行います。
次に、JSONをデコードした後、再びレコードを使用して、IDとデコードされたJSONの両方をメインIsolateに送り返すsendPort.send
への呼び出しを更新します。
static void _handleCommandsToIsolate(
ReceivePort receivePort, SendPort sendPort) {
receivePort.listen((message) {
final (int id, String jsonText) = message as (int, String); // New
try {
final jsonData = jsonDecode(jsonText);
sendPort.send((id, jsonData)); // Updated
} catch (e) {
sendPort.send((id, RemoteError(e.toString(), '')));
}
});
}
最後に、_handleResponsesFromIsolate
を更新します。
- 最初に、メッセージ引数からIDとレスポンスを再びデストラクチャリングします。
- 次に、このリクエストに対応するCompleterを
_activeRequests
マップから削除します。 - 最後に、エラーをスローしたり、デコードされたJSONを出力する代わりに、レスポンスを渡してCompleterを完了します。これが完了すると、レスポンスはメインIsolateで
parseJson
を呼び出したコードに返されます。
void _handleResponsesFromIsolate(dynamic message) {
final (int id, Object? response) = message as (int, Object?); // New
final completer = _activeRequests.remove(id)!; // New
if (response is RemoteError) {
completer.completeError(response); // Updated
} else {
completer.complete(response); // Updated
}
}
ステップ6:ポートを閉じる機能を追加する
#Isolateがコードによって使用されなくなった場合は、メインIsolateとワーカーIsolateのポートを閉じる必要があります。
- 最初に、ポートが閉じているかどうかを追跡するクラスレベルのブール値を追加します。
- 次に、
Worker.close
メソッドを追加します。このメソッド内では_closed
をtrueに更新します。- ワーカーIsolateに最終的なメッセージを送信します。このメッセージは「shutdown」という
String
ですが、任意のオブジェクトにすることができます。次のコードスニペットで使用します。
- 最後に、
_activeRequests
が空かどうかを確認します。空の場合は、_responses
という名前のメインIsolateのReceivePort
を閉じます。
class Worker {
bool _closed = false;
// ···
void close() {
if (!_closed) {
_closed = true;
_commands.send('shutdown');
if (_activeRequests.isEmpty) _responses.close();
print('--- port closed --- ');
}
}
- 次に、ワーカーIsolateで「shutdown」メッセージを処理する必要があります。
_handleCommandsToIsolate
メソッドに次のコードを追加します。このコードは、メッセージが「shutdown」というString
かどうかをチェックします。その場合は、ワーカーIsolateのReceivePort
を閉じ、返ります。
static void _handleCommandsToIsolate(
ReceivePort receivePort,
SendPort sendPort,
) {
receivePort.listen((message) {
// New if-block.
if (message == 'shutdown') {
receivePort.close();
return;
}
final (int id, String jsonText) = message as (int, String);
try {
final jsonData = jsonDecode(jsonText);
sendPort.send((id, jsonData));
} catch (e) {
sendPort.send((id, RemoteError(e.toString(), '')));
}
});
}
- 最後に、メッセージの送信を試みる前にポートが閉じているかどうかをチェックするコードを追加する必要があります。
Worker.parseJson
メソッドに1行を追加します。
Future<Object?> parseJson(String message) async {
if (_closed) throw StateError('Closed'); // New
final completer = Completer<Object?>.sync();
final id = _idCounter++;
_activeRequests[id] = completer;
_commands.send((id, message));
return await completer.future;
}
完全な例
#完全な例を見るにはここを展開してください
import 'dart:async';
import 'dart:convert';
import 'dart:isolate';
void main() async {
final worker = await Worker.spawn();
print(await worker.parseJson('{"key":"value"}'));
print(await worker.parseJson('"banana"'));
print(await worker.parseJson('[true, false, null, 1, "string"]'));
print(
await Future.wait([worker.parseJson('"yes"'), worker.parseJson('"no"')]));
worker.close();
}
class Worker {
final SendPort _commands;
final ReceivePort _responses;
final Map<int, Completer<Object?>> _activeRequests = {};
int _idCounter = 0;
bool _closed = false;
Future<Object?> parseJson(String message) async {
if (_closed) throw StateError('Closed');
final completer = Completer<Object?>.sync();
final id = _idCounter++;
_activeRequests[id] = completer;
_commands.send((id, message));
return await completer.future;
}
static Future<Worker> spawn() async {
// Create a receive port and add its initial message handler
final initPort = RawReceivePort();
final connection = Completer<(ReceivePort, SendPort)>.sync();
initPort.handler = (initialMessage) {
final commandPort = initialMessage as SendPort;
connection.complete((
ReceivePort.fromRawReceivePort(initPort),
commandPort,
));
};
// Spawn the isolate.
try {
await Isolate.spawn(_startRemoteIsolate, (initPort.sendPort));
} on Object {
initPort.close();
rethrow;
}
final (ReceivePort receivePort, SendPort sendPort) =
await connection.future;
return Worker._(receivePort, sendPort);
}
Worker._(this._responses, this._commands) {
_responses.listen(_handleResponsesFromIsolate);
}
void _handleResponsesFromIsolate(dynamic message) {
final (int id, Object? response) = message as (int, Object?);
final completer = _activeRequests.remove(id)!;
if (response is RemoteError) {
completer.completeError(response);
} else {
completer.complete(response);
}
if (_closed && _activeRequests.isEmpty) _responses.close();
}
static void _handleCommandsToIsolate(
ReceivePort receivePort,
SendPort sendPort,
) {
receivePort.listen((message) {
if (message == 'shutdown') {
receivePort.close();
return;
}
final (int id, String jsonText) = message as (int, String);
try {
final jsonData = jsonDecode(jsonText);
sendPort.send((id, jsonData));
} catch (e) {
sendPort.send((id, RemoteError(e.toString(), '')));
}
});
}
static void _startRemoteIsolate(SendPort sendPort) {
final receivePort = ReceivePort();
sendPort.send(receivePort.sendPort);
_handleCommandsToIsolate(receivePort, sendPort);
}
void close() {
if (!_closed) {
_closed = true;
_commands.send('shutdown');
if (_activeRequests.isEmpty) _responses.close();
print('--- port closed --- ');
}
}
}
特に明記されていない限り、このサイトのドキュメントはDart 3.5.3を反映しています。ページは2024年8月24日に最後に更新されました。 ソースを表示 または 問題を報告する。