Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@ It is built around `TimeProvider` so that *time becomes an injectable dependency
- **Hybrid Logical Clock (HLC)**
- HLC timestamps and utilities to preserve causality in distributed simulations
- Helpers to witness remote timestamps and generate outbound timestamps
- Provides both a canonical 10-byte big-endian encoding (`HlcTimestamp.WriteTo`/`ReadFrom`) and an optimized packed 64-bit encoding (`ToPackedInt64`/`FromPackedInt64`)

- **Vector Clock**
- Full vector clock implementation for exact causality tracking and concurrency detection
- Sorted-array representation optimized for sparse clocks
- `VectorClockCoordinator` for thread-safe clock management across distributed nodes
- Message header propagation for HTTP/gRPC integration
- `VectorClockCoordinator` for thread-safe clock management across distributed nodes (allocation-conscious hot path)
- Canonical binary wire format (`VectorClock.WriteTo`/`ReadFrom`) and string form for HTTP/gRPC headers

- **Lightweight instrumentation**
- Counters for timers, advances, and timeouts useful in simulation/test assertions
Expand Down Expand Up @@ -100,6 +101,8 @@ In Clockworks, a "remote timestamp" is the `HlcTimestamp` produced on a differen
via `HlcMessageHeader` (format: `walltime.counter@node`). The receiver should call `BeforeReceive(...)` with that
timestamp to preserve causality.

Note: `HlcTimestamp.ToPackedInt64()`/`FromPackedInt64()` is a lossy optimization encoding with a 48-bit wall time, a 12-bit counter, and a 4-bit node id (the counter and node id are truncated to fit). Use `WriteTo`/`ReadFrom` when you need a full-fidelity representation.

```csharp
var tp = new SimulatedTimeProvider();

Expand Down Expand Up @@ -192,6 +195,13 @@ Console.WriteLine(clockA.IsConcurrentWith(clockB)); // true
// HLC would show one as "less than" the other based on physical time
```

**Wire formats**

- **Binary (canonical):** `VectorClock.WriteTo`/`VectorClock.ReadFrom`
- Format: `[count:u32 big-endian][(nodeId:u16 big-endian, counter:u64 big-endian)]*`
- `ReadFrom` canonicalizes unsorted input and deduplicates node IDs by taking the maximum counter.
- **String (for headers):** `VectorClock.ToString()` / `VectorClock.Parse(...)` (`"node:counter,node:counter"`, sorted by node id)

## Demos

### Console demos (`demo/Clockworks.Demo`)
Expand Down
135 changes: 91 additions & 44 deletions demo/Clockworks.Demo/Demos/DistributedAtLeastOnceCausalityShowcase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
using Clockworks.Distributed;
using Clockworks.Instrumentation;
using Clockworks.Demo.Infrastructure;
using System.Collections.Concurrent;
using System.Collections.Generic;
using static System.Console;

Expand Down Expand Up @@ -142,11 +141,12 @@ private sealed class Node
private readonly TimeProvider _tp;
private readonly UuidV7Factory _uuid;
private readonly BoundedInboxDedupe _inbox;
private readonly ConcurrentDictionary<string, ConcurrentDictionary<string, byte>> _acks = new();
private readonly ConcurrentDictionary<string, Timeouts.TimeoutHandle> _retries = new();
private readonly ConcurrentDictionary<string, int> _attempts = new(StringComparer.OrdinalIgnoreCase);
private readonly ConcurrentDictionary<string, Guid> _correlationIds = new(StringComparer.OrdinalIgnoreCase);
private readonly ConcurrentDictionary<LogicalMessageKey, MessageTemplate> _templates = new();
// This demo is single-threaded and deterministic, so we intentionally use non-concurrent collections.
private readonly Dictionary<string, HashSet<string>> _acks = new(StringComparer.OrdinalIgnoreCase);
private readonly Dictionary<string, Timeouts.TimeoutHandle> _retries = new(StringComparer.OrdinalIgnoreCase);
private readonly Dictionary<string, int> _attempts = new(StringComparer.OrdinalIgnoreCase);
private readonly Dictionary<string, Guid> _correlationIds = new(StringComparer.OrdinalIgnoreCase);
private readonly Dictionary<LogicalMessageKey, MessageTemplate> _templates = new();

public string Name { get; }
public HlcCoordinator Hlc { get; }
Expand All @@ -171,7 +171,7 @@ public Node(string name, ushort nodeId, SimulatedTimeProvider tp, SimulatedNetwo
Vector = new VectorClockCoordinator(nodeId);
}

public bool IsDone => _acks.IsEmpty && _retries.IsEmpty;
public bool IsDone => _acks.Count == 0 && _retries.Count == 0;

public void StartNewOrder(string orderId)
{
Expand All @@ -180,7 +180,7 @@ public void StartNewOrder(string orderId)
var ts = Hlc.BeforeSend();
var correlationId = _uuid.NewGuid();

_acks[orderId] = new ConcurrentDictionary<string, byte>(StringComparer.OrdinalIgnoreCase);
_acks[orderId] = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
_attempts[orderId] = 1;
_correlationIds[orderId] = correlationId;

Expand Down Expand Up @@ -246,28 +246,27 @@ private void HandleAck(Message msg)
return;
}

fromSet.TryAdd(msg.From, 0);
fromSet.Add(msg.From);

if (fromSet.ContainsKey(PaymentsNodeName) && fromSet.ContainsKey(InventoryNodeName))
if (fromSet.Contains(PaymentsNodeName) && fromSet.Contains(InventoryNodeName))
{
if (_retries.TryRemove(msg.OrderId, out var handle))
{
if (_retries.Remove(msg.OrderId, out var handle))
handle.Dispose();
}

_acks.TryRemove(msg.OrderId, out _);
_attempts.TryRemove(msg.OrderId, out _);
_correlationIds.TryRemove(msg.OrderId, out _);
_acks.Remove(msg.OrderId);
_attempts.Remove(msg.OrderId);
_correlationIds.Remove(msg.OrderId);

// Clean up templates to prevent unbounded growth in long simulations
_templates.TryRemove(new LogicalMessageKey(MessageKind.PlaceOrder, msg.OrderId, PaymentsNodeName), out _);
_templates.TryRemove(new LogicalMessageKey(MessageKind.PlaceOrder, msg.OrderId, InventoryNodeName), out _);
_templates.Remove(new LogicalMessageKey(MessageKind.PlaceOrder, msg.OrderId, PaymentsNodeName));
_templates.Remove(new LogicalMessageKey(MessageKind.PlaceOrder, msg.OrderId, InventoryNodeName));
}
}

public void Poll()
{
// Snapshot to avoid collection modification during iteration
// Snapshot to avoid collection modification during iteration.
// (Timeout callbacks are driven by SimulatedTimeProvider; we still keep iteration safe.)
var snapshot = _retries.ToArray();
foreach (var (orderId, handle) in snapshot)
{
Expand All @@ -287,10 +286,7 @@ private void ScheduleRetry(string orderId, int attempt)

var handle = Timeouts.CreateTimeoutHandle(_tp, TimeSpan.FromMilliseconds(50), statistics: TimeoutStats);
if (!_retries.TryAdd(orderId, handle))
{
handle.Dispose();
return;
}
}

private void TryRetry(string orderId)
Expand All @@ -301,13 +297,14 @@ private void TryRetry(string orderId)
return;
}

if (fromSet.ContainsKey(PaymentsNodeName) && fromSet.ContainsKey(InventoryNodeName))
if (fromSet.Contains(PaymentsNodeName) && fromSet.Contains(InventoryNodeName))
{
CleanupRetry(orderId);
return;
}

var nextAttempt = _attempts.AddOrUpdate(orderId, 2, static (_, current) => current + 1);
var nextAttempt = _attempts.TryGetValue(orderId, out var currentAttempt) ? currentAttempt + 1 : 2;
_attempts[orderId] = nextAttempt;
if (nextAttempt > 10)
{
CleanupRetry(orderId);
Expand All @@ -334,7 +331,7 @@ private MessageTemplate GetOrCreateTemplate(LogicalMessageKey key, Guid correlat
if (_templates.TryGetValue(key, out var existing))
return existing;

if (hlc == default || vc.Equals(default(VectorClock)))
if (hlc == default || vc.IsEmpty)
{
// This should not happen for normal flow (templates are created during StartNewOrder),
// but if it does, synthesize a reasonable envelope.
Expand All @@ -354,15 +351,14 @@ private MessageTemplate GetOrCreateTemplate(LogicalMessageKey key, Guid correlat
Hlc: hlc,
VectorClock: vc);

return _templates.GetOrAdd(key, template);
_templates[key] = template;
return template;
}

private void CleanupRetry(string orderId)
{
if (_retries.TryRemove(orderId, out var existing))
{
if (_retries.Remove(orderId, out var existing))
existing.Dispose();
}
}

private void Send(Message msg) => _network.Send(msg);
Expand Down Expand Up @@ -562,8 +558,9 @@ private sealed class SimulatedNetwork(SimulatedTimeProvider tp, FailureInjector
private readonly Lock _lock = new();
private readonly SimulatedTimeProvider _tp = tp;
private readonly FailureInjector _failures = failures;
private readonly ConcurrentDictionary<string, Node> _nodes = new(StringComparer.OrdinalIgnoreCase);
private readonly List<Envelope> _inFlight = [];
private readonly Dictionary<string, Node> _nodes = new(StringComparer.OrdinalIgnoreCase);
private readonly PriorityQueue<Envelope, DeliveryPriority> _inFlight = new();
private long _sequence;

public MessagingStatistics Stats { get; } = new();

Expand Down Expand Up @@ -595,7 +592,8 @@ public long? NextDueUtcMs
{
if (_inFlight.Count == 0)
return null;
return _inFlight.Min(m => m.DeliverAtUtcMs);

return _inFlight.Peek().Priority.DeliverAtUtcMs;
}
}
}
Expand All @@ -617,21 +615,21 @@ public void Send(Message msg)

lock (_lock)
{
_inFlight.Add(new Envelope(msg, deliverAt));
Enqueue(msg, deliverAt);

if (_failures.ShouldDuplicate())
{
_inFlight.Add(new Envelope(msg with { }, deliverAt + _failures.AdditionalDelayMs()));
Enqueue(msg with { }, deliverAt + _failures.AdditionalDelayMs());
Stats.RecordDuplicated(_inFlight.Count);
}

Stats.RecordSent(_inFlight.Count);

if (_failures.ShouldReorder() && _inFlight.Count >= 2)
{
var i = _inFlight.Count - 1;
var j = Math.Max(0, i - 1);
(_inFlight[i], _inFlight[j]) = (_inFlight[j], _inFlight[i]);
// Model reordering as small bounded jitter on delivery time.
// Clamp to >= now so delivery remains time-driven (no "instant" negative delay).
ApplyReorderJitterUnsafe(now);
Stats.RecordReordered();
}
}
Expand All @@ -647,14 +645,14 @@ public void DeliverDue()
if (_inFlight.Count == 0)
return;

due = _inFlight.Where(m => m.DeliverAtUtcMs <= now).ToList();
if (due.Count == 0)
return;

foreach (var m in due)
due = [];
while (_inFlight.Count > 0 && _inFlight.Peek().Priority.DeliverAtUtcMs <= now)
{
_inFlight.Remove(m);
due.Add(_inFlight.Dequeue());
}

if (due.Count == 0)
return;
}

foreach (var env in due)
Expand All @@ -669,6 +667,41 @@ public void DeliverDue()
}
}

/// <summary>
/// Wraps <paramref name="msg"/> in an <see cref="Envelope"/> and adds it to the in-flight queue.
/// </summary>
/// <param name="msg">The message to schedule for delivery.</param>
/// <param name="deliverAtUtcMs">Delivery time used as the primary ordering key.</param>
/// <remarks>
/// Each envelope is stamped with the next value of an incrementing sequence number, which
/// <see cref="DeliveryPriority"/> uses as a tie-breaker between equal delivery times.
/// </remarks>
private void Enqueue(Message msg, long deliverAtUtcMs)
{
    var order = new DeliveryPriority(deliverAtUtcMs, unchecked(++_sequence));
    var envelope = new Envelope(msg) { Priority = order };
    _inFlight.Enqueue(envelope, order);
}

/// <summary>
/// Simulates reordering by pulling the delivery time of the two latest-ordered in-flight
/// envelopes slightly earlier, then rebuilding the queue.
/// </summary>
/// <param name="nowUtcMs">Lower bound for any adjusted delivery time.</param>
/// <remarks>
/// Does nothing when fewer than two envelopes are queued. The method takes no lock itself;
/// the visible call site in <c>Send</c> invokes it while holding <c>_lock</c>.
/// </remarks>
private void ApplyReorderJitterUnsafe(long nowUtcMs)
{
    if (_inFlight.Count < 2)
        return;

    // PriorityQueue cannot re-prioritize in place, so drain it (in priority order),
    // adjust the tail, and refill. Acceptable for this demo's tiny workload.
    var drained = new List<Envelope>(_inFlight.Count);
    while (_inFlight.TryDequeue(out var next, out _))
    {
        drained.Add(next);
    }

    var jitterWindowMs = Math.Min(10, Math.Max(1, _failures.MaxAdditionalDelayMs / 20));
    var firstJittered = Math.Max(0, drained.Count - 2);

    // Only the last two drained envelopes are touched, in order, so the number and
    // sequence of AdditionalDelayMs() draws is fixed.
    foreach (var target in drained.GetRange(firstJittered, drained.Count - firstJittered))
    {
        var jitter = _failures.AdditionalDelayMs() % (jitterWindowMs + 1);
        var current = target.Priority;
        var adjustedDeliverAt = Math.Max(nowUtcMs, current.DeliverAtUtcMs - jitter);
        target.Priority = new DeliveryPriority(adjustedDeliverAt, current.Sequence);
    }

    foreach (var envelope in drained)
    {
        _inFlight.Enqueue(envelope, envelope.Priority);
    }
}

private void ObserveConcurrency(Message delivered)
{
lock (_lock)
Expand Down Expand Up @@ -700,7 +733,21 @@ private void ObserveConcurrency(Message delivered)
}
}

private sealed record Envelope(Message Message, long DeliverAtUtcMs);
/// <summary>
/// An in-flight message together with the priority under which it is queued for delivery.
/// </summary>
private sealed record Envelope(Message Message)
{
/// <summary>
/// Queue priority (delivery time plus sequence) for this envelope. Settable so the
/// delivery time can be reassigned after creation, as the reorder jitter does.
/// </summary>
public DeliveryPriority Priority { get; set; }
}

/// <summary>
/// Ordering key for in-flight envelopes: earlier delivery time sorts first, and equal
/// delivery times are ordered by ascending sequence number.
/// </summary>
private readonly record struct DeliveryPriority(long DeliverAtUtcMs, long Sequence) : IComparable<DeliveryPriority>
{
    /// <summary>
    /// Compares by <see cref="DeliverAtUtcMs"/> first, falling back to <see cref="Sequence"/>
    /// when the delivery times are equal.
    /// </summary>
    /// <param name="other">The priority to compare against.</param>
    /// <returns>
    /// A negative value if this instance sorts before <paramref name="other"/>, zero if they
    /// are equal, and a positive value otherwise.
    /// </returns>
    public int CompareTo(DeliveryPriority other)
    {
        var byTime = DeliverAtUtcMs.CompareTo(other.DeliverAtUtcMs);
        return byTime == 0 ? Sequence.CompareTo(other.Sequence) : byTime;
    }
}

private sealed record DeliveredEvent(Guid EventId, MessageKind Kind, string From, string To, VectorClock VectorClock);
}
Expand Down
45 changes: 39 additions & 6 deletions src/Distributed/HlcTimestamp.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,31 @@ namespace Clockworks.Distributed;
/// </remarks>
public readonly record struct HlcTimestamp : IComparable<HlcTimestamp>, IComparable
{
/// <summary>
/// Number of bits allocated to the wall time component in <see cref="ToPackedInt64"/>.
/// </summary>
public const int PackedWallTimeBits = 48;

/// <summary>
/// Number of bits allocated to the counter component in <see cref="ToPackedInt64"/>.
/// </summary>
public const int PackedCounterBits = 12;

/// <summary>
/// Number of bits allocated to node id in <see cref="ToPackedInt64"/>.
/// </summary>
public const int PackedNodeIdBits = 4;

/// <summary>
/// Bit mask applied to <see cref="Counter"/> in <see cref="ToPackedInt64"/>.
/// </summary>
public const ushort PackedCounterMask = 0x0FFF;

/// <summary>
/// Bit mask applied to <see cref="NodeId"/> in <see cref="ToPackedInt64"/>.
/// </summary>
public const ushort PackedNodeIdMask = 0x000F;

/// <summary>
/// Logical wall time in milliseconds since Unix epoch.
/// May drift ahead of physical time to maintain causality.
Expand Down Expand Up @@ -52,16 +77,23 @@ public HlcTimestamp(long wallTimeMs, ushort counter = 0, ushort nodeId = 0)

/// <summary>
/// Packs the timestamp into a 64-bit value for efficient storage/transmission.
/// Layout: [48 bits wall time][12 bits counter][4 bits node (truncated)]
/// Layout: [48 bits wall time][12 bits counter][4 bits node id].
/// </summary>
/// <remarks>
/// This packed encoding is an optimization format.
/// <para>
/// <see cref="NodeId"/> is truncated to the lower 4 bits (0-15) and <see cref="Counter"/> to the lower 12 bits (0-4095).
/// Use <see cref="WriteTo"/> / <see cref="ReadFrom"/> for a full-fidelity encoding.
/// </para>
/// </remarks>
public long ToPackedInt64()
{
// 48 bits for timestamp (good until year 10889)
// 12 bits for counter (0-4095)
// 4 bits for node ID (0-15, use for small clusters)
ulong wall = (ulong)WallTimeMs;
ulong counter = (ulong)(Counter & 0x0FFF);
ulong node = (ulong)(NodeId & 0x000F);
ulong counter = (ulong)(Counter & PackedCounterMask);
ulong node = (ulong)(NodeId & PackedNodeIdMask);

ulong packed = (wall << 16) | (counter << 4) | node;
return unchecked((long)packed);
Expand All @@ -72,10 +104,11 @@ public long ToPackedInt64()
/// </summary>
public static HlcTimestamp FromPackedInt64(long packed)
{
var u = unchecked((ulong)packed);
return new HlcTimestamp(
wallTimeMs: packed >> 16,
counter: (ushort)((packed >> 4) & 0xFFF),
nodeId: (ushort)(packed & 0xF)
wallTimeMs: (long)((u >> 16) & ((1UL << PackedWallTimeBits) - 1UL)),
counter: (ushort)((u >> PackedNodeIdBits) & PackedCounterMask),
nodeId: (ushort)(u & PackedNodeIdMask)
);
}

Expand Down
Loading