Skip to content
Merged
69 changes: 69 additions & 0 deletions src/Distributed/HlcClusterRegistry.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
using System.Collections.Concurrent;

namespace Clockworks.Distributed;

/// <summary>
/// Registry for tracking multiple HLC nodes in a cluster.
/// Useful for testing and simulation of distributed systems.
/// </summary>
public sealed class HlcClusterRegistry
{
private readonly ConcurrentDictionary<ushort, HlcGuidFactory> _nodes = new();
private readonly TimeProvider _sharedTimeProvider;

/// <summary>
/// Creates a registry for tracking multiple HLC nodes that share a single <see cref="TimeProvider"/>.
/// </summary>
public HlcClusterRegistry(TimeProvider timeProvider)
{
_sharedTimeProvider = timeProvider;
}

/// <summary>
/// Register a node in the cluster.
/// </summary>
public HlcGuidFactory RegisterNode(ushort nodeId, HlcOptions? options = null)
{
return _nodes.GetOrAdd(nodeId, id => new HlcGuidFactory(_sharedTimeProvider, id, options));
}

/// <summary>
/// Get a registered node.
/// </summary>
public HlcGuidFactory? GetNode(ushort nodeId)
{
return _nodes.TryGetValue(nodeId, out var factory) ? factory : null;
}

/// <summary>
/// Simulate sending a message from one node to another.
/// Updates receiver's clock based on sender's timestamp.
/// </summary>
public void SimulateMessage(ushort senderId, ushort receiverId)
{
if (!_nodes.TryGetValue(senderId, out var sender))
throw new ArgumentException($"Sender node {senderId} not registered");
if (!_nodes.TryGetValue(receiverId, out var receiver))
throw new ArgumentException($"Receiver node {receiverId} not registered");

var (_, senderTimestamp) = sender.NewGuidWithHlc();
receiver.Witness(senderTimestamp);
}

/// <summary>
/// Get all registered nodes.
/// </summary>
public IEnumerable<(ushort NodeId, HlcGuidFactory Factory)> GetAllNodes()
{
return _nodes.Select(kvp => (kvp.Key, kvp.Value));
}

/// <summary>
/// Get the maximum logical time across all nodes.
/// Useful for determining global ordering.
/// </summary>
public long GetMaxLogicalTime()
{
return _nodes.Values.Max(f => f.CurrentTimestamp.WallTimeMs);
}
}
282 changes: 0 additions & 282 deletions src/Distributed/HlcCoordinator.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
using System.Collections.Concurrent;

namespace Clockworks.Distributed;

/// <summary>
Expand Down Expand Up @@ -92,283 +90,3 @@ public Guid NewLocalEventGuid()
/// </summary>
public HlcTimestamp CurrentTimestamp => _factory.CurrentTimestamp;
}

/// <summary>
/// Statistics tracking for HLC behavior analysis.
/// Thread-safe for concurrent updates.
/// </summary>
public sealed class HlcStatistics
{
private long _localEventCount;
private long _sendCount;
private long _receiveCount;
private long _clockAdvances; // Times we advanced due to remote
private long _maxObservedDrift;
private long _maxRemoteAheadMs;
private long _maxRemoteBehindMs;
private long _remoteAheadCount;

/// <summary>
/// Total number of local events recorded.
/// </summary>
public long LocalEventCount => Volatile.Read(ref _localEventCount);

/// <summary>
/// Total number of send events recorded.
/// </summary>
public long SendCount => Volatile.Read(ref _sendCount);

/// <summary>
/// Total number of receive events recorded.
/// </summary>
public long ReceiveCount => Volatile.Read(ref _receiveCount);

/// <summary>
/// Number of times the local clock advanced due to observing a remote timestamp.
/// </summary>
public long ClockAdvances => Volatile.Read(ref _clockAdvances);

/// <summary>
/// Maximum absolute observed drift (in milliseconds) between a received remote timestamp and the local timestamp prior to witnessing.
/// </summary>
public long MaxObservedDriftMs => Volatile.Read(ref _maxObservedDrift);

/// <summary>
/// Maximum observed amount (in milliseconds) by which a remote timestamp was ahead of the local timestamp prior to witnessing.
/// </summary>
public long MaxRemoteAheadMs => Volatile.Read(ref _maxRemoteAheadMs);

/// <summary>
/// Maximum observed amount (in milliseconds) by which a remote timestamp was behind the local timestamp prior to witnessing.
/// </summary>
public long MaxRemoteBehindMs => Volatile.Read(ref _maxRemoteBehindMs);

/// <summary>
/// Number of receive events where the remote timestamp was ahead of or equal to the local timestamp prior to witnessing.
/// </summary>
public long RemoteAheadCount => Volatile.Read(ref _remoteAheadCount);

internal void RecordLocalEvent(HlcTimestamp timestamp)
{
Interlocked.Increment(ref _localEventCount);
}

internal void RecordSend(HlcTimestamp timestamp)
{
Interlocked.Increment(ref _sendCount);
}

internal void RecordReceive(HlcTimestamp before, HlcTimestamp after, HlcTimestamp remote)
{
Interlocked.Increment(ref _receiveCount);

// Clock advances due to remote when the remote timestamp is ahead of the local timestamp
// prior to witnessing and the post-witness wall time matches the remote wall time.
if (remote > before && after.WallTimeMs == remote.WallTimeMs)
{
Interlocked.Increment(ref _clockAdvances);
}

var delta = remote.WallTimeMs - before.WallTimeMs;
if (delta >= 0)
{
Interlocked.Increment(ref _remoteAheadCount);
InterlockedMax(ref _maxRemoteAheadMs, delta);
}
else
{
InterlockedMax(ref _maxRemoteBehindMs, -delta);
}

// Track absolute drift
InterlockedMax(ref _maxObservedDrift, Math.Abs(delta));
}

private static void InterlockedMax(ref long location, long value)
{
long current = Volatile.Read(ref location);
while (value > current)
{
var previous = Interlocked.CompareExchange(ref location, value, current);
if (previous == current) break;
current = previous;
}
}

/// <summary>
/// Resets all counters to zero.
/// </summary>
public void Reset()
{
_localEventCount = 0;
_sendCount = 0;
_receiveCount = 0;
_clockAdvances = 0;
_maxObservedDrift = 0;
_maxRemoteAheadMs = 0;
_maxRemoteBehindMs = 0;
_remoteAheadCount = 0;
}

/// <summary>
/// Returns a human-readable summary of the current statistics.
/// </summary>
public override string ToString() =>
$"Local: {LocalEventCount}, Send: {SendCount}, Receive: {ReceiveCount}, " +
$"Advances: {ClockAdvances}, MaxDrift: {MaxObservedDriftMs}ms, " +
$"MaxAhead: {MaxRemoteAheadMs}ms, MaxBehind: {MaxRemoteBehindMs}ms";
}

/// <summary>
/// Message header for propagating HLC timestamps across service boundaries.
/// Can be serialized to/from various wire formats.
/// </summary>
public readonly record struct HlcMessageHeader
{
/// <summary>
/// Standard header name for HTTP/gRPC.
/// </summary>
public const string HeaderName = "X-HLC-Timestamp";

/// <summary>
/// The HLC timestamp to propagate.
/// </summary>
public HlcTimestamp Timestamp { get; init; }

/// <summary>
/// Optional correlation identifier.
/// </summary>
public Guid? CorrelationId { get; init; }

/// <summary>
/// Optional causation identifier.
/// </summary>
public Guid? CausationId { get; init; }

/// <summary>
/// Creates a new message header instance.
/// </summary>
public HlcMessageHeader(HlcTimestamp timestamp, Guid? correlationId = null, Guid? causationId = null)
{
Timestamp = timestamp;
CorrelationId = correlationId;
CausationId = causationId;
}

/// <summary>
/// Serialize to a compact string for HTTP headers.
/// Format: "timestamp.counter@node[;correlation;causation]"
/// </summary>
public override string ToString()
{
var result = Timestamp.ToString();
if (CorrelationId.HasValue)
{
result += $";{CorrelationId.Value:N}";
if (CausationId.HasValue)
{
result += $";{CausationId.Value:N}";
}
}
return result;
}

/// <summary>
/// Parse from header string.
/// </summary>
public static HlcMessageHeader Parse(string value)
{
var parts = value.Split(';');
var timestamp = HlcTimestamp.Parse(parts[0]);

Guid? correlation = parts.Length > 1 ? Guid.Parse(parts[1]) : null;
Guid? causation = parts.Length > 2 ? Guid.Parse(parts[2]) : null;

return new HlcMessageHeader(timestamp, correlation, causation);
}

/// <summary>
/// Try to parse from header string.
/// </summary>
public static bool TryParse(string? value, out HlcMessageHeader header)
{
header = default;
if (string.IsNullOrEmpty(value)) return false;

try
{
header = Parse(value);
return true;
}
catch
{
return false;
}
}
}

/// <summary>
/// Registry for tracking multiple HLC nodes in a cluster.
/// Useful for testing and simulation of distributed systems.
/// </summary>
public sealed class HlcClusterRegistry
{
private readonly ConcurrentDictionary<ushort, HlcGuidFactory> _nodes = new();
private readonly TimeProvider _sharedTimeProvider;

/// <summary>
/// Creates a registry for tracking multiple HLC nodes that share a single <see cref="TimeProvider"/>.
/// </summary>
public HlcClusterRegistry(TimeProvider timeProvider)
{
_sharedTimeProvider = timeProvider;
}

/// <summary>
/// Register a node in the cluster.
/// </summary>
public HlcGuidFactory RegisterNode(ushort nodeId, HlcOptions? options = null)
{
return _nodes.GetOrAdd(nodeId, id => new HlcGuidFactory(_sharedTimeProvider, id, options));
}

/// <summary>
/// Get a registered node.
/// </summary>
public HlcGuidFactory? GetNode(ushort nodeId)
{
return _nodes.TryGetValue(nodeId, out var factory) ? factory : null;
}

/// <summary>
/// Simulate sending a message from one node to another.
/// Updates receiver's clock based on sender's timestamp.
/// </summary>
public void SimulateMessage(ushort senderId, ushort receiverId)
{
if (!_nodes.TryGetValue(senderId, out var sender))
throw new ArgumentException($"Sender node {senderId} not registered");
if (!_nodes.TryGetValue(receiverId, out var receiver))
throw new ArgumentException($"Receiver node {receiverId} not registered");

var (_, senderTimestamp) = sender.NewGuidWithHlc();
receiver.Witness(senderTimestamp);
}

/// <summary>
/// Get all registered nodes.
/// </summary>
public IEnumerable<(ushort NodeId, HlcGuidFactory Factory)> GetAllNodes()
{
return _nodes.Select(kvp => (kvp.Key, kvp.Value));
}

/// <summary>
/// Get the maximum logical time across all nodes.
/// Useful for determining global ordering.
/// </summary>
public long GetMaxLogicalTime()
{
return _nodes.Values.Max(f => f.CurrentTimestamp.WallTimeMs);
}
}
Loading