@@ -18,6 +18,7 @@ import java.nio.{Buffer, ByteBuffer}
1818import scala .annotation .tailrec
1919import scala .collection .mutable
2020import scala .concurrent .duration .Duration
21+ import scala .util .Random
2122
2223final class RedisConnectionActor (address : NodeAddress , config : ConnectionConfig )
2324 extends Actor with ActorLazyLogging { actor =>
@@ -35,13 +36,15 @@ final class RedisConnectionActor(address: NodeAddress, config: ConnectionConfig)
3536 }
3637 }
3738
39+ private val random = new Random
3840 private var mustInitiallyConnect : Boolean = _
3941 private var initPromise : Promise [Unit ] = _
4042
4143 // indicates how many times connection was restarted
4244 private var incarnation = 0
4345 private var reservedBy = Opt .empty[ActorRef ]
4446 private var reservationIncarnation = Opt .empty[Int ]
47+ private var currentConnectionId = Opt .empty[Int ]
4548
4649 private val queuedToReserve = new JArrayDeque [QueuedPacks ]
4750 private val queuedToWrite = new JArrayDeque [QueuedPacks ]
@@ -92,18 +95,6 @@ final class RedisConnectionActor(address: NodeAddress, config: ConnectionConfig)
9295 }
9396 }
9497
95- /**
96- * Without separate "reconnecting" state we would not be able to drop old messages
97- * (especially Connected message if arrived after ConnectionClosed because of some race conditions).
98- * Without this state, we would process old ones instead of dropping them and creating a new connection.
99- */
100- private def reconnecting (retryStrategy : RetryStrategy ): Receive = {
101- case Connect =>
102- become(connecting(retryStrategy, Opt .Empty ))
103- doConnect()
104- case _ : TcpEvent => // Ignore old messages
105- }
106-
10798 private def connecting (retryStrategy : RetryStrategy , readInitSender : Opt [ActorRef ]): Receive = {
10899 case open : Open =>
109100 onOpen(open)
@@ -119,9 +110,13 @@ final class RedisConnectionActor(address: NodeAddress, config: ConnectionConfig)
119110 // TODO: use dedicated retry strategy for initialization instead of reconnection strategy
120111 new ConnectedTo (connection, localAddress, remoteAddress).initialize(config.reconnectionStrategy)
121112 readInitSender.foreach(_ ! ReadAck )
122- case _ : ConnectionFailed | _ : ConnectionClosed =>
123- log.error(s " Connection attempt to Redis at $address failed or was closed " )
124- tryReconnect(retryStrategy, new ConnectionFailedException (address))
113+ case cf : ConnectionFailure =>
114+ if (currentConnectionId.forall(_ != cf.connectionId)){
115+ log.error(s " Received ConnectionFailure for connection different than currently trying to establish " )
116+ } else {
117+ log.error(s " Connection attempt to Redis at $address failed or was closed " )
118+ tryReconnect(retryStrategy, new ConnectionFailedException (address))
119+ }
125120 case Close (cause, stopSelf) =>
126121 close(cause, stopSelf, tcpConnecting = true )
127122 case ReadInit =>
@@ -165,20 +160,26 @@ final class RedisConnectionActor(address: NodeAddress, config: ConnectionConfig)
165160 )
166161 }
167162
163+ // create connection Id to match connection failures message with
164+ // corresponding connections. It prevents against handling old failures
165+ // in context of new connections.
166+ val connectionId = random.nextInt()
167+ currentConnectionId = connectionId.opt
168+
168169 val sink = Sink .actorRefWithBackpressure(
169170 ref = self,
170171 onInitMessage = ReadInit ,
171172 ackMessage = ReadAck ,
172- onCompleteMessage = ConnectionClosed (Opt .Empty ),
173- onFailureMessage = cause => ConnectionClosed (Opt (cause))
173+ onCompleteMessage = ConnectionClosed (Opt .Empty , connectionId ),
174+ onFailureMessage = cause => ConnectionClosed (Opt (cause), connectionId )
174175 )
175176
176177 val (actorRef, connFuture) = src.viaMat(conn)((_, _)).to(sink).run()
177178 connFuture.onCompleteNow {
178179 case Success (Tcp .OutgoingConnection (remoteAddress, localAddress)) =>
179180 self ! Connected (actorRef, remoteAddress, localAddress)
180181 case Failure (cause) =>
181- self ! ConnectionFailed (cause)
182+ self ! ConnectionFailed (cause, connectionId )
182183 }
183184 }
184185
@@ -190,7 +191,7 @@ final class RedisConnectionActor(address: NodeAddress, config: ConnectionConfig)
190191 if (delay > Duration .Zero ) {
191192 log.info(s " Next reconnection attempt to $address in $delay" )
192193 }
193- become(reconnecting (nextStrategy))
194+ become(connecting (nextStrategy, Opt . Empty ))
194195 system.scheduler.scheduleOnce(delay, self, Connect )
195196 case Opt .Empty =>
196197 close(failureCause, stopSelf = false )
@@ -471,6 +472,9 @@ final class RedisConnectionActor(address: NodeAddress, config: ConnectionConfig)
471472 }
472473
473474 def onConnectionClosed (cc : ConnectionClosed ): Unit = {
475+ if (currentConnectionId.forall(_ != cc.connectionId)){
476+ log.error(s " Received ConnectionClosed for connection different than currently handling " )
477+ }
474478 open = false
475479 incarnation += 1
476480 subscribed.foreach { receiver =>
@@ -553,8 +557,9 @@ object RedisConnectionActor {
553557 localAddress : InetSocketAddress
554558 ) extends TcpEvent
555559
556- private case class ConnectionFailed (cause : Throwable ) extends TcpEvent
557- private case class ConnectionClosed (error : Opt [Throwable ]) extends TcpEvent
560+ private sealed abstract class ConnectionFailure (val connectionId : Int ) extends TcpEvent
561+ private case class ConnectionFailed (cause : Throwable , id : Int ) extends ConnectionFailure (id)
562+ private case class ConnectionClosed (error : Opt [Throwable ], id : Int ) extends ConnectionFailure (id)
558563
559564 private case class CloseConnection (immediate : Boolean = false ) extends TcpEvent
560565
0 commit comments