Skip to content

Commit 0be9946

Browse files
committed
trap on blocking call in sync task before return
This implements a spec change (PR pending) such that tasks created for calls to synchronous exports may not call potentially-blocking imports or return `wait` or `poll` callback codes prior to returning a value. Specifically, the following are prohibited in that scenario: - returning callback-code.{wait,poll} - sync calling an async import - sync calling subtask.cancel - sync calling {stream,future}.{read,write} - sync calling {stream,future}.cancel-{read,write} - calling waitable-set.{wait,poll} - calling thread.suspend This breaks a number of tests, which will be addressed in follow-up commits: - The `{tcp,udp}-socket.bind` implementation in `wasmtime-wasi` is implemented using `Linker::func_wrap_concurrent` and thus assumed to be async, whereas the WIT interface says they're sync, leading to a type mismatch error at runtime. Alex and I have discussed this and have a general plan to address it. - A number of tests in the tests/component-model submodule that points to the spec repo are failing. Those will presumably be fixed as part of the upcoming spec PR (although some could be due to bugs in this implementation, in which case I'll fix them). - A number of tests in tests/misc_testsuite are failing. I'll address those in a follow-up commit. Signed-off-by: Joel Dice <joel.dice@fermyon.com>
1 parent a7edc9b commit 0be9946

File tree

10 files changed

+141
-21
lines changed

10 files changed

+141
-21
lines changed

crates/environ/src/component.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@ macro_rules! foreach_builtin_component_function {
132132
caller_instance: u32,
133133
callee_instance: u32,
134134
task_return_type: u32,
135+
callee_async: u32,
135136
string_encoding: u32,
136137
result_count_or_max_if_async: u32,
137138
storage: ptr_u8,

crates/environ/src/fact.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ pub static PREPARE_CALL_FIXED_PARAMS: &[ValType] = &[
4747
ValType::I32, // caller_instance
4848
ValType::I32, // callee_instance
4949
ValType::I32, // task_return_type
50+
ValType::I32, // callee_async
5051
ValType::I32, // string_encoding
5152
ValType::I32, // result_count_or_max_if_async
5253
];

crates/environ/src/fact/trampoline.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -544,6 +544,11 @@ impl<'a, 'b> Compiler<'a, 'b> {
544544
self.instruction(I32Const(
545545
i32::try_from(self.types[adapter.lift.ty].results.as_u32()).unwrap(),
546546
));
547+
self.instruction(I32Const(if self.types[adapter.lift.ty].async_ {
548+
1
549+
} else {
550+
0
551+
}));
547552
self.instruction(I32Const(i32::from(
548553
adapter.lift.options.string_encoding as u8,
549554
)));

crates/environ/src/trap_encoding.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,9 @@ pub enum Trap {
112112
/// scenario where a component instance tried to call an import or intrinsic
113113
/// when it wasn't allowed to, e.g. from a post-return function.
114114
CannotLeaveComponent,
115+
116+
/// A synchronous task attempted to make a potentially blocking call.
117+
CannotBlockSyncTask,
115118
// if adding a variant here be sure to update the `check!` macro below
116119
}
117120

@@ -154,6 +157,7 @@ impl Trap {
154157
DisabledOpcode
155158
AsyncDeadlock
156159
CannotLeaveComponent
160+
CannotBlockSyncTask
157161
}
158162

159163
None
@@ -190,6 +194,7 @@ impl fmt::Display for Trap {
190194
DisabledOpcode => "pulley opcode disabled at compile time was executed",
191195
AsyncDeadlock => "deadlock detected: event loop cannot make further progress",
192196
CannotLeaveComponent => "cannot leave component instance",
197+
CannotBlockSyncTask => "cannot block a synchronous task",
193198
};
194199
write!(f, "wasm trap: {desc}")
195200
}

crates/misc/component-async-tests/wit/test.wit

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ interface resource-stream {
122122
foo: func();
123123
}
124124

125-
foo: func(count: u32) -> stream<x>;
125+
foo: async func(count: u32) -> stream<x>;
126126
}
127127

128128
interface closed {
@@ -157,7 +157,7 @@ interface cancel {
157157
leak-task-after-cancel,
158158
}
159159

160-
run: func(mode: mode, cancel-delay-millis: u64);
160+
run: async func(mode: mode, cancel-delay-millis: u64);
161161
}
162162

163163
interface intertask {

crates/test-programs/src/bin/async_read_resource_stream.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ struct Component;
1515
impl Guest for Component {
1616
async fn run() {
1717
let mut count = 7;
18-
let mut stream = resource_stream::foo(count);
18+
let mut stream = resource_stream::foo(count).await;
1919

2020
while let Some(x) = stream.next().await {
2121
if count > 0 {

crates/wasmtime/src/runtime/component/concurrent.rs

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -705,6 +705,12 @@ pub(crate) enum WaitResult {
705705
Completed,
706706
}
707707

708+
/// Raise a trap if the calling task is synchronous and trying to block prior to
709+
/// returning a value.
710+
pub(crate) fn check_blocking(store: &mut dyn VMStore) -> Result<()> {
711+
store.concurrent_state_mut().check_blocking()
712+
}
713+
708714
/// Poll the specified future until it completes on behalf of a guest->host call
709715
/// using a sync-lowered import.
710716
///
@@ -1643,6 +1649,8 @@ impl Instance {
16431649
}));
16441650
}
16451651
callback_code::WAIT | callback_code::POLL => {
1652+
state.check_blocking_for(guest_thread.task)?;
1653+
16461654
let set = get_set(store, set)?;
16471655
let state = store.concurrent_state_mut();
16481656

@@ -2038,6 +2046,7 @@ impl Instance {
20382046
caller_instance: RuntimeComponentInstanceIndex,
20392047
callee_instance: RuntimeComponentInstanceIndex,
20402048
task_return_type: TypeTupleIndex,
2049+
callee_async: bool,
20412050
memory: *mut VMMemoryDefinition,
20422051
string_encoding: u8,
20432052
caller_info: CallerInfo,
@@ -2182,6 +2191,7 @@ impl Instance {
21822191
},
21832192
None,
21842193
callee_instance,
2194+
callee_async,
21852195
)?;
21862196

21872197
let guest_task = state.push(new_task)?;
@@ -2849,6 +2859,10 @@ impl Instance {
28492859
set: u32,
28502860
payload: u32,
28512861
) -> Result<u32> {
2862+
if !self.options(store, options).async_ {
2863+
store.concurrent_state_mut().check_blocking()?;
2864+
}
2865+
28522866
self.id().get(store).check_may_leave(caller)?;
28532867
let &CanonicalOptions {
28542868
cancellable,
@@ -2878,6 +2892,10 @@ impl Instance {
28782892
set: u32,
28792893
payload: u32,
28802894
) -> Result<u32> {
2895+
if !self.options(store, options).async_ {
2896+
store.concurrent_state_mut().check_blocking()?;
2897+
}
2898+
28812899
self.id().get(store).check_may_leave(caller)?;
28822900
let &CanonicalOptions {
28832901
cancellable,
@@ -3057,6 +3075,11 @@ impl Instance {
30573075
yielding: bool,
30583076
to_thread: Option<u32>,
30593077
) -> Result<WaitResult> {
3078+
if to_thread.is_none() && !yielding {
3079+
// This is a `thread.suspend` call
3080+
store.concurrent_state_mut().check_blocking()?;
3081+
}
3082+
30603083
// There could be a pending cancellation from a previous uncancellable wait
30613084
if cancellable && store.concurrent_state_mut().take_pending_cancellation() {
30623085
return Ok(WaitResult::Cancelled);
@@ -3186,6 +3209,10 @@ impl Instance {
31863209
async_: bool,
31873210
task_id: u32,
31883211
) -> Result<u32> {
3212+
if !async_ {
3213+
store.concurrent_state_mut().check_blocking()?;
3214+
}
3215+
31893216
self.id().get(store).check_may_leave(caller_instance)?;
31903217
let (rep, is_host) =
31913218
self.id().get_mut(store).guest_tables().0[caller_instance].subtask_rep(task_id)?;
@@ -3346,6 +3373,7 @@ pub trait VMComponentAsyncStore {
33463373
caller_instance: RuntimeComponentInstanceIndex,
33473374
callee_instance: RuntimeComponentInstanceIndex,
33483375
task_return_type: TypeTupleIndex,
3376+
callee_async: bool,
33493377
string_encoding: u8,
33503378
result_count: u32,
33513379
storage: *mut ValRaw,
@@ -3505,6 +3533,7 @@ impl<T: 'static> VMComponentAsyncStore for StoreInner<T> {
35053533
caller_instance: RuntimeComponentInstanceIndex,
35063534
callee_instance: RuntimeComponentInstanceIndex,
35073535
task_return_type: TypeTupleIndex,
3536+
callee_async: bool,
35083537
string_encoding: u8,
35093538
result_count_or_max_if_async: u32,
35103539
storage: *mut ValRaw,
@@ -3523,6 +3552,7 @@ impl<T: 'static> VMComponentAsyncStore for StoreInner<T> {
35233552
caller_instance,
35243553
callee_instance,
35253554
task_return_type,
3555+
callee_async,
35263556
memory,
35273557
string_encoding,
35283558
match result_count_or_max_if_async {
@@ -4063,6 +4093,9 @@ pub(crate) struct GuestTask {
40634093
/// The state of the host future that represents an async task, which must
40644094
/// be dropped before we can delete the task.
40654095
host_future_state: HostFutureState,
4096+
/// Indicates whether this task was created for a call to an async-lifted
4097+
/// export.
4098+
async_function: bool,
40664099
}
40674100

40684101
impl GuestTask {
@@ -4103,6 +4136,7 @@ impl GuestTask {
41034136
caller: Caller,
41044137
callback: Option<CallbackFn>,
41054138
component_instance: RuntimeComponentInstanceIndex,
4139+
async_function: bool,
41064140
) -> Result<Self> {
41074141
let sync_call_set = state.push(WaitableSet::default())?;
41084142
let host_future_state = match &caller {
@@ -4137,6 +4171,7 @@ impl GuestTask {
41374171
exited: false,
41384172
threads: HashSet::new(),
41394173
host_future_state,
4174+
async_function,
41404175
})
41414176
}
41424177

@@ -4750,6 +4785,20 @@ impl ConcurrentState {
47504785
false
47514786
}
47524787
}
4788+
4789+
fn check_blocking(&mut self) -> Result<()> {
4790+
let task = self.guest_thread.unwrap().task;
4791+
self.check_blocking_for(task)
4792+
}
4793+
4794+
fn check_blocking_for(&mut self, task: TableId<GuestTask>) -> Result<()> {
4795+
let task = self.get_mut(task).unwrap();
4796+
if !(task.async_function || task.returned_or_cancelled()) {
4797+
Err(Trap::CannotBlockSyncTask.into())
4798+
} else {
4799+
Ok(())
4800+
}
4801+
}
47534802
}
47544803

47554804
/// Provide a type hint to compiler about the shape of a parameter lower
@@ -4908,7 +4957,9 @@ pub(crate) fn prepare_call<T, R>(
49084957

49094958
let instance = handle.instance().id().get(store.0);
49104959
let options = &instance.component().env_component().options[options];
4911-
let task_return_type = instance.component().types()[ty].results;
4960+
let ty = &instance.component().types()[ty];
4961+
let async_function = ty.async_;
4962+
let task_return_type = ty.results;
49124963
let component_instance = raw_options.instance;
49134964
let callback = options.callback.map(|i| instance.runtime_callback(i));
49144965
let memory = options
@@ -4965,6 +5016,7 @@ pub(crate) fn prepare_call<T, R>(
49655016
) as CallbackFn
49665017
}),
49675018
component_instance,
5019+
async_function,
49685020
)?;
49695021
task.function_index = Some(handle.index());
49705022

crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3093,6 +3093,10 @@ impl Instance {
30933093
address: u32,
30943094
count: u32,
30953095
) -> Result<ReturnCode> {
3096+
if !self.options(store.0, options).async_ {
3097+
store.0.concurrent_state_mut().check_blocking()?;
3098+
}
3099+
30963100
let address = usize::try_from(address).unwrap();
30973101
let count = usize::try_from(count).unwrap();
30983102
self.check_bounds(store.0, options, ty, address, count)?;
@@ -3315,6 +3319,10 @@ impl Instance {
33153319
address: u32,
33163320
count: u32,
33173321
) -> Result<ReturnCode> {
3322+
if !self.options(store.0, options).async_ {
3323+
store.0.concurrent_state_mut().check_blocking()?;
3324+
}
3325+
33183326
let address = usize::try_from(address).unwrap();
33193327
let count = usize::try_from(count).unwrap();
33203328
self.check_bounds(store.0, options, ty, address, count)?;
@@ -3689,6 +3697,10 @@ impl Instance {
36893697
async_: bool,
36903698
writer: u32,
36913699
) -> Result<ReturnCode> {
3700+
if !async_ {
3701+
store.concurrent_state_mut().check_blocking()?;
3702+
}
3703+
36923704
let (rep, state) =
36933705
get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, writer)?;
36943706
let id = TableId::<TransmitHandle>::new(rep);
@@ -3723,6 +3735,10 @@ impl Instance {
37233735
async_: bool,
37243736
reader: u32,
37253737
) -> Result<ReturnCode> {
3738+
if !async_ {
3739+
store.concurrent_state_mut().check_blocking()?;
3740+
}
3741+
37263742
let (rep, state) =
37273743
get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, reader)?;
37283744
let id = TableId::<TransmitHandle>::new(rep);

0 commit comments

Comments
 (0)