diff --git a/ehttp/src/streaming/mod.rs b/ehttp/src/streaming/mod.rs index 522e096..3b2dd03 100644 --- a/ehttp/src/streaming/mod.rs +++ b/ehttp/src/streaming/mod.rs @@ -4,13 +4,21 @@ //! //! Example: //! ``` +//! use std::time::Duration; +//! //! let your_chunk_handler = std::sync::Arc::new(|chunk: Vec| { //! if chunk.is_empty() { -//! return std::ops::ControlFlow::Break(()); +//! return ehttp::streaming::Flow::Break; //! } //! //! println!("received chunk: {} bytes", chunk.len()); -//! std::ops::ControlFlow::Continue(()) +//! +//! // Example of back-pressure: wait if chunk is large +//! if chunk.len() > 1024 * 1024 { +//! ehttp::streaming::Flow::Wait(Duration::from_millis(100)) +//! } else { +//! ehttp::streaming::Flow::Continue +//! } //! }); //! //! let url = "https://www.example.com"; @@ -20,7 +28,7 @@ //! Ok(part) => part, //! Err(err) => { //! eprintln!("an error occurred while streaming `{url}`: {err}"); -//! return std::ops::ControlFlow::Break(()); +//! return ehttp::streaming::Flow::Break; //! } //! }; //! @@ -28,9 +36,9 @@ //! ehttp::streaming::Part::Response(response) => { //! println!("Status code: {:?}", response.status); //! if response.ok { -//! std::ops::ControlFlow::Continue(()) +//! ehttp::streaming::Flow::Continue //! } else { -//! std::ops::ControlFlow::Break(()) +//! ehttp::streaming::Flow::Break //! } //! } //! ehttp::streaming::Part::Chunk(chunk) => { @@ -42,19 +50,19 @@ //! //! The streaming fetch works like the non-streaming fetch, but instead //! of receiving the response in full, you receive parts of the response -//! as they are streamed in. - -use std::ops::ControlFlow; +//! as they are streamed in. The callback can return [`Flow::Wait`] to implement +//! back-pressure by pausing the stream for a specified duration. use crate::Request; /// Performs a HTTP requests and calls the given callback once for the initial response, /// and then once for each chunk in the response body. /// -/// You can abort the fetch by returning [`ControlFlow::Break`] from the callback. +/// You can abort the fetch by returning [`types::Flow::Break`] from the callback, +/// or implement back-pressure by returning [`types::Flow::Wait`] with a duration. pub fn fetch( request: Request, - on_data: impl 'static + Send + Fn(crate::Result) -> ControlFlow<()>, + on_data: impl 'static + Send + Fn(crate::Result) -> types::Flow, ) { #[cfg(not(target_arch = "wasm32"))] native::fetch_streaming(request, Box::new(on_data)); @@ -75,4 +83,4 @@ pub use web::fetch_async_streaming; mod types; -pub use self::types::Part; +pub use self::types::{Flow, Part}; diff --git a/ehttp/src/streaming/native.rs b/ehttp/src/streaming/native.rs index b26c4c6..f25d037 100644 --- a/ehttp/src/streaming/native.rs +++ b/ehttp/src/streaming/native.rs @@ -1,13 +1,13 @@ -use std::ops::ControlFlow; +use std::{io::Read, thread}; use crate::Request; -use super::Part; +use super::{types::Flow, Part}; use crate::types::PartialResponse; pub fn fetch_streaming_blocking( request: Request, - on_data: Box) -> ControlFlow<()> + Send>, + on_data: Box) -> Flow + Send>, ) { let mut req = ureq::request(&request.method, &request.url); @@ -48,9 +48,11 @@ pub fn fetch_streaming_blocking( status_text, headers, }; - if on_data(Ok(Part::Response(response))).is_break() { - return; - }; + match on_data(Ok(Part::Response(response))) { + Flow::Break => return, + Flow::Wait(duration) => thread::sleep(duration), + Flow::Continue => {} + } let mut reader = resp.into_reader(); loop { @@ -59,9 +61,11 @@ pub fn fetch_streaming_blocking( Ok(n) if n > 0 => { // clone data from buffer and clear it let chunk = buf[..n].to_vec(); - if on_data(Ok(Part::Chunk(chunk))).is_break() { - return; - }; + match on_data(Ok(Part::Chunk(chunk))) { + Flow::Break => return, + Flow::Wait(duration) => thread::sleep(duration), + Flow::Continue => {} + } } Ok(_) => { on_data(Ok(Part::Chunk(vec![]))); @@ -83,7 +87,7 @@ pub fn fetch_streaming_blocking( pub(crate) fn fetch_streaming( request: Request, - on_data: Box) -> ControlFlow<()> + Send>, + on_data: Box) -> Flow + Send>, ) { std::thread::Builder::new() .name("ehttp".to_owned()) diff --git a/ehttp/src/streaming/types.rs b/ehttp/src/streaming/types.rs index 647eabb..a0ace03 100644 --- a/ehttp/src/streaming/types.rs +++ b/ehttp/src/streaming/types.rs @@ -1,5 +1,21 @@ +use std::time::Duration; + use crate::types::PartialResponse; +/// Flow control for streaming responses with back-pressure support. +pub enum Flow { + /// Continue processing immediately. + Continue, + + /// Stop processing permanently. + Break, + + /// Pause processing for the specified duration. + /// + /// You can use this to apply back-pressure. + Wait(Duration), +} + /// A piece streamed by [`crate::streaming::fetch`]. pub enum Part { /// The header of the response. diff --git a/ehttp/src/streaming/web.rs b/ehttp/src/streaming/web.rs index 4f69563..feab226 100644 --- a/ehttp/src/streaming/web.rs +++ b/ehttp/src/streaming/web.rs @@ -1,5 +1,3 @@ -use std::ops::ControlFlow; - use futures_util::Stream; use futures_util::StreamExt; use wasm_bindgen::prelude::*; @@ -7,7 +5,7 @@ use wasm_bindgen::prelude::*; use crate::web::{fetch_base, get_response_base, spawn_future, string_from_fetch_error}; use crate::Request; -use super::types::Part; +use super::types::{Flow, Part}; /// Only available when compiling for web. /// @@ -46,9 +44,24 @@ async fn fetch_jsvalue_stream( ) } +async fn sleep(duration: std::time::Duration) { + // Ignore all errors + + let millis = duration.as_millis() as _; + let mut cb = |resolve: js_sys::Function, _reject: js_sys::Function| { + if let Some(window) = web_sys::window() { + window + .set_timeout_with_callback_and_timeout_and_arguments_0(&resolve, millis) + .ok(); + } + }; + let p = js_sys::Promise::new(&mut cb); + wasm_bindgen_futures::JsFuture::from(p).await.ok(); +} + pub(crate) fn fetch_streaming( request: Request, - on_data: Box) -> ControlFlow<()> + Send>, + on_data: Box) -> Flow + Send>, ) { spawn_future(async move { let mut stream = match fetch_jsvalue_stream(&request).await { @@ -61,11 +74,11 @@ pub(crate) fn fetch_streaming( while let Some(chunk) = stream.next().await { match chunk { - Ok(chunk) => { - if on_data(Ok(chunk)).is_break() { - return; - } - } + Ok(chunk) => match on_data(Ok(chunk)) { + Flow::Break => return, + Flow::Wait(duration) => sleep(duration).await, + Flow::Continue => {} + }, Err(e) => { on_data(Err(string_from_fetch_error(e))); return; diff --git a/example_eframe/src/app.rs b/example_eframe/src/app.rs index 6e8d8a1..e720c3a 100644 --- a/example_eframe/src/app.rs +++ b/example_eframe/src/app.rs @@ -1,6 +1,6 @@ use std::{ - ops::ControlFlow, sync::{Arc, Mutex}, + time::Duration, }; use eframe::egui; @@ -107,11 +107,11 @@ impl eframe::App for DemoApp { fn on_fetch_part( part: Result, download_store: &mut Download, -) -> ControlFlow<()> { +) -> ehttp::streaming::Flow { let part = match part { Err(err) => { *download_store = Download::Done(Result::Err(err)); - return ControlFlow::Break(()); + return ehttp::streaming::Flow::Break; } Ok(part) => part, }; @@ -122,7 +122,7 @@ fn on_fetch_part( response, body: Vec::new(), }; - ControlFlow::Continue(()) + ehttp::streaming::Flow::Continue } ehttp::streaming::Part::Chunk(chunk) => { if let Download::StreamingInProgress { response, mut body } = @@ -133,14 +133,19 @@ fn on_fetch_part( if chunk.is_empty() { // This was the last chunk. *download_store = Download::Done(Ok(response.complete(body))); - ControlFlow::Break(()) + ehttp::streaming::Flow::Break } else { - // More to come. + // More to come - demonstrate back-pressure by adding a small delay for large chunks *download_store = Download::StreamingInProgress { response, body }; - ControlFlow::Continue(()) + if chunk.len() > 8192 { + // Wait a bit for large chunks to demonstrate back-pressure + ehttp::streaming::Flow::Wait(Duration::from_millis(10)) + } else { + ehttp::streaming::Flow::Continue + } } } else { - ControlFlow::Break(()) // some data race - abort download. + ehttp::streaming::Flow::Break // some data race - abort download. } } }