Skip to content

Commit 7adbbac

Browse files
committed
integrate with audit
1 parent f6b0911 commit 7adbbac

File tree

12 files changed

+1273
-182
lines changed

12 files changed

+1273
-182
lines changed

Cargo.lock

Lines changed: 975 additions & 150 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,4 +216,6 @@ opentelemetry = { version = "0.31", features = ["trace"] }
216216

217217
# Base Path
218218
concurrent-queue = "2.5.0"
219-
tips-core = { git = "https://github.com/base/tips", rev = "c08eaa4fe10c26de8911609b41ddab4918698325", default-features = false }
219+
tips-core = { git = "https://github.com/base/tips", rev = "d571797179fd0cc9e932373ba9a4eb5c2be0c0c9", default-features = false }
220+
tips-audit = { git = "https://github.com/base/tips", rev = "d571797179fd0cc9e932373ba9a4eb5c2be0c0c9" }
221+
rdkafka = { version = "0.37" }

crates/op-rbuilder/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,8 @@ reqwest = "0.12.23"
135135
k256 = "0.13.4"
136136

137137
rollup-boost.workspace = true
138+
tips-audit.workspace = true
139+
rdkafka.workspace = true
138140

139141
nanoid = { version = "0.4", optional = true }
140142
reth-ipc = { workspace = true, optional = true }

crates/op-rbuilder/src/args/op.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,21 @@ pub struct OpRbuilderArgs {
6262
#[arg(long = "builder.backrun-bundle-buffer-size", default_value = "10000")]
6363
pub backrun_bundle_buffer_size: usize,
6464

65+
/// Path to Kafka properties file for audit events (enables Kafka audit if set)
66+
#[arg(
67+
long = "builder.audit-kafka-properties",
68+
env = "AUDIT_KAFKA_PROPERTIES_FILE"
69+
)]
70+
pub audit_kafka_properties: Option<String>,
71+
72+
/// Kafka topic for audit events
73+
#[arg(
74+
long = "builder.audit-kafka-topic",
75+
env = "AUDIT_KAFKA_TOPIC",
76+
default_value = "tips-audit"
77+
)]
78+
pub audit_kafka_topic: String,
79+
6580
/// Path to builder playgorund to automatically start up the node connected to it
6681
#[arg(
6782
long = "builder.playground",

crates/op-rbuilder/src/builders/context.rs

Lines changed: 107 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,12 @@ use reth_revm::{State, context::Block};
3636
use reth_transaction_pool::{BestTransactionsAttributes, PoolTransaction};
3737
use revm::{DatabaseCommit, context::result::ResultAndState, interpreter::as_u64_saturated};
3838
use std::{sync::Arc, time::Instant};
39+
use tips_audit::BundleEvent;
3940
use tokio_util::sync::CancellationToken;
40-
use tracing::{debug, info, trace};
41+
use tracing::{debug, info, trace, warn};
4142

4243
use crate::{
44+
bundles::AuditSender,
4345
gas_limiter::AddressGasLimiter,
4446
metrics::OpRBuilderMetrics,
4547
primitives::reth::{ExecutionInfo, TxnExecutionResult},
@@ -82,6 +84,8 @@ pub struct OpPayloadBuilderCtx<ExtraCtx: Debug + Default = ()> {
8284
pub resource_metering: ResourceMetering,
8385
/// Backrun bundle store for storing backrun transactions
8486
pub backrun_bundle_store: crate::bundles::BackrunBundleStore,
87+
/// Audit event channel for backrun bundle lifecycle tracking
88+
pub audit_tx: Option<AuditSender>,
8589
}
8690

8791
impl<ExtraCtx: Debug + Default> OpPayloadBuilderCtx<ExtraCtx> {
@@ -93,6 +97,19 @@ impl<ExtraCtx: Debug + Default> OpPayloadBuilderCtx<ExtraCtx> {
9397
Self { extra_ctx, ..self }
9498
}
9599

100+
/// Send an audit event if the audit channel is configured
101+
fn send_audit_event(&self, event: BundleEvent) {
102+
if let Some(ref audit_tx) = self.audit_tx {
103+
if let Err(e) = audit_tx.send(event) {
104+
warn!(
105+
target: "payload_builder",
106+
error = %e,
107+
"Failed to send audit event"
108+
);
109+
}
110+
}
111+
}
112+
96113
/// Returns the parent block the payload will be build on.
97114
pub fn parent(&self) -> &SealedHeader {
98115
&self.config.parent_header
@@ -447,6 +464,17 @@ impl<ExtraCtx: Debug + Default> OpPayloadBuilderCtx<ExtraCtx> {
447464

448465
num_txs_considered += 1;
449466

467+
// Look up bundle_id for this tx (registered via base_txBundleId RPC)
468+
let bundle_id = self.backrun_bundle_store.get_tx_bundle_id(&tx_hash);
469+
470+
// Emit StartExecuting audit event - tx is about to be executed
471+
self.send_audit_event(BundleEvent::StartExecuting {
472+
bundle_id,
473+
tx_hash,
474+
block_number: self.block_number(),
475+
timestamp_ms: chrono::Utc::now().timestamp_millis(),
476+
});
477+
450478
let _resource_usage = self.resource_metering.get(&tx_hash);
451479

452480
// TODO: ideally we should get this from the txpool stream
@@ -605,13 +633,88 @@ impl<ExtraCtx: Debug + Default> OpPayloadBuilderCtx<ExtraCtx> {
605633
info.executed_senders.push(tx.signer());
606634
info.executed_transactions.push(tx.into_inner());
607635

608-
info!(message = "Executed transaction", tx_hash = ?tx_hash);
636+
// Emit Executed audit event - tx successfully executed and committed
637+
self.send_audit_event(BundleEvent::Executed {
638+
bundle_id,
639+
tx_hash,
640+
block_number: self.block_number(),
641+
gas_used,
642+
timestamp_ms: chrono::Utc::now().timestamp_millis(),
643+
});
609644

610645
// Execute backrun bundles for this transaction if it succeeded
611646
if is_success && let Some(backrun_bundles) = self.backrun_bundle_store.get(&tx_hash) {
612-
info!(target: "backrun_bundles", target_tx = ?tx_hash, bundle_count = backrun_bundles.len(), "Found backrun bundles for transaction");
613647
self.metrics.backrun_target_txs_found_total.increment(1);
614-
// TODO: Execute backrun bundles
648+
649+
for stored_bundle in backrun_bundles {
650+
let bundle_id = stored_bundle.bundle_id;
651+
for backrun_tx in stored_bundle.backrun_txs {
652+
let ResultAndState { result, state } = match evm.transact(&backrun_tx) {
653+
Ok(res) => res,
654+
Err(err) => {
655+
return Err(PayloadBuilderError::evm(err));
656+
}
657+
};
658+
659+
let backrun_gas_used = result.gas_used();
660+
let is_backrun_success = result.is_success();
661+
662+
if !is_backrun_success {
663+
info!(message = "Backrun transaction reverted", tx_hash = ?backrun_tx.tx_hash(), bundle_id = %bundle_id);
664+
665+
// Emit BackrunBundleExecuted audit event for reverted tx
666+
self.send_audit_event(BundleEvent::BackrunBundleExecuted {
667+
bundle_id,
668+
target_tx_hash: tx_hash,
669+
backrun_tx_hash: backrun_tx.tx_hash(),
670+
block_number: self.block_number(),
671+
gas_used: backrun_gas_used,
672+
success: false,
673+
timestamp_ms: chrono::Utc::now().timestamp_millis(),
674+
});
675+
676+
continue;
677+
}
678+
679+
info!(message = "Backrun transaction succeeded", tx_hash = ?backrun_tx.tx_hash(), bundle_id = %bundle_id);
680+
681+
info.cumulative_gas_used += backrun_gas_used;
682+
info.cumulative_da_bytes_used += backrun_tx.encoded_2718().len() as u64;
683+
684+
let ctx = ReceiptBuilderCtx {
685+
tx: backrun_tx.inner(),
686+
evm: &evm,
687+
result,
688+
state: &state,
689+
cumulative_gas_used: info.cumulative_gas_used,
690+
};
691+
info.receipts.push(self.build_receipt(ctx, None));
692+
693+
// commit changes
694+
evm.db_mut().commit(state);
695+
696+
// Emit BackrunBundleExecuted audit event for successful tx
697+
self.send_audit_event(BundleEvent::BackrunBundleExecuted {
698+
bundle_id,
699+
target_tx_hash: tx_hash,
700+
backrun_tx_hash: backrun_tx.tx_hash(),
701+
block_number: self.block_number(),
702+
gas_used: backrun_gas_used,
703+
success: true,
704+
timestamp_ms: chrono::Utc::now().timestamp_millis(),
705+
});
706+
707+
// update add to total fees
708+
let miner_fee = backrun_tx
709+
.effective_tip_per_gas(base_fee)
710+
.expect("fee is always valid; execution succeeded");
711+
info.total_fees += U256::from(miner_fee) * U256::from(backrun_gas_used);
712+
713+
// append sender and transaction to the respective lists
714+
info.executed_senders.push(backrun_tx.signer());
715+
info.executed_transactions.push(backrun_tx.into_inner());
716+
}
717+
}
615718
}
616719
}
617720

crates/op-rbuilder/src/builders/flashblocks/ctx.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::{
22
builders::{BuilderConfig, OpPayloadBuilderCtx, flashblocks::FlashblocksConfig},
3-
bundles::BackrunBundleStore,
3+
bundles::{AuditSender, BackrunBundleStore},
44
gas_limiter::{AddressGasLimiter, args::GasLimiterArgs},
55
metrics::OpRBuilderMetrics,
66
resource_metering::ResourceMetering,
@@ -35,6 +35,8 @@ pub(super) struct OpPayloadSyncerCtx {
3535
resource_metering: ResourceMetering,
3636
/// Backrun bundle store
3737
backrun_bundle_store: BackrunBundleStore,
38+
/// Audit event channel
39+
audit_tx: Option<AuditSender>,
3840
}
3941

4042
impl OpPayloadSyncerCtx {
@@ -48,6 +50,7 @@ impl OpPayloadSyncerCtx {
4850
Client: ClientBounds,
4951
{
5052
let chain_spec = client.chain_spec();
53+
let audit_tx = builder_config.backrun_bundle_store.audit_tx();
5154
Ok(Self {
5255
evm_config,
5356
da_config: builder_config.da_config.clone(),
@@ -56,6 +59,7 @@ impl OpPayloadSyncerCtx {
5659
metrics,
5760
resource_metering: builder_config.resource_metering,
5861
backrun_bundle_store: builder_config.backrun_bundle_store,
62+
audit_tx,
5963
})
6064
}
6165

@@ -90,6 +94,7 @@ impl OpPayloadSyncerCtx {
9094
address_gas_limiter: AddressGasLimiter::new(GasLimiterArgs::default()),
9195
resource_metering: self.resource_metering.clone(),
9296
backrun_bundle_store: self.backrun_bundle_store.clone(),
97+
audit_tx: self.audit_tx,
9398
}
9499
}
95100
}

crates/op-rbuilder/src/builders/flashblocks/payload.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,7 @@ where
284284
address_gas_limiter: self.address_gas_limiter.clone(),
285285
resource_metering: self.config.resource_metering.clone(),
286286
backrun_bundle_store: self.config.backrun_bundle_store.clone(),
287+
audit_tx: self.config.backrun_bundle_store.audit_tx(),
287288
})
288289
}
289290

crates/op-rbuilder/src/builders/standard/payload.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,7 @@ where
253253
address_gas_limiter: self.address_gas_limiter.clone(),
254254
resource_metering: self.config.resource_metering.clone(),
255255
backrun_bundle_store: self.config.backrun_bundle_store.clone(),
256+
audit_tx: self.config.backrun_bundle_store.audit_tx(),
256257
};
257258

258259
let builder = OpBuilder::new(best);

0 commit comments

Comments
 (0)