From 68745cb002a6439ec54fc13c57eb25b6ce387201 Mon Sep 17 00:00:00 2001 From: shum Date: Sat, 27 Jun 2026 14:03:55 +0000 Subject: [PATCH 1/4] docs: plan for service subscriptions memory leak --- ...06-27-service-subscriptions-memory-leak.md | 57 +++++++++++++++++++ 1 file changed, 57 insertions(+) create mode 100644 plans/2026-06-27-service-subscriptions-memory-leak.md diff --git a/plans/2026-06-27-service-subscriptions-memory-leak.md b/plans/2026-06-27-service-subscriptions-memory-leak.md new file mode 100644 index 000000000..ff86c6644 --- /dev/null +++ b/plans/2026-06-27-service-subscriptions-memory-leak.md @@ -0,0 +1,57 @@ +## Root cause: orphaned `Sub` entries in the service client's `subscriptions` map + +**The leak is service-specific and was introduced by PR #1667 "messaging services" (`f0b7a4be`).** A long-lived messaging-service connection accumulates per-queue `Sub` records in its `Client.subscriptions` map that are **never removed** when the associated queues are deleted or unassociated — only the counter is decremented. Over normal queue churn the map grows monotonically for the entire lifetime of the service connection. + +### The proof — an asymmetry between two handlers in `serverThread` + +Both individual queue subscriptions and service subscriptions store a `Sub` per queue in `Client.subscriptions` (= `clientSubs` for the SMP subscriber thread, wired at `Server.hs:189`). When a queue ends/is deleted, the two paths diverge: + +**Individual subscriber — entry IS removed** (`Server.hs:332`, `346`): +```haskell +CSAEndSub qId -> atomically (endSub c qId) >>= a unsub_ -- :332 + ... +endSub c qId = TM.lookupDelete qId (clientSubs c) >>= (removeWhenNoSubs c $>) -- :346 +``` + +**Service subscriber — entry is NOT removed** (`Server.hs:336-340`): +```haskell +CSAEndServiceSub qId -> atomically $ do + modifyTVar' (clientServiceSubs c) decrease -- decrements serviceSubsCount + modifyTVar' totalServiceSubs decrease -- decrements global count + where decrease = subtractServiceSubs (1, queueIdHash qId) + -- never touches (clientSubs c) — the Sub for qId stays forever +``` + +### Where the orphaned entries are added (both new in this PR) +- `Server.hs:1860-1862` — on service subscribe (`SSUB`), one `Sub` inserted per queue that has a pending message. +- `Server.hs:2039-2043` (`newServiceDeliverySub`) — on **every** `SEND` to a service-associated queue with no existing sub, a `Sub` is inserted into the service client's `subscriptions`. After delivery the thread state resets to `NoSub` (`:2069`) but the map entry remains as a "already delivering" marker (`:1856-1859`). + +### Why they leak +The only places the service client's `subscriptions` map is cleared are: +- `clientDisconnected` — `swapTVar subscriptions M.empty` (`Server.hs:1097`) — only on disconnect. +- `CSADecreaseSubs` — `swapTVar (clientSubs c) M.empty` (`Server.hs:343`) — only on full service takeover by another connection. +- `delQueueAndMsgs` — `TM.lookupDelete entId $ subscriptions clnt` (`Server.hs:2164`) — but `clnt` here is **the recipient deleting its own queue, not the service client**. The service's entry for that queue is reached only via the `CSDeleted → endServiceSub → CSAEndServiceSub` path (`Server.hs:306, 313, 336`), which decrements the counter but leaves the map entry. + +**Concrete scenario (fully traced):** Service `S` subscribes (`SSUB`) and stays connected for days. Recipient `R` owns service-associated queue `Q`. A `SEND` to `Q` inserts a `Sub` into `S.subscriptions[Q]` (`:2042`). `R` later deletes `Q` → `delQueueAndMsgs` runs on `R`'s connection, removes `Q` from `R.subscriptions`, decrements counters, enqueues `CSDeleted Q (Just S)` (`:2167`) → `serverThread` runs `CSAEndServiceSub Q` for `S` (`:336`), decrementing `S.serviceSubsCount` but **leaving `S.subscriptions[Q]` in place**. Net: one orphaned `Sub` (record + 2 TVars) per service-associated queue ever deleted/unassociated, never reclaimed until `S` disconnects. The logical counter `serviceSubsCount` correctly drops, so the map size diverges from the counter — making the leak invisible to the existing service-sub metric. + +### Verdict +This is a deterministic, static-provable memory leak — no production logging needed to confirm the existence; the asymmetry between `CSAEndSub` (removes) and `CSAEndServiceSub` (doesn't) is the smoking gun. It is specific to messaging-service certificate clients, which is exactly the population added by the services/certificate PR. + +### Secondary findings (lower impact, same PR area, not the primary cause) +- **`forkClient` register-after-fork race** (`Server.hs:1356-1359`): if the forked action's `finally` delete (`:1358`) runs before the parent's `IM.insert` (`:1359`), a `Weak ThreadId` of a dead thread is left in `endThreads` until disconnect. Pre-existing, tiny per-entry, but exercised far more by the PR's higher END/DELD volume. +- **Wrong-client counter decrement** (`Server.hs:2166`): `delQueueAndMsgs` decrements `serviceSubsCount` of the *deleting* client, not the service; harmless for non-service deleters (floored at 0) but corrupts accounting if a service deletes its own queue. + +--- + +### Recommended fix (mirror `endSub` in the service path) +Make `CSAEndServiceSub` also delete the per-queue `Sub` and cancel its delivery thread, exactly as `CSAEndSub`/`endSub` do for individual subscribers. Roughly: + +```haskell +CSAEndServiceSub qId -> do + s_ <- atomically $ do + modifyTVar' (clientServiceSubs c) decrease + modifyTVar' totalServiceSubs decrease + TM.lookupDelete qId (clientSubs c) <* removeWhenNoSubs c + forM_ unsub_ $ \unsub -> mapM_ unsub s_ + where decrease = subtractServiceSubs (1, queueIdHash qId) +``` From 9de19e853183d9d437f360eba945cda102e5818f Mon Sep 17 00:00:00 2001 From: shum Date: Mon, 29 Jun 2026 12:55:12 +0000 Subject: [PATCH 2/4] fix: remove leaked service delivery subscriptions The CSAEndServiceSub handler decremented subscription counters but did not remove the per-queue delivery Sub from the service client's subscriptions map. Over queue churn a long-lived service connection accumulated orphaned Sub entries until disconnect, leaking memory. Mirror CSAEndSub via endServiceQueueSub (reusing endSub) so the entry is removed and its delivery thread cancelled. Add a regression test with white-box access to the server Env via runSMPServerBlocking_; verified failing without the fix. --- src/Simplex/Messaging/Server.hs | 33 ++++++++++++++------- tests/SMPClient.hs | 34 +++++++++++++++++----- tests/ServerTests.hs | 51 +++++++++++++++++++++++++++++++++ 3 files changed, 101 insertions(+), 17 deletions(-) diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 1b7d920ac..9658517dd 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -32,6 +32,7 @@ module Simplex.Messaging.Server ( runSMPServer, runSMPServerBlocking, + runSMPServerBlocking_, controlPortAuth, importMessages, exportMessages, @@ -159,7 +160,15 @@ runSMPServer cfg attachHTTP_ = do -- This function uses passed TMVar to signal when the server is ready to accept TCP requests (True) -- and when it is disconnected from the TCP socket once the server thread is killed (False). runSMPServerBlocking :: MsgStoreClass s => TMVar Bool -> ServerConfig s -> Maybe AttachHTTP -> IO () -runSMPServerBlocking started cfg attachHTTP_ = newEnv cfg >>= runReaderT (smpServer started cfg attachHTTP_) +runSMPServerBlocking = runSMPServerBlocking_ $ \_ -> pure () + +-- | Like 'runSMPServerBlocking', but passes the created 'Env' to the given action before the server starts, +-- allowing tests to introspect the running server's internal state. +runSMPServerBlocking_ :: MsgStoreClass s => (Env s -> IO ()) -> TMVar Bool -> ServerConfig s -> Maybe AttachHTTP -> IO () +runSMPServerBlocking_ withEnv started cfg attachHTTP_ = do + env <- newEnv cfg + withEnv env + runReaderT (smpServer started cfg attachHTTP_) env type M s a = ReaderT (Env s) IO a type AttachHTTP = Socket -> TLS.Context -> IO () @@ -329,21 +338,25 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt endPreviousSubscriptions = mapM_ $ \(c, subAction, evt) -> do atomically $ modifyTVar' pendingEvents $ IM.alter (Just . maybe [evt] (evt <|)) (clientId c) case subAction of - CSAEndSub qId -> atomically (endSub c qId) >>= a unsub_ - where - a (Just unsub) (Just s) = unsub s - a _ _ = pure () - CSAEndServiceSub qId -> atomically $ do - modifyTVar' (clientServiceSubs c) decrease - modifyTVar' totalServiceSubs decrease - where - decrease = subtractServiceSubs (1, queueIdHash qId) + CSAEndSub qId -> atomically (endSub c qId) >>= unsubPrev + -- like endSub, also removes the delivery subscription from the service client's subscriptions map, + -- otherwise the map retains entries for queues unassociated/deleted while the service stays connected. + CSAEndServiceSub qId -> atomically (endServiceQueueSub c qId) >>= unsubPrev CSADecreaseSubs changedSubs -> do atomically $ modifyTVar' totalServiceSubs $ subtractServiceSubs changedSubs forM_ unsub_ $ \unsub -> atomically (swapTVar (clientSubs c) M.empty) >>= mapM_ unsub where + unsubPrev :: Maybe sub -> IO () + unsubPrev s_ = forM_ unsub_ $ \unsub -> mapM_ unsub s_ endSub :: Client s -> QueueId -> STM (Maybe sub) endSub c qId = TM.lookupDelete qId (clientSubs c) >>= (removeWhenNoSubs c $>) + endServiceQueueSub :: Client s -> QueueId -> STM (Maybe sub) + endServiceQueueSub c qId = do + modifyTVar' (clientServiceSubs c) decrease + modifyTVar' totalServiceSubs decrease + endSub c qId + where + decrease = subtractServiceSubs (1, queueIdHash qId) -- remove client from server's subscribed cients removeWhenNoSubs c = do noClientSubs <- null <$> readTVar (clientSubs c) diff --git a/tests/SMPClient.hs b/tests/SMPClient.hs index d043fd3c8..10b12e091 100644 --- a/tests/SMPClient.hs +++ b/tests/SMPClient.hs @@ -19,6 +19,7 @@ import Control.Monad import Control.Monad.Except (runExceptT) import Data.ByteString.Char8 (ByteString) import Data.List.NonEmpty (NonEmpty) +import qualified Data.Map.Strict as M import qualified Data.X509 as X import qualified Data.X509.Validation as XV import Network.Socket @@ -30,7 +31,7 @@ import Simplex.Messaging.Client.Agent (SMPClientAgentConfig (..), defaultSMPClie import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Encoding import Simplex.Messaging.Protocol -import Simplex.Messaging.Server (runSMPServerBlocking) +import Simplex.Messaging.Server (runSMPServerBlocking_) import Simplex.Messaging.Server.Env.STM import Simplex.Messaging.Server.MsgStore.Types (MsgStoreClass (..), SMSType (..), SQSType (..)) import Simplex.Messaging.Server.QueueStore.Postgres.Config (PostgresStoreCfg (..)) @@ -38,14 +39,14 @@ import Simplex.Messaging.Transport import Simplex.Messaging.Transport.Client import Simplex.Messaging.Transport.Server import Simplex.Messaging.Transport.Shared (ChainCertificates (..), chainIdCaCerts) -import Simplex.Messaging.Util (ifM) +import Simplex.Messaging.Util (ifM, ($>>=)) import Simplex.Messaging.Version import Simplex.Messaging.Version.Internal import System.Info (os) import Test.Hspec hiding (fit, it) import UnliftIO.Concurrent import qualified UnliftIO.Exception as E -import UnliftIO.STM (TMVar, atomically, newEmptyTMVarIO, putTMVar, takeTMVar) +import UnliftIO.STM (TMVar, atomically, newEmptyTMVarIO, putTMVar, readTVarIO, takeTMVar) import UnliftIO.Timeout (timeout) import Util @@ -357,14 +358,33 @@ withServerCfg :: AServerConfig -> (forall s. ServerConfig s -> a) -> a withServerCfg (ASrvCfg _ _ cfg') f = f cfg' withSmpServerConfigOn :: HasCallStack => ASrvTransport -> AServerConfig -> ServiceName -> (HasCallStack => ThreadId -> IO a) -> IO a -withSmpServerConfigOn t (ASrvCfg _ _ cfg') port' = - serverBracket - (\started -> runSMPServerBlocking started cfg' {transports = [(port', t, False)]} Nothing) - (threadDelay 10000) +withSmpServerConfigOn t srvCfg port' f = withSmpServerConfigEnvOn t srvCfg port' (const f) withSmpServerThreadOn :: HasCallStack => (ASrvTransport, AStoreType) -> ServiceName -> (HasCallStack => ThreadId -> IO a) -> IO a withSmpServerThreadOn (t, msType) = withSmpServerConfigOn t (cfgMS msType) +-- the message store type is existential in AServerConfig, so the running Env is wrapped to hide it +data SomeEnv = forall s. SomeEnv (Env s) + +-- runs the server like withSmpServerConfigOn, also passing its Env for white-box assertions on internal state +withSmpServerConfigEnvOn :: HasCallStack => ASrvTransport -> AServerConfig -> ServiceName -> (HasCallStack => SomeEnv -> ThreadId -> IO a) -> IO a +withSmpServerConfigEnvOn t (ASrvCfg _ _ cfg') port' f = do + envVar <- newEmptyTMVarIO + serverBracket + (\started -> runSMPServerBlocking_ (atomically . putTMVar envVar . SomeEnv) started cfg' {transports = [(port', t, False)]} Nothing) + (threadDelay 10000) + (\tId -> atomically (takeTMVar envVar) >>= (`f` tId)) + +-- size of the service client's delivery subscriptions map, Nothing if no client is subscribed as the service +serviceSubsMapSize :: SomeEnv -> ServiceId -> IO (Maybe Int) +serviceSubsMapSize (SomeEnv env) serviceId = + getSubscribedClient serviceId serviceSubscribers + $>>= readTVarIO + $>>= \Client {subscriptions} -> Just . M.size <$> readTVarIO subscriptions + where + Server {subscribers} = server env + ServerSubscribers {serviceSubscribers} = subscribers + serverBracket :: HasCallStack => (TMVar Bool -> IO ()) -> IO () -> (HasCallStack => ThreadId -> IO a) -> IO a serverBracket process afterProcess f = do started <- newEmptyTMVarIO diff --git a/tests/ServerTests.hs b/tests/ServerTests.hs index ecd923619..e2bf341ba 100644 --- a/tests/ServerTests.hs +++ b/tests/ServerTests.hs @@ -93,6 +93,7 @@ serverTests = do testServiceDeliverSubscribe testServiceUpgradeAndDowngrade testServiceSubsTotalCount + testServiceSubsRemovedOnQueueDelete describe "Store log" testWithStoreLog describe "Restore messages" testRestoreMessages describe "Restore messages (old / v2)" testRestoreExpireMessages @@ -937,6 +938,56 @@ testServiceSubsTotalCount = threadDelay 1500000 readFile testPrometheusMetricsFile >>= \m -> readServiceSubsMetric m `shouldBe` Just 3 +-- regression test for a memory leak: a service delivery subscription stayed in the service client's +-- subscriptions map after its queue was unassociated/deleted by another client, growing without bound +-- for the lifetime of a long-lived service connection. +testServiceSubsRemovedOnQueueDelete :: SpecWith (ASrvTransport, AStoreType) +testServiceSubsRemovedOnQueueDelete = + it "should remove service delivery subscription when its queue is deleted by another client" $ \(at@(ATransport t), msType) -> do + g <- C.newRandom + creds <- genCredentials g Nothing (0, 2400) "localhost" + let (_fp, tlsCred) = tlsCredentials [creds] + serviceKeys@(_, servicePK) <- atomically $ C.generateKeyPair g + let aServicePK = C.APrivateAuthKey C.SEd25519 servicePK + withSmpServerConfigEnvOn at (cfgMS msType) testPort $ \senv _ -> do + (rPub, rKey) <- atomically $ C.generateAuthKeyPair C.SEd25519 g + (dhPub, _ :: C.PrivateKeyX25519) <- atomically $ C.generateKeyPair g + (sPub, sKey) <- atomically $ C.generateAuthKeyPair C.SEd25519 g + runSMPClient t $ \h -> do + (rId, sId, serviceId) <- runSMPServiceClient t (tlsCred, serviceKeys) $ \sh -> do + Resp "1" NoEntity (Ids_ rId sId _srvDh serviceId) <- serviceSignSendRecv sh rKey servicePK ("1", NoEntity, New rPub dhPub) + Resp "2" _ OK <- signSendRecv h sKey ("2", sId, SKEY sPub) + pure (rId, sId, serviceId) + runSMPServiceClient t (tlsCred, serviceKeys) $ \sh -> do + signSend_ sh aServicePK Nothing ("3", serviceId, SUBS 1 (queueIdsHash [rId])) + -- drain SOKS and ALLS (they arrive on separate threads); SOKS content is covered by testServiceSubsTotalCount + void $ + receiveInAnyOrder + sh + [ \case + Resp "3" _ (SOKS _ _) -> pure $ Just () + _ -> pure Nothing, + \case + Resp "" NoEntity ALLS -> pure $ Just () + _ -> pure Nothing + ] + -- a SEND to the service-associated queue creates a delivery subscription in the service client's map + Resp "4" _ OK <- signSendRecv h sKey ("4", sId, _SEND "hello") + Resp "" _ (Msg _ _) <- tGet1 sh + serviceSubsMapSize senv serviceId `shouldReturn` Just 1 + -- a different client deletes the queue while the service stays connected + runSMPClient t $ \dh -> do + Resp "5" _ OK <- signSendRecv dh rKey ("5", rId, DEL) + pure () + -- the queue deletion is processed asynchronously; the delivery subscription must be removed + let go n = + serviceSubsMapSize senv serviceId >>= \case + Just 0 -> pure () + s + | n > 0 -> threadDelay 100000 >> go (n - 1) + | otherwise -> s `shouldBe` Just 0 + go (20 :: Int) + readServiceSubsMetric :: String -> Maybe Int readServiceSubsMetric content = case filter ("simplex_smp_subscribtion_service_subs_total " `isPrefixOf`) (lines content) of From 8879d8b8121a36fa524636bfbf3a877e6e49e285 Mon Sep 17 00:00:00 2001 From: shum Date: Mon, 29 Jun 2026 15:15:42 +0000 Subject: [PATCH 3/4] test: remove service subs leak regression test Remove testServiceSubsRemovedOnQueueDelete and the test-only Env exposure (runSMPServerBlocking_, withSmpServerConfigEnvOn, serviceSubsMapSize), leaving only the server fix. --- src/Simplex/Messaging/Server.hs | 11 +------ tests/SMPClient.hs | 34 +++++----------------- tests/ServerTests.hs | 51 --------------------------------- 3 files changed, 8 insertions(+), 88 deletions(-) diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 9658517dd..005b4301b 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -32,7 +32,6 @@ module Simplex.Messaging.Server ( runSMPServer, runSMPServerBlocking, - runSMPServerBlocking_, controlPortAuth, importMessages, exportMessages, @@ -160,15 +159,7 @@ runSMPServer cfg attachHTTP_ = do -- This function uses passed TMVar to signal when the server is ready to accept TCP requests (True) -- and when it is disconnected from the TCP socket once the server thread is killed (False). runSMPServerBlocking :: MsgStoreClass s => TMVar Bool -> ServerConfig s -> Maybe AttachHTTP -> IO () -runSMPServerBlocking = runSMPServerBlocking_ $ \_ -> pure () - --- | Like 'runSMPServerBlocking', but passes the created 'Env' to the given action before the server starts, --- allowing tests to introspect the running server's internal state. -runSMPServerBlocking_ :: MsgStoreClass s => (Env s -> IO ()) -> TMVar Bool -> ServerConfig s -> Maybe AttachHTTP -> IO () -runSMPServerBlocking_ withEnv started cfg attachHTTP_ = do - env <- newEnv cfg - withEnv env - runReaderT (smpServer started cfg attachHTTP_) env +runSMPServerBlocking started cfg attachHTTP_ = newEnv cfg >>= runReaderT (smpServer started cfg attachHTTP_) type M s a = ReaderT (Env s) IO a type AttachHTTP = Socket -> TLS.Context -> IO () diff --git a/tests/SMPClient.hs b/tests/SMPClient.hs index 10b12e091..d043fd3c8 100644 --- a/tests/SMPClient.hs +++ b/tests/SMPClient.hs @@ -19,7 +19,6 @@ import Control.Monad import Control.Monad.Except (runExceptT) import Data.ByteString.Char8 (ByteString) import Data.List.NonEmpty (NonEmpty) -import qualified Data.Map.Strict as M import qualified Data.X509 as X import qualified Data.X509.Validation as XV import Network.Socket @@ -31,7 +30,7 @@ import Simplex.Messaging.Client.Agent (SMPClientAgentConfig (..), defaultSMPClie import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Encoding import Simplex.Messaging.Protocol -import Simplex.Messaging.Server (runSMPServerBlocking_) +import Simplex.Messaging.Server (runSMPServerBlocking) import Simplex.Messaging.Server.Env.STM import Simplex.Messaging.Server.MsgStore.Types (MsgStoreClass (..), SMSType (..), SQSType (..)) import Simplex.Messaging.Server.QueueStore.Postgres.Config (PostgresStoreCfg (..)) @@ -39,14 +38,14 @@ import Simplex.Messaging.Transport import Simplex.Messaging.Transport.Client import Simplex.Messaging.Transport.Server import Simplex.Messaging.Transport.Shared (ChainCertificates (..), chainIdCaCerts) -import Simplex.Messaging.Util (ifM, ($>>=)) +import Simplex.Messaging.Util (ifM) import Simplex.Messaging.Version import Simplex.Messaging.Version.Internal import System.Info (os) import Test.Hspec hiding (fit, it) import UnliftIO.Concurrent import qualified UnliftIO.Exception as E -import UnliftIO.STM (TMVar, atomically, newEmptyTMVarIO, putTMVar, readTVarIO, takeTMVar) +import UnliftIO.STM (TMVar, atomically, newEmptyTMVarIO, putTMVar, takeTMVar) import UnliftIO.Timeout (timeout) import Util @@ -358,33 +357,14 @@ withServerCfg :: AServerConfig -> (forall s. ServerConfig s -> a) -> a withServerCfg (ASrvCfg _ _ cfg') f = f cfg' withSmpServerConfigOn :: HasCallStack => ASrvTransport -> AServerConfig -> ServiceName -> (HasCallStack => ThreadId -> IO a) -> IO a -withSmpServerConfigOn t srvCfg port' f = withSmpServerConfigEnvOn t srvCfg port' (const f) +withSmpServerConfigOn t (ASrvCfg _ _ cfg') port' = + serverBracket + (\started -> runSMPServerBlocking started cfg' {transports = [(port', t, False)]} Nothing) + (threadDelay 10000) withSmpServerThreadOn :: HasCallStack => (ASrvTransport, AStoreType) -> ServiceName -> (HasCallStack => ThreadId -> IO a) -> IO a withSmpServerThreadOn (t, msType) = withSmpServerConfigOn t (cfgMS msType) --- the message store type is existential in AServerConfig, so the running Env is wrapped to hide it -data SomeEnv = forall s. SomeEnv (Env s) - --- runs the server like withSmpServerConfigOn, also passing its Env for white-box assertions on internal state -withSmpServerConfigEnvOn :: HasCallStack => ASrvTransport -> AServerConfig -> ServiceName -> (HasCallStack => SomeEnv -> ThreadId -> IO a) -> IO a -withSmpServerConfigEnvOn t (ASrvCfg _ _ cfg') port' f = do - envVar <- newEmptyTMVarIO - serverBracket - (\started -> runSMPServerBlocking_ (atomically . putTMVar envVar . SomeEnv) started cfg' {transports = [(port', t, False)]} Nothing) - (threadDelay 10000) - (\tId -> atomically (takeTMVar envVar) >>= (`f` tId)) - --- size of the service client's delivery subscriptions map, Nothing if no client is subscribed as the service -serviceSubsMapSize :: SomeEnv -> ServiceId -> IO (Maybe Int) -serviceSubsMapSize (SomeEnv env) serviceId = - getSubscribedClient serviceId serviceSubscribers - $>>= readTVarIO - $>>= \Client {subscriptions} -> Just . M.size <$> readTVarIO subscriptions - where - Server {subscribers} = server env - ServerSubscribers {serviceSubscribers} = subscribers - serverBracket :: HasCallStack => (TMVar Bool -> IO ()) -> IO () -> (HasCallStack => ThreadId -> IO a) -> IO a serverBracket process afterProcess f = do started <- newEmptyTMVarIO diff --git a/tests/ServerTests.hs b/tests/ServerTests.hs index e2bf341ba..ecd923619 100644 --- a/tests/ServerTests.hs +++ b/tests/ServerTests.hs @@ -93,7 +93,6 @@ serverTests = do testServiceDeliverSubscribe testServiceUpgradeAndDowngrade testServiceSubsTotalCount - testServiceSubsRemovedOnQueueDelete describe "Store log" testWithStoreLog describe "Restore messages" testRestoreMessages describe "Restore messages (old / v2)" testRestoreExpireMessages @@ -938,56 +937,6 @@ testServiceSubsTotalCount = threadDelay 1500000 readFile testPrometheusMetricsFile >>= \m -> readServiceSubsMetric m `shouldBe` Just 3 --- regression test for a memory leak: a service delivery subscription stayed in the service client's --- subscriptions map after its queue was unassociated/deleted by another client, growing without bound --- for the lifetime of a long-lived service connection. -testServiceSubsRemovedOnQueueDelete :: SpecWith (ASrvTransport, AStoreType) -testServiceSubsRemovedOnQueueDelete = - it "should remove service delivery subscription when its queue is deleted by another client" $ \(at@(ATransport t), msType) -> do - g <- C.newRandom - creds <- genCredentials g Nothing (0, 2400) "localhost" - let (_fp, tlsCred) = tlsCredentials [creds] - serviceKeys@(_, servicePK) <- atomically $ C.generateKeyPair g - let aServicePK = C.APrivateAuthKey C.SEd25519 servicePK - withSmpServerConfigEnvOn at (cfgMS msType) testPort $ \senv _ -> do - (rPub, rKey) <- atomically $ C.generateAuthKeyPair C.SEd25519 g - (dhPub, _ :: C.PrivateKeyX25519) <- atomically $ C.generateKeyPair g - (sPub, sKey) <- atomically $ C.generateAuthKeyPair C.SEd25519 g - runSMPClient t $ \h -> do - (rId, sId, serviceId) <- runSMPServiceClient t (tlsCred, serviceKeys) $ \sh -> do - Resp "1" NoEntity (Ids_ rId sId _srvDh serviceId) <- serviceSignSendRecv sh rKey servicePK ("1", NoEntity, New rPub dhPub) - Resp "2" _ OK <- signSendRecv h sKey ("2", sId, SKEY sPub) - pure (rId, sId, serviceId) - runSMPServiceClient t (tlsCred, serviceKeys) $ \sh -> do - signSend_ sh aServicePK Nothing ("3", serviceId, SUBS 1 (queueIdsHash [rId])) - -- drain SOKS and ALLS (they arrive on separate threads); SOKS content is covered by testServiceSubsTotalCount - void $ - receiveInAnyOrder - sh - [ \case - Resp "3" _ (SOKS _ _) -> pure $ Just () - _ -> pure Nothing, - \case - Resp "" NoEntity ALLS -> pure $ Just () - _ -> pure Nothing - ] - -- a SEND to the service-associated queue creates a delivery subscription in the service client's map - Resp "4" _ OK <- signSendRecv h sKey ("4", sId, _SEND "hello") - Resp "" _ (Msg _ _) <- tGet1 sh - serviceSubsMapSize senv serviceId `shouldReturn` Just 1 - -- a different client deletes the queue while the service stays connected - runSMPClient t $ \dh -> do - Resp "5" _ OK <- signSendRecv dh rKey ("5", rId, DEL) - pure () - -- the queue deletion is processed asynchronously; the delivery subscription must be removed - let go n = - serviceSubsMapSize senv serviceId >>= \case - Just 0 -> pure () - s - | n > 0 -> threadDelay 100000 >> go (n - 1) - | otherwise -> s `shouldBe` Just 0 - go (20 :: Int) - readServiceSubsMetric :: String -> Maybe Int readServiceSubsMetric content = case filter ("simplex_smp_subscribtion_service_subs_total " `isPrefixOf`) (lines content) of From 89963e0c7abd735bac788ccdb67175daffc942d6 Mon Sep 17 00:00:00 2001 From: shum Date: Mon, 29 Jun 2026 15:25:16 +0000 Subject: [PATCH 4/4] refactor: simplify unsubPrev with applicative Express the cancel-if-present logic as sequence_ (unsub_ <*> s_). --- src/Simplex/Messaging/Server.hs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 005b4301b..b8eee5a19 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -338,7 +338,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt forM_ unsub_ $ \unsub -> atomically (swapTVar (clientSubs c) M.empty) >>= mapM_ unsub where unsubPrev :: Maybe sub -> IO () - unsubPrev s_ = forM_ unsub_ $ \unsub -> mapM_ unsub s_ + unsubPrev s_ = sequence_ (unsub_ <*> s_) endSub :: Client s -> QueueId -> STM (Maybe sub) endSub c qId = TM.lookupDelete qId (clientSubs c) >>= (removeWhenNoSubs c $>) endServiceQueueSub :: Client s -> QueueId -> STM (Maybe sub)