メインコンテンツにスキップ

Isolate

このページでは、Isolate API を使用してアイソレートを実装するいくつかの例について説明します。

アプリケーションが他の処理を一時的にブロックするほど大きな計算を処理している場合は、常にアイソレートを使用する必要があります。最も一般的な例は、Flutter アプリケーションで、UI が応答しなくなる可能性のある大きな計算を実行する必要がある場合です。

アイソレートを必須で使用するルールはありませんが、それらが役立つ可能性のある状況がいくつかあります。

  • 非常に大きな JSONBlob の解析とデコード。
  • 写真、音声、ビデオの処理と圧縮。
  • 音声ファイルとビデオファイルの変換。
  • 大きなリストまたはファイルシステム内での複雑な検索とフィルタリングの実行。
  • データベースとの通信など、I/O の実行。
  • 大量のネットワークリクエストの処理。

シンプルなワーカーアイソレートの実装

#

これらの例では、メインアイソレートがシンプルなワーカーアイソレートを生成します。Isolate.run() は、ワーカーアイソレートの設定と管理の背後にある手順を簡略化します。

  1. アイソレートを生成(開始および作成)します。
  2. 生成されたアイソレートで関数を実行します。
  3. 結果をキャプチャします。
  4. 結果をメインアイソレートに返します。
  5. 作業が完了したらアイソレートを終了します。
  6. 例外とエラーをチェック、キャプチャし、メインアイソレートにスローバックします。

既存のメソッドを新しいアイソレートで実行する

#
  1. run() を呼び出して、新しいアイソレート(バックグラウンドワーカー)を生成します。これはメインアイソレートで直接実行され、main() は結果を待ちます。
dart
const String filename = 'with_keys.json';

void main() async {
  // Read some data.
  final jsonData = await Isolate.run(_readAndParseJson);

  // Use that data.
  print('Number of JSON keys: ${jsonData.length}');
}
  1. ワーカーアイソレートに実行させたい関数を最初の引数として渡します。この例では、既存の関数 _readAndParseJson() です。
dart
Future<Map<String, dynamic>> _readAndParseJson() async {
  final fileData = await File(filename).readAsString();
  final jsonData = jsonDecode(fileData) as Map<String, dynamic>;
  return jsonData;
}
  1. Isolate.run()_readAndParseJson() が返す結果を取得し、値をメインアイソレートに送信して、ワーカーアイソレートをシャットダウンします。

  2. ワーカーアイソレートは、結果を保持するメモリをメインアイソレートに*転送*します。データを*コピー*しません。ワーカーアイソレートは、オブジェクトが転送可能であることを確認するための検証パスを実行します。

_readAndParseJson() は既存の非同期関数であり、メインアイソレートで直接実行することも容易です。代わりに Isolate.run() を使用して実行すると、並列処理が可能になります。ワーカーアイソレートは _readAndParseJson() の計算を完全に抽象化します。メインアイソレートをブロックせずに完了できます。

Isolate.run() の結果は常に Future です。これは、メインアイソレートのコードが実行を続けるためです。ワーカーアイソレートが実行する計算が同期か非同期かは、どちらにしても並列で実行されるため、メインアイソレートに影響しません。

完全なプログラムについては、send_and_receive.dart サンプルを参照してください。

クロージャをアイソレートで送信する

#

メインアイソレートで関数リテラルまたはクロージャを使用して run() でシンプルなワーカーアイソレートを作成することもできます。

dart
const String filename = 'with_keys.json';

void main() async {
  // Read some data.
  final jsonData = await Isolate.run(() async {
    final fileData = await File(filename).readAsString();
    final jsonData = jsonDecode(fileData) as Map<String, dynamic>;
    return jsonData;
  });

  // Use that data.
  print('Number of JSON keys: ${jsonData.length}');
}

この例は、前の例と同じことを達成します。新しいアイソレートが生成され、何かが計算され、結果が返されます。

しかし、今度はアイソレートがクロージャを送信します。クロージャは、通常の名前付き関数よりも機能とコードへの記述方法の両方で制限が少なくなります。この例では、Isolate.run() がローカルコードのように見えるものを並列で実行します。その意味で、run() を「並列で実行する」ための制御フロー演算子のように考えることができます。

ポートを使用してアイソレート間で複数のメッセージを送受信する

#

短命のアイソレートは便利ですが、新しいアイソレートを生成したり、アイソレート間でオブジェクトをコピーしたりするにはパフォーマンスのオーバーヘッドが必要です。コードが Isolate.run を使用して同じ計算を繰り返し実行することに依存している場合は、代わりにすぐに終了しない長命のアイソレートを作成することでパフォーマンスが向上する可能性があります。

これを行うには、Isolate.run が抽象化する低レベルのアイソレート API のいくつかを使用できます。

このセクションでは、新しく生成されたアイソレートとメインアイソレート間の双方向通信を確立するために必要な手順について説明します。最初の例、基本的なポートは、プロセスをハイレベルで紹介します。2 番目の例、堅牢なポートは、徐々に最初の例に実践的で現実的な機能を追加していきます。

ReceivePortSendPort

#

アイソレート間の長命の通信を設定するには、2 つのクラス(Isolate に加えて)が必要です。ReceivePortSendPort です。これらのポートは、アイソレートが互いに通信できる唯一の方法です。

ReceivePort は、他のアイソレートから送信されたメッセージを処理するオブジェクトです。これらのメッセージは SendPort を介して送信されます。

ポートはStream オブジェクトと似ています(実際、受信ポートは Stream を実装しています!)。SendPortReceivePort を、それぞれ Stream の StreamController とリスナーのように考えることができます。SendPortStreamController に似ています。なぜなら、SendPort.send() メソッドを使用してメッセージを「追加」し、それらのメッセージはリスナー、この場合は ReceivePort によって処理されるためです。ReceivePort は、受信したメッセージを指定したコールバックに引数として渡すことで処理します。

ポートの設定

#

新しく生成されたアイソレートは、Isolate.spawn 呼び出しを通じて受信した情報のみを持っています。メインアイソレートが生成されたアイソレートとの通信を継続する必要がある場合は、生成されたアイソレートがメインアイソレートにメッセージを送信できる通信チャネルを設定する必要があります。アイソレートはメッセージパッシングを介してのみ通信できます。互いのメモリを「見ることが」できないため、「アイソレート」という名前が付けられています。

この双方向通信を設定するには、まずメインアイソレートに ReceivePort を作成し、その SendPortIsolate.spawn で生成する際に新しいアイソレートに引数として渡します。新しいアイソレートは、独自の ReceivePort を作成し、メインアイソレートから渡された SendPort に*その* SendPort を返します。メインアイソレートはこの SendPort を受信し、これで両方の側で送受信メッセージを開いたチャネルを持つことができます。

A figure showing events being fed, one by one, into the event loop

  1. メインアイソレートに ReceivePort を作成します。SendPortReceivePort のプロパティとして自動的に作成されます。
  2. Isolate.spawn() を使用してワーカーアイソレートを生成します。
  3. ReceivePort.sendPort への参照をワーカーアイソレートへの最初のメッセージとして渡します。
  4. ワーカーアイソレートに別の新しい ReceivePort を作成します。
  5. ワーカーアイソレートの ReceivePort.sendPort への参照を、メインアイソレートへの最初のメッセージとして*返します*。

ポートの作成と通信の設定に加えて、メッセージを受信したときにポートが何をするかを指定する必要があります。これは、各対応する ReceivePortlisten メソッドを使用して行います。

A figure showing events being fed, one by one, into the event loop

  1. ワーカーアイソレートの SendPort へのメインアイソレートの参照を介してメッセージを送信します。
  2. ワーカーアイソレートの ReceivePort のリスナーを介してメッセージを受信して処理します。これは、メインアイソレートから移動したい計算が実行される場所です。
  3. メインアイソレートへの参照を使用するワーカーアイソレートの SendPort を介して戻りメッセージを送信します。
  4. メインアイソレートの ReceivePort のリスナーを介してメッセージを受信します。

基本的なポートの例

#

この例は、メインアイソレートと双方向通信を備えた長命のワーカーアイソレートをセットアップする方法を示しています。コードは、JSON テキストを新しいアイソレートに送信し、そこで JSON が解析およびデコードされてからメインアイソレートに返されるという例を使用します。

ステップ 1: ワーカークラスの定義

#

まず、バックグラウンドワーカーアイソレート用のクラスを作成します。このクラスには、必要なすべての機能が含まれています。

  • アイソレートを生成します。
  • そのアイソレートにメッセージを送信します。
  • アイソレートに JSON をデコードさせます。
  • デコードされた JSON をメインアイソレートに返します。

クラスは 2 つの公開メソッドを公開します。1 つはワーカーアイソレートを生成するメソッド、もう 1 つはワーカーアイソレートへのメッセージ送信を処理するメソッドです。

この例の残りのセクションでは、クラスメソッドを 1 つずつ入力する方法を示します。

dart
class Worker {
  Future<void> spawn() async {
    // TODO: Add functionality to spawn a worker isolate.
  }

  void _handleResponsesFromIsolate(dynamic message) {
    // TODO: Handle messages sent back from the worker isolate.
  }

  static void _startRemoteIsolate(SendPort port) {
    // TODO: Define code that should be executed on the worker isolate.
  }

  Future<void> parseJson(String message) async {
    // TODO: Define a public method that can
    // be used to send messages to the worker isolate.
  }
}

ステップ 2: ワーカーアイソレートの生成

#

Worker.spawn メソッドは、ワーカーアイソレートを作成し、メッセージを受信および送信できるようにするためのコードをグループ化する場所です。

  • まず、ReceivePort を作成します。これにより、メインアイソレートは新しく生成されたワーカーアイソレートから送信されたメッセージを受信できます。
  • 次に、受信ポートにリスナーを追加して、ワーカーアイソレートが返信するメッセージを処理します。リスナーに渡されるコールバック _handleResponsesFromIsolate は、ステップ 4 で説明します。
  • 最後に、Isolate.spawn を使用してワーカーアイソレートを生成します。これは 2 つの引数を受け取ります。ワーカーアイソレートで実行する関数(ステップ 3 で説明)と、受信ポートの sendPort プロパティです。
dart
Future<void> spawn() async {
  final receivePort = ReceivePort();
  receivePort.listen(_handleResponsesFromIsolate);
  await Isolate.spawn(_startRemoteIsolate, receivePort.sendPort);
}

receivePort.sendPort 引数は、ワーカーアイソレートで呼び出されるコールバック(_startRemoteIsolate)に引数として渡されます。これは、ワーカーアイソレートがメインアイソレートにメッセージを送信する方法を持っていることを保証するための最初のステップです。

ステップ 3: ワーカーアイソレートでのコードの実行

#

このステップでは、ワーカーアイソレートに送信され、生成時に実行されるメソッド _startRemoteIsolate を定義します。このメソッドは、ワーカーアイソレートの「main」メソッドのようなものです。

  • まず、別の新しい ReceivePort を作成します。このポートは、メインアイソレートからの将来のメッセージを受信します。
  • 次に、そのポートの SendPort をメインアイソレートに返します。
  • 最後に、新しい ReceivePort にリスナーを追加します。このリスナーは、メインアイソレートがワーカーアイソレートに送信するメッセージを処理します。
dart
static void _startRemoteIsolate(SendPort port) {
  final receivePort = ReceivePort();
  port.send(receivePort.sendPort);

  receivePort.listen((dynamic message) async {
    if (message is String) {
      final transformed = jsonDecode(message);
      port.send(transformed);
    }
  });
}

ワーカーの ReceivePort のリスナーは、メインアイソレートから渡された JSON をデコードし、デコードされた JSON をメインアイソレートに返します。

このリスナーは、メインアイソレートからワーカーアイソレートに送信されるメッセージのエントリポイントです。これは、ワーカーアイソレートに将来実行するコードを指示する唯一の機会です。

ステップ 4: メインアイソレートでのメッセージの処理

#

最後に、メインアイソレートがワーカーアイソレートからメインアイソレートに送信されたメッセージをどのように処理するかを指示する必要があります。これを行うには、_handleResponsesFromIsolate メソッドに入力する必要があります。ステップ 2 で説明したように、このメソッドが receivePort.listen メソッドに渡されたことを思い出してください。

dart
Future<void> spawn() async {
  final receivePort = ReceivePort();
  receivePort.listen(_handleResponsesFromIsolate);
  await Isolate.spawn(_startRemoteIsolate, receivePort.sendPort);
}

また、ステップ 3SendPort がメインアイソレートに返されたことを思い出してください。このメソッドは、その SendPort の受信を処理し、将来のメッセージ(デコードされた JSON)を処理します。

  • まず、メッセージが SendPort であるかどうかを確認します。そうであれば、後でメッセージを送信するために使用できるように、そのポートをクラスの _sendPort プロパティに割り当てます。
  • 次に、メッセージが Map<String, dynamic> 型(デコードされた JSON の期待される型)であるかどうかを確認します。そうであれば、そのメッセージをアプリケーション固有のロジックで処理します。この例では、メッセージは印刷されます。
dart
void _handleResponsesFromIsolate(dynamic message) {
  if (message is SendPort) {
    _sendPort = message;
    _isolateReady.complete();
  } else if (message is Map<String, dynamic>) {
    print(message);
  }
}

ステップ 5: アイソレートのセットアップを保証するためのコンプリーターの追加

#

クラスを完了するには、メッセージをワーカーアイソレートに送信する責任を負う公開メソッド parseJson を定義します。また、アイソレートが完全にセットアップされる前にメッセージを送信できることを確認する必要があります。これを処理するには、Completer を使用します。

  • まず、クラスレベルのプロパティである Completer を追加し、_isolateReady という名前を付けます。
  • 次に、ステップ 4 で作成した _handleResponsesFromIsolate メソッドで、コンプリーターの complete() を呼び出します。メッセージが SendPort の場合は、そのようにします。
  • 最後に、parseJson メソッドで、_sendPort.send を追加する前に await _isolateReady.future を追加します。これにより、アイソレートが生成され*、SendPort をメインアイソレートに返送してから、ワーカーアイソレートにメッセージが送信されることが保証されます。
dart
Future<void> parseJson(String message) async {
  await _isolateReady.future;
  _sendPort.send(message);
}

完全な例

#
展開して完全な例を表示します
dart
import 'dart:async';
import 'dart:convert';
import 'dart:isolate';

void main() async {
  final worker = Worker();
  await worker.spawn();
  await worker.parseJson('{"key":"value"}');
}

class Worker {
  late SendPort _sendPort;
  final Completer<void> _isolateReady = Completer.sync();

  Future<void> spawn() async {
    final receivePort = ReceivePort();
    receivePort.listen(_handleResponsesFromIsolate);
    await Isolate.spawn(_startRemoteIsolate, receivePort.sendPort);
  }

  void _handleResponsesFromIsolate(dynamic message) {
    if (message is SendPort) {
      _sendPort = message;
      _isolateReady.complete();
    } else if (message is Map<String, dynamic>) {
      print(message);
    }
  }

  static void _startRemoteIsolate(SendPort port) {
    final receivePort = ReceivePort();
    port.send(receivePort.sendPort);

    receivePort.listen((dynamic message) async {
      if (message is String) {
        final transformed = jsonDecode(message);
        port.send(transformed);
      }
    });
  }

  Future<void> parseJson(String message) async {
    await _isolateReady.future;
    _sendPort.send(message);
  }

}

堅牢なポートの例

#

前の例では、双方向通信を備えた長命のアイソレートを設定するために必要な基本的な構成要素を説明しました。前述のように、この例には、エラー処理、ポートを不要になったときに閉じる機能、一部の状況でのメッセージ順序の不整合など、いくつかの重要な機能が欠けています。

この例は、最初の例の情報を拡張し、これらの追加機能やその他の機能を持つ長命のワーカーアイソレートを作成し、より良い設計パターンに従います。このコードは最初の例と類似していますが、その例の拡張ではありません。

ステップ 1: ワーカークラスの定義

#

まず、バックグラウンドワーカーアイソレート用のクラスを作成します。このクラスには、必要なすべての機能が含まれています。

  • アイソレートを生成します。
  • そのアイソレートにメッセージを送信します。
  • アイソレートに JSON をデコードさせます。
  • デコードされた JSON をメインアイソレートに返します。

クラスは 3 つの公開メソッドを公開します。1 つはワーカーアイソレートを作成するメソッド、1 つはワーカーアイソレートへのメッセージ送信を処理するメソッド、もう 1 つは不要になったときにポートをシャットダウンできるメソッドです。

dart
class Worker {
  final SendPort _commands;
  final ReceivePort _responses;

  Future<Object?> parseJson(String message) async {
    // TODO: Ensure the port is still open.
    _commands.send(message);
  }

  static Future<Worker> spawn() async {
    // TODO: Add functionality to create a new Worker object with a
    //  connection to a spawned isolate.
    throw UnimplementedError();
  }

  Worker._(this._responses, this._commands) {
    // TODO: Initialize main isolate receive port listener.
  }

  void _handleResponsesFromIsolate(dynamic message) {
    // TODO: Handle messages sent back from the worker isolate.
  }

  static void _handleCommandsToIsolate(ReceivePort rp, SendPort sp) async {
    // TODO: Handle messages sent back from the worker isolate.
  }

  static void _startRemoteIsolate(SendPort sp) {
    // TODO: Initialize worker isolate's ports.
  }
}

ステップ 2: Worker.spawn メソッドで RawReceivePort を作成する

#

アイソレートを生成する前に、より低レベルの ReceivePort である RawReceivePort を作成する必要があります。RawReceivePort を使用することは、アイソレートの開始ロジックと、アイソレートでのメッセージパッシングを処理するロジックを分離できるため、推奨されるパターンです。

Worker.spawn メソッドで

  • まず、RawReceivePort を作成します。この ReceivePort は、ワーカーアイソレートから送信される初期メッセージ(SendPort)を受信することのみを担当します。
  • 次に、アイソレートがメッセージを受信できるようになったことを示す Completer を作成します。これが完了すると、ReceivePortSendPort のレコードが返されます。
  • 次に、RawReceivePort.handler プロパティを定義します。このプロパティは Function? であり、ReceivePort.listener のように動作します。この関数は、このポートでメッセージが受信されたときに呼び出されます。
  • ハンドラー関数内で、connection.complete() を呼び出します。このメソッドは、引数として ReceivePortSendPort を持つレコードを期待します。SendPort はワーカーアイソレートから送信される初期メッセージであり、次のステップでクラスレベルの SendPort _commands に割り当てられます。
  • 次に、ReceivePort.fromRawReceivePort コンストラクタを使用して新しい ReceivePort を作成し、initPort を渡します。
dart
class Worker {
  final SendPort _commands;
  final ReceivePort _responses;

  static Future<Worker> spawn() async {
    // Create a receive port and add its initial message handler.
    final initPort = RawReceivePort();
    final connection = Completer<(ReceivePort, SendPort)>.sync();
    initPort.handler = (initialMessage) {
      final commandPort = initialMessage as SendPort;
      connection.complete((
        ReceivePort.fromRawReceivePort(initPort),
        commandPort,
      ));
    };
  }
}

まず RawReceivePort を作成し、次に ReceivePort を作成することで、後で ReceivePort.listen に新しいコールバックを追加できます。逆に、すぐに ReceivePort を作成した場合、ReceivePortStream ではなく BroadcastStream を実装しているため、1 つの listener しか追加できません。

これにより、アイソレートの開始ロジックと、通信設定が完了した後のメッセージ受信を処理するロジックを効果的に分離できます。この利点は、他のメソッドのロジックが増えるにつれてより明らかになります。

ステップ 3: Isolate.spawn を使用してワーカーアイソレートを生成する

#

このステップでは、Worker.spawn メソッドへの入力を続行します。アイソレートを生成するために必要なコードを追加し、このクラスの Worker インスタンスを返します。この例では、Isolate.spawn への呼び出しはtry/catch ブロックでラップされており、アイソレートの起動に失敗した場合、initPort が閉じられ、Worker オブジェクトが作成されないことが保証されます。

  • まず、try/catch ブロックでワーカーアイソレートの生成を試みます。ワーカーアイソレートの生成に失敗した場合は、前のステップで作成した受信ポートを閉じます。Isolate.spawn に渡されるメソッドは、後で説明します。
  • 次に、connection.future を待機し、それが返すレコードから送信ポートと受信ポートを分割します。
  • 最後に、コンプリーターからポートを渡して、そのプライベートコンストラクタを呼び出すことで、Worker のインスタンスを返します。
dart
class Worker {
  final SendPort _commands;
  final ReceivePort _responses;

  static Future<Worker> spawn() async {
    // Create a receive port and add its initial message handler
    final initPort = RawReceivePort();
    final connection = Completer<(ReceivePort, SendPort)>.sync();
    initPort.handler = (initialMessage) {
      final commandPort = initialMessage as SendPort;
      connection.complete((
        ReceivePort.fromRawReceivePort(initPort),
        commandPort,
      ));
    };
    // Spawn the isolate.
    try {
      await Isolate.spawn(_startRemoteIsolate, (initPort.sendPort));
    } on Object {
      initPort.close();
      rethrow;
    }

    final (ReceivePort receivePort, SendPort sendPort) =
        await connection.future;

    return Worker._(receivePort, sendPort);
  }
}

この例(前の例と比較して)では、Worker.spawn はこのクラスの非同期静的コンストラクタとして機能し、Worker のインスタンスを作成する唯一の方法です。これにより API が簡略化され、Worker のインスタンスを作成するコードがクリーンになります。

ステップ 4: アイソレートのセットアッププロセスの完了

#

このステップでは、基本的なアイソレートセットアッププロセスを完了します。これは、前の例とほぼ完全に一致しており、新しい概念はありません。コードがより多くのメソッドに分割されているというわずかな変更があります。これは、この例の残りの部分でさらに機能を追加するための設計プラクティスです。アイソレートのセットアップの基本的なプロセスの詳細なウォークスルーについては、基本的なポートの例を参照してください。

まず、Worker.spawn メソッドから返されるプライベートコンストラクタを作成します。コンストラクタ本体で、メインアイソレートで使用される受信ポートにリスナーを追加し、まだ未定義のメソッド _handleResponsesFromIsolate をそのリスナーに渡します。

dart
class Worker {
  final SendPort _commands;
  final ReceivePort _responses;

  Worker._(this._responses, this._commands) {
    _responses.listen(_handleResponsesFromIsolate);
  }
}

次に、_startRemoteIsolate に、ワーカーアイソレートでポートを初期化するコードを追加します。思い出してください。このメソッドは Worker.spawn メソッドで Isolate.spawn に渡され、メインアイソレートの SendPort が引数として渡されます。

  • 新しい ReceivePort を作成します。
  • そのポートの SendPort をメインアイソレートに返します。
  • 新しいメソッド _handleCommandsToIsolate を呼び出し、メインアイソレートからの新しい ReceivePortSendPort の両方を引数として渡します。
dart
static void _startRemoteIsolate(SendPort sendPort) {
  final receivePort = ReceivePort();
  sendPort.send(receivePort.sendPort);
  _handleCommandsToIsolate(receivePort, sendPort);
}

次に、メインアイソレートからメッセージを受信し、ワーカーアイソレートで JSON をデコードし、デコードされた JSON を応答として返す _handleCommandsToIsolate メソッドのコードを追加します。

  • まず、ワーカーアイソレートの ReceivePort にリスナーを宣言します。
  • リスナーに追加されたコールバック内で、メインアイソレートから渡された JSON をtry/catch ブロック内でデコードしようとします。デコードが成功した場合は、デコードされた JSON をメインアイソレートに返します。
  • エラーが発生した場合は、RemoteError を返します。
dart
static void _handleCommandsToIsolate(
  ReceivePort receivePort,
  SendPort sendPort,
) {
  receivePort.listen((message) {
    try {
      final jsonData = jsonDecode(message as String);
      sendPort.send(jsonData);
    } catch (e) {
      sendPort.send(RemoteError(e.toString(), ''));
    }
  });
}

次に、_handleResponsesFromIsolate メソッドのコードを追加します。

  • まず、メッセージが RemoteError であるかどうかを確認します。その場合は、そのエラーをthrow します。
  • それ以外の場合は、メッセージを印刷します。後で、このコードを印刷するのではなくメッセージを返すように更新します。
dart
void _handleResponsesFromIsolate(dynamic message) {
  if (message is RemoteError) {
    throw message;
  } else {
    print(message);
  }
}

最後に、外部コードが JSON をワーカーアイソレートに送信してデコードできるようにする公開メソッド parseJson を追加します。

dart
Future<Object?> parseJson(String message) async {
  _commands.send(message);
}

次のステップでこのメソッドを更新します。

ステップ 5: 同時に複数のメッセージを処理する

#

現在、ワーカーアイソレートにメッセージを迅速に送信すると、アイソレートは送信された順序ではなく*完了した順序*でデコードされた JSON 応答を送信します。どの応答がどのメッセージに対応するかを判断する方法がありません。

このステップでは、各メッセージに ID を与え、Completer オブジェクトを使用して、外部コードが parseJson を呼び出したときに、その呼び出し元に返される応答が正しい応答であることを保証することで、この問題を解決します。

まず、Worker に 2 つのクラスレベルプロパティを追加します。

  • Map<int, Completer<Object?>> _activeRequests
  • int _idCounter
dart
class Worker {
  final SendPort _commands;
  final ReceivePort _responses;
  final Map<int, Completer<Object?>> _activeRequests = {};
  int _idCounter = 0;
  // ···
}

_activeRequests マップは、ワーカーアイソレートに送信されたメッセージと Completer を関連付けます。_activeRequests で使用されるキーは _idCounter から取得されます。これは、より多くのメッセージが送信されると増加します。

次に、parseJson メソッドを更新して、メッセージをワーカーアイソレートに送信する前にコンプリーターを作成するようにします。

  • まず、Completer を作成します。
  • 次に、_idCounter をインクリメントして、各 Completer が一意の番号に関連付けられるようにします。
  • _activeRequests マップにエントリを追加します。キーは現在の _idCounter の番号、値はコンプリーターです。
  • ID とメッセージを SendPort を介して 1 つの値しか送信できないため、ID とメッセージをレコードにラップして、ワーカーアイソレートにメッセージを送信します。
  • 最後に、最終的にワーカーアイソレートからの応答を含むコンプリーターの Future を返します。
dart
Future<Object?> parseJson(String message) async {
  final completer = Completer<Object?>.sync();
  final id = _idCounter++;
  _activeRequests[id] = completer;
  _commands.send((id, message));
  return await completer.future;
}

このシステムを処理するために、_handleResponsesFromIsolate_handleCommandsToIsolate も更新する必要があります。

_handleCommandsToIsolate では、message が単なる JSON テキストではなく 2 つの値を持つレコードであることを考慮に入れる必要があります。message から値を分割することで、それを行います。

次に、JSON をデコードした後、sendPort.send への呼び出しを更新して、ID とデコードされた JSON の両方を再びレコードを使用してメインアイソレートに返します。

dart
static void _handleCommandsToIsolate(
  ReceivePort receivePort,
  SendPort sendPort,
) {
  receivePort.listen((message) {
    final (int id, String jsonText) = message as (int, String); // New
    try {
      final jsonData = jsonDecode(jsonText);
      sendPort.send((id, jsonData)); // Updated
    } catch (e) {
      sendPort.send((id, RemoteError(e.toString(), '')));
    }
  });
}

最後に、_handleResponsesFromIsolate を更新します。

  • まず、メッセージ引数から ID と応答を再び分割します。
  • 次に、このリクエストに対応するコンプリーターを _activeRequests マップから削除します。
  • 最後に、エラーをスローしたり、デコードされた JSON を印刷したりする代わりに、コンプリーターを完了し、応答を渡します。これが完了すると、応答はメインアイソレートで parseJson を呼び出したコードに返されます。
dart
void _handleResponsesFromIsolate(dynamic message) {
  final (int id, Object? response) = message as (int, Object?); // New
  final completer = _activeRequests.remove(id)!; // New

  if (response is RemoteError) {
    completer.completeError(response); // Updated
  } else {
    completer.complete(response); // Updated
  }
}

ステップ 6: ポートを閉じる機能の追加

#

アイソレートがコードによって使用されなくなった場合は、メインアイソレートとワーカーアイソレートのポートを閉じる必要があります。

  • まず、ポートが閉じられているかどうかを追跡するクラスレベルのブール値を追加します。
  • 次に、Worker.close メソッドを追加します。このメソッド内で
    • _closed を true に更新します。
    • 最後に、ワーカーアイソレートにメッセージを送信します。このメッセージは「shutdown」という文字列ですが、任意のオブジェクトにすることができます。次のコードスニペットで使用します。
  • 最後に、_activeRequests が空であるかどうかを確認します。空の場合は、メインアイソレートの ReceivePort _responses をシャットダウンします。
dart
class Worker {
  bool _closed = false;
  // ···
  void close() {
    if (!_closed) {
      _closed = true;
      _commands.send('shutdown');
      if (_activeRequests.isEmpty) _responses.close();
      print('--- port closed --- ');
    }
  }
}
  • 次に、ワーカーアイソレートで「shutdown」メッセージを処理する必要があります。_handleCommandsToIsolate メソッドに次のコードを追加します。このコードは、メッセージが「shutdown」という文字列であるかどうかを確認します。そうであれば、ワーカーアイソレートの ReceivePort を閉じ、返します。
dart
static void _handleCommandsToIsolate(
  ReceivePort receivePort,
  SendPort sendPort,
) {
  receivePort.listen((message) {
    // New if-block.
    if (message == 'shutdown') {
      receivePort.close();
      return;
    }
    final (int id, String jsonText) = message as (int, String);
    try {
      final jsonData = jsonDecode(jsonText);
      sendPort.send((id, jsonData));
    } catch (e) {
      sendPort.send((id, RemoteError(e.toString(), '')));
    }
  });
}
  • 最後に、メッセージを送信しようとする前にポートが閉じられているかどうかを確認するコードを追加します。Worker.parseJson メソッドに 1 行追加します。
dart
Future<Object?> parseJson(String message) async {
  if (_closed) throw StateError('Closed'); // New
  final completer = Completer<Object?>.sync();
  final id = _idCounter++;
  _activeRequests[id] = completer;
  _commands.send((id, message));
  return await completer.future;
}

完全な例

#
展開して完全な例を表示します
dart
import 'dart:async';
import 'dart:convert';
import 'dart:isolate';

void main() async {
  final worker = await Worker.spawn();
  print(await worker.parseJson('{"key":"value"}'));
  print(await worker.parseJson('"banana"'));
  print(await worker.parseJson('[true, false, null, 1, "string"]'));
  print(
    await Future.wait([worker.parseJson('"yes"'), worker.parseJson('"no"')]),
  );
  worker.close();
}

class Worker {
  final SendPort _commands;
  final ReceivePort _responses;
  final Map<int, Completer<Object?>> _activeRequests = {};
  int _idCounter = 0;
  bool _closed = false;

  Future<Object?> parseJson(String message) async {
    if (_closed) throw StateError('Closed');
    final completer = Completer<Object?>.sync();
    final id = _idCounter++;
    _activeRequests[id] = completer;
    _commands.send((id, message));
    return await completer.future;
  }

  static Future<Worker> spawn() async {
    // Create a receive port and add its initial message handler
    final initPort = RawReceivePort();
    final connection = Completer<(ReceivePort, SendPort)>.sync();
    initPort.handler = (initialMessage) {
      final commandPort = initialMessage as SendPort;
      connection.complete((
        ReceivePort.fromRawReceivePort(initPort),
        commandPort,
      ));
    };

    // Spawn the isolate.
    try {
      await Isolate.spawn(_startRemoteIsolate, (initPort.sendPort));
    } on Object {
      initPort.close();
      rethrow;
    }

    final (ReceivePort receivePort, SendPort sendPort) =
        await connection.future;

    return Worker._(receivePort, sendPort);
  }

  Worker._(this._responses, this._commands) {
    _responses.listen(_handleResponsesFromIsolate);
  }

  void _handleResponsesFromIsolate(dynamic message) {
    final (int id, Object? response) = message as (int, Object?);
    final completer = _activeRequests.remove(id)!;

    if (response is RemoteError) {
      completer.completeError(response);
    } else {
      completer.complete(response);
    }

    if (_closed && _activeRequests.isEmpty) _responses.close();
  }

  static void _handleCommandsToIsolate(
    ReceivePort receivePort,
    SendPort sendPort,
  ) {
    receivePort.listen((message) {
      if (message == 'shutdown') {
        receivePort.close();
        return;
      }
      final (int id, String jsonText) = message as (int, String);
      try {
        final jsonData = jsonDecode(jsonText);
        sendPort.send((id, jsonData));
      } catch (e) {
        sendPort.send((id, RemoteError(e.toString(), '')));
      }
    });
  }

  static void _startRemoteIsolate(SendPort sendPort) {
    final receivePort = ReceivePort();
    sendPort.send(receivePort.sendPort);
    _handleCommandsToIsolate(receivePort, sendPort);
  }

  void close() {
    if (!_closed) {
      _closed = true;
      _commands.send('shutdown');
      if (_activeRequests.isEmpty) _responses.close();
      print('--- port closed --- ');
    }
  }
}