Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions log-elasticsearch/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
# log-elasticsearch-x.x.x.x (xxxx-xx-xx)
* Add support for specifying a retry policy on elasticsearch interactions.
* Add testsuite for logging to elastic.
* Make default retry policy a short 3.5s window, and subsequently dropping
log messages.

# log-elasticsearch-0.13.0.2 (2025-06-26)
* Switch back to `http-client-tls` by default since `HsOpenSSL` is based on a
deprecated `openssl` API (soon to be removed).
Expand Down
65 changes: 48 additions & 17 deletions log-elasticsearch/log-elasticsearch.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,26 @@ flag openssl
description: Use http-client-openssl instead of http-client-tls
default: False

common shared
ghc-options: -Wall
default-language: Haskell2010
default-extensions: BangPatterns
, FlexibleContexts
, FlexibleInstances
, GeneralizedNewtypeDeriving
, LambdaCase
, MultiParamTypeClasses
, NumericUnderscores
, OverloadedStrings
, RankNTypes
, RecordWildCards
, ScopedTypeVariables
, TupleSections
, TypeFamilies
, UndecidableInstances

library
import: shared
exposed-modules: Log.Backend.ElasticSearch
Log.Backend.ElasticSearch.Internal
Log.Backend.ElasticSearch.Lens
Expand All @@ -38,10 +57,12 @@ library
base64-bytestring,
bytestring,
deepseq,
exceptions,
http-client,
http-types,
log-base >= 0.10 && <0.13,
network-uri,
retry,
semigroups,
text,
text-show >= 2,
Expand All @@ -52,25 +73,35 @@ library
vector
hs-source-dirs: src

ghc-options: -Wall

default-language: Haskell2010
default-extensions: BangPatterns
, FlexibleContexts
, FlexibleInstances
, GeneralizedNewtypeDeriving
, LambdaCase
, MultiParamTypeClasses
, OverloadedStrings
, RankNTypes
, RecordWildCards
, ScopedTypeVariables
, TupleSections
, TypeFamilies
, UndecidableInstances

if flag(openssl)
cpp-options: -DOPENSSL
build-depends: http-client-openssl >= 0.3.2
else
build-depends: http-client-tls >= 0.3.6.3

test-suite log-elasticsearch-tests
import: shared
type: exitcode-stdio-1.0
main-is: Driver.hs
hs-source-dirs: tests
ghc-options: -threaded -rtsopts
build-depends:
, base
, aeson
, containers
, log-base
, log-elasticsearch
, hedgehog
, http-client
, http-types
, tasty
, tasty-hedgehog
, tasty-hunit
, tasty-discover
, text
, time
, uuid
build-tool-depends:
tasty-discover:tasty-discover
other-modules:
ElasticSearchTest
56 changes: 27 additions & 29 deletions log-elasticsearch/src/Log/Backend/ElasticSearch.hs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@ module Log.Backend.ElasticSearch
) where

import Control.Applicative
import Control.Concurrent
import Control.Exception
import Control.Exception (SomeAsyncException)
import Control.Monad
import Control.Monad.Catch
import Control.Monad.IO.Unlift
import Control.Retry
import Data.Aeson
import Data.Aeson.Encode.Pretty
import Data.IORef
Expand Down Expand Up @@ -62,20 +63,15 @@ elasticSearchLogger esConf@ElasticSearchConfig{..} = do
mkBulkLogger "ElasticSearch" (\msgs -> do
now <- getCurrentTime
oldIndex <- readIORef indexRef
retryOnException versionRef $ do
retryTransientExceptions versionRef $ do
-- We need to consider version of ES because ES >= 5.0.0 and ES >= 7.0.0
-- have slight differences in parts of API used for logging.
version <- readIORef versionRef >>= \case
Just version -> pure version
Nothing -> serverInfo env >>= \case
Left (ex :: HttpException) -> error
$ "elasticSearchLogger: unexpected error: "
<> show ex
<> " (is ElasticSearch server running?)"
Left (ex :: HttpException) -> throwM $ ElasticSearchCouldNotConnectToServerError ex
Right reply -> case parseEsVersion $ responseBody reply of
Nothing -> error
$ "elasticSearchLogger: invalid response when parsing version number: "
<> show reply
Nothing -> throwM $ ElasticSearchCouldNotParseVersion reply
Just version -> pure version
-- Elasticsearch index names are additionally indexed by date so that each
-- day is logged to a separate index to make log management easier.
Expand Down Expand Up @@ -115,16 +111,16 @@ elasticSearchLogger esConf@ElasticSearchConfig{..} = do
Nothing -> newData
modifyData _ v = v

keyAddValueTypeSuffix k v acc = AC.insert
(case v of
keyAddValueTypeSuffix k v = flip AC.insert (modifyData Nothing v) $
case v of
Object{} -> k <> "_object"
Array{} -> k <> "_array"
String{} -> k <> "_string"
Number{} -> k <> "_number"
Bool{} -> k <> "_bool"
Null{} -> k <> "_null"
) (modifyData Nothing v) acc
in adjustFailedMessagesWith modifyData jsonMsgs responses

-- Attempt to put modified messages.
newReply <- responseBody <$> bulkIndex version env esConf index newMsgs
case checkForBulkErrors newMsgs newReply of
Expand All @@ -140,7 +136,7 @@ elasticSearchLogger esConf@ElasticSearchConfig{..} = do
in adjustFailedMessagesWith modifyData newMsgs newResponses
-- Ignore any further errors.
void $ bulkIndex version env esConf index newerMsgs)
(refreshIndex env =<< readIORef indexRef)
(recoverAll esRetryPolicy . const . refreshIndex env =<< readIORef indexRef)
where
-- Process reply of bulk indexing to get responses for each index operation
-- and check whether any insertion failed.
Expand Down Expand Up @@ -176,17 +172,21 @@ elasticSearchLogger esConf@ElasticSearchConfig{..} = do
printEsError msg body =
T.putStrLn $ "elasticSearchLogger: " <> msg <> " " <> prettyJson body

retryOnException :: forall r. IORef (Maybe EsVersion) -> IO r -> IO r
retryOnException versionRef m = try m >>= \case
Left (ex::SomeException) -> do
putStrLn $ "ElasticSearch: unexpected error: "
<> show ex <> ", retrying in 10 seconds"
-- If there was an exception, ElasticSearch version might've changed, so
-- reset it.
writeIORef versionRef Nothing
threadDelay $ 10 * 1000000
retryOnException versionRef m
Right result -> return result
retryTransientExceptions :: IORef (Maybe EsVersion) -> IO () -> IO ()
retryTransientExceptions versionRef m =
catches (recovering esRetryPolicy (skipAsyncExceptions <> [const $ Handler $ allSomeExceptions versionRef]) (const m))
[ -- Rethrow async exceptions so this thread doesn't unintentionally get
-- orphaned by the next handler
Handler $ \(e :: SomeAsyncException) -> throwM e
, Handler $ \(e :: SomeException) -> case esRetryFailure of
ElasticSearchThrowLogFailure -> throwM e
ElasticSearchDropLogMessage -> pure ()
]

allSomeExceptions :: IORef (Maybe EsVersion) -> SomeException -> IO Bool
allSomeExceptions versionRef _ = do
writeIORef versionRef Nothing
pure True

prettyJson :: Value -> T.Text
prettyJson = TL.toStrict
Expand All @@ -208,13 +208,11 @@ checkElasticSearchLogin ElasticSearchConfig{..} = liftIO $ do
when (isJust esLogin
&& not esLoginInsecure
&& not ("https:" `T.isPrefixOf` esServer)) $
error $ "ElasticSearch: insecure login: "
<> "Attempting to send login credentials over an insecure connection. "
<> "Set esLoginInsecure = True to disable this check."
throwM ElasticSearchInsecureLogin

-- | Check that we can connect to the ES server.
--
-- @since 0.10.0.0
checkElasticSearchConnection :: MonadIO m => ElasticSearchConfig -> m (Either HttpException ())
checkElasticSearchConnection esConf = liftIO $ do
fmap (const ()) <$> (serverInfo =<< mkEsEnv esConf)
void <$> (serverInfo =<< mkEsEnv esConf)
51 changes: 50 additions & 1 deletion log-elasticsearch/src/Log/Backend/ElasticSearch/Internal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
{-# LANGUAGE DeriveGeneric #-}
module Log.Backend.ElasticSearch.Internal
( ElasticSearchConfig(..)
, ElasticSearchLogFailure (..)
, defaultElasticSearchConfig
-- * ES version
, EsVersion(..)
Expand All @@ -19,9 +20,14 @@ module Log.Backend.ElasticSearch.Internal
, dispatch
, decodeReply
, isSuccess
-- * ES error cases
, ElasticSearchCouldNotConnectToServerError (..)
, ElasticSearchCouldNotParseVersion (..)
, ElasticSearchInsecureLogin (..)
) where

import Control.Exception
import Control.Retry
import Control.Monad
import Data.Aeson
import Data.Ix (inRange)
Expand Down Expand Up @@ -62,7 +68,20 @@ data ElasticSearchConfig = ElasticSearchConfig
-- ^ Elasticsearch basic authentication username and password.
, esLoginInsecure :: !Bool
-- ^ Allow basic authentication over non-TLS connections.
} deriving (Eq, Show, Generic)
, esRetryPolicy :: RetryPolicyM IO
-- ^ Allow basic authentication over non-TLS connections.
--
-- @since 0.xx.x.x
, esRetryFailure :: ElasticSearchLogFailure
-- ^ Allow specifying what should happen with log messages that fail to be
-- sent off to elastic.
--
-- @since 0.xx.x.x
} deriving Generic

-- | Indicates what to do when the logging action fails beyond the retry policy.
-- We either drop the log message (recommended), or we rethrow the exception.
data ElasticSearchLogFailure = ElasticSearchDropLogMessage | ElasticSearchThrowLogFailure

-- | Sensible defaults for 'ElasticSearchConfig'.
defaultElasticSearchConfig :: ElasticSearchConfig
Expand All @@ -74,6 +93,10 @@ defaultElasticSearchConfig = ElasticSearchConfig
, esMapping = "log"
, esLogin = Nothing
, esLoginInsecure = False
-- Will retry upto 3 times with exponential backoff and jitter.
-- So approx: sum [0.5 * (2 ** i) / 2 | i <- [1..3]] ~ 3.5s
, esRetryPolicy = limitRetries 3 <> fullJitterBackoff 500_000 -- 0.5 Secs
, esRetryFailure = ElasticSearchDropLogMessage
}

----------------------------------------
Expand Down Expand Up @@ -247,3 +270,29 @@ isSuccess = statusCheck (inRange (200, 299))
where
statusCheck :: (Int -> Bool) -> Response a -> Bool
statusCheck p = p . statusCode . responseStatus

----------------------------------------

newtype ElasticSearchCouldNotConnectToServerError = ElasticSearchCouldNotConnectToServerError HttpException
deriving Show

instance Exception ElasticSearchCouldNotConnectToServerError where
displayException (ElasticSearchCouldNotConnectToServerError ex) = "elasticSearchLogger: unexpected error: "
<> show ex
<> " (is ElasticSearch server running?)"

data ElasticSearchInsecureLogin = ElasticSearchInsecureLogin
deriving Show

instance Exception ElasticSearchInsecureLogin where
displayException ElasticSearchInsecureLogin = "ElasticSearch: insecure login: "
<> "Attempting to send login credentials over an insecure connection. "
<> "Set esLoginInsecure = True to disable this check."

newtype ElasticSearchCouldNotParseVersion = ElasticSearchCouldNotParseVersion (Response Value)
deriving Show

instance Exception ElasticSearchCouldNotParseVersion where
displayException (ElasticSearchCouldNotParseVersion reply) =
"elasticSearchLogger: invalid response when parsing version number: "
<> show reply
1 change: 1 addition & 0 deletions log-elasticsearch/tests/Driver.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{-# OPTIONS_GHC -F -pgmF tasty-discover #-}
Loading