Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 85 additions & 3 deletions rust/cubestore/cubestore/src/metastore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ use crate::metastore::multi_index::{
};
use crate::metastore::partition::PartitionIndexKey;
use crate::metastore::replay_handle::{
ReplayHandle, ReplayHandleIndexKey, ReplayHandleRocksIndex, ReplayHandleRocksTable, SeqPointer,
validate_seq_pointers_by_location, ReplayHandle, ReplayHandleIndexKey, ReplayHandleRocksIndex,
ReplayHandleRocksTable, SeqPointer,
};
use crate::metastore::source::{
Source, SourceCredentials, SourceIndexKey, SourceRocksIndex, SourceRocksTable,
Expand Down Expand Up @@ -4563,6 +4564,8 @@ impl MetaStore for RocksMetaStore {
self.write_operation(
"create_replay_handle_from_seq_pointers",
move |db_ref, batch_pipe| {
let table = TableRocksTable::new(db_ref.clone()).get_row_or_not_found(table_id)?;
validate_seq_pointers_by_location(&table, &seq_pointers)?;
let handle = ReplayHandle::new_from_seq_pointers(table_id, seq_pointers);
Ok(ReplayHandleRocksTable::new(db_ref.clone()).insert(handle, batch_pipe)?)
},
Expand Down Expand Up @@ -4638,7 +4641,7 @@ impl MetaStore for RocksMetaStore {
return Err(CubeError::internal("Can't merge empty replay handles list".to_string()));
}
let table = ReplayHandleRocksTable::new(db_ref.clone());
let chunks_table = ChunkRocksTable::new(db_ref);
let chunks_table = ChunkRocksTable::new(db_ref.clone());
let mut replay_handles: Vec<IdRow<ReplayHandle>> = Vec::new();
for id in old_ids.into_iter() {
let replay_handle = table.get_row_or_not_found(id)?;
Expand Down Expand Up @@ -4670,7 +4673,11 @@ impl MetaStore for RocksMetaStore {
replay_handles.push(replay_handle);
}
let new_handle = if let Some(_) = new_seq_pointer {
let new_replay_handle = ReplayHandle::new_from_seq_pointers(replay_handles[0].get_row().table_id(), new_seq_pointer);
let table_id = replay_handles[0].get_row().table_id();
let tables_table = TableRocksTable::new(db_ref.clone());
let tables_row = tables_table.get_row_or_not_found(table_id)?;
validate_seq_pointers_by_location(&tables_row, &new_seq_pointer)?;
let new_replay_handle = ReplayHandle::new_from_seq_pointers(table_id, new_seq_pointer);
Some(table.insert(new_replay_handle, batch_pipe)?)

} else {
Expand Down Expand Up @@ -7662,6 +7669,81 @@ mod tests {

Ok(())
}

/// End-to-end guard check against a real `RocksMetaStore`: creating a replay handle whose
/// seq pointer count differs from the table's location count must fail and leave nothing
/// persisted, while a matching count must be stored as a single handle.
#[tokio::test]
async fn replay_handle_location_length_guard_test() -> Result<(), CubeError> {
    let config = Config::test("replay_handle_location_length_guard_test");
    let store_path = env::current_dir()?.join("rh-guard-local");
    let remote_store_path = env::current_dir()?.join("rh-guard-remote");
    // Wipe leftovers from earlier runs; a missing directory is not an error here.
    let _ = fs::remove_dir_all(&store_path);
    let _ = fs::remove_dir_all(&remote_store_path);
    let remote_fs = LocalDirRemoteFs::new(Some(remote_store_path.clone()), store_path.clone());

    let meta_store = RocksMetaStore::new(
        store_path.join("metastore").as_path(),
        BaseRocksStoreFs::new_for_metastore(remote_fs.clone(), config.config_obj()),
        config.config_obj(),
    )?;

    meta_store.create_schema("foo".to_string(), false).await?;
    let columns = vec![Column::new("col1".to_string(), ColumnType::Int, 0)];

    // Three locations, so only a three-element seq pointer vector is acceptable.
    let locations = ["stream://k/T/0", "stream://k/T/1", "stream://k/T/2"]
        .iter()
        .map(|location| location.to_string())
        .collect::<Vec<_>>();
    let table = meta_store
        .create_table(
            "foo".to_string(),
            "boo".to_string(),
            columns,
            Some(locations),
            None,
            vec![],
            true,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            false,
            None,
        )
        .await?;
    let table_id = table.get_id();

    // Produces `len` identical seq pointers wrapped the way the metastore API expects.
    let pointers_of_len = |len: usize| Some(vec![Some(SeqPointer::new(Some(0), Some(1))); len]);

    // Six pointers for three locations: the write must be rejected...
    let rejected = meta_store
        .create_replay_handle_from_seq_pointers(table_id, pointers_of_len(6))
        .await;
    assert!(rejected.is_err());
    // ...and no handle may have been persisted as a side effect.
    let handles_after_reject = meta_store.get_replay_handles_by_table(table_id).await?;
    assert!(handles_after_reject.is_empty());

    // Three pointers for three locations: exactly one handle is stored.
    meta_store
        .create_replay_handle_from_seq_pointers(table_id, pointers_of_len(3))
        .await?;
    let handles_after_accept = meta_store.get_replay_handles_by_table(table_id).await?;
    assert_eq!(handles_after_accept.len(), 1);

    let _ = fs::remove_dir_all(&store_path);
    let _ = fs::remove_dir_all(&remote_store_path);

    Ok(())
}
}

impl RocksMetaStore {
Expand Down
74 changes: 73 additions & 1 deletion rust/cubestore/cubestore/src/metastore/replay_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ pub fn seq_pointer_for_location<'a>(
))
})?;
if locations.len() != seq_pointers_by_location.len() {
return Err(CubeError::internal(format!(
return Err(CubeError::corrupt_data(format!(
"Location array size mismatch during accessing seq pointers: {:?} and {:?}",
table.get_row().locations(),
seq_pointers_by_location
Expand All @@ -213,6 +213,25 @@ pub fn seq_pointer_for_location<'a>(
Ok(&seq_pointers_by_location[pos])
}

/// Checks that per-location seq pointers line up with the locations of `table`.
///
/// Succeeds when `seq_pointers_by_location` is `None`, or when it holds exactly as many
/// entries as the table has locations (a table without locations counts as zero).
///
/// # Errors
///
/// Returns an internal `CubeError` carrying the table id, both lengths and the offending
/// seq pointers when the two lengths differ.
pub fn validate_seq_pointers_by_location(
    table: &IdRow<Table>,
    seq_pointers_by_location: &Option<Vec<Option<SeqPointer>>>,
) -> Result<(), CubeError> {
    let seq_pointers = match seq_pointers_by_location {
        Some(seq_pointers) => seq_pointers,
        // No seq pointers at all: there is nothing to compare against the locations.
        None => return Ok(()),
    };

    let expected = table
        .get_row()
        .locations()
        .map_or(0, |locations| locations.len());
    let actual = seq_pointers.len();
    if expected == actual {
        return Ok(());
    }

    Err(CubeError::internal(format!(
        "Refusing to persist replay handle for table {}: {} locations but {} seq pointers: {:?}",
        table.get_id(),
        expected,
        actual,
        seq_pointers
    )))
}

pub fn location_position(table: &IdRow<Table>, location: &str) -> Result<usize, CubeError> {
let locations = table.get_row().locations().ok_or_else(|| {
CubeError::internal(format!(
Expand Down Expand Up @@ -365,3 +384,56 @@ impl RocksSecondaryIndex<ReplayHandle, ReplayHandleIndexKey> for ReplayHandleRoc
*self as IndexId
}
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a table row with id 1 that has `count` locations named `loc-0`, `loc-1`, ...
    fn table_with_locations(count: usize) -> IdRow<Table> {
        let locations: Vec<String> = (0..count).map(|i| format!("loc-{}", i)).collect();
        let table = Table::new(
            "t".to_string(),
            1,
            Vec::new(),
            Some(locations),
            None,
            true,
            None,
            None,
            None,
            None,
            None,
            None,
            Vec::new(),
            None,
            None,
            None,
        );
        IdRow::new(1, table)
    }

    /// Builds `count` identical seq pointers in the shape stored on a replay handle.
    fn pointers(count: usize) -> Option<Vec<Option<SeqPointer>>> {
        let pointer = Some(SeqPointer::new(Some(0), Some(1)));
        Some(vec![pointer; count])
    }

    /// Equal numbers of locations and seq pointers pass validation.
    #[test]
    fn validate_matching_length_ok() {
        let result = validate_seq_pointers_by_location(&table_with_locations(3), &pointers(3));
        assert!(result.is_ok());
    }

    /// Both too many and too few seq pointers are rejected.
    #[test]
    fn validate_mismatching_length_err() {
        let table = table_with_locations(3);
        for count in [6, 2] {
            assert!(validate_seq_pointers_by_location(&table, &pointers(count)).is_err());
        }
    }

    /// Absent seq pointers are accepted regardless of the table's locations.
    #[test]
    fn validate_none_pointers_ok() {
        let result = validate_seq_pointers_by_location(&table_with_locations(3), &None);
        assert!(result.is_ok());
    }
}
120 changes: 69 additions & 51 deletions rust/cubestore/cubestore/src/scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::metastore::partition::partition_file_name;
use crate::metastore::replay_handle::ReplayHandle;
use crate::metastore::replay_handle::{
subtract_from_right_seq_pointer_by_location, subtract_if_covers_seq_pointer_by_location,
union_seq_pointer_by_location, SeqPointerForLocation,
union_seq_pointer_by_location, SeqPointer, SeqPointerForLocation,
};
use crate::metastore::table::Table;
use crate::metastore::{
Expand Down Expand Up @@ -481,67 +481,85 @@ impl SchedulerImpl {
.into_iter()
.chunk_by(|(h, _)| h.get_row().table_id())
{
let mut seq_pointer_by_location = None;
let mut ids = Vec::new();
let handles = handles.collect::<Vec<_>>();
for (handle, _) in handles
.iter()
.filter(|(handle, no_active_chunks)| !is_newest_handle(handle) && *no_active_chunks)
{
union_seq_pointer_by_location(
&mut seq_pointer_by_location,
handle.get_row().seq_pointers_by_location(),
)?;
ids.push(handle.get_id());
}
let empty_vec = Vec::new();
let failed = table_to_failed.get(&table_id).unwrap_or(&empty_vec);

for (failed_handle, no_active_chunks) in failed.iter() {
let mut failed_seq_pointers =
failed_handle.get_row().seq_pointers_by_location().clone();
let mut replay_after_failed_union = None;
let replay_after_failed = handles
.iter()
.filter(|(h, _)| {
h.get_id() > failed_handle.get_id()
&& !h.get_row().has_failed_to_persist_chunks()
})
.collect::<Vec<_>>();
for (replay, _) in replay_after_failed.iter() {
union_seq_pointer_by_location(
&mut replay_after_failed_union,
replay.get_row().seq_pointers_by_location(),
)?;
}
subtract_if_covers_seq_pointer_by_location(
&mut failed_seq_pointers,
&replay_after_failed_union,
)?;
let empty_seq_pointers = failed_seq_pointers
.map(|p| {
p.iter()
.all(|p| p.as_ref().map(|p| p.is_empty()).unwrap_or(true))
})
.unwrap_or(true);
if empty_seq_pointers && *no_active_chunks {
ids.push(failed_handle.get_id());
} else if !empty_seq_pointers {
subtract_from_right_seq_pointer_by_location(
&mut seq_pointer_by_location,
failed_handle.get_row().seq_pointers_by_location(),
)?;
// Isolate per-table merge: a corrupt handle (e.g. seq pointer / location
// length mismatch) must not abort merging of the remaining tables.
let table_merge =
(|| -> Result<(Vec<u64>, Option<Vec<Option<SeqPointer>>>), CubeError> {
let mut seq_pointer_by_location = None;
let mut ids = Vec::new();
for (handle, _) in handles.iter().filter(|(handle, no_active_chunks)| {
!is_newest_handle(handle) && *no_active_chunks
}) {
union_seq_pointer_by_location(
&mut seq_pointer_by_location,
handle.get_row().seq_pointers_by_location(),
)?;
ids.push(handle.get_id());
}

for (failed_handle, no_active_chunks) in failed.iter() {
let mut failed_seq_pointers =
failed_handle.get_row().seq_pointers_by_location().clone();
let mut replay_after_failed_union = None;
let replay_after_failed = handles
.iter()
.filter(|(h, _)| {
h.get_id() > failed_handle.get_id()
&& !h.get_row().has_failed_to_persist_chunks()
})
.collect::<Vec<_>>();
for (replay, _) in replay_after_failed.iter() {
union_seq_pointer_by_location(
&mut replay_after_failed_union,
replay.get_row().seq_pointers_by_location(),
)?;
}
subtract_if_covers_seq_pointer_by_location(
&mut failed_seq_pointers,
&replay_after_failed_union,
)?;
let empty_seq_pointers = failed_seq_pointers
.map(|p| {
p.iter()
.all(|p| p.as_ref().map(|p| p.is_empty()).unwrap_or(true))
})
.unwrap_or(true);
if empty_seq_pointers && *no_active_chunks {
ids.push(failed_handle.get_id());
} else if !empty_seq_pointers {
subtract_from_right_seq_pointer_by_location(
&mut seq_pointer_by_location,
failed_handle.get_row().seq_pointers_by_location(),
)?;
}
}

Ok((ids, seq_pointer_by_location))
})();

match table_merge {
Ok(merge) => to_merge.push(merge),
Err(e) => {
error!("Skipping replay handle merge for table {}: {}", table_id, e);
continue;
}
}

to_merge.push((ids, seq_pointer_by_location));
}

for (ids, seq_pointer_by_location) in to_merge.into_iter() {
if !ids.is_empty() {
self.meta_store
if let Err(e) = self
.meta_store
.replace_replay_handles(ids, seq_pointer_by_location)
.await?;
.await
{
error!("Skipping replay handle merge: {}", e);
continue;
}
}
}

Expand Down
Loading