From 172a8a73561d28e0974b5542caff083c2833157a Mon Sep 17 00:00:00 2001 From: Jared Pleva Date: Wed, 1 Apr 2026 01:49:49 +0000 Subject: [PATCH 1/2] docs: add REPL, Ralph Loop, and enhanced tools to README and architecture Reposition ShellForge as both a governed agent runtime and an interactive coding CLI. Add shellforge chat (REPL), shellforge ralph (multi-task loop), sub-agent orchestrator, and edit_file/glob/grep tools to README, architecture, and roadmap docs. Mark v0.8.0 UMAAL milestone as completed. Co-Authored-By: Claude Opus 4.6 (1M context) --- README.md | 98 ++++++++++++++++++++++++++++++++++++++------ docs/architecture.md | 61 ++++++++++++++++++++++----- docs/roadmap.md | 25 +++++++++-- 3 files changed, 159 insertions(+), 25 deletions(-) diff --git a/README.md b/README.md index 4918a3a..c7995a4 100644 --- a/README.md +++ b/README.md @@ -2,14 +2,14 @@ # ShellForge -**Governed AI agent runtime — one Go binary, local or cloud.** +**Governed AI coding CLI and agent runtime — one Go binary, local or cloud.** [![Go](https://img.shields.io/badge/Go-1.18+-00ADD8?style=for-the-badge&logo=go&logoColor=white)](https://go.dev) [![GitHub Pages](https://img.shields.io/badge/Live_Site-agentguardhq.github.io/shellforge-ff6b2b?style=for-the-badge)](https://agentguardhq.github.io/shellforge) [![License: MIT](https://img.shields.io/badge/License-MIT-blue?style=for-the-badge)](LICENSE) [![AgentGuard](https://img.shields.io/badge/Governed_by-AgentGuard-green?style=for-the-badge)](https://github.com/AgentGuardHQ/agentguard) -*Run autonomous AI agents with policy enforcement on every tool call. 
Local via Ollama or cloud via Anthropic API — your choice.* +*Interactive pair-programming with local models + autonomous multi-task execution — with governance on every tool call.* [Website](https://agentguardhq.github.io/shellforge) · [Docs](docs/architecture.md) · [Roadmap](docs/roadmap.md) · [AgentGuard](https://github.com/AgentGuardHQ/agentguard) @@ -54,12 +54,17 @@ shellforge setup # creates agentguard.yaml + output dirs This creates `agentguard.yaml` (governance policy) in your project root. Edit it to customize which actions are allowed/denied. -### 5. Run an agent +### 5. Start a chat session + +```bash +shellforge chat # interactive REPL — pair-program with a local model +``` + +Or run a one-shot agent: ```bash shellforge agent "describe what this project does" shellforge agent "find test gaps and suggest improvements" -shellforge agent "create a hello world program" ``` Every tool call (file reads, writes, shell commands) passes through governance before execution. @@ -70,17 +75,55 @@ Every tool call (file reads, writes, shell commands) passes through governance b ## What Is ShellForge? -ShellForge is a **governed agent runtime** — not an agent framework, not an orchestration layer, not a prompt wrapper. +ShellForge is a **governed AI coding CLI and agent runtime** — like Claude Code or Cursor, but with local models and policy enforcement built in. -It sits between any agent driver and the real world. The agent decides what it wants to do. ShellForge decides whether it's allowed. +Two modes: + +1. **Interactive REPL** (`shellforge chat`) — pair-program with a local or cloud model. Persistent conversation history, shell escapes, color output. +2. **Autonomous agents** (`shellforge agent`, `shellforge ralph`) — one-shot tasks or multi-task loops with automatic validation and commit. + +Both modes share the same governance layer. Every tool call passes through [AgentGuard](https://github.com/AgentGuardHQ/agentguard) policy enforcement before execution. 
``` -Agent Driver (Goose, Claude Code, Copilot CLI) - → ShellForge Governance (allow / deny / correct) - → Your Environment (files, shell, git) +You (chat) or Octi Pulpo (dispatch) + → ShellForge Agent Loop (tool calling, drift detection) + → AgentGuard Governance (allow / deny / correct) + → Your Environment (files, shell, git) ``` -**The core insight:** ShellForge's value is governance, not the agent loop. [Goose](https://block.github.io/goose) handles local agent execution. [Dagu](https://github.com/dagu-org/dagu) handles workflow orchestration. ShellForge wraps them all with [AgentGuard](https://github.com/AgentGuardHQ/agentguard) policy enforcement on every tool call. +--- + +## Interactive REPL (`shellforge chat`) + +Pair-programming mode. Persistent conversation history across prompts — the model remembers what you discussed. + +```bash +shellforge chat # local model via Ollama (default) +shellforge chat --provider anthropic # Anthropic API (Haiku/Sonnet/Opus) +shellforge chat --model qwen3:14b # pick a specific model +``` + +Features: +- **Color output** — green prompt, red errors, yellow governance denials +- **Shell escapes** — `!git status` runs a command without leaving the session +- **Ctrl+C** — interrupts the current agent run without killing the session +- **Governance** — every tool call checked against `agentguard.yaml`, same as autonomous mode + +--- + +## Ralph Loop (`shellforge ralph`) + +Stateless-iterative multi-task execution. Each task gets a fresh context window — no accumulated confusion across tasks. + +```bash +shellforge ralph tasks.json # run tasks from a JSON file +shellforge ralph --validate "go test ./..." # validate after each task +shellforge ralph --dry-run # preview without executing +``` + +The loop: **PICK** a task → **IMPLEMENT** it → **VALIDATE** (run tests) → **COMMIT** on success → **RESET** context → next task. + +Tasks come from a JSON file or Octi Pulpo MCP dispatch. 
Failed validations skip the commit and move on — no broken code lands. --- @@ -112,8 +155,14 @@ shellforge status | Command | Description | |---------|-------------| -| `shellforge agent "prompt"` | Run a governed agent (Ollama, default) | -| `shellforge agent --provider anthropic "prompt"` | Run via Anthropic API (Haiku/Sonnet/Opus, prompt caching) | +| `shellforge chat` | Interactive REPL — pair-program with a local or cloud model | +| `shellforge chat --provider anthropic` | REPL via Anthropic API (Haiku/Sonnet/Opus) | +| `shellforge chat --model qwen3:14b` | REPL with a specific Ollama model | +| `shellforge ralph tasks.json` | Multi-task loop — stateless-iterative execution | +| `shellforge ralph --validate "go test ./..."` | Ralph Loop with post-task validation | +| `shellforge ralph --dry-run` | Preview tasks without executing | +| `shellforge agent "prompt"` | One-shot governed agent (Ollama, default) | +| `shellforge agent --provider anthropic "prompt"` | One-shot via Anthropic API (prompt caching) | | `shellforge agent --thinking-budget 8000 "prompt"` | Enable extended thinking (Sonnet/Opus) | | `shellforge run "prompt"` | Run a governed CLI driver (goose, claude, copilot, codex, gemini) | | `shellforge setup` | Install Ollama, create governance config, verify stack | @@ -125,6 +174,23 @@ shellforge status --- +## Built-in Tools + +The agent loop (used by `chat`, `agent`, and `ralph`) has 8 built-in tools, all governed: + +| Tool | What It Does | +|------|-------------| +| `read_file` | Read file contents | +| `write_file` | Write a complete file | +| `edit_file` | Targeted find-and-replace (like Claude Code's Edit tool) | +| `glob` | Pattern-based file discovery with recursive `**` support | +| `grep` | Regex content search with `file:line` output | +| `run_shell` | Execute shell commands (via RTK for token compression) | +| `list_directory` | List directory contents | +| `search_files` | Search files by name pattern | + +--- + ## Multi-Driver Governance 
ShellForge governs any CLI agent driver via AgentGuard hooks. Each driver keeps its own model and agent loop — ShellForge ensures governance is active and spawns the driver as a subprocess. @@ -151,6 +217,12 @@ See `dags/multi-driver-swarm.yaml` and `dags/workspace-swarm.yaml` for examples. ``` ┌───────────────────────────────────────────────────┐ +│ Entry Points │ +│ chat (REPL) · agent (one-shot) · ralph (multi) │ +│ run · serve (daemon) │ +└────────────────────┬──────────────────────────────┘ + │ prompt / task +┌────────────────────▼──────────────────────────────┐ │ Octi Pulpo (Coordination) │ │ Budget-aware dispatch · Memory · Model cascading │ └────────────────────┬──────────────────────────────┘ @@ -158,6 +230,7 @@ See `dags/multi-driver-swarm.yaml` and `dags/workspace-swarm.yaml` for examples. ┌────────────────────▼──────────────────────────────┐ │ ShellForge Agent Loop │ │ LLM provider · Tool calling · Drift detection │ +│ Sub-agent orchestrator (spawn sync/async) │ │ Anthropic API or Ollama │ └────────────────────┬──────────────────────────────┘ │ tool call @@ -171,6 +244,7 @@ See `dags/multi-driver-swarm.yaml` and `dags/workspace-swarm.yaml` for examples. ┌────────────────────▼──────────────────────────────┐ │ Your Environment │ │ Files · Shell (RTK) · Git · Network │ +│ 8 tools: read/write/edit/glob/grep/shell/ls/find │ │ Sandboxed by OpenShell │ └───────────────────────────────────────────────────┘ ``` diff --git a/docs/architecture.md b/docs/architecture.md index 751f3c7..0d41e80 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -4,6 +4,41 @@ ShellForge is a single Go binary (~7.5MB) that provides governed AI agent execution. Its core value is **governance** — every agent driver, whether a CLI tool, browser session, or local model, runs through AgentGuard policy enforcement on every action. 
+## Entry Points + +ShellForge provides multiple entry points, all sharing the same agent loop and governance layer: + +| Entry Point | Mode | Context | +|-------------|------|---------| +| `shellforge chat` | Interactive REPL | Persistent — conversation history across prompts | +| `shellforge agent "prompt"` | One-shot | Single task, single context window | +| `shellforge ralph tasks.json` | Multi-task loop | Stateless-iterative — fresh context per task | +| `shellforge run "prompt"` | CLI driver | Governed subprocess (Goose, Claude Code, etc.) | +| `shellforge serve agents.yaml` | Daemon | 24/7 swarm with memory-aware scheduling | + +### Interactive REPL (`chat`) + +Pair-programming mode. The user and model share a persistent conversation — the model remembers previous prompts and results within the session. Color output (green prompt, red errors, yellow governance denials). Shell escapes via `!command`. Ctrl+C interrupts the current agent run without killing the session. + +### Ralph Loop (`ralph`) + +Stateless-iterative execution for multi-task workloads. Each task gets a fresh context window to prevent accumulated confusion: + +``` +PICK task from queue → IMPLEMENT → VALIDATE (run tests) → COMMIT on success → RESET context → next +``` + +Tasks come from a JSON file or Octi Pulpo MCP dispatch. `--validate` runs a command (e.g., `go test ./...`) after each task. `--dry-run` previews without executing. 
+ +### Sub-Agent Orchestrator + +The agent loop can spawn sub-agents for parallel work: + +- **SpawnSync** — block and wait for a sub-agent to complete +- **SpawnAsync** — fire multiple sub-agents, collect results +- Concurrency controlled via semaphore +- Sub-agent results compressed to ~750 tokens before returning to parent + ## Execution Model ShellForge supports three classes of agent driver, all governed uniformly: @@ -110,7 +145,6 @@ Octi Pulpo routes tasks to the cheapest capable driver: | **Optimize** | [RTK](https://github.com/rtk-ai/rtk) | Token compression — 70-90% reduction on shell output | | **Execute** | [Goose](https://block.github.io/goose) / [OpenClaw](https://github.com/openclaw/openclaw) | Agent execution + browser automation | | **Coordinate** | [Octi Pulpo](https://github.com/AgentGuardHQ/octi-pulpo) | Budget-aware dispatch, episodic memory, model cascading | -| **Coordinate** | [Octi Pulpo](https://github.com/AgentGuardHQ/octi-pulpo) | Swarm coordination via MCP | | **Govern** | [AgentGuard](https://github.com/AgentGuardHQ/agentguard) | Policy enforcement on every action | | **Sandbox** | [OpenShell](https://github.com/NVIDIA/OpenShell) | Kernel-level isolation (Docker on macOS) | | **Scan** | [DefenseClaw](https://github.com/cisco-ai-defense/defenseclaw) | Supply chain scanner — AI Bill of Materials | @@ -120,6 +154,8 @@ Octi Pulpo routes tasks to the cheapest capable driver: ``` cmd/shellforge/ ├── main.go # CLI entry point (cobra-style subcommands) +├── chat.go # Interactive REPL (`shellforge chat`) +├── ralph.go # Multi-task loop (`shellforge ralph`) └── status.go # Ecosystem health check internal/ @@ -128,10 +164,13 @@ internal/ │ └── anthropic.go# Anthropic API adapter (stdlib HTTP, prompt caching, tool_use) ├── agent/ # Agentic loop │ ├── loop.go # runProviderLoop (Anthropic) + runOllamaLoop, drift detection wiring -│ └── drift.go # Drift detector — self-score every 5 calls, steer/kill on low scores +│ ├── drift.go # Drift detector — 
self-score every 5 calls, steer/kill on low scores +│ └── repl.go # Interactive REPL — persistent history, color output, shell escapes +├── ralph/ # Ralph Loop — stateless-iterative multi-task execution +│ └── loop.go # PICK → IMPLEMENT → VALIDATE → COMMIT → RESET cycle ├── governance/ # agentguard.yaml parser + policy engine ├── ollama/ # Ollama HTTP client (chat, generate) -├── tools/ # 5 tool implementations + RTK wrapper +├── tools/ # 8 tool implementations (read/write/edit/glob/grep/shell/ls/find) + RTK wrapper ├── engine/ # Pluggable engine interface (Goose, OpenClaw, OpenCode) ├── logger/ # Structured JSON logging ├── scheduler/ # Memory-aware scheduling + cron @@ -146,17 +185,19 @@ internal/ ShellForge uses a pluggable engine system: -1. **Goose** (preferred local driver) — subprocess, native Ollama support, SHELL wrapped via `govern-shell.sh` -2. **OpenClaw** (browser + integrations) — browser automation, web app access, 100+ skills -3. **NemoClaw** (enterprise) — OpenClaw + NVIDIA OpenShell sandbox + Nemotron local models -4. **CLI Drivers** (cloud coding) — Claude Code, Codex, Copilot CLI, Gemini CLI -5. **Native** (fallback) — built-in multi-turn loop with Ollama + tool calling +1. **Native REPL** (`shellforge chat`) — interactive pair-programming, persistent history, 8 built-in tools +2. **Native Agent** (`shellforge agent`) — one-shot autonomous execution with the same tool set +3. **Ralph Loop** (`shellforge ralph`) — stateless-iterative multi-task with validation and auto-commit +4. **Goose** (local driver) — subprocess, native Ollama support, SHELL wrapped via `govern-shell.sh` +5. **OpenClaw** (browser + integrations) — browser automation, web app access, 100+ skills +6. **NemoClaw** (enterprise) — OpenClaw + NVIDIA OpenShell sandbox + Nemotron local models +7. 
**CLI Drivers** (cloud coding) — Claude Code, Codex, Copilot CLI, Gemini CLI ## Governance Flow ``` -User Request → Engine (Goose/OpenClaw/CLI/Native) - → Tool Call → Governance Check (agentguard.yaml) +User Request → Entry Point (chat/agent/ralph/run/serve) + → Agent Loop → Tool Call → Governance Check (agentguard.yaml) → ALLOW → Execute Tool → Return Result → DENY → Log Violation → Correction Feedback → Retry ``` diff --git a/docs/roadmap.md b/docs/roadmap.md index 1bfe4d6..517046f 100644 --- a/docs/roadmap.md +++ b/docs/roadmap.md @@ -40,7 +40,7 @@ - [x] Fixed catch-all deny bug (bounded-execution policy was denying everything) - [x] Dagu DAG templates (sdlc-swarm, studio-swarm, workspace-swarm, multi-driver) -### v0.7.0 — Anthropic API Provider ← CURRENT +### v0.7.0 — Anthropic API Provider - [x] LLM provider interface (`llm.Provider`) — pluggable Ollama vs Anthropic backends - [x] Anthropic API adapter — stdlib HTTP, structured `tool_use` blocks, multi-turn history - [x] Prompt caching — `cache_control: ephemeral` on system + tools, ~90% savings on cached tokens @@ -49,6 +49,21 @@ - [x] Drift detection — self-score every 5 tool calls, steer below 7, kill below 5 twice - [x] RTK token compression wired into `runShellWithRTK()` (70-90% savings on shell output) +### v0.8.0 — UMAAL (Interactive REPL + Ralph Loop + Enhanced Tools) +- [x] Interactive REPL (`shellforge chat`) — pair-programming with persistent conversation history +- [x] Color output (green prompt, red errors, yellow governance denials) +- [x] Shell escapes (`!command`) and Ctrl+C interrupt without session kill +- [x] Ollama (local) and Anthropic API provider support in REPL +- [x] Ralph Loop (`shellforge ralph`) — stateless-iterative multi-task execution +- [x] PICK → IMPLEMENT → VALIDATE → COMMIT → RESET cycle +- [x] Task input from JSON file or Octi Pulpo MCP dispatch +- [x] `--validate` flag for post-task test commands, `--dry-run` for preview +- [x] Sub-agent orchestrator — SpawnSync (block), 
SpawnAsync (fire and collect) +- [x] Concurrency control via semaphore, context compression (~750 tokens) +- [x] `edit_file` tool — targeted find-and-replace +- [x] `glob` tool — pattern-based file discovery with recursive `**` support +- [x] `grep` tool — regex content search with `file:line` output + --- ## In Progress @@ -142,17 +157,21 @@ Bugs identified during v0.6.x development. Fix before v1.0. --- -## Stack (as of v0.6.1) +## Stack (as of v0.8.0) | Component | Role | Status | |---|---|---| +| `shellforge chat` | Interactive REPL | Working | +| `shellforge ralph` | Multi-task loop | Working | +| `shellforge agent` | One-shot agent | Working | | Goose (Block) | Local model driver | Working | | Claude Code | API driver (Linux) | Working (via hooks) | | Copilot CLI | API driver (Linux) | Working (via hooks) | | Codex CLI | API driver (Linux) | Coming soon | | Gemini CLI | API driver (Linux) | Coming soon | | Ollama | Local inference | Working | +| Anthropic API | Cloud inference | Working (prompt caching) | | AgentGuard | Governance kernel | Working (YAML eval + Go kernel) | -| Dagu | Orchestration | Working (DAGs + web UI) | +| Octi Pulpo | Swarm coordination | Working (MCP) | | RTK | Token compression | Optional | | Docker | Sandbox | Optional | From 55b853ac6b97dc7df831546293ce75a82be4495c Mon Sep 17 00:00:00 2001 From: Jared Pleva Date: Thu, 2 Apr 2026 03:39:56 +0000 Subject: [PATCH 2/2] =?UTF-8?q?fix(scheduler):=20priority-aware=20Inferenc?= =?UTF-8?q?eQueue=20via=20container/heap=20=E2=80=94=20closes=20#49?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the FIFO semaphore channel with a max-priority heap so that higher-Priority requests (PriorityEvaluator > PriorityCorrector > PriorityWorker) are granted inference slots before lower-priority ones. Within equal priority, FIFO order is preserved via a monotonic seq counter. 
Submit() now returns a release func() that the caller MUST invoke once inference completes, correctly extending slot ownership beyond the call. Adds 7 tests: priority ordering, FIFO tie-breaking, concurrency cap, queue-full rejection, context cancellation, release correctness, and MaxParallel accessor. Co-Authored-By: Claude Sonnet 4.6 --- internal/scheduler/queue.go | 148 +++++++++++++++---- internal/scheduler/queue_test.go | 243 +++++++++++++++++++++++++++++++ 2 files changed, 361 insertions(+), 30 deletions(-) create mode 100644 internal/scheduler/queue_test.go diff --git a/internal/scheduler/queue.go b/internal/scheduler/queue.go index 616f66c..4722f3b 100644 --- a/internal/scheduler/queue.go +++ b/internal/scheduler/queue.go @@ -1,12 +1,14 @@ // Package scheduler provides a priority-aware inference queue with -// semaphore-based concurrency control. This prevents local model -// overload by limiting parallel inference slots and rejecting -// requests when the queue depth is exceeded. +// concurrency control. This prevents local model overload by limiting +// parallel inference slots. Requests are dispatched in descending +// Priority order; within the same priority they are served FIFO. package scheduler import ( + "container/heap" "context" "fmt" + "sync" "sync/atomic" "time" ) @@ -41,55 +43,141 @@ type InferenceResult struct { Tokens int } +// waitEntry holds a pending Submit() call in the priority heap. +type waitEntry struct { + req InferenceRequest + grant chan struct{} // closed when a slot is granted to this entry + seq int64 // insertion order for FIFO tie-breaking within equal priority + index int // position in heap; -1 once removed or dispatched +} + +// requestHeap is a max-heap over *waitEntry ordered by Priority (descending), +// then by seq (ascending, i.e. FIFO within equal priority). 
+type requestHeap []*waitEntry + +func (h requestHeap) Len() int { return len(h) } + +func (h requestHeap) Less(i, j int) bool { + if h[i].req.Priority != h[j].req.Priority { + return h[i].req.Priority > h[j].req.Priority + } + return h[i].seq < h[j].seq +} + +func (h requestHeap) Swap(i, j int) { + h[i], h[j] = h[j], h[i] + h[i].index = i + h[j].index = j +} + +func (h *requestHeap) Push(x any) { + e := x.(*waitEntry) + e.index = len(*h) + *h = append(*h, e) +} + +func (h *requestHeap) Pop() any { + old := *h + n := len(old) + e := old[n-1] + old[n-1] = nil + *h = old[:n-1] + e.index = -1 + return e +} + // InferenceQueue manages concurrent access to local model inference. -// It uses a buffered channel as a semaphore to limit parallelism and -// an atomic counter to enforce maximum queue depth. +// Requests wait in a max-priority heap and are dispatched in Priority +// order (higher values first). Within the same priority, requests are +// served FIFO. type InferenceQueue struct { - slots chan struct{} - maxDepth int - pending int64 // atomic counter + mu sync.Mutex + waiting requestHeap + maxParallel int + running int + maxDepth int + pending int64 // atomic: waiting + running + seq int64 // monotonic insertion counter } -// NewInferenceQueue creates a queue that allows maxParallel concurrent -// inference calls and rejects submissions when maxDepth requests are -// already waiting. +// NewInferenceQueue creates a queue that dispatches up to maxParallel +// concurrent inference calls and rejects new submissions when maxDepth +// requests are already pending (waiting + running). func NewInferenceQueue(maxParallel, maxDepth int) *InferenceQueue { - return &InferenceQueue{ - slots: make(chan struct{}, maxParallel), - maxDepth: maxDepth, + q := &InferenceQueue{ + maxParallel: maxParallel, + maxDepth: maxDepth, } + heap.Init(&q.waiting) + return q } -// Submit attempts to acquire an inference slot. It blocks until a slot -// is available or the context is cancelled. 
Returns an error if the -// queue depth limit is exceeded or the context expires. -func (q *InferenceQueue) Submit(ctx context.Context, req InferenceRequest) error { +// Submit enqueues req and blocks until a slot is granted or ctx is +// cancelled. On success it returns a release function that the caller +// MUST invoke once inference is complete to free the slot. +func (q *InferenceQueue) Submit(ctx context.Context, req InferenceRequest) (func(), error) { if int(atomic.LoadInt64(&q.pending)) >= q.maxDepth { - return fmt.Errorf("inference queue full (%d pending)", q.maxDepth) + return nil, fmt.Errorf("inference queue full (%d pending)", q.maxDepth) } atomic.AddInt64(&q.pending, 1) - defer atomic.AddInt64(&q.pending, -1) - // Acquire slot (blocks until available or context cancelled) + entry := &waitEntry{ + req: req, + grant: make(chan struct{}), + seq: atomic.AddInt64(&q.seq, 1), + } + + q.mu.Lock() + heap.Push(&q.waiting, entry) + q.tryDispatch() + q.mu.Unlock() + select { - case q.slots <- struct{}{}: + case <-entry.grant: + return func() { + atomic.AddInt64(&q.pending, -1) + q.mu.Lock() + q.running-- + q.tryDispatch() + q.mu.Unlock() + }, nil + case <-ctx.Done(): - return ctx.Err() + q.mu.Lock() + if entry.index >= 0 { + // Still in heap — remove cleanly before it is dispatched. + heap.Remove(&q.waiting, entry.index) + q.mu.Unlock() + atomic.AddInt64(&q.pending, -1) + return nil, ctx.Err() + } + // index == -1: tryDispatch already popped this entry and incremented + // q.running. Release the slot immediately since we won't use it. + q.running-- + q.tryDispatch() + q.mu.Unlock() + atomic.AddInt64(&q.pending, -1) + return nil, ctx.Err() } - defer func() { <-q.slots }() +} - // Slot acquired — the caller's runFunc handles actual inference. - // This method only manages concurrency; execution is delegated - // to the orchestrator that owns this queue. 
- return nil +// tryDispatch grants slots to the highest-priority waiting entries until +// maxParallel is reached or the heap is empty. Must be called with q.mu held. +func (q *InferenceQueue) tryDispatch() { + for q.running < q.maxParallel && len(q.waiting) > 0 { + entry := heap.Pop(&q.waiting).(*waitEntry) + q.running++ + close(entry.grant) + } } -// Pending returns the current number of queued inference requests. +// Pending returns the current number of pending inference requests +// (both waiting for a slot and currently running). func (q *InferenceQueue) Pending() int64 { return atomic.LoadInt64(&q.pending) } // MaxParallel returns the concurrency limit for this queue. func (q *InferenceQueue) MaxParallel() int { - return cap(q.slots) + return q.maxParallel } diff --git a/internal/scheduler/queue_test.go b/internal/scheduler/queue_test.go new file mode 100644 index 0000000..10d74b5 --- /dev/null +++ b/internal/scheduler/queue_test.go @@ -0,0 +1,243 @@ +package scheduler + +import ( + "context" + "sync" + "sync/atomic" + "testing" + "time" +) + +func req(priority Priority) InferenceRequest { + return InferenceRequest{Priority: priority} +} + +// TestPriorityOrdering verifies that a higher-priority request is granted +// before a lower-priority one when both are waiting for the same slot. +func TestPriorityOrdering(t *testing.T) { + q := NewInferenceQueue(1, 10) + + // Hold the single slot. + release0, err := q.Submit(context.Background(), req(PriorityWorker)) + if err != nil { + t.Fatalf("initial submit: %v", err) + } + + // Enqueue a low-priority and a high-priority request while slot is held. + var order []string + var wg sync.WaitGroup + mu := sync.Mutex{} + + wg.Add(2) + // Submit low-priority first so it enters the heap before high-priority. 
+ go func() { + defer wg.Done() + release, err := q.Submit(context.Background(), req(PriorityWorker)) + if err != nil { + t.Errorf("low-priority submit: %v", err) + return + } + mu.Lock() + order = append(order, "low") + mu.Unlock() + release() + }() + // Small delay so the low-priority goroutine enters the heap first. + time.Sleep(5 * time.Millisecond) + go func() { + defer wg.Done() + release, err := q.Submit(context.Background(), req(PriorityEvaluator)) + if err != nil { + t.Errorf("high-priority submit: %v", err) + return + } + mu.Lock() + order = append(order, "high") + mu.Unlock() + release() + }() + time.Sleep(5 * time.Millisecond) + + // Freeing the held slot should dispatch the Evaluator (higher priority) first. + release0() + wg.Wait() + + if len(order) != 2 { + t.Fatalf("expected 2 completions, got %d", len(order)) + } + if order[0] != "high" { + t.Errorf("expected high-priority first, got order=%v", order) + } +} + +// TestFIFOWithinSamePriority verifies that equal-priority requests are +// served in submission order. +func TestFIFOWithinSamePriority(t *testing.T) { + q := NewInferenceQueue(1, 10) + + // Hold the single slot. + hold, _ := q.Submit(context.Background(), req(PriorityWorker)) + + var order []int + var wg sync.WaitGroup + mu := sync.Mutex{} + + for i := 0; i < 3; i++ { + i := i + wg.Add(1) + go func() { + defer wg.Done() + release, err := q.Submit(context.Background(), req(PriorityWorker)) + if err != nil { + t.Errorf("submit %d: %v", i, err) + return + } + mu.Lock() + order = append(order, i) + mu.Unlock() + release() + }() + time.Sleep(2 * time.Millisecond) // ensure ordered insertion + } + time.Sleep(10 * time.Millisecond) + + hold() + wg.Wait() + + for i, v := range order { + if v != i { + t.Errorf("expected FIFO order [0 1 2], got %v", order) + break + } + } +} + +// TestConcurrencyLimit verifies that at most maxParallel requests run at once. 
+func TestConcurrencyLimit(t *testing.T) { + const maxP = 2 + q := NewInferenceQueue(maxP, 20) + + var running int64 + var peak int64 + var wg sync.WaitGroup + + for i := 0; i < 6; i++ { + wg.Add(1) + go func() { + defer wg.Done() + release, err := q.Submit(context.Background(), req(PriorityWorker)) + if err != nil { + t.Errorf("submit: %v", err) + return + } + cur := atomic.AddInt64(&running, 1) + for { + old := atomic.LoadInt64(&peak) + if cur <= old || atomic.CompareAndSwapInt64(&peak, old, cur) { + break + } + } + time.Sleep(10 * time.Millisecond) + atomic.AddInt64(&running, -1) + release() + }() + } + wg.Wait() + + if peak > maxP { + t.Errorf("concurrency limit violated: peak=%d, maxParallel=%d", peak, maxP) + } +} + +// TestQueueFull verifies rejection when maxDepth is reached. +func TestQueueFull(t *testing.T) { + q := NewInferenceQueue(1, 2) + + // Hold the one slot. + hold, _ := q.Submit(context.Background(), req(PriorityWorker)) + + blocker := make(chan struct{}) + go func() { + release, err := q.Submit(context.Background(), req(PriorityWorker)) + if err == nil { + release() + } + close(blocker) + }() + time.Sleep(5 * time.Millisecond) // let goroutine enter heap (pending=2) + + _, err := q.Submit(context.Background(), req(PriorityWorker)) + if err == nil { + t.Error("expected queue-full error, got nil") + } + + hold() // release slot so the goroutine can proceed + <-blocker +} + +// TestContextCancellationWhileWaiting verifies that a cancelled context +// removes the entry from the heap and returns the context error. +func TestContextCancellationWhileWaiting(t *testing.T) { + q := NewInferenceQueue(1, 10) + + // Hold the slot so the next Submit must wait. 
+ hold, _ := q.Submit(context.Background(), req(PriorityWorker)) + defer hold() + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond) + defer cancel() + + _, err := q.Submit(ctx, req(PriorityWorker)) + if err == nil { + t.Error("expected context error, got nil") + } + if q.Pending() != 1 { // only the held slot should remain + t.Errorf("expected pending=1 after cancellation, got %d", q.Pending()) + } +} + +// TestReleaseRestoresSlot verifies that calling release() makes the slot +// available for the next waiter. +func TestReleaseRestoresSlot(t *testing.T) { + q := NewInferenceQueue(1, 10) + + release, err := q.Submit(context.Background(), req(PriorityWorker)) + if err != nil { + t.Fatalf("first submit: %v", err) + } + if q.Pending() != 1 { + t.Errorf("expected pending=1 while slot held, got %d", q.Pending()) + } + + done := make(chan struct{}) + go func() { + r2, err := q.Submit(context.Background(), req(PriorityWorker)) + if err != nil { + t.Errorf("second submit: %v", err) + } else { + r2() + } + close(done) + }() + + time.Sleep(5 * time.Millisecond) + release() // should unblock the waiter + + select { + case <-done: + case <-time.After(500 * time.Millisecond): + t.Error("second Submit did not complete after release()") + } + + if q.Pending() != 0 { + t.Errorf("expected pending=0 after all releases, got %d", q.Pending()) + } +} + +// TestMaxParallelReturnsCorrectValue is a basic smoke test for MaxParallel(). +func TestMaxParallelReturnsCorrectValue(t *testing.T) { + q := NewInferenceQueue(3, 10) + if q.MaxParallel() != 3 { + t.Errorf("expected MaxParallel=3, got %d", q.MaxParallel()) + } +}