目次

Isolate

このページでは、`Isolate` APIを使用してIsolateを実装するいくつかの例について説明します。

アプリケーションが、他の計算を一時的にブロックするほど大きな計算を処理している場合は、常にIsolateを使用する必要があります。最も一般的な例はFlutterアプリケーションで、UIが応答しなくなる可能性のある大規模な計算を実行する必要がある場合です。

Isolateを *必ず* 使用しなければならないルールはありませんが、Isolateが役立つ状況をいくつか紹介します。

  • 非常に大きなJSON BLOBの解析とデコード。
  • 写真、音声、ビデオの処理と圧縮。
  • 音声ファイルとビデオファイルの変換。
  • 大規模なリスト内またはファイルシステム内での複雑な検索とフィルタリングの実行。
  • データベースとの通信などのI/Oの実行。
  • 大量のネットワークリクエストの処理。

シンプルなワーカーIsolateの実装

#

これらの例では、シンプルなワーカーIsolateを生成するメインIsolateを実装します。`Isolate.run()`は、ワーカーIsolateの設定と管理の手順を簡素化します。

  1. Isolateを生成(開始と作成)します。
  2. 生成されたIsolateで関数を実行します。
  3. 結果を取得します。
  4. 結果をメインIsolateに返します。
  5. 作業が完了したらIsolateを終了します。
  6. 例外とエラーをチェック、取得し、メインIsolateに送り返します。

新しいIsolateで既存のメソッドを実行する

#
  1. `run()`を呼び出して新しいIsolate(バックグラウンドワーカー)を生成します。これは、`main()`が結果を待っている間にメインIsolateで直接行われます。
dart
const String filename = 'with_keys.json';

void main() async {
  // Read some data.
  final jsonData = await Isolate.run(_readAndParseJson);

  // Use that data.
  print('Number of JSON keys: ${jsonData.length}');
}
  1. 実行させたい関数を、最初の引数としてワーカーIsolateに渡します。この例では、既存の関数`_readAndParseJson()`です。
dart
Future<Map<String, dynamic>> _readAndParseJson() async {
  final fileData = await File(filename).readAsString();
  final jsonData = jsonDecode(fileData) as Map<String, dynamic>;
  return jsonData;
}
  1. `Isolate.run()`は`_readAndParseJson()`が返す結果を受け取り、その値をメインIsolateに送り返し、ワーカーIsolateをシャットダウンします。

  2. ワーカーIsolateは、結果を保持しているメモリをメインIsolateに *転送* します。データを *コピー* しません。ワーカーIsolateは、オブジェクトが転送を許可されていることを確認するために検証パスを実行します。

`_readAndParseJson()`は、メインIsolateで直接実行することもできる既存の非同期関数です。代わりに`Isolate.run()`を使用して実行することで、並行処理が可能になります。ワーカーIsolateは`_readAndParseJson()`の計算を完全に抽象化します。メインIsolateをブロックせずに完了できます。

`Isolate.run()`の結果は常にFutureです。なぜなら、メインIsolateのコードは実行を継続するためです。ワーカーIsolateが実行する計算が同期か非同期かは、メインIsolateには影響しません。どちらの場合も、並行して実行されるためです。

完全なプログラムについては、send_and_receive.dartのサンプルをご覧ください。

Isolateでクロージャを送信する

#

メインIsolateで関数リテラルまたはクロージャを使用して、`run()`でシンプルなワーカーIsolateを作成することもできます。

dart
const String filename = 'with_keys.json';

void main() async {
  // Read some data.
  final jsonData = await Isolate.run(() async {
    final fileData = await File(filename).readAsString();
    final jsonData = jsonDecode(fileData) as Map<String, dynamic>;
    return jsonData;
  });

  // Use that data.
  print('Number of JSON keys: ${jsonData.length}');
}

この例は、前の例と同じことを行います。新しいIsolateが生成され、何かを計算し、結果を送り返します。

ただし、ここでIsolateはクロージャを送信します。クロージャは、その機能とコードへの書き込み方法の両方において、一般的な名前付き関数よりも制限が少ないです。この例では、`Isolate.run()`は、並行してローカルコードのように見えるものを実行します。その意味で、`run()`は「並列で実行する」ための制御フロー演算子のように動作すると考えることができます。

ポートを使用してIsolate間で複数のメッセージを送受信する

#

短命のIsolateは使い勝手が良いですが、新しいIsolateを生成し、オブジェクトをIsolate間でコピーするためにパフォーマンスのオーバーヘッドが必要です。コードが`Isolate.run`を使用して同じ計算を繰り返し実行することに依存している場合、すぐに終了しない長命のIsolateを作成することでパフォーマンスを向上させることができます。

そのためには、`Isolate.run`が抽象化しているいくつかの低レベルIsolate APIを使用できます。

このセクションでは、新しく生成されたIsolateとメインIsolate間の双方向通信を確立するために必要な手順について説明します。最初の例である基本的なポートでは、このプロセスを高いレベルで紹介します。2番目の例である堅牢なポートでは、現実世界の機能を徐々に追加していきます。

`ReceivePort`と`SendPort`

#

Isolate間の長命な通信を設定するには、`Isolate`に加えて、`ReceivePort`と`SendPort`の2つのクラスが必要です。これらのポートは、Isolateがお互いに通信できる唯一の方法です。

`ReceivePort`は、他のIsolateから送信されたメッセージを処理するオブジェクトです。これらのメッセージは`SendPort`を介して送信されます。

ポートは`Stream`オブジェクトと同様に動作します(実際、受信ポートは`Stream`を実装しています!)。`SendPort`と`ReceivePort`を、それぞれStreamの`StreamController`とリスナーのように考えることができます。`SendPort`は`StreamController`のようなものです。なぜなら、`SendPort.send()`メソッドを使用してメッセージを追加し、これらのメッセージはリスナー(この場合は`ReceivePort`)によって処理されるためです。`ReceivePort`は、提供したコールバックに引数として渡すことで、受信したメッセージを処理します。

ポートの設定

#

新しく生成されたIsolateは、`Isolate.spawn`呼び出しを介して受信した情報しか持ちません。メインIsolateが生成されたIsolateの初期作成後も通信を続ける必要がある場合は、生成されたIsolateがメインIsolateにメッセージを送信できる通信チャネルを設定する必要があります。Isolateはメッセージパッシングによってのみ通信できます。お互いのメモリの中を見ることができません。これが「Isolate」という名前の由来です。

この双方向通信を設定するには、まずメインIsolateで`ReceivePort`を作成し、`Isolate.spawn`で新しいIsolateを生成するときに、その`SendPort`を引数として渡します。新しいIsolateは独自の`ReceivePort`を作成し、メインIsolateから渡された`SendPort`で *その* `SendPort`を返送します。メインIsolateはこの`SendPort`を受信し、これで両側がメッセージの送受信を行うためのオープンチャネルを持つことになります。

A figure showing events being fed, one by one, into the event loop

  1. メインアイソレートでReceivePortを作成します。SendPortReceivePortのプロパティとして自動的に作成されます。
  2. Isolate.spawn()を使用してワーカーアイソレートを生成します。
  3. ReceivePort.sendPortへの参照を、ワーカーアイソレートへの最初のメッセージとして渡します。
  4. ワーカーアイソレートで別の新しいReceivePortを作成します。
  5. ワーカーアイソレートのReceivePort.sendPortへの参照を、メインアイソレートに最初のメッセージとして返送します。

ポートの作成と通信の設定に加えて、各ReceivePortがメッセージを受信したときに何をするかを指定する必要があります。これは、各ReceivePortlistenメソッドを使用して行います。

A figure showing events being fed, one by one, into the event loop

  1. メインアイソレートのワーカーアイソレートのSendPortへの参照を介してメッセージを送信します。
  2. ワーカーアイソレートのReceivePortのリスナーを介してメッセージを受信し、処理します。これは、メインアイソレートから移動したい計算が実行される場所です。
  3. ワーカーアイソレートのメインアイソレートのSendPortへの参照を介して返信を送信します。
  4. メインアイソレートのReceivePortのリスナーを介してメッセージを受信します。

基本的なポートの例

#

この例では、メインアイソレートとの間で双方向通信を行う長寿命のワーカーアイソレートを設定する方法を示しています。コードは、JSONテキストを新しいアイソレートに送信する例を使用しており、JSONは解析およびデコードされた後、メインアイソレートに送り返されます。

ステップ1:ワーカークラスの定義

#

まず、バックグラウンドワーカーアイソレートのクラスを作成します。このクラスには、必要なすべての機能が含まれています。

  • アイソレートを生成する。
  • そのアイソレートにメッセージを送信する。
  • アイソレートにJSONをデコードさせる。
  • デコードされたJSONをメインアイソレートに送り返す。

このクラスは、ワーカーアイソレートを生成するメソッドと、そのワーカーアイソレートにメッセージを送信するメソッドという2つのパブリックメソッドを公開します。

この例の後続のセクションでは、クラスメソッドを1つずつどのように埋めていくかを示します。

dart
class Worker {
  Future<void> spawn() async {
    // TODO: Add functionality to spawn a worker isolate.
  }

  void _handleResponsesFromIsolate(dynamic message) {
    // TODO: Handle messages sent back from the worker isolate.
  }

  static void _startRemoteIsolate(SendPort port) {
    // TODO: Define code that should be executed on the worker isolate.
  }

  Future<void> parseJson(String message) async {
    // TODO: Define a public method that can
    // be used to send messages to the worker isolate.
  }
}

ステップ2:ワーカーアイソレートの生成

#

Worker.spawnメソッドは、ワーカーアイソレートの作成と、メッセージの送受信を確実にできるようにするコードをグループ化する場所です。

  • まず、ReceivePortを作成します。これにより、メインアイソレートは、新しく生成されたワーカーアイソレートから送信されたメッセージを受信できます。
  • 次に、ワーカーアイソレートが送り返すメッセージを処理するために、受信ポートにリスナーを追加します。リスナーに渡されるコールバック_handleResponsesFromIsolateについては、ステップ4で説明します。
  • 最後に、Isolate.spawnを使用してワーカーアイソレートを生成します。これは、ワーカーアイソレートで実行される関数(ステップ3で説明)、および受信ポートのsendPortプロパティという2つの引数を受け取ります。
dart
Future<void> spawn() async {
  final receivePort = ReceivePort();
  receivePort.listen(_handleResponsesFromIsolate);
  await Isolate.spawn(_startRemoteIsolate, receivePort.sendPort);
}

receivePort.sendPort引数は、ワーカーアイソレートで呼び出されるときに、コールバック(_startRemoteIsolate)に引数として渡されます。これは、ワーカーアイソレートがメインアイソレートにメッセージを送り返す方法を確保する最初のステップです。

ステップ3:ワーカーアイソレートでのコードの実行

#

このステップでは、ワーカーアイソレートに送信されて生成時に実行される_startRemoteIsolateメソッドを定義します。このメソッドは、ワーカーアイソレートの「メイン」メソッドのようなものです。

  • まず、別の新しいReceivePortを作成します。このポートは、メインアイソレートからの将来のメッセージを受信します。
  • 次に、そのポートのSendPortをメインアイソレートに送り返します。
  • 最後に、新しいReceivePortにリスナーを追加します。このリスナーは、メインアイソレートがワーカーアイソレートに送信するメッセージを処理します。
dart
static void _startRemoteIsolate(SendPort port) {
  final receivePort = ReceivePort();
  port.send(receivePort.sendPort);

  receivePort.listen((dynamic message) async {
    if (message is String) {
      final transformed = jsonDecode(message);
      port.send(transformed);
    }
  });
}

ワーカーのReceivePortのリスクナーは、メインアイソレートから渡されたJSONをデコードしてから、デコードされたJSONをメインアイソレートに送り返します。

このリスナーは、メインアイソレートからワーカーアイソレートに送信されるメッセージのエントリポイントです。**これは、将来ワーカーアイソレートに実行させるコードを伝える唯一の機会です。**

ステップ4:メインアイソレートでのメッセージの処理

#

最後に、メインアイソレートがワーカーアイソレートからメインアイソレートに送り返されたメッセージをどのように処理するかを指定する必要があります。そのためには、_handleResponsesFromIsolateメソッドを埋める必要があります。ステップ2で説明したように、このメソッドはreceivePort.listenメソッドに渡されます。

dart
Future<void> spawn() async {
  final receivePort = ReceivePort();
  receivePort.listen(_handleResponsesFromIsolate);
  await Isolate.spawn(_startRemoteIsolate, receivePort.sendPort);
}

また、ステップ3SendPortをメインアイソレートに送り返したことを思い出してください。このメソッドは、そのSendPortの受信と、将来のメッセージ(デコードされたJSONになります)の処理を処理します。

  • まず、メッセージがSendPortかどうかを確認します。もしそうであれば、そのポートをクラスの_sendPortプロパティに割り当てて、後でメッセージを送信できるようにします。
  • 次に、メッセージがMap<String, dynamic>型(デコードされたJSONの予想される型)かどうかを確認します。もしそうであれば、アプリケーション固有のロジックでそのメッセージを処理します。この例では、メッセージが出力されます。
dart
void _handleResponsesFromIsolate(dynamic message) {
  if (message is SendPort) {
    _sendPort = message;
    _isolateReady.complete();
  } else if (message is Map<String, dynamic>) {
    print(message);
  }
}

ステップ5:アイソレートが設定されるまでを保証するためのコンプリターの追加

#

クラスを完成させるために、ワーカーアイソレートにメッセージを送信する役割を担うparseJsonというパブリックメソッドを定義します。また、アイソレートが完全に設定される前にメッセージを送信できることを保証する必要があります。これを処理するには、Completerを使用します。

  • まず、Completerと呼ばれるクラスレベルのプロパティを追加し、_isolateReadyという名前を付けます。
  • 次に、ステップ4で作成した_handleResponsesFromIsolateメソッドで、メッセージがSendPortの場合にコンプリターでcomplete()を呼び出します。
  • 最後に、parseJsonメソッドで、_sendPort.sendを追加する前にawait _isolateReady.futureを追加します。これにより、ワーカーアイソレートが生成され、そのSendPortをメインアイソレートに送り返すまで、ワーカーアイソレートにメッセージを送信できなくなります。
dart
Future<void> parseJson(String message) async {
  await _isolateReady.future;
  _sendPort.send(message);
}

完全な例

#
完全な例を見るには展開してください
dart
import 'dart:async';
import 'dart:convert';
import 'dart:isolate';

void main() async {
  final worker = Worker();
  await worker.spawn();
  await worker.parseJson('{"key":"value"}');
}

class Worker {
  late SendPort _sendPort;
  final Completer<void> _isolateReady = Completer.sync();

  Future<void> spawn() async {
    final receivePort = ReceivePort();
    receivePort.listen(_handleResponsesFromIsolate);
    await Isolate.spawn(_startRemoteIsolate, receivePort.sendPort);
  }

  void _handleResponsesFromIsolate(dynamic message) {
    if (message is SendPort) {
      _sendPort = message;
      _isolateReady.complete();
    } else if (message is Map<String, dynamic>) {
      print(message);
    }
  }

  static void _startRemoteIsolate(SendPort port) {
    final receivePort = ReceivePort();
    port.send(receivePort.sendPort);

    receivePort.listen((dynamic message) async {
      if (message is String) {
        final transformed = jsonDecode(message);
        port.send(transformed);
      }
    });
  }

  Future<void> parseJson(String message) async {
    await _isolateReady.future;
    _sendPort.send(message);
  }
}

堅牢なポートの例

#

前の例では、双方向通信を行う長寿命アイソレートを設定するために必要な基本的な構成要素について説明しました。前述のように、その例には、エラー処理、使用しなくなったポートを閉じる機能、および状況によってはメッセージの順序に関する不整合など、いくつかの重要な機能が欠けています。

この例は、これらの追加機能などを備えた長寿命のワーカーアイソレートを作成し、より良い設計パターンに従うことで、最初の例の情報に付け加えています。このコードは最初の例と類似点がありますが、その例の拡張ではありません。

ステップ1:ワーカークラスの定義

#

まず、バックグラウンドワーカーアイソレートのクラスを作成します。このクラスには、必要なすべての機能が含まれています。

  • アイソレートを生成する。
  • そのアイソレートにメッセージを送信する。
  • アイソレートにJSONをデコードさせる。
  • デコードされたJSONをメインアイソレートに送り返す。

このクラスは、ワーカーアイソレートを作成するメソッド、そのワーカーアイソレートにメッセージを送信するメソッド、使用しなくなったポートをシャットダウンできるメソッドという3つのパブリックメソッドを公開します。

dart
class Worker {
  final SendPort _commands;
  final ReceivePort _responses;

  Future<Object?> parseJson(String message) async {
    // TODO: Ensure the port is still open.
    _commands.send(message);
  }

  static Future<Worker> spawn() async {
    // TODO: Add functionality to create a new Worker object with a
    //  connection to a spawned isolate.
    throw UnimplementedError();
  }

  Worker._(this._responses, this._commands) {
    // TODO: Initialize main isolate receive port listener.
  }

  void _handleResponsesFromIsolate(dynamic message) {
    // TODO: Handle messages sent back from the worker isolate.
  }

  static void _handleCommandsToIsolate(ReceivePort rp, SendPort sp) async {
    // TODO: Handle messages sent back from the worker isolate.
  }

  static void _startRemoteIsolate(SendPort sp) {
    // TODO: Initialize worker isolate's ports.
  }
}

ステップ2:Worker.spawnメソッドでRawReceivePortを作成する

#

アイソレートを生成する前に、より低レベルのReceivePortであるRawReceivePortを作成する必要があります。RawReceivePortを使用することは推奨されるパターンであり、アイソレートの起動ロジックと、アイソレートでのメッセージパッシングを処理するロジックを分離できます。

Worker.spawnメソッド内では

  • まず、RawReceivePortを作成します。このReceivePortは、ワーカーアイソレートから送信される最初のメッセージ(SendPortになります)のみを受信する役割を担います。
  • 次に、アイソレートがメッセージを受信できるようになったことを示すCompleterを作成します。これが完了すると、ReceivePortSendPortを含むレコードを返します。
  • 次に、RawReceivePort.handlerプロパティを定義します。このプロパティは、ReceivePort.listenerのように動作するFunction?です。このポートでメッセージが受信されると、関数が呼び出されます。
  • ハンドラー関数内で、connection.complete()を呼び出します。このメソッドは、ReceivePortSendPortを含むレコードを引数として期待します。SendPortは、ワーカーアイソレートから送信される最初のメッセージであり、次のステップで_commandsという名前のクラスレベルのSendPortに割り当てられます。
  • 次に、ReceivePort.fromRawReceivePortコンストラクターを使用して新しいReceivePortを作成し、initPortを渡します。
dart
class Worker {
  final SendPort _commands;
  final ReceivePort _responses;

  static Future<Worker> spawn() async {
    // Create a receive port and add its initial message handler.
    final initPort = RawReceivePort();
    final connection = Completer<(ReceivePort, SendPort)>.sync();
    initPort.handler = (initialMessage) {
      final commandPort = initialMessage as SendPort;
      connection.complete((
        ReceivePort.fromRawReceivePort(initPort),
        commandPort,
      ));
    };
// ···
  }

最初にRawReceivePortを作成してからReceivePortを作成することで、後でReceivePort.listenに新しいコールバックを追加できます。逆に、ReceivePortをすぐに作成した場合、ReceivePortStreamではなくBroadcastStreamを実装するため、listenerを1つしか追加できません。

効果的に、これにより、アイソレートの起動ロジックと、通信の設定が完了した後にメッセージの受信を処理するロジックを分離できます。この利点は、他のメソッドのロジックが増えるにつれてより明白になります。

ステップ3:Isolate.spawnを使用してワーカーアイソレートを生成する

#

このステップでは、Worker.spawnメソッドの入力を続けます。アイソレートを生成するために必要なコードを追加し、このクラスからWorkerのインスタンスを返します。この例では、Isolate.spawnへの呼び出しはtry/catchブロックでラップされており、アイソレートの起動に失敗した場合、initPortが閉じられ、Workerオブジェクトが作成されなくなります。

  • まず、try/catchブロックでワーカーアイソレートの生成を試みます。ワーカーアイソレートの生成に失敗した場合は、前のステップで作成された受信ポートを閉じます。Isolate.spawnに渡されるメソッドについては、後のステップで説明します。
  • 次に、connection.futureを待機し、それが返すレコードから送信ポートと受信ポートを分解します。
  • 最後に、プライベートコンストラクターを呼び出して、そのコンプリターからのポートを渡すことで、Workerのインスタンスを返します。
dart
class Worker {
  final SendPort _commands;
  final ReceivePort _responses;

  static Future<Worker> spawn() async {
    // Create a receive port and add its initial message handler
    final initPort = RawReceivePort();
    final connection = Completer<(ReceivePort, SendPort)>.sync();
    initPort.handler = (initialMessage) {
      final commandPort = initialMessage as SendPort;
      connection.complete((
        ReceivePort.fromRawReceivePort(initPort),
        commandPort,
      ));
    };
    // Spawn the isolate.
    try {
      await Isolate.spawn(_startRemoteIsolate, (initPort.sendPort));
    } on Object {
      initPort.close();
      rethrow;
    }

    final (ReceivePort receivePort, SendPort sendPort) =
        await connection.future;

    return Worker._(receivePort, sendPort);
  }

この例では(前の例と比較して)、Worker.spawnがこのクラスの非同期静的コンストラクターとして機能し、Workerのインスタンスを作成する唯一の方法です。これによりAPIが簡素化され、Workerのインスタンスを作成するコードがクリーンになります。

ステップ4:アイソレートの設定プロセスの完了

#

このステップでは、基本的なIsolateの設定プロセスを完了します。これは前の例とほぼ完全に一致し、新しい概念はありません。コードがより多くのメソッドに分割されているというわずかな変更点があり、これはこの例を通してより多くの機能を追加するための設計プラクティスです。Isolateの設定の基本的なプロセスの詳細な手順については、基本ポートの例を参照してください。

最初に、Worker.spawnメソッドから返されるプライベートコンストラクタを作成します。コンストラクタ本体に、メインIsolateで使用される受信ポートへのリスナーを追加し、まだ定義されていない_handleResponsesFromIsolateというメソッドをそのリスナーに渡します。

dart
class Worker {
  final SendPort _commands;
  final ReceivePort _responses;
// ···
  Worker._(this._responses, this._commands) {
    _responses.listen(_handleResponsesFromIsolate);
  }

次に、ワーカーIsolate上のポートを初期化する役割を担う_startRemoteIsolateにコードを追加します。思い出してください。このメソッドはWorker.spawnメソッドでIsolate.spawnに渡され、メインIsolateのSendPortを引数として渡されます。

  • 新しいReceivePortを作成します。
  • そのポートのSendPortをメインIsolateに送り返します。
  • _handleCommandsToIsolateという新しいメソッドを呼び出し、新しいReceivePortとメインIsolateからのSendPortの両方を引数として渡します。
dart
static void _startRemoteIsolate(SendPort sendPort) {
  final receivePort = ReceivePort();
  sendPort.send(receivePort.sendPort);
  _handleCommandsToIsolate(receivePort, sendPort);
}

次に、メインIsolateからのメッセージを受信し、ワーカーIsolateでJSONをデコードし、デコードされたJSONをレスポンスとして送信する役割を担う_handleCommandsToIsolateメソッドを追加します。

  • 最初に、ワーカーIsolateのReceivePortにリスナーを宣言します。
  • リスナーに追加されたコールバック内では、try/catchブロック内で、メインIsolateから渡されたJSONのデコードを試みます。デコードが成功した場合は、デコードされたJSONをメインIsolateに送り返します。
  • エラーが発生した場合は、RemoteErrorを送り返します。
dart
static void _handleCommandsToIsolate(
    ReceivePort receivePort, SendPort sendPort) {
  receivePort.listen((message) {
    try {
      final jsonData = jsonDecode(message as String);
      sendPort.send(jsonData);
    } catch (e) {
      sendPort.send(RemoteError(e.toString(), ''));
    }
  });
}

次に、_handleResponsesFromIsolateメソッドのコードを追加します。

  • 最初に、メッセージがRemoteErrorかどうかを確認します。その場合は、そのエラーをthrowする必要があります。
  • それ以外の場合は、メッセージを出力します。今後のステップでは、メッセージを出力するのではなく、メッセージを返すようにこのコードを更新します。
dart
void _handleResponsesFromIsolate(dynamic message) {
  if (message is RemoteError) {
    throw message;
  } else {
    print(message);
  }
}

最後に、外部コードがデコードするためにワーカーIsolateにJSONを送信できるようにする公開メソッドであるparseJsonメソッドを追加します。

dart
Future<Object?> parseJson(String message) async {
  _commands.send(message);
}

このメソッドは次のステップで更新します。

ステップ5:同時に複数のメッセージを処理する

#

現在、ワーカーIsolateにメッセージを高速に送信すると、IsolateはデコードされたJSONレスポンスを完了した順序で送信し、送信された順序ではありません。どのレスポンスがどのメッセージに対応するのかを判断する方法はありません。

このステップでは、各メッセージにIDを割り当て、Completerオブジェクトを使用して、外部コードがparseJsonを呼び出す際に、その呼び出し元に返されるレスポンスが正しいレスポンスになるようにすることで、この問題を解決します。

最初に、Workerに2つのクラスレベルのプロパティを追加します。

  • Map<int, Completer<Object?>> _activeRequests
  • int _idCounter
dart
class Worker {
  final SendPort _commands;
  final ReceivePort _responses;
  final Map<int, Completer<Object?>> _activeRequests = {};
  int _idCounter = 0;

_activeRequestsマップは、ワーカーIsolateに送信されたメッセージとCompleterを関連付けます。_activeRequestsで使用されるキーは_idCounterから取得され、メッセージの送信が増えるにつれて増加します。

次に、ワーカーIsolateにメッセージを送信する前にCompleterを作成するようにparseJsonメソッドを更新します。

  • 最初にCompleterを作成します。
  • 次に、_idCounterを増分して、各Completerを一意の数に関連付けます。
  • キーが現在の_idCounterの数で、値がCompleterである_activeRequestsマップにエントリを追加します。
  • IDと一緒にメッセージをワーカーIsolateに送信します。SendPortを通して1つの値しか送信できないため、IDとメッセージをレコードにラップします。
  • 最後に、最終的にワーカーIsolateからのレスポンスを含むCompleterのFutureを返します。
dart
Future<Object?> parseJson(String message) async {
  final completer = Completer<Object?>.sync();
  final id = _idCounter++;
  _activeRequests[id] = completer;
  _commands.send((id, message));
  return await completer.future;
}

また、このシステムを処理するために_handleResponsesFromIsolate_handleCommandsToIsolateを更新する必要があります。

_handleCommandsToIsolateでは、messageがJSONテキストだけでなく、2つの値を持つレコードであることを考慮する必要があります。messageから値をデストラクチャリングすることでこれを行います。

次に、JSONをデコードした後、再びレコードを使用して、IDとデコードされたJSONの両方をメインIsolateに送り返すsendPort.sendへの呼び出しを更新します。

dart
static void _handleCommandsToIsolate(
    ReceivePort receivePort, SendPort sendPort) {
  receivePort.listen((message) {
    final (int id, String jsonText) = message as (int, String); // New
    try {
      final jsonData = jsonDecode(jsonText);
      sendPort.send((id, jsonData)); // Updated
    } catch (e) {
      sendPort.send((id, RemoteError(e.toString(), '')));
    }
  });
}

最後に、_handleResponsesFromIsolateを更新します。

  • 最初に、メッセージ引数からIDとレスポンスを再びデストラクチャリングします。
  • 次に、このリクエストに対応するCompleterを_activeRequestsマップから削除します。
  • 最後に、エラーをスローしたり、デコードされたJSONを出力する代わりに、レスポンスを渡してCompleterを完了します。これが完了すると、レスポンスはメインIsolateでparseJsonを呼び出したコードに返されます。
dart
void _handleResponsesFromIsolate(dynamic message) {
  final (int id, Object? response) = message as (int, Object?); // New
  final completer = _activeRequests.remove(id)!; // New

  if (response is RemoteError) {
    completer.completeError(response); // Updated
  } else {
    completer.complete(response); // Updated
  }
}

ステップ6:ポートを閉じる機能を追加する

#

Isolateがコードによって使用されなくなった場合は、メインIsolateとワーカーIsolateのポートを閉じる必要があります。

  • 最初に、ポートが閉じているかどうかを追跡するクラスレベルのブール値を追加します。
  • 次に、Worker.closeメソッドを追加します。このメソッド内では
    • _closedをtrueに更新します。
    • ワーカーIsolateに最終的なメッセージを送信します。このメッセージは「shutdown」というStringですが、任意のオブジェクトにすることができます。次のコードスニペットで使用します。
  • 最後に、_activeRequestsが空かどうかを確認します。空の場合は、_responsesという名前のメインIsolateのReceivePortを閉じます。
dart
class Worker {
  bool _closed = false;
// ···
  void close() {
    if (!_closed) {
      _closed = true;
      _commands.send('shutdown');
      if (_activeRequests.isEmpty) _responses.close();
      print('--- port closed --- ');
    }
  }
  • 次に、ワーカーIsolateで「shutdown」メッセージを処理する必要があります。_handleCommandsToIsolateメソッドに次のコードを追加します。このコードは、メッセージが「shutdown」というStringかどうかをチェックします。その場合は、ワーカーIsolateのReceivePortを閉じ、返ります。
dart
static void _handleCommandsToIsolate(
  ReceivePort receivePort,
  SendPort sendPort,
) {
  receivePort.listen((message) {
    // New if-block.
    if (message == 'shutdown') {
      receivePort.close();
      return;
    }
    final (int id, String jsonText) = message as (int, String);
    try {
      final jsonData = jsonDecode(jsonText);
      sendPort.send((id, jsonData));
    } catch (e) {
      sendPort.send((id, RemoteError(e.toString(), '')));
    }
  });
}
  • 最後に、メッセージの送信を試みる前にポートが閉じているかどうかをチェックするコードを追加する必要があります。Worker.parseJsonメソッドに1行を追加します。
dart
Future<Object?> parseJson(String message) async {
  if (_closed) throw StateError('Closed'); // New
  final completer = Completer<Object?>.sync();
  final id = _idCounter++;
  _activeRequests[id] = completer;
  _commands.send((id, message));
  return await completer.future;
}

完全な例

#
完全な例を見るにはここを展開してください
dart
import 'dart:async';
import 'dart:convert';
import 'dart:isolate';

void main() async {
  final worker = await Worker.spawn();
  print(await worker.parseJson('{"key":"value"}'));
  print(await worker.parseJson('"banana"'));
  print(await worker.parseJson('[true, false, null, 1, "string"]'));
  print(
      await Future.wait([worker.parseJson('"yes"'), worker.parseJson('"no"')]));
  worker.close();
}

class Worker {
  final SendPort _commands;
  final ReceivePort _responses;
  final Map<int, Completer<Object?>> _activeRequests = {};
  int _idCounter = 0;
  bool _closed = false;

  Future<Object?> parseJson(String message) async {
    if (_closed) throw StateError('Closed');
    final completer = Completer<Object?>.sync();
    final id = _idCounter++;
    _activeRequests[id] = completer;
    _commands.send((id, message));
    return await completer.future;
  }

  static Future<Worker> spawn() async {
    // Create a receive port and add its initial message handler
    final initPort = RawReceivePort();
    final connection = Completer<(ReceivePort, SendPort)>.sync();
    initPort.handler = (initialMessage) {
      final commandPort = initialMessage as SendPort;
      connection.complete((
        ReceivePort.fromRawReceivePort(initPort),
        commandPort,
      ));
    };

    // Spawn the isolate.
    try {
      await Isolate.spawn(_startRemoteIsolate, (initPort.sendPort));
    } on Object {
      initPort.close();
      rethrow;
    }

    final (ReceivePort receivePort, SendPort sendPort) =
        await connection.future;

    return Worker._(receivePort, sendPort);
  }

  Worker._(this._responses, this._commands) {
    _responses.listen(_handleResponsesFromIsolate);
  }

  void _handleResponsesFromIsolate(dynamic message) {
    final (int id, Object? response) = message as (int, Object?);
    final completer = _activeRequests.remove(id)!;

    if (response is RemoteError) {
      completer.completeError(response);
    } else {
      completer.complete(response);
    }

    if (_closed && _activeRequests.isEmpty) _responses.close();
  }

  static void _handleCommandsToIsolate(
    ReceivePort receivePort,
    SendPort sendPort,
  ) {
    receivePort.listen((message) {
      if (message == 'shutdown') {
        receivePort.close();
        return;
      }
      final (int id, String jsonText) = message as (int, String);
      try {
        final jsonData = jsonDecode(jsonText);
        sendPort.send((id, jsonData));
      } catch (e) {
        sendPort.send((id, RemoteError(e.toString(), '')));
      }
    });
  }

  static void _startRemoteIsolate(SendPort sendPort) {
    final receivePort = ReceivePort();
    sendPort.send(receivePort.sendPort);
    _handleCommandsToIsolate(receivePort, sendPort);
  }

  void close() {
    if (!_closed) {
      _closed = true;
      _commands.send('shutdown');
      if (_activeRequests.isEmpty) _responses.close();
      print('--- port closed --- ');
    }
  }
}