diff --git a/protos/table.proto b/protos/table.proto index d298809d5d8..3ca04e0ed26 100644 --- a/protos/table.proto +++ b/protos/table.proto @@ -521,6 +521,16 @@ message FragmentReuseIndexDetails { // MemWAL Index Types // ============================================================================ +// Lifecycle status of a WAL shard. Drives drop-table two-phase commit: +// a SEALED shard refuses new writer claims (reversible) until the drop +// commits (the shard dir is deleted) or rolls back (status -> ACTIVE). +enum ShardStatus { + // Normal: the shard accepts writer claims. + ACTIVE = 0; + // A drop is in flight: claims are refused. Reversible to ACTIVE. + SEALED = 1; +} + // Shard manifest containing epoch-based fencing and WAL state. // Each shard has exactly one active writer at any time. message ShardManifest { @@ -566,6 +576,10 @@ message ShardManifest { // List of flushed MemTable generations and their directory paths. repeated FlushedGeneration flushed_generations = 8; + + // Lifecycle status. Default ACTIVE; SEALED marks an in-flight drop + // (drop-table 2PC). A SEALED manifest refuses claims at claim_epoch. + ShardStatus status = 15; } // A shard field value stored as raw Arrow scalar bytes. diff --git a/rust/lance-table/src/system_index/mem_wal.rs b/rust/lance-table/src/system_index/mem_wal.rs index 3bf279df062..1d82fd9e44f 100644 --- a/rust/lance-table/src/system_index/mem_wal.rs +++ b/rust/lance-table/src/system_index/mem_wal.rs @@ -141,6 +141,39 @@ impl TryFrom for IndexCatchupProgress { } } +/// Lifecycle status of a WAL shard, persisted in [`ShardManifest`]. +/// +/// `Sealed` is the durable in-doubt record for drop-table two-phase +/// commit: a sealed shard refuses new writer claims (enforced in +/// `claim_epoch`) but is reversible back to `Active` on rollback. +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)] +pub enum ShardStatus { + /// Normal: the shard accepts writer claims. 
+ #[default] + Active, + /// A drop is in flight: claims are refused. Reversible. + Sealed, +} + +impl ShardStatus { + /// Map to the protobuf enum discriminant (`pb::ShardStatus`). + fn to_i32(self) -> i32 { + match self { + Self::Active => 0, + Self::Sealed => 1, + } + } + + /// Map from the protobuf enum discriminant; unknown values decode as + /// `Active` (forward-compatible default). + fn from_i32(v: i32) -> Self { + match v { + 1 => Self::Sealed, + _ => Self::Active, + } + } +} + /// Shard manifest containing epoch-based fencing and WAL state. /// Each shard has exactly one active writer at any time. #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] @@ -166,6 +199,9 @@ pub struct ShardManifest { pub wal_entry_position_last_seen: u64, pub current_generation: u64, pub flushed_generations: Vec<FlushedGeneration>, + /// Lifecycle status (drop-table 2PC). Defaults to `Active`; preserved + /// across claims via `..base` so only fresh constructions set it. + pub status: ShardStatus, } impl DeepSizeOf for ShardManifest { @@ -194,6 +230,7 @@ impl From<&ShardManifest> for pb::ShardManifest { wal_entry_position_last_seen: rm.wal_entry_position_last_seen, current_generation: rm.current_generation, flushed_generations: rm.flushed_generations.iter().map(|fg| fg.into()).collect(), + status: rm.status.to_i32(), } } } @@ -226,6 +263,7 @@ impl TryFrom<pb::ShardManifest> for ShardManifest { .into_iter() .map(FlushedGeneration::from) .collect(), + status: ShardStatus::from_i32(rm.status), }) } } diff --git a/rust/lance/src/dataset/mem_wal/manifest.rs b/rust/lance/src/dataset/mem_wal/manifest.rs index 48df22dca88..b9d04afa23e 100644 --- a/rust/lance/src/dataset/mem_wal/manifest.rs +++ b/rust/lance/src/dataset/mem_wal/manifest.rs @@ -35,7 +35,7 @@ use bytes::Bytes; use futures::StreamExt; use futures::stream::FuturesUnordered; use lance_core::{Error, Result}; -use lance_index::mem_wal::ShardManifest; +use lance_index::mem_wal::{ShardManifest, ShardStatus}; use lance_io::object_store::ObjectStore; use 
lance_table::format::pb; use log::{info, warn}; @@ -147,6 +147,7 @@ impl ShardManifestStore { wal_entry_position_last_seen: 0, current_generation: 1, flushed_generations: vec![], + status: ShardStatus::Active, }; match self.write(&manifest).await { @@ -425,6 +426,21 @@ impl ShardManifestStore { for _ in 0..MAX_CLAIM_RETRIES { let current = self.read_latest().await?; + // A sealed shard is mid-drop (drop-table 2PC). Refuse the claim + // with a distinguishable error rather than minting a new epoch, + // so a caller that skips its own status check still cannot + // resurrect a shard being dropped. Sophon's reconcile keys on + // the "sealed" marker in this message to tell it apart from an + // ordinary epoch fence. + if let Some(m) = &current + && m.status == ShardStatus::Sealed + { + return Err(Error::invalid_input(format!( + "shard {} is sealed; refusing claim (drop in flight)", + self.shard_id + ))); + } + let (next_version, next_epoch, base_manifest) = match current { Some(m) => (m.version + 1, m.writer_epoch + 1, Some(m)), None => (1, 1, None), @@ -447,6 +463,7 @@ impl ShardManifestStore { wal_entry_position_last_seen: 0, current_generation: 1, flushed_generations: vec![], + status: ShardStatus::Active, } }; @@ -603,6 +620,7 @@ mod tests { wal_entry_position_last_seen: 0, current_generation: 1, flushed_generations: vec![], + status: ShardStatus::Active, } } @@ -752,6 +770,51 @@ mod tests { assert_eq!(second.wal_entry_position_last_seen, 42); } + #[tokio::test] + async fn test_claim_epoch_refuses_sealed_manifest() { + // A `Sealed` manifest is the drop-table 2PC in-doubt marker: + // `claim_epoch` must refuse it with a distinguishable error rather + // than mint a new epoch, so a sealed shard can't be resurrected even + // by a caller that skips its own status check. Rolling the status + // back to `Active` makes the shard claimable again (reversible). 
+ let (store, base_path, _temp_dir) = create_local_store().await; + let shard_id = Uuid::new_v4(); + let manifest_store = ShardManifestStore::new(store, &base_path, shard_id, 2); + + let (epoch, claimed) = manifest_store.claim_epoch(0).await.unwrap(); + assert_eq!(claimed.status, ShardStatus::Active); + + // Seal it (drop-table prepare). + let sealed = ShardManifest { + version: claimed.version + 1, + status: ShardStatus::Sealed, + ..claimed + }; + manifest_store.write(&sealed).await.unwrap(); + + // The claim is refused with the distinguishable "sealed" error — + // and the manifest is left untouched (no new epoch minted). + let err = manifest_store.claim_epoch(0).await.unwrap_err(); + assert!( + err.to_string().contains("sealed"), + "expected a distinguishable sealed-refusal error, got: {err}" + ); + let after = manifest_store.read_latest().await.unwrap().unwrap(); + assert_eq!(after.writer_epoch, sealed.writer_epoch, "no epoch minted"); + assert_eq!(after.status, ShardStatus::Sealed); + + // Roll back to Active (drop-table abort) → re-claimable. + let active = ShardManifest { + version: sealed.version + 1, + status: ShardStatus::Active, + ..sealed + }; + manifest_store.write(&active).await.unwrap(); + let (next_epoch, reclaimed) = manifest_store.claim_epoch(0).await.unwrap(); + assert!(next_epoch > epoch, "rolled-back shard mints the next epoch"); + assert_eq!(reclaimed.status, ShardStatus::Active); + } + #[tokio::test] async fn test_initialize_shard_rejects_conflict_with_mismatch() { let (store, base_path, _temp_dir) = create_local_store().await; diff --git a/rust/lance/src/dataset/mem_wal/write.rs b/rust/lance/src/dataset/mem_wal/write.rs index 491bb68aec5..de83788b51e 100644 --- a/rust/lance/src/dataset/mem_wal/write.rs +++ b/rust/lance/src/dataset/mem_wal/write.rs @@ -1961,6 +1961,37 @@ impl ShardWriter { } } + /// Abort the writer without flushing. 
+ /// + /// Shuts down the background flush tasks and leaves all buffered + /// memtable state to be dropped with the writer. Unlike + /// [`Self::close`], no WAL/MemTable flush is issued: pending in-memory + /// rows are discarded, not made durable, and no object-store IO is + /// performed. Used on drop-table, where the dataset directory is about + /// to be removed and a flush would only race fresh files back into a + /// doomed path. + /// + /// Caller-quiesce contract: `abort` takes `&self` (so it can be called + /// through the `Arc<ShardWriter>` callers hold) and therefore cannot + /// structurally bar a concurrent or subsequent `put` the way consuming + /// `close(self)` does. After abort the dispatchers are gone, so a later + /// `put` would buffer data that never flushes. Callers MUST stop + /// issuing writes before calling abort. + /// + /// Blocks until any flush already mid-`handle()` settles — + /// cancellation only fires between messages — so no flush task lingers + /// after abort returns. Idempotent: a second call re-cancels an + /// already-cancelled token and joins an already-emptied task set. + #[instrument(name = "sw_abort", level = "info", skip_all, fields(shard_id = %self.config.shard_id, epoch = self.epoch))] + pub async fn abort(&self) -> Result<()> { + info!( + "Aborting ShardWriter for shard {} (no flush)", + self.config.shard_id + ); + self.task_executor.shutdown_all().await?; + Ok(()) + } + /// Close the writer gracefully. /// /// Flushes pending data and shuts down background tasks. @@ -4310,6 +4341,63 @@ mod tests { writer.close().await.unwrap(); } + /// `abort` tears down the background flush tasks WITHOUT flushing — + /// buffered memtable rows are discarded, not sealed into an L0 + /// generation the way `close` would. Idempotent on a second call. 
+ #[tokio::test] + async fn test_abort_discards_without_flushing_and_is_idempotent() { + let (store, base_path, base_uri, _temp_dir) = create_local_store().await; + let schema = create_test_schema(); + + // Thresholds high enough that nothing auto-flushes; the rows stay + // in the active memtable until abort discards them. + let config = ShardWriterConfig { + shard_id: Uuid::new_v4(), + shard_spec_id: 0, + durable_write: false, + sync_indexed_write: false, + max_wal_buffer_size: 64 * 1024 * 1024, + max_wal_flush_interval: None, + max_memtable_size: 64 * 1024 * 1024, + manifest_scan_batch_size: 2, + ..Default::default() + }; + + let writer = ShardWriter::open(store, base_path, base_uri, config, schema.clone(), vec![]) + .await + .unwrap(); + + writer + .put(vec![create_test_batch(&schema, 0, 10)]) + .await + .unwrap(); + let flushed_before = writer + .manifest() + .await + .unwrap() + .map(|m| m.flushed_generations.len()) + .unwrap_or(0); + + writer.abort().await.unwrap(); + + // No generation was sealed — contrast with `close`, which flushes + // the 10 buffered rows into a new L0 generation. + let flushed_after = writer + .manifest() + .await + .unwrap() + .map(|m| m.flushed_generations.len()) + .unwrap_or(0); + assert_eq!( + flushed_after, flushed_before, + "abort must not flush a new L0 generation" + ); + + // Idempotent: re-cancels the already-cancelled token, joins an + // already-emptied task set. + writer.abort().await.unwrap(); + } + /// On a successful flush commit the sealed generation's rows land in the /// manifest immediately, but the in-memory handle is NOT dropped — it /// lingers for `frozen_memtable_grace` (so in-flight as-of reads keep