Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,6 @@ magicblock-test-storage/
.github/packages/npm-package/lib
.github/packages/npm-package/node_modules
.github/packages/npm-package/package-lock.json

# AI related
**/CLAUDE.md
19 changes: 8 additions & 11 deletions magicblock-committor-service/src/intent_executor/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use tracing::error;
use crate::{
tasks::{
task_builder::TaskBuilderError, task_strategist::TaskStrategistError,
BaseTask, TaskType,
BaseTaskImpl,
},
transaction_preparator::error::TransactionPreparatorError,
};
Expand Down Expand Up @@ -217,7 +217,7 @@ impl TransactionStrategyExecutionError {
pub fn try_from_transaction_error(
err: TransactionError,
signature: Option<Signature>,
tasks: &[Box<dyn BaseTask>],
tasks: &[BaseTaskImpl],
) -> Result<Self, TransactionError> {
// Commit Nonce order error
const NONCE_OUT_OF_ORDER: u32 =
Expand Down Expand Up @@ -257,15 +257,12 @@ impl TransactionStrategyExecutionError {
return Err(tx_err_helper(instruction_err));
};

let Some(task_type) = tasks
.get(action_index as usize)
.map(|task| task.task_type())
else {
let Some(task) = tasks.get(action_index as usize) else {
return Err(tx_err_helper(instruction_err));
};

match (task_type, instruction_err) {
(TaskType::Commit, instruction_err) => match instruction_err
match (task, instruction_err) {
(BaseTaskImpl::Commit(_), instruction_err) => match instruction_err
{
InstructionError::Custom(NONCE_OUT_OF_ORDER) => Ok(
TransactionStrategyExecutionError::CommitIDError(
Expand Down Expand Up @@ -294,13 +291,13 @@ impl TransactionStrategyExecutionError {
}
err => Err(tx_err_helper(err)),
},
(TaskType::Action, instruction_err) => {
(BaseTaskImpl::BaseAction(_), instruction_err) => {
Ok(TransactionStrategyExecutionError::ActionsError(
tx_err_helper(instruction_err),
signature,
))
}
(TaskType::Undelegate, instruction_err) => Ok(
(BaseTaskImpl::Undelegate(_), instruction_err) => Ok(
TransactionStrategyExecutionError::UndelegationError(
tx_err_helper(instruction_err),
signature,
Expand Down Expand Up @@ -335,7 +332,7 @@ impl metrics::LabelValue for TransactionStrategyExecutionError {
}

pub(crate) struct IntentTransactionErrorMapper<'a> {
pub tasks: &'a [Box<dyn BaseTask>],
pub tasks: &'a [BaseTaskImpl],
}
impl TransactionErrorMapper for IntentTransactionErrorMapper<'_> {
type ExecutionError = TransactionStrategyExecutionError;
Expand Down
34 changes: 15 additions & 19 deletions magicblock-committor-service/src/intent_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ use crate::{
task_strategist::{
StrategyExecutionMode, TaskStrategist, TransactionStrategy,
},
task_visitors::utility_visitor::TaskVisitorUtils,
BaseTask, TaskType,
BaseTaskImpl,
},
transaction_preparator::{
delivery_preparator::BufferExecutionError,
Expand Down Expand Up @@ -311,24 +310,20 @@ where
committed_pubkeys: &[Pubkey],
strategy: &mut TransactionStrategy,
) -> Result<TransactionStrategy, TaskBuilderError> {
let tasks_and_metas: Vec<_> = strategy
let commit_tasks: Vec<_> = strategy
.optimized_tasks
.iter_mut()
.filter_map(|task| {
let mut visitor = TaskVisitorUtils::GetCommitMeta(None);
task.visit(&mut visitor);
if let TaskVisitorUtils::GetCommitMeta(Some(commit_meta)) =
visitor
{
Some((task, commit_meta))
if let BaseTaskImpl::Commit(commit_task) = task {
Some(commit_task)
} else {
None
}
})
.collect();
let min_context_slot = tasks_and_metas
let min_context_slot = commit_tasks
.iter()
.map(|(_, meta)| meta.remote_slot)
.map(|task| task.committed_account.remote_slot)
.max()
.unwrap_or_default();

Expand All @@ -345,17 +340,18 @@ where
// Here we find the broken tasks and reset them
// Broken tasks are prepared incorrectly so they have to be cleaned up
let mut to_cleanup = Vec::new();
for (task, commit_meta) in tasks_and_metas {
let Some(commit_id) = commit_ids.get(&commit_meta.committed_pubkey)
for task in commit_tasks {
let Some(commit_id) =
commit_ids.get(&task.committed_account.pubkey)
else {
continue;
};
if commit_id == &commit_meta.commit_id {
if commit_id == &task.commit_id {
continue;
}

// Handle invalid tasks
to_cleanup.push(task.clone());
to_cleanup.push(BaseTaskImpl::Commit(task.clone()));
task.reset_commit_id(*commit_id);
}

Expand All @@ -376,7 +372,7 @@ where
let (optimized_tasks, action_tasks) = strategy
.optimized_tasks
.drain(..)
.partition(|el| el.task_type() != TaskType::Action);
.partition(|el| !matches!(el, BaseTaskImpl::BaseAction(_)));
strategy.optimized_tasks = optimized_tasks;

let old_alts = strategy.dummy_revaluate_alts(&self.authority.pubkey());
Expand Down Expand Up @@ -404,7 +400,7 @@ where
strategy
.optimized_tasks
.into_iter()
.partition(|el| el.task_type() == TaskType::Commit);
.partition(|el| matches!(el, BaseTaskImpl::Commit(_)));

let commit_alt_pubkeys = if strategy.lookup_tables_keys.is_empty() {
vec![]
Expand Down Expand Up @@ -450,7 +446,7 @@ where
let position = strategy
.optimized_tasks
.iter()
.position(|el| el.task_type() == TaskType::Undelegate);
.position(|el| matches!(el, BaseTaskImpl::Undelegate(_)));

if let Some(position) = position {
// Remove everything after undelegation including post undelegation actions
Expand Down Expand Up @@ -643,7 +639,7 @@ where
async fn execute_message_with_retries(
&self,
prepared_message: VersionedMessage,
tasks: &[Box<dyn BaseTask>],
tasks: &[BaseTaskImpl],
) -> IntentExecutorResult<Signature, TransactionStrategyExecutionError>
{
struct IntentErrorMapper<TxMap> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,8 @@ use crate::{
},
persist::{IntentPersister, IntentPersisterImpl},
tasks::{
args_task::{ArgsTask, ArgsTaskType},
task_strategist::TransactionStrategy,
task_visitors::utility_visitor::TaskVisitorUtils,
BaseTask,
commit_task::CommitTask, task_strategist::TransactionStrategy,
BaseTaskImpl, FinalizeTask,
},
transaction_preparator::TransactionPreparator,
};
Expand Down Expand Up @@ -159,14 +157,12 @@ where
) => {
let optimized_tasks =
transaction_strategy.optimized_tasks.as_slice();
if let Some(task) = err
if let Some(BaseTaskImpl::Commit(task)) = err
.task_index()
.and_then(|index| optimized_tasks.get(index as usize))
{
Self::handle_unfinalized_account_error(
inner,
signature,
task.as_ref(),
inner, signature, task,
)
.await
} else {
Expand Down Expand Up @@ -204,14 +200,12 @@ where
async fn handle_unfinalized_account_error(
inner: &IntentExecutorImpl<T, F>,
failed_signature: &Option<Signature>,
task: &dyn BaseTask,
task: &CommitTask,
) -> IntentExecutorResult<ControlFlow<(), TransactionStrategy>> {
let Some(commit_meta) = TaskVisitorUtils::commit_meta(task) else {
// Can't recover - break execution
return Ok(ControlFlow::Break(()));
};
let finalize_task: Box<dyn BaseTask> =
Box::new(ArgsTask::new(ArgsTaskType::Finalize(commit_meta.into())));
let finalize_task: BaseTaskImpl = FinalizeTask {
delegated_account: task.committed_account.pubkey,
}
.into();
inner
.prepare_and_execute_strategy(
&mut TransactionStrategy {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,8 @@ use crate::{
},
persist::{IntentPersister, IntentPersisterImpl},
tasks::{
args_task::{ArgsTask, ArgsTaskType},
task_strategist::TransactionStrategy,
task_visitors::utility_visitor::TaskVisitorUtils,
BaseTask,
commit_task::CommitTask, task_strategist::TransactionStrategy,
BaseTaskImpl, FinalizeTask,
},
transaction_preparator::TransactionPreparator,
};
Expand Down Expand Up @@ -165,13 +163,11 @@ where
let optimized_tasks =
commit_strategy.optimized_tasks.as_slice();
let task_index = err.task_index();
if let Some(task) = task_index
if let Some(BaseTaskImpl::Commit(task)) = task_index
.and_then(|index| optimized_tasks.get(index as usize))
{
Self::handle_unfinalized_account_error(
inner,
signature,
task.as_ref(),
inner, signature, task,
)
.await
} else {
Expand Down Expand Up @@ -217,14 +213,12 @@ where
async fn handle_unfinalized_account_error(
inner: &IntentExecutorImpl<T, F>,
failed_signature: &Option<Signature>,
task: &dyn BaseTask,
task: &CommitTask,
) -> IntentExecutorResult<ControlFlow<(), TransactionStrategy>> {
let Some(commit_meta) = TaskVisitorUtils::commit_meta(task) else {
// Can't recover - break execution
return Ok(ControlFlow::Break(()));
};
let finalize_task: Box<dyn BaseTask> =
Box::new(ArgsTask::new(ArgsTaskType::Finalize(commit_meta.into())));
let finalize_task: BaseTaskImpl = FinalizeTask {
delegated_account: task.committed_account.pubkey,
}
.into();
inner
.prepare_and_execute_strategy(
&mut TransactionStrategy {
Expand Down
Loading