🙄

mutex.dart(=ミューテックス排他制御)がおもしろい

2024/04/15に公開

2024/06/17 追記

利用編とも呼べる続編も書いたのでそちらも読んでみてください。

https://qiita.com/kitaike/items/d1c753b838af8a59f9f0

きっかけ

それはGeminiを試そうと、以下の公式サンプルで遊んでいたときのことでした。
https://github.com/google/generative-ai-dart/tree/main/samples/flutter_app
使用しているgoogle_generative_aiパッケージの実装であるchat.dartを眺めていると、定義された変数の中に生成AI関連っぽくないやつがいる…なにこれ!?となったというわけです。

final class ChatSession {
  //こういうのは生成AIの関連っぽい
  final Future<GenerateContentResponse> Function(Iterable<Content> content,
      {List<SafetySetting>? safetySettings,
      GenerationConfig? generationConfig}) _generateContent;
  //こういうのも生成AIの関連っぽい
  final Stream<GenerateContentResponse> Function(Iterable<Content> content,
      {List<SafetySetting>? safetySettings,
      GenerationConfig? generationConfig}) _generateContentStream;

  //なにこれ!?
  final _mutex = Mutex();

Mutexとは?

タイトルですでに答えをいっちゃってますが、「(狭義の)排他制御」のことです。
英語に強い人や、バックエンドの経験がある人、またはchat.dartでの使い方を見れば勘のいい人はわかったと思います。

  Future<GenerateContentResponse> sendMessage(Content message) async {
    //非同期APIを実行する前にロックの取得
    final lock = await _mutex.acquire();
    try {
      //APIの実行
      final response = await _generateContent(_history.followedBy([message]),
      //略
      return response;
    } finally {
        //実行後はロックのリリース
      lock.release();
    }
  }

mutex.dartのここがすごい!

まずコード全文は以下のように(コメントを除けば)40行ほどと大変お手頃です

import 'dart:async';
import 'dart:collection' show Queue;

class Mutex {
  final Queue<Completer<Lock>> _pending = Queue();

  Future<Lock> acquire() {
    final completer = Completer<Lock>();
    _pending.add(completer);
    if (_pending.length == 1) {
      completer.complete(Lock._(this));
    }
    return completer.future;
  }

  void _release() {
    assert(_pending.isNotEmpty);
    assert(_pending.first.isCompleted);
    _pending.removeFirst();
    if (_pending.isNotEmpty) {
      _pending.first.complete(Lock._(this));
    }
  }
}

class Lock {
  Mutex? _mutex;
  Lock._(this._mutex);

  void release() {
    final mutex = _mutex;
    if (mutex == null) throw StateError('Already released');
    _mutex = null;
    mutex._release();
  }
}

挙動をざっくり説明すると、

Mutexクラスのacuireメソッドは、実行する度に_pendingというキューに、タスクに割り当てるLock(のCompleter)を追加していきます。
a. タスクがひとつなら即時実行してLockオブジェクトを返します。b.そうでないなら実行せず待ち状態となります。

  Future<Lock> acquire() {
    final completer = Completer<Lock>();
    _pending.add(completer);
    if (_pending.length == 1) {
      completer.complete(Lock._(this));
    }
    return completer.future;
  }

a.に該当した場合、呼び出しもとでは受けとったLockインスタンスに対してタスクの実行後、releaseメソッドを実行します。
releaseメソッドではコンストラクタのmutexの初期化をした上で、Mutexクラスの_releaseメソッドを実行します。

  void release() {
    final mutex = _mutex;
    if (mutex == null) throw StateError('Already released');
    _mutex = null;
    mutex._release();
  }

_releaseメソッドでは_pending先頭の処理が完了したものを排除してから、b.の場合に待ち状態となっていた先頭を即時実行して(次のタスクの)Lockを取得します

  void _release() {
    assert(_pending.isNotEmpty);
    assert(_pending.first.isCompleted);
    _pending.removeFirst();
    if (_pending.isNotEmpty) {
      _pending.first.complete(Lock._(this));
    }
  }

こうすることで例えばchat.dartのsendMessageの場合は、連続して実行されたとしてもチャット順を保存する変数であるhisotryの整合性が保たれるようになっています。

これだけでも十分すごいのですが、個人的には以下の点でも新たな発見がありました。

  1. dart:collectionのQueueクラスを使っている
    • その存在すら知らなかったんですが、FIFO(先入先出)に最適化されたListです
  2. Completerの使い方がわかる
    • Futureだと非同期処理の開始がコントロールしにくいので、コントロールしたいときに使います。非同期処理の実行はcompleteメソッドを使います
  3. プライベートコンストラクタの適切な使用
    • Lock._(this._mutex)のことです。こうすることでLockインスタンスは、Mutexクラスのaquireメソッドからしか生成されないようになってます
  4. 相互に参照のあるクラス定義
    • Mutexクラスは変数宣言からLockクラスを参照してますし、Lockクラスはコンストラクの引数としてMutexクラスを参照してます。つまり1つのクラスとして書けるものを2つのクラスに「あえて」分割することで、Mutexは排他制御(=順番待ち)に関することだけ、Lockはロック状態(=解放)に関することだけをしていて、単一責任の原則に沿った構成が実現できています。
      このように相互に参照のあるクラスは自分では書こうという気すら起こらないので一番の発見でした

余談

隔月でFlutter.Okinawaというイベントを開催しています。
一周年を迎えて順調に減ってきている参加者がゼロになるまでは続けようと思っています!
https://flutter-okinawa.connpass.com/event/310848/

CBcloud Tech Blog

Discussion