feat: shared pgx-pool #4393
Changes from all commits
6823c96
7a3eb53
a0082b1
e9ff21c
117ae1a
4617a05
5d97bb0
cde1664
2852577
fa090e2
228208a
de4ed7f
e3f6a11
a0bd5ce
39388bb
b990bbe
d290db0
73d9ed6
9f25828
6ffb852
@@ -0,0 +1,64 @@
```go
package postgresqlretriever

import (
	"context"
	"sync"

	"github.com/jackc/pgx/v5/pgxpool"
)

type poolEntry struct {
	pool     *pgxpool.Pool
	refCount int
}

var (
	mu      sync.Mutex
	poolMap = make(map[string]*poolEntry)
)

// GetPool returns a pool for a given URI, creating it if needed.
func GetPool(ctx context.Context, uri string) (*pgxpool.Pool, error) {
	mu.Lock()
	defer mu.Unlock()

	// If already exists, bump refCount
	if entry, ok := poolMap[uri]; ok {
		entry.refCount++
		return entry.pool, nil
	}

	// Create a new pool
	pool, err := pgxpool.New(ctx, uri)
	if err != nil {
		return nil, err
	}
	if err := pool.Ping(ctx); err != nil {
		pool.Close()
		return nil, err
	}

	poolMap[uri] = &poolEntry{
		pool:     pool,
		refCount: 1,
	}

	return pool, nil
}
```
Comment on lines +21 to +47

Contributor

The current implementation of `GetPool` holds the global mutex while it creates and pings a new pool, so every other caller (even for a different URI) is blocked during a potentially slow connection attempt. To improve this, you can use a double-checked locking pattern. This avoids holding the lock during the potentially long-running pool creation and pinging process.

```go
func GetPool(ctx context.Context, uri string) (*pgxpool.Pool, error) {
	mu.Lock()
	if entry, ok := poolMap[uri]; ok {
		entry.refCount++
		mu.Unlock()
		return entry.pool, nil
	}
	mu.Unlock()

	// Create a new pool outside the lock.
	pool, err := pgxpool.New(ctx, uri)
	if err != nil {
		return nil, err
	}
	if err := pool.Ping(ctx); err != nil {
		pool.Close()
		return nil, err
	}

	mu.Lock()
	defer mu.Unlock()
	// Re-check if another goroutine created the pool while we were unlocked.
	if entry, ok := poolMap[uri]; ok {
		pool.Close() // Close the one we just created, it's not needed.
		entry.refCount++
		return entry.pool, nil
	}
	poolMap[uri] = &poolEntry{
		pool:     pool,
		refCount: 1,
	}
	return pool, nil
}
```
```go
// ReleasePool decreases refCount and closes/removes when it hits zero.
func ReleasePool(ctx context.Context, uri string) {
	mu.Lock()
	defer mu.Unlock()

	entry, ok := poolMap[uri]
	if !ok {
		return // nothing to do
	}

	entry.refCount--
	if entry.refCount <= 0 {
		entry.pool.Close()
		delete(poolMap, uri)
	}
}
```

Contributor

The `ctx` parameter of `ReleasePool` is unused, so the signature can be simplified. You will also need to update the corresponding call site.

```go
func ReleasePool(uri string) {
```
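For context, here is a minimal usage sketch (not part of the diff) showing how two consumers that pass the same URI end up sharing one `*pgxpool.Pool`. The DSN below is a placeholder; `GetPool` and `ReleasePool` are used with the signatures introduced in this PR.

```go
package main

import (
	"context"
	"log"

	pgr "github.com/thomaspoignant/go-feature-flag/retriever/postgresqlretriever"
)

func main() {
	ctx := context.Background()
	uri := "postgres://user:password@localhost:5432/flags?sslmode=disable" // placeholder DSN

	// First consumer: creates the pool (refCount = 1).
	poolA, err := pgr.GetPool(ctx, uri)
	if err != nil {
		log.Fatal(err)
	}

	// Second consumer with the same URI: reuses the same pool (refCount = 2).
	poolB, err := pgr.GetPool(ctx, uri)
	if err != nil {
		log.Fatal(err)
	}
	_ = poolB // poolA and poolB point to the same *pgxpool.Pool

	// Each consumer releases its reference; the pool is closed on the last release.
	pgr.ReleasePool(ctx, uri)
	pgr.ReleasePool(ctx, uri)

	_ = poolA
}
```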
@@ -0,0 +1,85 @@
```go
package postgresqlretriever_test

import (
	"context"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"github.com/testcontainers/testcontainers-go"
	"github.com/testcontainers/testcontainers-go/wait"
	rr "github.com/thomaspoignant/go-feature-flag/retriever/postgresqlretriever"
)

func TestGetPool_MultipleURIsAndReuse(t *testing.T) {
```
|
Contributor

The existing test exercises the reuse and release logic sequentially, but a shared pool registry like this will typically be hit from multiple goroutines. Please consider adding a new test that calls `GetPool` and `ReleasePool` concurrently to verify the implementation is safe under contention.

Here is an example of how you could structure such a test:

```go
func TestGetPool_Concurrency(t *testing.T) {
	// ... (test container setup)

	var wg sync.WaitGroup
	numGoroutines := 100
	wg.Add(numGoroutines)

	var firstPool *pgxpool.Pool
	var poolMu sync.Mutex

	// Concurrently get the same pool
	for i := 0; i < numGoroutines; i++ {
		go func() {
			defer wg.Done()
			pool, err := rr.GetPool(ctx, uri1)
			assert.NoError(t, err)
			assert.NotNil(t, pool)

			poolMu.Lock()
			if firstPool == nil {
				firstPool = pool
			} else {
				// Use assert.Same to check for pointer equality
				assert.Same(t, firstPool, pool, "All goroutines should get the same pool instance")
			}
			poolMu.Unlock()
		}()
	}
	wg.Wait()

	// Concurrently release the pool
	wg.Add(numGoroutines)
	for i := 0; i < numGoroutines; i++ {
		go func() {
			defer wg.Done()
			rr.ReleasePool(uri1) // Assuming ctx is removed
		}()
	}
	wg.Wait()

	// Verify the pool is closed and can be recreated
	recreatedPool, err := rr.GetPool(ctx, uri1)
	assert.NoError(t, err)
	assert.NotNil(t, recreatedPool)
	assert.NotSame(t, firstPool, recreatedPool, "A new pool should be created after all references are released")
	rr.ReleasePool(uri1)
}
```
```go
	ctx := context.Background()

	// Setup Container
	req := testcontainers.ContainerRequest{
		Image:        "postgres:15-alpine",
		ExposedPorts: []string{"5432/tcp"},
		Env:          map[string]string{"POSTGRES_PASSWORD": "password"},
		// This waits until the log says the system is ready, preventing connection errors
		WaitingFor: wait.ForAll(
			wait.ForLog("database system is ready to accept connections").WithStartupTimeout(10*time.Second),
			wait.ForListeningPort("5432/tcp").WithStartupTimeout(10*time.Second),
		),
	}

	pg, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
		ContainerRequest: req,
		Started:          true,
	})
	assert.NoError(t, err)
	defer pg.Terminate(ctx)

	endpoint, err := pg.Endpoint(ctx, "")
	assert.NoError(t, err)
	baseURI := "postgres://postgres:password@" + endpoint + "/postgres?sslmode=disable"

	// Different URIs
	uri1 := baseURI + "&application_name=A"
	uri2 := baseURI + "&application_name=B"

	// First Calls (RefCount = 1)
	pool1a, err := rr.GetPool(ctx, uri1)
	assert.NoError(t, err)
	assert.NotNil(t, pool1a)

	pool2a, err := rr.GetPool(ctx, uri2)
	assert.NoError(t, err)
	assert.NotNil(t, pool2a)

	// Verify distinct pools
	assert.NotEqual(t, pool1a, pool2a)

	// Reuse Logic (RefCount = 2 for URI1)
	pool1b, err := rr.GetPool(ctx, uri1)
	assert.NoError(t, err)
	assert.Equal(t, pool1a, pool1b, "Should return exact same pool instance")

	// Release Logic
	// URI1 RefCount: 2 -> 1
	rr.ReleasePool(ctx, uri1)

	// URI1 RefCount: 1 -> 0 (Closed & Removed)
	rr.ReleasePool(ctx, uri1)

	// Recreation Logic
	// URI1 should now create a NEW pool
	pool1c, err := rr.GetPool(ctx, uri1)
	assert.NoError(t, err)
	assert.NotEqual(t, pool1a, pool1c, "Should be a new pool instance after full release")

	// Cleanup new pool
	rr.ReleasePool(ctx, uri1)

	// URI2 Cleanup verification
	rr.ReleasePool(ctx, uri2) // RefCount -> 0

	pool2b, err := rr.GetPool(ctx, uri2)
	assert.NoError(t, err)
	assert.NotEqual(t, pool2a, pool2b, "URI2 should be recreated")

	rr.ReleasePool(ctx, uri2)
}
```
The use of global variables (`mu` and `poolMap`) to implement the singleton pattern for the connection pool is a common approach for shared resources. However, global state can sometimes make testing more complex and introduce implicit dependencies. While acceptable for this specific use case, it's a design choice that should be considered carefully in larger architectures.
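To illustrate the trade-off the reviewer mentions, here is a hypothetical sketch (not part of this PR) of the same ref-counting logic wrapped in a manager type instead of package-level globals. The `PoolManager`, `NewPoolManager`, `Get`, and `Release` names are invented for the example; it reuses the `poolEntry` type introduced by the PR.

```go
package postgresqlretriever

import (
	"context"
	"sync"

	"github.com/jackc/pgx/v5/pgxpool"
)

// PoolManager is a hypothetical, instance-scoped alternative to the
// package-level mu/poolMap pair: each manager owns its own registry,
// which makes it easy to create an isolated instance per test.
type PoolManager struct {
	mu    sync.Mutex
	pools map[string]*poolEntry
}

func NewPoolManager() *PoolManager {
	return &PoolManager{pools: make(map[string]*poolEntry)}
}

// Get returns the shared pool for a URI, creating it on first use.
func (m *PoolManager) Get(ctx context.Context, uri string) (*pgxpool.Pool, error) {
	m.mu.Lock()
	defer m.mu.Unlock()

	if entry, ok := m.pools[uri]; ok {
		entry.refCount++
		return entry.pool, nil
	}

	pool, err := pgxpool.New(ctx, uri)
	if err != nil {
		return nil, err
	}
	if err := pool.Ping(ctx); err != nil {
		pool.Close()
		return nil, err
	}

	m.pools[uri] = &poolEntry{pool: pool, refCount: 1}
	return pool, nil
}

// Release drops one reference and closes the pool when none remain.
func (m *PoolManager) Release(uri string) {
	m.mu.Lock()
	defer m.mu.Unlock()

	entry, ok := m.pools[uri]
	if !ok {
		return
	}
	entry.refCount--
	if entry.refCount <= 0 {
		entry.pool.Close()
		delete(m.pools, uri)
	}
}
```

A package-level `var defaultManager = NewPoolManager()` could then keep the existing `GetPool`/`ReleasePool` free functions as thin wrappers while tests construct their own isolated managers.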