Open1

GenStage を再起動する方法

sankakusankaku

https://hexdocs.pm/gen_stage/GenStage.html#module-buffering-demand

GenStage で producer が空の events を送ったとき、consumer は新たに events が得られるまで待機する。
handle_demand/2 を使わずに(consumer からのバックプレッシャーによらずに)events を送るには、producer に handle_cast/2 を実装して GenStage.cast/2 から呼ぶか、 handle_call/3 を実装して GenStage.call/3 を使う。

てっきり空の events を送ったとき consumer は停止しているのだと思っていた。

defmodule A do
  use GenStage

  def start_link(number) do
    GenStage.start_link(__MODULE__, number, name: A)
  end

  def init(counter) do
    Process.send_after(self(), :cast_auto_reply, 100)

    {:producer, counter}
  end

  def handle_cast(:auto_reply, counter) do
    IO.inspect("handle_cast =================")
    Process.send_after(self(), :cast_auto_reply, 10_000)

    demand = 12
    events = Enum.to_list(counter..(counter + demand - 1))
    {:noreply, events, counter + demand}
  end

  def handle_info(:cast_auto_reply, state) do
    IO.inspect("handle_info cast_auto_reply")
    GenStage.cast(self(), :auto_reply)
    {:noreply, [], state}
  end

  def handle_demand(demand, counter) when demand > 0 do
    IO.inspect("A handle_demand #{demand}")
    # If the counter is 3 and we ask for 2 items, we will
    # emit the items 3 and 4, and set the state to 5.
    if :rand.uniform() < 0.1 do
      IO.inspect("empty demand ------------------")
      {:noreply, [], counter}
    else
      events = Enum.to_list(counter..(counter + demand - 1))
      {:noreply, events, counter + demand}
    end
  end
end
defmodule B do
  use GenStage

  def start_link(multiplier) do
    GenStage.start_link(__MODULE__, multiplier, name: B)
  end

  def init(multiplier) do
    {:producer_consumer, multiplier, subscribe_to: [{A, max_demand: 10}]}
  end

  def handle_events(events, _from, multiplier) do
    events = Enum.map(events, &(&1 * multiplier))
    {:noreply, events, multiplier}
  end
end
defmodule ConsumerSup do
  use ConsumerSupervisor

  def start_link(arg) do
    ConsumerSupervisor.start_link(__MODULE__, arg)
  end

  def init(_arg) do
    # Note: By default the restart for a child is set to :permanent
    # which is not supported in ConsumerSupervisor. You need to explicitly
    # set the :restart option either to :temporary or :transient.
    children = [%{id: Printer, start: {Printer, :start_link, []}, restart: :transient}]
    # children = [
    #   {Printer, []}
    # ]

    opts = [strategy: :one_for_one, subscribe_to: [{B, max_demand: 50}]]
    ConsumerSupervisor.init(children, opts)
  end
end

defmodule Printer do
  def start_link(event) do
    # Note: this function must return the format of `{:ok, pid}` and like
    # all children started by a Supervisor, the process must be linked
    # back to the supervisor (if you use `Task.start_link/1` then both
    # these requirements are met automatically)
    Task.start_link(fn ->
      if rem(event, 100) == 0 do
        IO.inspect("kill Printer #{event}")
        1 / 0
      else
        IO.inspect({self(), event})
      end
    end)
  end
end
defmodule TryGenstage.Application do
  # See https://hexdocs.pm/elixir/Application.html
  # for more information on OTP Applications
  @moduledoc false

  use Application

  @impl true
  def start(_type, _args) do
    children = [
      {A, 0},
      {B, 2},
      {ConsumerSup, []}
    ]

    # See https://hexdocs.pm/elixir/Supervisor.html
    # for other strategies and supported options
    opts = [strategy: :one_for_one, name: TryGenstage.Supervisor]
    Supervisor.start_link(children, opts)
  end
end