Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions src/RockBot.Host.Abstractions/LlmGatewayOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,27 @@ public sealed class LlmGatewayOptions
/// Expensive judgment calls; lower cap.
/// </summary>
public int HighMaxConcurrent { get; set; } = 2;

/// <summary>
/// Maximum number of additional callers that may be queued waiting for a
/// <see cref="ModelTier.Low"/> slot beyond <see cref="LowMaxConcurrent"/>.
/// Once the total of in-flight + queued callers exceeds
/// <c>LowMaxConcurrent + LowMaxPending</c>, new calls fail fast with
/// <see cref="LlmGatewaySaturatedException"/> instead of waiting indefinitely.
/// This is a backstop against runaway loops or saturation cascades, not a
/// normal-operations limit; defaults are intentionally generous.
/// </summary>
public int LowMaxPending { get; set; } = 64;

/// <summary>
/// Maximum queued callers beyond <see cref="BalancedMaxConcurrent"/> on the
/// <see cref="ModelTier.Balanced"/> tier. See <see cref="LowMaxPending"/>.
/// </summary>
public int BalancedMaxPending { get; set; } = 32;

/// <summary>
/// Maximum queued callers beyond <see cref="HighMaxConcurrent"/> on the
/// <see cref="ModelTier.High"/> tier. See <see cref="LowMaxPending"/>.
/// </summary>
public int HighMaxPending { get; set; } = 16;
}
26 changes: 26 additions & 0 deletions src/RockBot.Host.Abstractions/LlmGatewaySaturatedException.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
namespace RockBot.Host;

/// <summary>
/// Thrown by <c>LlmGateway</c> when a tier has reached its bounded queue depth
/// (<c>MaxConcurrent + MaxPending</c>) and a new caller arrives. The caller
/// decides whether to skip the work, defer it, or surface the failure upstream;
/// the gateway does not block indefinitely under saturation.
/// </summary>
public sealed class LlmGatewaySaturatedException : Exception
{
/// <summary>The tier that was saturated.</summary>
public ModelTier Tier { get; }

/// <summary>
/// The configured cap (<c>MaxConcurrent + MaxPending</c>) that was exceeded.
/// </summary>
public int CapacityCap { get; }

public LlmGatewaySaturatedException(ModelTier tier, int capacityCap)
: base($"LLM gateway saturated on tier {tier}: " +
$"in-flight + queued callers exceeded the cap of {capacityCap}.")
{
Tier = tier;
CapacityCap = capacityCap;
}
}
13 changes: 13 additions & 0 deletions src/RockBot.Host/HostDiagnostics.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,19 @@ public static class HostDiagnostics
unit: "ms",
description: "Time spent waiting for a per-tier LLM gateway slot");

/// <summary>
/// Number of LLM calls that were rejected immediately because the per-tier
/// gateway queue had reached its bounded depth
/// (<c>MaxConcurrent + MaxPending</c>). Tagged by tier. A non-zero rate is
/// a strong signal of either a runaway loop submitting work or sustained
/// upstream rate limiting; investigate before raising the caps.
/// </summary>
public static readonly Counter<long> LlmGatewaySaturationRejections =
Meter.CreateCounter<long>(
"rockbot.llm.gateway.saturation_rejections",
unit: "{rejection}",
description: "LLM calls rejected because the gateway queue was full");

// ── Agent turn metrics — recorded at architectural boundaries ─────────────

/// <summary>Duration from user message receipt to final reply published.</summary>
Expand Down
93 changes: 68 additions & 25 deletions src/RockBot.Host/LlmGateway.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,26 +38,37 @@ public LlmGateway(IOptions<LlmGatewayOptions> options, ILogger<LlmGateway> logge
_slots = new TierSlot[tierValues.Length];
foreach (var tier in tierValues)
{
var cap = tier switch
var (concurrent, pending) = tier switch
{
ModelTier.Low => opts.LowMaxConcurrent,
ModelTier.High => opts.HighMaxConcurrent,
_ => opts.BalancedMaxConcurrent,
ModelTier.Low => (opts.LowMaxConcurrent, opts.LowMaxPending),
ModelTier.High => (opts.HighMaxConcurrent, opts.HighMaxPending),
_ => (opts.BalancedMaxConcurrent, opts.BalancedMaxPending),
};

if (cap < 1)
if (concurrent < 1)
throw new ArgumentOutOfRangeException(
nameof(options),
$"LlmGatewayOptions {tier}MaxConcurrent must be >= 1 (was {cap}).");
$"LlmGatewayOptions {tier}MaxConcurrent must be >= 1 (was {concurrent}).");

_slots[(int)tier] = new TierSlot(cap);
if (pending < 0)
throw new ArgumentOutOfRangeException(
nameof(options),
$"LlmGatewayOptions {tier}MaxPending must be >= 0 (was {pending}).");

_slots[(int)tier] = new TierSlot(concurrent, pending);
}

_logger = logger;

_logger.LogInformation(
"LlmGateway: per-tier concurrency caps Low={Low} Balanced={Balanced} High={High}",
opts.LowMaxConcurrent, opts.BalancedMaxConcurrent, opts.HighMaxConcurrent);
"LlmGateway: per-tier caps " +
"Low={LowConcurrent}+{LowPending} " +
"Balanced={BalancedConcurrent}+{BalancedPending} " +
"High={HighConcurrent}+{HighPending} " +
"(MaxConcurrent + MaxPending)",
opts.LowMaxConcurrent, opts.LowMaxPending,
opts.BalancedMaxConcurrent, opts.BalancedMaxPending,
opts.HighMaxConcurrent, opts.HighMaxPending);
}

/// <summary>
Expand All @@ -82,29 +93,50 @@ public async Task<T> ExecuteAsync<T>(
var slot = _slots[(int)tier];
var tierTag = new KeyValuePair<string, object?>("rockbot.llm.tier", tier.ToString());

var slotWaitSw = Stopwatch.StartNew();
Interlocked.Increment(ref slot.Pending);
try
{
await slot.Semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
}
finally
// Bounded queue: atomically reserve a "ticket" (counted against in-flight + queued).
// If the tier is at its cap, fail fast rather than queuing indefinitely.
var capacityCap = slot.MaxConcurrent + slot.MaxPending;
var active = Interlocked.Increment(ref slot.Active);
if (active > capacityCap)
{
Interlocked.Decrement(ref slot.Pending);
slotWaitSw.Stop();
HostDiagnostics.LlmGatewaySlotWaitDuration.Record(
slotWaitSw.Elapsed.TotalMilliseconds, tierTag);
Interlocked.Decrement(ref slot.Active);
HostDiagnostics.LlmGatewaySaturationRejections.Add(1, tierTag);
_logger.LogWarning(
"LlmGateway: tier {Tier} saturated (active={Active} > cap={Cap}); rejecting call",
tier, active, capacityCap);
throw new LlmGatewaySaturatedException(tier, capacityCap);
}

Interlocked.Increment(ref slot.InFlight);
try
{
return await operation(cancellationToken).ConfigureAwait(false);
var slotWaitSw = Stopwatch.StartNew();
Interlocked.Increment(ref slot.Pending);
try
{
await slot.Semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
}
finally
{
Interlocked.Decrement(ref slot.Pending);
slotWaitSw.Stop();
HostDiagnostics.LlmGatewaySlotWaitDuration.Record(
slotWaitSw.Elapsed.TotalMilliseconds, tierTag);
}

Interlocked.Increment(ref slot.InFlight);
try
{
return await operation(cancellationToken).ConfigureAwait(false);
}
finally
{
Interlocked.Decrement(ref slot.InFlight);
slot.Semaphore.Release();
}
}
finally
{
Interlocked.Decrement(ref slot.InFlight);
slot.Semaphore.Release();
Interlocked.Decrement(ref slot.Active);
}
}

Expand All @@ -117,12 +149,23 @@ public void Dispose()
private sealed class TierSlot
{
public readonly SemaphoreSlim Semaphore;
public readonly int MaxConcurrent;
public readonly int MaxPending;

/// <summary>Callers waiting on the semaphore (have not yet acquired a slot).</summary>
public int Pending;

/// <summary>Callers currently running their operation.</summary>
public int InFlight;

public TierSlot(int maxConcurrent)
/// <summary>Total callers active on this tier (Pending + InFlight). Used for the bounded-queue cap.</summary>
public int Active;

public TierSlot(int maxConcurrent, int maxPending)
{
Semaphore = new SemaphoreSlim(maxConcurrent, maxConcurrent);
MaxConcurrent = maxConcurrent;
MaxPending = maxPending;
}
}
}
148 changes: 147 additions & 1 deletion tests/RockBot.Host.Tests/LlmGatewayTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,22 @@ namespace RockBot.Host.Tests;
[TestClass]
public class LlmGatewayTests
{
private static LlmGateway CreateGateway(int low = 2, int balanced = 2, int high = 2)
private static LlmGateway CreateGateway(
int low = 2,
int balanced = 2,
int high = 2,
int lowMaxPending = 64,
int balancedMaxPending = 64,
int highMaxPending = 64)
{
var options = Options.Create(new LlmGatewayOptions
{
LowMaxConcurrent = low,
BalancedMaxConcurrent = balanced,
HighMaxConcurrent = high,
LowMaxPending = lowMaxPending,
BalancedMaxPending = balancedMaxPending,
HighMaxPending = highMaxPending,
});
return new LlmGateway(options, NullLogger<LlmGateway>.Instance);
}
Expand Down Expand Up @@ -222,6 +231,143 @@ public void Constructor_CapBelowOne_Throws()
new LlmGateway(bad, NullLogger<LlmGateway>.Instance));
}

[TestMethod]
public void Constructor_NegativeMaxPending_Throws()
{
var bad = Options.Create(new LlmGatewayOptions { LowMaxPending = -1 });
Assert.ThrowsExactly<ArgumentOutOfRangeException>(() =>
new LlmGateway(bad, NullLogger<LlmGateway>.Instance));
}

[TestMethod]
public async Task ExecuteAsync_AtSaturation_ThrowsLlmGatewaySaturatedException()
{
// Cap = MaxConcurrent (1) + MaxPending (1) = 2 callers max.
// Hold both with gates; the 3rd should fail fast.
using var gateway = CreateGateway(low: 1, lowMaxPending: 1);
var holdGate1 = new TaskCompletionSource();
var holdGate2 = new TaskCompletionSource();

// Caller 1: takes the slot
var caller1 = gateway.ExecuteAsync(ModelTier.Low, async ct =>
{
await holdGate1.Task;
return 0;
}, CancellationToken.None);

await WaitUntilAsync(
() => gateway.GetInFlightCount(ModelTier.Low) == 1,
TimeSpan.FromSeconds(5));

// Caller 2: queued (slot occupied, but cap allows 1 pending)
var caller2 = gateway.ExecuteAsync(ModelTier.Low, async ct =>
{
await holdGate2.Task;
return 0;
}, CancellationToken.None);

await WaitUntilAsync(
() => gateway.GetPendingCount(ModelTier.Low) == 1,
TimeSpan.FromSeconds(5));

// Caller 3: must fail fast
var ex = await Assert.ThrowsExactlyAsync<LlmGatewaySaturatedException>(async () =>
await gateway.ExecuteAsync(ModelTier.Low, ct => Task.FromResult(0), CancellationToken.None));

Assert.AreEqual(ModelTier.Low, ex.Tier);
Assert.AreEqual(2, ex.CapacityCap, "Cap should be MaxConcurrent + MaxPending");

// Drain so callers 1 and 2 finish; this also confirms Active was decremented
// correctly so the next call can proceed.
holdGate1.SetResult();
holdGate2.SetResult();
await caller1;
await caller2;

// After drain, a new call should be admitted (Active counter unwound cleanly)
var followup = await gateway.ExecuteAsync(
ModelTier.Low,
ct => Task.FromResult(99),
CancellationToken.None);
Assert.AreEqual(99, followup);
}

[TestMethod]
public async Task ExecuteAsync_RejectionDoesNotConsumeCapacity()
{
// Cap = 1 + 0 = 1. After a reject, we should still be able to make a call.
using var gateway = CreateGateway(low: 1, lowMaxPending: 0);
var gate = new TaskCompletionSource();

var holder = gateway.ExecuteAsync(ModelTier.Low, async ct =>
{
await gate.Task;
return 0;
}, CancellationToken.None);

await WaitUntilAsync(
() => gateway.GetInFlightCount(ModelTier.Low) == 1,
TimeSpan.FromSeconds(5));

// Multiple rejections in a row — each must fully release the ticket.
for (int i = 0; i < 5; i++)
{
await Assert.ThrowsExactlyAsync<LlmGatewaySaturatedException>(async () =>
await gateway.ExecuteAsync(ModelTier.Low,
ct => Task.FromResult(0),
CancellationToken.None));
}

// Holder is still in-flight; Active should still be 1, not 6 from leaked tickets.
Assert.AreEqual(1, gateway.GetInFlightCount(ModelTier.Low));

gate.SetResult();
await holder;

// After holder finishes, Active is 0; new call accepted.
var follow = await gateway.ExecuteAsync(
ModelTier.Low,
ct => Task.FromResult(7),
CancellationToken.None);
Assert.AreEqual(7, follow);
}

[TestMethod]
public async Task ExecuteAsync_SaturationIsPerTier()
{
// Saturate Low; Balanced and High should still accept calls.
using var gateway = CreateGateway(
low: 1, balanced: 1, high: 1,
lowMaxPending: 0, balancedMaxPending: 0, highMaxPending: 0);

var lowGate = new TaskCompletionSource();
var lowHolder = gateway.ExecuteAsync(ModelTier.Low, async ct =>
{
await lowGate.Task;
return 0;
}, CancellationToken.None);

await WaitUntilAsync(
() => gateway.GetInFlightCount(ModelTier.Low) == 1,
TimeSpan.FromSeconds(5));

// Low is saturated (cap=1, holder consumes it)
await Assert.ThrowsExactlyAsync<LlmGatewaySaturatedException>(async () =>
await gateway.ExecuteAsync(ModelTier.Low, ct => Task.FromResult(0), CancellationToken.None));

// Balanced and High remain available
var balancedResult = await gateway.ExecuteAsync(
ModelTier.Balanced, ct => Task.FromResult(1), CancellationToken.None);
var highResult = await gateway.ExecuteAsync(
ModelTier.High, ct => Task.FromResult(2), CancellationToken.None);

Assert.AreEqual(1, balancedResult);
Assert.AreEqual(2, highResult);

lowGate.SetResult();
await lowHolder;
}

private static async Task WaitUntilAsync(Func<bool> predicate, TimeSpan timeout)
{
var sw = System.Diagnostics.Stopwatch.StartNew();
Expand Down
Loading