Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion lib/mobility-core/src/Kernel/Beam/Lib/Utils.hs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@ import qualified EulerHS.Language as L
import EulerHS.Prelude
import qualified Kafka.Producer as KafkaProd
import Kernel.Beam.Types (KafkaConn (..))
import qualified Kernel.Streaming.Kafka.Producer as KafkaProducer
import Kernel.Types.App
import Kernel.Types.Error
import Kernel.Utils.Error.Throwing (throwError)
import Kernel.Utils.Logging (logError)
import Text.Casing (camel, quietSnake)

textToSnakeCaseText :: Text -> Text
Expand Down Expand Up @@ -67,8 +69,10 @@ pushToKafka messageRecord topic key = do
case kafkaProducerTools of
Nothing -> throwError $ InternalError "Kafka producer tools not found"
Just kafkaProducerTools' -> do
mbErr <- KafkaProd.produceMessage kafkaProducerTools'.producer (kafkaMessage topic messageRecord key)
let msg = kafkaMessage topic messageRecord key
mbErr <- KafkaProd.produceMessage kafkaProducerTools'.producer msg
whenJust mbErr (throwError . KafkaUnableToProduceMessage)
KafkaProducer.produceToSecondaryProducer kafkaProducerTools'.secondaryProducer msg logError

kafkaMessage :: ToJSON a => Text -> a -> Text -> KafkaProd.ProducerRecord
kafkaMessage topicName event key =
Expand Down
18 changes: 18 additions & 0 deletions lib/mobility-core/src/Kernel/Streaming/Kafka/Producer.hs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ module Kernel.Streaming.Kafka.Producer
( buildKafkaProducerTools,
Kernel.Streaming.Kafka.Producer.produceMessage,
produceMessageInPartition,
produceToSecondaryProducer,
releaseKafkaProducerTools,
(..=),
A.Value (Object),
A.emptyObject,
shouldProduceSecondary,
)
where

Expand All @@ -35,21 +37,37 @@ import Kernel.Streaming.Kafka.Producer.Types
import Kernel.Types.Error
import Kernel.Types.Logging
import Kernel.Utils.Error.Throwing (throwError)
import Kernel.Utils.Logging (logError)
import System.Environment (lookupEnv)

type KPartitionId = Int

shouldProduceSecondary :: IO Bool
shouldProduceSecondary = fromMaybe False . (readMaybe =<<) <$> lookupEnv "PRODUCE_SECONDARY_KAFKA"

produceMessage :: (Log m, MonadThrow m, MonadIO m, MonadReader r m, HasKafkaProducer r, ToJSON a) => (KafkaTopic, Maybe KafkaKey) -> a -> m ()
produceMessage (topic, key) event = produceMessageImpl (topic, key) event Nothing

produceMessageInPartition :: (Log m, MonadThrow m, MonadIO m, MonadReader r m, HasKafkaProducer r, ToJSON a) => (KafkaTopic, Maybe KafkaKey) -> a -> KPartitionId -> m ()
produceMessageInPartition (topic, key) event partitionId = produceMessageImpl (topic, key) event $ Just partitionId

-- | Push to secondary Kafka producer if present. Calls onError with message on failure (does not throw).
produceToSecondaryProducer :: (MonadIO m) => Maybe KafkaProd.KafkaProducer -> KafkaProd.ProducerRecord -> (Text -> m ()) -> m ()
produceToSecondaryProducer mbSecondaryProducer record onError = do
shouldProduce <- liftIO shouldProduceSecondary
when shouldProduce $
whenJust mbSecondaryProducer $ \secondaryProd -> do
mbErr <- liftIO $ KafkaProd.produceMessage secondaryProd record
whenJust mbErr $ \err ->
onError $ "Secondary Kafka produce failed: " <> show err

produceMessageImpl :: (Log m, MonadThrow m, MonadIO m, MonadReader r m, HasKafkaProducer r, ToJSON a) => (KafkaTopic, Maybe KafkaKey) -> a -> Maybe KPartitionId -> m ()
produceMessageImpl (topic, key) event mbPartitionId = do
when (null topic) $ throwM KafkaTopicIsEmptyString
kafkaProducerTools <- asks (.kafkaProducerTools)
mbErr <- KafkaProd.produceMessage kafkaProducerTools.producer message
whenJust mbErr (throwError . KafkaUnableToProduceMessage)
produceToSecondaryProducer kafkaProducerTools.secondaryProducer message logError
where
message =
ProducerRecord
Expand Down
27 changes: 19 additions & 8 deletions lib/mobility-core/src/Kernel/Streaming/Kafka/Producer/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

module Kernel.Streaming.Kafka.Producer.Types
( KafkaProducerCfg (..),
KafkaProducerTools,
KafkaProducerTools (..),
buildKafkaProducerTools,
buildKafkaProducerTools',
releaseKafkaProducerTools,
Expand All @@ -41,8 +41,9 @@ data KafkaProducerCfg = KafkaProducerCfg
}
deriving (Generic, FromDhall)

newtype KafkaProducerTools = KafkaProducerTools
{ producer :: Producer.KafkaProducer
data KafkaProducerTools = KafkaProducerTools
{ producer :: Producer.KafkaProducer,
secondaryProducer :: Maybe Producer.KafkaProducer
}
deriving (Generic)

Expand All @@ -67,15 +68,25 @@ castCompression kafkaCompression =
SNAPPY -> Snappy
LZ4 -> Lz4

buildKafkaProducerTools' :: KafkaProducerCfg -> [KTC.KafkaProperties] -> IO KafkaProducerTools
buildKafkaProducerTools' kafkaProducerCfg kafkaProperties = do
buildKafkaProducerTools' :: KafkaProducerCfg -> Maybe KafkaProducerCfg -> [KTC.KafkaProperties] -> IO KafkaProducerTools
buildKafkaProducerTools' kafkaProducerCfg mbSecondaryCfg kafkaProperties = do
producer <- newProducer (addProperties (producerProps kafkaProducerCfg) kafkaProperties) >>= either (throwM . KafkaUnableToBuildTools) return
secondaryProducer <- case mbSecondaryCfg of
Nothing -> pure Nothing
Just secondaryCfg ->
Just <$> (newProducer (producerProps secondaryCfg) >>= either (throwM . KafkaUnableToBuildTools) return)
return $ KafkaProducerTools {..}

buildKafkaProducerTools :: KafkaProducerCfg -> IO KafkaProducerTools
buildKafkaProducerTools kafkaProducerCfg = do
buildKafkaProducerTools :: KafkaProducerCfg -> Maybe KafkaProducerCfg -> IO KafkaProducerTools
buildKafkaProducerTools kafkaProducerCfg mbSecondaryCfg = do
producer <- newProducer (producerProps kafkaProducerCfg) >>= either (throwM . KafkaUnableToBuildTools) return
secondaryProducer <- case mbSecondaryCfg of
Nothing -> pure Nothing
Just secondaryCfg ->
Just <$> (newProducer (producerProps secondaryCfg) >>= either (throwM . KafkaUnableToBuildTools) return)
return $ KafkaProducerTools {..}

releaseKafkaProducerTools :: KafkaProducerTools -> IO ()
releaseKafkaProducerTools kafkaProducerTools = closeProducer kafkaProducerTools.producer
releaseKafkaProducerTools kafkaProducerTools = do
closeProducer kafkaProducerTools.producer
whenJust kafkaProducerTools.secondaryProducer closeProducer
5 changes: 4 additions & 1 deletion lib/mobility-core/src/Kernel/Tools/ARTUtils.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import Data.Default.Class
import qualified Data.Text.Encoding as TE
import qualified Kafka.Producer as KafkaProd
import Kernel.Prelude
import qualified Kernel.Streaming.Kafka.Producer as KafkaProducer
import Kernel.Streaming.Kafka.Producer.Types
import Kernel.Utils.IOLogging (LoggerEnv)

Expand Down Expand Up @@ -51,7 +52,9 @@ pushToKafka kafkaConn messageRecord topic key = do
case kafkaConn of
Nothing -> pure ()
Just kafkaProducerTools' -> do
void $ KafkaProd.produceMessage kafkaProducerTools'.producer (kafkaMessage topic messageRecord key)
let msg = kafkaMessage topic messageRecord key
void $ KafkaProd.produceMessage kafkaProducerTools'.producer msg
KafkaProducer.produceToSecondaryProducer kafkaProducerTools'.secondaryProducer msg $ \_ -> pure ()

kafkaMessage :: Text -> BL.ByteString -> Text -> KafkaProd.ProducerRecord
kafkaMessage topicName event key =
Expand Down