From 145269d9d38506c3c45cc632bcd397d90a9fe201 Mon Sep 17 00:00:00 2001 From: "marco.mengelkoch" Date: Mon, 29 Sep 2025 22:53:33 +0200 Subject: [PATCH 01/13] wip - still deadlock due to missing event loop --- Cargo.lock | 423 +++++++++++++++++++++++++++++++++++++++ Cargo.toml | 8 +- src/lib.rs | 41 ++++ src/pyo3_runner.rs | 95 ++++++++- src/rustpython_runner.rs | 3 + src/set_venv.py | 2 +- 6 files changed, 568 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7e5aed5..fdc5888 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -57,12 +57,163 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d92bec98840b8f03a5ff5413de5293bfcd8bf96467cf5452609f939ec6f5de16" +[[package]] +name = "async-channel" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35" +dependencies = [ + "concurrent-queue", + "event-listener 2.5.3", + "futures-core", +] + +[[package]] +name = "async-channel" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "924ed96dd52d1b75e9c1a3e6275715fd320f5f9439fb5a4a11fa51f4221158d2" +dependencies = [ + "concurrent-queue", + "event-listener-strategy", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-executor" +version = "1.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "497c00e0fd83a72a79a39fcbd8e3e2f055d6f6c7e025f3b3d91f4f8e76527fb8" +dependencies = [ + "async-task", + "concurrent-queue", + "fastrand", + "futures-lite", + "pin-project-lite", + "slab", +] + +[[package]] +name = "async-global-executor" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05b1b633a2115cd122d73b955eadd9916c18c8f510ec9cd1686404c60ad1c29c" +dependencies = [ + "async-channel 2.5.0", + "async-executor", + "async-io", + "async-lock", + "blocking", + "futures-lite", + "once_cell", +] + +[[package]] +name = "async-io" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "456b8a8feb6f42d237746d4b3e9a178494627745c3c56c6ea55d92ba50d026fc" +dependencies = [ + "autocfg", + "cfg-if", + "concurrent-queue", + "futures-io", + "futures-lite", + "parking", + "polling", + "rustix 1.1.2", + "slab", + "windows-sys 0.61.1", +] + +[[package]] +name = "async-lock" +version = "3.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fd03604047cee9b6ce9de9f70c6cd540a0520c813cbd49bae61f33ab80ed1dc" +dependencies = [ + "event-listener 5.4.1", + "event-listener-strategy", + "pin-project-lite", +] + +[[package]] +name = "async-process" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc50921ec0055cdd8a16de48773bfeec5c972598674347252c0399676be7da75" +dependencies = [ + "async-channel 2.5.0", + "async-io", + "async-lock", + "async-signal", + "async-task", + "blocking", + "cfg-if", + "event-listener 5.4.1", + "futures-lite", + "rustix 1.1.2", +] + +[[package]] +name = "async-signal" +version = "0.2.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43c070bbf59cd3570b6b2dd54cd772527c7c3620fce8be898406dd3ed6adc64c" +dependencies = [ + "async-io", + "async-lock", + "atomic-waker", + "cfg-if", + "futures-core", + "futures-io", + "rustix 1.1.2", + "signal-hook-registry", + "slab", + "windows-sys 0.61.1", +] + +[[package]] +name = "async-std" +version = "1.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c8e079a4ab67ae52b7403632e4618815d6db36d2a010cfe41b02c1b1578f93b" +dependencies = [ + "async-channel 1.9.0", + "async-global-executor", + "async-io", + "async-lock", + "async-process", + "crossbeam-utils", + "futures-channel", + "futures-core", + "futures-io", + "futures-lite", + "gloo-timers", + "kv-log-macro", + "log", + "memchr", + "once_cell", + "pin-project-lite", + "pin-utils", + "slab", + "wasm-bindgen-futures", +] + +[[package]] +name = "async-task" +version = "4.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b75356056920673b02621b35afd0f7dda9306d03c79a30f5c56c44cf256e3de" + [[package]] name = "async_py" version = "0.2.1" dependencies = [ "dunce", "pyo3", + "pyo3-async-runtimes", "rustpython-stdlib", "rustpython-vm", "serde_json", @@ -80,6 +231,12 @@ dependencies = [ "bytemuck", ] +[[package]] +name = "atomic-waker" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" + [[package]] name = "atty" version = "0.2.14" @@ -148,6 +305,19 @@ dependencies = [ "generic-array", ] +[[package]] +name = "blocking" +version = "1.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e83f8d02be6967315521be875afa792a316e28d57b5a2d401897e2a7921b7f21" +dependencies = [ + "async-channel 2.5.0", + "async-task", + "futures-io", + "futures-lite", + "piper", +] + [[package]] name = "bstr" version = "0.2.17" @@ -236,6 +406,15 @@ dependencies = [ "error-code", ] +[[package]] +name = "concurrent-queue" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "core-foundation" version = "0.9.4" @@ -391,6 +570,33 @@ version = "3.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dea2df4cf52843e0452895c455a1a2cfbb842a1e7329671acf418fdc53ed4c59" +[[package]] +name = "event-listener" +version = "2.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" + +[[package]] +name = "event-listener" +version = "5.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e13b66accf52311f30a0db42147dadea9850cb48cd070028831ae5f5d4b856ab" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + +[[package]] +name = "event-listener-strategy" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8be9f3dfaaffdae2972880079a491a1a8bb7cbed0b8dd7a347f668b4150a3b93" +dependencies = [ + "event-listener 5.4.1", + "pin-project-lite", +] + [[package]] name = "exitcode" version = "1.1.2" @@ -430,6 +636,108 @@ dependencies = [ "miniz_oxide", ] +[[package]] +name = "futures" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" +dependencies = [ + "futures-core", + "futures-sink", +] + +[[package]] +name = "futures-core" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" + +[[package]] +name = "futures-executor" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" + +[[package]] +name = "futures-lite" +version = "2.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f78e10609fe0e0b3f4157ffab1876319b5b0db102a2c60dc4626306dc46b44ad" +dependencies = [ + "fastrand", + "futures-core", + "futures-io", + "parking", + "pin-project-lite", +] + +[[package]] +name = "futures-macro" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.106", +] + +[[package]] +name = "futures-sink" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" + +[[package]] +name = "futures-task" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" + +[[package]] +name = "futures-util" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-macro", + "futures-sink", + "futures-task", + "memchr", + "pin-project-lite", + "pin-utils", + "slab", +] + [[package]] name = "generic-array" version = "0.14.7" @@ -496,6 +804,18 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0cc23270f6e1808e30a928bdc84dea0b9b4136a8bc82338574f23baf47bbd280" +[[package]] +name = "gloo-timers" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb143cf96099802033e0d4f4963b19fd2e0b728bcf076cd9cf7f6634f092994" +dependencies = [ + "futures-channel", + "futures-core", + "js-sys", + "wasm-bindgen", +] + [[package]] name = "half" version = "1.8.3" @@ -672,6 +992,15 @@ dependencies = [ "cpufeatures", ] +[[package]] +name = "kv-log-macro" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0de8b303297635ad57c9f5059fd9cee7a47f8e8daa09df0fcd07dd39fb22977f" +dependencies = [ + "log", +] + [[package]] name = "lalrpop-util" version = "0.20.2" @@ -764,6 +1093,9 @@ name = "log" version = "0.4.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34080505efa8e45a4b816c349525ebe327ceaa8559756f0356cba97ef3bf7432" +dependencies = [ + "value-bag", +] [[package]] name = "lz4_flex" @@ -1053,6 +1385,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "parking" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" + [[package]] name = "parking_lot" version = "0.12.4" @@ -1126,6 +1464,23 @@ version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + +[[package]] +name = "piper" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96c8c490f422ef9a4efd2cb5b42b76c8613d7e7dfc1caf667b8a3350a5acc066" +dependencies = [ + "atomic-waker", + "fastrand", + "futures-io", +] + [[package]] name = "pkg-config" version = "0.3.32" @@ -1143,6 +1498,20 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "polling" +version = "3.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d0e4f59085d47d8241c88ead0f274e8a0cb551f3625263c05eb8dd897c34218" +dependencies = [ + "cfg-if", + "concurrent-queue", + "hermit-abi 0.5.2", + "pin-project-lite", + "rustix 1.1.2", + "windows-sys 0.61.1", +] + [[package]] name = "portable-atomic" version = "1.11.1" @@ -1199,6 +1568,31 @@ dependencies = [ "unindent", ] +[[package]] +name = "pyo3-async-runtimes" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6ee6d4cb3e8d5b925f5cdb38da183e0ff18122eb2048d4041c9e7034d026e23" +dependencies = [ + "async-std", + "futures", + "once_cell", + "pin-project-lite", + "pyo3", + "pyo3-async-runtimes-macros", +] + +[[package]] +name = "pyo3-async-runtimes-macros" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c29bc5c673e36a8102d0b9179149c1bb59990d8db4f3ae58bd7dceccab90b951" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.106", +] + [[package]] name = "pyo3-build-config" version = "0.26.0" @@ -2462,6 +2856,12 @@ dependencies = [ "syn 2.0.106", ] +[[package]] +name = "value-bag" +version = "1.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "943ce29a8a743eb10d6082545d861b24f9d1b160b7d741e0f2cdf726bec909c5" + [[package]] name = "vcpkg" version = "0.2.15" @@ -2531,6 +2931,19 @@ dependencies = [ "wasm-bindgen-shared", ] +[[package]] +name = "wasm-bindgen-futures" +version = "0.4.54" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e038d41e478cc73bae0ff9b36c60cff1c98b8f38f8d7e8061e79ee63608ac5c" +dependencies = [ + "cfg-if", + "js-sys", + "once_cell", + "wasm-bindgen", + "web-sys", +] + [[package]] name = "wasm-bindgen-macro" version = "0.2.104" @@ -2563,6 +2976,16 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "web-sys" +version = "0.3.81" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9367c417a924a74cae129e6a2ae3b47fabb1f8995595ab474029da749a8be120" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "which" version = "4.4.2" diff --git a/Cargo.toml b/Cargo.toml index 0a4a628..41e6ea8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,7 +19,7 @@ tokio = { version = "1.36.0", features = ["sync", "macros"] } [features] default = ["pyo3"] # default = ["rustpython"] -pyo3 = ["dep:pyo3"] +pyo3 = ["dep:pyo3", "dep:pyo3-async-runtimes"] rustpython = ["dep:rustpython-vm", "dep:rustpython-stdlib"] [dependencies.pyo3] @@ -27,6 +27,12 @@ version = "0.26.0" features = ["auto-initialize"] optional = true + +[dependencies.pyo3-async-runtimes] +version = "0.26" +features = ["attributes", "async-std-runtime"] +optional = true + [dev-dependencies] tempfile = "3" [dependencies.rustpython-vm] diff --git a/src/lib.rs b/src/lib.rs index 28e5f84..8c182df 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -23,6 +23,7 @@ pub(crate) enum CmdType { EvalCode(String), ReadVariable(String), CallFunction { name: String, args: Vec }, + CallAsyncFunction { name: String, args: Vec }, Stop, } /// Represents a command to be sent to the Python execution thread. It includes the @@ -163,6 +164,7 @@ impl PyRunner { /// to access functions within modules (e.g., "my_module.my_function"). /// * `args`: A vector of `serde_json::Value` to pass as arguments to the function. /// Returns the function's return value as a `serde_json::Value` on success. + /// Does not release GIL during await. pub async fn call_function( &self, name: &str, @@ -175,6 +177,25 @@ impl PyRunner { .await } + /// Asynchronously calls an async Python function in the interpreter's global scope. + /// + /// * `name`: The name of the function to call. It can be a dot-separated path + /// to access functions within modules (e.g., "my_module.my_function"). + /// * `args`: A vector of `serde_json::Value` to pass as arguments to the function. + /// Returns the function's return value as a `serde_json::Value` on success. + /// Will release GIL during await. + pub async fn call_async_function( + &self, + name: &str, + args: Vec, + ) -> Result { + self.send_command(CmdType::CallAsyncFunction { + name: name.into(), + args, + }) + .await + } + /// Stops the Python execution thread gracefully. pub async fn stop(&self) -> Result<(), PyRunnerError> { // We can ignore the `Ok(Value::Null)` result. @@ -267,6 +288,26 @@ def add(a, b): assert_eq!(result, Value::Number(14.into())); } + #[cfg(feature = "pyo3")] + #[tokio::test] + async fn test_run_with_async_function() { + let executor = PyRunner::new(); + let code = r#" +import asyncio + +async def async_add(a, b): + await asyncio.sleep(0.1) + return a + b +"#; + + executor.run(code).await.unwrap(); + let result = executor + .call_async_function("async_add", vec![5.into(), 10.into()]) + .await + .unwrap(); + assert_eq!(result, Value::Number(15.into())); + } + #[tokio::test] async fn test_concurrent_calls() { let executor = PyRunner::new(); diff --git a/src/pyo3_runner.rs b/src/pyo3_runner.rs index 5c867e0..facffa7 100644 --- a/src/pyo3_runner.rs +++ b/src/pyo3_runner.rs @@ -11,15 +11,34 @@ use pyo3::{ IntoPyObjectExt, }; use serde_json::Value; -use std::ffi::CString; -use tokio::sync::mpsc; +use std::{ffi::CString, future::Future}; +use tokio::{ + runtime::Runtime, + sync::{mpsc, oneshot}, +}; /// The main loop for the Python thread. This function is spawned in a new /// thread and is responsible for all Python interaction. pub(crate) fn python_thread_main(mut receiver: mpsc::Receiver) { Python::initialize(); Python::attach(|py| { + // Setup and run the asyncio event loop for the current thread. + let asyncio = py.import("asyncio").expect("Failed to import asyncio"); + let event_loop = asyncio + .call_method0("new_event_loop") + .expect("Failed to create new event loop"); + asyncio + .call_method1("set_event_loop", (event_loop.clone(),)) + .expect("Failed to set event loop"); + let locals = pyo3_async_runtimes::TaskLocals::new(event_loop.clone()) + .copy_context(py) + .unwrap(); let globals = PyDict::new(py); + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("Failed to create Tokio runtime for async Python execution"); + let rt_future = rt.spawn(async {}); while let Some(mut cmd) = py.detach(|| receiver.blocking_recv()) { let result = match std::mem::replace(&mut cmd.cmd_type, CmdType::Stop) { CmdType::RunCode(code) => { @@ -38,6 +57,19 @@ pub(crate) fn python_thread_main(mut receiver: mpsc::Receiver) { CmdType::CallFunction { name, args } => { handle_call_function(py, &globals, name, args) } + CmdType::CallAsyncFunction { name, args } => { + let locals = locals.clone_ref(py); + handle_call_async_function( + py, + &globals, + name, + args, + cmd.responder, + &rt, + locals, + ); + continue; + } CmdType::Stop => break, }; @@ -48,6 +80,10 @@ pub(crate) fn python_thread_main(mut receiver: mpsc::Receiver) { }; let _ = cmd.responder.send(response); } + rt_future.abort(); // Cleanly stop the asyncio event loop before the thread exits. + event_loop + .call_method0("stop") + .expect("Failed to stop event loop"); // After the loop, we can send a final confirmation for the Stop command if needed, // but the current implementation in lib.rs handles the channel closing. }); @@ -116,6 +152,61 @@ fn handle_call_function( py_any_to_json(py, &result) } +fn handle_call_async_pre_await( + py: &Python, + globals: &pyo3::Bound<'_, PyDict>, + name: String, + args: Vec, + locals: pyo3_async_runtimes::TaskLocals, +) -> PyResult>> + Send> { + let func = get_py_object(globals, &name)?; + + if !func.is_callable() { + return Err(PyErr::new::(format!( + "'{}' is not a callable function", + name + ))); + } + + let py_args = args + .into_iter() + .map(|v| json_value_to_pyobject(*py, v)) + .collect::>>()?; + let t_args = pyo3::types::PyTuple::new(*py, py_args)?; + let result = func.call1(t_args)?; + pyo3_async_runtimes::into_future_with_locals(&locals, result) +} + +/// Handles the `CallAsyncFunction` command. +fn handle_call_async_function( + py: Python, + globals: &pyo3::Bound, + name: String, + args: Vec, + responder: oneshot::Sender>, + rt: &Runtime, + locals: pyo3_async_runtimes::TaskLocals, +) { + let result_future = match handle_call_async_pre_await(&py, globals, name, args, locals) { + Ok(fut) => fut, + Err(e) => { + let _ = responder.send(Err(e.to_string())); + return; + } + }; + rt.spawn(async move { + let result = result_future + .await + .map_err(|e| e.to_string()) + .and_then(|py_res| { + Python::attach(|py| { + py_any_to_json(py, &py_res.into_bound(py)).map_err(|e| e.to_string()) + }) + }); + let _ = responder.send(result); + }); +} + /// Recursively converts a Python object to a `serde_json::Value`. fn py_any_to_json(py: Python, obj: &pyo3::Bound<'_, pyo3::PyAny>) -> PyResult { if obj.is_none() { diff --git a/src/rustpython_runner.rs b/src/rustpython_runner.rs index 21d9e0b..ed18bf0 100644 --- a/src/rustpython_runner.rs +++ b/src/rustpython_runner.rs @@ -47,6 +47,9 @@ pub(crate) fn python_thread_main(mut receiver: mpsc::Receiver) { call_function(vm, scope.clone(), name, args.clone()) .and_then(|obj| py_to_json(vm, &obj)) } + CmdType::CallAsyncFunction { name, args } => { + unimplemented!("Async functions are not supported yet in RustPython") + } CmdType::Stop => break, }; let response = result.map_err(|err| { diff --git a/src/set_venv.py b/src/set_venv.py index d121522..9682654 100644 --- a/src/set_venv.py +++ b/src/set_venv.py @@ -35,4 +35,4 @@ def add_venv_libs_to_syspath(site_packages, with_pth=False): except Exception as e: print(f"Warning: Could not process {pth_file}: {e}") - return site_packages \ No newline at end of file + return site_packages From 17ba6ebc518af527e683320faed22349229676a9 Mon Sep 17 00:00:00 2001 From: "marco.mengelkoch" Date: Fri, 3 Oct 2025 17:19:07 +0200 Subject: [PATCH 02/13] wip - still deadlock --- Cargo.lock | 304 +-------------------------------------------- Cargo.toml | 2 +- src/pyo3_runner.rs | 138 +++++++++----------- 3 files changed, 62 insertions(+), 382 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fdc5888..fc781c0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -57,156 +57,6 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d92bec98840b8f03a5ff5413de5293bfcd8bf96467cf5452609f939ec6f5de16" -[[package]] -name = "async-channel" -version = "1.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35" -dependencies = [ - "concurrent-queue", - "event-listener 2.5.3", - "futures-core", -] - -[[package]] -name = "async-channel" -version = "2.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "924ed96dd52d1b75e9c1a3e6275715fd320f5f9439fb5a4a11fa51f4221158d2" -dependencies = [ - "concurrent-queue", - "event-listener-strategy", - "futures-core", - "pin-project-lite", -] - -[[package]] -name = "async-executor" -version = "1.13.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "497c00e0fd83a72a79a39fcbd8e3e2f055d6f6c7e025f3b3d91f4f8e76527fb8" -dependencies = [ - "async-task", - "concurrent-queue", - "fastrand", - "futures-lite", - "pin-project-lite", - "slab", -] - -[[package]] -name = "async-global-executor" -version = "2.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05b1b633a2115cd122d73b955eadd9916c18c8f510ec9cd1686404c60ad1c29c" -dependencies = [ - "async-channel 2.5.0", - "async-executor", - "async-io", - "async-lock", - "blocking", - "futures-lite", - "once_cell", -] - -[[package]] -name = "async-io" -version = "2.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "456b8a8feb6f42d237746d4b3e9a178494627745c3c56c6ea55d92ba50d026fc" -dependencies = [ - "autocfg", - "cfg-if", - "concurrent-queue", - "futures-io", - "futures-lite", - "parking", - "polling", - "rustix 1.1.2", - "slab", - "windows-sys 0.61.1", -] - -[[package]] -name = "async-lock" -version = "3.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5fd03604047cee9b6ce9de9f70c6cd540a0520c813cbd49bae61f33ab80ed1dc" -dependencies = [ - "event-listener 5.4.1", - "event-listener-strategy", - "pin-project-lite", -] - -[[package]] -name = "async-process" -version = "2.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc50921ec0055cdd8a16de48773bfeec5c972598674347252c0399676be7da75" -dependencies = [ - "async-channel 2.5.0", - "async-io", - "async-lock", - "async-signal", - "async-task", - "blocking", - "cfg-if", - "event-listener 5.4.1", - "futures-lite", - "rustix 1.1.2", -] - -[[package]] -name = "async-signal" -version = "0.2.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43c070bbf59cd3570b6b2dd54cd772527c7c3620fce8be898406dd3ed6adc64c" -dependencies = [ - "async-io", - "async-lock", - "atomic-waker", - "cfg-if", - "futures-core", - "futures-io", - "rustix 1.1.2", - "signal-hook-registry", - "slab", - "windows-sys 0.61.1", -] - -[[package]] -name = "async-std" -version = "1.13.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2c8e079a4ab67ae52b7403632e4618815d6db36d2a010cfe41b02c1b1578f93b" -dependencies = [ - "async-channel 1.9.0", - "async-global-executor", - "async-io", - "async-lock", - "async-process", - "crossbeam-utils", - "futures-channel", - "futures-core", - "futures-io", - "futures-lite", - "gloo-timers", - "kv-log-macro", - "log", - "memchr", - "once_cell", - "pin-project-lite", - "pin-utils", - "slab", - "wasm-bindgen-futures", -] - -[[package]] -name = "async-task" -version = "4.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b75356056920673b02621b35afd0f7dda9306d03c79a30f5c56c44cf256e3de" - [[package]] name = "async_py" version = "0.2.1" @@ -231,12 +81,6 @@ dependencies = [ "bytemuck", ] -[[package]] -name = "atomic-waker" -version = "1.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" - [[package]] name = "atty" version = "0.2.14" @@ -305,19 +149,6 @@ dependencies = [ "generic-array", ] -[[package]] -name = "blocking" -version = "1.6.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e83f8d02be6967315521be875afa792a316e28d57b5a2d401897e2a7921b7f21" -dependencies = [ - "async-channel 2.5.0", - "async-task", - "futures-io", - "futures-lite", - "piper", -] - [[package]] name = "bstr" version = "0.2.17" @@ -406,15 +237,6 @@ dependencies = [ "error-code", ] -[[package]] -name = "concurrent-queue" -version = "2.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973" -dependencies = [ - "crossbeam-utils", -] - [[package]] name = "core-foundation" version = "0.9.4" @@ -570,33 +392,6 @@ version = "3.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dea2df4cf52843e0452895c455a1a2cfbb842a1e7329671acf418fdc53ed4c59" -[[package]] -name = "event-listener" -version = "2.5.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" - -[[package]] -name = "event-listener" -version = "5.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e13b66accf52311f30a0db42147dadea9850cb48cd070028831ae5f5d4b856ab" -dependencies = [ - "concurrent-queue", - "parking", - "pin-project-lite", -] - -[[package]] -name = "event-listener-strategy" -version = "0.5.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8be9f3dfaaffdae2972880079a491a1a8bb7cbed0b8dd7a347f668b4150a3b93" -dependencies = [ - "event-listener 5.4.1", - "pin-project-lite", -] - [[package]] name = "exitcode" version = "1.1.2" @@ -684,19 +479,6 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" -[[package]] -name = "futures-lite" -version = "2.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f78e10609fe0e0b3f4157ffab1876319b5b0db102a2c60dc4626306dc46b44ad" -dependencies = [ - "fastrand", - "futures-core", - "futures-io", - "parking", - "pin-project-lite", -] - [[package]] name = "futures-macro" version = "0.3.31" @@ -804,18 +586,6 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0cc23270f6e1808e30a928bdc84dea0b9b4136a8bc82338574f23baf47bbd280" -[[package]] -name = "gloo-timers" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbb143cf96099802033e0d4f4963b19fd2e0b728bcf076cd9cf7f6634f092994" -dependencies = [ - "futures-channel", - "futures-core", - "js-sys", - "wasm-bindgen", -] - [[package]] name = "half" version = "1.8.3" @@ -992,15 +762,6 @@ dependencies = [ "cpufeatures", ] -[[package]] -name = "kv-log-macro" -version = "1.0.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0de8b303297635ad57c9f5059fd9cee7a47f8e8daa09df0fcd07dd39fb22977f" -dependencies = [ - "log", -] - [[package]] name = "lalrpop-util" version = "0.20.2" @@ -1093,9 +854,6 @@ name = "log" version = "0.4.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34080505efa8e45a4b816c349525ebe327ceaa8559756f0356cba97ef3bf7432" -dependencies = [ - "value-bag", -] [[package]] name = "lz4_flex" @@ -1385,12 +1143,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "parking" -version = "2.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" - [[package]] name = "parking_lot" version = "0.12.4" @@ -1470,17 +1222,6 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" -[[package]] -name = "piper" -version = "0.2.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "96c8c490f422ef9a4efd2cb5b42b76c8613d7e7dfc1caf667b8a3350a5acc066" -dependencies = [ - "atomic-waker", - "fastrand", - "futures-io", -] - [[package]] name = "pkg-config" version = "0.3.32" @@ -1498,20 +1239,6 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "polling" -version = "3.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d0e4f59085d47d8241c88ead0f274e8a0cb551f3625263c05eb8dd897c34218" -dependencies = [ - "cfg-if", - "concurrent-queue", - "hermit-abi 0.5.2", - "pin-project-lite", - "rustix 1.1.2", - "windows-sys 0.61.1", -] - [[package]] name = "portable-atomic" version = "1.11.1" @@ -1574,12 +1301,12 @@ version = "0.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6ee6d4cb3e8d5b925f5cdb38da183e0ff18122eb2048d4041c9e7034d026e23" dependencies = [ - "async-std", "futures", "once_cell", "pin-project-lite", "pyo3", "pyo3-async-runtimes-macros", + "tokio", ] [[package]] @@ -2856,12 +2583,6 @@ dependencies = [ "syn 2.0.106", ] -[[package]] -name = "value-bag" -version = "1.11.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "943ce29a8a743eb10d6082545d861b24f9d1b160b7d741e0f2cdf726bec909c5" - [[package]] name = "vcpkg" version = "0.2.15" @@ -2931,19 +2652,6 @@ dependencies = [ "wasm-bindgen-shared", ] -[[package]] -name = "wasm-bindgen-futures" -version = "0.4.54" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e038d41e478cc73bae0ff9b36c60cff1c98b8f38f8d7e8061e79ee63608ac5c" -dependencies = [ - "cfg-if", - "js-sys", - "once_cell", - "wasm-bindgen", - "web-sys", -] - [[package]] name = "wasm-bindgen-macro" version = "0.2.104" @@ -2976,16 +2684,6 @@ dependencies = [ "unicode-ident", ] -[[package]] -name = "web-sys" -version = "0.3.81" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9367c417a924a74cae129e6a2ae3b47fabb1f8995595ab474029da749a8be120" -dependencies = [ - "js-sys", - "wasm-bindgen", -] - [[package]] name = "which" version = "4.4.2" diff --git a/Cargo.toml b/Cargo.toml index 41e6ea8..c5ca04a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,7 +30,7 @@ optional = true [dependencies.pyo3-async-runtimes] version = "0.26" -features = ["attributes", "async-std-runtime"] +features = ["attributes", "tokio-runtime"] optional = true [dev-dependencies] diff --git a/src/pyo3_runner.rs b/src/pyo3_runner.rs index facffa7..4ba3022 100644 --- a/src/pyo3_runner.rs +++ b/src/pyo3_runner.rs @@ -10,10 +10,12 @@ use pyo3::{ types::{PyBool, PyDict, PyFloat, PyInt, PyList, PyString}, IntoPyObjectExt, }; +use pyo3_async_runtimes::tokio::{into_future, run_until_complete}; use serde_json::Value; -use std::{ffi::CString, future::Future}; +use core::panic; +use std::{ffi::CString, future::Future, vec}; use tokio::{ - runtime::Runtime, + runtime::{Builder, Runtime}, sync::{mpsc, oneshot}, }; @@ -30,15 +32,12 @@ pub(crate) fn python_thread_main(mut receiver: mpsc::Receiver) { asyncio .call_method1("set_event_loop", (event_loop.clone(),)) .expect("Failed to set event loop"); - let locals = pyo3_async_runtimes::TaskLocals::new(event_loop.clone()) - .copy_context(py) - .unwrap(); let globals = PyDict::new(py); - let rt = tokio::runtime::Builder::new_current_thread() + let rt = Builder::new_current_thread() .enable_all() .build() .expect("Failed to create Tokio runtime for async Python execution"); - let rt_future = rt.spawn(async {}); + while let Some(mut cmd) = py.detach(|| receiver.blocking_recv()) { let result = match std::mem::replace(&mut cmd.cmd_type, CmdType::Stop) { CmdType::RunCode(code) => { @@ -58,16 +57,23 @@ pub(crate) fn python_thread_main(mut receiver: mpsc::Receiver) { handle_call_function(py, &globals, name, args) } CmdType::CallAsyncFunction { name, args } => { - let locals = locals.clone_ref(py); - handle_call_async_function( - py, - &globals, - name, - args, - cmd.responder, - &rt, - locals, - ); + let func = get_py_object(&globals, &name).unwrap(); // TODO; + check_func_callable(&func, &name).unwrap(); // TODO + let event_loop = event_loop.clone().unbind(); + let func = func.unbind(); + py.detach(|| rt.spawn(async move { + let result = handle_call_async_function( + func, + args, + event_loop, + ).await; + let response = match result { + Ok(value) => Ok(value), + Err(e) => Err(e.to_string()), + }; + let _ = cmd.responder.send(response); + })); + continue; } CmdType::Stop => break, @@ -80,7 +86,6 @@ pub(crate) fn python_thread_main(mut receiver: mpsc::Receiver) { }; let _ = cmd.responder.send(response); } - rt_future.abort(); // Cleanly stop the asyncio event loop before the thread exits. event_loop .call_method0("stop") .expect("Failed to stop event loop"); @@ -93,7 +98,7 @@ pub(crate) fn python_thread_main(mut receiver: mpsc::Receiver) { fn get_py_object<'py>( globals: &pyo3::Bound<'py, PyDict>, name: &str, -) -> PyResult> { +) -> PyResult> { let mut parts = name.split('.'); let first_part = parts.next().unwrap(); // split always yields at least one item @@ -108,6 +113,17 @@ fn get_py_object<'py>( Ok(obj) } +fn check_func_callable(func: &Bound, name: &str) -> PyResult<()> { + if !func.is_callable() { + Err(PyErr::new::(format!( + "'{}' is not a callable function", + name + ))) + } else { + Ok(()) + } +} + fn handle_run_file( py: Python, globals: &pyo3::Bound<'_, PyDict>, @@ -135,80 +151,46 @@ fn handle_call_function( args: Vec, ) -> PyResult { let func = get_py_object(globals, &name)?; - - if !func.is_callable() { - return Err(PyErr::new::(format!( - "'{}' is not a callable function", - name - ))); - } - - let py_args = args - .into_iter() - .map(|v| json_value_to_pyobject(py, v)) - .collect::>>()?; - let t_args = pyo3::types::PyTuple::new(py, py_args)?; + check_func_callable(&func, &name)?; + let t_args = vec_to_py_tuple(&py, args)?; let result = func.call1(t_args)?; py_any_to_json(py, &result) } -fn handle_call_async_pre_await( - py: &Python, - globals: &pyo3::Bound<'_, PyDict>, - name: String, +fn vec_to_py_tuple<'py>( + py: &Python<'py>, args: Vec, - locals: pyo3_async_runtimes::TaskLocals, -) -> PyResult>> + Send> { - let func = get_py_object(globals, &name)?; - - if !func.is_callable() { - return Err(PyErr::new::(format!( - "'{}' is not a callable function", - name - ))); - } - +) -> PyResult> { let py_args = args .into_iter() .map(|v| json_value_to_pyobject(*py, v)) .collect::>>()?; - let t_args = pyo3::types::PyTuple::new(*py, py_args)?; - let result = func.call1(t_args)?; - pyo3_async_runtimes::into_future_with_locals(&locals, result) + pyo3::types::PyTuple::new(*py, py_args) } /// Handles the `CallAsyncFunction` command. -fn handle_call_async_function( - py: Python, - globals: &pyo3::Bound, - name: String, +async fn handle_call_async_function( + func: Py, args: Vec, - responder: oneshot::Sender>, - rt: &Runtime, - locals: pyo3_async_runtimes::TaskLocals, -) { - let result_future = match handle_call_async_pre_await(&py, globals, name, args, locals) { - Ok(fut) => fut, - Err(e) => { - let _ = responder.send(Err(e.to_string())); - return; - } - }; - rt.spawn(async move { - let result = result_future - .await - .map_err(|e| e.to_string()) - .and_then(|py_res| { - Python::attach(|py| { - py_any_to_json(py, &py_res.into_bound(py)).map_err(|e| e.to_string()) - }) - }); - let _ = responder.send(result); - }); + event_loop: Py, +) -> PyResult { + panic!("debug"); + Python::attach(|py| { + let func = func.bind(py); + let event_loop = event_loop.bind(py).clone(); + let t_args = vec_to_py_tuple(&py, args)?; + let result = func.call1(t_args)?; + dbg!(1); + let result_future = into_future(result)?; + dbg!(2); + let result = run_until_complete(event_loop, async move {result_future.await})?; + dbg!(3); + py_any_to_json(py, &result.into_bound(py)) + }) } /// Recursively converts a Python object to a `serde_json::Value`. -fn py_any_to_json(py: Python, obj: &pyo3::Bound<'_, pyo3::PyAny>) -> PyResult { +fn py_any_to_json(py: Python, obj: &pyo3::Bound<'_, PyAny>) -> PyResult { if obj.is_none() { return Ok(Value::Null); } @@ -249,7 +231,7 @@ fn py_any_to_json(py: Python, obj: &pyo3::Bound<'_, pyo3::PyAny>) -> PyResult PyResult> { +fn json_value_to_pyobject(py: Python, value: Value) -> PyResult> { match value { Value::Null => Ok(py.None()), Value::Bool(b) => b.into_py_any(py), From c5b7372cf9c88d77024425d4e4f5d27956c1857d Mon Sep 17 00:00:00 2001 From: "marco.mengelkoch" Date: Sat, 4 Oct 2025 14:07:53 +0200 Subject: [PATCH 03/13] Add actual async runner and async test code --- README.md | 38 ++++++++++++++++++++ src/lib.rs | 25 ++++++++----- src/pyo3_runner.rs | 87 ++++++++++++++++------------------------------ 3 files changed, 83 insertions(+), 67 deletions(-) diff --git a/README.md b/README.md index e95c53d..983a317 100644 --- a/README.md +++ b/README.md @@ -50,6 +50,44 @@ def greet(name): println!("{}", result2.as_str().unwrap()); // Prints: Hello World! Called 2 times from Python. } ``` + +### Async Python Example + +```rust +use async_py::PyRunner; + +#[tokio::main] +async fn main() { + let runner = PyRunner::new(); + let code = r#" +import asyncio +counter = 0 + +async def add_and_sleep(a, b, sleep_time): + global counter + await asyncio.sleep(sleep_time) + counter += 1 + return a + b + counter +"#; + + runner.run(code).await.unwrap(); + let result1 = runner.call_async_function("add_and_sleep", vec![5.into(), 10.into(), 1.into()]); + let result2 = runner.call_async_function("add_and_sleep", vec![5.into(), 10.into(), 0.1.into()]); + let (result1, result2) = tokio::join!(result1, result2); + assert_eq!(result1.unwrap(), Value::Number(17.into())); + assert_eq!(result2.unwrap(), Value::Number(16.into())); +} +``` +Both function calls are triggered to run async code at the same time. While the first call waits for the sleep, +the second can already start and also increment the counter first. Therefore, +result1 will wait longer and compute 5 + 10 + 2, while the result2 can compute 5 + 10 + 1. + +Each call will use its own event loop. This may not be very efficient and changed later. + +Make sure to use `call_async_function` for async python functions. Using `call_function` will +probably raise an error. +`call_async_function` is not available for RustPython. + ### Using a venv It is generally recommended to use a venv to install pip packages. While you cannot switch the interpreter version with this crate, you can use an diff --git a/src/lib.rs b/src/lib.rs index 8c182df..a34a32e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -88,7 +88,11 @@ impl PyRunner { // This is crucial to avoid blocking the async runtime and to manage the GIL correctly. thread::spawn(move || { #[cfg(all(feature = "pyo3", not(feature = "rustpython")))] - pyo3_runner::python_thread_main(receiver); + { + use tokio::runtime::Builder; + let rt = Builder::new_multi_thread().enable_all().build().unwrap(); + rt.block_on(pyo3_runner::python_thread_main(receiver)); + } #[cfg(feature = "rustpython")] rustpython_runner::python_thread_main(receiver); @@ -294,18 +298,21 @@ def add(a, b): let executor = PyRunner::new(); let code = r#" import asyncio +counter = 0 -async def async_add(a, b): - await asyncio.sleep(0.1) - return a + b +async def add_and_sleep(a, b, sleep_time): + global counter + await asyncio.sleep(sleep_time) + counter += 1 + return a + b + counter "#; executor.run(code).await.unwrap(); - let result = executor - .call_async_function("async_add", vec![5.into(), 10.into()]) - .await - .unwrap(); - assert_eq!(result, Value::Number(15.into())); + let result1 = executor.call_async_function("add_and_sleep", vec![5.into(), 10.into(), 1.into()]); + let result2 = executor.call_async_function("add_and_sleep", vec![5.into(), 10.into(), 0.1.into()]); + let (result1, result2) = tokio::join!(result1, result2); + assert_eq!(result1.unwrap(), Value::Number(17.into())); + assert_eq!(result2.unwrap(), Value::Number(16.into())); } #[tokio::test] diff --git a/src/pyo3_runner.rs b/src/pyo3_runner.rs index 4ba3022..0c7de86 100644 --- a/src/pyo3_runner.rs +++ b/src/pyo3_runner.rs @@ -10,35 +10,18 @@ use pyo3::{ types::{PyBool, PyDict, PyFloat, PyInt, PyList, PyString}, IntoPyObjectExt, }; -use pyo3_async_runtimes::tokio::{into_future, run_until_complete}; use serde_json::Value; -use core::panic; -use std::{ffi::CString, future::Future, vec}; -use tokio::{ - runtime::{Builder, Runtime}, - sync::{mpsc, oneshot}, -}; +use std::ffi::CString; +use tokio::sync::{mpsc, oneshot}; /// The main loop for the Python thread. This function is spawned in a new /// thread and is responsible for all Python interaction. -pub(crate) fn python_thread_main(mut receiver: mpsc::Receiver) { +pub(crate) async fn python_thread_main(mut receiver: mpsc::Receiver) { Python::initialize(); - Python::attach(|py| { - // Setup and run the asyncio event loop for the current thread. - let asyncio = py.import("asyncio").expect("Failed to import asyncio"); - let event_loop = asyncio - .call_method0("new_event_loop") - .expect("Failed to create new event loop"); - asyncio - .call_method1("set_event_loop", (event_loop.clone(),)) - .expect("Failed to set event loop"); - let globals = PyDict::new(py); - let rt = Builder::new_current_thread() - .enable_all() - .build() - .expect("Failed to create Tokio runtime for async Python execution"); - - while let Some(mut cmd) = py.detach(|| receiver.blocking_recv()) { + let globals = Python::attach(|py| PyDict::new(py).unbind()); + while let Some(mut cmd) = receiver.recv().await { + Python::attach(|py| { + let globals = globals.bind(py); let result = match std::mem::replace(&mut cmd.cmd_type, CmdType::Stop) { CmdType::RunCode(code) => { let c_code = CString::new(code).expect("CString::new failed"); @@ -59,24 +42,14 @@ pub(crate) fn python_thread_main(mut receiver: mpsc::Receiver) { CmdType::CallAsyncFunction { name, args } => { let func = get_py_object(&globals, &name).unwrap(); // TODO; check_func_callable(&func, &name).unwrap(); // TODO - let event_loop = event_loop.clone().unbind(); let func = func.unbind(); - py.detach(|| rt.spawn(async move { - let result = handle_call_async_function( - func, - args, - event_loop, - ).await; - let response = match result { - Ok(value) => Ok(value), - Err(e) => Err(e.to_string()), - }; - let _ = cmd.responder.send(response); - })); - continue; + py.detach(|| { + tokio::spawn(handle_call_async_function(func, args, cmd.responder)) + }); + return; // The response is sent async, so we can return early. } - CmdType::Stop => break, + CmdType::Stop => return receiver.close(), }; // Convert PyErr to a string representation to avoid exposing it outside this module. @@ -85,13 +58,10 @@ pub(crate) fn python_thread_main(mut receiver: mpsc::Receiver) { Err(e) => Err(e.to_string()), }; let _ = cmd.responder.send(response); - } - event_loop - .call_method0("stop") - .expect("Failed to stop event loop"); + }); // After the loop, we can send a final confirmation for the Stop command if needed, // but the current implementation in lib.rs handles the channel closing. - }); + } } /// Resolves a potentially dot-separated Python object name from the globals dictionary. @@ -170,23 +140,24 @@ fn vec_to_py_tuple<'py>( /// Handles the `CallAsyncFunction` command. async fn handle_call_async_function( - func: Py, + func: Py, args: Vec, - event_loop: Py, -) -> PyResult { - panic!("debug"); - Python::attach(|py| { + responder: oneshot::Sender>, +) { + let result = Python::attach(|py| { let func = func.bind(py); - let event_loop = event_loop.bind(py).clone(); let t_args = vec_to_py_tuple(&py, args)?; - let result = func.call1(t_args)?; - dbg!(1); - let result_future = into_future(result)?; - dbg!(2); - let result = run_until_complete(event_loop, async move {result_future.await})?; - dbg!(3); - py_any_to_json(py, &result.into_bound(py)) - }) + let coroutine = func.call1(t_args)?; + + let asyncio = py.import("asyncio")?; + let loop_obj = asyncio.call_method0("new_event_loop")?; + asyncio.call_method1("set_event_loop", (loop_obj.clone(),))?; + let result = loop_obj.call_method1("run_until_complete", (coroutine,))?; + loop_obj.call_method0("close")?; + + py_any_to_json(py, &result) + }); + let _ = responder.send(result.map_err(|e| e.to_string())); } /// Recursively converts a Python object to a `serde_json::Value`. From c9052b534e4f796a10356c06821ea7b6f10ea57d Mon Sep 17 00:00:00 2001 From: "marco.mengelkoch" Date: Sat, 4 Oct 2025 14:09:12 +0200 Subject: [PATCH 04/13] remove unnecessary dep --- Cargo.toml | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index c5ca04a..30a8921 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,7 +19,7 @@ tokio = { version = "1.36.0", features = ["sync", "macros"] } [features] default = ["pyo3"] # default = ["rustpython"] -pyo3 = ["dep:pyo3", "dep:pyo3-async-runtimes"] +pyo3 = ["dep:pyo3"] rustpython = ["dep:rustpython-vm", "dep:rustpython-stdlib"] [dependencies.pyo3] @@ -28,11 +28,6 @@ features = ["auto-initialize"] optional = true -[dependencies.pyo3-async-runtimes] -version = "0.26" -features = ["attributes", "tokio-runtime"] -optional = true - [dev-dependencies] tempfile = "3" [dependencies.rustpython-vm] From 9177accb0768444e9a014e73ff1ee2a9f450ebb4 Mon Sep 17 00:00:00 2001 From: "marco.mengelkoch" Date: Sat, 4 Oct 2025 14:09:20 +0200 Subject: [PATCH 05/13] cargo.lock --- Cargo.lock | 121 ----------------------------------------------------- 1 file changed, 121 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fc781c0..7e5aed5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -63,7 +63,6 @@ version = "0.2.1" dependencies = [ "dunce", "pyo3", - "pyo3-async-runtimes", "rustpython-stdlib", "rustpython-vm", "serde_json", @@ -431,95 +430,6 @@ dependencies = [ "miniz_oxide", ] -[[package]] -name = "futures" -version = "0.3.31" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" -dependencies = [ - "futures-channel", - "futures-core", - "futures-executor", - "futures-io", - "futures-sink", - "futures-task", - "futures-util", -] - -[[package]] -name = "futures-channel" -version = "0.3.31" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" -dependencies = [ - "futures-core", - "futures-sink", -] - -[[package]] -name = "futures-core" -version = "0.3.31" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" - -[[package]] -name = "futures-executor" -version = "0.3.31" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" -dependencies = [ - "futures-core", - "futures-task", - "futures-util", -] - -[[package]] -name = "futures-io" -version = "0.3.31" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" - -[[package]] -name = "futures-macro" -version = "0.3.31" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.106", -] - -[[package]] -name = "futures-sink" -version = "0.3.31" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" - -[[package]] -name = "futures-task" -version = "0.3.31" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" - -[[package]] -name = "futures-util" -version = "0.3.31" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" -dependencies = [ - "futures-channel", - "futures-core", - "futures-io", - "futures-macro", - "futures-sink", - "futures-task", - "memchr", - "pin-project-lite", - "pin-utils", - "slab", -] - [[package]] name = "generic-array" version = "0.14.7" @@ -1216,12 +1126,6 @@ version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" -[[package]] -name = "pin-utils" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" - [[package]] name = "pkg-config" version = "0.3.32" @@ -1295,31 +1199,6 @@ dependencies = [ "unindent", ] -[[package]] -name = "pyo3-async-runtimes" -version = "0.26.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6ee6d4cb3e8d5b925f5cdb38da183e0ff18122eb2048d4041c9e7034d026e23" -dependencies = [ - "futures", - "once_cell", - "pin-project-lite", - "pyo3", - "pyo3-async-runtimes-macros", - "tokio", -] - -[[package]] -name = "pyo3-async-runtimes-macros" -version = "0.26.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c29bc5c673e36a8102d0b9179149c1bb59990d8db4f3ae58bd7dceccab90b951" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.106", -] - [[package]] name = "pyo3-build-config" version = "0.26.0" From 13667544818fe1e7882c89e6c871bba8017bf22e Mon Sep 17 00:00:00 2001 From: "marco.mengelkoch" Date: Thu, 16 Oct 2025 15:59:11 +0200 Subject: [PATCH 06/13] add println output to test --- src/lib.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/lib.rs b/src/lib.rs index a34a32e..cfb90d7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -278,6 +278,8 @@ z = x + y"#; #[tokio::test] async fn test_run_with_function() { + // cargo test tests::test_run_with_function --release -- --nocapture + let start_time = std::time::Instant::now(); let executor = PyRunner::new(); let code = r#" def add(a, b): @@ -290,6 +292,8 @@ def add(a, b): .await .unwrap(); assert_eq!(result, Value::Number(14.into())); + let duration = start_time.elapsed(); + println!("test_run_with_function took: {} microseconds", duration.as_micros()); } #[cfg(feature = "pyo3")] From 4aef0e7194fe8ea4515d72535a3afb439636f044 Mon Sep 17 00:00:00 2001 From: "marco.mengelkoch" Date: Thu, 16 Oct 2025 16:42:44 +0200 Subject: [PATCH 07/13] add sync versions --- src/lib.rs | 166 ++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 165 insertions(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index cfb90d7..6369655 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -133,6 +133,21 @@ impl PyRunner { .await .map(|_| ()) } + + /// Synchronously executes a block of Python code. + /// + /// This is a blocking wrapper around `run`. It is intended for use in + /// synchronous applications. + /// + /// * `code`: A string slice containing the Python code to execute. + /// + /// **Note:** Calling this from an existing async runtime can lead to panics. + pub fn run_sync(&self, code: &str) -> Result<(), PyRunnerError> { + let rt = + tokio::runtime::Runtime::new().map_err(|e| PyRunnerError::PyError(e.to_string()))?; + rt.block_on(self.run(code)) + } + /// Asynchronously runs a python file. /// * `file`: Absolute path to a python file to execute. /// Also loads the path of the file to sys.path for imports. @@ -142,6 +157,20 @@ impl PyRunner { .map(|_| ()) } + /// Synchronously runs a python file. + /// + /// This is a blocking wrapper around `run_file`. It is intended for use in + /// synchronous applications. + /// + /// * `file`: Absolute path to a python file to execute. + /// + /// **Note:** Calling this from an existing async runtime can lead to panics. + pub fn run_file_sync(&self, file: &Path) -> Result<(), PyRunnerError> { + let rt = + tokio::runtime::Runtime::new().map_err(|e| PyRunnerError::PyError(e.to_string()))?; + rt.block_on(self.run_file(file)) + } + /// Asynchronously evaluates a single Python expression. /// /// * `code`: A string slice containing the Python expression to evaluate. @@ -152,6 +181,20 @@ impl PyRunner { self.send_command(CmdType::EvalCode(code.into())).await } + /// Synchronously evaluates a single Python expression. + /// + /// This is a blocking wrapper around `eval`. It is intended for use in + /// synchronous applications. + /// + /// * `code`: A string slice containing the Python expression to evaluate. + /// + /// **Note:** Calling this from an existing async runtime can lead to panics. + pub fn eval_sync(&self, code: &str) -> Result { + let rt = + tokio::runtime::Runtime::new().map_err(|e| PyRunnerError::PyError(e.to_string()))?; + rt.block_on(self.eval(code)) + } + /// Asynchronously reads a variable from the Python interpreter's global scope. /// /// * `var_name`: The name of the variable to read. It can be a dot-separated path @@ -162,6 +205,20 @@ impl PyRunner { .await } + /// Synchronously reads a variable from the Python interpreter's global scope. + /// + /// This is a blocking wrapper around `read_variable`. It is intended for use in + /// synchronous applications. + /// + /// * `var_name`: The name of the variable to read. + /// + /// **Note:** Calling this from an existing async runtime can lead to panics. + pub fn read_variable_sync(&self, var_name: &str) -> Result { + let rt = + tokio::runtime::Runtime::new().map_err(|e| PyRunnerError::PyError(e.to_string()))?; + rt.block_on(self.read_variable(var_name)) + } + /// Asynchronously calls a Python function in the interpreter's global scope. /// /// * `name`: The name of the function to call. It can be a dot-separated path @@ -181,6 +238,27 @@ impl PyRunner { .await } + /// Synchronously calls a Python function in the interpreter's global scope. + /// + /// This is a blocking wrapper around `call_function`. It will create a new + /// Tokio runtime to execute the async function. It is intended for use in + /// synchronous applications. + /// + /// * `name`: The name of the function to call. + /// * `args`: A vector of `serde_json::Value` to pass as arguments to the function. + /// + /// **Note:** Calling this from an existing async runtime can lead to panics. + /// This is for calling from a non-async context. + #[cfg(feature = "pyo3")] + pub fn call_function_sync( + &self, + name: &str, + args: Vec, + ) -> Result { + let rt = tokio::runtime::Runtime::new().map_err(|e| PyRunnerError::PyError(e.to_string()))?; + rt.block_on(self.call_function(name, args)) + } + /// Asynchronously calls an async Python function in the interpreter's global scope. /// /// * `name`: The name of the function to call. It can be a dot-separated path @@ -200,12 +278,45 @@ impl PyRunner { .await } + /// Synchronously calls an async Python function in the interpreter's global scope. + /// + /// This is a blocking wrapper around `call_async_function`. It is intended for use in + /// synchronous applications. + /// + /// * `name`: The name of the function to call. + /// * `args`: A vector of `serde_json::Value` to pass as arguments to the function. + /// + /// **Note:** Calling this from an existing async runtime can lead to panics. + #[cfg(feature = "pyo3")] + pub fn call_async_function_sync( + &self, + name: &str, + args: Vec, + ) -> Result { + let rt = + tokio::runtime::Runtime::new().map_err(|e| PyRunnerError::PyError(e.to_string()))?; + rt.block_on(self.call_async_function(name, args)) + } + /// Stops the Python execution thread gracefully. pub async fn stop(&self) -> Result<(), PyRunnerError> { // We can ignore the `Ok(Value::Null)` result. self.send_command(CmdType::Stop).await?; Ok(()) } + + /// Synchronously stops the Python execution thread gracefully. + /// + /// This is a blocking wrapper around `stop`. It is intended for use in + /// synchronous applications. + /// + /// **Note:** Calling this from an existing async runtime can lead to panics. + pub fn stop_sync(&self) -> Result<(), PyRunnerError> { + let rt = + tokio::runtime::Runtime::new().map_err(|e| PyRunnerError::PyError(e.to_string()))?; + rt.block_on(self.stop()) + } + /// Set python venv environment folder (does not change interpreter) pub async fn set_venv(&self, venv_path: &Path) -> Result<(), PyRunnerError> { if !venv_path.is_dir() { @@ -239,6 +350,20 @@ impl PyRunner { )) .await } + + /// Synchronously sets the python venv environment folder. + /// + /// This is a blocking wrapper around `set_venv`. It is intended for use in + /// synchronous applications. + /// + /// * `venv_path`: Path to the venv directory. + /// + /// **Note:** Calling this from an existing async runtime can lead to panics. + pub fn set_venv_sync(&self, venv_path: &Path) -> Result<(), PyRunnerError> { + let rt = + tokio::runtime::Runtime::new().map_err(|e| PyRunnerError::PyError(e.to_string()))?; + rt.block_on(self.set_venv(venv_path)) + } } #[cfg(test)] @@ -276,10 +401,10 @@ z = x + y"#; assert_eq!(z_val, Value::Number(30.into())); } + #[tokio::test] async fn test_run_with_function() { // cargo test tests::test_run_with_function --release -- --nocapture - let start_time = std::time::Instant::now(); let executor = PyRunner::new(); let code = r#" def add(a, b): @@ -287,6 +412,7 @@ def add(a, b): "#; executor.run(code).await.unwrap(); + let start_time = std::time::Instant::now(); let result = executor .call_function("add", vec![5.into(), 9.into()]) .await @@ -296,6 +422,25 @@ def add(a, b): println!("test_run_with_function took: {} microseconds", duration.as_micros()); } + #[test] + fn test_sync_run_with_function() { + // cargo test tests::test_run_with_function --release -- --nocapture + let executor = PyRunner::new(); + let code = r#" +def add(a, b): + return a + b +"#; + + executor.run_sync(code).unwrap(); + let start_time = std::time::Instant::now(); + let result = executor + .call_function_sync("add", vec![5.into(), 9.into()]) + .unwrap(); + assert_eq!(result, Value::Number(14.into())); + let duration = start_time.elapsed(); + println!("test_run_with_function_sync took: {} microseconds", duration.as_micros()); + } + #[cfg(feature = "pyo3")] #[tokio::test] async fn test_run_with_async_function() { @@ -319,6 +464,25 @@ async def add_and_sleep(a, b, sleep_time): assert_eq!(result2.unwrap(), Value::Number(16.into())); } + #[cfg(feature = "pyo3")] + #[test] + fn test_run_with_async_function_sync() { + let executor = PyRunner::new(); + let code = r#" +import asyncio + +async def add(a, b): + await asyncio.sleep(0.1) + return a + b +"#; + + executor.run_sync(code).unwrap(); + let result = executor + .call_async_function_sync("add", vec![5.into(), 9.into()]) + .unwrap(); + assert_eq!(result, Value::Number(14.into())); + } + #[tokio::test] async fn test_concurrent_calls() { let executor = PyRunner::new(); From 5e7c5484a8a4bccb9d96986ba000b17566d352b1 Mon Sep 17 00:00:00 2001 From: "marco.mengelkoch" Date: Thu, 16 Oct 2025 16:49:53 +0200 Subject: [PATCH 08/13] add lazy initialized sync runtime --- Cargo.lock | 1 + Cargo.toml | 1 + src/lib.rs | 38 +++++++++++++++----------------------- 3 files changed, 17 insertions(+), 23 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7e5aed5..573134a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -62,6 +62,7 @@ name = "async_py" version = "0.2.1" dependencies = [ "dunce", + "once_cell", "pyo3", "rustpython-stdlib", "rustpython-vm", diff --git a/Cargo.toml b/Cargo.toml index 30a8921..d8aafad 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,7 @@ categories = ["api-bindings", "asynchronous", "external-ffi-bindings"] dunce = "1.0.4" serde_json = "1.0.114" thiserror = "2.0" +once_cell = "1.19" tokio = { version = "1.36.0", features = ["sync", "macros"] } [features] diff --git a/src/lib.rs b/src/lib.rs index 6369655..4ecde77 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,12 +10,19 @@ mod pyo3_runner; #[cfg(feature = "rustpython")] mod rustpython_runner; +use once_cell::sync::Lazy; use serde_json::Value; use std::path::{Path, PathBuf}; use std::thread; use thiserror::Error; +use tokio::runtime::Runtime; use tokio::sync::{mpsc, oneshot}; +/// A lazily-initialized global Tokio runtime for synchronous functions. +static SYNC_RUNTIME: Lazy = Lazy::new(|| { + Runtime::new().expect("Failed to create a new Tokio runtime for sync functions") +}); + #[derive(Debug)] pub(crate) enum CmdType { RunFile(PathBuf), @@ -143,9 +150,7 @@ impl PyRunner { /// /// **Note:** Calling this from an existing async runtime can lead to panics. pub fn run_sync(&self, code: &str) -> Result<(), PyRunnerError> { - let rt = - tokio::runtime::Runtime::new().map_err(|e| PyRunnerError::PyError(e.to_string()))?; - rt.block_on(self.run(code)) + SYNC_RUNTIME.block_on(self.run(code)) } /// Asynchronously runs a python file. @@ -166,9 +171,7 @@ impl PyRunner { /// /// **Note:** Calling this from an existing async runtime can lead to panics. pub fn run_file_sync(&self, file: &Path) -> Result<(), PyRunnerError> { - let rt = - tokio::runtime::Runtime::new().map_err(|e| PyRunnerError::PyError(e.to_string()))?; - rt.block_on(self.run_file(file)) + SYNC_RUNTIME.block_on(self.run_file(file)) } /// Asynchronously evaluates a single Python expression. @@ -190,9 +193,7 @@ impl PyRunner { /// /// **Note:** Calling this from an existing async runtime can lead to panics. pub fn eval_sync(&self, code: &str) -> Result { - let rt = - tokio::runtime::Runtime::new().map_err(|e| PyRunnerError::PyError(e.to_string()))?; - rt.block_on(self.eval(code)) + SYNC_RUNTIME.block_on(self.eval(code)) } /// Asynchronously reads a variable from the Python interpreter's global scope. @@ -214,9 +215,7 @@ impl PyRunner { /// /// **Note:** Calling this from an existing async runtime can lead to panics. pub fn read_variable_sync(&self, var_name: &str) -> Result { - let rt = - tokio::runtime::Runtime::new().map_err(|e| PyRunnerError::PyError(e.to_string()))?; - rt.block_on(self.read_variable(var_name)) + SYNC_RUNTIME.block_on(self.read_variable(var_name)) } /// Asynchronously calls a Python function in the interpreter's global scope. @@ -255,8 +254,7 @@ impl PyRunner { name: &str, args: Vec, ) -> Result { - let rt = tokio::runtime::Runtime::new().map_err(|e| PyRunnerError::PyError(e.to_string()))?; - rt.block_on(self.call_function(name, args)) + SYNC_RUNTIME.block_on(self.call_function(name, args)) } /// Asynchronously calls an async Python function in the interpreter's global scope. @@ -293,9 +291,7 @@ impl PyRunner { name: &str, args: Vec, ) -> Result { - let rt = - tokio::runtime::Runtime::new().map_err(|e| PyRunnerError::PyError(e.to_string()))?; - rt.block_on(self.call_async_function(name, args)) + SYNC_RUNTIME.block_on(self.call_async_function(name, args)) } /// Stops the Python execution thread gracefully. @@ -312,9 +308,7 @@ impl PyRunner { /// /// **Note:** Calling this from an existing async runtime can lead to panics. pub fn stop_sync(&self) -> Result<(), PyRunnerError> { - let rt = - tokio::runtime::Runtime::new().map_err(|e| PyRunnerError::PyError(e.to_string()))?; - rt.block_on(self.stop()) + SYNC_RUNTIME.block_on(self.stop()) } /// Set python venv environment folder (does not change interpreter) @@ -360,9 +354,7 @@ impl PyRunner { /// /// **Note:** Calling this from an existing async runtime can lead to panics. pub fn set_venv_sync(&self, venv_path: &Path) -> Result<(), PyRunnerError> { - let rt = - tokio::runtime::Runtime::new().map_err(|e| PyRunnerError::PyError(e.to_string()))?; - rt.block_on(self.set_venv(venv_path)) + SYNC_RUNTIME.block_on(self.set_venv(venv_path)) } } From fad8da67cb52aece3b09af38c7457cadafeee9cd Mon Sep 17 00:00:00 2001 From: "marco.mengelkoch" Date: Fri, 17 Oct 2025 09:59:20 +0200 Subject: [PATCH 09/13] calling sync from async --- src/lib.rs | 146 +++++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 120 insertions(+), 26 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 4ecde77..4e5e227 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -13,15 +13,11 @@ mod rustpython_runner; use once_cell::sync::Lazy; use serde_json::Value; use std::path::{Path, PathBuf}; +use std::sync::mpsc as std_mpsc; use std::thread; use thiserror::Error; -use tokio::runtime::Runtime; use tokio::sync::{mpsc, oneshot}; - -/// A lazily-initialized global Tokio runtime for synchronous functions. -static SYNC_RUNTIME: Lazy = Lazy::new(|| { - Runtime::new().expect("Failed to create a new Tokio runtime for sync functions") -}); +use tokio::runtime::{Builder, Runtime}; #[derive(Debug)] pub(crate) enum CmdType { @@ -41,8 +37,26 @@ pub(crate) struct PyCommand { responder: oneshot::Sender>, } +/// A boxed, send-able future that resolves to a PyRunnerResult. +type Task = Box Result + Send>; + +/// A lazily-initialized worker thread for handling synchronous function calls. +/// This thread has its own private Tokio runtime to safely block on async operations +/// without interfering with any existing runtime the user might be in. +static SYNC_WORKER: Lazy> = Lazy::new(|| { + let (tx, rx) = std_mpsc::channel::(); + + thread::spawn(move || { + let rt = Runtime::new().expect("Failed to create Tokio runtime for sync worker"); + // When the sender (tx) is dropped, rx.recv() will return an Err, ending the loop. + while let Ok(task) = rx.recv() { + let _ = task(&rt); // The result is sent back via a channel inside the task. + } + }); + tx +}); /// Custom error types for the `PyRunner`. -#[derive(Error, Debug)] +#[derive(Error, Debug, Clone)] pub enum PyRunnerError { #[error("Failed to send command to Python thread. The thread may have panicked.")] SendCommandFailed, @@ -96,13 +110,15 @@ impl PyRunner { thread::spawn(move || { #[cfg(all(feature = "pyo3", not(feature = "rustpython")))] { - use tokio::runtime::Builder; let rt = Builder::new_multi_thread().enable_all().build().unwrap(); rt.block_on(pyo3_runner::python_thread_main(receiver)); } #[cfg(feature = "rustpython")] - rustpython_runner::python_thread_main(receiver); + { + let rt = Builder::new_current_thread().enable_all().build().unwrap(); + rt.block_on(rustpython_runner::python_thread_main(receiver)); + } }); Self { sender } @@ -131,6 +147,32 @@ impl PyRunner { .map_err(PyRunnerError::PyError) } + /// A private helper function to encapsulate the logic of sending a command + /// and receiving a response synchronously. + fn send_command_sync(&self, cmd_type: CmdType) -> Result { + let (tx, rx) = std_mpsc::channel(); + let sender = self.sender.clone(); + + let cmd_type_clone = cmd_type; // Clone is implicit as CmdType is Copy + let task = Box::new(move |rt: &Runtime| { + let result = rt.block_on(async { + // This is the async `send_command` logic, but we can't call it + // directly because of `&self` lifetime issues inside the closure. + let (responder, receiver) = oneshot::channel(); + let cmd = PyCommand { cmd_type: cmd_type_clone, responder }; + sender.send(cmd).await.map_err(|_| PyRunnerError::SendCommandFailed)?; + receiver.await.map_err(|_| PyRunnerError::ReceiveResultFailed.clone())? + .map_err(PyRunnerError::PyError) + }); + if tx.send(result.clone()).is_err() { + return Err(PyRunnerError::SendCommandFailed); + } + result + }); + + SYNC_WORKER.send(task).map_err(|_| PyRunnerError::SendCommandFailed)?; + rx.recv().map_err(|_| PyRunnerError::ReceiveResultFailed)? + } /// Asynchronously executes a block of Python code. /// /// * `code`: A string slice containing the Python code to execute. @@ -148,9 +190,10 @@ impl PyRunner { /// /// * `code`: A string slice containing the Python code to execute. /// - /// **Note:** Calling this from an existing async runtime can lead to panics. + /// **Note:** This function is safe to call from any context (sync or async). pub fn run_sync(&self, code: &str) -> Result<(), PyRunnerError> { - SYNC_RUNTIME.block_on(self.run(code)) + self.send_command_sync(CmdType::RunCode(code.into())) + .map(|_| ()) } /// Asynchronously runs a python file. @@ -169,9 +212,10 @@ impl PyRunner { /// /// * `file`: Absolute path to a python file to execute. /// - /// **Note:** Calling this from an existing async runtime can lead to panics. + /// **Note:** This function is safe to call from any context (sync or async). pub fn run_file_sync(&self, file: &Path) -> Result<(), PyRunnerError> { - SYNC_RUNTIME.block_on(self.run_file(file)) + self.send_command_sync(CmdType::RunFile(file.to_path_buf())) + .map(|_| ()) } /// Asynchronously evaluates a single Python expression. @@ -191,9 +235,9 @@ impl PyRunner { /// /// * `code`: A string slice containing the Python expression to evaluate. /// - /// **Note:** Calling this from an existing async runtime can lead to panics. + /// **Note:** This function is safe to call from any context (sync or async). pub fn eval_sync(&self, code: &str) -> Result { - SYNC_RUNTIME.block_on(self.eval(code)) + self.send_command_sync(CmdType::EvalCode(code.into())) } /// Asynchronously reads a variable from the Python interpreter's global scope. @@ -213,9 +257,9 @@ impl PyRunner { /// /// * `var_name`: The name of the variable to read. /// - /// **Note:** Calling this from an existing async runtime can lead to panics. + /// **Note:** This function is safe to call from any context (sync or async). pub fn read_variable_sync(&self, var_name: &str) -> Result { - SYNC_RUNTIME.block_on(self.read_variable(var_name)) + self.send_command_sync(CmdType::ReadVariable(var_name.into())) } /// Asynchronously calls a Python function in the interpreter's global scope. @@ -246,15 +290,17 @@ impl PyRunner { /// * `name`: The name of the function to call. /// * `args`: A vector of `serde_json::Value` to pass as arguments to the function. /// - /// **Note:** Calling this from an existing async runtime can lead to panics. - /// This is for calling from a non-async context. + /// **Note:** This function is safe to call from any context (sync or async). #[cfg(feature = "pyo3")] pub fn call_function_sync( &self, name: &str, args: Vec, ) -> Result { - SYNC_RUNTIME.block_on(self.call_function(name, args)) + self.send_command_sync(CmdType::CallFunction { + name: name.into(), + args, + }) } /// Asynchronously calls an async Python function in the interpreter's global scope. @@ -284,14 +330,17 @@ impl PyRunner { /// * `name`: The name of the function to call. /// * `args`: A vector of `serde_json::Value` to pass as arguments to the function. /// - /// **Note:** Calling this from an existing async runtime can lead to panics. + /// **Note:** This function is safe to call from any context (sync or async). #[cfg(feature = "pyo3")] pub fn call_async_function_sync( &self, name: &str, args: Vec, ) -> Result { - SYNC_RUNTIME.block_on(self.call_async_function(name, args)) + self.send_command_sync(CmdType::CallAsyncFunction { + name: name.into(), + args, + }) } /// Stops the Python execution thread gracefully. @@ -306,9 +355,9 @@ impl PyRunner { /// This is a blocking wrapper around `stop`. It is intended for use in /// synchronous applications. /// - /// **Note:** Calling this from an existing async runtime can lead to panics. + /// **Note:** This function is safe to call from any context (sync or async). pub fn stop_sync(&self) -> Result<(), PyRunnerError> { - SYNC_RUNTIME.block_on(self.stop()) + self.send_command_sync(CmdType::Stop).map(|_| ()) } /// Set python venv environment folder (does not change interpreter) @@ -352,9 +401,37 @@ impl PyRunner { /// /// * `venv_path`: Path to the venv directory. /// - /// **Note:** Calling this from an existing async runtime can lead to panics. + /// **Note:** This function is safe to call from any context (sync or async). pub fn set_venv_sync(&self, venv_path: &Path) -> Result<(), PyRunnerError> { - SYNC_RUNTIME.block_on(self.set_venv(venv_path)) + if !venv_path.is_dir() { + return Err(PyRunnerError::PyError(format!( + "Could not find venv directory {}", + venv_path.display() + ))); + } + let set_venv_code = include_str!("set_venv.py"); + self.run_sync(&set_venv_code)?; + + let site_packages = if cfg!(target_os = "windows") { + venv_path.join("Lib").join("site-packages") + } else { + let version_code = "f\"python{sys.version_info.major}.{sys.version_info.minor}\""; + let py_version = self.eval_sync(version_code)?; + venv_path + .join("lib") + .join(py_version.as_str().unwrap()) + .join("site-packages") + }; + #[cfg(all(feature = "pyo3", not(feature = "rustpython")))] + let with_pth = "True"; + #[cfg(feature = "rustpython")] + let with_pth = "False"; + + self.run_sync(&format!( + "add_venv_libs_to_syspath({}, {})", + print_path_for_python(&site_packages), + with_pth + )) } } @@ -393,6 +470,23 @@ z = x + y"#; assert_eq!(z_val, Value::Number(30.into())); } + + #[tokio::test] + async fn test_run_sync_from_async() { + let executor = PyRunner::new(); + let code = r#" +x = 10 +y = 20 +z = x + y"#; + + let result_module = executor.run(code).await; + + assert!(result_module.is_ok()); + + let z_val = executor.read_variable_sync("z").unwrap(); + + assert_eq!(z_val, Value::Number(30.into())); + } #[tokio::test] async fn test_run_with_function() { From 962c041437ad5425fe3dc07a8722e76647991f64 Mon Sep 17 00:00:00 2001 From: "marco.mengelkoch" Date: Fri, 17 Oct 2025 10:17:00 +0200 Subject: [PATCH 10/13] rustpython cleanup --- src/lib.rs | 7 +++---- src/rustpython_runner.rs | 1 + 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 4e5e227..f45104c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -17,7 +17,7 @@ use std::sync::mpsc as std_mpsc; use std::thread; use thiserror::Error; use tokio::sync::{mpsc, oneshot}; -use tokio::runtime::{Builder, Runtime}; +use tokio::runtime::Runtime; #[derive(Debug)] pub(crate) enum CmdType { @@ -110,14 +110,14 @@ impl PyRunner { thread::spawn(move || { #[cfg(all(feature = "pyo3", not(feature = "rustpython")))] { + use tokio::runtime::Builder; let rt = Builder::new_multi_thread().enable_all().build().unwrap(); rt.block_on(pyo3_runner::python_thread_main(receiver)); } #[cfg(feature = "rustpython")] { - let rt = Builder::new_current_thread().enable_all().build().unwrap(); - rt.block_on(rustpython_runner::python_thread_main(receiver)); + rustpython_runner::python_thread_main(receiver); } }); @@ -291,7 +291,6 @@ impl PyRunner { /// * `args`: A vector of `serde_json::Value` to pass as arguments to the function. /// /// **Note:** This function is safe to call from any context (sync or async). - #[cfg(feature = "pyo3")] pub fn call_function_sync( &self, name: &str, diff --git a/src/rustpython_runner.rs b/src/rustpython_runner.rs index ed18bf0..15be641 100644 --- a/src/rustpython_runner.rs +++ b/src/rustpython_runner.rs @@ -48,6 +48,7 @@ pub(crate) fn python_thread_main(mut receiver: mpsc::Receiver) { .and_then(|obj| py_to_json(vm, &obj)) } CmdType::CallAsyncFunction { name, args } => { + dbg!(name, args); unimplemented!("Async functions are not supported yet in RustPython") } CmdType::Stop => break, From 3609057e6e23a34198f81e998741d31f82490f9a Mon Sep 17 00:00:00 2001 From: "marco.mengelkoch" Date: Fri, 17 Oct 2025 10:20:23 +0200 Subject: [PATCH 11/13] fmt --- src/lib.rs | 45 +++++++++++++++++++++++++++++---------------- 1 file changed, 29 insertions(+), 16 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index f45104c..f4cc8c1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -16,8 +16,8 @@ use std::path::{Path, PathBuf}; use std::sync::mpsc as std_mpsc; use std::thread; use thiserror::Error; -use tokio::sync::{mpsc, oneshot}; use tokio::runtime::Runtime; +use tokio::sync::{mpsc, oneshot}; #[derive(Debug)] pub(crate) enum CmdType { @@ -159,9 +159,17 @@ impl PyRunner { // This is the async `send_command` logic, but we can't call it // directly because of `&self` lifetime issues inside the closure. let (responder, receiver) = oneshot::channel(); - let cmd = PyCommand { cmd_type: cmd_type_clone, responder }; - sender.send(cmd).await.map_err(|_| PyRunnerError::SendCommandFailed)?; - receiver.await.map_err(|_| PyRunnerError::ReceiveResultFailed.clone())? + let cmd = PyCommand { + cmd_type: cmd_type_clone, + responder, + }; + sender + .send(cmd) + .await + .map_err(|_| PyRunnerError::SendCommandFailed)?; + receiver + .await + .map_err(|_| PyRunnerError::ReceiveResultFailed.clone())? .map_err(PyRunnerError::PyError) }); if tx.send(result.clone()).is_err() { @@ -170,7 +178,9 @@ impl PyRunner { result }); - SYNC_WORKER.send(task).map_err(|_| PyRunnerError::SendCommandFailed)?; + SYNC_WORKER + .send(task) + .map_err(|_| PyRunnerError::SendCommandFailed)?; rx.recv().map_err(|_| PyRunnerError::ReceiveResultFailed)? } /// Asynchronously executes a block of Python code. @@ -291,11 +301,7 @@ impl PyRunner { /// * `args`: A vector of `serde_json::Value` to pass as arguments to the function. /// /// **Note:** This function is safe to call from any context (sync or async). - pub fn call_function_sync( - &self, - name: &str, - args: Vec, - ) -> Result { + pub fn call_function_sync(&self, name: &str, args: Vec) -> Result { self.send_command_sync(CmdType::CallFunction { name: name.into(), args, @@ -469,7 +475,6 @@ z = x + y"#; assert_eq!(z_val, Value::Number(30.into())); } - #[tokio::test] async fn test_run_sync_from_async() { let executor = PyRunner::new(); @@ -486,7 +491,7 @@ z = x + y"#; assert_eq!(z_val, Value::Number(30.into())); } - + #[tokio::test] async fn test_run_with_function() { // cargo test tests::test_run_with_function --release -- --nocapture @@ -504,7 +509,10 @@ def add(a, b): .unwrap(); assert_eq!(result, Value::Number(14.into())); let duration = start_time.elapsed(); - println!("test_run_with_function took: {} microseconds", duration.as_micros()); + println!( + "test_run_with_function took: {} microseconds", + duration.as_micros() + ); } #[test] @@ -523,7 +531,10 @@ def add(a, b): .unwrap(); assert_eq!(result, Value::Number(14.into())); let duration = start_time.elapsed(); - println!("test_run_with_function_sync took: {} microseconds", duration.as_micros()); + println!( + "test_run_with_function_sync took: {} microseconds", + duration.as_micros() + ); } #[cfg(feature = "pyo3")] @@ -542,8 +553,10 @@ async def add_and_sleep(a, b, sleep_time): "#; executor.run(code).await.unwrap(); - let result1 = executor.call_async_function("add_and_sleep", vec![5.into(), 10.into(), 1.into()]); - let result2 = executor.call_async_function("add_and_sleep", vec![5.into(), 10.into(), 0.1.into()]); + let result1 = + executor.call_async_function("add_and_sleep", vec![5.into(), 10.into(), 1.into()]); + let result2 = + executor.call_async_function("add_and_sleep", vec![5.into(), 10.into(), 0.1.into()]); let (result1, result2) = tokio::join!(result1, result2); assert_eq!(result1.unwrap(), Value::Number(17.into())); assert_eq!(result2.unwrap(), Value::Number(16.into())); From 769195ac368533f78a02e4c4d34648ddb19be23a Mon Sep 17 00:00:00 2001 From: "marco.mengelkoch" Date: Fri, 17 Oct 2025 10:43:59 +0200 Subject: [PATCH 12/13] clippy --- Cargo.toml | 13 +++++-------- src/lib.rs | 23 +++++++++++++++++------ src/pyo3_runner.rs | 26 +++++++++++++------------- 3 files changed, 35 insertions(+), 27 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index d8aafad..5157f33 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,7 @@ dunce = "1.0.4" serde_json = "1.0.114" thiserror = "2.0" once_cell = "1.19" -tokio = { version = "1.36.0", features = ["sync", "macros"] } +tokio = { version = "1.47.0", features = ["sync", "macros", "rt", "rt-multi-thread"] } [features] default = ["pyo3"] @@ -28,19 +28,16 @@ version = "0.26.0" features = ["auto-initialize"] optional = true - -[dev-dependencies] -tempfile = "3" [dependencies.rustpython-vm] version = "0.4.0" optional = true features = ["threading", "serde", "importlib"] + [dependencies.rustpython-stdlib] version = "0.4.0" optional = true features = ["threading"] -# The `full` feature for tokio is needed for the tests -[dev-dependencies.tokio] -version = "1" -features = ["full"] +[dev-dependencies] +tempfile = "3" +tokio = { version = "1.47.0", features = ["full"] } diff --git a/src/lib.rs b/src/lib.rs index f4cc8c1..3e9126e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -92,6 +92,13 @@ pub struct PyRunner { sender: mpsc::Sender, } + +impl Default for PyRunner { + fn default() -> Self { + PyRunner::new() + } +} + impl PyRunner { /// Creates a new `PyRunner` and spawns a dedicated thread for Python execution. /// @@ -153,14 +160,13 @@ impl PyRunner { let (tx, rx) = std_mpsc::channel(); let sender = self.sender.clone(); - let cmd_type_clone = cmd_type; // Clone is implicit as CmdType is Copy let task = Box::new(move |rt: &Runtime| { let result = rt.block_on(async { // This is the async `send_command` logic, but we can't call it // directly because of `&self` lifetime issues inside the closure. let (responder, receiver) = oneshot::channel(); let cmd = PyCommand { - cmd_type: cmd_type_clone, + cmd_type, responder, }; sender @@ -186,6 +192,7 @@ impl PyRunner { /// Asynchronously executes a block of Python code. /// /// * `code`: A string slice containing the Python code to execute. + /// /// This is equivalent to Python's `exec()` function. pub async fn run(&self, code: &str) -> Result<(), PyRunnerError> { self.send_command(CmdType::RunCode(code.into())) @@ -208,6 +215,7 @@ impl PyRunner { /// Asynchronously runs a python file. /// * `file`: Absolute path to a python file to execute. + /// /// Also loads the path of the file to sys.path for imports. pub async fn run_file(&self, file: &Path) -> Result<(), PyRunnerError> { self.send_command(CmdType::RunFile(file.to_path_buf())) @@ -230,8 +238,8 @@ impl PyRunner { /// Asynchronously evaluates a single Python expression. /// - /// * `code`: A string slice containing the Python expression to evaluate. - /// Must not contain definitions or multiple lines. + /// * `code`: A string slice containing the Python expression to evaluate. Must not contain definitions or multiple lines. + /// /// Returns a `Result` containing the expression's result as a `serde_json::Value` on success, /// or a `PyRunnerError` on failure. This is equivalent to Python's `eval()` function. pub async fn eval(&self, code: &str) -> Result { @@ -254,6 +262,7 @@ impl PyRunner { /// /// * `var_name`: The name of the variable to read. It can be a dot-separated path /// to access attributes of objects (e.g., "my_module.my_variable"). + /// /// Returns the variable's value as a `serde_json::Value` on success. pub async fn read_variable(&self, var_name: &str) -> Result { self.send_command(CmdType::ReadVariable(var_name.into())) @@ -277,6 +286,7 @@ impl PyRunner { /// * `name`: The name of the function to call. It can be a dot-separated path /// to access functions within modules (e.g., "my_module.my_function"). /// * `args`: A vector of `serde_json::Value` to pass as arguments to the function. + /// /// Returns the function's return value as a `serde_json::Value` on success. /// Does not release GIL during await. pub async fn call_function( @@ -313,6 +323,7 @@ impl PyRunner { /// * `name`: The name of the function to call. It can be a dot-separated path /// to access functions within modules (e.g., "my_module.my_function"). /// * `args`: A vector of `serde_json::Value` to pass as arguments to the function. + /// /// Returns the function's return value as a `serde_json::Value` on success. /// Will release GIL during await. pub async fn call_async_function( @@ -374,7 +385,7 @@ impl PyRunner { ))); } let set_venv_code = include_str!("set_venv.py"); - self.run(&set_venv_code).await?; + self.run(set_venv_code).await?; let site_packages = if cfg!(target_os = "windows") { venv_path.join("Lib").join("site-packages") @@ -415,7 +426,7 @@ impl PyRunner { ))); } let set_venv_code = include_str!("set_venv.py"); - self.run_sync(&set_venv_code)?; + self.run_sync(set_venv_code)?; let site_packages = if cfg!(target_os = "windows") { venv_path.join("Lib").join("site-packages") diff --git a/src/pyo3_runner.rs b/src/pyo3_runner.rs index 0c7de86..a0c07bb 100644 --- a/src/pyo3_runner.rs +++ b/src/pyo3_runner.rs @@ -25,22 +25,22 @@ pub(crate) async fn python_thread_main(mut receiver: mpsc::Receiver) let result = match std::mem::replace(&mut cmd.cmd_type, CmdType::Stop) { CmdType::RunCode(code) => { let c_code = CString::new(code).expect("CString::new failed"); - py.run(&c_code, Some(&globals), None).map(|_| Value::Null) + py.run(&c_code, Some(globals), None).map(|_| Value::Null) } CmdType::EvalCode(code) => { let c_code = CString::new(code).expect("CString::new failed"); - py.eval(&c_code, Some(&globals), None) - .and_then(|obj| py_any_to_json(py, &obj)) + py.eval(&c_code, Some(globals), None) + .and_then(|obj| py_any_to_json(&obj)) } - CmdType::RunFile(file) => handle_run_file(py, &globals, file), + CmdType::RunFile(file) => handle_run_file(py, globals, file), CmdType::ReadVariable(var_name) => { - get_py_object(&globals, &var_name).and_then(|obj| py_any_to_json(py, &obj)) + get_py_object(globals, &var_name).and_then(|obj| py_any_to_json(&obj)) } CmdType::CallFunction { name, args } => { - handle_call_function(py, &globals, name, args) + handle_call_function(py, globals, name, args) } CmdType::CallAsyncFunction { name, args } => { - let func = get_py_object(&globals, &name).unwrap(); // TODO; + let func = get_py_object(globals, &name).unwrap(); // TODO; check_func_callable(&func, &name).unwrap(); // TODO let func = func.unbind(); @@ -110,7 +110,7 @@ with open({}, 'r') as f: print_path_for_python(&file.to_path_buf()) ); let c_code = CString::new(code).expect("CString::new failed"); - py.run(&c_code, Some(&globals), None).map(|_| Value::Null) + py.run(&c_code, Some(globals), None).map(|_| Value::Null) } /// Handles the `CallFunction` command. @@ -124,7 +124,7 @@ fn handle_call_function( check_func_callable(&func, &name)?; let t_args = vec_to_py_tuple(&py, args)?; let result = func.call1(t_args)?; - py_any_to_json(py, &result) + py_any_to_json(&result) } fn vec_to_py_tuple<'py>( @@ -155,13 +155,13 @@ async fn handle_call_async_function( let result = loop_obj.call_method1("run_until_complete", (coroutine,))?; loop_obj.call_method0("close")?; - py_any_to_json(py, &result) + py_any_to_json(&result) }); let _ = responder.send(result.map_err(|e| e.to_string())); } /// Recursively converts a Python object to a `serde_json::Value`. -fn py_any_to_json(py: Python, obj: &pyo3::Bound<'_, PyAny>) -> PyResult { +fn py_any_to_json(obj: &pyo3::Bound<'_, PyAny>) -> PyResult { if obj.is_none() { return Ok(Value::Null); } @@ -186,13 +186,13 @@ fn py_any_to_json(py: Python, obj: &pyo3::Bound<'_, PyAny>) -> PyResult { } if let Ok(list) = obj.cast::() { let items: PyResult> = - list.iter().map(|item| py_any_to_json(py, &item)).collect(); + list.iter().map(|item| py_any_to_json(&item)).collect(); return Ok(Value::Array(items?)); } if let Ok(dict) = obj.cast::() { let mut map = serde_json::Map::new(); for (key, value) in dict.iter() { - map.insert(key.to_string(), py_any_to_json(py, &value)?); + map.insert(key.to_string(), py_any_to_json(&value)?); } return Ok(Value::Object(map)); } From 0d3d7793e106e0e9df90c234a46708c759944b56 Mon Sep 17 00:00:00 2001 From: "marco.mengelkoch" Date: Fri, 17 Oct 2025 10:48:14 +0200 Subject: [PATCH 13/13] avoid unwrap --- src/pyo3_runner.rs | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/src/pyo3_runner.rs b/src/pyo3_runner.rs index a0c07bb..ed9fea1 100644 --- a/src/pyo3_runner.rs +++ b/src/pyo3_runner.rs @@ -40,14 +40,19 @@ pub(crate) async fn python_thread_main(mut receiver: mpsc::Receiver) handle_call_function(py, globals, name, args) } CmdType::CallAsyncFunction { name, args } => { - let func = get_py_object(globals, &name).unwrap(); // TODO; - check_func_callable(&func, &name).unwrap(); // TODO - let func = func.unbind(); - - py.detach(|| { - tokio::spawn(handle_call_async_function(func, args, cmd.responder)) - }); - return; // The response is sent async, so we can return early. + let result: PyResult<_> = (|| { + let func = get_py_object(globals, &name)?; + check_func_callable(&func, &name)?; + Ok(func.unbind()) + })(); + + match result { + Ok(func) => { + py.detach(|| tokio::spawn(handle_call_async_function(func, args, cmd.responder))); + return; // The response is sent async, so we can return early. + } + Err(e) => Err(e), + } } CmdType::Stop => return receiver.close(), };