JavaScriptでロックを実装する
もともとの話は、私がDartでロックを実装したことが始まりでした。それまではsynchronizedパッケージを利用していましたが、なぜそんなことをしたかと言えば、このくらいの処理であれば、誰にでも仕組みを解説できるように、よくも悪くも中身を把握しておきたかったのです。
そのあとで、もっと大勢の方にとって分かりやすく、試しやすいようにJavaScriptに移植することにしました。DartはJavaScriptの置き換えを狙って作られた言語なので、少し癖はありますが、同じように書けば同じように動作するので、この作業は簡単でした。つまり、私がDartで実装したロックと同じものをJavaScriptで実装したことになります。
本稿ではJavaScriptによるロックの実装を解説しますが、独特で少し理解しにくいところもあるので、なるべく具体的かつ難しくなりすぎないように解説し、その仕組みに対する理解を深めていきます。
なお、Dartのコードは一切出てきません🎯
ロックが必要なとき
いつどんなときでもロックが必要というわけではありません。ロックを取って排他制御ができるのであれば安心ではありますが、それなりにコストのかかる作業ですし、必要のないロックを取ることによって性能にも影響が出ます。
では、ロックが必要なときとは、どういうときでしょうか?ビジネスロジックではいろんな状況があるとは思いますが、私の失敗経験からふたつほど紹介します(つまりやらかしたということです)。
処理の順番が重要なとき
JavaScriptはシングルスレッドなので、同じコードが一緒のタイミングで動作することはありませんが、プログラマが予期しない動作は発生します。次のコードを見てみましょう。
const statusHistory = []
element.addEventListener('click', async () => {
const response = await fetch('https://example.com')
statusHistory.push(response.statusCode)
})
このコードは特定のエレメントをクリックしたときに、https://example.com にHTTPリクエストを行い、そのステータスコードを変数に保存するものです。ここで、ごく短い時間に複数のクリックイベントが発生したとすると、どうなるでしょうか?
- statusHistoryはイベントが発生した順番でステータスコードが入る
- statusHistoryはイベント内のawaitが終了した順番でステータスコードが入る
正解は後者になります。つまり、シングルスレッドで動作するということは実行順で処理されるということと同義ではありません。await
による待機や then()
によるコールバックを利用する際は、イベントの発生順とプログラマが思っている処理順が一致しないことがあります。
そうなると悩ましいことが起きます。例えば特定のリソースにアクセスする際に、セッション情報が必要であるとします。セッション情報はそれを管理しているリポジトリに保存されていて、問い合わせるとキャッシュされたセッション情報を返してくれます。もしセッション情報が古くなっていれば、サーバーに問い合わせてセッションを更新して新しいものを返してくれるようになっています。
このリポジトリに対してA, Bというふたつの処理からアクセスがある場合に、A→Bの順番でアクセスしたらA→Bの順番で処理を実行してもらわないと困ったことになります。なぜなら、セッションが切れているときに、AとBが同時にアクセスすると、それぞれでセッションの更新が発生して、あとに実行されたほうのみが有効なセッションとなってしまうからです。こういうときには排他ロックを使って、必ず順序を保証して不整合が発生しないようにします。
多重でイベントが発火するとき
多重でイベントが発生する状況でもロックは有効です。画面を切り替えて戻ってきたときに、アップデートの確認をして、アップデートが必要であればそれを促すためのダイアログを表示したいとします。このダイアログは閉じられるまで await
で待機します。
document.addEventListener('visibilitychange', async () => {
if (document.visibilityState === 'visible') {
const mustUpdate = await getUpdateInfo()
if (mustUpdate) {
// アップデートダイアログを表示する。閉じられるまでここで待機する
await showDialog()
}
}
});
このコードの問題点は、画面を何回か連続で切り替えると、イベント自体は発生しているので切り替えた回数だけ続けてダイアログが表示されるということです。ユーザーインタフェースの観点からも、すでに表示されていれば表示しない、という考慮を入れたいところです。
少し慣れた人であればこのコードをどう修正すれば一瞬でわかるはずです。そうです、フラグを用意してやればいいのです。こんな感じで…。
let blocking = false
async function f() {
if (!blocking) {
blocking = true
// なにか処理をする
blocking = false
}
}
この方法はとにかく単純で、誰が見てもやりたいことがわかります。プロジェクトが十分に小さく、コードの把握が容易なときには有効な手段です。一方で、フラグを正しく制御することが前提なので、ちょっと賢くて人を信じられない人であれば、フラグの戻し忘れを防ぐためにリソース解放の仕組みを導入したくなります。
通常の排他ロックでは、すでにロックを獲得している処理があれば、あとから来た処理は先に来た処理が終わるまで待たされます。ここではダイアログが表示されていれば表示しないようにしたい、ということなので、前と同じ方法ではダイアログを閉じたあとに同じダイアログが表示される状況になるのでうまくいきません。
ですが、ノンブロッキングで排他ロックを獲得する、という考え方を導入すると、すでに実行中の処理があればロックが取れず実行しませんし、なければ実行するようになるので、フラグによるダイアログの表示制御をしていたときと同じことが実現できます。さらに嬉しいことは、ロックすることだけを考えればいいので、フラグの制御漏れを考えなくてよくなります。
Promiseを理解する
本稿ではロックにPromiseを使うので、そのためにはまずPromiseの知識が必要になります。初見だとPromiseの動作はトリッキーに映りますが、ひとつずつ順を追っていくことで、ロック実装のために最低限必要な知識を学んでいきたいと思います。
正直、私も最初はPromise(DartだとFuture)のことは曖昧に捉えていましたが、使っていくうちに慣れて「そういうもの」だと認識できるようになったので、最初はよくわからなくても問題ありません。おそれず手を出し続けていれば、そのうちわかるようになってくると思います。
Promiseとはなにか?
Promiseを簡単に言えば「いつ終わるかわからない処理」のことです。すぐに終わるかもしれませんし、数秒、数分かかるかもしれません。結果が正常に返ってくるかもしれませんし、異常終了するかもしれません。「人をおちょくってるとブッ飛ばすぞ!」と思うかもしれませんが、正直言ってそういう性質のものです。
新しいPromiseを作るのは簡単です。Promiseオブジェクトのコンストラクタを呼び出すだけです。
new Promise(resolve => {}) // 新しい待機状態のPromiseが生成される
new Promise((resolve, reject) => {}) // rejectはオプション
Promiseオブジェクトには最低でも resolve
引数を受け取るコールバックを指定しなければなりません。追加で reject
を受け取ることもできます。もちろん、resolve
や reject
といった名前は便宜的なもので、どういった名前で受け取るかは自由です。
Promiseの待機と履行
Promiseを生成するとコールバックが評価されます。そして、resolve()
か reject()
が実行されるまで、待機し続けます。呼び出し側では、Promiseから結果が返ってくるまで待機することができ、then()
によるノンブロッキング形式と、await
によるブロッキング形式を選択できます。もちろん、待たないという選択もあります。
// then()で結果を待つ(ノンブロッキング)
new Promise(resolve => {}).then(() => {})
// awaitで結果を待つ(ブロッキング)
await new Promise(resolve => {})
さて、ここでさっそく待機の実験をしてみるために、次のようなコードを書いて動かしてみました。すると、一向にHello, world!が表示されないことがわかります。なぜなら、いくら待っても resolve()
も reject()
も実行されないからです。
window.addEventListener('DOMContentLoaded', async () => {
await new Promise(resolve => {}) // ここで待機している!
console.log('Hello, world!')
})
そこで次のようにコードを書き換えてみます。コールバックの引数として渡される resolve
を呼び出すことによってPromiseが待機状態から履行状態になり、await
による待機が終了します。
window.addEventListener('DOMContentLoaded', async () => {
await new Promise(resolve => resolve()) // 解決する
console.log('Hello, world!') // 表示される
})
resolve
は呼び出しの際に引数を与えることもでき、それがPromiseの履行結果にもなります。
// x = 42になる
const x = await new Promise(resolve => resolve(42))
ちょっと難しいことをしてみましょう。Promiseと setTimeout()
を組み合わせると面白いことができます。それがこの sleep()
関数です。
function sleep(milliseconds) {
// setTimeout()でresolve()を遅らせる
return new Promise(resolve => setTimeout(() => resolve(), milliseconds))
}
window.addEventListener('DOMContentLoaded', async () => {
// 1秒待ってからHello, world!を表示する
await sleep(1000)
console.log('Hello, world!')
})
この sleep()
関数は await
したときにしか期待する動作をしませんが、await
によるPromiseの待機の挙動が理解できたと思います。
Promiseの拒否
Promiseは履行するだけでなく、拒否もできます。それが reject()
の実行です。拒否すると resolve()
による履行と同じく待機が終了しますが、エラーという形で報告されます。
エラーは await
している場合にはtry-catchで捕捉できますし、await
していない場合には、catch()
で捕捉できます。
try {
await new Promise((_, reject) => reject(42))
}
catch (e) {
console.error(e) // 42
}
new Promise((_, reject) => reject(42))
.then(() => {})
.catch(e => console.error(e)) // 42
async関数
Promiseを作るにはもうひとつの方法としてasync関数があります。async
修飾子がついた関数の返値はPromiseで包まれます。下の例ではコード上では 42
を返していますが、実際には Promise.resolve(42)
が返されることになります。
async function f() {
// 実際はPromise.resolve(42)が返却される
return 42
}
function g() {
// 見てのとおりPromise.resolve(42)が返却される
return Promise.resolve(42)
}
async関数はPromiseと同様に then()
や await
で結果を待つことができます。さらに、async関数の中ではPromiseをブロッキングで待機する await
ができるようになります。
asyncコンテキストとawait
await
はPromiseをブロッキングで待機できる便利な機能ですが、asyncコンテキストの中でしか使用できません。では then()
を使うしかないかというとそうではありません。ここには抜け穴があり、それは「コールバックはasyncになれる」ということです。
例えばみなさんご存知の window.addEventListener()
はコールバックにasync関数を指定することができます。
window.addEventListener('DOMContentLoaded', async () => {
// asyncコンテキスト これが大事⤴︎
const result = await f()
console.log(result)
})
ロックを実装する
ここまででPromiseの基本的な挙動は説明したので、ようやくロックの実装に移っていきます。Promiseについてあれやこれの説明がないぞ、という人がいるかもしれませんが、そういったところについてはほかの文献にお任せしたいと思います。
基本的なロックの実装は、Danny Kim氏のSimple TypeScript Mutex Implementationがわかりやすいので、これを参考にします。このコードはTypeScriptのものなので、JavaScriptに直した上で、利便性や理解を助けるために一部変更を加えています。
要件を整理する
ロックとはなにか?を今一度整理してみましょう。おおよそ次の3つが満たされていればそれは排他ロックと言えそうです。
- ロック範囲において、あるひとつの処理だけが同時に実行できる
- ロックを獲得できるのは早い者勝ちで、あとから来たものは待たされる(ブロッキングする)
- ロックが解放されると次に待っている処理が実行される
1.と3.についてはキュー(Queue)を使うことで、2.のブロッキングについてはPromiseが持つ「resolve()
または reject()
されるまで待たされる」という特性を応用すれば実現できそうです。それでは実装していきましょう。
1. 管理用プロパティを定義する
ロックの実現にはプロパティとしてキューがあれば十分です。参考文献ではもうひとつのプロパティとしてロックフラグを用意していますが、ここではよりコードがよりシンプルになるように、キューに追加するオブジェクトにロックフラグを持たせています。
class Lock {
constructor() {
// ロックを管理しておくためのキューで、ひとつずつ順番に取り出されて処理される
// 処理待ちのものはawaitによって待たされる(ブロックされる)
this._queue = []
}
/*
* @returns {boolean}
*/
get locked() {
// ロックが獲得されているか確認する
// (this._queue[0]?.locked ?? false) でもよい
return this._queue.some(x => x.locked)
}
}
2. ロックを獲得するまで待たせる: acquire()
ロックを獲得するまで待たせるメソッド acquire()
を定義します。このメソッドは実行するとPromiseを返しますが、acquire()
の中では resolve()
されないので、このあとに出てくる _release()
でロックが獲得できるまで呼び出し元は待機することになります。
/**
* @param {Object} options
* @param {boolean} options.blocking
* @param {number?} options.timeout
* @returns {Promise<Function()>}
*/
acquire({blocking = true, timeout} = {}) {
return new Promise(resolve => {
// キューにresolveを追加する。このresolveはこの場で解決されないので
// acquire()を呼び出すだけでは待たされ続ける(ブロックされる)
this._queue.push({resolve, locked: false})
// キューに入っている要素を評価する
this._release()
})
}
オプションで blocking
と timeout
を受け取るようになっていますが、後々のための用意なので今は気にしないでください。
3. ロックを獲得して解除させる: _release()
_release()
はキューの先頭の要素を確認して、ロックを獲得しようとします。ロックを獲得すると、ロック解放用の無名関数を返します。
_release() {
// 範囲外アクセスはundefinedになるのでこのコードは正しい
const first = this._queue[0]
if (!first || first.locked) {
// 要素がないか、ロックをすでに獲得できていれば無視する
return
}
// ここに到達したならロックされていないので、ロックを獲得する
first.locked = true
// ここでawait acquire()が待機が解除される
first.resolve(() => {
// この無名関数を評価すると現在処理中のロックをキューから取り除いてロックを解除する
this._queue.shift()
this._release()
})
}
ここの肝は first.resolve()
です。これによって、acquire()
が返したPromiseの待機が解除されることになりますが、その結果は first.resolve()
に与えられている無名関数です。この関数はキューの先頭要素を取り除くことによってロックを解除し、キューの新しい先頭要素を評価します。
つまり、acquire()
によってロックを獲得するとロック解放用の関数を返すので、ロック中にやりたいことを実行して、ロック解放用の関数を実行することで、(ロック獲得)→(任意の処理)→(ロック解放)→(次のロックを獲得)→…といった一連の流れを実現できます。
4. 組み合わせる: runExclusive()
最初のうちはこの組み合わさった形を見ないと、acquire()
と _release()
だけ見せられたとしてもピンとこないかもしれません。ふたつのメソッドを組み合わせて、ロックしたい処理を引数に取って実行する runExclusive()
を定義します。
/**
* @param {AsyncFunction<T>} callback
* @param {Object} options
* @param {boolean} options.blocking
* @param {number?} options.timeout
* @returns {T}
*/
async runExclusive(callback, {blocking = true, timeout} = {}) {
// ロックを獲得する。release()を実行するとロックを解放できる
const release = await this.acquire({blocking, timeout})
try {
// ロックが獲得できたら、与えられたコールバックを実行する
return await callback()
}
finally {
// デッドロックにならないように必ずfinallyでロックを解除する
release()
}
}
5. 試してみる
ロックが実装できたのでテストしてみましょう。テスト用の関数 push()
は配列に要素を追加するだけのものですが、処理前にランダムな遅延が発生するようにしています。そのため、連続で呼び出されたとしても、実行順と処理順が一致しません。pushWithLock()
は push()
を排他ロックで包んでいて、あとから来た処理は前の処理が終了するまで待たされることになります。
window.addEventListener('DOMContentLoaded', async () => {
// [0, ..., 9] の配列を生成する
const seq = [...new Array(10)].map((_, i) => i)
async function push(a, x) {
// ランダムに0~100ms待つので、呼び出し順と処理順は一致しないはず
await new Promise(resolve => setTimeout(resolve, Math.random() * 100))
a.push(x)
}
const lock = new Lock()
async function pushWithLock(a, x) {
// 排他ロックを取るようにする
await lock.runExclusive(async () => await push(a, x))
}
// ロックを取らないケース
const a = []
await Promise.all(seq.map(x => push(a, x)))
console.log(a) // [4, 1, 0, 2, 3, 7, 8, 9, 5, 6] (毎回変わる)
// ロックを取るケース
const b = []
await Promise.all(seq.map(x => pushWithLock(b, x)))
console.log(b) // [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
})
実行してみると、ロックがない場合には乱数によって結果が毎回異なりますが、ロックがある場合には呼び出し順で処理されていることがわかります。もちろん、呼び出し順で実行されるということは、ロックしないときよりもすべての処理を実行するまでに、より多くの時間を要するということになります。
これはトレードオフなので、順不同で最速の処理をするか、時間が多少かかってもロックを取って順番を担保した処理をするかは見極めが重要です。
応用編
6. ブロッキングとタイムアウトを追加する
基本的な排他ロックの機能はここまでで実現できていますが、一手間加えることでタイムアウトとブロッキングを追加できます。このふたつを入れることで特定の状況で嬉しいことがあります。
ひとつめにロック獲得にタイムアウトを設定することは、失敗してもいい処理で、そのうち成功すればいいという状況ではパフォーマンスの改善が見込めます。好ましくはありませんが、特定の処理がほとんどロックを獲得しているようなときに、新しくロックを獲得しにいこうとするとかなりの時間待たされることが起き得ます。そういうときは現実的な範囲でタイムアウトを設定することで、「隙あらば」的なロックの獲得ができるでしょう。
ふたつめにノンブロッキングによるロック獲得は、すでに処理中の動作と同じ処理をさせないためのガードとして有効です。最初のほうで述べたように、なにかしらのイベントが発火した結果、ダイアログを await して表示する処理があった場合に、多重でイベントが発火しても、ロックの獲得に失敗するので同じダイアログが連続で表示されることを防げます。
さて、このふたつを実現するのはそれほど大変な仕事ではありません。acquire()
と _release()
に以下のコードを追加するだけです。
acquire({blocking = true, timeout} = {}) {
return new Promise((resolve, reject) => {
+ if (!blocking && this.locked) {
+ // ノンブロッキングでロックされているならロック獲得失敗
+ throw new LockError()
+ }
+ let timer = undefined
+ if (blocking && timeout?.constructor == Number) {
+ // ブロッキングでタイムアウトがあるならタイマーを設定する
+ timer = setTimeout(() => {
+ this._queue = this._queue.filter(x => x.resolve != resolve)
+ reject(new LockError())
+ }, timeout)
+ }
// キューにresolveを追加する。このresolveはこの場で解決されないので
// acquire()を呼び出すだけでは待たされ続ける(ブロックされる)
- this._queue.push({resolve, locked: false})
+ this._queue.push({resolve, timer, locked: false})
// キューに入っている要素を評価する
this._release()
})
}
_release() {
// 範囲外アクセスはundefinedになるのでこのコードは正しい
const first = this._queue[0]
if (!first || this.locked) {
// 要素がないか、ロックをすでに獲得されていれば無視する
return
}
+ // タイマーを解除する。clearTimeoutにundefinedを渡しても何も起きない
+ clearTimeout(first.timer)
// ここに到達したならロックされていないので、ロックを獲得する
first.locked = true
// ここでawait acquire()が履行状態になる
first.resolve(() => {
// 評価すると現在処理中のロックをキューから取り除いてロックを解除する
this._queue.shift()
this._release()
})
}
LockError
クラスは Error
クラスを継承したものです。特に面白いこともないので省略しています。
7. ふたたび試してみる
さきほどのテストコードと同じく、今度はブロッキングとタイムアウトを試してみましょう。
window.addEventListener('DOMContentLoaded', async () => {
// [0, ..., 9] の配列を生成する
const seq = [...new Array(10)].map((_, i) => i)
async function push(a, x) {
// 0~100ms待つ
await new Promise(resolve => setTimeout(resolve, Math.random() * 100))
a.push(x)
}
const lockA = new Lock()
async function pushWithLockA(a, x) {
try {
await lockA.runExclusive(async () => await push(a, x), {blocking: false})
}
catch {
}
}
const lockB = new Lock()
async function pushWithLockB(a, x) {
try {
await lockB.runExclusive(async () => await push(a, x), {timeout: 200})
}
catch {
}
}
const a = []
await Promise.all(seq.map(x => pushWithLockA(a, x)))
console.log(a) // [0] (ほかの処理はロックが獲得できずrejectされる)
const b = []
await Promise.all(seq.map(x => pushWithLockB(b, x)))
console.log(b) // [0, 1, 2, 3, 4, 5] (残りの処理はタイムアウトする)
})
pushWithLockA()
はノンブロッキングによるロックの獲得です。すでにロックが獲得されている状況では失敗するので、最初の処理のみロックを獲得できています。
pushWithLockB()
はタイムアウトつきのロックの獲得です。途中の処理までは実行できていますが、残りの処理はブロッキング中にタイムアウトを迎えて失敗しています。
8. 使い方
最後に作ったLockクラスの使い方です。試してみるの項から使い方は読み取れるとは思いますが、やっぱり単純な例はあったほうがいいので、改めて記載しておきます。
// 新しいロックを生成する
const lock = new Lock()
const result = await lock.runExclusive(async () => {
// 42を返すPromiseを実行
return await new Promise(resolve => resolve(42))
}, {blocking: true, timeout: 500})
console.log(result) // 42
まとめ
今回はPromiseとPromiseを使った排他ロックの実装を説明してきました。ロックは便利ではありますが、パフォーマンスの低下を起こすという副作用もありますので、用法容量を守って正しく排他制御をしましょう。
私も含めてほとんどの人はこういう部分に関しては責任を負いたくないので、実際にはライブラリに頼ることが多いとは思います。ですが、基本的な動作の仕組みを知っておくに越したことはありませんし、説明可能であることが大事なことも多いので、そういうところでこの記事がお役に立てたら嬉しいです。
おわり🌱
Discussion