Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 19 additions & 11 deletions ehttp/src/streaming/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,21 @@
//!
//! Example:
//! ```
//! use std::time::Duration;
//!
//! let your_chunk_handler = std::sync::Arc::new(|chunk: Vec<u8>| {
//! if chunk.is_empty() {
//! return std::ops::ControlFlow::Break(());
//! return ehttp::streaming::Flow::Break;
//! }
//!
//! println!("received chunk: {} bytes", chunk.len());
//! std::ops::ControlFlow::Continue(())
//!
//! // Example of back-pressure: wait if chunk is large
//! if chunk.len() > 1024 * 1024 {
//! ehttp::streaming::Flow::Wait(Duration::from_millis(100))
//! } else {
//! ehttp::streaming::Flow::Continue
//! }
//! });
//!
//! let url = "https://www.example.com";
Expand All @@ -20,17 +28,17 @@
//! Ok(part) => part,
//! Err(err) => {
//! eprintln!("an error occurred while streaming `{url}`: {err}");
//! return std::ops::ControlFlow::Break(());
//! return ehttp::streaming::Flow::Break;
//! }
//! };
//!
//! match part {
//! ehttp::streaming::Part::Response(response) => {
//! println!("Status code: {:?}", response.status);
//! if response.ok {
//! std::ops::ControlFlow::Continue(())
//! ehttp::streaming::Flow::Continue
//! } else {
//! std::ops::ControlFlow::Break(())
//! ehttp::streaming::Flow::Break
//! }
//! }
//! ehttp::streaming::Part::Chunk(chunk) => {
Expand All @@ -42,19 +50,19 @@
//!
//! The streaming fetch works like the non-streaming fetch, but instead
//! of receiving the response in full, you receive parts of the response
//! as they are streamed in.

use std::ops::ControlFlow;
//! as they are streamed in. The callback can return [`Flow::Wait`] to implement
//! back-pressure by pausing the stream for a specified duration.

use crate::Request;

/// Performs a HTTP requests and calls the given callback once for the initial response,
/// and then once for each chunk in the response body.
///
/// You can abort the fetch by returning [`ControlFlow::Break`] from the callback.
/// You can abort the fetch by returning [`types::Flow::Break`] from the callback,
/// or implement back-pressure by returning [`types::Flow::Wait`] with a duration.
pub fn fetch(
request: Request,
on_data: impl 'static + Send + Fn(crate::Result<types::Part>) -> ControlFlow<()>,
on_data: impl 'static + Send + Fn(crate::Result<types::Part>) -> types::Flow,
) {
#[cfg(not(target_arch = "wasm32"))]
native::fetch_streaming(request, Box::new(on_data));
Expand All @@ -75,4 +83,4 @@ pub use web::fetch_async_streaming;

mod types;

pub use self::types::Part;
pub use self::types::{Flow, Part};
24 changes: 14 additions & 10 deletions ehttp/src/streaming/native.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use std::ops::ControlFlow;
use std::{io::Read, thread};

use crate::Request;

use super::Part;
use super::{types::Flow, Part};
use crate::types::PartialResponse;

pub fn fetch_streaming_blocking(
request: Request,
on_data: Box<dyn Fn(crate::Result<Part>) -> ControlFlow<()> + Send>,
on_data: Box<dyn Fn(crate::Result<Part>) -> Flow + Send>,
) {
let mut req = ureq::request(&request.method, &request.url);

Expand Down Expand Up @@ -48,9 +48,11 @@ pub fn fetch_streaming_blocking(
status_text,
headers,
};
if on_data(Ok(Part::Response(response))).is_break() {
return;
};
match on_data(Ok(Part::Response(response))) {
Flow::Break => return,
Flow::Wait(duration) => thread::sleep(duration),
Flow::Continue => {}
}

let mut reader = resp.into_reader();
loop {
Expand All @@ -59,9 +61,11 @@ pub fn fetch_streaming_blocking(
Ok(n) if n > 0 => {
// clone data from buffer and clear it
let chunk = buf[..n].to_vec();
if on_data(Ok(Part::Chunk(chunk))).is_break() {
return;
};
match on_data(Ok(Part::Chunk(chunk))) {
Flow::Break => return,
Flow::Wait(duration) => thread::sleep(duration),
Flow::Continue => {}
}
}
Ok(_) => {
on_data(Ok(Part::Chunk(vec![])));
Expand All @@ -83,7 +87,7 @@ pub fn fetch_streaming_blocking(

pub(crate) fn fetch_streaming(
request: Request,
on_data: Box<dyn Fn(crate::Result<Part>) -> ControlFlow<()> + Send>,
on_data: Box<dyn Fn(crate::Result<Part>) -> Flow + Send>,
) {
std::thread::Builder::new()
.name("ehttp".to_owned())
Expand Down
16 changes: 16 additions & 0 deletions ehttp/src/streaming/types.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,21 @@
use std::time::Duration;

use crate::types::PartialResponse;

/// Flow control for streaming responses with back-pressure support.
pub enum Flow {
/// Continue processing immediately.
Continue,

/// Stop processing permanently.
Break,

/// Pause processing for the specified duration.
///
/// You can use this to apply back-pressure.
Wait(Duration),
}

/// A piece streamed by [`crate::streaming::fetch`].
pub enum Part {
/// The header of the response.
Expand Down
31 changes: 22 additions & 9 deletions ehttp/src/streaming/web.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
use std::ops::ControlFlow;

use futures_util::Stream;
use futures_util::StreamExt;
use wasm_bindgen::prelude::*;

use crate::web::{fetch_base, get_response_base, spawn_future, string_from_fetch_error};
use crate::Request;

use super::types::Part;
use super::types::{Flow, Part};

/// Only available when compiling for web.
///
Expand Down Expand Up @@ -46,9 +44,24 @@ async fn fetch_jsvalue_stream(
)
}

async fn sleep(duration: std::time::Duration) {
// Ignore all errors

let millis = duration.as_millis() as _;
let mut cb = |resolve: js_sys::Function, _reject: js_sys::Function| {
if let Some(window) = web_sys::window() {
window
.set_timeout_with_callback_and_timeout_and_arguments_0(&resolve, millis)
.ok();
}
};
let p = js_sys::Promise::new(&mut cb);
wasm_bindgen_futures::JsFuture::from(p).await.ok();
}

pub(crate) fn fetch_streaming(
request: Request,
on_data: Box<dyn Fn(crate::Result<Part>) -> ControlFlow<()> + Send>,
on_data: Box<dyn Fn(crate::Result<Part>) -> Flow + Send>,
) {
spawn_future(async move {
let mut stream = match fetch_jsvalue_stream(&request).await {
Expand All @@ -61,11 +74,11 @@ pub(crate) fn fetch_streaming(

while let Some(chunk) = stream.next().await {
match chunk {
Ok(chunk) => {
if on_data(Ok(chunk)).is_break() {
return;
}
}
Ok(chunk) => match on_data(Ok(chunk)) {
Flow::Break => return,
Flow::Wait(duration) => sleep(duration).await,
Flow::Continue => {}
},
Err(e) => {
on_data(Err(string_from_fetch_error(e)));
return;
Expand Down
21 changes: 13 additions & 8 deletions example_eframe/src/app.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{
ops::ControlFlow,
sync::{Arc, Mutex},
time::Duration,
};

use eframe::egui;
Expand Down Expand Up @@ -107,11 +107,11 @@ impl eframe::App for DemoApp {
fn on_fetch_part(
part: Result<ehttp::streaming::Part, String>,
download_store: &mut Download,
) -> ControlFlow<()> {
) -> ehttp::streaming::Flow {
let part = match part {
Err(err) => {
*download_store = Download::Done(Result::Err(err));
return ControlFlow::Break(());
return ehttp::streaming::Flow::Break;
}
Ok(part) => part,
};
Expand All @@ -122,7 +122,7 @@ fn on_fetch_part(
response,
body: Vec::new(),
};
ControlFlow::Continue(())
ehttp::streaming::Flow::Continue
}
ehttp::streaming::Part::Chunk(chunk) => {
if let Download::StreamingInProgress { response, mut body } =
Expand All @@ -133,14 +133,19 @@ fn on_fetch_part(
if chunk.is_empty() {
// This was the last chunk.
*download_store = Download::Done(Ok(response.complete(body)));
ControlFlow::Break(())
ehttp::streaming::Flow::Break
} else {
// More to come.
// More to come - demonstrate back-pressure by adding a small delay for large chunks
*download_store = Download::StreamingInProgress { response, body };
ControlFlow::Continue(())
if chunk.len() > 8192 {
// Wait a bit for large chunks to demonstrate back-pressure
ehttp::streaming::Flow::Wait(Duration::from_millis(10))
} else {
ehttp::streaming::Flow::Continue
}
}
} else {
ControlFlow::Break(()) // some data race - abort download.
ehttp::streaming::Flow::Break // some data race - abort download.
}
}
}
Expand Down
Loading