Open1
GenStage を再起動する方法
GenStage で producer が空の events を送ったとき、consumer は新たに events が得られるまで待機する。
`handle_demand/2` を使わずに(consumer からのバックプレッシャーによらずに)events を送るには、producer に `handle_cast/2` を実装して `GenStage.cast/2` から呼ぶか、`handle_call/3` を実装して `GenStage.call/3` を使う。
てっきり空の events を送ったとき consumer は停止しているのだと思っていた。
defmodule A do
  @moduledoc """
  Demo GenStage producer that emits consecutive integers.

  The state is the next integer to emit. Events leave this stage in two ways:

    * `handle_demand/2` answers consumer demand, but roughly 10% of the time
      it deliberately replies with an empty event list;
    * `handle_cast/2` pushes a fixed-size batch without being asked. It is
      driven by a `:cast_auto_reply` timer message that fires 100 ms after
      `init/1` and then 10 seconds after every push.
  """
  use GenStage

  # Number of events emitted by each timer-driven push.
  @push_size 12

  @doc """
  Starts the producer registered under this module's name, with `number` as
  the first integer to emit.
  """
  def start_link(number) do
    GenStage.start_link(__MODULE__, number, name: __MODULE__)
  end

  @impl true
  def init(counter) do
    # Schedule the first demand-independent push shortly after start-up.
    Process.send_after(self(), :cast_auto_reply, 100)
    {:producer, counter}
  end

  # Emits @push_size events regardless of demand and schedules the next
  # timer message.
  @impl true
  def handle_cast(:auto_reply, counter) do
    IO.inspect("handle_cast =================")
    Process.send_after(self(), :cast_auto_reply, 10_000)
    {:noreply, next_events(counter, @push_size), counter + @push_size}
  end

  # Timer tick: forwards to handle_cast/2 via GenStage.cast/2 and emits
  # nothing itself.
  @impl true
  def handle_info(:cast_auto_reply, state) do
    IO.inspect("handle_info cast_auto_reply")
    GenStage.cast(self(), :auto_reply)
    {:noreply, [], state}
  end

  @impl true
  def handle_demand(demand, counter) when demand > 0 do
    IO.inspect("A handle_demand #{demand}")

    if :rand.uniform() < 0.1 do
      # Deliberately answer with no events to observe how consumers behave.
      IO.inspect("empty demand ------------------")
      {:noreply, [], counter}
    else
      # If the counter is 3 and we ask for 2 items, we will
      # emit the items 3 and 4, and set the state to 5.
      {:noreply, next_events(counter, demand), counter + demand}
    end
  end

  # Returns `count` consecutive integers starting at `counter`.
  defp next_events(counter, count) do
    Enum.to_list(counter..(counter + count - 1))
  end
end
defmodule B do
  @moduledoc """
  Demo GenStage producer-consumer that multiplies every event it receives.

  It subscribes to `A` with `max_demand: 10` and keeps the multiplier as its
  state.
  """
  use GenStage

  @doc """
  Starts the stage registered under this module's name, with `multiplier` as
  the factor applied to each event.
  """
  def start_link(multiplier) do
    GenStage.start_link(__MODULE__, multiplier, name: __MODULE__)
  end

  @impl true
  def init(multiplier) do
    {:producer_consumer, multiplier, subscribe_to: [{A, max_demand: 10}]}
  end

  # Scales each incoming event by the multiplier and emits the results.
  @impl true
  def handle_events(events, _from, multiplier) do
    scaled = Enum.map(events, fn event -> event * multiplier end)
    {:noreply, scaled, multiplier}
  end
end
defmodule ConsumerSup do
  @moduledoc """
  ConsumerSupervisor at the end of the demo pipeline.

  It subscribes to `B` with `max_demand: 50` and uses `Printer.start_link/1`
  as the child start function; the child spec carries no fixed arguments, so
  the single argument `Printer.start_link/1` receives is the event.
  """
  use ConsumerSupervisor

  @doc """
  Starts the consumer supervisor. `arg` is passed through to `init/1`, which
  ignores it.
  """
  def start_link(arg) do
    ConsumerSupervisor.start_link(__MODULE__, arg)
  end

  @impl true
  def init(_arg) do
    # Note: By default the restart for a child is set to :permanent
    # which is not supported in ConsumerSupervisor. You need to explicitly
    # set the :restart option either to :temporary or :transient.
    children = [%{id: Printer, start: {Printer, :start_link, []}, restart: :transient}]

    # children = [
    # {Printer, []}
    # ]
    opts = [strategy: :one_for_one, subscribe_to: [{B, max_demand: 50}]]
    ConsumerSupervisor.init(children, opts)
  end
end
defmodule Printer do
  @moduledoc """
  Per-event worker for the demo pipeline: a linked task that prints the event
  it was started with, or crashes on purpose for multiples of 100.
  """

  @doc """
  Starts a linked task for `event` and returns the result of
  `Task.start_link/1`.

  When `event` is divisible by 100 (0 included) the task prints a "kill"
  message and then divides by zero, so it exits abnormally. For any other
  integer it prints `{pid, event}`.
  """
  def start_link(event) do
    # Note: this function must return the format of `{:ok, pid}` and like
    # all children started by a Supervisor, the process must be linked
    # back to the supervisor (if you use `Task.start_link/1` then both
    # these requirements are met automatically)
    Task.start_link(fn -> print_or_crash(event) end)
  end

  # Runs inside the task process, so self() is the task's pid.
  defp print_or_crash(event) do
    case rem(event, 100) do
      0 ->
        IO.inspect("kill Printer #{event}")
        # Intentional crash used to exercise supervisor restarts.
        1 / 0

      _remainder ->
        IO.inspect({self(), event})
    end
  end
end
defmodule TryGenstage.Application do
  # OTP application entry point; background on OTP Applications:
  # https://hexdocs.pm/elixir/Application.html
  @moduledoc false

  use Application

  @impl true
  def start(_type, _args) do
    # Pipeline stages in start order: producer A with initial counter 0,
    # producer-consumer B with multiplier 2, then the consumer supervisor.
    pipeline = [{A, 0}, {B, 2}, {ConsumerSup, []}]

    # Other strategies and supported options are listed at
    # https://hexdocs.pm/elixir/Supervisor.html
    Supervisor.start_link(pipeline, strategy: :one_for_one, name: TryGenstage.Supervisor)
  end
end