@@ -105,15 +105,20 @@ final class RedisConnectionActor(address: NodeAddress, config: ConnectionConfig)
105105 case Connect =>
106106 log.debug(s " Connecting to $address" )
107107 doConnect()
108- case Connected (connection, remoteAddress, localAddress) =>
109- log.debug(s " Connected to Redis at $address" )
110- // TODO: use dedicated retry strategy for initialization instead of reconnection strategy
111- new ConnectedTo (connection, localAddress, remoteAddress).initialize(config.reconnectionStrategy)
112- readInitSender.foreach(_ ! ReadAck )
108+ case Connected (connection, remoteAddress, localAddress, connectionId) =>
109+ if (currentConnectionId.forall(_ != connectionId)) {
110+ log.error(s " Received Connected for connection different than currently trying to establish " )
111+ connection ! CloseConnection (immediate = true )
112+ } else {
113+ // TODO: use dedicated retry strategy for initialization instead of reconnection strategy
114+ new ConnectedTo (connection, localAddress, remoteAddress).initialize(retryStrategy)
115+ readInitSender.foreach(_ ! ReadAck )
116+ }
113117 case cf : ConnectionFailure =>
114- if (currentConnectionId.forall(_ != cf.connectionId)){
118+ if (currentConnectionId.forall(_ != cf.connectionId)) {
115119 log.error(s " Received ConnectionFailure for connection different than currently trying to establish " )
116120 } else {
121+ currentConnectionId = Opt .Empty
117122 log.error(s " Connection attempt to Redis at $address failed or was closed " )
118123 tryReconnect(retryStrategy, new ConnectionFailedException (address))
119124 }
@@ -160,7 +165,7 @@ final class RedisConnectionActor(address: NodeAddress, config: ConnectionConfig)
160165 )
161166 }
162167
163- // create connection Id to match connection failures message with
168+ // create connection Id to match connection message with
164169 // corresponding connections. It prevents against handling old failures
165170 // in context of new connections.
166171 val connectionId = random.nextInt()
@@ -177,7 +182,7 @@ final class RedisConnectionActor(address: NodeAddress, config: ConnectionConfig)
177182 val (actorRef, connFuture) = src.viaMat(conn)((_, _)).to(sink).run()
178183 connFuture.onCompleteNow {
179184 case Success (Tcp .OutgoingConnection (remoteAddress, localAddress)) =>
180- self ! Connected (actorRef, remoteAddress, localAddress)
185+ self ! Connected (actorRef, remoteAddress, localAddress, connectionId )
181186 case Failure (cause) =>
182187 self ! ConnectionFailed (cause, connectionId)
183188 }
@@ -219,7 +224,7 @@ final class RedisConnectionActor(address: NodeAddress, config: ConnectionConfig)
219224
220225 def become (receive : Receive ): Unit =
221226 context.become(receive unless {
222- case Connected (oldConnection, _, _) if oldConnection != connection =>
227+ case Connected (oldConnection, _, _, _ ) if oldConnection != connection =>
223228 oldConnection ! CloseConnection (immediate = true )
224229 })
225230
@@ -472,7 +477,7 @@ final class RedisConnectionActor(address: NodeAddress, config: ConnectionConfig)
472477 }
473478
474479 def onConnectionClosed (cc : ConnectionClosed ): Unit = {
475- if (currentConnectionId.forall(_ != cc.connectionId)){
480+ if (currentConnectionId.forall(_ != cc.connectionId)) {
476481 log.error(s " Received ConnectionClosed for connection different than currently handling " )
477482 }
478483 open = false
@@ -527,7 +532,7 @@ final class RedisConnectionActor(address: NodeAddress, config: ConnectionConfig)
527532 case IncomingPacks (packs) =>
528533 packs.reply(PacksResult .Failure (cause))
529534 case Release => // ignore
530- case Connected (connection, _, _) if tcpConnecting =>
535+ case Connected (connection, _, _, _ ) if tcpConnecting =>
531536 // failure may have happened while connecting, simply close the connection
532537 connection ! CloseConnection (immediate = true )
533538 become(closed(cause, tcpConnecting = false ))
@@ -554,7 +559,8 @@ object RedisConnectionActor {
554559 private case class Connected (
555560 connection : ActorRef ,
556561 remoteAddress : InetSocketAddress ,
557- localAddress : InetSocketAddress
562+ localAddress : InetSocketAddress ,
563+ connectionId : Int ,
558564 ) extends TcpEvent
559565
560566 private sealed abstract class ConnectionFailure (val connectionId : Int ) extends TcpEvent
0 commit comments