Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import com.redhat.devtools.gateway.util.isCancellationException
import com.redhat.devtools.gateway.view.ui.Dialogs
import io.kubernetes.client.openapi.ApiClient
import io.kubernetes.client.openapi.ApiException
import io.kubernetes.client.openapi.models.V1Pod
import kotlinx.coroutines.*
import java.io.Closeable
import java.io.IOException
Expand Down Expand Up @@ -78,7 +80,8 @@

remoteIdeServer = RemoteIDEServer(devSpacesContext)
remoteIdeServerStatus = runCatching {
remoteIdeServer.apply { waitServerReady(checkCancelled) }.getStatus()
remoteIdeServer.waitServerReady(checkCancelled)
remoteIdeServer.fetchStatus(checkCancelled)
}.getOrElse { e ->
if (e.isCancellationException()) throw e
RemoteIDEServerStatus.empty()
Expand Down Expand Up @@ -114,7 +117,12 @@

val pods = DevWorkspacePods(devSpacesContext.client)
val localPort = findFreePort()
forwarder = pods.forward(remoteIdeServer.pod, localPort, 5990)
forwarder = pods.forward(
{ refreshPod(remoteIdeServer) },
localPort,
5990,
RemoteIDEServer.readyTimeout,
)
pods.waitForForwardReady(localPort)

val effectiveJoinLink = joinLink.replace(":5990", ":$localPort")
Expand Down Expand Up @@ -172,7 +180,13 @@
client
} catch (e: Exception) {
runCatching { client?.close() }
onClientClosed(client, onDisconnected, onDevWorkspaceStopped, remoteIdeServer, forwarder)
onClientClosed(
client,
onDisconnected,
onDevWorkspaceStopped,
remoteIdeServer,
forwarder
)
throw e
}
}
Expand Down Expand Up @@ -264,6 +278,27 @@
)
}

private fun refreshPod(remoteIdeServer: RemoteIDEServer?): V1Pod? {
if (remoteIdeServer == null) {
thisLogger().warn("Cannot refresh workspace pod: remote IDE server is not available")
return null
}
return runCatching { remoteIdeServer.refreshPod() }
.onFailure { e -> logPodRefreshFailure(e) }
.getOrNull()
}

private fun logPodRefreshFailure(e: Throwable) {
when {

Check notice on line 292 in src/main/kotlin/com/redhat/devtools/gateway/DevSpacesConnection.kt

View workflow job for this annotation

GitHub Actions / Inspect code

'when' that can be simplified by introducing an argument

Introduce 'e' as subject of 'when'
e is ApiException ->
thisLogger().warn("Failed to refresh workspace pod: ${e.message}", e)
e is IOException && e.message?.contains("not running", ignoreCase = true) == true ->
thisLogger().info("Workspace pod not ready yet, will retry: ${e.message}")
else ->
thisLogger().info("Failed to refresh workspace pod, will retry: ${e.message}", e)
}
}

private fun findFreePort(): Int {
ServerSocket(0).use { socket ->
socket.reuseAddress = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.kubernetes.client.openapi.apis.CoreV1Api
import io.kubernetes.client.openapi.models.V1Pod
import io.kubernetes.client.openapi.models.V1PodList
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.*
import java.io.Closeable
import java.io.IOException
Expand All @@ -33,8 +34,7 @@

companion object {
const val WORKSPACE_LABEL_KEY = "controller.devfile.io/devworkspace_name"
private const val CONNECT_ATTEMPTS = 5
private const val RECONNECT_DELAY: Long = 1000
private const val RECONNECT_DELAY: Long = 1
}

private val logger = logger<DevWorkspacePods>()
Expand Down Expand Up @@ -135,13 +135,24 @@
ApiClientUtils.cloneForExec(base)

@Throws(IOException::class)
fun forward(pod: V1Pod, localPort: Int, remotePort: Int): Closeable {
fun forward(
fetchPod: suspend () -> V1Pod?,
localPort: Int,
remotePort: Int,
reconnectTimeoutSeconds: Long,
): Closeable {
val serverSocket = ServerSocket(localPort, 50, InetAddress.getLoopbackAddress())
val scope = CoroutineScope(
// dont cancel if child coroutine fails + use blocking I/O scope
SupervisorJob() + Dispatchers.IO
)
scope.acceptConnections(serverSocket, pod, localPort, remotePort)
scope.acceptConnections(
serverSocket,
fetchPod,
localPort,
remotePort,
reconnectTimeoutSeconds
)
return Closeable {
runCatching { serverSocket.close() }
scope.cancel()
Expand All @@ -150,9 +161,10 @@

private fun CoroutineScope.acceptConnections(
serverSocket: ServerSocket,
pod: V1Pod,
fetchPod: suspend () -> V1Pod?,
localPort: Int,
remotePort: Int
remotePort: Int,
reconnectTimeout: Long,
) {
launch {
logger.info("Starting port forward on local port $localPort...")
Expand All @@ -163,9 +175,10 @@
launch {
handleConnection(
clientSocket,
pod,
fetchPod,
localPort,
remotePort
remotePort,
reconnectTimeout,
)
}
}
Expand All @@ -186,39 +199,52 @@

private suspend fun CoroutineScope.handleConnection(
clientSocket: Socket,
pod: V1Pod,
fetchPod: suspend () -> V1Pod?,
localPort: Int,
remotePort: Int
remotePort: Int,
reconnectTimeout: Long,
) {
try {
repeat(CONNECT_ATTEMPTS) { attempt ->
if (!isActive) return@repeat
withTimeout(reconnectTimeout.seconds) {
while (isActive) {
if (!clientSocket.isConnected
|| clientSocket.isClosed) {
return@withTimeout
}

var forwardResult: PortForward.PortForwardResult? = null
try {
logger.info("Attempt #${attempt + 1}: Connecting $localPort -> $remotePort...")
val portForward = PortForward(client)
forwardResult = portForward.forward(pod, listOf(remotePort))
logger.info("forward successful: $localPort -> $remotePort...")
copyStreams(clientSocket, forwardResult, remotePort)
return
} catch (e: Exception) {
if (e.isCancellationException()) throw e
logger.info(
"Could not port forward $localPort -> $remotePort: ${e.message}. Retrying in ${RECONNECT_DELAY}ms..."
)
if (isActive) {
delay(RECONNECT_DELAY)
val pod = fetchPod()
if (pod == null) {
delay(RECONNECT_DELAY.seconds)
continue
}

var forwardResult: PortForward.PortForwardResult? = null
try {
logger.info(
"Connecting $localPort -> $remotePort to pod ${pod.metadata?.name}..."
)
val portForward = PortForward(client)
forwardResult = portForward.forward(pod, listOf(remotePort))
logger.info("forward successful: $localPort -> $remotePort...")
copyStreams(clientSocket, forwardResult, remotePort)
return@withTimeout
} catch (e: Exception) {
if (e.isCancellationException()) throw e
logger.info("Could not port forward $localPort -> $remotePort: ${e.message}. Retrying in ${RECONNECT_DELAY}s...")
delay(RECONNECT_DELAY.seconds)
} finally {
closeStreams(remotePort, forwardResult)
}
} finally {
closeStreams(remotePort, forwardResult)
}
}
} catch(e: Exception) {
} catch (_: TimeoutCancellationException) {
logger.warn("Could not port forward using port $localPort -> $remotePort within ${reconnectTimeout}s")
} catch (e: Exception) {
if (e.isCancellationException()) throw e
logger.warn(
"Could not port forward to pod ${pod.metadata?.name} using port $localPort -> $remotePort",
e)
"Could not port forward using port $localPort -> $remotePort",
e
)
} finally {
runCatching { clientSocket.close() }
}
Expand Down Expand Up @@ -293,9 +319,24 @@
}

@Throws(ApiException::class)
fun findFirst(namespace: String, labelSelector: String): V1Pod? {

Check warning on line 322 in src/main/kotlin/com/redhat/devtools/gateway/openshift/DevWorkspacePods.kt

View workflow job for this annotation

GitHub Actions / Inspect code

Unused symbol

Function "findFirst" is never used
val pods = list(namespace, labelSelector)
return pods.items[0]
return pods.items.firstOrNull()
}

@Throws(ApiException::class)
fun findFirstRunning(namespace: String, labelSelector: String): V1Pod? {
val pods = list(namespace, labelSelector)
return pods.items.firstOrNull { isRunningAndReady(it) }
}

private fun isRunningAndReady(pod: V1Pod): Boolean {
if (pod.status?.phase != "Running") {
return false
}
return pod.status?.conditions?.any { condition ->
condition.type == "Ready" && condition.status == "True"
} == true
}

@Throws(ApiException::class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,25 +28,43 @@ import kotlin.time.Duration.Companion.seconds
* Represent an IDE server running in a CDE.
*/
class RemoteIDEServer(private val devSpacesContext: DevSpacesContext) {
var pod: V1Pod
private var container: V1Container

private var cachedPod: V1Pod? = null

companion object {
var readyTimeout: Long = 60 // seconds
}

init {
pod = findPod()
container = findContainer()
/**
* Returns the cached workspace pod, or fetches it from the cluster if not yet cached.
*/
@Throws(IOException::class)
fun fetchPod(): V1Pod {
cachedPod?.let { return it }
return refreshPod()
}

/**
* Asks the CDE for the remote IDE server status.
* Re-queries the cluster for the workspace pod and updates the cache.
*/
@Throws(CancellationException::class)
suspend fun getStatus(checkCancelled: (() -> Unit)? = null): RemoteIDEServerStatus =
@Throws(IOException::class)
fun refreshPod(): V1Pod = findPod().also { cachedPod = it }

/**
* Returns the IDE container from the workspace pod.
*/
@Throws(IOException::class)
fun fetchContainer(): V1Container = findContainer(fetchPod())

/**
* Fetches the workspace pod and IDE container, then asks the CDE for the remote IDE server status.
*/
@Throws(CancellationException::class, IOException::class)
suspend fun fetchStatus(checkCancelled: (() -> Unit)? = null): RemoteIDEServerStatus =
withContext(Dispatchers.IO) {
checkCancelled?.invoke()
val pod = fetchPod()
val container = findContainer(pod)
val output = DevWorkspacePods(devSpacesContext.client).exec(
pod = pod,
container = container.name,
Expand Down Expand Up @@ -93,7 +111,7 @@ class RemoteIDEServer(private val devSpacesContext: DevSpacesContext) {
checkCancelled: (() -> Unit)? = null
): Boolean {
return try {
getStatus(checkCancelled).isReady == isReadyState
fetchStatus(checkCancelled).isReady == isReadyState
} catch (e: Exception) {
if (e.isCancellationException()) throw e
thisLogger().debug("Failed to check workspace IDE state.", e)
Expand Down Expand Up @@ -134,21 +152,22 @@ class RemoteIDEServer(private val devSpacesContext: DevSpacesContext) {
false
} ?: false

private fun labelSelector(): String =
"${DevWorkspacePods.WORKSPACE_LABEL_KEY}=${devSpacesContext.devWorkspace.name}"

@Throws(IOException::class)
private fun findPod(): V1Pod {
val selector = "controller.devfile.io/devworkspace_name=${devSpacesContext.devWorkspace.name}"

return DevWorkspacePods(devSpacesContext.client)
.findFirst(
.findFirstRunning(
devSpacesContext.devWorkspace.namespace,
selector
labelSelector()
) ?: throw IOException(
"DevWorkspace '${devSpacesContext.devWorkspace.name}' is not running.",
)
}

@Throws(IOException::class)
private fun findContainer(): V1Container {
private fun findContainer(pod: V1Pod): V1Container {
return pod.spec!!.containers.find { container ->
container.ports?.any { port -> port.name == "idea-server" } != null
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ class DevSpacesWorkspacesStepView(
val remoteIdeServer = RemoteIDEServer(devSpacesContext)
status = runBlocking {
remoteIdeServer.waitServerReady(checkCancelled)
remoteIdeServer.getStatus()
remoteIdeServer.fetchStatus(checkCancelled)
}
} catch (e: Exception) {
if (e.isCancellationException()) {
Expand Down
Loading
Loading