From 9c8d69977257347cecef01460bb4d13daae7d675 Mon Sep 17 00:00:00 2001 From: Dexter Ajoku Date: Sun, 8 Feb 2026 04:35:45 +0100 Subject: [PATCH 1/8] Split HLC coordinator supporting types into separate files --- src/Distributed/HlcClusterRegistry.cs | 69 +++++++ src/Distributed/HlcCoordinator.cs | 282 -------------------------- src/Distributed/HlcMessageHeader.cs | 114 +++++++++++ src/Distributed/HlcStatistics.cs | 127 ++++++++++++ 4 files changed, 310 insertions(+), 282 deletions(-) create mode 100644 src/Distributed/HlcClusterRegistry.cs create mode 100644 src/Distributed/HlcMessageHeader.cs create mode 100644 src/Distributed/HlcStatistics.cs diff --git a/src/Distributed/HlcClusterRegistry.cs b/src/Distributed/HlcClusterRegistry.cs new file mode 100644 index 0000000..51fc3a2 --- /dev/null +++ b/src/Distributed/HlcClusterRegistry.cs @@ -0,0 +1,69 @@ +using System.Collections.Concurrent; + +namespace Clockworks.Distributed; + +/// +/// Registry for tracking multiple HLC nodes in a cluster. +/// Useful for testing and simulation of distributed systems. +/// +public sealed class HlcClusterRegistry +{ + private readonly ConcurrentDictionary _nodes = new(); + private readonly TimeProvider _sharedTimeProvider; + + /// + /// Creates a registry for tracking multiple HLC nodes that share a single . + /// + public HlcClusterRegistry(TimeProvider timeProvider) + { + _sharedTimeProvider = timeProvider; + } + + /// + /// Register a node in the cluster. + /// + public HlcGuidFactory RegisterNode(ushort nodeId, HlcOptions? options = null) + { + return _nodes.GetOrAdd(nodeId, id => new HlcGuidFactory(_sharedTimeProvider, id, options)); + } + + /// + /// Get a registered node. + /// + public HlcGuidFactory? GetNode(ushort nodeId) + { + return _nodes.TryGetValue(nodeId, out var factory) ? factory : null; + } + + /// + /// Simulate sending a message from one node to another. + /// Updates receiver's clock based on sender's timestamp. + /// + public void SimulateMessage(ushort senderId, ushort receiverId) + { + if (!_nodes.TryGetValue(senderId, out var sender)) + throw new ArgumentException($"Sender node {senderId} not registered"); + if (!_nodes.TryGetValue(receiverId, out var receiver)) + throw new ArgumentException($"Receiver node {receiverId} not registered"); + + var (_, senderTimestamp) = sender.NewGuidWithHlc(); + receiver.Witness(senderTimestamp); + } + + /// + /// Get all registered nodes. + /// + public IEnumerable<(ushort NodeId, HlcGuidFactory Factory)> GetAllNodes() + { + return _nodes.Select(kvp => (kvp.Key, kvp.Value)); + } + + /// + /// Get the maximum logical time across all nodes. + /// Useful for determining global ordering. + /// + public long GetMaxLogicalTime() + { + return _nodes.Values.Max(f => f.CurrentTimestamp.WallTimeMs); + } +} diff --git a/src/Distributed/HlcCoordinator.cs b/src/Distributed/HlcCoordinator.cs index abd2fc3..676c58d 100644 --- a/src/Distributed/HlcCoordinator.cs +++ b/src/Distributed/HlcCoordinator.cs @@ -1,5 +1,3 @@ -using System.Collections.Concurrent; - namespace Clockworks.Distributed; /// @@ -92,283 +90,3 @@ public Guid NewLocalEventGuid() /// public HlcTimestamp CurrentTimestamp => _factory.CurrentTimestamp; } - -/// -/// Statistics tracking for HLC behavior analysis. -/// Thread-safe for concurrent updates. -/// -public sealed class HlcStatistics -{ - private long _localEventCount; - private long _sendCount; - private long _receiveCount; - private long _clockAdvances; // Times we advanced due to remote - private long _maxObservedDrift; - private long _maxRemoteAheadMs; - private long _maxRemoteBehindMs; - private long _remoteAheadCount; - - /// - /// Total number of local events recorded. - /// - public long LocalEventCount => Volatile.Read(ref _localEventCount); - - /// - /// Total number of send events recorded. - /// - public long SendCount => Volatile.Read(ref _sendCount); - - /// - /// Total number of receive events recorded. - /// - public long ReceiveCount => Volatile.Read(ref _receiveCount); - - /// - /// Number of times the local clock advanced due to observing a remote timestamp. - /// - public long ClockAdvances => Volatile.Read(ref _clockAdvances); - - /// - /// Maximum absolute observed drift (in milliseconds) between a received remote timestamp and the local timestamp prior to witnessing. - /// - public long MaxObservedDriftMs => Volatile.Read(ref _maxObservedDrift); - - /// - /// Maximum observed amount (in milliseconds) by which a remote timestamp was ahead of the local timestamp prior to witnessing. - /// - public long MaxRemoteAheadMs => Volatile.Read(ref _maxRemoteAheadMs); - - /// - /// Maximum observed amount (in milliseconds) by which a remote timestamp was behind the local timestamp prior to witnessing. - /// - public long MaxRemoteBehindMs => Volatile.Read(ref _maxRemoteBehindMs); - - /// - /// Number of receive events where the remote timestamp was ahead of or equal to the local timestamp prior to witnessing. - /// - public long RemoteAheadCount => Volatile.Read(ref _remoteAheadCount); - - internal void RecordLocalEvent(HlcTimestamp timestamp) - { - Interlocked.Increment(ref _localEventCount); - } - - internal void RecordSend(HlcTimestamp timestamp) - { - Interlocked.Increment(ref _sendCount); - } - - internal void RecordReceive(HlcTimestamp before, HlcTimestamp after, HlcTimestamp remote) - { - Interlocked.Increment(ref _receiveCount); - - // Clock advances due to remote when the remote timestamp is ahead of the local timestamp - // prior to witnessing and the post-witness wall time matches the remote wall time. - if (remote > before && after.WallTimeMs == remote.WallTimeMs) - { - Interlocked.Increment(ref _clockAdvances); - } - - var delta = remote.WallTimeMs - before.WallTimeMs; - if (delta >= 0) - { - Interlocked.Increment(ref _remoteAheadCount); - InterlockedMax(ref _maxRemoteAheadMs, delta); - } - else - { - InterlockedMax(ref _maxRemoteBehindMs, -delta); - } - - // Track absolute drift - InterlockedMax(ref _maxObservedDrift, Math.Abs(delta)); - } - - private static void InterlockedMax(ref long location, long value) - { - long current = Volatile.Read(ref location); - while (value > current) - { - var previous = Interlocked.CompareExchange(ref location, value, current); - if (previous == current) break; - current = previous; - } - } - - /// - /// Resets all counters to zero. - /// - public void Reset() - { - _localEventCount = 0; - _sendCount = 0; - _receiveCount = 0; - _clockAdvances = 0; - _maxObservedDrift = 0; - _maxRemoteAheadMs = 0; - _maxRemoteBehindMs = 0; - _remoteAheadCount = 0; - } - - /// - /// Returns a human-readable summary of the current statistics. - /// - public override string ToString() => - $"Local: {LocalEventCount}, Send: {SendCount}, Receive: {ReceiveCount}, " + - $"Advances: {ClockAdvances}, MaxDrift: {MaxObservedDriftMs}ms, " + - $"MaxAhead: {MaxRemoteAheadMs}ms, MaxBehind: {MaxRemoteBehindMs}ms"; -} - -/// -/// Message header for propagating HLC timestamps across service boundaries. -/// Can be serialized to/from various wire formats. -/// -public readonly record struct HlcMessageHeader -{ - /// - /// Standard header name for HTTP/gRPC. - /// - public const string HeaderName = "X-HLC-Timestamp"; - - /// - /// The HLC timestamp to propagate. - /// - public HlcTimestamp Timestamp { get; init; } - - /// - /// Optional correlation identifier. - /// - public Guid? CorrelationId { get; init; } - - /// - /// Optional causation identifier. - /// - public Guid? CausationId { get; init; } - - /// - /// Creates a new message header instance. - /// - public HlcMessageHeader(HlcTimestamp timestamp, Guid? correlationId = null, Guid? causationId = null) - { - Timestamp = timestamp; - CorrelationId = correlationId; - CausationId = causationId; - } - - /// - /// Serialize to a compact string for HTTP headers. - /// Format: "timestamp.counter@node[;correlation;causation]" - /// - public override string ToString() - { - var result = Timestamp.ToString(); - if (CorrelationId.HasValue) - { - result += $";{CorrelationId.Value:N}"; - if (CausationId.HasValue) - { - result += $";{CausationId.Value:N}"; - } - } - return result; - } - - /// - /// Parse from header string. - /// - public static HlcMessageHeader Parse(string value) - { - var parts = value.Split(';'); - var timestamp = HlcTimestamp.Parse(parts[0]); - - Guid? correlation = parts.Length > 1 ? Guid.Parse(parts[1]) : null; - Guid? causation = parts.Length > 2 ? Guid.Parse(parts[2]) : null; - - return new HlcMessageHeader(timestamp, correlation, causation); - } - - /// - /// Try to parse from header string. - /// - public static bool TryParse(string? value, out HlcMessageHeader header) - { - header = default; - if (string.IsNullOrEmpty(value)) return false; - - try - { - header = Parse(value); - return true; - } - catch - { - return false; - } - } -} - -/// -/// Registry for tracking multiple HLC nodes in a cluster. -/// Useful for testing and simulation of distributed systems. -/// -public sealed class HlcClusterRegistry -{ - private readonly ConcurrentDictionary _nodes = new(); - private readonly TimeProvider _sharedTimeProvider; - - /// - /// Creates a registry for tracking multiple HLC nodes that share a single . - /// - public HlcClusterRegistry(TimeProvider timeProvider) - { - _sharedTimeProvider = timeProvider; - } - - /// - /// Register a node in the cluster. - /// - public HlcGuidFactory RegisterNode(ushort nodeId, HlcOptions? options = null) - { - return _nodes.GetOrAdd(nodeId, id => new HlcGuidFactory(_sharedTimeProvider, id, options)); - } - - /// - /// Get a registered node. - /// - public HlcGuidFactory? GetNode(ushort nodeId) - { - return _nodes.TryGetValue(nodeId, out var factory) ? factory : null; - } - - /// - /// Simulate sending a message from one node to another. - /// Updates receiver's clock based on sender's timestamp. - /// - public void SimulateMessage(ushort senderId, ushort receiverId) - { - if (!_nodes.TryGetValue(senderId, out var sender)) - throw new ArgumentException($"Sender node {senderId} not registered"); - if (!_nodes.TryGetValue(receiverId, out var receiver)) - throw new ArgumentException($"Receiver node {receiverId} not registered"); - - var (_, senderTimestamp) = sender.NewGuidWithHlc(); - receiver.Witness(senderTimestamp); - } - - /// - /// Get all registered nodes. - /// - public IEnumerable<(ushort NodeId, HlcGuidFactory Factory)> GetAllNodes() - { - return _nodes.Select(kvp => (kvp.Key, kvp.Value)); - } - - /// - /// Get the maximum logical time across all nodes. - /// Useful for determining global ordering. - /// - public long GetMaxLogicalTime() - { - return _nodes.Values.Max(f => f.CurrentTimestamp.WallTimeMs); - } -} diff --git a/src/Distributed/HlcMessageHeader.cs b/src/Distributed/HlcMessageHeader.cs new file mode 100644 index 0000000..7f5b7bb --- /dev/null +++ b/src/Distributed/HlcMessageHeader.cs @@ -0,0 +1,114 @@ +namespace Clockworks.Distributed; + +/// +/// Message header for propagating HLC timestamps across service boundaries. +/// Can be serialized to/from various wire formats. +/// +public readonly record struct HlcMessageHeader +{ + /// + /// Standard header name for HTTP/gRPC. + /// + public const string HeaderName = "X-HLC-Timestamp"; + + /// + /// The HLC timestamp to propagate. + /// + public HlcTimestamp Timestamp { get; init; } + + /// + /// Optional correlation identifier. + /// + public Guid? CorrelationId { get; init; } + + /// + /// Optional causation identifier. + /// + public Guid? CausationId { get; init; } + + /// + /// Creates a new message header instance. + /// + public HlcMessageHeader(HlcTimestamp timestamp, Guid? correlationId = null, Guid? causationId = null) + { + Timestamp = timestamp; + CorrelationId = correlationId; + CausationId = causationId; + } + + /// + /// Serialize to a compact string for HTTP headers. + /// Format: "timestamp.counter@node[;correlation;causation]" + /// + public override string ToString() + { + var timestamp = Timestamp.ToString(); + if (!CorrelationId.HasValue) + return timestamp; + + var correlation = CorrelationId.GetValueOrDefault(); + if (!CausationId.HasValue) + { + return string.Create(timestamp.Length + 33, (timestamp, correlation), static (span, state) => + { + state.timestamp.AsSpan().CopyTo(span); + span[state.timestamp.Length] = ';'; + state.correlation.TryFormat(span[(state.timestamp.Length + 1)..], out _, "N"); + }); + } + + var causation = CausationId.GetValueOrDefault(); + return string.Create(timestamp.Length + 66, (timestamp, correlation, causation), static (span, state) => + { + state.timestamp.AsSpan().CopyTo(span); + var offset = state.timestamp.Length; + span[offset++] = ';'; + state.correlation.TryFormat(span[offset..], out _, "N"); + offset += 32; + span[offset++] = ';'; + state.causation.TryFormat(span[offset..], out _, "N"); + }); + } + + /// + /// Parse from header string. + /// + public static HlcMessageHeader Parse(string value) + { + var span = value.AsSpan(); + var firstSep = span.IndexOf(';'); + + if (firstSep < 0) + return new HlcMessageHeader(HlcTimestamp.Parse(value)); + + var timestamp = HlcTimestamp.Parse(span[..firstSep]); + span = span[(firstSep + 1)..]; + + var secondSep = span.IndexOf(';'); + if (secondSep < 0) + return new HlcMessageHeader(timestamp, Guid.Parse(span)); + + var correlation = Guid.Parse(span[..secondSep]); + var causation = Guid.Parse(span[(secondSep + 1)..]); + return new HlcMessageHeader(timestamp, correlation, causation); + } + + /// + /// Try to parse from header string. + /// + public static bool TryParse(string? value, out HlcMessageHeader header) + { + header = default; + if (string.IsNullOrEmpty(value)) return false; + + try + { + header = Parse(value); + return true; + } + catch + { + return false; + } + } +} diff --git a/src/Distributed/HlcStatistics.cs b/src/Distributed/HlcStatistics.cs new file mode 100644 index 0000000..e72b33b --- /dev/null +++ b/src/Distributed/HlcStatistics.cs @@ -0,0 +1,127 @@ +namespace Clockworks.Distributed; + +/// +/// Statistics tracking for HLC behavior analysis. +/// Thread-safe for concurrent updates. +/// +public sealed class HlcStatistics +{ + private long _localEventCount; + private long _sendCount; + private long _receiveCount; + private long _clockAdvances; // Times we advanced due to remote + private long _maxObservedDrift; + private long _maxRemoteAheadMs; + private long _maxRemoteBehindMs; + private long _remoteAheadCount; + + /// + /// Total number of local events recorded. + /// + public long LocalEventCount => Volatile.Read(ref _localEventCount); + + /// + /// Total number of send events recorded. + /// + public long SendCount => Volatile.Read(ref _sendCount); + + /// + /// Total number of receive events recorded. + /// + public long ReceiveCount => Volatile.Read(ref _receiveCount); + + /// + /// Number of times the local clock advanced due to observing a remote timestamp. + /// + public long ClockAdvances => Volatile.Read(ref _clockAdvances); + + /// + /// Maximum absolute observed drift (in milliseconds) between a received remote timestamp and the local timestamp prior to witnessing. + /// + public long MaxObservedDriftMs => Volatile.Read(ref _maxObservedDrift); + + /// + /// Maximum observed amount (in milliseconds) by which a remote timestamp was ahead of the local timestamp prior to witnessing. + /// + public long MaxRemoteAheadMs => Volatile.Read(ref _maxRemoteAheadMs); + + /// + /// Maximum observed amount (in milliseconds) by which a remote timestamp was behind the local timestamp prior to witnessing. + /// + public long MaxRemoteBehindMs => Volatile.Read(ref _maxRemoteBehindMs); + + /// + /// Number of receive events where the remote timestamp was ahead of or equal to the local timestamp prior to witnessing. + /// + public long RemoteAheadCount => Volatile.Read(ref _remoteAheadCount); + + internal void RecordLocalEvent(HlcTimestamp timestamp) + { + Interlocked.Increment(ref _localEventCount); + } + + internal void RecordSend(HlcTimestamp timestamp) + { + Interlocked.Increment(ref _sendCount); + } + + internal void RecordReceive(HlcTimestamp before, HlcTimestamp after, HlcTimestamp remote) + { + Interlocked.Increment(ref _receiveCount); + + // Clock advances due to remote when the remote timestamp is ahead of the local timestamp + // prior to witnessing and the post-witness wall time matches the remote wall time. + if (remote > before && after.WallTimeMs == remote.WallTimeMs) + { + Interlocked.Increment(ref _clockAdvances); + } + + var delta = remote.WallTimeMs - before.WallTimeMs; + if (delta >= 0) + { + Interlocked.Increment(ref _remoteAheadCount); + InterlockedMax(ref _maxRemoteAheadMs, delta); + } + else + { + InterlockedMax(ref _maxRemoteBehindMs, -delta); + } + + // Track absolute drift + InterlockedMax(ref _maxObservedDrift, long.Abs(delta)); + } + + private static void InterlockedMax(ref long location, long value) + { + long current = Volatile.Read(ref location); + while (value > current) + { + var previous = Interlocked.CompareExchange(ref location, value, current); + if (previous == current) break; + current = previous; + } + } + + /// + /// Resets all counters to zero. + /// + public void Reset() + { + _localEventCount = 0; + _sendCount = 0; + _receiveCount = 0; + _clockAdvances = 0; + _maxObservedDrift = 0; + _maxRemoteAheadMs = 0; + _maxRemoteBehindMs = 0; + _remoteAheadCount = 0; + } + + /// + /// Returns a human-readable summary of the current statistics. + /// + public override string ToString() => + $"Local: {LocalEventCount}, Send: {SendCount}, Receive: {ReceiveCount}, " + + $"Advances: {ClockAdvances}, MaxDrift: {MaxObservedDriftMs}ms, " + + $"MaxAhead: {MaxRemoteAheadMs}ms, MaxBehind: {MaxRemoteBehindMs}ms"; +} From 7b0ccd31e2bb87b850c1b145a1920d7bb5a842db Mon Sep 17 00:00:00 2001 From: Dexter Ajoku Date: Sun, 8 Feb 2026 04:35:49 +0100 Subject: [PATCH 2/8] Optimize HLC timestamp serialization and parsing --- src/Distributed/HlcTimestamp.cs | 61 +++++++++++++++++++++------------ 1 file changed, 40 insertions(+), 21 deletions(-) diff --git a/src/Distributed/HlcTimestamp.cs b/src/Distributed/HlcTimestamp.cs index 91ff7a6..9d76a88 100644 --- a/src/Distributed/HlcTimestamp.cs +++ b/src/Distributed/HlcTimestamp.cs @@ -1,4 +1,6 @@ -namespace Clockworks.Distributed; +using System.Buffers.Binary; + +namespace Clockworks.Distributed; /// /// Represents a Hybrid Logical Clock timestamp. @@ -57,9 +59,12 @@ public long ToPackedInt64() // 48 bits for timestamp (good until year 10889) // 12 bits for counter (0-4095) // 4 bits for node ID (0-15, use for small clusters) - return (WallTimeMs << 16) - | ((long)((ushort)(Counter & 0x0FFF)) << 4) - | (long)(NodeId & 0x000F); + ulong wall = (ulong)WallTimeMs; + ulong counter = (ulong)(Counter & 0x0FFF); + ulong node = (ulong)(NodeId & 0x000F); + + ulong packed = (wall << 16) | (counter << 4) | node; + return unchecked((long)packed); } /// @@ -83,16 +88,16 @@ public void WriteTo(Span destination) throw new ArgumentException("Destination must be at least 10 bytes", nameof(destination)); // Big-endian for lexicographic sorting - destination[0] = (byte)(WallTimeMs >> 40); - destination[1] = (byte)(WallTimeMs >> 32); - destination[2] = (byte)(WallTimeMs >> 24); - destination[3] = (byte)(WallTimeMs >> 16); - destination[4] = (byte)(WallTimeMs >> 8); - destination[5] = (byte)WallTimeMs; - destination[6] = (byte)(Counter >> 8); - destination[7] = (byte)Counter; - destination[8] = (byte)(NodeId >> 8); - destination[9] = (byte)NodeId; + // Use lower 48 bits of WallTimeMs for stable 6-byte encoding. + var wallTime = (ulong)WallTimeMs; + destination[0] = (byte)(wallTime >> 40); + destination[1] = (byte)(wallTime >> 32); + destination[2] = (byte)(wallTime >> 24); + destination[3] = (byte)(wallTime >> 16); + destination[4] = (byte)(wallTime >> 8); + destination[5] = (byte)wallTime; + BinaryPrimitives.WriteUInt16BigEndian(destination.Slice(6, 2), Counter); + BinaryPrimitives.WriteUInt16BigEndian(destination.Slice(8, 2), NodeId); } /// @@ -103,11 +108,12 @@ public static HlcTimestamp ReadFrom(ReadOnlySpan source) if (source.Length < 10) throw new ArgumentException("Source must be at least 10 bytes", nameof(source)); + // 6-byte big-endian wall time (lower 48-bits) long wallTime = ((long)source[0] << 40) | ((long)source[1] << 32) | ((long)source[2] << 24) | ((long)source[3] << 16) | ((long)source[4] << 8) | source[5]; - ushort counter = (ushort)((source[6] << 8) | source[7]); - ushort nodeId = (ushort)((source[8] << 8) | source[9]); + ushort counter = BinaryPrimitives.ReadUInt16BigEndian(source.Slice(6, 2)); + ushort nodeId = BinaryPrimitives.ReadUInt16BigEndian(source.Slice(8, 2)); return new HlcTimestamp(wallTime, counter, nodeId); } @@ -166,13 +172,26 @@ public int CompareTo(object? obj) /// public static HlcTimestamp Parse(string s) { - var atIndex = s.LastIndexOf('@'); - var dotIndex = s.LastIndexOf('.', atIndex > 0 ? atIndex : s.Length - 1); + return Parse(s.AsSpan()); + } + + /// + /// Parse from string format "walltime.counter@node". + /// + public static HlcTimestamp Parse(ReadOnlySpan s) + { + var atIndex = s.IndexOf('@'); + if (atIndex <= 0) + throw new FormatException("Invalid HLC timestamp format."); + + var dotIndex = s[..atIndex].IndexOf('.'); + if (dotIndex <= 0) + throw new FormatException("Invalid HLC timestamp format."); return new HlcTimestamp( - wallTimeMs: long.Parse(s.AsSpan(0, dotIndex)), - counter: ushort.Parse(s.AsSpan(dotIndex + 1, atIndex - dotIndex - 1)), - nodeId: ushort.Parse(s.AsSpan(atIndex + 1)) + wallTimeMs: long.Parse(s[..dotIndex]), + counter: ushort.Parse(s.Slice(dotIndex + 1, atIndex - dotIndex - 1)), + nodeId: ushort.Parse(s[(atIndex + 1)..]) ); } } From bc06f74dcaef37eca294b608902290304a7d32fc Mon Sep 17 00:00:00 2001 From: Dexter Ajoku Date: Sun, 8 Feb 2026 04:43:50 +0100 Subject: [PATCH 3/8] Reduce RNG refills and simplify HLC witness max selection --- src/HlcGuidFactory.cs | 43 ++++++++++++++++++++++++++++++++----------- 1 file changed, 32 insertions(+), 11 deletions(-) diff --git a/src/HlcGuidFactory.cs b/src/HlcGuidFactory.cs index caa11a7..3194e8e 100644 --- a/src/HlcGuidFactory.cs +++ b/src/HlcGuidFactory.cs @@ -66,8 +66,9 @@ public sealed class HlcGuidFactory : IHlcGuidFactory, IDisposable private ushort _counter; // Pre-allocated random buffer (protected by _lock since we need lock anyway) - private readonly byte[] _randomBuffer = new byte[64]; - private int _randomPosition = 64; + // Sized to reduce RNG refill frequency in high-throughput scenarios. + private readonly byte[] _randomBuffer = new byte[256]; + private int _randomPosition = 256; // UUID constants private const byte Version7 = 0x70; @@ -188,16 +189,36 @@ public void Witness(HlcTimestamp remoteTimestamp) { var physicalTimeMs = _timeProvider.GetUtcNow().ToUnixTimeMilliseconds(); - // Physical time participates in max selection but has no counter/node information. - // We treat it as (physicalTimeMs, 0, 0) for ordering purposes. - var physical = new HlcTimestamp(physicalTimeMs, counter: 0, nodeId: 0); - var local = new HlcTimestamp(_logicalTimeMs, _counter, _nodeId); + // Determine which source is the maximum in the total order: + // local = (_logicalTimeMs, _counter, _nodeId) + // remote = (remote.WallTimeMs, remote.Counter, remote.NodeId) + // physical = (physicalTimeMs, 0, 0) + var localWall = _logicalTimeMs; + var localCounter = _counter; + var localNode = _nodeId; + + var maxIsLocal = true; + var maxIsRemote = false; + + // Compare remote vs local + if (remoteTimestamp.WallTimeMs > localWall || + (remoteTimestamp.WallTimeMs == localWall && + (remoteTimestamp.Counter > localCounter || + (remoteTimestamp.Counter == localCounter && remoteTimestamp.NodeId > localNode)))) + { + maxIsLocal = false; + maxIsRemote = true; + } - var max = local; - if (remoteTimestamp > max) max = remoteTimestamp; - if (physical > max) max = physical; + // Compare physical vs current max (physical has counter/node of 0) + var maxWall = maxIsLocal ? localWall : remoteTimestamp.WallTimeMs; + if (physicalTimeMs > maxWall) + { + maxIsLocal = false; + maxIsRemote = false; + } - if (max.WallTimeMs == local.WallTimeMs && max.Counter == local.Counter && max.NodeId == local.NodeId) + if (maxIsLocal) { // Local time is already the max (or tied with max) - just increment counter. _counter++; @@ -207,7 +228,7 @@ public void Witness(HlcTimestamp remoteTimestamp) _counter = 0; } } - else if (max.WallTimeMs == remoteTimestamp.WallTimeMs && max.Counter == remoteTimestamp.Counter && max.NodeId == remoteTimestamp.NodeId) + else if (maxIsRemote) { // Remote timestamp is the max - adopt its wall time and advance counter beyond it. _logicalTimeMs = remoteTimestamp.WallTimeMs; From 9261740dc68e19c2a742ca36aad2889fce9f9616 Mon Sep 17 00:00:00 2001 From: Dexter Ajoku Date: Sun, 8 Feb 2026 04:50:20 +0100 Subject: [PATCH 4/8] Tighten UUID packing and witness fast path --- src/HlcGuidFactory.cs | 32 +++++++++++++++++++------------- 1 file changed, 19 insertions(+), 13 deletions(-) diff --git a/src/HlcGuidFactory.cs b/src/HlcGuidFactory.cs index 3194e8e..ce342d0 100644 --- a/src/HlcGuidFactory.cs +++ b/src/HlcGuidFactory.cs @@ -1,5 +1,6 @@ using Clockworks.Abstractions; using Clockworks.Distributed; +using System.Buffers.Binary; using System.Runtime.CompilerServices; using System.Security.Cryptography; @@ -261,9 +262,13 @@ public void Witness(long remoteTimestampMs) lock (_lock) { var physicalTimeMs = _timeProvider.GetUtcNow().ToUnixTimeMilliseconds(); - var maxTime = Math.Max(physicalTimeMs, Math.Max(_logicalTimeMs, remoteTimestampMs)); - - if (maxTime == _logicalTimeMs) + + var localTimeMs = _logicalTimeMs; + var maxTime = localTimeMs; + if (remoteTimestampMs > maxTime) maxTime = remoteTimestampMs; + if (physicalTimeMs > maxTime) maxTime = physicalTimeMs; + + if (maxTime == localTimeMs) { // Our logical time is already the max - just increment counter _counter++; @@ -342,21 +347,22 @@ private Guid CreateGuidFromHlc(HlcTimestamp timestamp, ReadOnlySpan random Span bytes = stackalloc byte[16]; // Bytes 0-5: 48-bit logical timestamp (big-endian) - bytes[0] = (byte)(timestamp.WallTimeMs >> 40); - bytes[1] = (byte)(timestamp.WallTimeMs >> 32); - bytes[2] = (byte)(timestamp.WallTimeMs >> 24); - bytes[3] = (byte)(timestamp.WallTimeMs >> 16); - bytes[4] = (byte)(timestamp.WallTimeMs >> 8); - bytes[5] = (byte)timestamp.WallTimeMs; + var wallTime = (ulong)timestamp.WallTimeMs; + bytes[0] = (byte)(wallTime >> 40); + bytes[1] = (byte)(wallTime >> 32); + bytes[2] = (byte)(wallTime >> 24); + bytes[3] = (byte)(wallTime >> 16); + bytes[4] = (byte)(wallTime >> 8); + bytes[5] = (byte)wallTime; // Bytes 6-7: version (4 bits) + counter (12 bits) - bytes[6] = (byte)(Version7 | ((timestamp.Counter >> 8) & VersionMask)); - bytes[7] = (byte)timestamp.Counter; + BinaryPrimitives.WriteUInt16BigEndian(bytes.Slice(6, 2), timestamp.Counter); + bytes[6] = (byte)(Version7 | (bytes[6] & VersionMask)); // Bytes 8-9: variant (2 bits) + node ID high bits (14 bits across bytes 8-9) // We encode node ID in the "random" portion for correlation - bytes[8] = (byte)(VariantRfc4122 | ((timestamp.NodeId >> 8) & VariantMask)); - bytes[9] = (byte)timestamp.NodeId; + BinaryPrimitives.WriteUInt16BigEndian(bytes.Slice(8, 2), timestamp.NodeId); + bytes[8] = (byte)(VariantRfc4122 | (bytes[8] & VariantMask)); // Bytes 10-15: random (48 bits) randomBytes.Slice(0, 6).CopyTo(bytes.Slice(10, 6)); From 43593302d5a0bd486412f6af15ac6a570b313751 Mon Sep 17 00:00:00 2001 From: Dexter Ajoku Date: Sun, 8 Feb 2026 05:00:58 +0100 Subject: [PATCH 5/8] Use BinaryPrimitives for UUID packing --- src/HlcGuidFactory.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/HlcGuidFactory.cs b/src/HlcGuidFactory.cs index ce342d0..8007da0 100644 --- a/src/HlcGuidFactory.cs +++ b/src/HlcGuidFactory.cs @@ -365,7 +365,7 @@ private Guid CreateGuidFromHlc(HlcTimestamp timestamp, ReadOnlySpan random bytes[8] = (byte)(VariantRfc4122 | (bytes[8] & VariantMask)); // Bytes 10-15: random (48 bits) - randomBytes.Slice(0, 6).CopyTo(bytes.Slice(10, 6)); + randomBytes[..6].CopyTo(bytes.Slice(10, 6)); return new Guid(bytes, bigEndian: true); } From 8d82b0711e7a0d28dc202b27016fc11214ac57e8 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 8 Feb 2026 04:12:39 +0000 Subject: [PATCH 6/8] Initial plan From c52f67a8be4ed0646b00a28bbb33860f6654702b Mon Sep 17 00:00:00 2001 From: Dexter Ajoku Date: Sun, 8 Feb 2026 05:22:00 +0100 Subject: [PATCH 7/8] Make HLC message header TryParse non-exceptional --- src/Distributed/HlcMessageHeader.cs | 35 ++++++++++++++++++++++++----- src/Distributed/HlcTimestamp.cs | 31 +++++++++++++++++++++++++ tests/HlcMessageHeaderTests.cs | 27 ++++++++++++++++++++++ 3 files changed, 88 insertions(+), 5 deletions(-) diff --git a/src/Distributed/HlcMessageHeader.cs b/src/Distributed/HlcMessageHeader.cs index 7f5b7bb..2cd9726 100644 --- a/src/Distributed/HlcMessageHeader.cs +++ b/src/Distributed/HlcMessageHeader.cs @@ -100,15 +100,40 @@ public static bool TryParse(string? value, out HlcMessageHeader header) { header = default; if (string.IsNullOrEmpty(value)) return false; - - try + + var span = value.AsSpan(); + var firstSep = span.IndexOf(';'); + + HlcTimestamp timestamp; + if (firstSep < 0) { - header = Parse(value); + if (!HlcTimestamp.TryParse(value, out timestamp)) + return false; + + header = new HlcMessageHeader(timestamp); return true; } - catch - { + + if (!HlcTimestamp.TryParse(span[..firstSep], out timestamp)) return false; + + span = span[(firstSep + 1)..]; + var secondSep = span.IndexOf(';'); + + if (secondSep < 0) + { + if (!Guid.TryParseExact(span, "N", out var correlation)) + return false; + header = new HlcMessageHeader(timestamp, correlation); + return true; } + + if (!Guid.TryParseExact(span[..secondSep], "N", out var correlationId)) + return false; + if (!Guid.TryParseExact(span[(secondSep + 1)..], "N", out var causationId)) + return false; + + header = new HlcMessageHeader(timestamp, correlationId, causationId); + return true; } } diff --git a/src/Distributed/HlcTimestamp.cs b/src/Distributed/HlcTimestamp.cs index 9d76a88..eff5990 100644 --- a/src/Distributed/HlcTimestamp.cs +++ b/src/Distributed/HlcTimestamp.cs @@ -194,4 +194,35 @@ public static HlcTimestamp Parse(ReadOnlySpan s) nodeId: ushort.Parse(s[(atIndex + 1)..]) ); } + + /// + /// Tries to parse from string format "walltime.counter@node". + /// + public static bool TryParse(ReadOnlySpan s, out HlcTimestamp result) + { + result = default; + if (s.IsEmpty) + { + result = new HlcTimestamp(); + return true; + } + + var atIndex = s.IndexOf('@'); + if (atIndex <= 0) + return false; + + var dotIndex = s[..atIndex].IndexOf('.'); + if (dotIndex <= 0) + return false; + + if (!long.TryParse(s[..dotIndex], out var wallTimeMs)) + return false; + if (!ushort.TryParse(s.Slice(dotIndex + 1, atIndex - dotIndex - 1), out var counter)) + return false; + if (!ushort.TryParse(s[(atIndex + 1)..], out var nodeId)) + return false; + + result = new HlcTimestamp(wallTimeMs, counter, nodeId); + return true; + } } diff --git a/tests/HlcMessageHeaderTests.cs b/tests/HlcMessageHeaderTests.cs index aeee40a..b805310 100644 --- a/tests/HlcMessageHeaderTests.cs +++ b/tests/HlcMessageHeaderTests.cs @@ -39,4 +39,31 @@ public void GoldenString_Parses_AsExpected() Assert.Equal(Guid.Parse("aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"), parsed.CorrelationId); Assert.Equal(Guid.Parse("bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb"), parsed.CausationId); } + + [Fact] + public void TryParse_RoundTrips_TimestampOnly() + { + const string value = "1700000000000.0012@3"; + Assert.True(HlcMessageHeader.TryParse(value, out var header)); + Assert.Equal(new HlcTimestamp(1_700_000_000_000, counter: 12, nodeId: 3), header.Timestamp); + Assert.Null(header.CorrelationId); + Assert.Null(header.CausationId); + } + + [Fact] + public void TryParse_RoundTrips_TimestampAndCorrelation() + { + const string value = "1700000000000.0012@3;aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; + Assert.True(HlcMessageHeader.TryParse(value, out var header)); + Assert.Equal(new HlcTimestamp(1_700_000_000_000, counter: 12, nodeId: 3), header.Timestamp); + Assert.Equal(Guid.Parse("aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"), header.CorrelationId); + Assert.Null(header.CausationId); + } + + [Fact] + public void TryParse_ReturnsFalse_OnInvalidGuidAndTimestamp() + { + Assert.False(HlcMessageHeader.TryParse("1700000000000.0012@3;not-a-guid", out _)); + Assert.False(HlcMessageHeader.TryParse("not-a-timestamp;aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", out _)); + } } From 2eae628c432a7a62a543dc51abcb80767b60b317 Mon Sep 17 00:00:00 2001 From: Dexter Ajoku Date: Sun, 8 Feb 2026 05:34:58 +0100 Subject: [PATCH 8/8] Make HlcTimestamp span TryParse reject empty input --- src/Distributed/HlcTimestamp.cs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/Distributed/HlcTimestamp.cs b/src/Distributed/HlcTimestamp.cs index eff5990..88ab2ba 100644 --- a/src/Distributed/HlcTimestamp.cs +++ b/src/Distributed/HlcTimestamp.cs @@ -202,10 +202,7 @@ public static bool TryParse(ReadOnlySpan s, out HlcTimestamp result) { result = default; if (s.IsEmpty) - { - result = new HlcTimestamp(); - return true; - } + return false; var atIndex = s.IndexOf('@'); if (atIndex <= 0)