11package com .avsystem .commons
22package redis .actor
33
4- import akka .actor .{Actor , ActorRef }
4+ import akka .actor .{Actor , ActorRef , Cancellable }
55import akka .stream .scaladsl ._
66import akka .stream .{CompletionStrategy , IgnoreComplete , Materializer , SystemMaterializer }
77import akka .util .ByteString
@@ -92,6 +92,18 @@ final class RedisConnectionActor(address: NodeAddress, config: ConnectionConfig)
9292 }
9393 }
9494
95+ /**
96+ * Without separate "reconnecting" state we would not be able to drop old messages
97+ * (especially Connected message if arrived after ConnectionClosed because of some race conditions).
98+ * Without this state, we would process old ones instead of dropping them and creating a new connection.
99+ */
100+ private def reconnecting (retryStrategy : RetryStrategy ): Receive = {
101+ case Connect =>
102+ become(connecting(retryStrategy, Opt .Empty ))
103+ doConnect()
104+ case _ : TcpEvent => // Ignore old messages
105+ }
106+
95107 private def connecting (retryStrategy : RetryStrategy , readInitSender : Opt [ActorRef ]): Receive = {
96108 case open : Open =>
97109 onOpen(open)
@@ -107,8 +119,8 @@ final class RedisConnectionActor(address: NodeAddress, config: ConnectionConfig)
107119 // TODO: use dedicated retry strategy for initialization instead of reconnection strategy
108120 new ConnectedTo (connection, localAddress, remoteAddress).initialize(config.reconnectionStrategy)
109121 readInitSender.foreach(_ ! ReadAck )
110- case _ : ConnectionFailed =>
111- log.error(s " Connection attempt to Redis at $address failed " )
122+ case _ : ConnectionFailed | _ : ConnectionClosed =>
123+ log.error(s " Connection attempt to Redis at $address failed or was closed " )
112124 tryReconnect(retryStrategy, new ConnectionFailedException (address))
113125 case Close (cause, stopSelf) =>
114126 close(cause, stopSelf, tcpConnecting = true )
@@ -178,7 +190,7 @@ final class RedisConnectionActor(address: NodeAddress, config: ConnectionConfig)
178190 if (delay > Duration .Zero ) {
179191 log.info(s " Next reconnection attempt to $address in $delay" )
180192 }
181- become(connecting (nextStrategy, Opt . Empty ))
193+ become(reconnecting (nextStrategy))
182194 system.scheduler.scheduleOnce(delay, self, Connect )
183195 case Opt .Empty =>
184196 close(failureCause, stopSelf = false )
@@ -214,29 +226,33 @@ final class RedisConnectionActor(address: NodeAddress, config: ConnectionConfig)
214226 // Make sure that at least PING is sent so that LOADING errors are detected
215227 val initBatch = config.initCommands *> RedisApi .Batches .StringTyped .ping
216228 val initBuffer = ByteBuffer .allocate(initBatch.rawCommandPacks.encodedSize)
229+ // schedule a Cancellable RetryInit in case we do not receive a response for our request
230+ val scheduledRetry = system.scheduler.scheduleOnce(config.initTimeout, self, RetryInit (retryStrategy.next))
217231 new ReplyCollector (initBatch.rawCommandPacks, initBuffer, onInitResult(_, retryStrategy))
218232 .sendEmptyReplyOr { collector =>
219233 flip(initBuffer)
220234 val data = ByteString (initBuffer)
221235 logWrite(data)
222236 connection ! data
223- become(initializing(collector, retryStrategy))
237+ become(initializing(collector, retryStrategy, scheduledRetry ))
224238 }
225239 }
226240
227- def initializing (collector : ReplyCollector , retryStrategy : RetryStrategy ): Receive = {
241+ def initializing (collector : ReplyCollector , retryStrategy : RetryStrategy , scheduledRetry : Cancellable ): Receive = {
228242 case open : Open =>
229243 onOpen(open)
230244 case IncomingPacks (packs) =>
231245 handlePacks(packs)
232246 case Release if reservedBy.contains(sender()) =>
233247 handleRelease()
234248 case cc : ConnectionClosed =>
249+ scheduledRetry.cancel()
235250 onConnectionClosed(cc)
236- tryReconnect(config.reconnectionStrategy , new ConnectionClosedException (address, cc.error))
251+ tryReconnect(retryStrategy , new ConnectionClosedException (address, cc.error))
237252 case WriteAck =>
238253 case data : ByteString =>
239254 logReceived(data)
255+ scheduledRetry.cancel()
240256 try decoder.decodeMore(data)(collector.processMessage(_, this )) catch {
241257 case NonFatal (cause) =>
242258 // TODO: is there a possibility to NOT receive WriteAck up to this point? currently assuming that no
@@ -247,8 +263,10 @@ final class RedisConnectionActor(address: NodeAddress, config: ConnectionConfig)
247263 case ReadInit =>
248264 sender() ! ReadAck
249265 case RetryInit (newStrategy) =>
266+ scheduledRetry.cancel()
250267 initialize(newStrategy)
251268 case Close (cause, stop) =>
269+ scheduledRetry.cancel()
252270 close(cause, stop)
253271 }
254272
0 commit comments