diff --git a/.changeset/faster-text-imports.md b/.changeset/faster-text-imports.md new file mode 100644 index 000000000..bb938da5e --- /dev/null +++ b/.changeset/faster-text-imports.md @@ -0,0 +1,6 @@ +--- +"loro-crdt": patch +"loro-crdt-map": patch +--- + +Improve text insert and snapshot import performance by avoiding duplicate text boundary validation and skipping eager imported change block parsing. diff --git a/Cargo.lock b/Cargo.lock index 52b43d90d..07bc3b280 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1873,7 +1873,7 @@ checksum = "3f3d053a135388e6b1df14e8af1212af5064746e9b87a06a345a7a779ee9695a" [[package]] name = "loro-wasm" -version = "1.13.3" +version = "1.13.4" dependencies = [ "console_error_panic_hook", "js-sys", diff --git a/crates/loro-internal/src/arena.rs b/crates/loro-internal/src/arena.rs index 8f1d36c69..bd8c04be0 100644 --- a/crates/loro-internal/src/arena.rs +++ b/crates/loro-internal/src/arena.rs @@ -782,7 +782,7 @@ mod tests { ContainerID::new_mergeable(&top_root, "profile", ContainerType::Map); let child = ContainerID::new_normal( loro_common::ID::new(1, 0), - ContainerType::Counter, + ContainerType::List, ); let mergeable_parent_for_resolver = mergeable_parent.clone(); diff --git a/crates/loro-internal/src/handler.rs b/crates/loro-internal/src/handler.rs index 56d4a408c..04a9f4090 100644 --- a/crates/loro-internal/src/handler.rs +++ b/crates/loro-internal/src/handler.rs @@ -1572,7 +1572,12 @@ impl TextHandler { return Ok(()); }; - if pos > self.len(pos_type) { + let len = self.len(pos_type); + if pos > len { + return Ok(()); + } + + if self.all_text_positions_are_boundaries(pos_type, len) { return Ok(()); } @@ -1586,6 +1591,15 @@ impl TextHandler { Ok(()) } + fn all_text_positions_are_boundaries(&self, pos_type: PosType, len: usize) -> bool { + match pos_type { + PosType::Bytes => len == self.len_unicode(), + PosType::Utf16 => len == self.len_unicode(), + PosType::Event if cfg!(feature = "wasm") => len == self.len_unicode(), + _ => false, + } + } + pub fn diagnose(&self) { match &self.inner { MaybeDetached::Detached(t) => { @@ -1851,18 +1865,18 @@ impl TextHandler { /// /// This method requires auto_commit to be enabled. pub fn insert(&self, pos: usize, s: &str, pos_type: PosType) -> LoroResult<()> { - let len = self.len(pos_type); - if pos > len { - return Err(LoroError::OutOfBound { - pos, - len, - info: format!("Position: {}:{}", file!(), line!()).into_boxed_str(), - }); - } - self.validate_text_boundary(pos, pos_type)?; - match &self.inner { MaybeDetached::Detached(t) => { + let len = self.len(pos_type); + if pos > len { + return Err(LoroError::OutOfBound { + pos, + len, + info: format!("Position: {}:{}", file!(), line!()).into_boxed_str(), + }); + } + self.validate_text_boundary(pos, pos_type)?; + let mut t = t.lock(); let (index, _) = t .value @@ -1876,6 +1890,19 @@ impl TextHandler { Ok(()) } MaybeDetached::Attached(a) => { + if s.is_empty() { + let len = self.len(pos_type); + if pos > len { + return Err(LoroError::OutOfBound { + pos, + len, + info: format!("Position: {}:{}", file!(), line!()).into_boxed_str(), + }); + } + self.validate_text_boundary(pos, pos_type)?; + return Ok(()); + } + a.with_txn(|txn| self.insert_with_txn(txn, pos, s, pos_type)) } } diff --git a/crates/loro-internal/src/oplog/change_store.rs b/crates/loro-internal/src/oplog/change_store.rs index 750a8d7ce..0d5ae54aa 100644 --- a/crates/loro-internal/src/oplog/change_store.rs +++ b/crates/loro-internal/src/oplog/change_store.rs @@ -325,10 +325,11 @@ impl ChangeStore { pub fn visit_all_changes(&self, f: &mut dyn FnMut(&Change)) { self.ensure_block_loaded_in_range(Bound::Unbounded, Bound::Unbounded); let mut inner = self.inner.lock(); - for (_, block) in inner.mem_parsed_kv.iter_mut() { - block - .ensure_changes(&self.arena) - .expect("Parse block error"); + for (id, block) in inner.mem_parsed_kv.iter_mut() { + if let Err(err) = block.ensure_changes(&self.arena) { + warn!(block_id = ?id, ?err, "failed to parse change block"); + continue; + } for c in block.content.try_changes().unwrap() { f(c); } @@ -368,9 +369,10 @@ impl ChangeStore { return None; } - block - .ensure_changes(&self.arena) - .expect("Parse block error"); + if let Err(err) = block.ensure_changes(&self.arena) { + warn!(block_id = ?_id, ?err, "failed to parse change block"); + return None; + } let changes = block.content.try_changes().unwrap(); let start; let end; @@ -456,9 +458,10 @@ impl ChangeStore { return None; } - block - .ensure_changes(&self.arena) - .expect("Parse block error"); + if let Err(err) = block.ensure_changes(&self.arena) { + warn!(block_id = ?_id, ?err, "failed to parse change block"); + return None; + } Some(block.clone()) }) // TODO: PERF avoid alloc @@ -639,7 +642,6 @@ mod mut_external_kv { .import_all(bytes) .map_err(|e| LoroError::DecodeError(e.into_boxed_str()))?; drop(kv_store); - self.validate_imported_change_blocks()?; let vv_bytes = self.external_kv.lock().get(VV_KEY).unwrap_or_default(); let vv = VersionVector::decode(&vv_bytes) .map_err(|_| LoroError::DecodeDataCorruptionError)?; @@ -720,24 +722,6 @@ mod mut_external_kv { }) } - fn validate_imported_change_blocks(&self) -> LoroResult<()> { - let blocks: Vec<(ID, Bytes)> = { - let kv_store = self.external_kv.lock(); - kv_store - .scan(Bound::Unbounded, Bound::Unbounded) - .filter(|(id, _)| id.len() == 12) - .map(|(id, bytes)| (ID::from_bytes(&id), bytes)) - .collect() - }; - - for (_id, bytes) in blocks { - let mut block = Arc::new(ChangesBlock::from_bytes(bytes)?); - block.ensure_changes(&self.arena)?; - } - - Ok(()) - } - /// Flush the cached change to kv_store pub(crate) fn flush_and_compact(&self, vv: &VersionVector, frontiers: &Frontiers) { let mut inner = self.inner.lock(); @@ -911,9 +895,10 @@ mod mut_inner_kv { } // Found the block - block - .ensure_changes(&self.arena) - .expect("Parse block error"); + if let Err(err) = block.ensure_changes(&self.arena) { + warn!(block_id = ?id, ?err, "failed to parse change block"); + return None; + } let index = block.get_change_index_by_lamport_lte(idlp.lamport)?; return Some(BlockChangeRef { change_index: index, @@ -994,7 +979,18 @@ mod mut_inner_kv { for (id, bytes) in iter { let mut block = ChangesBlockBytes::new(bytes.clone()); - let (lamport_start, _lamport_end) = block.lamport_range(); + let (lamport_start, _lamport_end) = match block.lamport_range() { + Ok(range) => range, + Err(err) => { + let block_id = ID::from_bytes(&id); + warn!( + ?block_id, + ?err, + "failed to decode external change block range" + ); + continue; + } + }; if lamport_start <= idlp.lamport { break 'block_scan (id, bytes); } @@ -1004,13 +1000,17 @@ mod mut_inner_kv { }; let block_id = ID::from_bytes(&id); - let mut block = Arc::new( - ChangesBlock::from_bytes(bytes) - .expect("validated external change block should decode"), - ); - block - .ensure_changes(&self.arena) - .expect("Parse block error"); + let mut block = match ChangesBlock::from_bytes(bytes) { + Ok(block) => Arc::new(block), + Err(err) => { + warn!(?block_id, ?err, "failed to decode external change block"); + return None; + } + }; + if let Err(err) = block.ensure_changes(&self.arena) { + warn!(?block_id, ?err, "failed to parse external change block"); + return None; + } inner.mem_parsed_kv.insert(block_id, block.clone()); let index = block.get_change_index_by_lamport_lte(idlp.lamport)?; Some(BlockChangeRef { @@ -1120,9 +1120,10 @@ mod mut_inner_kv { let mut inner = self.inner.lock(); if let Some((_id, block)) = inner.mem_parsed_kv.range_mut(..=id).next_back() { if block.peer == id.peer && block.counter_range.1 > id.counter { - block - .ensure_changes(&self.arena) - .expect("Parse block error"); + if let Err(err) = block.ensure_changes(&self.arena) { + warn!(block_id = ?_id, ?err, "failed to parse cached change block"); + return None; + } return Some(block.clone()); } } @@ -1144,16 +1145,22 @@ mod mut_inner_kv { let (b_id, b_bytes) = iter.next_back()?; let block_id: ID = ID::from_bytes(&b_id[..]); - let block = ChangesBlock::from_bytes(b_bytes) - .expect("validated external change block should decode"); + let block = match ChangesBlock::from_bytes(b_bytes) { + Ok(block) => block, + Err(err) => { + warn!(?block_id, ?err, "failed to decode external change block"); + return None; + } + }; if block_id.peer == id.peer && block_id.counter <= id.counter && block.counter_range.1 > id.counter { let mut arc_block = Arc::new(block); - arc_block - .ensure_changes(&self.arena) - .expect("Parse block error"); + if let Err(err) = arc_block.ensure_changes(&self.arena) { + warn!(?block_id, ?err, "failed to parse external change block"); + return None; + } inner.mem_parsed_kv.insert(block_id, arc_block.clone()); return Some(arc_block); } @@ -1195,8 +1202,13 @@ mod mut_inner_kv { continue; } - let block = ChangesBlock::from_bytes(bytes.clone()) - .expect("validated external change block should decode"); + let block = match ChangesBlock::from_bytes(bytes.clone()) { + Ok(block) => block, + Err(err) => { + warn!(?id, ?err, "failed to decode external change block"); + continue; + } + }; inner.mem_parsed_kv.insert(id, Arc::new(block)); } } @@ -1222,8 +1234,17 @@ mod mut_inner_kv { return; } - let block = ChangesBlock::from_bytes(next_back_bytes) - .expect("validated external change block should decode"); + let block = match ChangesBlock::from_bytes(next_back_bytes) { + Ok(block) => block, + Err(err) => { + warn!( + ?next_back_id, + ?err, + "failed to decode external change block" + ); + return; + } + }; inner.mem_parsed_kv.insert(next_back_id, Arc::new(block)); } } @@ -1658,11 +1679,11 @@ impl ChangesBlockBytes { bytes } - fn lamport_range(&mut self) -> (Lamport, Lamport) { + fn lamport_range(&mut self) -> LoroResult<(Lamport, Lamport)> { if let Some(header) = self.header.get() { - (header.lamports[0], *header.lamports.last().unwrap()) + Ok((header.lamports[0], *header.lamports.last().unwrap())) } else { - decode_block_range(&self.bytes).unwrap().1 + decode_block_range(&self.bytes).map(|(_, lamport_range)| lamport_range) } } diff --git a/crates/loro-internal/src/oplog/change_store/block_encode.rs b/crates/loro-internal/src/oplog/change_store/block_encode.rs index 1d733f29c..f18d1db8f 100644 --- a/crates/loro-internal/src/oplog/change_store/block_encode.rs +++ b/crates/loro-internal/src/oplog/change_store/block_encode.rs @@ -476,10 +476,18 @@ pub fn decode_block_range( )); } - let counter_start = leb128::read::unsigned(&mut bytes).unwrap() as Counter; - let counter_len = leb128::read::unsigned(&mut bytes).unwrap() as Counter; - let lamport_start = leb128::read::unsigned(&mut bytes).unwrap() as Lamport; - let lamport_len = leb128::read::unsigned(&mut bytes).unwrap() as Lamport; + let counter_start = leb128::read::unsigned(&mut bytes).map_err(|e| { + LoroError::DecodeError(format!("Failed to read counter start: {e}").into_boxed_str()) + })? as Counter; + let counter_len = leb128::read::unsigned(&mut bytes).map_err(|e| { + LoroError::DecodeError(format!("Failed to read counter length: {e}").into_boxed_str()) + })? as Counter; + let lamport_start = leb128::read::unsigned(&mut bytes).map_err(|e| { + LoroError::DecodeError(format!("Failed to read lamport start: {e}").into_boxed_str()) + })? as Lamport; + let lamport_len = leb128::read::unsigned(&mut bytes).map_err(|e| { + LoroError::DecodeError(format!("Failed to read lamport length: {e}").into_boxed_str()) + })? as Lamport; Ok(( (counter_start, counter_start + counter_len), (lamport_start, lamport_start + lamport_len),