diff --git a/lib/mobility-core/src/Kernel/Beam/Lib/Utils.hs b/lib/mobility-core/src/Kernel/Beam/Lib/Utils.hs index a5298882c..381f431fe 100644 --- a/lib/mobility-core/src/Kernel/Beam/Lib/Utils.hs +++ b/lib/mobility-core/src/Kernel/Beam/Lib/Utils.hs @@ -29,9 +29,11 @@ import qualified EulerHS.Language as L import EulerHS.Prelude import qualified Kafka.Producer as KafkaProd import Kernel.Beam.Types (KafkaConn (..)) +import qualified Kernel.Streaming.Kafka.Producer as KafkaProducer import Kernel.Types.App import Kernel.Types.Error import Kernel.Utils.Error.Throwing (throwError) +import Kernel.Utils.Logging (logError) import Text.Casing (camel, quietSnake) textToSnakeCaseText :: Text -> Text @@ -67,8 +69,10 @@ pushToKafka messageRecord topic key = do case kafkaProducerTools of Nothing -> throwError $ InternalError "Kafka producer tools not found" Just kafkaProducerTools' -> do - mbErr <- KafkaProd.produceMessage kafkaProducerTools'.producer (kafkaMessage topic messageRecord key) + let msg = kafkaMessage topic messageRecord key + mbErr <- KafkaProd.produceMessage kafkaProducerTools'.producer msg whenJust mbErr (throwError . KafkaUnableToProduceMessage) + KafkaProducer.produceToSecondaryProducer kafkaProducerTools'.secondaryProducer msg logError kafkaMessage :: ToJSON a => Text -> a -> Text -> KafkaProd.ProducerRecord kafkaMessage topicName event key = diff --git a/lib/mobility-core/src/Kernel/Streaming/Kafka/Producer.hs b/lib/mobility-core/src/Kernel/Streaming/Kafka/Producer.hs index bb5abd306..3277e85f7 100644 --- a/lib/mobility-core/src/Kernel/Streaming/Kafka/Producer.hs +++ b/lib/mobility-core/src/Kernel/Streaming/Kafka/Producer.hs @@ -16,10 +16,12 @@ module Kernel.Streaming.Kafka.Producer ( buildKafkaProducerTools, Kernel.Streaming.Kafka.Producer.produceMessage, produceMessageInPartition, + produceToSecondaryProducer, releaseKafkaProducerTools, (..=), A.Value (Object), A.emptyObject, + shouldProduceSecondary, ) where @@ -35,21 +37,37 @@ import Kernel.Streaming.Kafka.Producer.Types import Kernel.Types.Error import Kernel.Types.Logging import Kernel.Utils.Error.Throwing (throwError) +import Kernel.Utils.Logging (logError) +import System.Environment (lookupEnv) type KPartitionId = Int +shouldProduceSecondary :: IO Bool +shouldProduceSecondary = fromMaybe False . (readMaybe =<<) <$> lookupEnv "PRODUCE_SECONDARY_KAFKA" + produceMessage :: (Log m, MonadThrow m, MonadIO m, MonadReader r m, HasKafkaProducer r, ToJSON a) => (KafkaTopic, Maybe KafkaKey) -> a -> m () produceMessage (topic, key) event = produceMessageImpl (topic, key) event Nothing produceMessageInPartition :: (Log m, MonadThrow m, MonadIO m, MonadReader r m, HasKafkaProducer r, ToJSON a) => (KafkaTopic, Maybe KafkaKey) -> a -> KPartitionId -> m () produceMessageInPartition (topic, key) event partitionId = produceMessageImpl (topic, key) event $ Just partitionId +-- | Push to secondary Kafka producer if present. Calls onError with message on failure (does not throw). +produceToSecondaryProducer :: (MonadIO m) => Maybe KafkaProd.KafkaProducer -> KafkaProd.ProducerRecord -> (Text -> m ()) -> m () +produceToSecondaryProducer mbSecondaryProducer record onError = do + shouldProduce <- liftIO shouldProduceSecondary + when shouldProduce $ + whenJust mbSecondaryProducer $ \secondaryProd -> do + mbErr <- liftIO $ KafkaProd.produceMessage secondaryProd record + whenJust mbErr $ \err -> + onError $ "Secondary Kafka produce failed: " <> show err + produceMessageImpl :: (Log m, MonadThrow m, MonadIO m, MonadReader r m, HasKafkaProducer r, ToJSON a) => (KafkaTopic, Maybe KafkaKey) -> a -> Maybe KPartitionId -> m () produceMessageImpl (topic, key) event mbPartitionId = do when (null topic) $ throwM KafkaTopicIsEmptyString kafkaProducerTools <- asks (.kafkaProducerTools) mbErr <- KafkaProd.produceMessage kafkaProducerTools.producer message whenJust mbErr (throwError . KafkaUnableToProduceMessage) + produceToSecondaryProducer kafkaProducerTools.secondaryProducer message logError where message = ProducerRecord diff --git a/lib/mobility-core/src/Kernel/Streaming/Kafka/Producer/Types.hs b/lib/mobility-core/src/Kernel/Streaming/Kafka/Producer/Types.hs index bbfcbfb2d..c3bd9b887 100644 --- a/lib/mobility-core/src/Kernel/Streaming/Kafka/Producer/Types.hs +++ b/lib/mobility-core/src/Kernel/Streaming/Kafka/Producer/Types.hs @@ -14,7 +14,7 @@ module Kernel.Streaming.Kafka.Producer.Types ( KafkaProducerCfg (..), - KafkaProducerTools, + KafkaProducerTools (..), buildKafkaProducerTools, buildKafkaProducerTools', releaseKafkaProducerTools, @@ -41,8 +41,9 @@ data KafkaProducerCfg = KafkaProducerCfg } deriving (Generic, FromDhall) -newtype KafkaProducerTools = KafkaProducerTools - { producer :: Producer.KafkaProducer +data KafkaProducerTools = KafkaProducerTools + { producer :: Producer.KafkaProducer, + secondaryProducer :: Maybe Producer.KafkaProducer } deriving (Generic) @@ -67,15 +68,25 @@ castCompression kafkaCompression = SNAPPY -> Snappy LZ4 -> Lz4 -buildKafkaProducerTools' :: KafkaProducerCfg -> [KTC.KafkaProperties] -> IO KafkaProducerTools -buildKafkaProducerTools' kafkaProducerCfg kafkaProperties = do +buildKafkaProducerTools' :: KafkaProducerCfg -> Maybe KafkaProducerCfg -> [KTC.KafkaProperties] -> IO KafkaProducerTools +buildKafkaProducerTools' kafkaProducerCfg mbSecondaryCfg kafkaProperties = do producer <- newProducer (addProperties (producerProps kafkaProducerCfg) kafkaProperties) >>= either (throwM . KafkaUnableToBuildTools) return + secondaryProducer <- case mbSecondaryCfg of + Nothing -> pure Nothing + Just secondaryCfg -> + Just <$> (newProducer (producerProps secondaryCfg) >>= either (throwM . KafkaUnableToBuildTools) return) return $ KafkaProducerTools {..} -buildKafkaProducerTools :: KafkaProducerCfg -> IO KafkaProducerTools -buildKafkaProducerTools kafkaProducerCfg = do +buildKafkaProducerTools :: KafkaProducerCfg -> Maybe KafkaProducerCfg -> IO KafkaProducerTools +buildKafkaProducerTools kafkaProducerCfg mbSecondaryCfg = do producer <- newProducer (producerProps kafkaProducerCfg) >>= either (throwM . KafkaUnableToBuildTools) return + secondaryProducer <- case mbSecondaryCfg of + Nothing -> pure Nothing + Just secondaryCfg -> + Just <$> (newProducer (producerProps secondaryCfg) >>= either (throwM . KafkaUnableToBuildTools) return) return $ KafkaProducerTools {..} releaseKafkaProducerTools :: KafkaProducerTools -> IO () -releaseKafkaProducerTools kafkaProducerTools = closeProducer kafkaProducerTools.producer +releaseKafkaProducerTools kafkaProducerTools = do + closeProducer kafkaProducerTools.producer + whenJust kafkaProducerTools.secondaryProducer closeProducer diff --git a/lib/mobility-core/src/Kernel/Tools/ARTUtils.hs b/lib/mobility-core/src/Kernel/Tools/ARTUtils.hs index 39e3dedaa..c00913c9c 100644 --- a/lib/mobility-core/src/Kernel/Tools/ARTUtils.hs +++ b/lib/mobility-core/src/Kernel/Tools/ARTUtils.hs @@ -6,6 +6,7 @@ import Data.Default.Class import qualified Data.Text.Encoding as TE import qualified Kafka.Producer as KafkaProd import Kernel.Prelude +import qualified Kernel.Streaming.Kafka.Producer as KafkaProducer import Kernel.Streaming.Kafka.Producer.Types import Kernel.Utils.IOLogging (LoggerEnv) @@ -51,7 +52,9 @@ pushToKafka kafkaConn messageRecord topic key = do case kafkaConn of Nothing -> pure () Just kafkaProducerTools' -> do - void $ KafkaProd.produceMessage kafkaProducerTools'.producer (kafkaMessage topic messageRecord key) + let msg = kafkaMessage topic messageRecord key + void $ KafkaProd.produceMessage kafkaProducerTools'.producer msg + KafkaProducer.produceToSecondaryProducer kafkaProducerTools'.secondaryProducer msg $ \_ -> pure () kafkaMessage :: Text -> BL.ByteString -> Text -> KafkaProd.ProducerRecord kafkaMessage topicName event key =