Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/homedb/core/src/index/btree_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ pub trait BtreeIndex<K: 'static + BtreeKey, V: 'static + BtreeValue>: Send + Syn
filter: Option<Arc<dyn GetFilter<K, V>>>,
) -> Result<Option<(K, V)>, BtreeError>;

/// Return the first entry with key >= `key`, or `None` if no such entry.
/// Uses single binary search (find) per node instead of double (match_range).
/// Ideal for MVCC point reads where we seek to `(user_key, !snapshot_ts)`.
///
/// The returned key is not necessarily equal to `key`: any entry that sorts at
/// or after `key` qualifies, so callers that need a specific key (or key
/// prefix) must check the returned key themselves.
///
/// # Errors
/// Returns a `BtreeError` when the underlying lookup fails.
async fn seek_gte(&self, key: &K) -> Result<Option<(K, V)>, BtreeError>;

/// Fetch next batch for a previous query result handle.
async fn query_next_batch(
&self,
Expand Down
5 changes: 5 additions & 0 deletions src/homedb/core/src/index/sharded_btree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,11 @@ impl<K: 'static + Partitionable, V: 'static + BtreeValue> BtreeIndex<K, V> for S
shard_call!(shard, [range], get_first(range))
}

/// Return the first entry with key >= `key`, looked up in the shard that `key`
/// routes to.
///
/// NOTE(review): only the single shard chosen by `route` is consulted. If that
/// shard has no entry >= `key`, the result is presumably `None` even when
/// another shard holds a larger key — confirm that every caller is fine with
/// "first entry >= key within the routed shard" rather than across all shards.
async fn seek_gte(&self, key: &K) -> Result<Option<(K, V)>, BtreeError> {
// Resolve which shard owns `key`.
// NOTE(review): the meaning of the `false` flag is not visible here — confirm.
let part = self.route(key, false);
shard_call!(&self.shards[part.shard_id], [key], seek_gte(&key))
}

async fn query(
&self,
range: BtreeKeyRange<K>,
Expand Down
4 changes: 4 additions & 0 deletions src/homedb/core/src/index/unsharded_btree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ impl<K: 'static + BtreeKey, V: 'static + BtreeValue> BtreeIndex<K, V> for Unshar
self.btree.get_first(range).await
}

/// Return the first entry with key >= `key`, or `None`, by delegating directly
/// to the wrapped btree's `seek_gte`.
async fn seek_gte(&self, key: &K) -> Result<Option<(K, V)>, BtreeError> {
self.btree.seek_gte(key).await
}

async fn query(
&self,
range: BtreeKeyRange<K>,
Expand Down
35 changes: 21 additions & 14 deletions src/homedb/core/src/mvcc/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,21 +169,26 @@ impl IndexOps for MvccOps {
}

async fn get(&self, key: Vec<u8>) -> Result<Option<Vec<u8>>> {
// all_versions_for covers all seq_ids newest-first; batch_size=1 returns the newest.
let scan_range = MvccKey::all_versions_for(key, &self.key_spec);
let handle = self
// seek_gte with (user_key, seq_id=MAX) finds the newest version of user_key
// using single binary search per node — faster than get_first (double binary search).
let seek_key = MvccKey::new(DbKey::new(key.clone(), &self.key_spec), u64::MAX);
let result = self
.btree
.query(scan_range, 1, None, false)
.seek_gte(&seek_key)
.await
.map_err(|e| HomeDbError::BtreeError(format!("{:?}", e)))?;

match handle.results().first() {
match result {
None => Ok(None),
Some((_, mvcc_val)) => {
Some((found_key, mvcc_val)) => {
// Verify the result is for our user key (seek_gte may overshoot)
if found_key.inner.as_bytes() != key.as_slice() {
return Ok(None);
}
if mvcc_val.is_tombstone() {
Ok(None)
} else {
Ok(mvcc_val.inner().map(|v| v.clone().into_vec()))
Ok(mvcc_val.into_inner().map(|v| v.into_vec()))
}
}
}
Expand Down Expand Up @@ -275,23 +280,25 @@ impl IndexOps for MvccOps {

async fn snapshot_get(&self, key: Vec<u8>, ts: u64) -> Result<Option<Vec<u8>>> {
// A snapshot at ts sees versions with seq_id < ts (exclusive).
// Seeking to inv_seq = !(ts-1) finds the newest version with seq_id ≤ ts-1.
// Seeking to (user_key, ts-1) finds the newest version with seq_id ≤ ts-1.
// ts=0 means nothing was committed before snapshot creation — return None immediately.
if ts == 0 {
return Ok(None);
}

let range_start = MvccKey::newest_since(key.clone(), &self.key_spec, ts - 1);
let range_end = MvccKey::oldest(key, &self.key_spec);
let range = BtreeKeyRange::new(range_start, true, range_end, true);
let seek_key = MvccKey::newest_since(key.clone(), &self.key_spec, ts - 1);

match self.btree.get_first(range, None).await.map_err(|e| HomeDbError::BtreeError(format!("{:?}", e)))? {
match self.btree.seek_gte(&seek_key).await.map_err(|e| HomeDbError::BtreeError(format!("{:?}", e)))? {
None => Ok(None),
Some((_, mvcc_val)) => {
Some((found_key, mvcc_val)) => {
// Verify the result is for our user key (seek_gte may overshoot)
if found_key.inner.as_bytes() != key.as_slice() {
return Ok(None);
}
if mvcc_val.is_tombstone() {
Ok(None)
} else {
Ok(mvcc_val.inner().map(|v| v.clone().into_vec()))
Ok(mvcc_val.into_inner().map(|v| v.into_vec()))
}
}
}
Expand Down
17 changes: 17 additions & 0 deletions src/homestore/index/btree/btree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,23 @@ where
result
}

/// Return the first entry with key >= `key`, or `None`.
///
/// Uses single binary search (find) per node — faster than get_first which
/// uses match_range (double binary search) at every level.
#[tracing::instrument(skip(self, key),
fields(op_id=op_counter(), btree=%self.config.btree_name))]
pub async fn seek_gte(&self, key: &K) -> Result<Option<(K, V)>, BtreeError> {
tracing::debug!("Starting seek_gte operation");
let result = self.seek_gte_internal(key).await;
match &result {
Ok(Some((k, _))) => tracing::debug!(key = ?k, "Found key >= target"),
Ok(None) => tracing::debug!("No key >= target"),
Err(_) => tracing::warn!("seek_gte failed"),
}
result
}

/// Sweep query - returns multiple key-value pairs in range
///
///
Expand Down
93 changes: 93 additions & 0 deletions src/homestore/index/btree/detail/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,99 @@ where
}
}

//================================================================================
// SEEK_GTE: Single binary search per node (faster than get_first for point reads)
//================================================================================

/// Point lookup that accepts the nearest following key: behaves like `get()`,
/// except that when `key` itself is absent the first entry greater than `key`
/// is returned instead of nothing.
///
/// Each node on the path is searched once with `find` (single binary search)
/// rather than twice as `match_range` would. The shared tree lock is taken
/// first and kept until this function returns; the root is then read-locked
/// and handed to `seek_gte_walk`.
pub(in super::super) async fn seek_gte_internal(
    &self,
    key: &K,
) -> Result<Option<(K, V)>, BtreeError> {
    // Never read directly: the guard only needs to stay alive for the whole traversal.
    let _shared_tree_guard = self.lock_tree_shared().await;

    let root_node = self
        .read_and_lock_node(self.root_node_id(), LockType::Read)
        .await?;
    self.seek_gte_walk(root_node, key).await
}

/// Recursive seek_gte traversal
///
/// At interior nodes: uses find() (single binary search) to route to the correct child.
/// If the child's subtree has no entry >= key, falls back to the next child's leftmost entry.
///
/// `node` is taken by value and is expected to be read-locked already (the
/// entry point and the recursive call both pass the result of
/// `read_and_lock_node(.., LockType::Read)`). Returns `Ok(None)` when neither
/// the routed child nor its immediate right sibling yields an entry, which lets
/// the caller one level up apply the same fallback.
///
/// # Errors
/// Propagates any `BtreeError` from locking/reading a node or from the leaf lookup.
#[cfg_attr(feature = "async_code", async_recursion::async_recursion)]
async fn seek_gte_walk(&self, node: Node, key: &K) -> Result<Option<(K, V)>, BtreeError> {
// Leaves end the recursion: answer directly from this node.
if node.is_leaf() {
return self.seek_gte_in_leaf(&node, key).await;
}

// Interior node: single binary search to find target child
let nentries = node.total_entries();
let (_, idx) = node.find::<K, V>(key);

// Save next child ID before dropping parent (for cross-leaf fallback)
// A sibling exists when `next_idx` is a regular slot (< nentries), or when it
// is slot `nentries` and the node reports a valid edge.
// NOTE(review): this id is read while the parent is locked but used after the
// parent is released below; if nodes can be split/merged while only the shared
// tree lock is held, the id could be stale by then — confirm.
let next_idx = idx + 1;
let next_child_id = if next_idx <= nentries && (next_idx < nentries || node.has_valid_edge()) {
Some(node.get_nth_child_id::<K>(next_idx))
} else {
None
};

// Lock the child first, then let go of the parent.
let child_id = node.get_nth_child_id::<K>(idx);
let child = self.read_and_lock_node(child_id, LockType::Read).await?;
drop(node); // Release parent lock (standard lock coupling)

let result = self.seek_gte_walk(child, key).await?;
if result.is_some() {
return Ok(result);
}

// Cross-leaf fallback: key was at boundary, entry is in next child's subtree
// NOTE(review): if the sibling's leftmost leaf is empty, `leftmost_entry`
// returns `None` and later siblings of this node are not tried — confirm that
// empty leaves cannot occur here.
if let Some(next_id) = next_child_id {
let next_child = self.read_and_lock_node(next_id, LockType::Read).await?;
return self.leftmost_entry(next_child).await;
}

// No sibling at this level; the caller may retry with its own next child.
Ok(None)
}

/// Look up the first entry with key >= `key` inside a single leaf node.
///
/// Yields `Ok(None)` when `key` is greater than every entry in the leaf, which
/// tells the caller to continue with the next sibling. Key and value are both
/// requested as copies, and the value is resolved through `self.storage`
/// before being returned.
async fn seek_gte_in_leaf(&self, node: &Node, key: &K) -> Result<Option<(K, V)>, BtreeError> {
    debug_assert!(node.is_leaf());

    let entry_count = node.total_entries();
    let (exact_hit, pos) = node.find::<K, V>(key);

    // Neither an exact match nor a slot inside the leaf: nothing here is >= key.
    if !exact_hit && pos >= entry_count {
        return Ok(None);
    }

    let found_key = node.get_nth_key::<K, V>(pos, /* copy= */ true);
    let found_value = node
        .get_nth_value::<K, V>(pos, /* copy= */ true)
        .resolve(self.storage.as_ref(), true)
        .await?;
    Ok(Some((found_key, found_value)))
}

/// Descend to the leftmost entry of a subtree.
///
/// `node` is the (already read-locked) root of the subtree. The walk follows
/// child 0 at every interior node until it reaches a leaf, then returns that
/// leaf's first key/value pair, or `Ok(None)` if the leaf has no entries.
///
/// Implemented as a loop rather than recursion: the descent is a pure tail
/// call, so iterating avoids one boxed future per tree level and needs no
/// `async_recursion` attribute.
///
/// # Errors
/// Propagates any `BtreeError` from locking/reading a node or resolving the value.
async fn leftmost_entry(&self, node: Node) -> Result<Option<(K, V)>, BtreeError> {
    let mut current = node;
    while !current.is_leaf() {
        let child_id = current.get_nth_child_id::<K>(0);
        // The right-hand side is evaluated before the assignment drops the old
        // value, so the child is locked before the parent is released.
        current = self.read_and_lock_node(child_id, LockType::Read).await?;
    }

    if current.total_entries() == 0 {
        return Ok(None);
    }

    let k = current.get_nth_key::<K, V>(0, /* copy= */ true);
    let v = current
        .get_nth_value::<K, V>(0, /* copy= */ true)
        .resolve(self.storage.as_ref(), true)
        .await?;
    Ok(Some((k, v)))
}

//================================================================================
// QUERY Implementation (Sweep Query with Sibling Links)
//================================================================================
Expand Down