From 5d7617720eaab9af0edfae0d11544a73e25640fd Mon Sep 17 00:00:00 2001 From: Bryan Joseph Date: Thu, 20 Jun 2024 14:35:11 -0500 Subject: [PATCH 01/15] [GRAPH-1102] Customizable doc storage with default implementation --- lib/absinthe/subscription.ex | 72 +++++++------------ .../subscription/default_document_storage.ex | 70 ++++++++++++++++++ lib/absinthe/subscription/document_storage.ex | 41 +++++++++++ lib/absinthe/subscription/supervisor.ex | 25 +++++-- 4 files changed, 155 insertions(+), 53 deletions(-) create mode 100644 lib/absinthe/subscription/default_document_storage.ex create mode 100644 lib/absinthe/subscription/document_storage.ex diff --git a/lib/absinthe/subscription.ex b/lib/absinthe/subscription.ex index fb3c8928..105f78af 100644 --- a/lib/absinthe/subscription.ex +++ b/lib/absinthe/subscription.ex @@ -143,66 +143,32 @@ defmodule Absinthe.Subscription do def subscribe(pubsub, field_keys, doc_id, doc) do field_keys = List.wrap(field_keys) - registry = pubsub |> registry_name - doc_value = %{ initial_phases: PipelineSerializer.pack(doc.initial_phases), source: doc.source } - pdict_add_fields(doc_id, field_keys) - - for field_key <- field_keys do - {:ok, _} = Registry.register(registry, field_key, doc_id) - end - - {:ok, _} = Registry.register(registry, doc_id, doc_value) - end - - defp pdict_fields(doc_id) do - Process.get({__MODULE__, doc_id}, []) - end - - defp pdict_add_fields(doc_id, field_keys) do - Process.put({__MODULE__, doc_id}, field_keys ++ pdict_fields(doc_id)) - end - - defp pdict_delete_fields(doc_id) do - Process.delete({__MODULE__, doc_id}) + storage_implementation = storage_implementation(pubsub) + storage_implementation.subscribe(pubsub, doc_id, doc_value, field_keys) end @doc false def unsubscribe(pubsub, doc_id) do - registry = pubsub |> registry_name - - for field_key <- pdict_fields(doc_id) do - Registry.unregister(registry, field_key) - end - - Registry.unregister(registry, doc_id) - - pdict_delete_fields(doc_id) - :ok + storage_implementation = storage_implementation(pubsub) + storage_implementation.unsubscribe(pubsub, doc_id) end @doc false def get(pubsub, key) do - name = registry_name(pubsub) - - name - |> Registry.lookup(key) - |> MapSet.new(fn {_pid, doc_id} -> doc_id end) - |> Enum.reduce(%{}, fn doc_id, acc -> - case Registry.lookup(name, doc_id) do - [] -> - acc - - [{_pid, doc} | _rest] -> - Map.put_new_lazy(acc, doc_id, fn -> - Map.update!(doc, :initial_phases, &PipelineSerializer.unpack/1) - end) - end + storage_implementation = storage_implementation(pubsub) + + pubsub + |> storage_implementation.get_docs_by_field_key(key) + |> Enum.map(fn {doc_id, %{initial_phases: initial_phases} = doc} -> + initial_phases = PipelineSerializer.unpack(initial_phases) + {doc_id, Map.put(doc, :initial_phases, initial_phases)} end) + |> Map.new() end @doc false @@ -210,6 +176,20 @@ defmodule Absinthe.Subscription do Module.concat([pubsub, :Registry]) end + @doc false + def storage_name(pubsub) do + Module.concat([pubsub, :Storage]) + end + + def storage_implementation(pubsub) do + {:ok, storage_implementation} = + pubsub + |> registry_name + |> Registry.meta(:storage_implementation) + + storage_implementation + end + @doc false def publish_remote(pubsub, mutation_result, subscribed_fields) do {:ok, pool_size} = diff --git a/lib/absinthe/subscription/default_document_storage.ex b/lib/absinthe/subscription/default_document_storage.ex new file mode 100644 index 00000000..ae64cb16 --- /dev/null +++ b/lib/absinthe/subscription/default_document_storage.ex @@ -0,0 +1,70 @@ +defmodule Absinthe.Subscription.DefaultDocumentStorage do + @behaviour Absinthe.Subscription.DocumentStorage + + @moduledoc """ + Default document storage for Absinthe. Stores subscription + documents and field keys in a Registry process + """ + + @impl Absinthe.Subscription.DocumentStorage + def child_spec(opts) do + {Registry, opts} + end + + @impl Absinthe.Subscription.DocumentStorage + def subscribe(pubsub, doc_id, doc_value, field_keys) do + storage = Absinthe.Subscription.storage_name(pubsub) + + pdict_add_fields(doc_id, field_keys) + + for field_key <- field_keys do + {:ok, _} = Registry.register(storage, field_key, doc_id) + end + + {:ok, _} = Registry.register(storage, doc_id, doc_value) + end + + @impl Absinthe.Subscription.DocumentStorage + def unsubscribe(pubsub, doc_id) do + storage = Absinthe.Subscription.storage_name(pubsub) + + for field_key <- pdict_fields(doc_id) do + Registry.unregister(storage, field_key) + end + + Registry.unregister(storage, doc_id) + + pdict_delete_fields(doc_id) + :ok + end + + @impl Absinthe.Subscription.DocumentStorage + def get_docs_by_field_key(pubsub, field_key) do + storage = Absinthe.Subscription.storage_name(pubsub) + + storage + |> Registry.lookup(field_key) + |> MapSet.new(fn {_pid, doc_id} -> doc_id end) + |> Enum.reduce(%{}, fn doc_id, acc -> + case Registry.lookup(storage, doc_id) do + [] -> + acc + + [{_pid, doc} | _rest] -> + Map.put_new(acc, doc_id, doc) + end + end) + end + + defp pdict_fields(doc_id) do + Process.get({__MODULE__, doc_id}, []) + end + + defp pdict_add_fields(doc_id, field_keys) do + Process.put({__MODULE__, doc_id}, field_keys ++ pdict_fields(doc_id)) + end + + defp pdict_delete_fields(doc_id) do + Process.delete({__MODULE__, doc_id}) + end +end diff --git a/lib/absinthe/subscription/document_storage.ex b/lib/absinthe/subscription/document_storage.ex new file mode 100644 index 00000000..4dcb1234 --- /dev/null +++ b/lib/absinthe/subscription/document_storage.ex @@ -0,0 +1,41 @@ +defmodule Absinthe.Subscription.DocumentStorage do + @moduledoc """ + Behaviour for storing subscription documents. Used to tell + Absinthe how to store documents and the field keys subcribed to those + documents. + """ + alias Module.Behaviour + + @doc """ + Child spec to determine how to start the + Document storage process + """ + @callback child_spec(opts :: Keyword.t()) :: Supervisor.child_spec() + + @doc """ + Adds `doc` to storage with `doc_id` as the key if not already in storage. Also + associates each `{field, key}` pair in `field_keys` to the `doc_id` + """ + @callback subscribe( + pubsub :: atom, + doc_id :: term, + doc :: %{ + initial_phases: Absinthe.Subscription.PipelineSerializer.packed_pipeline(), + source: binary() + }, + field_keys :: [{field :: term, key :: term}] + ) :: + {:ok, term} | {:error, :reason} + + @doc """ + Removes the document and field_keys associated with `doc_id` from + storage + """ + @callback unsubscribe(pubsub :: atom, doc_id :: term) :: :ok + + @doc """ + Get all docs associated with the field_key + """ + @callback get_docs_by_field_key(pubsub :: atom, field_key :: {field :: term, key :: term}) :: + map() +end diff --git a/lib/absinthe/subscription/supervisor.ex b/lib/absinthe/subscription/supervisor.ex index 7b63ae4a..fc206d37 100644 --- a/lib/absinthe/subscription/supervisor.ex +++ b/lib/absinthe/subscription/supervisor.ex @@ -24,22 +24,33 @@ defmodule Absinthe.Subscription.Supervisor do pool_size = Keyword.get(opts, :pool_size, System.schedulers_online() * 2) compress_registry? = Keyword.get(opts, :compress_registry?, true) - Supervisor.start_link(__MODULE__, {pubsub, pool_size, compress_registry?}) + storage_implementation = + Keyword.get(opts, :storage_implementation, Absinthe.Subscription.DefaultDocumentStorage) + + Supervisor.start_link( + __MODULE__, + {pubsub, pool_size, compress_registry?, storage_implementation} + ) end - def init({pubsub, pool_size, compress_registry?}) do + def init({pubsub, pool_size, compress_registry?, storage_implementation}) do registry_name = Absinthe.Subscription.registry_name(pubsub) - meta = [pool_size: pool_size] + storage_name = Absinthe.Subscription.storage_name(pubsub) + meta = [pool_size: pool_size, storage_implementation: storage_implementation] children = [ {Registry, [ - keys: :duplicate, + keys: :unique, name: registry_name, - partitions: System.schedulers_online(), - meta: meta, - compressed: compress_registry? + meta: meta ]}, + storage_implementation.child_spec( + keys: :duplicate, + name: storage_name, + partitions: System.schedulers_online(), + compressed: compress_registry? + ), {Absinthe.Subscription.ProxySupervisor, [pubsub, registry_name, pool_size]} ] From 7cc430cafe4359f9e289108c187a480b106f1aa2 Mon Sep 17 00:00:00 2001 From: Bryan Joseph Date: Thu, 20 Jun 2024 14:47:26 -0500 Subject: [PATCH 02/15] Fix dialyzer error --- lib/absinthe/subscription/default_document_storage.ex | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/absinthe/subscription/default_document_storage.ex b/lib/absinthe/subscription/default_document_storage.ex index ae64cb16..f6053415 100644 --- a/lib/absinthe/subscription/default_document_storage.ex +++ b/lib/absinthe/subscription/default_document_storage.ex @@ -3,12 +3,12 @@ defmodule Absinthe.Subscription.DefaultDocumentStorage do @moduledoc """ Default document storage for Absinthe. Stores subscription - documents and field keys in a Registry process + documents and field keys in a Registry process. """ @impl Absinthe.Subscription.DocumentStorage def child_spec(opts) do - {Registry, opts} + Registry.child_spec(opts) end @impl Absinthe.Subscription.DocumentStorage From e2f05c4bbb11f72bffc8f0b285f5f1e23551760c Mon Sep 17 00:00:00 2001 From: Bryan Joseph Date: Thu, 20 Jun 2024 15:08:25 -0500 Subject: [PATCH 03/15] Some refactoring --- lib/absinthe/subscription/supervisor.ex | 28 +++++++++++++++++-------- 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/lib/absinthe/subscription/supervisor.ex b/lib/absinthe/subscription/supervisor.ex index fc206d37..71e1f7f0 100644 --- a/lib/absinthe/subscription/supervisor.ex +++ b/lib/absinthe/subscription/supervisor.ex @@ -27,17 +27,32 @@ defmodule Absinthe.Subscription.Supervisor do storage_implementation = Keyword.get(opts, :storage_implementation, Absinthe.Subscription.DefaultDocumentStorage) + storage_opts = + case storage_implementation do + Absinthe.Subscription.DefaultDocumentStorage -> + [ + keys: :duplicate, + partitions: System.schedulers_online(), + compressed: compress_registry? + ] + + _ -> + opts + |> Keyword.get(opts, :storage_opts, Keyword.new()) + end + Supervisor.start_link( __MODULE__, - {pubsub, pool_size, compress_registry?, storage_implementation} + {pubsub, pool_size, storage_implementation, storage_opts} ) end - def init({pubsub, pool_size, compress_registry?, storage_implementation}) do + def init({pubsub, pool_size, storage_implementation, storage_opts}) do registry_name = Absinthe.Subscription.registry_name(pubsub) - storage_name = Absinthe.Subscription.storage_name(pubsub) meta = [pool_size: pool_size, storage_implementation: storage_implementation] + storage_opts = Keyword.put(storage_opts, :name, Absinthe.Subscription.storage_name(pubsub)) + children = [ {Registry, [ @@ -45,12 +60,7 @@ defmodule Absinthe.Subscription.Supervisor do name: registry_name, meta: meta ]}, - storage_implementation.child_spec( - keys: :duplicate, - name: storage_name, - partitions: System.schedulers_online(), - compressed: compress_registry? - ), + storage_implementation.child_spec(storage_opts), {Absinthe.Subscription.ProxySupervisor, [pubsub, registry_name, pool_size]} ] From 44ea71860715d337e61eed1aa4c1f51d1ecf7157 Mon Sep 17 00:00:00 2001 From: Bryan Joseph Date: Thu, 20 Jun 2024 15:24:37 -0500 Subject: [PATCH 04/15] Fix code error caught by dialzyer --- lib/absinthe/subscription/supervisor.ex | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/lib/absinthe/subscription/supervisor.ex b/lib/absinthe/subscription/supervisor.ex index 71e1f7f0..6aa2ea52 100644 --- a/lib/absinthe/subscription/supervisor.ex +++ b/lib/absinthe/subscription/supervisor.ex @@ -37,8 +37,7 @@ defmodule Absinthe.Subscription.Supervisor do ] _ -> - opts - |> Keyword.get(opts, :storage_opts, Keyword.new()) + Keyword.get(opts, :storage_opts, Keyword.new()) end Supervisor.start_link( From f4db5d9762069d29a533f942d2dc090afcce0bde Mon Sep 17 00:00:00 2001 From: Bryan Joseph Date: Thu, 20 Jun 2024 15:30:37 -0500 Subject: [PATCH 05/15] telemetry --- lib/absinthe/subscription.ex | 70 ++++++++++++++++--- lib/absinthe/subscription/document_storage.ex | 1 - 2 files changed, 61 insertions(+), 10 deletions(-) diff --git a/lib/absinthe/subscription.ex b/lib/absinthe/subscription.ex index 105f78af..3a31c810 100644 --- a/lib/absinthe/subscription.ex +++ b/lib/absinthe/subscription.ex @@ -149,26 +149,78 @@ defmodule Absinthe.Subscription do } storage_implementation = storage_implementation(pubsub) - storage_implementation.subscribe(pubsub, doc_id, doc_value, field_keys) + + :telemetry.span( + [:absinthe, :subscription, :storage, :subscribe], + %{ + doc_id: doc_id, + doc: doc, + field_keys: field_keys, + storage_implementation: storage_implementation + }, + fn -> + result = storage_implementation.subscribe(pubsub, doc_id, doc_value, field_keys) + + {result, + %{ + doc_id: doc_id, + doc: doc, + field_keys: field_keys, + storage_implementation: storage_implementation + }} + end + ) end @doc false def unsubscribe(pubsub, doc_id) do storage_implementation = storage_implementation(pubsub) - storage_implementation.unsubscribe(pubsub, doc_id) + + :telemetry.span( + [:absinthe, :subscription, :storage, :unsubscribe], + %{ + doc_id: doc_id, + storage_implementation: storage_implementation + }, + fn -> + result = storage_implementation.unsubscribe(pubsub, doc_id) + + {result, + %{ + doc_id: doc_id, + storage_implementation: storage_implementation + }} + end + ) end @doc false def get(pubsub, key) do storage_implementation = storage_implementation(pubsub) - pubsub - |> storage_implementation.get_docs_by_field_key(key) - |> Enum.map(fn {doc_id, %{initial_phases: initial_phases} = doc} -> - initial_phases = PipelineSerializer.unpack(initial_phases) - {doc_id, Map.put(doc, :initial_phases, initial_phases)} - end) - |> Map.new() + :telemetry.span( + [:absinthe, :subscription, :storage, :get], + %{ + key: key, + storage_implementation: storage_implementation + }, + fn -> + result = + pubsub + |> storage_implementation.get_docs_by_field_key(key) + |> Enum.map(fn {doc_id, %{initial_phases: initial_phases} = doc} -> + initial_phases = PipelineSerializer.unpack(initial_phases) + {doc_id, Map.put(doc, :initial_phases, initial_phases)} + end) + |> Map.new() + + {result, + %{ + key: key, + storage_implementation: storage_implementation + }} + end + ) end @doc false diff --git a/lib/absinthe/subscription/document_storage.ex b/lib/absinthe/subscription/document_storage.ex index 4dcb1234..f565816c 100644 --- a/lib/absinthe/subscription/document_storage.ex +++ b/lib/absinthe/subscription/document_storage.ex @@ -4,7 +4,6 @@ defmodule Absinthe.Subscription.DocumentStorage do Absinthe how to store documents and the field keys subcribed to those documents. """ - alias Module.Behaviour @doc """ Child spec to determine how to start the From bec0e4a8778867636aa64bbf2e3a3353a2d267c9 Mon Sep 17 00:00:00 2001 From: Bryan Joseph Date: Thu, 20 Jun 2024 17:26:35 -0500 Subject: [PATCH 06/15] split up operations --- lib/absinthe/subscription.ex | 32 +++++++++++-------- .../subscription/default_document_storage.ex | 30 +++++++++-------- lib/absinthe/subscription/document_storage.ex | 32 ++++++++++++++----- 3 files changed, 59 insertions(+), 35 deletions(-) diff --git a/lib/absinthe/subscription.ex b/lib/absinthe/subscription.ex index 3a31c810..608ce11c 100644 --- a/lib/absinthe/subscription.ex +++ b/lib/absinthe/subscription.ex @@ -148,7 +148,7 @@ defmodule Absinthe.Subscription do source: doc.source } - storage_implementation = storage_implementation(pubsub) + storage_module = storage_implementation(pubsub) :telemetry.span( [:absinthe, :subscription, :storage, :subscribe], @@ -156,17 +156,19 @@ defmodule Absinthe.Subscription do doc_id: doc_id, doc: doc, field_keys: field_keys, - storage_implementation: storage_implementation + storage_module: storage_module }, fn -> - result = storage_implementation.subscribe(pubsub, doc_id, doc_value, field_keys) + storage_process_name = storage_name(pubsub) + storage_module.put(storage_process_name, doc_id, doc_value) + result = storage_module.subscribe(storage_process_name, doc_id, field_keys) {result, %{ doc_id: doc_id, doc: doc, field_keys: field_keys, - storage_implementation: storage_implementation + storage_module: storage_module }} end ) @@ -174,21 +176,23 @@ defmodule Absinthe.Subscription do @doc false def unsubscribe(pubsub, doc_id) do - storage_implementation = storage_implementation(pubsub) + storage_module = storage_implementation(pubsub) :telemetry.span( [:absinthe, :subscription, :storage, :unsubscribe], %{ doc_id: doc_id, - storage_implementation: storage_implementation + storage_module: storage_module }, fn -> - result = storage_implementation.unsubscribe(pubsub, doc_id) + storage_process_name = storage_name(pubsub) + storage_module.unsubscribe(storage_process_name, doc_id) + result = storage_module.delete(storage_process_name, doc_id) {result, %{ doc_id: doc_id, - storage_implementation: storage_implementation + storage_module: storage_module }} end ) @@ -196,18 +200,20 @@ defmodule Absinthe.Subscription do @doc false def get(pubsub, key) do - storage_implementation = storage_implementation(pubsub) + storage_module = storage_implementation(pubsub) :telemetry.span( [:absinthe, :subscription, :storage, :get], %{ key: key, - storage_implementation: storage_implementation + storage_module: storage_module }, fn -> + storage_process_name = storage_name(pubsub) + result = - pubsub - |> storage_implementation.get_docs_by_field_key(key) + storage_process_name + |> storage_module.get_docs_by_field_key(key) |> Enum.map(fn {doc_id, %{initial_phases: initial_phases} = doc} -> initial_phases = PipelineSerializer.unpack(initial_phases) {doc_id, Map.put(doc, :initial_phases, initial_phases)} @@ -217,7 +223,7 @@ defmodule Absinthe.Subscription do {result, %{ key: key, - storage_implementation: storage_implementation + storage_module: storage_module }} end ) diff --git a/lib/absinthe/subscription/default_document_storage.ex b/lib/absinthe/subscription/default_document_storage.ex index f6053415..feb17460 100644 --- a/lib/absinthe/subscription/default_document_storage.ex +++ b/lib/absinthe/subscription/default_document_storage.ex @@ -12,41 +12,43 @@ defmodule Absinthe.Subscription.DefaultDocumentStorage do end @impl Absinthe.Subscription.DocumentStorage - def subscribe(pubsub, doc_id, doc_value, field_keys) do - storage = Absinthe.Subscription.storage_name(pubsub) + def put(storage_process_name, doc_id, doc_value) do + {:ok, _} = Registry.register(storage_process_name, doc_id, doc_value) + end + @impl Absinthe.Subscription.DocumentStorage + def subscribe(storage_process_name, doc_id, field_keys) do pdict_add_fields(doc_id, field_keys) for field_key <- field_keys do - {:ok, _} = Registry.register(storage, field_key, doc_id) + {:ok, _} = Registry.register(storage_process_name, field_key, doc_id) end - {:ok, _} = Registry.register(storage, doc_id, doc_value) + {:ok, field_keys} end @impl Absinthe.Subscription.DocumentStorage - def unsubscribe(pubsub, doc_id) do - storage = Absinthe.Subscription.storage_name(pubsub) + def delete(storage_process_name, doc_id) do + Registry.unregister(storage_process_name, doc_id) + end + @impl Absinthe.Subscription.DocumentStorage + def unsubscribe(storage_process_name, doc_id) do for field_key <- pdict_fields(doc_id) do - Registry.unregister(storage, field_key) + Registry.unregister(storage_process_name, field_key) end - Registry.unregister(storage, doc_id) - pdict_delete_fields(doc_id) :ok end @impl Absinthe.Subscription.DocumentStorage - def get_docs_by_field_key(pubsub, field_key) do - storage = Absinthe.Subscription.storage_name(pubsub) - - storage + def get_docs_by_field_key(storage_process_name, field_key) do + storage_process_name |> Registry.lookup(field_key) |> MapSet.new(fn {_pid, doc_id} -> doc_id end) |> Enum.reduce(%{}, fn doc_id, acc -> - case Registry.lookup(storage, doc_id) do + case Registry.lookup(storage_process_name, doc_id) do [] -> acc diff --git a/lib/absinthe/subscription/document_storage.ex b/lib/absinthe/subscription/document_storage.ex index f565816c..5384b0ee 100644 --- a/lib/absinthe/subscription/document_storage.ex +++ b/lib/absinthe/subscription/document_storage.ex @@ -12,29 +12,45 @@ defmodule Absinthe.Subscription.DocumentStorage do @callback child_spec(opts :: Keyword.t()) :: Supervisor.child_spec() @doc """ - Adds `doc` to storage with `doc_id` as the key if not already in storage. Also - associates each `{field, key}` pair in `field_keys` to the `doc_id` + Adds `doc` to storage with `doc_id` as the key if not already in storage. """ - @callback subscribe( - pubsub :: atom, + @callback put( + storage_process_name :: atom, doc_id :: term, doc :: %{ initial_phases: Absinthe.Subscription.PipelineSerializer.packed_pipeline(), source: binary() - }, + } + ) :: + {:ok, term} | {:error, :reason} + + @doc """ + Associates each `{field, key}` pair in `field_keys` to the `doc_id` + """ + @callback subscribe( + storage_process_name :: atom, + doc_id :: term, field_keys :: [{field :: term, key :: term}] ) :: {:ok, term} | {:error, :reason} @doc """ - Removes the document and field_keys associated with `doc_id` from + Removes the document + """ + @callback delete(storage_process_name :: atom, doc_id :: term) :: :ok + + @doc """ + Removes the field_keys associated with `doc_id` from storage """ - @callback unsubscribe(pubsub :: atom, doc_id :: term) :: :ok + @callback unsubscribe(storage_process_name :: atom, doc_id :: term) :: :ok @doc """ Get all docs associated with the field_key """ - @callback get_docs_by_field_key(pubsub :: atom, field_key :: {field :: term, key :: term}) :: + @callback get_docs_by_field_key( + storage_process_name :: atom, + field_key :: {field :: term, key :: term} + ) :: map() end From bfc4657bdfac04a1614dccda471a041a0b7a6450 Mon Sep 17 00:00:00 2001 From: Bryan Joseph Date: Thu, 20 Jun 2024 17:36:11 -0500 Subject: [PATCH 07/15] change telemetry id --- lib/absinthe/subscription.ex | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/lib/absinthe/subscription.ex b/lib/absinthe/subscription.ex index 608ce11c..8d259b6e 100644 --- a/lib/absinthe/subscription.ex +++ b/lib/absinthe/subscription.ex @@ -151,10 +151,9 @@ defmodule Absinthe.Subscription do storage_module = storage_implementation(pubsub) :telemetry.span( - [:absinthe, :subscription, :storage, :subscribe], + [:absinthe, :subscription, :document, :subscribe], %{ doc_id: doc_id, - doc: doc, field_keys: field_keys, storage_module: storage_module }, @@ -166,7 +165,6 @@ defmodule Absinthe.Subscription do {result, %{ doc_id: doc_id, - doc: doc, field_keys: field_keys, storage_module: storage_module }} @@ -179,7 +177,7 @@ defmodule Absinthe.Subscription do storage_module = storage_implementation(pubsub) :telemetry.span( - [:absinthe, :subscription, :storage, :unsubscribe], + [:absinthe, :subscription, :document, :unsubscribe], %{ doc_id: doc_id, storage_module: storage_module @@ -203,9 +201,9 @@ defmodule Absinthe.Subscription do storage_module = storage_implementation(pubsub) :telemetry.span( - [:absinthe, :subscription, :storage, :get], + [:absinthe, :subscription, :document, :get], %{ - key: key, + field_key: key, storage_module: storage_module }, fn -> @@ -222,7 +220,7 @@ defmodule Absinthe.Subscription do {result, %{ - key: key, + field_key: key, storage_module: storage_module }} end From 583c46f57412d826576baf6699cf8d1c8cbbe45a Mon Sep 17 00:00:00 2001 From: Bryan Joseph Date: Fri, 21 Jun 2024 11:55:11 -0500 Subject: [PATCH 08/15] Refactoring --- lib/absinthe/subscription.ex | 99 +++++-------------- lib/absinthe/subscription/document_storage.ex | 17 ++-- lib/absinthe/subscription/supervisor.ex | 17 ++-- 3 files changed, 43 insertions(+), 90 deletions(-) diff --git a/lib/absinthe/subscription.ex b/lib/absinthe/subscription.ex index 8d259b6e..13a4b6e4 100644 --- a/lib/absinthe/subscription.ex +++ b/lib/absinthe/subscription.ex @@ -148,83 +148,34 @@ defmodule Absinthe.Subscription do source: doc.source } - storage_module = storage_implementation(pubsub) - - :telemetry.span( - [:absinthe, :subscription, :document, :subscribe], - %{ - doc_id: doc_id, - field_keys: field_keys, - storage_module: storage_module - }, - fn -> - storage_process_name = storage_name(pubsub) - storage_module.put(storage_process_name, doc_id, doc_value) - result = storage_module.subscribe(storage_process_name, doc_id, field_keys) - - {result, - %{ - doc_id: doc_id, - field_keys: field_keys, - storage_module: storage_module - }} - end - ) + storage_module = document_storage(pubsub) + storage_process_name = document_storage_name(pubsub) + + storage_module.put(storage_process_name, doc_id, doc_value) + storage_module.subscribe(storage_process_name, doc_id, field_keys) end @doc false def unsubscribe(pubsub, doc_id) do - storage_module = storage_implementation(pubsub) - - :telemetry.span( - [:absinthe, :subscription, :document, :unsubscribe], - %{ - doc_id: doc_id, - storage_module: storage_module - }, - fn -> - storage_process_name = storage_name(pubsub) - storage_module.unsubscribe(storage_process_name, doc_id) - result = storage_module.delete(storage_process_name, doc_id) - - {result, - %{ - doc_id: doc_id, - storage_module: storage_module - }} - end - ) + storage_module = document_storage(pubsub) + storage_process_name = document_storage_name(pubsub) + + storage_module.unsubscribe(storage_process_name, doc_id) + storage_module.delete(storage_process_name, doc_id) end @doc false def get(pubsub, key) do - storage_module = storage_implementation(pubsub) - - :telemetry.span( - [:absinthe, :subscription, :document, :get], - %{ - field_key: key, - storage_module: storage_module - }, - fn -> - storage_process_name = storage_name(pubsub) - - result = - storage_process_name - |> storage_module.get_docs_by_field_key(key) - |> Enum.map(fn {doc_id, %{initial_phases: initial_phases} = doc} -> - initial_phases = PipelineSerializer.unpack(initial_phases) - {doc_id, Map.put(doc, :initial_phases, initial_phases)} - end) - |> Map.new() - - {result, - %{ - field_key: key, - storage_module: storage_module - }} - end - ) + storage_module = document_storage(pubsub) + storage_process_name = document_storage_name(pubsub) + + storage_process_name + |> storage_module.get_docs_by_field_key(key) + |> Enum.map(fn {doc_id, %{initial_phases: initial_phases} = doc} -> + initial_phases = PipelineSerializer.unpack(initial_phases) + {doc_id, Map.put(doc, :initial_phases, initial_phases)} + end) + |> Map.new() end @doc false @@ -233,17 +184,17 @@ defmodule Absinthe.Subscription do end @doc false - def storage_name(pubsub) do + def document_storage_name(pubsub) do Module.concat([pubsub, :Storage]) end - def storage_implementation(pubsub) do - {:ok, storage_implementation} = + def document_storage(pubsub) do + {:ok, document_storage} = pubsub |> registry_name - |> Registry.meta(:storage_implementation) + |> Registry.meta(:document_storage) - storage_implementation + document_storage end @doc false diff --git a/lib/absinthe/subscription/document_storage.ex b/lib/absinthe/subscription/document_storage.ex index 5384b0ee..a0c921bd 100644 --- a/lib/absinthe/subscription/document_storage.ex +++ b/lib/absinthe/subscription/document_storage.ex @@ -1,18 +1,20 @@ defmodule Absinthe.Subscription.DocumentStorage do @moduledoc """ Behaviour for storing subscription documents. Used to tell - Absinthe how to store documents and the field keys subcribed to those + Absinthe how to store documents and the field keys associated with those documents. """ @doc """ Child spec to determine how to start the - Document storage process + Document storage process. This will be supervised. Absinthe will give + the process a name and that name will be passed in the other callbacks + in order to reference it there. """ @callback child_spec(opts :: Keyword.t()) :: Supervisor.child_spec() @doc """ - Adds `doc` to storage with `doc_id` as the key if not already in storage. + Adds `doc` to storage with `doc_id` as the key. """ @callback put( storage_process_name :: atom, @@ -25,7 +27,7 @@ defmodule Absinthe.Subscription.DocumentStorage do {:ok, term} | {:error, :reason} @doc """ - Associates each `{field, key}` pair in `field_keys` to the `doc_id` + Associates each `{field, key}` pair in `field_keys` to `doc_id`. """ @callback subscribe( storage_process_name :: atom, @@ -35,18 +37,17 @@ defmodule Absinthe.Subscription.DocumentStorage do {:ok, term} | {:error, :reason} @doc """ - Removes the document + Removes the document. """ @callback delete(storage_process_name :: atom, doc_id :: term) :: :ok @doc """ - Removes the field_keys associated with `doc_id` from - storage + Removes the field_keys associated with `doc_id`. """ @callback unsubscribe(storage_process_name :: atom, doc_id :: term) :: :ok @doc """ - Get all docs associated with the field_key + Get all docs associated with `field_key` """ @callback get_docs_by_field_key( storage_process_name :: atom, diff --git a/lib/absinthe/subscription/supervisor.ex b/lib/absinthe/subscription/supervisor.ex index 6aa2ea52..1d85cd7a 100644 --- a/lib/absinthe/subscription/supervisor.ex +++ b/lib/absinthe/subscription/supervisor.ex @@ -24,11 +24,11 @@ defmodule Absinthe.Subscription.Supervisor do pool_size = Keyword.get(opts, :pool_size, System.schedulers_online() * 2) compress_registry? = Keyword.get(opts, :compress_registry?, true) - storage_implementation = - Keyword.get(opts, :storage_implementation, Absinthe.Subscription.DefaultDocumentStorage) + document_storage = + Keyword.get(opts, :document_storage, Absinthe.Subscription.DefaultDocumentStorage) storage_opts = - case storage_implementation do + case document_storage do Absinthe.Subscription.DefaultDocumentStorage -> [ keys: :duplicate, @@ -42,15 +42,16 @@ defmodule Absinthe.Subscription.Supervisor do Supervisor.start_link( __MODULE__, - {pubsub, pool_size, storage_implementation, storage_opts} + {pubsub, pool_size, document_storage, storage_opts} ) end - def init({pubsub, pool_size, storage_implementation, storage_opts}) do + def init({pubsub, pool_size, document_storage, storage_opts}) do registry_name = Absinthe.Subscription.registry_name(pubsub) - meta = [pool_size: pool_size, storage_implementation: storage_implementation] + meta = [pool_size: pool_size, document_storage: document_storage] - storage_opts = Keyword.put(storage_opts, :name, Absinthe.Subscription.storage_name(pubsub)) + storage_opts = + Keyword.put(storage_opts, :name, Absinthe.Subscription.document_storage_name(pubsub)) children = [ {Registry, @@ -59,7 +60,7 @@ defmodule Absinthe.Subscription.Supervisor do name: registry_name, meta: meta ]}, - storage_implementation.child_spec(storage_opts), + document_storage.child_spec(storage_opts), {Absinthe.Subscription.ProxySupervisor, [pubsub, registry_name, pool_size]} ] From 6e1255fe175a51c9d66f199bcb8adec7d4463704 Mon Sep 17 00:00:00 2001 From: Bryan Joseph Date: Fri, 21 Jun 2024 12:03:51 -0500 Subject: [PATCH 09/15] Refactoring --- lib/absinthe/subscription.ex | 6 +----- .../subscription/default_document_storage.ex | 17 +++++------------ lib/absinthe/subscription/document_storage.ex | 18 ++---------------- 3 files changed, 8 insertions(+), 33 deletions(-) diff --git a/lib/absinthe/subscription.ex b/lib/absinthe/subscription.ex index 13a4b6e4..61a90e09 100644 --- a/lib/absinthe/subscription.ex +++ b/lib/absinthe/subscription.ex @@ -150,17 +150,13 @@ defmodule Absinthe.Subscription do storage_module = document_storage(pubsub) storage_process_name = document_storage_name(pubsub) - - storage_module.put(storage_process_name, doc_id, doc_value) - storage_module.subscribe(storage_process_name, doc_id, field_keys) + storage_module.put(storage_process_name, doc_id, doc_value, field_keys) end @doc false def unsubscribe(pubsub, doc_id) do storage_module = document_storage(pubsub) storage_process_name = document_storage_name(pubsub) - - storage_module.unsubscribe(storage_process_name, doc_id) storage_module.delete(storage_process_name, doc_id) end diff --git a/lib/absinthe/subscription/default_document_storage.ex b/lib/absinthe/subscription/default_document_storage.ex index feb17460..b36e3fe4 100644 --- a/lib/absinthe/subscription/default_document_storage.ex +++ b/lib/absinthe/subscription/default_document_storage.ex @@ -12,33 +12,26 @@ defmodule Absinthe.Subscription.DefaultDocumentStorage do end @impl Absinthe.Subscription.DocumentStorage - def put(storage_process_name, doc_id, doc_value) do - {:ok, _} = Registry.register(storage_process_name, doc_id, doc_value) - end - - @impl Absinthe.Subscription.DocumentStorage - def subscribe(storage_process_name, doc_id, field_keys) do + def put(storage_process_name, doc_id, doc_value, field_keys) do pdict_add_fields(doc_id, field_keys) for field_key <- field_keys do {:ok, _} = Registry.register(storage_process_name, field_key, doc_id) end - {:ok, field_keys} + {:ok, _} = Registry.register(storage_process_name, doc_id, doc_value) end @impl Absinthe.Subscription.DocumentStorage def delete(storage_process_name, doc_id) do - Registry.unregister(storage_process_name, doc_id) - end - - @impl Absinthe.Subscription.DocumentStorage - def unsubscribe(storage_process_name, doc_id) do for field_key <- pdict_fields(doc_id) do Registry.unregister(storage_process_name, field_key) end pdict_delete_fields(doc_id) + + Registry.unregister(storage_process_name, doc_id) + :ok end diff --git a/lib/absinthe/subscription/document_storage.ex b/lib/absinthe/subscription/document_storage.ex index a0c921bd..f0e738df 100644 --- a/lib/absinthe/subscription/document_storage.ex +++ b/lib/absinthe/subscription/document_storage.ex @@ -22,30 +22,16 @@ defmodule Absinthe.Subscription.DocumentStorage do doc :: %{ initial_phases: Absinthe.Subscription.PipelineSerializer.packed_pipeline(), source: binary() - } - ) :: - {:ok, term} | {:error, :reason} - - @doc """ - Associates each `{field, key}` pair in `field_keys` to `doc_id`. - """ - @callback subscribe( - storage_process_name :: atom, - doc_id :: term, + }, field_keys :: [{field :: term, key :: term}] ) :: {:ok, term} | {:error, :reason} @doc """ - Removes the document. + Removes the document. Along with any field_keys associated with it """ @callback delete(storage_process_name :: atom, doc_id :: term) :: :ok - @doc """ - Removes the field_keys associated with `doc_id`. - """ - @callback unsubscribe(storage_process_name :: atom, doc_id :: term) :: :ok - @doc """ Get all docs associated with `field_key` """ From 10a17ae52a28ec6882edf8b063448faadbe02d82 Mon Sep 17 00:00:00 2001 From: Bryan Joseph Date: Fri, 21 Jun 2024 12:04:57 -0500 Subject: [PATCH 10/15] Update documentation --- lib/absinthe/subscription/document_storage.ex | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/absinthe/subscription/document_storage.ex b/lib/absinthe/subscription/document_storage.ex index f0e738df..e1b7cd46 100644 --- a/lib/absinthe/subscription/document_storage.ex +++ b/lib/absinthe/subscription/document_storage.ex @@ -14,7 +14,8 @@ defmodule Absinthe.Subscription.DocumentStorage do @callback child_spec(opts :: Keyword.t()) :: Supervisor.child_spec() @doc """ - Adds `doc` to storage with `doc_id` as the key. + Adds `doc` to storage with `doc_id` as the key. Associates the given + `field_keys` with `doc_id`. """ @callback put( storage_process_name :: atom, From 6f8dbdd1d80e5df0d51332fa6519ccdf36cc133a Mon Sep 17 00:00:00 2001 From: Bryan Joseph Date: Mon, 24 Jun 2024 11:49:26 -0500 Subject: [PATCH 11/15] Add wrapper functions in DocumentStorage to add telemetry --- lib/absinthe/subscription.ex | 28 +----- lib/absinthe/subscription/document_storage.ex | 93 +++++++++++++++++++ 2 files changed, 97 insertions(+), 24 deletions(-) diff --git a/lib/absinthe/subscription.ex b/lib/absinthe/subscription.ex index 61a90e09..c1ec45e6 100644 --- a/lib/absinthe/subscription.ex +++ b/lib/absinthe/subscription.ex @@ -30,7 +30,7 @@ defmodule Absinthe.Subscription do alias __MODULE__ - alias Absinthe.Subscription.PipelineSerializer + alias Absinthe.Subscription.DocumentStorage @doc """ Add Absinthe.Subscription to your process tree. @@ -141,37 +141,17 @@ defmodule Absinthe.Subscription do @doc false def subscribe(pubsub, field_keys, doc_id, doc) do - field_keys = List.wrap(field_keys) - - doc_value = %{ - initial_phases: PipelineSerializer.pack(doc.initial_phases), - source: doc.source - } - - storage_module = document_storage(pubsub) - storage_process_name = document_storage_name(pubsub) - storage_module.put(storage_process_name, doc_id, doc_value, field_keys) + DocumentStorage.put(pubsub, doc_id, doc, field_keys) end @doc false def unsubscribe(pubsub, doc_id) do - storage_module = document_storage(pubsub) - storage_process_name = document_storage_name(pubsub) - storage_module.delete(storage_process_name, doc_id) + DocumentStorage.delete(pubsub, doc_id) end @doc false def get(pubsub, key) do - storage_module = document_storage(pubsub) - storage_process_name = document_storage_name(pubsub) - - storage_process_name - |> storage_module.get_docs_by_field_key(key) - |> Enum.map(fn {doc_id, %{initial_phases: initial_phases} = doc} -> - initial_phases = PipelineSerializer.unpack(initial_phases) - {doc_id, Map.put(doc, :initial_phases, initial_phases)} - end) - |> Map.new() + DocumentStorage.get_docs_by_field_key(pubsub, key) end @doc false diff --git a/lib/absinthe/subscription/document_storage.ex b/lib/absinthe/subscription/document_storage.ex index e1b7cd46..da71812e 100644 --- a/lib/absinthe/subscription/document_storage.ex +++ b/lib/absinthe/subscription/document_storage.ex @@ -5,6 +5,9 @@ defmodule Absinthe.Subscription.DocumentStorage do documents. """ + alias Absinthe.Subscription + alias Absinthe.Subscription.PipelineSerializer + @doc """ Child spec to determine how to start the Document storage process. This will be supervised. Absinthe will give @@ -41,4 +44,94 @@ defmodule Absinthe.Subscription.DocumentStorage do field_key :: {field :: term, key :: term} ) :: map() + + @doc false + def put(pubsub, doc_id, doc, field_keys) do + {storage_module, storage_process_name} = storage_info(pubsub) + + :telemetry.span( + [:absinthe, :subscription, :storage, :put], + %{ + doc_id: doc_id, + doc: doc, + field_keys: field_keys, + storage_module: storage_module + }, + fn -> + field_keys = List.wrap(field_keys) + + doc_value = %{ + initial_phases: PipelineSerializer.pack(doc.initial_phases), + source: doc.source + } + + result = storage_module.put(storage_process_name, doc_id, doc_value, field_keys) + + {result, + %{ + doc_id: doc_id, + doc: doc, + field_keys: field_keys, + storage_module: storage_module + }} + end + ) + end + + @doc false + def delete(pubsub, doc_id) do + {storage_module, storage_process_name} = storage_info(pubsub) + + :telemetry.span( + [:absinthe, :subscription, :storage, :delete], + %{ + doc_id: doc_id, + storage_module: storage_module + }, + fn -> + result = storage_module.delete(storage_process_name, doc_id) + + {result, + %{ + doc_id: doc_id, + storage_module: storage_module + }} + end + ) + end + + @doc false + def get_docs_by_field_key(pubsub, field_key) do + {storage_module, storage_process_name} = storage_info(pubsub) + + :telemetry.span( + [:absinthe, :subscription, :storage, :get_docs_by_field_key], + %{ + field_key: field_key, + storage_module: storage_module + }, + fn -> + result = + storage_process_name + |> storage_module.get_docs_by_field_key(field_key) + |> Enum.map(fn {doc_id, %{initial_phases: initial_phases} = doc} -> + initial_phases = PipelineSerializer.unpack(initial_phases) + {doc_id, Map.put(doc, :initial_phases, initial_phases)} + end) + |> Map.new() + + {result, + %{ + field_key: field_key, + storage_module: storage_module + }} + end + ) + end + + defp storage_info(pubsub) do + storage_module = Subscription.document_storage(pubsub) + storage_process_name = Subscription.document_storage_name(pubsub) + {storage_module, storage_process_name} + end end From 079e3c5ac8354a93d591a197e71965b07a52d58b Mon Sep 17 00:00:00 2001 From: Bryan Joseph Date: Tue, 25 Jun 2024 08:53:02 -0500 Subject: [PATCH 12/15] Documentation update --- lib/absinthe/subscription/default_document_storage.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/absinthe/subscription/default_document_storage.ex b/lib/absinthe/subscription/default_document_storage.ex index b36e3fe4..d6ffea10 100644 --- a/lib/absinthe/subscription/default_document_storage.ex +++ b/lib/absinthe/subscription/default_document_storage.ex @@ -3,7 +3,7 @@ defmodule Absinthe.Subscription.DefaultDocumentStorage do @moduledoc """ Default document storage for Absinthe. Stores subscription - documents and field keys in a Registry process. + documents and field keys in a Registry. """ @impl Absinthe.Subscription.DocumentStorage From 12979d784d08145e285ba2d5c9d8129b9bebc169 Mon Sep 17 00:00:00 2001 From: Bryan Joseph Date: Tue, 25 Jun 2024 09:35:08 -0500 Subject: [PATCH 13/15] Update documentation --- lib/absinthe/subscription/document_storage.ex | 27 +++++++++++++++++++ mix.exs | 3 ++- 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/lib/absinthe/subscription/document_storage.ex b/lib/absinthe/subscription/document_storage.ex index da71812e..17821a3b 100644 --- a/lib/absinthe/subscription/document_storage.ex +++ b/lib/absinthe/subscription/document_storage.ex @@ -3,6 +3,33 @@ defmodule Absinthe.Subscription.DocumentStorage do Behaviour for storing subscription documents. Used to tell Absinthe how to store documents and the field keys associated with those documents. + + By default, Absinthe uses `Absinthe.Subscription.DefaultDocumentStorage` as + the storage for subscription documents. This behaviour can be implemented to + allow for a custom storage solution if needed. + + The `child_spec` is used so that Absinthe can start your process when starting `Absinthe.Subscription`. + + To tell `Absinthe.Subscription` to use your custom storage, make sure to pass in `document_storage` and `storage_opts` + when adding `Absinthe.Subscription` to your application supervisor. + + ```elixir + {Absinthe.Subscription, pubsub: MyApp.Pubsub, document_storage: MyApp.DocumentStorage, storage_opts: [key1: value1, key2: value2]} + ``` + + Absinthe.Subscription will update `storage_opts` to include a `name` key. This will be the name `Absinthe.Subscription` uses to + reference the process. + + ```elixir + @impl Absinthe.Subscription.DocumentStorage + def child_spec(opts) do + # opts is the `storage_opts` with the `name` key added + { + id: __MODULE__, + start: {__MODULE__, :start_link, [opts]} + } + end + ``` """ alias Absinthe.Subscription diff --git a/mix.exs b/mix.exs index 61d5c58b..0b1bc693 100644 --- a/mix.exs +++ b/mix.exs @@ -184,7 +184,8 @@ defmodule Absinthe.Mixfile do Absinthe.Subscription, Absinthe.Subscription.Pubsub, Absinthe.Subscription.Local, - Absinthe.Subscription.PipelineSerializer + Absinthe.Subscription.PipelineSerializer, + Absinthe.Subscription.DocumentStorage ], Extensibility: [ Absinthe.Pipeline, From 793ed69bf2dd032fab08004236447c80611299b0 Mon Sep 17 00:00:00 2001 From: Bryan Joseph Date: Fri, 28 Jun 2024 11:14:22 -0500 Subject: [PATCH 14/15] Let user start their own storage if needed --- lib/absinthe/subscription.ex | 13 +- .../subscription/default_document_storage.ex | 30 +- lib/absinthe/subscription/document_storage.ex | 53 +-- lib/absinthe/subscription/supervisor.ex | 36 +- .../subscription/document_storage_test.exs | 373 ++++++++++++++++++ 5 files changed, 412 insertions(+), 93 deletions(-) create mode 100644 test/absinthe/subscription/document_storage_test.exs diff --git a/lib/absinthe/subscription.ex b/lib/absinthe/subscription.ex index c1ec45e6..c360a5d6 100644 --- a/lib/absinthe/subscription.ex +++ b/lib/absinthe/subscription.ex @@ -159,18 +159,13 @@ defmodule Absinthe.Subscription do Module.concat([pubsub, :Registry]) end - @doc false - def document_storage_name(pubsub) do - Module.concat([pubsub, :Storage]) - end - - def document_storage(pubsub) do - {:ok, document_storage} = + def storage_module(pubsub) do + {:ok, storage} = pubsub |> registry_name - |> Registry.meta(:document_storage) + |> Registry.meta(:storage) - document_storage + storage end @doc false diff --git a/lib/absinthe/subscription/default_document_storage.ex b/lib/absinthe/subscription/default_document_storage.ex index d6ffea10..ee48af54 100644 --- a/lib/absinthe/subscription/default_document_storage.ex +++ b/lib/absinthe/subscription/default_document_storage.ex @@ -1,47 +1,49 @@ defmodule Absinthe.Subscription.DefaultDocumentStorage do @behaviour Absinthe.Subscription.DocumentStorage - @moduledoc """ Default document storage for Absinthe. Stores subscription documents and field keys in a Registry. """ - @impl Absinthe.Subscription.DocumentStorage - def child_spec(opts) do - Registry.child_spec(opts) - end + alias Absinthe.Subscription @impl Absinthe.Subscription.DocumentStorage - def put(storage_process_name, doc_id, doc_value, field_keys) do + def put(pubsub, doc_id, doc_value, field_keys) do + registry = Subscription.registry_name(pubsub) + pdict_add_fields(doc_id, field_keys) for field_key <- field_keys do - {:ok, _} = Registry.register(storage_process_name, field_key, doc_id) + {:ok, _} = Registry.register(registry, field_key, doc_id) end - {:ok, _} = Registry.register(storage_process_name, doc_id, doc_value) + {:ok, _} = Registry.register(registry, doc_id, doc_value) end @impl Absinthe.Subscription.DocumentStorage - def delete(storage_process_name, doc_id) do + def delete(pubsub, doc_id) do + registry = Subscription.registry_name(pubsub) + for field_key <- pdict_fields(doc_id) do - Registry.unregister(storage_process_name, field_key) + Registry.unregister(registry, field_key) end pdict_delete_fields(doc_id) - Registry.unregister(storage_process_name, doc_id) + Registry.unregister(registry, doc_id) :ok end @impl Absinthe.Subscription.DocumentStorage - def get_docs_by_field_key(storage_process_name, field_key) do - storage_process_name + def get_docs_by_field_key(pubsub, field_key) do + registry = Subscription.registry_name(pubsub) + + registry |> Registry.lookup(field_key) |> MapSet.new(fn {_pid, doc_id} -> doc_id end) |> Enum.reduce(%{}, fn doc_id, acc -> - case Registry.lookup(storage_process_name, doc_id) do + case Registry.lookup(registry, doc_id) do [] -> acc diff --git a/lib/absinthe/subscription/document_storage.ex b/lib/absinthe/subscription/document_storage.ex index 17821a3b..7f20f7b1 100644 --- a/lib/absinthe/subscription/document_storage.ex +++ b/lib/absinthe/subscription/document_storage.ex @@ -8,47 +8,22 @@ defmodule Absinthe.Subscription.DocumentStorage do the storage for subscription documents. This behaviour can be implemented to allow for a custom storage solution if needed. - The `child_spec` is used so that Absinthe can start your process when starting `Absinthe.Subscription`. - - To tell `Absinthe.Subscription` to use your custom storage, make sure to pass in `document_storage` and `storage_opts` - when adding `Absinthe.Subscription` to your application supervisor. - - ```elixir - {Absinthe.Subscription, pubsub: MyApp.Pubsub, document_storage: MyApp.DocumentStorage, storage_opts: [key1: value1, key2: value2]} - ``` - - Absinthe.Subscription will update `storage_opts` to include a `name` key. This will be the name `Absinthe.Subscription` uses to - reference the process. + When starting `Absinthe.Subscription`, include `storage`. Defaults to `Absinthe.Subscription.DefaultDocumentStorage` ```elixir - @impl Absinthe.Subscription.DocumentStorage - def child_spec(opts) do - # opts is the `storage_opts` with the `name` key added - { - id: __MODULE__, - start: {__MODULE__, :start_link, [opts]} - } - end + {Absinthe.Subscription, pubsub: MyApp.Pubsub, storage: MyApp.DocumentStorage} ``` """ alias Absinthe.Subscription alias Absinthe.Subscription.PipelineSerializer - @doc """ - Child spec to determine how to start the - Document storage process. This will be supervised. Absinthe will give - the process a name and that name will be passed in the other callbacks - in order to reference it there. - """ - @callback child_spec(opts :: Keyword.t()) :: Supervisor.child_spec() - @doc """ Adds `doc` to storage with `doc_id` as the key. Associates the given `field_keys` with `doc_id`. """ @callback put( - storage_process_name :: atom, + pubsub :: atom, doc_id :: term, doc :: %{ initial_phases: Absinthe.Subscription.PipelineSerializer.packed_pipeline(), @@ -61,20 +36,20 @@ defmodule Absinthe.Subscription.DocumentStorage do @doc """ Removes the document. Along with any field_keys associated with it """ - @callback delete(storage_process_name :: atom, doc_id :: term) :: :ok + @callback delete(pubsub :: atom, doc_id :: term) :: :ok @doc """ Get all docs associated with `field_key` """ @callback get_docs_by_field_key( - storage_process_name :: atom, + pubsub :: atom, field_key :: {field :: term, key :: term} ) :: map() @doc false def put(pubsub, doc_id, doc, field_keys) do - {storage_module, storage_process_name} = storage_info(pubsub) + storage_module = Subscription.storage_module(pubsub) :telemetry.span( [:absinthe, :subscription, :storage, :put], @@ -92,7 +67,7 @@ defmodule Absinthe.Subscription.DocumentStorage do source: doc.source } - result = storage_module.put(storage_process_name, doc_id, doc_value, field_keys) + result = storage_module.put(pubsub, doc_id, doc_value, field_keys) {result, %{ @@ -107,7 +82,7 @@ defmodule Absinthe.Subscription.DocumentStorage do @doc false def delete(pubsub, doc_id) do - {storage_module, storage_process_name} = storage_info(pubsub) + storage_module = Subscription.storage_module(pubsub) :telemetry.span( [:absinthe, :subscription, :storage, :delete], @@ -116,7 +91,7 @@ defmodule Absinthe.Subscription.DocumentStorage do storage_module: storage_module }, fn -> - result = storage_module.delete(storage_process_name, doc_id) + result = storage_module.delete(pubsub, doc_id) {result, %{ @@ -129,7 +104,7 @@ defmodule Absinthe.Subscription.DocumentStorage do @doc false def get_docs_by_field_key(pubsub, field_key) do - {storage_module, storage_process_name} = storage_info(pubsub) + storage_module = Subscription.storage_module(pubsub) :telemetry.span( [:absinthe, :subscription, :storage, :get_docs_by_field_key], @@ -139,7 +114,7 @@ defmodule Absinthe.Subscription.DocumentStorage do }, fn -> result = - storage_process_name + pubsub |> storage_module.get_docs_by_field_key(field_key) |> Enum.map(fn {doc_id, %{initial_phases: initial_phases} = doc} -> initial_phases = PipelineSerializer.unpack(initial_phases) @@ -155,10 +130,4 @@ defmodule Absinthe.Subscription.DocumentStorage do end ) end - - defp storage_info(pubsub) do - storage_module = Subscription.document_storage(pubsub) - storage_process_name = Subscription.document_storage_name(pubsub) - {storage_module, storage_process_name} - end end diff --git a/lib/absinthe/subscription/supervisor.ex b/lib/absinthe/subscription/supervisor.ex index 1d85cd7a..d56ae97a 100644 --- a/lib/absinthe/subscription/supervisor.ex +++ b/lib/absinthe/subscription/supervisor.ex @@ -23,44 +23,24 @@ defmodule Absinthe.Subscription.Supervisor do pool_size = Keyword.get(opts, :pool_size, System.schedulers_online() * 2) compress_registry? = Keyword.get(opts, :compress_registry?, true) + storage = Keyword.get(opts, :storage, Absinthe.Subscription.DefaultDocumentStorage) - document_storage = - Keyword.get(opts, :document_storage, Absinthe.Subscription.DefaultDocumentStorage) - - storage_opts = - case document_storage do - Absinthe.Subscription.DefaultDocumentStorage -> - [ - keys: :duplicate, - partitions: System.schedulers_online(), - compressed: compress_registry? - ] - - _ -> - Keyword.get(opts, :storage_opts, Keyword.new()) - end - - Supervisor.start_link( - __MODULE__, - {pubsub, pool_size, document_storage, storage_opts} - ) + Supervisor.start_link(__MODULE__, {pubsub, pool_size, compress_registry?, storage}) end - def init({pubsub, pool_size, document_storage, storage_opts}) do + def init({pubsub, pool_size, compress_registry?, storage}) do registry_name = Absinthe.Subscription.registry_name(pubsub) - meta = [pool_size: pool_size, document_storage: document_storage] - - storage_opts = - Keyword.put(storage_opts, :name, Absinthe.Subscription.document_storage_name(pubsub)) + meta = [pool_size: pool_size, storage: storage] children = [ {Registry, [ - keys: :unique, + keys: :duplicate, name: registry_name, - meta: meta + partitions: System.schedulers_online(), + meta: meta, + compressed: compress_registry? ]}, - document_storage.child_spec(storage_opts), {Absinthe.Subscription.ProxySupervisor, [pubsub, registry_name, pool_size]} ] diff --git a/test/absinthe/subscription/document_storage_test.exs b/test/absinthe/subscription/document_storage_test.exs new file mode 100644 index 00000000..970a909b --- /dev/null +++ b/test/absinthe/subscription/document_storage_test.exs @@ -0,0 +1,373 @@ +defmodule Absinthe.Subscription.DocumentStorageTest do + use Absinthe.Case + + defmodule TestDocumentStorageSchema do + use Absinthe.Schema + + query do + field :foo, :string + end + + object :user do + field :id, :id + field :name, :string + + field :group, :group do + resolve fn user, _, %{context: %{test_pid: pid}} -> + batch({__MODULE__, :batch_get_group, pid}, nil, fn _results -> + {:ok, user.group} + end) + end + end + end + + object :group do + field :name, :string + end + + def batch_get_group(test_pid, _) do + # send a message to the test process every time we access this function. + # if batching is working properly, it should only happen once. + send(test_pid, :batch_get_group) + %{} + end + + subscription do + field :raises, :string do + config fn _, _ -> + {:ok, topic: "*"} + end + + resolve fn _, _, _ -> + raise "boom" + end + end + + field :user, :user do + arg :id, :id + + config fn args, _ -> + {:ok, topic: args[:id] || "*"} + end + + trigger :update_user, + topic: fn user -> + [user.id, "*"] + end + end + + field :thing, :string do + arg :client_id, non_null(:id) + + config fn + _args, %{context: %{authorized: false}} -> + {:error, "unauthorized"} + + args, _ -> + { + :ok, + topic: args.client_id + } + end + end + + field :multiple_topics, :string do + config fn _, _ -> + {:ok, topic: ["topic_1", "topic_2", "topic_3"]} + end + end + + field :other_user, :user do + arg :id, :id + + config fn + args, %{context: %{context_id: context_id, document_id: document_id}} -> + {:ok, topic: args[:id] || "*", context_id: context_id, document_id: document_id} + + args, %{context: %{context_id: context_id}} -> + {:ok, topic: args[:id] || "*", context_id: context_id} + end + end + + field :relies_on_document, :string do + config fn _, %{document: %Absinthe.Blueprint{} = document} -> + %{type: :subscription, name: op_name} = Absinthe.Blueprint.current_operation(document) + {:ok, topic: "*", context_id: "*", document_id: op_name} + end + end + end + + mutation do + field :update_user, :user do + arg :id, non_null(:id) + + resolve fn _, %{id: id}, _ -> + {:ok, %{id: id, name: "foo"}} + end + end + end + end + + defmodule TestDocumentStoragePubSub do + @behaviour Absinthe.Subscription.Pubsub + + def start_link() do + Registry.start_link(keys: :duplicate, name: __MODULE__) + end + + def node_name() do + node() + end + + def subscribe(topic) do + Registry.register(__MODULE__, topic, []) + :ok + end + + def publish_subscription(topic, data) do + message = %{ + topic: topic, + event: "subscription:data", + result: data + } + + Registry.dispatch(__MODULE__, topic, fn entries -> + for {pid, _} <- entries, do: send(pid, {:broadcast, message}) + end) + end + + def publish_mutation(_proxy_topic, _mutation_result, _subscribed_fields) do + # this pubsub is local and doesn't support clusters + :ok + end + end + + defmodule TestDocumentStorage do + @behaviour Absinthe.Subscription.DocumentStorage + use GenServer + + def start_link(_) do + GenServer.start_link(__MODULE__, nil, name: __MODULE__) + end + + @impl GenServer + def init(_) do + {:ok, %{docs: %{}, field_keys: %{}}} + end + + @impl GenServer + def handle_cast({:put, doc_id, doc_value, field_keys}, %{ + docs: docs, + field_keys: field_keys_map + }) do + docs = Map.put_new(docs, doc_id, doc_value) + + field_keys_map = + Enum.reduce(field_keys, field_keys_map, fn field_key, field_keys_map -> + Map.update(field_keys_map, field_key, [doc_id], fn doc_ids -> [doc_id | doc_ids] end) + end) + + {:noreply, %{docs: docs, field_keys: field_keys_map}} + end + + @impl GenServer + def handle_cast({:delete, doc_id}, %{ + docs: docs, + field_keys: field_keys_map + }) do + docs = Map.delete(docs, doc_id) + + field_keys_map = + Enum.map(field_keys_map, fn {field_key, doc_ids} -> + doc_ids = List.delete(doc_ids, doc_id) + {field_key, doc_ids} + end) + |> Map.new() + + {:noreply, %{docs: docs, field_keys: field_keys_map}} + end + + @impl GenServer + def handle_call( + {:get_docs_by_field_key, field_key}, + _from, + %{ + docs: docs, + field_keys: field_keys_map + } = state + ) do + doc_ids = Map.get(field_keys_map, field_key, []) + + docs_to_return = + docs + |> Enum.filter(fn {doc_id, _} -> doc_id in doc_ids end) + |> Map.new() + + {:reply, docs_to_return, state} + end + + @impl Absinthe.Subscription.DocumentStorage + def put(_pubsub, doc_id, doc_value, field_keys) do + GenServer.cast(__MODULE__, {:put, doc_id, doc_value, field_keys}) + :ok + end + + @impl Absinthe.Subscription.DocumentStorage + def delete(_pubsub, doc_id) do + GenServer.cast(__MODULE__, {:delete, doc_id}) + + :ok + end + + @impl Absinthe.Subscription.DocumentStorage + def get_docs_by_field_key(_pubsub, field_key) do + GenServer.call(__MODULE__, {:get_docs_by_field_key, field_key}) + end + end + + def run_subscription(query, schema, opts \\ []) do + opts = + Keyword.update( + opts, + :context, + %{pubsub: TestDocumentStoragePubSub}, + &Map.put(&1, :pubsub, opts[:context][:pubsub] || TestDocumentStoragePubSub) + ) + + case run(query, schema, opts) do + {:ok, %{"subscribed" => topic}} = val -> + opts[:context][:pubsub].subscribe(topic) + val + + val -> + val + end + end + + setup do + start_supervised(TestDocumentStorage) + + start_supervised(%{ + id: TestDocumentStoragePubSub, + start: {TestDocumentStoragePubSub, :start_link, []} + }) + + start_supervised( + {Absinthe.Subscription, pubsub: TestDocumentStoragePubSub, storage: TestDocumentStorage} + ) + + :ok + end + + @query """ + subscription ($clientId: ID!) { + thing(clientId: $clientId) + } + """ + test "can subscribe the current process" do + client_id = "abc" + + assert {:ok, %{"subscribed" => topic}} = + run_subscription( + @query, + TestDocumentStorageSchema, + variables: %{"clientId" => client_id}, + context: %{pubsub: TestDocumentStoragePubSub} + ) + + Absinthe.Subscription.publish(TestDocumentStoragePubSub, "foo", thing: client_id) + + assert_receive({:broadcast, msg}) + + assert %{ + event: "subscription:data", + result: %{data: %{"thing" => "foo"}}, + topic: topic + } == msg + end + + @query """ + subscription ($clientId: ID!) { + thing(clientId: $clientId) + } + """ + test "can unsubscribe the current process" do + client_id = "abc" + + assert {:ok, %{"subscribed" => topic}} = + run_subscription( + @query, + TestDocumentStorageSchema, + variables: %{"clientId" => client_id}, + context: %{pubsub: TestDocumentStoragePubSub} + ) + + Absinthe.Subscription.unsubscribe(TestDocumentStoragePubSub, topic) + + Absinthe.Subscription.publish(TestDocumentStoragePubSub, "foo", thing: client_id) + + refute_receive({:broadcast, _}) + end + + @query """ + subscription { + multipleTopics + } + """ + test "schema can provide multiple topics to subscribe to" do + assert {:ok, %{"subscribed" => topic}} = + run_subscription( + @query, + TestDocumentStorageSchema, + variables: %{}, + context: %{pubsub: TestDocumentStoragePubSub} + ) + + msg = %{ + event: "subscription:data", + result: %{data: %{"multipleTopics" => "foo"}}, + topic: topic + } + + Absinthe.Subscription.publish(TestDocumentStoragePubSub, "foo", multiple_topics: "topic_1") + + assert_receive({:broadcast, ^msg}) + + Absinthe.Subscription.publish(TestDocumentStoragePubSub, "foo", multiple_topics: "topic_2") + + assert_receive({:broadcast, ^msg}) + + Absinthe.Subscription.publish(TestDocumentStoragePubSub, "foo", multiple_topics: "topic_3") + + assert_receive({:broadcast, ^msg}) + end + + @query """ + subscription { + multipleTopics + } + """ + test "unsubscription works when multiple topics are provided" do + assert {:ok, %{"subscribed" => topic}} = + run_subscription( + @query, + TestDocumentStorageSchema, + variables: %{}, + context: %{pubsub: TestDocumentStoragePubSub} + ) + + Absinthe.Subscription.unsubscribe(TestDocumentStoragePubSub, topic) + + Absinthe.Subscription.publish(TestDocumentStoragePubSub, "foo", multiple_topics: "topic_1") + + refute_receive({:broadcast, _}) + + Absinthe.Subscription.publish(TestDocumentStoragePubSub, "foo", multiple_topics: "topic_2") + + refute_receive({:broadcast, _}) + + Absinthe.Subscription.publish(TestDocumentStoragePubSub, "foo", multiple_topics: "topic_3") + + refute_receive({:broadcast, _}) + end +end From a55d6a38b37a4ea977a68c59427ad63cb5f2a3db Mon Sep 17 00:00:00 2001 From: Bryan Joseph Date: Fri, 28 Jun 2024 12:38:09 -0500 Subject: [PATCH 15/15] Update docs --- guides/telemetry.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/guides/telemetry.md b/guides/telemetry.md index 304691ef..fc94dc83 100644 --- a/guides/telemetry.md +++ b/guides/telemetry.md @@ -13,6 +13,12 @@ handler function to any of the following event names: - `[:absinthe, :resolve, :field, :stop]` when field resolution finishes - `[:absinthe, :middleware, :batch, :start]` when the batch processing starts - `[:absinthe, :middleware, :batch, :stop]` when the batch processing finishes +- `[:absinthe, :subscription, :storage, :put, :start]` when document subscription storage starts +- `[:absinthe, :subscription, :storage, :put, :stop]` when document subscription storage finishes +- `[:absinthe, :subscription, :storage, :delete, :start]` when document subscription storage deletion starts +- `[:absinthe, :subscription, :storage, :delete, :stop]` when document subscription storage deletion finishes +- `[:absinthe, :subscription, :storage, :get_docs_by_field_key, :start]` when document subscription storage retrieval starts +- `[:absinthe, :subscription, :storage, :get_docs_by_field_key, :stop]` when document subscription storage retrieval finishes Telemetry handlers are called with `measurements` and `metadata`. For details on what is passed, checkout `Absinthe.Phase.Telemetry`, `Absinthe.Middleware.Telemetry`,