diff --git a/src/RockBot.Host.Abstractions/LlmGatewayOptions.cs b/src/RockBot.Host.Abstractions/LlmGatewayOptions.cs
index 36b705c..5276021 100644
--- a/src/RockBot.Host.Abstractions/LlmGatewayOptions.cs
+++ b/src/RockBot.Host.Abstractions/LlmGatewayOptions.cs
@@ -27,4 +27,27 @@ public sealed class LlmGatewayOptions
/// Expensive judgment calls; lower cap.
///
public int HighMaxConcurrent { get; set; } = 2;
+
+ /// <summary>
+ /// Maximum number of additional callers that may be queued waiting for a
+ /// <see cref="ModelTier.Low"/> slot beyond <see cref="LowMaxConcurrent"/>.
+ /// Once the total of in-flight + queued callers exceeds
+ /// <c>LowMaxConcurrent + LowMaxPending</c>, new calls fail fast with
+ /// <see cref="LlmGatewaySaturatedException"/> instead of waiting indefinitely.
+ /// This is a backstop against runaway loops or saturation cascades, not a
+ /// normal-operations limit; defaults are intentionally generous.
+ /// </summary>
+ public int LowMaxPending { get; set; } = 64;
+
+ /// <summary>
+ /// Maximum queued callers beyond <see cref="BalancedMaxConcurrent"/> on the
+ /// <see cref="ModelTier.Balanced"/> tier. See <see cref="LowMaxPending"/>.
+ /// </summary>
+ public int BalancedMaxPending { get; set; } = 32;
+
+ /// <summary>
+ /// Maximum queued callers beyond <see cref="HighMaxConcurrent"/> on the
+ /// <see cref="ModelTier.High"/> tier. See <see cref="LowMaxPending"/>.
+ /// </summary>
+ public int HighMaxPending { get; set; } = 16;
}
diff --git a/src/RockBot.Host.Abstractions/LlmGatewaySaturatedException.cs b/src/RockBot.Host.Abstractions/LlmGatewaySaturatedException.cs
new file mode 100644
index 0000000..4e343b4
--- /dev/null
+++ b/src/RockBot.Host.Abstractions/LlmGatewaySaturatedException.cs
@@ -0,0 +1,26 @@
+namespace RockBot.Host;
+
+/// <summary>
+/// Thrown by <c>LlmGateway</c> when a tier has reached its bounded queue depth
+/// (<c>MaxConcurrent + MaxPending</c>) and a new caller arrives. The caller
+/// decides whether to skip the work, defer it, or surface the failure upstream;
+/// the gateway does not block indefinitely under saturation.
+/// </summary>
+public sealed class LlmGatewaySaturatedException : Exception
+{
+ /// <summary>The tier that was saturated.</summary>
+ public ModelTier Tier { get; }
+
+ /// <summary>
+ /// The configured cap (<c>MaxConcurrent + MaxPending</c>) that was exceeded.
+ /// </summary>
+ public int CapacityCap { get; }
+
+ public LlmGatewaySaturatedException(ModelTier tier, int capacityCap)
+ : base($"LLM gateway saturated on tier {tier}: " +
+ $"in-flight + queued callers exceeded the cap of {capacityCap}.")
+ {
+ Tier = tier;
+ CapacityCap = capacityCap;
+ }
+}
diff --git a/src/RockBot.Host/HostDiagnostics.cs b/src/RockBot.Host/HostDiagnostics.cs
index 9b4f81d..f1e9d47 100644
--- a/src/RockBot.Host/HostDiagnostics.cs
+++ b/src/RockBot.Host/HostDiagnostics.cs
@@ -58,6 +58,19 @@ public static class HostDiagnostics
unit: "ms",
description: "Time spent waiting for a per-tier LLM gateway slot");
+ /// <summary>
+ /// Number of LLM calls that were rejected immediately because the per-tier
+ /// gateway queue had reached its bounded depth
+ /// (<c>MaxConcurrent + MaxPending</c>). Tagged by tier. A non-zero rate is
+ /// a strong signal of either a runaway loop submitting work or sustained
+ /// upstream rate limiting; investigate before raising the caps.
+ /// </summary>
+ public static readonly Counter LlmGatewaySaturationRejections = // NOTE(review): Counter/CreateCounter look like they lost a generic type argument here — confirm against the real file
+ Meter.CreateCounter(
+ "rockbot.llm.gateway.saturation_rejections",
+ unit: "{rejection}",
+ description: "LLM calls rejected because the gateway queue was full");
+
// ── Agent turn metrics — recorded at architectural boundaries ─────────────
/// Duration from user message receipt to final reply published.
diff --git a/src/RockBot.Host/LlmGateway.cs b/src/RockBot.Host/LlmGateway.cs
index 595db07..7aee674 100644
--- a/src/RockBot.Host/LlmGateway.cs
+++ b/src/RockBot.Host/LlmGateway.cs
@@ -38,26 +38,37 @@ public LlmGateway(IOptions options, ILogger logge
_slots = new TierSlot[tierValues.Length];
foreach (var tier in tierValues)
{
- var cap = tier switch
+ var (concurrent, pending) = tier switch
{
- ModelTier.Low => opts.LowMaxConcurrent,
- ModelTier.High => opts.HighMaxConcurrent,
- _ => opts.BalancedMaxConcurrent,
+ ModelTier.Low => (opts.LowMaxConcurrent, opts.LowMaxPending),
+ ModelTier.High => (opts.HighMaxConcurrent, opts.HighMaxPending),
+ _ => (opts.BalancedMaxConcurrent, opts.BalancedMaxPending),
};
- if (cap < 1)
+ if (concurrent < 1)
throw new ArgumentOutOfRangeException(
nameof(options),
- $"LlmGatewayOptions {tier}MaxConcurrent must be >= 1 (was {cap}).");
+ $"LlmGatewayOptions {tier}MaxConcurrent must be >= 1 (was {concurrent}).");
- _slots[(int)tier] = new TierSlot(cap);
+ if (pending < 0)
+ throw new ArgumentOutOfRangeException(
+ nameof(options),
+ $"LlmGatewayOptions {tier}MaxPending must be >= 0 (was {pending}).");
+
+ _slots[(int)tier] = new TierSlot(concurrent, pending);
}
_logger = logger;
_logger.LogInformation(
- "LlmGateway: per-tier concurrency caps Low={Low} Balanced={Balanced} High={High}",
- opts.LowMaxConcurrent, opts.BalancedMaxConcurrent, opts.HighMaxConcurrent);
+ "LlmGateway: per-tier caps " +
+ "Low={LowConcurrent}+{LowPending} " +
+ "Balanced={BalancedConcurrent}+{BalancedPending} " +
+ "High={HighConcurrent}+{HighPending} " +
+ "(MaxConcurrent + MaxPending)",
+ opts.LowMaxConcurrent, opts.LowMaxPending,
+ opts.BalancedMaxConcurrent, opts.BalancedMaxPending,
+ opts.HighMaxConcurrent, opts.HighMaxPending);
}
///
@@ -82,29 +93,50 @@ public async Task ExecuteAsync(
var slot = _slots[(int)tier];
var tierTag = new KeyValuePair("rockbot.llm.tier", tier.ToString());
- var slotWaitSw = Stopwatch.StartNew();
- Interlocked.Increment(ref slot.Pending);
- try
- {
- await slot.Semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
- }
- finally
+ // Bounded queue: atomically reserve a "ticket" (counted against in-flight + queued).
+ // If the tier is at its cap, fail fast rather than queuing indefinitely.
+ var capacityCap = slot.MaxConcurrent + slot.MaxPending;
+ var active = Interlocked.Increment(ref slot.Active);
+ if (active > capacityCap)
{
- Interlocked.Decrement(ref slot.Pending);
- slotWaitSw.Stop();
- HostDiagnostics.LlmGatewaySlotWaitDuration.Record(
- slotWaitSw.Elapsed.TotalMilliseconds, tierTag);
+ Interlocked.Decrement(ref slot.Active);
+ HostDiagnostics.LlmGatewaySaturationRejections.Add(1, tierTag);
+ _logger.LogWarning(
+ "LlmGateway: tier {Tier} saturated (active={Active} > cap={Cap}); rejecting call",
+ tier, active, capacityCap);
+ throw new LlmGatewaySaturatedException(tier, capacityCap);
}
- Interlocked.Increment(ref slot.InFlight);
try
{
- return await operation(cancellationToken).ConfigureAwait(false);
+ var slotWaitSw = Stopwatch.StartNew();
+ Interlocked.Increment(ref slot.Pending);
+ try
+ {
+ await slot.Semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
+ }
+ finally
+ {
+ Interlocked.Decrement(ref slot.Pending);
+ slotWaitSw.Stop();
+ HostDiagnostics.LlmGatewaySlotWaitDuration.Record(
+ slotWaitSw.Elapsed.TotalMilliseconds, tierTag);
+ }
+
+ Interlocked.Increment(ref slot.InFlight);
+ try
+ {
+ return await operation(cancellationToken).ConfigureAwait(false);
+ }
+ finally
+ {
+ Interlocked.Decrement(ref slot.InFlight);
+ slot.Semaphore.Release();
+ }
}
finally
{
- Interlocked.Decrement(ref slot.InFlight);
- slot.Semaphore.Release();
+ Interlocked.Decrement(ref slot.Active);
}
}
@@ -117,12 +149,23 @@ public void Dispose()
private sealed class TierSlot
{
public readonly SemaphoreSlim Semaphore;
+ public readonly int MaxConcurrent;
+ public readonly int MaxPending;
+
+ /// <summary>Callers waiting on the semaphore (have not yet acquired a slot).</summary>
public int Pending;
+
+ /// <summary>Callers currently running their operation.</summary>
public int InFlight;
- public TierSlot(int maxConcurrent)
+ /// <summary>Callers admitted on this tier (waiting or running); incremented before the semaphore wait, decremented after the operation ends, and compared against MaxConcurrent + MaxPending for the bounded-queue cap.</summary>
+ public int Active;
+
+ public TierSlot(int maxConcurrent, int maxPending)
{
Semaphore = new SemaphoreSlim(maxConcurrent, maxConcurrent);
+ MaxConcurrent = maxConcurrent;
+ MaxPending = maxPending;
}
}
}
diff --git a/tests/RockBot.Host.Tests/LlmGatewayTests.cs b/tests/RockBot.Host.Tests/LlmGatewayTests.cs
index 1c1fe9d..9620288 100644
--- a/tests/RockBot.Host.Tests/LlmGatewayTests.cs
+++ b/tests/RockBot.Host.Tests/LlmGatewayTests.cs
@@ -6,13 +6,22 @@ namespace RockBot.Host.Tests;
[TestClass]
public class LlmGatewayTests
{
- private static LlmGateway CreateGateway(int low = 2, int balanced = 2, int high = 2)
+ private static LlmGateway CreateGateway(
+ int low = 2,
+ int balanced = 2,
+ int high = 2,
+ int lowMaxPending = 64,
+ int balancedMaxPending = 64,
+ int highMaxPending = 64)
{
var options = Options.Create(new LlmGatewayOptions
{
LowMaxConcurrent = low,
BalancedMaxConcurrent = balanced,
HighMaxConcurrent = high,
+ LowMaxPending = lowMaxPending,
+ BalancedMaxPending = balancedMaxPending,
+ HighMaxPending = highMaxPending,
});
return new LlmGateway(options, NullLogger.Instance);
}
@@ -222,6 +231,143 @@ public void Constructor_CapBelowOne_Throws()
new LlmGateway(bad, NullLogger.Instance));
}
+ [TestMethod]
+ public void Constructor_NegativeMaxPending_Throws()
+ {
+ var bad = Options.Create(new LlmGatewayOptions { LowMaxPending = -1 }); // a negative pending cap is a configuration error
+ Assert.ThrowsExactly<ArgumentOutOfRangeException>(() =>
+ new LlmGateway(bad, NullLogger<LlmGateway>.Instance));
+ }
+
+ [TestMethod]
+ public async Task ExecuteAsync_AtSaturation_ThrowsLlmGatewaySaturatedException()
+ {
+ // Cap = MaxConcurrent (1) + MaxPending (1) = 2 callers max.
+ // Hold both with gates; the 3rd should fail fast.
+ using var gateway = CreateGateway(low: 1, lowMaxPending: 1);
+ var holdGate1 = new TaskCompletionSource();
+ var holdGate2 = new TaskCompletionSource();
+
+ // Caller 1: takes the slot
+ var caller1 = gateway.ExecuteAsync(ModelTier.Low, async ct =>
+ {
+ await holdGate1.Task;
+ return 0;
+ }, CancellationToken.None);
+
+ await WaitUntilAsync(
+ () => gateway.GetInFlightCount(ModelTier.Low) == 1,
+ TimeSpan.FromSeconds(5));
+
+ // Caller 2: queued (slot occupied, but cap allows 1 pending)
+ var caller2 = gateway.ExecuteAsync(ModelTier.Low, async ct =>
+ {
+ await holdGate2.Task;
+ return 0;
+ }, CancellationToken.None);
+
+ await WaitUntilAsync(
+ () => gateway.GetPendingCount(ModelTier.Low) == 1,
+ TimeSpan.FromSeconds(5));
+
+ // Caller 3: must fail fast with the saturation exception (exact type, so Tier/CapacityCap can be inspected)
+ var ex = await Assert.ThrowsExactlyAsync<LlmGatewaySaturatedException>(async () =>
+ await gateway.ExecuteAsync(ModelTier.Low, ct => Task.FromResult(0), CancellationToken.None));
+
+ Assert.AreEqual(ModelTier.Low, ex.Tier);
+ Assert.AreEqual(2, ex.CapacityCap, "Cap should be MaxConcurrent + MaxPending");
+
+ // Drain so callers 1 and 2 finish; this also confirms Active was decremented
+ // correctly so the next call can proceed.
+ holdGate1.SetResult();
+ holdGate2.SetResult();
+ await caller1;
+ await caller2;
+
+ // After drain, a new call should be admitted (Active counter unwound cleanly)
+ var followup = await gateway.ExecuteAsync(
+ ModelTier.Low,
+ ct => Task.FromResult(99),
+ CancellationToken.None);
+ Assert.AreEqual(99, followup);
+ }
+
+ [TestMethod]
+ public async Task ExecuteAsync_RejectionDoesNotConsumeCapacity()
+ {
+ // Cap = 1 + 0 = 1. After a reject, we should still be able to make a call.
+ using var gateway = CreateGateway(low: 1, lowMaxPending: 0);
+ var gate = new TaskCompletionSource();
+
+ var holder = gateway.ExecuteAsync(ModelTier.Low, async ct =>
+ {
+ await gate.Task;
+ return 0;
+ }, CancellationToken.None);
+
+ await WaitUntilAsync(
+ () => gateway.GetInFlightCount(ModelTier.Low) == 1,
+ TimeSpan.FromSeconds(5));
+
+ // Multiple rejections in a row — each must fully release the ticket.
+ for (int i = 0; i < 5; i++)
+ {
+ await Assert.ThrowsExactlyAsync<LlmGatewaySaturatedException>(async () =>
+ await gateway.ExecuteAsync(ModelTier.Low,
+ ct => Task.FromResult(0),
+ CancellationToken.None));
+ }
+
+ // Holder is still in-flight; rejected calls must never have been counted as in-flight.
+ Assert.AreEqual(1, gateway.GetInFlightCount(ModelTier.Low));
+
+ gate.SetResult();
+ await holder;
+
+ // A ticket leaked by any rejection would keep Active at the cap (1) and reject this call.
+ var follow = await gateway.ExecuteAsync(
+ ModelTier.Low,
+ ct => Task.FromResult(7),
+ CancellationToken.None);
+ Assert.AreEqual(7, follow);
+ }
+
+ [TestMethod]
+ public async Task ExecuteAsync_SaturationIsPerTier()
+ {
+ // Saturate Low; Balanced and High should still accept calls.
+ using var gateway = CreateGateway(
+ low: 1, balanced: 1, high: 1,
+ lowMaxPending: 0, balancedMaxPending: 0, highMaxPending: 0);
+
+ var lowGate = new TaskCompletionSource();
+ var lowHolder = gateway.ExecuteAsync(ModelTier.Low, async ct =>
+ {
+ await lowGate.Task;
+ return 0;
+ }, CancellationToken.None);
+
+ await WaitUntilAsync(
+ () => gateway.GetInFlightCount(ModelTier.Low) == 1,
+ TimeSpan.FromSeconds(5));
+
+ // Low is saturated (cap=1, holder consumes it)
+ await Assert.ThrowsExactlyAsync<LlmGatewaySaturatedException>(async () =>
+ await gateway.ExecuteAsync(ModelTier.Low, ct => Task.FromResult(0), CancellationToken.None));
+
+ // Balanced and High remain available
+ var balancedResult = await gateway.ExecuteAsync(
+ ModelTier.Balanced, ct => Task.FromResult(1), CancellationToken.None);
+ var highResult = await gateway.ExecuteAsync(
+ ModelTier.High, ct => Task.FromResult(2), CancellationToken.None);
+
+ Assert.AreEqual(1, balancedResult);
+ Assert.AreEqual(2, highResult);
+
+ lowGate.SetResult();
+ await lowHolder;
+ }
+
private static async Task WaitUntilAsync(Func predicate, TimeSpan timeout)
{
var sw = System.Diagnostics.Stopwatch.StartNew();