From 1c646cb12678ec3218c78321831836ab3152aa87 Mon Sep 17 00:00:00 2001 From: Alexey Mironov Date: Sat, 9 May 2026 13:45:42 +0200 Subject: [PATCH 1/3] Route trailing readReadyForQuery error through handleError on extended-protocol cleanup --- conn.go | 10 ++++---- conn_test.go | 66 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 71 insertions(+), 5 deletions(-) diff --git a/conn.go b/conn.go index 3ed7ecfa..f6ea7d28 100644 --- a/conn.go +++ b/conn.go @@ -1760,7 +1760,7 @@ func (cn *conn) readParseResponse() error { return nil case proto.ErrorResponse: err := parseError(r, "") - _ = cn.readReadyForQuery() + _ = cn.handleError(cn.readReadyForQuery()) return err default: cn.err.set(driver.ErrBadConn) @@ -1788,7 +1788,7 @@ func (cn *conn) readStatementDescribeResponse() (paramTyps []oid.Oid, colNames [ return paramTyps, colNames, colTyps, nil case proto.ErrorResponse: err := parseError(r, "") - _ = cn.readReadyForQuery() + _ = cn.handleError(cn.readReadyForQuery()) return nil, nil, nil, err default: cn.err.set(driver.ErrBadConn) @@ -1809,7 +1809,7 @@ func (cn *conn) readPortalDescribeResponse() (rowsHeader, error) { return rowsHeader{}, nil case proto.ErrorResponse: err := parseError(r, "") - _ = cn.readReadyForQuery() + _ = cn.handleError(cn.readReadyForQuery()) return rowsHeader{}, err default: cn.err.set(driver.ErrBadConn) @@ -1827,7 +1827,7 @@ func (cn *conn) readBindResponse() error { return nil case proto.ErrorResponse: err := parseError(r, "") - _ = cn.readReadyForQuery() + _ = cn.handleError(cn.readReadyForQuery()) return err default: cn.err.set(driver.ErrBadConn) @@ -1853,7 +1853,7 @@ func (cn *conn) postExecuteWorkaround() error { switch t { case proto.ErrorResponse: err := parseError(r, "") - _ = cn.readReadyForQuery() + _ = cn.handleError(cn.readReadyForQuery()) return err case proto.CommandComplete, proto.DataRow, proto.EmptyQueryResponse: // the query didn't fail, but we can't process this message diff --git a/conn_test.go 
b/conn_test.go index 1a39ecfc..00c7570d 100644 --- a/conn_test.go +++ b/conn_test.go @@ -7,6 +7,7 @@ import ( "database/sql" "database/sql/driver" "encoding/json" + "errors" "fmt" "io" "maps" @@ -578,6 +579,71 @@ func TestUnexpectedEOF(t *testing.T) { pqtest.QueryRow[int](t, db, `select okay`) } +func TestPoolerErrorResponseWithoutReadyForQuery(t *testing.T) { + t.Parallel() + + // On any Parse the fake emits a non-fatal ErrorResponse and closes the + // connection without sending a trailing ReadyForQuery — the byte sequence + // pgbouncer emits via disconnect_server(false, ...) -> send_pooler_error + // (severity ERROR, SQLSTATE 08P01, no RFQ). + // + // Inside readParseResponse, the trailing readReadyForQuery() returns + // io.EOF; that EOF must be routed through handleError so cn.err is set + // to driver.ErrBadConn. Otherwise the broken conn re-enters the *sql.DB + // pool with inProgress stuck at true, and the next call on it short- + // circuits to errQueryInProgress instead of running the query. + var triggered atomic.Bool + f := pqtest.NewFake(t, func(f pqtest.Fake, cn net.Conn) { + f.Startup(cn, nil) + for { + code, _, ok := f.ReadMsg(cn) + if !ok { + return + } + switch code { + case proto.Terminate: + cn.Close() + return + case proto.Query: + // Ping(): empty simple query. + f.WriteMsg(cn, proto.EmptyQueryResponse, "") + f.WriteMsg(cn, proto.ReadyForQuery, "I") + case proto.Parse: + triggered.Store(true) + f.WriteMsg(cn, proto.ErrorResponse, + "SERROR\x00C08P01\x00Mserver conn crashed?\x00\x00") + cn.Close() + return + } + } + }) + defer f.Close() + + db := pqtest.MustDB(t, f.DSN()) + db.SetMaxOpenConns(1) + db.SetMaxIdleConns(1) + + if _, err := db.Exec("select $1::int", 1); err == nil { + t.Fatal("first Exec: want non-nil error from pooler fault, got nil") + } + if !triggered.Load() { + t.Fatal("server-side fault was never triggered") + } + + // The next Exec must not short-circuit to errQueryInProgress. 
Without the + // fix the poisoned conn stays in the pool with inProgress=true and is + // reused here; with the fix it was evicted, *sql.DB opens a fresh conn + // and we get a proper *pq.Error from the new fault — not the "there is + // already a query being processed" guard. + _, err := db.Exec("select $1::int", 1) + if err == nil { + t.Fatal("second Exec: want non-nil error, got nil") + } + if errors.Is(err, errQueryInProgress) { + t.Fatalf("second Exec: poisoned conn was reused from pool: %v", err) + } +} + func TestConnClose(t *testing.T) { // Ensure the underlying connection can be closed with Close after an error. t.Run("CloseBadConn", func(t *testing.T) { From baa451622087b76e1959fa36dd895f2b185b30c2 Mon Sep 17 00:00:00 2001 From: Martin Tournoij Date: Tue, 12 May 2026 12:34:44 +0100 Subject: [PATCH 2/3] Small stylistic fixes for test case --- conn_test.go | 37 ++++++++++++++----------------------- 1 file changed, 14 insertions(+), 23 deletions(-) diff --git a/conn_test.go b/conn_test.go index 00c7570d..5223a387 100644 --- a/conn_test.go +++ b/conn_test.go @@ -579,19 +579,13 @@ func TestUnexpectedEOF(t *testing.T) { pqtest.QueryRow[int](t, db, `select okay`) } -func TestPoolerErrorResponseWithoutReadyForQuery(t *testing.T) { +// #1320 +func TestCloseWithoutReadyForQuery(t *testing.T) { t.Parallel() - // On any Parse the fake emits a non-fatal ErrorResponse and closes the - // connection without sending a trailing ReadyForQuery — the byte sequence - // pgbouncer emits via disconnect_server(false, ...) -> send_pooler_error - // (severity ERROR, SQLSTATE 08P01, no RFQ). - // - // Inside readParseResponse, the trailing readReadyForQuery() returns - // io.EOF; that EOF must be routed through handleError so cn.err is set - // to driver.ErrBadConn. Otherwise the broken conn re-enters the *sql.DB - // pool with inProgress stuck at true, and the next call on it short- - // circuits to errQueryInProgress instead of running the query. 
+ // Emit non-fatal ErrorResponse and close the connection on any Parse. + // Previously this would cause conn.inProgress to remain "stuck" on true + // because cn.err was never set and no ReadyForQuery was received. var triggered atomic.Bool f := pqtest.NewFake(t, func(f pqtest.Fake, cn net.Conn) { f.Startup(cn, nil) @@ -604,38 +598,35 @@ func TestPoolerErrorResponseWithoutReadyForQuery(t *testing.T) { case proto.Terminate: cn.Close() return - case proto.Query: - // Ping(): empty simple query. + case proto.Query: // Ping(): empty simple query. f.WriteMsg(cn, proto.EmptyQueryResponse, "") f.WriteMsg(cn, proto.ReadyForQuery, "I") case proto.Parse: triggered.Store(true) - f.WriteMsg(cn, proto.ErrorResponse, - "SERROR\x00C08P01\x00Mserver conn crashed?\x00\x00") + f.WriteMsg(cn, proto.ErrorResponse, "SERROR\x00C08P01\x00Mserver conn crashed?\x00\x00") cn.Close() return } } }) defer f.Close() - db := pqtest.MustDB(t, f.DSN()) db.SetMaxOpenConns(1) db.SetMaxIdleConns(1) - if _, err := db.Exec("select $1::int", 1); err == nil { + _, err := db.Exec("select $1::int", 1) + if err == nil { t.Fatal("first Exec: want non-nil error from pooler fault, got nil") } if !triggered.Load() { t.Fatal("server-side fault was never triggered") } - // The next Exec must not short-circuit to errQueryInProgress. Without the - // fix the poisoned conn stays in the pool with inProgress=true and is - // reused here; with the fix it was evicted, *sql.DB opens a fresh conn - // and we get a proper *pq.Error from the new fault — not the "there is - // already a query being processed" guard. - _, err := db.Exec("select $1::int", 1) + // Must not short-circuit to errQueryInProgress because the poisoned conn + // stayed in the pool with inProgress=true. database/sql should open a new + // connection so that we get a proper *pq.Error from the new fault, not the + // "there is already a query being processed" guard. 
+ _, err = db.Exec("select $1::int", 1) if err == nil { t.Fatal("second Exec: want non-nil error, got nil") } From e553c4ae182558ffded9547f6225dc5e491765f7 Mon Sep 17 00:00:00 2001 From: Martin Tournoij Date: Tue, 12 May 2026 12:47:03 +0100 Subject: [PATCH 3/3] Better test name --- conn_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/conn_test.go b/conn_test.go index 5223a387..17b0857b 100644 --- a/conn_test.go +++ b/conn_test.go @@ -580,7 +580,7 @@ func TestUnexpectedEOF(t *testing.T) { } // #1320 -func TestCloseWithoutReadyForQuery(t *testing.T) { +func TestUnexpectedClose(t *testing.T) { t.Parallel() // Emit non-fatal ErrorResponse and close the connection on any Parse.