Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/faster-text-imports.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"loro-crdt": patch
"loro-crdt-map": patch
---

Improve text insert and snapshot import performance by avoiding duplicate text boundary validation and skipping eager imported change block parsing.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/loro-internal/src/arena.rs
Original file line number Diff line number Diff line change
Expand Up @@ -782,7 +782,7 @@ mod tests {
ContainerID::new_mergeable(&top_root, "profile", ContainerType::Map);
let child = ContainerID::new_normal(
loro_common::ID::new(1, 0),
ContainerType::Counter,
ContainerType::List,
);

let mergeable_parent_for_resolver = mergeable_parent.clone();
Expand Down
49 changes: 38 additions & 11 deletions crates/loro-internal/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1572,7 +1572,12 @@ impl TextHandler {
return Ok(());
};

if pos > self.len(pos_type) {
let len = self.len(pos_type);
if pos > len {
return Ok(());
}

if self.all_text_positions_are_boundaries(pos_type, len) {
return Ok(());
}

Expand All @@ -1586,6 +1591,15 @@ impl TextHandler {
Ok(())
}

fn all_text_positions_are_boundaries(&self, pos_type: PosType, len: usize) -> bool {
match pos_type {
PosType::Bytes => len == self.len_unicode(),
PosType::Utf16 => len == self.len_unicode(),
PosType::Event if cfg!(feature = "wasm") => len == self.len_unicode(),
_ => false,
}
}

pub fn diagnose(&self) {
match &self.inner {
MaybeDetached::Detached(t) => {
Expand Down Expand Up @@ -1851,18 +1865,18 @@ impl TextHandler {
///
/// This method requires auto_commit to be enabled.
pub fn insert(&self, pos: usize, s: &str, pos_type: PosType) -> LoroResult<()> {
let len = self.len(pos_type);
if pos > len {
return Err(LoroError::OutOfBound {
pos,
len,
info: format!("Position: {}:{}", file!(), line!()).into_boxed_str(),
});
}
self.validate_text_boundary(pos, pos_type)?;

match &self.inner {
MaybeDetached::Detached(t) => {
let len = self.len(pos_type);
if pos > len {
return Err(LoroError::OutOfBound {
pos,
len,
info: format!("Position: {}:{}", file!(), line!()).into_boxed_str(),
});
}
self.validate_text_boundary(pos, pos_type)?;

let mut t = t.lock();
let (index, _) = t
.value
Expand All @@ -1876,6 +1890,19 @@ impl TextHandler {
Ok(())
}
MaybeDetached::Attached(a) => {
if s.is_empty() {
let len = self.len(pos_type);
if pos > len {
return Err(LoroError::OutOfBound {
pos,
len,
info: format!("Position: {}:{}", file!(), line!()).into_boxed_str(),
});
}
self.validate_text_boundary(pos, pos_type)?;
return Ok(());
}

a.with_txn(|txn| self.insert_with_txn(txn, pos, s, pos_type))
}
}
Expand Down
131 changes: 76 additions & 55 deletions crates/loro-internal/src/oplog/change_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,10 +325,11 @@ impl ChangeStore {
pub fn visit_all_changes(&self, f: &mut dyn FnMut(&Change)) {
self.ensure_block_loaded_in_range(Bound::Unbounded, Bound::Unbounded);
let mut inner = self.inner.lock();
for (_, block) in inner.mem_parsed_kv.iter_mut() {
block
.ensure_changes(&self.arena)
.expect("Parse block error");
for (id, block) in inner.mem_parsed_kv.iter_mut() {
if let Err(err) = block.ensure_changes(&self.arena) {
warn!(block_id = ?id, ?err, "failed to parse change block");
continue;
}
for c in block.content.try_changes().unwrap() {
f(c);
}
Expand Down Expand Up @@ -368,9 +369,10 @@ impl ChangeStore {
return None;
}

block
.ensure_changes(&self.arena)
.expect("Parse block error");
if let Err(err) = block.ensure_changes(&self.arena) {
warn!(block_id = ?_id, ?err, "failed to parse change block");
return None;
}
let changes = block.content.try_changes().unwrap();
let start;
let end;
Expand Down Expand Up @@ -456,9 +458,10 @@ impl ChangeStore {
return None;
}

block
.ensure_changes(&self.arena)
.expect("Parse block error");
if let Err(err) = block.ensure_changes(&self.arena) {
warn!(block_id = ?_id, ?err, "failed to parse change block");
return None;
}
Some(block.clone())
})
// TODO: PERF avoid alloc
Expand Down Expand Up @@ -639,7 +642,6 @@ mod mut_external_kv {
.import_all(bytes)
.map_err(|e| LoroError::DecodeError(e.into_boxed_str()))?;
drop(kv_store);
self.validate_imported_change_blocks()?;
let vv_bytes = self.external_kv.lock().get(VV_KEY).unwrap_or_default();
let vv = VersionVector::decode(&vv_bytes)
.map_err(|_| LoroError::DecodeDataCorruptionError)?;
Expand Down Expand Up @@ -720,24 +722,6 @@ mod mut_external_kv {
})
}

fn validate_imported_change_blocks(&self) -> LoroResult<()> {
let blocks: Vec<(ID, Bytes)> = {
let kv_store = self.external_kv.lock();
kv_store
.scan(Bound::Unbounded, Bound::Unbounded)
.filter(|(id, _)| id.len() == 12)
.map(|(id, bytes)| (ID::from_bytes(&id), bytes))
.collect()
};

for (_id, bytes) in blocks {
let mut block = Arc::new(ChangesBlock::from_bytes(bytes)?);
block.ensure_changes(&self.arena)?;
}

Ok(())
}

/// Flush the cached change to kv_store
pub(crate) fn flush_and_compact(&self, vv: &VersionVector, frontiers: &Frontiers) {
let mut inner = self.inner.lock();
Expand Down Expand Up @@ -911,9 +895,10 @@ mod mut_inner_kv {
}

// Found the block
block
.ensure_changes(&self.arena)
.expect("Parse block error");
if let Err(err) = block.ensure_changes(&self.arena) {
warn!(block_id = ?id, ?err, "failed to parse change block");
return None;
}
let index = block.get_change_index_by_lamport_lte(idlp.lamport)?;
return Some(BlockChangeRef {
change_index: index,
Expand Down Expand Up @@ -994,7 +979,18 @@ mod mut_inner_kv {

for (id, bytes) in iter {
let mut block = ChangesBlockBytes::new(bytes.clone());
let (lamport_start, _lamport_end) = block.lamport_range();
let (lamport_start, _lamport_end) = match block.lamport_range() {
Ok(range) => range,
Err(err) => {
let block_id = ID::from_bytes(&id);
warn!(
?block_id,
?err,
"failed to decode external change block range"
);
continue;
}
};
if lamport_start <= idlp.lamport {
break 'block_scan (id, bytes);
}
Expand All @@ -1004,13 +1000,17 @@ mod mut_inner_kv {
};

let block_id = ID::from_bytes(&id);
let mut block = Arc::new(
ChangesBlock::from_bytes(bytes)
.expect("validated external change block should decode"),
);
block
.ensure_changes(&self.arena)
.expect("Parse block error");
let mut block = match ChangesBlock::from_bytes(bytes) {
Ok(block) => Arc::new(block),
Err(err) => {
warn!(?block_id, ?err, "failed to decode external change block");
return None;
}
};
if let Err(err) = block.ensure_changes(&self.arena) {
warn!(?block_id, ?err, "failed to parse external change block");
return None;
}
inner.mem_parsed_kv.insert(block_id, block.clone());
let index = block.get_change_index_by_lamport_lte(idlp.lamport)?;
Some(BlockChangeRef {
Expand Down Expand Up @@ -1120,9 +1120,10 @@ mod mut_inner_kv {
let mut inner = self.inner.lock();
if let Some((_id, block)) = inner.mem_parsed_kv.range_mut(..=id).next_back() {
if block.peer == id.peer && block.counter_range.1 > id.counter {
block
.ensure_changes(&self.arena)
.expect("Parse block error");
if let Err(err) = block.ensure_changes(&self.arena) {
warn!(block_id = ?_id, ?err, "failed to parse cached change block");
return None;
}
return Some(block.clone());
}
}
Expand All @@ -1144,16 +1145,22 @@ mod mut_inner_kv {

let (b_id, b_bytes) = iter.next_back()?;
let block_id: ID = ID::from_bytes(&b_id[..]);
let block = ChangesBlock::from_bytes(b_bytes)
.expect("validated external change block should decode");
let block = match ChangesBlock::from_bytes(b_bytes) {
Ok(block) => block,
Err(err) => {
warn!(?block_id, ?err, "failed to decode external change block");
return None;
}
};
if block_id.peer == id.peer
&& block_id.counter <= id.counter
&& block.counter_range.1 > id.counter
{
let mut arc_block = Arc::new(block);
arc_block
.ensure_changes(&self.arena)
.expect("Parse block error");
if let Err(err) = arc_block.ensure_changes(&self.arena) {
warn!(?block_id, ?err, "failed to parse external change block");
return None;
}
inner.mem_parsed_kv.insert(block_id, arc_block.clone());
return Some(arc_block);
}
Expand Down Expand Up @@ -1195,8 +1202,13 @@ mod mut_inner_kv {
continue;
}

let block = ChangesBlock::from_bytes(bytes.clone())
.expect("validated external change block should decode");
let block = match ChangesBlock::from_bytes(bytes.clone()) {
Ok(block) => block,
Err(err) => {
warn!(?id, ?err, "failed to decode external change block");
continue;
}
};
inner.mem_parsed_kv.insert(id, Arc::new(block));
}
}
Expand All @@ -1222,8 +1234,17 @@ mod mut_inner_kv {
return;
}

let block = ChangesBlock::from_bytes(next_back_bytes)
.expect("validated external change block should decode");
let block = match ChangesBlock::from_bytes(next_back_bytes) {
Ok(block) => block,
Err(err) => {
warn!(
?next_back_id,
?err,
"failed to decode external change block"
);
return;
}
};
inner.mem_parsed_kv.insert(next_back_id, Arc::new(block));
}
}
Expand Down Expand Up @@ -1658,11 +1679,11 @@ impl ChangesBlockBytes {
bytes
}

fn lamport_range(&mut self) -> (Lamport, Lamport) {
fn lamport_range(&mut self) -> LoroResult<(Lamport, Lamport)> {
if let Some(header) = self.header.get() {
(header.lamports[0], *header.lamports.last().unwrap())
Ok((header.lamports[0], *header.lamports.last().unwrap()))
} else {
decode_block_range(&self.bytes).unwrap().1
decode_block_range(&self.bytes).map(|(_, lamport_range)| lamport_range)
}
}

Expand Down
16 changes: 12 additions & 4 deletions crates/loro-internal/src/oplog/change_store/block_encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -476,10 +476,18 @@ pub fn decode_block_range(
));
}

let counter_start = leb128::read::unsigned(&mut bytes).unwrap() as Counter;
let counter_len = leb128::read::unsigned(&mut bytes).unwrap() as Counter;
let lamport_start = leb128::read::unsigned(&mut bytes).unwrap() as Lamport;
let lamport_len = leb128::read::unsigned(&mut bytes).unwrap() as Lamport;
let counter_start = leb128::read::unsigned(&mut bytes).map_err(|e| {
LoroError::DecodeError(format!("Failed to read counter start: {e}").into_boxed_str())
})? as Counter;
let counter_len = leb128::read::unsigned(&mut bytes).map_err(|e| {
LoroError::DecodeError(format!("Failed to read counter length: {e}").into_boxed_str())
})? as Counter;
let lamport_start = leb128::read::unsigned(&mut bytes).map_err(|e| {
LoroError::DecodeError(format!("Failed to read lamport start: {e}").into_boxed_str())
})? as Lamport;
let lamport_len = leb128::read::unsigned(&mut bytes).map_err(|e| {
LoroError::DecodeError(format!("Failed to read lamport length: {e}").into_boxed_str())
})? as Lamport;
Ok((
(counter_start, counter_start + counter_len),
(lamport_start, lamport_start + lamport_len),
Expand Down
Loading