Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/copilot-instructions.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## General Guidelines
- First general instruction
- Second general instruction
- Prefer updating documentation comments for mathematical/complexity accuracy when refactoring; keep wire format and semantics unchanged unless explicitly requested.

## Versioning and Git Practices
- Treat recent Clockworks changes as a minor version bump (semver).
Expand Down
169 changes: 92 additions & 77 deletions src/Distributed/VectorClock.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System.Buffers.Binary;
using System.Globalization;

namespace Clockworks.Distributed;
Expand Down Expand Up @@ -33,7 +34,7 @@ public enum VectorClockOrder
///
/// <para>
/// Uses a performance-first sorted-array representation with parallel arrays
/// for node IDs and counters. This provides O(n) merge and compare operations
/// for node IDs and counters. This provides O(n+m) merge and compare operations
/// while maintaining deterministic canonical ordering by node ID.
/// </para>
///
Expand Down Expand Up @@ -100,14 +101,11 @@ public VectorClock Increment(ushort nodeId)
var index = Array.BinarySearch(_nodeIds, nodeId);
if (index >= 0)
{
// Node exists, increment its counter
// Must copy both arrays to maintain immutability
var newNodeIds = new ushort[_nodeIds.Length];
// Node exists; node IDs array does not change, so it can be shared safely.
var newCounters = new ulong[_counters.Length];
Array.Copy(_nodeIds, newNodeIds, _nodeIds.Length);
Array.Copy(_counters, newCounters, _counters.Length);
newCounters[index]++;
return new VectorClock(newNodeIds, newCounters);
return new VectorClock(_nodeIds, newCounters);
}
else
{
Expand All @@ -116,14 +114,21 @@ public VectorClock Increment(ushort nodeId)
var newNodeIds = new ushort[_nodeIds.Length + 1];
var newCounters = new ulong[_counters.Length + 1];

Array.Copy(_nodeIds, 0, newNodeIds, 0, insertIndex);
Array.Copy(_counters, 0, newCounters, 0, insertIndex);
if (insertIndex > 0)
{
Array.Copy(_nodeIds, 0, newNodeIds, 0, insertIndex);
Array.Copy(_counters, 0, newCounters, 0, insertIndex);
}

newNodeIds[insertIndex] = nodeId;
newCounters[insertIndex] = 1;

Array.Copy(_nodeIds, insertIndex, newNodeIds, insertIndex + 1, _nodeIds.Length - insertIndex);
Array.Copy(_counters, insertIndex, newCounters, insertIndex + 1, _counters.Length - insertIndex);
var tail = _nodeIds.Length - insertIndex;
if (tail > 0)
{
Array.Copy(_nodeIds, insertIndex, newNodeIds, insertIndex + 1, tail);
Array.Copy(_counters, insertIndex, newCounters, insertIndex + 1, tail);
}

return new VectorClock(newNodeIds, newCounters);
}
Expand All @@ -140,79 +145,112 @@ public VectorClock Merge(VectorClock other)
if (other._nodeIds == null || other._counters == null)
return this;

// Merge two sorted arrays
var resultNodes = new List<ushort>();
var resultCounters = new List<ulong>();
// Merge two sorted arrays (upper bound = combined length)
var maxLen = _nodeIds.Length + other._nodeIds.Length;
var nodeIds = new ushort[maxLen];
var counters = new ulong[maxLen];

int i = 0, j = 0;
int i = 0, j = 0, k = 0;
while (i < _nodeIds.Length || j < other._nodeIds.Length)
{
if (i >= _nodeIds.Length)
{
// Exhausted this, add from other
resultNodes.Add(other._nodeIds[j]);
resultCounters.Add(other._counters[j]);
nodeIds[k] = other._nodeIds[j];
counters[k] = other._counters[j];
j++;
}
else if (j >= other._nodeIds.Length)
{
// Exhausted other, add from this
resultNodes.Add(_nodeIds[i]);
resultCounters.Add(_counters[i]);
nodeIds[k] = _nodeIds[i];
counters[k] = _counters[i];
i++;
}
else if (_nodeIds[i] < other._nodeIds[j])
{
resultNodes.Add(_nodeIds[i]);
resultCounters.Add(_counters[i]);
nodeIds[k] = _nodeIds[i];
counters[k] = _counters[i];
i++;
}
else if (_nodeIds[i] > other._nodeIds[j])
{
resultNodes.Add(other._nodeIds[j]);
resultCounters.Add(other._counters[j]);
nodeIds[k] = other._nodeIds[j];
counters[k] = other._counters[j];
j++;
}
else // Equal node IDs
else
{
resultNodes.Add(_nodeIds[i]);
resultCounters.Add(Math.Max(_counters[i], other._counters[j]));
nodeIds[k] = _nodeIds[i];
counters[k] = Math.Max(_counters[i], other._counters[j]);
i++;
j++;
}

k++;
}

return new VectorClock(resultNodes.ToArray(), resultCounters.ToArray());
if (k == maxLen)
return new VectorClock(nodeIds, counters);

Array.Resize(ref nodeIds, k);
Array.Resize(ref counters, k);
return new VectorClock(nodeIds, counters);
}

/// <summary>
/// Compares this vector clock with another to determine their causal relationship.
/// </summary>
public VectorClockOrder Compare(VectorClock other)
{
if (_nodeIds == null || _counters == null)
{
if (other._nodeIds == null || other._counters == null)
return VectorClockOrder.Equal;
return VectorClockOrder.Before;
}

if (other._nodeIds == null || other._counters == null)
return VectorClockOrder.After;

var thisIsLessOrEqual = true;
var otherIsLessOrEqual = true;

// Get all unique node IDs from both clocks
var allNodes = new HashSet<ushort>();
if (_nodeIds != null)
foreach (var node in _nodeIds)
allNodes.Add(node);
if (other._nodeIds != null)
foreach (var node in other._nodeIds)
allNodes.Add(node);

foreach (var nodeId in allNodes)
// Linear merge-style comparison over two sorted arrays.
// Missing entries are treated as 0.
int i = 0, j = 0;
while (i < _nodeIds.Length || j < other._nodeIds.Length)
{
var thisValue = Get(nodeId);
var otherValue = other.Get(nodeId);
ushort nodeId;
ulong thisValue;
ulong otherValue;

if (j >= other._nodeIds.Length || (i < _nodeIds.Length && _nodeIds[i] < other._nodeIds[j]))
{
nodeId = _nodeIds[i];
thisValue = _counters[i];
otherValue = 0;
i++;
}
else if (i >= _nodeIds.Length || _nodeIds[i] > other._nodeIds[j])
{
nodeId = other._nodeIds[j];
thisValue = 0;
otherValue = other._counters[j];
j++;
}
else
{
nodeId = _nodeIds[i];
thisValue = _counters[i];
otherValue = other._counters[j];
i++;
j++;
}

if (thisValue > otherValue)
thisIsLessOrEqual = false; // this > other, so this is NOT ≤ other
if (otherValue > thisValue)
otherIsLessOrEqual = false; // other > this, so other is NOT ≤ this
thisIsLessOrEqual = false;
else if (otherValue > thisValue)
otherIsLessOrEqual = false;

// Early exit if we know they're concurrent
if (!thisIsLessOrEqual && !otherIsLessOrEqual)
return VectorClockOrder.Concurrent;
}
Expand All @@ -223,7 +261,6 @@ public VectorClockOrder Compare(VectorClock other)
return VectorClockOrder.Before;
if (otherIsLessOrEqual)
return VectorClockOrder.After;

return VectorClockOrder.Concurrent;
}

Expand Down Expand Up @@ -257,12 +294,7 @@ public void WriteTo(Span<byte> destination)
if (destination.Length < requiredSize)
throw new ArgumentException($"Destination must be at least {requiredSize} bytes", nameof(destination));

// Write count as big-endian uint
var countValue = (uint)count;
destination[0] = (byte)(countValue >> 24);
destination[1] = (byte)(countValue >> 16);
destination[2] = (byte)(countValue >> 8);
destination[3] = (byte)countValue;
BinaryPrimitives.WriteUInt32BigEndian(destination, (uint)count);

if (count == 0)
return;
Expand All @@ -273,19 +305,10 @@ public void WriteTo(Span<byte> destination)
var nodeId = _nodeIds![i];
var counter = _counters![i];

// Write nodeId as big-endian ushort
destination[offset++] = (byte)(nodeId >> 8);
destination[offset++] = (byte)nodeId;

// Write counter as big-endian ulong
destination[offset++] = (byte)(counter >> 56);
destination[offset++] = (byte)(counter >> 48);
destination[offset++] = (byte)(counter >> 40);
destination[offset++] = (byte)(counter >> 32);
destination[offset++] = (byte)(counter >> 24);
destination[offset++] = (byte)(counter >> 16);
destination[offset++] = (byte)(counter >> 8);
destination[offset++] = (byte)counter;
BinaryPrimitives.WriteUInt16BigEndian(destination.Slice(offset, 2), nodeId);
offset += 2;
BinaryPrimitives.WriteUInt64BigEndian(destination.Slice(offset, 8), counter);
offset += 8;
}
}

Expand All @@ -306,7 +329,7 @@ public static VectorClock ReadFrom(ReadOnlySpan<byte> source)
if (source.Length < 4)
throw new ArgumentException("Source must be at least 4 bytes", nameof(source));

var count = (uint)((source[0] << 24) | (source[1] << 16) | (source[2] << 8) | source[3]);
var count = BinaryPrimitives.ReadUInt32BigEndian(source);
if (count > MaxEntries)
throw new ArgumentOutOfRangeException(nameof(source), $"Vector clock entry count {count} exceeds max {MaxEntries}.");

Expand All @@ -325,18 +348,10 @@ public static VectorClock ReadFrom(ReadOnlySpan<byte> source)
var offset = 4;
for (var i = 0; i < countValue; i++)
{
// Read nodeId as big-endian ushort
nodeIds[i] = (ushort)((source[offset++] << 8) | source[offset++]);

// Read counter as big-endian ulong
counters[i] = ((ulong)source[offset++] << 56) |
((ulong)source[offset++] << 48) |
((ulong)source[offset++] << 40) |
((ulong)source[offset++] << 32) |
((ulong)source[offset++] << 24) |
((ulong)source[offset++] << 16) |
((ulong)source[offset++] << 8) |
source[offset++];
nodeIds[i] = BinaryPrimitives.ReadUInt16BigEndian(source.Slice(offset, 2));
offset += 2;
counters[i] = BinaryPrimitives.ReadUInt64BigEndian(source.Slice(offset, 8));
offset += 8;
}

var isSortedUnique = true;
Expand Down