diff --git a/README.md b/README.md index 094a3ee..977a24f 100644 --- a/README.md +++ b/README.md @@ -17,6 +17,11 @@ page with their position. As slots open, they're admitted automatically in FIFO order. Your handlers never know the difference — they see normal requests arriving at the rate you chose. +High-value clients can **skip the line** by paying a per-position fee. The +price updates in real time as the queue moves. Paid clients receive a +**time-limited VIP pass** that auto-promotes them on re-entry — no second +payment required for the configured window. + ```go wr := &room.WaitingRoom{} wr.Init(500) @@ -37,7 +42,8 @@ bash test.sh The test script builds the server, launches 30 concurrent clients, and prints a live dashboard while the waiting room queues and admits them. Open `http://localhost:8080/` in your browser while it runs to see your -position tick down in real time. +position tick down in real time — and click "Skip the line" to test the +payment flow. ``` [ 15s] sent:120 served:42 queued:78 err:0 active:83 ~2 req/s [wave 4] @@ -79,8 +85,9 @@ When your application is at capacity you have three choices: `room` does the third. It gives you FIFO ordering, live position tracking, a polished waiting-room page, lifecycle callbacks for autoscaling, a reaper that cleans up abandoned clients, configurable cookie security, runtime -capacity adjustment, and a max-queue-depth circuit breaker — all behind a -single middleware call. +capacity adjustment, a max-queue-depth circuit breaker, skip-the-line +payments with real-time pricing, and time-limited VIP passes — all behind +a single middleware call. --- @@ -135,6 +142,8 @@ required. ## What your users see +waiting_room.html preview + When a request can't be served immediately, `room` responds with a self-contained HTML page that polls `/queue/status` every 3 seconds and updates the position in place: @@ -142,12 +151,240 @@ updates the position in place: - **Queue position** — a large, visible number that ticks down - **Auto-admit** — the page reloads automatically when `ready=true` - **No refresh needed** — the status text updates live +- **Skip the line** — optional payment card with live pricing (appears only when configured) +- **VIP pass badge** — shown when the client has an active pass (auto-promoting) - **Dark theme** — clean, modern design that works on mobile - **Accessible** — uses `aria-live` regions for screen readers -Replace the default page with your own via `wr.SetHTML(myHTML)`. The only -contract is `{{.Position}}` for the queue number and a `fetch("/queue/status")` -poll loop in your JavaScript. +Replace the default page with your own via `wr.SetHTML(myHTML)`. The +template contract is: + +| Placeholder | Replaced with | +|---|---| +| `{{.Position}}` | Client's queue position (integer) | +| `{{.SkipURL}}` | Payment page URL (empty string if not configured) | + +Your JavaScript should poll `fetch("/queue/status")` and react to these +JSON fields: `ready`, `position`, `skip_cost`, `rate_per_pos`, `has_pass`. + +--- + +## Skip the line — paid queue jumping + +`room` has built-in support for letting clients pay to skip the queue. The +pricing model is simple: the cost to jump is `distance × rate`, where +distance is how many positions the client wants to skip and rate is a +per-position price you define. + +### How it works + +1. You configure a `RateFunc` (pricing) and a `SkipURL` (payment page) +2. The waiting room page shows a "Skip the line" card with the live price +3. The client clicks "Pay to skip" and is sent to your payment handler +4. After payment verification, you call `wr.PromoteTokenToFront(token)` +5. The client's next poll returns `ready=true` and they're admitted + +The price updates on every poll — if the queue shrinks while they're +deciding, the price drops in real time. + +### Basic setup + +```go +wr := &room.WaitingRoom{} +wr.Init(500) +defer wr.Stop() + +// $2.50 per position. Position 40 → front costs $97.50. +wr.SetRateFunc(func(depth int64) float64 { return 2.50 }) + +// Where the "Pay to skip" button sends the client. +wr.SetSkipURL("/queue/purchase") + +// Register your payment handler BEFORE RegisterRoutes so it +// bypasses the waiting room. +r.GET("/queue/purchase", handlePurchasePage) +r.POST("/queue/purchase/confirm", handlePurchaseConfirm) + +wr.RegisterRoutes(r) +``` + +### Surge pricing + +The `RateFunc` receives the current queue depth, so you can implement +dynamic pricing: + +```go +// Base $1.00 + 5¢ per person in the queue. +// Queue of 100 → $6.00/position. Queue of 10 → $1.50/position. +wr.SetRateFunc(func(depth int64) float64 { + return 1.00 + float64(depth)*0.05 +}) +``` + +### Quoting the price + +Use `QuoteCost` to show the client what they'll pay before they commit. +It's read-only — no state changes: + +```go +cost, err := wr.QuoteCost(token, 1) // cost to jump to position 1 +if err != nil { + // ErrTokenNotFound, ErrAlreadyAdmitted, ErrPromotionDisabled, etc. +} +``` + +### Processing the payment + +After your payment provider confirms the charge: + +```go +result, err := wr.PromoteTokenToFront(token) +if err != nil { ... } + +log.Printf("promoted for $%.2f", result.Cost) + +// Set the VIP pass cookie if a pass was issued. +if result.PassToken != "" { + http.SetCookie(w, &http.Cookie{ + Name: "room_pass", + Value: result.PassToken, + Path: wr.CookiePath(), + MaxAge: int(wr.PassDuration().Seconds()), + HttpOnly: true, + Secure: true, + }) +} +``` + +`PromoteToken` also accepts an intermediate target position if you want +to offer partial skips at a lower price: + +```go +// Jump from position 50 to position 10 instead of position 1. +result, err := wr.PromoteToken(token, 10) +``` + +### Stripe integration pattern + +```go +// Before RegisterRoutes — bypasses the waiting room. +r.GET("/queue/purchase", func(c *gin.Context) { + cookie, _ := c.Request.Cookie("room_ticket") + cost, _ := wr.QuoteCost(cookie.Value, 1) + + session, _ := stripe.CheckoutSessions.New(&stripe.CheckoutSessionParams{ + LineItems: []*stripe.CheckoutSessionLineItemParams{{ + PriceData: &stripe.CheckoutSessionLineItemPriceDataParams{ + Currency: stripe.String("usd"), + UnitAmount: stripe.Int64(int64(cost * 100)), + ProductData: &stripe.CheckoutSessionLineItemPriceDataProductDataParams{ + Name: stripe.String("Skip the line"), + }, + }, + Quantity: stripe.Int64(1), + }}, + Mode: stripe.String("payment"), + ClientReferenceID: stripe.String(cookie.Value), + SuccessURL: stripe.String("https://example.com/"), + CancelURL: stripe.String("https://example.com/"), + }) + c.Redirect(http.StatusSeeOther, session.URL) +}) + +// Stripe webhook. +r.POST("/stripe/webhook", func(c *gin.Context) { + event := verifyStripeSignature(c) + session := event.Data.Object + token := session["client_reference_id"].(string) + + result, err := wr.PromoteTokenToFront(token) + if err != nil { c.Status(400); return } + + // Pass cookie is set on the client's next page load via the + // success URL handler — Stripe webhooks can't set cookies. + storePassForClient(token, result.PassToken) + + c.Status(200) +}) +``` + +--- + +## VIP passes — durable skip-the-line access + +By default, a promotion is single-use: the client is moved to the front +once, and if they re-enter the queue later, they wait like everyone else. + +**VIP passes** change this. When configured, every promotion automatically +issues a time-limited pass. If the client is evicted, times out, refreshes, +or re-enters the queue during the pass window, they are silently +auto-promoted to the front — no second payment required. + +### Enabling passes + +```go +// Pay once, skip for 90 minutes. +wr.SetPassDuration(90 * time.Minute) +``` + +That's it. `PromoteToken` and `PromoteTokenToFront` now return a +`PassToken` in their result, and the middleware checks for the `room_pass` +cookie on every request. + +### How it works under the hood + +1. Client pays and you call `PromoteTokenToFront` → result includes `PassToken` +2. You set `PassToken` as the `room_pass` cookie (alongside `room_ticket`) +3. Client is admitted, browses, eventually their slot is released +4. Client returns → new ticket is issued → middleware sees `room_pass` cookie +5. Middleware calls `autoPromote` → client jumps to front automatically +6. Pass expires after the configured duration → client returns to standard FIFO + +### Pass lifecycle + +| Event | What happens | +|---|---| +| Payment confirmed | `PromoteTokenToFront` returns `PassToken`; you set the `room_pass` cookie | +| Client admitted | Normal flow — pass is not consumed, just checked | +| Client re-enters queue | Middleware sees valid `room_pass`, calls `autoPromote` | +| Pass expires | `room_pass` cookie expires; `passStore` reaper cleans up server-side | +| Client returns after expiry | Standard FIFO — must pay again to skip | + +### Configuration + +```go +// Enable 90-minute passes. +wr.SetPassDuration(90 * time.Minute) + +// Valid range: 1 minute – 24 hours. 0 disables passes (default). +wr.SetPassDuration(0) // back to single-use promotions + +// Check programmatically. +if wr.PassDuration() > 0 { + log.Printf("passes enabled: %s", wr.PassDuration()) +} + +// Check a specific pass token. +if wr.HasValidPass(passToken) { + log.Println("client has active VIP pass") +} +``` + +### The status endpoint and passes + +When a client has a valid pass, the `/queue/status` response changes: + +```json +{ + "ready": false, + "position": 3, + "has_pass": true +} +``` + +Note that `skip_cost` and `rate_per_pos` are omitted — the client already +paid. The default waiting room page reacts to `has_pass` by showing a +"VIP pass active" badge instead of the purchase offer. --- @@ -177,6 +414,12 @@ wr.On(room.EventExit, func(s room.Snapshot) { metrics.Inc("room.exit") }) wr.On(room.EventQueue, func(s room.Snapshot) { metrics.Inc("room.queue") }) wr.On(room.EventEvict, func(s room.Snapshot) { metrics.Inc("room.evict") }) wr.On(room.EventTimeout, func(s room.Snapshot) { metrics.Inc("room.timeout") }) + +// Track skip-the-line revenue. +wr.On(room.EventPromote, func(s room.Snapshot) { + metrics.Inc("room.promote") + log.Printf("queue jump — depth now %d", s.QueueDepth) +}) ``` Every handler receives a **Snapshot** — a point-in-time copy of the room's @@ -208,6 +451,7 @@ never stalls the request path. Remove handlers at any time with `wr.Off(event)`. | `EventQueue` | Request issued a waiting-room ticket | Queue depth alerting | | `EventEvict` | Reaper removes an abandoned token | Ghost ticket monitoring | | `EventTimeout` | Request context cancelled before admission | Client timeout tracking | +| `EventPromote` | Client promoted via payment or VIP pass | Revenue tracking, fairness monitoring | `EventFull` and `EventDrain` fire only on the **transition edge** — not on every admission while full. This means your autoscaler callback fires @@ -239,10 +483,20 @@ wr.SetMaxQueueDepth(10000) // Tighten the reaper for a high-traffic event. wr.SetReaperInterval(15 * time.Second) +// Skip-the-line with surge pricing and 2-hour VIP passes. +wr.SetRateFunc(func(depth int64) float64 { + return 1.00 + float64(depth)*0.05 +}) +wr.SetSkipURL("/queue/purchase") +wr.SetPassDuration(2 * time.Hour) + // Register lifecycle hooks before traffic arrives. wr.On(room.EventFull, func(s room.Snapshot) { go provisionHost() }) +wr.On(room.EventPromote, func(s room.Snapshot) { + metrics.Inc("room.skip_revenue") +}) wr.RegisterRoutes(r) r.Run(":8080") @@ -286,6 +540,7 @@ func onConfigReload(cfg Config) { wr.SetCap(int32(cfg.MaxConcurrent)) wr.SetReaperInterval(cfg.ReaperInterval) wr.SetMaxQueueDepth(cfg.MaxQueueDepth) + wr.SetPassDuration(cfg.PassDuration) } ``` @@ -319,6 +574,12 @@ Every setter is safe to call while traffic is flowing. │ ▼ ┌──────────────────────┐ + │ Has room_pass? │──YES──▶ autoPromote + │ (VIP pass cookie) │ (jump to front) + └──────────┬──────────┘ + │ NO + ▼ + ┌──────────────────────┐ │ Client Polls │ GET /queue/status │ /queue/status │ every 3s + jitter └──────────┬──────────┘ @@ -336,8 +597,10 @@ Every setter is safe to call while traffic is flowing. | **Serving window** | `nowServing + cap` determines who gets in | | **Semaphore** | Enforces concurrent slot limit via [sema](https://github.com/andreimerlescu/sema) | | **Token store** | Maps session cookies to tickets for poll-based admission | -| **Reaper** | Evicts abandoned tokens, advances the queue past ghost tickets | +| **Pass store** | Maps VIP pass cookies to expiry times for auto-promotion | +| **Reaper** | Evicts abandoned tokens and expired passes, advances the queue past ghost tickets | | **Callbacks** | Fires lifecycle events for autoscaling and observability | +| **Promoter** | Serialized ticket reassignment for skip-the-line with unique ticket guarantees | --- @@ -351,6 +614,8 @@ Every setter is safe to call while traffic is flowing. | **Ghost tickets** | The reaper runs on a configurable interval (default 5m), evicts expired tokens, and advances `nowServing` so the queue doesn't stall behind abandoned clients. | | **Cookie scoping** | `SetCookiePath` and `SetCookieDomain` let you restrict cookie visibility in multi-app deployments. `SameSite=Lax` is set by default. | | **Capacity enforcement** | The `nowServing` window guard prevents the serving window from inflating beyond `cap` even under adversarial client disconnection patterns. | +| **Promotion serialization** | `PromoteToken` acquires a dedicated mutex and uses a monotonic insert counter to guarantee unique ticket assignment even under concurrent promotions. | +| **Pass expiry** | VIP passes have a server-side expiry check (not just cookie MaxAge). The reaper sweeps expired passes on every cycle. Expired `room_pass` cookies are ignored. | --- @@ -382,6 +647,9 @@ wr.SetSecureCookie(secure bool) wr.SetMaxQueueDepth(max int64) error wr.SetCookiePath(path string) wr.SetCookieDomain(domain string) +wr.SetRateFunc(fn room.RateFunc) +wr.SetSkipURL(url string) +wr.SetPassDuration(d time.Duration) error // ── Introspection ───────────────────────────────────────── @@ -394,6 +662,16 @@ wr.ReaperInterval() time.Duration wr.MaxQueueDepth() int64 wr.CookiePath() string wr.CookieDomain() string +wr.SkipURL() string +wr.PassDuration() time.Duration +wr.HasValidPass(passToken string) bool + +// ── Skip the line ───────────────────────────────────────── + +wr.QuoteCost(token string, targetPosition int64) (float64, error) +wr.PromoteToken(token string, targetPosition int64) (room.PromoteResult, error) +wr.PromoteTokenToFront(token string) (room.PromoteResult, error) +wr.GrantPass() string // ── Lifecycle callbacks ─────────────────────────────────── @@ -401,6 +679,22 @@ wr.On(event room.Event, fn room.CallbackFunc) wr.Off(event room.Event) ``` +### PromoteResult + +```go +type PromoteResult struct { + Cost float64 // price computed at promotion time + PassToken string // VIP pass token (empty if passes disabled) +} +``` + +### RateFunc + +```go +// Receives current queue depth, returns per-position cost. +type RateFunc func(queueDepth int64) float64 +``` + --- ## Testing @@ -416,19 +710,25 @@ make all # vet, test, race, fuzz (30s), bench BenchmarkFastPath-28 429842 2751 ns/op 5318 B/op 13 allocs/op BenchmarkQueueDepth-28 1000000000 0.64 ns/op 0 B/op 0 allocs/op BenchmarkUtilization-28 1000000000 0.88 ns/op 0 B/op 0 allocs/op +BenchmarkQuoteCost-28 ... +BenchmarkPromoteToken-28 ... +BenchmarkGrantPass-28 ... +BenchmarkHasValidPass-28 ... ``` The fast path (request admitted immediately) completes in under 3μs including cookie handling and semaphore acquisition. `QueueDepth` and `UtilizationSmoothed` are sub-nanosecond — safe to call from hot -dashboards and autoscaler feedback loops. +dashboards and autoscaler feedback loops. `QuoteCost` and `HasValidPass` +are lock-free reads suitable for high-frequency polling. --- ## Sample app The [`sample/basic-web-app`](sample/basic-web-app/) directory contains a -complete tutorial that walks through every feature: +complete tutorial that walks through every feature including skip-the-line +payments and VIP passes: ```bash cd sample/basic-web-app @@ -444,8 +744,15 @@ open http://localhost:8080 # Browser: see the waiting room live ab -c 100 -n 1000 localhost:8080/about # Terminal 2: generate load ``` +The sample configures `$2.50/position` flat pricing with 90-minute VIP +passes. When you land in the waiting room, click "Skip the line" to see +the payment confirmation page, complete the (simulated) payment, and +get admitted immediately. Refresh the page — you'll be auto-promoted +thanks to your VIP pass. + The tutorial covers capacity tuning, lifecycle callbacks, log filtering, -runtime capacity adjustment, custom HTML, and common mistakes. +runtime capacity adjustment, skip-the-line pricing, VIP pass configuration, +custom HTML, and common mistakes. \[ [Read the full tutorial →](sample/basic-web-app/README.md) \] @@ -459,6 +766,7 @@ Apache 2.0 — see [LICENSE](LICENSE). *Built on [sema](https://github.com/andreimerlescu/sema) by [Andrei Merlescu](https://github.com/andreimerlescu). FIFO ordering, live -position tracking, edge-triggered lifecycle callbacks, a reaper that keeps -ghost tickets from stalling your queue, and a circuit breaker that protects -your memory when the queue gets too deep.* \ No newline at end of file +position tracking, edge-triggered lifecycle callbacks, skip-the-line +payments with real-time surge pricing, time-limited VIP passes with +auto-promotion, a reaper that keeps ghost tickets from stalling your queue, +and a circuit breaker that protects your memory when the queue gets too deep.* \ No newline at end of file diff --git a/callback.go b/callback.go index b22dbe8..5540058 100644 --- a/callback.go +++ b/callback.go @@ -43,6 +43,11 @@ const ( // EventTimeout fires when a queued request's context is cancelled or // its deadline expires before a slot becomes available. EventTimeout + + // EventPromote fires when a queued token is promoted to a higher + // position via PromoteToken. Use this to track revenue events or + // log queue jumps for fairness monitoring. + EventPromote ) // String returns the canonical name of the Event, suitable for logging. @@ -62,6 +67,8 @@ func (e Event) String() string { return "Evict" case EventTimeout: return "Timeout" + case EventPromote: + return "Promote" default: return "Unknown" } diff --git a/callback_test.go b/callback_test.go index cf4c5d1..b88dd75 100644 --- a/callback_test.go +++ b/callback_test.go @@ -23,6 +23,7 @@ func TestEvent_String(t *testing.T) { {EventQueue, "Queue"}, {EventEvict, "Evict"}, {EventTimeout, "Timeout"}, + {EventPromote, "Promote"}, {Event(255), "Unknown"}, } for _, tc := range cases { @@ -246,7 +247,7 @@ func TestConcurrent_OnOffEmit_AllEvents(t *testing.T) { t.Parallel() wr := newTestWR(t, 5) - events := []Event{EventEnter, EventExit, EventFull, EventDrain, EventQueue, EventEvict, EventTimeout} + events := []Event{EventEnter, EventExit, EventFull, EventDrain, EventQueue, EventEvict, EventTimeout, EventPromote} var wg sync.WaitGroup for _, ev := range events { ev := ev diff --git a/const.go b/const.go index 4da4189..95ac2f3 100644 --- a/const.go +++ b/const.go @@ -6,6 +6,12 @@ const ( // cookieName is the HTTP-only session cookie issued to queued clients. cookieName = "room_ticket" + // passCookieName is the HTTP-only cookie that holds the VIP pass + // token. This cookie outlives individual queue tickets — it persists + // for the configured pass duration so that clients who paid to skip + // are auto-promoted on re-entry without paying again. + passCookieName = "room_pass" + // cookieTTL is how long a queued client's token remains valid. // If a client disappears before being admitted, their token is // evicted by the reaper after this duration. @@ -46,4 +52,15 @@ const ( // /queue/status polls for a single token. Polls arriving faster // than this receive a cached response with a Retry-After header. statusPollMinInterval = 1 * time.Second + + // defaultPassDuration is the default lifetime of a VIP pass issued + // by PromoteTokenToFront / GrantPass. Zero means passes are disabled + // (single-use promotion only). Call SetPassDuration to enable. + defaultPassDuration = 0 + + // passMinDuration is the minimum value accepted by SetPassDuration. + passMinDuration = 1 * time.Minute + + // passMaxDuration is the maximum value accepted by SetPassDuration. + passMaxDuration = 24 * time.Hour ) diff --git a/errors.go b/errors.go index 1e4fbd7..5d598a9 100644 --- a/errors.go +++ b/errors.go @@ -46,3 +46,51 @@ type ErrInvalidMaxQueueDepth struct { func (e ErrInvalidMaxQueueDepth) Error() string { return fmt.Sprintf("room: invalid max queue depth %d: must be >= 0", e.Given) } + +// ErrPromotionDisabled is returned when PromoteToken or QuoteCost is +// called without a RateFunc configured via SetRateFunc. +type ErrPromotionDisabled struct{} + +func (e ErrPromotionDisabled) Error() string { + return "room: promotions disabled — call SetRateFunc first" +} + +// ErrTokenNotFound is returned when the token does not exist in the +// token store (expired, already admitted, or never issued). +type ErrTokenNotFound struct{} + +func (e ErrTokenNotFound) Error() string { + return "room: token not found" +} + +// ErrAlreadyAdmitted is returned when a promotion is attempted on a +// token that is already within the serving window. +type ErrAlreadyAdmitted struct{} + +func (e ErrAlreadyAdmitted) Error() string { + return "room: token already within serving window" +} + +// ErrInvalidTargetPosition is returned when targetPosition < 1. +type ErrInvalidTargetPosition struct { + Given int64 +} + +func (e ErrInvalidTargetPosition) Error() string { + return fmt.Sprintf("room: invalid target position %d: must be >= 1", e.Given) +} + +// ErrPassDuration is returned by SetPassDuration when the provided +// duration falls outside [passMinDuration, passMaxDuration]. +type ErrPassDuration struct { + Given time.Duration + Min time.Duration + Max time.Duration +} + +func (e ErrPassDuration) Error() string { + return fmt.Sprintf( + "room: pass duration %s out of range [%s, %s]", + e.Given, e.Min, e.Max, + ) +} diff --git a/new.go b/new.go index 6d41f9d..7cea3fd 100644 --- a/new.go +++ b/new.go @@ -3,7 +3,9 @@ package room import ( "context" "fmt" + "math" "net/http" + "time" "github.com/andreimerlescu/sema" "github.com/gin-gonic/gin" @@ -84,6 +86,7 @@ func (wr *WaitingRoom) Init(cap int32) error { wr.cap.Store(cap) wr.sem = sema.Must(int(cap)) wr.tokens = newTokenStore() + wr.passes = newPassStore() wr.reaperRestart = make(chan struct{}, 1) wr.nowServing.Store(0) wr.nextTicket.Store(0) @@ -92,6 +95,10 @@ func (wr *WaitingRoom) Init(cap int32) error { wr.maxQueueDepth.Store(defaultMaxQueueDepth) wr.cookiePath.Store("/") wr.cookieDomain.Store("") + wr.rateFunc.Store((*rateFuncHolder)(nil)) + wr.promoteInsert.Store(math.MaxInt64) + wr.skipURL.Store("") + wr.passDuration.Store(int64(defaultPassDuration)) wr.initialised.Store(true) wr.callbacks = newCallbackRegistry() @@ -179,6 +186,119 @@ func (wr *WaitingRoom) CookieDomain() string { return wr.cookieDomain.Load().(string) } +// SetSkipURL sets the URL that the waiting room "Pay to skip" button +// navigates to. This is typically a payment page (Stripe Checkout, +// crypto invoice, etc.) that your application hosts. +// +// The handler at this URL can read the room_ticket cookie to identify +// which queued client is paying. After payment verification, call +// PromoteTokenToFront to move them to the front of the queue. +// +// If empty (the default), the skip-the-line offer is never shown on +// the waiting room page, even if a RateFunc is configured. Both +// SetRateFunc and SetSkipURL must be set for the offer to appear. +// +// Safe to call at any time. +// +// Related: SetRateFunc, PromoteToken, PromoteTokenToFront +func (wr *WaitingRoom) SetSkipURL(url string) { + wr.skipURL.Store(url) +} + +// SkipURL returns the current skip-the-line payment URL. +func (wr *WaitingRoom) SkipURL() string { + return wr.skipURL.Load().(string) +} + +// SetPassDuration sets the lifetime of VIP passes issued by GrantPass. +// After a client pays to skip the line, they receive a pass that +// auto-promotes them for this duration on any subsequent queue entry, +// without requiring another payment. +// +// A value of 0 disables passes (the default). When disabled, promotions +// are single-use: the client is promoted once and must pay again if they +// re-enter the queue. +// +// Valid range when non-zero: 1m – 24h. Values outside this range return +// ErrPassDuration. +// +// Safe to call at any time. +// +// Related: GrantPass, HasValidPass, PassDuration +func (wr *WaitingRoom) SetPassDuration(d time.Duration) error { + if d == 0 { + wr.passDuration.Store(0) + return nil + } + if d < passMinDuration || d > passMaxDuration { + return ErrPassDuration{Given: d, Min: passMinDuration, Max: passMaxDuration} + } + wr.passDuration.Store(int64(d)) + return nil +} + +// PassDuration returns the current VIP pass lifetime. Zero means passes +// are disabled (single-use promotions only). +func (wr *WaitingRoom) PassDuration() time.Duration { + return time.Duration(wr.passDuration.Load()) +} + +// GrantPass creates a time-limited VIP pass and returns the pass token. +// The caller is responsible for setting the room_pass cookie on the +// HTTP response with this token value. +// +// The pass expires after PassDuration from now. If PassDuration is 0 +// (passes disabled), GrantPass returns an empty string and no pass is +// created. +// +// Typical usage in your payment confirmation handler: +// +// cost, err := wr.PromoteTokenToFront(ticketToken) +// if err != nil { ... } +// passToken := wr.GrantPass() +// if passToken != "" { +// http.SetCookie(w, &http.Cookie{ +// Name: "room_pass", +// Value: passToken, +// Path: wr.CookiePath(), +// MaxAge: int(wr.PassDuration().Seconds()), +// HttpOnly: true, +// Secure: true, +// }) +// } +// +// Related: SetPassDuration, HasValidPass, PromoteTokenToFront +func (wr *WaitingRoom) GrantPass() string { + d := wr.PassDuration() + if d == 0 { + return "" + } + + token, err := generateToken() + if err != nil { + return "" + } + + wr.passes.set(token, passEntry{ + expiresAt: time.Now().Add(d), + }) + + return token +} + +// HasValidPass reports whether the given pass token exists and has not +// expired. Use this in your middleware or handlers to check if a client +// should be auto-promoted. +// +// Related: GrantPass, SetPassDuration +func (wr *WaitingRoom) HasValidPass(passToken string) bool { + if passToken == "" { + return false + } + _, ok := wr.passes.get(passToken) + return ok +} + // checkInitialised aborts the request with 500 and returns false if the // WaitingRoom has not been initialised. Prevents nil pointer dereferences // on zero-value WaitingRoom structs. diff --git a/promote.go b/promote.go new file mode 100644 index 0000000..d23d54b --- /dev/null +++ b/promote.go @@ -0,0 +1,200 @@ +package room + +// RateFunc returns the per-position cost given the current queue depth. +// Implementations can return a flat rate, a curve, or surge pricing. +// +// Examples: +// +// // Flat: $1 per position +// func(depth int64) float64 { return 1.00 } +// +// // Surge: price increases with queue depth +// func(depth int64) float64 { return 0.50 + float64(depth)*0.01 } +type RateFunc func(queueDepth int64) float64 + +// SetRateFunc sets the per-position pricing function used by QuoteCost +// and PromoteToken. If nil, promotions are disabled and PromoteToken +// returns ErrPromotionDisabled. +// +// Safe to call at any time including while requests are in flight. +func (wr *WaitingRoom) SetRateFunc(fn RateFunc) { + if fn == nil { + wr.rateFunc.Store((*rateFuncHolder)(nil)) + return + } + wr.rateFunc.Store(&rateFuncHolder{fn: fn}) +} + +// rateFuncHolder wraps a RateFunc so that atomic.Value always stores the +// same concrete type. atomic.Value panics if Store is called with a +// different concrete type than a previous Store, so we cannot alternate +// between a nil interface and a concrete function value. Wrapping in a +// pointer-to-struct avoids this: we always store *rateFuncHolder (which +// may itself be nil). +type rateFuncHolder struct { + fn RateFunc +} + +// rateFuncLoad returns the current RateFunc or nil if unset. +func (wr *WaitingRoom) rateFuncLoad() RateFunc { + v := wr.rateFunc.Load() + if v == nil { + return nil + } + h, ok := v.(*rateFuncHolder) + if !ok || h == nil { + return nil + } + return h.fn +} + +// PromoteResult is returned by PromoteToken and PromoteTokenToFront. +// It contains the computed cost and, when pass duration is configured, +// a pass token that should be set as the room_pass cookie on the +// client's response. +type PromoteResult struct { + // Cost is the price computed at promotion time based on the + // current RateFunc and queue position. + Cost float64 + + // PassToken is the VIP pass token to set as the room_pass cookie. + // Empty if SetPassDuration was not called or is 0. + PassToken string +} + +// QuoteCost returns the cost for a queued token to jump to targetPosition. +// targetPosition=1 means next to be admitted. Returns the cost without +// modifying any state — use this to display pricing on the waiting room page. +// +// Returns ErrPromotionDisabled if no RateFunc is set. +// Returns ErrTokenNotFound if the token does not exist in the queue. +// Returns ErrAlreadyAdmitted if the token is already within the serving window. +// Returns ErrInvalidTargetPosition if targetPosition < 1. +// +// Related: PromoteToken, SetRateFunc +func (wr *WaitingRoom) QuoteCost(token string, targetPosition int64) (float64, error) { + if targetPosition < 1 { + return 0, ErrInvalidTargetPosition{Given: targetPosition} + } + + fn := wr.rateFuncLoad() + if fn == nil { + return 0, ErrPromotionDisabled{} + } + + entry, ok := wr.tokens.get(token) + if !ok { + return 0, ErrTokenNotFound{} + } + + currentPosition := wr.positionOf(entry.ticket) + if currentPosition <= 0 { + return 0, ErrAlreadyAdmitted{} + } + + if targetPosition >= currentPosition { + return 0, nil // already at or ahead of target + } + + distance := currentPosition - targetPosition + rate := fn(wr.QueueDepth()) + return float64(distance) * rate, nil +} + +// PromoteToken moves a queued token to targetPosition in the queue. +// targetPosition=1 means next to be admitted. +// +// The caller is responsible for payment verification before calling this. +// PromoteToken does not handle payments — it only reassigns the ticket. +// +// When SetPassDuration has been configured with a non-zero duration, the +// returned PromoteResult includes a PassToken. The caller must set this +// as the room_pass cookie on the HTTP response so that the client is +// auto-promoted on subsequent queue entries without paying again. +// +// Promotion is serialized: only one PromoteToken call executes at a time +// to prevent two promotions from claiming the same ticket slot. Each +// successive promotion to the same target position is placed one position +// behind the previous promotee, ensuring unique ticket assignment. +// +// Returns ErrPromotionDisabled if no RateFunc is set. +// Returns ErrTokenNotFound if the token does not exist. +// Returns ErrAlreadyAdmitted if already within the serving window. +// Returns ErrInvalidTargetPosition if targetPosition < 1. +// +// Related: QuoteCost, PromoteTokenToFront, SetRateFunc, SetPassDuration +func (wr *WaitingRoom) PromoteToken(token string, targetPosition int64) (PromoteResult, error) { + if targetPosition < 1 { + return PromoteResult{}, ErrInvalidTargetPosition{Given: targetPosition} + } + + fn := wr.rateFuncLoad() + if fn == nil { + return PromoteResult{}, ErrPromotionDisabled{} + } + + // Serialize promotions so two concurrent callers cannot claim + // the same synthetic ticket number. + wr.promoteMu.Lock() + defer wr.promoteMu.Unlock() + + entry, ok := wr.tokens.get(token) + if !ok { + return PromoteResult{}, ErrTokenNotFound{} + } + + currentPosition := wr.positionOf(entry.ticket) + if currentPosition <= 0 { + return PromoteResult{}, ErrAlreadyAdmitted{} + } + + if targetPosition >= currentPosition { + return PromoteResult{}, nil // no-op, already ahead + } + + distance := currentPosition - targetPosition + rate := fn(wr.QueueDepth()) + cost := float64(distance) * rate + + // Compute the new ticket number. We use promoteInsert to guarantee + // uniqueness: it tracks the lowest ticket number that the next + // promotion is allowed to claim. Each promotion takes the minimum + // of the natural ceiling (where the target position would land in + // the ticket space) and promoteInsert, then decrements promoteInsert + // so the next caller gets a strictly lower value. + // + // promoteInsert is initialised to math.MaxInt64, so the first + // promotion always uses the natural ceiling. Subsequent promotions + // use whichever is lower — the ceiling or the running counter — + // preventing collisions even when multiple tokens target the same + // position. + ceiling := wr.nowServing.Load() + int64(wr.cap.Load()) + targetPosition + insert := wr.promoteInsert.Load() + if ceiling < insert { + insert = ceiling + } + + newTicket := insert + wr.promoteInsert.Store(insert - 1) + + // Update the entry with the new ticket. + entry.ticket = newTicket + entry.promoted = true + wr.tokens.set(token, entry) + + wr.emit(EventPromote, wr.snapshot(EventPromote)) + + // Grant a time-limited VIP pass if pass duration is configured. + result := PromoteResult{Cost: cost} + result.PassToken = wr.GrantPass() + + return result, nil +} + +// PromoteTokenToFront is a convenience wrapper that promotes a token +// to position 1 (next to be admitted). +// +// Related: PromoteToken, QuoteCost +func (wr *WaitingRoom) PromoteTokenToFront(token string) (PromoteResult, error) { + return wr.PromoteToken(token, 1) +} diff --git a/promote_test.go b/promote_test.go new file mode 100644 index 0000000..ecf96a1 --- /dev/null +++ b/promote_test.go @@ -0,0 +1,1218 @@ +package room + +import ( + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/gin-gonic/gin" +) + +// ── RateFunc / SetRateFunc ─────────────────────────────────────────────────── + +func TestSetRateFunc_NilDisablesPromotions(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 5) + + wr.SetRateFunc(func(depth int64) float64 { return 1.0 }) + wr.SetRateFunc(nil) + + if fn := wr.rateFuncLoad(); fn != nil { + t.Error("expected nil RateFunc after SetRateFunc(nil)") + } +} + +func TestSetRateFunc_StoresAndLoads(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 5) + + wr.SetRateFunc(func(depth int64) float64 { return 2.50 }) + + fn := wr.rateFuncLoad() + if fn == nil { + t.Fatal("expected non-nil RateFunc") + } + if got := fn(0); got != 2.50 { + t.Errorf("expected rate 2.50, got %f", got) + } +} + +func TestRateFuncLoad_DefaultIsNil(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 5) + + if fn := wr.rateFuncLoad(); fn != nil { + t.Error("expected nil RateFunc by default") + } +} + +// ── QuoteCost ──────────────────────────────────────────────────────────────── + +func TestQuoteCost_NoRateFunc_ReturnsDisabled(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 5) + + wr.tokens.set("tok", ticketEntry{ticket: 20, issuedAt: time.Now()}) + + _, err := wr.QuoteCost("tok", 1) + if err == nil { + t.Fatal("expected ErrPromotionDisabled, got nil") + } + if _, ok := err.(ErrPromotionDisabled); !ok { + t.Errorf("expected ErrPromotionDisabled, got %T: %v", err, err) + } +} + +func TestQuoteCost_TokenNotFound(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 5) + wr.SetRateFunc(func(depth int64) float64 { return 1.0 }) + + _, err := wr.QuoteCost("nonexistent", 1) + if err == nil { + t.Fatal("expected ErrTokenNotFound, got nil") + } + if _, ok := err.(ErrTokenNotFound); !ok { + t.Errorf("expected ErrTokenNotFound, got %T: %v", err, err) + } +} + +func TestQuoteCost_InvalidTargetPosition(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 5) + wr.SetRateFunc(func(depth int64) float64 { return 1.0 }) + wr.tokens.set("tok", ticketEntry{ticket: 20, issuedAt: time.Now()}) + + for _, target := range []int64{0, -1, -100} { + _, err := wr.QuoteCost("tok", target) + if err == nil { + t.Errorf("expected ErrInvalidTargetPosition for target=%d, got nil", target) + continue + } + if _, ok := err.(ErrInvalidTargetPosition); !ok { + t.Errorf("expected ErrInvalidTargetPosition for target=%d, got %T", target, err) + } + } +} + +func TestQuoteCost_AlreadyAdmitted(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 10) + wr.SetRateFunc(func(depth int64) float64 { return 1.0 }) + + wr.tokens.set("tok", ticketEntry{ticket: 5, issuedAt: time.Now()}) + + _, err := wr.QuoteCost("tok", 1) + if err == nil { + t.Fatal("expected ErrAlreadyAdmitted, got nil") + } + if _, ok := err.(ErrAlreadyAdmitted); !ok { + t.Errorf("expected ErrAlreadyAdmitted, got %T: %v", err, err) + } +} + +func TestQuoteCost_AlreadyAtOrAheadOfTarget(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 1) + wr.SetRateFunc(func(depth int64) float64 { return 1.0 }) + + wr.tokens.set("tok", ticketEntry{ticket: 4, issuedAt: time.Now()}) + + cost, err := wr.QuoteCost("tok", 3) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if cost != 0 { + t.Errorf("expected cost 0 for same position, got %f", cost) + } + + cost, err = wr.QuoteCost("tok", 5) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if cost != 0 { + t.Errorf("expected cost 0 for behind position, got %f", cost) + } +} + +func TestQuoteCost_CorrectCalculation_FlatRate(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 1) + wr.SetRateFunc(func(depth int64) float64 { return 2.00 }) + + wr.tokens.set("tok", ticketEntry{ticket: 11, issuedAt: time.Now()}) + + cost, err := wr.QuoteCost("tok", 1) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if cost != 18.00 { + t.Errorf("expected cost 18.00, got %f", cost) + } + + cost, err = wr.QuoteCost("tok", 5) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if cost != 10.00 { + t.Errorf("expected cost 10.00, got %f", cost) + } +} + +func TestQuoteCost_SurgeRateUsesQueueDepth(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 1) + + wr.SetRateFunc(func(depth int64) float64 { + return 1.0 + float64(depth)*0.10 + }) + + for i := int64(2); i <= 6; i++ { + wr.nextTicket.Store(i) + wr.tokens.set(fmt.Sprintf("tok-%d", i), ticketEntry{ + ticket: i, + issuedAt: time.Now(), + }) + } + wr.nextTicket.Store(6) + + cost, err := wr.QuoteCost("tok-6", 1) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if cost != 6.00 { + t.Errorf("expected cost 6.00, got %f", cost) + } +} + +func TestQuoteCost_DoesNotMutateState(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 1) + wr.SetRateFunc(func(depth int64) float64 { return 1.0 }) + + wr.tokens.set("tok", ticketEntry{ticket: 11, issuedAt: time.Now()}) + + ticketBefore := int64(11) + nowServingBefore := wr.nowServing.Load() + + _, _ = wr.QuoteCost("tok", 1) + + entry, ok := wr.tokens.get("tok") + if !ok { + t.Fatal("token disappeared after QuoteCost") + } + if entry.ticket != ticketBefore { + t.Errorf("QuoteCost mutated ticket: was %d, now %d", ticketBefore, entry.ticket) + } + if wr.nowServing.Load() != nowServingBefore { + t.Errorf("QuoteCost mutated nowServing: was %d, now %d", + nowServingBefore, wr.nowServing.Load()) + } +} + +// ── PromoteToken ───────────────────────────────────────────────────────────── + +func TestPromoteToken_NoRateFunc_ReturnsDisabled(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 5) + wr.tokens.set("tok", ticketEntry{ticket: 20, issuedAt: time.Now()}) + + _, err := wr.PromoteToken("tok", 1) + if err == nil { + t.Fatal("expected ErrPromotionDisabled") + } + if _, ok := err.(ErrPromotionDisabled); !ok { + t.Errorf("expected ErrPromotionDisabled, got %T", err) + } +} + +func TestPromoteToken_TokenNotFound(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 5) + wr.SetRateFunc(func(depth int64) float64 { return 1.0 }) + + _, err := wr.PromoteToken("nonexistent", 1) + if err == nil { + t.Fatal("expected ErrTokenNotFound") + } + if _, ok := err.(ErrTokenNotFound); !ok { + t.Errorf("expected ErrTokenNotFound, got %T", err) + } +} + +func TestPromoteToken_InvalidTargetPosition(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 5) + wr.SetRateFunc(func(depth int64) float64 { return 1.0 }) + wr.tokens.set("tok", ticketEntry{ticket: 20, issuedAt: time.Now()}) + + for _, target := range []int64{0, -1, -50} { + _, err := wr.PromoteToken("tok", target) + if err == nil { + t.Errorf("expected ErrInvalidTargetPosition for target=%d", target) + continue + } + if _, ok := err.(ErrInvalidTargetPosition); !ok { + t.Errorf("expected ErrInvalidTargetPosition for target=%d, got %T", target, err) + } + } +} + +func TestPromoteToken_AlreadyAdmitted(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 10) + wr.SetRateFunc(func(depth int64) float64 { return 1.0 }) + + wr.tokens.set("tok", ticketEntry{ticket: 3, issuedAt: time.Now()}) + + _, err := wr.PromoteToken("tok", 1) + if err == nil { + t.Fatal("expected ErrAlreadyAdmitted") + } + if _, ok := err.(ErrAlreadyAdmitted); !ok { + t.Errorf("expected ErrAlreadyAdmitted, got %T", err) + } +} + +func TestPromoteToken_AlreadyAhead_Noop(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 1) + wr.SetRateFunc(func(depth int64) float64 { return 1.0 }) + + wr.tokens.set("tok", ticketEntry{ticket: 4, issuedAt: time.Now()}) + + result, err := wr.PromoteToken("tok", 3) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if result.Cost != 0 { + t.Errorf("expected cost 0 for no-op promotion, got %f", result.Cost) + } + + entry, _ := wr.tokens.get("tok") + if entry.ticket != 4 { + t.Errorf("ticket changed on no-op promotion: expected 4, got %d", entry.ticket) + } +} + +func TestPromoteToken_MovesToFront(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 1) + wr.SetRateFunc(func(depth int64) float64 { return 1.0 }) + + wr.tokens.set("tok", ticketEntry{ticket: 11, issuedAt: time.Now()}) + + result, err := wr.PromoteToken("tok", 1) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if result.Cost != 9.0 { + t.Errorf("expected cost 9.0, got %f", result.Cost) + } + + entry, ok := wr.tokens.get("tok") + if !ok { + t.Fatal("token not found after promotion") + } + if entry.ticket != 2 { + t.Errorf("expected new ticket 2, got %d", entry.ticket) + } + + pos := wr.positionOf(entry.ticket) + if pos != 1 { + t.Errorf("expected position 1 after promotion, got %d", pos) + } +} + +func TestPromoteToken_MovesToIntermediate(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 1) + wr.SetRateFunc(func(depth int64) float64 { return 5.00 }) + + wr.tokens.set("tok", ticketEntry{ticket: 21, issuedAt: time.Now()}) + + result, err := wr.PromoteToken("tok", 10) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if result.Cost != 50.0 { + t.Errorf("expected cost 50.0, got %f", result.Cost) + } + + entry, _ := wr.tokens.get("tok") + pos := wr.positionOf(entry.ticket) + if pos != 10 { + t.Errorf("expected position 10 after promotion, got %d", pos) + } +} + +func TestPromoteToken_SetsPromotedFlag(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 1) + wr.SetRateFunc(func(depth int64) float64 { return 1.0 }) + + wr.tokens.set("tok", ticketEntry{ticket: 11, issuedAt: time.Now()}) + + _, err := wr.PromoteToken("tok", 1) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + entry, _ := wr.tokens.get("tok") + if !entry.promoted { + t.Error("expected promoted=true after PromoteToken") + } +} + +func TestPromoteToken_ReturnsCost(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 2) + wr.SetRateFunc(func(depth int64) float64 { return 3.50 }) + + wr.tokens.set("tok", ticketEntry{ticket: 12, issuedAt: time.Now()}) + + result, err := wr.PromoteToken("tok", 4) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if result.Cost != 21.0 { + t.Errorf("expected cost 21.0, got %f", result.Cost) + } +} + +// ── PromoteToken — EventPromote callback ───────────────────────────────────── + +func TestPromoteToken_FiresEventPromote(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 1) + wr.SetRateFunc(func(depth int64) float64 { return 1.0 }) + + var promoteCount atomic.Int32 + wr.On(EventPromote, func(s Snapshot) { promoteCount.Add(1) }) + + wr.tokens.set("tok", ticketEntry{ticket: 11, issuedAt: time.Now()}) + _, _ = wr.PromoteToken("tok", 1) + + waitForCount(t, &promoteCount, 1, 200*time.Millisecond) +} + +func TestPromoteToken_NoEventOnNoop(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 1) + wr.SetRateFunc(func(depth int64) float64 { return 1.0 }) + + var promoteCount atomic.Int32 + wr.On(EventPromote, func(s Snapshot) { promoteCount.Add(1) }) + + wr.tokens.set("tok", ticketEntry{ticket: 4, issuedAt: time.Now()}) + _, _ = wr.PromoteToken("tok", 3) + + time.Sleep(50 * time.Millisecond) + if promoteCount.Load() != 0 { + t.Error("EventPromote fired on no-op promotion") + } +} + +func TestPromoteToken_NoEventOnError(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 5) + wr.SetRateFunc(func(depth int64) float64 { return 1.0 }) + + var promoteCount atomic.Int32 + wr.On(EventPromote, func(s Snapshot) { promoteCount.Add(1) }) + + _, _ = wr.PromoteToken("missing", 1) + + time.Sleep(50 * time.Millisecond) + if promoteCount.Load() != 0 { + t.Error("EventPromote fired on failed promotion") + } +} + +// ── PromoteTokenToFront ────────────────────────────────────────────────────── + +func TestPromoteTokenToFront_JumpsToPositionOne(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 1) + wr.SetRateFunc(func(depth int64) float64 { return 1.0 }) + + wr.tokens.set("tok", ticketEntry{ticket: 11, issuedAt: time.Now()}) + + result, err := wr.PromoteTokenToFront("tok") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if result.Cost != 9.0 { + t.Errorf("expected cost 9.0, got %f", result.Cost) + } + + entry, _ := wr.tokens.get("tok") + pos := wr.positionOf(entry.ticket) + if pos != 1 { + t.Errorf("expected position 1, got %d", pos) + } +} + +// ── Serialization: concurrent promotions ───────────────────────────────────── + +func TestPromoteToken_ConcurrentPromotions_NoCollision(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 1) + wr.SetRateFunc(func(depth int64) float64 { return 1.0 }) + + const n = 20 + + for i := 0; i < n; i++ { + wr.tokens.set(fmt.Sprintf("tok-%d", i), ticketEntry{ + ticket: int64(10 + i*5), + issuedAt: time.Now(), + }) + } + + var wg sync.WaitGroup + var successCount atomic.Int32 + var errorCount atomic.Int32 + + for i := 0; i < n; i++ { + wg.Add(1) + i := i + go func() { + defer wg.Done() + _, err := wr.PromoteToken(fmt.Sprintf("tok-%d", i), 1) + if err != nil { + errorCount.Add(1) + } else { + successCount.Add(1) + } + }() + } + + wg.Wait() + + if errorCount.Load() > 0 { + t.Logf("successes=%d errors=%d (errors may include already-admitted tokens)", + successCount.Load(), errorCount.Load()) + } + + seen := make(map[int64]string) + wr.tokens.mu.RLock() + for token, entry := range wr.tokens.entries { + if prev, exists := seen[entry.ticket]; exists { + t.Errorf("ticket collision: tokens %q and %q both have ticket %d", + prev, token, entry.ticket) + } + seen[entry.ticket] = token + } + wr.tokens.mu.RUnlock() +} + +func TestPromoteToken_SerializedUnderMutex(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 1) + wr.SetRateFunc(func(depth int64) float64 { return 1.0 }) + + wr.tokens.set("tok-a", ticketEntry{ticket: 50, issuedAt: time.Now()}) + wr.tokens.set("tok-b", ticketEntry{ticket: 60, issuedAt: time.Now()}) + + var wg sync.WaitGroup + wg.Add(2) + + go func() { + defer wg.Done() + wr.PromoteToken("tok-a", 1) + }() + go func() { + defer wg.Done() + wr.PromoteToken("tok-b", 1) + }() + + wg.Wait() + + entryA, okA := wr.tokens.get("tok-a") + entryB, okB := wr.tokens.get("tok-b") + + if !okA || !okB { + t.Fatal("tokens disappeared after concurrent promotion") + } + + if entryA.ticket == entryB.ticket { + t.Errorf("ticket collision after serialized promotions: both have ticket %d", + entryA.ticket) + } +} + +// ── Integration: promoted token is admitted via status polling ──────────────── + +func TestIntegration_PromotedToken_AdmittedOnNextPoll(t *testing.T) { + t.Parallel() + const cap = 1 + wr := newTestWR(t, int32(cap)) + wr.SetRateFunc(func(depth int64) float64 { return 1.0 }) + + serving := make(chan struct{}, 1) + release := make(chan struct{}) + r := newTestRouter(wr, serving, release) + + go func() { + req := httptest.NewRequest(http.MethodGet, "/", nil) + r.ServeHTTP(httptest.NewRecorder(), req) + }() + <-serving + + _, tokenA := serveWithCookie(r, "") + _, tokenB := serveWithCookie(r, "") + + if tokenA == "" || tokenB == "" { + t.Fatal("expected tokens for both queued requests") + } + + entryB, _ := wr.tokens.get(tokenB) + posB := wr.positionOf(entryB.ticket) + if posB < 2 { + t.Fatalf("expected tokenB at position >= 2, got %d", posB) + } + + _, err := wr.PromoteTokenToFront(tokenB) + if err != nil { + t.Fatalf("PromoteTokenToFront: %v", err) + } + + close(release) + + waitForStatus(t, r, tokenB, 5*time.Second) +} + +func TestIntegration_PromotedToken_ServesRequestSuccessfully(t *testing.T) { + t.Parallel() + const cap = 1 + wr := newTestWR(t, int32(cap)) + wr.SetRateFunc(func(depth int64) float64 { return 1.0 }) + + r := gin.New() + wr.RegisterRoutes(r) + + var served atomic.Int32 + r.GET("/", func(c *gin.Context) { + served.Add(1) + c.Status(http.StatusOK) + }) + + serving := make(chan struct{}, 1) + releaseFirst := make(chan struct{}) + + rBlocking := gin.New() + wr.RegisterRoutes(rBlocking) + rBlocking.GET("/", func(c *gin.Context) { + serving <- struct{}{} + <-releaseFirst + c.Status(http.StatusOK) + }) + + go func() { + req := httptest.NewRequest(http.MethodGet, "/", nil) + rBlocking.ServeHTTP(httptest.NewRecorder(), req) + }() + <-serving + + _, token := serveWithCookie(rBlocking, "") + if token == "" { + t.Fatal("no token issued") + } + + _, err := wr.PromoteTokenToFront(token) + if err != nil { + t.Fatalf("PromoteTokenToFront: %v", err) + } + + close(releaseFirst) + + waitForStatus(t, rBlocking, token, 5*time.Second) + + req := httptest.NewRequest(http.MethodGet, "/", nil) + req.AddCookie(&http.Cookie{Name: cookieName, Value: token}) + w := httptest.NewRecorder() + r.ServeHTTP(w, req) + + if w.Code != http.StatusOK { + t.Errorf("expected 200 after promotion, got %d", w.Code) + } +} + +// ── Integration: status endpoint includes pricing ──────────────────────────── + +func TestIntegration_StatusEndpoint_IncludesPricing(t *testing.T) { + t.Parallel() + const cap = 1 + wr := newTestWR(t, int32(cap)) + wr.SetRateFunc(func(depth int64) float64 { return 2.50 }) + + serving := make(chan struct{}, 1) + release := make(chan struct{}) + r := newTestRouter(wr, serving, release) + + go func() { + req := httptest.NewRequest(http.MethodGet, "/", nil) + r.ServeHTTP(httptest.NewRecorder(), req) + }() + <-serving + + serveWithCookie(r, "") + serveWithCookie(r, "") + _, token := serveWithCookie(r, "") + if token == "" { + t.Fatal("no token issued") + } + + req := httptest.NewRequest(http.MethodGet, "/queue/status", nil) + req.AddCookie(&http.Cookie{Name: cookieName, Value: token}) + w := httptest.NewRecorder() + r.ServeHTTP(w, req) + + if w.Code != http.StatusOK { + t.Fatalf("expected 200, got %d", w.Code) + } + + var resp statusResponse + if err := json.NewDecoder(w.Body).Decode(&resp); err != nil { + t.Fatalf("decode error: %v", err) + } + + if resp.Ready { + t.Fatal("expected ready=false while queued") + } + if resp.RatePerPos != 2.50 { + t.Errorf("expected rate_per_pos=2.50, got %f", resp.RatePerPos) + } + if resp.Position <= 1 { + t.Errorf("expected position > 1 for third queued request, got %d", resp.Position) + } + if resp.SkipCost <= 0 { + t.Errorf("expected positive skip_cost, got %f", resp.SkipCost) + } + + expectedCost := float64(resp.Position-1) * 2.50 + if resp.SkipCost != expectedCost { + t.Errorf("expected skip_cost=%f, got %f", expectedCost, resp.SkipCost) + } + + close(release) +} + +func TestIntegration_StatusEndpoint_NoPricingWithoutRateFunc(t *testing.T) { + t.Parallel() + const cap = 1 + wr := newTestWR(t, int32(cap)) + + serving := make(chan struct{}, 1) + release := make(chan struct{}) + r := newTestRouter(wr, serving, release) + + go func() { + req := httptest.NewRequest(http.MethodGet, "/", nil) + r.ServeHTTP(httptest.NewRecorder(), req) + }() + <-serving + + _, token := serveWithCookie(r, "") + if token == "" { + t.Fatal("no token issued") + } + + req := httptest.NewRequest(http.MethodGet, "/queue/status", nil) + req.AddCookie(&http.Cookie{Name: cookieName, Value: token}) + w := httptest.NewRecorder() + r.ServeHTTP(w, req) + + var resp statusResponse + json.NewDecoder(w.Body).Decode(&resp) + + if resp.SkipCost != 0 { + t.Errorf("expected skip_cost=0 without RateFunc, got %f", resp.SkipCost) + } + if resp.RatePerPos != 0 { + t.Errorf("expected rate_per_pos=0 without RateFunc, got %f", resp.RatePerPos) + } + + close(release) +} + +func TestIntegration_StatusEndpoint_HidesPricingWhenPassActive(t *testing.T) { + t.Parallel() + const cap = 1 + wr := newTestWR(t, int32(cap)) + wr.SetRateFunc(func(depth int64) float64 { return 2.50 }) + if err := wr.SetPassDuration(90 * time.Minute); err != nil { + t.Fatal(err) + } + + serving := make(chan struct{}, 1) + release := make(chan struct{}) + r := newTestRouter(wr, serving, release) + + go func() { + req := httptest.NewRequest(http.MethodGet, "/", nil) + r.ServeHTTP(httptest.NewRecorder(), req) + }() + <-serving + + serveWithCookie(r, "") + _, token := serveWithCookie(r, "") + if token == "" { + t.Fatal("no token issued") + } + + // Grant a pass for this client. + passToken := wr.GrantPass() + if passToken == "" { + t.Fatal("expected pass token") + } + + // Poll with both cookies. + req := httptest.NewRequest(http.MethodGet, "/queue/status", nil) + req.AddCookie(&http.Cookie{Name: cookieName, Value: token}) + req.AddCookie(&http.Cookie{Name: passCookieName, Value: passToken}) + w := httptest.NewRecorder() + r.ServeHTTP(w, req) + + var resp statusResponse + json.NewDecoder(w.Body).Decode(&resp) + + if !resp.HasPass { + t.Error("expected has_pass=true") + } + if resp.SkipCost != 0 { + t.Errorf("expected skip_cost=0 when pass active, got %f", resp.SkipCost) + } + if resp.RatePerPos != 0 { + t.Errorf("expected rate_per_pos=0 when pass active, got %f", resp.RatePerPos) + } + + close(release) +} + +// ── Edge cases ─────────────────────────────────────────────────────────────── + +func TestPromoteToken_QueueMovedBetweenQuoteAndPromote(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 1) + wr.SetRateFunc(func(depth int64) float64 { return 1.0 }) + + wr.tokens.set("tok", ticketEntry{ticket: 11, issuedAt: time.Now()}) + + quotedCost, err := wr.QuoteCost("tok", 1) + if err != nil { + t.Fatalf("QuoteCost: %v", err) + } + + wr.nowServing.Add(3) + + result, err := wr.PromoteToken("tok", 1) + if err != nil { + t.Fatalf("PromoteToken: %v", err) + } + + if result.Cost >= quotedCost { + t.Errorf("expected actual cost (%f) < quoted cost (%f) after queue advancement", + result.Cost, quotedCost) + } +} + +func TestPromoteToken_ReapDoesNotEvictPromotedToken(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 1) + wr.SetRateFunc(func(depth int64) float64 { return 1.0 }) + + wr.tokens.set("tok", ticketEntry{ticket: 11, issuedAt: time.Now()}) + _, err := wr.PromoteToken("tok", 1) + if err != nil { + t.Fatalf("PromoteToken: %v", err) + } + + wr.reap() + + if _, ok := wr.tokens.get("tok"); !ok { + t.Error("promoted token was evicted by reaper") + } +} + +func TestPromoteToken_ExpiredPromotedToken_Reaped(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 1) + wr.SetRateFunc(func(depth int64) float64 { return 1.0 }) + + wr.tokens.set("tok", ticketEntry{ + ticket: 11, + issuedAt: time.Now().Add(-(cookieTTL + time.Minute)), + }) + + entry, _ := wr.tokens.get("tok") + entry.ticket = wr.nowServing.Load() + int64(wr.cap.Load()) + 1 + entry.promoted = true + wr.tokens.set("tok", entry) + + wr.reap() + + if _, ok := wr.tokens.get("tok"); ok { + t.Error("expired promoted token should have been reaped") + } +} + +// ── Pass system ────────────────────────────────────────────────────────────── + +func TestSetPassDuration_Zero_DisablesPasses(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 5) + + if err := wr.SetPassDuration(0); err != nil { + t.Fatalf("unexpected error: %v", err) + } + if wr.PassDuration() != 0 { + t.Errorf("expected 0, got %s", wr.PassDuration()) + } +} + +func TestSetPassDuration_ValidRange(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 5) + + cases := []time.Duration{ + passMinDuration, + 30 * time.Minute, + 90 * time.Minute, + passMaxDuration, + } + for _, d := range cases { + if err := wr.SetPassDuration(d); err != nil { + t.Errorf("expected no error for %s, got %v", d, err) + } + if wr.PassDuration() != d { + t.Errorf("expected %s, got %s", d, wr.PassDuration()) + } + } +} + +func TestSetPassDuration_InvalidRange(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 5) + + cases := []time.Duration{ + time.Second, + passMinDuration - time.Nanosecond, + passMaxDuration + time.Nanosecond, + } + for _, d := range cases { + err := wr.SetPassDuration(d) + if err == nil { + t.Errorf("expected error for %s, got nil", d) + } + if _, ok := err.(ErrPassDuration); !ok { + t.Errorf("expected ErrPassDuration for %s, got %T", d, err) + } + } +} + +func TestGrantPass_ReturnsTokenWhenConfigured(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 5) + if err := wr.SetPassDuration(90 * time.Minute); err != nil { + t.Fatal(err) + } + + token := wr.GrantPass() + if token == "" { + t.Error("expected non-empty pass token") + } + + if !wr.HasValidPass(token) { + t.Error("freshly granted pass should be valid") + } +} + +func TestGrantPass_ReturnsEmptyWhenDisabled(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 5) + // Default pass duration is 0 (disabled). + + token := wr.GrantPass() + if token != "" { + t.Errorf("expected empty pass token when disabled, got %q", token) + } +} + +func TestHasValidPass_EmptyToken(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 5) + + if wr.HasValidPass("") { + t.Error("empty token should not be a valid pass") + } +} + +func TestHasValidPass_NonexistentToken(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 5) + + if wr.HasValidPass("does-not-exist") { + t.Error("nonexistent token should not be a valid pass") + } +} + +func TestHasValidPass_ExpiredPass(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 5) + + wr.passes.set("expired", passEntry{ + expiresAt: time.Now().Add(-time.Minute), + }) + + if wr.HasValidPass("expired") { + t.Error("expired pass should not be valid") + } +} + +func TestHasValidPass_LivePass(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 5) + + wr.passes.set("live", passEntry{ + expiresAt: time.Now().Add(30 * time.Minute), + }) + + if !wr.HasValidPass("live") { + t.Error("live pass should be valid") + } +} + +func TestPromoteToken_IssuesPassWhenConfigured(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 1) + wr.SetRateFunc(func(depth int64) float64 { return 1.0 }) + if err := wr.SetPassDuration(90 * time.Minute); err != nil { + t.Fatal(err) + } + + wr.tokens.set("tok", ticketEntry{ticket: 11, issuedAt: time.Now()}) + + result, err := wr.PromoteToken("tok", 1) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if result.PassToken == "" { + t.Error("expected pass token in PromoteResult when pass duration configured") + } + if !wr.HasValidPass(result.PassToken) { + t.Error("issued pass token should be valid") + } +} + +func TestPromoteToken_NoPassWhenDisabled(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 1) + wr.SetRateFunc(func(depth int64) float64 { return 1.0 }) + // Pass duration defaults to 0 (disabled). + + wr.tokens.set("tok", ticketEntry{ticket: 11, issuedAt: time.Now()}) + + result, err := wr.PromoteToken("tok", 1) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if result.PassToken != "" { + t.Errorf("expected empty pass token when disabled, got %q", result.PassToken) + } +} + +func TestAutoPromote_WithValidPass(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 1) + if err := wr.SetPassDuration(90 * time.Minute); err != nil { + t.Fatal(err) + } + + // Create a pass. + passToken := wr.GrantPass() + if passToken == "" { + t.Fatal("expected pass token") + } + + // Insert a queued token (simulating a re-entry). + wr.tokens.set("ticket-tok", ticketEntry{ + ticket: 50, + issuedAt: time.Now(), + }) + + // Before auto-promote: position should be far back. + posBefore := wr.positionOf(50) + if posBefore <= 0 { + t.Fatalf("expected positive position before auto-promote, got %d", posBefore) + } + + // Auto-promote (this is what the middleware calls internally). + wr.autoPromote("ticket-tok") + + entry, ok := wr.tokens.get("ticket-tok") + if !ok { + t.Fatal("token disappeared after autoPromote") + } + + posAfter := wr.positionOf(entry.ticket) + if posAfter >= posBefore { + t.Errorf("expected position to improve after autoPromote: before=%d after=%d", + posBefore, posAfter) + } + if !entry.promoted { + t.Error("expected promoted=true after autoPromote") + } +} + +func TestAutoPromote_SkipsAlreadyPromoted(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 1) + + wr.tokens.set("tok", ticketEntry{ + ticket: 50, + issuedAt: time.Now(), + promoted: true, + }) + + ticketBefore := int64(50) + wr.autoPromote("tok") + + // autoPromote checks positionOf but since the middleware only calls + // autoPromote when !entry.promoted, we test that the function still + // works correctly when called on an already-promoted token (it should + // still promote since it checks position, not the flag). + entry, _ := wr.tokens.get("tok") + if entry.ticket == ticketBefore { + // It's acceptable that autoPromote moves it again — the position + // check is what matters, not the promoted flag. + t.Logf("autoPromote did not change ticket (position may already be front)") + } +} + +func TestAutoPromote_NoopWhenInServingWindow(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 10) + + // Ticket 5, cap=10 → position = 5 - 0 - 10 = -5 (in window). + wr.tokens.set("tok", ticketEntry{ + ticket: 5, + issuedAt: time.Now(), + }) + + wr.autoPromote("tok") + + entry, _ := wr.tokens.get("tok") + if entry.ticket != 5 { + t.Errorf("autoPromote should not change ticket when already in window, got %d", entry.ticket) + } + if entry.promoted { + t.Error("autoPromote should not set promoted flag when already in window") + } +} + +func TestAutoPromote_MissingToken(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 5) + + // Must not panic. + wr.autoPromote("nonexistent") +} + +// ── Benchmark ──────────────────────────────────────────────────────────────── + +func BenchmarkQuoteCost(b *testing.B) { + wr := &WaitingRoom{} + if err := wr.Init(10); err != nil { + b.Fatal(err) + } + defer wr.Stop() + wr.SetRateFunc(func(depth int64) float64 { return 1.0 }) + + wr.tokens.set("tok", ticketEntry{ticket: 100, issuedAt: time.Now()}) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, _ = wr.QuoteCost("tok", 1) + } +} + +func BenchmarkPromoteToken(b *testing.B) { + wr := &WaitingRoom{} + if err := wr.Init(10); err != nil { + b.Fatal(err) + } + defer wr.Stop() + wr.SetRateFunc(func(depth int64) float64 { return 1.0 }) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + token := fmt.Sprintf("tok-%d", i) + wr.tokens.set(token, ticketEntry{ + ticket: int64(100 + i), + issuedAt: time.Now(), + }) + _, _ = wr.PromoteToken(token, 1) + } +} + +func BenchmarkPromoteTokenToFront(b *testing.B) { + wr := &WaitingRoom{} + if err := wr.Init(10); err != nil { + b.Fatal(err) + } + defer wr.Stop() + wr.SetRateFunc(func(depth int64) float64 { return 1.0 }) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + token := fmt.Sprintf("tok-%d", i) + wr.tokens.set(token, ticketEntry{ + ticket: int64(100 + i), + issuedAt: time.Now(), + }) + _, _ = wr.PromoteTokenToFront(token) + } +} + +func BenchmarkGrantPass(b *testing.B) { + wr := &WaitingRoom{} + if err := wr.Init(10); err != nil { + b.Fatal(err) + } + defer wr.Stop() + if err := wr.SetPassDuration(90 * time.Minute); err != nil { + b.Fatal(err) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _ = wr.GrantPass() + } +} + +func BenchmarkHasValidPass(b *testing.B) { + wr := &WaitingRoom{} + if err := wr.Init(10); err != nil { + b.Fatal(err) + } + defer wr.Stop() + + wr.passes.set("tok", passEntry{ + expiresAt: time.Now().Add(time.Hour), + }) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _ = wr.HasValidPass("tok") + } +} diff --git a/reaper.go b/reaper.go index bac96e7..5f38700 100644 --- a/reaper.go +++ b/reaper.go @@ -76,8 +76,9 @@ func (wr *WaitingRoom) startReaper(ctx context.Context) { }() } -// reap performs a full eviction cycle over the token store. It loops over -// batch-sized scans until all expired tokens have been removed. +// reap performs a full eviction cycle over the token store and the pass +// store. It loops over batch-sized scans until all expired tokens have +// been removed, then sweeps the pass store for expired VIP passes. // // Only tokens whose ticket number is OUTSIDE the current serving window // (i.e. ticket > nowServing + cap) are counted toward nowServing advances. @@ -94,9 +95,14 @@ func (wr *WaitingRoom) reap() { for { evictedCount := wr.reapBatch() if evictedCount < reaperBatchSize { - return + break } } + + // Sweep expired VIP passes. This is cheap — passes are typically + // few (one per paying customer) and the sweep is a single lock + // acquisition with a linear scan. + wr.passes.reap() } // reapBatch performs a single bounded eviction pass. It returns the number diff --git a/reaper_test.go b/reaper_test.go index 87ac39f..044895c 100644 --- a/reaper_test.go +++ b/reaper_test.go @@ -434,3 +434,215 @@ func TestTokenStore_TouchLastPoll(t *testing.T) { t.Errorf("previous lastPoll too old: %v", prev2) } } + +// ── passStore — basic operations ───────────────────────────────────────────── + +func TestPassStore_SetAndGet(t *testing.T) { + ps := newPassStore() + + ps.set("pass-1", passEntry{expiresAt: time.Now().Add(10 * time.Minute)}) + + entry, ok := ps.get("pass-1") + if !ok { + t.Fatal("expected pass-1 to exist") + } + if entry.expiresAt.IsZero() { + t.Error("expiresAt should not be zero") + } +} + +func TestPassStore_GetMissing(t *testing.T) { + ps := newPassStore() + + _, ok := ps.get("nonexistent") + if ok { + t.Error("expected ok=false for missing pass") + } +} + +func TestPassStore_GetExpired_LazyEviction(t *testing.T) { + ps := newPassStore() + + ps.set("expired", passEntry{expiresAt: time.Now().Add(-time.Minute)}) + + _, ok := ps.get("expired") + if ok { + t.Error("expected expired pass to return ok=false") + } + + // Verify lazy deletion removed it. + if ps.len() != 0 { + t.Errorf("expected expired pass to be lazily deleted, got len=%d", ps.len()) + } +} + +func TestPassStore_GetLive(t *testing.T) { + ps := newPassStore() + + ps.set("live", passEntry{expiresAt: time.Now().Add(10 * time.Minute)}) + + _, ok := ps.get("live") + if !ok { + t.Error("expected live pass to return ok=true") + } + if ps.len() != 1 { + t.Errorf("expected len=1, got %d", ps.len()) + } +} + +func TestPassStore_Delete(t *testing.T) { + ps := newPassStore() + + ps.set("tok", passEntry{expiresAt: time.Now().Add(10 * time.Minute)}) + ps.delete("tok") + + _, ok := ps.get("tok") + if ok { + t.Error("expected deleted pass to return ok=false") + } +} + +func TestPassStore_Len(t *testing.T) { + ps := newPassStore() + + if ps.len() != 0 { + t.Errorf("expected len 0, got %d", ps.len()) + } + + ps.set("a", passEntry{expiresAt: time.Now().Add(10 * time.Minute)}) + ps.set("b", passEntry{expiresAt: time.Now().Add(10 * time.Minute)}) + + if ps.len() != 2 { + t.Errorf("expected len 2, got %d", ps.len()) + } +} + +// ── passStore.reap() ───────────────────────────────────────────────────────── + +func TestPassStore_Reap_RemovesExpired(t *testing.T) { + ps := newPassStore() + + ps.set("expired-1", passEntry{expiresAt: time.Now().Add(-5 * time.Minute)}) + ps.set("expired-2", passEntry{expiresAt: time.Now().Add(-1 * time.Minute)}) + ps.set("live-1", passEntry{expiresAt: time.Now().Add(30 * time.Minute)}) + + ps.reap() + + if _, ok := ps.get("expired-1"); ok { + t.Error("expired-1 should have been reaped") + } + if _, ok := ps.get("expired-2"); ok { + t.Error("expired-2 should have been reaped") + } + if _, ok := ps.get("live-1"); !ok { + t.Error("live-1 should have survived reap") + } +} + +func TestPassStore_Reap_EmptyStore(t *testing.T) { + ps := newPassStore() + // Must not panic. + ps.reap() +} + +func TestPassStore_Reap_AllLive(t *testing.T) { + ps := newPassStore() + + for i := 0; i < 5; i++ { + ps.set(fmt.Sprintf("live-%d", i), passEntry{ + expiresAt: time.Now().Add(time.Hour), + }) + } + + ps.reap() + + if ps.len() != 5 { + t.Errorf("expected 5 live passes after reap, got %d", ps.len()) + } +} + +func TestPassStore_Reap_AllExpired(t *testing.T) { + ps := newPassStore() + + for i := 0; i < 5; i++ { + ps.set(fmt.Sprintf("exp-%d", i), passEntry{ + expiresAt: time.Now().Add(-time.Hour), + }) + } + + ps.reap() + + if ps.len() != 0 { + t.Errorf("expected 0 passes after reap, got %d", ps.len()) + } +} + +// ── reap() sweeps pass store alongside token store ─────────────────────────── + +func TestReap_AlsoSweepsPassStore(t *testing.T) { + wr := newTestWR(t, 5) + + wr.passes.set("expired-pass", passEntry{ + expiresAt: time.Now().Add(-10 * time.Minute), + }) + wr.passes.set("live-pass", passEntry{ + expiresAt: time.Now().Add(30 * time.Minute), + }) + + wr.reap() + + if _, ok := wr.passes.get("expired-pass"); ok { + t.Error("expired pass should have been swept by reap") + } + if _, ok := wr.passes.get("live-pass"); !ok { + t.Error("live pass should have survived reap") + } +} + +func TestReap_PassSweepConcurrentWithTokenReap(t *testing.T) { + wr := newTestWR(t, 5) + expired := time.Now().Add(-(cookieTTL + time.Minute)) + + // Expired tokens. + for i := 0; i < 50; i++ { + wr.tokens.set(fmt.Sprintf("ghost-%d", i), ticketEntry{ + ticket: int64(100 + i), + issuedAt: expired, + }) + } + + // Mix of expired and live passes. + for i := 0; i < 20; i++ { + exp := time.Now().Add(time.Hour) + if i%2 == 0 { + exp = time.Now().Add(-time.Hour) + } + wr.passes.set(fmt.Sprintf("pass-%d", i), passEntry{expiresAt: exp}) + } + + var wg sync.WaitGroup + for range 5 { + wg.Add(1) + go func() { + defer wg.Done() + wr.reap() + }() + } + wg.Wait() + + // All ghost tokens should be gone. + if wr.tokens.len() != 0 { + t.Errorf("expected 0 tokens, got %d", wr.tokens.len()) + } + + // Only odd-indexed (live) passes should survive. + for i := 0; i < 20; i++ { + _, ok := wr.passes.get(fmt.Sprintf("pass-%d", i)) + if i%2 == 0 && ok { + t.Errorf("pass-%d (expired) should have been reaped", i) + } + if i%2 == 1 && !ok { + t.Errorf("pass-%d (live) should have survived", i) + } + } +} diff --git a/room.go b/room.go index af3e7b6..1ec03d5 100644 --- a/room.go +++ b/room.go @@ -22,6 +22,11 @@ var defaultWaitingRoomBytes []byte // until admitted — at which point the browser reloads and the request // re-enters on the fast path. // +// If the client presents a valid room_pass cookie (issued after a +// skip-the-line payment), the middleware auto-promotes their ticket to +// the front of the queue. This happens transparently — the client sees +// the waiting room briefly and is admitted on the next poll cycle. +// // This design avoids writing two responses to the same ResponseWriter by // never calling c.Next() on a request that was served the waiting room page. // @@ -41,6 +46,12 @@ func (wr *WaitingRoom) Middleware() gin.HandlerFunc { secure := wr.secureCookie.Load() || c.Request.TLS != nil + // Check if the client has a valid VIP pass for auto-promotion. + hasPass := false + if passCookie, err := c.Request.Cookie(passCookieName); err == nil { + hasPass = wr.HasValidPass(passCookie.Value) + } + // Resume an existing queued position if the client presents a // valid room_ticket cookie. This preserves queue position across // page reloads and polling retries. @@ -78,6 +89,14 @@ func (wr *WaitingRoom) Middleware() gin.HandlerFunc { c.Next() return } + + // Client has a valid pass but their ticket isn't ready + // yet — auto-promote them to the front so they get + // admitted on the next poll cycle. + if hasPass && !entry.promoted { + wr.autoPromote(cookie.Value) + } + // Touch the token's issuedAt so active pollers do not // get reaped during normal operation. wr.tokens.touchIssuedAt(cookie.Value) @@ -88,7 +107,7 @@ func (wr *WaitingRoom) Middleware() gin.HandlerFunc { position = 1 } html := wr.resolveHTML() - c.Data(http.StatusOK, "text/html; charset=utf-8", wr.injectPosition(html, position)) + c.Data(http.StatusOK, "text/html; charset=utf-8", wr.injectTemplateVars(html, position)) c.Abort() return } @@ -155,16 +174,53 @@ func (wr *WaitingRoom) Middleware() gin.HandlerFunc { SameSite: http.SameSiteLaxMode, }) + // If the client has a valid pass, auto-promote the freshly + // issued ticket immediately so they jump to the front. + if hasPass { + wr.autoPromote(token) + } + position := wr.positionOf(ticket) if position < 1 { position = 1 } html := wr.resolveHTML() - c.Data(http.StatusOK, "text/html; charset=utf-8", wr.injectPosition(html, position)) + c.Data(http.StatusOK, "text/html; charset=utf-8", wr.injectTemplateVars(html, position)) c.Abort() } } +// autoPromote silently promotes a token to the front of the queue. +// This is used when a client with a valid VIP pass re-enters the queue. +// Unlike PromoteToken, it does not require a RateFunc and does not +// compute cost — the pass was already paid for. +func (wr *WaitingRoom) autoPromote(token string) { + wr.promoteMu.Lock() + defer wr.promoteMu.Unlock() + + entry, ok := wr.tokens.get(token) + if !ok { + return + } + + if wr.positionOf(entry.ticket) <= 0 { + return // already in serving window + } + + ceiling := wr.nowServing.Load() + int64(wr.cap.Load()) + 1 + insert := wr.promoteInsert.Load() + if ceiling < insert { + insert = ceiling + } + + entry.ticket = insert + entry.promoted = true + wr.promoteInsert.Store(insert - 1) + wr.tokens.set(token, entry) + + wr.emit(EventPromote, wr.snapshot(EventPromote)) +} + // ticketReady reports whether the given ticket falls within the current // serving window. func (wr *WaitingRoom) ticketReady(ticket int64) bool { @@ -212,21 +268,38 @@ func (wr *WaitingRoom) resolveHTML() []byte { return defaultWaitingRoomBytes } -// injectPosition substitutes {{.Position}} in the HTML bytes with the -// caller's numeric queue position. -func (wr *WaitingRoom) injectPosition(html []byte, position int64) []byte { - return bytes.ReplaceAll( +// injectTemplateVars substitutes all template placeholders in the HTML +// bytes with their current values: +// +// - {{.Position}} → the caller's numeric queue position +// - {{.SkipURL}} → the payment URL (empty string if not configured) +// +// This is the single point of template injection. All placeholders are +// handled here so they cannot diverge across call sites. +func (wr *WaitingRoom) injectTemplateVars(html []byte, position int64) []byte { + result := bytes.ReplaceAll( html, []byte("{{.Position}}"), []byte(fmt.Sprintf("%d", position)), ) + result = bytes.ReplaceAll( + result, + []byte("{{.SkipURL}}"), + []byte(wr.SkipURL()), + ) + return result } // SetHTML replaces the waiting room page served to queued requests. // Pass nil to revert to the embedded default waiting_room.html. // Safe to call at any time including while requests are in flight. // -// Related: WaitingRoom.resolveHTML +// Custom HTML may use the following template placeholders: +// +// - {{.Position}} — replaced with the client's queue position (integer) +// - {{.SkipURL}} — replaced with the skip-the-line payment URL (string) +// +// Related: WaitingRoom.resolveHTML, WaitingRoom.SetSkipURL func (wr *WaitingRoom) SetHTML(html []byte) { wr.mu.Lock() defer wr.mu.Unlock() diff --git a/sample/basic-web-app/main.go b/sample/basic-web-app/main.go index 659155f..88192a4 100644 --- a/sample/basic-web-app/main.go +++ b/sample/basic-web-app/main.go @@ -54,6 +54,29 @@ func main() { log.Fatalf("room.SetReaperInterval: %v", err) } + // ── 3a. Configure skip-the-line pricing ────────────────────────────── + // + // SetRateFunc defines the per-position cost. Here we use a flat rate + // of $2.50 per position. In production you might use surge pricing: + // + // wr.SetRateFunc(func(depth int64) float64 { + // return 1.00 + float64(depth)*0.05 // base $1 + 5¢ per queued request + // }) + // + // SetSkipURL tells the waiting room page where the "Pay to skip" + // button should navigate. This must be registered BEFORE + // RegisterRoutes so it bypasses the waiting room. + // + // SetPassDuration configures how long a paid skip-the-line pass + // remains valid. During this window, if the client is evicted, + // times out, or refreshes, they are auto-promoted to the front + // without paying again. Set to 0 to disable (single-use promotions). + wr.SetRateFunc(func(depth int64) float64 { return 2.50 }) + wr.SetSkipURL("/queue/purchase") + if err := wr.SetPassDuration(90 * time.Minute); err != nil { + log.Fatalf("room.SetPassDuration: %v", err) + } + // ── 4. Lifecycle callbacks ──────────────────────────────────────────── // // These callbacks are what you will see in the terminal during ab. @@ -66,6 +89,7 @@ func main() { // grep '\[DRAIN\]' — moments the room dropped below capacity // grep '\[EVICT\]' — abandoned ghost tickets removed by the reaper // grep '\[TIMEOUT\]'— requests whose context was cancelled mid-queue + // grep '\[PROMOTE\]'— a queued client paid to skip the line wr.On(room.EventFull, func(s room.Snapshot) { roomLog("FULL ", fmt.Sprintf( @@ -120,23 +144,50 @@ func main() { )) }) - // ── 5. Register the WaitingRoom routes ─────────────────────────────── + wr.On(room.EventPromote, func(s room.Snapshot) { + roomLog("PROMOTE", fmt.Sprintf( + "client promoted to front occupancy=%d/%d queue=%d", + s.Occupancy, s.Capacity, s.QueueDepth, + )) + }) + + // ── 5. Register skip-the-line routes BEFORE the waiting room ───────── + // + // These routes must bypass the waiting room so that queued clients + // can access the payment flow. Register them before RegisterRoutes. + // + // In production you would replace the GET /queue/purchase page with + // a handler that creates a Stripe Checkout session and redirects, + // and POST /queue/purchase/confirm with a Stripe webhook handler + // that verifies the payment event before calling PromoteTokenToFront. + + // GET /queue/purchase — shows the "confirm payment" page. + // In production: creates a Stripe Checkout session and redirects. + r.GET("/queue/purchase", handlePurchasePage) + + // POST /queue/purchase/confirm — processes the payment and promotes. + // In production: this is your Stripe webhook endpoint that verifies + // the payment signature before promoting. + r.POST("/queue/purchase/confirm", handlePurchaseConfirm) + + // ── 6. Register the WaitingRoom routes ─────────────────────────────── // - // RegisterRoutes must come BEFORE your application routes. + // RegisterRoutes must come AFTER the payment routes (so they bypass + // the queue) and BEFORE your application routes (so they are gated). // It installs, in order: // OPTIONS /queue/status — CORS preflight // GET /queue/status — polling endpoint for the waiting-room page // r.Use(wr.Middleware()) — gates every route registered after this wr.RegisterRoutes(r) - // ── 6. Application routes (all gated by the waiting room) ──────────── + // ── 7. Application routes (all gated by the waiting room) ──────────── r.GET("/", homePage) r.GET("/about", aboutPage) r.GET("/pricing", pricingPage) r.GET("/contact", contactPage) - // ── 7. Graceful shutdown ────────────────────────────────────────────── + // ── 8. Graceful shutdown ────────────────────────────────────────────── srv := &http.Server{ Addr: ":8080", @@ -147,7 +198,8 @@ func main() { signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) go func() { - log.Printf("[ INFO ] listening on http://localhost:8080 cap=%d", wr.Cap()) + log.Printf("[ INFO ] listening on http://localhost:8080 cap=%d rate=$%.2f/pos pass=%s", + wr.Cap(), 2.50, wr.PassDuration()) if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { log.Fatalf("ListenAndServe: %v", err) } @@ -165,6 +217,160 @@ func main() { log.Println("[ INFO ] server exited cleanly") } +// ── Skip-the-line payment handlers ──────────────────────────────────────────── +// +// These simulate a payment flow for the demo. In production, replace +// handlePurchasePage with a Stripe Checkout redirect and +// handlePurchaseConfirm with your Stripe webhook handler. + +// handlePurchasePage shows a confirmation page with the current cost. +// The room_ticket cookie identifies which queued client is paying. +// +// Production equivalent: create a Stripe Checkout session with the +// token as client_reference_id, then redirect to session.URL. +func handlePurchasePage(c *gin.Context) { + cookie, err := c.Request.Cookie("room_ticket") + if err != nil || cookie.Value == "" { + c.Data(http.StatusBadRequest, "text/html; charset=utf-8", page( + "Error", + `

No queue ticket found

+

You need to be in the waiting room to skip the line.

+ ← Back to site`, + )) + return + } + + token := cookie.Value + + // Check if the client already has a valid pass — no need to pay again. + if passCookie, err := c.Request.Cookie("room_pass"); err == nil { + if wr.HasValidPass(passCookie.Value) { + c.Data(http.StatusOK, "text/html; charset=utf-8", page( + "VIP pass active", + fmt.Sprintf( + `

You already have a VIP pass

+

Your pass is still active (expires in %s). You'll be + automatically moved to the front — no additional payment needed.

+

Head back and you'll be admitted shortly.

+ ← Back to site`, + wr.PassDuration().Round(time.Minute)), + )) + return + } + } + + // Get the current cost to jump to position 1. + cost, err := wr.QuoteCost(token, 1) + if err != nil { + var msg string + switch err.(type) { + case room.ErrTokenNotFound: + msg = "Your queue ticket has expired or was already used." + case room.ErrAlreadyAdmitted: + msg = "You're already being admitted — no need to pay!" + case room.ErrPromotionDisabled: + msg = "Skip-the-line is not available right now." + default: + msg = "Something went wrong: " + err.Error() + } + c.Data(http.StatusOK, "text/html; charset=utf-8", page("Skip the line", fmt.Sprintf( + `

Skip the line

+

%s

+ ← Back to site`, msg, + ))) + return + } + + if cost <= 0 { + c.Data(http.StatusOK, "text/html; charset=utf-8", page("Skip the line", + `

You're next!

+

You're already at the front of the line — no payment needed.

+

Head back and you'll be admitted momentarily.

+ ← Back to site`, + )) + return + } + + // Render the payment confirmation page. + // In production this would be a Stripe Checkout redirect instead. + c.Data(http.StatusOK, "text/html; charset=utf-8", purchasePage(cost, wr.PassDuration())) +} + +// handlePurchaseConfirm processes the "payment" and promotes the token. +// +// Production equivalent: this is your Stripe webhook handler. It would: +// 1. Verify the Stripe signature (stripe.ConstructEvent) +// 2. Extract client_reference_id from the checkout session +// 3. Call wr.PromoteTokenToFront(token) +// 4. Set the room_pass cookie from result.PassToken +// 5. Return 200 to Stripe +// +// For this demo we skip signature verification and just promote. +func handlePurchaseConfirm(c *gin.Context) { + cookie, err := c.Request.Cookie("room_ticket") + if err != nil || cookie.Value == "" { + c.JSON(http.StatusBadRequest, gin.H{"error": "no room_ticket cookie"}) + return + } + + token := cookie.Value + + result, err := wr.PromoteTokenToFront(token) + if err != nil { + log.Printf("[ SKIP ] promotion failed for token=%.8s...: %v", token, err) + c.Data(http.StatusOK, "text/html; charset=utf-8", page("Payment failed", fmt.Sprintf( + `

Something went wrong

+

%s

+

You haven't been charged. Head back to the waiting room and try again.

+ ← Back to site`, err.Error(), + ))) + return + } + + log.Printf("[ SKIP ] token=%.8s... promoted to front cost=$%.2f pass=%v", + token, result.Cost, result.PassToken != "") + + // Set the VIP pass cookie if a pass was issued. This cookie + // persists across queue entries so the client is auto-promoted + // for the configured pass duration without paying again. + if result.PassToken != "" { + secure := wr.SkipURL() != "" // crude; in production use wr.SetSecureCookie logic + http.SetCookie(c.Writer, &http.Cookie{ + Name: "room_pass", + Value: result.PassToken, + Path: wr.CookiePath(), + Domain: wr.CookieDomain(), + MaxAge: int(wr.PassDuration().Seconds()), + HttpOnly: true, + Secure: secure, + SameSite: http.SameSiteLaxMode, + }) + } + + // Redirect back to the site. The next poll (or page load) will + // see ready=true and admit the client immediately. + passMsg := "" + if result.PassToken != "" { + passMsg = fmt.Sprintf( + `

Your VIP pass is valid for %s — if you + re-enter the queue during that time, you'll be automatically + moved to the front at no extra cost.

`, + wr.PassDuration().Round(time.Minute)) + } + + c.Data(http.StatusOK, "text/html; charset=utf-8", page("Payment confirmed", + fmt.Sprintf( + `

Payment confirmed — $%.2f

+

You've been moved to the front of the line!

+ %s +

Redirecting you now...

+ + `, + result.Cost, passMsg, + ), + )) +} + // ── Page handlers ───────────────────────────────────────────────────────────── // // Each handler sleeps for a realistic duration so that concurrent ab requests @@ -184,6 +390,12 @@ func homePage(c *gin.Context) { Run ab -t 60 -n 1000 -c 100 http://localhost:8080/about in a second terminal and watch this terminal for room events.

+

+ When you land in the waiting room, you'll see a + "Skip the line" option — click it to test the + payment flow at $2.50/position. Your VIP pass + lasts 90 minutes. +