Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import java.net.InetAddress
import java.net.InetSocketAddress
import java.util.concurrent.CancellationException
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.coroutines.CoroutineContext
import kotlin.time.DurationUnit
import kotlin.time.toDuration
Expand All @@ -46,13 +48,16 @@ class WebSocketConnection :

private var lastFrameTime = System.currentTimeMillis()

private val isDisconnecting = AtomicBoolean(false)

override val coroutineContext: CoroutineContext = Dispatchers.IO + job

override fun connect(endPoint: InetSocketAddress, timeout: Int) {
launch {
logger.debug("Trying connection to ${endPoint.hostName}:${endPoint.port}")

try {
isDisconnecting.set(false)
endpoint = endPoint

client = HttpClient(CIO) {
Expand Down Expand Up @@ -90,6 +95,8 @@ class WebSocketConnection :
is Frame.Text -> logger.debug("Received plain text ${frame.readText()}")
}
}
} catch (e: CancellationException) {
logger.debug("Frame listener was cancelled", e)
} catch (e: Exception) {
logger.error("An error occurred while receiving data", e)
disconnect(false)
Expand All @@ -106,7 +113,13 @@ class WebSocketConnection :
}

override fun disconnect(userInitiated: Boolean) {
if (!isDisconnecting.compareAndSet(false, true)) {
logger.error("Already disconnected or disconnecting")
return
}

logger.debug("Disconnect called: $userInitiated")

launch {
try {
session?.close()
Expand All @@ -123,6 +136,10 @@ class WebSocketConnection :
}

override fun send(data: ByteArray) {
if (isDisconnecting.get()) {
return
}

launch {
try {
val frame = Frame.Binary(true, data)
Expand All @@ -145,30 +162,34 @@ class WebSocketConnection :
*/
private fun startConnectionMonitoring() {
launch {
while (isActive) {
if (client?.isActive == false || session?.isActive == false) {
logger.error("Client or Session is no longer active")
disconnect(userInitiated = false)
}

val timeSinceLastFrame = System.currentTimeMillis() - lastFrameTime

// logger.debug("Watchdog status: $timeSinceLastFrame")
when {
timeSinceLastFrame > 30000 -> {
logger.error("Watchdog: No response for 30 seconds. Disconnecting from steam")
try {
while (isActive && !isDisconnecting.get()) {
if (client?.isActive == false || session?.isActive == false) {
logger.error("Client or Session is no longer active")
disconnect(userInitiated = false)
break
}

timeSinceLastFrame > 25000 -> logger.debug("Watchdog: No response for 25 seconds")
val timeSinceLastFrame = System.currentTimeMillis() - lastFrameTime

timeSinceLastFrame > 20000 -> logger.debug("Watchdog: No response for 20 seconds")
// logger.debug("Watchdog status: $timeSinceLastFrame")
when {
timeSinceLastFrame > 30000 -> {
logger.error("Watchdog: No response for 30 seconds. Disconnecting from steam")
disconnect(userInitiated = false)
break
}

timeSinceLastFrame > 15000 -> logger.debug("Watchdog: No response for 15 seconds")
}
timeSinceLastFrame > 25000 -> logger.debug("Watchdog: No response for 25 seconds")

delay(5000)
timeSinceLastFrame > 20000 -> logger.debug("Watchdog: No response for 20 seconds")

timeSinceLastFrame > 15000 -> logger.debug("Watchdog: No response for 15 seconds")
}

delay(5000)
}
} catch (e: CancellationException) {
logger.debug("Watchdog Cancelled.", e)
}
}
}
Expand Down
Loading