|
1 | 1 | use crate::async_functions::{PollOnce, execute_across_threads}; |
2 | 2 | use anyhow::Result; |
| 3 | +use std::pin::Pin; |
| 4 | +use std::task::{Context, Poll}; |
3 | 5 | use wasmtime::{AsContextMut, Config, component::*}; |
4 | 6 | use wasmtime::{Engine, Store, StoreContextMut, Trap}; |
5 | 7 | use wasmtime_component_util::REALLOC_AND_FREE; |
@@ -670,3 +672,98 @@ async fn task_deletion() -> Result<()> { |
670 | 672 |
|
671 | 673 | Ok(()) |
672 | 674 | } |
| 675 | + |
| 676 | +#[tokio::test] |
| 677 | +#[cfg_attr(miri, ignore)] |
| 678 | +async fn cancel_host_future() -> Result<()> { |
| 679 | + let mut config = Config::new(); |
| 680 | + config.async_support(true); |
| 681 | + config.wasm_component_model_async(true); |
| 682 | + let engine = Engine::new(&config)?; |
| 683 | + |
| 684 | + let component = Component::new( |
| 685 | + &engine, |
| 686 | + r#" |
| 687 | +(component |
| 688 | + (core module $libc (memory (export "memory") 1)) |
| 689 | + (core instance $libc (instantiate $libc)) |
| 690 | + (core module $m |
| 691 | + (import "" "future.read" (func $future.read (param i32 i32) (result i32))) |
| 692 | + (import "" "future.cancel-read" (func $future.cancel-read (param i32) (result i32))) |
| 693 | + (memory (export "memory") 1) |
| 694 | +
|
| 695 | + (func (export "run") (param i32) |
| 696 | + ;; read/cancel attempt 1 |
| 697 | + (call $future.read (local.get 0) (i32.const 100)) |
| 698 | + i32.const -1 ;; BLOCKED |
| 699 | + i32.ne |
| 700 | + if unreachable end |
| 701 | +
|
| 702 | + (call $future.cancel-read (local.get 0)) |
| 703 | + i32.const 2 ;; CANCELLED |
| 704 | + i32.ne |
| 705 | + if unreachable end |
| 706 | +
|
| 707 | + ;; read/cancel attempt 2 |
| 708 | + (call $future.read (local.get 0) (i32.const 100)) |
| 709 | + i32.const -1 ;; BLOCKED |
| 710 | + i32.ne |
| 711 | + if unreachable end |
| 712 | +
|
| 713 | + (call $future.cancel-read (local.get 0)) |
| 714 | + i32.const 2 ;; CANCELLED |
| 715 | + i32.ne |
| 716 | + if unreachable end |
| 717 | + ) |
| 718 | + ) |
| 719 | +
|
| 720 | + (type $f (future u32)) |
| 721 | + (core func $future.read (canon future.read $f async (memory $libc "memory"))) |
| 722 | + (core func $future.cancel-read (canon future.cancel-read $f)) |
| 723 | +
|
| 724 | + (core instance $i (instantiate $m |
| 725 | + (with "" (instance |
| 726 | + (export "future.read" (func $future.read)) |
| 727 | + (export "future.cancel-read" (func $future.cancel-read)) |
| 728 | + )) |
| 729 | + )) |
| 730 | +
|
| 731 | + (func (export "run") (param "f" $f) |
| 732 | + (canon lift |
| 733 | + (core func $i "run") |
| 734 | + (memory $libc "memory") |
| 735 | + ) |
| 736 | + ) |
| 737 | +) |
| 738 | + "#, |
| 739 | + )?; |
| 740 | + |
| 741 | + let mut store = Store::new(&engine, ()); |
| 742 | + let instance = Linker::new(&engine) |
| 743 | + .instantiate_async(&mut store, &component) |
| 744 | + .await?; |
| 745 | + let func = instance.get_typed_func::<(FutureReader<u32>,), ()>(&mut store, "run")?; |
| 746 | + let reader = FutureReader::new(&mut store, MyFutureReader); |
| 747 | + func.call_async(&mut store, (reader,)).await?; |
| 748 | + |
| 749 | + return Ok(()); |
| 750 | + |
| 751 | + struct MyFutureReader; |
| 752 | + |
| 753 | + impl FutureProducer<()> for MyFutureReader { |
| 754 | + type Item = u32; |
| 755 | + |
| 756 | + fn poll_produce( |
| 757 | + self: Pin<&mut Self>, |
| 758 | + _cx: &mut Context<'_>, |
| 759 | + _store: StoreContextMut<()>, |
| 760 | + finish: bool, |
| 761 | + ) -> Poll<Result<Option<Self::Item>>> { |
| 762 | + if finish { |
| 763 | + Poll::Ready(Ok(None)) |
| 764 | + } else { |
| 765 | + Poll::Pending |
| 766 | + } |
| 767 | + } |
| 768 | + } |
| 769 | +} |
0 commit comments