Skip to content
Open
18 changes: 9 additions & 9 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 7 additions & 10 deletions forester-utils/src/address_staging_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ impl AddressStagingTree {
low_element_next_values: &[[u8; 32]],
low_element_indices: &[u64],
low_element_next_indices: &[u64],
low_element_proofs: &[Vec<[u8; 32]>],
low_element_proofs: &[[[u8; 32]; HEIGHT]],
leaves_hashchain: [u8; 32],
zkp_batch_size: usize,
epoch: u64,
Expand All @@ -145,15 +145,12 @@ impl AddressStagingTree {
let inputs = get_batch_address_append_circuit_inputs::<HEIGHT>(
next_index,
old_root,
low_element_values.to_vec(),
low_element_next_values.to_vec(),
low_element_indices.iter().map(|v| *v as usize).collect(),
low_element_next_indices
.iter()
.map(|v| *v as usize)
.collect(),
low_element_proofs.to_vec(),
addresses.to_vec(),
low_element_values,
low_element_next_values,
low_element_indices,
low_element_next_indices,
low_element_proofs,
addresses,
&mut self.sparse_tree,
leaves_hashchain,
zkp_batch_size,
Expand Down
40 changes: 22 additions & 18 deletions forester/src/forester_status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -670,20 +670,22 @@ fn parse_tree_status(
let fullness = next_index as f64 / capacity as f64 * 100.0;

let (queue_len, queue_cap) = queue_account
.map(|acc| {
unsafe { parse_hash_set_from_bytes::<QueueAccount>(&acc.data) }
.ok()
.map(|hs| {
.map(
|acc| match unsafe { parse_hash_set_from_bytes::<QueueAccount>(&acc.data) } {
Ok(hs) => {
let len = hs
.iter()
.filter(|(_, cell)| cell.sequence_number.is_none())
.count() as u64;
let cap = hs.get_capacity() as u64;
(len, cap)
})
.unwrap_or((0, 0))
})
.map(|(l, c)| (Some(l), Some(c)))
(Some(len), Some(cap))
}
Err(error) => {
warn!(?error, "Failed to parse StateV1 queue hash set");
(None, None)
}
},
)
.unwrap_or((None, None));

(
Expand Down Expand Up @@ -725,20 +727,22 @@ fn parse_tree_status(
let fullness = next_index as f64 / capacity as f64 * 100.0;

let (queue_len, queue_cap) = queue_account
.map(|acc| {
unsafe { parse_hash_set_from_bytes::<QueueAccount>(&acc.data) }
.ok()
.map(|hs| {
.map(
|acc| match unsafe { parse_hash_set_from_bytes::<QueueAccount>(&acc.data) } {
Ok(hs) => {
let len = hs
.iter()
.filter(|(_, cell)| cell.sequence_number.is_none())
.count() as u64;
let cap = hs.get_capacity() as u64;
(len, cap)
})
.unwrap_or((0, 0))
})
.map(|(l, c)| (Some(l), Some(c)))
(Some(len), Some(cap))
}
Err(error) => {
warn!(?error, "Failed to parse AddressV1 queue hash set");
(None, None)
}
},
)
.unwrap_or((None, None));

(
Expand Down
123 changes: 103 additions & 20 deletions forester/src/processor/v2/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use light_client::{
indexer::{AddressQueueData, Indexer, QueueElementsV2Options, StateQueueData},
rpc::Rpc,
};
use light_hasher::hash_chain::create_hash_chain_from_slice;

use crate::processor::v2::{common::clamp_to_u16, BatchContext};

Expand All @@ -22,6 +23,17 @@ pub(crate) fn lock_recover<'a, T>(mutex: &'a Mutex<T>, name: &'static str) -> Mu
}
}

/// A self-consistent snapshot of one ZKP batch taken from the streaming
/// address queue, parameterized by the Merkle tree height `HEIGHT` so that
/// low-element proofs can be stored as fixed-size arrays instead of `Vec`s.
///
/// All `Vec` fields are expected to have the same length (one entry per
/// address in the batch) — the snapshot constructor is responsible for
/// enforcing that invariant.
#[derive(Debug, Clone)]
pub struct AddressBatchSnapshot<const HEIGHT: usize> {
    /// New addresses to be appended in this batch.
    pub addresses: Vec<[u8; 32]>,
    /// Values of the low elements associated with each address.
    pub low_element_values: Vec<[u8; 32]>,
    /// Next-values currently referenced by each low element.
    pub low_element_next_values: Vec<[u8; 32]>,
    /// Tree indices of the low elements.
    pub low_element_indices: Vec<u64>,
    /// Next-indices currently referenced by each low element.
    pub low_element_next_indices: Vec<u64>,
    /// Fixed-height Merkle proofs for the low elements, one per address.
    pub low_element_proofs: Vec<[[u8; 32]; HEIGHT]>,
    /// Hash chain over the batch's leaves; may be derived from `addresses`
    /// when the indexer did not supply one.
    pub leaves_hashchain: [u8; 32],
}

pub async fn fetch_zkp_batch_size<R: Rpc>(context: &BatchContext<R>) -> crate::Result<u64> {
let rpc = context.rpc_pool.get_connection().await?;
let mut account = rpc
Expand Down Expand Up @@ -474,20 +486,96 @@ impl StreamingAddressQueue {
}
}

pub fn get_batch_data(&self, start: usize, end: usize) -> Option<BatchDataSlice> {
pub fn get_batch_snapshot<const HEIGHT: usize>(
&self,
start: usize,
end: usize,
hashchain_idx: usize,
) -> crate::Result<Option<AddressBatchSnapshot<HEIGHT>>> {
let available = self.wait_for_batch(end);
if start >= available {
return None;
if available < end || start >= end {
return Ok(None);
}
let actual_end = end.min(available);
let data = lock_recover(&self.data, "streaming_address_queue.data");
Some(BatchDataSlice {
addresses: data.addresses[start..actual_end].to_vec(),
low_element_values: data.low_element_values[start..actual_end].to_vec(),
low_element_next_values: data.low_element_next_values[start..actual_end].to_vec(),
low_element_indices: data.low_element_indices[start..actual_end].to_vec(),
low_element_next_indices: data.low_element_next_indices[start..actual_end].to_vec(),
})
let Some(addresses) = data.addresses.get(start..end).map(|slice| slice.to_vec()) else {
return Ok(None);
};
if addresses.is_empty() {
return Ok(None);
}
let expected_len = addresses.len();
let Some(low_element_values) = data
.low_element_values
.get(start..end)
.map(|slice| slice.to_vec())
else {
return Ok(None);
};
let Some(low_element_next_values) = data
.low_element_next_values
.get(start..end)
.map(|slice| slice.to_vec())
else {
return Ok(None);
};
let Some(low_element_indices) = data
.low_element_indices
.get(start..end)
.map(|slice| slice.to_vec())
else {
return Ok(None);
};
let Some(low_element_next_indices) = data
.low_element_next_indices
.get(start..end)
.map(|slice| slice.to_vec())
else {
return Ok(None);
};
if [
low_element_values.len(),
low_element_next_values.len(),
low_element_indices.len(),
low_element_next_indices.len(),
]
.iter()
.any(|&len| len != expected_len)
{
return Ok(None);
}
let low_element_proofs = match data.reconstruct_proofs::<HEIGHT>(start..end) {
Ok(proofs) if proofs.len() == expected_len => proofs,
Ok(_) | Err(_) => return Ok(None),
};

let leaves_hashchain = match data.leaves_hash_chains.get(hashchain_idx).copied() {
Some(hashchain) => hashchain,
None => {
tracing::debug!(
"Missing leaves_hash_chain for batch {} (available: {}), deriving from addresses",
hashchain_idx,
data.leaves_hash_chains.len()
);
create_hash_chain_from_slice(&addresses).map_err(|error| {
anyhow!(
"Failed to derive leaves_hash_chain for batch {} from {} addresses: {}",
hashchain_idx,
addresses.len(),
error
)
})?
}
};

Ok(Some(AddressBatchSnapshot {
low_element_values,
low_element_next_values,
low_element_indices,
low_element_next_indices,
low_element_proofs,
addresses,
leaves_hashchain,
}))
}

pub fn into_data(self) -> AddressQueueData {
Expand Down Expand Up @@ -522,6 +610,10 @@ impl StreamingAddressQueue {
lock_recover(&self.data, "streaming_address_queue.data").start_index
}

/// Returns the tree's next insertion index as recorded in the shared queue
/// state, acquiring the data mutex via `lock_recover` (which recovers from
/// lock poisoning instead of panicking).
pub fn tree_next_insertion_index(&self) -> u64 {
lock_recover(&self.data, "streaming_address_queue.data").tree_next_insertion_index
}

pub fn subtrees(&self) -> Vec<[u8; 32]> {
lock_recover(&self.data, "streaming_address_queue.data")
.subtrees
Expand Down Expand Up @@ -553,15 +645,6 @@ impl StreamingAddressQueue {
}
}

/// An owned copy of one contiguous batch of address-queue data.
///
/// All fields are parallel vectors: index `i` in each `Vec` refers to the
/// same queue element. Unlike a height-parameterized snapshot, this carries
/// no Merkle proofs or hash chain — only the raw element data.
#[derive(Debug, Clone)]
pub struct BatchDataSlice {
    /// Addresses in this batch slice.
    pub addresses: Vec<[u8; 32]>,
    /// Values of the low elements associated with each address.
    pub low_element_values: Vec<[u8; 32]>,
    /// Next-values currently referenced by each low element.
    pub low_element_next_values: Vec<[u8; 32]>,
    /// Tree indices of the low elements.
    pub low_element_indices: Vec<u64>,
    /// Next-indices currently referenced by each low element.
    pub low_element_next_indices: Vec<u64>,
}

pub async fn fetch_streaming_address_batches<R: Rpc + 'static>(
context: &BatchContext<R>,
total_elements: u64,
Expand Down
6 changes: 3 additions & 3 deletions forester/src/processor/v2/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ where
}

if self.worker_pool.is_none() {
let job_tx = spawn_proof_workers(&self.context.prover_config);
let job_tx = spawn_proof_workers(&self.context.prover_config)?;
self.worker_pool = Some(WorkerPool { job_tx });
}
Comment on lines 134 to 137
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Extract worker-pool bootstrap into a single helper.

The same fallible initialization block now exists in three paths. A small ensure_worker_pool() would keep future startup logging/retry behavior consistent and avoid one path drifting the next time worker bootstrap changes.

Also applies to: 534-537, 563-566

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@forester/src/processor/v2/processor.rs` around lines 134 - 137, Create a
single helper method (e.g., ensure_worker_pool) on the processor that
encapsulates the fallible initialization currently duplicated: check
self.worker_pool.is_none(), call
spawn_proof_workers(&self.context.prover_config)?, and set self.worker_pool =
Some(WorkerPool { job_tx }); then replace the three duplicated blocks with calls
to ensure_worker_pool(). Reference the existing symbols spawn_proof_workers,
WorkerPool, self.worker_pool, and self.context.prover_config so the helper
performs the same behavior and returns the same error propagation as the
original code.


Expand Down Expand Up @@ -532,7 +532,7 @@ where
((queue_size / self.zkp_batch_size) as usize).min(self.context.max_batches_per_tree);

if self.worker_pool.is_none() {
let job_tx = spawn_proof_workers(&self.context.prover_config);
let job_tx = spawn_proof_workers(&self.context.prover_config)?;
self.worker_pool = Some(WorkerPool { job_tx });
}

Expand Down Expand Up @@ -561,7 +561,7 @@ where
let max_batches = max_batches.min(self.context.max_batches_per_tree);

if self.worker_pool.is_none() {
let job_tx = spawn_proof_workers(&self.context.prover_config);
let job_tx = spawn_proof_workers(&self.context.prover_config)?;
self.worker_pool = Some(WorkerPool { job_tx });
}

Expand Down
20 changes: 11 additions & 9 deletions forester/src/processor/v2/proof_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,27 +132,27 @@ struct ProofClients {
}

impl ProofClients {
fn new(config: &ProverConfig) -> Self {
Self {
fn new(config: &ProverConfig) -> crate::Result<Self> {
Ok(Self {
append_client: ProofClient::with_config(
config.append_url.clone(),
config.polling_interval,
config.max_wait_time,
config.api_key.clone(),
),
)?,
nullify_client: ProofClient::with_config(
config.update_url.clone(),
config.polling_interval,
config.max_wait_time,
config.api_key.clone(),
),
)?,
address_append_client: ProofClient::with_config(
config.address_append_url.clone(),
config.polling_interval,
config.max_wait_time,
config.api_key.clone(),
),
}
)?,
})
}

fn get_client(&self, input: &ProofInput) -> &ProofClient {
Expand All @@ -164,11 +164,13 @@ impl ProofClients {
}
}

pub fn spawn_proof_workers(config: &ProverConfig) -> async_channel::Sender<ProofJob> {
pub fn spawn_proof_workers(
config: &ProverConfig,
) -> crate::Result<async_channel::Sender<ProofJob>> {
let (job_tx, job_rx) = async_channel::bounded::<ProofJob>(256);
let clients = Arc::new(ProofClients::new(config));
let clients = Arc::new(ProofClients::new(config)?);
tokio::spawn(async move { run_proof_pipeline(job_rx, clients).await });
job_tx
Ok(job_tx)
}

async fn run_proof_pipeline(
Expand Down
Loading
Loading