Skip to content
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ require (
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
github.com/jackc/puddle/v2 v2.2.2 // indirect
github.com/jaegertracing/jaeger-idl v0.6.0 // indirect
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
Expand Down
1 change: 0 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,6 @@ github.com/jackc/pgx/v5 v5.7.6/go.mod h1:aruU7o91Tc2q2cFp5h4uP3f6ztExVpyVv88Xl/8
github.com/jackc/puddle v0.0.0-20190413234325-e4ced69a3a2b/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
github.com/jackc/puddle v0.0.0-20190608224051-11cab39313c9/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
github.com/jackc/puddle v1.1.3/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
github.com/jackc/puddle v1.2.1 h1:gI8os0wpRXFd4FiAY2dWiqRK037tjj3t7rKFeO4X5iw=
github.com/jackc/puddle v1.2.1/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo=
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
Expand Down
64 changes: 64 additions & 0 deletions retriever/postgresqlretriever/postgres.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package postgresqlretriever

import (
"context"
"sync"

"github.com/jackc/pgx/v5/pgxpool"
)

type poolEntry struct {
pool *pgxpool.Pool
refCount int
}

var (
mu sync.Mutex
poolMap = make(map[string]*poolEntry)
Comment on lines +15 to +17
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The use of global variables (mu and poolMap) to implement the singleton pattern for the connection pool is a common approach for shared resources. However, global state can sometimes make testing more complex and introduce implicit dependencies. While acceptable for this specific use case, it's a design choice that should be considered carefully in larger architectures.

)

// GetPool returns a pool for a given URI, creating it if needed.
func GetPool(ctx context.Context, uri string) (*pgxpool.Pool, error) {
mu.Lock()
defer mu.Unlock()

// If already exists, bump refCount
if entry, ok := poolMap[uri]; ok {
entry.refCount++
return entry.pool, nil
}

// Create a new pool
pool, err := pgxpool.New(ctx, uri)
if err != nil {
return nil, err
}
if err := pool.Ping(ctx); err != nil {
pool.Close()
return nil, err
}

poolMap[uri] = &poolEntry{
pool: pool,
refCount: 1,
}

return pool, nil
}
Comment on lines +21 to +47
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The current implementation of GetPool holds a global mutex lock while creating and pinging a new connection pool. These operations involve network I/O and can be slow, causing contention and becoming a performance bottleneck if multiple retrievers with different URIs are initialized concurrently. This could negate some of the performance benefits of connection pooling.

To improve this, you can use a double-checked locking pattern. This avoids holding the lock during the potentially long-running pool creation and pinging process.

func GetPool(ctx context.Context, uri string) (*pgxpool.Pool, error) {
	mu.Lock()
	if entry, ok := poolMap[uri]; ok {
		entry.refCount++
		mu.Unlock()
		return entry.pool, nil
	}
	mu.Unlock()

	// Create a new pool outside the lock.
	pool, err := pgxpool.New(ctx, uri)
	if err != nil {
		return nil, err
	}
	if err := pool.Ping(ctx); err != nil {
		pool.Close()
		return nil, err
	}

	mu.Lock()
	defer mu.Unlock()

	// Re-check if another goroutine created the pool while we were unlocked.
	if entry, ok := poolMap[uri]; ok {
		pool.Close() // Close the one we just created, it's not needed.
		entry.refCount++
		return entry.pool, nil
	}

	poolMap[uri] = &poolEntry{
		pool:     pool,
		refCount: 1,
	}

	return pool, nil
}


// ReleasePool decreases refCount and closes/removes when it hits zero.
func ReleasePool(ctx context.Context, uri string) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The ctx context.Context parameter in the ReleasePool function is not used, as pgxpool.Pool.Close() does not accept a context. To improve code clarity and adhere to Go conventions, it's best to remove this unused parameter.

You will also need to update the call site in retriever/postgresqlretriever/retriever.go inside the Shutdown method to ReleasePool(r.URI).

func ReleasePool(uri string) {

mu.Lock()
defer mu.Unlock()

entry, ok := poolMap[uri]
if !ok {
return // nothing to do
}

entry.refCount--
if entry.refCount <= 0 {
entry.pool.Close()
delete(poolMap, uri)
}
}
85 changes: 85 additions & 0 deletions retriever/postgresqlretriever/postgres_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package postgresqlretriever_test

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/wait"
rr "github.com/thomaspoignant/go-feature-flag/retriever/postgresqlretriever"
)

func TestGetPool_MultipleURIsAndReuse(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The existing test TestGetPool_MultipleURIsAndReuse is great for verifying the basic logic, but it runs serially. Since the new pooling mechanism is designed for concurrent use and is protected by a mutex, it's important to have a test that validates its behavior under concurrent access.

Please consider adding a new test that calls GetPool and ReleasePool from multiple goroutines simultaneously to ensure there are no race conditions and that the pool is correctly shared and managed.

Here is an example of how you could structure such a test:

func TestGetPool_Concurrency(t *testing.T) {
	// ... (test container setup)

	var wg sync.WaitGroup
	numGoroutines := 100
	wg.Add(numGoroutines)

	var firstPool *pgxpool.Pool
	var poolMu sync.Mutex

	// Concurrently get the same pool
	for i := 0; i < numGoroutines; i++ {
		go func() {
			defer wg.Done()
			pool, err := rr.GetPool(ctx, uri1)
			assert.NoError(t, err)
			assert.NotNil(t, pool)

			poolMu.Lock()
			if firstPool == nil {
				firstPool = pool
			} else {
				// Use assert.Same to check for pointer equality
				assert.Same(t, firstPool, pool, "All goroutines should get the same pool instance")
			}
			poolMu.Unlock()
		}()
	}
	wg.Wait()

	// Concurrently release the pool
	wg.Add(numGoroutines)
	for i := 0; i < numGoroutines; i++ {
		go func() {
			defer wg.Done()
			rr.ReleasePool(uri1) // Assuming ctx is removed
		}()
	}
	wg.Wait()

	// Verify the pool is closed and can be recreated
	recreatedPool, err := rr.GetPool(ctx, uri1)
	assert.NoError(t, err)
	assert.NotNil(t, recreatedPool)
	assert.NotSame(t, firstPool, recreatedPool, "A new pool should be created after all references are released")
	rr.ReleasePool(uri1)
}

ctx := context.Background()

// Setup Container
req := testcontainers.ContainerRequest{
Image: "postgres:15-alpine",
ExposedPorts: []string{"5432/tcp"},
Env: map[string]string{"POSTGRES_PASSWORD": "password"},
// This waits until the log says the system is ready, preventing connection errors
WaitingFor: wait.ForAll(
wait.ForLog("database system is ready to accept connections").WithStartupTimeout(10*time.Second),
wait.ForListeningPort("5432/tcp").WithStartupTimeout(10*time.Second),
),
}

pg, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
ContainerRequest: req,
Started: true,
})
assert.NoError(t, err)
defer pg.Terminate(ctx)

endpoint, err := pg.Endpoint(ctx, "")
assert.NoError(t, err)
baseURI := "postgres://postgres:password@" + endpoint + "/postgres?sslmode=disable"

// Different URIs
uri1 := baseURI + "&application_name=A"
uri2 := baseURI + "&application_name=B"

// First Calls (RefCount = 1)
pool1a, err := rr.GetPool(ctx, uri1)
assert.NoError(t, err)
assert.NotNil(t, pool1a)

pool2a, err := rr.GetPool(ctx, uri2)
assert.NoError(t, err)
assert.NotNil(t, pool2a)

// Verify distinct pools
assert.NotEqual(t, pool1a, pool2a)

// Reuse Logic (RefCount = 2 for URI1)
pool1b, err := rr.GetPool(ctx, uri1)
assert.NoError(t, err)
assert.Equal(t, pool1a, pool1b, "Should return exact same pool instance")

// Release Logic
// URI1 RefCount: 2 -> 1
rr.ReleasePool(ctx, uri1)

// URI1 RefCount: 1 -> 0 (Closed & Removed)
rr.ReleasePool(ctx, uri1)

// Recreation Logic
// URI1 should now create a NEW pool
pool1c, err := rr.GetPool(ctx, uri1)
assert.NoError(t, err)
assert.NotEqual(t, pool1a, pool1c, "Should be a new pool instance after full release")

// Cleanup new pool
rr.ReleasePool(ctx, uri1)

// URI2 Cleanup verification
rr.ReleasePool(ctx, uri2) // RefCount -> 0

pool2b, err := rr.GetPool(ctx, uri2)
assert.NoError(t, err)
assert.NotEqual(t, pool2a, pool2b, "URI2 should be recreated")

rr.ReleasePool(ctx, uri2)
}
36 changes: 18 additions & 18 deletions retriever/postgresqlretriever/retriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"log/slog"

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/thomaspoignant/go-feature-flag/retriever"
"github.com/thomaspoignant/go-feature-flag/utils"
"github.com/thomaspoignant/go-feature-flag/utils/fflog"
Expand All @@ -26,7 +27,7 @@ type Retriever struct {
logger *fflog.FFLogger
status retriever.Status
columns map[string]string
conn *pgx.Conn
pool *pgxpool.Pool
flagset *string
}

Expand All @@ -40,23 +41,19 @@ func (r *Retriever) Init(ctx context.Context, logger *fflog.FFLogger, flagset *s
r.columns = r.getColumnNames()
r.flagset = flagset

if r.conn == nil {
if r.pool == nil {
r.logger.Info("Initializing PostgreSQL retriever")
r.logger.Debug("Using columns", "columns", r.columns)

conn, err := pgx.Connect(ctx, r.URI)
pool, err := GetPool(ctx, r.URI)
if err != nil {
r.status = retriever.RetrieverError
return err
}

if err := conn.Ping(ctx); err != nil {
r.status = retriever.RetrieverError
return err
}

r.conn = conn
r.pool = pool
}

r.status = retriever.RetrieverReady
return nil
}
Expand All @@ -71,37 +68,39 @@ func (r *Retriever) Status() retriever.Status {

// Shutdown closes the database connection.
func (r *Retriever) Shutdown(ctx context.Context) error {
if r.conn == nil {
return nil
if r.pool != nil {
ReleasePool(ctx, r.URI)
r.pool = nil
}
return r.conn.Close(ctx)
return nil
}

// Retrieve fetches flag configuration from PostgreSQL.
func (r *Retriever) Retrieve(ctx context.Context) ([]byte, error) {
if r.conn == nil {
return nil, fmt.Errorf("database connection is not initialized")
if r.pool == nil {
return nil, fmt.Errorf("database connection pool is not initialized")
}

// Build the query using the configured table and column names
query := r.buildQuery()

// Build the arguments for the query
args := []interface{}{}
args := []any{}
if r.getFlagset() != "" {
args = []interface{}{r.getFlagset()}
// If a flagset is defined, it becomes the first ($1) argument.
args = []any{r.getFlagset()}
}

r.logger.Debug("Executing PostgreSQL query", slog.String("query", query), slog.Any("args", args))

rows, err := r.conn.Query(ctx, query, args...)
rows, err := r.pool.Query(ctx, query, args...)
if err != nil {
return nil, fmt.Errorf("failed to execute query: %w", err)
}
defer rows.Close()

// Map to store flag configurations with flag_name as key
flagConfigs := make(map[string]interface{})
flagConfigs := make(map[string]any)

for rows.Next() {
var flagName string
Expand All @@ -121,6 +120,7 @@ func (r *Retriever) Retrieve(ctx context.Context) ([]byte, error) {
flagConfigs[flagName] = config
}

// Check for any errors that occurred during row iteration
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("rows iteration error: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions retriever/postgresqlretriever/retriever_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func TestRetrieverErrorHandling(t *testing.T) {

_, err := r.Retrieve(context.Background())
assert.Error(t, err)
assert.Contains(t, err.Error(), "database connection is not initialized")
assert.Contains(t, err.Error(), "database connection pool is not initialized")
})

t.Run("Status - nil receiver", func(t *testing.T) {
Expand Down Expand Up @@ -378,7 +378,7 @@ func TestRetrieverDataHandlingErrors(t *testing.T) {
assert.Contains(t, err.Error(), "failed to execute query")
}
})
}
}

// TestRetrieverEdgeCases tests additional edge cases and boundary conditions
func TestRetrieverEdgeCases(t *testing.T) {
Expand Down
Loading