From 72017b8cab0ac16e878f5ac81538f03e9f9bd4d5 Mon Sep 17 00:00:00 2001 From: Dexter Ajoku Date: Thu, 19 Feb 2026 09:24:34 +0100 Subject: [PATCH 01/13] Clarify HLC packed encoding semantics --- src/Distributed/HlcTimestamp.cs | 38 ++++++++++++++++++++++++++++++--- 1 file changed, 35 insertions(+), 3 deletions(-) diff --git a/src/Distributed/HlcTimestamp.cs b/src/Distributed/HlcTimestamp.cs index 88ab2ba..9cee2c0 100644 --- a/src/Distributed/HlcTimestamp.cs +++ b/src/Distributed/HlcTimestamp.cs @@ -19,6 +19,31 @@ namespace Clockworks.Distributed; /// public readonly record struct HlcTimestamp : IComparable, IComparable { + /// + /// Number of bits allocated to the wall time component in . + /// + public const int PackedWallTimeBits = 48; + + /// + /// Number of bits allocated to the counter component in . + /// + public const int PackedCounterBits = 12; + + /// + /// Number of bits allocated to node id in . + /// + public const int PackedNodeIdBits = 4; + + /// + /// Bit mask applied to in . + /// + public const ushort PackedCounterMask = 0x0FFF; + + /// + /// Bit mask applied to in . + /// + public const ushort PackedNodeIdMask = 0x000F; + /// /// Logical wall time in milliseconds since Unix epoch. /// May drift ahead of physical time to maintain causality. @@ -52,16 +77,23 @@ public HlcTimestamp(long wallTimeMs, ushort counter = 0, ushort nodeId = 0) /// /// Packs the timestamp into a 64-bit value for efficient storage/transmission. - /// Layout: [48 bits wall time][12 bits counter][4 bits node (truncated)] + /// Layout: [48 bits wall time][12 bits counter][4 bits node id]. /// + /// + /// This packed encoding is an optimization format. + /// + /// is truncated to the lower 4 bits (0-15). + /// Use / for a full-fidelity encoding. + /// + /// public long ToPackedInt64() { // 48 bits for timestamp (good until year 10889) // 12 bits for counter (0-4095) // 4 bits for node ID (0-15, use for small clusters) ulong wall = (ulong)WallTimeMs; - ulong counter = (ulong)(Counter & 0x0FFF); - ulong node = (ulong)(NodeId & 0x000F); + ulong counter = (ulong)(Counter & PackedCounterMask); + ulong node = (ulong)(NodeId & PackedNodeIdMask); ulong packed = (wall << 16) | (counter << 4) | node; return unchecked((long)packed); From e0503f1c12959a39ab3f2e1c8f5121f88c64bf63 Mon Sep 17 00:00:00 2001 From: Dexter Ajoku Date: Thu, 19 Feb 2026 09:24:38 +0100 Subject: [PATCH 02/13] VectorClock: add IsEmpty and internal builder for coordinator --- src/Distributed/VectorClock.cs | 26 +++- src/Distributed/VectorClockBuilder.cs | 161 ++++++++++++++++++++++ src/Distributed/VectorClockCoordinator.cs | 18 ++- 3 files changed, 201 insertions(+), 4 deletions(-) create mode 100644 src/Distributed/VectorClockBuilder.cs diff --git a/src/Distributed/VectorClock.cs b/src/Distributed/VectorClock.cs index e90a463..e0e4cd5 100644 --- a/src/Distributed/VectorClock.cs +++ b/src/Distributed/VectorClock.cs @@ -54,6 +54,11 @@ public enum VectorClockOrder private readonly ulong[]? _counters; private const int MaxEntries = ushort.MaxValue + 1; + /// + /// True when the vector clock contains no entries. + /// + public bool IsEmpty => _nodeIds == null || _counters == null || _nodeIds.Length == 0; + /// /// Creates an empty vector clock. /// @@ -69,6 +74,24 @@ private VectorClock(ushort[] nodeIds, ulong[] counters) _counters = counters; } + internal int Count => _nodeIds?.Length ?? 0; + + internal void CopyTo(Span nodeIds, Span counters) + { + if (_nodeIds == null || _counters == null) + return; + + _nodeIds.CopyTo(nodeIds); + _counters.CopyTo(counters); + } + + internal static VectorClock CreateUnsafe(ushort[] nodeIds, ulong[] counters) + { + if (nodeIds.Length != counters.Length) + throw new ArgumentException("nodeIds and counters must have same length."); + return new VectorClock(nodeIds, counters); + } + /// /// Gets the counter value for a specific node ID. /// Returns 0 if the node ID is not present in this vector clock. @@ -284,7 +307,8 @@ public VectorClockOrder Compare(VectorClock other) /// /// Writes this vector clock to a binary representation. - /// Format: [count:uint][nodeId:ushort,counter:ulong]* + /// Format: [count:u32 big-endian][(nodeId:u16 big-endian, counter:u64 big-endian)]*. + /// The in-memory representation is canonical (sorted by nodeId), so the serialized form is canonical. /// public void WriteTo(Span destination) { diff --git a/src/Distributed/VectorClockBuilder.cs b/src/Distributed/VectorClockBuilder.cs new file mode 100644 index 0000000..2994f4b --- /dev/null +++ b/src/Distributed/VectorClockBuilder.cs @@ -0,0 +1,161 @@ +using System.Buffers.Binary; + +namespace Clockworks.Distributed; + +internal sealed class VectorClockBuilder +{ + private const int MaxEntries = ushort.MaxValue + 1; + + private ushort[] _nodeIds; + private ulong[] _counters; + private int _count; + + public VectorClockBuilder(int initialCapacity = 4) + { + if (initialCapacity < 0) + throw new ArgumentOutOfRangeException(nameof(initialCapacity)); + + _nodeIds = initialCapacity == 0 ? [] : new ushort[initialCapacity]; + _counters = initialCapacity == 0 ? [] : new ulong[initialCapacity]; + _count = 0; + } + + public bool IsEmpty => _count == 0; + + public void Reset(VectorClock snapshot) + { + var count = snapshot.Count; + EnsureCapacity(count); + _count = count; + + if (count == 0) + return; + + snapshot.CopyTo(_nodeIds.AsSpan(0, count), _counters.AsSpan(0, count)); + } + + public void Increment(ushort nodeId) + { + if (_count == 0) + { + EnsureCapacity(1); + _nodeIds[0] = nodeId; + _counters[0] = 1; + _count = 1; + return; + } + + var index = Array.BinarySearch(_nodeIds, 0, _count, nodeId); + if (index >= 0) + { + _counters[index]++; + return; + } + + if (_count >= MaxEntries) + throw new InvalidOperationException($"Cannot increment: vector clock at max capacity ({MaxEntries})"); + + var insertIndex = ~index; + EnsureCapacity(_count + 1); + + if (insertIndex < _count) + { + Array.Copy(_nodeIds, insertIndex, _nodeIds, insertIndex + 1, _count - insertIndex); + Array.Copy(_counters, insertIndex, _counters, insertIndex + 1, _count - insertIndex); + } + + _nodeIds[insertIndex] = nodeId; + _counters[insertIndex] = 1; + _count++; + } + + public void Merge(VectorClock other) + { + if (other.IsEmpty) + return; + + // Decode directly from canonical binary format to avoid reliance on VectorClock internals. + // This is linear in number of entries and does not allocate per entry. + var size = other.GetBinarySize(); + byte[]? pooled = null; + Span buffer = size <= 1024 ? stackalloc byte[size] : (pooled = new byte[size]); + + other.WriteTo(buffer); + + var count = BinaryPrimitives.ReadUInt32BigEndian(buffer); + var offset = 4; + for (var i = 0; i < count; i++) + { + var nodeId = BinaryPrimitives.ReadUInt16BigEndian(buffer.Slice(offset, 2)); + offset += 2; + var counter = BinaryPrimitives.ReadUInt64BigEndian(buffer.Slice(offset, 8)); + offset += 8; + + MergeEntry(nodeId, counter); + } + } + + public VectorClock ToSnapshot() + { + if (_count == 0) + return new VectorClock(); + + var nodeIds = new ushort[_count]; + var counters = new ulong[_count]; + Array.Copy(_nodeIds, nodeIds, _count); + Array.Copy(_counters, counters, _count); + return VectorClock.CreateUnsafe(nodeIds, counters); + } + + private void MergeEntry(ushort nodeId, ulong counter) + { + if (_count == 0) + { + EnsureCapacity(1); + _nodeIds[0] = nodeId; + _counters[0] = counter; + _count = 1; + return; + } + + var index = Array.BinarySearch(_nodeIds, 0, _count, nodeId); + if (index >= 0) + { + if (counter > _counters[index]) + _counters[index] = counter; + return; + } + + if (_count >= MaxEntries) + throw new InvalidOperationException($"Cannot merge: vector clock at max capacity ({MaxEntries})"); + + var insertIndex = ~index; + EnsureCapacity(_count + 1); + + if (insertIndex < _count) + { + Array.Copy(_nodeIds, insertIndex, _nodeIds, insertIndex + 1, _count - insertIndex); + Array.Copy(_counters, insertIndex, _counters, insertIndex + 1, _count - insertIndex); + } + + _nodeIds[insertIndex] = nodeId; + _counters[insertIndex] = counter; + _count++; + } + + private void EnsureCapacity(int needed) + { + if (needed > MaxEntries) + throw new InvalidOperationException($"Vector clock cannot contain more than {MaxEntries} entries."); + + if (_nodeIds.Length >= needed) + return; + + var next = Math.Max(needed, _nodeIds.Length == 0 ? 4 : _nodeIds.Length * 2); + if (next > MaxEntries) + next = MaxEntries; + + Array.Resize(ref _nodeIds, next); + Array.Resize(ref _counters, next); + } +} diff --git a/src/Distributed/VectorClockCoordinator.cs b/src/Distributed/VectorClockCoordinator.cs index fc45719..c662332 100644 --- a/src/Distributed/VectorClockCoordinator.cs +++ b/src/Distributed/VectorClockCoordinator.cs @@ -28,6 +28,7 @@ public sealed class VectorClockCoordinator private readonly ushort _nodeId; private readonly Lock _lock = new(); private VectorClock _current; + private readonly VectorClockBuilder _builder = new(); /// /// Statistics for monitoring vector clock behavior. @@ -68,7 +69,12 @@ public void BeforeReceive(VectorClock remote) lock (_lock) { var before = _current; - _current = _current.Merge(remote).Increment(_nodeId); + + _builder.Reset(_current); + _builder.Merge(remote); + _builder.Increment(_nodeId); + _current = _builder.ToSnapshot(); + Statistics.RecordReceive(before, _current, remote); } } @@ -82,7 +88,10 @@ public VectorClock BeforeSend() { lock (_lock) { - _current = _current.Increment(_nodeId); + _builder.Reset(_current); + _builder.Increment(_nodeId); + _current = _builder.ToSnapshot(); + var snapshot = _current; Statistics.RecordSend(snapshot); return snapshot; @@ -97,7 +106,10 @@ public void NewLocalEvent() { lock (_lock) { - _current = _current.Increment(_nodeId); + _builder.Reset(_current); + _builder.Increment(_nodeId); + _current = _builder.ToSnapshot(); + Statistics.RecordLocalEvent(_current); } } From 60fadc4b544b1dc675fa312abf385cff7d2f7985 Mon Sep 17 00:00:00 2001 From: Dexter Ajoku Date: Thu, 19 Feb 2026 09:24:41 +0100 Subject: [PATCH 03/13] Demo: use Dictionary-based state and priority-queue network --- ...DistributedAtLeastOnceCausalityShowcase.cs | 135 ++++++++++++------ 1 file changed, 91 insertions(+), 44 deletions(-) diff --git a/demo/Clockworks.Demo/Demos/DistributedAtLeastOnceCausalityShowcase.cs b/demo/Clockworks.Demo/Demos/DistributedAtLeastOnceCausalityShowcase.cs index 8cb5be1..baafa3d 100644 --- a/demo/Clockworks.Demo/Demos/DistributedAtLeastOnceCausalityShowcase.cs +++ b/demo/Clockworks.Demo/Demos/DistributedAtLeastOnceCausalityShowcase.cs @@ -2,7 +2,6 @@ using Clockworks.Distributed; using Clockworks.Instrumentation; using Clockworks.Demo.Infrastructure; -using System.Collections.Concurrent; using System.Collections.Generic; using static System.Console; @@ -142,11 +141,12 @@ private sealed class Node private readonly TimeProvider _tp; private readonly UuidV7Factory _uuid; private readonly BoundedInboxDedupe _inbox; - private readonly ConcurrentDictionary> _acks = new(); - private readonly ConcurrentDictionary _retries = new(); - private readonly ConcurrentDictionary _attempts = new(StringComparer.OrdinalIgnoreCase); - private readonly ConcurrentDictionary _correlationIds = new(StringComparer.OrdinalIgnoreCase); - private readonly ConcurrentDictionary _templates = new(); + // This demo is single-threaded and deterministic, so we intentionally use non-concurrent collections. + private readonly Dictionary> _acks = new(StringComparer.OrdinalIgnoreCase); + private readonly Dictionary _retries = new(StringComparer.OrdinalIgnoreCase); + private readonly Dictionary _attempts = new(StringComparer.OrdinalIgnoreCase); + private readonly Dictionary _correlationIds = new(StringComparer.OrdinalIgnoreCase); + private readonly Dictionary _templates = new(); public string Name { get; } public HlcCoordinator Hlc { get; } @@ -171,7 +171,7 @@ public Node(string name, ushort nodeId, SimulatedTimeProvider tp, SimulatedNetwo Vector = new VectorClockCoordinator(nodeId); } - public bool IsDone => _acks.IsEmpty && _retries.IsEmpty; + public bool IsDone => _acks.Count == 0 && _retries.Count == 0; public void StartNewOrder(string orderId) { @@ -180,7 +180,7 @@ public void StartNewOrder(string orderId) var ts = Hlc.BeforeSend(); var correlationId = _uuid.NewGuid(); - _acks[orderId] = new ConcurrentDictionary(StringComparer.OrdinalIgnoreCase); + _acks[orderId] = new HashSet(StringComparer.OrdinalIgnoreCase); _attempts[orderId] = 1; _correlationIds[orderId] = correlationId; @@ -246,28 +246,27 @@ private void HandleAck(Message msg) return; } - fromSet.TryAdd(msg.From, 0); + fromSet.Add(msg.From); - if (fromSet.ContainsKey(PaymentsNodeName) && fromSet.ContainsKey(InventoryNodeName)) + if (fromSet.Contains(PaymentsNodeName) && fromSet.Contains(InventoryNodeName)) { - if (_retries.TryRemove(msg.OrderId, out var handle)) - { + if (_retries.Remove(msg.OrderId, out var handle)) handle.Dispose(); - } - _acks.TryRemove(msg.OrderId, out _); - _attempts.TryRemove(msg.OrderId, out _); - _correlationIds.TryRemove(msg.OrderId, out _); + _acks.Remove(msg.OrderId); + _attempts.Remove(msg.OrderId); + _correlationIds.Remove(msg.OrderId); // Clean up templates to prevent unbounded growth in long simulations - _templates.TryRemove(new LogicalMessageKey(MessageKind.PlaceOrder, msg.OrderId, PaymentsNodeName), out _); - _templates.TryRemove(new LogicalMessageKey(MessageKind.PlaceOrder, msg.OrderId, InventoryNodeName), out _); + _templates.Remove(new LogicalMessageKey(MessageKind.PlaceOrder, msg.OrderId, PaymentsNodeName)); + _templates.Remove(new LogicalMessageKey(MessageKind.PlaceOrder, msg.OrderId, InventoryNodeName)); } } public void Poll() { - // Snapshot to avoid collection modification during iteration + // Snapshot to avoid collection modification during iteration. + // (Timeout callbacks are driven by SimulatedTimeProvider; we still keep iteration safe.) var snapshot = _retries.ToArray(); foreach (var (orderId, handle) in snapshot) { @@ -287,10 +286,7 @@ private void ScheduleRetry(string orderId, int attempt) var handle = Timeouts.CreateTimeoutHandle(_tp, TimeSpan.FromMilliseconds(50), statistics: TimeoutStats); if (!_retries.TryAdd(orderId, handle)) - { handle.Dispose(); - return; - } } private void TryRetry(string orderId) @@ -301,13 +297,14 @@ private void TryRetry(string orderId) return; } - if (fromSet.ContainsKey(PaymentsNodeName) && fromSet.ContainsKey(InventoryNodeName)) + if (fromSet.Contains(PaymentsNodeName) && fromSet.Contains(InventoryNodeName)) { CleanupRetry(orderId); return; } - var nextAttempt = _attempts.AddOrUpdate(orderId, 2, static (_, current) => current + 1); + var nextAttempt = _attempts.TryGetValue(orderId, out var currentAttempt) ? currentAttempt + 1 : 2; + _attempts[orderId] = nextAttempt; if (nextAttempt > 10) { CleanupRetry(orderId); @@ -334,7 +331,7 @@ private MessageTemplate GetOrCreateTemplate(LogicalMessageKey key, Guid correlat if (_templates.TryGetValue(key, out var existing)) return existing; - if (hlc == default || vc.Equals(default(VectorClock))) + if (hlc == default || vc.IsEmpty) { // This should not happen for normal flow (templates are created during StartNewOrder), // but if it does, synthesize a reasonable envelope. @@ -354,15 +351,14 @@ private MessageTemplate GetOrCreateTemplate(LogicalMessageKey key, Guid correlat Hlc: hlc, VectorClock: vc); - return _templates.GetOrAdd(key, template); + _templates[key] = template; + return template; } private void CleanupRetry(string orderId) { - if (_retries.TryRemove(orderId, out var existing)) - { + if (_retries.Remove(orderId, out var existing)) existing.Dispose(); - } } private void Send(Message msg) => _network.Send(msg); @@ -562,8 +558,9 @@ private sealed class SimulatedNetwork(SimulatedTimeProvider tp, FailureInjector private readonly Lock _lock = new(); private readonly SimulatedTimeProvider _tp = tp; private readonly FailureInjector _failures = failures; - private readonly ConcurrentDictionary _nodes = new(StringComparer.OrdinalIgnoreCase); - private readonly List _inFlight = []; + private readonly Dictionary _nodes = new(StringComparer.OrdinalIgnoreCase); + private readonly PriorityQueue _inFlight = new(); + private long _sequence; public MessagingStatistics Stats { get; } = new(); @@ -595,7 +592,8 @@ public long? NextDueUtcMs { if (_inFlight.Count == 0) return null; - return _inFlight.Min(m => m.DeliverAtUtcMs); + + return _inFlight.Peek().Priority.DeliverAtUtcMs; } } } @@ -617,11 +615,11 @@ public void Send(Message msg) lock (_lock) { - _inFlight.Add(new Envelope(msg, deliverAt)); + Enqueue(msg, deliverAt); if (_failures.ShouldDuplicate()) { - _inFlight.Add(new Envelope(msg with { }, deliverAt + _failures.AdditionalDelayMs())); + Enqueue(msg with { }, deliverAt + _failures.AdditionalDelayMs()); Stats.RecordDuplicated(_inFlight.Count); } @@ -629,9 +627,9 @@ public void Send(Message msg) if (_failures.ShouldReorder() && _inFlight.Count >= 2) { - var i = _inFlight.Count - 1; - var j = Math.Max(0, i - 1); - (_inFlight[i], _inFlight[j]) = (_inFlight[j], _inFlight[i]); + // Model reordering as small bounded jitter on delivery time. + // Clamp to >= now so delivery remains time-driven (no "instant" negative delay). + ApplyReorderJitterUnsafe(now); Stats.RecordReordered(); } } @@ -647,14 +645,14 @@ public void DeliverDue() if (_inFlight.Count == 0) return; - due = _inFlight.Where(m => m.DeliverAtUtcMs <= now).ToList(); - if (due.Count == 0) - return; - - foreach (var m in due) + due = []; + while (_inFlight.Count > 0 && _inFlight.Peek().Priority.DeliverAtUtcMs <= now) { - _inFlight.Remove(m); + due.Add(_inFlight.Dequeue()); } + + if (due.Count == 0) + return; } foreach (var env in due) @@ -669,6 +667,41 @@ public void DeliverDue() } } + private void Enqueue(Message msg, long deliverAtUtcMs) + { + var env = new Envelope(msg); + var priority = new DeliveryPriority(deliverAtUtcMs, unchecked(++_sequence)); + env.Priority = priority; + _inFlight.Enqueue(env, priority); + } + + private void ApplyReorderJitterUnsafe(long nowUtcMs) + { + if (_inFlight.Count < 2) + return; + + // PriorityQueue has no decrease-key; rebuild for a tiny demo workload. + var entries = new List(_inFlight.Count); + while (_inFlight.Count > 0) + { + entries.Add(_inFlight.Dequeue()); + } + + var jitterWindowMs = Math.Min(10, Math.Max(1, _failures.MaxAdditionalDelayMs / 20)); + for (var i = Math.Max(0, entries.Count - 2); i < entries.Count; i++) + { + var env = entries[i]; + var jitter = _failures.AdditionalDelayMs() % (jitterWindowMs + 1); + var newDeliverAt = Math.Max(nowUtcMs, env.Priority.DeliverAtUtcMs - jitter); + env.Priority = env.Priority with { DeliverAtUtcMs = newDeliverAt }; + } + + foreach (var env in entries) + { + _inFlight.Enqueue(env, env.Priority); + } + } + private void ObserveConcurrency(Message delivered) { lock (_lock) @@ -700,7 +733,21 @@ private void ObserveConcurrency(Message delivered) } } - private sealed record Envelope(Message Message, long DeliverAtUtcMs); + private sealed record Envelope(Message Message) + { + public DeliveryPriority Priority { get; set; } + } + + private readonly record struct DeliveryPriority(long DeliverAtUtcMs, long Sequence) : IComparable + { + public int CompareTo(DeliveryPriority other) + { + var cmp = DeliverAtUtcMs.CompareTo(other.DeliverAtUtcMs); + if (cmp != 0) + return cmp; + return Sequence.CompareTo(other.Sequence); + } + } private sealed record DeliveredEvent(Guid EventId, MessageKind Kind, string From, string To, VectorClock VectorClock); } From b67df13c08e92a0e53c4f456ab065ae138252ba5 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 19 Feb 2026 15:06:06 +0000 Subject: [PATCH 04/13] Initial plan From bf427b76a7e3c923ed796de917349f9a6bc35705 Mon Sep 17 00:00:00 2001 From: Dexter Ajoku Date: Thu, 19 Feb 2026 16:17:25 +0100 Subject: [PATCH 05/13] Fix packed HLC decode for high-bit wall times --- src/Distributed/HlcTimestamp.cs | 7 ++++--- tests/HlcTimestampTests.cs | 16 ++++++++++++++++ 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/src/Distributed/HlcTimestamp.cs b/src/Distributed/HlcTimestamp.cs index 9cee2c0..d477e6c 100644 --- a/src/Distributed/HlcTimestamp.cs +++ b/src/Distributed/HlcTimestamp.cs @@ -104,10 +104,11 @@ public long ToPackedInt64() /// public static HlcTimestamp FromPackedInt64(long packed) { + var u = unchecked((ulong)packed); return new HlcTimestamp( - wallTimeMs: packed >> 16, - counter: (ushort)((packed >> 4) & 0xFFF), - nodeId: (ushort)(packed & 0xF) + wallTimeMs: (long)((u >> 16) & ((1UL << PackedWallTimeBits) - 1UL)), + counter: (ushort)((u >> PackedNodeIdBits) & PackedCounterMask), + nodeId: (ushort)(u & PackedNodeIdMask) ); } diff --git a/tests/HlcTimestampTests.cs b/tests/HlcTimestampTests.cs index 8b91590..cd5015d 100644 --- a/tests/HlcTimestampTests.cs +++ b/tests/HlcTimestampTests.cs @@ -30,4 +30,20 @@ public void ToPackedInt64_FromPackedInt64_RoundTrips_WithNodeTruncation() Assert.Equal((ushort)(ts.Counter & 0x0FFF), parsed.Counter); Assert.Equal((ushort)(ts.NodeId & 0x000F), parsed.NodeId); } + + [Fact] + public void FromPackedInt64_DoesNotSignExtendWallTime_WhenPackedHighBitSet() + { + // Pick a 48-bit wall time where bit 47 is set; after packing (<< 16) this becomes bit 63. + var wall48 = (1L << 47) + 123; + var ts = new HlcTimestamp(wallTimeMs: wall48, counter: 1, nodeId: 2); + + var packed = ts.ToPackedInt64(); + Assert.True(packed < 0, "Packed representation should have sign bit set for this case."); + + var decoded = HlcTimestamp.FromPackedInt64(packed); + Assert.Equal(wall48, decoded.WallTimeMs); + Assert.Equal((ushort)1, decoded.Counter); + Assert.Equal((ushort)2, decoded.NodeId); + } } From 0106037191d6f8675820e962abb58d6f4cd5c80d Mon Sep 17 00:00:00 2001 From: Dexter Ajoku Date: Thu, 19 Feb 2026 16:17:29 +0100 Subject: [PATCH 06/13] VectorClockBuilder: use ArrayPool for merge buffers --- src/Distributed/VectorClockBuilder.cs | 23 +++++++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/src/Distributed/VectorClockBuilder.cs b/src/Distributed/VectorClockBuilder.cs index 2994f4b..45222ec 100644 --- a/src/Distributed/VectorClockBuilder.cs +++ b/src/Distributed/VectorClockBuilder.cs @@ -1,3 +1,4 @@ +using System.Buffers; using System.Buffers.Binary; namespace Clockworks.Distributed; @@ -77,9 +78,27 @@ public void Merge(VectorClock other) // Decode directly from canonical binary format to avoid reliance on VectorClock internals. // This is linear in number of entries and does not allocate per entry. var size = other.GetBinarySize(); - byte[]? pooled = null; - Span buffer = size <= 1024 ? stackalloc byte[size] : (pooled = new byte[size]); + if (size <= 1024) + { + Span buffer = stackalloc byte[size]; + MergeFromBuffer(other, buffer); + return; + } + + var rented = ArrayPool.Shared.Rent(size); + try + { + var buffer = rented.AsSpan(0, size); + MergeFromBuffer(other, buffer); + } + finally + { + ArrayPool.Shared.Return(rented); + } + } + private void MergeFromBuffer(VectorClock other, Span buffer) + { other.WriteTo(buffer); var count = BinaryPrimitives.ReadUInt32BigEndian(buffer); From 9932afd7ff4a01dd111bad069caa25864f8c13d4 Mon Sep 17 00:00:00 2001 From: Dexter Ajoku Date: Thu, 19 Feb 2026 17:02:42 +0100 Subject: [PATCH 07/13] VectorClock: use CollectionsMarshal for canonicalization --- src/Distributed/VectorClock.cs | 39 ++++++++++++++++++++++------------ 1 file changed, 26 insertions(+), 13 deletions(-) diff --git a/src/Distributed/VectorClock.cs b/src/Distributed/VectorClock.cs index e0e4cd5..26f2328 100644 --- a/src/Distributed/VectorClock.cs +++ b/src/Distributed/VectorClock.cs @@ -1,5 +1,6 @@ using System.Buffers.Binary; using System.Globalization; +using System.Runtime.InteropServices; namespace Clockworks.Distributed; @@ -454,24 +455,36 @@ private static VectorClock CreateCanonical(List<(ushort nodeId, ulong counter)> if (pairs.Count == 0) return new VectorClock(); - pairs.Sort((a, b) => a.nodeId.CompareTo(b.nodeId)); + if (pairs.Count == 1) + { + var (nodeId, counter) = pairs[0]; + return new VectorClock([nodeId], [counter]); + } - var nodeIds = new List(pairs.Count); - var counters = new List(pairs.Count); - foreach (var pair in pairs) + var maxByNodeId = new Dictionary(capacity: pairs.Count); + foreach (var (nodeId, counter) in pairs) { - if (nodeIds.Count > 0 && nodeIds[^1] == pair.nodeId) - { - if (pair.counter > counters[^1]) - counters[^1] = pair.counter; - continue; - } + ref var existing = ref CollectionsMarshal.GetValueRefOrAddDefault(maxByNodeId, nodeId, out var exists); + if (!exists || counter > existing) + existing = counter; + } - nodeIds.Add(pair.nodeId); - counters.Add(pair.counter); + if (maxByNodeId.Count == 0) + return new VectorClock(); + + var sorted = maxByNodeId.Keys.ToArray(); + Array.Sort(sorted); + + var nodeIds = new ushort[sorted.Length]; + var counters = new ulong[sorted.Length]; + for (var i = 0; i < sorted.Length; i++) + { + var nodeId = sorted[i]; + nodeIds[i] = nodeId; + counters[i] = maxByNodeId[nodeId]; } - return new VectorClock([.. nodeIds], [.. counters]); + return new VectorClock(nodeIds, counters); } /// From 073be3605192c8f58e68678ee685196c4b65b5d7 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 19 Feb 2026 16:55:02 +0000 Subject: [PATCH 08/13] Initial plan From 4f144db48c865d5c9004bb1789b2f85957b07080 Mon Sep 17 00:00:00 2001 From: Dexter Ajoku Date: Thu, 19 Feb 2026 18:42:15 +0100 Subject: [PATCH 09/13] Docs: update README for HLC and VectorClock wire formats --- README.md | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 5d0bbae..f44b3fb 100644 --- a/README.md +++ b/README.md @@ -25,12 +25,13 @@ It is built around `TimeProvider` so that *time becomes an injectable dependency - **Hybrid Logical Clock (HLC)** - HLC timestamps and utilities to preserve causality in distributed simulations - Helpers to witness remote timestamps and generate outbound timestamps + - Provides both a canonical 10-byte big-endian encoding (`HlcTimestamp.WriteTo`/`ReadFrom`) and an optimized packed 64-bit encoding (`ToPackedInt64`/`FromPackedInt64`) - **Vector Clock** - Full vector clock implementation for exact causality tracking and concurrency detection - Sorted-array representation optimized for sparse clocks - - `VectorClockCoordinator` for thread-safe clock management across distributed nodes - - Message header propagation for HTTP/gRPC integration + - `VectorClockCoordinator` for thread-safe clock management across distributed nodes (allocation-conscious hot path) + - Canonical binary wire format (`VectorClock.WriteTo`/`ReadFrom`) and string form for HTTP/gRPC headers - **Lightweight instrumentation** - Counters for timers, advances, and timeouts useful in simulation/test assertions @@ -100,6 +101,8 @@ In Clockworks, a "remote timestamp" is the `HlcTimestamp` produced on a differen via `HlcMessageHeader` (format: `walltime.counter@node`). The receiver should call `BeforeReceive(...)` with that timestamp to preserve causality. +Note: `HlcTimestamp.ToPackedInt64()`/`FromPackedInt64()` is an optimization encoding with a 48-bit wall time and a 4-bit node id (node id is truncated). Use `WriteTo`/`ReadFrom` when you need a full-fidelity representation. + ```csharp var tp = new SimulatedTimeProvider(); @@ -192,6 +195,13 @@ Console.WriteLine(clockA.IsConcurrentWith(clockB)); // true // HLC would show one as "less than" the other based on physical time ``` +**Wire formats** + +- **Binary (canonical):** `VectorClock.WriteTo`/`VectorClock.ReadFrom` + - Format: `[count:u32 big-endian][(nodeId:u16 big-endian, counter:u64 big-endian)]*` + - `ReadFrom` canonicalizes unsorted input and deduplicates node IDs by taking the maximum counter. +- **String (for headers):** `VectorClock.ToString()` / `VectorClock.Parse(...)` (`"node:counter,node:counter"`, sorted by node id) + ## Demos ### Console demos (`demo/Clockworks.Demo`) From f9e82b1b0ab6cbf633032fad4f6214ed0fc8ddb7 Mon Sep 17 00:00:00 2001 From: Dexter Ajoku Date: Thu, 19 Feb 2026 18:45:26 +0100 Subject: [PATCH 10/13] Use ThrowIfNegative for argument validation --- src/Distributed/VectorClockBuilder.cs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/Distributed/VectorClockBuilder.cs b/src/Distributed/VectorClockBuilder.cs index 45222ec..8d840d9 100644 --- a/src/Distributed/VectorClockBuilder.cs +++ b/src/Distributed/VectorClockBuilder.cs @@ -13,8 +13,7 @@ internal sealed class VectorClockBuilder public VectorClockBuilder(int initialCapacity = 4) { - if (initialCapacity < 0) - throw new ArgumentOutOfRangeException(nameof(initialCapacity)); + ArgumentOutOfRangeException.ThrowIfNegative(initialCapacity); _nodeIds = initialCapacity == 0 ? [] : new ushort[initialCapacity]; _counters = initialCapacity == 0 ? [] : new ulong[initialCapacity]; From 9eb987fad82df417638f2814f01fa461e4a5e4c6 Mon Sep 17 00:00:00 2001 From: Dexter Ajoku Date: Thu, 19 Feb 2026 19:07:21 +0100 Subject: [PATCH 11/13] UuidV7Factory: optimize batch generation throughput --- src/UuidV7Factory.cs | 71 +++++++++++++++++++++++++++++++++++-- tests/UuidV7FactoryTests.cs | 40 +++++++++++++++++++++ 2 files changed, 108 insertions(+), 3 deletions(-) diff --git a/src/UuidV7Factory.cs b/src/UuidV7Factory.cs index 622b72a..b9aea81 100644 --- a/src/UuidV7Factory.cs +++ b/src/UuidV7Factory.cs @@ -1,4 +1,5 @@ using Clockworks.Abstractions; +using System.Buffers.Binary; using System.Runtime.CompilerServices; using System.Security.Cryptography; @@ -109,9 +110,72 @@ public Guid NewGuid() /// public void NewGuids(Span destination) { - for (int i = 0; i < destination.Length; i++) + if (destination.Length == 0) + return; + + FillGuids(destination); + } + + private void FillGuids(Span destination) + { + var i = 0; + var spinWait = new SpinWait(); + + while (i < destination.Length) { - destination[i] = NewGuid(); + var currentPacked = Volatile.Read(ref _packedState); + var (currentTimestamp, currentCounter) = UnpackState(currentPacked); + + var physicalTime = _timeProvider.GetUtcNow().ToUnixTimeMilliseconds(); + + var baseTimestamp = physicalTime > currentTimestamp ? physicalTime : currentTimestamp; + + // If we moved forward to a new physical millisecond, start a fresh random counter window. + // Otherwise continue from the current counter. + var startCounter = physicalTime > currentTimestamp ? GetRandomCounterStart() : (ushort)(currentCounter + 1); + + if (startCounter > MaxCounterValue) + { + // Counter overflow at this millisecond. + switch (_effectiveOverflowBehavior) + { + case CounterOverflowBehavior.SpinWait: + SpinWaitForNextMillisecond(baseTimestamp); + continue; + + case CounterOverflowBehavior.IncrementTimestamp: + baseTimestamp = currentTimestamp + 1; + startCounter = GetRandomCounterStart(); + break; + + case CounterOverflowBehavior.ThrowException: + throw new InvalidOperationException( + $"Counter overflow: generated {MaxCounterValue + 1} UUIDs within millisecond {currentTimestamp}"); + + default: + throw new ArgumentOutOfRangeException(); + } + } + + var remainingInMs = MaxCounterValue - startCounter; + var available = Math.Min(destination.Length - i, remainingInMs + 1); + + var newTimestamp = baseTimestamp; + var newCounter = (ushort)(startCounter + available - 1); + var newPacked = PackState(newTimestamp, newCounter); + + if (Interlocked.CompareExchange(ref _packedState, newPacked, currentPacked) != currentPacked) + { + spinWait.SpinOnce(); + continue; + } + + for (var j = 0; j < available; j++) + { + destination[i + j] = CreateGuidFromState(baseTimestamp, (ushort)(startCounter + j)); + } + + i += available; } } @@ -250,7 +314,8 @@ private ushort GetRandomCounterStart() // Start in lower half of counter space to leave room for increments // This reduces collision probability across instances starting in the same ms var bytes = _randomBuffer.Value!.GetBytes(2); - return (ushort)((bytes[0] | (bytes[1] << 8)) & CounterRandomStart); + var value = BinaryPrimitives.ReadUInt16LittleEndian(bytes); + return (ushort)(value & CounterRandomStart); } private void SpinWaitForNextMillisecond(long currentMs) diff --git a/tests/UuidV7FactoryTests.cs b/tests/UuidV7FactoryTests.cs index 004f1d4..ed7a4fa 100644 --- a/tests/UuidV7FactoryTests.cs +++ b/tests/UuidV7FactoryTests.cs @@ -4,6 +4,46 @@ namespace Clockworks.Tests; public sealed class UuidV7FactoryTests { + [Fact] + public void NewGuids_IsMonotonic_AndUnique_WhenTimeDoesNotAdvance() + { + var time = SimulatedTimeProvider.FromUnixMs(1_700_000_000_000); + using var rng = new DeterministicRandomNumberGenerator(seed: 1); + using var factory = new UuidV7Factory(time, rng, overflowBehavior: CounterOverflowBehavior.IncrementTimestamp); + + Span batch = stackalloc Guid[256]; + factory.NewGuids(batch); + + for (var i = 1; i < batch.Length; i++) + { + Assert.True(batch[i - 1] < batch[i]); + } + + var set = new HashSet(); + foreach (var g in batch) + Assert.True(set.Add(g)); + + // Time does not need to advance. + Assert.Equal(1_700_000_000_000, time.GetUtcNow().ToUnixTimeMilliseconds()); + } + + [Fact] + public void NewGuids_RemainsMonotonic_AcrossCounterOverflow_WhenIncrementTimestampEnabled() + { + var time = SimulatedTimeProvider.FromUnixMs(1_700_000_000_000); + using var rng = new DeterministicRandomNumberGenerator(seed: 1); + using var factory = new UuidV7Factory(time, rng, overflowBehavior: CounterOverflowBehavior.IncrementTimestamp); + + // Large enough to cross the 12-bit counter range regardless of random start. + var arr = new Guid[5000]; + factory.NewGuids(arr); + + for (var i = 1; i < arr.Length; i++) + { + Assert.True(arr[i - 1] < arr[i]); + } + } + [Fact] public void NewGuid_IsMonotonic_WhenTimeDoesNotAdvance() { From 3b412ee2897db530821431289f3df10d7afe1e51 Mon Sep 17 00:00:00 2001 From: Dexter Ajoku Date: Thu, 19 Feb 2026 19:14:21 +0100 Subject: [PATCH 12/13] UuidV7Factory: fix CA2208 for overflow behavior switch --- src/UuidV7Factory.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/UuidV7Factory.cs b/src/UuidV7Factory.cs index b9aea81..7f05c8c 100644 --- a/src/UuidV7Factory.cs +++ b/src/UuidV7Factory.cs @@ -153,7 +153,7 @@ private void FillGuids(Span destination) $"Counter overflow: generated {MaxCounterValue + 1} UUIDs within millisecond {currentTimestamp}"); default: - throw new ArgumentOutOfRangeException(); + throw new ArgumentOutOfRangeException(nameof(_effectiveOverflowBehavior)); } } @@ -233,7 +233,7 @@ private void FillGuids(Span destination) $"Counter overflow: generated {MaxCounterValue + 1} UUIDs within millisecond {currentTimestamp}"); default: - throw new ArgumentOutOfRangeException(); + throw new ArgumentOutOfRangeException(nameof(_effectiveOverflowBehavior)); } } else From 020611d35a75813d27a60b57884050186ee39f4e Mon Sep 17 00:00:00 2001 From: Dexter Ajoku Date: Thu, 19 Feb 2026 19:22:48 +0100 Subject: [PATCH 13/13] Replace ArgumentOutOfRangeException with UnreachableException Switched default branches in two switch statements to throw UnreachableException instead of ArgumentOutOfRangeException, clarifying that these code paths should never be reached. This improves code semantics and intent. --- src/UuidV7Factory.cs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/UuidV7Factory.cs b/src/UuidV7Factory.cs index 7f05c8c..c296567 100644 --- a/src/UuidV7Factory.cs +++ b/src/UuidV7Factory.cs @@ -1,5 +1,6 @@ using Clockworks.Abstractions; using System.Buffers.Binary; +using System.Diagnostics; using System.Runtime.CompilerServices; using System.Security.Cryptography; @@ -153,7 +154,7 @@ private void FillGuids(Span destination) $"Counter overflow: generated {MaxCounterValue + 1} UUIDs within millisecond {currentTimestamp}"); default: - throw new ArgumentOutOfRangeException(nameof(_effectiveOverflowBehavior)); + throw new UnreachableException(); } } @@ -233,7 +234,7 @@ private void FillGuids(Span destination) $"Counter overflow: generated {MaxCounterValue + 1} UUIDs within millisecond {currentTimestamp}"); default: - throw new ArgumentOutOfRangeException(nameof(_effectiveOverflowBehavior)); + throw new UnreachableException(); } } else