[Elixir] DynamicSupervisor と Registry を使ったプロセスの動的生成と管理
Elixir の主なユースケースはリアルタイムメッセージ配送だと思いますが、これをふわっとした要件にすると例えば以下になります。
- WebSocket コネクションに対してメッセージを送る
- MQTT トピックに対して PubSub する
- チャットルームに投稿されたメッセージを参加者に配る
これらの実装を考えてみると、どれでも以下のような同じ処理が必要になります。
- {セッション}に対してメッセージ配送が必要になったタイミングで対応するプロセスを生成する
- 生成したプロセスを{セッション}の ID をキー、プロセスを値にした Elixir 内の KVS に登録する
- ID をキーにした命令を受信したら KVS から対応するプロセスを取得して、そのプロセスに対してメッセージを転送する
- {セッション}に対してメッセージ配送が不要になったタイミングで対応するプロセスを終了する
- プロセスの正常異常にかかわらず終了したタイミングで KVS から削除する
{セッション}に "WebSocket コネクション"、"MQTT トピック"、"チャットルーム"のどれでもあてはめることができます。
この処理を実現するために Elixir で提供されている標準ライブラリが DynamicSupervisor と Registry です。この文書では、DynamicSupervisor と Registry の使い方をサンプルコードを書きながら見ていきます。サンプルコードの全体は https://github.com/keshihoriuchi/samples/tree/master/elixir/registry_and_dynamic_supervisor にあります。Elixir 1.12 で動作確認しています。
実装
まず--sup オプション付きでプロジェクトを生成します。
$ mix new --sup session_manager
* creating README.md
* creating .formatter.exs
* creating .gitignore
* creating mix.exs
* creating lib
* creating lib/session_manager.ex
* creating lib/session_manager/application.ex
* creating test
* creating test/test_helper.exs
* creating test/session_manager_test.exs
Your Mix project was created successfully.
You can use "mix" to compile it, test it, and more:
cd session_manager
mix test
Run "mix help" for more commands.
以下の 3 つの名前のプロセスを定義していきます。
名前 | 概要 |
---|---|
SessionManager.SessionWorker | 先の例での生成終了対象となる GenServer。 |
SessionManager.SessionSupervisor | SessionWorker を監視する DynamicSupervisor。 |
SessionManager.SessionRegistry | 先の例でのプロセスを登録する KVS。Registry を使う。 |
lib/session_manager/application.ex
でstart/2
の children
を以下のように書き換えます。
def start(_type, _args) do
children = [
{DynamicSupervisor, name: SessionManager.SessionSupervisor, strategy: :one_for_one},
{Registry, keys: :unique, name: SessionManager.SessionRegistry}
]
opts = [strategy: :one_for_all, name: SessionManager.Supervisor]
Supervisor.start_link(children, opts)
end
これで SessionManager.SessionSupervisor と SessionManager.SessionRegistry が SessionManager.Supervisor の配下に生成されることになります。Elixir では Supervisor に{<モジュール名>, <引数>}
のタプルを与えると<モジュール名>.start_link(<引数>)
が呼ばれます。Registry のkeys:
オプションで:unique
オプションを指定していますが、代わりに:duplicate
オプションも指定できます。:unique
だと同じキーに登録できるプロセスが単一に限定され、:duplicate
だと複数が許可されます。
また、optsのstrategy:
を:one_for_all
に変更します。DynamicSupervisorとRegistryは相互に整合性を保っている必要があり、片方がクラッシュして再起動したらもう片方も再起動してほしいからです。
次に SessionWorker を書きます。
defmodule SessionManager.SessionWorker do
use GenServer, restart: :temporary
# (Dynamic)Supervisor向けのAPI
def start_link({k, v}) do
name = {:via, Registry, {SessionManager.SessionRegistry, k}}
GenServer.start_link(__MODULE__, {k, v}, name: name)
end
# 以下2つClient向けのAPI
def get_value(pid) do
GenServer.call(pid, :getvalue)
end
def stop(pid) do
GenServer.cast(pid, :stop)
end
# 以下3つGenServerのコールバック実装
@impl true
def init({k, v}) do
{:ok, {k, v}}
end
@impl true
def handle_call(:getvalue, _from, {k, v}) do
{:reply, v, {k, v}}
end
@impl true
def handle_cast(:stop, state) do
{:stop, :normal, state}
end
end
use GenServer, restart: :temporary
の :temporary
でプロセスが終了したらいかなる場合も再起動しないことを表します。DynamicSupervisor の監視対象にする GenServer は大体このオプションが良いかと思います。デフォルトだといかなる場合も再起動する:permanent
になるので注意が必要です。
GenServer.start_link/3
のname:
に{:via, Registry, {SessionManager.SessionRegistry, k}}
を与えています。これでプロセス生成時に k をキーにして SessionRegistry に登録する、という意味になります。
一通り実装できたので使い方を表すテストコードを書きます。
defmodule SessionManager.SessionManagerTest do
use ExUnit.Case
alias SessionManager.SessionSupervisor
alias SessionManager.SessionRegistry
alias SessionManager.SessionWorker
test "regsiter and unregister" do
{:ok, pid} = DynamicSupervisor.start_child(SessionSupervisor, {SessionWorker, {"k1", "v1"}})
assert [{pid, nil}] == Registry.lookup(SessionRegistry, "k1")
ref = Process.monitor(pid)
assert SessionWorker.get_value(pid) == "v1"
SessionWorker.stop(pid)
assert_receive({:DOWN, ^ref, :process, ^pid, :normal})
:ok = wait_until_process_removed(20)
end
defp wait_until_process_removed(0) do
:error
end
defp wait_until_process_removed(n) do
case Registry.lookup(SessionRegistry, "k1") do
[] ->
:ok
[{_pid, nil}] ->
Process.sleep(50)
wait_until_process_removed(n - 1)
end
end
end
{:ok, pid} = DynamicSupervisor.start_child(SessionSupervisor, {SessionWorker, {"k1", "v1"}})
で SessionSupervisor の監視対象として SessionWorker.start_link/1
に{"k1", "v1"}
を与えて生成する、という意味になります。application.ex
の SessionManager.Supervisor の場合と同様に、ここでも {<モジュール名>, <引数>}
のタプルを与えると<モジュール名>.start_link(<引数>)
が呼ばれます。
これで、[{pid, nil}] = Registry.lookup(SessionRegistry, "k1")
で k1 をキーにして生成したプロセスの pid を取得できるようになります。
プロセスを停止してそのモニターを受け取った後、再度Registry.lookup(SessionRegistry, "k1")
すると[]
が返ってきてほしいところですがそうなりません。プロセスの停止と Registry から削除されるまでの間にタイムラグが存在します。よって、テストコードでは苦しいですがループして[]
になるまで待機しています。
停止済みの pid を lookup で引いた場合の処理はアプリケーションの要件依存になると思いますが、例えば ping メッセージを send して pong メッセージを receive する前にモニターの DOWN を receive したら lookup からやり直す、といった処理が必要かもしれません。
参考
- 公式ガイド Mix and OTP
-
Dynamic supervisors - The Elixir programming language
- 監視対象プロセスの restart を
:temporary
にすることの解説もあります
- 監視対象プロセスの restart を
-
ETS - The Elixir programming language
- このセクションにくるまで ETS を使ったプロセスの KVS の実装方法について説明されていきますが、このセクションの最後で本番では Registry を使った方がいいよ、という趣旨のことが書いてあります
In practice, if you find yourself in a position where you need a process registry for dynamic processes, you should use the Registry module provided as part of Elixir.
- このセクションにくるまで ETS を使ったプロセスの KVS の実装方法について説明されていきますが、このセクションの最後で本番では Registry を使った方がいいよ、という趣旨のことが書いてあります
-
Dynamic supervisors - The Elixir programming language
- API リファレンス
- DynamicSupervisor — Elixir v1.12.2
-
Registry — Elixir v1.12.2
- Registrations のセクションにプロセスの停止と Registry から削除されるまでの間にタイムラグが存在することが書いてあります
Discussion