Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions magicblock-account-cloner/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ use magicblock_chainlink::{
};
use magicblock_committor_service::{BaseIntentCommittor, CommittorService};
use magicblock_config::config::ChainLinkConfig;
use magicblock_core::link::transactions::TransactionSchedulerHandle;
use magicblock_core::link::transactions::{
with_encoded, TransactionSchedulerHandle,
};
use magicblock_ledger::LatestBlock;
use magicblock_magic_program_api::{
args::ScheduleTaskArgs,
Expand Down Expand Up @@ -102,7 +104,7 @@ impl ChainlinkCloner {

async fn send_tx(&self, tx: Transaction) -> ClonerResult<Signature> {
let sig = tx.signatures[0];
self.tx_scheduler.execute(tx).await?;
self.tx_scheduler.execute(with_encoded(tx)?).await?;
Ok(sig)
Comment thread
bmuddha marked this conversation as resolved.
}

Expand Down
14 changes: 9 additions & 5 deletions magicblock-accounts/src/scheduled_commits_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ use magicblock_committor_service::{
intent_execution_manager::BroadcastedIntentExecutionResult,
intent_executor::ExecutionOutput, BaseIntentCommittor, CommittorService,
};
use magicblock_core::link::transactions::TransactionSchedulerHandle;
use magicblock_core::link::transactions::{
with_encoded, TransactionSchedulerHandle,
};
use magicblock_metrics::metrics;
use magicblock_program::{
magic_scheduled_base_intent::ScheduledIntentBundle,
Expand Down Expand Up @@ -200,10 +202,12 @@ impl ScheduledCommitsProcessorImpl {
let sent_commit =
Self::build_sent_commit(intent_id, intent_meta, result);
register_scheduled_commit_sent(sent_commit);
match internal_transaction_scheduler
.execute(intent_sent_transaction)
.await
{
let Ok(txn) = with_encoded(intent_sent_transaction) else {
// Unreachable case, all intent transactions are smaller than 64KB by construction
error!("Failed to bincode intent transaction");
return;
};
match internal_transaction_scheduler.execute(txn).await {
Ok(()) => {
Comment thread
bmuddha marked this conversation as resolved.
debug!("Sent commit signaled")
}
Expand Down
20 changes: 14 additions & 6 deletions magicblock-aperture/src/requests/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ use hyper::{
Request, Response,
};
use magicblock_accounts_db::traits::AccountsBank;
use magicblock_core::link::transactions::SanitizeableTransaction;
use magicblock_core::link::transactions::{
SanitizeableTransaction, WithEncoded,
};
use magicblock_metrics::metrics::{AccountFetchOrigin, ENSURE_ACCOUNTS_TIME};
use prelude::JsonBody;
use solana_account::AccountSharedData;
Expand Down Expand Up @@ -170,14 +172,19 @@ impl HttpDispatcher {
/// 3. Validates the transaction's `recent_blockhash` against the ledger, optionally
/// replacing it with the latest one.
/// 4. Sanitizes the transaction, which includes verifying signatures unless disabled.
///
/// Returns `WithEncoded<SanitizedTransaction>` with the original wire bytes.
/// For execution (replace_blockhash=false), bytes are preserved for replication.
/// For simulation (replace_blockhash=true), bytes are unused.
fn prepare_transaction(
&self,
txn: &str,
encoding: UiTransactionEncoding,
sigverify: bool,
replace_blockhash: bool,
) -> RpcResult<SanitizedTransaction> {
let decoded = match encoding {
) -> RpcResult<WithEncoded<SanitizedTransaction>> {
// parse the string as bincode serialized bytes
let encoded = match encoding {
Comment thread
bmuddha marked this conversation as resolved.
UiTransactionEncoding::Base58 => {
bs58::decode(txn).into_vec().map_err(RpcError::parse_error)
}
Expand All @@ -190,7 +197,7 @@ impl HttpDispatcher {
}?;

let mut transaction: VersionedTransaction =
bincode::deserialize(&decoded).map_err(RpcError::invalid_params)?;
bincode::deserialize(&encoded).map_err(RpcError::invalid_params)?;

if replace_blockhash {
transaction
Expand All @@ -205,7 +212,8 @@ impl HttpDispatcher {
};
}

Ok(transaction.sanitize(sigverify)?)
let txn = transaction.sanitize(sigverify)?;
Ok(WithEncoded { txn, encoded })
Comment thread
bmuddha marked this conversation as resolved.
}

/// Ensures all accounts required for a transaction are present in the `AccountsDb`.
Expand All @@ -224,7 +232,7 @@ impl HttpDispatcher {
{
Ok(res) if res.is_ok() => Ok(()),
Ok(res) => {
debug!(result = %res, "Transaction account resolution encountered issues");
debug!(%res, "Transaction account resolution encountered issues");
Ok(())
}
Err(err) => {
Expand Down
12 changes: 7 additions & 5 deletions magicblock-aperture/src/requests/http/request_airdrop.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use magicblock_core::link::transactions::SanitizeableTransaction;
use magicblock_core::link::transactions::with_encoded;

use super::prelude::*;

Expand Down Expand Up @@ -35,11 +35,13 @@ impl HttpDispatcher {
lamports,
self.blocks.get_latest().hash,
);
// we don't need to verify transaction that we just signed
let txn = txn.sanitize(false)?;
let signature = SerdeSignature(*txn.signature());
// we just signed the transaction, it must have a signature
let signature =
SerdeSignature(txn.signatures.first().cloned().unwrap_or_default());
Comment thread
bmuddha marked this conversation as resolved.

self.transactions_scheduler.execute(txn).await?;
self.transactions_scheduler
.execute(with_encoded(txn)?)
.await?;
Comment thread
bmuddha marked this conversation as resolved.

Ok(ResponsePayload::encode_no_context(&request.id, signature))
}
Expand Down
10 changes: 4 additions & 6 deletions magicblock-aperture/src/requests/http/send_transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,25 +32,23 @@ impl HttpDispatcher {
.inspect_err(
|err| debug!(error = ?err, "Failed to prepare transaction"),
)?;
let signature = *transaction.signature();
let signature = *transaction.txn.signature();

// Perform a replay check and reserve the signature in the cache. This prevents
// a transaction from being processed twice within the blockhash validity period.
// Perform a replay check and reserve the signature in the cache
if self.transactions.contains(&signature)
|| !self.transactions.push(signature, None)
{
return Err(TransactionError::AlreadyProcessed.into());
}
self.ensure_transaction_accounts(&transaction).await?;

self.ensure_transaction_accounts(&transaction.txn).await?;

// Based on the preflight flag, either execute and await the result,
// or schedule (fire-and-forget) for background processing.
if config.skip_preflight {
TRANSACTION_SKIP_PREFLIGHT.inc();
self.transactions_scheduler.schedule(transaction).await?;
trace!("Transaction scheduled");
} else {
trace!("Transaction executing");
self.transactions_scheduler.execute(transaction).await?;
}

Expand Down
4 changes: 2 additions & 2 deletions magicblock-aperture/src/requests/http/simulate_transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ impl HttpDispatcher {
.inspect_err(|err| {
debug!(error = ?err, "Failed to prepare transaction to simulate")
})?;
self.ensure_transaction_accounts(&transaction).await?;
self.ensure_transaction_accounts(&transaction.txn).await?;

let replacement_blockhash = config
.replace_recent_blockhash
Expand All @@ -52,7 +52,7 @@ impl HttpDispatcher {
// Submit the transaction to the scheduler for simulation.
let result = self
.transactions_scheduler
.simulate(transaction)
.simulate(transaction.txn)
.await
Comment thread
bmuddha marked this conversation as resolved.
.map_err(RpcError::transaction_simulation)?;

Expand Down
8 changes: 7 additions & 1 deletion magicblock-api/src/tickers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ use std::{
use magicblock_accounts::ScheduledCommitsProcessor;
use magicblock_accounts_db::{traits::AccountsBank, AccountsDb};
use magicblock_core::link::{
blocks::BlockUpdateTx, transactions::TransactionSchedulerHandle,
blocks::BlockUpdateTx,
transactions::{with_encoded, TransactionSchedulerHandle},
};
use magicblock_ledger::{LatestBlock, Ledger};
use magicblock_magic_program_api as magic_program;
Expand Down Expand Up @@ -89,6 +90,11 @@ async fn handle_scheduled_commits<C: ScheduledCommitsProcessor>(
let tx = InstructionUtils::accept_scheduled_commits(
latest_block.load().blockhash,
);
let Ok(tx) = with_encoded(tx) else {
// Unreachable case, all schedule commit txns are smaller than 64KB by construction
error!("Failed to bincode intent transaction");
return;
};
Comment thread
bmuddha marked this conversation as resolved.
if let Err(err) = transaction_scheduler.execute(tx).await {
error!(error = ?err, "Failed to accept scheduled commits");
Comment thread
bmuddha marked this conversation as resolved.
return;
Expand Down
1 change: 1 addition & 0 deletions magicblock-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ edition.workspace = true

tokio = { workspace = true, features = ["sync"] }
flume = { workspace = true }
bincode = { workspace = true }

serde = { workspace = true, features = ["derive"] }
solana-account = { workspace = true }
Expand Down
71 changes: 67 additions & 4 deletions magicblock-core/src/link/transactions.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use flume::{Receiver as MpmcReceiver, Sender as MpmcSender};
use magicblock_magic_program_api::args::TaskRequest;
use serde::Serialize;
use solana_program::message::{
inner_instruction::InnerInstructionsList, SimpleAddressLoader,
};
Expand Down Expand Up @@ -69,6 +70,9 @@ pub struct TransactionStatus {
pub struct ProcessableTransaction {
pub transaction: SanitizedTransaction,
pub mode: TransactionProcessingMode,
/// Pre-encoded bincode bytes for the transaction.
/// Used by the replicator to avoid redundant serialization.
pub encoded: Option<Vec<u8>>,
}

/// An enum that specifies how a transaction should be processed by the scheduler.
Expand Down Expand Up @@ -115,6 +119,57 @@ pub trait SanitizeableTransaction {
self,
verify: bool,
) -> Result<SanitizedTransaction, TransactionError>;

/// Sanitizes the transaction and optionally provides pre-encoded bincode bytes.
///
/// Default implementation delegates to `sanitize()` and returns `None` for encoded bytes.
/// Override this method when you have pre-encoded bytes (e.g., from the wire) to avoid
/// redundant serialization.
fn sanitize_with_encoded(
self,
verify: bool,
) -> Result<(SanitizedTransaction, Option<Vec<u8>>), TransactionError>
where
Self: Sized,
{
let txn = self.sanitize(verify)?;
Ok((txn, None))
}
}

/// Wraps a transaction with its pre-encoded bincode representation.
/// Use for internally-constructed transactions that need encoded bytes.
pub struct WithEncoded<T> {
Comment thread
bmuddha marked this conversation as resolved.
pub txn: T,
pub encoded: Vec<u8>,
}

impl<T: SanitizeableTransaction> SanitizeableTransaction for WithEncoded<T> {
fn sanitize(
self,
verify: bool,
) -> Result<SanitizedTransaction, TransactionError> {
self.txn.sanitize(verify)
}

fn sanitize_with_encoded(
self,
verify: bool,
) -> Result<(SanitizedTransaction, Option<Vec<u8>>), TransactionError> {
let txn = self.txn.sanitize(verify)?;
Ok((txn, Some(self.encoded)))
}
}

/// Encodes a transaction to bincode and wraps it with its encoded form.
/// Use for internally-constructed transactions that need the encoded bytes.
pub fn with_encoded<T>(txn: T) -> Result<WithEncoded<T>, TransactionError>
where
T: Serialize,
{
let encoded = bincode::serialize(&txn)
.map_err(|_| TransactionError::SanitizeFailure)?;
Ok(WithEncoded { txn, encoded })
Comment thread
bmuddha marked this conversation as resolved.
}
Comment thread
bmuddha marked this conversation as resolved.

impl SanitizeableTransaction for SanitizedTransaction {
Expand Down Expand Up @@ -162,9 +217,13 @@ impl TransactionSchedulerHandle {
&self,
txn: impl SanitizeableTransaction,
) -> TransactionResult {
let transaction = txn.sanitize(true)?;
let (transaction, encoded) = txn.sanitize_with_encoded(true)?;
let mode = TransactionProcessingMode::Execution(None);
let txn = ProcessableTransaction { transaction, mode };
let txn = ProcessableTransaction {
transaction,
mode,
encoded,
};
let r = self.0.send(txn).await;
r.map_err(|_| TransactionError::ClusterMaintenance)
}
Expand Down Expand Up @@ -208,10 +267,14 @@ impl TransactionSchedulerHandle {
txn: impl SanitizeableTransaction,
mode: fn(oneshot::Sender<R>) -> TransactionProcessingMode,
) -> Result<R, TransactionError> {
let transaction = txn.sanitize(true)?;
let (transaction, encoded) = txn.sanitize_with_encoded(true)?;
let (tx, rx) = oneshot::channel();
let mode = mode(tx);
let txn = ProcessableTransaction { transaction, mode };
let txn = ProcessableTransaction {
transaction,
mode,
encoded,
};
self.0
.send(txn)
.await
Expand Down
1 change: 1 addition & 0 deletions magicblock-processor/src/scheduler/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ fn mock_txn(accounts: &[(Pubkey, bool)]) -> TransactionWithId {
TransactionWithId::new(ProcessableTransaction {
transaction: transaction.sanitize(false).unwrap(),
mode: TransactionProcessingMode::Execution(None),
encoded: None,
})
}

Expand Down
1 change: 1 addition & 0 deletions test-integration/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.