Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions protos/table.proto
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,16 @@ message FragmentReuseIndexDetails {
// MemWAL Index Types
// ============================================================================

// Lifecycle status of a WAL shard. Drives drop-table two-phase commit:
// a SEALED shard refuses new writer claims (reversible) until the drop
// commits (the shard dir is deleted) or rolls back (status -> ACTIVE).
enum ShardStatus {
// Normal: the shard accepts writer claims.
ACTIVE = 0;
// A drop is in flight: claims are refused. Reversible to ACTIVE.
SEALED = 1;
}

// Shard manifest containing epoch-based fencing and WAL state.
// Each shard has exactly one active writer at any time.
message ShardManifest {
Expand Down Expand Up @@ -566,6 +576,10 @@ message ShardManifest {

// List of flushed MemTable generations and their directory paths.
repeated FlushedGeneration flushed_generations = 8;

// Lifecycle status. Default ACTIVE; SEALED marks an in-flight drop
// (drop-table 2PC). A SEALED manifest refuses claims at claim_epoch.
ShardStatus status = 15;
}

// A shard field value stored as raw Arrow scalar bytes.
Expand Down
38 changes: 38 additions & 0 deletions rust/lance-table/src/system_index/mem_wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,39 @@ impl TryFrom<pb::IndexCatchupProgress> for IndexCatchupProgress {
}
}

/// Lifecycle status of a WAL shard, persisted in [`ShardManifest`].
///
/// `Sealed` is the durable in-doubt record for drop-table two-phase
/// commit: a sealed shard refuses new writer claims (enforced in
/// `claim_epoch`) but is reversible back to `Active` on rollback.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub enum ShardStatus {
/// Normal: the shard accepts writer claims.
#[default]
Active,
/// A drop is in flight: claims are refused. Reversible.
Sealed,
}

impl ShardStatus {
/// Map to the protobuf enum discriminant (`pb::ShardStatus`).
fn to_i32(self) -> i32 {
match self {
Self::Active => 0,
Self::Sealed => 1,
}
}

/// Map from the protobuf enum discriminant; unknown values decode as
/// `Active` (forward-compatible default).
fn from_i32(v: i32) -> Self {
match v {
1 => Self::Sealed,
_ => Self::Active,
}
}
}

/// Shard manifest containing epoch-based fencing and WAL state.
/// Each shard has exactly one active writer at any time.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
Expand All @@ -166,6 +199,9 @@ pub struct ShardManifest {
pub wal_entry_position_last_seen: u64,
pub current_generation: u64,
pub flushed_generations: Vec<FlushedGeneration>,
/// Lifecycle status (drop-table 2PC). Defaults to `Active`; preserved
/// across claims via `..base` so only fresh constructions set it.
pub status: ShardStatus,
}

impl DeepSizeOf for ShardManifest {
Expand Down Expand Up @@ -194,6 +230,7 @@ impl From<&ShardManifest> for pb::ShardManifest {
wal_entry_position_last_seen: rm.wal_entry_position_last_seen,
current_generation: rm.current_generation,
flushed_generations: rm.flushed_generations.iter().map(|fg| fg.into()).collect(),
status: rm.status.to_i32(),
}
}
}
Expand Down Expand Up @@ -226,6 +263,7 @@ impl TryFrom<pb::ShardManifest> for ShardManifest {
.into_iter()
.map(FlushedGeneration::from)
.collect(),
status: ShardStatus::from_i32(rm.status),
})
}
}
Expand Down
65 changes: 64 additions & 1 deletion rust/lance/src/dataset/mem_wal/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use bytes::Bytes;
use futures::StreamExt;
use futures::stream::FuturesUnordered;
use lance_core::{Error, Result};
use lance_index::mem_wal::ShardManifest;
use lance_index::mem_wal::{ShardManifest, ShardStatus};
use lance_io::object_store::ObjectStore;
use lance_table::format::pb;
use log::{info, warn};
Expand Down Expand Up @@ -147,6 +147,7 @@ impl ShardManifestStore {
wal_entry_position_last_seen: 0,
current_generation: 1,
flushed_generations: vec![],
status: ShardStatus::Active,
};

match self.write(&manifest).await {
Expand Down Expand Up @@ -425,6 +426,21 @@ impl ShardManifestStore {
for _ in 0..MAX_CLAIM_RETRIES {
let current = self.read_latest().await?;

// A sealed shard is mid-drop (drop-table 2PC). Refuse the claim
// with a distinguishable error rather than minting a new epoch,
// so a caller that skips its own status check still cannot
// resurrect a shard being dropped. Sophon's reconcile keys on
// the "sealed" marker in this message to tell it apart from an
// ordinary epoch fence.
if let Some(m) = &current
&& m.status == ShardStatus::Sealed
{
return Err(Error::invalid_input(format!(
"shard {} is sealed; refusing claim (drop in flight)",
self.shard_id
)));
}

let (next_version, next_epoch, base_manifest) = match current {
Some(m) => (m.version + 1, m.writer_epoch + 1, Some(m)),
None => (1, 1, None),
Expand All @@ -447,6 +463,7 @@ impl ShardManifestStore {
wal_entry_position_last_seen: 0,
current_generation: 1,
flushed_generations: vec![],
status: ShardStatus::Active,
}
};

Expand Down Expand Up @@ -603,6 +620,7 @@ mod tests {
wal_entry_position_last_seen: 0,
current_generation: 1,
flushed_generations: vec![],
status: ShardStatus::Active,
}
}

Expand Down Expand Up @@ -752,6 +770,51 @@ mod tests {
assert_eq!(second.wal_entry_position_last_seen, 42);
}

#[tokio::test]
async fn test_claim_epoch_refuses_sealed_manifest() {
// A `Sealed` manifest is the drop-table 2PC in-doubt marker:
// `claim_epoch` must refuse it with a distinguishable error rather
// than mint a new epoch, so a sealed shard can't be resurrected even
// by a caller that skips its own status check. Rolling the status
// back to `Active` makes the shard claimable again (reversible).
let (store, base_path, _temp_dir) = create_local_store().await;
let shard_id = Uuid::new_v4();
let manifest_store = ShardManifestStore::new(store, &base_path, shard_id, 2);

let (epoch, claimed) = manifest_store.claim_epoch(0).await.unwrap();
assert_eq!(claimed.status, ShardStatus::Active);

// Seal it (drop-table prepare).
let sealed = ShardManifest {
version: claimed.version + 1,
status: ShardStatus::Sealed,
..claimed
};
manifest_store.write(&sealed).await.unwrap();

// The claim is refused with the distinguishable "sealed" error —
// and the manifest is left untouched (no new epoch minted).
let err = manifest_store.claim_epoch(0).await.unwrap_err();
assert!(
err.to_string().contains("sealed"),
"expected a distinguishable sealed-refusal error, got: {err}"
);
let after = manifest_store.read_latest().await.unwrap().unwrap();
assert_eq!(after.writer_epoch, sealed.writer_epoch, "no epoch minted");
assert_eq!(after.status, ShardStatus::Sealed);

// Roll back to Active (drop-table abort) → re-claimable.
let active = ShardManifest {
version: sealed.version + 1,
status: ShardStatus::Active,
..sealed
};
manifest_store.write(&active).await.unwrap();
let (next_epoch, reclaimed) = manifest_store.claim_epoch(0).await.unwrap();
assert!(next_epoch > epoch, "rolled-back shard mints the next epoch");
assert_eq!(reclaimed.status, ShardStatus::Active);
}

#[tokio::test]
async fn test_initialize_shard_rejects_conflict_with_mismatch() {
let (store, base_path, _temp_dir) = create_local_store().await;
Expand Down
88 changes: 88 additions & 0 deletions rust/lance/src/dataset/mem_wal/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1961,6 +1961,37 @@ impl ShardWriter {
}
}

/// Abort the writer without flushing.
///
/// Shuts down the background flush tasks and leaves all buffered
/// memtable state to be dropped with the writer. Unlike
/// [`Self::close`], no WAL/MemTable flush is issued: pending in-memory
/// rows are discarded, not made durable, and no object-store IO is
/// performed. Used on drop-table, where the dataset directory is about
/// to be removed and a flush would only race fresh files back into a
/// doomed path.
///
/// Caller-quiesce contract: `abort` takes `&self` (so it can be called
/// through the `Arc<ShardWriter>` callers hold) and therefore cannot
/// structurally bar a concurrent or subsequent `put` the way consuming
/// `close(self)` does. After abort the dispatchers are gone, so a later
/// `put` would buffer data that never flushes. Callers MUST stop
/// issuing writes before calling abort.
///
/// Blocks until any flush already mid-`handle()` settles —
/// cancellation only fires between messages — so no flush task lingers
/// after abort returns. Idempotent: a second call re-cancels an
/// already-cancelled token and joins an already-emptied task set.
#[instrument(name = "sw_abort", level = "info", skip_all, fields(shard_id = %self.config.shard_id, epoch = self.epoch))]
pub async fn abort(&self) -> Result<()> {
info!(
"Aborting ShardWriter for shard {} (no flush)",
self.config.shard_id
);
self.task_executor.shutdown_all().await?;
Ok(())
}

/// Close the writer gracefully.
///
/// Flushes pending data and shuts down background tasks.
Expand Down Expand Up @@ -4310,6 +4341,63 @@ mod tests {
writer.close().await.unwrap();
}

/// `abort` tears down the background flush tasks WITHOUT flushing —
/// buffered memtable rows are discarded, not sealed into an L0
/// generation the way `close` would. Idempotent on a second call.
#[tokio::test]
async fn test_abort_discards_without_flushing_and_is_idempotent() {
let (store, base_path, base_uri, _temp_dir) = create_local_store().await;
let schema = create_test_schema();

// Thresholds high enough that nothing auto-flushes; the rows stay
// in the active memtable until abort discards them.
let config = ShardWriterConfig {
shard_id: Uuid::new_v4(),
shard_spec_id: 0,
durable_write: false,
sync_indexed_write: false,
max_wal_buffer_size: 64 * 1024 * 1024,
max_wal_flush_interval: None,
max_memtable_size: 64 * 1024 * 1024,
manifest_scan_batch_size: 2,
..Default::default()
};

let writer = ShardWriter::open(store, base_path, base_uri, config, schema.clone(), vec![])
.await
.unwrap();

writer
.put(vec![create_test_batch(&schema, 0, 10)])
.await
.unwrap();
let flushed_before = writer
.manifest()
.await
.unwrap()
.map(|m| m.flushed_generations.len())
.unwrap_or(0);

writer.abort().await.unwrap();

// No generation was sealed — contrast with `close`, which flushes
// the 10 buffered rows into a new L0 generation.
let flushed_after = writer
.manifest()
.await
.unwrap()
.map(|m| m.flushed_generations.len())
.unwrap_or(0);
assert_eq!(
flushed_after, flushed_before,
"abort must not flush a new L0 generation"
);

// Idempotent: re-cancels the already-cancelled token, joins an
// already-emptied task set.
writer.abort().await.unwrap();
}

/// On a successful flush commit the sealed generation's rows land in the
/// manifest immediately, but the in-memory handle is NOT dropped — it
/// lingers for `frozen_memtable_grace` (so in-flight as-of reads keep
Expand Down
Loading