Skip to content

Commit 946eff0

Browse files
p2p: make safety limits configurable
Add configurable safety limits for message/frame caps, per-peer/per-conn budgets, hop limits, and dial jitter/backoff caps. Wire these from node config into the networking layer and make the relay dedup cache bounds configurable as well. Made-with: Cursor
1 parent ff2d4bc commit 946eff0

5 files changed

Lines changed: 288 additions & 40 deletions

File tree

crates/catalyst-cli/src/config.rs

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,62 @@ pub struct NetworkConfig {
141141

142142
/// Network timeouts
143143
pub timeouts: NetworkTimeouts,
144+
145+
/// P2P safety limits (DoS bounding) applied by the networking layer.
146+
#[serde(default)]
147+
pub safety_limits: P2pSafetyLimits,
148+
149+
/// Relay cache bounds (used for multi-hop rebroadcast + dedup).
150+
#[serde(default)]
151+
pub relay_cache: RelayCacheConfig,
152+
}
153+
154+
#[derive(Debug, Clone, Serialize, Deserialize)]
155+
pub struct P2pSafetyLimits {
156+
pub max_gossip_message_bytes: usize,
157+
pub per_peer_max_msgs_per_sec: u32,
158+
pub per_peer_max_bytes_per_sec: usize,
159+
pub max_tcp_frame_bytes: usize,
160+
pub per_conn_max_msgs_per_sec: u32,
161+
pub per_conn_max_bytes_per_sec: usize,
162+
pub max_hops: u8,
163+
pub dedup_cache_max_entries: usize,
164+
pub dial_jitter_max_ms: u64,
165+
pub dial_backoff_max_ms: u64,
166+
}
167+
168+
impl Default for P2pSafetyLimits {
169+
fn default() -> Self {
170+
Self {
171+
max_gossip_message_bytes: 8 * 1024 * 1024,
172+
per_peer_max_msgs_per_sec: 200,
173+
per_peer_max_bytes_per_sec: 8 * 1024 * 1024,
174+
max_tcp_frame_bytes: 8 * 1024 * 1024,
175+
per_conn_max_msgs_per_sec: 200,
176+
per_conn_max_bytes_per_sec: 8 * 1024 * 1024,
177+
max_hops: 10,
178+
dedup_cache_max_entries: 20_000,
179+
dial_jitter_max_ms: 250,
180+
dial_backoff_max_ms: 60_000,
181+
}
182+
}
183+
}
184+
185+
#[derive(Debug, Clone, Serialize, Deserialize)]
186+
pub struct RelayCacheConfig {
187+
pub max_entries: usize,
188+
pub target_entries: usize,
189+
pub retention_seconds: u64,
190+
}
191+
192+
impl Default for RelayCacheConfig {
193+
fn default() -> Self {
194+
Self {
195+
max_entries: 5000,
196+
target_entries: 4000,
197+
retention_seconds: 10 * 60,
198+
}
199+
}
144200
}
145201

146202
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -458,6 +514,8 @@ impl Default for NodeConfig {
458514
request_timeout: 10,
459515
keep_alive_interval: 30,
460516
},
517+
safety_limits: P2pSafetyLimits::default(),
518+
relay_cache: RelayCacheConfig::default(),
461519
},
462520
storage: StorageConfig {
463521
data_dir: PathBuf::from("data"),
@@ -669,6 +727,51 @@ impl NodeConfig {
669727
if self.network.listen_addresses.is_empty() {
670728
return Err(anyhow::anyhow!("At least one listen address must be specified"));
671729
}
730+
731+
// Validate P2P safety limits
732+
{
733+
let sl = &self.network.safety_limits;
734+
if sl.max_gossip_message_bytes == 0
735+
|| sl.max_tcp_frame_bytes == 0
736+
|| sl.per_peer_max_msgs_per_sec == 0
737+
|| sl.per_conn_max_msgs_per_sec == 0
738+
|| sl.per_peer_max_bytes_per_sec == 0
739+
|| sl.per_conn_max_bytes_per_sec == 0
740+
{
741+
return Err(anyhow::anyhow!("network.safety_limits values must be > 0"));
742+
}
743+
if sl.max_hops == 0 {
744+
return Err(anyhow::anyhow!("network.safety_limits.max_hops must be > 0"));
745+
}
746+
if sl.dedup_cache_max_entries == 0 {
747+
return Err(anyhow::anyhow!(
748+
"network.safety_limits.dedup_cache_max_entries must be > 0"
749+
));
750+
}
751+
if sl.dial_backoff_max_ms == 0 {
752+
return Err(anyhow::anyhow!(
753+
"network.safety_limits.dial_backoff_max_ms must be > 0"
754+
));
755+
}
756+
}
757+
758+
// Validate relay cache bounds
759+
{
760+
let rc = &self.network.relay_cache;
761+
if rc.max_entries == 0 || rc.target_entries == 0 {
762+
return Err(anyhow::anyhow!("network.relay_cache max/target must be > 0"));
763+
}
764+
if rc.max_entries < rc.target_entries {
765+
return Err(anyhow::anyhow!(
766+
"network.relay_cache.max_entries must be >= target_entries"
767+
));
768+
}
769+
if rc.retention_seconds == 0 {
770+
return Err(anyhow::anyhow!(
771+
"network.relay_cache.retention_seconds must be > 0"
772+
));
773+
}
774+
}
672775

673776
// Validate consensus configuration
674777
if self.consensus.cycle_duration_seconds < 10 {

crates/catalyst-cli/src/node.rs

Lines changed: 49 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -133,12 +133,37 @@ async fn persist_lsu_history(
133133
.await;
134134
}
135135

136-
#[derive(Debug, Default)]
136+
#[derive(Debug, Clone)]
137+
struct RelayCacheCfg {
138+
max_entries: usize,
139+
target_entries: usize,
140+
retention_ms: u64,
141+
}
142+
143+
impl Default for RelayCacheCfg {
144+
fn default() -> Self {
145+
Self {
146+
max_entries: 5000,
147+
target_entries: 4000,
148+
retention_ms: 10 * 60 * 1000,
149+
}
150+
}
151+
}
152+
153+
#[derive(Debug)]
137154
struct RelayCache {
155+
cfg: RelayCacheCfg,
138156
seen: std::collections::HashMap<String, u64>,
139157
}
140158

141159
impl RelayCache {
160+
fn new(cfg: RelayCacheCfg) -> Self {
161+
Self {
162+
cfg,
163+
seen: std::collections::HashMap::new(),
164+
}
165+
}
166+
142167
fn should_relay(&mut self, env: &MessageEnvelope, now_ms: u64) -> bool {
143168
if env.is_expired() {
144169
return false;
@@ -154,20 +179,18 @@ impl RelayCache {
154179
self.seen.insert(env.id.clone(), now_ms);
155180

156181
// Prune old ids (best-effort).
157-
let keep_after = now_ms.saturating_sub(10 * 60 * 1000);
182+
let keep_after = now_ms.saturating_sub(self.cfg.retention_ms);
158183
self.seen.retain(|_, ts| *ts >= keep_after);
159184

160185
// Cap size to prevent unbounded growth under attack.
161-
const MAX: usize = 5000;
162-
const TARGET: usize = 4000;
163-
if self.seen.len() > MAX {
186+
if self.seen.len() > self.cfg.max_entries {
164187
let mut v: Vec<(String, u64)> = self
165188
.seen
166189
.iter()
167190
.map(|(k, ts)| (k.clone(), *ts))
168191
.collect();
169192
v.sort_by_key(|(_, ts)| *ts);
170-
let drop_n = v.len().saturating_sub(TARGET);
193+
let drop_n = v.len().saturating_sub(self.cfg.target_entries);
171194
for (k, _) in v.into_iter().take(drop_n) {
172195
self.seen.remove(&k);
173196
}
@@ -1946,6 +1969,20 @@ impl CatalystNode {
19461969

19471970
// Put keypair in node dir (even if unused by simple transport).
19481971
net_cfg.peer.keypair_path = Some(self.config.storage.data_dir.join("p2p_keypair"));
1972+
net_cfg.peer.max_peers = self.config.network.max_peers as usize;
1973+
net_cfg.peer.min_peers = self.config.network.min_peers as usize;
1974+
1975+
// Wire safety limits from node config.
1976+
net_cfg.safety_limits.max_gossip_message_bytes = self.config.network.safety_limits.max_gossip_message_bytes;
1977+
net_cfg.safety_limits.per_peer_max_msgs_per_sec = self.config.network.safety_limits.per_peer_max_msgs_per_sec;
1978+
net_cfg.safety_limits.per_peer_max_bytes_per_sec = self.config.network.safety_limits.per_peer_max_bytes_per_sec;
1979+
net_cfg.safety_limits.max_tcp_frame_bytes = self.config.network.safety_limits.max_tcp_frame_bytes;
1980+
net_cfg.safety_limits.per_conn_max_msgs_per_sec = self.config.network.safety_limits.per_conn_max_msgs_per_sec;
1981+
net_cfg.safety_limits.per_conn_max_bytes_per_sec = self.config.network.safety_limits.per_conn_max_bytes_per_sec;
1982+
net_cfg.safety_limits.max_hops = self.config.network.safety_limits.max_hops;
1983+
net_cfg.safety_limits.dedup_cache_max_entries = self.config.network.safety_limits.dedup_cache_max_entries;
1984+
net_cfg.safety_limits.dial_jitter_max_ms = self.config.network.safety_limits.dial_jitter_max_ms;
1985+
net_cfg.safety_limits.dial_backoff_max_ms = self.config.network.safety_limits.dial_backoff_max_ms;
19491986
net_cfg.peer.bootstrap_peers = Vec::new();
19501987

19511988
let network = Arc::new(P2pService::new(net_cfg).await?);
@@ -2431,8 +2468,13 @@ impl CatalystNode {
24312468

24322469
let last_lsu: Arc<tokio::sync::RwLock<std::collections::HashMap<u64, catalyst_consensus::types::LedgerStateUpdate>>> =
24332470
Arc::new(tokio::sync::RwLock::new(std::collections::HashMap::new()));
2471+
let relay_cfg = RelayCacheCfg {
2472+
max_entries: self.config.network.relay_cache.max_entries,
2473+
target_entries: self.config.network.relay_cache.target_entries,
2474+
retention_ms: self.config.network.relay_cache.retention_seconds.saturating_mul(1000),
2475+
};
24342476
let relay_cache: Arc<tokio::sync::Mutex<RelayCache>> =
2435-
Arc::new(tokio::sync::Mutex::new(RelayCache::default()));
2477+
Arc::new(tokio::sync::Mutex::new(RelayCache::new(relay_cfg)));
24362478
#[derive(Debug, Default)]
24372479
struct CatchupState {
24382480
observed_head_cycle: u64,

crates/catalyst-network/src/config.rs

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,9 @@ pub struct NetworkConfig {
103103

104104
/// Gossip protocol configuration
105105
pub gossip: GossipConfig,
106+
107+
/// Safety limits (DoS bounding) applied by transports.
108+
pub safety_limits: SafetyLimitsConfig,
106109

107110
/// Discovery configuration
108111
pub discovery: DiscoveryConfig,
@@ -120,6 +123,40 @@ pub struct NetworkConfig {
120123
pub monitoring: MonitoringConfig,
121124
}
122125

126+
/// Transport-agnostic DoS safety limits.
127+
#[derive(Debug, Clone, Serialize, Deserialize)]
128+
pub struct SafetyLimitsConfig {
129+
/// Maximum gossipsub message bytes accepted by the libp2p transport.
130+
pub max_gossip_message_bytes: usize,
131+
132+
/// Per-peer message rate limit (msgs/sec) for libp2p transport.
133+
pub per_peer_max_msgs_per_sec: u32,
134+
135+
/// Per-peer bandwidth cap (bytes/sec) for libp2p transport.
136+
pub per_peer_max_bytes_per_sec: usize,
137+
138+
/// Maximum TCP frame bytes accepted by the simple transport.
139+
pub max_tcp_frame_bytes: usize,
140+
141+
/// Per-connection message rate limit (msgs/sec) for simple transport.
142+
pub per_conn_max_msgs_per_sec: u32,
143+
144+
/// Per-connection bandwidth cap (bytes/sec) for simple transport.
145+
pub per_conn_max_bytes_per_sec: usize,
146+
147+
/// Maximum hops for multi-hop rebroadcast.
148+
pub max_hops: u8,
149+
150+
/// Maximum number of recently seen envelope ids stored for deduplication.
151+
pub dedup_cache_max_entries: usize,
152+
153+
/// Maximum dial jitter (milliseconds) applied to backoff scheduling.
154+
pub dial_jitter_max_ms: u64,
155+
156+
/// Maximum backoff cap (milliseconds) applied to exponential dial backoff.
157+
pub dial_backoff_max_ms: u64,
158+
}
159+
123160
/// Peer-specific configuration
124161
#[derive(Debug, Clone, Serialize, Deserialize)]
125162
pub struct PeerConfig {
@@ -1727,6 +1764,7 @@ impl Default for NetworkConfig {
17271764
peer: PeerConfig::default(),
17281765
transport: TransportConfig::default(),
17291766
gossip: GossipConfig::default(),
1767+
safety_limits: SafetyLimitsConfig::default(),
17301768
discovery: DiscoveryConfig::default(),
17311769
security: SecurityConfig::default(),
17321770
bandwidth: BandwidthConfig::default(),
@@ -1736,6 +1774,24 @@ impl Default for NetworkConfig {
17361774
}
17371775
}
17381776

1777+
impl Default for SafetyLimitsConfig {
1778+
fn default() -> Self {
1779+
Self {
1780+
// Match the previously hard-coded defaults in the transports.
1781+
max_gossip_message_bytes: 8 * 1024 * 1024,
1782+
per_peer_max_msgs_per_sec: 200,
1783+
per_peer_max_bytes_per_sec: 8 * 1024 * 1024,
1784+
max_tcp_frame_bytes: 8 * 1024 * 1024,
1785+
per_conn_max_msgs_per_sec: 200,
1786+
per_conn_max_bytes_per_sec: 8 * 1024 * 1024,
1787+
max_hops: 10,
1788+
dedup_cache_max_entries: 20_000,
1789+
dial_jitter_max_ms: 250,
1790+
dial_backoff_max_ms: 60_000,
1791+
}
1792+
}
1793+
}
1794+
17391795
impl Default for PeerConfig {
17401796
fn default() -> Self {
17411797
Self {
@@ -2537,6 +2593,38 @@ impl NetworkConfig {
25372593
));
25382594
}
25392595

2596+
// Validate safety limits
2597+
let sl = &self.safety_limits;
2598+
if sl.max_gossip_message_bytes == 0 || sl.max_tcp_frame_bytes == 0 {
2599+
return Err(NetworkError::ConfigError(
2600+
"safety_limits max message/frame bytes must be > 0".to_string(),
2601+
));
2602+
}
2603+
if sl.per_peer_max_msgs_per_sec == 0
2604+
|| sl.per_conn_max_msgs_per_sec == 0
2605+
|| sl.per_peer_max_bytes_per_sec == 0
2606+
|| sl.per_conn_max_bytes_per_sec == 0
2607+
{
2608+
return Err(NetworkError::ConfigError(
2609+
"safety_limits per-peer/per-conn budgets must be > 0".to_string(),
2610+
));
2611+
}
2612+
if sl.max_hops == 0 {
2613+
return Err(NetworkError::ConfigError(
2614+
"safety_limits.max_hops must be > 0".to_string(),
2615+
));
2616+
}
2617+
if sl.dedup_cache_max_entries == 0 {
2618+
return Err(NetworkError::ConfigError(
2619+
"safety_limits.dedup_cache_max_entries must be > 0".to_string(),
2620+
));
2621+
}
2622+
if sl.dial_backoff_max_ms == 0 {
2623+
return Err(NetworkError::ConfigError(
2624+
"safety_limits.dial_backoff_max_ms must be > 0".to_string(),
2625+
));
2626+
}
2627+
25402628
// Validate port ranges
25412629
if self.transport.tcp_port_range.0 >= self.transport.tcp_port_range.1 {
25422630
return Err(NetworkError::ConfigError(

0 commit comments

Comments
 (0)