Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ All notable changes to this project will be documented in this file. From versio
### Fixed

- Fix unnecessary connection pool flushes during schema cache reloading by @mkleczek in #4645
- Fix race condition in pool_available metric causing negative values during network instability by @mkleczek in #4622
- Limit concurrent schema cache loads by @mkleczek in #4643

## [14.9] - 2026-04-10

Expand Down
1 change: 1 addition & 0 deletions postgrest.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ library
, stm-hamt >= 1.2 && < 2
, focus >= 1.0 && < 2
, some >= 1.0.4.1 && < 2
, uuid >= 1.3 && < 2
-- -fno-spec-constr may help keep compile time memory use in check,
-- see https://gitlab.haskell.org/ghc/ghc/issues/16017#note_219304
-- -optP-Wno-nonportable-include-path
Expand Down
74 changes: 67 additions & 7 deletions src/PostgREST/AppState.hs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE RecursiveDo #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE QuasiQuotes #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE RecursiveDo #-}
{-# LANGUAGE TypeApplications #-}

module PostgREST.AppState
( AppState
Expand Down Expand Up @@ -33,7 +35,8 @@ import qualified Data.ByteString.Char8 as BS
import Data.Either.Combinators (whenLeft)
import qualified Hasql.Pool as SQL
import qualified Hasql.Pool.Config as SQL
import qualified Hasql.Session as SQL
import qualified Hasql.Session as SQL hiding (statement)
import qualified Hasql.Transaction as SQL hiding (sql)
import qualified Hasql.Transaction.Sessions as SQL
import qualified Network.HTTP.Types.Status as HTTP
import qualified PostgREST.Auth.JwtCache as JwtCache
Expand Down Expand Up @@ -63,11 +66,17 @@ import PostgREST.Config.Database (queryDbSettings,
import PostgREST.Config.PgVersion (PgVersion (..),
minimumPgVersion)
import PostgREST.Debounce (makeDebouncer)
import PostgREST.Metrics (MetricsState (connTrack))
import PostgREST.SchemaCache (SchemaCache (..),
querySchemaCache,
showSummary)
import PostgREST.SchemaCache.Identifiers (quoteQi)

import qualified Hasql.Decoders as HD
import qualified Hasql.Encoders as HE
import qualified Hasql.Statement as SQL
import NeatInterpolation (trimming)

import Protolude

data AppState = AppState
Expand Down Expand Up @@ -303,7 +312,7 @@ getObserver = stateObserver
-- + Because connections cache the pg catalog(see #2620)
-- + For rapid recovery. Otherwise, the pool idle or lifetime timeout would have to be reached for new healthy connections to be acquired.
retryingSchemaCacheLoad :: AppState -> IO ()
retryingSchemaCacheLoad appState@AppState{stateObserver=observer, stateMainThreadId=mainThreadId} =
retryingSchemaCacheLoad appState@AppState{stateObserver=observer, stateMainThreadId=mainThreadId, stateMetrics} =
void $ retrying retryPolicy shouldRetry (\RetryStatus{rsIterNumber, rsPreviousDelay} -> do
when (rsIterNumber > 0) $ do
let delay = fromMaybe 0 rsPreviousDelay `div` oneSecondInUs
Expand Down Expand Up @@ -342,8 +351,22 @@ retryingSchemaCacheLoad appState@AppState{stateObserver=observer, stateMainThrea
qSchemaCache :: IO (Maybe SchemaCache)
qSchemaCache = do
conf@AppConfig{..} <- getConfig appState
-- Throttle concurrent schema cache loads, guarded by advisory locks.
-- This is to prevent thundering herd problem on startup or when many PostgREST
-- instances receive "reload schema" notifications at the same time
-- See get_lock_sql for details of the algorithm.
-- Here we calculate the number of open connections passed to the query.
Metrics.ConnStats connected inUse <- Metrics.connectionCounts $ connTrack stateMetrics
-- Determine whether schema cache loading will create a new session
let
-- if all connections in use but pool not full - schema cache loading will create session
scLoadingSessions = if connected <= inUse && inUse < configDbPoolSize then 1 else 0
withTxLock = SQL.statement
(fromIntegral $ connected + scLoadingSessions)
(SQL.Statement get_lock_sql get_lock_params HD.noResult configDbPreparedStatements)

(resultTime, result) <-
timeItT $ usePool appState (SQL.transactionNoRetry SQL.ReadCommitted SQL.Read $ querySchemaCache conf)
timeItT $ usePool appState (SQL.transactionNoRetry SQL.ReadCommitted SQL.Read $ withTxLock *> querySchemaCache conf)
case result of
Left e -> do
markSchemaCachePending appState
Expand All @@ -365,6 +388,43 @@ retryingSchemaCacheLoad appState@AppState{stateObserver=observer, stateMainThrea
observer $ SchemaCacheLoadedObs loadTime summary
markSchemaCacheLoaded appState
return $ Just sCache
where
-- Recursive query that tries acquiring locks in order
-- and waits for randomly selected lock if no attempt succeeded.
-- It has a single parameter: this node open connection count.
-- It is used to estimate the number of nodes
-- by counting the number of active sessions for current session_user
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this assumes that all other instances are running with the same user as the current session?

That seems overly specific to a certain use-case for something to put into PostgREST.

Copy link
Copy Markdown
Collaborator Author

@mkleczek mkleczek Apr 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this assumes that all other instances are running with the same user as the current session?

That seems overly specific to a certain use-case for something to put into PostgREST.

Is it really overly specific?

I cannot imagine hundreds (or even tens) of PostgREST instances connected to the same database, each configured differently using different user and listening for the same schema changes... What would be the goal of doing that?
The thundering herd issue in reality happens only in clustered environments where identical PostgREST instances are started to horizontally scale the cluster.

This patch prevents thundering herd issue in such cases.

Do you have any other cases in mind?

Also - do you have any ideas how to estimate the size of the PostgREST cluster without this assumption?

@steve-chavez WDYT?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can easily imagine a multi-tenant use-case, where each tenant gets their own authenticator role and their own PostgREST instance - but all connecting to the same database.

Copy link
Copy Markdown
Collaborator Author

@mkleczek mkleczek Apr 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can easily imagine a multi-tenant use-case, where each tenant gets their own authenticator role and their own PostgREST instance - but all connecting to the same database.

That would be schema-based multitenancy with a shared role structure, but for some reason the stateless PostgREST instances have to be separate while listening on the same channel. In other words, in such a setup one tenant running NOTIFY pgrst would force the other tenants to reload their schema cache. Why would someone do that???
What's more - how many such PostgREST instances do you imagine in such a setup? So many that it would cause thundering herd problem?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would that be schema based multitenancy? I don't see it that way. They'd all be using the same schema, but need different JWT settings.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we currently reloading the schema on a different connection than where we listen to notifications?

That seems like a bug in itself to me: If we listen on the primary, but load the schema from the replica, there is no guarantee, that the schema changes we actually want to load are already available on the replica. That means we might actually end up with a stale schema cache, even when the replica catches up afterwards.

I had assumed that not to be the case, was my assumption wrong?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we currently reloading the schema on a different connection than where we listen to notifications?

Yes, it's exactly as you mention, LISTEN on the primary and reload on the replica. I had the same concern on #4642 (comment).

Copy link
Copy Markdown
Collaborator Author

@mkleczek mkleczek Apr 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's exactly as you mention, LISTEN on the primary and reload on the replica. I had the same concern on #4642 (comment).

Loading from master is no better but in the opposite direction: an instance can have newer schema cache than the actual schema on the replica.

There are no good solutions with current LISTEN/NOTIFY based architecture but it is unrelated to this PR - see also my comment here: #4642 (comment)

I've created #4842 to discuss it further.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've created #4842 to discuss it further.

Since the conclusion in that issue is that "loading the schema cache from the primary is likely the best idea", I suggest we assume for now, that this happens / will be the case.

it causes huge overestimation in case of single master and multiple replicas.

Since we need to start with something and need to make a compromise somewhere, I suggest we ignore this scenario, where we overestimate the schema reloads per replica. We can consider it part of the "schema cache reloads should really happen via the primary instead" issue.

So let's try to find a way to reliably count listener threads against the primary. As pointed out, I believe basing it off of the current role is unreliable. Steve pointed out that basing it off of the application_name is potentially unreliable as well.

Any other ideas?

Copy link
Copy Markdown
Collaborator Author

@mkleczek mkleczek Apr 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've created #4842 to discuss it further.

Since the conclusion in that issue is that "loading the schema cache from the primary is likely the best idea", I suggest we assume for now, that this happens / will be the case.

Fixing #4642 (possibly with this PR) must be a prerequisite to the above (otherwise thundering herd vulnerability is amplified as all PostgREST instances are going to hit master).

it causes huge overestimation in case of single master and multiple replicas.

Since we need to start with something and need to make a compromise somewhere, I suggest we ignore this scenario, where we overestimate the schema reloads per replica. We can consider it part of the "schema cache reloads should really happen via the primary instead" issue.

I think we should rather aim for the rule throttling must be done in the same database transaction as schema cache loading (regardless of which connection is used for querying) - thundering herd issue is really per db server instance
This would mean merging this PR (as it implements this strategy) and in subsequent PR changing the connection used for schema cache querying.

These issues are really orthogonal: one is about "what should be the source of schema cache data" and another is about "making sure the source of schema cache data, whatever it is, is not overloaded by concurrent requests".

So let's try to find a way to reliably count listener threads against the primary. As pointed out, I believe basing it off of the current role is unreliable. Steve pointed out that basing it off of the application_name is potentially unreliable as well.

Any other ideas?

There is also another dimension here: it is safer to underestimate than to overestimate the number of PostgREST instances.
Basing the calculation off the current_role can only underestimate instances (ie. there can be other instances connected using different user but they won't be counted towards the limit).
Since all instances, regardless of session_user use the same db locks for throttling, it means that it fixes the thundering herd problem in all scenarios (it is just that the limit might be lower than optimal in the scenario @wolfgangwalther described).

So IMHO it is safe to merge this PR as is and we can change the strategy to be more liberal (ie. not underestimating) in the future.

@steve-chavez @wolfgangwalther WDYT?

-- and dividing it by this node open connections.
-- Assuming load is uniform among cluster nodes, all should have
-- statistically the same number of open connections.
-- Once the number of nodes is known we calculate the number
-- of locks as ceil(log(2, number_of_nodes))
get_lock_sql = encodeUtf8 [trimming|
WITH RECURSIVE attempts AS (
SELECT 1 AS lock_number, pg_try_advisory_xact_lock(lock_id, 1) AS success FROM parameters
UNION ALL
SELECT next_lock_number AS lock_number, pg_try_advisory_xact_lock(lock_id, next_lock_number) AS success
FROM
parameters CROSS JOIN LATERAL (
SELECT lock_number + 1 AS next_lock_number FROM attempts
WHERE NOT success AND lock_number < locks_count
ORDER BY lock_number DESC
LIMIT 1
) AS previous_attempt
),
counts AS (
SELECT round(log(2, round(count(*)::double precision/$$1)::numeric))::int AS locks_count
FROM
pg_stat_activity WHERE usename = SESSION_USER
),
parameters AS (
SELECT locks_count, 50168275 AS lock_id FROM counts WHERE locks_count > 0
)
SELECT pg_advisory_xact_lock(lock_id, floor(random() * locks_count)::int + 1)
FROM
parameters WHERE NOT EXISTS (SELECT 1 FROM attempts WHERE success) |]

get_lock_params = HE.param (HE.nonNullable HE.int4)

shouldRetry :: RetryStatus -> (Maybe PgVersion, Maybe SchemaCache) -> IO Bool
shouldRetry _ (pgVer, sCache) = do
Expand Down
65 changes: 53 additions & 12 deletions src/PostgREST/Metrics.hs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@
-}
module PostgREST.Metrics
( init
, ConnTrack
, ConnStats (..)
, MetricsState (..)
, connectionCounts
, observationMetrics
, metricsToText
) where
Expand All @@ -19,13 +22,18 @@

import PostgREST.Observation


import Protolude
import Control.Arrow ((&&&))
import Data.Bitraversable (bisequenceA)
import Data.Tuple.Extra (both)
import Data.UUID (UUID)
import qualified Focus
import Protolude
import qualified StmHamt.SizedHamt as SH

data MetricsState =
MetricsState {
poolTimeouts :: Counter,
poolAvailable :: Gauge,
connTrack :: ConnTrack,
poolWaiting :: Gauge,
poolMaxSize :: Gauge,
schemaCacheLoads :: Vector Label1 Counter,
Expand All @@ -40,7 +48,7 @@
whenM getRTSStatsEnabled $ void $ register PMG.ghcMetrics
metricState <- MetricsState <$>
register (counter (Info "pgrst_db_pool_timeouts_total" "The total number of pool connection timeouts")) <*>
register (gauge (Info "pgrst_db_pool_available" "Available connections in the pool")) <*>
register (Metric ((identity &&& dbPoolAvailable) <$> connectionTracker)) <*>
register (gauge (Info "pgrst_db_pool_waiting" "Requests waiting to acquire a pool connection")) <*>
register (gauge (Info "pgrst_db_pool_max" "Max pool connections")) <*>
register (vector "status" $ counter (Info "pgrst_schema_cache_loads_total" "The total number of times the schema cache was loaded")) <*>
Expand All @@ -50,20 +58,28 @@
register (counter (Info "pgrst_jwt_cache_evictions_total" "The total number of JWT cache evictions"))
setGauge (poolMaxSize metricState) (fromIntegral configDbPoolSize)
pure metricState
where
dbPoolAvailable = (pure . noLabelsGroup (Info "pgrst_db_pool_available" "Available connections in the pool") GaugeType . calcAvailable <$>) . connectionCounts
where
calcAvailable = liftA2 (-) connected inUse
toSample name labels = Sample name labels . encodeUtf8 . show
noLabelsGroup info sampleType = SampleGroup info sampleType . pure . toSample (metricName info) mempty

-- Only some observations are used as metrics
observationMetrics :: MetricsState -> ObservationHandler
observationMetrics MetricsState{..} obs = case obs of
PoolAcqTimeoutObs -> do
incCounter poolTimeouts
(HasqlPoolObs (SQL.ConnectionObservation _ status)) -> case status of
SQL.ReadyForUseConnectionStatus _ -> do
incGauge poolAvailable
SQL.InUseConnectionStatus -> do
decGauge poolAvailable
SQL.TerminatedConnectionStatus _ -> do
decGauge poolAvailable
SQL.ConnectingConnectionStatus -> pure ()
-- Handle pool observations with connection tracking
-- this is necessary because it is not possible
-- to accurately maintain open/in use connection counts
-- statelessly based only on pool observation events.
-- The reason is that hasql-pool emits TerminatedConnectionStatus
-- both for connections successfully established and failed when connecting.
-- When receiving TerminatedConnectionStatus we have to find out
-- if we can decrement established connection count. To do that we have to track
-- established connections.
(HasqlPoolObs sqlObs) -> trackConnections connTrack sqlObs
PoolRequest ->
incGauge poolWaiting
PoolRequestFullfilled ->
Expand All @@ -81,3 +97,28 @@

metricsToText :: IO LBS.ByteString
metricsToText = exportMetricsAsText

-- | Point-in-time snapshot of pool connection counts, taken from a
-- 'ConnTrack' via 'connectionCounts'.
data ConnStats = ConnStats {
  connected :: Int, -- ^ Connections currently established (tracked in 'connTrackConnected')
  inUse :: Int -- ^ Connections currently checked out of the pool (tracked in 'connTrackInUse')
} deriving (Eq, Show)

Check warning on line 104 in src/PostgREST/Metrics.hs

View check run for this annotation

Codecov / codecov/patch

src/PostgREST/Metrics.hs#L104

Added line #L104 was not covered by tests

-- | Mutable STM state tracking pool connections by their hasql-pool
-- connection UUID: one set for established connections, one for
-- connections currently in use. Updated by 'trackConnections'.
data ConnTrack = ConnTrack { connTrackConnected :: SH.SizedHamt UUID, connTrackInUse :: SH.SizedHamt UUID }

-- | Allocate a fresh, empty connection tracker.
connectionTracker :: IO ConnTrack
connectionTracker = do
  established <- SH.newIO
  busy <- SH.newIO
  pure $ ConnTrack established busy

-- | Fold a single hasql-pool connection observation into the tracker.
--
-- Membership is keyed by the connection UUID, so repeated events for the
-- same connection stay idempotent (set insert / set delete).
trackConnections :: ConnTrack -> SQL.Observation -> IO ()
trackConnections (ConnTrack established busy) (SQL.ConnectionObservation uuid status) =
  case status of
    -- Connection is established and back in the pool: record it as
    -- established and make sure it no longer counts as in use.
    SQL.ReadyForUseConnectionStatus _ -> atomically $ do
      SH.insert identity uuid established
      SH.focus Focus.delete identity uuid busy
    -- Connection was handed out to a session.
    SQL.InUseConnectionStatus -> atomically $
      SH.insert identity uuid busy
    -- Terminated events are emitted both for previously established
    -- connections and for failed connection attempts; deleting from both
    -- sets handles either case without going negative.
    SQL.TerminatedConnectionStatus _ -> atomically $ do
      SH.focus Focus.delete identity uuid established
      SH.focus Focus.delete identity uuid busy
    -- Any other status (e.g. connecting) does not change the counts.
    _ -> pure ()

-- | Read a consistent snapshot of both connection counts in a single STM
-- transaction, so 'connected' and 'inUse' can never disagree mid-update.
connectionCounts :: ConnTrack -> IO ConnStats
connectionCounts (ConnTrack established busy) =
  atomically $ ConnStats <$> SH.size established <*> SH.size busy
91 changes: 90 additions & 1 deletion test/io/test_io.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"Unit tests for Input/Ouput of PostgREST seen as a black box."

import contextlib
import os
import re
import signal
Expand All @@ -19,6 +20,7 @@
sleep_until_postgrest_full_reload,
sleep_until_postgrest_scache_reload,
wait_until_exit,
wait_until_status_code,
)


Expand Down Expand Up @@ -1252,6 +1254,93 @@ def test_schema_cache_concurrent_notifications(slow_schema_cache_env):
assert response.status_code == 200


# NOTE(review): expected_concurrency appears to follow the server-side lock
# count derived from the instance count (see get_lock_sql's
# log(2, instance_count) formula) — confirm against the SQL if the
# parametrization is ever extended.
@pytest.mark.parametrize(
    "instance_count, expected_concurrency", [(2, 2), (4, 3), (6, 4), (8, 4), (16, 5)]
)
def test_schema_cache_reload_throttled_with_advisory_locks(
    instance_count, expected_concurrency, slow_schema_cache_env
):
    "schema cache reloads should be throttled across instances"

    # The env injects an artificial sleep into the schema cache query; a
    # reload that had to wait for an advisory lock takes at least one extra
    # sleep period, so 2x the sleep separates "ran immediately" from "waited".
    internal_sleep_ms = int(
        slow_schema_cache_env["PGRST_INTERNAL_SCHEMA_CACHE_QUERY_SLEEP"]
    )
    lock_wait_threshold_ms = internal_sleep_ms * 2
    query_log_pattern = re.compile(r"Schema cache queried in ([\d.]+) milliseconds")

    def read_available_output_lines(postgrest):
        # Non-blocking drain of the instance's stdout; returns [] when no
        # output is currently available.
        try:
            output = postgrest.process.stdout.read()
        except BlockingIOError:
            return []

        if not output:
            return []
        return output.decode().splitlines()

    with contextlib.ExitStack() as stack:
        # Start instance_count identical PostgREST instances against the
        # same database; ExitStack tears them all down on exit.
        instances = [
            stack.enter_context(
                run(
                    env=slow_schema_cache_env,
                    wait_for_readiness=False,
                    wait_max_seconds=10,
                )
            )
            for _ in range(instance_count)
        ]

        for postgrest in instances:
            wait_until_status_code(
                postgrest.admin.baseurl + "/ready", max_seconds=10, status_code=200
            )

        # Drop startup logs so only reload logs are parsed.
        for postgrest in instances:
            read_available_output_lines(postgrest)

        # A single NOTIFY triggers a schema cache reload on every instance
        # at (roughly) the same time — the thundering herd scenario.
        response = instances[0].session.get("/rpc/notify_pgrst")
        assert response.status_code == 204

        # Wait long enough for the lock-throttled cache reloads to finish.
        time.sleep((internal_sleep_ms / 1000) * 2)

        # Collect each instance's slowest reload duration, polling its
        # stdout until a "Schema cache queried" line appears.
        reload_durations_ms = []
        for postgrest in instances:
            output_lines = []
            for _ in range(instance_count * 2):
                output_lines.extend(read_available_output_lines(postgrest))
                if any(query_log_pattern.search(line) for line in output_lines):
                    break
                time.sleep(0.2)

            durations = []
            for line in output_lines:
                match = query_log_pattern.search(line)
                if match:
                    durations.append(float(match.group(1)))

            # Every instance must have logged at least one reload.
            assert durations
            reload_durations_ms.append(max(durations))

        assert len(reload_durations_ms) == instance_count

        # expected_concurrency instances should have
        # reload_durations_ms <= lock_wait_threshold_ms
        # the rest should wait
        assert (
            instance_count
            - len(
                [
                    duration
                    for duration in reload_durations_ms
                    if duration > lock_wait_threshold_ms
                ]
            )
            == expected_concurrency
        )


def test_schema_cache_query_sleep_logs(defaultenv):
"""Schema cache sleep should be reflected in the logged query duration."""

Expand Down Expand Up @@ -1945,7 +2034,7 @@ def test_requests_with_resource_embedding_wait_for_schema_cache_reload(defaulten
env = {
**defaultenv,
"PGRST_DB_POOL": "2",
"PGRST_INTERNAL_SCHEMA_CACHE_RELATIONSHIP_LOAD_SLEEP": "5100",
"PGRST_INTERNAL_SCHEMA_CACHE_RELATIONSHIP_LOAD_SLEEP": "5200",
}

with run(env=env, wait_max_seconds=30) as postgrest:
Expand Down
Loading
Loading