feat: shared pgx-pool #4393
Conversation
✅ Deploy Preview for go-feature-flag-doc-preview canceled.
Codecov Report
❌ Patch coverage is
Additional details and impacted files

```
@@            Coverage Diff             @@
##             main    #4393      +/-   ##
==========================================
+ Coverage   85.51%   85.58%   +0.07%
==========================================
  Files         147      148       +1
  Lines        6150     6174      +24
==========================================
+ Hits         5259     5284      +25
+ Misses        665      664       -1
  Partials      226      226
```

☔ View full report in Codecov by Sentry.
/gemini review
Code Review
This pull request introduces a shared connection pool for PostgreSQL retrievers, which is a significant improvement to prevent exhausting database connections. The approach of using a singleton with reference counting is sound. However, the current implementation has a critical flaw where it assumes all retrievers use the same database URI, causing them to share a single connection pool incorrectly if they are configured with different URIs. My review includes suggestions to make the pool manager URI-aware, ensuring a separate pool is managed for each unique database connection string. I've also suggested a small improvement to the Shutdown logic for better robustness. Additionally, I've noted that the checklist items for implementing reconnecting logic and adding unit tests are unchecked. It would be beneficial to add tests for the new pooling logic, especially covering multiple URIs and database disconnection scenarios, to ensure its reliability.
```go
package postgresqlretriever

import (
	"context"
	"sync"

	"github.com/jackc/pgx/v5/pgxpool"
)

var (
	pool     *pgxpool.Pool
	mu       sync.Mutex
	refCount int
)

func GetPool(ctx context.Context, uri string) (*pgxpool.Pool, error) {
	mu.Lock()
	defer mu.Unlock()

	if pool == nil {
		p, err := pgxpool.New(ctx, uri)
		if err != nil {
			return nil, err
		}
		if err := p.Ping(ctx); err != nil {
			p.Close()
			return nil, err
		}

		pool = p
	}

	refCount++
	return pool, nil
}

func ReleasePool() {
	mu.Lock()
	defer mu.Unlock()

	refCount--
	if refCount <= 0 {
		if pool != nil {
			pool.Close()
			pool = nil
		}
		refCount = 0
	}
}
```
The current singleton implementation for the connection pool doesn't support multiple database URIs. The global pool variable will be initialized once with the URI from the first retriever, and all subsequent retrievers, regardless of their configured URI, will receive this same pool. This can lead to retrievers connecting to the wrong database.
To fix this, the pool management should be URI-aware. I suggest using maps to store a connection pool and a reference count for each unique URI. This ensures that each database has its own dedicated pool, which is shared only by retrievers connecting to that same database.
```go
package postgresqlretriever

import (
	"context"
	"sync"

	"github.com/jackc/pgx/v5/pgxpool"
)

var (
	pools     = make(map[string]*pgxpool.Pool)
	refCounts = make(map[string]int)
	mu        sync.Mutex
)

// GetPool retrieves a shared connection pool for the given URI.
// It creates a new pool if one doesn't exist for the URI, otherwise it returns the existing one.
// It uses reference counting to manage the lifecycle of the pool.
func GetPool(ctx context.Context, uri string) (*pgxpool.Pool, error) {
	mu.Lock()
	defer mu.Unlock()

	// If a pool for this URI already exists, increment its ref count and return it.
	if pool, ok := pools[uri]; ok {
		refCounts[uri]++
		return pool, nil
	}

	// Otherwise, create a new pool for this URI.
	p, err := pgxpool.New(ctx, uri)
	if err != nil {
		return nil, err
	}
	if err := p.Ping(ctx); err != nil {
		p.Close()
		return nil, err
	}

	pools[uri] = p
	refCounts[uri] = 1
	return p, nil
}

// ReleasePool decrements the reference count for a pool.
// If the reference count drops to zero, the pool is closed and removed.
func ReleasePool(uri string) {
	mu.Lock()
	defer mu.Unlock()

	// Only proceed if the URI is being tracked.
	if _, ok := refCounts[uri]; !ok {
		return
	}

	refCounts[uri]--
	if refCounts[uri] <= 0 {
		if pool, ok := pools[uri]; ok {
			pool.Close()
			delete(pools, uri)
		}
		delete(refCounts, uri)
	}
}
```
@gogbog thanks for starting this, Gemini said exactly what I was about to say (probably in a better way). I was also asking myself whether we want to offer a way to opt out of using the pool and use a direct connection?

@thomaspoignant I totally missed that use-case. We can still make the solution support multiple pools, depending on how many databases the user needs to connect to. For opting out of the pool and using a direct connection instead, we can definitely add support for it, but what would be the benefits? From what I know, the pool, even if you only ever use a single connection, is not a bad choice because it still manages reconnections, backoff, lifecycle, and error recovery for free, whereas a direct connection gives you none of that.

Your proposal with a map per URI works for me. For the opt-out, forget it, you are right, it will always be better to have a pool.
/gemini review |
Code Review
This pull request refactors the PostgreSQL retriever to utilize a shared connection pool (pgxpool.Pool) instead of individual pgx.Conn connections. A new file, postgres.go, was introduced to manage a global map of connection pools with reference counting, allowing multiple retrievers to share pools based on their URI. The retriever.go file was updated to integrate this new pooling mechanism for initialization, shutdown, and query execution. Additionally, the code was modernized by replacing interface{} with any for query arguments and map types. Review comments noted the use of global variables for connection pool management as a common design choice, suggested adding a context.Context to the ReleasePool function for API consistency, and praised the modernization to any types.
```go
var (
	mu      sync.Mutex
	poolMap = make(map[string]*poolEntry)
)
```
The use of global variables (mu and poolMap) to implement the singleton pattern for the connection pool is a common approach for shared resources. However, global state can sometimes make testing more complex and introduce implicit dependencies. While acceptable for this specific use case, it's a design choice that should be considered carefully in larger architectures.
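To keep that global state testable, one option is to wrap the map and mutex in a small manager struct that each test can instantiate in isolation. The sketch below is a dependency-free illustration of the same reference-counting idea, not the PR's actual code: `poolManager`, `openFn`, and the string "pool" handle are hypothetical stand-ins for the pgxpool-based implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// poolEntry pairs a pool handle with its reference count.
type poolEntry struct {
	pool     string
	refCount int
}

// poolManager holds the same state as the package-level globals,
// but as an instance, so tests can create a fresh one each time.
// openFn stands in for pgxpool.New and can be faked in tests.
type poolManager struct {
	mu     sync.Mutex
	pools  map[string]*poolEntry
	openFn func(uri string) (string, error)
}

func newPoolManager(openFn func(string) (string, error)) *poolManager {
	return &poolManager{pools: make(map[string]*poolEntry), openFn: openFn}
}

// Get returns the shared handle for a URI, creating it on first use.
func (m *poolManager) Get(uri string) (string, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if e, ok := m.pools[uri]; ok {
		e.refCount++
		return e.pool, nil
	}
	p, err := m.openFn(uri)
	if err != nil {
		return "", err
	}
	m.pools[uri] = &poolEntry{pool: p, refCount: 1}
	return p, nil
}

// Release decrements the count and removes the entry at zero.
func (m *poolManager) Release(uri string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	e, ok := m.pools[uri]
	if !ok {
		return
	}
	e.refCount--
	if e.refCount <= 0 {
		delete(m.pools, uri)
	}
}

func main() {
	opens := 0
	m := newPoolManager(func(uri string) (string, error) {
		opens++
		return "pool-for-" + uri, nil
	})
	a, _ := m.Get("postgres://db1")
	b, _ := m.Get("postgres://db1")
	fmt.Println(a == b, opens) // same handle, opened once
	m.Release("postgres://db1")
	m.Release("postgres://db1")
	_, _ = m.Get("postgres://db1") // reopened after full release
	fmt.Println(opens)
}
```

With this shape, the package-level `GetPool`/`ReleasePool` could remain as thin wrappers around a default manager instance, keeping the public API unchanged.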
/gemini review |
Code Review
This pull request introduces a shared connection pool for PostgreSQL retrievers to solve the issue of excessive open connections. The implementation uses a reference-counted singleton pattern to manage pgxpool.Pool instances, which is a good approach. The changes in retriever.go to adopt this new pooling mechanism are clean and correct.
My review includes a critical suggestion to improve the concurrency of the pool creation logic to avoid potential bottlenecks, a recommendation to clean up an unused parameter, and a suggestion to add a concurrent test to ensure the thread-safety of the new mechanism. Overall, this is a great improvement, and with these changes, it will be even more robust.
```go
func GetPool(ctx context.Context, uri string) (*pgxpool.Pool, error) {
	mu.Lock()
	defer mu.Unlock()

	// If already exists, bump refCount
	if entry, ok := poolMap[uri]; ok {
		entry.refCount++
		return entry.pool, nil
	}

	// Create a new pool
	pool, err := pgxpool.New(ctx, uri)
	if err != nil {
		return nil, err
	}
	if err := pool.Ping(ctx); err != nil {
		pool.Close()
		return nil, err
	}

	poolMap[uri] = &poolEntry{
		pool:     pool,
		refCount: 1,
	}

	return pool, nil
}
```
The current implementation of GetPool holds a global mutex lock while creating and pinging a new connection pool. These operations involve network I/O and can be slow, causing contention and becoming a performance bottleneck if multiple retrievers with different URIs are initialized concurrently. This could negate some of the performance benefits of connection pooling.
To improve this, you can use a double-checked locking pattern. This avoids holding the lock during the potentially long-running pool creation and pinging process.
```go
func GetPool(ctx context.Context, uri string) (*pgxpool.Pool, error) {
	mu.Lock()
	if entry, ok := poolMap[uri]; ok {
		entry.refCount++
		mu.Unlock()
		return entry.pool, nil
	}
	mu.Unlock()

	// Create a new pool outside the lock.
	pool, err := pgxpool.New(ctx, uri)
	if err != nil {
		return nil, err
	}
	if err := pool.Ping(ctx); err != nil {
		pool.Close()
		return nil, err
	}

	mu.Lock()
	defer mu.Unlock()
	// Re-check if another goroutine created the pool while we were unlocked.
	if entry, ok := poolMap[uri]; ok {
		pool.Close() // Close the one we just created, it's not needed.
		entry.refCount++
		return entry.pool, nil
	}

	poolMap[uri] = &poolEntry{
		pool:     pool,
		refCount: 1,
	}
	return pool, nil
}
```
```go
// ReleasePool decreases refCount and closes/removes when it hits zero.
func ReleasePool(ctx context.Context, uri string) {
```
The ctx context.Context parameter in the ReleasePool function is not used, as pgxpool.Pool.Close() does not accept a context. To improve code clarity and adhere to Go conventions, it's best to remove this unused parameter.
You will also need to update the call site in retriever/postgresqlretriever/retriever.go inside the Shutdown method to ReleasePool(r.URI).
```go
func ReleasePool(uri string) {
```

```go
	rr "github.com/thomaspoignant/go-feature-flag/retriever/postgresqlretriever"
)

func TestGetPool_MultipleURIsAndReuse(t *testing.T) {
```
The existing test TestGetPool_MultipleURIsAndReuse is great for verifying the basic logic, but it runs serially. Since the new pooling mechanism is designed for concurrent use and is protected by a mutex, it's important to have a test that validates its behavior under concurrent access.
Please consider adding a new test that calls GetPool and ReleasePool from multiple goroutines simultaneously to ensure there are no race conditions and that the pool is correctly shared and managed.
Here is an example of how you could structure such a test:
```go
func TestGetPool_Concurrency(t *testing.T) {
	// ... (test container setup)
	var wg sync.WaitGroup
	numGoroutines := 100
	wg.Add(numGoroutines)

	var firstPool *pgxpool.Pool
	var poolMu sync.Mutex

	// Concurrently get the same pool
	for i := 0; i < numGoroutines; i++ {
		go func() {
			defer wg.Done()
			pool, err := rr.GetPool(ctx, uri1)
			assert.NoError(t, err)
			assert.NotNil(t, pool)

			poolMu.Lock()
			if firstPool == nil {
				firstPool = pool
			} else {
				// Use assert.Same to check for pointer equality
				assert.Same(t, firstPool, pool, "All goroutines should get the same pool instance")
			}
			poolMu.Unlock()
		}()
	}
	wg.Wait()

	// Concurrently release the pool
	wg.Add(numGoroutines)
	for i := 0; i < numGoroutines; i++ {
		go func() {
			defer wg.Done()
			rr.ReleasePool(uri1) // Assuming ctx is removed
		}()
	}
	wg.Wait()

	// Verify the pool is closed and can be recreated
	recreatedPool, err := rr.GetPool(ctx, uri1)
	assert.NoError(t, err)
	assert.NotNil(t, recreatedPool)
	assert.NotSame(t, firstPool, recreatedPool, "A new pool should be created after all references are released")
	rr.ReleasePool(uri1)
}
```
thomaspoignant left a comment
Hey @gogbog thanks for your pull request.
I think this solution will work and will add a lot of benefits to the postgres retriever.
That being said, I think it could be great to be able to configure the pool in some way, being able to pass a pgxpool.Config.
We can do it in another PR if you prefer, but I think this could be a great addition.
The complexity could be deciding which pool configuration to use for a shared pool, but I guess we can say the first config used wins.
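A "first config wins" rule could be sketched with stdlib Go only, as below. Note that `poolConfig` is a hypothetical stand-in for pgxpool.Config, and `GetPoolWithConfig` is an illustrative name, not an API from this PR: the first caller's configuration for a URI is kept, and later callers share that pool with their own config silently ignored.

```go
package main

import (
	"fmt"
	"sync"
)

// poolConfig models the configurable knobs; MaxConns is the only one
// sketched here. In the real code this would be a pgxpool.Config.
type poolConfig struct{ MaxConns int }

type configuredEntry struct {
	cfg      poolConfig
	refCount int
}

var (
	cfgMu    sync.Mutex
	cfgPools = make(map[string]*configuredEntry)
)

// GetPoolWithConfig applies "first config wins": the configuration of
// the first caller for a URI is kept; later callers for the same URI
// share that pool and get the original config back, unchanged.
func GetPoolWithConfig(uri string, cfg poolConfig) poolConfig {
	cfgMu.Lock()
	defer cfgMu.Unlock()
	if e, ok := cfgPools[uri]; ok {
		e.refCount++
		return e.cfg // later configs are ignored
	}
	cfgPools[uri] = &configuredEntry{cfg: cfg, refCount: 1}
	return cfg
}

func main() {
	first := GetPoolWithConfig("postgres://db1", poolConfig{MaxConns: 10})
	second := GetPoolWithConfig("postgres://db1", poolConfig{MaxConns: 50})
	// Both callers end up with the first config.
	fmt.Println(first.MaxConns, second.MaxConns)
}
```

Returning the effective config (or logging when a later config is dropped) would make the "first wins" behavior observable to callers rather than silent.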
Description
This PR addresses an issue with how database connections are managed when using multiple Postgres retrievers.
Currently, each retriever creates its own dedicated connection, which leads to hundreds of open connections over time.
It resolves the problem by introducing a shared pgxpool implemented using a singleton pattern. All retrievers now reuse the same connection pool instead of creating new individual connections.
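To make the before/after concrete, here is a toy model of the problem (a sketch with hypothetical names; the counter stands in for real database connections): N retrievers with dedicated connections open N connections, while N retrievers sharing one pool reuse the pool's connections.

```go
package main

import "fmt"

// openConnections counts how many "connections" have been opened.
var openConnections int

// openDirect models the old behavior: every retriever opens its own
// dedicated connection.
func openDirect() { openConnections++ }

// sharedPool models the new behavior: the pool opens its connections
// once and every retriever acquires from it.
type sharedPool struct{ opened bool }

func (p *sharedPool) acquire() {
	if !p.opened {
		openConnections++ // the pool opens its connection set once
		p.opened = true
	}
}

func main() {
	// Before: 100 retrievers, 100 open connections.
	for i := 0; i < 100; i++ {
		openDirect()
	}
	fmt.Println(openConnections)

	// After: 100 retrievers share one pool.
	openConnections = 0
	pool := &sharedPool{}
	for i := 0; i < 100; i++ {
		pool.acquire()
	}
	fmt.Println(openConnections)
}
```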
How to test:
Breaking changes:
None expected. Retrievers now use a pooled connection, but the change is internal and transparent to users.
Closes issue(s)
Resolve #4392
Checklist
README.md and /website/docs)