Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 108 additions & 15 deletions crates/partly-proxy-lib/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,28 +13,59 @@ use std::{
sync::Arc,
};

use bytes::Bytes;
use http::{StatusCode, header::CONTENT_TYPE};
use partly_proxy_types::{ProxyError, Result, SharedStorage};
use tokio::sync::watch;

use crate::{
cluster::{ClusterHandle, RunningUpstream},
command,
config::{Mode, ProxyConfig, RecordingConfig},
config::{Mode, ProxyConfig, RecordingConfig, UpstreamTarget},
control_plane, listener,
middleware::{ProxyMiddleware, SharedMiddleware},
proxy_io::{ProxyRequest, ProxyResponse},
recorder::Recorder,
replay::ReplaySource,
upstream::UpstreamRegistry,
};

/// Closure invoked when a request in [`Mode::Replay`] finds no matching stub
/// and no matching snapshot. Receives the unmatched request and returns the
/// response to send back to the caller.
pub type ReplayMissHandler = Arc<dyn Fn(ProxyRequest) -> ProxyResponse + Send + Sync>;

pub(crate) fn default_replay_miss_handler() -> ReplayMissHandler {
Arc::new(|_req| {
ProxyResponse::new(StatusCode::SERVICE_UNAVAILABLE)
.with_header(CONTENT_TYPE, Bytes::from_static(b"application/json"))
.with_body(Bytes::from_static(b"{}"))
})
}

/// Builder for a [`ClusterHandle`](crate::ClusterHandle).
#[derive(Default)]
pub struct ProxyClusterBuilder {
recording: RecordingConfig,
default_mode: Mode,
upstreams: Vec<UpstreamSpec>,
global_middleware: Vec<SharedMiddleware>,
tcp_control_addr: Option<SocketAddr>,
storage: Option<SharedStorage>,
replay_miss_handler: ReplayMissHandler,
}

impl Default for ProxyClusterBuilder {
fn default() -> Self {
Self {
recording: RecordingConfig::default(),
default_mode: Mode::Record,
upstreams: Vec::new(),
global_middleware: Vec::new(),
tcp_control_addr: None,
storage: None,
replay_miss_handler: default_replay_miss_handler(),
}
}
}

impl std::fmt::Debug for ProxyClusterBuilder {
Expand All @@ -48,7 +79,7 @@ impl std::fmt::Debug for ProxyClusterBuilder {
.field("global_middleware", &self.global_middleware.len())
.field("tcp_control_addr", &self.tcp_control_addr)
.field("storage", &self.storage.is_some())
.finish()
.finish_non_exhaustive()
}
}

Expand All @@ -59,6 +90,7 @@ pub(crate) struct UpstreamSpec {
pub middleware: Vec<SharedMiddleware>,
pub replay: Option<ReplaySource>,
pub mode: Mode,
pub replay_miss_handler: ReplayMissHandler,
}

impl std::fmt::Debug for UpstreamSpec {
Expand All @@ -84,23 +116,53 @@ impl ProxyClusterBuilder {
self
}

/// Set the default mode for all subsequently-added upstreams and update
/// the recording config accordingly (`Record` → enabled, `Replay` →
/// disabled). Eliminates the need to pass the same mode to every
/// `add_upstream_with*` call and to independently configure recording.
pub fn default_mode(mut self, mode: Mode) -> Self {
self.recording = match mode {
Mode::Record => RecordingConfig::default(),
Mode::Replay => RecordingConfig::disabled(),
};
self.default_mode = mode;
self
}

/// Override the handler invoked when a request in [`Mode::Replay`] finds
/// no matching stub and no matching snapshot. The closure receives the
/// unmatched [`ProxyRequest`] and returns the response sent to the caller.
///
/// The default handler returns `503 {}` with `Content-Type: application/json`.
/// Use this to change the status, body, headers, or trigger a side-effect
/// (e.g. a structured log or metric) on miss. Applies to all
/// subsequently-added upstreams.
pub fn on_replay_miss<F>(mut self, f: F) -> Self
where
F: Fn(ProxyRequest) -> ProxyResponse + Send + Sync + 'static,
{
self.replay_miss_handler = Arc::new(f);
self
}

/// Register an upstream with no per-upstream middleware and no replay
/// source. Names should be unique; duplicates are surfaced by `run()`.
/// Defaults to [`Mode::Record`].
/// Uses the builder's current [`default_mode`](Self::default_mode).
pub fn add_upstream(mut self, name: impl Into<String>, config: ProxyConfig) -> Self {
self.upstreams.push(UpstreamSpec {
name: name.into(),
config,
middleware: Vec::new(),
replay: None,
mode: Mode::Record,
mode: self.default_mode,
replay_miss_handler: Arc::clone(&self.replay_miss_handler),
});
self
}

/// Register an upstream with a list of per-upstream middleware. The
/// effective chain for that upstream becomes `global ++ per_upstream`.
/// Defaults to [`Mode::Record`].
/// Uses the builder's current [`default_mode`](Self::default_mode).
pub fn add_upstream_with_middleware(
mut self,
name: impl Into<String>,
Expand All @@ -112,18 +174,20 @@ impl ProxyClusterBuilder {
config,
middleware,
replay: None,
mode: Mode::Record,
mode: self.default_mode,
replay_miss_handler: Arc::clone(&self.replay_miss_handler),
});
self
}

/// Register an upstream with both per-upstream middleware and an
/// optional replay source. Defaults to [`Mode::Record`].
/// optional replay source. Uses the builder's current
/// [`default_mode`](Self::default_mode).
///
/// See `SPECIFICATION.md` §8.3: in `Record` mode, stubs take priority
/// over replay, which takes priority over the upstream forward. To
/// replay snapshots without ever forwarding to the upstream, use
/// [`Self::add_upstream_with_mode`] with [`Mode::Replay`].
/// replay snapshots without ever forwarding to the upstream, call
/// [`Self::default_mode`] with [`Mode::Replay`] first.
pub fn add_upstream_with(
mut self,
name: impl Into<String>,
Expand All @@ -136,17 +200,20 @@ impl ProxyClusterBuilder {
config,
middleware,
replay,
mode: Mode::Record,
mode: self.default_mode,
replay_miss_handler: Arc::clone(&self.replay_miss_handler),
});
self
}

/// Register an upstream with an explicit [`Mode`].
/// Register an upstream with an explicit [`Mode`], overriding
/// [`default_mode`](Self::default_mode) for this entry.
///
/// In [`Mode::Replay`] the terminal never forwards to the upstream — a
/// missing snapshot yields a `503 {}` response. In [`Mode::Record`] the
/// terminal falls through to the upstream on miss and (when recording
/// is enabled) appends the exchange to the recorder.
/// missing snapshot yields the replay-miss response (default `503 {}`).
/// In [`Mode::Record`] the terminal falls through to the upstream on
/// miss and (when recording is enabled) appends the exchange to the
/// recorder.
pub fn add_upstream_with_mode(
mut self,
name: impl Into<String>,
Expand All @@ -161,6 +228,32 @@ impl ProxyClusterBuilder {
middleware,
replay,
mode,
replay_miss_handler: Arc::clone(&self.replay_miss_handler),
});
self
}

/// Register a stub — an upstream that never forwards to a real backend.
/// All requests are handled by `middleware`; anything that falls through
/// the chain invokes the replay-miss handler (default `503 {}`).
///
/// Equivalent to `add_upstream_with_mode` with a dummy upstream target
/// and `Mode::Replay`, but without requiring callers to supply a
/// `ProxyConfig` with a meaningless upstream URL.
pub fn add_stub(
mut self,
name: impl Into<String>,
bind_addr: SocketAddr,
middleware: Vec<SharedMiddleware>,
) -> Self {
let config = ProxyConfig::http(bind_addr, UpstreamTarget::new("http://stub.internal:0"));
self.upstreams.push(UpstreamSpec {
name: name.into(),
config,
middleware,
replay: None,
mode: Mode::Replay,
replay_miss_handler: Arc::clone(&self.replay_miss_handler),
});
self
}
Expand Down
14 changes: 14 additions & 0 deletions crates/partly-proxy-lib/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,20 @@ pub enum Mode {
Replay,
}

impl std::str::FromStr for Mode {
type Err = partly_proxy_types::ProxyError;

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.to_ascii_lowercase().as_str() {
"record" => Ok(Mode::Record),
"replay" => Ok(Mode::Replay),
_ => Err(partly_proxy_types::ProxyError::Command(format!(
"invalid proxy mode {s:?}: expected \"record\" or \"replay\""
))),
}
}
}

/// Outbound TLS settings — see `SPECIFICATION.md` §3.4.
#[derive(Debug, Clone, Default)]
pub struct UpstreamTlsConfig {
Expand Down
4 changes: 2 additions & 2 deletions crates/partly-proxy-lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,14 @@ compile_error!(
);

pub use assertions::TrafficFilter;
pub use builder::ProxyClusterBuilder;
pub use builder::{ProxyClusterBuilder, ReplayMissHandler};
pub use cluster::ClusterHandle;
pub use command::{Command, CommandResponse, CommandSender};
pub use config::{
InboundTlsConfig, Mode, ProxyConfig, RecordingConfig, UpstreamTarget, UpstreamTlsConfig,
};
pub use context::{RequestContext, ResponseSource};
pub use middleware::{Next, ProxyMiddleware, SharedMiddleware, Terminal, TerminalFuture};
pub use middleware::{Next, ProxyMiddleware, SharedMiddleware, Terminal, TerminalFuture, shared};
/// Re-export of the JSON-Lines snapshot backend, available when the
/// `storage-jsonl` feature is on (which it is by default).
#[cfg(feature = "storage-jsonl")]
Expand Down
19 changes: 2 additions & 17 deletions crates/partly-proxy-lib/src/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ pub(crate) async fn spawn_listener(
middleware,
spec.replay,
spec.mode,
spec.replay_miss_handler,
);
#[cfg(feature = "_otel_any")]
let runtime = runtime.with_otel(otel_runtime);
Expand Down Expand Up @@ -461,7 +462,7 @@ impl Terminal for LiveTerminal<'_> {
match self.runtime.mode {
Mode::Replay => {
ctx.insert(ResponseSource::ReplayMiss);
Ok(replay_miss_response())
Ok((self.runtime.replay_miss_handler)(req))
}
Mode::Record => {
// Stamp before awaiting so the marker is present even
Expand All @@ -477,22 +478,6 @@ impl Terminal for LiveTerminal<'_> {
}
}

/// `503` response served when a request in [`Mode::Replay`] finds no
/// matching stub and no matching snapshot — the proxy must not forward.
fn replay_miss_response() -> ProxyResponse {
let mut headers = HeaderMap::new();
headers.insert(
CONTENT_TYPE,
http::HeaderValue::from_static("application/json"),
);
ProxyResponse {
status: StatusCode::SERVICE_UNAVAILABLE,
headers,
body: Bytes::from_static(b"{}"),
version: Version::HTTP_11,
}
}

fn into_hyper(resp: ProxyResponse) -> Response<Full<Bytes>> {
let mut builder = Response::builder()
.status(resp.status)
Expand Down
8 changes: 8 additions & 0 deletions crates/partly-proxy-lib/src/middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,14 @@ pub trait ProxyMiddleware: Send + Sync + 'static {
/// Shared, cheaply-clonable middleware reference.
pub type SharedMiddleware = Arc<dyn ProxyMiddleware>;

/// Wrap any [`ProxyMiddleware`] in an `Arc` to produce a [`SharedMiddleware`].
///
/// Avoids explicit `Arc::new(m)` at every call site when building
/// per-upstream middleware lists.
pub fn shared<M: ProxyMiddleware>(m: M) -> SharedMiddleware {
Arc::new(m)
}

/// Boxed future returned by [`Terminal::invoke`].
pub type TerminalFuture<'a> =
Pin<Box<dyn std::future::Future<Output = Result<ProxyResponse>> + Send + 'a>>;
Expand Down
12 changes: 11 additions & 1 deletion crates/partly-proxy-lib/src/replay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ impl ReplaySource {

/// Stream an NDJSON file line-by-line into a replay source.
///
/// If the file does not exist an empty [`ReplaySource`] is returned —
/// this covers the common case where a snapshots file has not been
/// created yet (e.g. first run in record mode).
///
/// The loader reads one exchange per line and never materialises the
/// whole file as a single string (per §8.1.1's 100k-exchange scale
/// target). Each malformed line yields a `ProxyError::Recording`.
Expand All @@ -122,7 +126,13 @@ impl ReplaySource {
/// with whichever backend they prefer.
#[cfg(feature = "storage-jsonl")]
pub fn from_jsonl(path: impl AsRef<Path>, strategy: MatchStrategy) -> Result<Self> {
let file = std::fs::File::open(path).map_err(ProxyError::Recording)?;
let file = match std::fs::File::open(&path) {
Ok(f) => f,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
return Ok(Self::new(Vec::new(), strategy));
}
Err(e) => return Err(ProxyError::Recording(e)),
};
let reader = BufReader::new(file);
let mut exchanges = Vec::new();
for (lineno, line) in reader.lines().enumerate() {
Expand Down
5 changes: 5 additions & 0 deletions crates/partly-proxy-lib/src/upstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ pub(crate) struct UpstreamRuntime {
pub replay: Option<ReplaySource>,
/// What happens on a replay miss — see [`Mode`].
pub mode: Mode,
/// Called when a replay miss occurs to produce the response.
pub replay_miss_handler: crate::builder::ReplayMissHandler,
/// OTEL-only fields. Populated via [`UpstreamRuntime::with_otel`].
#[cfg(feature = "_otel_any")]
pub otel: OtelRuntime,
Expand All @@ -71,6 +73,7 @@ impl UpstreamRuntime {
middleware: Vec<SharedMiddleware>,
replay: Option<ReplaySource>,
mode: Mode,
replay_miss_handler: crate::builder::ReplayMissHandler,
) -> Self {
let (pause, _rx) = watch::channel(false);
Self {
Expand All @@ -82,6 +85,7 @@ impl UpstreamRuntime {
pause,
replay,
mode,
replay_miss_handler,
#[cfg(feature = "_otel_any")]
otel: OtelRuntime::default(),
}
Expand Down Expand Up @@ -116,6 +120,7 @@ impl UpstreamRuntime {
Vec::new(),
None,
Mode::Record,
crate::builder::default_replay_miss_handler(),
)
}
}
Expand Down
Loading