6 changes: 6 additions & 0 deletions src/lean_spec/__main__.py
@@ -468,6 +468,12 @@ async def run_node(

event_source = await LiveNetworkEventSource.create()

# Set the fork digest for incoming message validation.
#
# Without this, the event source defaults to "0x00000000" and rejects
# all messages from other clients that use "devnet0".
event_source.set_fork_digest(GOSSIP_FORK_DIGEST)

# Subscribe to gossip topics.
#
# We subscribe before connecting to bootnodes so that when
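A minimal sketch of the intended startup order, assuming LiveNetworkEventSource lives in the event_source module changed below and that set_fork_digest and subscribe_gossip_topic keep the signatures shown in this diff (the topic string is illustrative):

```python
from lean_spec.subspecs.networking.client.event_source import LiveNetworkEventSource


async def start_event_source(fork_digest: str) -> LiveNetworkEventSource:
    """Create the event source and pin the fork digest before subscribing."""
    event_source = await LiveNetworkEventSource.create()

    # Without this call the source keeps its "0x00000000" default and
    # rejects every incoming message from peers on the real fork digest.
    event_source.set_fork_digest(fork_digest)

    # Illustrative topic string; the node builds these via GossipTopic.
    event_source.subscribe_gossip_topic(f"/leanconsensus/{fork_digest}/blocks/ssz_snappy")
    return event_source
```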
59 changes: 29 additions & 30 deletions src/lean_spec/subspecs/networking/client/event_source.py
@@ -50,7 +50,7 @@
+------------------+---------------------------------------------+
| data_length | Varint: byte length of compressed data |
+------------------+---------------------------------------------+
| data | Snappy-framed SSZ-encoded message |
| data | Snappy-compressed SSZ-encoded message |
+------------------+---------------------------------------------+

Varints use LEB128 encoding (1-10 bytes depending on value).
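For reference, the unsigned LEB128 varint used for data_length is easy to sketch standalone (an illustration, not the repo's varint helper):

```python
def encode_uvarint(value: int) -> bytes:
    """Unsigned LEB128: 7 payload bits per byte, high bit marks continuation."""
    out = bytearray()
    while True:
        byte = value & 0x7F
        value >>= 7
        if value:
            out.append(byte | 0x80)  # more bytes follow
        else:
            out.append(byte)
            return bytes(out)


assert encode_uvarint(300) == b"\xac\x02"  # the canonical LEB128 example
```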
@@ -78,7 +78,7 @@
- Fixed overhead: Known sizes enable buffer pre-allocation.

Snappy compression reduces bandwidth by 50-70% for typical blocks.
The framing format adds CRC32C checksums for corruption detection.
Gossip uses raw Snappy block format. Req-resp uses Snappy framing with CRC32C.


GOSSIPSUB v1.1 REQUIREMENTS
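The compression note above is the crux of this file's change: gossip switches to raw Snappy blocks while req/resp keeps the framed format. A sketch of the pairing, assuming lean_spec.snappy continues to export both the raw helpers (compress/decompress) used for gossip and the framed helpers (frame_compress/frame_decompress) used for req/resp:

```python
from lean_spec.snappy import compress, decompress, frame_compress, frame_decompress

ssz_payload = b"\x00" * 64  # stand-in for SSZ-encoded bytes

# Gossip path: raw Snappy block format (no stream identifier, no CRC32C).
assert decompress(compress(ssz_payload)) == ssz_payload

# Req/resp path: Snappy framing format with per-chunk CRC32C checksums.
assert frame_decompress(frame_compress(ssz_payload)) == ssz_payload

# The formats are not interchangeable: feeding framed bytes to the raw
# decoder (or vice versa) is expected to fail with SnappyDecompressionError.
```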
@@ -105,7 +105,7 @@
from dataclasses import dataclass, field
from typing import Final, Protocol, Self

from lean_spec.snappy import SnappyDecompressionError, frame_decompress
from lean_spec.snappy import SnappyDecompressionError, decompress
from lean_spec.subspecs.containers import SignedBlockWithAttestation
from lean_spec.subspecs.containers.attestation import SignedAggregatedAttestation, SignedAttestation
from lean_spec.subspecs.networking.config import (
@@ -218,7 +218,7 @@ class GossipHandler:
Topics contain:

- Fork digest: 4-byte identifier derived from genesis + fork version.
- Message type: "block" or "attestation".
- Message type: "blocks" or "attestation".
- Encoding: Always "ssz_snappy" for Ethereum.

Validating the topic prevents:
@@ -273,7 +273,7 @@ def decode_message(
ForkMismatchError.

Args:
topic_str: Full topic string (e.g., "/leanconsensus/0x.../block/ssz_snappy").
topic_str: Full topic string (e.g., "/leanconsensus/0x.../blocks/ssz_snappy").
compressed_data: Snappy-compressed SSZ data.

Returns:
@@ -296,19 +296,14 @@ def decode_message(
except ValueError as e:
raise GossipMessageError(f"Invalid topic: {e}") from e

# Step 2: Decompress Snappy-framed data.
# Step 2: Decompress raw Snappy data.
#
# Ethereum uses Snappy framing format for gossip (same as req/resp).
# Framed Snappy includes stream identifier and CRC32C checksums.
#
# Decompression fails if:
# - Stream identifier is missing or invalid.
# - CRC checksum mismatch (data corruption).
# - Compressed data is truncated.
# Gossip uses raw Snappy block format (not framing).
# This matches libp2p gossipsub's SnappyTransform behavior.
#
# Failed decompression indicates network corruption or a malicious peer.
try:
ssz_bytes = frame_decompress(compressed_data)
ssz_bytes = decompress(compressed_data)
except SnappyDecompressionError as e:
raise GossipMessageError(f"Snappy decompression failed: {e}") from e

@@ -689,7 +684,7 @@ def subscribe_gossip_topic(self, topic: str) -> None:
in mesh management via GRAFT/PRUNE.

Args:
topic: Full topic string (e.g., "/leanconsensus/0x.../block/ssz_snappy").
topic: Full topic string (e.g., "/leanconsensus/0x.../blocks/ssz_snappy").
"""
self._gossipsub_behavior.subscribe(topic)
logger.debug("Subscribed to gossip topic %s", topic)
@@ -734,7 +729,8 @@ async def _handle_gossipsub_message(self, event: GossipsubMessageEvent) -> None:
"""
Handle a message received via GossipSub.

Decodes the message and emits the appropriate event type.
Event data is already decompressed by the gossipsub behavior.
Decodes SSZ bytes directly based on topic kind.

Args:
event: GossipSub message event from the behavior.
@@ -743,20 +739,23 @@ async def _handle_gossipsub_message(self, event: GossipsubMessageEvent) -> None:
# Parse the topic to determine message type.
topic = self._gossip_handler.get_topic(event.topic)

# Decompress and decode the message.
message = self._gossip_handler.decode_message(event.topic, event.data)

# Emit the appropriate event.
match topic.kind:
case TopicKind.BLOCK:
if isinstance(message, SignedBlockWithAttestation):
await self._emit_gossip_block(message, event.peer_id)
case TopicKind.ATTESTATION_SUBNET:
if isinstance(message, SignedAttestation):
await self._emit_gossip_attestation(message, event.peer_id)
case TopicKind.AGGREGATED_ATTESTATION:
if isinstance(message, SignedAggregatedAttestation):
await self._emit_gossip_aggregated_attestation(message, event.peer_id)
# Decode SSZ bytes directly.
#
# The gossipsub behavior already decompressed the Snappy payload
# during message ID computation. The event carries decompressed data.
try:
match topic.kind:
case TopicKind.BLOCK:
block = SignedBlockWithAttestation.decode_bytes(event.data)
await self._emit_gossip_block(block, event.peer_id)
case TopicKind.ATTESTATION_SUBNET:
att = SignedAttestation.decode_bytes(event.data)
await self._emit_gossip_attestation(att, event.peer_id)
case TopicKind.AGGREGATED_ATTESTATION:
agg = SignedAggregatedAttestation.decode_bytes(event.data)
await self._emit_gossip_aggregated_attestation(agg, event.peer_id)
except SSZSerializationError as e:
raise GossipMessageError(f"SSZ decode failed: {e}") from e

logger.debug("Processed gossipsub message %s from %s", topic.kind.value, event.peer_id)

8 changes: 4 additions & 4 deletions src/lean_spec/subspecs/networking/config.py
@@ -26,14 +26,14 @@
response, including all chunks for multi-part responses like BlocksByRange.
"""

MESSAGE_DOMAIN_INVALID_SNAPPY: Final[DomainType] = DomainType(b"\x00")
"""1-byte domain for gossip message-id isolation of invalid snappy messages.
MESSAGE_DOMAIN_INVALID_SNAPPY: Final[DomainType] = DomainType(b"\x00\x00\x00\x00")
"""4-byte domain for gossip message-id isolation of invalid snappy messages.

Per Ethereum spec, prepended to the message hash when decompression fails.
"""

MESSAGE_DOMAIN_VALID_SNAPPY: Final[DomainType] = DomainType(b"\x01")
"""1-byte domain for gossip message-id isolation of valid snappy messages.
MESSAGE_DOMAIN_VALID_SNAPPY: Final[DomainType] = DomainType(b"\x01\x00\x00\x00")
"""4-byte domain for gossip message-id isolation of valid snappy messages.

Per Ethereum spec, prepended to the message hash when decompression succeeds.
"""
59 changes: 51 additions & 8 deletions src/lean_spec/subspecs/networking/gossipsub/behavior.py
@@ -64,7 +64,12 @@
from itertools import count
from typing import ClassVar, Final, cast

from lean_spec.subspecs.networking.config import PRUNE_BACKOFF
from lean_spec.snappy import decompress as snappy_raw_decompress
from lean_spec.subspecs.networking.config import (
MESSAGE_DOMAIN_INVALID_SNAPPY,
MESSAGE_DOMAIN_VALID_SNAPPY,
PRUNE_BACKOFF,
)
from lean_spec.subspecs.networking.gossipsub.mcache import MessageCache, SeenCache
from lean_spec.subspecs.networking.gossipsub.mesh import MeshState
from lean_spec.subspecs.networking.gossipsub.message import GossipsubMessage
@@ -89,6 +94,22 @@
logger = logging.getLogger(__name__)


def _try_decompress(data: bytes) -> tuple[bytes, bytes]:
"""Attempt Snappy decompression and return data with domain.

Decompress once upfront so that message ID computation and event delivery share the result.

Returns:
Tuple of (data_for_hash, domain_bytes).
On success: (decompressed_data, VALID_SNAPPY).
On failure: (raw_data, INVALID_SNAPPY).
"""
try:
return snappy_raw_decompress(data), MESSAGE_DOMAIN_VALID_SNAPPY
except Exception:
return data, MESSAGE_DOMAIN_INVALID_SNAPPY


@dataclass(slots=True)
class GossipsubMessageEvent:
"""Event emitted when a valid message is received."""
@@ -100,7 +121,7 @@ class GossipsubMessageEvent:
"""Topic the message belongs to."""

data: bytes
"""Message payload (may be compressed)."""
"""Message payload (decompressed if Snappy decompression succeeded)."""

message_id: MessageId
"""Computed message ID."""
@@ -417,15 +438,26 @@ async def publish(self, topic: str, data: bytes) -> None:
data: Message payload.
"""
msg = Message(topic=topic, data=data)
msg_id = GossipsubMessage.compute_id(topic.encode("utf-8"), data)
topic_bytes = topic.encode("utf-8")

# Decompress once for message ID computation.
#
# Data is decompressed before the message ID function runs.
# The domain records whether decompression succeeded (VALID_SNAPPY)
# or failed (INVALID_SNAPPY).
decompressed, domain = _try_decompress(data)
msg_id = GossipsubMessage.compute_id(topic_bytes, decompressed, domain=domain)

if self.seen_cache.has(msg_id):
logger.debug("Skipping duplicate message %s", msg_id.hex()[:8])
return

self.seen_cache.add(msg_id, time.time())

cache_msg = GossipsubMessage(topic=topic.encode("utf-8"), raw_data=data)
# Cache stores compressed data for IWANT responses.
cache_msg = GossipsubMessage(topic=topic_bytes, raw_data=data)
cache_msg._cached_id = msg_id
self.message_cache.put(topic, cache_msg)

if topic in self.mesh.subscriptions:
@@ -547,15 +579,24 @@ async def _handle_message(self, peer_id: PeerId, msg: Message) -> None:
if not msg.topic:
return

msg_id = GossipsubMessage.compute_id(msg.topic.encode("utf-8"), msg.data)
topic_bytes = msg.topic.encode("utf-8")

# Decompress once for message ID computation and event delivery.
#
# Data is decompressed before the message ID function runs.
# The decompressed data is also passed to the event consumer,
# eliminating a second decompression there.
decompressed, domain = _try_decompress(msg.data)
msg_id = GossipsubMessage.compute_id(topic_bytes, decompressed, domain=domain)

# Deduplicate: each message is processed at most once.
if self.seen_cache.has(msg_id):
return
self.seen_cache.add(msg_id, time.time())

# Cache for IWANT responses to peers who receive our IHAVE gossip.
cache_msg = GossipsubMessage(topic=msg.topic.encode("utf-8"), raw_data=msg.data)
# Cache stores compressed data for IWANT responses.
cache_msg = GossipsubMessage(topic=topic_bytes, raw_data=msg.data)
cache_msg._cached_id = msg_id
self.message_cache.put(msg.topic, cache_msg)

# Only forward on topics we participate in (have a mesh for).
@@ -584,8 +625,10 @@ async def _handle_message(self, peer_id: PeerId, msg: Message) -> None:
if mesh_peer != peer_id:
await self._send_rpc(mesh_peer, idontwant_rpc)

# Event carries decompressed data for the consumer.
# This eliminates the need for a second decompression in the event handler.
event = GossipsubMessageEvent(
peer_id=peer_id, topic=msg.topic, data=msg.data, message_id=msg_id
peer_id=peer_id, topic=msg.topic, data=decompressed, message_id=msg_id
)
await self._event_queue.put(event)
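The point of decompressing up front is that publisher and receiver hash the same bytes and therefore agree on message IDs. A test-style sketch; it reaches into the module-private _try_decompress shown above and assumes the import paths match this PR's file layout:

```python
from lean_spec.snappy import compress
from lean_spec.subspecs.networking.config import MESSAGE_DOMAIN_VALID_SNAPPY
from lean_spec.subspecs.networking.gossipsub.behavior import _try_decompress
from lean_spec.subspecs.networking.gossipsub.message import GossipsubMessage

topic = b"/leanconsensus/0x12345678/blocks/ssz_snappy"
ssz_bytes = b"\x2a" * 32          # stand-in for an SSZ-encoded payload
wire_bytes = compress(ssz_bytes)  # what publish() sends and peers receive

# Every node runs the same decompress-then-hash step, so deduplication
# keys line up across the network.
data_for_hash, domain = _try_decompress(wire_bytes)
assert data_for_hash == ssz_bytes
assert domain == MESSAGE_DOMAIN_VALID_SNAPPY

# The resulting 20-byte MessageId feeds the seen cache, message cache,
# and IHAVE gossip.
msg_id = GossipsubMessage.compute_id(topic, data_for_hash, domain=domain)
```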

14 changes: 11 additions & 3 deletions src/lean_spec/subspecs/networking/gossipsub/message.py
@@ -33,8 +33,8 @@

**Domain Bytes:**

- ``0x01`` (VALID_SNAPPY): Snappy decompression succeeded, use decompressed data
- ``0x00`` (INVALID_SNAPPY): Decompression failed or no decompressor, use raw data
- ``0x01000000`` (VALID_SNAPPY): Snappy decompression succeeded, use decompressed data
- ``0x00000000`` (INVALID_SNAPPY): Decompression failed or no decompressor, use raw data

This ensures messages with compression issues get different IDs,
preventing cache pollution from invalid variants.
@@ -135,6 +135,8 @@ def compute_id(
topic: bytes,
data: bytes,
snappy_decompress: SnappyDecompressor | None = None,
*,
domain: bytes | None = None,
) -> MessageId:
"""Compute a 20-byte message ID from raw data.

@@ -145,6 +147,8 @@
Domain Selection
----------------

- If `domain` is explicitly provided:
use it directly (data is assumed pre-processed by the caller)
- If `snappy_decompress` is provided and succeeds:
domain = 0x01, use decompressed data
- Otherwise:
@@ -154,11 +158,15 @@
topic: Topic string as bytes.
data: Message payload (potentially compressed).
snappy_decompress: Optional decompression function.
domain: Explicit domain bytes. When provided, data is used as-is.

Returns:
20-byte message ID.
"""
if snappy_decompress is not None:
if domain is not None:
# Caller already determined the domain (e.g., after pre-decompression).
data_for_hash = data
elif snappy_decompress is not None:
try:
data_for_hash = snappy_decompress(data)
domain = MESSAGE_DOMAIN_VALID_SNAPPY
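With the new keyword, the two ways of calling compute_id should agree whenever the caller's pre-decompression succeeded. A sketch assuming lean_spec.snappy.decompress satisfies the SnappyDecompressor protocol:

```python
from lean_spec.snappy import compress, decompress
from lean_spec.subspecs.networking.config import MESSAGE_DOMAIN_VALID_SNAPPY
from lean_spec.subspecs.networking.gossipsub.message import GossipsubMessage

topic = b"/leanconsensus/0x12345678/blocks/ssz_snappy"
compressed = compress(b"\x2a" * 32)

# Path 1: compute_id decompresses internally and picks the domain itself.
id_internal = GossipsubMessage.compute_id(topic, compressed, snappy_decompress=decompress)

# Path 2: the caller decompressed already (as behavior.py now does) and
# passes the matching domain explicitly.
id_explicit = GossipsubMessage.compute_id(
    topic, decompress(compressed), domain=MESSAGE_DOMAIN_VALID_SNAPPY
)

assert id_internal == id_explicit
```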
8 changes: 4 additions & 4 deletions src/lean_spec/subspecs/networking/gossipsub/topic.py
@@ -17,7 +17,7 @@

/{prefix}/{fork_digest}/{topic_name}/{encoding}

Example: /leanconsensus/0x12345678/block/ssz_snappy
Example: /leanconsensus/0x12345678/blocks/ssz_snappy

**Components:**

@@ -28,7 +28,7 @@
+----------------+----------------------------------------------------------+
| fork_digest | 4-byte fork identifier as hex (`0x12345678`) |
+----------------+----------------------------------------------------------+
| topic_name | Message type (`block`, `attestation`) |
| topic_name | Message type (`blocks`, `attestation`) |
+----------------+----------------------------------------------------------+
| encoding | Serialization format (always `ssz_snappy`) |
+----------------+----------------------------------------------------------+
@@ -46,7 +46,7 @@
+----------------+----------------------------------------------------------+
| Topic | Content |
+================+==========================================================+
| block | Signed beacon blocks |
| blocks | Signed beacon blocks |
+----------------+----------------------------------------------------------+
| attestation | Signed attestations |
+----------------+----------------------------------------------------------+
@@ -89,7 +89,7 @@ def __init__(self, expected: str, actual: str) -> None:
with Snappy compression.
"""

BLOCK_TOPIC_NAME: Final = "block"
BLOCK_TOPIC_NAME: Final = "blocks"
"""Topic name for block messages.

Used in the topic string to identify signed beacon block messages.
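Putting the renamed constant together with the topic format above, a hypothetical round trip assuming GossipTopic (imported by service.py below) is defined in this module and stringifies to the documented layout:

```python
from lean_spec.subspecs.networking.gossipsub.topic import GossipTopic

# "0x12345678" is the docstring's example digest; the real value may be a
# dedicated ForkDigest type rather than a plain string.
topic = GossipTopic.block("0x12345678")

# Expected rendering after this PR ("blocks", not "block"):
#   /leanconsensus/0x12345678/blocks/ssz_snappy
print(str(topic))
```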
8 changes: 4 additions & 4 deletions src/lean_spec/subspecs/networking/service/service.py
@@ -26,7 +26,7 @@
from dataclasses import dataclass, field
from typing import TYPE_CHECKING

from lean_spec.snappy import frame_compress
from lean_spec.snappy import compress
from lean_spec.subspecs.containers import SignedBlockWithAttestation
from lean_spec.subspecs.containers.attestation import SignedAggregatedAttestation, SignedAttestation
from lean_spec.subspecs.containers.validator import SubnetId
@@ -209,7 +209,7 @@ async def publish_block(self, block: SignedBlockWithAttestation) -> None:
"""
topic = GossipTopic.block(self.fork_digest)
ssz_bytes = block.encode_bytes()
compressed = frame_compress(ssz_bytes)
compressed = compress(ssz_bytes)

await self.event_source.publish(str(topic), compressed)
logger.debug("Published block at slot %s", block.message.block.slot)
@@ -229,7 +229,7 @@ async def publish_attestation(
"""
topic = GossipTopic.attestation_subnet(self.fork_digest, subnet_id)
ssz_bytes = attestation.encode_bytes()
compressed = frame_compress(ssz_bytes)
compressed = compress(ssz_bytes)

await self.event_source.publish(str(topic), compressed)
logger.debug("Published attestation for slot %s", attestation.data.slot)
@@ -245,7 +245,7 @@
"""
topic = GossipTopic.committee_aggregation(self.fork_digest)
ssz_bytes = signed_attestation.encode_bytes()
compressed = frame_compress(ssz_bytes)
compressed = compress(ssz_bytes)

await self.event_source.publish(str(topic), compressed)
logger.debug("Published aggregated attestation for slot %s", signed_attestation.data.slot)