diff --git a/log-elasticsearch/CHANGELOG.md b/log-elasticsearch/CHANGELOG.md index 1236caf..6d2ee75 100644 --- a/log-elasticsearch/CHANGELOG.md +++ b/log-elasticsearch/CHANGELOG.md @@ -1,3 +1,9 @@ +# log-elasticsearch-x.x.x.x (xxxx-xx-xx) +* Add support for specifying a retry policy on Elasticsearch interactions. +* Add a test suite for logging to Elasticsearch. +* Make the default retry policy a short 3.5s window, after which log messages + are dropped. + # log-elasticsearch-0.13.0.2 (2025-06-26) * Switch back to `http-client-tls` by default since `HsOpenSSL` is based on a deprecated `openssl` API (soon to be removed). diff --git a/log-elasticsearch/log-elasticsearch.cabal b/log-elasticsearch/log-elasticsearch.cabal index 4efa95a..e57632f 100644 --- a/log-elasticsearch/log-elasticsearch.cabal +++ b/log-elasticsearch/log-elasticsearch.cabal @@ -28,7 +28,26 @@ flag openssl description: Use http-client-openssl instead of http-client-tls default: False +common shared + ghc-options: -Wall + default-language: Haskell2010 + default-extensions: BangPatterns + , FlexibleContexts + , FlexibleInstances + , GeneralizedNewtypeDeriving + , LambdaCase + , MultiParamTypeClasses + , NumericUnderscores + , OverloadedStrings + , RankNTypes + , RecordWildCards + , ScopedTypeVariables + , TupleSections + , TypeFamilies + , UndecidableInstances + library + import: shared exposed-modules: Log.Backend.ElasticSearch Log.Backend.ElasticSearch.Internal Log.Backend.ElasticSearch.Lens @@ -38,10 +57,12 @@ library base64-bytestring, bytestring, deepseq, + exceptions, http-client, http-types, log-base >= 0.10 && <0.13, network-uri, + retry, semigroups, text, text-show >= 2, @@ -52,25 +73,35 @@ library vector hs-source-dirs: src - ghc-options: -Wall - - default-language: Haskell2010 - default-extensions: BangPatterns - , FlexibleContexts - , FlexibleInstances - , GeneralizedNewtypeDeriving - , LambdaCase - , MultiParamTypeClasses - , OverloadedStrings - , RankNTypes - , RecordWildCards - , 
ScopedTypeVariables - , TupleSections - , TypeFamilies - , UndecidableInstances - if flag(openssl) cpp-options: -DOPENSSL build-depends: http-client-openssl >= 0.3.2 else build-depends: http-client-tls >= 0.3.6.3 + +test-suite log-elasticsearch-tests + import: shared + type: exitcode-stdio-1.0 + main-is: Driver.hs + hs-source-dirs: tests + ghc-options: -threaded -rtsopts + build-depends: + , base + , aeson + , containers + , log-base + , log-elasticsearch + , hedgehog + , http-client + , http-types + , tasty + , tasty-hedgehog + , tasty-hunit + , tasty-discover + , text + , time + , uuid + build-tool-depends: + tasty-discover:tasty-discover + other-modules: + ElasticSearchTest diff --git a/log-elasticsearch/src/Log/Backend/ElasticSearch.hs b/log-elasticsearch/src/Log/Backend/ElasticSearch.hs index 27a9d6d..ba17d61 100644 --- a/log-elasticsearch/src/Log/Backend/ElasticSearch.hs +++ b/log-elasticsearch/src/Log/Backend/ElasticSearch.hs @@ -15,10 +15,11 @@ module Log.Backend.ElasticSearch ) where import Control.Applicative -import Control.Concurrent -import Control.Exception +import Control.Exception (SomeAsyncException) import Control.Monad +import Control.Monad.Catch import Control.Monad.IO.Unlift +import Control.Retry import Data.Aeson import Data.Aeson.Encode.Pretty import Data.IORef @@ -62,20 +63,15 @@ elasticSearchLogger esConf@ElasticSearchConfig{..} = do mkBulkLogger "ElasticSearch" (\msgs -> do now <- getCurrentTime oldIndex <- readIORef indexRef - retryOnException versionRef $ do + retryTransientExceptions versionRef $ do -- We need to consider version of ES because ES >= 5.0.0 and ES >= 7.0.0 -- have slight differences in parts of API used for logging. 
version <- readIORef versionRef >>= \case Just version -> pure version Nothing -> serverInfo env >>= \case - Left (ex :: HttpException) -> error - $ "elasticSearchLogger: unexpected error: " - <> show ex - <> " (is ElasticSearch server running?)" + Left (ex :: HttpException) -> throwM $ ElasticSearchCouldNotConnectToServerError ex Right reply -> case parseEsVersion $ responseBody reply of - Nothing -> error - $ "elasticSearchLogger: invalid response when parsing version number: " - <> show reply + Nothing -> throwM $ ElasticSearchCouldNotParseVersion reply Just version -> pure version -- Elasticsearch index names are additionally indexed by date so that each -- day is logged to a separate index to make log management easier. @@ -115,16 +111,16 @@ elasticSearchLogger esConf@ElasticSearchConfig{..} = do Nothing -> newData modifyData _ v = v - keyAddValueTypeSuffix k v acc = AC.insert - (case v of + keyAddValueTypeSuffix k v = flip AC.insert (modifyData Nothing v) $ + case v of Object{} -> k <> "_object" Array{} -> k <> "_array" String{} -> k <> "_string" Number{} -> k <> "_number" Bool{} -> k <> "_bool" Null{} -> k <> "_null" - ) (modifyData Nothing v) acc in adjustFailedMessagesWith modifyData jsonMsgs responses + -- Attempt to put modified messages. newReply <- responseBody <$> bulkIndex version env esConf index newMsgs case checkForBulkErrors newMsgs newReply of @@ -140,7 +136,7 @@ elasticSearchLogger esConf@ElasticSearchConfig{..} = do in adjustFailedMessagesWith modifyData newMsgs newResponses -- Ignore any further errors. void $ bulkIndex version env esConf index newerMsgs) - (refreshIndex env =<< readIORef indexRef) + (recoverAll esRetryPolicy . const . refreshIndex env =<< readIORef indexRef) where -- Process reply of bulk indexing to get responses for each index operation -- and check whether any insertion failed. 
@@ -176,17 +172,21 @@ elasticSearchLogger esConf@ElasticSearchConfig{..} = do printEsError msg body = T.putStrLn $ "elasticSearchLogger: " <> msg <> " " <> prettyJson body - retryOnException :: forall r. IORef (Maybe EsVersion) -> IO r -> IO r - retryOnException versionRef m = try m >>= \case - Left (ex::SomeException) -> do - putStrLn $ "ElasticSearch: unexpected error: " - <> show ex <> ", retrying in 10 seconds" - -- If there was an exception, ElasticSearch version might've changed, so - -- reset it. - writeIORef versionRef Nothing - threadDelay $ 10 * 1000000 - retryOnException versionRef m - Right result -> return result + retryTransientExceptions :: IORef (Maybe EsVersion) -> IO () -> IO () + retryTransientExceptions versionRef m = + catches (recovering esRetryPolicy (skipAsyncExceptions <> [const $ Handler $ allSomeExceptions versionRef]) (const m)) + [ -- Rethrow async exceptions so this thread doesn't unintentionally get + -- orphaned by the next handler + Handler $ \(e :: SomeAsyncException) -> throwM e + , Handler $ \(e :: SomeException) -> case esRetryFailure of + ElasticSearchThrowLogFailure -> throwM e + ElasticSearchDropLogMessage -> pure () + ] + + allSomeExceptions :: IORef (Maybe EsVersion) -> SomeException -> IO Bool + allSomeExceptions versionRef _ = do + writeIORef versionRef Nothing + pure True prettyJson :: Value -> T.Text prettyJson = TL.toStrict @@ -208,13 +208,11 @@ checkElasticSearchLogin ElasticSearchConfig{..} = liftIO $ do when (isJust esLogin && not esLoginInsecure && not ("https:" `T.isPrefixOf` esServer)) $ - error $ "ElasticSearch: insecure login: " - <> "Attempting to send login credentials over an insecure connection. " - <> "Set esLoginInsecure = True to disable this check." + throwM ElasticSearchInsecureLogin -- | Check that we can connect to the ES server. 
-- -- @since 0.10.0.0 checkElasticSearchConnection :: MonadIO m => ElasticSearchConfig -> m (Either HttpException ()) checkElasticSearchConnection esConf = liftIO $ do - fmap (const ()) <$> (serverInfo =<< mkEsEnv esConf) + void <$> (serverInfo =<< mkEsEnv esConf) diff --git a/log-elasticsearch/src/Log/Backend/ElasticSearch/Internal.hs b/log-elasticsearch/src/Log/Backend/ElasticSearch/Internal.hs index 78e38b6..95350bc 100644 --- a/log-elasticsearch/src/Log/Backend/ElasticSearch/Internal.hs +++ b/log-elasticsearch/src/Log/Backend/ElasticSearch/Internal.hs @@ -2,6 +2,7 @@ {-# LANGUAGE DeriveGeneric #-} module Log.Backend.ElasticSearch.Internal ( ElasticSearchConfig(..) + , ElasticSearchLogFailure (..) , defaultElasticSearchConfig -- * ES version , EsVersion(..) @@ -19,9 +20,14 @@ module Log.Backend.ElasticSearch.Internal , dispatch , decodeReply , isSuccess + -- * ES error cases + , ElasticSearchCouldNotConnectToServerError (..) + , ElasticSearchCouldNotParseVersion (..) + , ElasticSearchInsecureLogin (..) ) where import Control.Exception +import Control.Retry import Control.Monad import Data.Aeson import Data.Ix (inRange) @@ -62,7 +68,20 @@ data ElasticSearchConfig = ElasticSearchConfig -- ^ Elasticsearch basic authentication username and password. , esLoginInsecure :: !Bool -- ^ Allow basic authentication over non-TLS connections. - } deriving (Eq, Show, Generic) + , esRetryPolicy :: RetryPolicyM IO + -- ^ Retry policy applied to interactions with the Elasticsearch server. + -- + -- @since 0.xx.x.x + , esRetryFailure :: ElasticSearchLogFailure + -- ^ Allow specifying what should happen with log messages that fail to be + -- sent off to elastic. + -- + -- @since 0.xx.x.x + } deriving Generic + +-- | Indicates what to do when the logging action fails beyond the retry policy. +-- We either drop the log message (recommended), or we rethrow the exception. 
+data ElasticSearchLogFailure = ElasticSearchDropLogMessage | ElasticSearchThrowLogFailure -- | Sensible defaults for 'ElasticSearchConfig'. defaultElasticSearchConfig :: ElasticSearchConfig @@ -74,6 +93,10 @@ defaultElasticSearchConfig = ElasticSearchConfig , esMapping = "log" , esLogin = Nothing , esLoginInsecure = False + -- Will retry up to 3 times with exponential backoff and jitter. + -- So approx: sum [0.5 * (2 ** i) / 2 | i <- [1..3]] ~ 3.5s + , esRetryPolicy = limitRetries 3 <> fullJitterBackoff 500_000 -- 0.5 Secs + , esRetryFailure = ElasticSearchDropLogMessage } ---------------------------------------- @@ -247,3 +270,29 @@ isSuccess = statusCheck (inRange (200, 299)) where statusCheck :: (Int -> Bool) -> Response a -> Bool statusCheck p = p . statusCode . responseStatus + +---------------------------------------- + +newtype ElasticSearchCouldNotConnectToServerError = ElasticSearchCouldNotConnectToServerError HttpException + deriving Show + +instance Exception ElasticSearchCouldNotConnectToServerError where + displayException (ElasticSearchCouldNotConnectToServerError ex) = "elasticSearchLogger: unexpected error: " + <> show ex + <> " (is ElasticSearch server running?)" + +data ElasticSearchInsecureLogin = ElasticSearchInsecureLogin + deriving Show + +instance Exception ElasticSearchInsecureLogin where + displayException ElasticSearchInsecureLogin = "ElasticSearch: insecure login: " + <> "Attempting to send login credentials over an insecure connection. " + <> "Set esLoginInsecure = True to disable this check." 
+ +newtype ElasticSearchCouldNotParseVersion = ElasticSearchCouldNotParseVersion (Response Value) + deriving Show + +instance Exception ElasticSearchCouldNotParseVersion where + displayException (ElasticSearchCouldNotParseVersion reply) = + "elasticSearchLogger: invalid response when parsing version number: " + <> show reply diff --git a/log-elasticsearch/tests/Driver.hs b/log-elasticsearch/tests/Driver.hs new file mode 100644 index 0000000..70c55f5 --- /dev/null +++ b/log-elasticsearch/tests/Driver.hs @@ -0,0 +1 @@ +{-# OPTIONS_GHC -F -pgmF tasty-discover #-} diff --git a/log-elasticsearch/tests/ElasticSearchTest.hs b/log-elasticsearch/tests/ElasticSearchTest.hs new file mode 100644 index 0000000..1f37f1d --- /dev/null +++ b/log-elasticsearch/tests/ElasticSearchTest.hs @@ -0,0 +1,119 @@ +module ElasticSearchTest where + +import Control.Exception +import Data.Aeson +import qualified Data.Aeson.Key as Key +import qualified Data.Aeson.KeyMap as KM +import Data.Aeson.Types +import Data.Set (Set) +import qualified Data.Set as Set +import qualified Data.Text as T +import Data.Time +import Data.Traversable (for) +import qualified Data.UUID as UUID +import qualified Data.UUID.V4 as UUID +import Log +import Log.Backend.ElasticSearch +import Log.Backend.ElasticSearch.Internal +import Network.HTTP.Client +import Network.HTTP.Types +import System.IO.Unsafe (unsafePerformIO) +import System.Timeout +import Test.Tasty hiding (Timeout) +import Test.Tasty.HUnit + +-- NOTE: The test suite assumes a running instance of elasticsearch. Each test +-- suite then interacts against this instance with unique global indices / date +-- info. The run id and run date globals are fixed for the duration of the run. 
+ +testRunId :: UUID.UUID +{-# NOINLINE testRunId #-} +testRunId = unsafePerformIO UUID.nextRandom + +testRunDate :: UTCTime +{-# NOINLINE testRunDate #-} +testRunDate = unsafePerformIO getCurrentTime + +-- | Key we keep constant across all indices we use in tests +indexTestKey :: T.Text +indexTestKey = "-log-elasticsearch-" + +test_elasticSearchLogger :: TestTree +test_elasticSearchLogger = do + -- Clear the elasticsearch instance of all test indices we are known to use + before_ (resetTestIndices (testConfig 0)) $ + testGroup + "elasticSearchLogger" + [ testCase "can log to elasticsearch" $ do + let config = testConfig 0 + withElasticSearchLogger config $ \logger -> do + runLogT "test" logger LogInfo $ do + logInfo_ "test-message" + logs <- readElasticSearchLogs config + assertEqual "found expected log messages" logs ["test-message"] + , testCase "logging creates the expected index" $ do + -- I think there's a race condition here where the logging action and + -- the fetch request happen on different days. Should be fine :shrug:. + let config = testConfig 1 + withElasticSearchLogger config $ \logger -> do + runLogT "test" logger LogInfo $ do + logInfo_ "test-message" + + indices <- getIndices config + assertBool "found expected logged indices" (loggedTestIndex config `Set.member` indices) + , testCase "does not hang with no available instance" $ do + let config = (testConfig 2) {esServer = "http://127.0.0.1:0"} + success (_ :: SomeException) = pure (Just ()) + + result <- handle success $ timeout 10_000_000 $ do + withElasticSearchLogger config $ \logger -> + runLogT "test" logger LogInfo $ + logInfo_ "test-message" + assertFailure "should have failed with an exception" + assertEqual "should not have timed out" result (Just ()) + ] + where + testIndex ix = T.pack (show testRunId) <> indexTestKey <> T.pack (show (ix :: Int)) + testConfig ix = defaultElasticSearchConfig {esIndex = testIndex ix} + before_ f = withResource f (const (pure ())) . 
const + +loggedTestIndex :: ElasticSearchConfig -> T.Text +loggedTestIndex config = esIndex config <> "-" <> T.pack (formatTime defaultTimeLocale "%F" testRunDate) + +readElasticSearchLogs :: ElasticSearchConfig -> IO [T.Text] +readElasticSearchLogs config = do + -- TODO: Might be better to reuse this environment across tests. + esEnv <- mkEsEnv config + response <- + dispatch esEnv methodGet [loggedTestIndex config, "_search"] $ + Just "{\"query\":{\"match_all\":{}}}" + let parser v = flip (withObject ".hits") v $ \obj -> do + hits <- obj .: "hits" + msgs <- hits .: "hits" + for msgs $ \msg -> do + source <- msg .: "_source" + source .: "message" + + runParser parser (responseBody response) + +getIndices :: ElasticSearchConfig -> IO (Set T.Text) +getIndices config = do + -- TODO: Might be better to reuse this environment across tests. + esEnv <- mkEsEnv config + response <- dispatch esEnv methodGet ["_aliases"] Nothing + let parser v = flip (withObject "_aliases.keys") v $ \obj -> + pure $ map fst $ KM.toList obj + Set.fromList . map Key.toText <$> runParser parser (responseBody response) + +runParser :: Applicative f => (a -> Parser b) -> a -> f b +runParser parser body = case parse parser body of + Error b -> error ("incorrect parser: " <> b) + Success s -> pure s + +resetTestIndices :: ElasticSearchConfig -> IO () +resetTestIndices config = do + -- TODO: Might be better to reuse this environment across tests, to reuse + -- HTTPManager and such. + esEnv <- mkEsEnv config + _ <- dispatch esEnv methodDelete ["*" <> indexTestKey <> "*"] Nothing + pure ()