Skip to content

Commit c8dc1fe

Browse files
authored
[ENH] Prefetch on materialize (#5951)
## Description of changes _Summarize the changes made by this PR._ - Improvements & Bug fixes - Prefetch blocks during `materialize_logs` based on user ids. Previously materialization could be slow when the blocks are not prefetched. - New functionality - N/A ## Test plan _How are these changes tested?_ - [ ] Tests pass locally with `pytest` for python, `yarn test` for js, `cargo test` for rust ## Migration plan _Are there any migrations, or any forwards/backwards compatibility changes needed in order to make sure this change deploys reliably?_ ## Observability plan _What is the plan to instrument and monitor this change?_ ## Documentation Changes _Are all docstrings for user-facing APIs updated if required? Do we need to make documentation changes in the [docs section](https://github.com/chroma-core/chroma/tree/main/docs/docs.trychroma.com)?_
1 parent edab0e1 commit c8dc1fe

File tree

2 files changed

+21
-39
lines changed

2 files changed

+21
-39
lines changed

rust/segment/src/blockfile_record.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -919,8 +919,7 @@ impl RecordSegmentReader<'_> {
919919
.await
920920
}
921921

922-
#[allow(dead_code)]
923-
pub(crate) async fn prefetch_user_id_to_id(&self, keys: Vec<&str>) {
922+
pub(crate) async fn prefetch_user_id_to_id(&self, keys: &[&str]) {
924923
self.user_id_to_id
925924
.load_blocks_for_keys(keys.iter().map(|k| ("".to_string(), *k)))
926925
.await

rust/segment/src/types.rs

Lines changed: 20 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,7 @@ use crate::distributed_spann::{SpannSegmentFlusher, SpannSegmentWriter};
1515

1616
use super::blockfile_metadata::{MetadataSegmentFlusher, MetadataSegmentWriter};
1717
use super::blockfile_record::{
18-
ApplyMaterializedLogError, RecordSegmentFlusher, RecordSegmentReader,
19-
RecordSegmentReaderCreationError, RecordSegmentWriter,
18+
ApplyMaterializedLogError, RecordSegmentFlusher, RecordSegmentReader, RecordSegmentWriter,
2019
};
2120
use super::distributed_hnsw::DistributedHNSWSegmentWriter;
2221

@@ -591,44 +590,28 @@ pub async fn materialize_logs(
591590
let mut existing_id_to_materialized: HashMap<&str, MaterializedLogRecord> = HashMap::new();
592591
let mut new_id_to_materialized: HashMap<&str, MaterializedLogRecord> = HashMap::new();
593592
if let Some(reader) = &record_segment_reader {
593+
let mut user_ids = logs
594+
.iter()
595+
.map(|(log, _)| log.record.id.as_str())
596+
.collect::<Vec<_>>();
597+
user_ids.sort_unstable();
598+
user_ids.dedup();
594599
async {
595-
for (log_record, _) in logs.iter() {
596-
let exists = match reader
597-
.data_exists_for_user_id(log_record.record.id.as_str())
598-
.await
599-
{
600-
Ok(res) => res,
601-
Err(e) => {
602-
return Err(LogMaterializerError::RecordSegment(e));
603-
}
600+
reader.prefetch_user_id_to_id(&user_ids).await;
601+
602+
let mut existing_offset_ids = Vec::with_capacity(user_ids.len());
603+
for user_id in user_ids {
604+
if let Some(offset_id) = reader.get_offset_id_for_user_id(user_id).await? {
605+
existing_offset_ids.push(offset_id);
606+
existing_id_to_materialized.insert(
607+
user_id,
608+
MaterializedLogRecord::from_segment_offset_id(offset_id),
609+
);
604610
};
605-
if exists {
606-
match reader
607-
.get_offset_id_for_user_id(log_record.record.id.as_str())
608-
.await
609-
{
610-
Ok(Some(offset_id)) => {
611-
existing_id_to_materialized.insert(
612-
log_record.record.id.as_str(),
613-
MaterializedLogRecord::from_segment_offset_id(offset_id),
614-
);
615-
}
616-
Ok(None) => {
617-
return Err(LogMaterializerError::RecordSegment(Box::new(
618-
RecordSegmentReaderCreationError::UserRecordNotFound(format!(
619-
"not found: {}",
620-
log_record.record.id,
621-
)),
622-
)
623-
as _));
624-
}
625-
Err(e) => {
626-
return Err(LogMaterializerError::RecordSegment(e));
627-
}
628-
}
629-
}
630611
}
631-
Ok(())
612+
613+
reader.prefetch_id_to_data(&existing_offset_ids).await;
614+
Ok::<_, LogMaterializerError>(())
632615
}
633616
.instrument(Span::current())
634617
.await?;

0 commit comments

Comments
 (0)