@@ -468,10 +468,15 @@ defimpl Enumerable, for: GenEvent do
468468 sync = Process . monitor ( mon_pid )
469469 send pid , { ref , sync , event }
470470 receive do
471- { ^ sync , :done } -> Process . demonitor ( sync , [ :flush ] )
472- { :DOWN , ^ sync , _ , _ , _ } -> :ok
471+ { ^ sync , :done } ->
472+ Process . demonitor ( sync , [ :flush ] )
473+ :remove_handler
474+ { ^ sync , :next } ->
475+ Process . demonitor ( sync , [ :flush ] )
476+ { :ok , state }
477+ { :DOWN , ^ sync , _ , _ , _ } ->
478+ { :ok , state }
473479 end
474- { :ok , state }
475480 end
476481
477482 def handle_event ( event , { :async , _mon_pid , pid , ref } = state ) do
@@ -499,9 +504,11 @@ defimpl Enumerable, for: GenEvent do
499504 { nil , _manager , event } , acc ->
500505 fun . ( event , acc )
501506 { ref , manager , event } , acc ->
502- acc = fun . ( event , acc )
503- send manager , { ref , :done }
504- acc
507+ try do
508+ fun . ( event , acc )
509+ after
510+ send manager , { ref , :next }
511+ end
505512 end
506513 end
507514
@@ -521,12 +528,12 @@ defimpl Enumerable, for: GenEvent do
521528
522529 defp next ( % { timeout: timeout } = stream , { mon_ref , manager_pid } = acc ) do
523530 receive do
524- { ^ mon_ref , sync_ref , event } ->
525- { { sync_ref , manager_pid , event } , acc }
526531 { :DOWN , ^ mon_ref , _ , _ , :normal } ->
527532 nil
528533 { :DOWN , ^ mon_ref , _ , _ , reason } ->
529534 exit ( { reason , { __MODULE__ , :next , [ stream , acc ] } } )
535+ { ^ mon_ref , sync_ref , event } ->
536+ { { sync_ref , manager_pid , event } , acc }
530537 after
531538 timeout ->
532539 exit ( { :timeout , { __MODULE__ , :next , [ stream , acc ] } } )
@@ -595,18 +602,34 @@ defimpl Enumerable, for: GenEvent do
595602 defp remove_handler ( mon_ref , manager_pid , id ) do
596603 Process . demonitor ( mon_ref , [ :flush ] )
597604 handler = { __MODULE__ , cancel_ref ( id , mon_ref ) }
598- # handler may nolonger be there, if it is the removal will cause the monitor
599- # process to exit. If this returns successfuly then no more events will be
600- # forwarded.
601- _ = :gen_event . delete_handler ( manager_pid , handler , :remove_handler )
602- catch
603- # Do not want to overide the exit reason of the mon_pid so catch errors.
604- # However if the exit is due to a disconnection, exit because messages could
605- # leak if the nodes are reconnected before the manager on the other node
606- # removes the handler. In this case it is very likely that the mon_pid
607- # exited with the same reason.
608- :exit , reason when reason !== { :nodedown , node ( manager_pid ) } ->
609- :ok
605+
606+ { _pid , ref } = spawn_monitor fn ->
607+ try do
608+ # handler may nolonger be there, if it is the removal will cause the monitor
609+ # process to exit. If this returns successfuly then no more events will be
610+ # forwarded.
611+ _ = :gen_event . delete_handler ( manager_pid , handler , :remove_handler )
612+ catch
613+ # Do not want to overide the exit reason of the mon_pid so catch errors.
614+ # However if the exit is due to a disconnection, exit because messages could
615+ # leak if the nodes are reconnected before the manager on the other node
616+ # removes the handler. In this case it is very likely that the mon_pid
617+ # exited with the same reason.
618+ :exit , reason when reason != { :nodedown , node ( manager_pid ) } ->
619+ :ok
620+ end
621+ end
622+
623+ receive do
624+ { ^ mon_ref , sync , _ } when sync != nil ->
625+ send ( manager_pid , { sync , :done } )
626+ Process . demonitor ( ref , [ :flush ] )
627+ :ok
628+ { :DOWN , ^ ref , _ , _ , :normal } ->
629+ :ok
630+ { :DOWN , ^ ref , _ , _ , other } ->
631+ exit ( other )
632+ end
610633 end
611634
612635 defp flush_events ( mon_ref ) do
0 commit comments