diff --git a/rust/cubestore/cubestore/src/metastore/mod.rs b/rust/cubestore/cubestore/src/metastore/mod.rs index b281cb7cda145..c0499b5559c99 100644 --- a/rust/cubestore/cubestore/src/metastore/mod.rs +++ b/rust/cubestore/cubestore/src/metastore/mod.rs @@ -42,7 +42,8 @@ use crate::metastore::multi_index::{ }; use crate::metastore::partition::PartitionIndexKey; use crate::metastore::replay_handle::{ - ReplayHandle, ReplayHandleIndexKey, ReplayHandleRocksIndex, ReplayHandleRocksTable, SeqPointer, + validate_seq_pointers_by_location, ReplayHandle, ReplayHandleIndexKey, ReplayHandleRocksIndex, + ReplayHandleRocksTable, SeqPointer, }; use crate::metastore::source::{ Source, SourceCredentials, SourceIndexKey, SourceRocksIndex, SourceRocksTable, @@ -4563,6 +4564,8 @@ impl MetaStore for RocksMetaStore { self.write_operation( "create_replay_handle_from_seq_pointers", move |db_ref, batch_pipe| { + let table = TableRocksTable::new(db_ref.clone()).get_row_or_not_found(table_id)?; + validate_seq_pointers_by_location(&table, &seq_pointers)?; let handle = ReplayHandle::new_from_seq_pointers(table_id, seq_pointers); Ok(ReplayHandleRocksTable::new(db_ref.clone()).insert(handle, batch_pipe)?) }, @@ -4638,7 +4641,7 @@ impl MetaStore for RocksMetaStore { return Err(CubeError::internal("Can't merge empty replay handles list".to_string())); } let table = ReplayHandleRocksTable::new(db_ref.clone()); - let chunks_table = ChunkRocksTable::new(db_ref); + let chunks_table = ChunkRocksTable::new(db_ref.clone()); let mut replay_handles: Vec> = Vec::new(); for id in old_ids.into_iter() { let replay_handle = table.get_row_or_not_found(id)?; @@ -4670,7 +4673,11 @@ impl MetaStore for RocksMetaStore { replay_handles.push(replay_handle); } let new_handle = if let Some(_) = new_seq_pointer { - let new_replay_handle = ReplayHandle::new_from_seq_pointers(replay_handles[0].get_row().table_id(), new_seq_pointer); + let table_id = replay_handles[0].get_row().table_id(); + let tables_table = TableRocksTable::new(db_ref.clone()); + let tables_row = tables_table.get_row_or_not_found(table_id)?; + validate_seq_pointers_by_location(&tables_row, &new_seq_pointer)?; + let new_replay_handle = ReplayHandle::new_from_seq_pointers(table_id, new_seq_pointer); Some(table.insert(new_replay_handle, batch_pipe)?) } else { @@ -7662,6 +7669,81 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn replay_handle_location_length_guard_test() -> Result<(), CubeError> { + let config = Config::test("replay_handle_location_length_guard_test"); + let store_path = env::current_dir()?.join("rh-guard-local"); + let remote_store_path = env::current_dir()?.join("rh-guard-remote"); + let _ = fs::remove_dir_all(store_path.clone()); + let _ = fs::remove_dir_all(remote_store_path.clone()); + let remote_fs = LocalDirRemoteFs::new(Some(remote_store_path.clone()), store_path.clone()); + + let meta_store = RocksMetaStore::new( + store_path.join("metastore").as_path(), + BaseRocksStoreFs::new_for_metastore(remote_fs.clone(), config.config_obj()), + config.config_obj(), + )?; + + meta_store.create_schema("foo".to_string(), false).await?; + let mut columns = Vec::new(); + columns.push(Column::new("col1".to_string(), ColumnType::Int, 0)); + + let locations = vec![ + "stream://k/T/0".to_string(), + "stream://k/T/1".to_string(), + "stream://k/T/2".to_string(), + ]; + let table = meta_store + .create_table( + "foo".to_string(), + "boo".to_string(), + columns.clone(), + Some(locations), + None, + vec![], + true, + None, + None, + None, + None, + None, + None, + None, + None, + None, + false, + None, + ) + .await?; + + let mismatching = Some(vec![Some(SeqPointer::new(Some(0), Some(1))); 6]); + assert!(meta_store + .create_replay_handle_from_seq_pointers(table.get_id(), mismatching) + .await + .is_err()); + assert!(meta_store + .get_replay_handles_by_table(table.get_id()) + .await? + .is_empty()); + + let matching = Some(vec![Some(SeqPointer::new(Some(0), Some(1))); 3]); + meta_store + .create_replay_handle_from_seq_pointers(table.get_id(), matching) + .await?; + assert_eq!( + meta_store + .get_replay_handles_by_table(table.get_id()) + .await? + .len(), + 1 + ); + + let _ = fs::remove_dir_all(store_path.clone()); + let _ = fs::remove_dir_all(remote_store_path.clone()); + + Ok(()) + } } impl RocksMetaStore { diff --git a/rust/cubestore/cubestore/src/metastore/replay_handle.rs b/rust/cubestore/cubestore/src/metastore/replay_handle.rs index d3d07dfbd6004..3649f00ef8923 100644 --- a/rust/cubestore/cubestore/src/metastore/replay_handle.rs +++ b/rust/cubestore/cubestore/src/metastore/replay_handle.rs @@ -203,7 +203,7 @@ pub fn seq_pointer_for_location<'a>( )) })?; if locations.len() != seq_pointers_by_location.len() { - return Err(CubeError::internal(format!( + return Err(CubeError::corrupt_data(format!( "Location array size mismatch during accessing seq pointers: {:?} and {:?}", table.get_row().locations(), seq_pointers_by_location @@ -213,6 +213,25 @@ pub fn seq_pointer_for_location<'a>( Ok(&seq_pointers_by_location[pos]) } +pub fn validate_seq_pointers_by_location( + table: &IdRow, + seq_pointers_by_location: &Option>>, +) -> Result<(), CubeError> { + if let Some(seq_pointers) = seq_pointers_by_location { + let locations_len = table.get_row().locations().map(|l| l.len()).unwrap_or(0); + if locations_len != seq_pointers.len() { + return Err(CubeError::internal(format!( + "Refusing to persist replay handle for table {}: {} locations but {} seq pointers: {:?}", + table.get_id(), + locations_len, + seq_pointers.len(), + seq_pointers + ))); + } + } + Ok(()) +} + pub fn location_position(table: &IdRow
, location: &str) -> Result { let locations = table.get_row().locations().ok_or_else(|| { CubeError::internal(format!( @@ -365,3 +384,56 @@ impl RocksSecondaryIndex for ReplayHandleRoc *self as IndexId } } + +#[cfg(test)] +mod tests { + use super::*; + + fn table_with_locations(count: usize) -> IdRow
{ + let locations = (0..count).map(|i| format!("loc-{}", i)).collect::>(); + IdRow::new( + 1, + Table::new( + "t".to_string(), + 1, + Vec::new(), + Some(locations), + None, + true, + None, + None, + None, + None, + None, + None, + Vec::new(), + None, + None, + None, + ), + ) + } + + fn pointers(count: usize) -> Option>> { + Some(vec![Some(SeqPointer::new(Some(0), Some(1))); count]) + } + + #[test] + fn validate_matching_length_ok() { + let table = table_with_locations(3); + assert!(validate_seq_pointers_by_location(&table, &pointers(3)).is_ok()); + } + + #[test] + fn validate_mismatching_length_err() { + let table = table_with_locations(3); + assert!(validate_seq_pointers_by_location(&table, &pointers(6)).is_err()); + assert!(validate_seq_pointers_by_location(&table, &pointers(2)).is_err()); + } + + #[test] + fn validate_none_pointers_ok() { + let table = table_with_locations(3); + assert!(validate_seq_pointers_by_location(&table, &None).is_ok()); + } +} diff --git a/rust/cubestore/cubestore/src/scheduler/mod.rs b/rust/cubestore/cubestore/src/scheduler/mod.rs index 87e3037633693..464b6b7c7f3f3 100644 --- a/rust/cubestore/cubestore/src/scheduler/mod.rs +++ b/rust/cubestore/cubestore/src/scheduler/mod.rs @@ -6,7 +6,7 @@ use crate::metastore::partition::partition_file_name; use crate::metastore::replay_handle::ReplayHandle; use crate::metastore::replay_handle::{ subtract_from_right_seq_pointer_by_location, subtract_if_covers_seq_pointer_by_location, - union_seq_pointer_by_location, SeqPointerForLocation, + union_seq_pointer_by_location, SeqPointer, SeqPointerForLocation, }; use crate::metastore::table::Table; use crate::metastore::{ @@ -481,67 +481,85 @@ impl SchedulerImpl { .into_iter() .chunk_by(|(h, _)| h.get_row().table_id()) { - let mut seq_pointer_by_location = None; - let mut ids = Vec::new(); let handles = handles.collect::>(); - for (handle, _) in handles - .iter() - .filter(|(handle, no_active_chunks)| !is_newest_handle(handle) && *no_active_chunks) - { - union_seq_pointer_by_location( - &mut seq_pointer_by_location, - handle.get_row().seq_pointers_by_location(), - )?; - ids.push(handle.get_id()); - } let empty_vec = Vec::new(); let failed = table_to_failed.get(&table_id).unwrap_or(&empty_vec); - for (failed_handle, no_active_chunks) in failed.iter() { - let mut failed_seq_pointers = - failed_handle.get_row().seq_pointers_by_location().clone(); - let mut replay_after_failed_union = None; - let replay_after_failed = handles - .iter() - .filter(|(h, _)| { - h.get_id() > failed_handle.get_id() - && !h.get_row().has_failed_to_persist_chunks() - }) - .collect::>(); - for (replay, _) in replay_after_failed.iter() { - union_seq_pointer_by_location( - &mut replay_after_failed_union, - replay.get_row().seq_pointers_by_location(), - )?; - } - subtract_if_covers_seq_pointer_by_location( - &mut failed_seq_pointers, - &replay_after_failed_union, - )?; - let empty_seq_pointers = failed_seq_pointers - .map(|p| { - p.iter() - .all(|p| p.as_ref().map(|p| p.is_empty()).unwrap_or(true)) - }) - .unwrap_or(true); - if empty_seq_pointers && *no_active_chunks { - ids.push(failed_handle.get_id()); - } else if !empty_seq_pointers { - subtract_from_right_seq_pointer_by_location( - &mut seq_pointer_by_location, - failed_handle.get_row().seq_pointers_by_location(), - )?; + // Isolate per-table merge: a corrupt handle (e.g. seq pointer / location + // length mismatch) must not abort merging of the remaining tables. + let table_merge = + (|| -> Result<(Vec, Option>>), CubeError> { + let mut seq_pointer_by_location = None; + let mut ids = Vec::new(); + for (handle, _) in handles.iter().filter(|(handle, no_active_chunks)| { + !is_newest_handle(handle) && *no_active_chunks + }) { + union_seq_pointer_by_location( + &mut seq_pointer_by_location, + handle.get_row().seq_pointers_by_location(), + )?; + ids.push(handle.get_id()); + } + + for (failed_handle, no_active_chunks) in failed.iter() { + let mut failed_seq_pointers = + failed_handle.get_row().seq_pointers_by_location().clone(); + let mut replay_after_failed_union = None; + let replay_after_failed = handles + .iter() + .filter(|(h, _)| { + h.get_id() > failed_handle.get_id() + && !h.get_row().has_failed_to_persist_chunks() + }) + .collect::>(); + for (replay, _) in replay_after_failed.iter() { + union_seq_pointer_by_location( + &mut replay_after_failed_union, + replay.get_row().seq_pointers_by_location(), + )?; + } + subtract_if_covers_seq_pointer_by_location( + &mut failed_seq_pointers, + &replay_after_failed_union, + )?; + let empty_seq_pointers = failed_seq_pointers + .map(|p| { + p.iter() + .all(|p| p.as_ref().map(|p| p.is_empty()).unwrap_or(true)) + }) + .unwrap_or(true); + if empty_seq_pointers && *no_active_chunks { + ids.push(failed_handle.get_id()); + } else if !empty_seq_pointers { + subtract_from_right_seq_pointer_by_location( + &mut seq_pointer_by_location, + failed_handle.get_row().seq_pointers_by_location(), + )?; + } + } + + Ok((ids, seq_pointer_by_location)) + })(); + + match table_merge { + Ok(merge) => to_merge.push(merge), + Err(e) => { + error!("Skipping replay handle merge for table {}: {}", table_id, e); + continue; } } - - to_merge.push((ids, seq_pointer_by_location)); } for (ids, seq_pointer_by_location) in to_merge.into_iter() { if !ids.is_empty() { - self.meta_store + if let Err(e) = self + .meta_store .replace_replay_handles(ids, seq_pointer_by_location) - .await?; + .await + { + error!("Skipping replay handle merge: {}", e); + continue; + } } }