From 48ea3ea5e731af99b63b392a3dda25a3a67ed3f4 Mon Sep 17 00:00:00 2001 From: Andrei Merlescu Date: Mon, 13 Apr 2026 17:50:00 -0400 Subject: [PATCH 1/4] Added paywall functionality to waiting room --- callback.go | 7 + callback_test.go | 3 +- errors.go | 33 ++ new.go | 28 ++ promote.go | 181 +++++++ promote_test.go | 947 +++++++++++++++++++++++++++++++++++ room.go | 31 +- sample/basic-web-app/main.go | 325 +++++++++++- status.go | 18 +- types.go | 7 + waiting_room.html | 181 ++++++- 11 files changed, 1734 insertions(+), 27 deletions(-) create mode 100644 promote.go create mode 100644 promote_test.go diff --git a/callback.go b/callback.go index b22dbe8..5540058 100644 --- a/callback.go +++ b/callback.go @@ -43,6 +43,11 @@ const ( // EventTimeout fires when a queued request's context is cancelled or // its deadline expires before a slot becomes available. EventTimeout + + // EventPromote fires when a queued token is promoted to a higher + // position via PromoteToken. Use this to track revenue events or + // log queue jumps for fairness monitoring. + EventPromote ) // String returns the canonical name of the Event, suitable for logging. @@ -62,6 +67,8 @@ func (e Event) String() string { return "Evict" case EventTimeout: return "Timeout" + case EventPromote: + return "Promote" default: return "Unknown" } diff --git a/callback_test.go b/callback_test.go index cf4c5d1..b88dd75 100644 --- a/callback_test.go +++ b/callback_test.go @@ -23,6 +23,7 @@ func TestEvent_String(t *testing.T) { {EventQueue, "Queue"}, {EventEvict, "Evict"}, {EventTimeout, "Timeout"}, + {EventPromote, "Promote"}, {Event(255), "Unknown"}, } for _, tc := range cases { @@ -246,7 +247,7 @@ func TestConcurrent_OnOffEmit_AllEvents(t *testing.T) { t.Parallel() wr := newTestWR(t, 5) - events := []Event{EventEnter, EventExit, EventFull, EventDrain, EventQueue, EventEvict, EventTimeout} + events := []Event{EventEnter, EventExit, EventFull, EventDrain, EventQueue, EventEvict, EventTimeout, EventPromote} var wg sync.WaitGroup for _, ev := range events { ev := ev diff --git a/errors.go b/errors.go index 1e4fbd7..0de5688 100644 --- a/errors.go +++ b/errors.go @@ -46,3 +46,36 @@ type ErrInvalidMaxQueueDepth struct { func (e ErrInvalidMaxQueueDepth) Error() string { return fmt.Sprintf("room: invalid max queue depth %d: must be >= 0", e.Given) } + +// ErrPromotionDisabled is returned when PromoteToken or QuoteCost is +// called without a RateFunc configured via SetRateFunc. +type ErrPromotionDisabled struct{} + +func (e ErrPromotionDisabled) Error() string { + return "room: promotions disabled — call SetRateFunc first" +} + +// ErrTokenNotFound is returned when the token does not exist in the +// token store (expired, already admitted, or never issued). +type ErrTokenNotFound struct{} + +func (e ErrTokenNotFound) Error() string { + return "room: token not found" +} + +// ErrAlreadyAdmitted is returned when a promotion is attempted on a +// token that is already within the serving window. +type ErrAlreadyAdmitted struct{} + +func (e ErrAlreadyAdmitted) Error() string { + return "room: token already within serving window" +} + +// ErrInvalidTargetPosition is returned when targetPosition < 1. +type ErrInvalidTargetPosition struct { + Given int64 +} + +func (e ErrInvalidTargetPosition) Error() string { + return fmt.Sprintf("room: invalid target position %d: must be >= 1", e.Given) +} diff --git a/new.go b/new.go index 6d41f9d..551095e 100644 --- a/new.go +++ b/new.go @@ -3,6 +3,7 @@ package room import ( "context" "fmt" + "math" "net/http" "github.com/andreimerlescu/sema" @@ -92,6 +93,9 @@ func (wr *WaitingRoom) Init(cap int32) error { wr.maxQueueDepth.Store(defaultMaxQueueDepth) wr.cookiePath.Store("/") wr.cookieDomain.Store("") + wr.rateFunc.Store((*rateFuncHolder)(nil)) + wr.promoteInsert.Store(math.MaxInt64) + wr.skipURL.Store("") wr.initialised.Store(true) wr.callbacks = newCallbackRegistry() @@ -179,6 +183,30 @@ func (wr *WaitingRoom) CookieDomain() string { return wr.cookieDomain.Load().(string) } +// SetSkipURL sets the URL that the waiting room "Pay to skip" button +// navigates to. This is typically a payment page (Stripe Checkout, +// crypto invoice, etc.) that your application hosts. +// +// The handler at this URL can read the room_ticket cookie to identify +// which queued client is paying. After payment verification, call +// PromoteTokenToFront to move them to the front of the queue. +// +// If empty (the default), the skip-the-line offer is never shown on +// the waiting room page, even if a RateFunc is configured. Both +// SetRateFunc and SetSkipURL must be set for the offer to appear. +// +// Safe to call at any time. +// +// Related: SetRateFunc, PromoteToken, PromoteTokenToFront +func (wr *WaitingRoom) SetSkipURL(url string) { + wr.skipURL.Store(url) +} + +// SkipURL returns the current skip-the-line payment URL. +func (wr *WaitingRoom) SkipURL() string { + return wr.skipURL.Load().(string) +} + // checkInitialised aborts the request with 500 and returns false if the // WaitingRoom has not been initialised. Prevents nil pointer dereferences // on zero-value WaitingRoom structs. diff --git a/promote.go b/promote.go new file mode 100644 index 0000000..13cf3ee --- /dev/null +++ b/promote.go @@ -0,0 +1,181 @@ +package room + +// RateFunc returns the per-position cost given the current queue depth. +// Implementations can return a flat rate, a curve, or surge pricing. +// +// Examples: +// +// // Flat: $1 per position +// func(depth int64) float64 { return 1.00 } +// +// // Surge: price increases with queue depth +// func(depth int64) float64 { return 0.50 + float64(depth)*0.01 } +type RateFunc func(queueDepth int64) float64 + +// SetRateFunc sets the per-position pricing function used by QuoteCost +// and PromoteToken. If nil, promotions are disabled and PromoteToken +// returns ErrPromotionDisabled. +// +// Safe to call at any time including while requests are in flight. +func (wr *WaitingRoom) SetRateFunc(fn RateFunc) { + if fn == nil { + wr.rateFunc.Store((*rateFuncHolder)(nil)) + return + } + wr.rateFunc.Store(&rateFuncHolder{fn: fn}) +} + +// rateFuncHolder wraps a RateFunc so that atomic.Value always stores the +// same concrete type. atomic.Value panics if Store is called with a +// different concrete type than a previous Store, so we cannot alternate +// between a nil interface and a concrete function value. Wrapping in a +// pointer-to-struct avoids this: we always store *rateFuncHolder (which +// may itself be nil). +type rateFuncHolder struct { + fn RateFunc +} + +// rateFuncLoad returns the current RateFunc or nil if unset. +func (wr *WaitingRoom) rateFuncLoad() RateFunc { + v := wr.rateFunc.Load() + if v == nil { + return nil + } + h, ok := v.(*rateFuncHolder) + if !ok || h == nil { + return nil + } + return h.fn +} + +// QuoteCost returns the cost for a queued token to jump to targetPosition. +// targetPosition=1 means next to be admitted. Returns the cost without +// modifying any state — use this to display pricing on the waiting room page. +// +// Returns ErrPromotionDisabled if no RateFunc is set. +// Returns ErrTokenNotFound if the token does not exist in the queue. +// Returns ErrAlreadyAdmitted if the token is already within the serving window. +// Returns ErrInvalidTargetPosition if targetPosition < 1. +// +// Related: PromoteToken, SetRateFunc +func (wr *WaitingRoom) QuoteCost(token string, targetPosition int64) (float64, error) { + if targetPosition < 1 { + return 0, ErrInvalidTargetPosition{Given: targetPosition} + } + + fn := wr.rateFuncLoad() + if fn == nil { + return 0, ErrPromotionDisabled{} + } + + entry, ok := wr.tokens.get(token) + if !ok { + return 0, ErrTokenNotFound{} + } + + currentPosition := wr.positionOf(entry.ticket) + if currentPosition <= 0 { + return 0, ErrAlreadyAdmitted{} + } + + if targetPosition >= currentPosition { + return 0, nil // already at or ahead of target + } + + distance := currentPosition - targetPosition + rate := fn(wr.QueueDepth()) + return float64(distance) * rate, nil +} + +// PromoteToken moves a queued token to targetPosition in the queue. +// targetPosition=1 means next to be admitted. +// +// The caller is responsible for payment verification before calling this. +// PromoteToken does not handle payments — it only reassigns the ticket. +// +// Returns the cost that was computed at promotion time. The caller should +// compare this against the amount actually charged to detect race conditions +// where the queue moved between QuoteCost and PromoteToken. +// +// Promotion is serialized: only one PromoteToken call executes at a time +// to prevent two promotions from claiming the same ticket slot. Each +// successive promotion to the same target position is placed one position +// behind the previous promotee, ensuring unique ticket assignment. +// +// Returns ErrPromotionDisabled if no RateFunc is set. +// Returns ErrTokenNotFound if the token does not exist. +// Returns ErrAlreadyAdmitted if already within the serving window. +// Returns ErrInvalidTargetPosition if targetPosition < 1. +// +// Related: QuoteCost, PromoteTokenToFront, SetRateFunc +func (wr *WaitingRoom) PromoteToken(token string, targetPosition int64) (float64, error) { + if targetPosition < 1 { + return 0, ErrInvalidTargetPosition{Given: targetPosition} + } + + fn := wr.rateFuncLoad() + if fn == nil { + return 0, ErrPromotionDisabled{} + } + + // Serialize promotions so two concurrent callers cannot claim + // the same synthetic ticket number. + wr.promoteMu.Lock() + defer wr.promoteMu.Unlock() + + entry, ok := wr.tokens.get(token) + if !ok { + return 0, ErrTokenNotFound{} + } + + currentPosition := wr.positionOf(entry.ticket) + if currentPosition <= 0 { + return 0, ErrAlreadyAdmitted{} + } + + if targetPosition >= currentPosition { + return 0, nil // no-op, already ahead + } + + distance := currentPosition - targetPosition + rate := fn(wr.QueueDepth()) + cost := float64(distance) * rate + + // Compute the new ticket number. We use promoteInsert to guarantee + // uniqueness: it tracks the lowest ticket number that the next + // promotion is allowed to claim. Each promotion takes the minimum + // of the natural ceiling (where the target position would land in + // the ticket space) and promoteInsert, then decrements promoteInsert + // so the next caller gets a strictly lower value. + // + // promoteInsert is initialised to math.MaxInt64, so the first + // promotion always uses the natural ceiling. Subsequent promotions + // use whichever is lower — the ceiling or the running counter — + // preventing collisions even when multiple tokens target the same + // position. + ceiling := wr.nowServing.Load() + int64(wr.cap.Load()) + targetPosition + insert := wr.promoteInsert.Load() + if ceiling < insert { + insert = ceiling + } + + newTicket := insert + wr.promoteInsert.Store(insert - 1) + + // Update the entry with the new ticket. + entry.ticket = newTicket + entry.promoted = true + wr.tokens.set(token, entry) + + wr.emit(EventPromote, wr.snapshot(EventPromote)) + + return cost, nil +} + +// PromoteTokenToFront is a convenience wrapper that promotes a token +// to position 1 (next to be admitted). +// +// Related: PromoteToken, QuoteCost +func (wr *WaitingRoom) PromoteTokenToFront(token string) (float64, error) { + return wr.PromoteToken(token, 1) +} diff --git a/promote_test.go b/promote_test.go new file mode 100644 index 0000000..a2f2e20 --- /dev/null +++ b/promote_test.go @@ -0,0 +1,947 @@ +package room + +import ( + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/gin-gonic/gin" +) + +// ── RateFunc / SetRateFunc ─────────────────────────────────────────────────── + +func TestSetRateFunc_NilDisablesPromotions(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 5) + + wr.SetRateFunc(func(depth int64) float64 { return 1.0 }) + wr.SetRateFunc(nil) + + if fn := wr.rateFuncLoad(); fn != nil { + t.Error("expected nil RateFunc after SetRateFunc(nil)") + } +} + +func TestSetRateFunc_StoresAndLoads(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 5) + + wr.SetRateFunc(func(depth int64) float64 { return 2.50 }) + + fn := wr.rateFuncLoad() + if fn == nil { + t.Fatal("expected non-nil RateFunc") + } + if got := fn(0); got != 2.50 { + t.Errorf("expected rate 2.50, got %f", got) + } +} + +func TestRateFuncLoad_DefaultIsNil(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 5) + + if fn := wr.rateFuncLoad(); fn != nil { + t.Error("expected nil RateFunc by default") + } +} + +// ── QuoteCost ──────────────────────────────────────────────────────────────── + +func TestQuoteCost_NoRateFunc_ReturnsDisabled(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 5) + + // Manually insert a token so the test isn't about token lookup. + wr.tokens.set("tok", ticketEntry{ticket: 20, issuedAt: time.Now()}) + + _, err := wr.QuoteCost("tok", 1) + if err == nil { + t.Fatal("expected ErrPromotionDisabled, got nil") + } + if _, ok := err.(ErrPromotionDisabled); !ok { + t.Errorf("expected ErrPromotionDisabled, got %T: %v", err, err) + } +} + +func TestQuoteCost_TokenNotFound(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 5) + wr.SetRateFunc(func(depth int64) float64 { return 1.0 }) + + _, err := wr.QuoteCost("nonexistent", 1) + if err == nil { + t.Fatal("expected ErrTokenNotFound, got nil") + } + if _, ok := err.(ErrTokenNotFound); !ok { + t.Errorf("expected ErrTokenNotFound, got %T: %v", err, err) + } +} + +func TestQuoteCost_InvalidTargetPosition(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 5) + wr.SetRateFunc(func(depth int64) float64 { return 1.0 }) + wr.tokens.set("tok", ticketEntry{ticket: 20, issuedAt: time.Now()}) + + for _, target := range []int64{0, -1, -100} { + _, err := wr.QuoteCost("tok", target) + if err == nil { + t.Errorf("expected ErrInvalidTargetPosition for target=%d, got nil", target) + continue + } + if _, ok := err.(ErrInvalidTargetPosition); !ok { + t.Errorf("expected ErrInvalidTargetPosition for target=%d, got %T", target, err) + } + } +} + +func TestQuoteCost_AlreadyAdmitted(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 10) + wr.SetRateFunc(func(depth int64) float64 { return 1.0 }) + + // Ticket 5 with cap=10 and nowServing=0 → position = 5 - 0 - 10 = -5 (admitted). + wr.tokens.set("tok", ticketEntry{ticket: 5, issuedAt: time.Now()}) + + _, err := wr.QuoteCost("tok", 1) + if err == nil { + t.Fatal("expected ErrAlreadyAdmitted, got nil") + } + if _, ok := err.(ErrAlreadyAdmitted); !ok { + t.Errorf("expected ErrAlreadyAdmitted, got %T: %v", err, err) + } +} + +func TestQuoteCost_AlreadyAtOrAheadOfTarget(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 1) + wr.SetRateFunc(func(depth int64) float64 { return 1.0 }) + + // cap=1, nowServing=0 → position = ticket - 0 - 1 = ticket - 1. + // ticket=4 → position=3. + wr.tokens.set("tok", ticketEntry{ticket: 4, issuedAt: time.Now()}) + + // Target position 3 (same as current) → cost should be 0. + cost, err := wr.QuoteCost("tok", 3) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if cost != 0 { + t.Errorf("expected cost 0 for same position, got %f", cost) + } + + // Target position 5 (behind current) → cost should be 0. + cost, err = wr.QuoteCost("tok", 5) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if cost != 0 { + t.Errorf("expected cost 0 for behind position, got %f", cost) + } +} + +func TestQuoteCost_CorrectCalculation_FlatRate(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 1) + wr.SetRateFunc(func(depth int64) float64 { return 2.00 }) + + // cap=1, nowServing=0 → position = ticket - 1. + // ticket=11 → position=10. + wr.tokens.set("tok", ticketEntry{ticket: 11, issuedAt: time.Now()}) + + // Jump from position 10 to position 1: distance=9, cost = 9 * 2.00 = 18.00. + cost, err := wr.QuoteCost("tok", 1) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if cost != 18.00 { + t.Errorf("expected cost 18.00, got %f", cost) + } + + // Jump from position 10 to position 5: distance=5, cost = 5 * 2.00 = 10.00. + cost, err = wr.QuoteCost("tok", 5) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if cost != 10.00 { + t.Errorf("expected cost 10.00, got %f", cost) + } +} + +func TestQuoteCost_SurgeRateUsesQueueDepth(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 1) + + // Surge pricing: base $1 + $0.10 per queued request. + wr.SetRateFunc(func(depth int64) float64 { + return 1.0 + float64(depth)*0.10 + }) + + // Insert several tokens to build queue depth. + // cap=1, nowServing=0 → window is ticket [1]. + // Tickets 2..6 are outside the window → queue depth = 5. + for i := int64(2); i <= 6; i++ { + wr.nextTicket.Store(i) + wr.tokens.set(fmt.Sprintf("tok-%d", i), ticketEntry{ + ticket: i, + issuedAt: time.Now(), + }) + } + wr.nextTicket.Store(6) + + // tok-6: position = 6 - 0 - 1 = 5. Queue depth = 5. + // rate = 1.0 + 5*0.10 = 1.50. + // Jump to position 1: distance = 4, cost = 4 * 1.50 = 6.00. + cost, err := wr.QuoteCost("tok-6", 1) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if cost != 6.00 { + t.Errorf("expected cost 6.00, got %f", cost) + } +} + +func TestQuoteCost_DoesNotMutateState(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 1) + wr.SetRateFunc(func(depth int64) float64 { return 1.0 }) + + wr.tokens.set("tok", ticketEntry{ticket: 11, issuedAt: time.Now()}) + + ticketBefore := int64(11) + nowServingBefore := wr.nowServing.Load() + + _, _ = wr.QuoteCost("tok", 1) + + entry, ok := wr.tokens.get("tok") + if !ok { + t.Fatal("token disappeared after QuoteCost") + } + if entry.ticket != ticketBefore { + t.Errorf("QuoteCost mutated ticket: was %d, now %d", ticketBefore, entry.ticket) + } + if wr.nowServing.Load() != nowServingBefore { + t.Errorf("QuoteCost mutated nowServing: was %d, now %d", + nowServingBefore, wr.nowServing.Load()) + } +} + +// ── PromoteToken ───────────────────────────────────────────────────────────── + +func TestPromoteToken_NoRateFunc_ReturnsDisabled(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 5) + wr.tokens.set("tok", ticketEntry{ticket: 20, issuedAt: time.Now()}) + + _, err := wr.PromoteToken("tok", 1) + if err == nil { + t.Fatal("expected ErrPromotionDisabled") + } + if _, ok := err.(ErrPromotionDisabled); !ok { + t.Errorf("expected ErrPromotionDisabled, got %T", err) + } +} + +func TestPromoteToken_TokenNotFound(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 5) + wr.SetRateFunc(func(depth int64) float64 { return 1.0 }) + + _, err := wr.PromoteToken("nonexistent", 1) + if err == nil { + t.Fatal("expected ErrTokenNotFound") + } + if _, ok := err.(ErrTokenNotFound); !ok { + t.Errorf("expected ErrTokenNotFound, got %T", err) + } +} + +func TestPromoteToken_InvalidTargetPosition(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 5) + wr.SetRateFunc(func(depth int64) float64 { return 1.0 }) + wr.tokens.set("tok", ticketEntry{ticket: 20, issuedAt: time.Now()}) + + for _, target := range []int64{0, -1, -50} { + _, err := wr.PromoteToken("tok", target) + if err == nil { + t.Errorf("expected ErrInvalidTargetPosition for target=%d", target) + continue + } + if _, ok := err.(ErrInvalidTargetPosition); !ok { + t.Errorf("expected ErrInvalidTargetPosition for target=%d, got %T", target, err) + } + } +} + +func TestPromoteToken_AlreadyAdmitted(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 10) + wr.SetRateFunc(func(depth int64) float64 { return 1.0 }) + + // Ticket 3, cap=10, nowServing=0 → position = 3 - 0 - 10 = -7 (in window). + wr.tokens.set("tok", ticketEntry{ticket: 3, issuedAt: time.Now()}) + + _, err := wr.PromoteToken("tok", 1) + if err == nil { + t.Fatal("expected ErrAlreadyAdmitted") + } + if _, ok := err.(ErrAlreadyAdmitted); !ok { + t.Errorf("expected ErrAlreadyAdmitted, got %T", err) + } +} + +func TestPromoteToken_AlreadyAhead_Noop(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 1) + wr.SetRateFunc(func(depth int64) float64 { return 1.0 }) + + // cap=1, nowServing=0 → position = ticket - 1. + // ticket=4 → position=3. + wr.tokens.set("tok", ticketEntry{ticket: 4, issuedAt: time.Now()}) + + cost, err := wr.PromoteToken("tok", 3) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if cost != 0 { + t.Errorf("expected cost 0 for no-op promotion, got %f", cost) + } + + // Ticket should be unchanged. + entry, _ := wr.tokens.get("tok") + if entry.ticket != 4 { + t.Errorf("ticket changed on no-op promotion: expected 4, got %d", entry.ticket) + } +} + +func TestPromoteToken_MovesToFront(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 1) + wr.SetRateFunc(func(depth int64) float64 { return 1.0 }) + + // cap=1, nowServing=0. + // ticket=11 → position = 11 - 0 - 1 = 10. + wr.tokens.set("tok", ticketEntry{ticket: 11, issuedAt: time.Now()}) + + cost, err := wr.PromoteToken("tok", 1) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // distance = 10 - 1 = 9, cost = 9 * 1.0 = 9.0. + if cost != 9.0 { + t.Errorf("expected cost 9.0, got %f", cost) + } + + // New ticket should place the token at position 1. + // ceiling = nowServing + cap + targetPosition = 0 + 1 + 1 = 2. + // promoteInsert was 0, so insert = ceiling = 2. newTicket = 2. + entry, ok := wr.tokens.get("tok") + if !ok { + t.Fatal("token not found after promotion") + } + if entry.ticket != 2 { + t.Errorf("expected new ticket 2, got %d", entry.ticket) + } + + // Verify positionOf confirms position 1. + pos := wr.positionOf(entry.ticket) + if pos != 1 { + t.Errorf("expected position 1 after promotion, got %d", pos) + } +} + +func TestPromoteToken_MovesToIntermediate(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 1) + wr.SetRateFunc(func(depth int64) float64 { return 5.00 }) + + // cap=1, nowServing=0. + // ticket=21 → position = 21 - 0 - 1 = 20. + wr.tokens.set("tok", ticketEntry{ticket: 21, issuedAt: time.Now()}) + + // Promote to position 10: distance = 20 - 10 = 10, cost = 10 * 5.00 = 50.00. + cost, err := wr.PromoteToken("tok", 10) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if cost != 50.0 { + t.Errorf("expected cost 50.0, got %f", cost) + } + + entry, _ := wr.tokens.get("tok") + pos := wr.positionOf(entry.ticket) + if pos != 10 { + t.Errorf("expected position 10 after promotion, got %d", pos) + } +} + +func TestPromoteToken_SetsPromotedFlag(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 1) + wr.SetRateFunc(func(depth int64) float64 { return 1.0 }) + + wr.tokens.set("tok", ticketEntry{ticket: 11, issuedAt: time.Now()}) + + _, err := wr.PromoteToken("tok", 1) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + entry, _ := wr.tokens.get("tok") + if !entry.promoted { + t.Error("expected promoted=true after PromoteToken") + } +} + +func TestPromoteToken_ReturnsCost(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 2) + wr.SetRateFunc(func(depth int64) float64 { return 3.50 }) + + // cap=2, nowServing=0 → position = ticket - 2. + // ticket=12 → position=10. + wr.tokens.set("tok", ticketEntry{ticket: 12, issuedAt: time.Now()}) + + // Jump to position 4: distance = 10 - 4 = 6, cost = 6 * 3.50 = 21.00. + cost, err := wr.PromoteToken("tok", 4) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if cost != 21.0 { + t.Errorf("expected cost 21.0, got %f", cost) + } +} + +// ── PromoteToken — EventPromote callback ───────────────────────────────────── + +func TestPromoteToken_FiresEventPromote(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 1) + wr.SetRateFunc(func(depth int64) float64 { return 1.0 }) + + var promoteCount atomic.Int32 + wr.On(EventPromote, func(s Snapshot) { promoteCount.Add(1) }) + + wr.tokens.set("tok", ticketEntry{ticket: 11, issuedAt: time.Now()}) + _, _ = wr.PromoteToken("tok", 1) + + waitForCount(t, &promoteCount, 1, 200*time.Millisecond) +} + +func TestPromoteToken_NoEventOnNoop(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 1) + wr.SetRateFunc(func(depth int64) float64 { return 1.0 }) + + var promoteCount atomic.Int32 + wr.On(EventPromote, func(s Snapshot) { promoteCount.Add(1) }) + + // Position 3, target 3 → no-op. + wr.tokens.set("tok", ticketEntry{ticket: 4, issuedAt: time.Now()}) + _, _ = wr.PromoteToken("tok", 3) + + time.Sleep(50 * time.Millisecond) + if promoteCount.Load() != 0 { + t.Error("EventPromote fired on no-op promotion") + } +} + +func TestPromoteToken_NoEventOnError(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 5) + wr.SetRateFunc(func(depth int64) float64 { return 1.0 }) + + var promoteCount atomic.Int32 + wr.On(EventPromote, func(s Snapshot) { promoteCount.Add(1) }) + + // Token does not exist → error, no event. + _, _ = wr.PromoteToken("missing", 1) + + time.Sleep(50 * time.Millisecond) + if promoteCount.Load() != 0 { + t.Error("EventPromote fired on failed promotion") + } +} + +// ── PromoteTokenToFront ────────────────────────────────────────────────────── + +func TestPromoteTokenToFront_JumpsToPositionOne(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 1) + wr.SetRateFunc(func(depth int64) float64 { return 1.0 }) + + // position = 11 - 0 - 1 = 10. + wr.tokens.set("tok", ticketEntry{ticket: 11, issuedAt: time.Now()}) + + cost, err := wr.PromoteTokenToFront("tok") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // distance = 10 - 1 = 9, cost = 9 * 1.0 = 9.0. + if cost != 9.0 { + t.Errorf("expected cost 9.0, got %f", cost) + } + + entry, _ := wr.tokens.get("tok") + pos := wr.positionOf(entry.ticket) + if pos != 1 { + t.Errorf("expected position 1, got %d", pos) + } +} + +// ── Serialization: concurrent promotions ───────────────────────────────────── + +func TestPromoteToken_ConcurrentPromotions_NoCollision(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 1) + wr.SetRateFunc(func(depth int64) float64 { return 1.0 }) + + const n = 20 + + // Insert n tokens at various positions. + for i := 0; i < n; i++ { + wr.tokens.set(fmt.Sprintf("tok-%d", i), ticketEntry{ + ticket: int64(10 + i*5), // positions spread apart + issuedAt: time.Now(), + }) + } + + var wg sync.WaitGroup + var successCount atomic.Int32 + var errorCount atomic.Int32 + + for i := 0; i < n; i++ { + wg.Add(1) + i := i + go func() { + defer wg.Done() + _, err := wr.PromoteToken(fmt.Sprintf("tok-%d", i), 1) + if err != nil { + errorCount.Add(1) + } else { + successCount.Add(1) + } + }() + } + + wg.Wait() + + // All should succeed (no panics, no data races). + if errorCount.Load() > 0 { + t.Logf("successes=%d errors=%d (errors may include already-admitted tokens)", + successCount.Load(), errorCount.Load()) + } + + // Verify no two surviving tokens have the same ticket number. + seen := make(map[int64]string) + wr.tokens.mu.RLock() + for token, entry := range wr.tokens.entries { + if prev, exists := seen[entry.ticket]; exists { + t.Errorf("ticket collision: tokens %q and %q both have ticket %d", + prev, token, entry.ticket) + } + seen[entry.ticket] = token + } + wr.tokens.mu.RUnlock() +} + +func TestPromoteToken_SerializedUnderMutex(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 1) + wr.SetRateFunc(func(depth int64) float64 { return 1.0 }) + + wr.tokens.set("tok-a", ticketEntry{ticket: 50, issuedAt: time.Now()}) + wr.tokens.set("tok-b", ticketEntry{ticket: 60, issuedAt: time.Now()}) + + // Promote both to position 1 concurrently. + var wg sync.WaitGroup + wg.Add(2) + + go func() { + defer wg.Done() + wr.PromoteToken("tok-a", 1) + }() + go func() { + defer wg.Done() + wr.PromoteToken("tok-b", 1) + }() + + wg.Wait() + + entryA, okA := wr.tokens.get("tok-a") + entryB, okB := wr.tokens.get("tok-b") + + if !okA || !okB { + t.Fatal("tokens disappeared after concurrent promotion") + } + + // Because promotions are serialized via promoteMu and each claims + // a unique slot via promoteInsert, tickets must differ. + if entryA.ticket == entryB.ticket { + t.Errorf("ticket collision after serialized promotions: both have ticket %d", + entryA.ticket) + } +} + +// ── Integration: promoted token is admitted via status polling ──────────────── + +func TestIntegration_PromotedToken_AdmittedOnNextPoll(t *testing.T) { + t.Parallel() + const cap = 1 + wr := newTestWR(t, int32(cap)) + wr.SetRateFunc(func(depth int64) float64 { return 1.0 }) + + serving := make(chan struct{}, 1) + release := make(chan struct{}) + r := newTestRouter(wr, serving, release) + + // Fill the single slot. + go func() { + req := httptest.NewRequest(http.MethodGet, "/", nil) + r.ServeHTTP(httptest.NewRecorder(), req) + }() + <-serving + + // Queue two requests and collect tokens. + _, tokenA := serveWithCookie(r, "") + _, tokenB := serveWithCookie(r, "") + + if tokenA == "" || tokenB == "" { + t.Fatal("expected tokens for both queued requests") + } + + // tokenB should be further back in the queue than tokenA. + entryB, _ := wr.tokens.get(tokenB) + posB := wr.positionOf(entryB.ticket) + if posB < 2 { + t.Fatalf("expected tokenB at position >= 2, got %d", posB) + } + + // Promote tokenB to front. + _, err := wr.PromoteTokenToFront(tokenB) + if err != nil { + t.Fatalf("PromoteTokenToFront: %v", err) + } + + // Release the active slot. + close(release) + + // tokenB should be admitted before tokenA now. + waitForStatus(t, r, tokenB, 5*time.Second) +} + +func TestIntegration_PromotedToken_ServesRequestSuccessfully(t *testing.T) { + t.Parallel() + const cap = 1 + wr := newTestWR(t, int32(cap)) + wr.SetRateFunc(func(depth int64) float64 { return 1.0 }) + + r := gin.New() + wr.RegisterRoutes(r) + + var served atomic.Int32 + r.GET("/", func(c *gin.Context) { + served.Add(1) + c.Status(http.StatusOK) + }) + + serving := make(chan struct{}, 1) + releaseFirst := make(chan struct{}) + + // Override the handler for the first request to block. + rBlocking := gin.New() + wr.RegisterRoutes(rBlocking) + rBlocking.GET("/", func(c *gin.Context) { + serving <- struct{}{} + <-releaseFirst + c.Status(http.StatusOK) + }) + + // Fill the slot with a blocking request. + go func() { + req := httptest.NewRequest(http.MethodGet, "/", nil) + rBlocking.ServeHTTP(httptest.NewRecorder(), req) + }() + <-serving + + // Queue a request and get its token. + _, token := serveWithCookie(rBlocking, "") + if token == "" { + t.Fatal("no token issued") + } + + // Promote to front. + _, err := wr.PromoteTokenToFront(token) + if err != nil { + t.Fatalf("PromoteTokenToFront: %v", err) + } + + // Release the blocking request. + close(releaseFirst) + + // Wait until the promoted token is ready. + waitForStatus(t, rBlocking, token, 5*time.Second) + + // Now make the actual request with the promoted cookie — it should + // pass through the middleware and hit the handler. + req := httptest.NewRequest(http.MethodGet, "/", nil) + req.AddCookie(&http.Cookie{Name: cookieName, Value: token}) + w := httptest.NewRecorder() + r.ServeHTTP(w, req) + + if w.Code != http.StatusOK { + t.Errorf("expected 200 after promotion, got %d", w.Code) + } +} + +// ── Integration: status endpoint includes pricing ──────────────────────────── + +func TestIntegration_StatusEndpoint_IncludesPricing(t *testing.T) { + t.Parallel() + const cap = 1 + wr := newTestWR(t, int32(cap)) + wr.SetRateFunc(func(depth int64) float64 { return 2.50 }) + + serving := make(chan struct{}, 1) + release := make(chan struct{}) + r := newTestRouter(wr, serving, release) + + // Fill the slot. + go func() { + req := httptest.NewRequest(http.MethodGet, "/", nil) + r.ServeHTTP(httptest.NewRecorder(), req) + }() + <-serving + + // Queue multiple requests so the last one is at position > 1. + serveWithCookie(r, "") + serveWithCookie(r, "") + _, token := serveWithCookie(r, "") + if token == "" { + t.Fatal("no token issued") + } + + // Poll status — should include pricing fields. + req := httptest.NewRequest(http.MethodGet, "/queue/status", nil) + req.AddCookie(&http.Cookie{Name: cookieName, Value: token}) + w := httptest.NewRecorder() + r.ServeHTTP(w, req) + + if w.Code != http.StatusOK { + t.Fatalf("expected 200, got %d", w.Code) + } + + var resp statusResponse + if err := json.NewDecoder(w.Body).Decode(&resp); err != nil { + t.Fatalf("decode error: %v", err) + } + + if resp.Ready { + t.Fatal("expected ready=false while queued") + } + if resp.RatePerPos != 2.50 { + t.Errorf("expected rate_per_pos=2.50, got %f", resp.RatePerPos) + } + if resp.Position <= 1 { + t.Errorf("expected position > 1 for third queued request, got %d", resp.Position) + } + if resp.SkipCost <= 0 { + t.Errorf("expected positive skip_cost, got %f", resp.SkipCost) + } + + // skip_cost should be (position - 1) * rate. + expectedCost := float64(resp.Position-1) * 2.50 + if resp.SkipCost != expectedCost { + t.Errorf("expected skip_cost=%f, got %f", expectedCost, resp.SkipCost) + } + + close(release) +} + +func TestIntegration_StatusEndpoint_NoPricingWithoutRateFunc(t *testing.T) { + t.Parallel() + const cap = 1 + wr := newTestWR(t, int32(cap)) + // No SetRateFunc — pricing should be absent. + + serving := make(chan struct{}, 1) + release := make(chan struct{}) + r := newTestRouter(wr, serving, release) + + go func() { + req := httptest.NewRequest(http.MethodGet, "/", nil) + r.ServeHTTP(httptest.NewRecorder(), req) + }() + <-serving + + _, token := serveWithCookie(r, "") + if token == "" { + t.Fatal("no token issued") + } + + req := httptest.NewRequest(http.MethodGet, "/queue/status", nil) + req.AddCookie(&http.Cookie{Name: cookieName, Value: token}) + w := httptest.NewRecorder() + r.ServeHTTP(w, req) + + var resp statusResponse + json.NewDecoder(w.Body).Decode(&resp) + + if resp.SkipCost != 0 { + t.Errorf("expected skip_cost=0 without RateFunc, got %f", resp.SkipCost) + } + if resp.RatePerPos != 0 { + t.Errorf("expected rate_per_pos=0 without RateFunc, got %f", resp.RatePerPos) + } + + close(release) +} + +// ── Edge cases ─────────────────────────────────────────────────────────────── + +func TestPromoteToken_QueueMovedBetweenQuoteAndPromote(t *testing.T) { + // Simulates the race where the queue advances between QuoteCost + // and PromoteToken — the actual cost should reflect the state at + // promotion time, not quote time. + t.Parallel() + wr := newTestWR(t, 1) + wr.SetRateFunc(func(depth int64) float64 { return 1.0 }) + + // position = 11 - 0 - 1 = 10. + wr.tokens.set("tok", ticketEntry{ticket: 11, issuedAt: time.Now()}) + + quotedCost, err := wr.QuoteCost("tok", 1) + if err != nil { + t.Fatalf("QuoteCost: %v", err) + } + + // Simulate queue advancement: nowServing moves forward by 3. + wr.nowServing.Add(3) + + // Now position = 11 - 3 - 1 = 7. Jump to 1: distance=6, cost=6.0. + actualCost, err := wr.PromoteToken("tok", 1) + if err != nil { + t.Fatalf("PromoteToken: %v", err) + } + + if actualCost >= quotedCost { + t.Errorf("expected actual cost (%f) < quoted cost (%f) after queue advancement", + actualCost, quotedCost) + } +} + +func TestPromoteToken_ReapDoesNotEvictPromotedToken(t *testing.T) { + t.Parallel() + wr := newTestWR(t, 1) + wr.SetRateFunc(func(depth int64) float64 { return 1.0 }) + + wr.tokens.set("tok", ticketEntry{ticket: 11, issuedAt: time.Now()}) + _, err := wr.PromoteToken("tok", 1) + if err != nil { + t.Fatalf("PromoteToken: %v", err) + } + + // Token was just promoted — issuedAt was set at creation time, which + // is recent. Reap should not evict it. + wr.reap() + + if _, ok := wr.tokens.get("tok"); !ok { + t.Error("promoted token was evicted by reaper") + } +} + +func TestPromoteToken_ExpiredPromotedToken_Reaped(t *testing.T) { + // A promoted token that stops polling should eventually expire + // and be reaped, just like any other token. + t.Parallel() + wr := newTestWR(t, 1) + wr.SetRateFunc(func(depth int64) float64 { return 1.0 }) + + wr.tokens.set("tok", ticketEntry{ + ticket: 11, + issuedAt: time.Now().Add(-(cookieTTL + time.Minute)), + }) + + // Promote the already-expired token (simulating a payment that + // was processed very late). + // Note: the token's position would be based on current state. + // Since it's expired, it may be reaped before promotion in real + // usage, but here we're testing that promotion doesn't grant + // immunity from expiration. + entry, _ := wr.tokens.get("tok") + entry.ticket = wr.nowServing.Load() + int64(wr.cap.Load()) + 1 + entry.promoted = true + // Keep the old issuedAt to simulate an abandoned promoted token. + wr.tokens.set("tok", entry) + + wr.reap() + + if _, ok := wr.tokens.get("tok"); ok { + t.Error("expired promoted token should have been reaped") + } +} + +// ── Benchmark ──────────────────────────────────────────────────────────────── + +func BenchmarkQuoteCost(b *testing.B) { + wr := &WaitingRoom{} + if err := wr.Init(10); err != nil { + b.Fatal(err) + } + defer wr.Stop() + wr.SetRateFunc(func(depth int64) float64 { return 1.0 }) + + wr.tokens.set("tok", ticketEntry{ticket: 100, issuedAt: time.Now()}) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, _ = wr.QuoteCost("tok", 1) + } +} + +func BenchmarkPromoteToken(b *testing.B) { + wr := &WaitingRoom{} + if err := wr.Init(10); err != nil { + b.Fatal(err) + } + defer wr.Stop() + wr.SetRateFunc(func(depth int64) float64 { return 1.0 }) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + token := fmt.Sprintf("tok-%d", i) + wr.tokens.set(token, ticketEntry{ + ticket: int64(100 + i), + issuedAt: time.Now(), + }) + _, _ = wr.PromoteToken(token, 1) + } +} + +func BenchmarkPromoteTokenToFront(b *testing.B) { + wr := &WaitingRoom{} + if err := wr.Init(10); err != nil { + b.Fatal(err) + } + defer wr.Stop() + wr.SetRateFunc(func(depth int64) float64 { return 1.0 }) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + token := fmt.Sprintf("tok-%d", i) + wr.tokens.set(token, ticketEntry{ + ticket: int64(100 + i), + issuedAt: time.Now(), + }) + _, _ = wr.PromoteTokenToFront(token) + } +} diff --git a/room.go b/room.go index af3e7b6..75ac833 100644 --- a/room.go +++ b/room.go @@ -88,7 +88,7 @@ func (wr *WaitingRoom) Middleware() gin.HandlerFunc { position = 1 } html := wr.resolveHTML() - c.Data(http.StatusOK, "text/html; charset=utf-8", wr.injectPosition(html, position)) + c.Data(http.StatusOK, "text/html; charset=utf-8", wr.injectTemplateVars(html, position)) c.Abort() return } @@ -160,7 +160,7 @@ func (wr *WaitingRoom) Middleware() gin.HandlerFunc { position = 1 } html := wr.resolveHTML() - c.Data(http.StatusOK, "text/html; charset=utf-8", wr.injectPosition(html, position)) + c.Data(http.StatusOK, "text/html; charset=utf-8", wr.injectTemplateVars(html, position)) c.Abort() } } @@ -212,21 +212,38 @@ func (wr *WaitingRoom) resolveHTML() []byte { return defaultWaitingRoomBytes } -// injectPosition substitutes {{.Position}} in the HTML bytes with the -// caller's numeric queue position. -func (wr *WaitingRoom) injectPosition(html []byte, position int64) []byte { - return bytes.ReplaceAll( +// injectTemplateVars substitutes all template placeholders in the HTML +// bytes with their current values: +// +// - {{.Position}} → the caller's numeric queue position +// - {{.SkipURL}} → the payment URL (empty string if not configured) +// +// This is the single point of template injection. All placeholders are +// handled here so they cannot diverge across call sites. +func (wr *WaitingRoom) injectTemplateVars(html []byte, position int64) []byte { + result := bytes.ReplaceAll( html, []byte("{{.Position}}"), []byte(fmt.Sprintf("%d", position)), ) + result = bytes.ReplaceAll( + result, + []byte("{{.SkipURL}}"), + []byte(wr.SkipURL()), + ) + return result } // SetHTML replaces the waiting room page served to queued requests. // Pass nil to revert to the embedded default waiting_room.html. // Safe to call at any time including while requests are in flight. // -// Related: WaitingRoom.resolveHTML +// Custom HTML may use the following template placeholders: +// +// - {{.Position}} — replaced with the client's queue position (integer) +// - {{.SkipURL}} — replaced with the skip-the-line payment URL (string) +// +// Related: WaitingRoom.resolveHTML, WaitingRoom.SetSkipURL func (wr *WaitingRoom) SetHTML(html []byte) { wr.mu.Lock() defer wr.mu.Unlock() diff --git a/sample/basic-web-app/main.go b/sample/basic-web-app/main.go index 659155f..2515521 100644 --- a/sample/basic-web-app/main.go +++ b/sample/basic-web-app/main.go @@ -54,6 +54,21 @@ func main() { log.Fatalf("room.SetReaperInterval: %v", err) } + // ── 3a. Configure skip-the-line pricing ────────────────────────────── + // + // SetRateFunc defines the per-position cost. Here we use a flat rate + // of $2.50 per position. In production you might use surge pricing: + // + // wr.SetRateFunc(func(depth int64) float64 { + // return 1.00 + float64(depth)*0.05 // base $1 + 5¢ per queued request + // }) + // + // SetSkipURL tells the waiting room page where the "Pay to skip" + // button should navigate. This must be registered BEFORE + // RegisterRoutes so it bypasses the waiting room. + wr.SetRateFunc(func(depth int64) float64 { return 2.50 }) + wr.SetSkipURL("/queue/purchase") + // ── 4. Lifecycle callbacks ──────────────────────────────────────────── // // These callbacks are what you will see in the terminal during ab. @@ -66,6 +81,7 @@ func main() { // grep '\[DRAIN\]' — moments the room dropped below capacity // grep '\[EVICT\]' — abandoned ghost tickets removed by the reaper // grep '\[TIMEOUT\]'— requests whose context was cancelled mid-queue + // grep '\[PROMOTE\]'— a queued client paid to skip the line wr.On(room.EventFull, func(s room.Snapshot) { roomLog("FULL ", fmt.Sprintf( @@ -120,23 +136,50 @@ func main() { )) }) - // ── 5. Register the WaitingRoom routes ─────────────────────────────── + wr.On(room.EventPromote, func(s room.Snapshot) { + roomLog("PROMOTE", fmt.Sprintf( + "client paid to skip occupancy=%d/%d queue=%d", + s.Occupancy, s.Capacity, s.QueueDepth, + )) + }) + + // ── 5. Register skip-the-line routes BEFORE the waiting room ───────── + // + // These routes must bypass the waiting room so that queued clients + // can access the payment flow. Register them before RegisterRoutes. + // + // In production you would replace the GET /queue/purchase page with + // a handler that creates a Stripe Checkout session and redirects, + // and POST /queue/purchase/confirm with a Stripe webhook handler + // that verifies the payment event before calling PromoteTokenToFront. + + // GET /queue/purchase — shows the "confirm payment" page. + // In production: creates a Stripe Checkout session and redirects. + r.GET("/queue/purchase", handlePurchasePage) + + // POST /queue/purchase/confirm — processes the payment and promotes. + // In production: this is your Stripe webhook endpoint that verifies + // the payment signature before promoting. + r.POST("/queue/purchase/confirm", handlePurchaseConfirm) + + // ── 6. Register the WaitingRoom routes ─────────────────────────────── // - // RegisterRoutes must come BEFORE your application routes. + // RegisterRoutes must come AFTER the payment routes (so they bypass + // the queue) and BEFORE your application routes (so they are gated). // It installs, in order: // OPTIONS /queue/status — CORS preflight // GET /queue/status — polling endpoint for the waiting-room page // r.Use(wr.Middleware()) — gates every route registered after this wr.RegisterRoutes(r) - // ── 6. Application routes (all gated by the waiting room) ──────────── + // ── 7. Application routes (all gated by the waiting room) ──────────── r.GET("/", homePage) r.GET("/about", aboutPage) r.GET("/pricing", pricingPage) r.GET("/contact", contactPage) - // ── 7. Graceful shutdown ────────────────────────────────────────────── + // ── 8. Graceful shutdown ────────────────────────────────────────────── srv := &http.Server{ Addr: ":8080", @@ -147,7 +190,8 @@ func main() { signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) go func() { - log.Printf("[ INFO ] listening on http://localhost:8080 cap=%d", wr.Cap()) + log.Printf("[ INFO ] listening on http://localhost:8080 cap=%d rate=$%.2f/pos skip_url=%s", + wr.Cap(), 2.50, wr.SkipURL()) if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { log.Fatalf("ListenAndServe: %v", err) } @@ -165,6 +209,113 @@ func main() { log.Println("[ INFO ] server exited cleanly") } +// ── Skip-the-line payment handlers ──────────────────────────────────────────── +// +// These simulate a payment flow for the demo. In production, replace +// handlePurchasePage with a Stripe Checkout redirect and +// handlePurchaseConfirm with your Stripe webhook handler. + +// handlePurchasePage shows a confirmation page with the current cost. +// The room_ticket cookie identifies which queued client is paying. +// +// Production equivalent: create a Stripe Checkout session with the +// token as client_reference_id, then redirect to session.URL. +func handlePurchasePage(c *gin.Context) { + cookie, err := c.Request.Cookie("room_ticket") + if err != nil || cookie.Value == "" { + c.Data(http.StatusBadRequest, "text/html; charset=utf-8", page( + "Error", + `

No queue ticket found

+

You need to be in the waiting room to skip the line.

+ ← Back to site`, + )) + return + } + + token := cookie.Value + + // Get the current cost to jump to position 1. + cost, err := wr.QuoteCost(token, 1) + if err != nil { + var msg string + switch err.(type) { + case room.ErrTokenNotFound: + msg = "Your queue ticket has expired or was already used." + case room.ErrAlreadyAdmitted: + msg = "You're already being admitted — no need to pay!" + case room.ErrPromotionDisabled: + msg = "Skip-the-line is not available right now." + default: + msg = "Something went wrong: " + err.Error() + } + c.Data(http.StatusOK, "text/html; charset=utf-8", page("Skip the line", fmt.Sprintf( + `

Skip the line

+

%s

+ ← Back to site`, msg, + ))) + return + } + + if cost <= 0 { + c.Data(http.StatusOK, "text/html; charset=utf-8", page("Skip the line", + `

You're next!

+

You're already at the front of the line — no payment needed.

+

Head back and you'll be admitted momentarily.

+ ← Back to site`, + )) + return + } + + // Render the payment confirmation page. + // In production this would be a Stripe Checkout redirect instead. + c.Data(http.StatusOK, "text/html; charset=utf-8", purchasePage(cost)) +} + +// handlePurchaseConfirm processes the "payment" and promotes the token. +// +// Production equivalent: this is your Stripe webhook handler. It would: +// 1. Verify the Stripe signature (stripe.ConstructEvent) +// 2. Extract client_reference_id from the checkout session +// 3. Call wr.PromoteTokenToFront(token) +// 4. Return 200 to Stripe +// +// For this demo we skip signature verification and just promote. +func handlePurchaseConfirm(c *gin.Context) { + cookie, err := c.Request.Cookie("room_ticket") + if err != nil || cookie.Value == "" { + c.JSON(http.StatusBadRequest, gin.H{"error": "no room_ticket cookie"}) + return + } + + token := cookie.Value + + cost, err := wr.PromoteTokenToFront(token) + if err != nil { + log.Printf("[ SKIP ] promotion failed for token=%.8s...: %v", token, err) + c.Data(http.StatusOK, "text/html; charset=utf-8", page("Payment failed", fmt.Sprintf( + `

Something went wrong

+

%s

+

You haven't been charged. Head back to the waiting room and try again.

+ ← Back to site`, err.Error(), + ))) + return + } + + log.Printf("[ SKIP ] token=%.8s... promoted to front cost=$%.2f", token, cost) + + // Redirect back to the site. The next poll (or page load) will + // see ready=true and admit the client immediately. + c.Data(http.StatusOK, "text/html; charset=utf-8", page("Payment confirmed", + fmt.Sprintf( + `

Payment confirmed — $%.2f

+

You've been moved to the front of the line!

+

Redirecting you now...

+ + `, cost, + ), + )) +} + // ── Page handlers ───────────────────────────────────────────────────────────── // // Each handler sleeps for a realistic duration so that concurrent ab requests @@ -184,6 +335,11 @@ func homePage(c *gin.Context) { Run ab -t 60 -n 1000 -c 100 http://localhost:8080/about in a second terminal and watch this terminal for room events.

+

+ When you land in the waiting room, you'll see a + "Skip the line" option — click it to test the + payment flow at $2.50/position. +