Skip to content

Commit 96d0e65

Browse files
committed
fix: double freeturn bug
1 parent 68d8c59 commit 96d0e65

File tree

7 files changed

+1093
-13
lines changed

7 files changed

+1093
-13
lines changed

config_comparison_test.go

Lines changed: 226 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,226 @@
1+
package redis
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"net"
7+
"sync"
8+
"sync/atomic"
9+
"testing"
10+
"time"
11+
)
12+
13+
// TestBadConfigurationHighLoad demonstrates the problem with default configuration
14+
// under high load with slow dials.
15+
func TestBadConfigurationHighLoad(t *testing.T) {
16+
var dialCount atomic.Int32
17+
var dialsFailed atomic.Int32
18+
var dialsSucceeded atomic.Int32
19+
20+
// Simulate slow network - 300ms per dial (e.g., network latency, TLS handshake)
21+
slowDialer := func(ctx context.Context, network, addr string) (net.Conn, error) {
22+
dialCount.Add(1)
23+
select {
24+
case <-time.After(300 * time.Millisecond):
25+
dialsSucceeded.Add(1)
26+
return &net.TCPConn{}, nil
27+
case <-ctx.Done():
28+
dialsFailed.Add(1)
29+
return nil, ctx.Err()
30+
}
31+
}
32+
33+
// BAD CONFIGURATION: Default settings
34+
// On an 8-CPU machine:
35+
// - PoolSize = 10 * 8 = 80
36+
// - MaxConcurrentDials = 80
37+
// - MinIdleConns = 0 (no pre-warming)
38+
opt := &Options{
39+
Addr: "localhost:6379",
40+
Dialer: slowDialer,
41+
PoolSize: 80, // Default: 10 * GOMAXPROCS
42+
MaxConcurrentDials: 80, // Default: same as PoolSize
43+
MinIdleConns: 0, // Default: no pre-warming
44+
DialTimeout: 5 * time.Second,
45+
}
46+
47+
client := NewClient(opt)
48+
defer client.Close()
49+
50+
// Simulate high load: 200 concurrent requests with 200ms timeout
51+
// This simulates a burst of traffic (e.g., after a deployment or cache miss)
52+
const numRequests = 200
53+
const requestTimeout = 200 * time.Millisecond
54+
55+
var wg sync.WaitGroup
56+
var timeouts atomic.Int32
57+
var successes atomic.Int32
58+
var errors atomic.Int32
59+
60+
startTime := time.Now()
61+
62+
for i := 0; i < numRequests; i++ {
63+
wg.Add(1)
64+
go func(id int) {
65+
defer wg.Done()
66+
67+
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
68+
defer cancel()
69+
70+
_, err := client.Get(ctx, fmt.Sprintf("key-%d", id)).Result()
71+
72+
if err != nil {
73+
if ctx.Err() == context.DeadlineExceeded || err == context.DeadlineExceeded {
74+
timeouts.Add(1)
75+
} else {
76+
errors.Add(1)
77+
}
78+
} else {
79+
successes.Add(1)
80+
}
81+
}(i)
82+
83+
// Stagger requests slightly to simulate real traffic
84+
if i%20 == 0 {
85+
time.Sleep(5 * time.Millisecond)
86+
}
87+
}
88+
89+
wg.Wait()
90+
totalTime := time.Since(startTime)
91+
92+
timeoutRate := float64(timeouts.Load()) / float64(numRequests) * 100
93+
successRate := float64(successes.Load()) / float64(numRequests) * 100
94+
95+
t.Logf("\n=== BAD CONFIGURATION (Default Settings) ===")
96+
t.Logf("Configuration:")
97+
t.Logf(" PoolSize: %d", opt.PoolSize)
98+
t.Logf(" MaxConcurrentDials: %d", opt.MaxConcurrentDials)
99+
t.Logf(" MinIdleConns: %d", opt.MinIdleConns)
100+
t.Logf("\nResults:")
101+
t.Logf(" Total time: %v", totalTime)
102+
t.Logf(" Successes: %d (%.1f%%)", successes.Load(), successRate)
103+
t.Logf(" Timeouts: %d (%.1f%%)", timeouts.Load(), timeoutRate)
104+
t.Logf(" Other errors: %d", errors.Load())
105+
t.Logf(" Total dials: %d (succeeded: %d, failed: %d)",
106+
dialCount.Load(), dialsSucceeded.Load(), dialsFailed.Load())
107+
108+
// With bad configuration:
109+
// - MaxConcurrentDials=80 means only 80 dials can run concurrently
110+
// - Each dial takes 300ms, but request timeout is 200ms
111+
// - Requests timeout waiting for dial slots
112+
// - Expected: High timeout rate (>50%)
113+
114+
if timeoutRate < 50 {
115+
t.Logf("WARNING: Expected high timeout rate (>50%%), got %.1f%%. Test may not be stressing the system enough.", timeoutRate)
116+
}
117+
}
118+
119+
// TestGoodConfigurationHighLoad demonstrates how proper configuration fixes the problem
120+
func TestGoodConfigurationHighLoad(t *testing.T) {
121+
var dialCount atomic.Int32
122+
var dialsFailed atomic.Int32
123+
var dialsSucceeded atomic.Int32
124+
125+
// Same slow dialer - 300ms per dial
126+
slowDialer := func(ctx context.Context, network, addr string) (net.Conn, error) {
127+
dialCount.Add(1)
128+
select {
129+
case <-time.After(300 * time.Millisecond):
130+
dialsSucceeded.Add(1)
131+
return &net.TCPConn{}, nil
132+
case <-ctx.Done():
133+
dialsFailed.Add(1)
134+
return nil, ctx.Err()
135+
}
136+
}
137+
138+
// GOOD CONFIGURATION: Tuned for high load
139+
opt := &Options{
140+
Addr: "localhost:6379",
141+
Dialer: slowDialer,
142+
PoolSize: 300, // Increased from 80
143+
MaxConcurrentDials: 300, // Increased from 80
144+
MinIdleConns: 50, // Pre-warm the pool
145+
DialTimeout: 5 * time.Second,
146+
}
147+
148+
client := NewClient(opt)
149+
defer client.Close()
150+
151+
// Wait for pool to warm up
152+
time.Sleep(100 * time.Millisecond)
153+
154+
// Same load: 200 concurrent requests with 200ms timeout
155+
const numRequests = 200
156+
const requestTimeout = 200 * time.Millisecond
157+
158+
var wg sync.WaitGroup
159+
var timeouts atomic.Int32
160+
var successes atomic.Int32
161+
var errors atomic.Int32
162+
163+
startTime := time.Now()
164+
165+
for i := 0; i < numRequests; i++ {
166+
wg.Add(1)
167+
go func(id int) {
168+
defer wg.Done()
169+
170+
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
171+
defer cancel()
172+
173+
_, err := client.Get(ctx, fmt.Sprintf("key-%d", id)).Result()
174+
175+
if err != nil {
176+
if ctx.Err() == context.DeadlineExceeded || err == context.DeadlineExceeded {
177+
timeouts.Add(1)
178+
} else {
179+
errors.Add(1)
180+
}
181+
} else {
182+
successes.Add(1)
183+
}
184+
}(i)
185+
186+
// Stagger requests slightly
187+
if i%20 == 0 {
188+
time.Sleep(5 * time.Millisecond)
189+
}
190+
}
191+
192+
wg.Wait()
193+
totalTime := time.Since(startTime)
194+
195+
timeoutRate := float64(timeouts.Load()) / float64(numRequests) * 100
196+
successRate := float64(successes.Load()) / float64(numRequests) * 100
197+
198+
t.Logf("\n=== GOOD CONFIGURATION (Tuned Settings) ===")
199+
t.Logf("Configuration:")
200+
t.Logf(" PoolSize: %d", opt.PoolSize)
201+
t.Logf(" MaxConcurrentDials: %d", opt.MaxConcurrentDials)
202+
t.Logf(" MinIdleConns: %d", opt.MinIdleConns)
203+
t.Logf("\nResults:")
204+
t.Logf(" Total time: %v", totalTime)
205+
t.Logf(" Successes: %d (%.1f%%)", successes.Load(), successRate)
206+
t.Logf(" Timeouts: %d (%.1f%%)", timeouts.Load(), timeoutRate)
207+
t.Logf(" Other errors: %d", errors.Load())
208+
t.Logf(" Total dials: %d (succeeded: %d, failed: %d)",
209+
dialCount.Load(), dialsSucceeded.Load(), dialsFailed.Load())
210+
211+
// With good configuration:
212+
// - Higher MaxConcurrentDials allows more concurrent dials
213+
// - MinIdleConns pre-warms the pool
214+
// - Expected: Low timeout rate (<20%)
215+
216+
if timeoutRate > 20 {
217+
t.Errorf("Expected low timeout rate (<20%%), got %.1f%%", timeoutRate)
218+
}
219+
}
220+
221+
// TestConfigurationComparison runs both tests and shows the difference
222+
func TestConfigurationComparison(t *testing.T) {
223+
t.Run("BadConfiguration", TestBadConfigurationHighLoad)
224+
t.Run("GoodConfiguration", TestGoodConfigurationHighLoad)
225+
}
226+
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
package pool_test
2+
3+
import (
4+
"context"
5+
"net"
6+
"sync/atomic"
7+
"testing"
8+
"time"
9+
10+
"github.com/redis/go-redis/v9/internal/pool"
11+
)
12+
13+
// TestDoubleFreeTurnSimple tests the double-free bug with a simple scenario.
14+
// This test FAILS with the OLD code and PASSES with the NEW code.
15+
//
16+
// Scenario:
17+
// 1. Request A times out, Dial A completes and delivers connection to Request B
18+
// 2. Request B's own Dial B completes later
19+
// 3. With the bug: Dial B frees Request B's turn (even though Request B is using connection A)
20+
// 4. Then Request B calls Put() and frees the turn AGAIN (double-free)
21+
// 5. This allows more concurrent operations than PoolSize permits
22+
//
23+
// Detection method:
24+
// - Try to acquire PoolSize+1 connections after the double-free
25+
// - With the bug: All succeed (pool size violated)
26+
// - With the fix: Only PoolSize succeed
27+
func TestDoubleFreeTurnSimple(t *testing.T) {
28+
ctx := context.Background()
29+
30+
var dialCount atomic.Int32
31+
dialBComplete := make(chan struct{})
32+
requestBGotConn := make(chan struct{})
33+
requestBCalledPut := make(chan struct{})
34+
35+
controlledDialer := func(ctx context.Context) (net.Conn, error) {
36+
count := dialCount.Add(1)
37+
38+
if count == 1 {
39+
// Dial A: takes 150ms
40+
time.Sleep(150 * time.Millisecond)
41+
t.Logf("Dial A completed")
42+
} else if count == 2 {
43+
// Dial B: takes 300ms (longer than Dial A)
44+
time.Sleep(300 * time.Millisecond)
45+
t.Logf("Dial B completed")
46+
close(dialBComplete)
47+
} else {
48+
// Other dials: fast
49+
time.Sleep(10 * time.Millisecond)
50+
}
51+
52+
return newDummyConn(), nil
53+
}
54+
55+
testPool := pool.NewConnPool(&pool.Options{
56+
Dialer: controlledDialer,
57+
PoolSize: 2, // Only 2 concurrent operations allowed
58+
MaxConcurrentDials: 5,
59+
DialTimeout: 1 * time.Second,
60+
PoolTimeout: 1 * time.Second,
61+
})
62+
defer testPool.Close()
63+
64+
// Request A: Short timeout (100ms), will timeout before dial completes (150ms)
65+
go func() {
66+
shortCtx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
67+
defer cancel()
68+
69+
_, err := testPool.Get(shortCtx)
70+
if err != nil {
71+
t.Logf("Request A: Timed out as expected: %v", err)
72+
}
73+
}()
74+
75+
// Wait for Request A to start
76+
time.Sleep(20 * time.Millisecond)
77+
78+
// Request B: Long timeout, will receive connection from Request A's dial
79+
requestBDone := make(chan struct{})
80+
go func() {
81+
defer close(requestBDone)
82+
83+
longCtx, cancel := context.WithTimeout(ctx, 1*time.Second)
84+
defer cancel()
85+
86+
cn, err := testPool.Get(longCtx)
87+
if err != nil {
88+
t.Errorf("Request B: Should have received connection but got error: %v", err)
89+
return
90+
}
91+
92+
t.Logf("Request B: Got connection from Request A's dial")
93+
close(requestBGotConn)
94+
95+
// Wait for dial B to complete
96+
<-dialBComplete
97+
98+
t.Logf("Request B: Dial B completed")
99+
100+
// Wait a bit to allow Dial B goroutine to finish and call freeTurn()
101+
time.Sleep(100 * time.Millisecond)
102+
103+
// Signal that we're ready for the test to check semaphore state
104+
close(requestBCalledPut)
105+
106+
// Wait for the test to check QueueLen
107+
time.Sleep(200 * time.Millisecond)
108+
109+
t.Logf("Request B: Now calling Put()")
110+
testPool.Put(ctx, cn)
111+
t.Logf("Request B: Put() called")
112+
}()
113+
114+
// Wait for Request B to get the connection
115+
<-requestBGotConn
116+
117+
// Wait for Dial B to complete and freeTurn() to be called
118+
<-requestBCalledPut
119+
120+
// NOW WE'RE IN THE CRITICAL WINDOW
121+
// Request B is holding a connection (from Dial A)
122+
// Dial B has completed and returned (freeTurn() has been called)
123+
// With the bug:
124+
// - Dial B freed Request B's turn (BUG!)
125+
// - QueueLen should be 0
126+
// With the fix:
127+
// - Dial B did NOT free Request B's turn
128+
// - QueueLen should be 1 (Request B still holds the turn)
129+
130+
t.Logf("\n=== CRITICAL CHECK: QueueLen ===")
131+
t.Logf("Request B is holding a connection, Dial B has completed and returned")
132+
queueLen := testPool.QueueLen()
133+
t.Logf("QueueLen: %d", queueLen)
134+
135+
// Wait for Request B to finish
136+
select {
137+
case <-requestBDone:
138+
case <-time.After(1 * time.Second):
139+
t.Logf("Request B timed out")
140+
}
141+
142+
t.Logf("\n=== Results ===")
143+
t.Logf("QueueLen during critical window: %d", queueLen)
144+
t.Logf("Expected with fix: 1 (Request B still holds the turn)")
145+
t.Logf("Expected with bug: 0 (Dial B freed Request B's turn)")
146+
147+
if queueLen == 0 {
148+
t.Errorf("DOUBLE-FREE BUG DETECTED!")
149+
t.Errorf("QueueLen is 0, meaning Dial B freed Request B's turn")
150+
t.Errorf("But Request B is still holding a connection, so its turn should NOT be freed yet")
151+
} else if queueLen == 1 {
152+
t.Logf("✓ CORRECT: QueueLen is 1")
153+
t.Logf("Request B is still holding the turn (will be freed when Request B calls Put())")
154+
} else {
155+
t.Logf("Unexpected QueueLen: %d (expected 1 with fix, 0 with bug)", queueLen)
156+
}
157+
}
158+

0 commit comments

Comments
 (0)