From 9482f7e3b03df3ada0f3a353f3b55f50a301c914 Mon Sep 17 00:00:00 2001 From: Rockford Lhotka Date: Thu, 7 May 2026 20:25:50 -0500 Subject: [PATCH] LLM gateway: bounded queue with fail-fast (phase 3 of #352) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a per-tier capacity cap on the gateway: total in-flight + queued callers cannot exceed MaxConcurrent + MaxPending. Beyond that, new calls fail fast with LlmGatewaySaturatedException rather than waiting indefinitely. Backstop against runaway loops or saturation cascades. Phase 3 scope per design/llm-gateway.md: - LlmGatewayOptions adds Low/Balanced/HighMaxPending (defaults 64/32/16 — generous; this is a backstop, not a normal-operations limit) - LlmGatewaySaturatedException (in RockBot.Host.Abstractions so callers can catch without internal-type dependency) carries the saturated Tier and the CapacityCap that was exceeded - LlmGateway tracks a per-tier Active counter (Pending + InFlight); atomically increments on entry, checks vs MaxConcurrent + MaxPending, and rejects with the typed exception if the cap is exceeded. Outer try/finally ensures Active is always decremented (rejection, cancellation, exception, success — all paths) - Constructor validation: MaxPending >= 0 - Startup log line updated to surface both caps per tier ("Low=8+64 Balanced=4+32 High=2+16") - New metric: rockbot.llm.gateway.saturation_rejections (counter, tagged by tier) - 4 new tests: at-saturation throws typed exception with correct metadata; rejections do not consume capacity (5 rejects in a row, Active still 1); saturation is per-tier (Low full, Balanced/High unaffected); negative MaxPending rejected at construction All 12 gateway tests pass; all 628 Host tests pass; full solution test run clean. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../LlmGatewayOptions.cs | 23 +++ .../LlmGatewaySaturatedException.cs | 26 +++ src/RockBot.Host/HostDiagnostics.cs | 13 ++ src/RockBot.Host/LlmGateway.cs | 93 ++++++++--- tests/RockBot.Host.Tests/LlmGatewayTests.cs | 148 +++++++++++++++++- 5 files changed, 277 insertions(+), 26 deletions(-) create mode 100644 src/RockBot.Host.Abstractions/LlmGatewaySaturatedException.cs diff --git a/src/RockBot.Host.Abstractions/LlmGatewayOptions.cs b/src/RockBot.Host.Abstractions/LlmGatewayOptions.cs index 36b705c..5276021 100644 --- a/src/RockBot.Host.Abstractions/LlmGatewayOptions.cs +++ b/src/RockBot.Host.Abstractions/LlmGatewayOptions.cs @@ -27,4 +27,27 @@ public sealed class LlmGatewayOptions /// Expensive judgment calls; lower cap. /// public int HighMaxConcurrent { get; set; } = 2; + + /// + /// Maximum number of additional callers that may be queued waiting for a + /// slot beyond . + /// Once the total of in-flight + queued callers exceeds + /// LowMaxConcurrent + LowMaxPending, new calls fail fast with + /// instead of waiting indefinitely. + /// This is a backstop against runaway loops or saturation cascades, not a + /// normal-operations limit; defaults are intentionally generous. + /// + public int LowMaxPending { get; set; } = 64; + + /// + /// Maximum queued callers beyond on the + /// tier. See . + /// + public int BalancedMaxPending { get; set; } = 32; + + /// + /// Maximum queued callers beyond on the + /// tier. See . + /// + public int HighMaxPending { get; set; } = 16; } diff --git a/src/RockBot.Host.Abstractions/LlmGatewaySaturatedException.cs b/src/RockBot.Host.Abstractions/LlmGatewaySaturatedException.cs new file mode 100644 index 0000000..4e343b4 --- /dev/null +++ b/src/RockBot.Host.Abstractions/LlmGatewaySaturatedException.cs @@ -0,0 +1,26 @@ +namespace RockBot.Host; + +/// +/// Thrown by LlmGateway when a tier has reached its bounded queue depth +/// (MaxConcurrent + MaxPending) and a new caller arrives. The caller +/// decides whether to skip the work, defer it, or surface the failure upstream; +/// the gateway does not block indefinitely under saturation. +/// +public sealed class LlmGatewaySaturatedException : Exception +{ + /// The tier that was saturated. + public ModelTier Tier { get; } + + /// + /// The configured cap (MaxConcurrent + MaxPending) that was exceeded. + /// + public int CapacityCap { get; } + + public LlmGatewaySaturatedException(ModelTier tier, int capacityCap) + : base($"LLM gateway saturated on tier {tier}: " + + $"in-flight + queued callers exceeded the cap of {capacityCap}.") + { + Tier = tier; + CapacityCap = capacityCap; + } +} diff --git a/src/RockBot.Host/HostDiagnostics.cs b/src/RockBot.Host/HostDiagnostics.cs index 9b4f81d..f1e9d47 100644 --- a/src/RockBot.Host/HostDiagnostics.cs +++ b/src/RockBot.Host/HostDiagnostics.cs @@ -58,6 +58,19 @@ public static class HostDiagnostics unit: "ms", description: "Time spent waiting for a per-tier LLM gateway slot"); + /// + /// Number of LLM calls that were rejected immediately because the per-tier + /// gateway queue had reached its bounded depth + /// (MaxConcurrent + MaxPending). Tagged by tier. A non-zero rate is + /// a strong signal of either a runaway loop submitting work or sustained + /// upstream rate limiting; investigate before raising the caps. + /// + public static readonly Counter LlmGatewaySaturationRejections = + Meter.CreateCounter( + "rockbot.llm.gateway.saturation_rejections", + unit: "{rejection}", + description: "LLM calls rejected because the gateway queue was full"); + // ── Agent turn metrics — recorded at architectural boundaries ───────────── /// Duration from user message receipt to final reply published. diff --git a/src/RockBot.Host/LlmGateway.cs b/src/RockBot.Host/LlmGateway.cs index 595db07..7aee674 100644 --- a/src/RockBot.Host/LlmGateway.cs +++ b/src/RockBot.Host/LlmGateway.cs @@ -38,26 +38,37 @@ public LlmGateway(IOptions options, ILogger logge _slots = new TierSlot[tierValues.Length]; foreach (var tier in tierValues) { - var cap = tier switch + var (concurrent, pending) = tier switch { - ModelTier.Low => opts.LowMaxConcurrent, - ModelTier.High => opts.HighMaxConcurrent, - _ => opts.BalancedMaxConcurrent, + ModelTier.Low => (opts.LowMaxConcurrent, opts.LowMaxPending), + ModelTier.High => (opts.HighMaxConcurrent, opts.HighMaxPending), + _ => (opts.BalancedMaxConcurrent, opts.BalancedMaxPending), }; - if (cap < 1) + if (concurrent < 1) throw new ArgumentOutOfRangeException( nameof(options), - $"LlmGatewayOptions {tier}MaxConcurrent must be >= 1 (was {cap})."); + $"LlmGatewayOptions {tier}MaxConcurrent must be >= 1 (was {concurrent})."); - _slots[(int)tier] = new TierSlot(cap); + if (pending < 0) + throw new ArgumentOutOfRangeException( + nameof(options), + $"LlmGatewayOptions {tier}MaxPending must be >= 0 (was {pending})."); + + _slots[(int)tier] = new TierSlot(concurrent, pending); } _logger = logger; _logger.LogInformation( - "LlmGateway: per-tier concurrency caps Low={Low} Balanced={Balanced} High={High}", - opts.LowMaxConcurrent, opts.BalancedMaxConcurrent, opts.HighMaxConcurrent); + "LlmGateway: per-tier caps " + + "Low={LowConcurrent}+{LowPending} " + + "Balanced={BalancedConcurrent}+{BalancedPending} " + + "High={HighConcurrent}+{HighPending} " + + "(MaxConcurrent + MaxPending)", + opts.LowMaxConcurrent, opts.LowMaxPending, + opts.BalancedMaxConcurrent, opts.BalancedMaxPending, + opts.HighMaxConcurrent, opts.HighMaxPending); } /// @@ -82,29 +93,50 @@ public async Task ExecuteAsync( var slot = _slots[(int)tier]; var tierTag = new KeyValuePair("rockbot.llm.tier", tier.ToString()); - var slotWaitSw = Stopwatch.StartNew(); - Interlocked.Increment(ref slot.Pending); - try - { - await slot.Semaphore.WaitAsync(cancellationToken).ConfigureAwait(false); - } - finally + // Bounded queue: atomically reserve a "ticket" (counted against in-flight + queued). + // If the tier is at its cap, fail fast rather than queuing indefinitely. + var capacityCap = slot.MaxConcurrent + slot.MaxPending; + var active = Interlocked.Increment(ref slot.Active); + if (active > capacityCap) { - Interlocked.Decrement(ref slot.Pending); - slotWaitSw.Stop(); - HostDiagnostics.LlmGatewaySlotWaitDuration.Record( - slotWaitSw.Elapsed.TotalMilliseconds, tierTag); + Interlocked.Decrement(ref slot.Active); + HostDiagnostics.LlmGatewaySaturationRejections.Add(1, tierTag); + _logger.LogWarning( + "LlmGateway: tier {Tier} saturated (active={Active} > cap={Cap}); rejecting call", + tier, active, capacityCap); + throw new LlmGatewaySaturatedException(tier, capacityCap); } - Interlocked.Increment(ref slot.InFlight); try { - return await operation(cancellationToken).ConfigureAwait(false); + var slotWaitSw = Stopwatch.StartNew(); + Interlocked.Increment(ref slot.Pending); + try + { + await slot.Semaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + } + finally + { + Interlocked.Decrement(ref slot.Pending); + slotWaitSw.Stop(); + HostDiagnostics.LlmGatewaySlotWaitDuration.Record( + slotWaitSw.Elapsed.TotalMilliseconds, tierTag); + } + + Interlocked.Increment(ref slot.InFlight); + try + { + return await operation(cancellationToken).ConfigureAwait(false); + } + finally + { + Interlocked.Decrement(ref slot.InFlight); + slot.Semaphore.Release(); + } } finally { - Interlocked.Decrement(ref slot.InFlight); - slot.Semaphore.Release(); + Interlocked.Decrement(ref slot.Active); } } @@ -117,12 +149,23 @@ public void Dispose() private sealed class TierSlot { public readonly SemaphoreSlim Semaphore; + public readonly int MaxConcurrent; + public readonly int MaxPending; + + /// Callers waiting on the semaphore (have not yet acquired a slot). public int Pending; + + /// Callers currently running their operation. public int InFlight; - public TierSlot(int maxConcurrent) + /// Total callers active on this tier (Pending + InFlight). Used for the bounded-queue cap. + public int Active; + + public TierSlot(int maxConcurrent, int maxPending) { Semaphore = new SemaphoreSlim(maxConcurrent, maxConcurrent); + MaxConcurrent = maxConcurrent; + MaxPending = maxPending; } } } diff --git a/tests/RockBot.Host.Tests/LlmGatewayTests.cs b/tests/RockBot.Host.Tests/LlmGatewayTests.cs index 1c1fe9d..9620288 100644 --- a/tests/RockBot.Host.Tests/LlmGatewayTests.cs +++ b/tests/RockBot.Host.Tests/LlmGatewayTests.cs @@ -6,13 +6,22 @@ namespace RockBot.Host.Tests; [TestClass] public class LlmGatewayTests { - private static LlmGateway CreateGateway(int low = 2, int balanced = 2, int high = 2) + private static LlmGateway CreateGateway( + int low = 2, + int balanced = 2, + int high = 2, + int lowMaxPending = 64, + int balancedMaxPending = 64, + int highMaxPending = 64) { var options = Options.Create(new LlmGatewayOptions { LowMaxConcurrent = low, BalancedMaxConcurrent = balanced, HighMaxConcurrent = high, + LowMaxPending = lowMaxPending, + BalancedMaxPending = balancedMaxPending, + HighMaxPending = highMaxPending, }); return new LlmGateway(options, NullLogger.Instance); } @@ -222,6 +231,143 @@ public void Constructor_CapBelowOne_Throws() new LlmGateway(bad, NullLogger.Instance)); } + [TestMethod] + public void Constructor_NegativeMaxPending_Throws() + { + var bad = Options.Create(new LlmGatewayOptions { LowMaxPending = -1 }); + Assert.ThrowsExactly(() => + new LlmGateway(bad, NullLogger.Instance)); + } + + [TestMethod] + public async Task ExecuteAsync_AtSaturation_ThrowsLlmGatewaySaturatedException() + { + // Cap = MaxConcurrent (1) + MaxPending (1) = 2 callers max. + // Hold both with gates; the 3rd should fail fast. + using var gateway = CreateGateway(low: 1, lowMaxPending: 1); + var holdGate1 = new TaskCompletionSource(); + var holdGate2 = new TaskCompletionSource(); + + // Caller 1: takes the slot + var caller1 = gateway.ExecuteAsync(ModelTier.Low, async ct => + { + await holdGate1.Task; + return 0; + }, CancellationToken.None); + + await WaitUntilAsync( + () => gateway.GetInFlightCount(ModelTier.Low) == 1, + TimeSpan.FromSeconds(5)); + + // Caller 2: queued (slot occupied, but cap allows 1 pending) + var caller2 = gateway.ExecuteAsync(ModelTier.Low, async ct => + { + await holdGate2.Task; + return 0; + }, CancellationToken.None); + + await WaitUntilAsync( + () => gateway.GetPendingCount(ModelTier.Low) == 1, + TimeSpan.FromSeconds(5)); + + // Caller 3: must fail fast + var ex = await Assert.ThrowsExactlyAsync(async () => + await gateway.ExecuteAsync(ModelTier.Low, ct => Task.FromResult(0), CancellationToken.None)); + + Assert.AreEqual(ModelTier.Low, ex.Tier); + Assert.AreEqual(2, ex.CapacityCap, "Cap should be MaxConcurrent + MaxPending"); + + // Drain so callers 1 and 2 finish; this also confirms Active was decremented + // correctly so the next call can proceed. + holdGate1.SetResult(); + holdGate2.SetResult(); + await caller1; + await caller2; + + // After drain, a new call should be admitted (Active counter unwound cleanly) + var followup = await gateway.ExecuteAsync( + ModelTier.Low, + ct => Task.FromResult(99), + CancellationToken.None); + Assert.AreEqual(99, followup); + } + + [TestMethod] + public async Task ExecuteAsync_RejectionDoesNotConsumeCapacity() + { + // Cap = 1 + 0 = 1. After a reject, we should still be able to make a call. + using var gateway = CreateGateway(low: 1, lowMaxPending: 0); + var gate = new TaskCompletionSource(); + + var holder = gateway.ExecuteAsync(ModelTier.Low, async ct => + { + await gate.Task; + return 0; + }, CancellationToken.None); + + await WaitUntilAsync( + () => gateway.GetInFlightCount(ModelTier.Low) == 1, + TimeSpan.FromSeconds(5)); + + // Multiple rejections in a row — each must fully release the ticket. + for (int i = 0; i < 5; i++) + { + await Assert.ThrowsExactlyAsync(async () => + await gateway.ExecuteAsync(ModelTier.Low, + ct => Task.FromResult(0), + CancellationToken.None)); + } + + // Holder is still in-flight; Active should still be 1, not 6 from leaked tickets. + Assert.AreEqual(1, gateway.GetInFlightCount(ModelTier.Low)); + + gate.SetResult(); + await holder; + + // After holder finishes, Active is 0; new call accepted. + var follow = await gateway.ExecuteAsync( + ModelTier.Low, + ct => Task.FromResult(7), + CancellationToken.None); + Assert.AreEqual(7, follow); + } + + [TestMethod] + public async Task ExecuteAsync_SaturationIsPerTier() + { + // Saturate Low; Balanced and High should still accept calls. + using var gateway = CreateGateway( + low: 1, balanced: 1, high: 1, + lowMaxPending: 0, balancedMaxPending: 0, highMaxPending: 0); + + var lowGate = new TaskCompletionSource(); + var lowHolder = gateway.ExecuteAsync(ModelTier.Low, async ct => + { + await lowGate.Task; + return 0; + }, CancellationToken.None); + + await WaitUntilAsync( + () => gateway.GetInFlightCount(ModelTier.Low) == 1, + TimeSpan.FromSeconds(5)); + + // Low is saturated (cap=1, holder consumes it) + await Assert.ThrowsExactlyAsync(async () => + await gateway.ExecuteAsync(ModelTier.Low, ct => Task.FromResult(0), CancellationToken.None)); + + // Balanced and High remain available + var balancedResult = await gateway.ExecuteAsync( + ModelTier.Balanced, ct => Task.FromResult(1), CancellationToken.None); + var highResult = await gateway.ExecuteAsync( + ModelTier.High, ct => Task.FromResult(2), CancellationToken.None); + + Assert.AreEqual(1, balancedResult); + Assert.AreEqual(2, highResult); + + lowGate.SetResult(); + await lowHolder; + } + private static async Task WaitUntilAsync(Func predicate, TimeSpan timeout) { var sw = System.Diagnostics.Stopwatch.StartNew();