Skip to content

Commit e2d56c5

Browse files
committed
Simplify Kafka config to use properties file pattern
- Remove --metering-kafka-brokers CLI arg (use properties file instead) - Make --metering-kafka-properties-file the only required arg for Kafka - Default --metering-kafka-topic to tips-ingress - Make --metering-kafka-group-id optional (overrides properties file) - Load all rdkafka settings from properties file instead of hardcoding
1 parent 72f0667 commit e2d56c5

File tree

3 files changed

+49
-54
lines changed

3 files changed

+49
-54
lines changed

bin/node/src/cli.rs

Lines changed: 14 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -46,22 +46,20 @@ pub struct Args {
4646
pub enable_metering: bool,
4747

4848
// --- Priority fee estimation args ---
49-
/// Kafka brokers for metering bundle events (comma-separated)
50-
#[arg(long = "metering-kafka-brokers")]
51-
pub metering_kafka_brokers: Option<String>,
49+
/// Path to Kafka properties file (required for priority fee estimation).
50+
/// The properties file should contain rdkafka settings like bootstrap.servers,
51+
/// group.id, session.timeout.ms, etc.
52+
#[arg(long = "metering-kafka-properties-file")]
53+
pub metering_kafka_properties_file: Option<String>,
5254

5355
/// Kafka topic for accepted bundle events
54-
#[arg(long = "metering-kafka-topic")]
55-
pub metering_kafka_topic: Option<String>,
56+
#[arg(long = "metering-kafka-topic", default_value = "tips-ingress")]
57+
pub metering_kafka_topic: String,
5658

57-
/// Kafka consumer group ID
59+
/// Kafka consumer group ID (overrides group.id in properties file if set)
5860
#[arg(long = "metering-kafka-group-id")]
5961
pub metering_kafka_group_id: Option<String>,
6062

61-
/// Optional path to Kafka properties file
62-
#[arg(long = "metering-kafka-properties-file")]
63-
pub metering_kafka_properties_file: Option<String>,
64-
6563
/// Gas limit per flashblock for priority fee estimation
6664
#[arg(long = "metering-gas-limit", default_value = "30000000")]
6765
pub metering_gas_limit: u64,
@@ -108,20 +106,12 @@ impl From<Args> for BaseNodeConfig {
108106
max_pending_blocks_depth: args.max_pending_blocks_depth,
109107
});
110108

111-
// Build Kafka config if all required fields are present
112-
let kafka = match (
113-
args.metering_kafka_brokers,
114-
args.metering_kafka_topic,
115-
args.metering_kafka_group_id,
116-
) {
117-
(Some(brokers), Some(topic), Some(group_id)) => Some(KafkaConfig {
118-
brokers,
119-
topic,
120-
group_id,
121-
properties_file: args.metering_kafka_properties_file,
122-
}),
123-
_ => None,
124-
};
109+
// Build Kafka config if properties file is provided
110+
let kafka = args.metering_kafka_properties_file.map(|properties_file| KafkaConfig {
111+
properties_file,
112+
topic: args.metering_kafka_topic,
113+
group_id_override: args.metering_kafka_group_id,
114+
});
125115

126116
let metering = MeteringConfig {
127117
enabled: args.enable_metering,

crates/runner/src/config.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -67,16 +67,18 @@ pub struct MeteringConfig {
6767
}
6868

6969
/// Kafka connection configuration.
70+
///
71+
/// All rdkafka settings (bootstrap.servers, group.id, timeouts, etc.) should be
72+
/// specified in the properties file. The CLI only specifies the path to this file
73+
/// and the topic name.
7074
#[derive(Debug, Clone)]
7175
pub struct KafkaConfig {
72-
/// Comma-separated broker addresses.
73-
pub brokers: String,
76+
/// Path to the Kafka properties file containing rdkafka settings.
77+
pub properties_file: String,
7478
/// Topic name for accepted bundle events.
7579
pub topic: String,
76-
/// Consumer group ID.
77-
pub group_id: String,
78-
/// Optional path to properties file.
79-
pub properties_file: Option<String>,
80+
/// Optional consumer group ID override (takes precedence over properties file).
81+
pub group_id_override: Option<String>,
8082
}
8183

8284
/// Resource limits for priority fee estimation.

crates/runner/src/extensions/rpc.rs

Lines changed: 27 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ impl BaseNodeExtension for BaseRpcExtension {
154154
if metering.enabled && metering.kafka.is_none() {
155155
warn!(
156156
message = "Metering enabled but Kafka not configured",
157-
help = "Priority fee estimation requires --metering-kafka-brokers, --metering-kafka-topic, and --metering-kafka-group-id"
157+
help = "Priority fee estimation requires --metering-kafka-properties-file"
158158
);
159159
}
160160

@@ -201,31 +201,34 @@ impl BaseNodeExtension for BaseRpcExtension {
201201

202202
// Spawn Kafka consumer if configured
203203
if let (Some(runtime), Some(kafka_cfg)) = (&metering_runtime, &metering.kafka) {
204-
info!(message = "Starting Kafka consumer for metering");
204+
info!(
205+
message = "Starting Kafka consumer for metering",
206+
properties_file = %kafka_cfg.properties_file,
207+
topic = %kafka_cfg.topic
208+
);
205209

206-
let mut client_config = ClientConfig::new();
207-
client_config.set("bootstrap.servers", &kafka_cfg.brokers);
208-
client_config.set("group.id", &kafka_cfg.group_id);
209-
client_config.set("enable.partition.eof", "false");
210-
client_config.set("session.timeout.ms", "6000");
211-
client_config.set("enable.auto.commit", "true");
212-
client_config.set("auto.offset.reset", "earliest");
213-
214-
if let Some(path) = kafka_cfg.properties_file.as_ref() {
215-
match load_kafka_config_from_file(path) {
216-
Ok(props) => {
217-
for (key, value) in props {
218-
client_config.set(key, value);
219-
}
220-
}
221-
Err(err) => {
222-
warn!(
223-
message = "Failed to load Kafka properties file",
224-
file = %path,
225-
%err
226-
);
227-
}
210+
// Load all rdkafka settings from the properties file
211+
let props = match load_kafka_config_from_file(&kafka_cfg.properties_file) {
212+
Ok(props) => props,
213+
Err(err) => {
214+
error!(
215+
target: "metering::kafka",
216+
file = %kafka_cfg.properties_file,
217+
%err,
218+
"Failed to load Kafka properties file"
219+
);
220+
return Ok(());
228221
}
222+
};
223+
224+
let mut client_config = ClientConfig::new();
225+
for (key, value) in props {
226+
client_config.set(key, value);
227+
}
228+
229+
// Apply CLI override for group.id if specified
230+
if let Some(group_id) = &kafka_cfg.group_id_override {
231+
client_config.set("group.id", group_id);
229232
}
230233

231234
let tx_sender = runtime.tx_sender.clone();

0 commit comments

Comments
 (0)