Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions crates/buttplug_client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,19 @@ test = true
doctest = true
doc = true

[features]
default = ["tokio-runtime"]
tokio-runtime = ["buttplug_core/tokio-runtime"]
wasm = ["buttplug_core/wasm"]

[dependencies]
buttplug_core = { version = "10.0.1", path = "../buttplug_core" }
buttplug_core = { version = "10.0.1", path = "../buttplug_core", default-features = false }
futures = "0.3.32"
thiserror = "2.0.18"
log = "0.4.29"
getset = "0.1.6"
tokio = { version = "1.50.0", features = ["macros"] }
dashmap = { version = "6.1.0" }
tracing-futures = "0.2.5"
tracing = "0.1.44"
serde = { version = "1.0.228", features = ["derive"] }
serde_json = "1.0.149"
Expand Down
12 changes: 4 additions & 8 deletions crates/buttplug_client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use buttplug_core::{
StopCmdV4,
StopScanningV0,
},
util::{async_manager, stream::convert_broadcast_receiver_to_stream},
util::stream::convert_broadcast_receiver_to_stream,
};
use client_event_loop::{ButtplugClientEventLoop, ButtplugClientRequest};
use dashmap::DashMap;
Expand All @@ -53,7 +53,6 @@ use std::{
use strum_macros::Display;
use thiserror::Error;
use tokio::sync::{Mutex, broadcast, mpsc};
use tracing_futures::Instrument;

/// Result type used for public APIs.
///
Expand Down Expand Up @@ -320,12 +319,9 @@ impl ButtplugClient {
);

// Start the event loop before we run the handshake.
async_manager::spawn(
async move {
client_event_loop.run().await;
}
.instrument(tracing::info_span!("Client Loop Span")),
);
buttplug_core::spawn!("ButtplugClient event loop", async move {
client_event_loop.run().await;
});
self.run_handshake().await
}

Expand Down
11 changes: 6 additions & 5 deletions crates/buttplug_client_in_process/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,21 @@ doc = true


[features]
default = ["btleplug-manager", "hid-manager", "lovense-dongle-manager", "lovense-connect-service-manager", "serial-manager", "websocket-manager", "xinput-manager"]
default = ["tokio-runtime", "btleplug-manager", "hid-manager", "lovense-dongle-manager", "lovense-connect-service-manager", "serial-manager", "websocket-manager", "xinput-manager"]
btleplug-manager=["buttplug_server_hwmgr_btleplug"]
hid-manager=["buttplug_server_hwmgr_hid"]
lovense-dongle-manager=["buttplug_server_hwmgr_lovense_dongle"]
lovense-connect-service-manager=["buttplug_server_hwmgr_lovense_connect"]
serial-manager=["buttplug_server_hwmgr_serial"]
websocket-manager=["buttplug_server_hwmgr_websocket"]
xinput-manager=["buttplug_server_hwmgr_xinput"]
tokio-runtime = ["buttplug_core/tokio-runtime", "buttplug_client/tokio-runtime", "buttplug_server/tokio-runtime"]
wasm = ["buttplug_core/wasm", "buttplug_client/wasm", "buttplug_server/wasm"]

[dependencies]
buttplug_core = { version = "10.0.1", path = "../buttplug_core" }
buttplug_client = { version = "10.0.1", path = "../buttplug_client" }
buttplug_server = { version = "10.0.1", path = "../buttplug_server" }
buttplug_core = { version = "10.0.1", path = "../buttplug_core", default-features = false }
buttplug_client = { version = "10.0.1", path = "../buttplug_client", default-features = false }
buttplug_server = { version = "10.0.1", path = "../buttplug_server", default-features = false }
buttplug_server_device_config = { version = "10.0.2", path = "../buttplug_server_device_config" }
buttplug_server_hwmgr_btleplug = { version = "10.0.1", path = "../buttplug_server_hwmgr_btleplug", optional = true}
buttplug_server_hwmgr_hid = { version = "10.0.0", path = "../buttplug_server_hwmgr_hid", optional = true}
Expand All @@ -48,5 +50,4 @@ log = "0.4.29"
getset = "0.1.6"
tokio = { version = "1.50.0", features = ["macros"] }
dashmap = { version = "6.1.0" }
tracing-futures = "0.2.5"
tracing = "0.1.44"
6 changes: 2 additions & 4 deletions crates/buttplug_client_in_process/src/in_process_connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use buttplug_core::{
connector::{ButtplugConnector, ButtplugConnectorError, ButtplugConnectorResultFuture},
errors::{ButtplugError, ButtplugMessageError},
message::{ButtplugClientMessageV4, ButtplugServerMessageV4},
util::async_manager,
};
use buttplug_server::{
ButtplugServer,
Expand All @@ -29,7 +28,6 @@ use std::sync::{
atomic::{AtomicBool, Ordering},
};
use tokio::sync::mpsc::{Sender, channel};
use tracing_futures::Instrument;

#[derive(Default)]
pub struct ButtplugInProcessClientConnectorBuilder {
Expand Down Expand Up @@ -116,7 +114,7 @@ impl ButtplugConnector<ButtplugClientMessageV4, ButtplugServerMessageV4>
self.server_outbound_sender = message_sender;
let server_recv = self.server.server_version_event_stream();
async move {
async_manager::spawn(async move {
buttplug_core::spawn!("InProcessClientConnector event sender loop", async move {
info!("Starting In Process Client Connector Event Sender Loop");
pin_mut!(server_recv);
while let Some(event) = server_recv.next().await {
Expand All @@ -130,7 +128,7 @@ impl ButtplugConnector<ButtplugClientMessageV4, ButtplugServerMessageV4>
}
}
info!("Stopping In Process Client Connector Event Sender Loop, due to channel receiver being dropped.");
}.instrument(tracing::info_span!("InProcessClientConnectorEventSenderLoop")));
});
connected.store(true, Ordering::Relaxed);
Ok(())
}.boxed()
Expand Down
10 changes: 7 additions & 3 deletions crates/buttplug_core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ doc = true

[features]
default=["tokio-runtime"]
tokio-runtime=["tokio/rt"]
wasm=[]
tokio-runtime=["tokio/rt", "tokio/time"]
wasm=["wasm-bindgen-futures", "wasmtimer"]

# Only build docs on one platform (linux)
[package.metadata.docs.rs]
Expand All @@ -46,9 +46,13 @@ log = "0.4.29"
getset = "0.1.6"
jsonschema = { version = "0.45.0", default-features = false }
cfg-if = "1.0.4"
tokio = { version = "1.50.0", features = ["sync", "time", "macros"] }
tokio = { version = "1.50.0", features = ["sync", "macros"] }
async-stream = "0.3.6"
strum_macros = "0.28.0"
strum = "0.28.0"
derive_builder = "0.20.2"
enum_dispatch = "0.3"
tracing = "0.1.44"
wasm-bindgen-futures = { version = "0.4.64", optional = true }
wasmtimer = { version = "0.4.3", optional = true }

3 changes: 1 addition & 2 deletions crates/buttplug_core/src/connector/remote_connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use crate::{
ButtplugMessage,
serializer::{ButtplugMessageSerializer, ButtplugSerializedMessage},
},
util::async_manager,
};
use futures::{FutureExt, future::BoxFuture, select};
use log::*;
Expand Down Expand Up @@ -234,7 +233,7 @@ where
// If we connect successfully, we get back the channel from the transport
// to send outgoing messages and receieve incoming events, all serialized.
Ok(()) => {
async_manager::spawn(async move {
crate::spawn!("ButtplugRemoteConnector event loop", async move {
remote_connector_event_loop::<
TransportType,
SerializerType,
Expand Down
3 changes: 1 addition & 2 deletions crates/buttplug_core/src/connector/transport/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use crate::{
transport::{ButtplugConnectorTransport, ButtplugTransportIncomingMessage},
},
message::serializer::ButtplugSerializedMessage,
util::async_manager,
};
use futures::{
FutureExt,
Expand Down Expand Up @@ -63,7 +62,7 @@ impl ButtplugConnectorTransport for ButtplugStreamTransport {
.await
.take()
.ok_or(ButtplugConnectorError::ConnectorAlreadyConnected)?;
async_manager::spawn(async move {
crate::spawn!("ButtplugStreamTransport", async move {
loop {
select! {
msg = outgoing_receiver.recv() => {
Expand Down
35 changes: 0 additions & 35 deletions crates/buttplug_core/src/util/async_manager/dummy.rs

This file was deleted.

100 changes: 89 additions & 11 deletions crates/buttplug_core/src/util/async_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,94 @@
// Licensed under the BSD 3-Clause license. See LICENSE file in the project root
// for full license information.

cfg_if::cfg_if! {
if #[cfg(feature = "wasm")] {
mod wasm_bindgen;
pub use self::wasm_bindgen::{WasmBindgenAsyncManager as AsyncManager, spawn, spawn_with_handle};
} else if #[cfg(feature = "tokio-runtime")] {
mod tokio;
pub use self::tokio::{TokioAsyncManager as AsyncManager, spawn, spawn_with_handle};
} else {
mod dummy;
pub use dummy::{DummyAsyncManager as AsyncManager, spawn, spawn_with_handle};
//std::compile_error!("Please choose a runtime feature: tokio-runtime, wasm-bindgen-runtime, dummy-runtime");
use futures::{future::BoxFuture, task::FutureObj};
use std::{future::Future, sync::OnceLock, time::Duration};
use tracing::Span;

#[cfg(feature = "wasm")]
mod wasm;

#[cfg(feature = "tokio-runtime")]
mod tokio;

static GLOBAL_ASYNC_MANAGER: OnceLock<Box<dyn AsyncManager>> = OnceLock::new();

/// Set a custom global async manager.
///
/// Call this once at startup to plug in a non-default async runtime. If not
/// called, the default manager for the enabled feature flag is used.
///
/// # Panics
/// Panics if called more than once.
pub fn set_global_async_manager(manager: Box<dyn AsyncManager>) {
GLOBAL_ASYNC_MANAGER
.set(manager)
.expect("Global async manager can only be set once.");
}

/// Get the default async manager based on enabled feature flags.
fn get_default_async_manager() -> Box<dyn AsyncManager> {
cfg_if::cfg_if! {
if #[cfg(feature = "wasm")] {
return Box::new(wasm::WasmBindgenAsyncManager::default());
} else if #[cfg(feature = "tokio-runtime")] {
return Box::new(tokio::TokioAsyncManager::default());
} else {
unimplemented!(
"No async runtime configured. Enable the `tokio-runtime` or `wasm` feature, \
or call `set_global_async_manager` before performing async operations."
);
}
}
}

fn get_global_async_manager() -> &'static dyn AsyncManager {
GLOBAL_ASYNC_MANAGER
.get_or_init(|| get_default_async_manager())
.as_ref()
}

/// Trait for async runtime abstraction in Buttplug.
///
/// Implement this trait to plug in a custom async runtime, then pass it to
/// [`set_global_async_manager`] before any async operations are performed.
///
/// Built-in implementations are provided for Tokio (via `tokio-runtime` feature)
/// and WASM (via `wasm` feature). For other runtimes (e.g. Embassy, esp-idf),
/// implement this trait and call [`set_global_async_manager`] at startup.
pub trait AsyncManager: std::fmt::Debug + Send + Sync {
/// Spawn a fire-and-forget task on the async runtime.
///
/// The `span` should be used to instrument the task with tracing context.
fn spawn(&self, future: FutureObj<'static, ()>, span: Span);

/// Sleep for the given duration.
fn sleep(&self, duration: Duration) -> BoxFuture<'static, ()>;
}

/// Spawn a fire-and-forget task on the global async manager.
///
/// Prefer the [`spawn!`][crate::spawn] macro for ergonomic use with a task name.
pub fn spawn<F>(future: F, span: Span)
where
F: Future<Output = ()> + Send + 'static,
{
get_global_async_manager().spawn(Box::new(future).into(), span);
}

/// Sleep for the given duration using the global async manager.
pub async fn sleep(duration: Duration) {
get_global_async_manager().sleep(duration).await;
}

/// Spawn a fire-and-forget task on the global async manager.
/// Always prefer to add a name to the task for better tracing context.
#[macro_export]
macro_rules! spawn {
($future:expr) => {
$crate::util::async_manager::spawn($future, tracing::span!(tracing::Level::INFO, "Buttplug Async Task"))
};
($name:expr, $future:expr) => {
$crate::util::async_manager::spawn($future, tracing::span!(tracing::Level::INFO, $name))
};
}
38 changes: 10 additions & 28 deletions crates/buttplug_core/src/util/async_manager/tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,37 +5,19 @@
// Licensed under the BSD 3-Clause license. See LICENSE file in the project root
// for full license information.

use futures::{
future::{Future, RemoteHandle},
task::{FutureObj, Spawn, SpawnError, SpawnExt},
};
use tokio;
use futures::{future::BoxFuture, task::FutureObj};
use std::time::Duration;
use tracing::{Instrument, Span};

#[derive(Default)]
#[derive(Default, Debug)]
pub struct TokioAsyncManager {}

impl Spawn for TokioAsyncManager {
fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
tokio::spawn(future);
Ok(())
impl super::AsyncManager for TokioAsyncManager {
fn spawn(&self, future: FutureObj<'static, ()>, span: Span) {
tokio::task::spawn(future.instrument(span));
}
}

pub fn spawn<Fut>(future: Fut)
where
Fut: Future<Output = ()> + Send + 'static,
{
// SAFETY: TokioAsyncManager::spawn_obj always returns Ok(()). The Result type is only
// present to satisfy the Spawn trait interface.
TokioAsyncManager::default()
.spawn(future)
.expect("TokioAsyncManager::spawn_obj always returns Ok")
}

pub fn spawn_with_handle<Fut>(future: Fut) -> Result<RemoteHandle<Fut::Output>, SpawnError>
where
Fut: Future + Send + 'static,
Fut::Output: Send,
{
TokioAsyncManager::default().spawn_with_handle(future)
fn sleep(&self, duration: Duration) -> BoxFuture<'static, ()> {
Box::pin(tokio::time::sleep(duration))
}
}
23 changes: 23 additions & 0 deletions crates/buttplug_core/src/util/async_manager/wasm.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Buttplug Rust Source Code File - See https://buttplug.io for more info.
//
// Copyright 2016-2026 Nonpolynomial Labs LLC. All rights reserved.
//
// Licensed under the BSD 3-Clause license. See LICENSE file in the project root
// for full license information.

use futures::{future::BoxFuture, task::FutureObj};
use std::time::Duration;
use tracing::{Instrument, Span};

#[derive(Default, Debug)]
pub struct WasmBindgenAsyncManager {}

impl super::AsyncManager for WasmBindgenAsyncManager {
fn spawn(&self, future: FutureObj<'static, ()>, span: Span) {
wasm_bindgen_futures::spawn_local(future.instrument(span));
}

fn sleep(&self, duration: Duration) -> BoxFuture<'static, ()> {
Box::pin(wasmtimer::tokio::sleep(duration))
}
}
Loading