From 45deb538b3d3b55a88f3bacedc9a2aa702b3ed55 Mon Sep 17 00:00:00 2001 From: Daniele Bortoluzzi Date: Mon, 23 Mar 2026 21:58:54 +0100 Subject: [PATCH 1/5] fix(mqtt): add resilient ipv4 fallback and retry status UI --- .gitignore | 4 +- .../echo/network/mqtt/HostResolver.android.kt | 16 ++++ .../echo/network/mqtt/HostResolver.kt | 8 ++ .../echo/network/mqtt/MqttMailbox.kt | 59 +++++++++---- .../it/unibo/collektive/echo/ui/Screen.kt | 11 +++ .../echo/viewmodels/NearbyDevicesViewModel.kt | 87 ++++++++++++------- .../echo/network/mqtt/HostResolver.ios.kt | 59 +++++++++++++ 7 files changed, 191 insertions(+), 53 deletions(-) create mode 100644 composeApp/src/androidMain/kotlin/it/unibo/collektive/echo/network/mqtt/HostResolver.android.kt create mode 100644 composeApp/src/commonMain/kotlin/it/unibo/collektive/echo/network/mqtt/HostResolver.kt create mode 100644 composeApp/src/iosMain/kotlin/it/unibo/collektive/echo/network/mqtt/HostResolver.ios.kt diff --git a/.gitignore b/.gitignore index 09a2f77..301cde5 100644 --- a/.gitignore +++ b/.gitignore @@ -17,4 +17,6 @@ captures !*.xcodeproj/project.xcworkspace/ !*.xcworkspace/contents.xcworkspacedata **/xcshareddata/WorkspaceSettings.xcsettings -fastlane/report.xml \ No newline at end of file +fastlane/report.xml +.specstory/ +.cursorindexingignore \ No newline at end of file diff --git a/composeApp/src/androidMain/kotlin/it/unibo/collektive/echo/network/mqtt/HostResolver.android.kt b/composeApp/src/androidMain/kotlin/it/unibo/collektive/echo/network/mqtt/HostResolver.android.kt new file mode 100644 index 0000000..8684a7a --- /dev/null +++ b/composeApp/src/androidMain/kotlin/it/unibo/collektive/echo/network/mqtt/HostResolver.android.kt @@ -0,0 +1,16 @@ +package it.unibo.collektive.echo.network.mqtt + +import java.net.Inet4Address +import java.net.InetAddress + +internal actual fun resolveIpv4Candidates(host: String): List = try { + InetAddress.getAllByName(host) + .filterIsInstance() + .map { it.hostAddress } + .distinct() +} catch ( + @Suppress("TooGenericExceptionCaught") + _: Exception, +) { + emptyList() +} diff --git a/composeApp/src/commonMain/kotlin/it/unibo/collektive/echo/network/mqtt/HostResolver.kt b/composeApp/src/commonMain/kotlin/it/unibo/collektive/echo/network/mqtt/HostResolver.kt new file mode 100644 index 0000000..250f222 --- /dev/null +++ b/composeApp/src/commonMain/kotlin/it/unibo/collektive/echo/network/mqtt/HostResolver.kt @@ -0,0 +1,8 @@ +package it.unibo.collektive.echo.network.mqtt + +/** + * Resolves all IPv4 addresses for [host] on the current platform. + * + * Returns an empty list when resolution fails or when the platform has no IPv4 result. + */ +internal expect fun resolveIpv4Candidates(host: String): List diff --git a/composeApp/src/commonMain/kotlin/it/unibo/collektive/echo/network/mqtt/MqttMailbox.kt b/composeApp/src/commonMain/kotlin/it/unibo/collektive/echo/network/mqtt/MqttMailbox.kt index 8f048d6..80875db 100644 --- a/composeApp/src/commonMain/kotlin/it/unibo/collektive/echo/network/mqtt/MqttMailbox.kt +++ b/composeApp/src/commonMain/kotlin/it/unibo/collektive/echo/network/mqtt/MqttMailbox.kt @@ -112,15 +112,7 @@ class MqttMailbox private constructor( */ @OptIn(ExperimentalUnsignedTypes::class) private fun initializeMqttClient() { - mqttClient = MQTTClient( - MQTTVersion.MQTT3_1_1, - host, - port, - webSocket = WEBSOCKET_ENDPOINT, - tls = null, - ) { message -> - handleIncomingMessage(message.topicName, message.payload?.toByteArray()) - } + mqttClient = createMqttClientWithFallback() log.i { "Connected to the broker" } @@ -161,16 +153,8 @@ class MqttMailbox private constructor( } delay(RECONNECT_DELAY) try { - // Recreate the MQTT client from scratch - mqttClient = MQTTClient( - MQTTVersion.MQTT3_1_1, - host, - port, - webSocket = WEBSOCKET_ENDPOINT, - tls = null, - ) { message -> - handleIncomingMessage(message.topicName, message.payload?.toByteArray()) - } + // Recreate the MQTT client from scratch. + mqttClient = createMqttClientWithFallback() mqttClient?.subscribe( listOf( Subscription(HEARTBEAT_WILD_CARD, SubscriptionOptions(Qos.AT_MOST_ONCE)), @@ -188,6 +172,43 @@ class MqttMailbox private constructor( } } + @OptIn(ExperimentalUnsignedTypes::class) + private fun createMqttClientWithFallback(): MQTTClient { + val resolvedIpv4Hosts = resolveIpv4Candidates(host) + if (resolvedIpv4Hosts.isNotEmpty()) { + log.i { "Resolved IPv4 candidates for '$host': $resolvedIpv4Hosts" } + } + val hostsToTry = listOf(host) + resolvedIpv4Hosts + var lastError: Exception? = null + hostsToTry.distinct().forEach { candidateHost -> + try { + return MQTTClient( + MQTTVersion.MQTT3_1_1, + candidateHost, + port, + webSocket = WEBSOCKET_ENDPOINT, + tls = null, + ) { message -> + handleIncomingMessage(message.topicName, message.payload?.toByteArray()) + }.also { + if (candidateHost != host) { + log.w { "Connected via IPv4 fallback host: $candidateHost" } + } + } + } catch ( + @Suppress("TooGenericExceptionCaught") + e: Exception, + ) { + lastError = e + log.w { "MQTT connection attempt failed for host '$candidateHost': ${e.message}" } + } + } + throw IllegalStateException( + "Failed to create MQTT client for all configured hosts", + checkNotNull(lastError), + ) + } + /** * Sends a heartbeat pulse with the current device location. */ diff --git a/composeApp/src/commonMain/kotlin/it/unibo/collektive/echo/ui/Screen.kt b/composeApp/src/commonMain/kotlin/it/unibo/collektive/echo/ui/Screen.kt index cbaa027..0a9a5cb 100644 --- a/composeApp/src/commonMain/kotlin/it/unibo/collektive/echo/ui/Screen.kt +++ b/composeApp/src/commonMain/kotlin/it/unibo/collektive/echo/ui/Screen.kt @@ -83,6 +83,7 @@ fun Screen( val sendingCounter by viewModel.sendingCounterFlow.collectAsState() val discoveredDevices by viewModel.dataFlow.collectAsState() val currentLocation by viewModel.currentLocationFlow.collectAsState() + val connectionErrorMessage by viewModel.connectionErrorMessageFlow.collectAsState() // Auto-scroll when new messages arrive LaunchedEffect(messages.size) { @@ -103,6 +104,7 @@ fun Screen( sendingCounter = sendingCounter, discoveredDevicesCount = discoveredDevices.size, currentLocation = currentLocation, + connectionErrorMessage = connectionErrorMessage, ) Spacer(Modifier.padding(4.dp)) @@ -250,6 +252,7 @@ fun ConnectionStatusCard( sendingCounter: Int = 0, discoveredDevicesCount: Int = 0, currentLocation: it.unibo.collektive.echo.location.Location? = null, + connectionErrorMessage: String? = null, ) { val stateColor = connectionColor(connection) @@ -287,6 +290,14 @@ fun ConnectionStatusCard( color = MaterialTheme.colorScheme.onSurface.copy(alpha = 0.7f), ) + if (!connectionErrorMessage.isNullOrBlank()) { + Text( + text = connectionErrorMessage, + style = MaterialTheme.typography.labelSmall, + color = DisconnectedColor, + ) + } + // GPS Status indicator currentLocation?.let { location -> Text( diff --git a/composeApp/src/commonMain/kotlin/it/unibo/collektive/echo/viewmodels/NearbyDevicesViewModel.kt b/composeApp/src/commonMain/kotlin/it/unibo/collektive/echo/viewmodels/NearbyDevicesViewModel.kt index 835190e..2fcadc9 100644 --- a/composeApp/src/commonMain/kotlin/it/unibo/collektive/echo/viewmodels/NearbyDevicesViewModel.kt +++ b/composeApp/src/commonMain/kotlin/it/unibo/collektive/echo/viewmodels/NearbyDevicesViewModel.kt @@ -64,6 +64,11 @@ class NearbyDevicesViewModel( /** Flow of the current MQTT connection state. */ val connectionFlow: StateFlow = _connectionFlow.asStateFlow() + private val _connectionErrorMessageFlow = MutableStateFlow(null) + + /** Flow of latest human-readable connection error to show in UI. */ + val connectionErrorMessageFlow: StateFlow = _connectionErrorMessageFlow.asStateFlow() + private val _messagesFlow = MutableStateFlow>(emptyList()) /** Flow of received and sent chat messages. */ @@ -116,6 +121,7 @@ class NearbyDevicesViewModel( companion object { /** Interval in milliseconds between GPS availability checks. */ private const val GPS_POLL_INTERVAL_MS = 500L + private const val MQTT_RETRY_INTERVAL_MS = 3000L /** Mean Earth radius in metres, used by the Haversine formula. */ private const val EARTH_RADIUS_METERS = 6371000.0 @@ -222,44 +228,59 @@ class NearbyDevicesViewModel( log.i { "Waiting for GPS location before starting Collektive program..." } delay(GPS_POLL_INTERVAL_MS) // Check periodically } - - _connectionFlow.value = ConnectionState.CONNECTED - val program = collektiveProgram() - log.i { "Collektive program started with GPS location: ${_currentLocationFlow.value}" } while (isActive) { - val (newDevices, newMessages) = program.cycle() - _dataFlow.value = newDevices - - // Update messages, only add new unique messages based on messageId - val currentMessages = _messagesFlow.value.toMutableList() - newMessages.forEach { newMessage -> - // Check if we already have this message (same messageId) - val isDuplicate = currentMessages.any { existing -> - existing.messageId == newMessage.messageId - } - - if (!isDuplicate) { - log.i { - "Adding NEW message to UI: '${newMessage.text}' " + - "from ${newMessage.sender} (ID: ${newMessage.messageId})" + try { + val program = collektiveProgram() + _connectionFlow.value = ConnectionState.CONNECTED + _connectionErrorMessageFlow.value = null + log.i { "Collektive program started with GPS location: ${_currentLocationFlow.value}" } + + while (isActive) { + val (newDevices, newMessages) = program.cycle() + _dataFlow.value = newDevices + + // Update messages, only add new unique messages based on messageId + val currentMessages = _messagesFlow.value.toMutableList() + newMessages.forEach { newMessage -> + // Check if we already have this message (same messageId) + val isDuplicate = currentMessages.any { existing -> + existing.messageId == newMessage.messageId + } + + if (!isDuplicate) { + log.i { + "Adding NEW message to UI: '${newMessage.text}' " + + "from ${newMessage.sender} (ID: ${newMessage.messageId})" + } + currentMessages.add(newMessage) + } else { + log.i { + "Skipping duplicate message: '${newMessage.text}' " + + "from ${newMessage.sender} (ID: ${newMessage.messageId})" + } + } } - currentMessages.add(newMessage) - } else { - log.i { - "Skipping duplicate message: '${newMessage.text}' " + - "from ${newMessage.sender} (ID: ${newMessage.messageId})" + _messagesFlow.value = currentMessages + + // Check if current message has expired + val currentTime = Clock.System.now().epochSeconds.toDouble() + if (currentMessage.isNotEmpty() && (currentTime - messageStartTime) > messageLifeTime) { + stopSendingMessage() } - } - } - _messagesFlow.value = currentMessages - // Check if current message has expired - val currentTime = Clock.System.now().epochSeconds.toDouble() - if (currentMessage.isNotEmpty() && (currentTime - messageStartTime) > messageLifeTime) { - stopSendingMessage() + delay(1.seconds) + } + } catch (e: CancellationException) { + throw e + } catch ( + @Suppress("TooGenericExceptionCaught") + e: Exception, + ) { + _connectionFlow.value = ConnectionState.DISCONNECTED + _connectionErrorMessageFlow.value = "Connessione al broker fallita. Riprovo..." + log.e { "MQTT connection/setup failed: ${e.message}. Retrying..." } + delay(MQTT_RETRY_INTERVAL_MS) } - - delay(1.seconds) } } } diff --git a/composeApp/src/iosMain/kotlin/it/unibo/collektive/echo/network/mqtt/HostResolver.ios.kt b/composeApp/src/iosMain/kotlin/it/unibo/collektive/echo/network/mqtt/HostResolver.ios.kt new file mode 100644 index 0000000..ec0c67c --- /dev/null +++ b/composeApp/src/iosMain/kotlin/it/unibo/collektive/echo/network/mqtt/HostResolver.ios.kt @@ -0,0 +1,59 @@ +package it.unibo.collektive.echo.network.mqtt + +import kotlinx.cinterop.CPointer +import kotlinx.cinterop.ByteVar +import kotlinx.cinterop.alloc +import kotlinx.cinterop.memScoped +import kotlinx.cinterop.pointed +import kotlinx.cinterop.ptr +import kotlinx.cinterop.reinterpret +import kotlinx.cinterop.toKString +import platform.posix.AF_INET +import platform.posix.INET_ADDRSTRLEN +import platform.posix.addrinfo +import platform.posix.freeaddrinfo +import platform.posix.getaddrinfo +import platform.posix.inet_ntop +import platform.posix.sockaddr_in + +internal actual fun resolveIpv4Candidates(host: String): List = memScoped { + val hints = alloc().apply { + ai_family = AF_INET + ai_socktype = 0 + ai_protocol = 0 + ai_flags = 0 + ai_addrlen = 0u + ai_addr = null + ai_canonname = null + ai_next = null + } + + val resultHolder = alloc?>() + val status = getaddrinfo(host, null, hints.ptr, resultHolder.ptr) + if (status != 0) { + return@memScoped emptyList() + } + + val ipv4Addresses = mutableListOf() + var current = resultHolder.value + while (current != null) { + val info = current.pointed + if (info.ai_family == AF_INET && info.ai_addr != null) { + val sockaddr = info.ai_addr!!.reinterpret().pointed + val buffer = alloc(INET_ADDRSTRLEN) + val converted = inet_ntop( + AF_INET, + sockaddr.sin_addr.ptr, + buffer.ptr, + INET_ADDRSTRLEN.toUInt(), + ) + if (converted != null) { + ipv4Addresses += buffer.toKString() + } + } + current = info.ai_next + } + + freeaddrinfo(resultHolder.value) + ipv4Addresses.distinct() +} From d8c12f891b2880dfba60b738e9aacd72c60b32e1 Mon Sep 17 00:00:00 2001 From: Danilo Pianini Date: Tue, 24 Mar 2026 16:57:31 +0100 Subject: [PATCH 2/5] fix(ios): use `getnameinfo` for IPv4 address resolution --- .../echo/network/mqtt/HostResolver.ios.kt | 62 +++++++++++-------- 1 file changed, 35 insertions(+), 27 deletions(-) diff --git a/composeApp/src/iosMain/kotlin/it/unibo/collektive/echo/network/mqtt/HostResolver.ios.kt b/composeApp/src/iosMain/kotlin/it/unibo/collektive/echo/network/mqtt/HostResolver.ios.kt index ec0c67c..e50581e 100644 --- a/composeApp/src/iosMain/kotlin/it/unibo/collektive/echo/network/mqtt/HostResolver.ios.kt +++ b/composeApp/src/iosMain/kotlin/it/unibo/collektive/echo/network/mqtt/HostResolver.ios.kt @@ -1,21 +1,24 @@ package it.unibo.collektive.echo.network.mqtt -import kotlinx.cinterop.CPointer import kotlinx.cinterop.ByteVar +import kotlinx.cinterop.ExperimentalForeignApi import kotlinx.cinterop.alloc +import kotlinx.cinterop.allocArray +import kotlinx.cinterop.allocPointerTo import kotlinx.cinterop.memScoped import kotlinx.cinterop.pointed import kotlinx.cinterop.ptr -import kotlinx.cinterop.reinterpret import kotlinx.cinterop.toKString +import kotlinx.cinterop.value import platform.posix.AF_INET -import platform.posix.INET_ADDRSTRLEN +import platform.posix.NI_MAXHOST +import platform.posix.NI_NUMERICHOST import platform.posix.addrinfo import platform.posix.freeaddrinfo import platform.posix.getaddrinfo -import platform.posix.inet_ntop -import platform.posix.sockaddr_in +import platform.posix.getnameinfo +@OptIn(ExperimentalForeignApi::class) internal actual fun resolveIpv4Candidates(host: String): List = memScoped { val hints = alloc().apply { ai_family = AF_INET @@ -28,32 +31,37 @@ internal actual fun resolveIpv4Candidates(host: String): List = memScope ai_next = null } - val resultHolder = alloc?>() - val status = getaddrinfo(host, null, hints.ptr, resultHolder.ptr) + val result = allocPointerTo() + val status = getaddrinfo(host, null, hints.ptr, result.ptr) if (status != 0) { return@memScoped emptyList() } - val ipv4Addresses = mutableListOf() - var current = resultHolder.value - while (current != null) { - val info = current.pointed - if (info.ai_family == AF_INET && info.ai_addr != null) { - val sockaddr = info.ai_addr!!.reinterpret().pointed - val buffer = alloc(INET_ADDRSTRLEN) - val converted = inet_ntop( - AF_INET, - sockaddr.sin_addr.ptr, - buffer.ptr, - INET_ADDRSTRLEN.toUInt(), - ) - if (converted != null) { - ipv4Addresses += buffer.toKString() + try { + buildList { + var current = result.value + while (current != null) { + val info = current.pointed + val addr = info.ai_addr + if (info.ai_family == AF_INET && addr != null) { + val hostBuffer = allocArray(NI_MAXHOST) + val callReturnStatus = getnameinfo( + addr, + info.ai_addrlen, + hostBuffer, + NI_MAXHOST.toUInt(), + null, + 0u, + NI_NUMERICHOST, + ) + if (callReturnStatus == 0) { + add(hostBuffer.toKString()) + } + } + current = info.ai_next } - } - current = info.ai_next + }.distinct() + } finally { + result.value?.let(::freeaddrinfo) } - - freeaddrinfo(resultHolder.value) - ipv4Addresses.distinct() } From 3dc09cae8801aa94d61a6e68a5e1d3959b248b04 Mon Sep 17 00:00:00 2001 From: Danilo Pianini Date: Tue, 24 Mar 2026 17:02:05 +0100 Subject: [PATCH 3/5] style(android): simplify `resolveIpv4Candidates` using `runCatching` --- .../echo/network/mqtt/HostResolver.android.kt | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/composeApp/src/androidMain/kotlin/it/unibo/collektive/echo/network/mqtt/HostResolver.android.kt b/composeApp/src/androidMain/kotlin/it/unibo/collektive/echo/network/mqtt/HostResolver.android.kt index 8684a7a..019919c 100644 --- a/composeApp/src/androidMain/kotlin/it/unibo/collektive/echo/network/mqtt/HostResolver.android.kt +++ b/composeApp/src/androidMain/kotlin/it/unibo/collektive/echo/network/mqtt/HostResolver.android.kt @@ -3,14 +3,10 @@ package it.unibo.collektive.echo.network.mqtt import java.net.Inet4Address import java.net.InetAddress -internal actual fun resolveIpv4Candidates(host: String): List = try { - InetAddress.getAllByName(host) - .filterIsInstance() - .map { it.hostAddress } - .distinct() -} catch ( - @Suppress("TooGenericExceptionCaught") - _: Exception, -) { - emptyList() -} +internal actual fun resolveIpv4Candidates(host: String): List = + runCatching { + InetAddress.getAllByName(host) + .filterIsInstance() + .mapNotNull { it.hostAddress } + .distinct() + }.getOrElse { emptyList() } From 384ab2b86270a4dbf7f8239e26ba8cad49bf557d Mon Sep 17 00:00:00 2001 From: Danilo Pianini Date: Tue, 24 Mar 2026 17:10:25 +0100 Subject: [PATCH 4/5] refactor(mqtt): use sequence and runCatching for host connection attempts --- .../echo/network/mqtt/MqttMailbox.kt | 25 ++++++++----------- 1 file changed, 10 insertions(+), 15 deletions(-) diff --git a/composeApp/src/commonMain/kotlin/it/unibo/collektive/echo/network/mqtt/MqttMailbox.kt b/composeApp/src/commonMain/kotlin/it/unibo/collektive/echo/network/mqtt/MqttMailbox.kt index 80875db..5bddb6b 100644 --- a/composeApp/src/commonMain/kotlin/it/unibo/collektive/echo/network/mqtt/MqttMailbox.kt +++ b/composeApp/src/commonMain/kotlin/it/unibo/collektive/echo/network/mqtt/MqttMailbox.kt @@ -179,10 +179,10 @@ class MqttMailbox private constructor( log.i { "Resolved IPv4 candidates for '$host': $resolvedIpv4Hosts" } } val hostsToTry = listOf(host) + resolvedIpv4Hosts - var lastError: Exception? = null - hostsToTry.distinct().forEach { candidateHost -> - try { - return MQTTClient( + var lastError: Throwable? = null + return hostsToTry.asSequence().distinct().map { candidateHost -> + runCatching { + MQTTClient( MQTTVersion.MQTT3_1_1, candidateHost, port, @@ -195,18 +195,13 @@ class MqttMailbox private constructor( log.w { "Connected via IPv4 fallback host: $candidateHost" } } } - } catch ( - @Suppress("TooGenericExceptionCaught") - e: Exception, - ) { - lastError = e - log.w { "MQTT connection attempt failed for host '$candidateHost': ${e.message}" } + }.onFailure { + log.w { "Connection failure to $candidateHost" } + lastError = it } - } - throw IllegalStateException( - "Failed to create MQTT client for all configured hosts", - checkNotNull(lastError), - ) + }.firstOrNull { it.isSuccess } + ?.getOrThrow() + ?: error("Cannot instance MQTT client for any host. Last error:\n${lastError?.stackTraceToString()}") } /** From 8b2fbcafe635db574b15dbd316c1594efeb36248 Mon Sep 17 00:00:00 2001 From: Danilo Pianini Date: Tue, 24 Mar 2026 19:22:47 +0100 Subject: [PATCH 5/5] style(android): reformat `resolveIpv4Candidates` in `HostResolver` --- .../echo/network/mqtt/HostResolver.android.kt | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/composeApp/src/androidMain/kotlin/it/unibo/collektive/echo/network/mqtt/HostResolver.android.kt b/composeApp/src/androidMain/kotlin/it/unibo/collektive/echo/network/mqtt/HostResolver.android.kt index 019919c..bec71b9 100644 --- a/composeApp/src/androidMain/kotlin/it/unibo/collektive/echo/network/mqtt/HostResolver.android.kt +++ b/composeApp/src/androidMain/kotlin/it/unibo/collektive/echo/network/mqtt/HostResolver.android.kt @@ -3,10 +3,9 @@ package it.unibo.collektive.echo.network.mqtt import java.net.Inet4Address import java.net.InetAddress -internal actual fun resolveIpv4Candidates(host: String): List = - runCatching { - InetAddress.getAllByName(host) - .filterIsInstance() - .mapNotNull { it.hostAddress } - .distinct() - }.getOrElse { emptyList() } +internal actual fun resolveIpv4Candidates(host: String): List = runCatching { + InetAddress.getAllByName(host) + .filterIsInstance() + .mapNotNull { it.hostAddress } + .distinct() +}.getOrElse { emptyList() }