Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,6 @@ captures
!*.xcodeproj/project.xcworkspace/
!*.xcworkspace/contents.xcworkspacedata
**/xcshareddata/WorkspaceSettings.xcsettings
fastlane/report.xml
fastlane/report.xml
.specstory/
.cursorindexingignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package it.unibo.collektive.echo.network.mqtt

import java.net.Inet4Address
import java.net.InetAddress

internal actual fun resolveIpv4Candidates(host: String): List<String> = runCatching {
InetAddress.getAllByName(host)
.filterIsInstance<Inet4Address>()
.mapNotNull { it.hostAddress }
.distinct()
}.getOrElse { emptyList() }
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package it.unibo.collektive.echo.network.mqtt

/**
* Resolves all IPv4 addresses for [host] on the current platform.
*
* Returns an empty list when resolution fails or when the platform has no IPv4 result.
*/
internal expect fun resolveIpv4Candidates(host: String): List<String>
Original file line number Diff line number Diff line change
Expand Up @@ -112,15 +112,7 @@ class MqttMailbox private constructor(
*/
@OptIn(ExperimentalUnsignedTypes::class)
private fun initializeMqttClient() {
mqttClient = MQTTClient(
MQTTVersion.MQTT3_1_1,
host,
port,
webSocket = WEBSOCKET_ENDPOINT,
tls = null,
) { message ->
handleIncomingMessage(message.topicName, message.payload?.toByteArray())
}
mqttClient = createMqttClientWithFallback()

log.i { "Connected to the broker" }

Expand Down Expand Up @@ -161,16 +153,8 @@ class MqttMailbox private constructor(
}
delay(RECONNECT_DELAY)
try {
// Recreate the MQTT client from scratch
mqttClient = MQTTClient(
MQTTVersion.MQTT3_1_1,
host,
port,
webSocket = WEBSOCKET_ENDPOINT,
tls = null,
) { message ->
handleIncomingMessage(message.topicName, message.payload?.toByteArray())
}
// Recreate the MQTT client from scratch.
mqttClient = createMqttClientWithFallback()
mqttClient?.subscribe(
listOf(
Subscription(HEARTBEAT_WILD_CARD, SubscriptionOptions(Qos.AT_MOST_ONCE)),
Expand All @@ -188,6 +172,38 @@ class MqttMailbox private constructor(
}
}

@OptIn(ExperimentalUnsignedTypes::class)
private fun createMqttClientWithFallback(): MQTTClient {
val resolvedIpv4Hosts = resolveIpv4Candidates(host)
if (resolvedIpv4Hosts.isNotEmpty()) {
log.i { "Resolved IPv4 candidates for '$host': $resolvedIpv4Hosts" }
}
val hostsToTry = listOf(host) + resolvedIpv4Hosts
var lastError: Throwable? = null
return hostsToTry.asSequence().distinct().map { candidateHost ->
runCatching {
MQTTClient(
MQTTVersion.MQTT3_1_1,
candidateHost,
port,
webSocket = WEBSOCKET_ENDPOINT,
tls = null,
) { message ->
handleIncomingMessage(message.topicName, message.payload?.toByteArray())
}.also {
if (candidateHost != host) {
log.w { "Connected via IPv4 fallback host: $candidateHost" }
}
}
}.onFailure {
log.w { "Connection failure to $candidateHost" }
lastError = it
}
}.firstOrNull { it.isSuccess }
?.getOrThrow()
?: error("Cannot instance MQTT client for any host. Last error:\n${lastError?.stackTraceToString()}")
}

/**
* Sends a heartbeat pulse with the current device location.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ fun Screen(
val sendingCounter by viewModel.sendingCounterFlow.collectAsState()
val discoveredDevices by viewModel.dataFlow.collectAsState()
val currentLocation by viewModel.currentLocationFlow.collectAsState()
val connectionErrorMessage by viewModel.connectionErrorMessageFlow.collectAsState()

// Auto-scroll when new messages arrive
LaunchedEffect(messages.size) {
Expand All @@ -103,6 +104,7 @@ fun Screen(
sendingCounter = sendingCounter,
discoveredDevicesCount = discoveredDevices.size,
currentLocation = currentLocation,
connectionErrorMessage = connectionErrorMessage,
)

Spacer(Modifier.padding(4.dp))
Expand Down Expand Up @@ -250,6 +252,7 @@ fun ConnectionStatusCard(
sendingCounter: Int = 0,
discoveredDevicesCount: Int = 0,
currentLocation: it.unibo.collektive.echo.location.Location? = null,
connectionErrorMessage: String? = null,
) {
val stateColor = connectionColor(connection)

Expand Down Expand Up @@ -287,6 +290,14 @@ fun ConnectionStatusCard(
color = MaterialTheme.colorScheme.onSurface.copy(alpha = 0.7f),
)

if (!connectionErrorMessage.isNullOrBlank()) {
Text(
text = connectionErrorMessage,
style = MaterialTheme.typography.labelSmall,
color = DisconnectedColor,
)
}

// GPS Status indicator
currentLocation?.let { location ->
Text(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ class NearbyDevicesViewModel(
/** Flow of the current MQTT connection state. */
val connectionFlow: StateFlow<ConnectionState> = _connectionFlow.asStateFlow()

private val _connectionErrorMessageFlow = MutableStateFlow<String?>(null)

/** Flow of latest human-readable connection error to show in UI. */
val connectionErrorMessageFlow: StateFlow<String?> = _connectionErrorMessageFlow.asStateFlow()

private val _messagesFlow = MutableStateFlow<List<ChatMessage>>(emptyList())

/** Flow of received and sent chat messages. */
Expand Down Expand Up @@ -116,6 +121,7 @@ class NearbyDevicesViewModel(
companion object {
/** Interval in milliseconds between GPS availability checks. */
private const val GPS_POLL_INTERVAL_MS = 500L
private const val MQTT_RETRY_INTERVAL_MS = 3000L

/** Mean Earth radius in metres, used by the Haversine formula. */
private const val EARTH_RADIUS_METERS = 6371000.0
Expand Down Expand Up @@ -222,44 +228,59 @@ class NearbyDevicesViewModel(
log.i { "Waiting for GPS location before starting Collektive program..." }
delay(GPS_POLL_INTERVAL_MS) // Check periodically
}

_connectionFlow.value = ConnectionState.CONNECTED
val program = collektiveProgram()
log.i { "Collektive program started with GPS location: ${_currentLocationFlow.value}" }
while (isActive) {
val (newDevices, newMessages) = program.cycle()
_dataFlow.value = newDevices

// Update messages, only add new unique messages based on messageId
val currentMessages = _messagesFlow.value.toMutableList()
newMessages.forEach { newMessage ->
// Check if we already have this message (same messageId)
val isDuplicate = currentMessages.any { existing ->
existing.messageId == newMessage.messageId
}

if (!isDuplicate) {
log.i {
"Adding NEW message to UI: '${newMessage.text}' " +
"from ${newMessage.sender} (ID: ${newMessage.messageId})"
try {
val program = collektiveProgram()
_connectionFlow.value = ConnectionState.CONNECTED
_connectionErrorMessageFlow.value = null
log.i { "Collektive program started with GPS location: ${_currentLocationFlow.value}" }

while (isActive) {
val (newDevices, newMessages) = program.cycle()
_dataFlow.value = newDevices

// Update messages, only add new unique messages based on messageId
val currentMessages = _messagesFlow.value.toMutableList()
newMessages.forEach { newMessage ->
// Check if we already have this message (same messageId)
val isDuplicate = currentMessages.any { existing ->
existing.messageId == newMessage.messageId
}

if (!isDuplicate) {
log.i {
"Adding NEW message to UI: '${newMessage.text}' " +
"from ${newMessage.sender} (ID: ${newMessage.messageId})"
}
currentMessages.add(newMessage)
} else {
log.i {
"Skipping duplicate message: '${newMessage.text}' " +
"from ${newMessage.sender} (ID: ${newMessage.messageId})"
}
}
}
currentMessages.add(newMessage)
} else {
log.i {
"Skipping duplicate message: '${newMessage.text}' " +
"from ${newMessage.sender} (ID: ${newMessage.messageId})"
_messagesFlow.value = currentMessages

// Check if current message has expired
val currentTime = Clock.System.now().epochSeconds.toDouble()
if (currentMessage.isNotEmpty() && (currentTime - messageStartTime) > messageLifeTime) {
stopSendingMessage()
}
}
}
_messagesFlow.value = currentMessages

// Check if current message has expired
val currentTime = Clock.System.now().epochSeconds.toDouble()
if (currentMessage.isNotEmpty() && (currentTime - messageStartTime) > messageLifeTime) {
stopSendingMessage()
delay(1.seconds)
}
} catch (e: CancellationException) {
throw e
} catch (
@Suppress("TooGenericExceptionCaught")
e: Exception,
) {
_connectionFlow.value = ConnectionState.DISCONNECTED
_connectionErrorMessageFlow.value = "Connessione al broker fallita. Riprovo..."
log.e { "MQTT connection/setup failed: ${e.message}. Retrying..." }
delay(MQTT_RETRY_INTERVAL_MS)
}

delay(1.seconds)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package it.unibo.collektive.echo.network.mqtt

import kotlinx.cinterop.ByteVar
import kotlinx.cinterop.ExperimentalForeignApi
import kotlinx.cinterop.alloc
import kotlinx.cinterop.allocArray
import kotlinx.cinterop.allocPointerTo
import kotlinx.cinterop.memScoped
import kotlinx.cinterop.pointed
import kotlinx.cinterop.ptr
import kotlinx.cinterop.toKString
import kotlinx.cinterop.value
import platform.posix.AF_INET
import platform.posix.NI_MAXHOST
import platform.posix.NI_NUMERICHOST
import platform.posix.addrinfo
import platform.posix.freeaddrinfo
import platform.posix.getaddrinfo
import platform.posix.getnameinfo

@OptIn(ExperimentalForeignApi::class)
internal actual fun resolveIpv4Candidates(host: String): List<String> = memScoped {
val hints = alloc<addrinfo>().apply {
ai_family = AF_INET
ai_socktype = 0
ai_protocol = 0
ai_flags = 0
ai_addrlen = 0u
ai_addr = null
ai_canonname = null
ai_next = null
}

val result = allocPointerTo<addrinfo>()
val status = getaddrinfo(host, null, hints.ptr, result.ptr)
if (status != 0) {
return@memScoped emptyList()
}

try {
buildList {
var current = result.value
while (current != null) {
val info = current.pointed
val addr = info.ai_addr
if (info.ai_family == AF_INET && addr != null) {
val hostBuffer = allocArray<ByteVar>(NI_MAXHOST)
val callReturnStatus = getnameinfo(
addr,
info.ai_addrlen,
hostBuffer,
NI_MAXHOST.toUInt(),
null,
0u,
NI_NUMERICHOST,
)
if (callReturnStatus == 0) {
add(hostBuffer.toKString())
}
}
current = info.ai_next
}
}.distinct()
} finally {
result.value?.let(::freeaddrinfo)
}
}