Skip to content
Open
6 changes: 6 additions & 0 deletions guides/telemetry.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ handler function to any of the following event names:
- `[:absinthe, :resolve, :field, :stop]` when field resolution finishes
- `[:absinthe, :middleware, :batch, :start]` when the batch processing starts
- `[:absinthe, :middleware, :batch, :stop]` when the batch processing finishes
- `[:absinthe, :subscription, :storage, :put, :start]` when document subscription storage starts
- `[:absinthe, :subscription, :storage, :put, :stop]` when document subscription storage finishes
- `[:absinthe, :subscription, :storage, :delete, :start]` when document subscription storage deletion starts
- `[:absinthe, :subscription, :storage, :delete, :stop]` when document subscription storage deletion finishes
- `[:absinthe, :subscription, :storage, :get_docs_by_field_key, :start]` when document subscription storage retrieval starts
- `[:absinthe, :subscription, :storage, :get_docs_by_field_key, :stop]` when document subscription storage retrieval finishes

Telemetry handlers are called with `measurements` and `metadata`. For details on
what is passed, checkout `Absinthe.Phase.Telemetry`, `Absinthe.Middleware.Telemetry`,
Expand Down
68 changes: 13 additions & 55 deletions lib/absinthe/subscription.ex
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ defmodule Absinthe.Subscription do

alias __MODULE__

alias Absinthe.Subscription.PipelineSerializer
alias Absinthe.Subscription.DocumentStorage

@doc """
Add Absinthe.Subscription to your process tree.
Expand Down Expand Up @@ -141,75 +141,33 @@ defmodule Absinthe.Subscription do

@doc false
def subscribe(pubsub, field_keys, doc_id, doc) do
field_keys = List.wrap(field_keys)

registry = pubsub |> registry_name

doc_value = %{
initial_phases: PipelineSerializer.pack(doc.initial_phases),
source: doc.source
}

pdict_add_fields(doc_id, field_keys)

for field_key <- field_keys do
{:ok, _} = Registry.register(registry, field_key, doc_id)
end

{:ok, _} = Registry.register(registry, doc_id, doc_value)
end

defp pdict_fields(doc_id) do
Process.get({__MODULE__, doc_id}, [])
end

defp pdict_add_fields(doc_id, field_keys) do
Process.put({__MODULE__, doc_id}, field_keys ++ pdict_fields(doc_id))
end

defp pdict_delete_fields(doc_id) do
Process.delete({__MODULE__, doc_id})
DocumentStorage.put(pubsub, doc_id, doc, field_keys)
end

@doc false
def unsubscribe(pubsub, doc_id) do
registry = pubsub |> registry_name

for field_key <- pdict_fields(doc_id) do
Registry.unregister(registry, field_key)
end

Registry.unregister(registry, doc_id)

pdict_delete_fields(doc_id)
:ok
DocumentStorage.delete(pubsub, doc_id)
end

@doc false
def get(pubsub, key) do
name = registry_name(pubsub)

name
|> Registry.lookup(key)
|> MapSet.new(fn {_pid, doc_id} -> doc_id end)
|> Enum.reduce(%{}, fn doc_id, acc ->
case Registry.lookup(name, doc_id) do
[] ->
acc

[{_pid, doc} | _rest] ->
Map.put_new_lazy(acc, doc_id, fn ->
Map.update!(doc, :initial_phases, &PipelineSerializer.unpack/1)
end)
end
end)
DocumentStorage.get_docs_by_field_key(pubsub, key)
end

@doc false
def registry_name(pubsub) do
Module.concat([pubsub, :Registry])
end

def storage_module(pubsub) do
{:ok, storage} =
pubsub
|> registry_name
|> Registry.meta(:storage)

storage
end

@doc false
def publish_remote(pubsub, mutation_result, subscribed_fields) do
{:ok, pool_size} =
Expand Down
67 changes: 67 additions & 0 deletions lib/absinthe/subscription/default_document_storage.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
defmodule Absinthe.Subscription.DefaultDocumentStorage do
@behaviour Absinthe.Subscription.DocumentStorage
@moduledoc """
Default document storage for Absinthe. Stores subscription
documents and field keys in a Registry.
"""

alias Absinthe.Subscription

@impl Absinthe.Subscription.DocumentStorage
def put(pubsub, doc_id, doc_value, field_keys) do
registry = Subscription.registry_name(pubsub)

pdict_add_fields(doc_id, field_keys)

for field_key <- field_keys do
{:ok, _} = Registry.register(registry, field_key, doc_id)
end

{:ok, _} = Registry.register(registry, doc_id, doc_value)
end

@impl Absinthe.Subscription.DocumentStorage
def delete(pubsub, doc_id) do
registry = Subscription.registry_name(pubsub)

for field_key <- pdict_fields(doc_id) do
Registry.unregister(registry, field_key)
end

pdict_delete_fields(doc_id)

Registry.unregister(registry, doc_id)

:ok
end

@impl Absinthe.Subscription.DocumentStorage
def get_docs_by_field_key(pubsub, field_key) do
registry = Subscription.registry_name(pubsub)

registry
|> Registry.lookup(field_key)
|> MapSet.new(fn {_pid, doc_id} -> doc_id end)
|> Enum.reduce(%{}, fn doc_id, acc ->
case Registry.lookup(registry, doc_id) do
[] ->
acc

[{_pid, doc} | _rest] ->
Map.put_new(acc, doc_id, doc)
end
end)
end

defp pdict_fields(doc_id) do
Process.get({__MODULE__, doc_id}, [])
end

defp pdict_add_fields(doc_id, field_keys) do
Process.put({__MODULE__, doc_id}, field_keys ++ pdict_fields(doc_id))
end

defp pdict_delete_fields(doc_id) do
Process.delete({__MODULE__, doc_id})
end
end
133 changes: 133 additions & 0 deletions lib/absinthe/subscription/document_storage.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
defmodule Absinthe.Subscription.DocumentStorage do
@moduledoc """
Behaviour for storing subscription documents. Used to tell
Absinthe how to store documents and the field keys associated with those
documents.

By default, Absinthe uses `Absinthe.Subscription.DefaultDocumentStorage` as
the storage for subscription documents. This behaviour can be implemented to
allow for a custom storage solution if needed.

When starting `Absinthe.Subscription`, include `storage`. Defaults to `Absinthe.Subscription.DefaultDocumentStorage`

```elixir
{Absinthe.Subscription, pubsub: MyApp.Pubsub, storage: MyApp.DocumentStorage}
```
"""

alias Absinthe.Subscription
alias Absinthe.Subscription.PipelineSerializer

@doc """
Adds `doc` to storage with `doc_id` as the key. Associates the given
`field_keys` with `doc_id`.
"""
@callback put(
pubsub :: atom,
doc_id :: term,
doc :: %{
initial_phases: Absinthe.Subscription.PipelineSerializer.packed_pipeline(),
source: binary()
},
field_keys :: [{field :: term, key :: term}]
) ::
{:ok, term} | {:error, :reason}

@doc """
Removes the document. Along with any field_keys associated with it
"""
@callback delete(pubsub :: atom, doc_id :: term) :: :ok

@doc """
Get all docs associated with `field_key`
"""
@callback get_docs_by_field_key(
pubsub :: atom,
field_key :: {field :: term, key :: term}
) ::
map()

@doc false
def put(pubsub, doc_id, doc, field_keys) do
storage_module = Subscription.storage_module(pubsub)

:telemetry.span(
[:absinthe, :subscription, :storage, :put],
%{
doc_id: doc_id,
doc: doc,
field_keys: field_keys,
storage_module: storage_module
},
fn ->
field_keys = List.wrap(field_keys)

doc_value = %{
initial_phases: PipelineSerializer.pack(doc.initial_phases),
source: doc.source
}

result = storage_module.put(pubsub, doc_id, doc_value, field_keys)

{result,
%{
doc_id: doc_id,
doc: doc,
field_keys: field_keys,
storage_module: storage_module
}}
end
)
end

@doc false
def delete(pubsub, doc_id) do
storage_module = Subscription.storage_module(pubsub)

:telemetry.span(
[:absinthe, :subscription, :storage, :delete],
%{
doc_id: doc_id,
storage_module: storage_module
},
fn ->
result = storage_module.delete(pubsub, doc_id)

{result,
%{
doc_id: doc_id,
storage_module: storage_module
}}
end
)
end

@doc false
def get_docs_by_field_key(pubsub, field_key) do
storage_module = Subscription.storage_module(pubsub)

:telemetry.span(
[:absinthe, :subscription, :storage, :get_docs_by_field_key],
%{
field_key: field_key,
storage_module: storage_module
},
fn ->
result =
pubsub
|> storage_module.get_docs_by_field_key(field_key)
|> Enum.map(fn {doc_id, %{initial_phases: initial_phases} = doc} ->
initial_phases = PipelineSerializer.unpack(initial_phases)
{doc_id, Map.put(doc, :initial_phases, initial_phases)}
end)
|> Map.new()

{result,
%{
field_key: field_key,
storage_module: storage_module
}}
end
)
end
end
7 changes: 4 additions & 3 deletions lib/absinthe/subscription/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@ defmodule Absinthe.Subscription.Supervisor do

pool_size = Keyword.get(opts, :pool_size, System.schedulers_online() * 2)
compress_registry? = Keyword.get(opts, :compress_registry?, true)
storage = Keyword.get(opts, :storage, Absinthe.Subscription.DefaultDocumentStorage)

Supervisor.start_link(__MODULE__, {pubsub, pool_size, compress_registry?})
Supervisor.start_link(__MODULE__, {pubsub, pool_size, compress_registry?, storage})
end

def init({pubsub, pool_size, compress_registry?}) do
def init({pubsub, pool_size, compress_registry?, storage}) do
registry_name = Absinthe.Subscription.registry_name(pubsub)
meta = [pool_size: pool_size]
meta = [pool_size: pool_size, storage: storage]

children = [
{Registry,
Expand Down
3 changes: 2 additions & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,8 @@ defmodule Absinthe.Mixfile do
Absinthe.Subscription,
Absinthe.Subscription.Pubsub,
Absinthe.Subscription.Local,
Absinthe.Subscription.PipelineSerializer
Absinthe.Subscription.PipelineSerializer,
Absinthe.Subscription.DocumentStorage
],
Extensibility: [
Absinthe.Pipeline,
Expand Down
Loading