lib/std/Thread.zig (128 changes: 63 additions & 65 deletions)
@@ -904,10 +904,42 @@ const WasiThreadImpl = struct {
allocator: std.mem.Allocator,
/// The current state of the thread.
state: State = State.init(.running),

pub fn finalize(self: *@This(), state: @FieldType(State, "raw")) void {
switch (self.state.swap(state, .seq_cst)) {
.running => {
// reset the Thread ID
asm volatile (
\\ local.get %[ptr]
\\ i32.const 0
\\ i32.atomic.store 0
:
: [ptr] "r" (&self.tid.raw),
);

// Wake the main thread listening to this thread
asm volatile (
\\ local.get %[ptr]
\\ i32.const 1 # waiters
\\ memory.atomic.notify 0
\\ drop # no need to know the waiters
:
: [ptr] "r" (&self.tid.raw),
);
},
.completed => unreachable,
.detached => {
// Free through the allocator's vtable directly so the stack we are still running on doesn't get set to undefined when optimize = Debug.
const free = self.allocator.vtable.free;
const ptr = self.allocator.ptr;
free(ptr, self.memory, std.mem.Alignment.@"1", 0);
},
}
}
};

/// A meta-data structure used to bootstrap a thread
const Instance = struct {
pub const Instance = struct {
thread: WasiThread,
/// Contains the offset to the new __tls_base.
/// The offset starting from the memory's base.
@@ -922,11 +954,6 @@ const WasiThreadImpl = struct {
/// function upon thread spawn. The above mentioned pointer will be passed
/// to this function pointer as its argument.
call_back: *const fn (usize) void,
/// When a thread is in `detached` state, we must free all of its memory
/// upon thread completion. However, as this is done while still within
/// the thread, we must first jump back to the main thread's stack or else
/// we end up freeing the stack that we're currently using.
original_stack_pointer: [*]u8,
};

const State = std.atomic.Value(enum(u8) { running, completed, detached });
@@ -1070,7 +1097,6 @@ const WasiThreadImpl = struct {
.stack_offset = stack_offset,
.raw_ptr = @intFromPtr(wrapper),
.call_back = &Wrapper.entry,
.original_stack_pointer = __get_stack_pointer(),
};

const tid = spawnWasiThread(instance);
@@ -1091,46 +1117,37 @@ const WasiThreadImpl = struct {
}

/// Called by the host environment after thread creation.
fn wasi_thread_start(tid: i32, arg: *Instance) callconv(.c) void {
fn wasi_thread_start(_: i32, _: *Instance) callconv(.naked) void {
comptime assert(!builtin.single_threaded);
__set_stack_pointer(arg.thread.memory.ptr + arg.stack_offset);
__wasm_init_tls(arg.thread.memory.ptr + arg.tls_offset);
@atomicStore(u32, &WasiThreadImpl.tls_thread_id, @intCast(tid), .seq_cst);

// Finished bootstrapping, call user's procedure.
arg.call_back(arg.raw_ptr);

switch (arg.thread.state.swap(.completed, .seq_cst)) {
.running => {
// reset the Thread ID
asm volatile (
\\ local.get %[ptr]
\\ i32.const 0
\\ i32.atomic.store 0
:
: [ptr] "r" (&arg.thread.tid.raw),
);

// Wake the main thread listening to this thread
asm volatile (
\\ local.get %[ptr]
\\ i32.const 1 # waiters
\\ memory.atomic.notify 0
\\ drop # no need to know the waiters
:
: [ptr] "r" (&arg.thread.tid.raw),
);
},
.completed => unreachable,
.detached => {
// restore the original stack pointer so we can free the memory
// without having to worry about freeing the stack
__set_stack_pointer(arg.original_stack_pointer);
// Ensure a copy so we don't free the allocator reference itself
var allocator = arg.thread.allocator;
allocator.free(arg.thread.memory);
},
}
const clothed = struct {
fn run(tid: i32, arg: *Instance) callconv(.c) void {
__wasm_init_tls(arg.thread.memory.ptr + arg.tls_offset);
@atomicStore(u32, &WasiThreadImpl.tls_thread_id, @intCast(tid), .seq_cst);

// Finished bootstrapping, call user's procedure.
arg.call_back(arg.raw_ptr);

// Set thread state and free memory allocated for thread.
arg.thread.finalize(.completed);
}
};
// Set the stack pointer then jump to the "clothed" portion of the function.
asm volatile (
\\ local.get 1
\\ i32.load %[thread_memory]
\\ local.get 1
\\ i32.load %[stack_offset]
\\ i32.add
\\ global.set __stack_pointer
\\ local.get 0
\\ local.get 1
\\ call %[cont]
\\ return
:
: [thread_memory] "X" (@offsetOf(Instance, "thread") + @offsetOf(WasiThread, "memory")),
[stack_offset] "X" (@offsetOf(Instance, "stack_offset")),
[cont] "X" (&clothed.run),
);
}

/// Asks the host to create a new thread for us.
@@ -1172,25 +1189,6 @@ const WasiThreadImpl = struct {
: [ret] "=r" (-> u32),
);
}

/// Allows for setting the stack pointer in the WebAssembly module.
inline fn __set_stack_pointer(addr: [*]u8) void {
asm volatile (
\\ local.get %[ptr]
\\ global.set __stack_pointer
:
: [ptr] "r" (addr),
);
}

/// Returns the current value of the stack pointer
inline fn __get_stack_pointer() [*]u8 {
return asm (
\\ global.get __stack_pointer
\\ local.set %[stack_ptr]
: [stack_ptr] "=r" (-> [*]u8),
);
}
};

const LinuxThreadImpl = struct {
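Note on the Thread.zig change: wasi_thread_start is now callconv(.naked) because no Zig code may touch the stack before the stack pointer has been switched to the spawned thread's own allocation; the inline assembly computes memory.ptr + stack_offset from the Instance, writes it to __stack_pointer, and only then calls the ordinary callconv(.c) helper clothed.run, which initializes TLS, runs the user callback, and calls finalize. Nothing changes for callers. A minimal spawn/join sketch for a wasm32-wasi, non-single-threaded build (assuming std.heap.wasm_allocator supplies the thread's stack and TLS memory; worker and counter are illustrative names):

const std = @import("std");

fn worker(counter: *std.atomic.Value(u32)) void {
    _ = counter.fetchAdd(1, .seq_cst);
}

pub fn main() !void {
    var counter = std.atomic.Value(u32).init(0);
    // On WASI an allocator is required: it backs the new thread's stack, TLS block, and Instance.
    const t = try std.Thread.spawn(.{ .allocator = std.heap.wasm_allocator }, worker, .{&counter});
    // join() waits on the thread id futex, which finalize() zeroes and notifies when the thread exits.
    t.join();
    std.debug.assert(counter.load(.seq_cst) == 1);
}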
lib/std/Thread/Mutex.zig (73 changes: 73 additions & 0 deletions)
@@ -51,6 +51,8 @@ else if (builtin.os.tag == .windows)
WindowsImpl
else if (builtin.os.tag.isDarwin())
DarwinImpl
else if (builtin.os.tag == .wasi)
WasiImpl
else
FutexImpl;

@@ -208,6 +210,77 @@ const FutexImpl = struct {
}
};

const WasiImpl = struct {
status: std.atomic.Value(u32) = .{ .raw = free },
wait_count: std.atomic.Value(u32) = .{ .raw = 0 },

const free: u32 = 0; // no one owns the lock
const owned: u32 = 1; // a worker thread has the lock
const seized: u32 = 2; // the main thread either has the lock already or is about to get it
const forfeited: u32 = 3; // the main thread has received the lock from the previous owner

pub fn lock(self: *@This()) void {
if (inMainThread()) {
// announce that the lock will be taken by the main thread
switch (self.status.swap(seized, .acquire)) {
// seizing a free lock
free => {},
// keep spinning until the current owner surrenders it
owned => while (self.status.load(.monotonic) != forfeited) {},
else => unreachable,
}
} else {
while (true) {
// try to get the lock
if (self.status.cmpxchgWeak(free, owned, .acquire, .monotonic)) |status| {
// pause the worker when the lock is not free
if (status != free) {
_ = self.wait_count.fetchAdd(1, .monotonic);
Thread.Futex.wait(&self.status, status);
_ = self.wait_count.fetchSub(1, .monotonic);
}
} else break;
}
}
}

pub fn unlock(self: *@This()) void {
if (inMainThread()) {
// just release the lock
self.status.store(free, .release);
} else {
// release the lock if the worker thread still owns it
if (self.status.cmpxchgStrong(owned, free, .release, .monotonic)) |status| {
switch (status) {
seized => {
// let the spinning main thread take the lock
self.status.store(forfeited, .release);
return;
},
else => unreachable,
}
}
}
if (self.wait_count.load(.monotonic) > 0) {
// awaken a waiting worker thread
Thread.Futex.wake(&self.status, 1);
}
}

pub fn tryLock(self: *@This()) bool {
const new_status: u32 = if (inMainThread()) seized else owned;
return self.status.cmpxchgStrong(free, new_status, .acquire, .monotonic) == null;
}

fn inMainThread() bool {
const root = @import("root");
if (@hasDecl(root, "std_options") and root.std_options.wasi_main_thread_wait) {
return false;
}
return Thread.getCurrentId() == 0;
}
};

test "smoke test" {
var mutex = Mutex{};

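Note on the Mutex.zig change: WasiImpl is needed because some hosts (notably browsers) forbid the main thread from blocking in memory.atomic.wait, so the main thread announces itself with seized and spins until the owning worker hands the lock over through forfeited, while worker threads park in Thread.Futex.wait as usual. A rough contention sketch (the Shared wrapper and bump are illustrative, not part of the patch):

const std = @import("std");

const Shared = struct {
    mutex: std.Thread.Mutex = .{},
    value: u32 = 0,

    fn bump(self: *@This()) void {
        // Worker path: CAS free -> owned or futex-wait; main-thread path: swap to seized and spin.
        self.mutex.lock();
        defer self.mutex.unlock();
        self.value += 1;
    }
};

test "main thread and a worker contend for the mutex" {
    var shared: Shared = .{};
    const t = try std.Thread.spawn(.{ .allocator = std.testing.allocator }, Shared.bump, .{&shared});
    shared.bump(); // executes on the main thread, taking the seized/forfeited path on WASI
    t.join();
    try std.testing.expectEqual(@as(u32, 2), shared.value);
}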
lib/std/heap/WasmAllocator.zig (14 changes: 11 additions & 3 deletions)
@@ -10,9 +10,6 @@ comptime {
if (!builtin.target.cpu.arch.isWasm()) {
@compileError("only available for wasm32 arch");
}
if (!builtin.single_threaded) {
@compileError("TODO implement support for multi-threaded wasm");
}
}

pub const vtable: Allocator.VTable = .{
@@ -44,10 +41,19 @@ var next_addrs: [size_class_count]usize = @splat(0);
var frees: [size_class_count]usize = @splat(0);
/// For each big size class, points to the freed pointer.
var big_frees: [big_size_class_count]usize = @splat(0);
var mutex: switch (builtin.single_threaded) {
false => std.Thread.Mutex,
true => struct {
inline fn lock(_: *@This()) void {}
inline fn unlock(_: *@This()) void {}
},
} = .{};

fn alloc(ctx: *anyopaque, len: usize, alignment: mem.Alignment, return_address: usize) ?[*]u8 {
_ = ctx;
_ = return_address;
mutex.lock();
defer mutex.unlock();
// Make room for the freelist next pointer.
const actual_len = @max(len +| @sizeOf(usize), alignment.toByteUnits());
const slot_size = math.ceilPowerOfTwo(usize, actual_len) catch return null;
@@ -127,6 +133,8 @@ fn free(
) void {
_ = ctx;
_ = return_address;
mutex.lock();
defer mutex.unlock();
const buf_align = alignment.toByteUnits();
const actual_len = @max(buf.len + @sizeOf(usize), buf_align);
const slot_size = math.ceilPowerOfTwoAssert(usize, actual_len);
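Note on the WasmAllocator.zig change: with the multi-threaded @compileError removed, alloc and free now serialize on a std.Thread.Mutex; in single-threaded builds the comptime switch substitutes a no-op struct, so there is no locking overhead. A minimal usage sketch (assuming std.heap.wasm_allocator remains the public Allocator backed by this vtable):

const std = @import("std");

pub fn main() !void {
    const gpa = std.heap.wasm_allocator; // vtable-backed by WasmAllocator
    const buf = try gpa.alloc(u8, 64); // takes the new mutex internally on threaded builds
    defer gpa.free(buf);
    @memset(buf, 0);
}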
lib/std/std.zig (2 changes: 2 additions & 0 deletions)
@@ -116,6 +116,8 @@ pub const Options = struct {

/// Function used to implement `std.fs.cwd` for WASI.
wasiCwd: fn () os.wasi.fd_t = fs.defaultWasiCwd,
/// Whether the main thread is allowed to perform a blocking (synchronous) wait, such as memory.atomic.wait on WASI.
wasi_main_thread_wait: bool = false,

/// The current log level.
log_level: log.Level = log.default_level,
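Note on the std.zig change: applications opt in by overriding std_options in the root source file. When the host does allow the main thread to block (for example a standalone WASI runtime rather than a browser), setting the flag makes Mutex's inMainThread check return false, so the main thread uses the regular futex path instead of spinning. A sketch of the override (all other fields keep their defaults):

const std = @import("std");

pub const std_options: std.Options = .{
    // The host permits the main thread to block in memory.atomic.wait.
    .wasi_main_thread_wait = true,
};

pub fn main() void {}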