diff --git a/CHANGELOG.md b/CHANGELOG.md index 7ba3c07b9..b8fc2a334 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ - Introduce `SyncChainMmr` RPC endpoint to sync chain MMR deltas within specified block ranges ([#1591](https://github.com/0xMiden/node/issues/1591)). - Fixed `TransactionHeader` serialization for row insertion on database & fixed transaction cursor on retrievals ([#1701](https://github.com/0xMiden/node/issues/1701)). - Added KMS signing support in validator ([#1677](https://github.com/0xMiden/node/pull/1677)). +- Added finality field for `SyncChainMmr` requests ([#1725](https://github.com/0xMiden/node/pull/1725)). - Added per-IP gRPC rate limiting across services as well as global concurrent connection limit ([#1746](https://github.com/0xMiden/node/issues/1746)). ### Changes diff --git a/Cargo.lock b/Cargo.lock index 16ea17404..5bb1a9464 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3157,7 +3157,6 @@ dependencies = [ "diesel", "diesel_migrations", "fs-err", - "futures", "hex", "indexmap", "libsqlite3-sys", diff --git a/bin/node/src/commands/bundled.rs b/bin/node/src/commands/bundled.rs index 2db11c065..d2ad1689f 100644 --- a/bin/node/src/commands/bundled.rs +++ b/bin/node/src/commands/bundled.rs @@ -1,10 +1,11 @@ use std::collections::HashMap; +use std::num::NonZeroUsize; use std::path::PathBuf; use anyhow::Context; use miden_node_block_producer::BlockProducer; use miden_node_rpc::Rpc; -use miden_node_store::Store; +use miden_node_store::{DEFAULT_MAX_CONCURRENT_PROOFS, Store}; use miden_node_utils::clap::{GrpcOptionsExternal, StorageOptions}; use miden_node_utils::grpc::UrlExt; use miden_node_validator::{Validator, ValidatorSigner}; @@ -82,6 +83,14 @@ pub enum BundledCommand { #[arg(long = "enable-otel", default_value_t = false, env = ENV_ENABLE_OTEL, value_name = "BOOL")] enable_otel: bool, + /// Maximum number of concurrent block proofs to be scheduled. 
+ #[arg( + long = "max-concurrent-proofs", + default_value_t = DEFAULT_MAX_CONCURRENT_PROOFS, + value_name = "NUM" + )] + max_concurrent_proofs: NonZeroUsize, + #[command(flatten)] grpc_options: GrpcOptionsExternal, @@ -124,6 +133,7 @@ impl BundledCommand { validator, enable_otel: _, grpc_options, + max_concurrent_proofs, storage_options, } => { Self::start( @@ -134,6 +144,7 @@ impl BundledCommand { ntx_builder, validator, grpc_options, + max_concurrent_proofs, storage_options, ) .await @@ -150,6 +161,7 @@ impl BundledCommand { ntx_builder: NtxBuilderConfig, validator: BundledValidatorConfig, grpc_options: GrpcOptionsExternal, + max_concurrent_proofs: NonZeroUsize, storage_options: StorageOptions, ) -> anyhow::Result<()> { // Start listening on all gRPC urls so that inter-component connections can be created @@ -208,6 +220,7 @@ impl BundledCommand { data_directory: data_directory_clone, block_prover_url, grpc_options: grpc_options.into(), + max_concurrent_proofs, storage_options, } .serve() diff --git a/bin/node/src/commands/store.rs b/bin/node/src/commands/store.rs index 53ea3dae9..edc5fe252 100644 --- a/bin/node/src/commands/store.rs +++ b/bin/node/src/commands/store.rs @@ -1,8 +1,9 @@ +use std::num::NonZeroUsize; use std::path::{Path, PathBuf}; use anyhow::Context; -use miden_node_store::Store; use miden_node_store::genesis::GenesisBlock; +use miden_node_store::{DEFAULT_MAX_CONCURRENT_PROOFS, Store}; use miden_node_utils::clap::{GrpcOptionsInternal, StorageOptions}; use miden_node_utils::fs::ensure_empty_directory; use miden_node_utils::grpc::UrlExt; @@ -65,6 +66,13 @@ pub enum StoreCommand { #[arg(long = "enable-otel", default_value_t = false, env = ENV_ENABLE_OTEL, value_name = "BOOL")] enable_otel: bool, + /// Maximum number of concurrent block proofs to be scheduled. 
+ #[arg( + long = "max-concurrent-proofs", + default_value_t = DEFAULT_MAX_CONCURRENT_PROOFS, + value_name = "NUM" + )] + max_concurrent_proofs: NonZeroUsize, #[command(flatten)] grpc_options: GrpcOptionsInternal, @@ -89,6 +97,7 @@ impl StoreCommand { data_directory, enable_otel: _, grpc_options, + max_concurrent_proofs, storage_options, } => { Self::start( @@ -98,6 +107,7 @@ impl StoreCommand { block_prover_url, data_directory, grpc_options, + max_concurrent_proofs, storage_options, ) .await @@ -113,6 +123,7 @@ impl StoreCommand { } } + #[expect(clippy::too_many_arguments)] async fn start( rpc_url: Url, ntx_builder_url: Url, @@ -120,6 +131,7 @@ impl StoreCommand { block_prover_url: Option, data_directory: PathBuf, grpc_options: GrpcOptionsInternal, + max_concurrent_proofs: NonZeroUsize, storage_options: StorageOptions, ) -> anyhow::Result<()> { let rpc_listener = rpc_url @@ -150,6 +162,7 @@ impl StoreCommand { block_producer_listener, data_directory, grpc_options, + max_concurrent_proofs, storage_options, } .serve() diff --git a/bin/stress-test/src/seeding/mod.rs b/bin/stress-test/src/seeding/mod.rs index f6ba16510..e1b3c2b17 100644 --- a/bin/stress-test/src/seeding/mod.rs +++ b/bin/stress-test/src/seeding/mod.rs @@ -93,7 +93,7 @@ pub async fn seed_store( let faucet = create_faucet(); let fee_params = FeeParameters::new(faucet.id(), 0).unwrap(); let signer = EcdsaSecretKey::new(); - let genesis_state = GenesisState::new(vec![faucet.clone()], fee_params, 1, 1, signer); + let genesis_state = GenesisState::new(vec![faucet.clone()], fee_params, 1, 1, signer.clone()); let genesis_block = genesis_state .clone() .into_block() @@ -118,6 +118,7 @@ pub async fn seed_store( &store_client, data_directory, accounts_filepath, + &signer, ) .await; @@ -129,6 +130,7 @@ pub async fn seed_store( /// /// The first transaction in each batch sends assets from the faucet to 255 accounts. /// The rest of the transactions consume the notes created by the faucet in the previous block. 
+#[expect(clippy::too_many_arguments)] async fn generate_blocks( num_accounts: usize, public_accounts_percentage: u8, @@ -137,6 +139,7 @@ async fn generate_blocks( store_client: &StoreClient, data_directory: DataDirectory, accounts_filepath: PathBuf, + signer: &EcdsaSecretKey, ) -> SeedingMetrics { // Each block is composed of [`BATCHES_PER_BLOCK`] batches, and each batch is composed of // [`TRANSACTIONS_PER_BATCH`] txs. The first note of the block is always a send assets tx @@ -215,7 +218,8 @@ async fn generate_blocks( let block_inputs = get_block_inputs(store_client, &batches, &mut metrics).await; // update blocks - prev_block_header = apply_block(batches, block_inputs, store_client, &mut metrics).await; + prev_block_header = + apply_block(batches, block_inputs, store_client, &mut metrics, signer).await; if current_anchor_header.block_epoch() != prev_block_header.block_epoch() { current_anchor_header = prev_block_header.clone(); } @@ -250,11 +254,12 @@ async fn apply_block( block_inputs: BlockInputs, store_client: &StoreClient, metrics: &mut SeedingMetrics, + signer: &EcdsaSecretKey, ) -> BlockHeader { let proposed_block = ProposedBlock::new(block_inputs, batches).unwrap(); let (header, body) = proposed_block.clone().into_header_and_body().unwrap(); let block_size: usize = header.to_bytes().len() + body.to_bytes().len(); - let signature = EcdsaSecretKey::new().sign(header.commitment()); + let signature = signer.sign(header.commitment()); // SAFETY: The header, body, and signature are known to correspond to each other. 
let signed_block = SignedBlock::new_unchecked(header, body, signature); let ordered_batches = proposed_block.batches().clone(); @@ -555,6 +560,7 @@ pub async fn start_store( block_producer_listener, data_directory: dir, grpc_options: GrpcOptionsInternal::bench(), + max_concurrent_proofs: miden_node_store::DEFAULT_MAX_CONCURRENT_PROOFS, storage_options: StorageOptions::bench(), } .serve() diff --git a/bin/stress-test/src/store/mod.rs b/bin/stress-test/src/store/mod.rs index f1ea6e0f7..2b5759a88 100644 --- a/bin/stress-test/src/store/mod.rs +++ b/bin/stress-test/src/store/mod.rs @@ -470,6 +470,7 @@ async fn sync_chain_mmr( ) -> SyncChainMmrRun { let sync_request = proto::rpc::SyncChainMmrRequest { block_range: Some(proto::rpc::BlockRange { block_from, block_to: Some(block_to) }), + finality: proto::rpc::Finality::Committed.into(), }; let start = Instant::now(); diff --git a/crates/block-producer/src/server/tests.rs b/crates/block-producer/src/server/tests.rs index 3e5da2025..dedf74b84 100644 --- a/crates/block-producer/src/server/tests.rs +++ b/crates/block-producer/src/server/tests.rs @@ -2,7 +2,7 @@ use std::num::NonZeroUsize; use std::time::Duration; use miden_node_proto::generated::block_producer::api_client as block_producer_client; -use miden_node_store::{GenesisState, Store}; +use miden_node_store::{DEFAULT_MAX_CONCURRENT_PROOFS, GenesisState, Store}; use miden_node_utils::clap::{GrpcOptionsInternal, StorageOptions}; use miden_node_utils::fee::test_fee_params; use miden_node_validator::{Validator, ValidatorSigner}; @@ -159,6 +159,7 @@ async fn start_store( block_prover_url: None, data_directory: dir, grpc_options: GrpcOptionsInternal::bench(), + max_concurrent_proofs: DEFAULT_MAX_CONCURRENT_PROOFS, storage_options: StorageOptions::bench(), } .serve() diff --git a/crates/rpc/src/tests.rs b/crates/rpc/src/tests.rs index 89b7a23c4..5f190dfdf 100644 --- a/crates/rpc/src/tests.rs +++ b/crates/rpc/src/tests.rs @@ -7,8 +7,8 @@ use http::{HeaderMap, HeaderValue}; use 
miden_node_proto::clients::{Builder, GrpcClient, Interceptor, RpcClient}; use miden_node_proto::generated::rpc::api_client::ApiClient as ProtoClient; use miden_node_proto::generated::{self as proto}; -use miden_node_store::Store; use miden_node_store::genesis::config::GenesisConfig; +use miden_node_store::{DEFAULT_MAX_CONCURRENT_PROOFS, Store}; use miden_node_utils::clap::{GrpcOptionsExternal, GrpcOptionsInternal, StorageOptions}; use miden_node_utils::fee::test_fee; use miden_node_utils::limiter::{ @@ -480,6 +480,7 @@ async fn start_store(store_listener: TcpListener) -> (Runtime, TempDir, Word, So block_producer_listener, data_directory: dir, grpc_options: GrpcOptionsInternal::test(), + max_concurrent_proofs: DEFAULT_MAX_CONCURRENT_PROOFS, storage_options: StorageOptions::default(), } .serve() @@ -523,6 +524,7 @@ async fn restart_store(store_addr: SocketAddr, data_directory: &std::path::Path) block_producer_listener, data_directory: dir, grpc_options: GrpcOptionsInternal::test(), + max_concurrent_proofs: DEFAULT_MAX_CONCURRENT_PROOFS, storage_options: StorageOptions::default(), } .serve() @@ -599,6 +601,7 @@ async fn sync_chain_mmr_returns_delta() { let request = proto::rpc::SyncChainMmrRequest { block_range: Some(proto::rpc::BlockRange { block_from: 0, block_to: None }), + finality: proto::rpc::Finality::Committed.into(), }; let response = rpc_client.sync_chain_mmr(request).await.expect("sync_chain_mmr should succeed"); let response = response.into_inner(); diff --git a/crates/store/Cargo.toml b/crates/store/Cargo.toml index 7c8135d97..555aaf0ec 100644 --- a/crates/store/Cargo.toml +++ b/crates/store/Cargo.toml @@ -21,7 +21,6 @@ deadpool-diesel = { features = ["sqlite"], version = "0.6" } diesel = { features = ["numeric", "sqlite"], version = "2.3" } diesel_migrations = { features = ["sqlite"], version = "2.3" } fs-err = { workspace = true } -futures = { workspace = true } hex = { version = "0.4" } indexmap = { workspace = true } libsqlite3-sys = { workspace = 
true } diff --git a/crates/store/src/blocks.rs b/crates/store/src/blocks.rs index e771332ba..aac4279e0 100644 --- a/crates/store/src/blocks.rs +++ b/crates/store/src/blocks.rs @@ -63,10 +63,7 @@ impl BlockStore { Ok(Self { store_dir }) } - pub async fn load_block( - &self, - block_num: BlockNumber, - ) -> Result>, std::io::Error> { + pub async fn load_block(&self, block_num: BlockNumber) -> std::io::Result>> { match tokio::fs::read(self.block_path(block_num)).await { Ok(data) => Ok(Some(data)), Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(None), @@ -81,11 +78,7 @@ impl BlockStore { err, fields(block_size = data.len()) )] - pub async fn save_block( - &self, - block_num: BlockNumber, - data: &[u8], - ) -> Result<(), std::io::Error> { + pub async fn save_block(&self, block_num: BlockNumber, data: &[u8]) -> std::io::Result<()> { let (epoch_path, block_path) = self.epoch_block_path(block_num)?; if !epoch_path.exists() { tokio::fs::create_dir_all(epoch_path).await?; @@ -94,11 +87,7 @@ impl BlockStore { tokio::fs::write(block_path, data).await } - pub fn save_block_blocking( - &self, - block_num: BlockNumber, - data: &[u8], - ) -> Result<(), std::io::Error> { + pub fn save_block_blocking(&self, block_num: BlockNumber, data: &[u8]) -> std::io::Result<()> { let (epoch_path, block_path) = self.epoch_block_path(block_num)?; if !epoch_path.exists() { fs_err::create_dir_all(epoch_path)?; @@ -107,6 +96,34 @@ impl BlockStore { fs_err::write(block_path, data) } + // PROOF STORAGE + // -------------------------------------------------------------------------------------------- + + #[expect(dead_code)] + pub async fn load_proof(&self, block_num: BlockNumber) -> std::io::Result>> { + match tokio::fs::read(self.proof_path(block_num)).await { + Ok(data) => Ok(Some(data)), + Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(None), + Err(err) => Err(err), + } + } + + #[instrument( + target = COMPONENT, + name = "store.block_store.save_proof", + skip(self, 
data), + err, + fields(proof_size = data.len()) + )] + pub async fn save_proof(&self, block_num: BlockNumber, data: &[u8]) -> std::io::Result<()> { + let (epoch_path, proof_path) = self.epoch_proof_path(block_num)?; + if !epoch_path.exists() { + tokio::fs::create_dir_all(epoch_path).await?; + } + + tokio::fs::write(proof_path, data).await + } + // HELPER FUNCTIONS // -------------------------------------------------------------------------------------------- @@ -117,16 +134,27 @@ impl BlockStore { epoch_dir.join(format!("block_{block_num:08x}.dat")) } - fn epoch_block_path( - &self, - block_num: BlockNumber, - ) -> Result<(PathBuf, PathBuf), std::io::Error> { + fn proof_path(&self, block_num: BlockNumber) -> PathBuf { + let block_num = block_num.as_u32(); + let epoch = block_num >> 16; + let epoch_dir = self.store_dir.join(format!("{epoch:04x}")); + epoch_dir.join(format!("proof_{block_num:08x}.dat")) + } + + fn epoch_block_path(&self, block_num: BlockNumber) -> std::io::Result<(PathBuf, PathBuf)> { let block_path = self.block_path(block_num); let epoch_path = block_path.parent().ok_or(std::io::Error::from(ErrorKind::NotFound))?; Ok((epoch_path.to_path_buf(), block_path)) } + fn epoch_proof_path(&self, block_num: BlockNumber) -> std::io::Result<(PathBuf, PathBuf)> { + let proof_path = self.proof_path(block_num); + let epoch_path = proof_path.parent().ok_or(std::io::Error::from(ErrorKind::NotFound))?; + + Ok((epoch_path.to_path_buf(), proof_path)) + } + pub fn display(&self) -> std::path::Display<'_> { self.store_dir.display() } diff --git a/crates/store/src/db/migrations/2025062000000_setup/up.sql b/crates/store/src/db/migrations/2025062000000_setup/up.sql index 5b640dbc7..6dcbb026e 100644 --- a/crates/store/src/db/migrations/2025062000000_setup/up.sql +++ b/crates/store/src/db/migrations/2025062000000_setup/up.sql @@ -1,13 +1,16 @@ CREATE TABLE block_headers ( - block_num INTEGER NOT NULL, - block_header BLOB NOT NULL, - signature BLOB NOT NULL, - commitment BLOB 
NOT NULL, + block_num INTEGER NOT NULL, + block_header BLOB NOT NULL, + signature BLOB NOT NULL, + commitment BLOB NOT NULL, + proving_inputs BLOB, -- Serialized BlockProofRequest needed for deferred proving. NULL if it has been proven or never proven (genesis block). PRIMARY KEY (block_num), CONSTRAINT block_header_block_num_is_u32 CHECK (block_num BETWEEN 0 AND 0xFFFFFFFF) ); +CREATE INDEX block_headers_proven_desc ON block_headers(block_num DESC) WHERE proving_inputs IS NULL; + CREATE TABLE account_codes ( code_commitment BLOB NOT NULL, code BLOB NOT NULL, diff --git a/crates/store/src/db/mod.rs b/crates/store/src/db/mod.rs index 43df59d5e..49caa6f40 100644 --- a/crates/store/src/db/mod.rs +++ b/crates/store/src/db/mod.rs @@ -6,7 +6,7 @@ use std::path::PathBuf; use anyhow::Context; use diesel::{Connection, QueryableByName, RunQueryDsl, SqliteConnection}; use miden_node_proto::domain::account::AccountInfo; -use miden_node_proto::generated as proto; +use miden_node_proto::{BlockProofRequest, generated as proto}; use miden_node_utils::limiter::MAX_RESPONSE_PAYLOAD_BYTES; use miden_node_utils::tracing::OpenTelemetrySpanExt; use miden_protocol::Word; @@ -279,12 +279,15 @@ impl Db { conn.transaction(move |conn| { models::queries::apply_block( conn, - genesis.header(), - genesis.signature(), - &[], - &[], - genesis.body().updated_accounts(), - genesis.body().transactions(), + models::queries::ApplyBlockData { + block_header: genesis.header(), + signature: genesis.signature(), + notes: &[], + nullifiers: &[], + accounts: genesis.body().updated_accounts(), + transactions: genesis.body().transactions(), + proving_inputs: None, // Genesis block is never proven. 
+ }, ) }) .context("failed to insert genesis block")?; @@ -564,16 +567,20 @@ impl Db { acquire_done: oneshot::Receiver<()>, signed_block: SignedBlock, notes: Vec<(NoteRecord, Option)>, + proving_inputs: Option, ) -> Result<()> { self.transact("apply block", move |conn| -> Result<()> { models::queries::apply_block( conn, - signed_block.header(), - signed_block.signature(), - ¬es, - signed_block.body().created_nullifiers(), - signed_block.body().updated_accounts(), - signed_block.body().transactions(), + models::queries::ApplyBlockData { + block_header: signed_block.header(), + signature: signed_block.signature(), + notes: ¬es, + nullifiers: signed_block.body().created_nullifiers(), + accounts: signed_block.body().updated_accounts(), + transactions: signed_block.body().transactions(), + proving_inputs, + }, )?; // XXX FIXME TODO free floating mutex MUST NOT exist @@ -591,6 +598,55 @@ impl Db { .await } + /// Marks a previously committed block as proven. + /// + /// Clears the `proving_inputs` for the given block number. + #[instrument(target = COMPONENT, skip_all, err)] + pub async fn mark_block_proven(&self, block_num: BlockNumber) -> Result<()> { + self.transact("mark block proven", move |conn| { + models::queries::mark_block_proven(conn, block_num) + }) + .await?; + Ok(()) + } + + /// Returns the proving inputs for a given block number, if stored. + #[instrument(level = "debug", target = COMPONENT, skip_all, err)] + pub async fn select_block_proving_inputs( + &self, + block_num: BlockNumber, + ) -> Result> { + self.transact("select block proving inputs", move |conn| { + models::queries::select_block_proving_inputs(conn, block_num) + }) + .await + } + + /// Returns unproven block numbers greater than `after`, in ascending order, up to `limit`. 
+ #[instrument(level = "debug", target = COMPONENT, skip_all, err)] + pub async fn select_unproven_blocks( + &self, + after: BlockNumber, + limit: usize, + ) -> Result> { + self.transact("select unproven blocks", move |conn| { + models::queries::select_unproven_blocks(conn, after, limit) + }) + .await + } + + /// Returns the highest block number that has been proven, or `None` if no blocks have been + /// proven yet. + /// + /// This includes the genesis block, which is not technically proven, but treated as such. + #[instrument(level = "debug", target = COMPONENT, skip_all, ret(level = "debug"), err)] + pub async fn select_latest_proven_block_num(&self) -> Result> { + self.transact("select latest proven block num", |conn| { + models::queries::select_latest_proven_block_num(conn) + }) + .await + } + /// Selects storage map values for syncing storage maps for a specific account ID. /// /// The returned values are the latest known values up to `block_range.end()`, and no values diff --git a/crates/store/src/db/models/queries/block_headers.rs b/crates/store/src/db/models/queries/block_headers.rs index bfcd34ee7..db75d04b3 100644 --- a/crates/store/src/db/models/queries/block_headers.rs +++ b/crates/store/src/db/models/queries/block_headers.rs @@ -13,6 +13,7 @@ use diesel::{ }; use miden_crypto::Word; use miden_crypto::dsa::ecdsa_k256_keccak::Signature; +use miden_node_proto::BlockProofRequest; use miden_node_utils::limiter::{QueryParamBlockLimit, QueryParamLimiter}; use miden_protocol::block::{BlockHeader, BlockNumber}; use miden_protocol::utils::{Deserializable, Serializable}; @@ -205,16 +206,7 @@ pub struct BlockHeaderInsert { pub block_header: Vec, pub signature: Vec, pub commitment: Vec, -} -impl From<(&BlockHeader, &Signature)> for BlockHeaderInsert { - fn from((header, signature): (&BlockHeader, &Signature)) -> Self { - Self { - block_num: header.block_num().to_raw_sql(), - block_header: header.to_bytes(), - signature: signature.to_bytes(), - commitment: 
BlockHeaderCommitment::new(header).to_raw_sql(), - } - } + pub proving_inputs: Option>, } /// Insert a [`BlockHeader`] to the DB using the given [`SqliteConnection`]. @@ -236,10 +228,131 @@ pub(crate) fn insert_block_header( conn: &mut SqliteConnection, block_header: &BlockHeader, signature: &Signature, + proving_inputs: Option, +) -> Result { + let row = BlockHeaderInsert { + block_num: block_header.block_num().to_raw_sql(), + block_header: block_header.to_bytes(), + signature: signature.to_bytes(), + commitment: BlockHeaderCommitment::new(block_header).to_raw_sql(), + proving_inputs: proving_inputs.map(|inputs| inputs.to_bytes()), + }; + let count = diesel::insert_into(schema::block_headers::table).values(&[row]).execute(conn)?; + Ok(count) +} + +/// Select the proving inputs for a given block number. +/// +/// # Returns +/// +/// `None` if the block does not exist or has no proving inputs stored. +/// +/// # Raw SQL +/// +/// ```sql +/// SELECT proving_inputs +/// FROM block_headers +/// WHERE block_num = ?1 +/// ``` +pub(crate) fn select_block_proving_inputs( + conn: &mut SqliteConnection, + block_num: BlockNumber, +) -> Result, DatabaseError> { + let inputs: Option>> = + SelectDsl::select(schema::block_headers::table, schema::block_headers::proving_inputs) + .filter(schema::block_headers::block_num.eq(block_num.to_raw_sql())) + .get_result(conn) + .optional()?; + inputs + .flatten() + .map(|bytes| BlockProofRequest::read_from_bytes(&bytes)) + .transpose() + .map_err(Into::into) +} + +/// Mark a committed block as proven by clearing its proving inputs. +/// +/// Sets `proving_inputs` to `NULL` for the row with the given `block_num`. +/// +/// # Returns +/// +/// The number of affected rows (expected: 1). 
+#[tracing::instrument( + target = COMPONENT, + skip_all, + fields(block.number = %block_num), + err, +)] +pub(crate) fn mark_block_proven( + conn: &mut SqliteConnection, + block_num: BlockNumber, ) -> Result { - let block_header = BlockHeaderInsert::from((block_header, signature)); - let count = diesel::insert_into(schema::block_headers::table) - .values(&[block_header]) - .execute(conn)?; + let count = diesel::update( + schema::block_headers::table + .filter(schema::block_headers::block_num.eq(block_num.to_raw_sql())), + ) + .set(schema::block_headers::proving_inputs.eq(None::>)) + .execute(conn)?; Ok(count) } + +/// Select unproven block numbers greater than `after`, in ascending order, up to `limit`. +/// +/// A block is unproven when its `proving_inputs` are non-NULL. +/// +/// # Raw SQL +/// +/// ```sql +/// SELECT block_num +/// FROM block_headers +/// WHERE proving_inputs IS NOT NULL +/// AND block_num > ? +/// ORDER BY block_num ASC +/// LIMIT ? +/// ``` +pub(crate) fn select_unproven_blocks( + conn: &mut SqliteConnection, + after: BlockNumber, + limit: usize, +) -> Result, DatabaseError> { + let block_nums: Vec = + SelectDsl::select(schema::block_headers::table, schema::block_headers::block_num) + .filter(schema::block_headers::proving_inputs.is_not_null()) + .filter(schema::block_headers::block_num.gt(after.to_raw_sql())) + .order(schema::block_headers::block_num.asc()) + .limit(i64::try_from(limit).expect("unproven block number limit should fit in i64")) + .load(conn)?; + + block_nums + .into_iter() + .map(BlockNumber::from_raw_sql) + .collect::, _>>() + .map_err(Into::into) +} + +/// Select the highest block number that has been proven. +/// +/// A block is considered proven when its `proving_inputs` are `NULL`. This includes the genesis +/// block, which is not technically proven, but treated as such. 
+/// +/// # Raw SQL +/// +/// ```sql +/// SELECT block_num +/// FROM block_headers +/// WHERE proving_inputs IS NULL +/// ORDER BY block_num DESC +/// LIMIT 1 +/// ``` +pub(crate) fn select_latest_proven_block_num( + conn: &mut SqliteConnection, +) -> Result, DatabaseError> { + let block_num: Option = + SelectDsl::select(schema::block_headers::table, schema::block_headers::block_num) + .filter(schema::block_headers::proving_inputs.is_null()) + .order(schema::block_headers::block_num.desc()) + .first(conn) + .optional()?; + + block_num.map(BlockNumber::from_raw_sql).transpose().map_err(Into::into) +} diff --git a/crates/store/src/db/models/queries/mod.rs b/crates/store/src/db/models/queries/mod.rs index ad2010f84..2de9e33fa 100644 --- a/crates/store/src/db/models/queries/mod.rs +++ b/crates/store/src/db/models/queries/mod.rs @@ -27,6 +27,7 @@ use diesel::SqliteConnection; use miden_crypto::dsa::ecdsa_k256_keccak::Signature; +use miden_node_proto::BlockProofRequest; use miden_protocol::block::{BlockAccountUpdate, BlockHeader}; use miden_protocol::note::Nullifier; use miden_protocol::transaction::OrderedTransactionHeaders; @@ -46,6 +47,17 @@ pub(crate) use nullifiers::*; mod notes; pub(crate) use notes::*; +/// All data needed to apply a new block to the database. +pub(crate) struct ApplyBlockData<'a> { + pub block_header: &'a BlockHeader, + pub signature: &'a Signature, + pub notes: &'a [(NoteRecord, Option)], + pub nullifiers: &'a [Nullifier], + pub accounts: &'a [BlockAccountUpdate], + pub transactions: &'a OrderedTransactionHeaders, + pub proving_inputs: Option, +} + /// Apply a new block to the state /// /// # Arguments @@ -55,20 +67,15 @@ pub(crate) use notes::*; /// Number of records inserted and/or updated. 
pub(crate) fn apply_block( conn: &mut SqliteConnection, - block_header: &BlockHeader, - signature: &Signature, - notes: &[(NoteRecord, Option)], - nullifiers: &[Nullifier], - accounts: &[BlockAccountUpdate], - transactions: &OrderedTransactionHeaders, + data: ApplyBlockData<'_>, ) -> Result { let mut count = 0; // Note: ordering here is important as the relevant tables have FK dependencies. - count += insert_block_header(conn, block_header, signature)?; - count += upsert_accounts(conn, accounts, block_header.block_num())?; - count += insert_scripts(conn, notes.iter().map(|(note, _)| note))?; - count += insert_notes(conn, notes)?; - count += insert_transactions(conn, block_header.block_num(), transactions)?; - count += insert_nullifiers_for_block(conn, nullifiers, block_header.block_num())?; + count += insert_block_header(conn, data.block_header, data.signature, data.proving_inputs)?; + count += upsert_accounts(conn, data.accounts, data.block_header.block_num())?; + count += insert_scripts(conn, data.notes.iter().map(|(note, _)| note))?; + count += insert_notes(conn, data.notes)?; + count += insert_transactions(conn, data.block_header.block_num(), data.transactions)?; + count += insert_nullifiers_for_block(conn, data.nullifiers, data.block_header.block_num())?; Ok(count) } diff --git a/crates/store/src/db/schema.rs b/crates/store/src/db/schema.rs index 60660afcb..4272cdaa1 100644 --- a/crates/store/src/db/schema.rs +++ b/crates/store/src/db/schema.rs @@ -49,6 +49,7 @@ diesel::table! 
{ block_header -> Binary, signature -> Binary, commitment -> Binary, + proving_inputs -> Nullable, } } diff --git a/crates/store/src/db/tests.rs b/crates/store/src/db/tests.rs index a9f72b99a..3026138b4 100644 --- a/crates/store/src/db/tests.rs +++ b/crates/store/src/db/tests.rs @@ -102,8 +102,10 @@ fn create_block(conn: &mut SqliteConnection, block_num: BlockNumber) { ); let dummy_signature = SecretKey::new().sign(block_header.commitment()); - conn.transaction(|conn| queries::insert_block_header(conn, &block_header, &dummy_signature)) - .unwrap(); + conn.transaction(|conn| { + queries::insert_block_header(conn, &block_header, &dummy_signature, None) + }) + .unwrap(); } #[test] @@ -739,7 +741,7 @@ fn db_block_header() { // test insertion let dummy_signature = SecretKey::new().sign(block_header.commitment()); - queries::insert_block_header(conn, &block_header, &dummy_signature).unwrap(); + queries::insert_block_header(conn, &block_header, &dummy_signature, None).unwrap(); // test fetch unknown block header let block_number = 1; @@ -771,7 +773,7 @@ fn db_block_header() { ); let dummy_signature = SecretKey::new().sign(block_header2.commitment()); - queries::insert_block_header(conn, &block_header2, &dummy_signature).unwrap(); + queries::insert_block_header(conn, &block_header2, &dummy_signature, None).unwrap(); let res = queries::select_block_header_by_block_num(conn, None).unwrap(); assert_eq!(res.unwrap(), block_header2); @@ -2006,7 +2008,7 @@ fn db_roundtrip_block_header() { // Insert let dummy_signature = SecretKey::new().sign(block_header.commitment()); - queries::insert_block_header(&mut conn, &block_header, &dummy_signature).unwrap(); + queries::insert_block_header(&mut conn, &block_header, &dummy_signature, None).unwrap(); // Retrieve let retrieved = diff --git a/crates/store/src/errors.rs b/crates/store/src/errors.rs index ef624dd69..0ed39de5c 100644 --- a/crates/store/src/errors.rs +++ b/crates/store/src/errors.rs @@ -30,6 +30,19 @@ use tonic::Status; use 
crate::account_state_forest::{AccountStateForestError, WitnessError}; use crate::db::models::conv::DatabaseTypeConversionError; +// PROOF SCHEDULER ERRORS +// ================================================================================================= + +#[derive(Debug, Error)] +pub enum ProofSchedulerError { + #[error("no proving inputs found for block {0}")] + MissingProvingInputs(BlockNumber), + #[error("failed to deserialize proving inputs for block")] + DeserializationFailed(#[source] DeserializationError), + #[error("invalid remote prover endpoint: {0}")] + InvalidProverEndpoint(String), +} + // DATABASE ERRORS // ================================================================================================= @@ -254,6 +267,11 @@ pub enum SyncChainMmrError { }, #[error("malformed block number")] DeserializationFailed(#[source] ConversionError), + #[error("no proven blocks available")] + NoProvenBlocks, + #[error("database error")] + #[grpc(internal)] + DatabaseError(#[source] DatabaseError), } impl From for StateSyncError { diff --git a/crates/store/src/lib.rs b/crates/store/src/lib.rs index 3122cbf8f..a4134aa33 100644 --- a/crates/store/src/lib.rs +++ b/crates/store/src/lib.rs @@ -15,6 +15,7 @@ pub use db::models::conv::SqlTypeConvert; pub use errors::DatabaseError; pub use genesis::GenesisState; pub use server::block_prover_client::BlockProver; +pub use server::proof_scheduler::DEFAULT_MAX_CONCURRENT_PROOFS; pub use server::{DataDirectory, Store}; // CONSTANTS diff --git a/crates/store/src/server/api.rs b/crates/store/src/server/api.rs index 56bfcafb4..3391c9033 100644 --- a/crates/store/src/server/api.rs +++ b/crates/store/src/server/api.rs @@ -9,12 +9,13 @@ use miden_protocol::account::AccountId; use miden_protocol::batch::OrderedBatches; use miden_protocol::block::{BlockInputs, BlockNumber}; use miden_protocol::note::Nullifier; +use tokio::sync::watch; use tonic::{Request, Response, Status}; use tracing::{info, instrument}; +use crate::COMPONENT; 
use crate::errors::GetBlockInputsError; use crate::state::State; -use crate::{BlockProver, COMPONENT}; // STORE API // ================================================================================================ @@ -22,7 +23,8 @@ use crate::{BlockProver, COMPONENT}; #[derive(Clone)] pub struct StoreApi { pub(super) state: Arc, - pub(super) block_prover: Arc, + /// Sender used to notify the proof scheduler of the latest committed block number. + pub(super) chain_tip_sender: watch::Sender, } impl StoreApi { diff --git a/crates/store/src/server/block_producer.rs b/crates/store/src/server/block_producer.rs index 25f6b05f6..b32b03c29 100644 --- a/crates/store/src/server/block_producer.rs +++ b/crates/store/src/server/block_producer.rs @@ -1,7 +1,7 @@ use std::convert::Infallible; -use futures::TryFutureExt; use miden_crypto::dsa::ecdsa_k256_keccak::Signature; +use miden_node_proto::domain::proof_request::BlockProofRequest; use miden_node_proto::errors::MissingFieldHelper; use miden_node_proto::generated::store::block_producer_server; use miden_node_proto::generated::{self as proto}; @@ -13,7 +13,7 @@ use miden_protocol::batch::OrderedBatches; use miden_protocol::block::{BlockBody, BlockHeader, BlockNumber, SignedBlock}; use miden_protocol::utils::Deserializable; use tonic::{Request, Response, Status}; -use tracing::Instrument; +use tracing::{Instrument, error}; use crate::errors::ApplyBlockError; use crate::server::api::{ @@ -89,41 +89,44 @@ impl block_producer_server::BlockProducer for StoreApi { span.set_attribute("block.output_notes.count", body.output_notes().count()); span.set_attribute("block.nullifiers.count", body.created_nullifiers().len()); - // We perform the apply/prove block work in a separate task. This prevents the caller + // Construct block proof request to be stored alongside the block for deferred block + // proving. 
+ let proving_inputs = BlockProofRequest { + tx_batches: ordered_batches, + block_header: header.clone(), + block_inputs, + }; + + // We perform the apply block work in a separate task. This prevents the caller // cancelling the request and thereby cancelling the task at an arbitrary point of // execution. // // Normally this shouldn't be a problem, however our apply_block isn't quite ACID compliant // so things get a bit messy. This is more a temporary hack-around to minimize this risk. let this = self.clone(); - // TODO(sergerad): Use block proof. - let _block_proof = tokio::spawn( + tokio::spawn( async move { - // SAFETY: The header, body, and signature are assumed to - // correspond to each other because they are provided by the Block - // Producer. - let signed_block = SignedBlock::new_unchecked(header.clone(), body, signature); // TODO(sergerad): Use `SignedBlock::new()` when available. + let block_num = header.block_num(); + let signed_block = SignedBlock::new(header, body, signature) + .map_err(|err| Status::new(tonic::Code::Internal, err.as_report()))?; // Note: This is an internal endpoint, so its safe to expose the full error // report. 
this.state - .apply_block(signed_block) - .inspect_err(|err| { - span.set_error(err); + .apply_block(signed_block, Some(proving_inputs)) + .await + .inspect(|_| { + if let Err(err) = this.chain_tip_sender.send(block_num) { + error!("Failed to send chain tip: {:?}", err); + } }) .map_err(|err| { + span.set_error(&err); let code = match err { ApplyBlockError::InvalidBlockError(_) => tonic::Code::InvalidArgument, _ => tonic::Code::Internal, }; Status::new(code, err.as_report()) }) - .and_then(|_| { - this.block_prover - .prove(ordered_batches, block_inputs, &header) - .map_err(|err| Status::new(tonic::Code::Internal, err.as_report())) - }) - .await - .map(Response::new) } .in_current_span(), ) diff --git a/crates/store/src/server/mod.rs b/crates/store/src/server/mod.rs index 8e9e46118..863881876 100644 --- a/crates/store/src/server/mod.rs +++ b/crates/store/src/server/mod.rs @@ -1,3 +1,4 @@ +use std::num::NonZeroUsize; use std::ops::Not; use std::path::{Path, PathBuf}; use std::sync::Arc; @@ -31,6 +32,7 @@ mod api; mod block_producer; pub mod block_prover_client; mod ntx_builder; +pub mod proof_scheduler; mod rpc_api; /// The store server. @@ -41,6 +43,8 @@ pub struct Store { /// URL for the Block Prover client. Uses local prover if `None`. pub block_prover_url: Option, pub data_directory: PathBuf, + /// Maximum number of blocks being proven concurrently by the proof scheduler. + pub max_concurrent_proofs: NonZeroUsize, pub storage_options: StorageOptions, pub grpc_options: GrpcOptionsInternal, } @@ -103,18 +107,32 @@ impl Store { Arc::new(BlockProver::local()) }; + // Initialize the chain tip watch channel. + let chain_tip = state.latest_block_num().await; + let (chain_tip_sender, chain_tip_rx) = tokio::sync::watch::channel(chain_tip); + + // Spawn the proof scheduler as a background task. It will immediately pick up any + // unproven blocks from previous runs and begin proving them. 
+ let proof_scheduler_task = proof_scheduler::spawn( + state.db().clone(), + block_prover, + state.block_store(), + chain_tip_rx, + self.max_concurrent_proofs, + ); + let rpc_service = store::rpc_server::RpcServer::new(api::StoreApi { state: Arc::clone(&state), - block_prover: Arc::clone(&block_prover), + chain_tip_sender: chain_tip_sender.clone(), }); let ntx_builder_service = store::ntx_builder_server::NtxBuilderServer::new(api::StoreApi { state: Arc::clone(&state), - block_prover: Arc::clone(&block_prover), + chain_tip_sender: chain_tip_sender.clone(), }); let block_producer_service = store::block_producer_server::BlockProducerServer::new(api::StoreApi { state: Arc::clone(&state), - block_prover: Arc::clone(&block_prover), + chain_tip_sender, }); let reflection_service = tonic_reflection::server::Builder::configure() .register_file_descriptor_set(store_rpc_api_descriptor()) @@ -178,6 +196,13 @@ impl Store { result = service => result, Some(err) = termination_signal.recv() => { Err(anyhow::anyhow!("received termination signal").context(err)) + }, + result = proof_scheduler_task => { + match result { + Ok(Ok(())) => Err(anyhow::anyhow!("proof scheduler exited unexpectedly")), + Ok(Err(err)) => Err(err.context("proof scheduler fatal error")), + Err(join_err) => Err(join_err).context("proof scheduler panicked"), + } } } } diff --git a/crates/store/src/server/proof_scheduler.rs b/crates/store/src/server/proof_scheduler.rs new file mode 100644 index 000000000..242d9723a --- /dev/null +++ b/crates/store/src/server/proof_scheduler.rs @@ -0,0 +1,269 @@ +//! Background task that drives deferred block proving. +//! +//! The [`proof_scheduler`] is spawned as an internal Store task. It: +//! +//! 1. Tracks `chain_tip` via a [`watch::Receiver`] and `latest_proven_block` locally. +//! 2. Maintains up to `max_concurrent_proofs` in-flight proving jobs via a [`JoinSet`]. +//! 3. Marks blocks as proven in the database **sequentially** — a block is only marked after all +//! 
its ancestors have been marked. +//! 4. On transient errors (DB reads, prover failures, timeouts), the failed block is retried +//! internally within its proving task. +//! 5. On fatal errors (e.g. deserialization failures, missing proving inputs), the scheduler +//! returns the error to the caller for node shutdown. + +use std::num::NonZeroUsize; +use std::sync::Arc; +use std::time::Duration; + +use anyhow::Context; +use miden_protocol::block::{BlockNumber, BlockProof}; +use miden_protocol::utils::Serializable; +use miden_remote_prover_client::RemoteProverClientError; +use thiserror::Error; +use tokio::sync::watch; +use tokio::task::{JoinHandle, JoinSet}; +use tracing::{error, info, instrument}; + +use crate::COMPONENT; +use crate::blocks::BlockStore; +use crate::db::Db; +use crate::errors::{DatabaseError, ProofSchedulerError}; +use crate::server::block_prover_client::{BlockProver, StoreProverError}; + +// CONSTANTS +// ================================================================================================ + +/// Overall timeout for proving a single block. +const BLOCK_PROVE_TIMEOUT: Duration = Duration::from_mins(4); + +/// Default maximum number of blocks being proven concurrently. +pub const DEFAULT_MAX_CONCURRENT_PROOFS: NonZeroUsize = NonZeroUsize::new(8).unwrap(); + +/// A wrapper around [`JoinSet`] whose `join_next` returns [`std::future::pending`] when empty +/// instead of `None`, making it safe to use directly in `tokio::select!` without a special case. 
+struct ProofTaskJoinSet(JoinSet>); + +impl ProofTaskJoinSet { + fn new() -> Self { + Self(JoinSet::new()) + } + + fn spawn( + &mut self, + db: &Arc, + block_prover: &Arc, + block_store: &Arc, + block_num: BlockNumber, + ) { + let db = Arc::clone(db); + let block_prover = Arc::clone(block_prover); + let block_store = Arc::clone(block_store); + self.0.spawn( + async move { prove_and_save(&db, &block_prover, &block_store, block_num).await }, + ); + } + + /// Returns the result of the next completed task, or pends forever if the set is empty. + async fn join_next(&mut self) -> anyhow::Result { + if self.0.is_empty() { + std::future::pending().await + } else { + self.0 + .join_next() + .await + .expect("join set is not empty") + .context("proving task panicked") + .flatten() + } + } +} + +// PROOF SCHEDULER +// ================================================================================================ + +/// Spawns the proof scheduler as a background tokio task. +/// +/// The scheduler uses `chain_tip_rx` to learn about newly committed blocks and queries the DB +/// for unproven blocks to prove. +/// +/// Returns a [`JoinHandle`] that resolves when the scheduler encounters a fatal error or +/// completes unexpectedly. +pub fn spawn( + db: Arc, + block_prover: Arc, + block_store: Arc, + chain_tip_rx: watch::Receiver, + max_concurrent_proofs: NonZeroUsize, +) -> JoinHandle> { + tokio::spawn(run(db, block_prover, block_store, chain_tip_rx, max_concurrent_proofs)) +} + +/// Main loop of the proof scheduler. +/// +/// Maintains a pool of concurrent proving jobs via [`JoinSet`], fills them up to +/// `max_concurrent_proofs`, and drains completed results. +/// +/// Unproven blocks are discovered by querying the database each iteration. +/// +/// Returns `Err` on irrecoverable errors (missing/corrupt proving inputs, DB write failures). +/// Transient errors are retried internally. 
+async fn run( + db: Arc, + block_prover: Arc, + block_store: Arc, + mut chain_tip_rx: watch::Receiver, + max_concurrent_proofs: NonZeroUsize, +) -> anyhow::Result<()> { + info!(target: COMPONENT, "Proof scheduler started"); + + // In-flight proving tasks. + let mut join_set = ProofTaskJoinSet::new(); + // Number of blocks currently being proven. + let mut inflight_count: usize = 0; + // Highest block number that is inflight or has been proven. Used to avoid re-querying + // blocks we've already scheduled. + let mut highest_scheduled = BlockNumber::GENESIS; + + loop { + // Query the DB for unproven blocks beyond what we've already scheduled. + let capacity = max_concurrent_proofs.get() - inflight_count; + if capacity > 0 { + let unproven = db.select_unproven_blocks(highest_scheduled, capacity).await?; + + inflight_count += unproven.len(); + if let Some(&last) = unproven.last() { + highest_scheduled = last; + } + + for block_num in unproven { + join_set.spawn(&db, &block_prover, &block_store, block_num); + } + } + + // Wait for either a job to complete or the chain tip to advance. + tokio::select! { + // Proving task completed. + result = join_set.join_next() => { + inflight_count = inflight_count.saturating_sub(1); + info!(target=COMPONENT, block.number=%result?, "Block proven"); + }, + + // New chain tip received — re-query for unproven blocks on next iteration. + result = chain_tip_rx.changed() => { + if result.is_err() { + info!(target: COMPONENT, "Chain tip channel closed, proof scheduler exiting"); + return Ok(()); + } + }, + } + } +} + +// PROVE BLOCK +// ================================================================================================ + +/// Proves a single block, saves the proof to the block store, and returns the block number. 
+/// +/// This function encapsulates the full lifecycle of a single proof job: loading inputs from the +/// DB, invoking the prover (with a timeout), persisting the proof to disk, and marking the +/// block as proven in the DB. +#[instrument(target = COMPONENT, name = "prove_block", skip_all, fields(block.number=block_num.as_u32()), err)] +async fn prove_and_save( + db: &Db, + block_prover: &BlockProver, + block_store: &BlockStore, + block_num: BlockNumber, +) -> anyhow::Result { + const MAX_RETRIES: u32 = 10; + + for _ in 0..MAX_RETRIES { + match tokio::time::timeout(BLOCK_PROVE_TIMEOUT, prove_block(db, block_prover, block_num)) + .await + { + Ok(Ok(proof)) => { + save_block(block_store, block_num, &proof).await?; + db.mark_block_proven(block_num).await?; + return Ok(block_num); + }, + Ok(Err(ProveBlockError::Fatal(err))) => Err(err).context("fatal error")?, + Ok(Err(ProveBlockError::Transient(err))) => { + error!(target: COMPONENT, %block_num, err = ?err, "transient error proving block, retrying"); + }, + Err(elapsed) => { + error!(target: COMPONENT, %block_num, %elapsed, "block proving timed out, retrying"); + }, + } + } + + anyhow::bail!("maximum retries ({MAX_RETRIES}) exceeded"); +} + +/// Proves a single block by loading inputs from the DB and invoking the block prover. +/// +/// Database and prover failures are mapped to fatal or transient [`ProveBlockError`] variants. +#[instrument(target = COMPONENT, name = "prove_block.prove", skip_all, fields(block.number=block_num.as_u32()), err)] +async fn prove_block( + db: &Db, + block_prover: &BlockProver, + block_num: BlockNumber, +) -> Result { + let request = db + .select_block_proving_inputs(block_num) + .await + .map_err(ProveBlockError::from_db_error)?
+ .ok_or_else(|| { + ProveBlockError::Fatal(ProofSchedulerError::MissingProvingInputs(block_num)) + })?; + + let proof = block_prover + .prove(request.tx_batches, request.block_inputs, &request.block_header) + .await + .map_err(ProveBlockError::from_prover_error)?; + + Ok(proof) +} + +/// Saves a block proof to the block store. +#[instrument(target = COMPONENT, name = "prove_block.save", skip_all, fields(block.number=block_num.as_u32()), err)] +async fn save_block( + block_store: &BlockStore, + block_num: BlockNumber, + proof: &BlockProof, +) -> anyhow::Result<()> { + block_store.save_proof(block_num, &proof.to_bytes()).await?; + Ok(()) +} + +// PROVE BLOCK ERROR +// ================================================================================================ + +/// Errors that can occur during block proving. +#[derive(Debug, Error)] +enum ProveBlockError { + /// An irrecoverable error that should cause node shutdown. + #[error("fatal error")] + Fatal(#[source] ProofSchedulerError), + /// A transient error (DB read, prover failure). The outer loop will retry. 
+ #[error("transient error: {0}")] + Transient(Box), +} + +impl ProveBlockError { + fn from_db_error(err: DatabaseError) -> Self { + match err { + DatabaseError::DeserializationError(err) => { + Self::Fatal(ProofSchedulerError::DeserializationFailed(err)) + }, + _ => Self::Transient(err.into()), + } + } + + fn from_prover_error(err: StoreProverError) -> Self { + match err { + StoreProverError::RemoteProvingFailed(RemoteProverClientError::InvalidEndpoint( + uri, + )) => Self::Fatal(ProofSchedulerError::InvalidProverEndpoint(uri)), + _ => Self::Transient(err.into()), + } + } +} diff --git a/crates/store/src/server/rpc_api.rs b/crates/store/src/server/rpc_api.rs index a12fcafae..0f97cc905 100644 --- a/crates/store/src/server/rpc_api.rs +++ b/crates/store/src/server/rpc_api.rs @@ -166,12 +166,25 @@ impl rpc_server::Rpc for StoreApi { .ok_or_else(|| proto::rpc::SyncChainMmrRequest::missing_field(stringify!(block_range))) .map_err(SyncChainMmrError::DeserializationFailed)?; + // Determine the effective tip based on the requested finality level. + let effective_tip = match request.finality() { + proto::rpc::Finality::Unspecified | proto::rpc::Finality::Committed => chain_tip, + proto::rpc::Finality::Proven => self + .state + .db() + .select_latest_proven_block_num() + .await + .map_err(SyncChainMmrError::DatabaseError)? 
+ .ok_or(SyncChainMmrError::NoProvenBlocks)?, + }; + let block_from = BlockNumber::from(block_range.block_from); - if block_from > chain_tip { - Err(SyncChainMmrError::FutureBlock { chain_tip, block_from })?; + if block_from > effective_tip { + Err(SyncChainMmrError::FutureBlock { chain_tip: effective_tip, block_from })?; } - let block_to = block_range.block_to.map_or(chain_tip, BlockNumber::from).min(chain_tip); + let block_to = + block_range.block_to.map_or(effective_tip, BlockNumber::from).min(effective_tip); if block_from > block_to { Err(SyncChainMmrError::InvalidBlockRange(InvalidBlockRange::StartGreaterThanEnd { diff --git a/crates/store/src/state/apply_block.rs b/crates/store/src/state/apply_block.rs index 7949fcbeb..223c396d8 100644 --- a/crates/store/src/state/apply_block.rs +++ b/crates/store/src/state/apply_block.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use miden_node_proto::BlockProofRequest; use miden_node_utils::ErrorReport; use miden_protocol::account::delta::AccountUpdateDetails; use miden_protocol::block::SignedBlock; @@ -41,7 +42,11 @@ impl State { // TODO: This span is logged in a root span, we should connect it to the parent span. #[expect(clippy::too_many_lines)] #[instrument(target = COMPONENT, skip_all, err)] - pub async fn apply_block(&self, signed_block: SignedBlock) -> Result<(), ApplyBlockError> { + pub async fn apply_block( + &self, + signed_block: SignedBlock, + proving_inputs: Option, + ) -> Result<(), ApplyBlockError> { let _lock = self.writer.try_lock().map_err(|_| ApplyBlockError::ConcurrentWrite)?; let header = signed_block.header(); @@ -227,8 +232,11 @@ impl State { // spawned. 
let db = Arc::clone(&self.db); let db_update_task = tokio::spawn( - async move { db.apply_block(allow_acquire, acquire_done, signed_block, notes).await } - .in_current_span(), + async move { + db.apply_block(allow_acquire, acquire_done, signed_block, notes, proving_inputs) + .await + } + .in_current_span(), ); // Wait for the message from the DB update task, that we ready to commit the DB transaction. diff --git a/crates/store/src/state/mod.rs b/crates/store/src/state/mod.rs index a511eee95..418c890de 100644 --- a/crates/store/src/state/mod.rs +++ b/crates/store/src/state/mod.rs @@ -193,6 +193,16 @@ impl State { }) } + /// Returns the database. + pub(crate) fn db(&self) -> Arc { + Arc::clone(&self.db) + } + + /// Returns the block store. + pub(crate) fn block_store(&self) -> Arc { + Arc::clone(&self.block_store) + } + // STATE ACCESSORS // -------------------------------------------------------------------------------------------- diff --git a/proto/proto/rpc.proto b/proto/proto/rpc.proto index 1a218539e..b181162d5 100644 --- a/proto/proto/rpc.proto +++ b/proto/proto/rpc.proto @@ -483,14 +483,30 @@ message SyncNotesResponse { // SYNC CHAIN MMR // ================================================================================================ +// The finality level for chain data queries. +enum Finality { + // Return data up to the latest committed block. + FINALITY_UNSPECIFIED = 0; + // Return data up to the latest committed block. + FINALITY_COMMITTED = 1; + // Return data only up to the latest proven block. + FINALITY_PROVEN = 2; +} + // Chain MMR synchronization request. message SyncChainMmrRequest { // Block range from which to synchronize the chain MMR. // // The response will contain MMR delta starting after `block_range.block_from` up to - // `block_range.block_to` or the chain tip (whichever is lower). Set `block_from` to the last - // block already present in the caller's MMR so the delta begins at the next block. 
+ // `block_range.block_to` or the effective tip (whichever is lower). Set `block_from` to the + // last block already present in the caller's MMR so the delta begins at the next block. BlockRange block_range = 1; + + // The finality level to use when clamping the upper bound of the block range. + // + // When set to `FINALITY_UNSPECIFIED` or `FINALITY_COMMITTED`, the upper bound is clamped to the chain tip. + // When set to `FINALITY_PROVEN`, the upper bound is clamped to the latest proven block. + Finality finality = 2; } // Represents the result of syncing chain MMR.