diff --git a/guides/telemetry.md b/guides/telemetry.md index 304691ef..fc94dc83 100644 --- a/guides/telemetry.md +++ b/guides/telemetry.md @@ -13,6 +13,12 @@ handler function to any of the following event names: - `[:absinthe, :resolve, :field, :stop]` when field resolution finishes - `[:absinthe, :middleware, :batch, :start]` when the batch processing starts - `[:absinthe, :middleware, :batch, :stop]` when the batch processing finishes +- `[:absinthe, :subscription, :storage, :put, :start]` when document subscription storage starts +- `[:absinthe, :subscription, :storage, :put, :stop]` when document subscription storage finishes +- `[:absinthe, :subscription, :storage, :delete, :start]` when document subscription storage deletion starts +- `[:absinthe, :subscription, :storage, :delete, :stop]` when document subscription storage deletion finishes +- `[:absinthe, :subscription, :storage, :get_docs_by_field_key, :start]` when document subscription storage retrieval starts +- `[:absinthe, :subscription, :storage, :get_docs_by_field_key, :stop]` when document subscription storage retrieval finishes Telemetry handlers are called with `measurements` and `metadata`. For details on what is passed, checkout `Absinthe.Phase.Telemetry`, `Absinthe.Middleware.Telemetry`, diff --git a/lib/absinthe/subscription.ex b/lib/absinthe/subscription.ex index fb3c8928..c360a5d6 100644 --- a/lib/absinthe/subscription.ex +++ b/lib/absinthe/subscription.ex @@ -30,7 +30,7 @@ defmodule Absinthe.Subscription do alias __MODULE__ - alias Absinthe.Subscription.PipelineSerializer + alias Absinthe.Subscription.DocumentStorage @doc """ Add Absinthe.Subscription to your process tree. @@ -141,68 +141,17 @@ defmodule Absinthe.Subscription do @doc false def subscribe(pubsub, field_keys, doc_id, doc) do - field_keys = List.wrap(field_keys) - - registry = pubsub |> registry_name - - doc_value = %{ - initial_phases: PipelineSerializer.pack(doc.initial_phases), - source: doc.source - } - - pdict_add_fields(doc_id, field_keys) - - for field_key <- field_keys do - {:ok, _} = Registry.register(registry, field_key, doc_id) - end - - {:ok, _} = Registry.register(registry, doc_id, doc_value) - end - - defp pdict_fields(doc_id) do - Process.get({__MODULE__, doc_id}, []) - end - - defp pdict_add_fields(doc_id, field_keys) do - Process.put({__MODULE__, doc_id}, field_keys ++ pdict_fields(doc_id)) - end - - defp pdict_delete_fields(doc_id) do - Process.delete({__MODULE__, doc_id}) + DocumentStorage.put(pubsub, doc_id, doc, field_keys) end @doc false def unsubscribe(pubsub, doc_id) do - registry = pubsub |> registry_name - - for field_key <- pdict_fields(doc_id) do - Registry.unregister(registry, field_key) - end - - Registry.unregister(registry, doc_id) - - pdict_delete_fields(doc_id) - :ok + DocumentStorage.delete(pubsub, doc_id) end @doc false def get(pubsub, key) do - name = registry_name(pubsub) - - name - |> Registry.lookup(key) - |> MapSet.new(fn {_pid, doc_id} -> doc_id end) - |> Enum.reduce(%{}, fn doc_id, acc -> - case Registry.lookup(name, doc_id) do - [] -> - acc - - [{_pid, doc} | _rest] -> - Map.put_new_lazy(acc, doc_id, fn -> - Map.update!(doc, :initial_phases, &PipelineSerializer.unpack/1) - end) - end - end) + DocumentStorage.get_docs_by_field_key(pubsub, key) end @doc false @@ -210,6 +159,15 @@ defmodule Absinthe.Subscription do Module.concat([pubsub, :Registry]) end + def storage_module(pubsub) do + {:ok, storage} = + pubsub + |> registry_name + |> Registry.meta(:storage) + + storage + end + @doc false def publish_remote(pubsub, mutation_result, subscribed_fields) do {:ok, pool_size} = diff --git a/lib/absinthe/subscription/default_document_storage.ex b/lib/absinthe/subscription/default_document_storage.ex new file mode 100644 index 00000000..ee48af54 --- /dev/null +++ b/lib/absinthe/subscription/default_document_storage.ex @@ -0,0 +1,67 @@ +defmodule Absinthe.Subscription.DefaultDocumentStorage do + @behaviour Absinthe.Subscription.DocumentStorage + @moduledoc """ + Default document storage for Absinthe. Stores subscription + documents and field keys in a Registry. + """ + + alias Absinthe.Subscription + + @impl Absinthe.Subscription.DocumentStorage + def put(pubsub, doc_id, doc_value, field_keys) do + registry = Subscription.registry_name(pubsub) + + pdict_add_fields(doc_id, field_keys) + + for field_key <- field_keys do + {:ok, _} = Registry.register(registry, field_key, doc_id) + end + + {:ok, _} = Registry.register(registry, doc_id, doc_value) + end + + @impl Absinthe.Subscription.DocumentStorage + def delete(pubsub, doc_id) do + registry = Subscription.registry_name(pubsub) + + for field_key <- pdict_fields(doc_id) do + Registry.unregister(registry, field_key) + end + + pdict_delete_fields(doc_id) + + Registry.unregister(registry, doc_id) + + :ok + end + + @impl Absinthe.Subscription.DocumentStorage + def get_docs_by_field_key(pubsub, field_key) do + registry = Subscription.registry_name(pubsub) + + registry + |> Registry.lookup(field_key) + |> MapSet.new(fn {_pid, doc_id} -> doc_id end) + |> Enum.reduce(%{}, fn doc_id, acc -> + case Registry.lookup(registry, doc_id) do + [] -> + acc + + [{_pid, doc} | _rest] -> + Map.put_new(acc, doc_id, doc) + end + end) + end + + defp pdict_fields(doc_id) do + Process.get({__MODULE__, doc_id}, []) + end + + defp pdict_add_fields(doc_id, field_keys) do + Process.put({__MODULE__, doc_id}, field_keys ++ pdict_fields(doc_id)) + end + + defp pdict_delete_fields(doc_id) do + Process.delete({__MODULE__, doc_id}) + end +end diff --git a/lib/absinthe/subscription/document_storage.ex b/lib/absinthe/subscription/document_storage.ex new file mode 100644 index 00000000..7f20f7b1 --- /dev/null +++ b/lib/absinthe/subscription/document_storage.ex @@ -0,0 +1,133 @@ +defmodule Absinthe.Subscription.DocumentStorage do + @moduledoc """ + Behaviour for storing subscription documents. Used to tell + Absinthe how to store documents and the field keys associated with those + documents. + + By default, Absinthe uses `Absinthe.Subscription.DefaultDocumentStorage` as + the storage for subscription documents. This behaviour can be implemented to + allow for a custom storage solution if needed. + + When starting `Absinthe.Subscription`, include `storage`. Defaults to `Absinthe.Subscription.DefaultDocumentStorage` + + ```elixir + {Absinthe.Subscription, pubsub: MyApp.Pubsub, storage: MyApp.DocumentStorage} + ``` + """ + + alias Absinthe.Subscription + alias Absinthe.Subscription.PipelineSerializer + + @doc """ + Adds `doc` to storage with `doc_id` as the key. Associates the given + `field_keys` with `doc_id`. + """ + @callback put( + pubsub :: atom, + doc_id :: term, + doc :: %{ + initial_phases: Absinthe.Subscription.PipelineSerializer.packed_pipeline(), + source: binary() + }, + field_keys :: [{field :: term, key :: term}] + ) :: + {:ok, term} | {:error, :reason} + + @doc """ + Removes the document. Along with any field_keys associated with it + """ + @callback delete(pubsub :: atom, doc_id :: term) :: :ok + + @doc """ + Get all docs associated with `field_key` + """ + @callback get_docs_by_field_key( + pubsub :: atom, + field_key :: {field :: term, key :: term} + ) :: + map() + + @doc false + def put(pubsub, doc_id, doc, field_keys) do + storage_module = Subscription.storage_module(pubsub) + + :telemetry.span( + [:absinthe, :subscription, :storage, :put], + %{ + doc_id: doc_id, + doc: doc, + field_keys: field_keys, + storage_module: storage_module + }, + fn -> + field_keys = List.wrap(field_keys) + + doc_value = %{ + initial_phases: PipelineSerializer.pack(doc.initial_phases), + source: doc.source + } + + result = storage_module.put(pubsub, doc_id, doc_value, field_keys) + + {result, + %{ + doc_id: doc_id, + doc: doc, + field_keys: field_keys, + storage_module: storage_module + }} + end + ) + end + + @doc false + def delete(pubsub, doc_id) do + storage_module = Subscription.storage_module(pubsub) + + :telemetry.span( + [:absinthe, :subscription, :storage, :delete], + %{ + doc_id: doc_id, + storage_module: storage_module + }, + fn -> + result = storage_module.delete(pubsub, doc_id) + + {result, + %{ + doc_id: doc_id, + storage_module: storage_module + }} + end + ) + end + + @doc false + def get_docs_by_field_key(pubsub, field_key) do + storage_module = Subscription.storage_module(pubsub) + + :telemetry.span( + [:absinthe, :subscription, :storage, :get_docs_by_field_key], + %{ + field_key: field_key, + storage_module: storage_module + }, + fn -> + result = + pubsub + |> storage_module.get_docs_by_field_key(field_key) + |> Enum.map(fn {doc_id, %{initial_phases: initial_phases} = doc} -> + initial_phases = PipelineSerializer.unpack(initial_phases) + {doc_id, Map.put(doc, :initial_phases, initial_phases)} + end) + |> Map.new() + + {result, + %{ + field_key: field_key, + storage_module: storage_module + }} + end + ) + end +end diff --git a/lib/absinthe/subscription/supervisor.ex b/lib/absinthe/subscription/supervisor.ex index 7b63ae4a..d56ae97a 100644 --- a/lib/absinthe/subscription/supervisor.ex +++ b/lib/absinthe/subscription/supervisor.ex @@ -23,13 +23,14 @@ defmodule Absinthe.Subscription.Supervisor do pool_size = Keyword.get(opts, :pool_size, System.schedulers_online() * 2) compress_registry? = Keyword.get(opts, :compress_registry?, true) + storage = Keyword.get(opts, :storage, Absinthe.Subscription.DefaultDocumentStorage) - Supervisor.start_link(__MODULE__, {pubsub, pool_size, compress_registry?}) + Supervisor.start_link(__MODULE__, {pubsub, pool_size, compress_registry?, storage}) end - def init({pubsub, pool_size, compress_registry?}) do + def init({pubsub, pool_size, compress_registry?, storage}) do registry_name = Absinthe.Subscription.registry_name(pubsub) - meta = [pool_size: pool_size] + meta = [pool_size: pool_size, storage: storage] children = [ {Registry, diff --git a/mix.exs b/mix.exs index 61d5c58b..0b1bc693 100644 --- a/mix.exs +++ b/mix.exs @@ -184,7 +184,8 @@ defmodule Absinthe.Mixfile do Absinthe.Subscription, Absinthe.Subscription.Pubsub, Absinthe.Subscription.Local, - Absinthe.Subscription.PipelineSerializer + Absinthe.Subscription.PipelineSerializer, + Absinthe.Subscription.DocumentStorage ], Extensibility: [ Absinthe.Pipeline, diff --git a/test/absinthe/subscription/document_storage_test.exs b/test/absinthe/subscription/document_storage_test.exs new file mode 100644 index 00000000..970a909b --- /dev/null +++ b/test/absinthe/subscription/document_storage_test.exs @@ -0,0 +1,373 @@ +defmodule Absinthe.Subscription.DocumentStorageTest do + use Absinthe.Case + + defmodule TestDocumentStorageSchema do + use Absinthe.Schema + + query do + field :foo, :string + end + + object :user do + field :id, :id + field :name, :string + + field :group, :group do + resolve fn user, _, %{context: %{test_pid: pid}} -> + batch({__MODULE__, :batch_get_group, pid}, nil, fn _results -> + {:ok, user.group} + end) + end + end + end + + object :group do + field :name, :string + end + + def batch_get_group(test_pid, _) do + # send a message to the test process every time we access this function. + # if batching is working properly, it should only happen once. + send(test_pid, :batch_get_group) + %{} + end + + subscription do + field :raises, :string do + config fn _, _ -> + {:ok, topic: "*"} + end + + resolve fn _, _, _ -> + raise "boom" + end + end + + field :user, :user do + arg :id, :id + + config fn args, _ -> + {:ok, topic: args[:id] || "*"} + end + + trigger :update_user, + topic: fn user -> + [user.id, "*"] + end + end + + field :thing, :string do + arg :client_id, non_null(:id) + + config fn + _args, %{context: %{authorized: false}} -> + {:error, "unauthorized"} + + args, _ -> + { + :ok, + topic: args.client_id + } + end + end + + field :multiple_topics, :string do + config fn _, _ -> + {:ok, topic: ["topic_1", "topic_2", "topic_3"]} + end + end + + field :other_user, :user do + arg :id, :id + + config fn + args, %{context: %{context_id: context_id, document_id: document_id}} -> + {:ok, topic: args[:id] || "*", context_id: context_id, document_id: document_id} + + args, %{context: %{context_id: context_id}} -> + {:ok, topic: args[:id] || "*", context_id: context_id} + end + end + + field :relies_on_document, :string do + config fn _, %{document: %Absinthe.Blueprint{} = document} -> + %{type: :subscription, name: op_name} = Absinthe.Blueprint.current_operation(document) + {:ok, topic: "*", context_id: "*", document_id: op_name} + end + end + end + + mutation do + field :update_user, :user do + arg :id, non_null(:id) + + resolve fn _, %{id: id}, _ -> + {:ok, %{id: id, name: "foo"}} + end + end + end + end + + defmodule TestDocumentStoragePubSub do + @behaviour Absinthe.Subscription.Pubsub + + def start_link() do + Registry.start_link(keys: :duplicate, name: __MODULE__) + end + + def node_name() do + node() + end + + def subscribe(topic) do + Registry.register(__MODULE__, topic, []) + :ok + end + + def publish_subscription(topic, data) do + message = %{ + topic: topic, + event: "subscription:data", + result: data + } + + Registry.dispatch(__MODULE__, topic, fn entries -> + for {pid, _} <- entries, do: send(pid, {:broadcast, message}) + end) + end + + def publish_mutation(_proxy_topic, _mutation_result, _subscribed_fields) do + # this pubsub is local and doesn't support clusters + :ok + end + end + + defmodule TestDocumentStorage do + @behaviour Absinthe.Subscription.DocumentStorage + use GenServer + + def start_link(_) do + GenServer.start_link(__MODULE__, nil, name: __MODULE__) + end + + @impl GenServer + def init(_) do + {:ok, %{docs: %{}, field_keys: %{}}} + end + + @impl GenServer + def handle_cast({:put, doc_id, doc_value, field_keys}, %{ + docs: docs, + field_keys: field_keys_map + }) do + docs = Map.put_new(docs, doc_id, doc_value) + + field_keys_map = + Enum.reduce(field_keys, field_keys_map, fn field_key, field_keys_map -> + Map.update(field_keys_map, field_key, [doc_id], fn doc_ids -> [doc_id | doc_ids] end) + end) + + {:noreply, %{docs: docs, field_keys: field_keys_map}} + end + + @impl GenServer + def handle_cast({:delete, doc_id}, %{ + docs: docs, + field_keys: field_keys_map + }) do + docs = Map.delete(docs, doc_id) + + field_keys_map = + Enum.map(field_keys_map, fn {field_key, doc_ids} -> + doc_ids = List.delete(doc_ids, doc_id) + {field_key, doc_ids} + end) + |> Map.new() + + {:noreply, %{docs: docs, field_keys: field_keys_map}} + end + + @impl GenServer + def handle_call( + {:get_docs_by_field_key, field_key}, + _from, + %{ + docs: docs, + field_keys: field_keys_map + } = state + ) do + doc_ids = Map.get(field_keys_map, field_key, []) + + docs_to_return = + docs + |> Enum.filter(fn {doc_id, _} -> doc_id in doc_ids end) + |> Map.new() + + {:reply, docs_to_return, state} + end + + @impl Absinthe.Subscription.DocumentStorage + def put(_pubsub, doc_id, doc_value, field_keys) do + GenServer.cast(__MODULE__, {:put, doc_id, doc_value, field_keys}) + :ok + end + + @impl Absinthe.Subscription.DocumentStorage + def delete(_pubsub, doc_id) do + GenServer.cast(__MODULE__, {:delete, doc_id}) + + :ok + end + + @impl Absinthe.Subscription.DocumentStorage + def get_docs_by_field_key(_pubsub, field_key) do + GenServer.call(__MODULE__, {:get_docs_by_field_key, field_key}) + end + end + + def run_subscription(query, schema, opts \\ []) do + opts = + Keyword.update( + opts, + :context, + %{pubsub: TestDocumentStoragePubSub}, + &Map.put(&1, :pubsub, opts[:context][:pubsub] || TestDocumentStoragePubSub) + ) + + case run(query, schema, opts) do + {:ok, %{"subscribed" => topic}} = val -> + opts[:context][:pubsub].subscribe(topic) + val + + val -> + val + end + end + + setup do + start_supervised(TestDocumentStorage) + + start_supervised(%{ + id: TestDocumentStoragePubSub, + start: {TestDocumentStoragePubSub, :start_link, []} + }) + + start_supervised( + {Absinthe.Subscription, pubsub: TestDocumentStoragePubSub, storage: TestDocumentStorage} + ) + + :ok + end + + @query """ + subscription ($clientId: ID!) { + thing(clientId: $clientId) + } + """ + test "can subscribe the current process" do + client_id = "abc" + + assert {:ok, %{"subscribed" => topic}} = + run_subscription( + @query, + TestDocumentStorageSchema, + variables: %{"clientId" => client_id}, + context: %{pubsub: TestDocumentStoragePubSub} + ) + + Absinthe.Subscription.publish(TestDocumentStoragePubSub, "foo", thing: client_id) + + assert_receive({:broadcast, msg}) + + assert %{ + event: "subscription:data", + result: %{data: %{"thing" => "foo"}}, + topic: topic + } == msg + end + + @query """ + subscription ($clientId: ID!) { + thing(clientId: $clientId) + } + """ + test "can unsubscribe the current process" do + client_id = "abc" + + assert {:ok, %{"subscribed" => topic}} = + run_subscription( + @query, + TestDocumentStorageSchema, + variables: %{"clientId" => client_id}, + context: %{pubsub: TestDocumentStoragePubSub} + ) + + Absinthe.Subscription.unsubscribe(TestDocumentStoragePubSub, topic) + + Absinthe.Subscription.publish(TestDocumentStoragePubSub, "foo", thing: client_id) + + refute_receive({:broadcast, _}) + end + + @query """ + subscription { + multipleTopics + } + """ + test "schema can provide multiple topics to subscribe to" do + assert {:ok, %{"subscribed" => topic}} = + run_subscription( + @query, + TestDocumentStorageSchema, + variables: %{}, + context: %{pubsub: TestDocumentStoragePubSub} + ) + + msg = %{ + event: "subscription:data", + result: %{data: %{"multipleTopics" => "foo"}}, + topic: topic + } + + Absinthe.Subscription.publish(TestDocumentStoragePubSub, "foo", multiple_topics: "topic_1") + + assert_receive({:broadcast, ^msg}) + + Absinthe.Subscription.publish(TestDocumentStoragePubSub, "foo", multiple_topics: "topic_2") + + assert_receive({:broadcast, ^msg}) + + Absinthe.Subscription.publish(TestDocumentStoragePubSub, "foo", multiple_topics: "topic_3") + + assert_receive({:broadcast, ^msg}) + end + + @query """ + subscription { + multipleTopics + } + """ + test "unsubscription works when multiple topics are provided" do + assert {:ok, %{"subscribed" => topic}} = + run_subscription( + @query, + TestDocumentStorageSchema, + variables: %{}, + context: %{pubsub: TestDocumentStoragePubSub} + ) + + Absinthe.Subscription.unsubscribe(TestDocumentStoragePubSub, topic) + + Absinthe.Subscription.publish(TestDocumentStoragePubSub, "foo", multiple_topics: "topic_1") + + refute_receive({:broadcast, _}) + + Absinthe.Subscription.publish(TestDocumentStoragePubSub, "foo", multiple_topics: "topic_2") + + refute_receive({:broadcast, _}) + + Absinthe.Subscription.publish(TestDocumentStoragePubSub, "foo", multiple_topics: "topic_3") + + refute_receive({:broadcast, _}) + end +end