diff --git a/Cargo.lock b/Cargo.lock index 1850061..968e657 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1494,6 +1494,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "pkg-config" +version = "0.3.33" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19f132c84eca552bf34cab8ec81f1c1dcc229b811638f9d283dceabe58c5569e" + [[package]] name = "potential_utf" version = "0.1.5" @@ -1544,6 +1550,7 @@ dependencies = [ "clap", "flate2", "futures-util", + "http-body 1.0.1", "rustc-hash", "serde", "thiserror", @@ -1551,6 +1558,7 @@ dependencies = [ "tower-http", "tracing", "tracing-subscriber", + "zstd", ] [[package]] @@ -2446,3 +2454,31 @@ name = "zmij" version = "1.0.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b8848ee67ecc8aedbaf3e4122217aff892639231befc6a1b58d29fff4c2cabaa" + +[[package]] +name = "zstd" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e91ee311a569c327171651566e07972200e76fcfe2242a4fa446149a3881c08a" +dependencies = [ + "zstd-safe", +] + +[[package]] +name = "zstd-safe" +version = "7.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f49c4d5f0abb602a93fb8736af2a4f4dd9512e36f7f570d66e65ff867ed3b9d" +dependencies = [ + "zstd-sys", +] + +[[package]] +name = "zstd-sys" +version = "2.0.16+zstd.1.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91e19ebc2adc8f83e43039e79776e3fda8ca919132d68a1fed6a5faca2683748" +dependencies = [ + "cc", + "pkg-config", +] diff --git a/Cargo.toml b/Cargo.toml index b1810a5..526fb00 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,7 @@ bytes = "1" clap = { version = "4.6", features = ["derive"] } flate2 = "1" futures-util = { version = "0.3", default-features = false, features = ["alloc"] } +http-body = "1" rustc-hash = "2.1" serde = { version = "1", features = ["derive"] } thiserror = "2" @@ -30,3 +31,4 @@ tokio = { version = "1", features = ["rt-multi-thread", "signal"] } tower-http = { version = "0.6", features = ["cors", "trace"] } tracing = { version = "0.1", default-features = false } tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt", "ansi"] } +zstd = { version = "0.13", default-features = false } diff --git a/README.md b/README.md index 0553bbc..7eea999 100644 --- a/README.md +++ b/README.md @@ -48,7 +48,6 @@ See [`valhalla_build_config`](https://github.com/valhalla/valhalla/blob/master/s ``` GET / Status: dataset_id, tile_count, s3_source, s3_etag GET /tiles/{tilePath} Tile by path (Valhalla-compatible) -GET /tiles/{tilePath}.gz Tile by path, gzip-compressed file GET /tiles_by_id/{tile_id} Tile by numeric packed ID GET /health Health check ``` @@ -65,13 +64,22 @@ Examples: - Level 2, tile index 818660 → `2/000/818/660.gph` - Level 0, tile index 529 → `0/000/529.gph` -## Gzip Support +## Compression -Tiles are compressed on the fly. Two modes cover different client expectations: +Rati negotiates the response encoding via `Accept-Encoding`. Both **gzip** and **zstd** +are supported on the wire; when both are accepted, rati prefers zstd (better ratio at +similar speed). -1. **Transparent compression** (`Accept-Encoding: gzip`) — Response includes `Content-Encoding: gzip`; compliant HTTP clients decompress automatically and see plain tile bytes. This is the mode Valhalla uses. +Tiles inside the archive can themselves be compressed — `.gph`, `.gph.gz`, or +`.gph.zst`. The on-disk compression is detected once at startup from the first tile's +filename suffix (assumed uniform across the archive) and rati decompresses transparently +when serving clients that asked for a different encoding. When the on-disk encoding +matches what the client wants, rati passes the bytes straight through — no decode, no +re-encode. -2. **Raw gzip file** (`.gz` extension) — Requesting `/tiles/2/000/818/660.gph.gz` returns the gzip stream as-is, without `Content-Encoding`. The response body *is* a gzip file — useful for saving compressed tiles to disk. +The HEAD path advertises `Content-Length` only when the response encoding matches what's +on disk (the only case where the index size equals the body size we'd send); otherwise +the header is omitted rather than fetching+decoding the tile just to measure it. ## CDN Headers @@ -83,7 +91,7 @@ Every tile response includes headers suitable for CDN caching: | `Last-Modified` | S3 object last-modified timestamp | | `Cache-Control` | `public, max-age=, immutable` — `` from `--cache-max-age` (default 86400) | | `X-Dataset-Id` | Auto-detected from `GraphTileHeader`, overridden with `--dataset-id`, or S3 ETag as fallback | -| `Vary` | `Accept-Encoding` — ensures correct CDN behavior with gzip negotiation | +| `Vary` | `Accept-Encoding` — ensures correct CDN behavior with encoding negotiation | | `Content-Type` | `application/octet-stream` | ## Dataset ID diff --git a/src/archive.rs b/src/archive.rs index 90ac3a2..3c90683 100644 --- a/src/archive.rs +++ b/src/archive.rs @@ -1,10 +1,18 @@ //! S3-backed tar archive: index parsing, tile lookups, and S3 I/O. //! //! Loads the tar index from an S3 object via range requests, then serves individual -//! tiles by reading their byte ranges on demand. Follows the same two-step protocol -//! as Valhalla's `GraphReader::load_remote_tar_offsets()`: -//! 1. Fetch bytes [0, 512) — the first tar header — and verify it's `index.bin`. -//! 2. Fetch bytes [512, 512 + size) — the raw index data — and parse it. +//! tiles by reading their byte ranges on demand. Startup performs a handful of small +//! range reads against S3 — no full download: +//! +//! 1. `HeadObject` for ETag, Last-Modified, and total size. +//! 2. First 512 bytes — the leading tar header, expected to name `index.bin`. If it +//! doesn't and `--scan-index` is set, fall back to [`scan_tar_headers`] which walks +//! the whole archive in 8 MB chunks with prefetch. +//! 3. The `index.bin` payload — parsed into a `TileId → (offset, size)` map. +//! 4. One more 512-byte header read to detect tile compression from the first tile's +//! filename suffix ([`detect_compression`]). +//! 5. A 272-byte read (or the full first tile when it's compressed) to extract the +//! dataset id from the `GraphTileHeader` ([`detect_dataset_id`]). use bytes::Bytes; use rustc_hash::FxHashMap; @@ -37,26 +45,63 @@ impl TileId { /// Parse a Valhalla tile path like `2/000/818/660.gph` into a packed ID. /// - /// Mirrors `get_tile_id()` from `valhalla_build_extract`: - /// strip extension, split off level, join remaining digits → `level | (index << 3)`. - /// - /// Accepts any single extension (`.gph`, `.csv`, `.spd`, etc.) or no extension at all. - /// The extension is not validated — the caller decides what content type is expected. + /// Mirrors `get_tile_id()` from `valhalla_build_extract`: split off level, drop any + /// suffix from the tail, join the remaining digits → `level | (index << 3)`. Numeric + /// tile names don't contain dots, so the first dot anywhere after the level is the + /// start of the suffix(es) (`.gph`, `.gph.zst`, etc.). pub fn from_path(path: &str) -> Option { - // Strip any single extension: find the last '.' that comes after the last '/' - let last_slash = path.rfind('/').unwrap_or(0); - let stem = match path[last_slash..].rfind('.') { - Some(dot_pos) => &path[..last_slash + dot_pos], - None => path, - }; - - let (level_str, idx_str) = stem.split_once('/')?; + let (level_str, rest) = path.split_once('/')?; let level: u32 = level_str.parse().ok()?; - let tile_index: u32 = idx_str.replace('/', "").parse().ok()?; + let digits = rest.split_once('.').map_or(rest, |(head, _)| head); + let tile_index: u32 = digits.replace('/', "").parse().ok()?; Some(Self(level | (tile_index << 3))) } } +/// Compression applied to each tile entry inside the tar. Uniform across an archive — detected +/// once at startup from the first tile's filename suffix. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum TileCompression { + None, + Gzip, + Zstd, +} + +impl TileCompression { + fn from_name(name: &str) -> Self { + if name.ends_with(".zst") { + Self::Zstd + } else if name.ends_with(".gz") { + Self::Gzip + } else { + Self::None + } + } + + /// IANA token used on the wire in `Accept-Encoding` / `Content-Encoding`. + /// [`Self::None`] returns the conventional `"identity"`; callers emitting + /// `Content-Encoding` should suppress the header in that case rather than + /// sending `identity` (which is redundant). + pub const fn header_name(self) -> &'static str { + match self { + Self::None => "identity", + Self::Gzip => "gzip", + Self::Zstd => "zstd", + } + } +} + +impl std::fmt::Display for TileCompression { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + // Use a friendlier label for `None` in logs; the HTTP token is "identity" + // (see [`Self::header_name`]) but reads oddly outside an HTTP context. + f.write_str(match self { + Self::None => "none", + other => other.header_name(), + }) + } +} + impl std::fmt::Display for TileId { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{}", self.0) @@ -223,6 +268,7 @@ pub struct S3Archive { bucket: Box, key: Box, index: TileIndex, + compression: TileCompression, } impl S3Archive { @@ -284,11 +330,16 @@ impl S3Archive { .try_into() .map_err(|_| S3Error::Protocol("tar header shorter than 512 bytes".into()))?; - // Step 2: Try to load index.bin; fall back to tar scan if missing and --scan-index is set - let index = match read_index_header(header) { + // Step 2: Try to load index.bin; fall back to tar scan if missing and --scan-index is set. + // The scan path sees filenames directly and detects compression inline; the index.bin + // path requires an extra 512-byte range request afterwards to read the first tile's + // tar header. + let (index, compression) = match read_index_header(header) { Ok((data_offset, data_size)) => { let index_bytes = get_range(&client, bucket, key, data_offset, data_size).await?; - parse_index(&index_bytes).map_err(S3Error::Tar)? + let index = parse_index(&index_bytes).map_err(S3Error::Tar)?; + let compression = detect_compression(&client, bucket, key, &index).await?; + (index, compression) } Err(TarError::FirstEntryNotIndex { .. }) if scan_index => { tracing::warn!( @@ -303,13 +354,15 @@ impl S3Archive { Err(e) => return Err(S3Error::Tar(e)), }; + tracing::info!("Tile compression: {compression}"); + // Step 3: Determine the dataset ID let dataset_id: Box = if let Some(override_id) = dataset_id_override { tracing::info!("Using CLI-provided dataset ID: {override_id}"); override_id.into() } else { // Try to auto-detect from the first tile's GraphTileHeader - match detect_dataset_id(&client, bucket, key, &index).await { + match detect_dataset_id(&client, bucket, key, &index, compression).await { Ok(id) => { tracing::info!("Auto-detected dataset ID from graph tile header: {id}"); id.to_string().into() @@ -332,6 +385,7 @@ impl S3Archive { bucket: bucket.into(), key: key.into(), index, + compression, }; let meta = ArchiveMeta { @@ -344,13 +398,14 @@ impl S3Archive { Ok((archive, meta)) } - /// Fetch a tile by its ID. Returns `None` if the tile is not in the index. + /// Fetch a tile's on-disk bytes — exactly what's stored in the tar, compressed or not. + /// Callers inspect [`compression`](Self::compression) to decide whether to decode. + /// Returns `None` if the tile is not in the index. pub async fn get_tile(&self, tile_id: TileId) -> Result, S3Error> { let Some(entry) = self.index.get(&tile_id) else { return Ok(None); }; - - let data = get_range( + let raw = get_range( &self.client, &self.bucket, &self.key, @@ -358,13 +413,39 @@ impl S3Archive { entry.size as u64, ) .await?; - Ok(Some(data)) + Ok(Some(raw)) } - /// Returns the uncompressed tile size from the index, or `None` if the tile doesn't exist. + /// Returns the on-disk tile size from the index, or `None` if the tile doesn't exist. + /// For compressed archives this is the compressed size, not the size the client receives. pub fn tile_size(&self, tile_id: TileId) -> Option { self.index.get(&tile_id).map(|e| e.size) } + + /// Compression scheme used for tiles inside this archive. + pub fn compression(&self) -> TileCompression { + self.compression + } +} + +/// Decompress `data` according to `compression`. Returns the input unchanged for `TileCompression::None`. +pub fn decompress(data: Bytes, compression: TileCompression) -> std::io::Result { + use std::io::Read; + match compression { + TileCompression::None => Ok(data), + TileCompression::Gzip => { + let mut decoder = flate2::read::GzDecoder::new(data.as_ref()); + let mut out = Vec::with_capacity(data.len() * 3); + decoder.read_to_end(&mut out)?; + Ok(Bytes::from(out)) + } + TileCompression::Zstd => { + let mut decoder = zstd::stream::Decoder::new(data.as_ref())?; + let mut out = Vec::with_capacity(data.len() * 4); + decoder.read_to_end(&mut out)?; + Ok(Bytes::from(out)) + } + } } async fn get_range( @@ -408,7 +489,7 @@ async fn scan_tar_headers( bucket: &str, key: &str, archive_size: u64, -) -> Result { +) -> Result<(TileIndex, TileCompression), S3Error> { use futures_util::{StreamExt, stream}; // Full planet tileset might have up to 205k tiles and occupy ~80GB, so making @@ -426,6 +507,7 @@ async fn scan_tar_headers( .buffered(PREFETCH); let mut entries = FxHashMap::default(); + let mut compression = TileCompression::None; let mut pos: u64 = 0; // Current chunk and its position within the archive @@ -485,6 +567,9 @@ async fn scan_tar_headers( header.name, ); } else { + if entries.is_empty() { + compression = TileCompression::from_name(&header.name); + } entries.insert( tile_id, TileEntry { @@ -509,13 +594,43 @@ async fn scan_tar_headers( entries.shrink_to_fit(); tracing::info!("Built index from tar scan: {} tiles", entries.len()); - Ok(entries) + Ok((entries, compression)) +} + +/// Read the tar header of the first tile listed in `index` and detect compression from its name. +/// +/// Index entries store only `tile_id`, not filenames, so we need one additional 512-byte range +/// request to recover the name. The tar header sits 512 bytes before the tile data. +async fn detect_compression( + client: &aws_sdk_s3::Client, + bucket: &str, + key: &str, + index: &TileIndex, +) -> Result { + let Some(entry) = index.values().next() else { + return Ok(TileCompression::None); // no tiles - no compression + }; + let header_bytes = get_range( + client, + bucket, + key, + entry.offset - TAR_BLOCK_SIZE as u64, + TAR_BLOCK_SIZE as u64, + ) + .await?; + let header: &[u8; TAR_BLOCK_SIZE] = header_bytes + .as_ref() + .try_into() + .map_err(|_| S3Error::Protocol("tile header shorter than 512 bytes".into()))?; + let parsed = TarHeader::parse(header).map_err(S3Error::Tar)?; + Ok(TileCompression::from_name(&parsed.name)) } /// Try to detect the dataset ID by reading the first tile's `GraphTileHeader`. /// -/// Picks an arbitrary tile from the index, reads the first 272 bytes (the header), -/// and extracts the `dataset_id` field (`u64` at byte offset 32). +/// Picks an arbitrary tile from the index, reads its first 272 bytes (the header), +/// and extracts the `dataset_id` field (`u64` at byte offset 32). For compressed +/// archives the whole tile is fetched and decompressed first. /// /// Returns an error if no tiles are in the index, the tile is too small, or /// the `dataset_id` field is zero (likely not a graph tile). @@ -524,22 +639,32 @@ async fn detect_dataset_id( bucket: &str, key: &str, index: &TileIndex, + compression: TileCompression, ) -> Result { let entry = index .values() .next() .ok_or_else(|| S3Error::Protocol("no tiles in index to read header from".into()))?; - // We need at least GRAPH_TILE_HEADER_SIZE bytes from the tile - let read_size = GRAPH_TILE_HEADER_SIZE as u64; - if (entry.size as u64) < read_size { - return Err(S3Error::Protocol(format!( - "tile is only {} bytes, too small for GraphTileHeader ({} bytes)", - entry.size, GRAPH_TILE_HEADER_SIZE - ))); - } - - let data = get_range(client, bucket, key, entry.offset, read_size).await?; + // For plain tiles we only need the first 272 bytes; for compressed tiles we have + // to fetch and decompress the whole thing to read any prefix of the plain data. + let data = match compression { + TileCompression::None => { + let read_size = GRAPH_TILE_HEADER_SIZE as u64; + if (entry.size as u64) < read_size { + return Err(S3Error::Protocol(format!( + "tile is only {} bytes, too small for GraphTileHeader ({} bytes)", + entry.size, GRAPH_TILE_HEADER_SIZE + ))); + } + get_range(client, bucket, key, entry.offset, read_size).await? + } + _ => { + let raw = get_range(client, bucket, key, entry.offset, entry.size as u64).await?; + decompress(raw, compression) + .map_err(|e| S3Error::Protocol(format!("decode tile for dataset_id: {e}")))? + } + }; let dataset_id = parse_dataset_id(&data)?; if dataset_id == 0 { @@ -674,6 +799,13 @@ mod tests { let id = TileId::from_path("0/000/529").unwrap(); assert_eq!(id.0, 529 << 3); + // compression suffix on top of an extension + let id = TileId::from_path("2/000/818/660.gph.zst").unwrap(); + assert_eq!(id.0, 2 | (818660 << 3)); + + let id = TileId::from_path("2/000/818/660.gph.gz").unwrap(); + assert_eq!(id.0, 2 | (818660 << 3)); + // invalid assert!(TileId::from_path("").is_none()); assert!(TileId::from_path("660.gph").is_none()); @@ -715,6 +847,47 @@ mod tests { assert!(!index.contains_key(&TileId::new(0xDEAD))); } + #[test] + fn detect_compression_from_name() { + assert_eq!( + TileCompression::from_name("2/000/818/660.gph"), + TileCompression::None + ); + assert_eq!( + TileCompression::from_name("2/000/818/660.gph.gz"), + TileCompression::Gzip + ); + assert_eq!( + TileCompression::from_name("2/000/818/660.gph.zst"), + TileCompression::Zstd + ); + assert_eq!( + TileCompression::from_name("index.bin"), + TileCompression::None + ); + } + + #[test] + fn decompress_round_trips() { + let plain = b"some tile bytes - graph header would go here".repeat(8); + + // None: passthrough + let out = decompress(Bytes::copy_from_slice(&plain), TileCompression::None).unwrap(); + assert_eq!(out.as_ref(), plain.as_slice()); + + // Gzip + let mut enc = flate2::write::GzEncoder::new(Vec::new(), flate2::Compression::default()); + std::io::Write::write_all(&mut enc, &plain).unwrap(); + let gz = enc.finish().unwrap(); + let out = decompress(Bytes::from(gz), TileCompression::Gzip).unwrap(); + assert_eq!(out.as_ref(), plain.as_slice()); + + // Zstd + let zst = zstd::encode_all(plain.as_slice(), 3).unwrap(); + let out = decompress(Bytes::from(zst), TileCompression::Zstd).unwrap(); + assert_eq!(out.as_ref(), plain.as_slice()); + } + #[test] fn reject_invalid_index_size() { let data = vec![0u8; 17]; // not a multiple of 16 diff --git a/src/main.rs b/src/main.rs index cbfe4d9..d52ea85 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,7 +5,7 @@ use std::sync::Arc; use axum::{ Router, extract::{Path, State}, - http::{HeaderMap, HeaderValue, Method, StatusCode}, + http::{HeaderMap, HeaderValue, Method, StatusCode, header}, middleware, response::{IntoResponse, Response}, routing::get, @@ -19,6 +19,8 @@ use tracing::info; mod archive; +use archive::TileCompression; + #[derive(Parser)] struct Config { /// S3 location of the archive: s3://bucket/key.tar @@ -157,42 +159,18 @@ async fn get_tile( return StatusCode::NOT_MODIFIED.into_response(); } - let (path, raw_gzip) = match path.strip_suffix(".gz") { - Some(base) => (base, true), - None => (path.as_str(), false), - }; - - let Some(tile_id) = archive::TileId::from_path(path) else { + let Some(tile_id) = archive::TileId::from_path(&path) else { return StatusCode::BAD_REQUEST.into_response(); }; - // HEAD: return Content-Length without fetching from S3. - // Must come before gzip — browsers send Accept-Encoding on HEAD too. - if method == Method::HEAD { - if raw_gzip || accepts_gzip(&headers) { - return tile_head_gzip(&state, tile_id).into_response(); - } - return tile_head(&state, tile_id).into_response(); - } - - // Mode 2: `.gz` extension — raw gzip file, no Content-Encoding - if raw_gzip { - return get_tile_data(&state, tile_id) - .await - .map(|data| Bytes::from(gzip_compress(&data))) - .into_response(); - } + let response_encoding = pick_response_encoding(&headers, state.archive.compression()); - // Mode 1: `Accept-Encoding: gzip` — compress on the fly with Content-Encoding - if accepts_gzip(&headers) { - return gzip_tile(&state, tile_id).await.into_response(); + if method == Method::HEAD { + return tile_head(&state, tile_id, response_encoding); } - - get_tile_data(&state, tile_id).await.into_response() + serve_tile(&state, tile_id, response_encoding).await } -/// Supports `Accept-Encoding: gzip` (mode 1) but not `.gz` extension (mode 2), -/// because numeric IDs have no file extension to append `.gz` to. async fn get_tile_by_id( method: Method, State(state): State, @@ -204,71 +182,164 @@ async fn get_tile_by_id( } let tile_id = archive::TileId::new(tile_id); + let response_encoding = pick_response_encoding(&headers, state.archive.compression()); if method == Method::HEAD { - if accepts_gzip(&headers) { - return tile_head_gzip(&state, tile_id).into_response(); - } - return tile_head(&state, tile_id).into_response(); + return tile_head(&state, tile_id, response_encoding); } + serve_tile(&state, tile_id, response_encoding).await +} + +/// Body that yields nothing and reports unknown size, so hyper doesn't auto-emit +/// `Content-Length: 0` from the size hint. Used for HEAD responses where we want +/// to omit `Content-Length` (the index size doesn't match what GET would return). +struct EmptyUnknownSize; + +impl http_body::Body for EmptyUnknownSize { + type Data = Bytes; + type Error = std::convert::Infallible; - if accepts_gzip(&headers) { - return gzip_tile(&state, tile_id).await.into_response(); + fn poll_frame( + self: std::pin::Pin<&mut Self>, + _: &mut std::task::Context<'_>, + ) -> std::task::Poll, Self::Error>>> { + std::task::Poll::Ready(None) } - get_tile_data(&state, tile_id).await.into_response() + fn size_hint(&self) -> http_body::SizeHint { + http_body::SizeHint::default() + } } -/// HEAD for plain tiles: return Content-Length from the index without fetching from S3. -fn tile_head(state: &AppState, tile_id: archive::TileId) -> Result { - let size = state - .archive - .tile_size(tile_id) - .ok_or(StatusCode::NOT_FOUND)?; - Ok([(axum::http::header::CONTENT_LENGTH, size.to_string())]) +/// HEAD: never fetch tile bytes. Set `Content-Encoding` for compressed responses, +/// and advertise `Content-Length` only when the response encoding matches what's +/// already on disk — that's the only case where the index size equals the body size +/// we'd send. +/// +/// The body is [`EmptyUnknownSize`] rather than [`axum::body::Body::empty`] so hyper +/// doesn't auto-emit `Content-Length: 0` from the body's exact size hint when we +/// deliberately want the header omitted. +fn tile_head( + state: &AppState, + tile_id: archive::TileId, + response_encoding: TileCompression, +) -> Response { + let Some(size) = state.archive.tile_size(tile_id) else { + return StatusCode::NOT_FOUND.into_response(); + }; + + let mut response = Response::new(axum::body::Body::new(EmptyUnknownSize)); + let h = response.headers_mut(); + if response_encoding != TileCompression::None { + h.insert( + header::CONTENT_ENCODING, + HeaderValue::from_static(response_encoding.header_name()), + ); + } + if response_encoding == state.archive.compression() { + h.insert(header::CONTENT_LENGTH, HeaderValue::from(size)); + } + response } -async fn get_tile_data(state: &AppState, tile_id: archive::TileId) -> Result { - match state.archive.get_tile(tile_id).await { - Ok(Some(data)) => Ok(data), - Ok(None) => Err(StatusCode::NOT_FOUND), +/// GET: fetch the tile and produce bytes in the negotiated encoding. Pass-through when +/// the on-disk encoding already matches; otherwise decompress to plain and re-encode. +async fn serve_tile( + state: &AppState, + tile_id: archive::TileId, + response_encoding: TileCompression, +) -> Response { + let tile = match state.archive.get_tile(tile_id).await { + Ok(Some(b)) => b, + Ok(None) => return StatusCode::NOT_FOUND.into_response(), Err(e) => { tracing::error!(tile_id = %tile_id, "S3 error: {e}"); - Err(StatusCode::INTERNAL_SERVER_ERROR) + return StatusCode::INTERNAL_SERVER_ERROR.into_response(); } + }; + + let compression = state.archive.compression(); + let body = if compression == response_encoding { + tile + } else { + let plain = match archive::decompress(tile, compression) { + Ok(b) => b, + Err(e) => { + tracing::error!(tile_id = %tile_id, "decode error: {e}"); + return StatusCode::INTERNAL_SERVER_ERROR.into_response(); + } + }; + match response_encoding { + TileCompression::None => plain, + TileCompression::Gzip => Bytes::from(gzip_compress(&plain)), + TileCompression::Zstd => Bytes::from(zstd_compress(&plain)), + } + }; + + let mut response = body.into_response(); + if response_encoding != TileCompression::None { + response.headers_mut().insert( + header::CONTENT_ENCODING, + HeaderValue::from_static(response_encoding.header_name()), + ); } + response } -/// Compress tile on the fly and set `Content-Encoding: gzip`. -async fn gzip_tile( - state: &AppState, - tile_id: archive::TileId, -) -> Result { - let data = get_tile_data(state, tile_id).await?; - Ok(( - [(axum::http::header::CONTENT_ENCODING, "gzip")], - Bytes::from(gzip_compress(&data)), - )) -} +/// Pick the response encoding from `Accept-Encoding` and the on-disk compression. +/// +/// Single pass over the header populates `(zstd_ok, gzip_ok)`; the decision then +/// mirrors the negotiation table: +/// - Source matches an accepted encoding → pass through (no decode, no re-encode). +/// - Otherwise pick the best accepted encoding, preferring zstd over gzip. +/// - Nothing matches → plain. +fn pick_response_encoding(headers: &HeaderMap, source: TileCompression) -> TileCompression { + use TileCompression::*; + + const ZSTD: &str = TileCompression::Zstd.header_name(); + const GZIP: &str = TileCompression::Gzip.header_name(); + + let mut zstd_ok = false; + let mut gzip_ok = false; + if let Some(accept) = headers + .get(header::ACCEPT_ENCODING) + .and_then(|v| v.to_str().ok()) + { + for token in accept.split(',') { + let token = token.trim(); + let (name, params) = token.split_once(';').unwrap_or((token, "")); + // Per RFC 7231 §5.3.4: `;q=0` rejects the encoding. + let rejected = params.split(';').any(|p| { + p.trim() + .strip_prefix("q=") + .and_then(|q| q.trim().parse::().ok()) + .is_some_and(|q| q <= 0.0) + }); + if rejected { + continue; + } + if name.eq_ignore_ascii_case(ZSTD) { + zstd_ok = true; + } else if name.eq_ignore_ascii_case(GZIP) { + gzip_ok = true; + } + } + } -/// HEAD for gzip-encoded responses: only confirm the tile exists. No `Content-Length` — -/// we'd have to fetch and compress just to measure it, which defeats the point of HEAD. -fn tile_head_gzip( - state: &AppState, - tile_id: archive::TileId, -) -> Result { - state - .archive - .tile_size(tile_id) - .ok_or(StatusCode::NOT_FOUND)?; - Ok([(axum::http::header::CONTENT_ENCODING, "gzip")]) + match (source, zstd_ok, gzip_ok) { + (Zstd, true, _) => Zstd, // passthrough + (Gzip, _, true) => Gzip, // passthrough + (_, true, _) => Zstd, // re-encode, prefer zstd + (_, false, true) => Gzip, // re-encode to gzip + _ => None, + } } /// Per RFC 9110 §13.1.2, returns `true` if any ETag in `If-None-Match` matches /// (or the wildcard `*` is present), meaning the server should respond with 304. fn is_not_modified(request_headers: &HeaderMap, etag: &str) -> bool { request_headers - .get(axum::http::header::IF_NONE_MATCH) + .get(header::IF_NONE_MATCH) .and_then(|v| v.to_str().ok()) .is_some_and(|v| { v.split(',').any(|part| { @@ -278,30 +349,6 @@ fn is_not_modified(request_headers: &HeaderMap, etag: &str) -> bool { }) } -/// Per RFC 7231 section 5.3.4, `gzip;q=0` means gzip is explicitly unacceptable. -fn accepts_gzip(headers: &HeaderMap) -> bool { - headers - .get(axum::http::header::ACCEPT_ENCODING) - .and_then(|v| v.to_str().ok()) - .is_some_and(|v| { - v.split(',').any(|part| { - let part = part.trim(); - if !part.starts_with("gzip") { - return false; - } - let after_gzip = &part["gzip".len()..]; - if after_gzip.is_empty() { - return true; - } - if let Some(rest) = after_gzip.strip_prefix(";q=") { - rest.trim().parse::().unwrap_or(1.0) > 0.0 - } else { - true - } - }) - }) -} - fn gzip_compress(data: &[u8]) -> Vec { const GZIP_LEVEL: Compression = Compression::new(6); // good balance between size and performance let mut encoder = flate2::write::GzEncoder::new(Vec::new(), GZIP_LEVEL); @@ -311,27 +358,28 @@ fn gzip_compress(data: &[u8]) -> Vec { encoder.finish().expect("gzip finish on Vec cannot fail") } +fn zstd_compress(data: &[u8]) -> Vec { + zstd::encode_all(data, 0).expect("zstd encode to Vec cannot fail") +} + /// Derived once at startup; the `set_cache_headers` middleware merges these into every tile response. fn build_cache_headers(meta: &archive::ArchiveMeta, cache_max_age: u32) -> HeaderMap { let mut headers = HeaderMap::with_capacity(8); if let Ok(val) = HeaderValue::from_str(&meta.etag) { - headers.insert(axum::http::header::ETAG, val); + headers.insert(header::ETAG, val); } if let Ok(val) = HeaderValue::from_str(&meta.last_modified) { - headers.insert(axum::http::header::LAST_MODIFIED, val); + headers.insert(header::LAST_MODIFIED, val); } let cache_control = format!("public, max-age={cache_max_age}, immutable"); if let Ok(val) = HeaderValue::from_str(&cache_control) { - headers.insert(axum::http::header::CACHE_CONTROL, val); + headers.insert(header::CACHE_CONTROL, val); } - headers.insert( - axum::http::header::VARY, - HeaderValue::from_static("Accept-Encoding"), - ); + headers.insert(header::VARY, HeaderValue::from_static("Accept-Encoding")); if let Ok(val) = HeaderValue::from_str(&meta.dataset_id) { headers.insert("X-Dataset-Id", val); @@ -354,19 +402,16 @@ mod tests { }; let headers = build_cache_headers(&meta, 3600); - assert_eq!(headers.get(axum::http::header::ETAG).unwrap(), "\"abc123\""); + assert_eq!(headers.get(header::ETAG).unwrap(), "\"abc123\""); assert_eq!( - headers.get(axum::http::header::LAST_MODIFIED).unwrap(), + headers.get(header::LAST_MODIFIED).unwrap(), "Thu, 17 Apr 2025 12:00:00 GMT" ); assert_eq!( - headers.get(axum::http::header::CACHE_CONTROL).unwrap(), + headers.get(header::CACHE_CONTROL).unwrap(), "public, max-age=3600, immutable" ); - assert_eq!( - headers.get(axum::http::header::VARY).unwrap(), - "Accept-Encoding" - ); + assert_eq!(headers.get(header::VARY).unwrap(), "Accept-Encoding"); assert_eq!(headers.get("X-Dataset-Id").unwrap(), "42"); } @@ -381,7 +426,7 @@ mod tests { let headers = build_cache_headers(&meta, 86400); assert_eq!( - headers.get(axum::http::header::CACHE_CONTROL).unwrap(), + headers.get(header::CACHE_CONTROL).unwrap(), "public, max-age=86400, immutable" ); } @@ -397,7 +442,7 @@ mod tests { let headers = build_cache_headers(&meta, 0); assert_eq!( - headers.get(axum::http::header::CACHE_CONTROL).unwrap(), + headers.get(header::CACHE_CONTROL).unwrap(), "public, max-age=0, immutable" ); } @@ -406,10 +451,7 @@ mod tests { fn is_not_modified_test() { let with = |val: &'static str| { let mut h = HeaderMap::new(); - h.insert( - axum::http::header::IF_NONE_MATCH, - HeaderValue::from_static(val), - ); + h.insert(header::IF_NONE_MATCH, HeaderValue::from_static(val)); h }; let etag = "\"abc123\""; @@ -425,28 +467,44 @@ mod tests { assert!(!is_not_modified(&with("\"abc123-modified\""), etag)); } - #[test] - fn accepts_gzip_test() { - let with = |val: &'static str| { - let mut h = HeaderMap::new(); - h.insert( - axum::http::header::ACCEPT_ENCODING, - HeaderValue::from_static(val), - ); - h - }; + fn with_accept_encoding(val: &'static str) -> HeaderMap { + let mut h = HeaderMap::new(); + h.insert(header::ACCEPT_ENCODING, HeaderValue::from_static(val)); + h + } - // Accepts - assert!(accepts_gzip(&with("gzip, deflate, br"))); - assert!(accepts_gzip(&with("gzip"))); - assert!(accepts_gzip(&with("gzip;q=1.0, deflate;q=0.5"))); - assert!(accepts_gzip(&with("gzip;q=0.5"))); - - // Rejects - assert!(!accepts_gzip(&with("deflate, br"))); - assert!(!accepts_gzip(&HeaderMap::new())); - assert!(!accepts_gzip(&with("gzip;q=0, deflate"))); - assert!(!accepts_gzip(&with("gzip;q=0.0"))); + #[test] + fn pick_response_encoding_test() { + use TileCompression::*; + let pick = |hdr, src| pick_response_encoding(&with_accept_encoding(hdr), src); + + // Passthrough wins, even when other encodings are accepted. + assert_eq!(pick("gzip, zstd", Gzip), Gzip); + assert_eq!(pick("gzip, zstd", Zstd), Zstd); + assert_eq!(pick("gzip", Gzip), Gzip); + assert_eq!(pick("zstd", Zstd), Zstd); + + // Plain source: prefer zstd over gzip. + assert_eq!(pick("gzip, zstd", None), Zstd); + assert_eq!(pick("gzip", None), Gzip); + assert_eq!(pick("zstd", None), Zstd); + + // Re-encode when source isn't accepted. + assert_eq!(pick("gzip", Zstd), Gzip); + assert_eq!(pick("zstd", Gzip), Zstd); + + // q=0 rejects. + assert_eq!(pick("gzip;q=0, zstd", Gzip), Zstd); + assert_eq!(pick("gzip;q=0.0", Gzip), None); + + // q parsing still accepts positive quality values. + assert_eq!(pick("gzip;q=0.5", None), Gzip); + assert_eq!(pick("gzip;q=1.0, deflate;q=0.5", None), Gzip); + + // Nothing useful accepted. + assert_eq!(pick("deflate", Zstd), None); + assert_eq!(pick("gzip-foo", None), None); // prefix lookalike + assert_eq!(pick_response_encoding(&HeaderMap::new(), Gzip), None); } #[test]