🦔

[Elixir] DynamicSupervisor と Registry を使ったプロセスの動的生成と管理

6 min read

Elixir の主なユースケースはリアルタイムメッセージ配送だと思いますが、これをふわっとした要件にすると例えば以下になります。

  • WebSocket コネクションに対してメッセージを送る
  • MQTT トピックに対して PubSub する
  • チャットルームに投稿されたメッセージを参加者に配る

これらの実装を考えてみると、どれでも以下のような同じ処理が必要になります。

  1. {セッション}に対してメッセージ配送が必要になったタイミングで対応するプロセスを生成する
  2. 生成したプロセスを{セッション}の ID をキー、プロセスを値にした Elixir 内の KVS に登録する
  3. ID をキーにした命令を受信したら KVS から対応するプロセスを取得して、そのプロセスに対してメッセージを転送する
  4. {セッション}に対してメッセージ配送が不要になったタイミングで対応するプロセスを終了する
  5. プロセスの正常異常にかかわらず終了したタイミングで KVS から削除する

{セッション}に "WebSocket コネクション"、"MQTT トピック"、"チャットルーム"のどれでもあてはめることができます。

この処理を実現するために Elixir で提供されている標準ライブラリが DynamicSupervisor と Registry です。この文書では、DynamicSupervisor と Registry の使い方をサンプルコードを書きながら見ていきます。サンプルコードの全体は https://github.com/keshihoriuchi/samples/tree/master/elixir/registry_and_dynamic_supervisor にあります。Elixir 1.12 で動作確認しています。

実装

まず--sup オプション付きでプロジェクトを生成します。

$ mix new --sup session_manager
* creating README.md
* creating .formatter.exs
* creating .gitignore
* creating mix.exs
* creating lib
* creating lib/session_manager.ex
* creating lib/session_manager/application.ex
* creating test
* creating test/test_helper.exs
* creating test/session_manager_test.exs

Your Mix project was created successfully.
You can use "mix" to compile it, test it, and more:

    cd session_manager
    mix test

Run "mix help" for more commands.

以下の 3 つの名前のプロセスを定義していきます。

名前 概要
SessionManager.SessionWorker 先の例での生成終了対象となる GenServer。
SessionManager.SessionSupervisor SessionWorker を監視する DynamicSupervisor。
SessionManager.SessionRegistry 先の例でのプロセスを登録する KVS。Registry を使う。

lib/session_manager/application.exstart/2children を以下のように書き換えます。

lib/session_manager/application.ex
def start(_type, _args) do
  children = [
    {DynamicSupervisor, name: SessionManager.SessionSupervisor, strategy: :one_for_one},
    {Registry, keys: :unique, name: SessionManager.SessionRegistry}
  ]

  opts = [strategy: :one_for_all, name: SessionManager.Supervisor]
  Supervisor.start_link(children, opts)
end

これで SessionManager.SessionSupervisor と SessionManager.SessionRegistry が SessionManager.Supervisor の配下に生成されることになります。Elixir では Supervisor に{<モジュール名>, <引数>}のタプルを与えると<モジュール名>.start_link(<引数>)が呼ばれます。Registry のkeys:オプションで:uniqueオプションを指定していますが、代わりに:duplicateオプションも指定できます。:uniqueだと同じキーに登録できるプロセスが単一に限定され、:duplicateだと複数が許可されます。

また、optsのstrategy::one_for_allに変更します。DynamicSupervisorとRegistryは相互に整合性を保っている必要があり、片方がクラッシュして再起動したらもう片方も再起動してほしいからです。

次に SessionWorker を書きます。

lib/session_manager/session_worker.ex
defmodule SessionManager.SessionWorker do
  use GenServer, restart: :temporary

  # (Dynamic)Supervisor向けのAPI
  def start_link({k, v}) do
    name = {:via, Registry, {SessionManager.SessionRegistry, k}}
    GenServer.start_link(__MODULE__, {k, v}, name: name)
  end

  # 以下2つClient向けのAPI
  def get_value(pid) do
    GenServer.call(pid, :getvalue)
  end

  def stop(pid) do
    GenServer.cast(pid, :stop)
  end

  # 以下3つGenServerのコールバック実装
  @impl true
  def init({k, v}) do
    {:ok, {k, v}}
  end

  @impl true
  def handle_call(:getvalue, _from, {k, v}) do
    {:reply, v, {k, v}}
  end

  @impl true
  def handle_cast(:stop, state) do
    {:stop, :normal, state}
  end
end

use GenServer, restart: :temporary:temporary でプロセスが終了したらいかなる場合も再起動しないことを表します。DynamicSupervisor の監視対象にする GenServer は大体このオプションが良いかと思います。デフォルトだといかなる場合も再起動する:permanentになるので注意が必要です。

GenServer.start_link/3name:{:via, Registry, {SessionManager.SessionRegistry, k}}を与えています。これでプロセス生成時に k をキーにして SessionRegistry に登録する、という意味になります。

一通り実装できたので使い方を表すテストコードを書きます。

test/session_manager_test.exs
defmodule SessionManager.SessionManagerTest do
  use ExUnit.Case

  alias SessionManager.SessionSupervisor
  alias SessionManager.SessionRegistry
  alias SessionManager.SessionWorker

  test "regsiter and unregister" do
    {:ok, pid} = DynamicSupervisor.start_child(SessionSupervisor, {SessionWorker, {"k1", "v1"}})
    assert [{pid, nil}] == Registry.lookup(SessionRegistry, "k1")
    ref = Process.monitor(pid)
    assert SessionWorker.get_value(pid) == "v1"
    SessionWorker.stop(pid)
    assert_receive({:DOWN, ^ref, :process, ^pid, :normal})
    :ok = wait_until_process_removed(20)
  end

  defp wait_until_process_removed(0) do
    :error
  end

  defp wait_until_process_removed(n) do
    case Registry.lookup(SessionRegistry, "k1") do
      [] ->
        :ok

      [{_pid, nil}] ->
        Process.sleep(50)
        wait_until_process_removed(n - 1)
    end
  end
end

{:ok, pid} = DynamicSupervisor.start_child(SessionSupervisor, {SessionWorker, {"k1", "v1"}}) で SessionSupervisor の監視対象として SessionWorker.start_link/1{"k1", "v1"}を与えて生成する、という意味になります。application.exの SessionManager.Supervisor の場合と同様に、ここでも {<モジュール名>, <引数>}のタプルを与えると<モジュール名>.start_link(<引数>)が呼ばれます。

これで、[{pid, nil}] = Registry.lookup(SessionRegistry, "k1") で k1 をキーにして生成したプロセスの pid を取得できるようになります。

プロセスを停止してそのモニターを受け取った後、再度Registry.lookup(SessionRegistry, "k1")すると[]が返ってきてほしいところですがそうなりません。プロセスの停止と Registry から削除されるまでの間にタイムラグが存在します。よって、テストコードでは苦しいですがループして[]になるまで待機しています。

停止済みの pid を lookup で引いた場合の処理はアプリケーションの要件依存になると思いますが、例えば ping メッセージを send して pong メッセージを receive する前にモニターの DOWN を receive したら lookup からやり直す、といった処理が必要かもしれません。

参考

  • 公式ガイド Mix and OTP
    • Dynamic supervisors - The Elixir programming language
      • 監視対象プロセスの restart を :temporary にすることの解説もあります
    • ETS - The Elixir programming language
      • このセクションにくるまで ETS を使ったプロセスの KVS の実装方法について説明されていきますが、このセクションの最後で本番では Registry を使った方がいいよ、という趣旨のことが書いてあります

        In practice, if you find yourself in a position where you need a process registry for dynamic processes, you should use the Registry module provided as part of Elixir.

  • API リファレンス

Discussion

ログインするとコメントできます