Skip to content

Commit f436f26

Browse files
gogbogGeorgi Dimitrovthomaspoignant
authored
feat: shared pgx-pool (#4393)
* use pgxpool and singelton to initialize it just once * format with gofmt * fix imports * fix formatting issues * simplify postgres db initalization * format * multi postgres pools support * format issues * Pass ctx to DB shutdown * multiple pools test * merge main * change error message / fix test --------- Co-authored-by: Georgi Dimitrov <georgi.dimitrov@swisscom.com> Co-authored-by: Thomas Poignant <thomas.poignant@gofeatureflag.org>
1 parent 0a2f40b commit f436f26

File tree

6 files changed

+165
-17
lines changed

6 files changed

+165
-17
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,7 @@ require (
181181
github.com/inconshreveable/mousetrap v1.1.0 // indirect
182182
github.com/jackc/pgpassfile v1.0.0 // indirect
183183
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
184+
github.com/jackc/puddle/v2 v2.2.2 // indirect
184185
github.com/jaegertracing/jaeger-idl v0.6.0 // indirect
185186
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
186187
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect

go.sum

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -658,7 +658,6 @@ github.com/jackc/pgx/v5 v5.7.6/go.mod h1:aruU7o91Tc2q2cFp5h4uP3f6ztExVpyVv88Xl/8
658658
github.com/jackc/puddle v0.0.0-20190413234325-e4ced69a3a2b/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
659659
github.com/jackc/puddle v0.0.0-20190608224051-11cab39313c9/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
660660
github.com/jackc/puddle v1.1.3/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
661-
github.com/jackc/puddle v1.2.1 h1:gI8os0wpRXFd4FiAY2dWiqRK037tjj3t7rKFeO4X5iw=
662661
github.com/jackc/puddle v1.2.1/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
663662
github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo=
664663
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package postgresqlretriever
2+
3+
import (
4+
"context"
5+
"sync"
6+
7+
"github.com/jackc/pgx/v5/pgxpool"
8+
)
9+
10+
type poolEntry struct {
11+
pool *pgxpool.Pool
12+
refCount int
13+
}
14+
15+
var (
16+
mu sync.Mutex
17+
poolMap = make(map[string]*poolEntry)
18+
)
19+
20+
// GetPool returns a pool for a given URI, creating it if needed.
21+
func GetPool(ctx context.Context, uri string) (*pgxpool.Pool, error) {
22+
mu.Lock()
23+
defer mu.Unlock()
24+
25+
// If already exists, bump refCount
26+
if entry, ok := poolMap[uri]; ok {
27+
entry.refCount++
28+
return entry.pool, nil
29+
}
30+
31+
// Create a new pool
32+
pool, err := pgxpool.New(ctx, uri)
33+
if err != nil {
34+
return nil, err
35+
}
36+
if err := pool.Ping(ctx); err != nil {
37+
pool.Close()
38+
return nil, err
39+
}
40+
41+
poolMap[uri] = &poolEntry{
42+
pool: pool,
43+
refCount: 1,
44+
}
45+
46+
return pool, nil
47+
}
48+
49+
// ReleasePool decreases refCount and closes/removes when it hits zero.
50+
func ReleasePool(ctx context.Context, uri string) {
51+
mu.Lock()
52+
defer mu.Unlock()
53+
54+
entry, ok := poolMap[uri]
55+
if !ok {
56+
return // nothing to do
57+
}
58+
59+
entry.refCount--
60+
if entry.refCount <= 0 {
61+
entry.pool.Close()
62+
delete(poolMap, uri)
63+
}
64+
}
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package postgresqlretriever_test
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
8+
"github.com/stretchr/testify/assert"
9+
"github.com/testcontainers/testcontainers-go"
10+
"github.com/testcontainers/testcontainers-go/wait"
11+
rr "github.com/thomaspoignant/go-feature-flag/retriever/postgresqlretriever"
12+
)
13+
14+
func TestGetPool_MultipleURIsAndReuse(t *testing.T) {
15+
ctx := context.Background()
16+
17+
// Setup Container
18+
req := testcontainers.ContainerRequest{
19+
Image: "postgres:15-alpine",
20+
ExposedPorts: []string{"5432/tcp"},
21+
Env: map[string]string{"POSTGRES_PASSWORD": "password"},
22+
// This waits until the log says the system is ready, preventing connection errors
23+
WaitingFor: wait.ForAll(
24+
wait.ForLog("database system is ready to accept connections").WithStartupTimeout(10*time.Second),
25+
wait.ForListeningPort("5432/tcp").WithStartupTimeout(10*time.Second),
26+
),
27+
}
28+
29+
pg, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
30+
ContainerRequest: req,
31+
Started: true,
32+
})
33+
assert.NoError(t, err)
34+
defer pg.Terminate(ctx)
35+
36+
endpoint, err := pg.Endpoint(ctx, "")
37+
assert.NoError(t, err)
38+
baseURI := "postgres://postgres:password@" + endpoint + "/postgres?sslmode=disable"
39+
40+
// Different URIs
41+
uri1 := baseURI + "&application_name=A"
42+
uri2 := baseURI + "&application_name=B"
43+
44+
// First Calls (RefCount = 1)
45+
pool1a, err := rr.GetPool(ctx, uri1)
46+
assert.NoError(t, err)
47+
assert.NotNil(t, pool1a)
48+
49+
pool2a, err := rr.GetPool(ctx, uri2)
50+
assert.NoError(t, err)
51+
assert.NotNil(t, pool2a)
52+
53+
// Verify distinct pools
54+
assert.NotEqual(t, pool1a, pool2a)
55+
56+
// Reuse Logic (RefCount = 2 for URI1)
57+
pool1b, err := rr.GetPool(ctx, uri1)
58+
assert.NoError(t, err)
59+
assert.Equal(t, pool1a, pool1b, "Should return exact same pool instance")
60+
61+
// Release Logic
62+
// URI1 RefCount: 2 -> 1
63+
rr.ReleasePool(ctx, uri1)
64+
65+
// URI1 RefCount: 1 -> 0 (Closed & Removed)
66+
rr.ReleasePool(ctx, uri1)
67+
68+
// Recreation Logic
69+
// URI1 should now create a NEW pool
70+
pool1c, err := rr.GetPool(ctx, uri1)
71+
assert.NoError(t, err)
72+
assert.NotEqual(t, pool1a, pool1c, "Should be a new pool instance after full release")
73+
74+
// Cleanup new pool
75+
rr.ReleasePool(ctx, uri1)
76+
77+
// URI2 Cleanup verification
78+
rr.ReleasePool(ctx, uri2) // RefCount -> 0
79+
80+
pool2b, err := rr.GetPool(ctx, uri2)
81+
assert.NoError(t, err)
82+
assert.NotEqual(t, pool2a, pool2b, "URI2 should be recreated")
83+
84+
rr.ReleasePool(ctx, uri2)
85+
}

retriever/postgresqlretriever/retriever.go

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"log/slog"
88

99
"github.com/jackc/pgx/v5"
10+
"github.com/jackc/pgx/v5/pgxpool"
1011
"github.com/thomaspoignant/go-feature-flag/retriever"
1112
"github.com/thomaspoignant/go-feature-flag/utils"
1213
"github.com/thomaspoignant/go-feature-flag/utils/fflog"
@@ -26,7 +27,7 @@ type Retriever struct {
2627
logger *fflog.FFLogger
2728
status retriever.Status
2829
columns map[string]string
29-
conn *pgx.Conn
30+
pool *pgxpool.Pool
3031
flagset *string
3132
}
3233

@@ -40,23 +41,19 @@ func (r *Retriever) Init(ctx context.Context, logger *fflog.FFLogger, flagset *s
4041
r.columns = r.getColumnNames()
4142
r.flagset = flagset
4243

43-
if r.conn == nil {
44+
if r.pool == nil {
4445
r.logger.Info("Initializing PostgreSQL retriever")
4546
r.logger.Debug("Using columns", "columns", r.columns)
4647

47-
conn, err := pgx.Connect(ctx, r.URI)
48+
pool, err := GetPool(ctx, r.URI)
4849
if err != nil {
4950
r.status = retriever.RetrieverError
5051
return err
5152
}
5253

53-
if err := conn.Ping(ctx); err != nil {
54-
r.status = retriever.RetrieverError
55-
return err
56-
}
57-
58-
r.conn = conn
54+
r.pool = pool
5955
}
56+
6057
r.status = retriever.RetrieverReady
6158
return nil
6259
}
@@ -71,16 +68,17 @@ func (r *Retriever) Status() retriever.Status {
7168

7269
// Shutdown closes the database connection.
7370
func (r *Retriever) Shutdown(ctx context.Context) error {
74-
if r.conn == nil {
75-
return nil
71+
if r.pool != nil {
72+
ReleasePool(ctx, r.URI)
73+
r.pool = nil
7674
}
77-
return r.conn.Close(ctx)
75+
return nil
7876
}
7977

8078
// Retrieve fetches flag configuration from PostgreSQL.
8179
func (r *Retriever) Retrieve(ctx context.Context) ([]byte, error) {
82-
if r.conn == nil {
83-
return nil, fmt.Errorf("database connection is not initialized")
80+
if r.pool == nil {
81+
return nil, fmt.Errorf("database connection pool is not initialized")
8482
}
8583

8684
// Build the query using the configured table and column names
@@ -94,7 +92,7 @@ func (r *Retriever) Retrieve(ctx context.Context) ([]byte, error) {
9492

9593
r.logger.Debug("Executing PostgreSQL query", slog.String("query", query), slog.Any("args", args))
9694

97-
rows, err := r.conn.Query(ctx, query, args...)
95+
rows, err := r.pool.Query(ctx, query, args...)
9896
if err != nil {
9997
return nil, fmt.Errorf("failed to execute query: %w", err)
10098
}
@@ -121,6 +119,7 @@ func (r *Retriever) Retrieve(ctx context.Context) ([]byte, error) {
121119
flagConfigs[flagName] = config
122120
}
123121

122+
// Check for any errors that occurred during row iteration
124123
if err := rows.Err(); err != nil {
125124
return nil, fmt.Errorf("rows iteration error: %w", err)
126125
}

retriever/postgresqlretriever/retriever_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,7 @@ func TestRetrieverErrorHandling(t *testing.T) {
215215

216216
_, err := r.Retrieve(context.Background())
217217
assert.Error(t, err)
218-
assert.Contains(t, err.Error(), "database connection is not initialized")
218+
assert.Contains(t, err.Error(), "database connection pool is not initialized")
219219
})
220220

221221
t.Run("Status - nil receiver", func(t *testing.T) {

0 commit comments

Comments
 (0)