diff --git a/README.md b/README.md index 5d0bbae..f44b3fb 100644 --- a/README.md +++ b/README.md @@ -25,12 +25,13 @@ It is built around `TimeProvider` so that *time becomes an injectable dependency - **Hybrid Logical Clock (HLC)** - HLC timestamps and utilities to preserve causality in distributed simulations - Helpers to witness remote timestamps and generate outbound timestamps + - Provides both a canonical 10-byte big-endian encoding (`HlcTimestamp.WriteTo`/`ReadFrom`) and an optimized packed 64-bit encoding (`ToPackedInt64`/`FromPackedInt64`) - **Vector Clock** - Full vector clock implementation for exact causality tracking and concurrency detection - Sorted-array representation optimized for sparse clocks - - `VectorClockCoordinator` for thread-safe clock management across distributed nodes - - Message header propagation for HTTP/gRPC integration + - `VectorClockCoordinator` for thread-safe clock management across distributed nodes (allocation-conscious hot path) + - Canonical binary wire format (`VectorClock.WriteTo`/`ReadFrom`) and string form for HTTP/gRPC headers - **Lightweight instrumentation** - Counters for timers, advances, and timeouts useful in simulation/test assertions @@ -100,6 +101,8 @@ In Clockworks, a "remote timestamp" is the `HlcTimestamp` produced on a differen via `HlcMessageHeader` (format: `walltime.counter@node`). The receiver should call `BeforeReceive(...)` with that timestamp to preserve causality. +Note: `HlcTimestamp.ToPackedInt64()`/`FromPackedInt64()` is an optimization encoding with a 48-bit wall time and a 4-bit node id (node id is truncated). Use `WriteTo`/`ReadFrom` when you need a full-fidelity representation. + ```csharp var tp = new SimulatedTimeProvider(); @@ -192,6 +195,13 @@ Console.WriteLine(clockA.IsConcurrentWith(clockB)); // true // HLC would show one as "less than" the other based on physical time ``` +**Wire formats** + +- **Binary (canonical):** `VectorClock.WriteTo`/`VectorClock.ReadFrom` + - Format: `[count:u32 big-endian][(nodeId:u16 big-endian, counter:u64 big-endian)]*` + - `ReadFrom` canonicalizes unsorted input and deduplicates node IDs by taking the maximum counter. +- **String (for headers):** `VectorClock.ToString()` / `VectorClock.Parse(...)` (`"node:counter,node:counter"`, sorted by node id) + ## Demos ### Console demos (`demo/Clockworks.Demo`) diff --git a/demo/Clockworks.Demo/Demos/DistributedAtLeastOnceCausalityShowcase.cs b/demo/Clockworks.Demo/Demos/DistributedAtLeastOnceCausalityShowcase.cs index 8cb5be1..baafa3d 100644 --- a/demo/Clockworks.Demo/Demos/DistributedAtLeastOnceCausalityShowcase.cs +++ b/demo/Clockworks.Demo/Demos/DistributedAtLeastOnceCausalityShowcase.cs @@ -2,7 +2,6 @@ using Clockworks.Distributed; using Clockworks.Instrumentation; using Clockworks.Demo.Infrastructure; -using System.Collections.Concurrent; using System.Collections.Generic; using static System.Console; @@ -142,11 +141,12 @@ private sealed class Node private readonly TimeProvider _tp; private readonly UuidV7Factory _uuid; private readonly BoundedInboxDedupe _inbox; - private readonly ConcurrentDictionary> _acks = new(); - private readonly ConcurrentDictionary _retries = new(); - private readonly ConcurrentDictionary _attempts = new(StringComparer.OrdinalIgnoreCase); - private readonly ConcurrentDictionary _correlationIds = new(StringComparer.OrdinalIgnoreCase); - private readonly ConcurrentDictionary _templates = new(); + // This demo is single-threaded and deterministic, so we intentionally use non-concurrent collections. + private readonly Dictionary> _acks = new(StringComparer.OrdinalIgnoreCase); + private readonly Dictionary _retries = new(StringComparer.OrdinalIgnoreCase); + private readonly Dictionary _attempts = new(StringComparer.OrdinalIgnoreCase); + private readonly Dictionary _correlationIds = new(StringComparer.OrdinalIgnoreCase); + private readonly Dictionary _templates = new(); public string Name { get; } public HlcCoordinator Hlc { get; } @@ -171,7 +171,7 @@ public Node(string name, ushort nodeId, SimulatedTimeProvider tp, SimulatedNetwo Vector = new VectorClockCoordinator(nodeId); } - public bool IsDone => _acks.IsEmpty && _retries.IsEmpty; + public bool IsDone => _acks.Count == 0 && _retries.Count == 0; public void StartNewOrder(string orderId) { @@ -180,7 +180,7 @@ public void StartNewOrder(string orderId) var ts = Hlc.BeforeSend(); var correlationId = _uuid.NewGuid(); - _acks[orderId] = new ConcurrentDictionary(StringComparer.OrdinalIgnoreCase); + _acks[orderId] = new HashSet(StringComparer.OrdinalIgnoreCase); _attempts[orderId] = 1; _correlationIds[orderId] = correlationId; @@ -246,28 +246,27 @@ private void HandleAck(Message msg) return; } - fromSet.TryAdd(msg.From, 0); + fromSet.Add(msg.From); - if (fromSet.ContainsKey(PaymentsNodeName) && fromSet.ContainsKey(InventoryNodeName)) + if (fromSet.Contains(PaymentsNodeName) && fromSet.Contains(InventoryNodeName)) { - if (_retries.TryRemove(msg.OrderId, out var handle)) - { + if (_retries.Remove(msg.OrderId, out var handle)) handle.Dispose(); - } - _acks.TryRemove(msg.OrderId, out _); - _attempts.TryRemove(msg.OrderId, out _); - _correlationIds.TryRemove(msg.OrderId, out _); + _acks.Remove(msg.OrderId); + _attempts.Remove(msg.OrderId); + _correlationIds.Remove(msg.OrderId); // Clean up templates to prevent unbounded growth in long simulations - _templates.TryRemove(new LogicalMessageKey(MessageKind.PlaceOrder, msg.OrderId, PaymentsNodeName), out _); - _templates.TryRemove(new LogicalMessageKey(MessageKind.PlaceOrder, msg.OrderId, InventoryNodeName), out _); + _templates.Remove(new LogicalMessageKey(MessageKind.PlaceOrder, msg.OrderId, PaymentsNodeName)); + _templates.Remove(new LogicalMessageKey(MessageKind.PlaceOrder, msg.OrderId, InventoryNodeName)); } } public void Poll() { - // Snapshot to avoid collection modification during iteration + // Snapshot to avoid collection modification during iteration. + // (Timeout callbacks are driven by SimulatedTimeProvider; we still keep iteration safe.) var snapshot = _retries.ToArray(); foreach (var (orderId, handle) in snapshot) { @@ -287,10 +286,7 @@ private void ScheduleRetry(string orderId, int attempt) var handle = Timeouts.CreateTimeoutHandle(_tp, TimeSpan.FromMilliseconds(50), statistics: TimeoutStats); if (!_retries.TryAdd(orderId, handle)) - { handle.Dispose(); - return; - } } private void TryRetry(string orderId) @@ -301,13 +297,14 @@ private void TryRetry(string orderId) return; } - if (fromSet.ContainsKey(PaymentsNodeName) && fromSet.ContainsKey(InventoryNodeName)) + if (fromSet.Contains(PaymentsNodeName) && fromSet.Contains(InventoryNodeName)) { CleanupRetry(orderId); return; } - var nextAttempt = _attempts.AddOrUpdate(orderId, 2, static (_, current) => current + 1); + var nextAttempt = _attempts.TryGetValue(orderId, out var currentAttempt) ? currentAttempt + 1 : 2; + _attempts[orderId] = nextAttempt; if (nextAttempt > 10) { CleanupRetry(orderId); @@ -334,7 +331,7 @@ private MessageTemplate GetOrCreateTemplate(LogicalMessageKey key, Guid correlat if (_templates.TryGetValue(key, out var existing)) return existing; - if (hlc == default || vc.Equals(default(VectorClock))) + if (hlc == default || vc.IsEmpty) { // This should not happen for normal flow (templates are created during StartNewOrder), // but if it does, synthesize a reasonable envelope. @@ -354,15 +351,14 @@ private MessageTemplate GetOrCreateTemplate(LogicalMessageKey key, Guid correlat Hlc: hlc, VectorClock: vc); - return _templates.GetOrAdd(key, template); + _templates[key] = template; + return template; } private void CleanupRetry(string orderId) { - if (_retries.TryRemove(orderId, out var existing)) - { + if (_retries.Remove(orderId, out var existing)) existing.Dispose(); - } } private void Send(Message msg) => _network.Send(msg); @@ -562,8 +558,9 @@ private sealed class SimulatedNetwork(SimulatedTimeProvider tp, FailureInjector private readonly Lock _lock = new(); private readonly SimulatedTimeProvider _tp = tp; private readonly FailureInjector _failures = failures; - private readonly ConcurrentDictionary _nodes = new(StringComparer.OrdinalIgnoreCase); - private readonly List _inFlight = []; + private readonly Dictionary _nodes = new(StringComparer.OrdinalIgnoreCase); + private readonly PriorityQueue _inFlight = new(); + private long _sequence; public MessagingStatistics Stats { get; } = new(); @@ -595,7 +592,8 @@ public long? NextDueUtcMs { if (_inFlight.Count == 0) return null; - return _inFlight.Min(m => m.DeliverAtUtcMs); + + return _inFlight.Peek().Priority.DeliverAtUtcMs; } } } @@ -617,11 +615,11 @@ public void Send(Message msg) lock (_lock) { - _inFlight.Add(new Envelope(msg, deliverAt)); + Enqueue(msg, deliverAt); if (_failures.ShouldDuplicate()) { - _inFlight.Add(new Envelope(msg with { }, deliverAt + _failures.AdditionalDelayMs())); + Enqueue(msg with { }, deliverAt + _failures.AdditionalDelayMs()); Stats.RecordDuplicated(_inFlight.Count); } @@ -629,9 +627,9 @@ public void Send(Message msg) if (_failures.ShouldReorder() && _inFlight.Count >= 2) { - var i = _inFlight.Count - 1; - var j = Math.Max(0, i - 1); - (_inFlight[i], _inFlight[j]) = (_inFlight[j], _inFlight[i]); + // Model reordering as small bounded jitter on delivery time. + // Clamp to >= now so delivery remains time-driven (no "instant" negative delay). + ApplyReorderJitterUnsafe(now); Stats.RecordReordered(); } } @@ -647,14 +645,14 @@ public void DeliverDue() if (_inFlight.Count == 0) return; - due = _inFlight.Where(m => m.DeliverAtUtcMs <= now).ToList(); - if (due.Count == 0) - return; - - foreach (var m in due) + due = []; + while (_inFlight.Count > 0 && _inFlight.Peek().Priority.DeliverAtUtcMs <= now) { - _inFlight.Remove(m); + due.Add(_inFlight.Dequeue()); } + + if (due.Count == 0) + return; } foreach (var env in due) @@ -669,6 +667,41 @@ public void DeliverDue() } } + private void Enqueue(Message msg, long deliverAtUtcMs) + { + var env = new Envelope(msg); + var priority = new DeliveryPriority(deliverAtUtcMs, unchecked(++_sequence)); + env.Priority = priority; + _inFlight.Enqueue(env, priority); + } + + private void ApplyReorderJitterUnsafe(long nowUtcMs) + { + if (_inFlight.Count < 2) + return; + + // PriorityQueue has no decrease-key; rebuild for a tiny demo workload. + var entries = new List(_inFlight.Count); + while (_inFlight.Count > 0) + { + entries.Add(_inFlight.Dequeue()); + } + + var jitterWindowMs = Math.Min(10, Math.Max(1, _failures.MaxAdditionalDelayMs / 20)); + for (var i = Math.Max(0, entries.Count - 2); i < entries.Count; i++) + { + var env = entries[i]; + var jitter = _failures.AdditionalDelayMs() % (jitterWindowMs + 1); + var newDeliverAt = Math.Max(nowUtcMs, env.Priority.DeliverAtUtcMs - jitter); + env.Priority = env.Priority with { DeliverAtUtcMs = newDeliverAt }; + } + + foreach (var env in entries) + { + _inFlight.Enqueue(env, env.Priority); + } + } + private void ObserveConcurrency(Message delivered) { lock (_lock) @@ -700,7 +733,21 @@ private void ObserveConcurrency(Message delivered) } } - private sealed record Envelope(Message Message, long DeliverAtUtcMs); + private sealed record Envelope(Message Message) + { + public DeliveryPriority Priority { get; set; } + } + + private readonly record struct DeliveryPriority(long DeliverAtUtcMs, long Sequence) : IComparable + { + public int CompareTo(DeliveryPriority other) + { + var cmp = DeliverAtUtcMs.CompareTo(other.DeliverAtUtcMs); + if (cmp != 0) + return cmp; + return Sequence.CompareTo(other.Sequence); + } + } private sealed record DeliveredEvent(Guid EventId, MessageKind Kind, string From, string To, VectorClock VectorClock); } diff --git a/src/Distributed/HlcTimestamp.cs b/src/Distributed/HlcTimestamp.cs index 88ab2ba..d477e6c 100644 --- a/src/Distributed/HlcTimestamp.cs +++ b/src/Distributed/HlcTimestamp.cs @@ -19,6 +19,31 @@ namespace Clockworks.Distributed; /// public readonly record struct HlcTimestamp : IComparable, IComparable { + /// + /// Number of bits allocated to the wall time component in . + /// + public const int PackedWallTimeBits = 48; + + /// + /// Number of bits allocated to the counter component in . + /// + public const int PackedCounterBits = 12; + + /// + /// Number of bits allocated to node id in . + /// + public const int PackedNodeIdBits = 4; + + /// + /// Bit mask applied to in . + /// + public const ushort PackedCounterMask = 0x0FFF; + + /// + /// Bit mask applied to in . + /// + public const ushort PackedNodeIdMask = 0x000F; + /// /// Logical wall time in milliseconds since Unix epoch. /// May drift ahead of physical time to maintain causality. @@ -52,16 +77,23 @@ public HlcTimestamp(long wallTimeMs, ushort counter = 0, ushort nodeId = 0) /// /// Packs the timestamp into a 64-bit value for efficient storage/transmission. - /// Layout: [48 bits wall time][12 bits counter][4 bits node (truncated)] + /// Layout: [48 bits wall time][12 bits counter][4 bits node id]. /// + /// + /// This packed encoding is an optimization format. + /// + /// is truncated to the lower 4 bits (0-15). + /// Use / for a full-fidelity encoding. + /// + /// public long ToPackedInt64() { // 48 bits for timestamp (good until year 10889) // 12 bits for counter (0-4095) // 4 bits for node ID (0-15, use for small clusters) ulong wall = (ulong)WallTimeMs; - ulong counter = (ulong)(Counter & 0x0FFF); - ulong node = (ulong)(NodeId & 0x000F); + ulong counter = (ulong)(Counter & PackedCounterMask); + ulong node = (ulong)(NodeId & PackedNodeIdMask); ulong packed = (wall << 16) | (counter << 4) | node; return unchecked((long)packed); @@ -72,10 +104,11 @@ public long ToPackedInt64() /// public static HlcTimestamp FromPackedInt64(long packed) { + var u = unchecked((ulong)packed); return new HlcTimestamp( - wallTimeMs: packed >> 16, - counter: (ushort)((packed >> 4) & 0xFFF), - nodeId: (ushort)(packed & 0xF) + wallTimeMs: (long)((u >> 16) & ((1UL << PackedWallTimeBits) - 1UL)), + counter: (ushort)((u >> PackedNodeIdBits) & PackedCounterMask), + nodeId: (ushort)(u & PackedNodeIdMask) ); } diff --git a/src/Distributed/VectorClock.cs b/src/Distributed/VectorClock.cs index e90a463..26f2328 100644 --- a/src/Distributed/VectorClock.cs +++ b/src/Distributed/VectorClock.cs @@ -1,5 +1,6 @@ using System.Buffers.Binary; using System.Globalization; +using System.Runtime.InteropServices; namespace Clockworks.Distributed; @@ -54,6 +55,11 @@ public enum VectorClockOrder private readonly ulong[]? _counters; private const int MaxEntries = ushort.MaxValue + 1; + /// + /// True when the vector clock contains no entries. + /// + public bool IsEmpty => _nodeIds == null || _counters == null || _nodeIds.Length == 0; + /// /// Creates an empty vector clock. /// @@ -69,6 +75,24 @@ private VectorClock(ushort[] nodeIds, ulong[] counters) _counters = counters; } + internal int Count => _nodeIds?.Length ?? 0; + + internal void CopyTo(Span nodeIds, Span counters) + { + if (_nodeIds == null || _counters == null) + return; + + _nodeIds.CopyTo(nodeIds); + _counters.CopyTo(counters); + } + + internal static VectorClock CreateUnsafe(ushort[] nodeIds, ulong[] counters) + { + if (nodeIds.Length != counters.Length) + throw new ArgumentException("nodeIds and counters must have same length."); + return new VectorClock(nodeIds, counters); + } + /// /// Gets the counter value for a specific node ID. /// Returns 0 if the node ID is not present in this vector clock. @@ -284,7 +308,8 @@ public VectorClockOrder Compare(VectorClock other) /// /// Writes this vector clock to a binary representation. - /// Format: [count:uint][nodeId:ushort,counter:ulong]* + /// Format: [count:u32 big-endian][(nodeId:u16 big-endian, counter:u64 big-endian)]*. + /// The in-memory representation is canonical (sorted by nodeId), so the serialized form is canonical. /// public void WriteTo(Span destination) { @@ -430,24 +455,36 @@ private static VectorClock CreateCanonical(List<(ushort nodeId, ulong counter)> if (pairs.Count == 0) return new VectorClock(); - pairs.Sort((a, b) => a.nodeId.CompareTo(b.nodeId)); + if (pairs.Count == 1) + { + var (nodeId, counter) = pairs[0]; + return new VectorClock([nodeId], [counter]); + } - var nodeIds = new List(pairs.Count); - var counters = new List(pairs.Count); - foreach (var pair in pairs) + var maxByNodeId = new Dictionary(capacity: pairs.Count); + foreach (var (nodeId, counter) in pairs) { - if (nodeIds.Count > 0 && nodeIds[^1] == pair.nodeId) - { - if (pair.counter > counters[^1]) - counters[^1] = pair.counter; - continue; - } + ref var existing = ref CollectionsMarshal.GetValueRefOrAddDefault(maxByNodeId, nodeId, out var exists); + if (!exists || counter > existing) + existing = counter; + } + + if (maxByNodeId.Count == 0) + return new VectorClock(); - nodeIds.Add(pair.nodeId); - counters.Add(pair.counter); + var sorted = maxByNodeId.Keys.ToArray(); + Array.Sort(sorted); + + var nodeIds = new ushort[sorted.Length]; + var counters = new ulong[sorted.Length]; + for (var i = 0; i < sorted.Length; i++) + { + var nodeId = sorted[i]; + nodeIds[i] = nodeId; + counters[i] = maxByNodeId[nodeId]; } - return new VectorClock([.. nodeIds], [.. counters]); + return new VectorClock(nodeIds, counters); } /// diff --git a/src/Distributed/VectorClockBuilder.cs b/src/Distributed/VectorClockBuilder.cs new file mode 100644 index 0000000..8d840d9 --- /dev/null +++ b/src/Distributed/VectorClockBuilder.cs @@ -0,0 +1,179 @@ +using System.Buffers; +using System.Buffers.Binary; + +namespace Clockworks.Distributed; + +internal sealed class VectorClockBuilder +{ + private const int MaxEntries = ushort.MaxValue + 1; + + private ushort[] _nodeIds; + private ulong[] _counters; + private int _count; + + public VectorClockBuilder(int initialCapacity = 4) + { + ArgumentOutOfRangeException.ThrowIfNegative(initialCapacity); + + _nodeIds = initialCapacity == 0 ? [] : new ushort[initialCapacity]; + _counters = initialCapacity == 0 ? [] : new ulong[initialCapacity]; + _count = 0; + } + + public bool IsEmpty => _count == 0; + + public void Reset(VectorClock snapshot) + { + var count = snapshot.Count; + EnsureCapacity(count); + _count = count; + + if (count == 0) + return; + + snapshot.CopyTo(_nodeIds.AsSpan(0, count), _counters.AsSpan(0, count)); + } + + public void Increment(ushort nodeId) + { + if (_count == 0) + { + EnsureCapacity(1); + _nodeIds[0] = nodeId; + _counters[0] = 1; + _count = 1; + return; + } + + var index = Array.BinarySearch(_nodeIds, 0, _count, nodeId); + if (index >= 0) + { + _counters[index]++; + return; + } + + if (_count >= MaxEntries) + throw new InvalidOperationException($"Cannot increment: vector clock at max capacity ({MaxEntries})"); + + var insertIndex = ~index; + EnsureCapacity(_count + 1); + + if (insertIndex < _count) + { + Array.Copy(_nodeIds, insertIndex, _nodeIds, insertIndex + 1, _count - insertIndex); + Array.Copy(_counters, insertIndex, _counters, insertIndex + 1, _count - insertIndex); + } + + _nodeIds[insertIndex] = nodeId; + _counters[insertIndex] = 1; + _count++; + } + + public void Merge(VectorClock other) + { + if (other.IsEmpty) + return; + + // Decode directly from canonical binary format to avoid reliance on VectorClock internals. + // This is linear in number of entries and does not allocate per entry. + var size = other.GetBinarySize(); + if (size <= 1024) + { + Span buffer = stackalloc byte[size]; + MergeFromBuffer(other, buffer); + return; + } + + var rented = ArrayPool.Shared.Rent(size); + try + { + var buffer = rented.AsSpan(0, size); + MergeFromBuffer(other, buffer); + } + finally + { + ArrayPool.Shared.Return(rented); + } + } + + private void MergeFromBuffer(VectorClock other, Span buffer) + { + other.WriteTo(buffer); + + var count = BinaryPrimitives.ReadUInt32BigEndian(buffer); + var offset = 4; + for (var i = 0; i < count; i++) + { + var nodeId = BinaryPrimitives.ReadUInt16BigEndian(buffer.Slice(offset, 2)); + offset += 2; + var counter = BinaryPrimitives.ReadUInt64BigEndian(buffer.Slice(offset, 8)); + offset += 8; + + MergeEntry(nodeId, counter); + } + } + + public VectorClock ToSnapshot() + { + if (_count == 0) + return new VectorClock(); + + var nodeIds = new ushort[_count]; + var counters = new ulong[_count]; + Array.Copy(_nodeIds, nodeIds, _count); + Array.Copy(_counters, counters, _count); + return VectorClock.CreateUnsafe(nodeIds, counters); + } + + private void MergeEntry(ushort nodeId, ulong counter) + { + if (_count == 0) + { + EnsureCapacity(1); + _nodeIds[0] = nodeId; + _counters[0] = counter; + _count = 1; + return; + } + + var index = Array.BinarySearch(_nodeIds, 0, _count, nodeId); + if (index >= 0) + { + if (counter > _counters[index]) + _counters[index] = counter; + return; + } + + if (_count >= MaxEntries) + throw new InvalidOperationException($"Cannot merge: vector clock at max capacity ({MaxEntries})"); + + var insertIndex = ~index; + EnsureCapacity(_count + 1); + + if (insertIndex < _count) + { + Array.Copy(_nodeIds, insertIndex, _nodeIds, insertIndex + 1, _count - insertIndex); + Array.Copy(_counters, insertIndex, _counters, insertIndex + 1, _count - insertIndex); + } + + _nodeIds[insertIndex] = nodeId; + _counters[insertIndex] = counter; + _count++; + } + + private void EnsureCapacity(int needed) + { + if (needed > MaxEntries) + throw new InvalidOperationException($"Vector clock cannot contain more than {MaxEntries} entries."); + + if (_nodeIds.Length >= needed) + return; + + var next = Math.Max(needed, _nodeIds.Length == 0 ? 4 : _nodeIds.Length * 2); + if (next > MaxEntries) + next = MaxEntries; + + Array.Resize(ref _nodeIds, next); + Array.Resize(ref _counters, next); + } +} diff --git a/src/Distributed/VectorClockCoordinator.cs b/src/Distributed/VectorClockCoordinator.cs index fc45719..c662332 100644 --- a/src/Distributed/VectorClockCoordinator.cs +++ b/src/Distributed/VectorClockCoordinator.cs @@ -28,6 +28,7 @@ public sealed class VectorClockCoordinator private readonly ushort _nodeId; private readonly Lock _lock = new(); private VectorClock _current; + private readonly VectorClockBuilder _builder = new(); /// /// Statistics for monitoring vector clock behavior. @@ -68,7 +69,12 @@ public void BeforeReceive(VectorClock remote) lock (_lock) { var before = _current; - _current = _current.Merge(remote).Increment(_nodeId); + + _builder.Reset(_current); + _builder.Merge(remote); + _builder.Increment(_nodeId); + _current = _builder.ToSnapshot(); + Statistics.RecordReceive(before, _current, remote); } } @@ -82,7 +88,10 @@ public VectorClock BeforeSend() { lock (_lock) { - _current = _current.Increment(_nodeId); + _builder.Reset(_current); + _builder.Increment(_nodeId); + _current = _builder.ToSnapshot(); + var snapshot = _current; Statistics.RecordSend(snapshot); return snapshot; @@ -97,7 +106,10 @@ public void NewLocalEvent() { lock (_lock) { - _current = _current.Increment(_nodeId); + _builder.Reset(_current); + _builder.Increment(_nodeId); + _current = _builder.ToSnapshot(); + Statistics.RecordLocalEvent(_current); } } diff --git a/src/UuidV7Factory.cs b/src/UuidV7Factory.cs index 622b72a..c296567 100644 --- a/src/UuidV7Factory.cs +++ b/src/UuidV7Factory.cs @@ -1,4 +1,6 @@ using Clockworks.Abstractions; +using System.Buffers.Binary; +using System.Diagnostics; using System.Runtime.CompilerServices; using System.Security.Cryptography; @@ -109,9 +111,72 @@ public Guid NewGuid() /// public void NewGuids(Span destination) { - for (int i = 0; i < destination.Length; i++) + if (destination.Length == 0) + return; + + FillGuids(destination); + } + + private void FillGuids(Span destination) + { + var i = 0; + var spinWait = new SpinWait(); + + while (i < destination.Length) { - destination[i] = NewGuid(); + var currentPacked = Volatile.Read(ref _packedState); + var (currentTimestamp, currentCounter) = UnpackState(currentPacked); + + var physicalTime = _timeProvider.GetUtcNow().ToUnixTimeMilliseconds(); + + var baseTimestamp = physicalTime > currentTimestamp ? physicalTime : currentTimestamp; + + // If we moved forward to a new physical millisecond, start a fresh random counter window. + // Otherwise continue from the current counter. + var startCounter = physicalTime > currentTimestamp ? GetRandomCounterStart() : (ushort)(currentCounter + 1); + + if (startCounter > MaxCounterValue) + { + // Counter overflow at this millisecond. + switch (_effectiveOverflowBehavior) + { + case CounterOverflowBehavior.SpinWait: + SpinWaitForNextMillisecond(baseTimestamp); + continue; + + case CounterOverflowBehavior.IncrementTimestamp: + baseTimestamp = currentTimestamp + 1; + startCounter = GetRandomCounterStart(); + break; + + case CounterOverflowBehavior.ThrowException: + throw new InvalidOperationException( + $"Counter overflow: generated {MaxCounterValue + 1} UUIDs within millisecond {currentTimestamp}"); + + default: + throw new UnreachableException(); + } + } + + var remainingInMs = MaxCounterValue - startCounter; + var available = Math.Min(destination.Length - i, remainingInMs + 1); + + var newTimestamp = baseTimestamp; + var newCounter = (ushort)(startCounter + available - 1); + var newPacked = PackState(newTimestamp, newCounter); + + if (Interlocked.CompareExchange(ref _packedState, newPacked, currentPacked) != currentPacked) + { + spinWait.SpinOnce(); + continue; + } + + for (var j = 0; j < available; j++) + { + destination[i + j] = CreateGuidFromState(baseTimestamp, (ushort)(startCounter + j)); + } + + i += available; } } @@ -169,7 +234,7 @@ public void NewGuids(Span destination) $"Counter overflow: generated {MaxCounterValue + 1} UUIDs within millisecond {currentTimestamp}"); default: - throw new ArgumentOutOfRangeException(); + throw new UnreachableException(); } } else @@ -250,7 +315,8 @@ private ushort GetRandomCounterStart() // Start in lower half of counter space to leave room for increments // This reduces collision probability across instances starting in the same ms var bytes = _randomBuffer.Value!.GetBytes(2); - return (ushort)((bytes[0] | (bytes[1] << 8)) & CounterRandomStart); + var value = BinaryPrimitives.ReadUInt16LittleEndian(bytes); + return (ushort)(value & CounterRandomStart); } private void SpinWaitForNextMillisecond(long currentMs) diff --git a/tests/HlcTimestampTests.cs b/tests/HlcTimestampTests.cs index 8b91590..cd5015d 100644 --- a/tests/HlcTimestampTests.cs +++ b/tests/HlcTimestampTests.cs @@ -30,4 +30,20 @@ public void ToPackedInt64_FromPackedInt64_RoundTrips_WithNodeTruncation() Assert.Equal((ushort)(ts.Counter & 0x0FFF), parsed.Counter); Assert.Equal((ushort)(ts.NodeId & 0x000F), parsed.NodeId); } + + [Fact] + public void FromPackedInt64_DoesNotSignExtendWallTime_WhenPackedHighBitSet() + { + // Pick a 48-bit wall time where bit 47 is set; after packing (<< 16) this becomes bit 63. + var wall48 = (1L << 47) + 123; + var ts = new HlcTimestamp(wallTimeMs: wall48, counter: 1, nodeId: 2); + + var packed = ts.ToPackedInt64(); + Assert.True(packed < 0, "Packed representation should have sign bit set for this case."); + + var decoded = HlcTimestamp.FromPackedInt64(packed); + Assert.Equal(wall48, decoded.WallTimeMs); + Assert.Equal((ushort)1, decoded.Counter); + Assert.Equal((ushort)2, decoded.NodeId); + } } diff --git a/tests/UuidV7FactoryTests.cs b/tests/UuidV7FactoryTests.cs index 004f1d4..ed7a4fa 100644 --- a/tests/UuidV7FactoryTests.cs +++ b/tests/UuidV7FactoryTests.cs @@ -4,6 +4,46 @@ namespace Clockworks.Tests; public sealed class UuidV7FactoryTests { + [Fact] + public void NewGuids_IsMonotonic_AndUnique_WhenTimeDoesNotAdvance() + { + var time = SimulatedTimeProvider.FromUnixMs(1_700_000_000_000); + using var rng = new DeterministicRandomNumberGenerator(seed: 1); + using var factory = new UuidV7Factory(time, rng, overflowBehavior: CounterOverflowBehavior.IncrementTimestamp); + + Span batch = stackalloc Guid[256]; + factory.NewGuids(batch); + + for (var i = 1; i < batch.Length; i++) + { + Assert.True(batch[i - 1] < batch[i]); + } + + var set = new HashSet(); + foreach (var g in batch) + Assert.True(set.Add(g)); + + // Time does not need to advance. + Assert.Equal(1_700_000_000_000, time.GetUtcNow().ToUnixTimeMilliseconds()); + } + + [Fact] + public void NewGuids_RemainsMonotonic_AcrossCounterOverflow_WhenIncrementTimestampEnabled() + { + var time = SimulatedTimeProvider.FromUnixMs(1_700_000_000_000); + using var rng = new DeterministicRandomNumberGenerator(seed: 1); + using var factory = new UuidV7Factory(time, rng, overflowBehavior: CounterOverflowBehavior.IncrementTimestamp); + + // Large enough to cross the 12-bit counter range regardless of random start. + var arr = new Guid[5000]; + factory.NewGuids(arr); + + for (var i = 1; i < arr.Length; i++) + { + Assert.True(arr[i - 1] < arr[i]); + } + } + [Fact] public void NewGuid_IsMonotonic_WhenTimeDoesNotAdvance() {