From c0ffa7ad8ddfbb580fbe6b8de4e55a037d64749f Mon Sep 17 00:00:00 2001 From: Brian Carroll Date: Wed, 21 Feb 2024 10:51:25 +0000 Subject: [PATCH 1/3] Modify pause-resume bug example to be more like the kafka fire PHX-1253 --- .../scripts/pause-resume-bug/Consumer.hs | 58 +++++++++---------- 1 file changed, 27 insertions(+), 31 deletions(-) diff --git a/nri-kafka/scripts/pause-resume-bug/Consumer.hs b/nri-kafka/scripts/pause-resume-bug/Consumer.hs index 96a45ad6..f8900204 100644 --- a/nri-kafka/scripts/pause-resume-bug/Consumer.hs +++ b/nri-kafka/scripts/pause-resume-bug/Consumer.hs @@ -1,14 +1,14 @@ module Consumer where import Control.Concurrent (forkIO, threadDelay) -import Control.Concurrent.MVar (MVar, newEmptyMVar, newMVar, putMVar, tryTakeMVar, withMVar) -import Control.Monad (void) +import Control.Concurrent.MVar (MVar, newMVar, withMVar) +import Control.Monad (void, when) import qualified Environment import qualified Kafka.Worker as Kafka import Message -import System.Environment (setEnv) -import System.IO (Handle, hPutStrLn, stderr, stdout) -import Prelude (IO, String, show) +import System.Environment (getEnv, setEnv) +import System.IO (Handle, hPutStrLn, stdout) +import Prelude (IO, String, mod, show, fromIntegral, pure) main :: IO () main = do @@ -20,39 +20,26 @@ main = do setEnv "KAFKA_MAX_MSGS_PER_PARTITION_BUFFERED_LOCALLY" "20" setEnv "KAFKA_POLL_BATCH_SIZE" "5" + fireDelay <- readIntEnvVar "FIRE_DELAY" + fireModulo <- readIntEnvVar "FIRE_MODULO" + settings <- Environment.decode Kafka.decoder doAnythingHandler <- Platform.doAnythingHandler - lastId <- newEmptyMVar lock <- newMVar () let processMsg (msg :: Message) = ( do - let msgId = ("ID(" ++ show (id msg) ++ ")") - prevId <- tryTakeMVar lastId - - case (prevId, id msg) of - (Nothing, _) -> - printAtomic lock stdout (msgId ++ " First message has been received") - (_, 1) -> - printAtomic lock stdout (msgId ++ " Producer has been restarted") - (Just prev, curr) - | prev + 1 == curr -> - -- This is the expected behavior - printAtomic lock stdout (msgId ++ " OK") - (Just prev, curr) -> - -- This is the bug - printAtomic - lock - stderr - ( "ERROR: Expected ID " - ++ show (prev + 1) - ++ " but got " - ++ show curr - ) - - putMVar lastId (id msg) - threadDelay 200000 + let msgId = id msg + let msgIdStr = "ID(" ++ show msgId ++ ")" + when + (msgId `mod` fireModulo == 0) + ( do + printAtomic lock stdout (msgIdStr ++ " Pausing consumer (simulating stuck MySQL)") + threadDelay (fromIntegral fireDelay * 1000000) + ) + printAtomic lock stdout (msgIdStr ++ " Done") + threadDelay 2000 ) |> fmap Ok |> Platform.doAnything doAnythingHandler @@ -66,3 +53,12 @@ printAtomic lock handle msg = do |> withMVar lock |> forkIO |> void + +readIntEnvVar :: String -> IO Int +readIntEnvVar name = do + valueStr <- getEnv name + valueStr + |> Text.fromList + |> Text.toInt + |> Maybe.withDefault (Debug.todo (Text.fromList name ++ " must be a number")) + |> pure From 2b7db7d6607955f775ab2c82b51d278eff8a5aeb Mon Sep 17 00:00:00 2001 From: Brian Carroll Date: Wed, 21 Feb 2024 15:10:37 +0000 Subject: [PATCH 2/3] Print a message on polling --- nri-kafka/src/Kafka/Worker/Fetcher.hs | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/nri-kafka/src/Kafka/Worker/Fetcher.hs b/nri-kafka/src/Kafka/Worker/Fetcher.hs index 273b6b99..1623aceb 100644 --- a/nri-kafka/src/Kafka/Worker/Fetcher.hs +++ b/nri-kafka/src/Kafka/Worker/Fetcher.hs @@ -11,6 +11,7 @@ import qualified Kafka.Worker.Analytics as Analytics import qualified Kafka.Worker.Partition as Partition import qualified Kafka.Worker.Settings as Settings import qualified Prelude +import qualified Data.Either type EnqueueRecord = (ConsumerRecord -> Prelude.IO Partition.SeekCmd) @@ -76,8 +77,12 @@ pollingLoop' -- See https://github.com/confluentinc/librdkafka/blob/c282ba2423b2694052393c8edb0399a5ef471b3f/CHANGELOG.md?plain=1#L90-L95 -- -- We have a small app to reproduce the bug. Check out scripts/pause-resume-bug/README.md - MVar.withMVar consumerLock - <| \_ -> Consumer.pollMessageBatch consumer pollingTimeout pollBatchSize + MVar.withMVar consumerLock <| \_ -> do + Prelude.putStrLn "Polling for messages..." + em <- Consumer.pollMessageBatch consumer pollingTimeout pollBatchSize + let digits = List.filterMap recordContents em |> ByteString.intercalate ", " + Prelude.putStrLn <| "Polling done. Found messages: " ++ Prelude.show digits + Prelude.pure em msgs <- Prelude.traverse handleKafkaError eitherMsgs assignment <- Consumer.assignment consumer @@ -103,6 +108,13 @@ pollingLoop' throttle maxMsgsPerSecondPerPartition maxPollIntervalMs (List.length appendResults) analytics now lastPollTimestamp pollingLoop' settings enqueueRecord analytics consumer consumerLock (pollTimeIsOld now) +recordContents :: Data.Either.Either x ConsumerRecord -> Maybe ByteString.ByteString +recordContents (Data.Either.Left _) = Nothing +recordContents (Data.Either.Right record) = do + val <- Consumer.crValue record + let digits = ByteString.filter (\c -> c >= 48 && c <= 57) val + Just digits + getPartitionKey :: Consumer.ConsumerRecord k v -> (Consumer.TopicName, Consumer.PartitionId) getPartitionKey record = ( Consumer.crTopic record, From 380ade3297f0fd5dc959d568a26e0f66a0ad23d9 Mon Sep 17 00:00:00 2001 From: Brian Carroll Date: Wed, 21 Feb 2024 15:13:55 +0000 Subject: [PATCH 3/3] Set default values for fire env vars --- nri-kafka/scripts/pause-resume-bug/Consumer.hs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/nri-kafka/scripts/pause-resume-bug/Consumer.hs b/nri-kafka/scripts/pause-resume-bug/Consumer.hs index f8900204..02929822 100644 --- a/nri-kafka/scripts/pause-resume-bug/Consumer.hs +++ b/nri-kafka/scripts/pause-resume-bug/Consumer.hs @@ -20,8 +20,8 @@ main = do setEnv "KAFKA_MAX_MSGS_PER_PARTITION_BUFFERED_LOCALLY" "20" setEnv "KAFKA_POLL_BATCH_SIZE" "5" - fireDelay <- readIntEnvVar "FIRE_DELAY" - fireModulo <- readIntEnvVar "FIRE_MODULO" + fireDelay <- readIntEnvVar "FIRE_DELAY" 31 -- seconds + fireModulo <- readIntEnvVar "FIRE_MODULO" 5 -- sleep on every Nth message settings <- Environment.decode Kafka.decoder doAnythingHandler <- Platform.doAnythingHandler @@ -54,11 +54,11 @@ printAtomic lock handle msg = do |> forkIO |> void -readIntEnvVar :: String -> IO Int -readIntEnvVar name = do +readIntEnvVar :: String -> Int -> IO Int +readIntEnvVar name defaultVal = do valueStr <- getEnv name valueStr |> Text.fromList |> Text.toInt - |> Maybe.withDefault (Debug.todo (Text.fromList name ++ " must be a number")) + |> Maybe.withDefault defaultVal |> pure