From 83f5c1166854c2158081cf3c8e972ee2e2f099ca Mon Sep 17 00:00:00 2001 From: Josiah Bull Date: Wed, 20 May 2026 03:17:01 +0000 Subject: [PATCH 1/2] feat: minor fixes after using the library for real. --- crates/partly-proxy-lib/src/builder.rs | 121 +++++++++++++++++++--- crates/partly-proxy-lib/src/config.rs | 15 +++ crates/partly-proxy-lib/src/lib.rs | 4 +- crates/partly-proxy-lib/src/listener.rs | 18 +--- crates/partly-proxy-lib/src/middleware.rs | 8 ++ crates/partly-proxy-lib/src/replay.rs | 12 ++- crates/partly-proxy-lib/src/upstream.rs | 5 + 7 files changed, 150 insertions(+), 33 deletions(-) diff --git a/crates/partly-proxy-lib/src/builder.rs b/crates/partly-proxy-lib/src/builder.rs index 34b7678..b92cc52 100644 --- a/crates/partly-proxy-lib/src/builder.rs +++ b/crates/partly-proxy-lib/src/builder.rs @@ -13,28 +13,59 @@ use std::{ sync::Arc, }; +use bytes::Bytes; +use http::{StatusCode, header::CONTENT_TYPE}; use partly_proxy_types::{ProxyError, Result, SharedStorage}; use tokio::sync::watch; use crate::{ cluster::{ClusterHandle, RunningUpstream}, command, - config::{Mode, ProxyConfig, RecordingConfig}, + config::{Mode, ProxyConfig, RecordingConfig, UpstreamTarget}, control_plane, listener, middleware::{ProxyMiddleware, SharedMiddleware}, + proxy_io::{ProxyRequest, ProxyResponse}, recorder::Recorder, replay::ReplaySource, upstream::UpstreamRegistry, }; +/// Closure invoked when a request in [`Mode::Replay`] finds no matching stub +/// and no matching snapshot. Receives the unmatched request and returns the +/// response to send back to the caller. +pub type ReplayMissHandler = Arc ProxyResponse + Send + Sync>; + +pub(crate) fn default_replay_miss_handler() -> ReplayMissHandler { + Arc::new(|_req| { + ProxyResponse::new(StatusCode::SERVICE_UNAVAILABLE) + .with_header(CONTENT_TYPE, Bytes::from_static(b"application/json")) + .with_body(Bytes::from_static(b"{}")) + }) +} + /// Builder for a [`ClusterHandle`](crate::ClusterHandle). -#[derive(Default)] pub struct ProxyClusterBuilder { recording: RecordingConfig, + default_mode: Mode, upstreams: Vec, global_middleware: Vec, tcp_control_addr: Option, storage: Option, + replay_miss_handler: ReplayMissHandler, +} + +impl Default for ProxyClusterBuilder { + fn default() -> Self { + Self { + recording: RecordingConfig::default(), + default_mode: Mode::Record, + upstreams: Vec::new(), + global_middleware: Vec::new(), + tcp_control_addr: None, + storage: None, + replay_miss_handler: default_replay_miss_handler(), + } + } } impl std::fmt::Debug for ProxyClusterBuilder { @@ -59,6 +90,7 @@ pub(crate) struct UpstreamSpec { pub middleware: Vec, pub replay: Option, pub mode: Mode, + pub replay_miss_handler: ReplayMissHandler, } impl std::fmt::Debug for UpstreamSpec { @@ -84,23 +116,53 @@ impl ProxyClusterBuilder { self } + /// Set the default mode for all subsequently-added upstreams and update + /// the recording config accordingly (`Record` → enabled, `Replay` → + /// disabled). Eliminates the need to pass the same mode to every + /// `add_upstream_with*` call and to independently configure recording. + pub fn default_mode(mut self, mode: Mode) -> Self { + self.recording = match mode { + Mode::Record => RecordingConfig::default(), + Mode::Replay => RecordingConfig::disabled(), + }; + self.default_mode = mode; + self + } + + /// Override the handler invoked when a request in [`Mode::Replay`] finds + /// no matching stub and no matching snapshot. The closure receives the + /// unmatched [`ProxyRequest`] and returns the response sent to the caller. + /// + /// The default handler returns `503 {}` with `Content-Type: application/json`. + /// Use this to change the status, body, headers, or trigger a side-effect + /// (e.g. a structured log or metric) on miss. Applies to all + /// subsequently-added upstreams. + pub fn on_replay_miss(mut self, f: F) -> Self + where + F: Fn(ProxyRequest) -> ProxyResponse + Send + Sync + 'static, + { + self.replay_miss_handler = Arc::new(f); + self + } + /// Register an upstream with no per-upstream middleware and no replay /// source. Names should be unique; duplicates are surfaced by `run()`. - /// Defaults to [`Mode::Record`]. + /// Uses the builder's current [`default_mode`](Self::default_mode). pub fn add_upstream(mut self, name: impl Into, config: ProxyConfig) -> Self { self.upstreams.push(UpstreamSpec { name: name.into(), config, middleware: Vec::new(), replay: None, - mode: Mode::Record, + mode: self.default_mode, + replay_miss_handler: Arc::clone(&self.replay_miss_handler), }); self } /// Register an upstream with a list of per-upstream middleware. The /// effective chain for that upstream becomes `global ++ per_upstream`. - /// Defaults to [`Mode::Record`]. + /// Uses the builder's current [`default_mode`](Self::default_mode). pub fn add_upstream_with_middleware( mut self, name: impl Into, @@ -112,18 +174,20 @@ impl ProxyClusterBuilder { config, middleware, replay: None, - mode: Mode::Record, + mode: self.default_mode, + replay_miss_handler: Arc::clone(&self.replay_miss_handler), }); self } /// Register an upstream with both per-upstream middleware and an - /// optional replay source. Defaults to [`Mode::Record`]. + /// optional replay source. Uses the builder's current + /// [`default_mode`](Self::default_mode). /// /// See `SPECIFICATION.md` §8.3: in `Record` mode, stubs take priority /// over replay, which takes priority over the upstream forward. To - /// replay snapshots without ever forwarding to the upstream, use - /// [`Self::add_upstream_with_mode`] with [`Mode::Replay`]. + /// replay snapshots without ever forwarding to the upstream, call + /// [`Self::default_mode`] with [`Mode::Replay`] first. pub fn add_upstream_with( mut self, name: impl Into, @@ -136,17 +200,20 @@ impl ProxyClusterBuilder { config, middleware, replay, - mode: Mode::Record, + mode: self.default_mode, + replay_miss_handler: Arc::clone(&self.replay_miss_handler), }); self } - /// Register an upstream with an explicit [`Mode`]. + /// Register an upstream with an explicit [`Mode`], overriding + /// [`default_mode`](Self::default_mode) for this entry. /// /// In [`Mode::Replay`] the terminal never forwards to the upstream — a - /// missing snapshot yields a `503 {}` response. In [`Mode::Record`] the - /// terminal falls through to the upstream on miss and (when recording - /// is enabled) appends the exchange to the recorder. + /// missing snapshot yields the replay-miss response (default `503 {}`). + /// In [`Mode::Record`] the terminal falls through to the upstream on + /// miss and (when recording is enabled) appends the exchange to the + /// recorder. pub fn add_upstream_with_mode( mut self, name: impl Into, @@ -161,6 +228,32 @@ impl ProxyClusterBuilder { middleware, replay, mode, + replay_miss_handler: Arc::clone(&self.replay_miss_handler), + }); + self + } + + /// Register a stub — an upstream that never forwards to a real backend. + /// All requests are handled by `middleware`; anything that falls through + /// the chain invokes the replay-miss handler (default `503 {}`). + /// + /// Equivalent to `add_upstream_with_mode` with a dummy upstream target + /// and `Mode::Replay`, but without requiring callers to supply a + /// `ProxyConfig` with a meaningless upstream URL. + pub fn add_stub( + mut self, + name: impl Into, + bind_addr: SocketAddr, + middleware: Vec, + ) -> Self { + let config = ProxyConfig::http(bind_addr, UpstreamTarget::new("http://stub.internal:0")); + self.upstreams.push(UpstreamSpec { + name: name.into(), + config, + middleware, + replay: None, + mode: Mode::Replay, + replay_miss_handler: Arc::clone(&self.replay_miss_handler), }); self } diff --git a/crates/partly-proxy-lib/src/config.rs b/crates/partly-proxy-lib/src/config.rs index da46dab..9d023b0 100644 --- a/crates/partly-proxy-lib/src/config.rs +++ b/crates/partly-proxy-lib/src/config.rs @@ -231,6 +231,21 @@ pub enum Mode { Replay, } +impl std::str::FromStr for Mode { + type Err = partly_proxy_types::ProxyError; + + fn from_str(s: &str) -> Result { + match s.to_ascii_lowercase().as_str() { + "record" => Ok(Mode::Record), + "replay" => Ok(Mode::Replay), + _ => Err(partly_proxy_types::ProxyError::Command(format!( + "invalid proxy mode {:?}: expected \"record\" or \"replay\"", + s + ))), + } + } +} + /// Outbound TLS settings — see `SPECIFICATION.md` §3.4. #[derive(Debug, Clone, Default)] pub struct UpstreamTlsConfig { diff --git a/crates/partly-proxy-lib/src/lib.rs b/crates/partly-proxy-lib/src/lib.rs index 5ab6f2f..14ec4b5 100644 --- a/crates/partly-proxy-lib/src/lib.rs +++ b/crates/partly-proxy-lib/src/lib.rs @@ -34,14 +34,14 @@ compile_error!( ); pub use assertions::TrafficFilter; -pub use builder::ProxyClusterBuilder; +pub use builder::{ProxyClusterBuilder, ReplayMissHandler}; pub use cluster::ClusterHandle; pub use command::{Command, CommandResponse, CommandSender}; pub use config::{ InboundTlsConfig, Mode, ProxyConfig, RecordingConfig, UpstreamTarget, UpstreamTlsConfig, }; pub use context::{RequestContext, ResponseSource}; -pub use middleware::{Next, ProxyMiddleware, SharedMiddleware, Terminal, TerminalFuture}; +pub use middleware::{Next, ProxyMiddleware, SharedMiddleware, Terminal, TerminalFuture, shared}; /// Re-export of the JSON-Lines snapshot backend, available when the /// `storage-jsonl` feature is on (which it is by default). #[cfg(feature = "storage-jsonl")] diff --git a/crates/partly-proxy-lib/src/listener.rs b/crates/partly-proxy-lib/src/listener.rs index c837cf0..704b7b9 100644 --- a/crates/partly-proxy-lib/src/listener.rs +++ b/crates/partly-proxy-lib/src/listener.rs @@ -96,6 +96,7 @@ pub(crate) async fn spawn_listener( middleware, spec.replay, spec.mode, + spec.replay_miss_handler, ); #[cfg(feature = "_otel_any")] let runtime = runtime.with_otel(otel_runtime); @@ -461,7 +462,7 @@ impl Terminal for LiveTerminal<'_> { match self.runtime.mode { Mode::Replay => { ctx.insert(ResponseSource::ReplayMiss); - Ok(replay_miss_response()) + Ok((self.runtime.replay_miss_handler)(req)) } Mode::Record => { // Stamp before awaiting so the marker is present even @@ -477,21 +478,6 @@ impl Terminal for LiveTerminal<'_> { } } -/// `503` response served when a request in [`Mode::Replay`] finds no -/// matching stub and no matching snapshot — the proxy must not forward. -fn replay_miss_response() -> ProxyResponse { - let mut headers = HeaderMap::new(); - headers.insert( - CONTENT_TYPE, - http::HeaderValue::from_static("application/json"), - ); - ProxyResponse { - status: StatusCode::SERVICE_UNAVAILABLE, - headers, - body: Bytes::from_static(b"{}"), - version: Version::HTTP_11, - } -} fn into_hyper(resp: ProxyResponse) -> Response> { let mut builder = Response::builder() diff --git a/crates/partly-proxy-lib/src/middleware.rs b/crates/partly-proxy-lib/src/middleware.rs index 65cad8b..7dda8b1 100644 --- a/crates/partly-proxy-lib/src/middleware.rs +++ b/crates/partly-proxy-lib/src/middleware.rs @@ -42,6 +42,14 @@ pub trait ProxyMiddleware: Send + Sync + 'static { /// Shared, cheaply-clonable middleware reference. pub type SharedMiddleware = Arc; +/// Wrap any [`ProxyMiddleware`] in an `Arc` to produce a [`SharedMiddleware`]. +/// +/// Avoids explicit `Arc::new(m)` at every call site when building +/// per-upstream middleware lists. +pub fn shared(m: M) -> SharedMiddleware { + Arc::new(m) +} + /// Boxed future returned by [`Terminal::invoke`]. pub type TerminalFuture<'a> = Pin> + Send + 'a>>; diff --git a/crates/partly-proxy-lib/src/replay.rs b/crates/partly-proxy-lib/src/replay.rs index 20e8fe1..064311e 100644 --- a/crates/partly-proxy-lib/src/replay.rs +++ b/crates/partly-proxy-lib/src/replay.rs @@ -113,6 +113,10 @@ impl ReplaySource { /// Stream an NDJSON file line-by-line into a replay source. /// + /// If the file does not exist an empty [`ReplaySource`] is returned — + /// this covers the common case where a snapshots file has not been + /// created yet (e.g. first run in record mode). + /// /// The loader reads one exchange per line and never materialises the /// whole file as a single string (per §8.1.1's 100k-exchange scale /// target). Each malformed line yields a `ProxyError::Recording`. @@ -122,7 +126,13 @@ impl ReplaySource { /// with whichever backend they prefer. #[cfg(feature = "storage-jsonl")] pub fn from_jsonl(path: impl AsRef, strategy: MatchStrategy) -> Result { - let file = std::fs::File::open(path).map_err(ProxyError::Recording)?; + let file = match std::fs::File::open(&path) { + Ok(f) => f, + Err(e) if e.kind() == std::io::ErrorKind::NotFound => { + return Ok(Self::new(Vec::new(), strategy)); + } + Err(e) => return Err(ProxyError::Recording(e)), + }; let reader = BufReader::new(file); let mut exchanges = Vec::new(); for (lineno, line) in reader.lines().enumerate() { diff --git a/crates/partly-proxy-lib/src/upstream.rs b/crates/partly-proxy-lib/src/upstream.rs index cb24a1b..e38bbab 100644 --- a/crates/partly-proxy-lib/src/upstream.rs +++ b/crates/partly-proxy-lib/src/upstream.rs @@ -57,6 +57,8 @@ pub(crate) struct UpstreamRuntime { pub replay: Option, /// What happens on a replay miss — see [`Mode`]. pub mode: Mode, + /// Called when a replay miss occurs to produce the response. + pub replay_miss_handler: crate::builder::ReplayMissHandler, /// OTEL-only fields. Populated via [`UpstreamRuntime::with_otel`]. #[cfg(feature = "_otel_any")] pub otel: OtelRuntime, @@ -71,6 +73,7 @@ impl UpstreamRuntime { middleware: Vec, replay: Option, mode: Mode, + replay_miss_handler: crate::builder::ReplayMissHandler, ) -> Self { let (pause, _rx) = watch::channel(false); Self { @@ -82,6 +85,7 @@ impl UpstreamRuntime { pause, replay, mode, + replay_miss_handler, #[cfg(feature = "_otel_any")] otel: OtelRuntime::default(), } @@ -116,6 +120,7 @@ impl UpstreamRuntime { Vec::new(), None, Mode::Record, + crate::builder::default_replay_miss_handler(), ) } } From be40d1cf007facc5c746988af601aa552ac20f01 Mon Sep 17 00:00:00 2001 From: Josiah Bull Date: Wed, 20 May 2026 03:25:20 +0000 Subject: [PATCH 2/2] chore: ran linting. --- crates/partly-proxy-lib/src/builder.rs | 2 +- crates/partly-proxy-lib/src/config.rs | 3 +-- crates/partly-proxy-lib/src/listener.rs | 1 - 3 files changed, 2 insertions(+), 4 deletions(-) diff --git a/crates/partly-proxy-lib/src/builder.rs b/crates/partly-proxy-lib/src/builder.rs index b92cc52..9a75995 100644 --- a/crates/partly-proxy-lib/src/builder.rs +++ b/crates/partly-proxy-lib/src/builder.rs @@ -79,7 +79,7 @@ impl std::fmt::Debug for ProxyClusterBuilder { .field("global_middleware", &self.global_middleware.len()) .field("tcp_control_addr", &self.tcp_control_addr) .field("storage", &self.storage.is_some()) - .finish() + .finish_non_exhaustive() } } diff --git a/crates/partly-proxy-lib/src/config.rs b/crates/partly-proxy-lib/src/config.rs index 9d023b0..340d1d7 100644 --- a/crates/partly-proxy-lib/src/config.rs +++ b/crates/partly-proxy-lib/src/config.rs @@ -239,8 +239,7 @@ impl std::str::FromStr for Mode { "record" => Ok(Mode::Record), "replay" => Ok(Mode::Replay), _ => Err(partly_proxy_types::ProxyError::Command(format!( - "invalid proxy mode {:?}: expected \"record\" or \"replay\"", - s + "invalid proxy mode {s:?}: expected \"record\" or \"replay\"" ))), } } diff --git a/crates/partly-proxy-lib/src/listener.rs b/crates/partly-proxy-lib/src/listener.rs index 704b7b9..8d03ea5 100644 --- a/crates/partly-proxy-lib/src/listener.rs +++ b/crates/partly-proxy-lib/src/listener.rs @@ -478,7 +478,6 @@ impl Terminal for LiveTerminal<'_> { } } - fn into_hyper(resp: ProxyResponse) -> Response> { let mut builder = Response::builder() .status(resp.status)