4 changes: 2 additions & 2 deletions Compiler/src/tfuncs.jl
@@ -716,7 +716,7 @@ end
@nospecs function pointerset_tfunc(𝕃::AbstractLattice, a, v, i, align)
return a
end
-@nospecs function atomic_fence_tfunc(𝕃::AbstractLattice, order)
+@nospecs function atomic_fence_tfunc(𝕃::AbstractLattice, order, syncscope)
return Nothing
end
@nospecs function atomic_pointerref_tfunc(𝕃::AbstractLattice, a, order)
@@ -757,7 +757,7 @@ add_tfunc(add_ptr, 2, 2, pointerarith_tfunc, 1)
add_tfunc(sub_ptr, 2, 2, pointerarith_tfunc, 1)
add_tfunc(pointerref, 3, 3, pointerref_tfunc, 4)
add_tfunc(pointerset, 4, 4, pointerset_tfunc, 5)
-add_tfunc(atomic_fence, 1, 1, atomic_fence_tfunc, 4)
+add_tfunc(atomic_fence, 2, 2, atomic_fence_tfunc, 4)
add_tfunc(atomic_pointerref, 2, 2, atomic_pointerref_tfunc, 4)
add_tfunc(atomic_pointerset, 3, 3, atomic_pointerset_tfunc, 5)
add_tfunc(atomic_pointerswap, 3, 3, atomic_pointerswap_tfunc, 5)
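With the extra `syncscope` argument threaded through inference, the tfunc still pins the intrinsic's return type to `Nothing`. A quick sanity check, as a sketch (it assumes `Base.infer_return_type` is available, as on recent Julia builds that include this change):

```julia
fence() = Core.Intrinsics.atomic_fence(:acquire, :system)

# atomic_fence_tfunc declares that the two-argument intrinsic returns Nothing,
# so inference should conclude exactly that:
@assert Base.infer_return_type(fence, Tuple{}) === Nothing
```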
4 changes: 4 additions & 0 deletions NEWS.md
@@ -21,6 +21,10 @@ Command-line option changes
Multi-threading changes
-----------------------

- New functions `Threads.atomic_fence_heavy` and `Threads.atomic_fence_light` provide
asymmetric atomic fences, which speed up synchronization patterns where one side
runs significantly less often than the other ([#60311]).

Build system changes
--------------------

2 changes: 1 addition & 1 deletion base/asyncevent.jl
@@ -165,7 +165,7 @@ function _trywait(t::Union{Timer, AsyncCondition})
set = t.set
if set
# full barrier now for AsyncCondition
-t isa Timer || Core.Intrinsics.atomic_fence(:acquire_release)
+t isa Timer || Core.Intrinsics.atomic_fence(:acquire_release, :system)
else
if !isopen(t)
set = t.set
28 changes: 26 additions & 2 deletions base/atomics.jl
@@ -10,7 +10,7 @@ export
atomic_add!, atomic_sub!,
atomic_and!, atomic_nand!, atomic_or!, atomic_xor!,
atomic_max!, atomic_min!,
-atomic_fence
+atomic_fence, atomic_fence_light, atomic_fence_heavy

"""
Threads.Atomic{T}
@@ -329,4 +329,28 @@ fences should not be necessary in most cases.

For further details, see LLVM's `fence` instruction.
"""
-atomic_fence() = Core.Intrinsics.atomic_fence(:sequentially_consistent)
+atomic_fence() = Core.Intrinsics.atomic_fence(:sequentially_consistent, :system)

"""
Threads.atomic_fence_light()

Insert the light side of an asymmetric sequential-consistency memory fence.
Asymmetric memory fences are useful in scenarios where one side of the
synchronization runs significantly less often than the other side. Use this
function on the side that runs often and [`atomic_fence_heavy`](@ref) on the
side that runs rarely.

On supported operating systems and architectures this fence is cheaper than
`Threads.atomic_fence()`, but synchronizes only with [`atomic_fence_heavy`](@ref)
calls from other threads.
"""
atomic_fence_light() = Core.Intrinsics.atomic_fence(:sequentially_consistent, :singlethread)

"""
Threads.atomic_fence_heavy()

Insert the heavy side of an asymmetric sequential-consistency memory fence.
Use this function on the side that runs rarely.
See [`atomic_fence_light`](@ref) for more details.
"""
atomic_fence_heavy() = ccall(:jl_membarrier, Cvoid, ())
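To make the intended pairing concrete, here is a minimal Dekker-style sketch of the pattern the docstrings describe. It requires a build with this change; the `Flags` struct and function names are illustrative, not part of any API:

```julia
using Base.Threads

mutable struct Flags
    @atomic fast::Bool   # set while the hot path is inside its critical region
    @atomic slow::Bool   # set while the rare path is active
end
const flags = Flags(false, false)

# Hot path: runs constantly, so the fence must be cheap.
function fast_path()
    @atomic :monotonic flags.fast = true
    atomic_fence_light()                   # pairs with the heavy fence below
    if !(@atomic :monotonic flags.slow)
        # ... fast work here; the rare side is guaranteed to wait for us ...
    end
    @atomic :release flags.fast = false    # publish that we are done
end

# Cold path: runs rarely, so it can afford the process-wide barrier.
function slow_path()
    @atomic :monotonic flags.slow = true
    atomic_fence_heavy()                   # pairs with the light fences above
    while (@atomic :acquire flags.fast)    # wait out any in-flight fast path
        yield()
    end
    # ... rare work here ...
    @atomic :monotonic flags.slow = false
end
```

Each side stores its flag, fences, then reads the other's flag; the paired fences make this a Dekker-style handshake, so at least one side is guaranteed to observe the other, yet the hot path never executes a hardware fence.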
2 changes: 2 additions & 0 deletions doc/src/base/multi-threading.md
@@ -50,6 +50,8 @@ Base.Threads.atomic_xor!
Base.Threads.atomic_max!
Base.Threads.atomic_min!
Base.Threads.atomic_fence
Base.Threads.atomic_fence_heavy
Base.Threads.atomic_fence_light
```

## ccall using a libuv threadpool (Experimental)
2 changes: 2 additions & 0 deletions src/ast.c
@@ -319,6 +319,8 @@ void jl_init_common_symbols(void)
jl_atomic_sym = jl_symbol("atomic");
jl_not_atomic_sym = jl_symbol("not_atomic");
jl_unordered_sym = jl_symbol("unordered");
jl_singlethread_sym = jl_symbol("singlethread");
jl_system_sym = jl_symbol("system");
jl_monotonic_sym = jl_symbol("monotonic");
jl_acquire_sym = jl_symbol("acquire");
jl_release_sym = jl_symbol("release");
15 changes: 12 additions & 3 deletions src/intrinsics.cpp
@@ -915,17 +915,26 @@ static jl_cgval_t emit_pointerarith(jl_codectx_t &ctx, intrinsic f,
static jl_cgval_t emit_atomicfence(jl_codectx_t &ctx, ArrayRef<jl_cgval_t> argv)
{
const jl_cgval_t &ord = argv[0];
const jl_cgval_t &ssid_arg = argv[1];
llvm::SyncScope::ID ssid = llvm::SyncScope::System;
if (!ssid_arg.constant || !jl_is_symbol(ssid_arg.constant) ||
((jl_sym_t*)ssid_arg.constant != jl_singlethread_sym &&
(jl_sym_t*)ssid_arg.constant != jl_system_sym)) {
return emit_runtime_call(ctx, atomic_fence, argv, 2);
}
if ((jl_sym_t*)ssid_arg.constant == jl_singlethread_sym)
ssid = llvm::SyncScope::SingleThread;
if (ord.constant && jl_is_symbol(ord.constant)) {
enum jl_memory_order order = jl_get_atomic_order((jl_sym_t*)ord.constant, true, true);
if (order == jl_memory_order_invalid) {
emit_atomic_error(ctx, "invalid atomic ordering");
return jl_cgval_t(); // unreachable
}
if (order > jl_memory_order_monotonic)
-ctx.builder.CreateFence(get_llvm_atomic_order(order));
+ctx.builder.CreateFence(get_llvm_atomic_order(order), ssid);
return ghostValue(ctx, jl_nothing_type);
}
-return emit_runtime_call(ctx, atomic_fence, argv, 1);
+return emit_runtime_call(ctx, atomic_fence, argv, 2);
}

static jl_cgval_t emit_atomic_pointerref(jl_codectx_t &ctx, ArrayRef<jl_cgval_t> argv)
@@ -1339,7 +1348,7 @@ static jl_cgval_t emit_intrinsic(jl_codectx_t &ctx, intrinsic f, jl_value_t **ar

case atomic_fence:
++Emitted_atomic_fence;
-assert(nargs == 1);
+assert(nargs == 2);
return emit_atomicfence(ctx, argv);
case atomic_pointerref:
++Emitted_atomic_pointerref;
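Codegen now forwards the sync scope to LLVM's `fence` instruction whenever both arguments are constant symbols. A sketch for inspecting the emitted IR from the REPL (the exact IR text may vary across Julia/LLVM versions):

```julia
using InteractiveUtils

light()  = Core.Intrinsics.atomic_fence(:sequentially_consistent, :singlethread)
system() = Core.Intrinsics.atomic_fence(:sequentially_consistent, :system)

@code_llvm light()   # expect: fence syncscope("singlethread") seq_cst
@code_llvm system()  # expect: fence seq_cst
```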
2 changes: 1 addition & 1 deletion src/intrinsics.h
@@ -95,7 +95,7 @@
ADD_I(pointerref, 3) \
ADD_I(pointerset, 4) \
/* pointer atomics */ \
-ADD_I(atomic_fence, 1) \
+ADD_I(atomic_fence, 2) \
ADD_I(atomic_pointerref, 2) \
ADD_I(atomic_pointerset, 3) \
ADD_I(atomic_pointerswap, 3) \
1 change: 1 addition & 0 deletions src/jl_exported_funcs.inc
@@ -506,6 +506,7 @@
XX(jl_vprintf) \
XX(jl_wakeup_thread) \
XX(jl_write_compiler_output) \
XX(jl_membarrier) \

#define JL_RUNTIME_EXPORTED_FUNCS_WIN(XX) \
XX(jl_setjmp) \
4 changes: 3 additions & 1 deletion src/julia_internal.h
@@ -1689,7 +1689,7 @@ STATIC_INLINE int is_valid_intrinsic_elptr(jl_value_t *ety)
JL_DLLEXPORT jl_value_t *jl_bitcast(jl_value_t *ty, jl_value_t *v);
JL_DLLEXPORT jl_value_t *jl_pointerref(jl_value_t *p, jl_value_t *i, jl_value_t *align);
JL_DLLEXPORT jl_value_t *jl_pointerset(jl_value_t *p, jl_value_t *x, jl_value_t *align, jl_value_t *i);
-JL_DLLEXPORT jl_value_t *jl_atomic_fence(jl_value_t *order);
+JL_DLLEXPORT jl_value_t *jl_atomic_fence(jl_value_t *order, jl_value_t *syncscope);
JL_DLLEXPORT jl_value_t *jl_atomic_pointerref(jl_value_t *p, jl_value_t *order);
JL_DLLEXPORT jl_value_t *jl_atomic_pointerset(jl_value_t *p, jl_value_t *x, jl_value_t *order);
JL_DLLEXPORT jl_value_t *jl_atomic_pointerswap(jl_value_t *p, jl_value_t *x, jl_value_t *order);
@@ -2010,6 +2010,8 @@ JL_DLLEXPORT int jl_isabspath(const char *in) JL_NOTSAFEPOINT;
XX(uninferred_sym) \
XX(unordered_sym) \
XX(unused_sym) \
XX(singlethread_sym) \
XX(system_sym)

#define XX(name) extern JL_DLLEXPORT jl_sym_t *jl_##name;
JL_COMMON_SYMBOLS(XX)
9 changes: 8 additions & 1 deletion src/runtime_intrinsics.c
@@ -622,10 +622,17 @@ JL_DLLEXPORT jl_value_t *jl_atomic_pointerreplace(jl_value_t *p, jl_value_t *exp
return result;
}

-JL_DLLEXPORT jl_value_t *jl_atomic_fence(jl_value_t *order_sym)
+JL_DLLEXPORT jl_value_t *jl_atomic_fence(jl_value_t *order_sym, jl_value_t *syncscope_sym)
{
JL_TYPECHK(fence, symbol, order_sym);
JL_TYPECHK(fence, symbol, syncscope_sym);
enum jl_memory_order order = jl_get_atomic_order_checked((jl_sym_t*)order_sym, 1, 1);
if ((jl_sym_t*)syncscope_sym == jl_singlethread_sym) {
// a :singlethread fence only needs to be a compiler barrier, not a hardware fence
asm volatile ("" : : : "memory");
return jl_nothing;
} else if ((jl_sym_t*)syncscope_sym != jl_system_sym) {
jl_error("atomic_fence: invalid syncscope");
}
if (order > jl_memory_order_monotonic)
jl_fence();
return jl_nothing;
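When either argument is not a compile-time constant, codegen falls back to this runtime function, which type-checks both symbols and rejects unknown scopes. A sketch of exercising that path (the `Symbol` concatenations exist only to defeat constant folding, and `:bogus` is a deliberately invalid scope):

```julia
ord   = Symbol("sequentially", "_consistent")  # not a literal, so not constant-folded
scope = Symbol("single", "thread")

Core.Intrinsics.atomic_fence(ord, scope)       # runtime path: compiler barrier only

try
    Core.Intrinsics.atomic_fence(ord, :bogus)
catch err
    @assert err isa ErrorException             # "atomic_fence: invalid syncscope"
end
```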
44 changes: 44 additions & 0 deletions src/signals-mach.c
@@ -5,6 +5,7 @@
#include <mach/clock.h>
#include <mach/clock_types.h>
#include <mach/clock_reply.h>
#include <mach/thread_state.h>
#include <mach/mach_traps.h>
#include <mach/task.h>
#include <mach/mig_errors.h>
@@ -891,3 +892,46 @@ JL_DLLEXPORT void jl_profile_stop_timer(void)
profile_all_tasks = 0;
uv_mutex_unlock(&bt_data_prof_lock);
}

// The mprotect-based implementation in signals-unix.c does not work on macOS/AArch64
// (see the comment there).
// This implementation comes from dotnet, but is similarly dependent on undocumented behavior of the OS.
// Copyright (c) .NET Foundation and Contributors
// MIT LICENSE
JL_DLLEXPORT void jl_membarrier(void) {
uintptr_t sp;
uintptr_t registerValues[128];
kern_return_t machret;

// Iterate through each of the threads in the list.
int nthreads = jl_atomic_load_acquire(&jl_n_threads);
for (int tid = 0; tid < nthreads; tid++) {
jl_ptls_t ptls2 = jl_atomic_load_relaxed(&jl_all_tls_states)[tid];
thread_act_t thread = pthread_mach_thread_np(ptls2->system_id);
if (__builtin_available (macOS 10.14, iOS 12, tvOS 9, *))
{
// Request the thread's pointer values to force the thread to emit a memory barrier
size_t registers = 128;
machret = thread_get_register_pointer_values(thread, &sp, &registers, registerValues);
}
else
{
// fallback implementation for older OS versions
#if defined(_CPU_X86_64_)
x86_thread_state64_t threadState;
mach_msg_type_number_t count = x86_THREAD_STATE64_COUNT;
machret = thread_get_state(thread, x86_THREAD_STATE64, (thread_state_t)&threadState, &count);
#elif defined(_CPU_AARCH64_)
arm_thread_state64_t threadState;
mach_msg_type_number_t count = ARM_THREAD_STATE64_COUNT;
machret = thread_get_state(thread, ARM_THREAD_STATE64, (thread_state_t)&threadState, &count);
#else
#error Unexpected architecture
#endif
}

// 128 register slots should always be enough, so a too-small buffer indicates a
// programming error; other failures (e.g. the thread already exited) are ignored.
if (machret == KERN_INSUFFICIENT_BUFFER_SIZE)
{
HANDLE_MACH_ERROR("thread_get_register_pointer_values()", machret);
}
}
}
126 changes: 126 additions & 0 deletions src/signals-unix.c
@@ -1274,3 +1274,129 @@ JL_DLLEXPORT int jl_repl_raise_sigtstp(void)
{
return raise(SIGTSTP);
}

#if !defined(_OS_DARWIN_)
// Implementation of the `mprotect`-based membarrier fallback.
// This is a common fallback based on the observation that `mprotect` happens to
// issue the necessary memory barriers. No spec guarantees this behavior, and
// indeed AArch64 Darwin does not provide it (so we don't use it there). We use
// it here only as a fallback: for older Linux and FreeBSD kernels, where we
// know it happens to work, and for unknown Unix systems, under the assumption
// (not a guarantee) that it works there as well.
static pthread_mutex_t mprotect_barrier_lock = PTHREAD_MUTEX_INITIALIZER;
static _Atomic(uint64_t) *mprotect_barrier_page = NULL;
static void jl_init_mprotect_membarrier(void)
{
int result = pthread_mutex_lock(&mprotect_barrier_lock);
assert(result == 0);
if (mprotect_barrier_page == NULL) {
size_t pagesize = jl_getpagesize();

mprotect_barrier_page = (_Atomic(uint64_t) *)
mmap(NULL, pagesize, PROT_READ | PROT_WRITE,
MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
if (mprotect_barrier_page == MAP_FAILED) {
jl_safe_printf("fatal: failed to allocate barrier page.\n");
abort();
}
result = mlock(mprotect_barrier_page, pagesize);
if (result != 0) {
jl_safe_printf("fatal: failed to mlock barrier page (try increasing RLIMIT_MEMLOCK with `ulimit -l`).\n");
abort();
}
}
result = pthread_mutex_unlock(&mprotect_barrier_lock);
assert(result == 0);
(void)result;
}

static void jl_mprotect_membarrier(void)
{
int result = pthread_mutex_lock(&mprotect_barrier_lock);
assert(result == 0);
size_t pagesize = jl_getpagesize();
result = mprotect(mprotect_barrier_page, pagesize, PROT_READ | PROT_WRITE);
assert(result == 0);
// the store between the two mprotect calls dirties the page, so the switch to
// PROT_NONE triggers TLB-shootdown IPIs that act as barriers on the other cores
jl_atomic_fetch_add_relaxed(mprotect_barrier_page, 1);
result = mprotect(mprotect_barrier_page, pagesize, PROT_NONE);
assert(result == 0);
result = pthread_mutex_unlock(&mprotect_barrier_lock);
assert(result == 0);
(void)result;
}
#endif

// Linux and FreeBSD have compatible membarrier support
#if defined(_OS_LINUX_) || defined(_OS_FREEBSD_)
#if defined(_OS_LINUX_)
# include <sys/syscall.h>
# if defined(__NR_membarrier)
enum membarrier_cmd {
MEMBARRIER_CMD_QUERY = 0,
MEMBARRIER_CMD_PRIVATE_EXPEDITED = (1 << 3),
MEMBARRIER_CMD_REGISTER_PRIVATE_EXPEDITED = (1 << 4),
};
# define membarrier(...) syscall(__NR_membarrier, __VA_ARGS__)
# else
# warning "Missing linux kernel headers for membarrier syscall, support disabled"
# define membarrier(...) (errno = ENOSYS, -1)
# endif
#elif defined(_OS_FREEBSD_)
# include <sys/param.h>
# if __FreeBSD_version >= 1401500
# include <sys/membarrier.h>
# else
# define MEMBARRIER_CMD_QUERY 0x00
# define MEMBARRIER_CMD_PRIVATE_EXPEDITED 0x08
# define MEMBARRIER_CMD_REGISTER_PRIVATE_EXPEDITED 0x10
# define membarrier(...) (errno = ENOSYS, -1)
# endif
#endif

// Implementation of `jl_membarrier`
enum membarrier_implementation {
MEMBARRIER_IMPLEMENTATION_UNKNOWN = 0,
MEMBARRIER_IMPLEMENTATION_SYS_MEMBARRIER = 1,
MEMBARRIER_IMPLEMENTATION_MPROTECT = 2
};

static _Atomic(enum membarrier_implementation) membarrier_impl = MEMBARRIER_IMPLEMENTATION_UNKNOWN;

static enum membarrier_implementation jl_init_membarrier(void) {
int ret = membarrier(MEMBARRIER_CMD_QUERY, 0);
int needed = MEMBARRIER_CMD_PRIVATE_EXPEDITED | MEMBARRIER_CMD_REGISTER_PRIVATE_EXPEDITED;
if (ret > 0 && ((ret & needed) == needed)) {
// supported
if (membarrier(MEMBARRIER_CMD_REGISTER_PRIVATE_EXPEDITED, 0) == 0) {
// working
jl_atomic_store_relaxed(&membarrier_impl, MEMBARRIER_IMPLEMENTATION_SYS_MEMBARRIER);
return MEMBARRIER_IMPLEMENTATION_SYS_MEMBARRIER;
}
}
jl_init_mprotect_membarrier();
jl_atomic_store_relaxed(&membarrier_impl, MEMBARRIER_IMPLEMENTATION_MPROTECT);
return MEMBARRIER_IMPLEMENTATION_MPROTECT;
}

JL_DLLEXPORT void jl_membarrier(void) {
enum membarrier_implementation impl = jl_atomic_load_relaxed(&membarrier_impl);
if (impl == MEMBARRIER_IMPLEMENTATION_UNKNOWN) {
impl = jl_init_membarrier();
}
if (impl == MEMBARRIER_IMPLEMENTATION_SYS_MEMBARRIER) {
int ret = membarrier(MEMBARRIER_CMD_PRIVATE_EXPEDITED, 0);
assert(ret == 0);
(void)ret;
} else {
assert(impl == MEMBARRIER_IMPLEMENTATION_MPROTECT);
jl_mprotect_membarrier();
}
}
#elif !defined(_OS_DARWIN_)
JL_DLLEXPORT void jl_membarrier(void) {
if (!mprotect_barrier_page)
jl_init_mprotect_membarrier();
jl_mprotect_membarrier();
}
#endif
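The cost ordering implied by the docstrings (light fence roughly free, plain fence cheap, heavy fence expensive) can be sanity-checked from the REPL. A rough sketch assuming the BenchmarkTools package is installed; absolute numbers vary by OS, hardware, and thread count, and only the ordering is expected to hold:

```julia
using BenchmarkTools
using Base.Threads

@btime Threads.atomic_fence_light()  # compiler barrier only: effectively free
@btime Threads.atomic_fence()        # hardware fence: a few nanoseconds
@btime Threads.atomic_fence_heavy()  # cross-thread barrier: typically microseconds
```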
4 changes: 4 additions & 0 deletions src/signals-win.c
@@ -664,3 +664,7 @@ void jl_install_thread_signal_handler(jl_ptls_t ptls)
have_backtrace_fiber = 1;
}
}

JL_DLLEXPORT void jl_membarrier(void) {
FlushProcessWriteBuffers();
}
12 changes: 6 additions & 6 deletions test/intrinsics.jl
@@ -395,13 +395,13 @@ end
end

using Base.Experimental: @force_compile
-@test_throws ConcurrencyViolationError("invalid atomic ordering") (@force_compile; Core.Intrinsics.atomic_fence(:u)) === nothing
-@test_throws ConcurrencyViolationError("invalid atomic ordering") (@force_compile; Core.Intrinsics.atomic_fence(Symbol("u", "x"))) === nothing
-@test_throws ConcurrencyViolationError("invalid atomic ordering") Core.Intrinsics.atomic_fence(Symbol("u", "x")) === nothing
+@test_throws ConcurrencyViolationError("invalid atomic ordering") (@force_compile; Core.Intrinsics.atomic_fence(:u, :system)) === nothing
+@test_throws ConcurrencyViolationError("invalid atomic ordering") (@force_compile; Core.Intrinsics.atomic_fence(Symbol("u", "x"), :system)) === nothing
+@test_throws ConcurrencyViolationError("invalid atomic ordering") Core.Intrinsics.atomic_fence(Symbol("u", "x"), :system) === nothing
for order in (:not_atomic, :monotonic, :acquire, :release, :acquire_release, :sequentially_consistent)
-@test Core.Intrinsics.atomic_fence(order) === nothing
-@test (order -> Core.Intrinsics.atomic_fence(order))(order) === nothing
-@test Base.invokelatest(@eval () -> Core.Intrinsics.atomic_fence($(QuoteNode(order)))) === nothing
+@test Core.Intrinsics.atomic_fence(order, :system) === nothing
+@test (order -> Core.Intrinsics.atomic_fence(order, :system))(order) === nothing
+@test Base.invokelatest(@eval () -> Core.Intrinsics.atomic_fence($(QuoteNode(order)), :system)) === nothing
end
@test Core.Intrinsics.atomic_pointerref(C_NULL, :sequentially_consistent) === nothing
@test (@force_compile; Core.Intrinsics.atomic_pointerref(C_NULL, :sequentially_consistent)) === nothing
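The updated tests only exercise the `:system` scope. Hypothetical additional coverage for `:singlethread` and for an invalid scope might look like the following (a sketch, not part of this diff; `:bogus` is a deliberately invalid scope):

```julia
for order in (:not_atomic, :monotonic, :acquire, :release, :acquire_release, :sequentially_consistent)
    @test Core.Intrinsics.atomic_fence(order, :singlethread) === nothing
end
@test_throws ErrorException (@force_compile; Core.Intrinsics.atomic_fence(:acquire, :bogus))
```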