Skip to content

Commit 059508e

Browse files
committed
Add support for back-pressure in streaming API
1 parent dc7b7c1 commit 059508e

File tree

5 files changed

+84
-38
lines changed

5 files changed

+84
-38
lines changed

ehttp/src/streaming/mod.rs

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,21 @@
44
//!
55
//! Example:
66
//! ```
7+
//! use std::time::Duration;
8+
//!
79
//! let your_chunk_handler = std::sync::Arc::new(|chunk: Vec<u8>| {
810
//! if chunk.is_empty() {
9-
//! return std::ops::ControlFlow::Break(());
11+
//! return ehttp::streaming::Flow::Break;
1012
//! }
1113
//!
1214
//! println!("received chunk: {} bytes", chunk.len());
13-
//! std::ops::ControlFlow::Continue(())
15+
//!
16+
//! // Example of back-pressure: wait if chunk is large
17+
//! if chunk.len() > 1024 * 1024 {
18+
//! ehttp::streaming::Flow::Wait(Duration::from_millis(100))
19+
//! } else {
20+
//! ehttp::streaming::Flow::Continue
21+
//! }
1422
//! });
1523
//!
1624
//! let url = "https://www.example.com";
@@ -20,17 +28,17 @@
2028
//! Ok(part) => part,
2129
//! Err(err) => {
2230
//! eprintln!("an error occurred while streaming `{url}`: {err}");
23-
//! return std::ops::ControlFlow::Break(());
31+
//! return ehttp::streaming::Flow::Break;
2432
//! }
2533
//! };
2634
//!
2735
//! match part {
2836
//! ehttp::streaming::Part::Response(response) => {
2937
//! println!("Status code: {:?}", response.status);
3038
//! if response.ok {
31-
//! std::ops::ControlFlow::Continue(())
39+
//! ehttp::streaming::Flow::Continue
3240
//! } else {
33-
//! std::ops::ControlFlow::Break(())
41+
//! ehttp::streaming::Flow::Break
3442
//! }
3543
//! }
3644
//! ehttp::streaming::Part::Chunk(chunk) => {
@@ -42,19 +50,19 @@
4250
//!
4351
//! The streaming fetch works like the non-streaming fetch, but instead
4452
//! of receiving the response in full, you receive parts of the response
45-
//! as they are streamed in.
46-
47-
use std::ops::ControlFlow;
53+
//! as they are streamed in. The callback can return [`Flow::Wait`] to implement
54+
//! back-pressure by pausing the stream for a specified duration.
4855
4956
use crate::Request;
5057

5158
/// Performs a HTTP requests and calls the given callback once for the initial response,
5259
/// and then once for each chunk in the response body.
5360
///
54-
/// You can abort the fetch by returning [`ControlFlow::Break`] from the callback.
61+
/// You can abort the fetch by returning [`types::Flow::Break`] from the callback,
62+
/// or implement back-pressure by returning [`types::Flow::Wait`] with a duration.
5563
pub fn fetch(
5664
request: Request,
57-
on_data: impl 'static + Send + Fn(crate::Result<types::Part>) -> ControlFlow<()>,
65+
on_data: impl 'static + Send + Fn(crate::Result<types::Part>) -> types::Flow,
5866
) {
5967
#[cfg(not(target_arch = "wasm32"))]
6068
native::fetch_streaming(request, Box::new(on_data));
@@ -75,4 +83,4 @@ pub use web::fetch_async_streaming;
7583

7684
mod types;
7785

78-
pub use self::types::Part;
86+
pub use self::types::{Flow, Part};

ehttp/src/streaming/native.rs

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
1-
use std::ops::ControlFlow;
1+
use std::{io::Read, thread};
22

33
use crate::Request;
44

5-
use super::Part;
5+
use super::{types::Flow, Part};
66
use crate::types::PartialResponse;
77

88
pub fn fetch_streaming_blocking(
99
request: Request,
10-
on_data: Box<dyn Fn(crate::Result<Part>) -> ControlFlow<()> + Send>,
10+
on_data: Box<dyn Fn(crate::Result<Part>) -> Flow + Send>,
1111
) {
1212
let mut req = ureq::request(&request.method, &request.url);
1313

@@ -48,9 +48,11 @@ pub fn fetch_streaming_blocking(
4848
status_text,
4949
headers,
5050
};
51-
if on_data(Ok(Part::Response(response))).is_break() {
52-
return;
53-
};
51+
match on_data(Ok(Part::Response(response))) {
52+
Flow::Break => return,
53+
Flow::Wait(duration) => thread::sleep(duration),
54+
Flow::Continue => {}
55+
}
5456

5557
let mut reader = resp.into_reader();
5658
loop {
@@ -59,9 +61,11 @@ pub fn fetch_streaming_blocking(
5961
Ok(n) if n > 0 => {
6062
// clone data from buffer and clear it
6163
let chunk = buf[..n].to_vec();
62-
if on_data(Ok(Part::Chunk(chunk))).is_break() {
63-
return;
64-
};
64+
match on_data(Ok(Part::Chunk(chunk))) {
65+
Flow::Break => return,
66+
Flow::Wait(duration) => thread::sleep(duration),
67+
Flow::Continue => {}
68+
}
6569
}
6670
Ok(_) => {
6771
on_data(Ok(Part::Chunk(vec![])));
@@ -83,7 +87,7 @@ pub fn fetch_streaming_blocking(
8387

8488
pub(crate) fn fetch_streaming(
8589
request: Request,
86-
on_data: Box<dyn Fn(crate::Result<Part>) -> ControlFlow<()> + Send>,
90+
on_data: Box<dyn Fn(crate::Result<Part>) -> Flow + Send>,
8791
) {
8892
std::thread::Builder::new()
8993
.name("ehttp".to_owned())

ehttp/src/streaming/types.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,21 @@
1+
use std::time::Duration;
2+
13
use crate::types::PartialResponse;
24

5+
/// Flow control for streaming responses with back-pressure support.
6+
pub enum Flow {
7+
/// Continue processing immediately.
8+
Continue,
9+
10+
/// Stop processing permanently.
11+
Break,
12+
13+
/// Pause processing for the specified duration.
14+
///
15+
/// You can use this to apply back-pressure.
16+
Wait(Duration),
17+
}
18+
319
/// A piece streamed by [`crate::streaming::fetch`].
420
pub enum Part {
521
/// The header of the response.

ehttp/src/streaming/web.rs

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,11 @@
1-
use std::ops::ControlFlow;
2-
31
use futures_util::Stream;
42
use futures_util::StreamExt;
53
use wasm_bindgen::prelude::*;
64

75
use crate::web::{fetch_base, get_response_base, spawn_future, string_from_fetch_error};
86
use crate::Request;
97

10-
use super::types::Part;
8+
use super::types::{Flow, Part};
119

1210
/// Only available when compiling for web.
1311
///
@@ -46,9 +44,24 @@ async fn fetch_jsvalue_stream(
4644
)
4745
}
4846

47+
async fn sleep(duration: std::time::Duration) {
48+
// Ignore all errors
49+
50+
let millis = duration.as_millis() as _;
51+
let mut cb = |resolve: js_sys::Function, _reject: js_sys::Function| {
52+
if let Some(window) = web_sys::window() {
53+
window
54+
.set_timeout_with_callback_and_timeout_and_arguments_0(&resolve, millis)
55+
.ok();
56+
}
57+
};
58+
let p = js_sys::Promise::new(&mut cb);
59+
wasm_bindgen_futures::JsFuture::from(p).await.ok();
60+
}
61+
4962
pub(crate) fn fetch_streaming(
5063
request: Request,
51-
on_data: Box<dyn Fn(crate::Result<Part>) -> ControlFlow<()> + Send>,
64+
on_data: Box<dyn Fn(crate::Result<Part>) -> Flow + Send>,
5265
) {
5366
spawn_future(async move {
5467
let mut stream = match fetch_jsvalue_stream(&request).await {
@@ -61,11 +74,11 @@ pub(crate) fn fetch_streaming(
6174

6275
while let Some(chunk) = stream.next().await {
6376
match chunk {
64-
Ok(chunk) => {
65-
if on_data(Ok(chunk)).is_break() {
66-
return;
67-
}
68-
}
77+
Ok(chunk) => match on_data(Ok(chunk)) {
78+
Flow::Break => return,
79+
Flow::Wait(duration) => sleep(duration).await,
80+
Flow::Continue => {}
81+
},
6982
Err(e) => {
7083
on_data(Err(string_from_fetch_error(e)));
7184
return;

example_eframe/src/app.rs

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::{
2-
ops::ControlFlow,
32
sync::{Arc, Mutex},
3+
time::Duration,
44
};
55

66
use eframe::egui;
@@ -107,11 +107,11 @@ impl eframe::App for DemoApp {
107107
fn on_fetch_part(
108108
part: Result<ehttp::streaming::Part, String>,
109109
download_store: &mut Download,
110-
) -> ControlFlow<()> {
110+
) -> ehttp::streaming::Flow {
111111
let part = match part {
112112
Err(err) => {
113113
*download_store = Download::Done(Result::Err(err));
114-
return ControlFlow::Break(());
114+
return ehttp::streaming::Flow::Break;
115115
}
116116
Ok(part) => part,
117117
};
@@ -122,7 +122,7 @@ fn on_fetch_part(
122122
response,
123123
body: Vec::new(),
124124
};
125-
ControlFlow::Continue(())
125+
ehttp::streaming::Flow::Continue
126126
}
127127
ehttp::streaming::Part::Chunk(chunk) => {
128128
if let Download::StreamingInProgress { response, mut body } =
@@ -133,14 +133,19 @@ fn on_fetch_part(
133133
if chunk.is_empty() {
134134
// This was the last chunk.
135135
*download_store = Download::Done(Ok(response.complete(body)));
136-
ControlFlow::Break(())
136+
ehttp::streaming::Flow::Break
137137
} else {
138-
// More to come.
138+
// More to come - demonstrate back-pressure by adding a small delay for large chunks
139139
*download_store = Download::StreamingInProgress { response, body };
140-
ControlFlow::Continue(())
140+
if chunk.len() > 8192 {
141+
// Wait a bit for large chunks to demonstrate back-pressure
142+
ehttp::streaming::Flow::Wait(Duration::from_millis(10))
143+
} else {
144+
ehttp::streaming::Flow::Continue
145+
}
141146
}
142147
} else {
143-
ControlFlow::Break(()) // some data race - abort download.
148+
ehttp::streaming::Flow::Break // some data race - abort download.
144149
}
145150
}
146151
}

0 commit comments

Comments
 (0)