From 4400f0ece66c956ff6df21609ce9af21d9ccfb7b Mon Sep 17 00:00:00 2001 From: Bryan Joseph Date: Wed, 12 Jun 2024 15:26:42 -0500 Subject: [PATCH 1/9] Optionally use genstage for consuming publish_mutation broadcast events --- lib/absinthe/subscription/local_consumer.ex | 12 ++++++ .../subscription/local_consumer_supervisor.ex | 20 ++++++++++ lib/absinthe/subscription/local_producer.ex | 40 +++++++++++++++++++ lib/absinthe/subscription/stage_supervisor.ex | 23 +++++++++++ lib/absinthe/subscription/supervisor.ex | 13 ++++-- mix.exs | 1 + mix.lock | 1 + 7 files changed, 107 insertions(+), 3 deletions(-) create mode 100644 lib/absinthe/subscription/local_consumer.ex create mode 100644 lib/absinthe/subscription/local_consumer_supervisor.ex create mode 100644 lib/absinthe/subscription/local_producer.ex create mode 100644 lib/absinthe/subscription/stage_supervisor.ex diff --git a/lib/absinthe/subscription/local_consumer.ex b/lib/absinthe/subscription/local_consumer.ex new file mode 100644 index 00000000..bfd017dc --- /dev/null +++ b/lib/absinthe/subscription/local_consumer.ex @@ -0,0 +1,12 @@ +defmodule Absinthe.Subscription.LocalConsumer do + @moduledoc """ + Processes the publish_mutation request on the + local node. + """ + + def start_link({pubsub, mutation_result, subscribed_fields}) do + Task.start_link(fn -> + Absinthe.Subscription.Local.publish_mutation(pubsub, mutation_result, subscribed_fields) + end) + end +end diff --git a/lib/absinthe/subscription/local_consumer_supervisor.ex b/lib/absinthe/subscription/local_consumer_supervisor.ex new file mode 100644 index 00000000..fd79290b --- /dev/null +++ b/lib/absinthe/subscription/local_consumer_supervisor.ex @@ -0,0 +1,20 @@ +defmodule Absinthe.Subscription.LocalConsumerSupervisor do + @moduledoc """ + Supervisor for consuming publish_mutation requests + """ + + use ConsumerSupervisor + + alias Absinthe.Subscription.LocalProducer + alias Absinthe.Subscription.LocalConsumer + + def start_link(args) do + ConsumerSupervisor.start_link(__MODULE__, args) + end + + def init([min_demand, max_demand]) do + children = [%{id: LocalConsumer, start: {LocalConsumer, :start_link, []}, restart: :transient}] + opts = [strategy: :one_for_one, subscribe_to: [{LocalProducer, min_demand: min_demand, max_demand: max_demand}]] + ConsumerSupervisor.init(children, opts) + end +end diff --git a/lib/absinthe/subscription/local_producer.ex b/lib/absinthe/subscription/local_producer.ex new file mode 100644 index 00000000..56c9806c --- /dev/null +++ b/lib/absinthe/subscription/local_producer.ex @@ -0,0 +1,40 @@ +defmodule Absinthe.Subscription.LocalProducer do + @moduledoc """ + GenStage producer that listens for `publish_mutation` broadcasts + in order to process the subscriptions on this node. + """ + use GenStage + + def start_link(args) do + GenStage.start_link(__MODULE__, args) + end + + def topic(shard), do: "__absinthe__:proxy:#{shard}" + + def init([pubsub, shards, buffer_size]) do + Enum.each(shards, fn shard -> + :ok = pubsub.subscribe(topic(shard)) + end) + + # default buffer_size + {:producer, %{pubsub: pubsub, node: pubsub.node_name()}, buffer_size: buffer_size} + end + + @doc """ + Callback for the consumer to ask for more + subscriptions to process. Since we will be sending + them immediately when we get a message from pubsub, + this just sends an empty list + """ + def handle_demand(_demand, state) do + {:noreply, [], state} + end + + def handle_info(payload, state) do + if payload.node == state.node do + {:noreply, [], state} + else + {:noreply, [{state.pubsub, payload.mutation_result, payload.subscribed_fields}], state} + end + end +end diff --git a/lib/absinthe/subscription/stage_supervisor.ex b/lib/absinthe/subscription/stage_supervisor.ex new file mode 100644 index 00000000..83f85b88 --- /dev/null +++ b/lib/absinthe/subscription/stage_supervisor.ex @@ -0,0 +1,23 @@ +defmodule Absinthe.Subscription.StageSupervisor do + @moduledoc false + + use Supervisor + + def start_link([pubsub, registry, pool_size]) do + Supervisor.start_link(__MODULE__, {pubsub, registry, pool_size}) + end + + def init({pubsub, _registry, pool_size}) do + min_demand = 1 + max_demand = pool_size + shards = Enum.to_list(0..(pool_size - 1)) + buffer_size = 10_000 + + children = [ + {Absinthe.Subscription.LocalProducer, [pubsub, shards, buffer_size]}, + {Absinthe.Subscription.LocalConsumerSupervisor, [min_demand, max_demand]} + ] + + Supervisor.init(children, strategy: :one_for_one) + end +end diff --git a/lib/absinthe/subscription/supervisor.ex b/lib/absinthe/subscription/supervisor.ex index 7b63ae4a..2eb69620 100644 --- a/lib/absinthe/subscription/supervisor.ex +++ b/lib/absinthe/subscription/supervisor.ex @@ -23,14 +23,21 @@ defmodule Absinthe.Subscription.Supervisor do pool_size = Keyword.get(opts, :pool_size, System.schedulers_online() * 2) compress_registry? = Keyword.get(opts, :compress_registry?, true) + use_stage? = Keyword.get(opts, :use_stage?, true) - Supervisor.start_link(__MODULE__, {pubsub, pool_size, compress_registry?}) + Supervisor.start_link(__MODULE__, {pubsub, pool_size, compress_registry?, use_stage?}) end - def init({pubsub, pool_size, compress_registry?}) do + def init({pubsub, pool_size, compress_registry?, use_stage?}) do registry_name = Absinthe.Subscription.registry_name(pubsub) meta = [pool_size: pool_size] + supervisor = if use_stage? do + Absinthe.Subscription.StageSupervisor + else + Absinthe.Subscription.ProxySupervisor + end + children = [ {Registry, [ @@ -40,7 +47,7 @@ defmodule Absinthe.Subscription.Supervisor do meta: meta, compressed: compress_registry? ]}, - {Absinthe.Subscription.ProxySupervisor, [pubsub, registry_name, pool_size]} + {supervisor, [pubsub, registry_name, pool_size]} ] Supervisor.init(children, strategy: :one_for_one) diff --git a/mix.exs b/mix.exs index 61d5c58b..d73b2306 100644 --- a/mix.exs +++ b/mix.exs @@ -73,6 +73,7 @@ defmodule Absinthe.Mixfile do defp deps do [ + {:gen_stage, "~> 1.2"}, {:nimble_parsec, "~> 1.2.2 or ~> 1.3"}, {:telemetry, "~> 1.0 or ~> 0.4"}, {:dataloader, "~> 1.0.0 or ~> 2.0", optional: true}, diff --git a/mix.lock b/mix.lock index 472f9ca1..09a8a594 100644 --- a/mix.lock +++ b/mix.lock @@ -8,6 +8,7 @@ "erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"}, "ex_doc": {:hex, :ex_doc, "0.30.9", "d691453495c47434c0f2052b08dd91cc32bc4e1a218f86884563448ee2502dd2", [:mix], [{:earmark_parser, "~> 1.4.31", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "d7aaaf21e95dc5cddabf89063327e96867d00013963eadf2c6ad135506a8bc10"}, "file_system": {:hex, :file_system, "0.2.10", "fb082005a9cd1711c05b5248710f8826b02d7d1784e7c3451f9c1231d4fc162d", [:mix], [], "hexpm", "41195edbfb562a593726eda3b3e8b103a309b733ad25f3d642ba49696bf715dc"}, + "gen_stage": {:hex, :gen_stage, "1.2.1", "19d8b5e9a5996d813b8245338a28246307fd8b9c99d1237de199d21efc4c76a1", [:mix], [], "hexpm", "83e8be657fa05b992ffa6ac1e3af6d57aa50aace8f691fcf696ff02f8335b001"}, "makeup": {:hex, :makeup, "1.1.1", "fa0bc768698053b2b3869fa8a62616501ff9d11a562f3ce39580d60860c3a55e", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "5dc62fbdd0de44de194898b6710692490be74baa02d9d108bc29f007783b0b48"}, "makeup_elixir": {:hex, :makeup_elixir, "0.16.1", "cc9e3ca312f1cfeccc572b37a09980287e243648108384b97ff2b76e505c3555", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "e127a341ad1b209bd80f7bd1620a15693a9908ed780c3b763bccf7d200c767c6"}, "makeup_erlang": {:hex, :makeup_erlang, "0.1.2", "ad87296a092a46e03b7e9b0be7631ddcf64c790fa68a9ef5323b6cbb36affc72", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "f3f5a1ca93ce6e092d92b6d9c049bcda58a3b617a8d888f8e7231c85630e8108"}, From a8b7973de955b1866b4512fe580d74252c380885 Mon Sep 17 00:00:00 2001 From: Bryan Joseph Date: Wed, 12 Jun 2024 15:38:22 -0500 Subject: [PATCH 2/9] Formatting --- .../subscription/local_consumer_supervisor.ex | 11 +++++++++-- lib/absinthe/subscription/supervisor.ex | 11 ++++++----- 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/lib/absinthe/subscription/local_consumer_supervisor.ex b/lib/absinthe/subscription/local_consumer_supervisor.ex index fd79290b..b731d928 100644 --- a/lib/absinthe/subscription/local_consumer_supervisor.ex +++ b/lib/absinthe/subscription/local_consumer_supervisor.ex @@ -13,8 +13,15 @@ defmodule Absinthe.Subscription.LocalConsumerSupervisor do end def init([min_demand, max_demand]) do - children = [%{id: LocalConsumer, start: {LocalConsumer, :start_link, []}, restart: :transient}] - opts = [strategy: :one_for_one, subscribe_to: [{LocalProducer, min_demand: min_demand, max_demand: max_demand}]] + children = [ + %{id: LocalConsumer, start: {LocalConsumer, :start_link, []}, restart: :transient} + ] + + opts = [ + strategy: :one_for_one, + subscribe_to: [{LocalProducer, min_demand: min_demand, max_demand: max_demand}] + ] + ConsumerSupervisor.init(children, opts) end end diff --git a/lib/absinthe/subscription/supervisor.ex b/lib/absinthe/subscription/supervisor.ex index 2eb69620..30b3af85 100644 --- a/lib/absinthe/subscription/supervisor.ex +++ b/lib/absinthe/subscription/supervisor.ex @@ -32,11 +32,12 @@ defmodule Absinthe.Subscription.Supervisor do registry_name = Absinthe.Subscription.registry_name(pubsub) meta = [pool_size: pool_size] - supervisor = if use_stage? do - Absinthe.Subscription.StageSupervisor - else - Absinthe.Subscription.ProxySupervisor - end + supervisor = + if use_stage? do + Absinthe.Subscription.StageSupervisor + else + Absinthe.Subscription.ProxySupervisor + end children = [ {Registry, From d507665e007f858ab5843183bb0d0cce290a2e4d Mon Sep 17 00:00:00 2001 From: Bryan Joseph Date: Wed, 12 Jun 2024 15:38:54 -0500 Subject: [PATCH 3/9] Default use_stage? to false --- lib/absinthe/subscription/supervisor.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/absinthe/subscription/supervisor.ex b/lib/absinthe/subscription/supervisor.ex index 30b3af85..facd51bf 100644 --- a/lib/absinthe/subscription/supervisor.ex +++ b/lib/absinthe/subscription/supervisor.ex @@ -23,7 +23,7 @@ defmodule Absinthe.Subscription.Supervisor do pool_size = Keyword.get(opts, :pool_size, System.schedulers_online() * 2) compress_registry? = Keyword.get(opts, :compress_registry?, true) - use_stage? = Keyword.get(opts, :use_stage?, true) + use_stage? = Keyword.get(opts, :use_stage?, false) Supervisor.start_link(__MODULE__, {pubsub, pool_size, compress_registry?, use_stage?}) end From 422bb04c4b4f4c4dceb2f7c18bb59930731bc3aa Mon Sep 17 00:00:00 2001 From: Bryan Joseph Date: Wed, 12 Jun 2024 16:07:21 -0500 Subject: [PATCH 4/9] give producer unique name so that it can be used many times in tests --- lib/absinthe/subscription/local_consumer_supervisor.ex | 5 ++--- lib/absinthe/subscription/local_producer.ex | 6 +++--- lib/absinthe/subscription/stage_supervisor.ex | 6 ++++-- 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/lib/absinthe/subscription/local_consumer_supervisor.ex b/lib/absinthe/subscription/local_consumer_supervisor.ex index b731d928..31809a63 100644 --- a/lib/absinthe/subscription/local_consumer_supervisor.ex +++ b/lib/absinthe/subscription/local_consumer_supervisor.ex @@ -5,21 +5,20 @@ defmodule Absinthe.Subscription.LocalConsumerSupervisor do use ConsumerSupervisor - alias Absinthe.Subscription.LocalProducer alias Absinthe.Subscription.LocalConsumer def start_link(args) do ConsumerSupervisor.start_link(__MODULE__, args) end - def init([min_demand, max_demand]) do + def init([min_demand, max_demand, producer_name]) do children = [ %{id: LocalConsumer, start: {LocalConsumer, :start_link, []}, restart: :transient} ] opts = [ strategy: :one_for_one, - subscribe_to: [{LocalProducer, min_demand: min_demand, max_demand: max_demand}] + subscribe_to: [{producer_name, min_demand: min_demand, max_demand: max_demand}] ] ConsumerSupervisor.init(children, opts) diff --git a/lib/absinthe/subscription/local_producer.ex b/lib/absinthe/subscription/local_producer.ex index 56c9806c..8a9c63bf 100644 --- a/lib/absinthe/subscription/local_producer.ex +++ b/lib/absinthe/subscription/local_producer.ex @@ -5,13 +5,13 @@ defmodule Absinthe.Subscription.LocalProducer do """ use GenStage - def start_link(args) do - GenStage.start_link(__MODULE__, args) + def start_link([_pubsub, _shards, _buffer_size, name] = args) do + GenStage.start_link(__MODULE__, args, name: name) end def topic(shard), do: "__absinthe__:proxy:#{shard}" - def init([pubsub, shards, buffer_size]) do + def init([pubsub, shards, buffer_size, _name]) do Enum.each(shards, fn shard -> :ok = pubsub.subscribe(topic(shard)) end) diff --git a/lib/absinthe/subscription/stage_supervisor.ex b/lib/absinthe/subscription/stage_supervisor.ex index 83f85b88..dc2139c2 100644 --- a/lib/absinthe/subscription/stage_supervisor.ex +++ b/lib/absinthe/subscription/stage_supervisor.ex @@ -13,9 +13,11 @@ defmodule Absinthe.Subscription.StageSupervisor do shards = Enum.to_list(0..(pool_size - 1)) buffer_size = 10_000 + unique_producer_name = :"#{Absinthe.Subscription.LocalProducer}.#{:erlang.unique_integer([:monotonic])}" + children = [ - {Absinthe.Subscription.LocalProducer, [pubsub, shards, buffer_size]}, - {Absinthe.Subscription.LocalConsumerSupervisor, [min_demand, max_demand]} + {Absinthe.Subscription.LocalProducer, [pubsub, shards, buffer_size, unique_producer_name]}, + {Absinthe.Subscription.LocalConsumerSupervisor, [min_demand, max_demand, unique_producer_name]} ] Supervisor.init(children, strategy: :one_for_one) From 5c87c2ba9ac8e67e52d7da4febc4aef3ae6b915f Mon Sep 17 00:00:00 2001 From: Bryan Joseph Date: Thu, 13 Jun 2024 08:14:03 -0500 Subject: [PATCH 5/9] Formatting --- lib/absinthe/subscription/stage_supervisor.ex | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/lib/absinthe/subscription/stage_supervisor.ex b/lib/absinthe/subscription/stage_supervisor.ex index dc2139c2..9ad17d4b 100644 --- a/lib/absinthe/subscription/stage_supervisor.ex +++ b/lib/absinthe/subscription/stage_supervisor.ex @@ -13,11 +13,13 @@ defmodule Absinthe.Subscription.StageSupervisor do shards = Enum.to_list(0..(pool_size - 1)) buffer_size = 10_000 - unique_producer_name = :"#{Absinthe.Subscription.LocalProducer}.#{:erlang.unique_integer([:monotonic])}" + unique_producer_name = + :"#{Absinthe.Subscription.LocalProducer}.#{:erlang.unique_integer([:monotonic])}" children = [ {Absinthe.Subscription.LocalProducer, [pubsub, shards, buffer_size, unique_producer_name]}, - {Absinthe.Subscription.LocalConsumerSupervisor, [min_demand, max_demand, unique_producer_name]} + {Absinthe.Subscription.LocalConsumerSupervisor, + [min_demand, max_demand, unique_producer_name]} ] Supervisor.init(children, strategy: :one_for_one) From e9a3b410a54acf250fc4fa26a0f31bca9f90b529 Mon Sep 17 00:00:00 2001 From: Bryan Joseph Date: Mon, 17 Jun 2024 10:49:30 -0500 Subject: [PATCH 6/9] Rename stages. handle queueing instead of using internal buffer --- lib/absinthe/subscription/local_consumer.ex | 12 -- .../subscription/local_consumer_supervisor.ex | 26 ---- lib/absinthe/subscription/local_producer.ex | 40 ------- .../subscription/mutation_publish_listener.ex | 112 ++++++++++++++++++ .../mutation_publish_processor.ex | 42 +++++++ .../mutation_publish_processor_supervisor.ex | 30 +++++ .../mutation_publish_supervisor.ex | 22 ++++ lib/absinthe/subscription/stage_supervisor.ex | 27 ----- lib/absinthe/subscription/supervisor.ex | 20 +--- 9 files changed, 212 insertions(+), 119 deletions(-) delete mode 100644 lib/absinthe/subscription/local_consumer.ex delete mode 100644 lib/absinthe/subscription/local_consumer_supervisor.ex delete mode 100644 lib/absinthe/subscription/local_producer.ex create mode 100644 lib/absinthe/subscription/mutation_publish_listener.ex create mode 100644 lib/absinthe/subscription/mutation_publish_processor.ex create mode 100644 lib/absinthe/subscription/mutation_publish_processor_supervisor.ex create mode 100644 lib/absinthe/subscription/mutation_publish_supervisor.ex delete mode 100644 lib/absinthe/subscription/stage_supervisor.ex diff --git a/lib/absinthe/subscription/local_consumer.ex b/lib/absinthe/subscription/local_consumer.ex deleted file mode 100644 index bfd017dc..00000000 --- a/lib/absinthe/subscription/local_consumer.ex +++ /dev/null @@ -1,12 +0,0 @@ -defmodule Absinthe.Subscription.LocalConsumer do - @moduledoc """ - Processes the publish_mutation request on the - local node. - """ - - def start_link({pubsub, mutation_result, subscribed_fields}) do - Task.start_link(fn -> - Absinthe.Subscription.Local.publish_mutation(pubsub, mutation_result, subscribed_fields) - end) - end -end diff --git a/lib/absinthe/subscription/local_consumer_supervisor.ex b/lib/absinthe/subscription/local_consumer_supervisor.ex deleted file mode 100644 index 31809a63..00000000 --- a/lib/absinthe/subscription/local_consumer_supervisor.ex +++ /dev/null @@ -1,26 +0,0 @@ -defmodule Absinthe.Subscription.LocalConsumerSupervisor do - @moduledoc """ - Supervisor for consuming publish_mutation requests - """ - - use ConsumerSupervisor - - alias Absinthe.Subscription.LocalConsumer - - def start_link(args) do - ConsumerSupervisor.start_link(__MODULE__, args) - end - - def init([min_demand, max_demand, producer_name]) do - children = [ - %{id: LocalConsumer, start: {LocalConsumer, :start_link, []}, restart: :transient} - ] - - opts = [ - strategy: :one_for_one, - subscribe_to: [{producer_name, min_demand: min_demand, max_demand: max_demand}] - ] - - ConsumerSupervisor.init(children, opts) - end -end diff --git a/lib/absinthe/subscription/local_producer.ex b/lib/absinthe/subscription/local_producer.ex deleted file mode 100644 index 8a9c63bf..00000000 --- a/lib/absinthe/subscription/local_producer.ex +++ /dev/null @@ -1,40 +0,0 @@ -defmodule Absinthe.Subscription.LocalProducer do - @moduledoc """ - GenStage producer that listens for `publish_mutation` broadcasts - in order to process the subscriptions on this node. - """ - use GenStage - - def start_link([_pubsub, _shards, _buffer_size, name] = args) do - GenStage.start_link(__MODULE__, args, name: name) - end - - def topic(shard), do: "__absinthe__:proxy:#{shard}" - - def init([pubsub, shards, buffer_size, _name]) do - Enum.each(shards, fn shard -> - :ok = pubsub.subscribe(topic(shard)) - end) - - # default buffer_size - {:producer, %{pubsub: pubsub, node: pubsub.node_name()}, buffer_size: buffer_size} - end - - @doc """ - Callback for the consumer to ask for more - subscriptions to process. Since we will be sending - them immediately when we get a message from pubsub, - this just sends an empty list - """ - def handle_demand(_demand, state) do - {:noreply, [], state} - end - - def handle_info(payload, state) do - if payload.node == state.node do - {:noreply, [], state} - else - {:noreply, [{state.pubsub, payload.mutation_result, payload.subscribed_fields}], state} - end - end -end diff --git a/lib/absinthe/subscription/mutation_publish_listener.ex b/lib/absinthe/subscription/mutation_publish_listener.ex new file mode 100644 index 00000000..9c41f4ca --- /dev/null +++ b/lib/absinthe/subscription/mutation_publish_listener.ex @@ -0,0 +1,112 @@ +defmodule Absinthe.Subscription.MutationPublishListener do + @moduledoc """ + GenStage producer that listens for `publish_mutation` broadcasts + and buffers events to be processed by MutationPublishProcessor + """ + use GenStage + require Logger + + def start_link([_pubsub, max_queue_length, name] = args) do + GenStage.start_link(__MODULE__, args, name: name) + end + + def init([pubsub, max_queue_length, _name]) do + # publish_mutation callback implementation needs to be updated to use + # this topic + :ok = pubsub.subscribe("__absinthe__:subscription:mutation_published") + + {:producer, + %{ + pubsub: pubsub, + node: pubsub.node_name(), + queue: :queue.new(), + pending_demand: 0, + max_queue_length: max_queue_length + }} + end + + @doc """ + Callback for the consumer to ask for more + subscriptions to process. + """ + def handle_demand(demand, state) do + do_handle_demand(demand, state) + end + + def handle_info(%{node: payload_node}, %{node: current_node} = state) + when payload_node == current_node do + {:noreply, [], state} + end + + def handle_info( + %{mutation_result: mutation_result, subscribed_fields: subscribed_fields}, + state + ) do + queue = :queue.in({state.pubsub, mutation_result, subscribed_fields}, state.queue) + queue_length = :queue.len(queue) + + state = + if queue_length > state.max_queue_length do + Logger.warning( + "[Absinthe.Subscription.MutationPublishListener] Queue length (#{inspect(queue_length)}) exceeds max_queue_length (#{inspect(state.max_queue_length)}). Dropping oldest events until max_queue_length is reached" + ) + + queue = drop_oldest_events(queue, state.max_queue_length) + + Map.put(state, :queue, queue) + else + state + end + + do_handle_demand(0, state) + end + + def handle_info(_, state) do + {:noreply, [], state} + end + + defp do_handle_demand(demand, state) do + demand = demand + state.pending_demand + queue_length = :queue.len(state.queue) + + if queue_length < demand do + # if we don't have enough items to satisfy demand then + # send what we have, and save pending demand + events_to_send = :queue.to_list(state.queue) + + pending_demand = demand - length(events_to_send) + + state = + state + |> Map.put(:queue, :queue.new()) + |> Map.put(:pending_demand, pending_demand) + + {:no_reply, events_to_send, state} + else + # if we do have enough to satisfy demand, then send what's asked for + {events_to_send_queue, remaining_events_queue} = :queue.split(demand, state.queue) + events_to_send = :queue.to_list(events_to_send_queue) + + pending_demand = demand - length(events_to_send) + pending_demand = if pending_demand < 0, do: 0, else: pending_demand + + state = + state + |> Map.put(:queue, remaining_events_queue) + |> Map.put(:pending_demand, pending_demand) + + {:no_reply, events_to_send, state} + end + end + + # drop oldest events until we are under the max_queue_size + defp drop_oldest_events(queue, max_queue_length) do + if :queue.len(queue) > max_queue_length do + events_to_drop = :queue.len(queue) - max_queue_length + {_, new_queue} = :queue.split(events_to_drop, queue) + new_queue + else + queue + end + end +end diff --git a/lib/absinthe/subscription/mutation_publish_processor.ex b/lib/absinthe/subscription/mutation_publish_processor.ex new file mode 100644 index 00000000..3d5610ca --- /dev/null +++ b/lib/absinthe/subscription/mutation_publish_processor.ex @@ -0,0 +1,42 @@ +defmodule Absinthe.Subscription.MutationPublishProcessor do + @moduledoc """ + Processes the publish_mutation request on the + local node. + """ + + def start_link({pubsub, mutation_result, subscribed_fields}) do + Task.start_link(fn -> + id = :erlang.unique_integer() + system_time = System.system_time() + start_time_mono = System.monotonic_time() + + :telemetry.execute( + [:absinthe, :subscription, :publish_mutation, :start], + %{system_time: system_time}, + %{ + id: id, + telemetry_span_context: id, + mutation_result: mutation_result, + subscribed_fields: subscribed_fields + } + ) + + try do + Absinthe.Subscription.Local.publish_mutation(pubsub, mutation_result, subscribed_fields) + after + end_time_mono = System.monotonic_time() + + :telemetry.execute( + [:absinthe, :subscription, :publish_mutation, :stop], + %{duration: end_time_mono - start_time_mono, end_time_mono: end_time_mono}, + %{ + id: id, + telemetry_span_context: id, + mutation_result: mutation_result, + subscribed_fields: subscribed_fields + } + ) + end + end) + end +end diff --git a/lib/absinthe/subscription/mutation_publish_processor_supervisor.ex b/lib/absinthe/subscription/mutation_publish_processor_supervisor.ex new file mode 100644 index 00000000..cd53775c --- /dev/null +++ b/lib/absinthe/subscription/mutation_publish_processor_supervisor.ex @@ -0,0 +1,30 @@ +defmodule Absinthe.Subscription.MutationPublishProcessorSupervisor do + @moduledoc """ + Supervisor for consuming publish_mutation requests + """ + + use ConsumerSupervisor + + alias Absinthe.Subscription.MutationPublishProcessor + + def start_link(args) do + ConsumerSupervisor.start_link(__MODULE__, args) + end + + def init([max_demand, producer_name]) do + children = [ + %{ + id: MutationPublishProcessor, + start: {MutationPublishProcessor, :start_link, []}, + restart: :transient + } + ] + + opts = [ + strategy: :one_for_one, + subscribe_to: [{producer_name, max_demand: max_demand}] + ] + + ConsumerSupervisor.init(children, opts) + end +end diff --git a/lib/absinthe/subscription/mutation_publish_supervisor.ex b/lib/absinthe/subscription/mutation_publish_supervisor.ex new file mode 100644 index 00000000..374895b6 --- /dev/null +++ b/lib/absinthe/subscription/mutation_publish_supervisor.ex @@ -0,0 +1,22 @@ +defmodule Absinthe.Subscription.MutationPublishSupervisor do + @moduledoc false + + use Supervisor + + def start_link([pubsub, max_demand, max_queue_length]) do + Supervisor.start_link(__MODULE__, {pubsub, max_demand, max_queue_length}) + end + + def init({pubsub, max_demand, max_queue_length}) do + unique_producer_name = + :"#{Absinthe.Subscription.LocalProducer}.#{:erlang.unique_integer([:monotonic])}" + + children = [ + {Absinthe.Subscription.MutationPublishListener, + [pubsub, max_queue_length, unique_producer_name]}, + {Absinthe.Subscription.LocalConsumerSupervisor, [max_demand, unique_producer_name]} + ] + + Supervisor.init(children, strategy: :one_for_one) + end +end diff --git a/lib/absinthe/subscription/stage_supervisor.ex b/lib/absinthe/subscription/stage_supervisor.ex deleted file mode 100644 index 9ad17d4b..00000000 --- a/lib/absinthe/subscription/stage_supervisor.ex +++ /dev/null @@ -1,27 +0,0 @@ -defmodule Absinthe.Subscription.StageSupervisor do - @moduledoc false - - use Supervisor - - def start_link([pubsub, registry, pool_size]) do - Supervisor.start_link(__MODULE__, {pubsub, registry, pool_size}) - end - - def init({pubsub, _registry, pool_size}) do - min_demand = 1 - max_demand = pool_size - shards = Enum.to_list(0..(pool_size - 1)) - buffer_size = 10_000 - - unique_producer_name = - :"#{Absinthe.Subscription.LocalProducer}.#{:erlang.unique_integer([:monotonic])}" - - children = [ - {Absinthe.Subscription.LocalProducer, [pubsub, shards, buffer_size, unique_producer_name]}, - {Absinthe.Subscription.LocalConsumerSupervisor, - [min_demand, max_demand, unique_producer_name]} - ] - - Supervisor.init(children, strategy: :one_for_one) - end -end diff --git a/lib/absinthe/subscription/supervisor.ex b/lib/absinthe/subscription/supervisor.ex index facd51bf..6486e9cd 100644 --- a/lib/absinthe/subscription/supervisor.ex +++ b/lib/absinthe/subscription/supervisor.ex @@ -21,23 +21,15 @@ defmodule Absinthe.Subscription.Supervisor do module end - pool_size = Keyword.get(opts, :pool_size, System.schedulers_online() * 2) + max_demand = Keyword.get(opts, :max_demand, System.schedulers_online() * 2) + max_queue_length = Keyword.get(opts, :max_queue_length, 10_000) compress_registry? = Keyword.get(opts, :compress_registry?, true) - use_stage? = Keyword.get(opts, :use_stage?, false) - Supervisor.start_link(__MODULE__, {pubsub, pool_size, compress_registry?, use_stage?}) + Supervisor.start_link(__MODULE__, {pubsub, max_demand, max_queue_length, compress_registry?}) end - def init({pubsub, pool_size, compress_registry?, use_stage?}) do + def init({pubsub, max_demand, max_queue_length, compress_registry?}) do registry_name = Absinthe.Subscription.registry_name(pubsub) - meta = [pool_size: pool_size] - - supervisor = - if use_stage? do - Absinthe.Subscription.StageSupervisor - else - Absinthe.Subscription.ProxySupervisor - end children = [ {Registry, @@ -45,10 +37,10 @@ defmodule Absinthe.Subscription.Supervisor do keys: :duplicate, name: registry_name, partitions: System.schedulers_online(), - meta: meta, + meta: [], compressed: compress_registry? ]}, - {supervisor, [pubsub, registry_name, pool_size]} + {Absinthe.Subscription.MutationPublishSupervisor, [pubsub, max_demand, max_queue_length]} ] Supervisor.init(children, strategy: :one_for_one) From 4fce4d76be42079d0e9a1a6fb6fd108636af76c3 Mon Sep 17 00:00:00 2001 From: Bryan Joseph Date: Thu, 20 Jun 2024 11:36:47 -0500 Subject: [PATCH 7/9] clean up --- .../subscription/mutation_publish_listener.ex | 26 +++++++------------ 1 file changed, 10 insertions(+), 16 deletions(-) diff --git a/lib/absinthe/subscription/mutation_publish_listener.ex b/lib/absinthe/subscription/mutation_publish_listener.ex index 9c41f4ca..32fec62c 100644 --- a/lib/absinthe/subscription/mutation_publish_listener.ex +++ b/lib/absinthe/subscription/mutation_publish_listener.ex @@ -6,7 +6,7 @@ defmodule Absinthe.Subscription.MutationPublishListener do use GenStage require Logger - def start_link([_pubsub, max_queue_length, name] = args) do + def start_link([_pubsub, _max_queue_length, name] = args) do GenStage.start_link(__MODULE__, args, name: name) end @@ -43,20 +43,8 @@ defmodule Absinthe.Subscription.MutationPublishListener do state ) do queue = :queue.in({state.pubsub, mutation_result, subscribed_fields}, state.queue) - queue_length = :queue.len(queue) - - state = - if queue_length > state.max_queue_length do - Logger.warning( - "[Absinthe.Subscription.MutationPublishListener] Queue length (#{inspect(queue_length)}) exceeds max_queue_length (#{inspect(state.max_queue_length)}). Dropping oldest events until max_queue_length is reached" - ) - - queue = drop_oldest_events(queue, state.max_queue_length) - - Map.put(state, :queue, queue) - else - state - end + queue = drop_oldest_events(queue, state.max_queue_length) + state = Map.put(state, :queue, queue) do_handle_demand(0, state) end @@ -101,7 +89,13 @@ defmodule Absinthe.Subscription.MutationPublishListener do # drop oldest events until we are under the max_queue_size defp drop_oldest_events(queue, max_queue_length) do - if :queue.len(queue) > max_queue_length do + queue_length = :queue.len(queue) + + if queue_length > max_queue_length do + Logger.warning( + "[Absinthe.Subscription.MutationPublishListener] Queue length (#{inspect(queue_length)}) exceeds max_queue_length (#{inspect(max_queue_length)}). Dropping oldest events until max_queue_length is reached" + ) + events_to_drop = :queue.len(queue) - max_queue_length {_, new_queue} = :queue.split(events_to_drop, queue) new_queue From ee5bcea1f368b19476d3528250fd44d3404faed3 Mon Sep 17 00:00:00 2001 From: Bryan Joseph Date: Thu, 20 Jun 2024 11:56:28 -0500 Subject: [PATCH 8/9] Use updated name for ConsumerSupervisor --- lib/absinthe/subscription/mutation_publish_supervisor.ex | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/absinthe/subscription/mutation_publish_supervisor.ex b/lib/absinthe/subscription/mutation_publish_supervisor.ex index 374895b6..fd32c875 100644 --- a/lib/absinthe/subscription/mutation_publish_supervisor.ex +++ b/lib/absinthe/subscription/mutation_publish_supervisor.ex @@ -14,7 +14,8 @@ defmodule Absinthe.Subscription.MutationPublishSupervisor do children = [ {Absinthe.Subscription.MutationPublishListener, [pubsub, max_queue_length, unique_producer_name]}, - {Absinthe.Subscription.LocalConsumerSupervisor, [max_demand, unique_producer_name]} + {Absinthe.Subscription.MutationPublishProcessorSupervisor, + [max_demand, unique_producer_name]} ] Supervisor.init(children, strategy: :one_for_one) From 402712a859a70f971e63389928157ce85be2dd64 Mon Sep 17 00:00:00 2001 From: Bryan Joseph Date: Thu, 20 Jun 2024 12:11:45 -0500 Subject: [PATCH 9/9] Fix bad value from publish listener --- lib/absinthe/subscription.ex | 16 ++++++---------- .../subscription/mutation_publish_listener.ex | 4 ++-- .../subscription/mutation_publish_supervisor.ex | 2 +- 3 files changed, 9 insertions(+), 13 deletions(-) diff --git a/lib/absinthe/subscription.ex b/lib/absinthe/subscription.ex index 6eed4613..ce69beb3 100644 --- a/lib/absinthe/subscription.ex +++ b/lib/absinthe/subscription.ex @@ -214,16 +214,12 @@ defmodule Absinthe.Subscription do @doc false def publish_remote(pubsub, mutation_result, subscribed_fields) do - {:ok, pool_size} = - pubsub - |> registry_name - |> Registry.meta(:pool_size) - - shard = :erlang.phash2(mutation_result, pool_size) - - proxy_topic = Subscription.Proxy.topic(shard) - - :ok = pubsub.publish_mutation(proxy_topic, mutation_result, subscribed_fields) + :ok = + pubsub.publish_mutation( + "__absinthe__:subscription:mutation_published", + mutation_result, + subscribed_fields + ) end ## Middleware callback diff --git a/lib/absinthe/subscription/mutation_publish_listener.ex b/lib/absinthe/subscription/mutation_publish_listener.ex index 32fec62c..e8cd924b 100644 --- a/lib/absinthe/subscription/mutation_publish_listener.ex +++ b/lib/absinthe/subscription/mutation_publish_listener.ex @@ -69,7 +69,7 @@ defmodule Absinthe.Subscription.MutationPublishListener do |> Map.put(:queue, :queue.new()) |> Map.put(:pending_demand, pending_demand) - {:no_reply, events_to_send, state} + {:noreply, events_to_send, state} else # if we do have enough to satisfy demand, then send what's asked for {events_to_send_queue, remaining_events_queue} = :queue.split(demand, state.queue) @@ -83,7 +83,7 @@ defmodule Absinthe.Subscription.MutationPublishListener do |> Map.put(:queue, remaining_events_queue) |> Map.put(:pending_demand, pending_demand) - {:no_reply, events_to_send, state} + {:noreply, events_to_send, state} end end diff --git a/lib/absinthe/subscription/mutation_publish_supervisor.ex b/lib/absinthe/subscription/mutation_publish_supervisor.ex index fd32c875..08fbe2b4 100644 --- a/lib/absinthe/subscription/mutation_publish_supervisor.ex +++ b/lib/absinthe/subscription/mutation_publish_supervisor.ex @@ -9,7 +9,7 @@ defmodule Absinthe.Subscription.MutationPublishSupervisor do def init({pubsub, max_demand, max_queue_length}) do unique_producer_name = - :"#{Absinthe.Subscription.LocalProducer}.#{:erlang.unique_integer([:monotonic])}" + :"#{Absinthe.Subscription.MutationPublishListener}.#{:erlang.unique_integer([:monotonic])}" children = [ {Absinthe.Subscription.MutationPublishListener,