diff --git a/rust/lance-namespace-impls/src/dir.rs b/rust/lance-namespace-impls/src/dir.rs index 681cfa430f2..5305612b13a 100644 --- a/rust/lance-namespace-impls/src/dir.rs +++ b/rust/lance-namespace-impls/src/dir.rs @@ -1281,6 +1281,55 @@ impl DirectoryNamespace { .uri) } + /// Resolves a branch to its `(uri, object-store path)` for `create_table_version`. + /// + /// `BranchContents` is the source of truth, so check the ref first: a + /// registered branch commits directly. With no ref, accept the commit only on + /// an empty chain (the `create_branch` bootstrap, whose first commit precedes + /// its ref); reject a chain that already holds committed versions as a zombie. + async fn resolve_branch_for_commit( + &self, + table_uri: &str, + branch: &str, + ) -> Result<(String, Path)> { + let main = self + .configured_builder(table_uri) + .load() + .await + .map_err(|e| { + lance_core::Error::from(NamespaceError::TableNotFound { + message: format!("table at '{}' not found: {}", table_uri, e), + }) + })?; + let branch_location = main.branch_location().find_branch(Some(branch))?; + match main.branches().get(branch).await { + Ok(_) => Ok((branch_location.uri, branch_location.path)), + Err(lance_core::Error::RefNotFound { .. }) => { + if self + .branch_has_committed_versions(&branch_location.path) + .await? + { + return Err(NamespaceError::TableNotFound { + message: format!( + "branch '{}' not found for table at '{}'", + branch, table_uri + ), + } + .into()); + } + Ok((branch_location.uri, branch_location.path)) + } + Err(e) => Err(e), + } + } + + async fn branch_has_committed_versions(&self, branch_path: &Path) -> Result { + Ok(!self + .list_versions_under(branch_path, false, Some(1)) + .await? + .is_empty()) + } + fn validate_dir_only_properties( properties: Option<&HashMap>, operation: &str, @@ -1352,6 +1401,19 @@ impl DirectoryNamespace { limit: Option, ) -> Result> { let table_path = self.object_store_path_from_uri(table_uri)?; + self.list_versions_under(&table_path, descending, limit) + .await + } + + /// List committed manifest versions under `table_path/_versions/`. + /// `table_path` must be an object-store `Path`; converting a URI to a path + /// can miss manifests on Windows. + async fn list_versions_under( + &self, + table_path: &Path, + descending: bool, + limit: Option, + ) -> Result> { let versions_dir = table_path.clone().join(VERSIONS_DIR); let manifest_metas: Vec<_> = self .object_store @@ -1361,8 +1423,8 @@ impl DirectoryNamespace { .map_err(|e| { lance_core::Error::from(NamespaceError::Internal { message: format!( - "Failed to list manifest files for table at '{}': {}", - table_uri, e + "Failed to list manifest files under '{}': {}", + versions_dir, e ), }) })?; @@ -3139,16 +3201,17 @@ impl LanceNamespace for DirectoryNamespace { self.record_op("create_table_version"); let branch = Self::normalized_branch(request.branch.as_deref())?; let table_uri = self.resolve_table_location(&request.id).await?; - let table_uri = match branch { - Some(b) => self.resolve_branch_location(&table_uri, b).await?, - None => table_uri, + let (table_uri, table_path) = match branch { + Some(b) => self.resolve_branch_for_commit(&table_uri, b).await?, + None => { + let table_path = self.object_store_path_from_uri(&table_uri)?; + (table_uri, table_path) + } }; let staging_manifest_path = &request.manifest_path; let version = request.version as u64; - let table_path = self.object_store_path_from_uri(&table_uri)?; - // Determine naming scheme from request, default to V2 let naming_scheme = match request.naming_scheme.as_deref() { Some("V1") => ManifestNamingScheme::V1, @@ -5618,8 +5681,6 @@ mod tests { let (namespace, _temp_dir) = create_test_namespace().await; create_scalar_table(&namespace, "users").await; - // Stage a real (loadable) manifest under tree/ghost/_versions/ without - // create_branch, so the path exists but has no BranchContents ref. let dataset = open_dataset(&namespace, "users").await; let store = dataset.object_store(None).await.unwrap(); let manifest = store @@ -5644,15 +5705,15 @@ mod tests { .bytes() .await .unwrap(); - let zombie = Path::from(format!( - "{}/tree/ghost/_versions/{}", - dataset.branch_location().path, - manifest.location.filename().unwrap() - )); + let zombie = dataset + .branch_location() + .find_branch(Some("ghost")) + .unwrap() + .path + .join(VERSIONS_DIR) + .join(manifest.location.filename().unwrap()); store.inner.put(&zombie, bytes.into()).await.unwrap(); - // The directory is physically present, but the source of truth has no - // such branch -- this is what makes every op below reject it. assert!(dataset.branches().get("ghost").await.is_err()); fn rejected(label: &str, r: Result) { @@ -12448,4 +12509,31 @@ mod tests { let result = namespace.alter_table_drop_columns(request).await; assert!(result.is_err(), "Should fail when table does not exist"); } + + #[tokio::test] + async fn test_create_branch_on_managed_dataset_succeeds() { + use lance::dataset::builder::DatasetBuilder; + + let temp = TempStdDir::default(); + let ns = create_managed_namespace(temp.to_str().unwrap()).await; + let table_id = vec!["t".to_string()]; + let mut main = create_managed_table(&ns, &table_id).await; + + let fork_version = main.version().version; + let branch = main + .create_branch("exp", fork_version, None) + .await + .expect("create_branch failed"); + assert_eq!(branch.manifest.branch.as_deref(), Some("exp")); + assert_eq!(scan_id_column(&branch).await, vec![1, 2]); + + let reopened = DatasetBuilder::from_namespace(ns.clone(), table_id.clone()) + .await + .unwrap() + .with_branch("exp", None) + .load() + .await + .expect("reopen branch failed"); + assert_eq!(scan_id_column(&reopened).await, vec![1, 2]); + } }