Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions src/RockBot.Host.Abstractions/ILlmClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ namespace RockBot.Host;

/// <summary>
/// Wrapper around <see cref="IChatClient"/> for all LLM calls in an agent process.
/// Adds retry logic for known model-specific SDK quirks. Registered as transient
/// so concurrent callers (user loop, background tasks, dreaming, session evaluation)
/// each get their own instance and never queue behind each other.
/// Adds retry logic for known model-specific SDK quirks and routes every call through
/// the per-tier <c>LlmGateway</c> which caps concurrency and propagates cancellation.
///
/// Registered as transient so each consumer gets its own instance, but the gateway
/// is a singleton so all consumers share the per-tier concurrency budget.
///
/// To avoid starting background LLM work while the user is actively waiting
/// for a response, use <see cref="IUserActivityMonitor"/> instead of this interface.
Expand Down
30 changes: 30 additions & 0 deletions src/RockBot.Host.Abstractions/LlmGatewayOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
namespace RockBot.Host;

/// <summary>
/// Configuration for <c>LlmGateway</c> — the process-wide layer that bounds how many
/// LLM calls may be in flight per tier at any moment. Design rationale lives in
/// <c>design/llm-gateway.md</c>.
/// </summary>
/// <remarks>
/// These caps apply within a single process only. When several agent processes share
/// one provider account, effective concurrency is the sum across processes, and the
/// provider's per-account rate limits remain the ultimate ceiling — the gateway is a
/// local governor, not a global one.
/// </remarks>
public sealed class LlmGatewayOptions
{
    /// <summary>
    /// Upper bound on simultaneous in-flight calls for the <see cref="ModelTier.Low"/> tier.
    /// This tier serves cheap, high-volume batch/extraction work, hence the generous default.
    /// </summary>
    public int LowMaxConcurrent { get; set; } = 8;

    /// <summary>
    /// Upper bound on simultaneous in-flight calls for the <see cref="ModelTier.Balanced"/> tier.
    /// </summary>
    public int BalancedMaxConcurrent { get; set; } = 4;

    /// <summary>
    /// Upper bound on simultaneous in-flight calls for the <see cref="ModelTier.High"/> tier.
    /// Reserved for expensive judgment calls, hence the small default.
    /// </summary>
    public int HighMaxConcurrent { get; set; } = 2;
}
12 changes: 12 additions & 0 deletions src/RockBot.Host/HostDiagnostics.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,18 @@ public static class HostDiagnostics
unit: "{token}",
description: "Total number of output tokens produced");

/// <summary>
/// Time a caller spent waiting for a per-tier gateway slot before its LLM call
/// could proceed. Non-zero values indicate contention; sustained high values
/// indicate the tier's <c>MaxConcurrent</c> cap is too low for the workload
/// (or that callers are issuing too many parallel calls).
/// Waits that end in cancellation are recorded as well (the gateway records in a
/// <c>finally</c> around the semaphore wait), so heavy preemption/cancellation
/// also shows up here — consider that when interpreting spikes.
/// </summary>
public static readonly Histogram<double> LlmGatewaySlotWaitDuration =
    Meter.CreateHistogram<double>(
        "rockbot.llm.gateway.slot_wait.duration",
        unit: "ms",
        description: "Time spent waiting for a per-tier LLM gateway slot");

// ── Agent turn metrics — recorded at architectural boundaries ─────────────

/// <summary>Duration from user message receipt to final reply published.</summary>
Expand Down
27 changes: 27 additions & 0 deletions src/RockBot.Host/ILlmGateway.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
namespace RockBot.Host;

/// <summary>
/// Global per-tier concurrency layer for LLM calls. All <see cref="ILlmClient"/>
/// invocations flow through an implementation of this gateway so that parallel
/// callers cannot overwhelm a tier and so that cancellation reliably drains
/// pending work. See <c>design/llm-gateway.md</c>.
/// </summary>
internal interface ILlmGateway
{
    /// <summary>
    /// Acquires a slot on the per-tier concurrency semaphore, then invokes
    /// <paramref name="operation"/>. If <paramref name="cancellationToken"/>
    /// fires while waiting for a slot, the wait aborts with
    /// <see cref="OperationCanceledException"/> before the operation runs.
    /// </summary>
    /// <remarks>
    /// The same <paramref name="cancellationToken"/> is passed to
    /// <paramref name="operation"/>. Implementations must propagate cancellation
    /// end-to-end; any path that drops the token re-introduces the rate-limit and
    /// preemption hazards the gateway exists to prevent.
    /// </remarks>
    /// <typeparam name="T">Result type produced by <paramref name="operation"/>.</typeparam>
    /// <param name="tier">Model tier whose concurrency budget gates the call.</param>
    /// <param name="operation">The LLM call to run once a slot is held.</param>
    /// <param name="cancellationToken">
    /// Aborts the slot wait if it fires first, and is forwarded to
    /// <paramref name="operation"/> once the call is in flight.
    /// </param>
    /// <returns>The result of <paramref name="operation"/>.</returns>
    Task<T> ExecuteAsync<T>(
        ModelTier tier,
        Func<CancellationToken, Task<T>> operation,
        CancellationToken cancellationToken);
}
16 changes: 11 additions & 5 deletions src/RockBot.Host/LlmClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ namespace RockBot.Host;
/// <summary>
/// Default implementation of <see cref="ILlmClient"/>.
/// Selects the appropriate <see cref="IChatClient"/> from the
/// <see cref="TieredChatClientRegistry"/> and adds retry logic for
/// known model-specific SDK quirks. Registered as transient so each consumer
/// gets its own instance — concurrent calls from the user loop, background tasks,
/// dreaming, and session evaluation proceed independently without queuing.
/// <see cref="TieredChatClientRegistry"/>, adds retry logic for known
/// model-specific SDK quirks, and routes the call through the singleton
/// <see cref="ILlmGateway"/> which caps per-tier concurrency.
/// </summary>
internal sealed class LlmClient(
TieredChatClientRegistry registry,
ILlmGateway gateway,
LlmCostEstimator costEstimator,
ILogger<LlmClient> logger) : ILlmClient
{
Expand Down Expand Up @@ -67,7 +67,13 @@ private async Task<ChatResponse> CallTierAsync(
// next model. The original cancellation token is passed through so
// FallbackChatClient can correctly distinguish user cancellation from
// provider timeouts.
var response = await InvokeWithNullArgRetryAsync(client, messages, options, cancellationToken);
//
// The gateway gates the actual SDK call on the per-tier concurrency
// semaphore so bursty parallel callers cannot overwhelm a tier.
var response = await gateway.ExecuteAsync(
tier,
ct => InvokeWithNullArgRetryAsync(client, messages, options, ct),
cancellationToken);

if (response.Usage is { } usage)
{
Expand Down
128 changes: 128 additions & 0 deletions src/RockBot.Host/LlmGateway.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
using System.Diagnostics;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

namespace RockBot.Host;

/// <summary>
/// Global per-tier concurrency layer for all LLM calls. Every call to
/// <see cref="ILlmClient"/> flows through here so that bursty parallel work
/// (e.g. observation-framework extraction) cannot overwhelm a tier.
/// </summary>
/// <remarks>
/// <para>
/// Cancellation is the priority mechanism. When a caller's <c>ct</c> fires, the
/// pending wait on the per-tier <see cref="SemaphoreSlim"/> aborts immediately,
/// freeing the slot for other waiters. This is how user-initiated work effectively
/// preempts dream-cycle work without an explicit priority queue: the work-serializer
/// already cancels the dream when a user message arrives, and that cancellation
/// drains the dream's queued LLM calls.
/// </para>
/// <para>
/// Registered as a singleton so all callers share the same per-tier semaphores.
/// </para>
/// <para>
/// See <c>design/llm-gateway.md</c> for the full design.
/// </para>
/// </remarks>
internal sealed class LlmGateway : ILlmGateway, IDisposable
{
    private readonly TierSlot[] _slots;
    private readonly ILogger<LlmGateway> _logger;

    /// <summary>
    /// Builds one <see cref="TierSlot"/> per <c>ModelTier</c> value using the
    /// configured caps. Throws <see cref="ArgumentOutOfRangeException"/> if any
    /// cap is below 1 (a zero cap would deadlock every call on that tier).
    /// </summary>
    public LlmGateway(IOptions<LlmGatewayOptions> options, ILogger<LlmGateway> logger)
    {
        var opts = options.Value;

        // Generic Enum.GetValues<T> instead of the legacy Array-returning overload +
        // cast; the file already relies on .NET 6+ APIs (ArgumentNullException.ThrowIfNull).
        // NOTE(review): indexing _slots by (int)tier assumes ModelTier values are
        // contiguous and zero-based — confirm against the enum definition.
        var tierValues = Enum.GetValues<ModelTier>();
        _slots = new TierSlot[tierValues.Length];
        foreach (var tier in tierValues)
        {
            var cap = tier switch
            {
                ModelTier.Low => opts.LowMaxConcurrent,
                ModelTier.High => opts.HighMaxConcurrent,
                _ => opts.BalancedMaxConcurrent,
            };

            if (cap < 1)
                throw new ArgumentOutOfRangeException(
                    nameof(options),
                    $"LlmGatewayOptions {tier}MaxConcurrent must be >= 1 (was {cap}).");

            _slots[(int)tier] = new TierSlot(cap);
        }

        _logger = logger;

        _logger.LogInformation(
            "LlmGateway: per-tier concurrency caps Low={Low} Balanced={Balanced} High={High}",
            opts.LowMaxConcurrent, opts.BalancedMaxConcurrent, opts.HighMaxConcurrent);
    }

    /// <summary>
    /// Returns the current number of waiters on the per-tier semaphore. Useful for
    /// diagnostics and tests; values are observational and may race.
    /// </summary>
    internal int GetPendingCount(ModelTier tier) => Volatile.Read(ref _slots[(int)tier].Pending);

    /// <summary>
    /// Returns the current number of in-flight calls on the tier. Useful for
    /// diagnostics and tests; values are observational and may race.
    /// </summary>
    internal int GetInFlightCount(ModelTier tier) => Volatile.Read(ref _slots[(int)tier].InFlight);

    /// <inheritdoc />
    public async Task<T> ExecuteAsync<T>(
        ModelTier tier,
        Func<CancellationToken, Task<T>> operation,
        CancellationToken cancellationToken)
    {
        ArgumentNullException.ThrowIfNull(operation);

        var slot = _slots[(int)tier];
        var tierTag = new KeyValuePair<string, object?>("rockbot.llm.tier", tier.ToString());

        var slotWaitSw = Stopwatch.StartNew();
        Interlocked.Increment(ref slot.Pending);
        try
        {
            await slot.Semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
        }
        finally
        {
            // Runs on success and cancellation alike, so waits that end in
            // cancellation are included in the histogram — it measures total time
            // callers spent blocked on the tier, not only successful acquisitions.
            Interlocked.Decrement(ref slot.Pending);
            slotWaitSw.Stop();
            HostDiagnostics.LlmGatewaySlotWaitDuration.Record(
                slotWaitSw.Elapsed.TotalMilliseconds, tierTag);
        }

        Interlocked.Increment(ref slot.InFlight);
        try
        {
            return await operation(cancellationToken).ConfigureAwait(false);
        }
        finally
        {
            // Release pairs exactly with the successful WaitAsync above; InFlight is
            // decremented first so observers never see a freed slot still counted
            // as in flight.
            Interlocked.Decrement(ref slot.InFlight);
            slot.Semaphore.Release();
        }
    }

    /// <summary>Disposes the per-tier semaphores (singleton; runs at process shutdown).</summary>
    public void Dispose()
    {
        foreach (var slot in _slots)
            slot.Semaphore.Dispose();
    }

    /// <summary>Per-tier state: the gating semaphore plus observational counters.</summary>
    private sealed class TierSlot
    {
        public readonly SemaphoreSlim Semaphore;

        // Mutated via Interlocked in ExecuteAsync; read via Volatile.Read in the
        // GetPendingCount/GetInFlightCount diagnostics accessors.
        public int Pending;
        public int InFlight;

        public TierSlot(int maxConcurrent)
        {
            Semaphore = new SemaphoreSlim(maxConcurrent, maxConcurrent);
        }
    }
}
2 changes: 2 additions & 0 deletions src/RockBot.Host/ServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ public static IServiceCollection AddRockBotHost(

services.AddSingleton<LlmCostEstimator>();
services.Configure<LlmPricingOptions>(_ => { });
services.Configure<LlmGatewayOptions>(_ => { });
services.AddSingleton<ILlmGateway, LlmGateway>();
services.AddTransient<ILlmClient, LlmClient>();
services.AddSingleton<IToolProgressNotifier, ToolProgressNotifier>();
services.AddTransient<AgentLoopRunner>();
Expand Down
Loading
Loading