mutex.dart(=ミューテックス排他制御)がおもしろい
2024/06/17 追記
利用編とも呼べる続編も書いたのでそちらも読んでみてください。
きっかけ
それはGeminiを試そうと、以下の公式サンプルで遊んでいたときのことでした。google_generative_aiパッケージの実装であるchat.dartを眺めていると、定義された変数の中に生成AI関連っぽくないやつがいる…なにこれ!?となったというわけです。
使用しているfinal class ChatSession {
//こういうのは生成AIの関連っぽい
final Future<GenerateContentResponse> Function(Iterable<Content> content,
{List<SafetySetting>? safetySettings,
GenerationConfig? generationConfig}) _generateContent;
//こういうのも生成AIの関連っぽい
final Stream<GenerateContentResponse> Function(Iterable<Content> content,
{List<SafetySetting>? safetySettings,
GenerationConfig? generationConfig}) _generateContentStream;
//なにこれ!?
final _mutex = Mutex();
Mutexとは?
タイトルですでに答えをいっちゃってますが、「(狭義の)排他制御」のことです。
英語に強い人や、バックエンドの経験がある人、またはchat.dartでの使い方を見れば勘のいい人はわかったと思います。
Future<GenerateContentResponse> sendMessage(Content message) async {
//非同期APIを実行する前にロックの取得
final lock = await _mutex.acquire();
try {
//APIの実行
final response = await _generateContent(_history.followedBy([message]),
//略
return response;
} finally {
//実行後はロックのリリース
lock.release();
}
}
mutex.dartのここがすごい!
まずコード全文は以下のように(コメントを除けば)40行ほどと大変お手頃です
import 'dart:async';
import 'dart:collection' show Queue;
class Mutex {
final Queue<Completer<Lock>> _pending = Queue();
Future<Lock> acquire() {
final completer = Completer<Lock>();
_pending.add(completer);
if (_pending.length == 1) {
completer.complete(Lock._(this));
}
return completer.future;
}
void _release() {
assert(_pending.isNotEmpty);
assert(_pending.first.isCompleted);
_pending.removeFirst();
if (_pending.isNotEmpty) {
_pending.first.complete(Lock._(this));
}
}
}
class Lock {
Mutex? _mutex;
Lock._(this._mutex);
void release() {
final mutex = _mutex;
if (mutex == null) throw StateError('Already released');
_mutex = null;
mutex._release();
}
}
挙動をざっくり説明すると、
Mutexクラスのacuire
メソッドは、実行する度に_pending
というキューに、タスクに割り当てるLock(のCompleter)を追加していきます。
a. タスクがひとつなら即時実行してLockオブジェクトを返します。b.そうでないなら実行せず待ち状態となります。
Future<Lock> acquire() {
final completer = Completer<Lock>();
_pending.add(completer);
if (_pending.length == 1) {
completer.complete(Lock._(this));
}
return completer.future;
}
a.に該当した場合、呼び出しもとでは受けとったLockインスタンスに対してタスクの実行後、release
メソッドを実行します。
release
メソッドではコンストラクタのmutex
の初期化をした上で、Mutexクラスの_release
メソッドを実行します。
void release() {
final mutex = _mutex;
if (mutex == null) throw StateError('Already released');
_mutex = null;
mutex._release();
}
_release
メソッドでは_pending
先頭の処理が完了したものを排除してから、b.の場合に待ち状態となっていた先頭を即時実行して(次のタスクの)Lockを取得します
void _release() {
assert(_pending.isNotEmpty);
assert(_pending.first.isCompleted);
_pending.removeFirst();
if (_pending.isNotEmpty) {
_pending.first.complete(Lock._(this));
}
}
こうすることで例えばchat.dartのsendMessage
の場合は、連続して実行されたとしてもチャット順を保存する変数であるhisotry
の整合性が保たれるようになっています。
これだけでも十分すごいのですが、個人的には以下の点でも新たな発見がありました。
-
dart:collection
のQueueクラスを使っている- その存在すら知らなかったんですが、FIFO(先入先出)に最適化されたListです
- Completerの使い方がわかる
- Futureだと非同期処理の開始がコントロールしにくいので、コントロールしたいときに使います。非同期処理の実行は
complete
メソッドを使います
- Futureだと非同期処理の開始がコントロールしにくいので、コントロールしたいときに使います。非同期処理の実行は
- プライベートコンストラクタの適切な使用
-
Lock._(this._mutex)
のことです。こうすることでLockインスタンスは、Mutexクラスのaquireメソッドからしか生成されないようになってます
-
- 相互に参照のあるクラス定義
- Mutexクラスは変数宣言からLockクラスを参照してますし、Lockクラスはコンストラクの引数としてMutexクラスを参照してます。つまり1つのクラスとして書けるものを2つのクラスに「あえて」分割することで、Mutexは排他制御(=順番待ち)に関することだけ、Lockはロック状態(=解放)に関することだけをしていて、単一責任の原則に沿った構成が実現できています。
このように相互に参照のあるクラスは自分では書こうという気すら起こらないので一番の発見でした
- Mutexクラスは変数宣言からLockクラスを参照してますし、Lockクラスはコンストラクの引数としてMutexクラスを参照してます。つまり1つのクラスとして書けるものを2つのクラスに「あえて」分割することで、Mutexは排他制御(=順番待ち)に関することだけ、Lockはロック状態(=解放)に関することだけをしていて、単一責任の原則に沿った構成が実現できています。
余談
隔月でFlutter.Okinawaというイベントを開催しています。
一周年を迎えて順調に減ってきている参加者がゼロになるまでは続けようと思っています!
Discussion