@@ -174,8 +174,9 @@ defmodule GenEvent do
174174 * `:id` - the event stream id for cancellation
175175 * `:timeout` - the timeout in between events, defaults to `:infinity`
176176 * `:duration` - the duration of the subscription, defaults to `:infinity`
177+ * `:mode` - if the subscription mode is sync or async, defaults to `:sync`
177178 """
178- defstruct manager: nil , id: nil , timeout: :infinity , duration: :infinity
179+ defstruct manager: nil , id: nil , timeout: :infinity , duration: :infinity , mode: :sync
179180
180181 @ doc false
181182 defmacro __using__ ( _ ) do
@@ -267,18 +268,24 @@ defmodule GenEvent do
267268 protocol. The supported options are:
268269
269270 * `:id` - an id to identify all live stream instances; when an `:id` is
270- given, existing streams can be called with via `cancel_streams`
271+ given, existing streams can be called with via `cancel_streams`.
271272
272- * `:timeout` (Enumerable) - raises if no event arrives in X milliseconds
273+ * `:timeout` (Enumerable) - raises if no event arrives in X milliseconds.
273274
274275 * `:duration` (Enumerable) - only consume events during the X milliseconds
275- from the streaming start
276+ from the streaming start.
277+
278+ * `:mode` - the mode to consume events, can be `:sync` (default) or
279+ `:async`. On sync, the event manager waits for the event to be consumed
280+ before moving on to the next event handler.
281+
276282 """
277283 def stream ( manager , options \\ [ ] ) do
278284 % GenEvent { manager: manager ,
279285 id: Keyword . get ( options , :id ) ,
280286 timeout: Keyword . get ( options , :timeout , :infinity ) ,
281- duration: Keyword . get ( options , :duration , :infinity ) }
287+ duration: Keyword . get ( options , :duration , :infinity ) ,
288+ mode: Keyword . get ( options , :mode , :sync ) }
282289 end
283290
284291 @ doc """
@@ -449,24 +456,34 @@ defimpl Enumerable, for: GenEvent do
449456 use GenEvent
450457
451458 @ doc false
452- def init ( { mon_pid , pid , ref } ) do
459+ def init ( { _mode , mon_pid , _pid , ref } = state ) do
453460 # Tell the mon_pid we are good to go, and send self() so that this handler
454461 # can be removed later without using the managers name.
455462 send ( mon_pid , { :UP , ref , self ( ) } )
456- { :ok , { pid , ref } }
463+ { :ok , state }
457464 end
458465
459466 @ doc false
460- def handle_event ( event , { pid , ref } = state ) do
461- send pid , { ref , event }
467+ def handle_event ( event , { :sync , mon_pid , pid , ref } = state ) do
468+ sync = Process . monitor ( mon_pid )
469+ send pid , { ref , sync , event }
470+ receive do
471+ { ^ sync , :done } -> Process . demonitor ( sync , [ :flush ] )
472+ { :DOWN , ^ sync , _ , _ , _ } -> :ok
473+ end
474+ { :ok , state }
475+ end
476+
477+ def handle_event ( event , { :async , _mon_pid , pid , ref } = state ) do
478+ send pid , { ref , nil , event }
462479 { :ok , state }
463480 end
464481
465482 def reduce ( stream , acc , fun ) do
466483 start_fun = fn ( ) -> start ( stream ) end
467484 next_fun = & next ( stream , & 1 )
468485 stop_fun = & stop ( stream , & 1 )
469- Stream . resource ( start_fun , next_fun , stop_fun ) . ( acc , fun )
486+ Stream . resource ( start_fun , next_fun , stop_fun ) . ( acc , wrap_reducer ( fun ) )
470487 end
471488
472489 def count ( _stream ) do
@@ -477,24 +494,37 @@ defimpl Enumerable, for: GenEvent do
477494 { :error , __MODULE__ }
478495 end
479496
480- defp start ( % { manager: manager , id: id , duration: duration } = stream ) do
481- { mon_pid , mon_ref } = add_handler ( manager , id , duration )
497+ defp wrap_reducer ( fun ) do
498+ fn
499+ { nil , _manager , event } , acc ->
500+ fun . ( event , acc )
501+ { ref , manager , event } , acc ->
502+ acc = fun . ( event , acc )
503+ send manager , { ref , :done }
504+ acc
505+ end
506+ end
507+
508+ defp start ( % { manager: manager , id: id , duration: duration , mode: mode } = stream ) do
509+ { mon_pid , mon_ref } = add_handler ( mode , manager , id , duration )
482510 send mon_pid , { :UP , mon_ref , self ( ) }
483511
484512 receive do
485513 # The subscription process gave us a go.
486514 { :UP , ^ mon_ref , manager_pid } ->
487515 { mon_ref , manager_pid }
488- # The subscription process died due to an abnormal reason.
516+ # The subscription process died due to an abnormal reason.
489517 { :DOWN , ^ mon_ref , _ , _ , reason } ->
490518 exit ( { reason , { __MODULE__ , :start , [ stream ] } } )
491519 end
492520 end
493521
494- defp next ( % { timeout: timeout } = stream , { mon_ref , _manager_pid } = acc ) do
522+ defp next ( % { timeout: timeout } = stream , { mon_ref , manager_pid } = acc ) do
495523 receive do
496- { ^ mon_ref , event } -> { event , acc }
497- { :DOWN , ^ mon_ref , _ , _ , :normal } -> nil
524+ { ^ mon_ref , sync_ref , event } ->
525+ { { sync_ref , manager_pid , event } , acc }
526+ { :DOWN , ^ mon_ref , _ , _ , :normal } ->
527+ nil
498528 { :DOWN , ^ mon_ref , _ , _ , reason } ->
499529 exit ( { reason , { __MODULE__ , :next , [ stream , acc ] } } )
500530 after
@@ -508,7 +538,7 @@ defimpl Enumerable, for: GenEvent do
508538 flush_events ( mon_ref )
509539 end
510540
511- defp add_handler ( manager , id , duration ) do
541+ defp add_handler ( mode , manager , id , duration ) do
512542 parent = self ( )
513543
514544 # The subscription is managed by another process, that dies if
@@ -530,7 +560,7 @@ defimpl Enumerable, for: GenEvent do
530560
531561 cancel = cancel_ref ( id , mon_ref )
532562 :ok = :gen_event . add_sup_handler ( manager , { __MODULE__ , cancel } ,
533- { self ( ) , parent , mon_ref } )
563+ { mode , self ( ) , parent , mon_ref } )
534564
535565 receive do
536566 # This message is already in the mailbox if we got this far.
@@ -581,7 +611,8 @@ defimpl Enumerable, for: GenEvent do
581611
582612 defp flush_events ( mon_ref ) do
583613 receive do
584- { ^ mon_ref , _ } -> flush_events ( mon_ref )
614+ { ^ mon_ref , _ , _ } ->
615+ flush_events ( mon_ref )
585616 after
586617 0 -> :ok
587618 end
0 commit comments