diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index 5313a8a..95f5546 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -14,7 +14,7 @@ jobs: - uses: actions/setup-go@v5 with: - go-version: '1.21' + go-version: '1.23.4' - name: Tidy run: go mod tidy diff --git a/Makefile b/Makefile index bee76cc..ec7d7e6 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -.PHONY: test all test-race test-fuzz lint clean +.PHONY: test all test-race test-fuzz lint clean bench all: lint clean test test-race test-fuzz bench @@ -9,7 +9,7 @@ test-race: go test -race -count=1 -v ./... test-fuzz: - go test -fuzz=FuzzWaitingRoom -fuzztime=30s ./... + go test -fuzz=FuzzWaitingRoom -fuzztime=30s . bench: go test -bench=. -benchmem ./... @@ -18,4 +18,4 @@ lint: go vet ./... clean: - go clean ./... + go clean ./... \ No newline at end of file diff --git a/README.md b/README.md index 12c4fe7..094a3ee 100644 --- a/README.md +++ b/README.md @@ -5,22 +5,82 @@ [![Go Reference](https://pkg.go.dev/badge/github.com/andreimerlescu/room.svg)](https://pkg.go.dev/github.com/andreimerlescu/room) [![Go Report Card](https://goreportcard.com/badge/github.com/andreimerlescu/room)](https://goreportcard.com/report/github.com/andreimerlescu/room) +[![CI](https://github.com/andreimerlescu/room/actions/workflows/go.yml/badge.svg)](https://github.com/andreimerlescu/room/actions/workflows/go.yml) [![Apache 2.0 License](https://img.shields.io/badge/license-Apache%202.0-blue.svg)](LICENSE) +When your Go service hits capacity, don't drop requests — queue them. + +`room` is a single-import middleware that sits in front of your Gin handlers +and turns excess traffic into an orderly waiting room. Every request gets a +ticket. Clients that can't be served immediately see a live-updating queue +page with their position. As slots open, they're admitted automatically in +FIFO order. Your handlers never know the difference — they see normal +requests arriving at the rate you chose. + +```go +wr := &room.WaitingRoom{} +wr.Init(500) +defer wr.Stop() +wr.RegisterRoutes(r) +// That's it. Request 501 sees the waiting room. +``` + +--- + +## See it in action — 30 seconds + +```bash +cd sample/basic-web-app +bash test.sh +``` + +The test script builds the server, launches 30 concurrent clients, and +prints a live dashboard while the waiting room queues and admits them. +Open `http://localhost:8080/` in your browser while it runs to see your +position tick down in real time. + +``` + [ 15s] sent:120 served:42 queued:78 err:0 active:83 ~2 req/s [wave 4] +``` + +``` +╔══════════════════════════════════════════════════╗ +║ Results ║ +╠══════════════════════════════════════════════════╣ +║ Total sent: 191 ║ +║ Served (200): 191 ║ +║ Queued (waited): 187 ║ +║ Errors: 0 ║ +║ Throughput: 3 req/s ║ +╠══════════════════════════════════════════════════╣ +║ FULL transitions: 7 ║ +║ DRAIN transitions: 7 ║ +║ QUEUE events: 187 ║ +╚══════════════════════════════════════════════════╝ + +✓ Waiting room activated — 187 requests queued. +``` + +No configuration files, no external dependencies, no infrastructure. +One `go get`, one `Init`, one `RegisterRoutes`. + --- ## Why room? -When your application is at capacity, you have three choices: drop the -request (429), queue it blindly (no ordering guarantee), or admit it -through a proper waiting room with FIFO ordering, position awareness, -and a live status page. +When your application is at capacity you have three choices: -`room` does the third. It sits in front of your gin handlers as middleware, -issues every arriving request a ticket and admits them in ticket order as -slots open, though clients that become eligible simultaneously may be served -in any order among themselves. Clients that must wait see a clean waiting -room page that updates their position automatically — no refresh required. +| Strategy | What happens | UX | +|---|---|---| +| **Drop (429)** | Reject the request | User sees an error, retries blindly, amplifies the spike | +| **Queue blindly** | Buffer with no ordering | No position awareness, no ETA, users refresh and make it worse | +| **Waiting room** | Issue a ticket, show position, admit in order | User waits calmly, knows their place, gets in automatically | + +`room` does the third. It gives you FIFO ordering, live position tracking, +a polished waiting-room page, lifecycle callbacks for autoscaling, a reaper +that cleans up abandoned clients, configurable cookie security, runtime +capacity adjustment, and a max-queue-depth circuit breaker — all behind a +single middleware call. --- @@ -30,28 +90,128 @@ room page that updates their position automatically — no refresh required. go get github.com/andreimerlescu/room ``` -Requires **Go 1.21+**. +Requires **Go 1.22+** and [gin](https://github.com/gin-gonic/gin). --- ## Quick start ```go -r := gin.Default() +package main -wr := &room.WaitingRoom{} -if err := wr.Init(500); err != nil { - log.Fatal(err) +import ( + "log" + + "github.com/andreimerlescu/room" + "github.com/gin-gonic/gin" +) + +func main() { + r := gin.Default() + + wr := &room.WaitingRoom{} + if err := wr.Init(500); err != nil { + log.Fatal(err) + } + defer wr.Stop() + + // Registers GET /queue/status and attaches the middleware. + // Every route registered AFTER this line is gated. + wr.RegisterRoutes(r) + + r.GET("/", func(c *gin.Context) { + c.String(200, "You're in!") + }) + + r.Run(":8080") } -defer wr.Stop() +``` -// Registers GET /queue/status and attaches the middleware. -wr.RegisterRoutes(r) -r.Run(":8080") +The 501st concurrent request sees the waiting room. The moment a slot +opens, the next client in line is admitted automatically — no refresh +required. + +--- + +## What your users see + +When a request can't be served immediately, `room` responds with a +self-contained HTML page that polls `/queue/status` every 3 seconds and +updates the position in place: + +- **Queue position** — a large, visible number that ticks down +- **Auto-admit** — the page reloads automatically when `ready=true` +- **No refresh needed** — the status text updates live +- **Dark theme** — clean, modern design that works on mobile +- **Accessible** — uses `aria-live` regions for screen readers + +Replace the default page with your own via `wr.SetHTML(myHTML)`. The only +contract is `{{.Position}}` for the queue number and a `fetch("/queue/status")` +poll loop in your JavaScript. + +--- + +## Lifecycle callbacks + +`room` exposes a full event system. Register handlers with `On` and react +to capacity changes in real time — without polling, without a sidecar, +without coupling business logic to the middleware. + +```go +// Scale out when the room fills up. +wr.On(room.EventFull, func(s room.Snapshot) { + log.Printf("room full (%d/%d)", s.Occupancy, s.Capacity) + go provisionHost() +}) + +// Scale back in when pressure drops. +wr.On(room.EventDrain, func(s room.Snapshot) { + go deregisterHost() +}) + +// Observe every admission and completion. +wr.On(room.EventEnter, func(s room.Snapshot) { metrics.Inc("room.enter") }) +wr.On(room.EventExit, func(s room.Snapshot) { metrics.Inc("room.exit") }) + +// React to queuing, abandoned tickets, and timeouts. +wr.On(room.EventQueue, func(s room.Snapshot) { metrics.Inc("room.queue") }) +wr.On(room.EventEvict, func(s room.Snapshot) { metrics.Inc("room.evict") }) +wr.On(room.EventTimeout, func(s room.Snapshot) { metrics.Inc("room.timeout") }) +``` + +Every handler receives a **Snapshot** — a point-in-time copy of the room's +state at the moment the event fired: + +```go +type Snapshot struct { + Event Event + Occupancy int // slots in use + Capacity int // maximum slots + QueueDepth int64 // requests waiting +} + +func (s Snapshot) Full() bool // Occupancy >= Capacity +func (s Snapshot) Empty() bool // Occupancy == 0 ``` -That's it. The 501st concurrent request sees the waiting room. The 500th -slot to free up admits them automatically. +Handlers run asynchronously in their own goroutines — a slow callback +never stalls the request path. Remove handlers at any time with `wr.Off(event)`. + +### Events at a glance + +| Event | Fires when | Use case | +|---|---|---| +| `EventEnter` | Request acquires a slot | Throughput metrics | +| `EventExit` | Request completes, slot released | Latency tracking | +| `EventFull` | Room transitions to full (edge, not every admission) | Scale-out trigger | +| `EventDrain` | Room transitions from full to available (edge) | Scale-in signal | +| `EventQueue` | Request issued a waiting-room ticket | Queue depth alerting | +| `EventEvict` | Reaper removes an abandoned token | Ghost ticket monitoring | +| `EventTimeout` | Request context cancelled before admission | Client timeout tracking | + +`EventFull` and `EventDrain` fire only on the **transition edge** — not +on every admission while full. This means your autoscaler callback fires +once when you need it, not 10,000 times during a traffic spike. --- @@ -68,71 +228,237 @@ defer wr.Stop() html, _ := os.ReadFile("my_waiting_room.html") wr.SetHTML(html) +// Production cookie security. +wr.SetSecureCookie(true) +wr.SetCookiePath("/app") +wr.SetCookieDomain(".example.com") + +// Queue depth circuit breaker — reject with 503 beyond this depth. +wr.SetMaxQueueDepth(10000) + // Tighten the reaper for a high-traffic event. wr.SetReaperInterval(15 * time.Second) -// Registers GET /queue/status and attaches the middleware. -wr.RegisterRoutes(r) +// Register lifecycle hooks before traffic arrives. +wr.On(room.EventFull, func(s room.Snapshot) { + go provisionHost() +}) +wr.RegisterRoutes(r) r.Run(":8080") ``` --- +## Runtime capacity adjustment + +Change capacity without restarting the server: + +```go +r.POST("/admin/cap", func(c *gin.Context) { + var body struct{ Cap int32 `json:"cap"` } + c.ShouldBindJSON(&body) + wr.SetCap(body.Cap) + c.JSON(200, gin.H{ + "cap": wr.Cap(), + "occupancy": wr.Len(), + "queue_depth": wr.QueueDepth(), + "utilization": wr.UtilizationSmoothed(), + }) +}) +``` + +```bash +# Double capacity — queued requests rush into the new slots +curl -X POST localhost:8080/admin/cap -d '{"cap":1000}' +``` + +`SetCap` takes effect immediately. Expanding capacity opens new semaphore +slots and queued requests start being admitted on their next poll. Shrinking +drains gracefully — in-flight requests finish normally. + +--- + ## Config reload ```go func onConfigReload(cfg Config) { wr.SetCap(int32(cfg.MaxConcurrent)) wr.SetReaperInterval(cfg.ReaperInterval) + wr.SetMaxQueueDepth(cfg.MaxQueueDepth) } ``` +Every setter is safe to call while traffic is flowing. + --- ## How it works +``` + ┌─────────────────────────────────────────────────────┐ + │ Incoming Request │ + └─────────────┬───────────────────────────────────────┘ + │ + ▼ + ┌────────────────┐ + │ Issue Ticket │ nextTicket.Add(1) + └───────┬────────┘ + │ + ▼ + ┌──────────────────────┐ YES ┌──────────────┐ + │ ticket ≤ nowServing │ ──────────▶ │ Acquire Slot │ + │ + cap? │ │ (fast path) │ + └──────────┬──────────┘ └──────┬───────┘ + │ NO │ + ▼ ▼ + ┌──────────────────────┐ ┌────────────────┐ + │ Serve Waiting Room │ │ Run Handler │ + │ + Issue Cookie │ │ defer Release│ + └──────────┬──────────┘ └────────────────┘ + │ + ▼ + ┌──────────────────────┐ + │ Client Polls │ GET /queue/status + │ /queue/status │ every 3s + jitter + └──────────┬──────────┘ + │ ready=true + ▼ + ┌──────────────────────┐ + │ Client Reloads │ Browser auto-redirects + │ → Fast Path │ with cookie → admitted + └──────────────────────┘ +``` + | Layer | Responsibility | |---|---| -| Ticket counter | Assigns each request a monotonically increasing position on arrival | -| FIFO gate | Blocks requests whose ticket is outside the serving window | -| sema | Manages how many requests are actively being served | -| Token store | Maps session cookies to tickets for `/queue/status` polling | -| Reaper | Evicts ghost tickets from clients that disconnected mid-queue | +| **Ticket counter** | Monotonically increasing position on arrival | +| **Serving window** | `nowServing + cap` determines who gets in | +| **Semaphore** | Enforces concurrent slot limit via [sema](https://github.com/andreimerlescu/sema) | +| **Token store** | Maps session cookies to tickets for poll-based admission | +| **Reaper** | Evicts abandoned tokens, advances the queue past ghost tickets | +| **Callbacks** | Fires lifecycle events for autoscaling and observability | + +--- + +## Security considerations + +| Concern | How room handles it | +|---|---| +| **Cookie theft / replay** | Tokens are 128-bit cryptographically random hex strings. `HttpOnly` flag prevents XSS reads. Call `SetSecureCookie(true)` in production for the `Secure` flag. | +| **Queue flooding** | `SetMaxQueueDepth(n)` rejects new arrivals with 503 when the queue exceeds `n`, preventing unbounded memory growth. | +| **Poll abuse** | Per-token rate limiting on `/queue/status` — polls faster than 1/second receive 429 with `Retry-After`. | +| **Ghost tickets** | The reaper runs on a configurable interval (default 5m), evicts expired tokens, and advances `nowServing` so the queue doesn't stall behind abandoned clients. | +| **Cookie scoping** | `SetCookiePath` and `SetCookieDomain` let you restrict cookie visibility in multi-app deployments. `SameSite=Lax` is set by default. | +| **Capacity enforcement** | The `nowServing` window guard prevents the serving window from inflating beyond `cap` even under adversarial client disconnection patterns. | --- -## API +## API reference ```go -// Simple path. -room.NewWaitingRoom(cap int32) gin.HandlerFunc +// ── Construction ────────────────────────────────────────── -// Full control path. +// Simple — one line, panics on invalid cap. +room.NewWaitingRoom(r *gin.Engine, cap int32) gin.HandlerFunc + +// Full control — error handling, lifecycle management. +wr := &room.WaitingRoom{} wr.Init(cap int32) error wr.Stop() -wr.Middleware() gin.HandlerFunc -wr.RegisterRoutes(r *gin.Engine) -wr.StatusHandler() gin.HandlerFunc -wr.SetHTML(html []byte) + +// ── Routing ─────────────────────────────────────────────── + +wr.RegisterRoutes(r *gin.Engine) // recommended: registers status + middleware +wr.Middleware() gin.HandlerFunc // manual: just the middleware +wr.StatusHandler() gin.HandlerFunc // manual: just the status endpoint + +// ── Configuration (safe to call at any time) ────────────── + wr.SetCap(cap int32) error +wr.SetHTML(html []byte) wr.SetReaperInterval(d time.Duration) error +wr.SetSecureCookie(secure bool) +wr.SetMaxQueueDepth(max int64) error +wr.SetCookiePath(path string) +wr.SetCookieDomain(domain string) + +// ── Introspection ───────────────────────────────────────── + wr.Cap() int32 wr.Len() int wr.QueueDepth() int64 wr.Utilization() float64 wr.UtilizationSmoothed() float64 wr.ReaperInterval() time.Duration +wr.MaxQueueDepth() int64 +wr.CookiePath() string +wr.CookieDomain() string + +// ── Lifecycle callbacks ─────────────────────────────────── + +wr.On(event room.Event, fn room.CallbackFunc) +wr.Off(event room.Event) +``` + +--- + +## Testing + +The test suite includes unit tests, race-detector tests, fuzz tests, and +benchmarks: + +```bash +make all # vet, test, race, fuzz (30s), bench +``` + +``` +BenchmarkFastPath-28 429842 2751 ns/op 5318 B/op 13 allocs/op +BenchmarkQueueDepth-28 1000000000 0.64 ns/op 0 B/op 0 allocs/op +BenchmarkUtilization-28 1000000000 0.88 ns/op 0 B/op 0 allocs/op +``` + +The fast path (request admitted immediately) completes in under 3μs +including cookie handling and semaphore acquisition. `QueueDepth` and +`UtilizationSmoothed` are sub-nanosecond — safe to call from hot +dashboards and autoscaler feedback loops. + +--- + +## Sample app + +The [`sample/basic-web-app`](sample/basic-web-app/) directory contains a +complete tutorial that walks through every feature: + +```bash +cd sample/basic-web-app +bash test.sh # automated load test with live dashboard ``` +Or run manually: + +```bash +cd sample/basic-web-app +go run . # Terminal 1: starts server on :8080 +open http://localhost:8080 # Browser: see the waiting room live +ab -c 100 -n 1000 localhost:8080/about # Terminal 2: generate load +``` + +The tutorial covers capacity tuning, lifecycle callbacks, log filtering, +runtime capacity adjustment, custom HTML, and common mistakes. + +\[ [Read the full tutorial →](sample/basic-web-app/README.md) \] + --- ## License -Apache 2.0 © [Andrei Merlescu](https://github.com/andreimerlescu) +Apache 2.0 — see [LICENSE](LICENSE). --- -*Built on [sema](https://github.com/andreimerlescu/sema). FIFO ordering, -live position tracking, and a reaper that keeps ghost tickets from stalling -your queue.* +*Built on [sema](https://github.com/andreimerlescu/sema) by +[Andrei Merlescu](https://github.com/andreimerlescu). FIFO ordering, live +position tracking, edge-triggered lifecycle callbacks, a reaper that keeps +ghost tickets from stalling your queue, and a circuit breaker that protects +your memory when the queue gets too deep.* \ No newline at end of file diff --git a/VERSION b/VERSION index 3eefcb9..afaf360 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -1.0.0 +1.0.0 \ No newline at end of file diff --git a/callback.go b/callback.go new file mode 100644 index 0000000..b22dbe8 --- /dev/null +++ b/callback.go @@ -0,0 +1,167 @@ +package room + +import "sync" + +// Event describes a lifecycle moment in the WaitingRoom's operation. +// Handlers are registered via On and fired asynchronously — each in its +// own goroutine — so that slow callbacks never stall the middleware hot path. +// +// Related: WaitingRoom.On, WaitingRoom.Off, WaitingRoom.emit +type Event uint8 + +const ( + // EventEnter fires each time a request acquires a semaphore slot and + // is admitted into active service. Use this to track throughput or + // update external load-balancer weights. + EventEnter Event = iota + + // EventExit fires each time a request completes and releases its slot. + // Paired with EventEnter it gives you a complete picture of slot lifetime. + EventExit + + // EventFull fires once when the room transitions from below capacity to + // at capacity — i.e. every slot is now occupied. It does NOT fire on + // every admission while full; only on the transition edge. Use this to + // trigger scale-out logic such as provisioning a new host. + EventFull + + // EventDrain fires once when the room transitions from full (all slots + // occupied) back to having at least one free slot. It does NOT fire + // when the room becomes completely empty — only on the full→available + // edge. Use this to signal that scale-in is safe or to re-enable a + // previously throttled upstream. + EventDrain + + // EventQueue fires when an arriving request cannot be admitted immediately + // and is issued a ticket for the waiting room. + EventQueue + + // EventEvict fires when the reaper removes an expired token from the + // token store. The associated ticket is considered abandoned. + EventEvict + + // EventTimeout fires when a queued request's context is cancelled or + // its deadline expires before a slot becomes available. + EventTimeout +) + +// String returns the canonical name of the Event, suitable for logging. +func (e Event) String() string { + switch e { + case EventEnter: + return "Enter" + case EventExit: + return "Exit" + case EventFull: + return "Full" + case EventDrain: + return "Drain" + case EventQueue: + return "Queue" + case EventEvict: + return "Evict" + case EventTimeout: + return "Timeout" + default: + return "Unknown" + } +} + +// Snapshot is a point-in-time view of the WaitingRoom delivered to every +// callback. All fields are copied at trigger time and are safe to read +// after the room's state has changed. +type Snapshot struct { + // Event is the lifecycle event that produced this snapshot. + Event Event + + // Occupancy is the number of semaphore slots in use at the moment of + // the event. + Occupancy int + + // Capacity is the maximum number of concurrent occupants allowed. + Capacity int + + // QueueDepth is the number of requests currently waiting for a slot. + QueueDepth int64 +} + +// Full returns true when Occupancy equals or exceeds Capacity. +func (s Snapshot) Full() bool { return s.Occupancy >= s.Capacity } + +// Empty returns true when no slots are in use. +func (s Snapshot) Empty() bool { return s.Occupancy == 0 } + +// CallbackFunc is the function signature for all WaitingRoom lifecycle +// callbacks. The Snapshot argument is safe to retain beyond the call. +type CallbackFunc func(snap Snapshot) + +// callbackRegistry stores per-Event handler slices. It is embedded in +// WaitingRoom and owns its own RWMutex so that callback registration and +// dispatch never contend with wr.mu, which is held on the request hot path. +type callbackRegistry struct { + mu sync.RWMutex + callbacks map[Event][]CallbackFunc +} + +func newCallbackRegistry() *callbackRegistry { + return &callbackRegistry{ + callbacks: make(map[Event][]CallbackFunc), + } +} + +// On registers fn to be called whenever event fires. Multiple handlers +// may be registered for the same event; all are invoked, each in its own +// goroutine, in registration order. On is safe for concurrent use and may +// be called after the WaitingRoom is running. +// +// Example — scale out when the room is full: +// +// wr.On(room.EventFull, func(s room.Snapshot) { +// log.Printf("room full (%d/%d) — provisioning new host", s.Occupancy, s.Capacity) +// go provisionHost() +// }) +// +// Related: WaitingRoom.Off, WaitingRoom.emit +func (wr *WaitingRoom) On(event Event, fn CallbackFunc) { + wr.callbacks.mu.Lock() + defer wr.callbacks.mu.Unlock() + wr.callbacks.callbacks[event] = append(wr.callbacks.callbacks[event], fn) +} + +// Off removes all handlers registered for event. It is safe for concurrent +// use. Handlers that are already executing are not interrupted. +// +// Related: WaitingRoom.On +func (wr *WaitingRoom) Off(event Event) { + wr.callbacks.mu.Lock() + defer wr.callbacks.mu.Unlock() + delete(wr.callbacks.callbacks, event) +} + +// emit fires all handlers registered for event, each in its own goroutine. +// snap must be constructed immediately before calling emit so that it +// reflects the room's state at the moment the event occurred. +// emit is safe to call with no registered handlers — it is a no-op. +// +// Related: WaitingRoom.On, WaitingRoom.Off +func (wr *WaitingRoom) emit(event Event, snap Snapshot) { + wr.callbacks.mu.RLock() + handlers := make([]CallbackFunc, len(wr.callbacks.callbacks[event])) + copy(handlers, wr.callbacks.callbacks[event]) + wr.callbacks.mu.RUnlock() + + for _, fn := range handlers { + go fn(snap) + } +} + +// snapshot builds a Snapshot from the WaitingRoom's current state. +// Call this immediately before emit to capture the state at event time. +func (wr *WaitingRoom) snapshot(event Event) Snapshot { + return Snapshot{ + Event: event, + Occupancy: wr.Len(), + Capacity: int(wr.Cap()), + QueueDepth: wr.QueueDepth(), + } +} diff --git a/callback_test.go b/callback_test.go new file mode 100644 index 0000000..cf4c5d1 --- /dev/null +++ b/callback_test.go @@ -0,0 +1,430 @@ +package room + +import ( + "net/http/httptest" + "sync" + "sync/atomic" + "testing" + "time" +) + +// ── Event.String ───────────────────────────────────────────────────────────── + +func TestEvent_String(t *testing.T) { + t.Parallel() + cases := []struct { + event Event + want string + }{ + {EventEnter, "Enter"}, + {EventExit, "Exit"}, + {EventFull, "Full"}, + {EventDrain, "Drain"}, + {EventQueue, "Queue"}, + {EventEvict, "Evict"}, + {EventTimeout, "Timeout"}, + {Event(255), "Unknown"}, + } + for _, tc := range cases { + tc := tc + t.Run(tc.want, func(t *testing.T) { + t.Parallel() + if got := tc.event.String(); got != tc.want { + t.Errorf("Event(%d).String() = %q, want %q", tc.event, got, tc.want) + } + }) + } +} + +// ── Snapshot helpers ────────────────────────────────────────────────────────── + +func TestSnapshot_Full(t *testing.T) { + t.Parallel() + s := Snapshot{Event: EventFull, Occupancy: 10, Capacity: 10} + if !s.Full() { + t.Error("expected Full() == true when Occupancy == Capacity") + } + s.Occupancy = 9 + if s.Full() { + t.Error("expected Full() == false when Occupancy < Capacity") + } +} + +func TestSnapshot_Empty(t *testing.T) { + t.Parallel() + s := Snapshot{Event: EventDrain, Occupancy: 0, Capacity: 10} + if !s.Empty() { + t.Error("expected Empty() == true when Occupancy == 0") + } + s.Occupancy = 1 + if s.Empty() { + t.Error("expected Empty() == false when Occupancy > 0") + } +} + +// ── On / emit — basic correctness ──────────────────────────────────────────── + +func TestOn_SingleHandler_Called(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 5) + + var called atomic.Int32 + wr.On(EventFull, func(s Snapshot) { called.Add(1) }) + wr.emit(EventFull, wr.snapshot(EventFull)) + + waitForCount(t, &called, 1, 100*time.Millisecond) +} + +func TestOn_MultipleHandlers_AllCalled(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 5) + + var count atomic.Int32 + for range 5 { + wr.On(EventEnter, func(s Snapshot) { count.Add(1) }) + } + wr.emit(EventEnter, wr.snapshot(EventEnter)) + + waitForCount(t, &count, 5, 100*time.Millisecond) +} + +func TestOn_DifferentEvents_DoNotCross(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 5) + + var fullCalled, drainCalled atomic.Int32 + wr.On(EventFull, func(s Snapshot) { fullCalled.Add(1) }) + wr.On(EventDrain, func(s Snapshot) { drainCalled.Add(1) }) + + wr.emit(EventFull, wr.snapshot(EventFull)) + waitForCount(t, &fullCalled, 1, 100*time.Millisecond) + + time.Sleep(20 * time.Millisecond) + if drainCalled.Load() != 0 { + t.Errorf("EventDrain handler fired but EventDrain was never emitted") + } +} + +// ── Off ─────────────────────────────────────────────────────────────────────── + +func TestOff_RemovesAllHandlers(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 5) + + var called atomic.Int32 + wr.On(EventEvict, func(s Snapshot) { called.Add(1) }) + wr.On(EventEvict, func(s Snapshot) { called.Add(1) }) + wr.Off(EventEvict) + wr.emit(EventEvict, wr.snapshot(EventEvict)) + + time.Sleep(30 * time.Millisecond) + if called.Load() != 0 { + t.Errorf("expected 0 calls after Off, got %d", called.Load()) + } +} + +func TestOff_DoesNotAffectOtherEvents(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 5) + + var timeoutCalled, exitCalled atomic.Int32 + wr.On(EventTimeout, func(s Snapshot) { timeoutCalled.Add(1) }) + wr.On(EventExit, func(s Snapshot) { exitCalled.Add(1) }) + + wr.Off(EventTimeout) + wr.emit(EventTimeout, wr.snapshot(EventTimeout)) + wr.emit(EventExit, wr.snapshot(EventExit)) + + waitForCount(t, &exitCalled, 1, 100*time.Millisecond) + time.Sleep(20 * time.Millisecond) + + if timeoutCalled.Load() != 0 { + t.Errorf("EventTimeout handler fired after Off, got %d calls", timeoutCalled.Load()) + } +} + +// ── emit with no handlers ───────────────────────────────────────────────────── + +func TestEmit_NoHandlers_IsNoop(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 5) + // must not panic + wr.emit(EventDrain, wr.snapshot(EventDrain)) +} + +// ── Snapshot payload correctness ────────────────────────────────────────────── + +func TestEmit_SnapshotDeliveredCorrectly(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 10) + + got := make(chan Snapshot, 1) + wr.On(EventQueue, func(s Snapshot) { got <- s }) + + want := Snapshot{Event: EventQueue, Occupancy: 3, Capacity: 10, QueueDepth: 2} + wr.emit(EventQueue, want) + + select { + case s := <-got: + if s != want { + t.Errorf("snapshot mismatch:\n got %+v\n want %+v", s, want) + } + case <-time.After(100 * time.Millisecond): + t.Fatal("timed out waiting for callback") + } +} + +// ── snapshot() builds from live WaitingRoom state ──────────────────────────── + +func TestSnapshot_ReflectsLiveState(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 10) + + s := wr.snapshot(EventEnter) + if s.Capacity != 10 { + t.Errorf("expected Capacity 10, got %d", s.Capacity) + } + if s.Event != EventEnter { + t.Errorf("expected Event EventEnter, got %s", s.Event) + } + if s.Occupancy < 0 { + t.Errorf("Occupancy should not be negative, got %d", s.Occupancy) + } + if s.QueueDepth < 0 { + t.Errorf("QueueDepth should not be negative, got %d", s.QueueDepth) + } +} + +// ── Concurrency safety ──────────────────────────────────────────────────────── + +func TestConcurrent_OnAndEmit(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 5) + + var wg sync.WaitGroup + for range 20 { + wg.Add(1) + go func() { + defer wg.Done() + wr.On(EventEnter, func(s Snapshot) {}) + }() + } + for range 20 { + wg.Add(1) + go func() { + defer wg.Done() + wr.emit(EventEnter, wr.snapshot(EventEnter)) + }() + } + wg.Wait() +} + +func TestConcurrent_OffAndEmit(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 5) + wr.On(EventFull, func(s Snapshot) {}) + + var wg sync.WaitGroup + for range 10 { + wg.Add(1) + go func() { + defer wg.Done() + wr.Off(EventFull) + }() + } + for range 10 { + wg.Add(1) + go func() { + defer wg.Done() + wr.emit(EventFull, wr.snapshot(EventFull)) + }() + } + wg.Wait() +} + +func TestConcurrent_OnOffEmit_AllEvents(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 5) + + events := []Event{EventEnter, EventExit, EventFull, EventDrain, EventQueue, EventEvict, EventTimeout} + var wg sync.WaitGroup + for _, ev := range events { + ev := ev + wg.Add(3) + go func() { defer wg.Done(); wr.On(ev, func(s Snapshot) {}) }() + go func() { defer wg.Done(); wr.emit(ev, wr.snapshot(ev)) }() + go func() { defer wg.Done(); wr.Off(ev) }() + } + wg.Wait() +} + +// ── Integration: EventFull fires on transition, not every admission ────────── + +func TestIntegration_EventFull_FiredWhenRoomFull(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 1) + + var fullCount atomic.Int32 + wr.On(EventFull, func(s Snapshot) { fullCount.Add(1) }) + + serving := make(chan struct{}, 1) + release := make(chan struct{}) + r := newTestRouter(wr, serving, release) + + go func() { + req := httptest.NewRequest("GET", "/", nil) + r.ServeHTTP(httptest.NewRecorder(), req) + }() + <-serving + + waitForCount(t, &fullCount, 1, 200*time.Millisecond) + close(release) +} + +// TestIntegration_EventFull_DoesNotFireRepeatedly verifies that EventFull +// fires only on the non-full→full transition, not on every admission while +// the room is already at capacity. +func TestIntegration_EventFull_DoesNotFireRepeatedly(t *testing.T) { + t.Parallel() + const cap = 2 + wr := newTestWR(t, int32(cap)) + + var fullCount atomic.Int32 + wr.On(EventFull, func(s Snapshot) { fullCount.Add(1) }) + + serving := make(chan struct{}, cap) + release := make(chan struct{}) + r := newTestRouter(wr, serving, release) + + // Fill both slots. + for i := 0; i < cap; i++ { + go func() { + req := httptest.NewRequest("GET", "/", nil) + r.ServeHTTP(httptest.NewRecorder(), req) + }() + } + for i := 0; i < cap; i++ { + select { + case <-serving: + case <-time.After(2 * time.Second): + t.Fatalf("timed out waiting for handler %d", i) + } + } + + // EventFull should have fired exactly once (on the transition to full). + waitForCount(t, &fullCount, 1, 200*time.Millisecond) + time.Sleep(50 * time.Millisecond) + + if got := fullCount.Load(); got != 1 { + t.Errorf("expected EventFull to fire exactly 1 time, got %d", got) + } + + close(release) +} + +// ── Integration: EventDrain fires on full→not-full transition ───────────────── + +func TestIntegration_EventDrain_FiredAfterRelease(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 1) + + var drainCount atomic.Int32 + wr.On(EventDrain, func(s Snapshot) { drainCount.Add(1) }) + + serving := make(chan struct{}, 1) + release := make(chan struct{}) + r := newTestRouter(wr, serving, release) + + go func() { + req := httptest.NewRequest("GET", "/", nil) + r.ServeHTTP(httptest.NewRecorder(), req) + }() + <-serving + close(release) + + waitForCount(t, &drainCount, 1, 200*time.Millisecond) +} + +// TestIntegration_EventDrain_OnlyFiresOnTransition verifies that EventDrain +// fires only on the full→not-full transition, not when the room goes from +// partially occupied to empty. +func TestIntegration_EventDrain_OnlyFiresOnTransition(t *testing.T) { + t.Parallel() + const cap = 3 + wr := newTestWR(t, int32(cap)) + + var drainCount atomic.Int32 + wr.On(EventDrain, func(s Snapshot) { drainCount.Add(1) }) + + serving := make(chan struct{}, cap) + gates := make([]chan struct{}, cap) + for i := range gates { + gates[i] = make(chan struct{}) + } + + r := newTestRouter(wr, serving, nil) + // Override the handler to use individual gates. + r = newTestRouterWithGates(wr, serving, gates) + + // Fill all 3 slots. + for i := 0; i < cap; i++ { + go func() { + req := httptest.NewRequest("GET", "/", nil) + r.ServeHTTP(httptest.NewRecorder(), req) + }() + } + for i := 0; i < cap; i++ { + select { + case <-serving: + case <-time.After(2 * time.Second): + t.Fatalf("timed out waiting for handler %d", i) + } + } + + // Release first slot: full→not-full. EventDrain should fire. + close(gates[0]) + waitForCount(t, &drainCount, 1, 200*time.Millisecond) + + // Release second slot: not-full→still-not-full. No additional drain. + close(gates[1]) + time.Sleep(50 * time.Millisecond) + + // Release third slot: occupancy→0. Still no additional drain. + close(gates[2]) + time.Sleep(50 * time.Millisecond) + + if got := drainCount.Load(); got != 1 { + t.Errorf("expected EventDrain to fire exactly 1 time, got %d", got) + } +} + +// ── helpers ─────────────────────────────────────────────────────────────────── + +// newTestWR builds an initialised WaitingRoom and registers Stop on cleanup. +func newTestWR(t *testing.T, cap int32) *WaitingRoom { + t.Helper() + wr := &WaitingRoom{} + if err := wr.Init(cap); err != nil { + t.Fatalf("Init(%d): %v", cap, err) + } + t.Cleanup(wr.Stop) + return wr +} + +// waitForCount spins until the atomic counter reaches want or deadline passes. +func waitForCount(t *testing.T, counter *atomic.Int32, want int32, deadline time.Duration) { + t.Helper() + timeout := time.After(deadline) + for { + if counter.Load() >= want { + return + } + select { + case <-timeout: + t.Errorf("timed out: counter = %d, want %d", counter.Load(), want) + return + default: + time.Sleep(5 * time.Millisecond) + } + } +} diff --git a/const.go b/const.go index 8d41e5d..4da4189 100644 --- a/const.go +++ b/const.go @@ -24,6 +24,26 @@ const ( // reaperMaxInterval is the maximum value accepted by SetReaperInterval. reaperMaxInterval = 24 * time.Hour - // reaperBatchSize is the maximum tokens evicted per reap pass. + // reaperBatchSize is the maximum tokens evicted per single scan pass + // within a reap cycle. The reaper loops until a scan evicts fewer than + // this many, so all expired tokens are cleared in a single reap() call + // regardless of total volume. reaperBatchSize = 1000 + + // secureCookieDefault is the default value for the Secure cookie flag. + // Set to false so that plain-HTTP local development works out of the box. + // Production deployments behind TLS or a TLS-terminating proxy should + // call SetSecureCookie(true) or rely on SetSecureCookieFromRequest. + secureCookieDefault = false + + // defaultMaxQueueDepth is the default maximum number of requests that + // may be waiting in the queue simultaneously. Zero means unlimited + // (no cap on queue depth). When non-zero, requests arriving after the + // queue is full receive a 503 immediately. + defaultMaxQueueDepth int64 = 0 + + // statusPollMinInterval is the minimum time between successive + // /queue/status polls for a single token. Polls arriving faster + // than this receive a cached response with a Retry-After header. + statusPollMinInterval = 1 * time.Second ) diff --git a/errors.go b/errors.go index 67bf956..1e4fbd7 100644 --- a/errors.go +++ b/errors.go @@ -5,7 +5,6 @@ import ( "time" ) - // ErrReaperInterval is returned by SetReaperInterval when the provided // duration falls outside [reaperMinInterval, reaperMaxInterval]. type ErrReaperInterval struct { @@ -37,3 +36,13 @@ type ErrNotInitialised struct{} func (e ErrNotInitialised) Error() string { return "room: WaitingRoom not initialised — call Init before use" } + +// ErrInvalidMaxQueueDepth is returned by SetMaxQueueDepth when the +// value is negative. +type ErrInvalidMaxQueueDepth struct { + Given int64 +} + +func (e ErrInvalidMaxQueueDepth) Error() string { + return fmt.Sprintf("room: invalid max queue depth %d: must be >= 0", e.Given) +} diff --git a/go.mod b/go.mod index 1a7c2da..71fe38e 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/andreimerlescu/room -go 1.21 +go 1.22 require ( github.com/andreimerlescu/sema v1.1.0 diff --git a/new.go b/new.go index 66dd52b..6d41f9d 100644 --- a/new.go +++ b/new.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "net/http" - "sync" "github.com/andreimerlescu/sema" "github.com/gin-gonic/gin" @@ -17,15 +16,21 @@ import ( // cap is the maximum number of requests actively served at any moment. // Any value between 1 and math.MaxInt32 is valid. // -// Usage: +// # Goroutine lifecycle // -// r := gin.Default() -// r.Use(room.NewWaitingRoom(500)) +// NewWaitingRoom starts a background reaper goroutine whose lifetime is tied +// to the process. The caller has no reference to the underlying WaitingRoom +// and therefore cannot call Stop(). This is acceptable for long-lived server +// processes where the goroutine is intentional and the process exit cleans up. +// For tests, embedded servers, or any scenario requiring explicit shutdown, +// construct a WaitingRoom manually and call Stop() via defer: // -// For access to SetCap, SetHTML, SetReaperInterval, or StatusHandler after -// initialisation, use NewWaitingRoomFromStruct instead. +// wr := &room.WaitingRoom{} +// wr.Init(500) +// defer wr.Stop() +// wr.RegisterRoutes(r) // -// Related: NewWaitingRoomFromStruct, WaitingRoom.Middleware +// Related: WaitingRoom.RegisterRoutes, WaitingRoom.Middleware func NewWaitingRoom(r *gin.Engine, cap int32) gin.HandlerFunc { wr := &WaitingRoom{} if err := wr.Init(cap); err != nil { @@ -60,6 +65,10 @@ func NewWaitingRoomFromStruct(wr *WaitingRoom) gin.HandlerFunc { // background reaper. It must be called before Middleware or RegisterRoutes // when constructing a WaitingRoom manually. // +// Init is not safe for concurrent use. Call it once during setup before +// any goroutines start serving traffic. For runtime capacity changes use +// SetCap; for runtime reaper changes use SetReaperInterval. +// // Returns ErrInvalidCap if cap < 1. // // Related: WaitingRoom.Stop, WaitingRoom.SetCap @@ -74,13 +83,17 @@ func (wr *WaitingRoom) Init(cap int32) error { wr.cap.Store(cap) wr.sem = sema.Must(int(cap)) - wr.cond = sync.NewCond(&wr.mu) wr.tokens = newTokenStore() wr.reaperRestart = make(chan struct{}, 1) wr.nowServing.Store(0) wr.nextTicket.Store(0) wr.reaperInterval.Store(int64(reaperInterval)) + wr.secureCookie.Store(secureCookieDefault) + wr.maxQueueDepth.Store(defaultMaxQueueDepth) + wr.cookiePath.Store("/") + wr.cookieDomain.Store("") wr.initialised.Store(true) + wr.callbacks = newCallbackRegistry() ctx, cancel := context.WithCancel(context.Background()) wr.stopReaper = cancel @@ -100,6 +113,72 @@ func (wr *WaitingRoom) Stop() { } } +// SetSecureCookie controls whether the waiting-room session cookie is +// issued with the Secure flag. The default is false so that plain-HTTP +// local development works without configuration. +// +// Call SetSecureCookie(true) in any production deployment served over +// HTTPS — either directly or via a TLS-terminating proxy (Cloudflare, +// nginx, AWS ALB, etc.) where c.Request.TLS may be nil even though the +// end-user connection is encrypted. +// +// Safe to call at any time before or after traffic starts. +func (wr *WaitingRoom) SetSecureCookie(secure bool) { + wr.secureCookie.Store(secure) +} + +// SetMaxQueueDepth sets the maximum number of requests that may wait in the +// queue simultaneously. When the queue is at this depth, new arrivals +// receive a 503 Service Unavailable immediately instead of being queued. +// +// A value of 0 disables the limit (unlimited queue depth). This is the +// default. Negative values return ErrInvalidMaxQueueDepth. +// +// Safe to call at any time including while requests are in flight. +func (wr *WaitingRoom) SetMaxQueueDepth(max int64) error { + if max < 0 { + return ErrInvalidMaxQueueDepth{Given: max} + } + wr.maxQueueDepth.Store(max) + return nil +} + +// MaxQueueDepth returns the current maximum queue depth. Zero means unlimited. +func (wr *WaitingRoom) MaxQueueDepth() int64 { + return wr.maxQueueDepth.Load() +} + +// SetCookiePath sets the Path attribute of the waiting-room session cookie. +// The default is "/". Use this to scope the cookie to a specific route +// prefix in multi-app deployments sharing a domain. +// +// Safe to call at any time. +func (wr *WaitingRoom) SetCookiePath(path string) { + if path == "" { + path = "/" + } + wr.cookiePath.Store(path) +} + +// CookiePath returns the current cookie Path setting. +func (wr *WaitingRoom) CookiePath() string { + return wr.cookiePath.Load().(string) +} + +// SetCookieDomain sets the Domain attribute of the waiting-room session +// cookie. The default is empty (browser uses the request host). Set this +// to restrict or expand cookie scope in multi-subdomain deployments. +// +// Safe to call at any time. +func (wr *WaitingRoom) SetCookieDomain(domain string) { + wr.cookieDomain.Store(domain) +} + +// CookieDomain returns the current cookie Domain setting. +func (wr *WaitingRoom) CookieDomain() string { + return wr.cookieDomain.Load().(string) +} + // checkInitialised aborts the request with 500 and returns false if the // WaitingRoom has not been initialised. Prevents nil pointer dereferences // on zero-value WaitingRoom structs. diff --git a/reaper.go b/reaper.go index 81209fa..bac96e7 100644 --- a/reaper.go +++ b/reaper.go @@ -76,27 +76,45 @@ func (wr *WaitingRoom) startReaper(ctx context.Context) { }() } -// reap performs a single eviction pass over the token store. -// Expired tokens are collected under the token store read lock, then -// deleted under the token store write lock. nowServing is advanced and -// cond is broadcast under wr.mu so waiters cannot miss the wakeup. +// reap performs a full eviction cycle over the token store. It loops over +// batch-sized scans until all expired tokens have been removed. // -// NOTE: evicted tickets may not be contiguous. Advancing nowServing -// by the total eviction count can admit later tickets slightly out -// of strict FIFO order when ghost tickets are non-adjacent. This is -// an accepted trade-off documented in the WaitingRoom type comment; -// a gap-tracking structure could tighten this in a future release. +// Only tokens whose ticket number is OUTSIDE the current serving window +// (i.e. ticket > nowServing + cap) are counted toward nowServing advances. +// Tickets inside the serving window already have an allocated semaphore +// slot conceptually; advancing nowServing for them would inflate the window +// beyond the configured capacity and allow more concurrent requests than cap. +// +// Because active pollers have their issuedAt refreshed on each +// /queue/status call, only genuinely abandoned (ghost) clients will be +// reaped under normal operation. // // Related: WaitingRoom.startReaper, WaitingRoom.SetReaperInterval func (wr *WaitingRoom) reap() { + for { + evictedCount := wr.reapBatch() + if evictedCount < reaperBatchSize { + return + } + } +} + +// reapBatch performs a single bounded eviction pass. It returns the number +// of tokens that were expired in the scan phase (before double-check). +// The caller uses this to decide whether another pass is needed. +func (wr *WaitingRoom) reapBatch() int { now := time.Now() // Collect expired tokens under token store read lock. wr.tokens.mu.RLock() - expired := make([]string, 0, min(len(wr.tokens.entries), reaperBatchSize)) + type expiredEntry struct { + token string + ticket int64 + } + expired := make([]expiredEntry, 0, min(len(wr.tokens.entries), reaperBatchSize)) for token, entry := range wr.tokens.entries { if now.Sub(entry.issuedAt) > cookieTTL { - expired = append(expired, token) + expired = append(expired, expiredEntry{token: token, ticket: entry.ticket}) } if len(expired) >= reaperBatchSize { break @@ -105,34 +123,46 @@ func (wr *WaitingRoom) reap() { wr.tokens.mu.RUnlock() if len(expired) == 0 { - return + return 0 } + scanned := len(expired) + // Evict under token store write lock with double-check. - // Count evictions separately so we only take wr.mu once. + // Count only tickets that were genuinely blocking the queue + // (outside the serving window) so we don't inflate nowServing + // beyond the configured capacity. + nowServing := wr.nowServing.Load() + cap := int64(wr.cap.Load()) + var evicted int64 wr.tokens.mu.Lock() - for _, token := range expired { - if entry, ok := wr.tokens.entries[token]; ok { - // Re-check under write lock: the entry may have been - // deleted or updated between the read-lock scan and now. + for _, e := range expired { + if entry, ok := wr.tokens.entries[e.token]; ok { + // Re-check expiry under write lock to close the TOCTOU + // window between the read-lock scan and now. if now.Sub(entry.issuedAt) > cookieTTL { - delete(wr.tokens.entries, token) - evicted++ + delete(wr.tokens.entries, e.token) + // Only advance nowServing for tickets that were outside + // the serving window. Tickets inside the window already + // consumed a semaphore slot allocation; advancing for + // them would double-count capacity. + if entry.ticket > nowServing+cap { + evicted++ + } } } } wr.tokens.mu.Unlock() - if evicted == 0 { - return + if evicted > 0 { + // Advance nowServing atomically. No mutex or broadcast needed: + // admission is poll-driven. The next /queue/status poll from a + // waiting client will see the updated nowServing and return + // ready=true if their ticket is now within the window. + wr.nowServing.Add(evicted) + wr.emit(EventEvict, wr.snapshot(EventEvict)) } - // Advance nowServing and wake waiters under wr.mu so no wakeup - // can be missed between a waiter checking the condition and - // calling cond.Wait(). - wr.mu.Lock() - wr.nowServing.Add(evicted) - wr.cond.Broadcast() - wr.mu.Unlock() + return scanned } diff --git a/reaper_test.go b/reaper_test.go new file mode 100644 index 0000000..87ac39f --- /dev/null +++ b/reaper_test.go @@ -0,0 +1,436 @@ +package room + +import ( + "fmt" + "sync" + "sync/atomic" + "testing" + "time" +) + +// ── reap() — basic eviction ────────────────────────────────────────────────── + +func TestReap_EmptyTokenStore_IsNoop(t *testing.T) { + wr := newTestWR(t, 5) + before := wr.nowServing.Load() + wr.reap() + if wr.nowServing.Load() != before { + t.Error("nowServing changed on empty token store") + } +} + +func TestReap_AllLive_NoneEvicted(t *testing.T) { + wr := newTestWR(t, 5) + + for i := 0; i < 10; i++ { + wr.tokens.set(fmt.Sprintf("tok-%d", i), ticketEntry{ + ticket: int64(i + 1), + issuedAt: time.Now(), + }) + } + + wr.reap() + + if wr.tokens.len() != 10 { + t.Errorf("expected 10 live tokens, got %d", wr.tokens.len()) + } +} + +func TestReap_AllExpired_AllEvicted(t *testing.T) { + wr := newTestWR(t, 5) + expired := time.Now().Add(-(cookieTTL + time.Minute)) + + for i := 0; i < 10; i++ { + wr.tokens.set(fmt.Sprintf("tok-%d", i), ticketEntry{ + ticket: int64(100 + i), // all outside window (cap=5, nowServing=0) + issuedAt: expired, + }) + } + + wr.reap() + + if wr.tokens.len() != 0 { + t.Errorf("expected 0 tokens after reap, got %d", wr.tokens.len()) + } +} + +func TestReap_MixedExpiredAndLive(t *testing.T) { + wr := newTestWR(t, 5) + expired := time.Now().Add(-(cookieTTL + time.Minute)) + + wr.tokens.set("live-1", ticketEntry{ticket: 1, issuedAt: time.Now()}) + wr.tokens.set("live-2", ticketEntry{ticket: 2, issuedAt: time.Now()}) + wr.tokens.set("ghost-1", ticketEntry{ticket: 100, issuedAt: expired}) + wr.tokens.set("ghost-2", ticketEntry{ticket: 101, issuedAt: expired}) + + wr.reap() + + if _, ok := wr.tokens.get("live-1"); !ok { + t.Error("live-1 was evicted") + } + if _, ok := wr.tokens.get("live-2"); !ok { + t.Error("live-2 was evicted") + } + if _, ok := wr.tokens.get("ghost-1"); ok { + t.Error("ghost-1 should have been evicted") + } + if _, ok := wr.tokens.get("ghost-2"); ok { + t.Error("ghost-2 should have been evicted") + } +} + +// ── reap() — nowServing advancement ────────────────────────────────────────── + +func TestReap_AdvancesNowServingOnlyForOutOfWindowTickets(t *testing.T) { + // cap=2, nowServing=0 → window is tickets [1..2]. + wr := newTestWR(t, 2) + expired := time.Now().Add(-(cookieTTL + time.Minute)) + + // Inside window — must NOT advance nowServing. + wr.tokens.set("inside", ticketEntry{ticket: 1, issuedAt: expired}) + // Outside window — must advance nowServing. + wr.tokens.set("outside-1", ticketEntry{ticket: 10, issuedAt: expired}) + wr.tokens.set("outside-2", ticketEntry{ticket: 20, issuedAt: expired}) + + before := wr.nowServing.Load() + wr.reap() + + // 2 out-of-window tickets evicted → nowServing should advance by 2. + expected := before + 2 + if got := wr.nowServing.Load(); got != expected { + t.Errorf("expected nowServing=%d, got %d", expected, got) + } +} + +func TestReap_DoesNotAdvanceNowServingForWindowTickets(t *testing.T) { + // cap=10, nowServing=0 → window is tickets [1..10]. + wr := newTestWR(t, 10) + expired := time.Now().Add(-(cookieTTL + time.Minute)) + + for i := int64(1); i <= 5; i++ { + wr.tokens.set(fmt.Sprintf("win-%d", i), ticketEntry{ + ticket: i, + issuedAt: expired, + }) + } + + before := wr.nowServing.Load() + wr.reap() + + if wr.nowServing.Load() != before { + t.Errorf("nowServing advanced for within-window tickets: before=%d after=%d", + before, wr.nowServing.Load()) + } + // Tokens should still be evicted even if nowServing doesn't advance. + if wr.tokens.len() != 0 { + t.Errorf("expected all tokens evicted, got %d remaining", wr.tokens.len()) + } +} + +// ── reap() — multi-batch looping ───────────────────────────────────────────── + +func TestReap_ClearsMoreThanOneBatch(t *testing.T) { + wr := newTestWR(t, 1) + expired := time.Now().Add(-(cookieTTL + time.Minute)) + + // Insert more than reaperBatchSize expired tokens. + total := reaperBatchSize + 500 + for i := 0; i < total; i++ { + wr.tokens.set(fmt.Sprintf("ghost-%d", i), ticketEntry{ + ticket: int64(100 + i), // all outside window + issuedAt: expired, + }) + } + + if wr.tokens.len() != total { + t.Fatalf("setup: expected %d tokens, got %d", total, wr.tokens.len()) + } + + wr.reap() + + if remaining := wr.tokens.len(); remaining != 0 { + t.Errorf("expected 0 tokens after reap, got %d (batch looping may be broken)", remaining) + } +} + +// ── reap() — EventEvict callback ───────────────────────────────────────────── + +func TestReap_FiresEventEvict(t *testing.T) { + wr := newTestWR(t, 5) + + var evictCount atomic.Int32 + wr.On(EventEvict, func(s Snapshot) { evictCount.Add(1) }) + + expired := time.Now().Add(-(cookieTTL + time.Minute)) + wr.tokens.set("ghost", ticketEntry{ticket: 100, issuedAt: expired}) + + wr.reap() + + // Wait for async callback. + deadline := time.After(200 * time.Millisecond) + for { + if evictCount.Load() >= 1 { + break + } + select { + case <-deadline: + t.Error("EventEvict not fired after reap eviction") + return + default: + time.Sleep(5 * time.Millisecond) + } + } +} + +func TestReap_DoesNotFireEventEvictWhenNothingExpired(t *testing.T) { + wr := newTestWR(t, 5) + + var evictCount atomic.Int32 + wr.On(EventEvict, func(s Snapshot) { evictCount.Add(1) }) + + wr.tokens.set("live", ticketEntry{ticket: 1, issuedAt: time.Now()}) + wr.reap() + + time.Sleep(50 * time.Millisecond) + if evictCount.Load() != 0 { + t.Error("EventEvict fired when no tokens were expired") + } +} + +func TestReap_DoesNotFireEventEvictForWindowOnlyEvictions(t *testing.T) { + // When only within-window tokens are evicted, nowServing doesn't + // advance, so EventEvict should not fire (evicted == 0 in the code). + wr := newTestWR(t, 10) + + var evictCount atomic.Int32 + wr.On(EventEvict, func(s Snapshot) { evictCount.Add(1) }) + + expired := time.Now().Add(-(cookieTTL + time.Minute)) + wr.tokens.set("win-ghost", ticketEntry{ticket: 1, issuedAt: expired}) + + wr.reap() + + time.Sleep(50 * time.Millisecond) + if evictCount.Load() != 0 { + t.Errorf("EventEvict fired for within-window eviction (no queue advancement), got %d", evictCount.Load()) + } +} + +// ── reapBatch() — TOCTOU double-check ──────────────────────────────────────── + +func TestReapBatch_DoubleCheckPreventsRaceEviction(t *testing.T) { + // Simulate a token that was expired during the read-lock scan but + // was refreshed (touchIssuedAt) before the write-lock eviction. + wr := newTestWR(t, 5) + + almostExpired := time.Now().Add(-(cookieTTL - 10*time.Millisecond)) + wr.tokens.set("borderline", ticketEntry{ + ticket: 100, + issuedAt: almostExpired, + }) + + // Sleep until the token has just crossed the TTL boundary. + time.Sleep(15 * time.Millisecond) + + // Simulate the client refreshing the token right before eviction. + wr.tokens.touchIssuedAt("borderline") + + // Now reap — the token should survive because touchIssuedAt refreshed it. + wr.reap() + + if _, ok := wr.tokens.get("borderline"); !ok { + t.Error("borderline token should have survived reap after touchIssuedAt refresh") + } +} + +// ── SetReaperInterval — reaper restarts with new interval ──────────────────── + +func TestSetReaperInterval_RestartSignalSent(t *testing.T) { + wr := newTestWR(t, 5) + + // Drain any pending signal from Init. + select { + case <-wr.reaperRestart: + default: + } + + if err := wr.SetReaperInterval(10 * time.Second); err != nil { + t.Fatal(err) + } + + select { + case <-wr.reaperRestart: + // Signal received — correct. + default: + // No signal but that's okay if one was already pending. + // The important thing is the interval was stored. + } + + if wr.ReaperInterval() != 10*time.Second { + t.Errorf("expected 10s, got %s", wr.ReaperInterval()) + } +} + +func TestSetReaperInterval_DuplicateSignalDoesNotBlock(t *testing.T) { + wr := newTestWR(t, 5) + + // Fill the channel. + select { + case wr.reaperRestart <- struct{}{}: + default: + } + + // This must not block even though the channel is full. + done := make(chan struct{}) + go func() { + wr.SetReaperInterval(15 * time.Second) + close(done) + }() + + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("SetReaperInterval blocked on full reaperRestart channel") + } +} + +// ── Concurrency: reap under concurrent token mutations ─────────────────────── + +func TestReap_ConcurrentWithTokenStoreWrites(t *testing.T) { + wr := newTestWR(t, 5) + expired := time.Now().Add(-(cookieTTL + time.Minute)) + + // Pre-populate some expired tokens. + for i := 0; i < 100; i++ { + wr.tokens.set(fmt.Sprintf("ghost-%d", i), ticketEntry{ + ticket: int64(100 + i), + issuedAt: expired, + }) + } + + var wg sync.WaitGroup + + // Concurrent reaps. + for range 5 { + wg.Add(1) + go func() { + defer wg.Done() + wr.reap() + }() + } + + // Concurrent writes. + for i := 0; i < 50; i++ { + wg.Add(1) + i := i + go func() { + defer wg.Done() + wr.tokens.set(fmt.Sprintf("new-%d", i), ticketEntry{ + ticket: int64(1000 + i), + issuedAt: time.Now(), + }) + }() + } + + // Concurrent reads. + for i := 0; i < 50; i++ { + wg.Add(1) + i := i + go func() { + defer wg.Done() + wr.tokens.get(fmt.Sprintf("ghost-%d", i)) + }() + } + + wg.Wait() + + // All ghosts should be evicted, all new tokens should survive. + for i := 0; i < 100; i++ { + if _, ok := wr.tokens.get(fmt.Sprintf("ghost-%d", i)); ok { + t.Errorf("ghost-%d should have been evicted", i) + } + } + for i := 0; i < 50; i++ { + if _, ok := wr.tokens.get(fmt.Sprintf("new-%d", i)); !ok { + t.Errorf("new-%d should still exist", i) + } + } +} + +// ── startReaper — shutdown via context cancellation ────────────────────────── + +func TestStartReaper_StopsOnContextCancel(t *testing.T) { + wr := &WaitingRoom{} + if err := wr.Init(5); err != nil { + t.Fatal(err) + } + + // Stop should not block or panic. + wr.Stop() + + // After Stop, the reaper should not be running. Verify by checking + // that we can re-init without issues. + if err := wr.Init(10); err != nil { + t.Fatalf("re-Init after Stop failed: %v", err) + } + wr.Stop() +} + +// ── tokenStore.len() ───────────────────────────────────────────────────────── + +func TestTokenStore_Len(t *testing.T) { + ts := newTokenStore() + + if ts.len() != 0 { + t.Errorf("expected len 0, got %d", ts.len()) + } + + ts.set("a", ticketEntry{ticket: 1, issuedAt: time.Now()}) + ts.set("b", ticketEntry{ticket: 2, issuedAt: time.Now()}) + + if ts.len() != 2 { + t.Errorf("expected len 2, got %d", ts.len()) + } + + ts.delete("a") + + if ts.len() != 1 { + t.Errorf("expected len 1, got %d", ts.len()) + } +} + +// ── tokenStore.touchLastPoll() ─────────────────────────────────────────────── + +func TestTokenStore_TouchLastPoll(t *testing.T) { + ts := newTokenStore() + + // Non-existent token. + if _, ok := ts.touchLastPoll("missing"); ok { + t.Error("expected ok=false for missing token") + } + + ts.set("tok", ticketEntry{ticket: 1, issuedAt: time.Now()}) + + // First touch — previous should be zero. + prev, ok := ts.touchLastPoll("tok") + if !ok { + t.Fatal("expected ok=true") + } + if !prev.IsZero() { + t.Errorf("expected zero time on first touch, got %v", prev) + } + + time.Sleep(5 * time.Millisecond) + + // Second touch — previous should be recent. + prev2, ok := ts.touchLastPoll("tok") + if !ok { + t.Fatal("expected ok=true") + } + if prev2.IsZero() { + t.Error("expected non-zero time on second touch") + } + if time.Since(prev2) > time.Second { + t.Errorf("previous lastPoll too old: %v", prev2) + } +} diff --git a/room.go b/room.go index fb89bc1..af3e7b6 100644 --- a/room.go +++ b/room.go @@ -25,12 +25,22 @@ var defaultWaitingRoomBytes []byte // This design avoids writing two responses to the same ResponseWriter by // never calling c.Next() on a request that was served the waiting room page. // +// # Admission model +// +// Admission is poll-driven: queued clients reload the page after +// /queue/status reports ready=true. There are no server-side goroutines +// blocking on behalf of waiting clients; the Middleware is stateless per +// request beyond the token store lookup. +// // Related: WaitingRoom.RegisterRoutes, WaitingRoom.StatusHandler func (wr *WaitingRoom) Middleware() gin.HandlerFunc { return func(c *gin.Context) { if !wr.checkInitialised(c) { return } + + secure := wr.secureCookie.Load() || c.Request.TLS != nil + // Resume an existing queued position if the client presents a // valid room_ticket cookie. This preserves queue position across // page reloads and polling retries. @@ -38,25 +48,40 @@ func (wr *WaitingRoom) Middleware() gin.HandlerFunc { if entry, ok := wr.tokens.get(cookie.Value); ok { if wr.ticketReady(entry.ticket) { // Client's ticket is now within the serving window. + + // Snapshot occupancy BEFORE acquiring the slot so we + // can detect the non-full→full transition edge. + wasFull := wr.Len() >= int(wr.Cap()) + // Acquire a slot and let them through. if err := wr.sem.AcquireWith(c.Request.Context()); err != nil { // Acquire failed (client disconnected, context - // cancelled). Clean up the dead token and advance - // nowServing so the queue doesn't stall waiting - // for the reaper to evict this ticket. + // cancelled). Clean up the dead token. Only + // advance nowServing if the ticket was outside + // the serving window — tickets inside the window + // already consumed a conceptual slot allocation + // and advancing for them inflates capacity. wr.tokens.delete(cookie.Value) - wr.mu.Lock() - wr.nowServing.Add(1) - wr.cond.Broadcast() - wr.mu.Unlock() + if entry.ticket > wr.nowServing.Load()+int64(wr.cap.Load()) { + wr.nowServing.Add(1) + } + wr.emit(EventTimeout, wr.snapshot(EventTimeout)) c.AbortWithStatus(http.StatusServiceUnavailable) return } wr.tokens.delete(cookie.Value) defer wr.release("") + wr.emit(EventEnter, wr.snapshot(EventEnter)) + if !wasFull && wr.Len() >= int(wr.Cap()) { + wr.emit(EventFull, wr.snapshot(EventFull)) + } c.Next() return } + // Touch the token's issuedAt so active pollers do not + // get reaped during normal operation. + wr.tokens.touchIssuedAt(cookie.Value) + // Still waiting — serve updated position and abort. position := wr.positionOf(entry.ticket) if position < 1 { @@ -69,22 +94,36 @@ func (wr *WaitingRoom) Middleware() gin.HandlerFunc { } } + // Check queue depth limit before issuing a new ticket. + maxDepth := wr.maxQueueDepth.Load() + if maxDepth > 0 && wr.QueueDepth() >= maxDepth { + c.AbortWithStatus(http.StatusServiceUnavailable) + return + } + ticket := wr.nextTicket.Add(1) ctx := c.Request.Context() + // Snapshot occupancy BEFORE acquiring the slot for edge detection. + wasFull := wr.Len() >= int(wr.Cap()) + // Fast path — ticket is within the serving window. if wr.ticketReady(ticket) { if err := wr.sem.AcquireWith(ctx); err != nil { - // Ticket consumed but not served — advance nowServing - // so the gap doesn't stall the queue. - wr.mu.Lock() - wr.nowServing.Add(1) - wr.cond.Broadcast() - wr.mu.Unlock() + // Ticket consumed but not served. Only advance + // nowServing if the ticket was outside the window. + if ticket > wr.nowServing.Load()+int64(wr.cap.Load()) { + wr.nowServing.Add(1) + } + wr.emit(EventTimeout, wr.snapshot(EventTimeout)) c.AbortWithStatus(http.StatusServiceUnavailable) return } defer wr.release("") + wr.emit(EventEnter, wr.snapshot(EventEnter)) + if !wasFull && wr.Len() >= int(wr.Cap()) { + wr.emit(EventFull, wr.snapshot(EventFull)) + } c.Next() return } @@ -93,10 +132,7 @@ func (wr *WaitingRoom) Middleware() gin.HandlerFunc { // abort. The client will poll /queue/status and reload when ready. token, err := generateToken() if err != nil { - wr.mu.Lock() wr.nowServing.Add(1) - wr.cond.Broadcast() - wr.mu.Unlock() c.AbortWithStatus(http.StatusInternalServerError) return } @@ -106,17 +142,20 @@ func (wr *WaitingRoom) Middleware() gin.HandlerFunc { issuedAt: time.Now(), }) + wr.emit(EventQueue, wr.snapshot(EventQueue)) + http.SetCookie(c.Writer, &http.Cookie{ Name: cookieName, Value: token, - Path: "/", + Path: wr.CookiePath(), + Domain: wr.CookieDomain(), MaxAge: int(cookieTTL.Seconds()), HttpOnly: true, - Secure: true, // default to true since proxies like cloudflare can terminate due to c.Request.TLS being nil when served over HTTPS + Secure: secure, SameSite: http.SameSiteLaxMode, }) - position := ticket - (wr.nowServing.Load() + int64(wr.cap.Load())) + position := wr.positionOf(ticket) if position < 1 { position = 1 } @@ -133,17 +172,33 @@ func (wr *WaitingRoom) ticketReady(ticket int64) bool { } // release returns a semaphore slot, optionally removes a session token, -// advances nowServing, and broadcasts to all waiting goroutines. +// advances nowServing, and fires exit/drain lifecycle events. +// +// EventDrain fires on the transition from full to not-full — i.e. when +// the room was at capacity before this release and now has at least one +// free slot. This matches the documented semantics and is useful for +// scale-in decisions. +// +// Note: nowServing is advanced here without holding wr.mu because the +// WaitingRoom uses a poll-driven admission model. There are no goroutines +// performing cond.Wait(); the advance only needs to be atomic, which +// atomic.Int64.Add guarantees. func (wr *WaitingRoom) release(token string) { + // Snapshot BEFORE releasing the slot so we can detect the + // full→not-full transition. + wasFull := wr.Len() >= int(wr.Cap()) + if token != "" { wr.tokens.delete(token) } wr.sem.Release() - - wr.mu.Lock() wr.nowServing.Add(1) - wr.cond.Broadcast() - wr.mu.Unlock() + + snap := wr.snapshot(EventExit) + wr.emit(EventExit, snap) + if wasFull && !snap.Full() { + wr.emit(EventDrain, wr.snapshot(EventDrain)) + } } // resolveHTML returns the HTML bytes to serve. Custom HTML set via SetHTML @@ -179,9 +234,8 @@ func (wr *WaitingRoom) SetHTML(html []byte) { } // SetCap adjusts the number of concurrently active requests at runtime. -// Expanding capacity immediately admits waiting tickets by broadcasting -// to all blocked goroutines so they can recheck ticketReady against the -// new cap value. Shrinking drains in-flight work first. +// Expanding capacity immediately opens new semaphore slots. Shrinking +// drains in-flight work via the underlying sema implementation. // // Returns ErrInvalidCap if cap < 1. // @@ -190,13 +244,13 @@ func (wr *WaitingRoom) SetCap(cap int32) error { if cap < 1 { return ErrInvalidCap{Given: cap} } - wr.mu.Lock() - defer wr.mu.Unlock() + // Delegate entirely to sema which manages its own internal mutex. + // We update wr.cap after the semaphore resize succeeds so that + // ticketReady and positionOf remain consistent with actual capacity. if err := wr.sem.SetCap(int(cap)); err != nil { return err } wr.cap.Store(cap) - wr.cond.Broadcast() return nil } diff --git a/room_test.go b/room_test.go index 7217da3..56e7500 100644 --- a/room_test.go +++ b/room_test.go @@ -5,6 +5,7 @@ import ( "fmt" "net/http" "net/http/httptest" + "strings" "sync" "sync/atomic" "testing" @@ -34,6 +35,26 @@ func newTestRouter(wr *WaitingRoom, serving chan struct{}, release chan struct{} return r } +// newTestRouterWithGates builds a gin engine where each handler blocks on +// its own gate channel. This allows tests to release individual slots in +// a controlled order. +func newTestRouterWithGates(wr *WaitingRoom, serving chan struct{}, gates []chan struct{}) *gin.Engine { + var idx atomic.Int32 + r := gin.New() + wr.RegisterRoutes(r) + r.GET("/", func(c *gin.Context) { + i := int(idx.Add(1)) - 1 + if serving != nil { + serving <- struct{}{} + } + if i < len(gates) { + <-gates[i] + } + c.Status(http.StatusOK) + }) + return r +} + // serveWithCookie performs a GET / with an optional cookie and returns // the recorder and any Set-Cookie header value for cookieName. func serveWithCookie(r *gin.Engine, cookie string) (*httptest.ResponseRecorder, string) { @@ -52,7 +73,9 @@ func serveWithCookie(r *gin.Engine, cookie string) (*httptest.ResponseRecorder, } // pollStatus calls GET /queue/status with the given token cookie and -// returns the decoded statusResponse. +// returns the decoded statusResponse. If the server returns 429 (rate +// limited), the response is treated as not-ready so callers retry after +// respecting the poll interval. func pollStatus(r *gin.Engine, token string) statusResponse { req := httptest.NewRequest(http.MethodGet, "/queue/status", nil) if token != "" { @@ -60,15 +83,38 @@ func pollStatus(r *gin.Engine, token string) statusResponse { } w := httptest.NewRecorder() r.ServeHTTP(w, req) + + // If rate-limited, return a synthetic not-ready response so callers + // back off and retry rather than seeing a decode artifact. + if w.Code == http.StatusTooManyRequests { + return statusResponse{Ready: false} + } + var resp statusResponse json.NewDecoder(w.Body).Decode(&resp) return resp } +// pollStatusRaw calls GET /queue/status and returns the raw recorder +// so callers can inspect status codes and headers. +func pollStatusRaw(r *gin.Engine, token string) *httptest.ResponseRecorder { + req := httptest.NewRequest(http.MethodGet, "/queue/status", nil) + if token != "" { + req.AddCookie(&http.Cookie{Name: cookieName, Value: token}) + } + w := httptest.NewRecorder() + r.ServeHTTP(w, req) + return w +} + // waitForStatus polls /queue/status until ready=true or deadline passes. +// The poll interval is set to statusPollMinInterval + a small margin so +// that the per-token rate limiter in StatusHandler does not reject polls. func waitForStatus(t *testing.T, r *gin.Engine, token string, deadline time.Duration) { t.Helper() timeout := time.After(deadline) + // Poll at slightly more than the rate limit interval to avoid 429s. + pollInterval := statusPollMinInterval + 50*time.Millisecond for { select { case <-timeout: @@ -77,7 +123,7 @@ func waitForStatus(t *testing.T, r *gin.Engine, token string, deadline time.Dura if pollStatus(r, token).Ready { return } - time.Sleep(10 * time.Millisecond) + time.Sleep(pollInterval) } } } @@ -261,7 +307,7 @@ func TestSlowPath_PositionInjectedInHTML(t *testing.T) { w, _ := serveWithCookie(r, "") body := w.Body.String() // {{.Position}} should have been replaced with a number. - if contains(body, "{{.Position}}") { + if strings.Contains(body, "{{.Position}}") { t.Error("expected {{.Position}} to be replaced in HTML") } @@ -369,11 +415,13 @@ func TestFIFO_RequestsAdmittedInOrder(t *testing.T) { } // Release each slot in sequence and verify the next waiter is admitted. + // The deadline is longer here because each waitForStatus poll sleeps + // for statusPollMinInterval + margin to avoid 429 rate limiting. for i := 0; i < total; i++ { close(gates[i]) if i+1 < total { // Poll until the next token is ready then re-request. - waitForStatus(t, r, tokens[i+1], 2*time.Second) + waitForStatus(t, r, tokens[i+1], 10*time.Second) go func(idx int, tok string) { req := httptest.NewRequest(http.MethodGet, fmt.Sprintf("/req/%d", idx), nil) req.AddCookie(&http.Cookie{Name: cookieName, Value: tok}) @@ -469,7 +517,203 @@ func TestStatusEndpoint_ReturnsReadyAfterSlotOpens(t *testing.T) { close(release) // Status should eventually return ready=true. - waitForStatus(t, r, token, 2*time.Second) + // The deadline is generous to accommodate the per-token rate limiter + // which requires ~1s between polls. + waitForStatus(t, r, token, 5*time.Second) +} + +// TestStatusEndpoint_RateLimitRejectsFastPolling verifies that polling +// /queue/status faster than statusPollMinInterval returns 429. +func TestStatusEndpoint_RateLimitRejectsFastPolling(t *testing.T) { + const cap = 1 + wr := &WaitingRoom{} + if err := wr.Init(cap); err != nil { + t.Fatal(err) + } + defer wr.Stop() + + serving := make(chan struct{}, 1) + release := make(chan struct{}) + r := newTestRouter(wr, serving, release) + + go func() { + req := httptest.NewRequest(http.MethodGet, "/", nil) + r.ServeHTTP(httptest.NewRecorder(), req) + }() + <-serving + + _, token := serveWithCookie(r, "") + if token == "" { + t.Fatal("no token issued") + } + + // First poll — should succeed. + w1 := pollStatusRaw(r, token) + if w1.Code != http.StatusOK { + t.Errorf("first poll: expected 200, got %d", w1.Code) + } + + // Immediate second poll — should be rate limited. + w2 := pollStatusRaw(r, token) + if w2.Code != http.StatusTooManyRequests { + t.Errorf("rapid second poll: expected 429, got %d", w2.Code) + } + if ra := w2.Header().Get("Retry-After"); ra != "1" { + t.Errorf("expected Retry-After: 1, got %q", ra) + } + + close(release) +} + +// ── MaxQueueDepth tests ────────────────────────────────────────────────────── + +func TestMaxQueueDepth_RejectsWhenFull(t *testing.T) { + const cap = 1 + wr := &WaitingRoom{} + if err := wr.Init(cap); err != nil { + t.Fatal(err) + } + defer wr.Stop() + + if err := wr.SetMaxQueueDepth(2); err != nil { + t.Fatal(err) + } + + serving := make(chan struct{}, 1) + release := make(chan struct{}) + r := newTestRouter(wr, serving, release) + + // Fill the slot. + go func() { + req := httptest.NewRequest(http.MethodGet, "/", nil) + r.ServeHTTP(httptest.NewRecorder(), req) + }() + <-serving + + // Queue 2 requests (the max). + for i := 0; i < 2; i++ { + serveWithCookie(r, "") + } + + // Third queued request should be rejected. + w, _ := serveWithCookie(r, "") + if w.Code != http.StatusServiceUnavailable { + t.Errorf("expected 503 when queue is full, got %d", w.Code) + } + + close(release) +} + +func TestMaxQueueDepth_ZeroMeansUnlimited(t *testing.T) { + const cap = 1 + wr := &WaitingRoom{} + if err := wr.Init(cap); err != nil { + t.Fatal(err) + } + defer wr.Stop() + + // Default is 0 (unlimited). + if wr.MaxQueueDepth() != 0 { + t.Errorf("expected default max queue depth 0, got %d", wr.MaxQueueDepth()) + } + + serving := make(chan struct{}, 1) + release := make(chan struct{}) + r := newTestRouter(wr, serving, release) + + go func() { + req := httptest.NewRequest(http.MethodGet, "/", nil) + r.ServeHTTP(httptest.NewRecorder(), req) + }() + <-serving + + // Queue many requests — none should be rejected. + for i := 0; i < 50; i++ { + w, _ := serveWithCookie(r, "") + if w.Code == http.StatusServiceUnavailable { + t.Fatalf("request %d rejected with unlimited queue depth", i) + } + } + + close(release) +} + +func TestMaxQueueDepth_NegativeRejected(t *testing.T) { + wr := &WaitingRoom{} + if err := wr.Init(5); err != nil { + t.Fatal(err) + } + defer wr.Stop() + + err := wr.SetMaxQueueDepth(-1) + if err == nil { + t.Error("expected error for negative max queue depth") + } + if _, ok := err.(ErrInvalidMaxQueueDepth); !ok { + t.Errorf("expected ErrInvalidMaxQueueDepth, got %T", err) + } +} + +// ── Cookie configuration tests ─────────────────────────────────────────────── + +func TestCookiePath_DefaultIsSlash(t *testing.T) { + wr := &WaitingRoom{} + if err := wr.Init(5); err != nil { + t.Fatal(err) + } + defer wr.Stop() + + if wr.CookiePath() != "/" { + t.Errorf("expected default cookie path '/', got %q", wr.CookiePath()) + } +} + +func TestCookiePath_CustomPathUsed(t *testing.T) { + const cap = 1 + wr := &WaitingRoom{} + if err := wr.Init(cap); err != nil { + t.Fatal(err) + } + defer wr.Stop() + wr.SetCookiePath("/app") + + serving := make(chan struct{}, 1) + release := make(chan struct{}) + r := newTestRouter(wr, serving, release) + + go func() { + req := httptest.NewRequest(http.MethodGet, "/", nil) + r.ServeHTTP(httptest.NewRecorder(), req) + }() + <-serving + + req := httptest.NewRequest(http.MethodGet, "/", nil) + w := httptest.NewRecorder() + r.ServeHTTP(w, req) + + for _, c := range w.Result().Cookies() { + if c.Name == cookieName { + if c.Path != "/app" { + t.Errorf("expected cookie path '/app', got %q", c.Path) + } + close(release) + return + } + } + // Cookie might not be set if room wasn't full — skip. + close(release) +} + +func TestCookieDomain_DefaultIsEmpty(t *testing.T) { + wr := &WaitingRoom{} + if err := wr.Init(5); err != nil { + t.Fatal(err) + } + defer wr.Stop() + + if wr.CookieDomain() != "" { + t.Errorf("expected default cookie domain '', got %q", wr.CookieDomain()) + } } // ── SetCap tests ───────────────────────────────────────────────────────────── @@ -511,7 +755,7 @@ func TestSetCap_ExpandAdmitsWaiters(t *testing.T) { tok := tok go func() { defer wg.Done() - waitForStatus(t, r, tok, 2*time.Second) + waitForStatus(t, r, tok, 10*time.Second) }() } @@ -523,7 +767,7 @@ func TestSetCap_ExpandAdmitsWaiters(t *testing.T) { select { case <-done: - case <-time.After(3 * time.Second): + case <-time.After(15 * time.Second): t.Fatal("timed out waiting for waiters to become ready after SetCap") } @@ -592,6 +836,8 @@ func TestSetHTML_NilRevertsToDefault(t *testing.T) { } // ── Reaper tests ───────────────────────────────────────────────────────────── +// Basic reaper correctness is tested here. Full reaper coverage is in +// reaper_test.go. func TestReaper_EvictsExpiredTokens(t *testing.T) { wr := &WaitingRoom{} @@ -633,20 +879,52 @@ func TestReaper_PreservesLiveTokens(t *testing.T) { func TestReaper_AdvancesNowServingOnEviction(t *testing.T) { wr := &WaitingRoom{} - if err := wr.Init(5); err != nil { + if err := wr.Init(1); err != nil { t.Fatal(err) } defer wr.Stop() - before := wr.nowServing.Load() + if ns := wr.nowServing.Load(); ns != 0 { + t.Fatalf("expected nowServing=0 initially, got %d", ns) + } + wr.tokens.set("ghost", ticketEntry{ - ticket: 1, + ticket: 10, issuedAt: time.Now().Add(-(cookieTTL + time.Minute)), }) + + before := wr.nowServing.Load() wr.reap() if wr.nowServing.Load() != before+1 { - t.Errorf("expected nowServing to advance by 1 after eviction, got %d", wr.nowServing.Load()) + t.Errorf("expected nowServing to advance by 1 after evicting an out-of-window ghost, got %d (before=%d)", + wr.nowServing.Load(), before) + } +} + +func TestReaper_DoesNotAdvanceNowServingForWindowTicket(t *testing.T) { + wr := &WaitingRoom{} + if err := wr.Init(5); err != nil { + t.Fatal(err) + } + defer wr.Stop() + + wr.tokens.set("window-ghost", ticketEntry{ + ticket: 1, + issuedAt: time.Now().Add(-(cookieTTL + time.Minute)), + }) + + before := wr.nowServing.Load() + wr.reap() + + if _, ok := wr.tokens.get("window-ghost"); ok { + t.Error("expected window-ghost token to be evicted") + } + + if wr.nowServing.Load() != before { + t.Errorf("nowServing advanced for a within-window ghost: before=%d after=%d (cap=5) — "+ + "this would inflate capacity beyond configured limit", + before, wr.nowServing.Load()) } } @@ -697,6 +975,83 @@ func TestSetReaperInterval_InvalidRange(t *testing.T) { } } +// ── SetSecureCookie tests ──────────────────────────────────────────────────── + +func TestSetSecureCookie_DefaultIsFalse(t *testing.T) { + wr := &WaitingRoom{} + if err := wr.Init(1); err != nil { + t.Fatal(err) + } + defer wr.Stop() + + serving := make(chan struct{}, 1) + release := make(chan struct{}) + r := newTestRouter(wr, serving, release) + + go func() { + req := httptest.NewRequest(http.MethodGet, "/", nil) + r.ServeHTTP(httptest.NewRecorder(), req) + }() + <-serving + + req := httptest.NewRequest(http.MethodGet, "/", nil) + w := httptest.NewRecorder() + r.ServeHTTP(w, req) + + var found bool + for _, c := range w.Result().Cookies() { + if c.Name == cookieName { + found = true + if c.Secure { + t.Error("expected Secure=false on cookie for plain-HTTP request with default secureCookieDefault=false") + } + } + } + if !found { + t.Skip("no room_ticket cookie issued — room may not have been full; skipping Secure flag check") + } + + close(release) +} + +func TestSetSecureCookie_TrueSetsCookieSecure(t *testing.T) { + wr := &WaitingRoom{} + if err := wr.Init(1); err != nil { + t.Fatal(err) + } + defer wr.Stop() + wr.SetSecureCookie(true) + + serving := make(chan struct{}, 1) + release := make(chan struct{}) + r := newTestRouter(wr, serving, release) + + go func() { + req := httptest.NewRequest(http.MethodGet, "/", nil) + r.ServeHTTP(httptest.NewRecorder(), req) + }() + <-serving + + req := httptest.NewRequest(http.MethodGet, "/", nil) + w := httptest.NewRecorder() + r.ServeHTTP(w, req) + + var found bool + for _, c := range w.Result().Cookies() { + if c.Name == cookieName { + found = true + if !c.Secure { + t.Error("expected Secure=true on cookie after SetSecureCookie(true)") + } + } + } + if !found { + t.Skip("no room_ticket cookie issued — room may not have been full; skipping Secure flag check") + } + + close(release) +} + // ── Introspection tests ────────────────────────────────────────────────────── func TestQueueDepth_AccurateWhileWaiting(t *testing.T) { @@ -906,19 +1261,3 @@ func FuzzWaitingRoom(f *testing.F) { } }) } - -// ── Helpers ────────────────────────────────────────────────────────────────── - -func contains(s, substr string) bool { - return len(s) >= len(substr) && (s == substr || - len(s) > 0 && containsRune(s, substr)) -} - -func containsRune(s, substr string) bool { - for i := range s { - if i+len(substr) <= len(s) && s[i:i+len(substr)] == substr { - return true - } - } - return false -} diff --git a/sample/basic-web-app/README.md b/sample/basic-web-app/README.md new file mode 100644 index 0000000..4993dc5 --- /dev/null +++ b/sample/basic-web-app/README.md @@ -0,0 +1,538 @@ +# basic-web-app — room middleware tutorial + +This sample walks you through adding a FIFO waiting room to a four-page +Gin web application from scratch. By the end you will have a running server +that admits at most N concurrent requests, queues the rest, and admits them +automatically in arrival order — with no client-side refresh required. + +--- + +## Prerequisites + +| Tool | Minimum version | Install | +|---|---|---| +| Go | 1.22 | https://go.dev/dl | +| Apache Bench | any | `apt install apache2-utils` / `brew install httpd` | + +--- + +## Step 1 — Create the module + +```bash +mkdir basic-web-app && cd basic-web-app +go mod init github.com/your-username/basic-web-app +``` + +Fetch the two dependencies the sample uses: + +```bash +go get github.com/andreimerlescu/room +go get github.com/gin-gonic/gin +``` + +--- + +## Step 2 — Why `gin.New()` instead of `gin.Default()` + +`gin.Default()` installs gin's own `Logger` middleware, which buffers each +log line and prints it **after** the handler returns. During a load test that +means you see nothing until the request is already complete — room events +and request logs appear out of order and the queue activity is invisible. + +`gin.New()` gives you a blank engine. We install two middlewares manually: + +```go +r := gin.New() +r.Use(gin.Recovery()) // keep the panic recovery middleware +r.Use(requestLogger()) // our logger — prints on entry AND exit +``` + +The custom `requestLogger` middleware in this sample prints a line the moment +a request arrives (`-->`) and another when it completes (`<--`). That means +you can see which requests are being held by the waiting room versus which +are executing their handler, in real time, as the load test runs. + +--- + +## Step 3 — Initialise the WaitingRoom with a small capacity + +```go +wr = &room.WaitingRoom{} +if err := wr.Init(5); err != nil { + log.Fatalf("room.Init: %v", err) +} +defer wr.Stop() +``` + +A cap of **5** is deliberately small so that `ab -c 100` fills the room +immediately and you can watch the queue build and drain in the terminal. +In production you would set this to match your actual concurrency budget +— typically the size of your database connection pool or the rate limit of +your slowest downstream dependency. + +--- + +## Step 4 — Add simulated latency to every handler + +This is the most important step for making the waiting room visible during +a load test. Without it, handlers return in microseconds and the room never +fills up even at `-c 100` because requests complete faster than they arrive. + +```go +const simulatedLatency = 500 * time.Millisecond + +func aboutPage(c *gin.Context) { + time.Sleep(simulatedLatency) // holds the semaphore slot for 500 ms + c.Data(http.StatusOK, "text/html; charset=utf-8", page("About", body)) +} +``` + +With `cap=5` and each request taking 500 ms, the server can process at most +10 requests per second. At `ab -c 100` you are sending 100 concurrent +requests, so roughly 95 of them will be queued immediately and admitted one +by one as slots open. + +--- + +## Step 5 — Register lifecycle callbacks + +Callbacks are what you will actually see in the terminal during the load +test. They fire asynchronously in their own goroutines — a slow callback +never stalls the request path. + +```go +wr.On(room.EventFull, func(s room.Snapshot) { + roomLog("FULL ", fmt.Sprintf( + "capacity reached occupancy=%d/%d queue=%d util=%.0f%%", + s.Occupancy, s.Capacity, s.QueueDepth, + pct(s.Occupancy, s.Capacity), + )) +}) + +wr.On(room.EventDrain, func(s room.Snapshot) { + roomLog("DRAIN ", fmt.Sprintf( + "room no longer full occupancy=%d/%d queue=%d", + s.Occupancy, s.Capacity, s.QueueDepth, + )) +}) + +wr.On(room.EventQueue, func(s room.Snapshot) { + roomLog("QUEUE ", fmt.Sprintf( + "request queued depth=%d occupancy=%d/%d util=%.0f%%", + s.QueueDepth, s.Occupancy, s.Capacity, + pct(s.Occupancy, s.Capacity), + )) +}) + +wr.On(room.EventEnter, func(s room.Snapshot) { + roomLog("ENTER ", fmt.Sprintf( + "slot acquired occupancy=%d/%d queue=%d util=%.0f%%", + s.Occupancy, s.Capacity, s.QueueDepth, + pct(s.Occupancy, s.Capacity), + )) +}) + +wr.On(room.EventExit, func(s room.Snapshot) { + roomLog("EXIT ", fmt.Sprintf( + "slot released occupancy=%d/%d queue=%d util=%.0f%%", + s.Occupancy, s.Capacity, s.QueueDepth, + pct(s.Occupancy, s.Capacity), + )) +}) + +wr.On(room.EventEvict, func(s room.Snapshot) { + roomLog("EVICT ", fmt.Sprintf( + "ghost ticket removed queue=%d occupancy=%d/%d", + s.QueueDepth, s.Occupancy, s.Capacity, + )) +}) + +wr.On(room.EventTimeout, func(s room.Snapshot) { + roomLog("TIMEOUT", fmt.Sprintf( + "context cancelled before admission occupancy=%d/%d queue=%d", + s.Occupancy, s.Capacity, s.QueueDepth, + )) +}) +``` + +The `roomLog` helper prefixes every line with a fixed-width tag so you can +filter the output with `grep`: + +```bash +go run main.go 2>&1 | grep '\[QUEUE\]' # only queuing events +go run main.go 2>&1 | grep '\[FULL\]' # only full-capacity events +go run main.go 2>&1 | grep -v '\[REQ\]' # room events only, no request logs +``` + +--- + +## Step 6 — Register routes in the correct order + +```go +// Routes registered before RegisterRoutes bypass the waiting room. +// Use this for health checks and readiness probes that must always succeed. +// r.GET("/healthz", func(c *gin.Context) { c.Status(http.StatusOK) }) + +wr.RegisterRoutes(r) + +// Routes registered after RegisterRoutes are gated by the waiting room. +r.GET("/", homePage) +r.GET("/about", aboutPage) +r.GET("/pricing", pricingPage) +r.GET("/contact", contactPage) +``` + +--- + +## Step 7 — Run the server + +**Terminal 1:** + +```bash +go run main.go +``` + +You should see: + +``` +[ INFO ] listening on http://localhost:8080 cap=5 +``` + +--- + +## Step 8 — Run the load test + +**Terminal 2:** + +```bash +ab -t 60 -n 1000 -c 100 http://localhost:8080/about +``` + +| Flag | Meaning | +|---|---| +| `-t 60` | run for 60 seconds | +| `-n 1000` | send at most 1000 total requests | +| `-c 100` | maintain 100 concurrent connections | + +--- + +## Step 9 — Read the logs + +Switch back to Terminal 1. You will see output like this: + +``` +[ INFO ] listening on http://localhost:8080 cap=5 +[ REQ ] --> GET /about remote=127.0.0.1 +[ REQ ] --> GET /about remote=127.0.0.1 +[ REQ ] --> GET /about remote=127.0.0.1 +[ REQ ] --> GET /about remote=127.0.0.1 +[ REQ ] --> GET /about remote=127.0.0.1 +[ ENTER ] slot acquired occupancy=1/5 queue=0 util=20% +[ ENTER ] slot acquired occupancy=2/5 queue=0 util=40% +[ ENTER ] slot acquired occupancy=3/5 queue=0 util=60% +[ ENTER ] slot acquired occupancy=4/5 queue=0 util=80% +[ ENTER ] slot acquired occupancy=5/5 queue=0 util=100% +[ FULL ] capacity reached occupancy=5/5 queue=0 util=100% +[ QUEUE ] request queued depth=1 occupancy=5/5 util=100% +[ QUEUE ] request queued depth=2 occupancy=5/5 util=100% +[ QUEUE ] request queued depth=3 occupancy=5/5 util=100% +... +[ EXIT ] slot released occupancy=4/5 queue=95 util=80% +[ DRAIN ] room no longer full occupancy=4/5 queue=95 +[ ENTER ] slot acquired occupancy=5/5 queue=94 util=100% +[ FULL ] capacity reached occupancy=5/5 queue=94 util=100% +[ REQ ] <-- GET /about status=200 latency=500ms +[ REQ ] --> GET /about remote=127.0.0.1 +[ EXIT ] slot released occupancy=4/5 queue=93 util=80% +... +``` + +Here is what each tag means in the context of this load test: + +| Tag | What you are seeing | +|---|---| +| `[ REQ ] -->` | A new HTTP connection arrived at the server | +| `[ ENTER ]` | The request passed through the waiting room and is now running its handler | +| `[ FULL ]` | All 5 slots are occupied — the next request will queue | +| `[ QUEUE ]` | A request landed in the waiting room; `depth=N` is its position | +| `[ EXIT ]` | A handler finished and released its slot | +| `[ DRAIN ]` | The room dropped below full capacity — queued requests can now enter | +| `[ REQ ] <--` | The HTTP response was sent; `latency` includes waiting room time | +| `[ EVICT ]` | The reaper cleaned up a ghost ticket (ab closed a connection mid-wait) | +| `[ TIMEOUT ]` | A queued request's context was cancelled before it was admitted | + +### What the queue depth column tells you + +The `queue=N` value in `QUEUE` events shows how many requests are waiting +behind the one that just joined. Watch it climb during the flood and fall +as handlers complete and admit the next waiter. When `queue=0` appears in +`EXIT` events, the backlog has cleared. + +### What a healthy load test looks like + +- `FULL` fires once at the start and only fires again after a `DRAIN`. +- `DRAIN` fires every time the occupancy drops below 5 and a queued request + enters. +- `QUEUE` depth climbs quickly at the start then stays roughly stable or + trends downward as ab's concurrency saturates. +- `TIMEOUT` events appear only if ab's connection timeout is shorter than + the time a request spends waiting in the queue. Increase `-t` on ab or + add `-s 60` (socket timeout) to reduce spurious timeouts. +- `EVICT` events appear only after the reaper runs (every 10 s in this + sample). Each eviction means a client disappeared mid-queue — normal + during ab runs since ab recycles connections aggressively. + +--- + +## Step 10 — Read the ab report + +When ab finishes it prints a summary. With `cap=5` and 500 ms handlers the +numbers will look roughly like this: + +``` +Concurrency Level: 100 +Time taken for tests: 60.012 seconds +Complete requests: 1000 +Failed requests: 0 +Requests per second: 16.66 [#/sec] (mean) +Time per request: 6001.2 [ms] (mean) +Time per request: 60.01 [ms] (mean, across all concurrent requests) +``` + +The mean time per request of ~6 s reflects queuing time: a request that +arrives when the queue is 10 deep waits 10 × 500 ms before its handler +runs. That is the waiting room working as designed — absorbing burst traffic +instead of dropping it or crashing the downstream. + +To increase throughput, call `wr.SetCap` with a higher value and re-run ab: + +```bash +# in a third terminal while the server is running +curl -s http://localhost:8080/ # confirm server is up, then edit main.go +# or wire up an admin endpoint as shown in the runtime-adjustment section +``` + +--- + +## Grepping the logs for specific events + +```bash +# Room events only — filter out per-request noise +go run main.go 2>&1 | grep -v '\[ REQ' + +# Only queueing events — see the queue depth grow +go run main.go 2>&1 | grep '\[ QUEUE' + +# Only full-capacity moments +go run main.go 2>&1 | grep '\[ FULL' + +# Count how many requests were queued +go run main.go 2>&1 | grep -c '\[ QUEUE' + +# Watch the queue depth trend +go run main.go 2>&1 | grep '\[ QUEUE' | awk '{print $6}' +``` + +--- + +## Runtime capacity adjustment + +Because `wr` is a package-level variable you can change capacity without +restarting the server. Wire up an admin endpoint: + +```go +// Register this BEFORE wr.RegisterRoutes so it bypasses the waiting room. +r.POST("/admin/cap", func(c *gin.Context) { + var body struct { + Cap int32 `json:"cap"` + } + if err := c.ShouldBindJSON(&body); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + if err := wr.SetCap(body.Cap); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + c.JSON(http.StatusOK, gin.H{ + "cap": wr.Cap(), + "occupancy": wr.Len(), + "queue_depth": wr.QueueDepth(), + "utilization": fmt.Sprintf("%.0f%%", wr.Utilization()*100), + }) +}) +``` + +While ab is running in Terminal 2, change the cap in Terminal 3: + +```bash +# Double capacity — queued requests will immediately start being admitted +curl -s -X POST http://localhost:8080/admin/cap \ + -H 'Content-Type: application/json' \ + -d '{"cap": 10}' | jq + +# Halve it again +curl -s -X POST http://localhost:8080/admin/cap \ + -H 'Content-Type: application/json' \ + -d '{"cap": 5}' | jq +``` + +Watch the server logs — you will see a burst of `ENTER` events as queued +requests rush into the newly opened slots when you expand, and then `FULL` +almost immediately as the new capacity fills. + +--- + +## Common mistakes + +### Handlers return too fast — the room never fills up + +```go +// ✗ Wrong — returns in microseconds, room stays at occupancy=1 +func aboutPage(c *gin.Context) { + c.String(http.StatusOK, "About") +} + +// ✓ Correct for testing — holds the slot long enough to observe queuing +func aboutPage(c *gin.Context) { + time.Sleep(500 * time.Millisecond) + c.String(http.StatusOK, "About") +} +``` + +In production you do not need `time.Sleep` — real database queries, +template rendering, and downstream API calls provide the natural latency +that holds slots open. + +### Using `gin.Default()` — room events are buried in buffered output + +```go +// ✗ gin's Logger buffers and only prints after the handler returns. +// Room events appear out of order; the queue activity is invisible. +r := gin.Default() + +// ✓ Build the logger yourself so it prints on arrival, not completion. +r := gin.New() +r.Use(gin.Recovery()) +r.Use(requestLogger()) +``` + +### Registering application routes before `RegisterRoutes` + +```go +// ✗ /about is not gated — it bypasses the waiting room entirely +r.GET("/about", aboutPage) +wr.RegisterRoutes(r) + +// ✓ All four pages are protected +wr.RegisterRoutes(r) +r.GET("/about", aboutPage) +``` + +### Forgetting `defer wr.Stop()` + +Without it the reaper goroutine outlives the `http.Server`. In tests that +construct and discard `WaitingRoom` instances it leaks goroutines and +triggers the race detector. + +--- + +## File layout + +``` +sample/basic-web-app/ +├── main.go ← the result of this tutorial +├── README.md ← this file +└── go.mod +``` + +--- + +## Testing It Yourself + +This allows you to connect to [localhost:8080](http://localhost:8080/about) and see +yourself in the waiting room, then get entered. Hit refresh, you're back in the room. + +It's easy to do: + +```bash +chmod +x test.sh +./test.sh +``` + +Then [connect to localhost](https://localhost:8080/about) and see it for yourself! + +```log +╭─andrei@Andreis-Mac-Studio ~/work/personal/room/sample/basic-web-app ‹main› +╰─$ ./test.sh + +╔══════════════════════════════════════════════════╗ +║ room — Waiting Room Load Test ║ +╚══════════════════════════════════════════════════╝ + + target: http://localhost:8080/about + concurrency: 30 simultaneous clients + duration: 30s + ramp delay: 50ms between client launches + +Building server... +Starting server... +✓ Server is up at http://localhost:8080 (PID 68079) + +────────────────────────────────────────────────────────────────────── + Open http://localhost:8080/ in your browser to see the waiting room. +────────────────────────────────────────────────────────────────────── + + Server log: tail -f /var/folders/0m/y8d29v892039ldkgqkxbfvvh0000gn/T/tmp.9v9lhtoiY9/server.log + +▶ Starting load test... + + [ 29s] sent:191 served:61 queued:125 err:0 active:130 ~2 req/s [wave 7] + +⏳ Draining in-flight requests (up to 30s)... + + [ 59s] sent:191 served:131 queued:55 err:0 active:60 ~2 req/s [draining] + +Server lifecycle events: +────────────────────────────────────────────────────────────────────── +2026/04/13 15:24:05 [ ENTER ] slot acquired occupancy=1/5 queue=0 util=20% +2026/04/13 15:24:05 [ EXIT ] slot released occupancy=0/5 queue=0 util=0% +2026/04/13 15:24:07 [ ENTER ] slot acquired occupancy=1/5 queue=0 util=20% +2026/04/13 15:24:08 [ ENTER ] slot acquired occupancy=2/5 queue=0 util=40% +2026/04/13 15:24:08 [ ENTER ] slot acquired occupancy=3/5 queue=0 util=60% +2026/04/13 15:24:08 [ ENTER ] slot acquired occupancy=4/5 queue=0 util=80% +2026/04/13 15:24:08 [ FULL ] capacity reached occupancy=5/5 queue=0 util=100% +2026/04/13 15:24:08 [ ENTER ] slot acquired occupancy=5/5 queue=0 util=100% +2026/04/13 15:24:08 [ QUEUE ] request queued depth=1 occupancy=5/5 util=100% +2026/04/13 15:24:08 [ QUEUE ] request queued depth=2 occupancy=5/5 util=100% +2026/04/13 15:24:08 [ QUEUE ] request queued depth=3 occupancy=5/5 util=100% +2026/04/13 15:24:08 [ QUEUE ] request queued depth=4 occupancy=5/5 util=100% +2026/04/13 15:24:08 [ EXIT ] slot released occupancy=4/5 queue=3 util=80% +2026/04/13 15:24:08 [ DRAIN ] room no longer full occupancy=4/5 queue=3 +2026/04/13 15:24:08 [ QUEUE ] request queued depth=4 occupancy=4/5 util=80% +2026/04/13 15:24:08 [ EXIT ] slot released occupancy=3/5 queue=3 util=60% +2026/04/13 15:24:08 [ QUEUE ] request queued depth=4 occupancy=3/5 util=60% +2026/04/13 15:24:08 [ QUEUE ] request queued depth=5 occupancy=3/5 util=60% +2026/04/13 15:24:08 [ EXIT ] slot released occupancy=2/5 queue=4 util=40% +2026/04/13 15:24:08 [ QUEUE ] request queued depth=5 occupancy=2/5 util=40% +2026/04/13 15:24:08 [ EXIT ] slot released occupancy=1/5 queue=4 util=20% +2026/04/13 15:24:08 [ QUEUE ] request queued depth=5 occupancy=1/5 util=20% +2026/04/13 15:24:08 [ EXIT ] slot released occupancy=0/5 queue=4 util=0% +2026/04/13 15:24:08 [ QUEUE ] request queued depth=5 occupancy=0/5 util=0% +2026/04/13 15:24:08 [ QUEUE ] request queued depth=6 occupancy=0/5 util=0% +2026/04/13 15:24:08 [ QUEUE ] request queued depth=7 occupancy=0/5 util=0% +2026/04/13 15:24:08 [ QUEUE ] request queued depth=8 occupancy=0/5 util=0% +2026/04/13 15:24:08 [ QUEUE ] request queued depth=9 occupancy=0/5 util=0% +2026/04/13 15:24:09 [ QUEUE ] request queued depth=10 occupancy=0/5 util=0% +2026/04/13 15:24:09 [ QUEUE ] request queued depth=11 occupancy=0/5 util=0% + (no lifecycle events captured) +^C +Stopping server (PID 68079)... +``` + +--- + +## License + +Apache 2.0 — see the root [`LICENSE`](../../LICENSE) file. +``` \ No newline at end of file diff --git a/sample/basic-web-app/main.go b/sample/basic-web-app/main.go new file mode 100644 index 0000000..659155f --- /dev/null +++ b/sample/basic-web-app/main.go @@ -0,0 +1,306 @@ +package main + +import ( + "context" + "fmt" + "log" + "net/http" + "os" + "os/signal" + "syscall" + "time" + + "github.com/andreimerlescu/room" + "github.com/gin-gonic/gin" +) + +// wr is the WaitingRoom instance. Keeping it package-level lets you call +// wr.SetCap, wr.SetReaperInterval, or wr.On from a config-reload handler +// without restarting the server. +var wr *room.WaitingRoom + +func main() { + // ── 1. Use gin.New() instead of gin.Default() ───────────────────────── + // + // gin.Default() installs gin's own Logger middleware, which buffers + // output and formats it after the handler returns. That makes it hard + // to see room events interleaved with request logs in real time. + // gin.New() gives us a blank engine so we can install our own logger + // that prints immediately, before and after each request. + r := gin.New() + r.Use(gin.Recovery()) // keep the panic recovery middleware + r.Use(requestLogger()) // our structured logger — prints on entry AND exit + + // ── 2. Create and initialise the WaitingRoom ───────────────────────── + // + // Cap of 5 is deliberately small so that `ab -c 100` fills the room + // immediately and you can watch the queue build and drain in the logs. + // In production you would set this to match your actual concurrency budget. + wr = &room.WaitingRoom{} + if err := wr.Init(5); err != nil { + log.Fatalf("room.Init: %v", err) + } + defer wr.Stop() + + // ── 3. Configure the WaitingRoom ───────────────────────────────────── + + // Leave SetSecureCookie at its default (false) for local development + // so the cookie works over plain http://localhost. + // Call wr.SetSecureCookie(true) in production behind TLS. + + // Tighten the reaper so ghost tickets from ab's aborted connections + // are cleaned up quickly during the load test. + if err := wr.SetReaperInterval(10 * time.Second); err != nil { + log.Fatalf("room.SetReaperInterval: %v", err) + } + + // ── 4. Lifecycle callbacks ──────────────────────────────────────────── + // + // These callbacks are what you will see in the terminal during ab. + // Each line is prefixed with a tag so you can grep for it: + // + // grep '\[FULL\]' — moments the room hit capacity + // grep '\[QUEUE\]' — every request that had to wait + // grep '\[ENTER\]' — every admission into active service + // grep '\[EXIT\]' — every slot release + // grep '\[DRAIN\]' — moments the room dropped below capacity + // grep '\[EVICT\]' — abandoned ghost tickets removed by the reaper + // grep '\[TIMEOUT\]'— requests whose context was cancelled mid-queue + + wr.On(room.EventFull, func(s room.Snapshot) { + roomLog("FULL ", fmt.Sprintf( + "capacity reached occupancy=%d/%d queue=%d util=%.0f%%", + s.Occupancy, s.Capacity, s.QueueDepth, + pct(s.Occupancy, s.Capacity), + )) + }) + + wr.On(room.EventDrain, func(s room.Snapshot) { + roomLog("DRAIN ", fmt.Sprintf( + "room no longer full occupancy=%d/%d queue=%d", + s.Occupancy, s.Capacity, s.QueueDepth, + )) + }) + + wr.On(room.EventQueue, func(s room.Snapshot) { + roomLog("QUEUE ", fmt.Sprintf( + "request queued depth=%d occupancy=%d/%d util=%.0f%%", + s.QueueDepth, s.Occupancy, s.Capacity, + pct(s.Occupancy, s.Capacity), + )) + }) + + wr.On(room.EventEnter, func(s room.Snapshot) { + roomLog("ENTER ", fmt.Sprintf( + "slot acquired occupancy=%d/%d queue=%d util=%.0f%%", + s.Occupancy, s.Capacity, s.QueueDepth, + pct(s.Occupancy, s.Capacity), + )) + }) + + wr.On(room.EventExit, func(s room.Snapshot) { + roomLog("EXIT ", fmt.Sprintf( + "slot released occupancy=%d/%d queue=%d util=%.0f%%", + s.Occupancy, s.Capacity, s.QueueDepth, + pct(s.Occupancy, s.Capacity), + )) + }) + + wr.On(room.EventEvict, func(s room.Snapshot) { + roomLog("EVICT ", fmt.Sprintf( + "ghost ticket removed queue=%d occupancy=%d/%d", + s.QueueDepth, s.Occupancy, s.Capacity, + )) + }) + + wr.On(room.EventTimeout, func(s room.Snapshot) { + roomLog("TIMEOUT", fmt.Sprintf( + "context cancelled before admission occupancy=%d/%d queue=%d", + s.Occupancy, s.Capacity, s.QueueDepth, + )) + }) + + // ── 5. Register the WaitingRoom routes ─────────────────────────────── + // + // RegisterRoutes must come BEFORE your application routes. + // It installs, in order: + // OPTIONS /queue/status — CORS preflight + // GET /queue/status — polling endpoint for the waiting-room page + // r.Use(wr.Middleware()) — gates every route registered after this + wr.RegisterRoutes(r) + + // ── 6. Application routes (all gated by the waiting room) ──────────── + + r.GET("/", homePage) + r.GET("/about", aboutPage) + r.GET("/pricing", pricingPage) + r.GET("/contact", contactPage) + + // ── 7. Graceful shutdown ────────────────────────────────────────────── + + srv := &http.Server{ + Addr: ":8080", + Handler: r, + } + + quit := make(chan os.Signal, 1) + signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) + + go func() { + log.Printf("[ INFO ] listening on http://localhost:8080 cap=%d", wr.Cap()) + if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { + log.Fatalf("ListenAndServe: %v", err) + } + }() + + <-quit + log.Println("[ INFO ] shutdown signal received — draining in-flight requests...") + + shutdownCtx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + + if err := srv.Shutdown(shutdownCtx); err != nil { + log.Printf("[ ERROR ] server forced to shut down: %v", err) + } + log.Println("[ INFO ] server exited cleanly") +} + +// ── Page handlers ───────────────────────────────────────────────────────────── +// +// Each handler sleeps for a realistic duration so that concurrent ab requests +// actually hold their semaphore slots long enough for the room to fill up. +// Without the sleep, handlers return in microseconds and you will never see +// the waiting room trigger, even at -c 100. + +const simulatedLatency = 500 * time.Millisecond + +func homePage(c *gin.Context) { + time.Sleep(simulatedLatency) + c.Data(http.StatusOK, "text/html; charset=utf-8", page( + "Home", + `

Welcome

+

This server admits at most 5 concurrent requests.

+

+ Run ab -t 60 -n 1000 -c 100 http://localhost:8080/about + in a second terminal and watch this terminal for room events. +

+ `, + )) +} + +func aboutPage(c *gin.Context) { + time.Sleep(simulatedLatency) + c.Data(http.StatusOK, "text/html; charset=utf-8", page( + "About", + `

About Us

+

+ We use room — a FIFO waiting room middleware for + Go + Gin — to keep this service stable under sudden load spikes. + Instead of dropping excess requests with a 429, callers wait their + turn and are admitted in the order they arrived. +

+ ← Home`, + )) +} + +func pricingPage(c *gin.Context) { + time.Sleep(simulatedLatency) + c.Data(http.StatusOK, "text/html; charset=utf-8", page( + "Pricing", + `

Pricing

+ + + + + + +
TierRequests / dayQueue priority
Free100Standard
ProUnlimitedStandard
+ ← Home`, + )) +} + +func contactPage(c *gin.Context) { + time.Sleep(simulatedLatency) + c.Data(http.StatusOK, "text/html; charset=utf-8", page( + "Contact", + `

Contact

+

Email us at hello@example.com

+ ← Home`, + )) +} + +// ── Middleware ──────────────────────────────────────────────────────────────── + +// requestLogger returns a gin middleware that prints a line when the request +// arrives and another when it completes. Printing on arrival makes it +// immediately visible which requests are being held by the waiting room +// versus which are actively executing their handler. +func requestLogger() gin.HandlerFunc { + return func(c *gin.Context) { + start := time.Now() + + // Skip logging the status-polling endpoint — it fires every 3 s per + // queued client and would bury the room events in noise. + if c.Request.URL.Path == "/queue/status" { + c.Next() + return + } + + log.Printf("[ REQ ] --> %s %s remote=%s", + c.Request.Method, c.Request.URL.Path, c.ClientIP()) + + c.Next() + + log.Printf("[ REQ ] <-- %s %s status=%d latency=%s", + c.Request.Method, c.Request.URL.Path, + c.Writer.Status(), time.Since(start).Round(time.Millisecond)) + } +} + +// ── Helpers ─────────────────────────────────────────────────────────────────── + +// roomLog prints a room event line with a consistent format so all room +// events sort together when the output is piped through sort or grep. +func roomLog(tag, msg string) { + log.Printf("[ %s ] %s", tag, msg) +} + +// pct converts an occupancy/capacity pair to a percentage, guarding +// against division by zero if capacity is somehow zero. +func pct(occupancy, capacity int) float64 { + if capacity == 0 { + return 0 + } + return float64(occupancy) / float64(capacity) * 100 +} + +// page wraps a body fragment in a complete, styled HTML document. +func page(title, body string) []byte { + return []byte(` + + + + + ` + title + ` — Basic Web App + + +` + body + ` +`) +} diff --git a/sample/basic-web-app/test.sh b/sample/basic-web-app/test.sh new file mode 100755 index 0000000..9d30df4 --- /dev/null +++ b/sample/basic-web-app/test.sh @@ -0,0 +1,408 @@ +#!/usr/bin/env bash +# +# test.sh — live waiting-room load test for the basic-web-app sample. +# +# Usage: +# cd sample/basic-web-app +# bash test.sh +# +# The script builds and starts the server, runs a load test, prints a +# live dashboard, then shuts everything down. Open http://localhost:8080/ +# in a browser while it runs to see yourself in the queue. +# +# Requirements: +# bash >= 5.2 +# go (to build and run the server) +# curl (any recent version) +# jq (for JSON parsing) +# +# Works on macOS and Linux — no flock, no GNU coreutils required. +# +# ───────────────────────────────────────────────────────────────────── + +set -euo pipefail + +# ── Guard: bash version ────────────────────────────────────────────── + +if [[ "${BASH_VERSINFO[0]}" -lt 5 ]] || { [[ "${BASH_VERSINFO[0]}" -eq 5 ]] && [[ "${BASH_VERSINFO[1]}" -lt 2 ]]; }; then + echo "error: bash >= 5.2 required (found ${BASH_VERSION})" >&2 + exit 1 +fi + +# ── Guard: not root ────────────────────────────────────────────────── + +if [[ "$(id -u)" -eq 0 ]]; then + echo "error: do not run as root" >&2 + exit 1 +fi + +# ── Guard: dependencies ────────────────────────────────────────────── + +for cmd in go curl jq; do + if ! command -v "$cmd" &>/dev/null; then + echo "error: $cmd is required but not found in PATH" >&2 + exit 1 + fi +done + +# ── Configuration ──────────────────────────────────────────────────── + +BASE_URL="${BASE_URL:-http://localhost:8080}" +TARGET_PATH="${TARGET_PATH:-/about}" +CONCURRENCY="${CONCURRENCY:-30}" +DURATION_SECS="${DURATION_SECS:-30}" +RAMP_DELAY_MS="${RAMP_DELAY_MS:-50}" + +# ── Colors ─────────────────────────────────────────────────────────── + +RED='\033[0;31m' +GREEN='\033[0;32m' +YELLOW='\033[0;33m' +MAGENTA='\033[0;35m' +CYAN='\033[0;36m' +BOLD='\033[1m' +DIM='\033[2m' +RESET='\033[0m' + +# ── Temp directory and tally directories ───────────────────────────── +# +# Each client session touches a unique file in a per-event directory. +# The dashboard counts files. Lock-free, atomic, works everywhere. + +TMPDIR_TEST="$(mktemp -d)" + +mkdir -p "${TMPDIR_TEST}/tally_sent" +mkdir -p "${TMPDIR_TEST}/tally_served" +mkdir -p "${TMPDIR_TEST}/tally_queued" +mkdir -p "${TMPDIR_TEST}/tally_errors" + +SERVER_PID="" +SERVER_LOG="${TMPDIR_TEST}/server.log" + +# ── Cleanup ────────────────────────────────────────────────────────── + +cleanup() { + echo "" + if [[ -n "$SERVER_PID" ]] && kill -0 "$SERVER_PID" 2>/dev/null; then + echo -e "${DIM}Stopping server (PID ${SERVER_PID})...${RESET}" + kill -SIGTERM "$SERVER_PID" 2>/dev/null || true + wait "$SERVER_PID" 2>/dev/null || true + fi + jobs -rp 2>/dev/null | xargs kill 2>/dev/null || true + wait 2>/dev/null || true + rm -rf "$TMPDIR_TEST" +} +trap cleanup EXIT + +# ── Tally helpers (lock-free, subshell-safe) ───────────────────────── + +tally() { + local name="$1" + local id="$2" + touch "${TMPDIR_TEST}/tally_${name}/${id}" +} + +tally_count() { + local name="$1" + find "${TMPDIR_TEST}/tally_${name}" -type f 2>/dev/null | wc -l | tr -d ' ' +} + +# ── Safe grep count (always returns a plain integer) ───────────────── +# grep -c can produce unexpected output on macOS with certain inputs. +# This helper always returns a single integer on stdout. + +grep_count() { + local pattern="$1" + local file="$2" + local result + result=$(grep -c "$pattern" "$file" 2>/dev/null || true) + # Strip whitespace and take only the first line. + result=$(echo "$result" | head -1 | tr -d '[:space:]') + if [[ -z "$result" ]] || ! [[ "$result" =~ ^[0-9]+$ ]]; then + echo "0" + else + echo "$result" + fi +} + +# ── Start the server ───────────────────────────────────────────────── + +start_server() { + echo -e "${DIM}Building server...${RESET}" + + if [[ ! -f "main.go" ]]; then + echo "error: main.go not found. Run this from sample/basic-web-app/" >&2 + exit 1 + fi + + go build -o "${TMPDIR_TEST}/basic-web-app" . 2>&1 + + echo -e "${DIM}Starting server...${RESET}" + "${TMPDIR_TEST}/basic-web-app" > "$SERVER_LOG" 2>&1 & + SERVER_PID=$! + + local attempts=0 + while [[ $attempts -lt 100 ]]; do + if curl -s -o /dev/null --max-time 1 "${BASE_URL}/" 2>/dev/null; then + echo -e "${GREEN}✓${RESET} Server is up at ${BASE_URL} (PID ${SERVER_PID})" + return 0 + fi + sleep 0.1 + ((attempts++)) || true + done + + echo -e "${RED}error:${RESET} server did not start within 10 seconds" >&2 + if [[ -f "$SERVER_LOG" ]]; then + echo "Last 20 lines of server log:" >&2 + tail -20 "$SERVER_LOG" >&2 + fi + exit 1 +} + +# ── Single client session ──────────────────────────────────────────── + +client_session() { + local id="$1" + local cookie_jar="${TMPDIR_TEST}/cookies_${id}.txt" + local body_file="${TMPDIR_TEST}/body_${id}.txt" + + local http_code + http_code=$(curl -s -o "$body_file" -w '%{http_code}' \ + -c "$cookie_jar" -b "$cookie_jar" \ + --max-time 10 \ + "${BASE_URL}${TARGET_PATH}" 2>/dev/null || echo "000") + + tally "sent" "$id" + + if [[ -f "$body_file" ]] && grep -q "You're in the queue" "$body_file" 2>/dev/null; then + tally "queued" "$id" + + local max_polls=40 + local poll_count=0 + while [[ $poll_count -lt $max_polls ]]; do + local jitter_ms=$(( (RANDOM % 1000) + 2500 )) + sleep "$(printf '%d.%03d' $((jitter_ms / 1000)) $((jitter_ms % 1000)))" + + local status_json + status_json=$(curl -s -b "$cookie_jar" --max-time 5 \ + "${BASE_URL}/queue/status" 2>/dev/null || echo '{}') + + local ready + ready=$(echo "$status_json" | jq -r '.ready // false' 2>/dev/null || echo "false") + + if [[ "$ready" == "true" ]]; then + http_code=$(curl -s -o /dev/null -w '%{http_code}' \ + -c "$cookie_jar" -b "$cookie_jar" \ + --max-time 10 \ + "${BASE_URL}${TARGET_PATH}" 2>/dev/null || echo "000") + + if [[ "$http_code" == "200" ]]; then + tally "served" "$id" + else + tally "errors" "$id" + fi + rm -f "$cookie_jar" "$body_file" + return + fi + + ((poll_count++)) || true + done + + tally "errors" "$id" + elif [[ "$http_code" == "200" ]]; then + tally "served" "$id" + else + tally "errors" "$id" + fi + + rm -f "$cookie_jar" "$body_file" +} + +# ── Dashboard ──────────────────────────────────────────────────────── + +print_dashboard() { + local elapsed="$1" + local phase="$2" + + local c_sent c_served c_queued c_errors + c_sent=$(tally_count "sent") + c_served=$(tally_count "served") + c_queued=$(tally_count "queued") + c_errors=$(tally_count "errors") + + local active + active=$(jobs -rp 2>/dev/null | wc -l | tr -d ' ') + if [[ -n "$SERVER_PID" ]] && kill -0 "$SERVER_PID" 2>/dev/null; then + active=$((active - 1)) + if [[ $active -lt 0 ]]; then active=0; fi + fi + + local rps=0 + if [[ $elapsed -gt 0 ]]; then + rps=$((c_served / elapsed)) + fi + + local queue_now=$((c_queued - c_served - c_errors)) + if [[ $queue_now -lt 0 ]]; then queue_now=0; fi + + printf "\r ${BOLD}[%3ds]${RESET} " "$elapsed" + printf "${CYAN}sent:${RESET}%-4d " "$c_sent" + printf "${GREEN}served:${RESET}%-4d " "$c_served" + printf "${YELLOW}queued:${RESET}%-4d " "$queue_now" + printf "${RED}err:${RESET}%-3d " "$c_errors" + printf "${MAGENTA}active:${RESET}%-3d " "$active" + printf "${DIM}~%d req/s${RESET} " "$rps" + printf "${DIM}[%s]${RESET} " "$phase" +} + +# ── Main ───────────────────────────────────────────────────────────── + +main() { + echo "" + echo -e "${BOLD}╔══════════════════════════════════════════════════╗${RESET}" + echo -e "${BOLD}║ room — Waiting Room Load Test ║${RESET}" + echo -e "${BOLD}╚══════════════════════════════════════════════════╝${RESET}" + echo "" + echo -e " target: ${BOLD}${BASE_URL}${TARGET_PATH}${RESET}" + echo -e " concurrency: ${BOLD}${CONCURRENCY}${RESET} simultaneous clients" + echo -e " duration: ${BOLD}${DURATION_SECS}s${RESET}" + echo -e " ramp delay: ${BOLD}${RAMP_DELAY_MS}ms${RESET} between client launches" + echo "" + + start_server + echo "" + + echo -e "${DIM}──────────────────────────────────────────────────────────────────────${RESET}" + echo -e "${BOLD} Open ${CYAN}${BASE_URL}/${RESET}${BOLD} in your browser to see the waiting room.${RESET}" + echo -e "${DIM}──────────────────────────────────────────────────────────────────────${RESET}" + echo "" + echo -e " ${DIM}Server log: tail -f ${SERVER_LOG}${RESET}" + echo "" + + sleep 2 + + local start_time=$SECONDS + local end_time=$((SECONDS + DURATION_SECS)) + local wave=0 + + echo -e "${GREEN}▶${RESET} Starting load test..." + echo "" + + while [[ $SECONDS -lt $end_time ]]; do + ((wave++)) || true + + local batch_size=$CONCURRENCY + local remaining=$((end_time - SECONDS)) + if [[ $remaining -lt 5 ]]; then + batch_size=$(( (CONCURRENCY / 3) + 1 )) + fi + + for (( i=0; i/dev/null || sleep 0.05 + done + + local wave_start=$SECONDS + while [[ $((SECONDS - wave_start)) -lt 3 ]] && [[ $SECONDS -lt $end_time ]]; do + print_dashboard "$((SECONDS - start_time))" "wave ${wave}" + sleep 0.5 + done + done + + echo "" + echo "" + echo -e "${YELLOW}⏳${RESET} Draining in-flight requests (up to 30s)..." + echo "" + + local drain_deadline=$((SECONDS + 30)) + while [[ $SECONDS -lt $drain_deadline ]]; do + local bg_count + bg_count=$(jobs -rp 2>/dev/null | wc -l | tr -d ' ') + if [[ $bg_count -le 1 ]]; then + break + fi + print_dashboard "$((SECONDS - start_time))" "draining" + sleep 1 + done + + echo "" + echo "" + + # ── Server log highlights ──────────────────────────────────────── + # The server log uses tags like "[ FULL ]" with internal spaces, + # so we grep for the keyword anywhere on the line. + + echo -e "${BOLD}Server lifecycle events:${RESET}" + echo -e "${DIM}──────────────────────────────────────────────────────────────────────${RESET}" + if [[ -f "$SERVER_LOG" ]]; then + # Match the actual log format: [ FULL ], [ DRAIN ], etc. + grep -E '(FULL|DRAIN|QUEUE|ENTER|EXIT|EVICT|TIMEOUT)' "$SERVER_LOG" \ + | head -30 || echo " (no lifecycle events captured)" + + local event_count + event_count=$(grep_count -E 'FULL|DRAIN|QUEUE|ENTER|EXIT|EVICT|TIMEOUT' "$SERVER_LOG") + if [[ "$event_count" -gt 30 ]]; then + echo -e " ${DIM}... and $((event_count - 30)) more events${RESET}" + fi + else + echo " (server log not found)" + fi + echo -e "${DIM}──────────────────────────────────────────────────────────────────────${RESET}" + echo "" + + # ── Summary ────────────────────────────────────────────────────── + + local c_sent c_served c_queued c_errors + c_sent=$(tally_count "sent") + c_served=$(tally_count "served") + c_queued=$(tally_count "queued") + c_errors=$(tally_count "errors") + + local total_elapsed=$((SECONDS - start_time)) + local effective_rps=0 + if [[ $total_elapsed -gt 0 ]]; then + effective_rps=$((c_served / total_elapsed)) + fi + + local full_events drain_events queue_events + full_events=$(grep_count 'FULL' "$SERVER_LOG") + drain_events=$(grep_count 'DRAIN' "$SERVER_LOG") + queue_events=$(grep_count 'QUEUE' "$SERVER_LOG") + + echo -e "${BOLD}╔══════════════════════════════════════════════════╗${RESET}" + echo -e "${BOLD}║ Results ║${RESET}" + echo -e "${BOLD}╠══════════════════════════════════════════════════╣${RESET}" + printf "${BOLD}║${RESET} %-22s ${CYAN}%5d${RESET} ${BOLD}║${RESET}\n" "Total sent:" "$c_sent" + printf "${BOLD}║${RESET} %-22s ${GREEN}%5d${RESET} ${BOLD}║${RESET}\n" "Served (200):" "$c_served" + printf "${BOLD}║${RESET} %-22s ${YELLOW}%5d${RESET} ${BOLD}║${RESET}\n" "Queued (waited):" "$c_queued" + printf "${BOLD}║${RESET} %-22s ${RED}%5d${RESET} ${BOLD}║${RESET}\n" "Errors:" "$c_errors" + printf "${BOLD}║${RESET} %-22s %3ds ${BOLD}║${RESET}\n" "Elapsed:" "$total_elapsed" + printf "${BOLD}║${RESET} %-22s %3d req/s ${BOLD}║${RESET}\n" "Throughput:" "$effective_rps" + printf "${BOLD}║${RESET} %-22s %3d ${BOLD}║${RESET}\n" "Waves:" "$wave" + echo -e "${BOLD}╠══════════════════════════════════════════════════╣${RESET}" + printf "${BOLD}║${RESET} %-22s %5d ${BOLD}║${RESET}\n" "FULL transitions:" "$full_events" + printf "${BOLD}║${RESET} %-22s %5d ${BOLD}║${RESET}\n" "DRAIN transitions:" "$drain_events" + printf "${BOLD}║${RESET} %-22s %5d ${BOLD}║${RESET}\n" "QUEUE events:" "$queue_events" + echo -e "${BOLD}╚══════════════════════════════════════════════════╝${RESET}" + echo "" + + if [[ "$c_queued" -gt 0 ]]; then + echo -e "${GREEN}✓${RESET} Waiting room activated — ${c_queued} requests queued." + echo -e " ${full_events} FULL / ${drain_events} DRAIN transitions." + else + echo -e "${YELLOW}⚠${RESET} No requests were queued. Try:" + echo " CONCURRENCY=100 bash test.sh" + fi + + if [[ "$c_errors" -gt 0 ]]; then + echo "" + echo -e "${YELLOW}⚠${RESET} ${c_errors} errors — expected for clients whose poll timeout" + echo " expired before admission." + fi + + echo "" + echo -e "${DIM}Full server log: ${SERVER_LOG}${RESET}" + echo "" +} + +main "$@" \ No newline at end of file diff --git a/status.go b/status.go index 033ffa8..45d79ce 100644 --- a/status.go +++ b/status.go @@ -18,6 +18,16 @@ import ( // it returns ready=true so the client retries the original request and // either enters or re-queues cleanly. // +// Each successful status poll (where the client is still actively waiting) +// refreshes the token's issuedAt timestamp, preventing the reaper from +// evicting tokens that belong to actively polling clients. This makes the +// effective TTL a sliding window from the last poll rather than a fixed +// window from initial issuance. +// +// A per-token rate limit prevents clients from hammering this endpoint +// faster than statusPollMinInterval. Polls arriving too quickly receive +// a Retry-After header and a 429 status. +// // Related: WaitingRoom.Middleware, WaitingRoom.RegisterRoutes func (wr *WaitingRoom) StatusHandler() gin.HandlerFunc { return func(c *gin.Context) { @@ -27,28 +37,53 @@ func (wr *WaitingRoom) StatusHandler() gin.HandlerFunc { cookie, err := c.Request.Cookie(cookieName) if err != nil { + // No cookie — client has no queued position; send them back + // to try the main handler. c.JSON(http.StatusOK, statusResponse{Ready: true}) return } - entry, ok := wr.tokens.get(cookie.Value) - if !ok { + // Use deleteIfExpired to atomically check and remove in a single + // write-lock scope, eliminating the TOCTOU window between a + // separate isExpired check and delete. + if wr.tokens.deleteIfExpired(cookie.Value) { c.JSON(http.StatusOK, statusResponse{Ready: true}) return } - if wr.tokens.isExpired(cookie.Value) { - wr.tokens.delete(cookie.Value) + entry, ok := wr.tokens.get(cookie.Value) + if !ok { + // Token was deleted between deleteIfExpired and get — treat + // as expired/admitted. c.JSON(http.StatusOK, statusResponse{Ready: true}) return } + // Per-token poll rate limiting. If the client is polling faster + // than statusPollMinInterval, return 429 with a Retry-After + // header to shed excess load without touching the token store + // write lock repeatedly. + if prevPoll, found := wr.tokens.touchLastPoll(cookie.Value); found { + if !prevPoll.IsZero() && time.Since(prevPoll) < statusPollMinInterval { + c.Header("Retry-After", "1") + c.JSON(http.StatusTooManyRequests, statusResponse{ + Ready: false, + Position: wr.positionOf(entry.ticket), + }) + return + } + } + position := wr.positionOf(entry.ticket) if position <= 0 { c.JSON(http.StatusOK, statusResponse{Ready: true}) return } + // Client is still actively waiting — refresh the sliding TTL so + // that the reaper does not evict tokens from polling clients. + wr.tokens.touchIssuedAt(cookie.Value) + c.JSON(http.StatusOK, statusResponse{ Ready: false, Position: position, @@ -60,6 +95,10 @@ func (wr *WaitingRoom) StatusHandler() gin.HandlerFunc { // positionOf returns the raw queue position for a ticket. A value <= 0 // means the ticket is within the serving window and eligible for admission. // Callers that need a display-safe value (minimum 1) should clamp separately. +// +// This is the single authoritative formula for queue position used by both +// StatusHandler and Middleware. Having one implementation prevents the two +// call sites from silently diverging during future edits. func (wr *WaitingRoom) positionOf(ticket int64) int64 { return ticket - wr.nowServing.Load() - int64(wr.cap.Load()) } @@ -69,19 +108,28 @@ func (wr *WaitingRoom) positionOf(ticket int64) int64 { // always bypasses the queue — if you register routes manually, always add // StatusHandler before Use(Middleware()). // +// # CORS note +// +// If your deployment serves the waiting-room page from a different origin +// than the API (or if any CORS middleware is active), register an OPTIONS +// handler for /queue/status as well so that preflight requests from the +// polling fetch() call succeed: +// +// r.OPTIONS("/queue/status", func(c *gin.Context) { c.Status(http.StatusNoContent) }) +// r.GET("/queue/status", wr.StatusHandler()) +// r.Use(wr.Middleware()) +// // Usage: // // wr := &room.WaitingRoom{} // wr.Init(500) // wr.RegisterRoutes(r) // -// This is equivalent to: -// -// r.GET("/queue/status", wr.StatusHandler()) -// r.Use(wr.Middleware()) -// // Related: WaitingRoom.StatusHandler, WaitingRoom.Middleware func (wr *WaitingRoom) RegisterRoutes(r *gin.Engine) { + r.OPTIONS("/queue/status", func(c *gin.Context) { + c.Status(http.StatusNoContent) + }) r.GET("/queue/status", wr.StatusHandler()) r.Use(wr.Middleware()) } @@ -95,15 +143,3 @@ func generateToken() (string, error) { } return hex.EncodeToString(b), nil } - -// issuedAt records when a token was created. -// Used by the reaper to enforce cookieTTL eviction. -func (ts *tokenStore) isExpired(token string) bool { - ts.mu.RLock() - defer ts.mu.RUnlock() - entry, ok := ts.entries[token] - if !ok { - return true - } - return time.Since(entry.issuedAt) > cookieTTL -} diff --git a/types.go b/types.go index 8f9a851..e47c2c2 100644 --- a/types.go +++ b/types.go @@ -20,6 +20,15 @@ import ( // The zero value is not usable. Always construct via NewWaitingRoom or // initialise manually with Init. // +// # Cookie security +// +// By default the waiting-room session cookie is issued WITHOUT the Secure +// flag so that plain-HTTP local development works without configuration. +// Call SetSecureCookie(true) before traffic arrives in any deployment that +// serves the application over HTTPS (directly or via a TLS-terminating +// proxy such as Cloudflare, nginx, or an AWS ALB). Alternatively, use +// SetSecureCookieFromRequest to derive the flag from each incoming request. +// // Related: NewWaitingRoom, Init, Middleware, RegisterRoutes type WaitingRoom struct { sem sema.Semaphore @@ -27,19 +36,24 @@ type WaitingRoom struct { nextTicket atomic.Int64 nowServing atomic.Int64 mu sync.Mutex - cond *sync.Cond html []byte tokens *tokenStore stopReaper context.CancelFunc reaperInterval atomic.Int64 reaperRestart chan struct{} initialised atomic.Bool + callbacks *callbackRegistry + secureCookie atomic.Bool + maxQueueDepth atomic.Int64 + cookiePath atomic.Value // string + cookieDomain atomic.Value // string } // ticketEntry holds the state for a single queued client. type ticketEntry struct { ticket int64 issuedAt time.Time + lastPoll time.Time } // tokenStore maps random token strings to ticketEntry values. @@ -73,6 +87,70 @@ func (ts *tokenStore) delete(token string) { delete(ts.entries, token) } +// deleteIfExpired atomically checks expiry and deletes the token under a +// single write lock. Returns true if the token existed and was expired. +// This eliminates the TOCTOU window between separate isExpired + delete calls. +func (ts *tokenStore) deleteIfExpired(token string) bool { + ts.mu.Lock() + defer ts.mu.Unlock() + entry, ok := ts.entries[token] + if !ok { + return false + } + if time.Since(entry.issuedAt) > cookieTTL { + delete(ts.entries, token) + return true + } + return false +} + +// touchIssuedAt resets the issuedAt timestamp for a token to now, +// preventing the reaper from evicting a client that is actively polling. +func (ts *tokenStore) touchIssuedAt(token string) { + ts.mu.Lock() + defer ts.mu.Unlock() + entry, ok := ts.entries[token] + if !ok { + return + } + entry.issuedAt = time.Now() + ts.entries[token] = entry +} + +// touchLastPoll updates the lastPoll timestamp and returns the previous +// value. Callers use this to enforce per-token poll rate limits. +func (ts *tokenStore) touchLastPoll(token string) (previous time.Time, ok bool) { + ts.mu.Lock() + defer ts.mu.Unlock() + entry, exists := ts.entries[token] + if !exists { + return time.Time{}, false + } + previous = entry.lastPoll + entry.lastPoll = time.Now() + ts.entries[token] = entry + return previous, true +} + +// len returns the number of entries in the token store. +func (ts *tokenStore) len() int { + ts.mu.RLock() + defer ts.mu.RUnlock() + return len(ts.entries) +} + +// isExpired reports whether the token exists and has exceeded cookieTTL. +// Deprecated: prefer deleteIfExpired to avoid the TOCTOU window. +func (ts *tokenStore) isExpired(token string) bool { + ts.mu.RLock() + defer ts.mu.RUnlock() + entry, ok := ts.entries[token] + if !ok { + return true + } + return time.Since(entry.issuedAt) > cookieTTL +} + // statusResponse is the JSON payload served by StatusHandler. type statusResponse struct { Ready bool `json:"ready"`