@@ -109,8 +109,7 @@ defmodule Registry do
109109
110110 In this example, we will also set the number of partitions to the number of
111111 schedulers online, which will make the registry more performant on highly
112- concurrent environments as each partition will spawn a new process, allowing
113- dispatching to happen in parallel:
112+ concurrent environments:
114113
115114 {:ok, _} = Registry.start_link(keys: :duplicate, name: Registry.PubSubTest,
116115 partitions: System.schedulers_online)
@@ -362,17 +361,16 @@ defmodule Registry do
362361 associated to the pid. If there are no entries for the given key,
363362 the callback is never invoked.
364363
365- If the registry is not partitioned, the callback is invoked in the process
366- that calls `dispatch/3`. If the registry is partitioned, the callback is
367- invoked concurrently per partition by starting a task linked to the
368- caller. The callback, however, is only invoked if there are entries for that
369- partition.
364+ If the registry is partitioned, the callback is invoked multiple times
365+ per partition. If the registry is partitioned and `parallel: true` is
366+ given as an option, the dispatching happens in parallel. In both cases,
367+ the callback is only invoked if there are entries for that partition.
370368
371369 See the module documentation for examples of using the `dispatch/3`
372370 function for building custom dispatching or a pubsub system.
373371 """
374- @ spec dispatch ( registry , key , ( entries :: [ { pid , value } ] -> term ) ) :: :ok
375- def dispatch ( registry , key , mfa_or_fun )
372+ @ spec dispatch ( registry , key , ( entries :: [ { pid , value } ] -> term ) , keyword ) :: :ok
373+ def dispatch ( registry , key , mfa_or_fun , opts \\ [ ] )
376374 when is_atom ( registry ) and is_function ( mfa_or_fun , 1 )
377375 when is_atom ( registry ) and tuple_size ( mfa_or_fun ) == 3 do
378376 case key_info! ( registry ) do
@@ -386,17 +384,33 @@ defmodule Registry do
386384 |> safe_lookup_second ( key )
387385 |> apply_non_empty_to_mfa_or_fun ( mfa_or_fun )
388386 { :duplicate , partitions , _ } ->
389- registry
390- |> dispatch_task ( key , mfa_or_fun , partitions )
391- |> Enum . each ( & Task . await ( & 1 , :infinity ) )
387+ if Keyword . get ( opts , :parallel , false ) do
388+ registry
389+ |> dispatch_parallel ( key , mfa_or_fun , partitions )
390+ |> Enum . each ( & Task . await ( & 1 , :infinity ) )
391+ else
392+ dispatch_serial ( registry , key , mfa_or_fun , partitions )
393+ end
392394 end
393395 :ok
394396 end
395397
396- defp dispatch_task ( _registry , _key , _mfa_or_fun , 0 ) do
398+ defp dispatch_serial ( _registry , _key , _mfa_or_fun , 0 ) do
399+ :ok
400+ end
401+ defp dispatch_serial ( registry , key , mfa_or_fun , partition ) do
402+ partition = partition - 1
403+ registry
404+ |> key_ets! ( partition )
405+ |> safe_lookup_second ( key )
406+ |> apply_non_empty_to_mfa_or_fun ( mfa_or_fun )
407+ dispatch_serial ( registry , key , mfa_or_fun , partition )
408+ end
409+
410+ defp dispatch_parallel ( _registry , _key , _mfa_or_fun , 0 ) do
397411 [ ]
398412 end
399- defp dispatch_task ( registry , key , mfa_or_fun , partition ) do
413+ defp dispatch_parallel ( registry , key , mfa_or_fun , partition ) do
400414 partition = partition - 1
401415 parent = self ( )
402416 task = Task . async ( fn ->
@@ -407,7 +421,7 @@ defmodule Registry do
407421 Process . unlink ( parent )
408422 :ok
409423 end )
410- [ task | dispatch_task ( registry , key , mfa_or_fun , partition ) ]
424+ [ task | dispatch_parallel ( registry , key , mfa_or_fun , partition ) ]
411425 end
412426
413427 defp apply_non_empty_to_mfa_or_fun ( [ ] , _mfa_or_fun ) do
0 commit comments