Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion src/cluster.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1337,19 +1337,34 @@ end

using Random: randstring

# Exit handler state
const shutting_down = Threads.Atomic{Bool}(false)

function atexit_handler()
if inited[]
terminate_all_workers()
end

shutting_down[] = true
@lock any_gc_flag notify(any_gc_flag)
if !isnothing(gc_msgs_task)
wait(gc_msgs_task::Task)
end
end

# do initialization that's only needed when there is more than 1 processor
const inited = Threads.Atomic{Bool}(false)
function init_multi()
if !Threads.atomic_cas!(inited, false, true)
push!(Base.package_callbacks, _require_callback)
atexit(terminate_all_workers)
init_bind_addr()
cluster_cookie(randstring(HDR_COOKIE_LEN))
end
return nothing
end

function init_parallel()
atexit(atexit_handler)
start_gc_msgs_task()

# start in "head node" mode, if worker, will override later.
Expand Down
6 changes: 4 additions & 2 deletions src/remotecall.jl
Original file line number Diff line number Diff line change
Expand Up @@ -272,10 +272,12 @@ end
# XXX: Is this worth the additional complexity?
# `flush_gc_msgs` has to iterate over all connected workers.
const any_gc_flag = Threads.Condition()
gc_msgs_task::Union{Task, Nothing} = nothing

function start_gc_msgs_task()
errormonitor(
global gc_msgs_task = errormonitor(
@async begin
while true
while !shutting_down[]
lock(any_gc_flag) do
# this might miss events
wait(any_gc_flag)
Expand Down
Loading