Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 100 additions & 7 deletions rust/lance-namespace-impls/src/dir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1281,6 +1281,57 @@ impl DirectoryNamespace {
.uri)
}

/// Resolves a branch to its `(uri, object-store path)` for a
/// `create_table_version` commit, so the commit write and the zombie check
/// share one path.
///
/// `BranchContents` is the source of truth, so check the ref first: a
/// registered branch commits directly. With no ref, accept the commit only on
/// an empty chain (the `create_branch` bootstrap, whose first commit precedes
/// its ref); reject a chain that already holds committed versions as a zombie.
async fn resolve_branch_for_commit(
&self,
table_uri: &str,
branch: &str,
) -> Result<(String, Path)> {
let main = self
.configured_builder(table_uri)
.load()
.await
.map_err(|e| {
lance_core::Error::from(NamespaceError::TableNotFound {
message: format!("table at '{}' not found: {}", table_uri, e),
})
})?;
let branch_location = main.branch_location().find_branch(Some(branch))?;
match main.branches().get(branch).await {
Ok(_) => Ok((branch_location.uri, branch_location.path)),
Err(lance_core::Error::RefNotFound { .. }) => {
if self
.branch_has_committed_versions(&branch_location.path)
.await?
{
return Err(NamespaceError::TableNotFound {
message: format!(
"branch '{}' not found for table at '{}'",
branch, table_uri
),
}
.into());
}
Ok((branch_location.uri, branch_location.path))
}
Err(e) => Err(e),
}
}

/// Reports whether at least one version is listed under `branch_path`.
///
/// Asks `list_versions_under` for at most one entry, since only emptiness of
/// the result matters here. Listing errors are propagated to the caller.
async fn branch_has_committed_versions(&self, branch_path: &Path) -> Result<bool> {
    let first_version = self
        .list_versions_under(branch_path, false, Some(1))
        .await?;
    let has_any = !first_version.is_empty();
    Ok(has_any)
}

fn validate_dir_only_properties(
properties: Option<&HashMap<String, String>>,
operation: &str,
Expand Down Expand Up @@ -1352,6 +1403,20 @@ impl DirectoryNamespace {
limit: Option<i32>,
) -> Result<Vec<TableVersion>> {
let table_path = self.object_store_path_from_uri(table_uri)?;
self.list_versions_under(&table_path, descending, limit)
.await
}

/// List committed manifest versions under `table_path`'s `_versions/`
/// directory. Takes the object-store `Path` directly because converting a URI
/// back to a path (via `object_store_path_from_uri`) can diverge from the real
/// storage location on Windows and miss the manifests.
async fn list_versions_under(
&self,
table_path: &Path,
descending: bool,
limit: Option<i32>,
) -> Result<Vec<TableVersion>> {
let versions_dir = table_path.clone().join(VERSIONS_DIR);
let manifest_metas: Vec<_> = self
.object_store
Expand All @@ -1361,8 +1426,8 @@ impl DirectoryNamespace {
.map_err(|e| {
lance_core::Error::from(NamespaceError::Internal {
message: format!(
"Failed to list manifest files for table at '{}': {}",
table_uri, e
"Failed to list manifest files under '{}': {}",
versions_dir, e
),
})
})?;
Expand Down Expand Up @@ -3139,16 +3204,17 @@ impl LanceNamespace for DirectoryNamespace {
self.record_op("create_table_version");
let branch = Self::normalized_branch(request.branch.as_deref())?;
let table_uri = self.resolve_table_location(&request.id).await?;
let table_uri = match branch {
Some(b) => self.resolve_branch_location(&table_uri, b).await?,
None => table_uri,
let (table_uri, table_path) = match branch {
Some(b) => self.resolve_branch_for_commit(&table_uri, b).await?,
None => {
let table_path = self.object_store_path_from_uri(&table_uri)?;
(table_uri, table_path)
}
};

let staging_manifest_path = &request.manifest_path;
let version = request.version as u64;

let table_path = self.object_store_path_from_uri(&table_uri)?;

// Determine naming scheme from request, default to V2
let naming_scheme = match request.naming_scheme.as_deref() {
Some("V1") => ManifestNamingScheme::V1,
Expand Down Expand Up @@ -12448,4 +12514,31 @@ mod tests {
let result = namespace.alter_table_drop_columns(request).await;
assert!(result.is_err(), "Should fail when table does not exist");
}

/// Creating a branch on a managed table must succeed, and the branch must be
/// readable both through the returned handle and after reopening it via the
/// namespace.
#[tokio::test]
async fn test_create_branch_on_managed_dataset_succeeds() {
    use lance::dataset::builder::DatasetBuilder;

    // Arrange: a managed namespace holding a single table "t".
    let scratch_dir = TempStdDir::default();
    let namespace = create_managed_namespace(scratch_dir.to_str().unwrap()).await;
    let table_ident = vec!["t".to_string()];
    let mut dataset = create_managed_table(&namespace, &table_ident).await;

    // Act: fork branch "exp" from the table's current version.
    let base_version = dataset.version().version;
    let forked = dataset
        .create_branch("exp", base_version, None)
        .await
        .expect("create_branch failed");

    // Assert: the returned handle is on the branch and sees the table rows.
    assert_eq!(forked.manifest.branch.as_deref(), Some("exp"));
    assert_eq!(scan_id_column(&forked).await, vec![1, 2]);

    // Assert: the branch can be reopened through the namespace and yields the
    // same rows.
    let builder = DatasetBuilder::from_namespace(namespace.clone(), table_ident.clone())
        .await
        .unwrap();
    let reloaded = builder
        .with_branch("exp", None)
        .load()
        .await
        .expect("reopen branch failed");
    assert_eq!(scan_id_column(&reloaded).await, vec![1, 2]);
}
}
Loading