diff --git a/.gitignore b/.gitignore index 9ea638117..377396019 100644 --- a/.gitignore +++ b/.gitignore @@ -28,3 +28,6 @@ magicblock-test-storage/ .github/packages/npm-package/lib .github/packages/npm-package/node_modules .github/packages/npm-package/package-lock.json + +# AI related +**/CLAUDE.md diff --git a/magicblock-committor-service/src/intent_executor/error.rs b/magicblock-committor-service/src/intent_executor/error.rs index e534776c4..344fa482a 100644 --- a/magicblock-committor-service/src/intent_executor/error.rs +++ b/magicblock-committor-service/src/intent_executor/error.rs @@ -11,7 +11,7 @@ use tracing::error; use crate::{ tasks::{ task_builder::TaskBuilderError, task_strategist::TaskStrategistError, - BaseTask, TaskType, + BaseTaskImpl, }, transaction_preparator::error::TransactionPreparatorError, }; @@ -217,7 +217,7 @@ impl TransactionStrategyExecutionError { pub fn try_from_transaction_error( err: TransactionError, signature: Option, - tasks: &[Box], + tasks: &[BaseTaskImpl], ) -> Result { // Commit Nonce order error const NONCE_OUT_OF_ORDER: u32 = @@ -257,15 +257,12 @@ impl TransactionStrategyExecutionError { return Err(tx_err_helper(instruction_err)); }; - let Some(task_type) = tasks - .get(action_index as usize) - .map(|task| task.task_type()) - else { + let Some(task) = tasks.get(action_index as usize) else { return Err(tx_err_helper(instruction_err)); }; - match (task_type, instruction_err) { - (TaskType::Commit, instruction_err) => match instruction_err + match (task, instruction_err) { + (BaseTaskImpl::Commit(_), instruction_err) => match instruction_err { InstructionError::Custom(NONCE_OUT_OF_ORDER) => Ok( TransactionStrategyExecutionError::CommitIDError( @@ -294,13 +291,13 @@ impl TransactionStrategyExecutionError { } err => Err(tx_err_helper(err)), }, - (TaskType::Action, instruction_err) => { + (BaseTaskImpl::BaseAction(_), instruction_err) => { Ok(TransactionStrategyExecutionError::ActionsError( tx_err_helper(instruction_err), signature, )) } - (TaskType::Undelegate, instruction_err) => Ok( + (BaseTaskImpl::Undelegate(_), instruction_err) => Ok( TransactionStrategyExecutionError::UndelegationError( tx_err_helper(instruction_err), signature, @@ -335,7 +332,7 @@ impl metrics::LabelValue for TransactionStrategyExecutionError { } pub(crate) struct IntentTransactionErrorMapper<'a> { - pub tasks: &'a [Box], + pub tasks: &'a [BaseTaskImpl], } impl TransactionErrorMapper for IntentTransactionErrorMapper<'_> { type ExecutionError = TransactionStrategyExecutionError; diff --git a/magicblock-committor-service/src/intent_executor/mod.rs b/magicblock-committor-service/src/intent_executor/mod.rs index fb1426705..997cfbc9e 100644 --- a/magicblock-committor-service/src/intent_executor/mod.rs +++ b/magicblock-committor-service/src/intent_executor/mod.rs @@ -47,8 +47,7 @@ use crate::{ task_strategist::{ StrategyExecutionMode, TaskStrategist, TransactionStrategy, }, - task_visitors::utility_visitor::TaskVisitorUtils, - BaseTask, TaskType, + BaseTaskImpl, }, transaction_preparator::{ delivery_preparator::BufferExecutionError, @@ -311,24 +310,20 @@ where committed_pubkeys: &[Pubkey], strategy: &mut TransactionStrategy, ) -> Result { - let tasks_and_metas: Vec<_> = strategy + let commit_tasks: Vec<_> = strategy .optimized_tasks .iter_mut() .filter_map(|task| { - let mut visitor = TaskVisitorUtils::GetCommitMeta(None); - task.visit(&mut visitor); - if let TaskVisitorUtils::GetCommitMeta(Some(commit_meta)) = - visitor - { - Some((task, commit_meta)) + if let BaseTaskImpl::Commit(commit_task) = task { + Some(commit_task) } else { None } }) .collect(); - let min_context_slot = tasks_and_metas + let min_context_slot = commit_tasks .iter() - .map(|(_, meta)| meta.remote_slot) + .map(|task| task.committed_account.remote_slot) .max() .unwrap_or_default(); @@ -345,17 +340,18 @@ where // Here we find the broken tasks and reset them // Broken tasks are prepared incorrectly so they have to be cleaned up let mut to_cleanup = Vec::new(); - for (task, commit_meta) in tasks_and_metas { - let Some(commit_id) = commit_ids.get(&commit_meta.committed_pubkey) + for task in commit_tasks { + let Some(commit_id) = + commit_ids.get(&task.committed_account.pubkey) else { continue; }; - if commit_id == &commit_meta.commit_id { + if commit_id == &task.commit_id { continue; } // Handle invalid tasks - to_cleanup.push(task.clone()); + to_cleanup.push(BaseTaskImpl::Commit(task.clone())); task.reset_commit_id(*commit_id); } @@ -376,7 +372,7 @@ where let (optimized_tasks, action_tasks) = strategy .optimized_tasks .drain(..) - .partition(|el| el.task_type() != TaskType::Action); + .partition(|el| !matches!(el, BaseTaskImpl::BaseAction(_))); strategy.optimized_tasks = optimized_tasks; let old_alts = strategy.dummy_revaluate_alts(&self.authority.pubkey()); @@ -404,7 +400,7 @@ where strategy .optimized_tasks .into_iter() - .partition(|el| el.task_type() == TaskType::Commit); + .partition(|el| matches!(el, BaseTaskImpl::Commit(_))); let commit_alt_pubkeys = if strategy.lookup_tables_keys.is_empty() { vec![] @@ -450,7 +446,7 @@ where let position = strategy .optimized_tasks .iter() - .position(|el| el.task_type() == TaskType::Undelegate); + .position(|el| matches!(el, BaseTaskImpl::Undelegate(_))); if let Some(position) = position { // Remove everything after undelegation including post undelegation actions @@ -643,7 +639,7 @@ where async fn execute_message_with_retries( &self, prepared_message: VersionedMessage, - tasks: &[Box], + tasks: &[BaseTaskImpl], ) -> IntentExecutorResult { struct IntentErrorMapper { diff --git a/magicblock-committor-service/src/intent_executor/single_stage_executor.rs b/magicblock-committor-service/src/intent_executor/single_stage_executor.rs index a5362a6ce..17bcf5910 100644 --- a/magicblock-committor-service/src/intent_executor/single_stage_executor.rs +++ b/magicblock-committor-service/src/intent_executor/single_stage_executor.rs @@ -15,10 +15,8 @@ use crate::{ }, persist::{IntentPersister, IntentPersisterImpl}, tasks::{ - args_task::{ArgsTask, ArgsTaskType}, - task_strategist::TransactionStrategy, - task_visitors::utility_visitor::TaskVisitorUtils, - BaseTask, + commit_task::CommitTask, task_strategist::TransactionStrategy, + BaseTaskImpl, FinalizeTask, }, transaction_preparator::TransactionPreparator, }; @@ -159,14 +157,12 @@ where ) => { let optimized_tasks = transaction_strategy.optimized_tasks.as_slice(); - if let Some(task) = err + if let Some(BaseTaskImpl::Commit(task)) = err .task_index() .and_then(|index| optimized_tasks.get(index as usize)) { Self::handle_unfinalized_account_error( - inner, - signature, - task.as_ref(), + inner, signature, task, ) .await } else { @@ -204,14 +200,12 @@ where async fn handle_unfinalized_account_error( inner: &IntentExecutorImpl, failed_signature: &Option, - task: &dyn BaseTask, + task: &CommitTask, ) -> IntentExecutorResult> { - let Some(commit_meta) = TaskVisitorUtils::commit_meta(task) else { - // Can't recover - break execution - return Ok(ControlFlow::Break(())); - }; - let finalize_task: Box = - Box::new(ArgsTask::new(ArgsTaskType::Finalize(commit_meta.into()))); + let finalize_task: BaseTaskImpl = FinalizeTask { + delegated_account: task.committed_account.pubkey, + } + .into(); inner .prepare_and_execute_strategy( &mut TransactionStrategy { diff --git a/magicblock-committor-service/src/intent_executor/two_stage_executor.rs b/magicblock-committor-service/src/intent_executor/two_stage_executor.rs index c9f5ea6ea..bdb622fcf 100644 --- a/magicblock-committor-service/src/intent_executor/two_stage_executor.rs +++ b/magicblock-committor-service/src/intent_executor/two_stage_executor.rs @@ -16,10 +16,8 @@ use crate::{ }, persist::{IntentPersister, IntentPersisterImpl}, tasks::{ - args_task::{ArgsTask, ArgsTaskType}, - task_strategist::TransactionStrategy, - task_visitors::utility_visitor::TaskVisitorUtils, - BaseTask, + commit_task::CommitTask, task_strategist::TransactionStrategy, + BaseTaskImpl, FinalizeTask, }, transaction_preparator::TransactionPreparator, }; @@ -165,13 +163,11 @@ where let optimized_tasks = commit_strategy.optimized_tasks.as_slice(); let task_index = err.task_index(); - if let Some(task) = task_index + if let Some(BaseTaskImpl::Commit(task)) = task_index .and_then(|index| optimized_tasks.get(index as usize)) { Self::handle_unfinalized_account_error( - inner, - signature, - task.as_ref(), + inner, signature, task, ) .await } else { @@ -217,14 +213,12 @@ where async fn handle_unfinalized_account_error( inner: &IntentExecutorImpl, failed_signature: &Option, - task: &dyn BaseTask, + task: &CommitTask, ) -> IntentExecutorResult> { - let Some(commit_meta) = TaskVisitorUtils::commit_meta(task) else { - // Can't recover - break execution - return Ok(ControlFlow::Break(())); - }; - let finalize_task: Box = - Box::new(ArgsTask::new(ArgsTaskType::Finalize(commit_meta.into()))); + let finalize_task: BaseTaskImpl = FinalizeTask { + delegated_account: task.committed_account.pubkey, + } + .into(); inner .prepare_and_execute_strategy( &mut TransactionStrategy { diff --git a/magicblock-committor-service/src/tasks/args_task.rs b/magicblock-committor-service/src/tasks/args_task.rs deleted file mode 100644 index 3f883fbbf..000000000 --- a/magicblock-committor-service/src/tasks/args_task.rs +++ /dev/null @@ -1,278 +0,0 @@ -use dlp::{ - args::{CommitDiffArgs, CommitStateArgs}, - compute_diff, - instruction_builder::{ - call_handler_size_budget, call_handler_v2_size_budget, - commit_diff_size_budget, commit_size_budget, finalize_size_budget, - undelegate_size_budget, - }, - AccountSizeClass, -}; -use magicblock_metrics::metrics::LabelValue; -use solana_account::ReadableAccount; -use solana_instruction::Instruction; -use solana_pubkey::Pubkey; - -#[cfg(test)] -use crate::tasks::TaskStrategy; -use crate::tasks::{ - buffer_task::{BufferTask, BufferTaskType}, - visitor::Visitor, - BaseActionTask, BaseActionV2Task, BaseTask, BaseTaskError, BaseTaskResult, - CommitDiffTask, CommitTask, FinalizeTask, PreparationState, TaskType, - UndelegateTask, -}; - -/// Task that will be executed on Base layer via arguments -#[derive(Clone)] -pub enum ArgsTaskType { - Commit(CommitTask), - CommitDiff(CommitDiffTask), - Finalize(FinalizeTask), - Undelegate(UndelegateTask), // Special action really - BaseAction(BaseActionTask), - BaseActionV2(BaseActionV2Task), -} - -#[derive(Clone)] -pub struct ArgsTask { - preparation_state: PreparationState, - pub task_type: ArgsTaskType, -} - -impl From for ArgsTask { - fn from(value: ArgsTaskType) -> Self { - Self::new(value) - } -} - -impl ArgsTask { - pub fn new(task_type: ArgsTaskType) -> Self { - Self { - preparation_state: PreparationState::NotNeeded, - task_type, - } - } -} - -impl BaseTask for ArgsTask { - fn program_id(&self) -> Pubkey { - dlp::id() - } - - fn instruction(&self, validator: &Pubkey) -> Instruction { - match &self.task_type { - ArgsTaskType::Commit(value) => { - let args = CommitStateArgs { - nonce: value.commit_id, - lamports: value.committed_account.account.lamports, - data: value.committed_account.account.data.clone(), - allow_undelegation: value.allow_undelegation, - }; - dlp::instruction_builder::commit_state( - *validator, - value.committed_account.pubkey, - value.committed_account.account.owner, - args, - ) - } - ArgsTaskType::CommitDiff(value) => { - let args = CommitDiffArgs { - nonce: value.commit_id, - lamports: value.committed_account.account.lamports, - diff: compute_diff( - value.base_account.data(), - value.committed_account.account.data(), - ) - .to_vec(), - allow_undelegation: value.allow_undelegation, - }; - - dlp::instruction_builder::commit_diff( - *validator, - value.committed_account.pubkey, - value.committed_account.account.owner, - args, - ) - } - ArgsTaskType::Finalize(value) => { - dlp::instruction_builder::finalize( - *validator, - value.delegated_account, - ) - } - ArgsTaskType::Undelegate(value) => { - dlp::instruction_builder::undelegate( - *validator, - value.delegated_account, - value.owner_program, - value.rent_reimbursement, - ) - } - ArgsTaskType::BaseAction(value) => { - let action = &value.action; - dlp::instruction_builder::call_handler( - *validator, - action.destination_program, - action.escrow_authority, - value.account_metas(), - value.call_handler_args(), - ) - } - ArgsTaskType::BaseActionV2(value) => { - let action = &value.action; - dlp::instruction_builder::call_handler_v2( - *validator, - action.destination_program, - value.source_program, - action.escrow_authority, - value.account_metas(), - value.call_handler_args(), - ) - } - } - } - - fn try_optimize_tx_size( - self: Box, - ) -> Result, Box> { - match self.task_type { - ArgsTaskType::Commit(value) => { - Ok(Box::new(BufferTask::new_preparation_required( - BufferTaskType::Commit(value), - ))) - } - ArgsTaskType::CommitDiff(value) => { - Ok(Box::new(BufferTask::new_preparation_required( - BufferTaskType::CommitDiff(value), - ))) - } - ArgsTaskType::BaseAction(_) - | ArgsTaskType::BaseActionV2(_) - | ArgsTaskType::Finalize(_) - | ArgsTaskType::Undelegate(_) => Err(self), - } - } - - /// Nothing to prepare for [`ArgsTaskType`] type - fn preparation_state(&self) -> &PreparationState { - &self.preparation_state - } - - fn switch_preparation_state( - &mut self, - new_state: PreparationState, - ) -> BaseTaskResult<()> { - if !matches!(new_state, PreparationState::NotNeeded) { - Err(BaseTaskError::PreparationStateTransitionError) - } else { - // Do nothing - Ok(()) - } - } - - fn compute_units(&self) -> u32 { - match &self.task_type { - ArgsTaskType::Commit(_) => 70_000, - ArgsTaskType::CommitDiff(_) => 70_000, - ArgsTaskType::BaseAction(task) => task.action.compute_units, - ArgsTaskType::BaseActionV2(task) => task.action.compute_units, - ArgsTaskType::Undelegate(_) => 70_000, - ArgsTaskType::Finalize(_) => 70_000, - } - } - - fn accounts_size_budget(&self) -> u32 { - match &self.task_type { - ArgsTaskType::Commit(task) => { - commit_size_budget(AccountSizeClass::Dynamic( - task.committed_account.account.data.len() as u32, - )) - } - ArgsTaskType::CommitDiff(task) => { - commit_diff_size_budget(AccountSizeClass::Dynamic( - task.committed_account.account.data.len() as u32, - )) - } - ArgsTaskType::BaseAction(task) => { - // assume all other accounts are Small accounts. - let other_accounts_budget = - task.action.account_metas_per_program.len() as u32 - * AccountSizeClass::Small.size_budget(); - - call_handler_size_budget( - AccountSizeClass::Medium, - other_accounts_budget, - ) - } - ArgsTaskType::BaseActionV2(task) => { - // assume all other accounts are Small accounts. - let other_accounts_budget = - task.action.account_metas_per_program.len() as u32 - * AccountSizeClass::Small.size_budget(); - - call_handler_v2_size_budget( - AccountSizeClass::Medium, - AccountSizeClass::Medium, - other_accounts_budget, - ) - } - ArgsTaskType::Undelegate(_) => { - undelegate_size_budget(AccountSizeClass::Huge) - } - ArgsTaskType::Finalize(_) => { - finalize_size_budget(AccountSizeClass::Huge) - } - } - } - - #[cfg(test)] - fn strategy(&self) -> TaskStrategy { - TaskStrategy::Args - } - - fn task_type(&self) -> TaskType { - match &self.task_type { - ArgsTaskType::Commit(_) => TaskType::Commit, - ArgsTaskType::CommitDiff(_) => TaskType::Commit, - ArgsTaskType::BaseAction(_) | ArgsTaskType::BaseActionV2(_) => { - TaskType::Action - } - ArgsTaskType::Undelegate(_) => TaskType::Undelegate, - ArgsTaskType::Finalize(_) => TaskType::Finalize, - } - } - - /// For tasks using Args strategy call corresponding `Visitor` method - fn visit(&self, visitor: &mut dyn Visitor) { - visitor.visit_args_task(self); - } - - fn reset_commit_id(&mut self, commit_id: u64) { - match &mut self.task_type { - ArgsTaskType::Commit(task) => { - task.commit_id = commit_id; - } - ArgsTaskType::CommitDiff(task) => { - task.commit_id = commit_id; - } - ArgsTaskType::BaseAction(_) - | ArgsTaskType::BaseActionV2(_) - | ArgsTaskType::Finalize(_) - | ArgsTaskType::Undelegate(_) => {} - }; - } -} - -impl LabelValue for ArgsTask { - fn value(&self) -> &str { - match self.task_type { - ArgsTaskType::Commit(_) => "args_commit", - ArgsTaskType::CommitDiff(_) => "args_commit_diff", - ArgsTaskType::BaseAction(_) => "args_action", - ArgsTaskType::BaseActionV2(_) => "args_action_v2", - ArgsTaskType::Finalize(_) => "args_finalize", - ArgsTaskType::Undelegate(_) => "args_undelegate", - } - } -} diff --git a/magicblock-committor-service/src/tasks/buffer_task.rs b/magicblock-committor-service/src/tasks/buffer_task.rs deleted file mode 100644 index fc8440a1a..000000000 --- a/magicblock-committor-service/src/tasks/buffer_task.rs +++ /dev/null @@ -1,236 +0,0 @@ -use dlp::{ - args::CommitStateFromBufferArgs, - compute_diff, - instruction_builder::{commit_diff_size_budget, commit_size_budget}, - AccountSizeClass, -}; -use magicblock_committor_program::Chunks; -use magicblock_metrics::metrics::LabelValue; -use solana_instruction::Instruction; -use solana_pubkey::Pubkey; - -#[cfg(any(test, feature = "dev-context-only-utils"))] -use super::args_task::ArgsTaskType; -#[cfg(test)] -use crate::tasks::TaskStrategy; -use crate::{ - consts::MAX_WRITE_CHUNK_SIZE, - tasks::{ - visitor::Visitor, BaseTask, BaseTaskError, BaseTaskResult, - CommitDiffTask, CommitTask, PreparationState, PreparationTask, - TaskType, - }, -}; - -/// Tasks that could be executed using buffers -#[derive(Clone)] -pub enum BufferTaskType { - Commit(CommitTask), - CommitDiff(CommitDiffTask), - // Action in the future -} - -#[derive(Clone)] -pub struct BufferTask { - preparation_state: PreparationState, - pub task_type: BufferTaskType, -} - -impl BufferTask { - pub fn new_preparation_required(task_type: BufferTaskType) -> Self { - Self { - preparation_state: Self::preparation_required(&task_type), - task_type, - } - } - - pub fn new( - preparation_state: PreparationState, - task_type: BufferTaskType, - ) -> Self { - Self { - preparation_state, - task_type, - } - } - - fn preparation_required(task_type: &BufferTaskType) -> PreparationState { - match task_type { - BufferTaskType::Commit(task) => { - let data = task.committed_account.account.data.clone(); - let chunks = - Chunks::from_data_length(data.len(), MAX_WRITE_CHUNK_SIZE); - - PreparationState::Required(PreparationTask { - commit_id: task.commit_id, - pubkey: task.committed_account.pubkey, - committed_data: data, - chunks, - }) - } - - BufferTaskType::CommitDiff(task) => { - let diff = compute_diff( - &task.base_account.data, - &task.committed_account.account.data, - ) - .to_vec(); - let chunks = - Chunks::from_data_length(diff.len(), MAX_WRITE_CHUNK_SIZE); - - PreparationState::Required(PreparationTask { - commit_id: task.commit_id, - pubkey: task.committed_account.pubkey, - committed_data: diff, - chunks, - }) - } - } - } -} - -#[cfg(any(test, feature = "dev-context-only-utils"))] -impl From for BufferTaskType { - fn from(value: ArgsTaskType) -> Self { - match value { - ArgsTaskType::Commit(task) => BufferTaskType::Commit(task), - ArgsTaskType::CommitDiff(task) => BufferTaskType::CommitDiff(task), - _ => unimplemented!( - "Only commit task can be BufferTask currently. Fix your tests" - ), - } - } -} - -impl BaseTask for BufferTask { - fn program_id(&self) -> Pubkey { - dlp::id() - } - - fn instruction(&self, validator: &Pubkey) -> Instruction { - match &self.task_type { - BufferTaskType::Commit(task) => { - let commit_id_slice = task.commit_id.to_le_bytes(); - let (commit_buffer_pubkey, _) = - magicblock_committor_program::pdas::buffer_pda( - validator, - &task.committed_account.pubkey, - &commit_id_slice, - ); - - dlp::instruction_builder::commit_state_from_buffer( - *validator, - task.committed_account.pubkey, - task.committed_account.account.owner, - commit_buffer_pubkey, - CommitStateFromBufferArgs { - nonce: task.commit_id, - lamports: task.committed_account.account.lamports, - allow_undelegation: task.allow_undelegation, - }, - ) - } - BufferTaskType::CommitDiff(task) => { - let commit_id_slice = task.commit_id.to_le_bytes(); - let (commit_buffer_pubkey, _) = - magicblock_committor_program::pdas::buffer_pda( - validator, - &task.committed_account.pubkey, - &commit_id_slice, - ); - - dlp::instruction_builder::commit_diff_from_buffer( - *validator, - task.committed_account.pubkey, - task.committed_account.account.owner, - commit_buffer_pubkey, - CommitStateFromBufferArgs { - nonce: task.commit_id, - lamports: task.committed_account.account.lamports, - allow_undelegation: task.allow_undelegation, - }, - ) - } - } - } - - /// No further optimizations - fn try_optimize_tx_size( - self: Box, - ) -> Result, Box> { - Err(self) - } - - fn preparation_state(&self) -> &PreparationState { - &self.preparation_state - } - - fn switch_preparation_state( - &mut self, - new_state: PreparationState, - ) -> BaseTaskResult<()> { - if matches!(new_state, PreparationState::NotNeeded) { - Err(BaseTaskError::PreparationStateTransitionError) - } else { - self.preparation_state = new_state; - Ok(()) - } - } - - fn compute_units(&self) -> u32 { - match self.task_type { - BufferTaskType::Commit(_) => 70_000, - BufferTaskType::CommitDiff(_) => 70_000, - } - } - - fn accounts_size_budget(&self) -> u32 { - match self.task_type { - BufferTaskType::Commit(_) => { - commit_size_budget(AccountSizeClass::Huge) - } - BufferTaskType::CommitDiff(_) => { - commit_diff_size_budget(AccountSizeClass::Huge) - } - } - } - - #[cfg(test)] - fn strategy(&self) -> TaskStrategy { - TaskStrategy::Buffer - } - - fn task_type(&self) -> TaskType { - match self.task_type { - BufferTaskType::Commit(_) => TaskType::Commit, - BufferTaskType::CommitDiff(_) => TaskType::Commit, - } - } - - /// For tasks using Args strategy call corresponding `Visitor` method - fn visit(&self, visitor: &mut dyn Visitor) { - visitor.visit_buffer_task(self); - } - - fn reset_commit_id(&mut self, commit_id: u64) { - match &mut self.task_type { - BufferTaskType::Commit(task) => { - task.commit_id = commit_id; - } - BufferTaskType::CommitDiff(task) => { - task.commit_id = commit_id; - } - }; - - self.preparation_state = Self::preparation_required(&self.task_type) - } -} - -impl LabelValue for BufferTask { - fn value(&self) -> &str { - match self.task_type { - BufferTaskType::Commit(_) => "buffer_commit", - BufferTaskType::CommitDiff(_) => "buffer_commit_diff", - } - } -} diff --git a/magicblock-committor-service/src/tasks/commit_task.rs b/magicblock-committor-service/src/tasks/commit_task.rs new file mode 100644 index 000000000..70d047537 --- /dev/null +++ b/magicblock-committor-service/src/tasks/commit_task.rs @@ -0,0 +1,307 @@ +use dlp::{ + args::{CommitDiffArgs, CommitStateArgs, CommitStateFromBufferArgs}, + compute_diff, + instruction_builder::{commit_diff_size_budget, commit_size_budget}, + AccountSizeClass, +}; +use magicblock_committor_program::Chunks; +use magicblock_program::magic_scheduled_base_intent::CommittedAccount; +use solana_account::{Account, ReadableAccount}; +use solana_instruction::Instruction; +use solana_pubkey::Pubkey; + +use crate::{ + consts::MAX_WRITE_CHUNK_SIZE, + tasks::{BaseTask, BaseTaskImpl, CleanupTask, PreparationTask}, +}; + +/// Lifecycle stage of a buffer used for commit delivery. +/// Tracks whether the on-chain buffer still needs to be initialized +/// or is ready to be cleaned up after a successful commit. +#[derive(Clone, Debug)] +pub enum CommitBufferStage { + Preparation(PreparationTask), + Cleanup(CleanupTask), +} + +/// Describes how commit data is delivered to the base layer. +/// +/// Small accounts send data directly in instruction args. +/// Large accounts use an on-chain buffer to avoid transaction size limits. +/// When a base account is available, a diff is computed to reduce payload size. +#[derive(Clone, Debug)] +pub enum CommitDelivery { + StateInArgs, + StateInBuffer { + stage: CommitBufferStage, + }, + DiffInArgs { + base_account: Account, + }, + DiffInBuffer { + base_account: Account, + stage: CommitBufferStage, + }, +} + +/// A task that commits a delegated account's state to the base layer. +/// +/// The delivery strategy ([`CommitDelivery`]) determines how the data reaches +/// the chain (inline args vs buffer, full state vs diff). +#[derive(Clone, Debug)] +pub struct CommitTask { + pub commit_id: u64, + pub allow_undelegation: bool, + pub committed_account: CommittedAccount, + pub delivery_details: CommitDelivery, +} + +impl CommitTask { + #[inline(always)] + fn commit_state_ix(&self, validator: &Pubkey) -> Instruction { + let args = CommitStateArgs { + nonce: self.commit_id, + lamports: self.committed_account.account.lamports, + data: self.committed_account.account.data.clone(), + allow_undelegation: self.allow_undelegation, + }; + dlp::instruction_builder::commit_state( + *validator, + self.committed_account.pubkey, + self.committed_account.account.owner, + args, + ) + } + + #[inline(always)] + fn commit_state_from_buffer_ix(&self, validator: &Pubkey) -> Instruction { + let (commit_buffer_pubkey, _) = + magicblock_committor_program::pdas::buffer_pda( + validator, + &self.committed_account.pubkey, + &self.commit_id.to_le_bytes(), + ); + dlp::instruction_builder::commit_state_from_buffer( + *validator, + self.committed_account.pubkey, + self.committed_account.account.owner, + commit_buffer_pubkey, + CommitStateFromBufferArgs { + nonce: self.commit_id, + lamports: self.committed_account.account.lamports, + allow_undelegation: self.allow_undelegation, + }, + ) + } + + #[inline(always)] + fn commit_diff_ix( + &self, + validator: &Pubkey, + base_account: &Account, + ) -> Instruction { + let args = CommitDiffArgs { + nonce: self.commit_id, + lamports: self.committed_account.account.lamports, + diff: compute_diff( + base_account.data(), + self.committed_account.account.data(), + ) + .to_vec(), + allow_undelegation: self.allow_undelegation, + }; + dlp::instruction_builder::commit_diff( + *validator, + self.committed_account.pubkey, + self.committed_account.account.owner, + args, + ) + } + + #[inline(always)] + fn commit_diff_from_buffer_ix(&self, validator: &Pubkey) -> Instruction { + let (commit_buffer_pubkey, _) = + magicblock_committor_program::pdas::buffer_pda( + validator, + &self.committed_account.pubkey, + &self.commit_id.to_le_bytes(), + ); + dlp::instruction_builder::commit_diff_from_buffer( + *validator, + self.committed_account.pubkey, + self.committed_account.account.owner, + commit_buffer_pubkey, + CommitStateFromBufferArgs { + nonce: self.commit_id, + lamports: self.committed_account.account.lamports, + allow_undelegation: self.allow_undelegation, + }, + ) + } + + pub fn stage(&self) -> Option<&CommitBufferStage> { + match &self.delivery_details { + CommitDelivery::DiffInBuffer { + base_account: _, + stage, + } + | CommitDelivery::StateInBuffer { stage } => Some(stage), + CommitDelivery::StateInArgs | CommitDelivery::DiffInArgs { .. } => { + None + } + } + } + + pub fn stage_mut(&mut self) -> Option<&mut CommitBufferStage> { + match &mut self.delivery_details { + CommitDelivery::DiffInBuffer { + base_account: _, + stage, + } + | CommitDelivery::StateInBuffer { stage } => Some(stage), + CommitDelivery::StateInArgs | CommitDelivery::DiffInArgs { .. } => { + None + } + } + } + + pub fn is_buffer(&self) -> bool { + matches!( + self.delivery_details, + CommitDelivery::StateInBuffer { .. } + | CommitDelivery::DiffInBuffer { .. } + ) + } + + pub fn state_preparation_stage(&self) -> CommitBufferStage { + let committed_data = self.committed_account.account.data.clone(); + self.preparation_stage(committed_data) + } + + fn diff_preparation_stage(&self, base_data: &[u8]) -> CommitBufferStage { + let diff = + compute_diff(base_data, &self.committed_account.account.data) + .to_vec(); + self.preparation_stage(diff) + } + + fn preparation_stage(&self, committed_data: Vec) -> CommitBufferStage { + let chunks = Chunks::from_data_length( + committed_data.len(), + MAX_WRITE_CHUNK_SIZE, + ); + CommitBufferStage::Preparation(PreparationTask { + commit_id: self.commit_id, + pubkey: self.committed_account.pubkey, + committed_data, + chunks, + }) + } + + pub fn reset_commit_id(&mut self, commit_id: u64) { + self.commit_id = commit_id; + let new_stage = match &self.delivery_details { + CommitDelivery::StateInBuffer { .. } => { + self.state_preparation_stage() + } + CommitDelivery::DiffInBuffer { + base_account, + stage: _, + } => { + let slice = base_account.data.as_slice(); + self.diff_preparation_stage(slice) + } + _ => return, + }; + + match &mut self.delivery_details { + CommitDelivery::StateInBuffer { stage } => { + *stage = new_stage; + } + CommitDelivery::DiffInBuffer { + base_account: _, + stage, + } => { + *stage = new_stage; + } + _ => {} + } + } +} + +impl BaseTask for CommitTask { + fn program_id(&self) -> Pubkey { + dlp::id() + } + + fn instruction(&self, validator: &Pubkey) -> Instruction { + match &self.delivery_details { + CommitDelivery::StateInArgs => self.commit_state_ix(validator), + CommitDelivery::StateInBuffer { .. } => { + self.commit_state_from_buffer_ix(validator) + } + CommitDelivery::DiffInArgs { base_account } => { + self.commit_diff_ix(validator, base_account) + } + CommitDelivery::DiffInBuffer { .. } => { + self.commit_diff_from_buffer_ix(validator) + } + } + } + + fn try_optimize_tx_size(&mut self) -> bool { + let details = std::mem::replace( + &mut self.delivery_details, + CommitDelivery::StateInArgs, + ); + match details { + CommitDelivery::StateInArgs => { + let stage = self.state_preparation_stage(); + self.delivery_details = CommitDelivery::StateInBuffer { stage }; + true + } + CommitDelivery::DiffInArgs { base_account } => { + let stage = self.diff_preparation_stage(base_account.data()); + self.delivery_details = CommitDelivery::DiffInBuffer { + base_account, + stage, + }; + true + } + other @ (CommitDelivery::StateInBuffer { .. } + | CommitDelivery::DiffInBuffer { .. }) => { + self.delivery_details = other; + false + } + } + } + + fn compute_units(&self) -> u32 { + 70_000 + } + + fn accounts_size_budget(&self) -> u32 { + match &self.delivery_details { + CommitDelivery::StateInArgs => { + commit_size_budget(AccountSizeClass::Dynamic( + self.committed_account.account.data.len() as u32, + )) + } + CommitDelivery::StateInBuffer { .. } + | CommitDelivery::DiffInBuffer { .. } => { + commit_size_budget(AccountSizeClass::Huge) + } + CommitDelivery::DiffInArgs { .. } => { + commit_diff_size_budget(AccountSizeClass::Dynamic( + self.committed_account.account.data.len() as u32, + )) + } + } + } +} + +impl From for BaseTaskImpl { + fn from(value: CommitTask) -> Self { + Self::Commit(value) + } +} diff --git a/magicblock-committor-service/src/tasks/mod.rs b/magicblock-committor-service/src/tasks/mod.rs index b4af90c58..2e763c604 100644 --- a/magicblock-committor-service/src/tasks/mod.rs +++ b/magicblock-committor-service/src/tasks/mod.rs @@ -1,5 +1,11 @@ -use dlp::args::CallHandlerArgs; -use dyn_clone::DynClone; +use dlp::{ + args::CallHandlerArgs, + instruction_builder::{ + call_handler_size_budget, call_handler_v2_size_budget, + finalize_size_budget, undelegate_size_budget, + }, + AccountSizeClass, +}; use magicblock_committor_program::{ instruction_builder::{ close_buffer::{create_close_ix, CreateCloseIxArgs}, @@ -18,20 +24,16 @@ use magicblock_program::magic_scheduled_base_intent::{ use solana_account::Account; use solana_instruction::{AccountMeta, Instruction}; use solana_pubkey::Pubkey; -use thiserror::Error; -use crate::tasks::visitor::Visitor; - -pub mod args_task; -pub mod buffer_task; +pub mod commit_task; pub mod task_builder; pub mod task_strategist; -pub(crate) mod task_visitors; pub mod utils; -pub mod visitor; pub use task_builder::TaskBuilderImpl; +use crate::tasks::commit_task::CommitTask; + #[derive(Clone, Copy, PartialEq, Eq, Debug)] pub enum TaskType { Commit, @@ -40,21 +42,91 @@ pub enum TaskType { Action, } -#[derive(Clone, Debug)] -pub enum PreparationState { - NotNeeded, - Required(PreparationTask), - Cleanup(CleanupTask), -} - #[derive(Copy, Clone, PartialEq, Eq, Debug)] pub enum TaskStrategy { Args, Buffer, } +#[derive(Clone, Debug)] +pub enum BaseTaskImpl { + Commit(CommitTask), + Finalize(FinalizeTask), + Undelegate(UndelegateTask), + BaseAction(BaseActionTask), +} + +impl BaseTask for BaseTaskImpl { + fn program_id(&self) -> Pubkey { + dlp::id() + } + + fn instruction(&self, validator: &Pubkey) -> Instruction { + match self { + Self::Commit(value) => value.instruction(validator), + Self::Finalize(value) => value.instruction(validator), + Self::Undelegate(value) => value.instruction(validator), + Self::BaseAction(value) => value.instruction(validator), + } + } + + fn try_optimize_tx_size(&mut self) -> bool { + match self { + Self::Commit(value) => value.try_optimize_tx_size(), + _ => false, + } + } + + fn compute_units(&self) -> u32 { + match self { + Self::Commit(value) => value.compute_units(), + Self::BaseAction(value) => value.compute_units(), + Self::Finalize(_) => 70_000, + Self::Undelegate(_) => 70_000, + } + } + + fn accounts_size_budget(&self) -> u32 { + match self { + Self::Commit(value) => value.accounts_size_budget(), + Self::BaseAction(value) => value.accounts_size_budget(), + Self::Finalize(_) => finalize_size_budget(AccountSizeClass::Huge), + Self::Undelegate(_) => { + undelegate_size_budget(AccountSizeClass::Huge) + } + } + } +} + +impl BaseTaskImpl { + pub fn strategy(&self) -> TaskStrategy { + match self { + Self::Commit(task) if task.is_buffer() => TaskStrategy::Buffer, + _ => TaskStrategy::Args, + } + } +} + +impl LabelValue for BaseTaskImpl { + fn value(&self) -> &str { + match self { + Self::Commit(task) => { + if task.is_buffer() { + "buffer_commit" + } else { + "args_commit" + } + } + Self::Finalize(_) => "args_finalize", + Self::Undelegate(_) => "args_undelegate", + Self::BaseAction(BaseActionTask::V1(_)) => "args_action", + Self::BaseAction(BaseActionTask::V2(_)) => "args_action_v2", + } + } +} + /// A trait representing a task that can be executed on Base layer -pub trait BaseTask: Send + Sync + DynClone + LabelValue { +pub trait BaseTask: Send + Sync + Clone { /// Gets all pubkeys that involved in Task's instruction fn involved_accounts(&self, validator: &Pubkey) -> Vec { self.instruction(validator) @@ -64,55 +136,24 @@ pub trait BaseTask: Send + Sync + DynClone + LabelValue { .collect() } - /// Gets instruction for task execution - fn instruction(&self, validator: &Pubkey) -> Instruction; - /// Gets target program for task execution fn program_id(&self) -> Pubkey; - /// Optimize for transaction size so that more instructions can be buddled together in a single - /// transaction. Return Ok(new_tx_optimized_task), else Err(self) if task cannot be optimized. - fn try_optimize_tx_size( - self: Box, - ) -> Result, Box>; - - /// Returns [`PreparationTask`] if task needs to be prepared before executing, - /// otherwise returns None - fn preparation_state(&self) -> &PreparationState; + /// Gets instruction for task execution + fn instruction(&self, validator: &Pubkey) -> Instruction; - /// Switched [`PreparationTask`] to a new one - fn switch_preparation_state( - &mut self, - new_state: PreparationState, - ) -> BaseTaskResult<()>; + /// Attempts to optimize the task for smaller transaction size by switching + /// to a buffer-based delivery. Returns `true` if optimization was applied. + /// + /// Deprecated: will be removed in the future. Optimization is a concern of + /// the transaction strategist, not the task itself. + fn try_optimize_tx_size(&mut self) -> bool; /// Returns [`Task`] budget fn compute_units(&self) -> u32; /// Returns the max accounts-data-size that can be used with SetLoadedAccountsDataSizeLimit fn accounts_size_budget(&self) -> u32; - - /// Returns current [`TaskStrategy`] - #[cfg(test)] - fn strategy(&self) -> TaskStrategy; - - /// Returns [`TaskType`] - fn task_type(&self) -> TaskType; - - /// Calls [`Visitor`] with specific task type - fn visit(&self, visitor: &mut dyn Visitor); - - /// Resets commit id - fn reset_commit_id(&mut self, commit_id: u64); -} - -dyn_clone::clone_trait_object!(BaseTask); - -#[derive(Clone)] -pub struct CommitTask { - pub commit_id: u64, - pub allow_undelegation: bool, - pub committed_account: CommittedAccount, } #[derive(Clone, Debug)] @@ -130,23 +171,121 @@ pub struct UndelegateTask { pub rent_reimbursement: Pubkey, } +impl UndelegateTask { + pub fn instruction(&self, validator: &Pubkey) -> Instruction { + dlp::instruction_builder::undelegate( + *validator, + self.delegated_account, + self.owner_program, + self.rent_reimbursement, + ) + } +} + +impl From for BaseTaskImpl { + fn from(value: UndelegateTask) -> Self { + Self::Undelegate(value) + } +} + #[derive(Clone, Debug)] pub struct FinalizeTask { pub delegated_account: Pubkey, } +impl FinalizeTask { + pub fn instruction(&self, validator: &Pubkey) -> Instruction { + dlp::instruction_builder::finalize(*validator, self.delegated_account) + } +} + +impl From for BaseTaskImpl { + fn from(value: FinalizeTask) -> Self { + Self::Finalize(value) + } +} + #[derive(Clone, Debug)] -pub struct BaseActionTask { - pub action: BaseAction, +pub enum BaseActionTask { + V1(BaseActionTaskV1), + V2(BaseActionTaskV2), } impl BaseActionTask { + pub fn instruction(&self, validator: &Pubkey) -> Instruction { + match self { + Self::V1(value) => value.instruction(validator), + Self::V2(value) => value.instruction(validator), + } + } + + pub fn action(&self) -> &BaseAction { + match self { + Self::V1(value) => &value.action, + Self::V2(value) => &value.action, + } + } + + pub fn compute_units(&self) -> u32 { + self.action().compute_units + } + + pub fn accounts_size_budget(&self) -> u32 { + let action = self.action(); + // assume all other accounts are Small accounts. + let other_accounts_budget = action.account_metas_per_program.len() + as u32 + * AccountSizeClass::Small.size_budget(); + + match self { + Self::V1(_) => call_handler_size_budget( + AccountSizeClass::Medium, + other_accounts_budget, + ), + Self::V2(_) => call_handler_v2_size_budget( + AccountSizeClass::Medium, + AccountSizeClass::Medium, + other_accounts_budget, + ), + } + } +} + +#[derive(Clone, Debug)] +pub struct BaseActionTaskV1 { + pub action: BaseAction, +} + +impl BaseActionTaskV1 { + pub fn instruction(&self, validator: &Pubkey) -> Instruction { + let action = &self.action; + let account_metas = action + .account_metas_per_program + .iter() + .map(|short_meta| AccountMeta { + pubkey: short_meta.pubkey, + is_writable: short_meta.is_writable, + is_signer: false, + }) + .collect(); + dlp::instruction_builder::call_handler( + *validator, + action.destination_program, + action.escrow_authority, + account_metas, + CallHandlerArgs { + data: action.data_per_program.data.clone(), + escrow_index: action.data_per_program.escrow_index, + }, + ) + } + pub fn account_metas(&self) -> Vec { - BaseActionTask::account_metas_static(&self.action) + BaseActionTaskV1::account_metas_static(&self.action) } pub fn call_handler_args(&self) -> CallHandlerArgs { - BaseActionTask::call_handler_args_static(&self.action) + BaseActionTaskV1::call_handler_args_static(&self.action) } fn account_metas_static(action: &BaseAction) -> Vec { @@ -169,19 +308,49 @@ impl BaseActionTask { } } +impl From for BaseActionTask { + fn from(value: BaseActionTaskV1) -> Self { + Self::V1(value) + } +} + +impl From for BaseTaskImpl { + fn from(value: BaseActionTask) -> Self { + Self::BaseAction(value) + } +} + #[derive(Clone, Debug)] -pub struct BaseActionV2Task { +pub struct BaseActionTaskV2 { pub action: BaseAction, pub source_program: Pubkey, } -impl BaseActionV2Task { +impl BaseActionTaskV2 { + pub fn instruction(&self, validator: &Pubkey) -> Instruction { + let action = &self.action; + dlp::instruction_builder::call_handler_v2( + *validator, + action.destination_program, + self.source_program, + action.escrow_authority, + self.account_metas(), + self.call_handler_args(), + ) + } + pub fn account_metas(&self) -> Vec { - BaseActionTask::account_metas_static(&self.action) + BaseActionTaskV1::account_metas_static(&self.action) } pub fn call_handler_args(&self) -> CallHandlerArgs { - BaseActionTask::call_handler_args_static(&self.action) + BaseActionTaskV1::call_handler_args_static(&self.action) + } +} + +impl From for BaseActionTask { + fn from(value: BaseActionTaskV2) -> Self { + Self::V2(value) } } @@ -345,14 +514,6 @@ impl CleanupTask { } } -#[derive(Error, Debug)] -pub enum BaseTaskError { - #[error("Invalid preparation state transition")] - PreparationStateTransitionError, -} - -pub type BaseTaskResult = Result; - #[cfg(test)] mod serialization_safety_test { @@ -363,8 +524,7 @@ mod serialization_safety_test { use crate::{ tasks::{ - args_task::{ArgsTask, ArgsTaskType}, - buffer_task::{BufferTask, BufferTaskType}, + commit_task::{CommitBufferStage, CommitDelivery, CommitTask}, *, }, test_utils, @@ -374,50 +534,58 @@ mod serialization_safety_test { test_utils::init_test_logger(); } - // Test all ArgsTask variants - #[test] - fn test_args_task_instruction_serialization() { - setup(); - let validator = Pubkey::new_unique(); - - // Test Commit variant - let commit_task: ArgsTask = ArgsTaskType::Commit(CommitTask { - commit_id: 123, - allow_undelegation: true, + fn make_commit_task( + commit_id: u64, + allow_undelegation: bool, + data: Vec, + lamports: u64, + ) -> CommitTask { + CommitTask { + commit_id, + allow_undelegation, committed_account: CommittedAccount { pubkey: Pubkey::new_unique(), account: Account { - lamports: 1000, - data: vec![1, 2, 3], + lamports, + data, owner: Pubkey::new_unique(), executable: false, rent_epoch: 0, }, remote_slot: Default::default(), }, - }) - .into(); + delivery_details: CommitDelivery::StateInArgs, + } + } + + #[test] + fn test_args_task_instruction_serialization() { + setup(); + let validator = Pubkey::new_unique(); + + // Test Commit variant (StateInArgs) + let commit_task: BaseTaskImpl = + make_commit_task(123, true, vec![1, 2, 3], 1000).into(); assert_serializable(&commit_task.instruction(&validator)); // Test Finalize variant - let finalize_task = - ArgsTask::new(ArgsTaskType::Finalize(FinalizeTask { - delegated_account: Pubkey::new_unique(), - })); + let finalize_task: BaseTaskImpl = FinalizeTask { + delegated_account: Pubkey::new_unique(), + } + .into(); assert_serializable(&finalize_task.instruction(&validator)); // Test Undelegate variant - let undelegate_task: ArgsTask = - ArgsTaskType::Undelegate(UndelegateTask { - delegated_account: Pubkey::new_unique(), - owner_program: Pubkey::new_unique(), - rent_reimbursement: Pubkey::new_unique(), - }) - .into(); + let undelegate_task: BaseTaskImpl = UndelegateTask { + delegated_account: Pubkey::new_unique(), + owner_program: Pubkey::new_unique(), + rent_reimbursement: Pubkey::new_unique(), + } + .into(); assert_serializable(&undelegate_task.instruction(&validator)); - // Test BaseAction variant - let base_action: ArgsTask = ArgsTaskType::BaseAction(BaseActionTask { + // Test BaseAction V1 variant + let base_action: BaseTaskImpl = BaseActionTask::V1(BaseActionTaskV1 { action: BaseAction { destination_program: Pubkey::new_unique(), source_program: None, @@ -435,59 +603,64 @@ mod serialization_safety_test { }) .into(); assert_serializable(&base_action.instruction(&validator)); + + // Test BaseAction V2 variant + let base_action_v2: BaseTaskImpl = + BaseActionTask::V2(BaseActionTaskV2 { + action: BaseAction { + destination_program: Pubkey::new_unique(), + source_program: Some(Pubkey::new_unique()), + escrow_authority: Pubkey::new_unique(), + account_metas_per_program: vec![ShortAccountMeta { + pubkey: Pubkey::new_unique(), + is_writable: true, + }], + data_per_program: ProgramArgs { + data: vec![7, 8, 9], + escrow_index: 2, + }, + compute_units: 15_000, + }, + source_program: Pubkey::new_unique(), + }) + .into(); + assert_serializable(&base_action_v2.instruction(&validator)); + } + + fn make_buffer_commit_task( + commit_id: u64, + allow_undelegation: bool, + data: Vec, + lamports: u64, + ) -> CommitTask { + let task = + make_commit_task(commit_id, allow_undelegation, data, lamports); + let stage = task.state_preparation_stage(); + CommitTask { + delivery_details: CommitDelivery::StateInBuffer { stage }, + ..task + } } - // Test BufferTask variants #[test] fn test_buffer_task_instruction_serialization() { let validator = Pubkey::new_unique(); - let buffer_task = BufferTask::new_preparation_required( - BufferTaskType::Commit(CommitTask { - commit_id: 456, - allow_undelegation: false, - committed_account: CommittedAccount { - pubkey: Pubkey::new_unique(), - account: Account { - lamports: 2000, - data: vec![7, 8, 9], - owner: Pubkey::new_unique(), - executable: false, - rent_epoch: 0, - }, - remote_slot: Default::default(), - }, - }), - ); - assert_serializable(&buffer_task.instruction(&validator)); + let commit_task = + make_buffer_commit_task(456, false, vec![7, 8, 9], 2000); + assert!(commit_task.is_buffer()); + assert_serializable(&commit_task.instruction(&validator)); } - // Test preparation instructions #[test] fn test_preparation_instructions_serialization() { let authority = Pubkey::new_unique(); - // Test BufferTask preparation - let buffer_task = BufferTask::new_preparation_required( - BufferTaskType::Commit(CommitTask { - commit_id: 789, - allow_undelegation: true, - committed_account: CommittedAccount { - pubkey: Pubkey::new_unique(), - account: Account { - lamports: 3000, - data: vec![0; 1024], // Larger data to test chunking - owner: Pubkey::new_unique(), - executable: false, - rent_epoch: 0, - }, - remote_slot: Default::default(), - }, - }), - ); + let commit_task = + make_buffer_commit_task(789, true, vec![0; 1024], 3000); - let PreparationState::Required(preparation_task) = - buffer_task.preparation_state() + let Some(CommitBufferStage::Preparation(preparation_task)) = + commit_task.stage() else { panic!("invalid preparation state on creation!"); }; @@ -500,7 +673,6 @@ mod serialization_safety_test { } } - // Helper function to assert serialization succeeds fn assert_serializable(ix: &Instruction) { bincode::serialize(ix).unwrap_or_else(|e| { panic!("Failed to serialize instruction {:?}: {}", ix, e) diff --git a/magicblock-committor-service/src/tasks/task_builder.rs b/magicblock-committor-service/src/tasks/task_builder.rs index 507d7358b..2fb0124dc 100644 --- a/magicblock-committor-service/src/tasks/task_builder.rs +++ b/magicblock-committor-service/src/tasks/task_builder.rs @@ -10,16 +10,15 @@ use solana_pubkey::Pubkey; use solana_signature::Signature; use tracing::error; -use super::{CommitDiffTask, CommitTask}; use crate::{ intent_executor::task_info_fetcher::{ TaskInfoFetcher, TaskInfoFetcherError, TaskInfoFetcherResult, }, persist::IntentPersister, tasks::{ - args_task::{ArgsTask, ArgsTaskType}, - BaseActionTask, BaseActionV2Task, BaseTask, FinalizeTask, - UndelegateTask, + commit_task::{CommitDelivery, CommitTask}, + BaseActionTask, BaseActionTaskV1, BaseActionTaskV2, BaseTaskImpl, + FinalizeTask, UndelegateTask, }, }; @@ -30,13 +29,13 @@ pub trait TasksBuilder { commit_id_fetcher: &Arc, base_intent: &ScheduledIntentBundle, persister: &Option

, - ) -> TaskBuilderResult>>; + ) -> TaskBuilderResult>; // Create tasks for finalize stage async fn finalize_tasks( info_fetcher: &Arc, base_intent: &ScheduledIntentBundle, - ) -> TaskBuilderResult>>; + ) -> TaskBuilderResult>; } /// V1 Task builder @@ -57,7 +56,7 @@ impl TaskBuilderImpl { allow_undelegation: bool, account: CommittedAccount, base_account: Option, - ) -> ArgsTask { + ) -> CommitTask { let base_account = if account.account.data.len() > COMMIT_STATE_SIZE_THRESHOLD { base_account @@ -65,43 +64,35 @@ impl TaskBuilderImpl { None }; - if let Some(base_account) = base_account { - ArgsTaskType::CommitDiff(CommitDiffTask { - commit_id, - allow_undelegation, - committed_account: account, - base_account, - }) + let delivery_details = if let Some(base_account) = base_account { + CommitDelivery::DiffInArgs { base_account } } else { - ArgsTaskType::Commit(CommitTask { - commit_id, - allow_undelegation, - committed_account: account, - }) + CommitDelivery::StateInArgs + }; + + CommitTask { + commit_id, + allow_undelegation, + committed_account: account, + delivery_details, } - .into() } fn create_action_tasks<'a>( actions: &'a [BaseAction], - ) -> impl Iterator> + 'a { - actions - .iter() - .map(|action| match action.source_program { - Some(source_program) => { - ArgsTaskType::BaseActionV2(BaseActionV2Task { - action: action.clone(), - source_program, - }) - } - None => ArgsTaskType::BaseAction(BaseActionTask { + ) -> impl Iterator + 'a { + actions.iter().map(|action| { + let task = match action.source_program { + Some(source_program) => BaseActionTask::V2(BaseActionTaskV2 { action: action.clone(), + source_program, }), - }) - .map(|task_type| { - let task = ArgsTask::new(task_type); - Box::new(task) as Box - }) + None => BaseActionTask::V1(BaseActionTaskV1 { + action: action.clone(), + }), + }; + task.into() + }) } async fn fetch_commit_nonces( @@ -145,7 +136,7 @@ impl TasksBuilder for TaskBuilderImpl { task_info_fetcher: &Arc, intent_bundle: &ScheduledIntentBundle, persister: &Option

, - ) -> TaskBuilderResult>> { + ) -> TaskBuilderResult> { let mut tasks = Vec::new(); tasks.extend(Self::create_action_tasks( intent_bundle.standalone_actions().as_slice(), @@ -215,12 +206,13 @@ impl TasksBuilder for TaskBuilderImpl { }); let base_account = base_accounts.remove(&account.pubkey); - Box::new(Self::create_commit_task( + Self::create_commit_task( commit_id, allow_undelegation, account.clone(), base_account, - )) as Box + ) + .into() }, ); tasks.extend(commit_tasks_iter); @@ -232,30 +224,30 @@ impl TasksBuilder for TaskBuilderImpl { async fn finalize_tasks( info_fetcher: &Arc, intent_bundle: &ScheduledIntentBundle, - ) -> TaskBuilderResult>> { + ) -> TaskBuilderResult> { // Helper to create a finalize task - fn finalize_task(account: &CommittedAccount) -> Box { - let task_type = ArgsTaskType::Finalize(FinalizeTask { + fn finalize_task(account: &CommittedAccount) -> BaseTaskImpl { + FinalizeTask { delegated_account: account.pubkey, - }); - Box::new(ArgsTask::new(task_type)) + } + .into() } // Helper to create an undelegate task fn undelegate_task( account: &CommittedAccount, rent_reimbursement: &Pubkey, - ) -> Box { - let task_type = ArgsTaskType::Undelegate(UndelegateTask { + ) -> BaseTaskImpl { + UndelegateTask { delegated_account: account.pubkey, owner_program: account.account.owner, rent_reimbursement: *rent_reimbursement, - }); - Box::new(ArgsTask::new(task_type)) + } + .into() } // Helper to process commit types - fn process_commit(commit: &CommitType) -> Vec> { + fn process_commit(commit: &CommitType) -> Vec { match commit { CommitType::Standalone(accounts) => { accounts.iter().map(finalize_task).collect() diff --git a/magicblock-committor-service/src/tasks/task_strategist.rs b/magicblock-committor-service/src/tasks/task_strategist.rs index de8a4740e..d7df5a14e 100644 --- a/magicblock-committor-service/src/tasks/task_strategist.rs +++ b/magicblock-committor-service/src/tasks/task_strategist.rs @@ -3,22 +3,19 @@ use std::collections::BinaryHeap; use solana_keypair::Keypair; use solana_pubkey::Pubkey; use solana_signer::{Signer, SignerError}; +use tracing::error; use crate::{ - persist::IntentPersister, + persist::{CommitStrategy, IntentPersister}, tasks::{ - args_task::{ArgsTask, ArgsTaskType}, - task_visitors::persistor_visitor::{ - PersistorContext, PersistorVisitor, - }, - utils::TransactionUtils, - BaseTask, FinalizeTask, + commit_task::CommitDelivery, utils::TransactionUtils, BaseTask, + BaseTaskImpl, }, transactions::{serialize_and_encode_base64, MAX_ENCODED_TRANSACTION_SIZE}, }; pub struct TransactionStrategy { - pub optimized_tasks: Vec>, + pub optimized_tasks: Vec, pub lookup_tables_keys: Vec, } @@ -73,8 +70,8 @@ impl TaskStrategist { /// 1. Optimizes tasks to fit in TX /// 2. Chooses the fastest execution mode for Tasks pub fn build_execution_strategy( - commit_tasks: Vec>, - finalize_tasks: Vec>, + commit_tasks: Vec, + finalize_tasks: Vec, authority: &Pubkey, persister: &Option

, ) -> TaskStrategistResult { @@ -141,8 +138,8 @@ impl TaskStrategist { } fn build_two_stage( - commit_tasks: Vec>, - finalize_tasks: Vec>, + commit_tasks: Vec, + finalize_tasks: Vec, authority: &Pubkey, persister: &Option

, ) -> TaskStrategistResult { @@ -166,7 +163,7 @@ impl TaskStrategist { /// Returns [`TransactionStrategy`] for tasks /// Returns Error if all optimizations weren't enough pub fn build_strategy( - mut tasks: Vec>, + mut tasks: Vec, validator: &Pubkey, persistor: &Option

, ) -> TaskStrategistResult { @@ -176,15 +173,7 @@ impl TaskStrategist { { // Persist tasks strategy if let Some(persistor) = persistor { - let mut persistor_visitor = PersistorVisitor { - persistor, - context: PersistorContext::PersistStrategy { - uses_lookup_tables: false, - }, - }; - tasks - .iter() - .for_each(|task| task.visit(&mut persistor_visitor)); + Self::persist_tasks_strategy(persistor, &tasks, false); } Ok(TransactionStrategy { @@ -196,15 +185,9 @@ impl TaskStrategist { // attempt using lookup tables for all keys involved in tasks else if Self::attempt_lookup_tables(&tasks) { // Persist tasks strategy - let mut persistor_visitor = PersistorVisitor { - persistor, - context: PersistorContext::PersistStrategy { - uses_lookup_tables: true, - }, - }; - tasks - .iter() - .for_each(|task| task.visit(&mut persistor_visitor)); + if let Some(persistor) = persistor { + Self::persist_tasks_strategy(persistor, &tasks, true); + } // Get lookup table keys let lookup_tables_keys = @@ -221,7 +204,7 @@ impl TaskStrategist { /// Attempt to use ALTs for ALL keys in tx /// Returns `true` if ALTs make tx fit, otherwise `false` /// TODO(edwin): optimize to use only necessary amount of pubkeys - pub fn attempt_lookup_tables(tasks: &[Box]) -> bool { + pub fn attempt_lookup_tables(tasks: &[BaseTaskImpl]) -> bool { let placeholder = Keypair::new(); // Gather all involved keys in tx let budgets = TransactionUtils::tasks_compute_units(tasks); @@ -260,7 +243,7 @@ impl TaskStrategist { pub fn collect_lookup_table_keys( authority: &Pubkey, - tasks: &[Box], + tasks: &[BaseTaskImpl], ) -> Vec { let budgets = TransactionUtils::tasks_compute_units(tasks); let size_budgets = TransactionUtils::tasks_accounts_size_budget(tasks); @@ -277,15 +260,70 @@ impl TaskStrategist { ) } + fn persist_tasks_strategy( + persistor: &P, + tasks: &[BaseTaskImpl], + uses_lookup_tables: bool, + ) { + for task in tasks { + let BaseTaskImpl::Commit(commit_task) = task else { + continue; + }; + let commit_strategy = match &commit_task.delivery_details { + CommitDelivery::StateInArgs => { + if uses_lookup_tables { + CommitStrategy::StateArgsWithLookupTable + } else { + CommitStrategy::StateArgs + } + } + CommitDelivery::DiffInArgs { .. } => { + if uses_lookup_tables { + CommitStrategy::DiffArgsWithLookupTable + } else { + CommitStrategy::DiffArgs + } + } + CommitDelivery::StateInBuffer { .. } => { + if uses_lookup_tables { + CommitStrategy::StateBufferWithLookupTable + } else { + CommitStrategy::StateBuffer + } + } + CommitDelivery::DiffInBuffer { .. } => { + if uses_lookup_tables { + CommitStrategy::DiffBufferWithLookupTable + } else { + CommitStrategy::DiffBuffer + } + } + }; + if let Err(err) = persistor.set_commit_strategy( + commit_task.commit_id, + &commit_task.committed_account.pubkey, + commit_strategy, + ) { + error!( + commit_id = %commit_task.commit_id, + pubkey = %commit_task.committed_account.pubkey, + strategy = commit_strategy.as_str(), + error = ?err, + "Failed to persist commit strategy" + ); + } + } + } + /// Optimizes tasks so as to bring the transaction size within the limit [`MAX_ENCODED_TRANSACTION_SIZE`] /// Returns Ok(size of tx after optimizations) else Err(SignerError). /// Note that the returned size, though possibly optimized one, may still not be under /// the limit MAX_ENCODED_TRANSACTION_SIZE. The caller needs to check and make decision accordingly. fn try_optimize_tx_size_if_needed( - tasks: &mut [Box], + tasks: &mut [BaseTaskImpl], ) -> Result { // Get initial transaction size - let calculate_tx_length = |tasks: &[Box]| { + let calculate_tx_length = |tasks: &[BaseTaskImpl]| { match TransactionUtils::assemble_tasks_tx( &Keypair::new(), // placeholder tasks, @@ -327,39 +365,20 @@ impl TaskStrategist { break; } - let task = { - // This is tmp task that will be replaced by old or optimized one - let tmp_task = - ArgsTask::new(ArgsTaskType::Finalize(FinalizeTask { - delegated_account: Pubkey::new_unique(), - })); - let tmp_task = Box::new(tmp_task) as Box; - std::mem::replace(&mut tasks[index], tmp_task) - }; - match task.try_optimize_tx_size() { + if tasks[index].try_optimize_tx_size() { // If we can decrease: // 1. Calculate new tx size & ix size // 2. Insert item's data back in the heap // 3. Update overall tx size - Ok(optimized_task) => { - tasks[index] = optimized_task; - let new_ix = - tasks[index].instruction(&Pubkey::new_unique()); - // Possible serialization failures are possible only due to size in our case - // In that case we set size to max - let new_ix_size = - bincode::serialized_size(&new_ix).unwrap_or(u64::MAX); - let new_ix_size = - usize::try_from(new_ix_size).unwrap_or(usize::MAX); - current_tx_length = calculate_tx_length(tasks)?; - map.push((new_ix_size, index)); - } - // That means el-t can't be optimized further - // We move it back with oldest state - // Heap forgets about this el-t - Err(old_task) => { - tasks[index] = old_task; - } + let new_ix = tasks[index].instruction(&Pubkey::new_unique()); + // Possible serialization failures are possible only due to size in our case + // In that case we set size to max + let new_ix_size = + bincode::serialized_size(&new_ix).unwrap_or(u64::MAX); + let new_ix_size = + usize::try_from(new_ix_size).unwrap_or(usize::MAX); + current_tx_length = calculate_tx_length(tasks)?; + map.push((new_ix_size, index)); } } @@ -397,10 +416,12 @@ mod tests { }, persist::IntentPersisterImpl, tasks::{ + commit_task::CommitTask, task_builder::{ TaskBuilderImpl, TasksBuilder, COMMIT_STATE_SIZE_THRESHOLD, }, - BaseActionTask, TaskStrategy, UndelegateTask, + BaseActionTask, BaseActionTaskV1, FinalizeTask, TaskStrategy, + UndelegateTask, }, test_utils, }; @@ -445,7 +466,7 @@ mod tests { commit_id: u64, data_size: usize, diff_len: usize, - ) -> ArgsTask { + ) -> CommitTask { let committed_account = CommittedAccount { pubkey: Pubkey::new_unique(), account: Account { @@ -484,8 +505,8 @@ mod tests { } // Helper to create a Base action task - fn create_test_base_action_task(len: usize) -> ArgsTask { - ArgsTask::new(ArgsTaskType::BaseAction(BaseActionTask { + fn create_test_base_action_task(len: usize) -> BaseActionTask { + BaseActionTaskV1 { action: BaseAction { destination_program: Pubkey::new_unique(), source_program: None, @@ -497,23 +518,24 @@ mod tests { }, compute_units: 30_000, }, - })) + } + .into() } // Helper to create a finalize task - fn create_test_finalize_task() -> ArgsTask { - ArgsTask::new(ArgsTaskType::Finalize(FinalizeTask { + fn create_test_finalize_task() -> FinalizeTask { + FinalizeTask { delegated_account: Pubkey::new_unique(), - })) + } } // Helper to create an undelegate task - fn create_test_undelegate_task() -> ArgsTask { - ArgsTask::new(ArgsTaskType::Undelegate(UndelegateTask { + fn create_test_undelegate_task() -> UndelegateTask { + UndelegateTask { delegated_account: Pubkey::new_unique(), owner_program: system_program_id(), rent_reimbursement: Pubkey::new_unique(), - })) + } } #[test] @@ -521,7 +543,7 @@ mod tests { test_utils::init_test_logger(); let validator = Pubkey::new_unique(); let task = create_test_commit_task(1, 100, 0); - let tasks = vec![Box::new(task) as Box]; + let tasks = vec![task.into()]; let strategy = TaskStrategist::build_strategy( tasks, @@ -539,7 +561,7 @@ mod tests { let validator = Pubkey::new_unique(); let task = create_test_commit_task(1, 1000, 0); // Large task - let tasks = vec![Box::new(task) as Box]; + let tasks = vec![task.into()]; let strategy = TaskStrategist::build_strategy( tasks, @@ -560,7 +582,7 @@ mod tests { let validator = Pubkey::new_unique(); let task = create_test_commit_task(1, 66_000, 0); // Large task - let tasks = vec![Box::new(task) as Box]; + let tasks = vec![task.into()]; let strategy = TaskStrategist::build_strategy( tasks, @@ -582,7 +604,7 @@ mod tests { let task = create_test_commit_task(1, 66_000, COMMIT_STATE_SIZE_THRESHOLD); // large account but small diff - let tasks = vec![Box::new(task) as Box]; + let tasks = vec![task.into()]; let strategy = TaskStrategist::build_strategy( tasks, @@ -602,7 +624,7 @@ mod tests { let task = create_test_commit_task(1, 66_000, COMMIT_STATE_SIZE_THRESHOLD + 1); // large account but small diff - let tasks = vec![Box::new(task) as Box]; + let tasks = vec![task.into()]; let strategy = TaskStrategist::build_strategy( tasks, @@ -621,7 +643,7 @@ mod tests { let task = create_test_commit_task(1, 66_000, COMMIT_STATE_SIZE_THRESHOLD * 4); // large account but small diff - let tasks = vec![Box::new(task) as Box]; + let tasks = vec![task.into()]; let strategy = TaskStrategist::build_strategy( tasks, @@ -647,7 +669,7 @@ mod tests { let tasks = (0..NUM_COMMITS) .map(|i| { let task = create_test_commit_task(i, 500, 0); // Large task - Box::new(task) as Box + task.into() }) .collect(); @@ -675,7 +697,7 @@ mod tests { .map(|i| { // Large task let task = create_test_commit_task(i, 10000, 0); - Box::new(task) as Box + task.into() }) .collect(); @@ -702,7 +724,7 @@ mod tests { .map(|i| { // Large task let task = create_test_commit_task(i, 1000, 0); - Box::new(task) as Box + task.into() }) .collect(); @@ -716,10 +738,10 @@ mod tests { #[test] fn test_optimize_strategy_prioritizes_largest_tasks() { - let mut tasks = [ - Box::new(create_test_commit_task(1, 100, 0)) as Box, - Box::new(create_test_commit_task(2, 1000, 0)) as Box, // Larger task - Box::new(create_test_commit_task(3, 1000, 0)) as Box, // Larger task + let mut tasks: [BaseTaskImpl; 3] = [ + create_test_commit_task(1, 100, 0).into(), + create_test_commit_task(2, 1000, 0).into(), // Larger task + create_test_commit_task(3, 1000, 0).into(), // Larger task ]; let _ = TaskStrategist::try_optimize_tx_size_if_needed(&mut tasks); @@ -731,11 +753,11 @@ mod tests { #[test] fn test_mixed_task_types_with_optimization() { let validator = Pubkey::new_unique(); - let tasks = vec![ - Box::new(create_test_commit_task(1, 1000, 0)) as Box, - Box::new(create_test_finalize_task()) as Box, - Box::new(create_test_base_action_task(500)) as Box, - Box::new(create_test_undelegate_task()) as Box, + let tasks: Vec = vec![ + create_test_commit_task(1, 1000, 0).into(), + create_test_finalize_task().into(), + create_test_base_action_task(500).into(), + create_test_undelegate_task().into(), ]; let strategy = TaskStrategist::build_strategy( diff --git a/magicblock-committor-service/src/tasks/task_visitors/mod.rs b/magicblock-committor-service/src/tasks/task_visitors/mod.rs deleted file mode 100644 index cf1399a78..000000000 --- a/magicblock-committor-service/src/tasks/task_visitors/mod.rs +++ /dev/null @@ -1,2 +0,0 @@ -pub(crate) mod persistor_visitor; -pub(crate) mod utility_visitor; diff --git a/magicblock-committor-service/src/tasks/task_visitors/persistor_visitor.rs b/magicblock-committor-service/src/tasks/task_visitors/persistor_visitor.rs deleted file mode 100644 index 3976395a0..000000000 --- a/magicblock-committor-service/src/tasks/task_visitors/persistor_visitor.rs +++ /dev/null @@ -1,122 +0,0 @@ -use tracing::error; - -use crate::{ - persist::{CommitStrategy, IntentPersister}, - tasks::{ - args_task::{ArgsTask, ArgsTaskType}, - buffer_task::{BufferTask, BufferTaskType}, - visitor::Visitor, - }, -}; - -pub enum PersistorContext { - PersistStrategy { uses_lookup_tables: bool }, - // Other possible persist -} - -pub struct PersistorVisitor<'a, P> { - pub persistor: &'a P, - pub context: PersistorContext, -} - -impl

Visitor for PersistorVisitor<'_, P> -where - P: IntentPersister, -{ - fn visit_args_task(&mut self, task: &ArgsTask) { - match self.context { - PersistorContext::PersistStrategy { uses_lookup_tables } => { - let commit_strategy = |is_diff: bool| { - if is_diff { - if uses_lookup_tables { - CommitStrategy::DiffArgsWithLookupTable - } else { - CommitStrategy::DiffArgs - } - } else if uses_lookup_tables { - CommitStrategy::StateArgsWithLookupTable - } else { - CommitStrategy::StateArgs - } - }; - - let (commit_id, pubkey, commit_strategy) = match &task.task_type - { - ArgsTaskType::Commit(task) => ( - task.commit_id, - &task.committed_account.pubkey, - commit_strategy(false), - ), - ArgsTaskType::CommitDiff(task) => ( - task.commit_id, - &task.committed_account.pubkey, - commit_strategy(true), - ), - _ => return, - }; - - if let Err(err) = self.persistor.set_commit_strategy( - commit_id, - pubkey, - commit_strategy, - ) { - error!( - %commit_id, - %pubkey, - strategy = commit_strategy.as_str(), - error = ?err, - "Failed to persist commit strategy" - ); - } - } - } - } - - fn visit_buffer_task(&mut self, task: &BufferTask) { - match self.context { - PersistorContext::PersistStrategy { uses_lookup_tables } => { - let commit_strategy = |is_diff: bool| { - if is_diff { - if uses_lookup_tables { - CommitStrategy::DiffBufferWithLookupTable - } else { - CommitStrategy::DiffBuffer - } - } else if uses_lookup_tables { - CommitStrategy::StateBufferWithLookupTable - } else { - CommitStrategy::StateBuffer - } - }; - - let (commit_id, pubkey, commit_strategy) = match &task.task_type - { - BufferTaskType::Commit(task) => ( - task.commit_id, - &task.committed_account.pubkey, - commit_strategy(false), - ), - BufferTaskType::CommitDiff(task) => ( - task.commit_id, - &task.committed_account.pubkey, - commit_strategy(true), - ), - }; - - if let Err(err) = self.persistor.set_commit_strategy( - commit_id, - pubkey, - commit_strategy, - ) { - error!( - %commit_id, - %pubkey, - strategy = commit_strategy.as_str(), - error = ?err, - "Failed to persist commit strategy" - ); - } - } - } - } -} diff --git a/magicblock-committor-service/src/tasks/task_visitors/utility_visitor.rs b/magicblock-committor-service/src/tasks/task_visitors/utility_visitor.rs deleted file mode 100644 index c0c62b966..000000000 --- a/magicblock-committor-service/src/tasks/task_visitors/utility_visitor.rs +++ /dev/null @@ -1,82 +0,0 @@ -use solana_pubkey::Pubkey; - -use crate::tasks::{ - args_task::{ArgsTask, ArgsTaskType}, - buffer_task::{BufferTask, BufferTaskType}, - visitor::Visitor, - BaseTask, FinalizeTask, -}; - -pub struct CommitMeta { - pub committed_pubkey: Pubkey, - pub commit_id: u64, - pub remote_slot: u64, -} - -impl From for FinalizeTask { - fn from(value: CommitMeta) -> Self { - FinalizeTask { - delegated_account: value.committed_pubkey, - } - } -} - -pub enum TaskVisitorUtils { - GetCommitMeta(Option), -} - -impl TaskVisitorUtils { - pub fn commit_meta(task: &dyn BaseTask) -> Option { - let mut v = TaskVisitorUtils::GetCommitMeta(None); - task.visit(&mut v); - - match v { - TaskVisitorUtils::GetCommitMeta(meta) => meta, - } - } -} - -impl Visitor for TaskVisitorUtils { - fn visit_args_task(&mut self, task: &ArgsTask) { - let Self::GetCommitMeta(commit_meta) = self; - - match &task.task_type { - ArgsTaskType::Commit(task) => { - *commit_meta = Some(CommitMeta { - committed_pubkey: task.committed_account.pubkey, - commit_id: task.commit_id, - remote_slot: task.committed_account.remote_slot, - }) - } - ArgsTaskType::CommitDiff(task) => { - *commit_meta = Some(CommitMeta { - committed_pubkey: task.committed_account.pubkey, - commit_id: task.commit_id, - remote_slot: task.committed_account.remote_slot, - }) - } - _ => *commit_meta = None, - } - } - - fn visit_buffer_task(&mut self, task: &BufferTask) { - let Self::GetCommitMeta(commit_meta) = self; - - match &task.task_type { - BufferTaskType::Commit(task) => { - *commit_meta = Some(CommitMeta { - committed_pubkey: task.committed_account.pubkey, - commit_id: task.commit_id, - remote_slot: task.committed_account.remote_slot, - }) - } - BufferTaskType::CommitDiff(task) => { - *commit_meta = Some(CommitMeta { - committed_pubkey: task.committed_account.pubkey, - commit_id: task.commit_id, - remote_slot: task.committed_account.remote_slot, - }) - } - } - } -} diff --git a/magicblock-committor-service/src/tasks/utils.rs b/magicblock-committor-service/src/tasks/utils.rs index 3325285ca..0247c3ba6 100644 --- a/magicblock-committor-service/src/tasks/utils.rs +++ b/magicblock-committor-service/src/tasks/utils.rs @@ -12,7 +12,9 @@ use solana_pubkey::Pubkey; use solana_signer::Signer; use solana_transaction::versioned::VersionedTransaction; -use crate::tasks::{task_strategist::TaskStrategistResult, BaseTask}; +use crate::tasks::{ + task_strategist::TaskStrategistResult, BaseTask, BaseTaskImpl, +}; pub struct TransactionUtils; impl TransactionUtils { @@ -29,7 +31,7 @@ impl TransactionUtils { } pub fn unique_involved_pubkeys( - tasks: &[Box], + tasks: &[BaseTaskImpl], validator: &Pubkey, budget_instructions: &[Instruction], ) -> Vec { @@ -50,7 +52,7 @@ impl TransactionUtils { pub fn tasks_instructions( validator: &Pubkey, - tasks: &[Box], + tasks: &[BaseTaskImpl], ) -> Vec { tasks .iter() @@ -60,7 +62,7 @@ impl TransactionUtils { pub fn assemble_tasks_tx( authority: &Keypair, - tasks: &[Box], + tasks: &[BaseTaskImpl], compute_unit_price: u64, lookup_tables: &[AddressLookupTableAccount], ) -> TaskStrategistResult { @@ -125,25 +127,21 @@ impl TransactionUtils { Ok(tx) } - pub fn tasks_compute_units(tasks: &[impl AsRef]) -> u32 { - tasks.iter().map(|task| task.as_ref().compute_units()).sum() + pub fn tasks_compute_units(tasks: &[BaseTaskImpl]) -> u32 { + tasks.iter().map(|task| task.compute_units()).sum() } - pub fn tasks_accounts_size_budget( - tasks: &[impl AsRef], - ) -> u32 { + pub fn tasks_accounts_size_budget(tasks: &[BaseTaskImpl]) -> u32 { if tasks.is_empty() { return 0; } - let total_budget: u32 = tasks - .iter() - .map(|task| task.as_ref().accounts_size_budget()) - .sum(); + let total_budget: u32 = + tasks.iter().map(|task| task.accounts_size_budget()).sum(); let dlp_task_count: u32 = tasks .iter() - .filter(|task| task.as_ref().program_id() == dlp::id()) + .filter(|task| task.program_id() == dlp::id()) .count() as u32; if dlp_task_count > 0 { diff --git a/magicblock-committor-service/src/tasks/visitor.rs b/magicblock-committor-service/src/tasks/visitor.rs deleted file mode 100644 index 1b9940a09..000000000 --- a/magicblock-committor-service/src/tasks/visitor.rs +++ /dev/null @@ -1,6 +0,0 @@ -use crate::tasks::{args_task::ArgsTask, buffer_task::BufferTask}; - -pub trait Visitor { - fn visit_args_task(&mut self, task: &ArgsTask); - fn visit_buffer_task(&mut self, task: &BufferTask); -} diff --git a/magicblock-committor-service/src/transaction_preparator/delivery_preparator.rs b/magicblock-committor-service/src/transaction_preparator/delivery_preparator.rs index 1aee95d7f..a7f9f9990 100644 --- a/magicblock-committor-service/src/transaction_preparator/delivery_preparator.rs +++ b/magicblock-committor-service/src/transaction_preparator/delivery_preparator.rs @@ -30,8 +30,8 @@ use tracing::{error, info}; use crate::{ persist::{CommitStatus, IntentPersister}, tasks::{ - task_strategist::TransactionStrategy, BaseTask, BaseTaskError, - CleanupTask, PreparationState, PreparationTask, + commit_task::CommitBufferStage, task_strategist::TransactionStrategy, + BaseTaskImpl, CleanupTask, PreparationTask, }, utils::persist_status_update, ComputeBudgetConfig, @@ -67,7 +67,7 @@ impl DeliveryPreparator { strategy.optimized_tasks.iter_mut().map(|task| async move { let _timer = metrics::observe_committor_intent_task_preparation_time( - task.as_ref(), + &*task, ); self.prepare_task_handling_errors(authority, task, persister) .await @@ -95,12 +95,16 @@ impl DeliveryPreparator { pub async fn prepare_task( &self, authority: &Keypair, - task: &mut dyn BaseTask, + task: &mut BaseTaskImpl, persister: &Option

, ) -> DeliveryPreparatorResult<(), InternalError> { - let PreparationState::Required(preparation_task) = - task.preparation_state() - else { + let BaseTaskImpl::Commit(commit_task) = task else { + return Ok(()); + }; + let Some(stage) = commit_task.stage_mut() else { + return Ok(()); + }; + let CommitBufferStage::Preparation(preparation_task) = stage else { return Ok(()); }; @@ -139,7 +143,7 @@ impl DeliveryPreparator { ); let cleanup_task = preparation_task.cleanup_task(); - task.switch_preparation_state(PreparationState::Cleanup(cleanup_task))?; + *stage = CommitBufferStage::Cleanup(cleanup_task); Ok(()) } @@ -148,10 +152,10 @@ impl DeliveryPreparator { pub async fn prepare_task_handling_errors( &self, authority: &Keypair, - task: &mut Box, + task: &mut BaseTaskImpl, persister: &Option

, ) -> Result<(), InternalError> { - let res = self.prepare_task(authority, task.as_mut(), persister).await; + let res = self.prepare_task(authority, task, persister).await; match res { Err(InternalError::BufferExecutionError( BufferExecutionError::AccountAlreadyInitializedError( @@ -165,22 +169,25 @@ impl DeliveryPreparator { res => return res, } - // Prepare cleanup task - let PreparationState::Required(preparation_task) = - task.preparation_state().clone() - else { + // Prepare cleanup task - set stage to Cleanup before calling cleanup + let BaseTaskImpl::Commit(commit_task) = task else { return Ok(()); }; - task.switch_preparation_state(PreparationState::Cleanup( - preparation_task.cleanup_task(), - ))?; - self.cleanup(authority, std::slice::from_ref(task), &[]) - .await?; - task.switch_preparation_state(PreparationState::Required( - preparation_task, - ))?; + let Some(stage) = commit_task.stage_mut() else { + return Ok(()); + }; + let CommitBufferStage::Preparation(preparation_task) = stage else { + return Ok(()); + }; + let preparation_task = preparation_task.clone(); + let cleanup_task = preparation_task.cleanup_task(); + + self.cleanup(authority, &[cleanup_task], &[]).await?; + + // Restore preparation stage for retry + *stage = CommitBufferStage::Preparation(preparation_task); - self.prepare_task(authority, task.as_mut(), persister).await + self.prepare_task(authority, task, persister).await } /// Initializes buffer account for future writes @@ -420,7 +427,7 @@ impl DeliveryPreparator { pub async fn cleanup( &self, authority: &Keypair, - tasks: &[Box], + cleanup_tasks: &[CleanupTask], lookup_table_keys: &[Pubkey], ) -> DeliveryPreparatorResult<(), BufferExecutionError> { self.table_mania @@ -429,19 +436,6 @@ impl DeliveryPreparator { )) .await; - let cleanup_tasks: Vec<_> = tasks - .iter() - .filter_map(|task| { - if let PreparationState::Cleanup(cleanup_task) = - task.preparation_state() - { - Some(cleanup_task) - } else { - None - } - }) - .collect(); - if cleanup_tasks.is_empty() { return Ok(()); } @@ -549,8 +543,6 @@ pub enum InternalError { MagicBlockRpcClientError(Box), #[error("BufferExecutionError: {0}")] BufferExecutionError(#[from] BufferExecutionError), - #[error("BaseTaskError: {0}")] - BaseTaskError(#[from] BaseTaskError), } impl From for InternalError { diff --git a/magicblock-committor-service/src/transaction_preparator/mod.rs b/magicblock-committor-service/src/transaction_preparator/mod.rs index 4eef17995..07760606a 100644 --- a/magicblock-committor-service/src/transaction_preparator/mod.rs +++ b/magicblock-committor-service/src/transaction_preparator/mod.rs @@ -8,7 +8,8 @@ use solana_pubkey::Pubkey; use crate::{ persist::IntentPersister, tasks::{ - task_strategist::TransactionStrategy, utils::TransactionUtils, BaseTask, + commit_task::CommitBufferStage, task_strategist::TransactionStrategy, + utils::TransactionUtils, BaseTaskImpl, }, transaction_preparator::{ delivery_preparator::{ @@ -37,7 +38,7 @@ pub trait TransactionPreparator: Send + Sync + 'static { async fn cleanup_for_strategy( &self, authority: &Keypair, - tasks: &[Box], + tasks: &[BaseTaskImpl], lookup_table_keys: &[Pubkey], ) -> DeliveryPreparatorResult<(), BufferExecutionError>; } @@ -112,11 +113,24 @@ impl TransactionPreparator for TransactionPreparatorImpl { async fn cleanup_for_strategy( &self, authority: &Keypair, - tasks: &[Box], + tasks: &[BaseTaskImpl], lookup_table_keys: &[Pubkey], ) -> DeliveryPreparatorResult<(), BufferExecutionError> { + let cleanup_tasks: Vec<_> = tasks + .iter() + .filter_map(|task| match task { + BaseTaskImpl::Commit(commit_task) => commit_task.stage(), + _ => None, + }) + .filter_map(|stage| match stage { + CommitBufferStage::Cleanup(cleanup_task) => { + Some(cleanup_task.clone()) + } + _ => None, + }) + .collect(); self.delivery_preparator - .cleanup(authority, tasks, lookup_table_keys) + .cleanup(authority, &cleanup_tasks, lookup_table_keys) .await } } diff --git a/test-integration/test-committor-service/tests/common.rs b/test-integration/test-committor-service/tests/common.rs index 81adae320..67d989d30 100644 --- a/test-integration/test-committor-service/tests/common.rs +++ b/test-integration/test-committor-service/tests/common.rs @@ -15,7 +15,7 @@ use magicblock_committor_service::{ }, IntentExecutorImpl, }, - tasks::CommitTask, + tasks::commit_task::{CommitDelivery, CommitTask}, transaction_preparator::{ delivery_preparator::DeliveryPreparator, TransactionPreparatorImpl, }, @@ -191,6 +191,17 @@ pub fn create_commit_task(data: &[u8]) -> CommitTask { }, remote_slot: Default::default(), }, + delivery_details: CommitDelivery::StateInArgs, + } +} + +#[allow(dead_code)] +pub fn create_buffer_commit_task(data: &[u8]) -> CommitTask { + let task = create_commit_task(data); + let stage = task.state_preparation_stage(); + CommitTask { + delivery_details: CommitDelivery::StateInBuffer { stage }, + ..task } } diff --git a/test-integration/test-committor-service/tests/test_delivery_preparator.rs b/test-integration/test-committor-service/tests/test_delivery_preparator.rs index dded722ee..c6c32258b 100644 --- a/test-integration/test-committor-service/tests/test_delivery_preparator.rs +++ b/test-integration/test-committor-service/tests/test_delivery_preparator.rs @@ -1,18 +1,19 @@ use borsh::BorshDeserialize; -use futures::future::join_all; use magicblock_committor_program::Chunks; use magicblock_committor_service::{ persist::IntentPersisterImpl, tasks::{ - args_task::{ArgsTask, ArgsTaskType}, - buffer_task::{BufferTask, BufferTaskType}, + commit_task::{CommitBufferStage, CommitDelivery}, task_strategist::{TaskStrategist, TransactionStrategy}, - BaseTask, PreparationState, + BaseTaskImpl, }, }; use solana_sdk::signer::Signer; -use crate::common::{create_commit_task, generate_random_bytes, TestFixture}; +use crate::common::{ + create_buffer_commit_task, create_commit_task, generate_random_bytes, + TestFixture, +}; mod common; @@ -22,11 +23,8 @@ async fn test_prepare_10kb_buffer() { let preparator = fixture.create_delivery_preparator(); let data = generate_random_bytes(10 * 1024); - let buffer_task = BufferTaskType::Commit(create_commit_task(&data)); let mut strategy = TransactionStrategy { - optimized_tasks: vec![Box::new(BufferTask::new_preparation_required( - buffer_task, - ))], + optimized_tasks: vec![create_buffer_commit_task(&data).into()], lookup_tables_keys: vec![], }; @@ -42,10 +40,13 @@ async fn test_prepare_10kb_buffer() { assert!(result.is_ok(), "Preparation failed: {:?}", result.err()); // Verify the buffer account was created and initialized - let PreparationState::Cleanup(cleanup_task) = - strategy.optimized_tasks[0].preparation_state() + let BaseTaskImpl::Commit(ref commit_task) = strategy.optimized_tasks[0] + else { + panic!("unexpected task type"); + }; + let Some(CommitBufferStage::Cleanup(cleanup_task)) = commit_task.stage() else { - panic!("unexpected PreparationState"); + panic!("unexpected CommitStage"); }; let buffer_pda = cleanup_task.buffer_pda(&fixture.authority.pubkey()); @@ -87,12 +88,10 @@ async fn test_prepare_multiple_buffers() { generate_random_bytes(10), generate_random_bytes(500 * 1024), ]; - let buffer_tasks = join_all(datas.iter().map(|data| async { - let task = BufferTaskType::Commit(create_commit_task(data.as_slice())); - Box::new(BufferTask::new_preparation_required(task)) - as Box - })) - .await; + let buffer_tasks: Vec = datas + .iter() + .map(|data| create_buffer_commit_task(data).into()) + .collect(); let mut strategy = TransactionStrategy { optimized_tasks: buffer_tasks, lookup_tables_keys: vec![], @@ -110,16 +109,20 @@ async fn test_prepare_multiple_buffers() { assert!(result.is_ok(), "Preparation failed: {:?}", result.err()); // Verify the buffer account was created and initialized - let cleanup_tasks = strategy.optimized_tasks.iter().map(|el| { - let PreparationState::Cleanup(cleanup_task) = el.preparation_state() - else { - panic!("Unexpected preparation state!"); - }; - - cleanup_task - }); - - for (i, cleanup_task) in cleanup_tasks.enumerate() { + let cleanup_tasks: Vec<_> = strategy + .optimized_tasks + .iter() + .filter_map(|el| match el { + BaseTaskImpl::Commit(commit_task) => commit_task.stage(), + _ => None, + }) + .filter_map(|stage| match stage { + CommitBufferStage::Cleanup(cleanup_task) => Some(cleanup_task), + _ => None, + }) + .collect(); + + for (i, cleanup_task) in cleanup_tasks.iter().enumerate() { // Check buffer account exists let buffer_pda = cleanup_task.buffer_pda(&fixture.authority.pubkey()); let buffer_account = fixture @@ -163,11 +166,10 @@ async fn test_lookup_tables() { generate_random_bytes(20), generate_random_bytes(30), ]; - let tasks = join_all(datas.iter().map(|data| async { - let task = ArgsTaskType::Commit(create_commit_task(data.as_slice())); - Box::::new(task.into()) as Box - })) - .await; + let tasks: Vec = datas + .iter() + .map(|data| create_commit_task(data).into()) + .collect(); let lookup_tables_keys = TaskStrategist::collect_lookup_table_keys( &fixture.authority.pubkey(), @@ -207,12 +209,9 @@ async fn test_already_initialized_error_handled() { let preparator = fixture.create_delivery_preparator(); let data = generate_random_bytes(10 * 1024); - let mut task = create_commit_task(&data); - let buffer_task = BufferTaskType::Commit(task.clone()); + let mut commit_task = create_buffer_commit_task(&data); let mut strategy = TransactionStrategy { - optimized_tasks: vec![Box::new(BufferTask::new_preparation_required( - buffer_task, - ))], + optimized_tasks: vec![commit_task.clone().into()], lookup_tables_keys: vec![], }; @@ -227,10 +226,11 @@ async fn test_already_initialized_error_handled() { assert!(result.is_ok(), "Preparation failed: {:?}", result.err()); // Verify the buffer account was created and initialized - let PreparationState::Cleanup(cleanup_task) = - strategy.optimized_tasks[0].preparation_state() - else { - panic!("unexpected PreparationState"); + let BaseTaskImpl::Commit(ref ct) = strategy.optimized_tasks[0] else { + panic!("unexpected task type"); + }; + let Some(CommitBufferStage::Cleanup(cleanup_task)) = ct.stage() else { + panic!("unexpected CommitStage"); }; // Check buffer account exists let buffer_pda = cleanup_task.buffer_pda(&fixture.authority.pubkey()); @@ -244,14 +244,15 @@ async fn test_already_initialized_error_handled() { // Imitate commit to the non deleted buffer using different length // Keep same task with commit id, swap data - let data = - generate_random_bytes(task.committed_account.account.data.len() - 2); - task.committed_account.account.data = data.clone(); - let buffer_task = BufferTaskType::Commit(task); + let data = generate_random_bytes( + commit_task.committed_account.account.data.len() - 2, + ); + commit_task.committed_account.account.data = data.clone(); + commit_task.delivery_details = CommitDelivery::StateInBuffer { + stage: commit_task.state_preparation_stage(), + }; let mut strategy = TransactionStrategy { - optimized_tasks: vec![Box::new(BufferTask::new_preparation_required( - buffer_task, - ))], + optimized_tasks: vec![commit_task.into()], lookup_tables_keys: vec![], }; @@ -266,10 +267,11 @@ async fn test_already_initialized_error_handled() { assert!(result.is_ok(), "Preparation failed: {:?}", result.err()); // Verify the buffer account was created and initialized - let PreparationState::Cleanup(cleanup_task) = - strategy.optimized_tasks[0].preparation_state() - else { - panic!("unexpected PreparationState"); + let BaseTaskImpl::Commit(ref ct) = strategy.optimized_tasks[0] else { + panic!("unexpected task type"); + }; + let Some(CommitBufferStage::Cleanup(cleanup_task)) = ct.stage() else { + panic!("unexpected CommitStage"); }; // Check buffer account exists @@ -297,27 +299,16 @@ async fn test_prepare_cleanup_and_reprepare_mixed_tasks() { // Keep these around to modify data later (same commit IDs, different data) let mut commit_args = create_commit_task(&args_data); - let mut commit_a = create_commit_task(&buf_a_data); - let mut commit_b = create_commit_task(&buf_b_data); + let mut commit_a = create_buffer_commit_task(&buf_a_data); + let mut commit_b = create_buffer_commit_task(&buf_b_data); let mut strategy = TransactionStrategy { optimized_tasks: vec![ // Args task — shouldn't need buffers - { - let t = ArgsTaskType::Commit(commit_args.clone()); - Box::::new(t.into()) as Box - }, + commit_args.clone().into(), // Two buffer tasks - { - let t = BufferTaskType::Commit(commit_a.clone()); - Box::new(BufferTask::new_preparation_required(t)) - as Box - }, - { - let t = BufferTaskType::Commit(commit_b.clone()); - Box::new(BufferTask::new_preparation_required(t)) - as Box - }, + commit_a.clone().into(), + commit_b.clone().into(), ], lookup_tables_keys: vec![], }; @@ -333,12 +324,18 @@ async fn test_prepare_cleanup_and_reprepare_mixed_tasks() { assert!(res.is_ok(), "Initial prepare failed: {:?}", res.err()); // Collect cleanup states for the two buffer tasks, verify they wrote expected data+chunks - let mut buffer_cleanups = Vec::new(); - for t in &strategy.optimized_tasks { - if let PreparationState::Cleanup(c) = t.preparation_state() { - buffer_cleanups.push(c); - } - } + let buffer_cleanups: Vec<_> = strategy + .optimized_tasks + .iter() + .filter_map(|t| match t { + BaseTaskImpl::Commit(ct) => ct.stage(), + _ => None, + }) + .filter_map(|stage| match stage { + CommitBufferStage::Cleanup(c) => Some(c), + _ => None, + }) + .collect(); assert_eq!( buffer_cleanups.len(), 2, @@ -401,23 +398,20 @@ async fn test_prepare_cleanup_and_reprepare_mixed_tasks() { .truncate(buf_b_data.len() - 5); } + // Rebuild buffer stages with mutated data + commit_a.delivery_details = CommitDelivery::StateInBuffer { + stage: commit_a.state_preparation_stage(), + }; + commit_b.delivery_details = CommitDelivery::StateInBuffer { + stage: commit_b.state_preparation_stage(), + }; + // --- Step 4: re-prepare with the same logical tasks (same commit IDs, mutated data) --- let mut strategy2 = TransactionStrategy { optimized_tasks: vec![ - { - let t = ArgsTaskType::Commit(commit_args.clone()); - Box::::new(t.into()) as Box - }, - { - let t = BufferTaskType::Commit(commit_a.clone()); - Box::new(BufferTask::new_preparation_required(t)) - as Box - }, - { - let t = BufferTaskType::Commit(commit_b.clone()); - Box::new(BufferTask::new_preparation_required(t)) - as Box - }, + commit_args.clone().into(), + commit_a.clone().into(), + commit_b.clone().into(), ], lookup_tables_keys: vec![], }; @@ -436,12 +430,18 @@ async fn test_prepare_cleanup_and_reprepare_mixed_tasks() { ); // Verify buffers reflect the *new* data and chunks are complete again - let mut buffer_cleanups2 = Vec::new(); - for t in &strategy2.optimized_tasks { - if let PreparationState::Cleanup(c) = t.preparation_state() { - buffer_cleanups2.push(c); - } - } + let buffer_cleanups2: Vec<_> = strategy2 + .optimized_tasks + .iter() + .filter_map(|t| match t { + BaseTaskImpl::Commit(ct) => ct.stage(), + _ => None, + }) + .filter_map(|stage| match stage { + CommitBufferStage::Cleanup(c) => Some(c), + _ => None, + }) + .collect(); assert_eq!( buffer_cleanups2.len(), 2, diff --git a/test-integration/test-committor-service/tests/test_transaction_preparator.rs b/test-integration/test-committor-service/tests/test_transaction_preparator.rs index 8bf165771..cd01db1ec 100644 --- a/test-integration/test-committor-service/tests/test_transaction_preparator.rs +++ b/test-integration/test-committor-service/tests/test_transaction_preparator.rs @@ -3,11 +3,10 @@ use magicblock_committor_program::Chunks; use magicblock_committor_service::{ persist::IntentPersisterImpl, tasks::{ - args_task::{ArgsTask, ArgsTaskType}, - buffer_task::BufferTask, + commit_task::CommitBufferStage, task_strategist::{TaskStrategist, TransactionStrategy}, utils::TransactionUtils, - BaseActionTask, BaseTask, FinalizeTask, PreparationState, + BaseActionTask, BaseActionTaskV1, BaseTaskImpl, FinalizeTask, TaskBuilderImpl, UndelegateTask, }, transaction_preparator::TransactionPreparator, @@ -20,7 +19,8 @@ use solana_pubkey::Pubkey; use solana_sdk::{signer::Signer, system_program}; use crate::common::{ - create_committed_account, generate_random_bytes, TestFixture, + create_buffer_commit_task, create_committed_account, generate_random_bytes, + TestFixture, }; mod common; @@ -34,16 +34,18 @@ async fn test_prepare_commit_tx_with_single_account() { let account_data = vec![1, 2, 3, 4, 5]; let committed_account = create_committed_account(&account_data); - let tasks = vec![ - Box::new(TaskBuilderImpl::create_commit_task( + let tasks: Vec = vec![ + TaskBuilderImpl::create_commit_task( 1, true, committed_account.clone(), None, - )) as Box, - Box::new(ArgsTask::new(ArgsTaskType::Finalize(FinalizeTask { + ) + .into(), + FinalizeTask { delegated_account: committed_account.pubkey, - }))), + } + .into(), ]; let mut tx_strategy = TransactionStrategy { optimized_tasks: tasks, @@ -90,35 +92,30 @@ async fn test_prepare_commit_tx_with_multiple_accounts() { let account2_data = generate_random_bytes(12); let committed_account2 = create_committed_account(&account2_data); - let buffer_commit_task = BufferTask::new_preparation_required( - TaskBuilderImpl::create_commit_task( - 1, - true, - committed_account2.clone(), - None, - ) - .task_type - .into(), - ); + let mut buffer_commit_task = create_buffer_commit_task(&account2_data); + buffer_commit_task.committed_account.pubkey = committed_account2.pubkey; // Create test data - let tasks = vec![ + let tasks: Vec = vec![ // account 1 - Box::new(TaskBuilderImpl::create_commit_task( + TaskBuilderImpl::create_commit_task( 1, true, committed_account1.clone(), None, - )) as Box, + ) + .into(), // account 2 - Box::new(buffer_commit_task), + buffer_commit_task.into(), // finalize account 1 - Box::new(ArgsTask::new(ArgsTaskType::Finalize(FinalizeTask { + FinalizeTask { delegated_account: committed_account1.pubkey, - }))), + } + .into(), // finalize account 2 - Box::new(ArgsTask::new(ArgsTaskType::Finalize(FinalizeTask { + FinalizeTask { delegated_account: committed_account2.pubkey, - }))), + } + .into(), ]; let mut tx_strategy = TransactionStrategy { optimized_tasks: tasks, @@ -149,13 +146,15 @@ async fn test_prepare_commit_tx_with_multiple_accounts() { actual_message.set_recent_blockhash(*expected_message.recent_blockhash()); assert_eq!(actual_message, expected_message); - for task in tx_strategy.optimized_tasks { - let cleanup_task = match task.preparation_state() { - PreparationState::NotNeeded => continue, - PreparationState::Required(_) => { - panic!("Expected state is: PreparationState::Cleanup!") - } - PreparationState::Cleanup(value) => value, + for task in &tx_strategy.optimized_tasks { + let commit_task = match task { + BaseTaskImpl::Commit(ct) => ct, + _ => continue, + }; + let Some(CommitBufferStage::Cleanup(cleanup_task)) = + commit_task.stage() + else { + continue; }; let chunks_pda = cleanup_task.chunks_pda(&fixture.authority.pubkey()); let chunks_account = fixture @@ -192,27 +191,22 @@ async fn test_prepare_commit_tx_with_base_actions() { }], }; - let buffer_commit_task = BufferTask::new_preparation_required( - TaskBuilderImpl::create_commit_task( - 1, - true, - committed_account.clone(), - None, - ) - .task_type - .into(), - ); - let tasks = vec![ + let mut buffer_commit_task = + create_buffer_commit_task(&committed_account.account.data); + buffer_commit_task.committed_account.pubkey = committed_account.pubkey; + let tasks: Vec = vec![ // commit account - Box::new(buffer_commit_task.clone()) as Box, + buffer_commit_task.into(), // finalize account - Box::new(ArgsTask::new(ArgsTaskType::Finalize(FinalizeTask { + FinalizeTask { delegated_account: committed_account.pubkey, - }))), + } + .into(), // BaseAction - Box::new(ArgsTask::new(ArgsTaskType::BaseAction(BaseActionTask { + BaseActionTask::V1(BaseActionTaskV1 { action: base_action, - }))), + }) + .into(), ]; // Test preparation @@ -246,11 +240,15 @@ async fn test_prepare_commit_tx_with_base_actions() { assert_eq!(actual_message, expected_message); // Now we verify that buffers were created - for task in tx_strategy.optimized_tasks { - let cleanup_task = match task.preparation_state() { - PreparationState::NotNeeded => continue, - PreparationState::Required(_) => panic!("Expected Cleanup state!"), - PreparationState::Cleanup(value) => value, + for task in &tx_strategy.optimized_tasks { + let commit_task = match task { + BaseTaskImpl::Commit(ct) => ct, + _ => continue, + }; + let Some(CommitBufferStage::Cleanup(cleanup_task)) = + commit_task.stage() + else { + continue; }; let chunks_pda = cleanup_task.chunks_pda(&fixture.authority.pubkey()); @@ -273,17 +271,19 @@ async fn test_prepare_finalize_tx_with_undelegate_with_atls() { // Create test data let committed_account = create_committed_account(&[1, 2, 3]); - let tasks: Vec> = vec![ + let tasks: Vec = vec![ // finalize account - Box::new(ArgsTask::new(ArgsTaskType::Finalize(FinalizeTask { + FinalizeTask { delegated_account: committed_account.pubkey, - }))), - // BaseAction - Box::new(ArgsTask::new(ArgsTaskType::Undelegate(UndelegateTask { + } + .into(), + // Undelegate + UndelegateTask { delegated_account: committed_account.pubkey, owner_program: Pubkey::new_unique(), rent_reimbursement: Pubkey::new_unique(), - }))), + } + .into(), ]; let lookup_tables_keys = TaskStrategist::collect_lookup_table_keys(