Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ import java.util.concurrent.atomic.AtomicInteger
* - Small bounded queues (10 tasks) to prevent memory bloat
* - Reduced context switching overhead
* - Efficient thread management with controlled resource usage
*
* Made public to allow mocking in tests via IOMockHelper.
*/
internal object OneSignalDispatchers {
object OneSignalDispatchers {
// Optimized pool sizes based on CPU cores and workload analysis
private const val IO_CORE_POOL_SIZE = 2 // Increased for better concurrency
private const val IO_MAX_POOL_SIZE = 3 // Increased for better concurrency
Expand All @@ -35,7 +37,7 @@ internal object OneSignalDispatchers {
private const val KEEP_ALIVE_TIME_SECONDS =
30L // Keep threads alive longer to reduce recreation
private const val QUEUE_CAPACITY =
10 // Small queue that allows up to 10 tasks to wait in queue when all threads are busy
200 // Increased to handle more queued operations during init, while still preventing memory bloat
internal const val BASE_THREAD_NAME = "OneSignal" // Base thread name prefix
private const val IO_THREAD_NAME_PREFIX =
"$BASE_THREAD_NAME-IO" // Thread name prefix for I/O operations
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import com.onesignal.common.modules.IModule
import com.onesignal.common.services.IServiceProvider
import com.onesignal.common.services.ServiceBuilder
import com.onesignal.common.services.ServiceProvider
import com.onesignal.common.threading.CompletionAwaiter
import com.onesignal.common.threading.OneSignalDispatchers
import com.onesignal.common.threading.suspendifyOnIO
import com.onesignal.core.CoreModule
Expand Down Expand Up @@ -39,19 +38,16 @@ import com.onesignal.user.internal.identity.IdentityModelStore
import com.onesignal.user.internal.properties.PropertiesModelStore
import com.onesignal.user.internal.resolveAppId
import com.onesignal.user.internal.subscriptions.SubscriptionModelStore
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import kotlinx.coroutines.withTimeout

private const val MAX_TIMEOUT_TO_INIT = 30_000L // 30 seconds

internal class OneSignalImp(
private val ioDispatcher: CoroutineDispatcher = OneSignalDispatchers.IO,
) : IOneSignal, IServiceProvider {
@Volatile
private var initAwaiter = CompletionAwaiter("OneSignalImp")

private val suspendCompletion = CompletableDeferred<Unit>()

@Volatile
private var initState: InitState = InitState.NOT_STARTED
Expand Down Expand Up @@ -263,7 +259,6 @@ internal class OneSignalImp(
suspendifyOnIO {
internalInit(context, appId)
}
initState = InitState.SUCCESS
return true
}

Expand Down Expand Up @@ -306,22 +301,16 @@ internal class OneSignalImp(
) {
Logging.log(LogLevel.DEBUG, "Calling deprecated login(externalId: $externalId, jwtBearerToken: $jwtBearerToken)")

if (!initState.isSDKAccessible()) {
throw IllegalStateException("Must call 'initWithContext' before 'login'")
}
waitForInit(operationName = "login")

waitForInit()
suspendifyOnIO { loginHelper.login(externalId, jwtBearerToken) }
}

override fun logout() {
Logging.log(LogLevel.DEBUG, "Calling deprecated logout()")

if (!initState.isSDKAccessible()) {
throw IllegalStateException("Must call 'initWithContext' before 'logout'")
}
waitForInit(operationName = "logout")

waitForInit()
suspendifyOnIO { logoutHelper.logout() }
}

Expand All @@ -333,34 +322,82 @@ internal class OneSignalImp(

override fun <T> getAllServices(c: Class<T>): List<T> = services.getAllServices(c)

private fun waitForInit() {
val completed = initAwaiter.await()
if (!completed) {
throw IllegalStateException("initWithContext was not called or timed out")
/**
* Blocking version that waits for initialization to complete.
* Uses runBlocking to bridge to the suspend implementation.
* Waits indefinitely until init completes and logs how long it took.
*
* @param operationName Optional operation name to include in error messages (e.g., "login", "logout")
*/
private fun waitForInit(operationName: String? = null) {
runBlocking(ioDispatcher) {
waitUntilInitInternal(operationName)
}
}

/**
* Notifies both blocking and suspend callers that initialization is complete
*/
private fun notifyInitComplete() {
initAwaiter.complete()
suspendCompletion.complete(Unit)
}

private suspend fun suspendUntilInit() {
/**
* Suspend version that waits for initialization to complete.
* Waits indefinitely until init completes and logs how long it took.
*
* @param operationName Optional operation name to include in error messages (e.g., "login", "logout")
*/
private suspend fun suspendUntilInit(operationName: String? = null) {
waitUntilInitInternal(operationName)
}

/**
* Common implementation for waiting until initialization completes.
* Waits indefinitely until init completes (SUCCESS or FAILED) to ensure consistent state.
* Logs how long initialization took when it completes.
*
* @param operationName Optional operation name to include in error messages (e.g., "login", "logout")
*/
private suspend fun waitUntilInitInternal(operationName: String? = null) {
when (initState) {
InitState.NOT_STARTED -> {
throw IllegalStateException("Must call 'initWithContext' before use")
val message = if (operationName != null) {
"Must call 'initWithContext' before '$operationName'"
} else {
"Must call 'initWithContext' before use"
}
throw IllegalStateException(message)
}
InitState.IN_PROGRESS -> {
Logging.debug("Suspend waiting for init to complete...")
try {
withTimeout(MAX_TIMEOUT_TO_INIT) {
initAwaiter.awaitSuspend()
}
} catch (e: TimeoutCancellationException) {
throw IllegalStateException("initWithContext was timed out after $MAX_TIMEOUT_TO_INIT ms")
Logging.debug("Waiting for init to complete...")

val startTime = System.currentTimeMillis()

// Wait indefinitely until init actually completes - ensures consistent state
// Function only returns when initState is SUCCESS or FAILED
// NOTE: This is a suspend function, so it's non-blocking when called from coroutines.
// However, if waitForInit() (which uses runBlocking) is called from the main thread,
// it will block the main thread indefinitely until init completes, which can cause ANRs.
// This is intentional per PR #2412: "ANR is the lesser of two evils and the app can recover,
// where an uncaught throw it can not." To avoid ANRs, call SDK methods from background threads
// or use the suspend API from coroutines.
suspendCompletion.await()

// Log how long initialization took
val elapsed = System.currentTimeMillis() - startTime
val message = if (operationName != null) {
"OneSignalImp initialization completed before '$operationName' (took ${elapsed}ms)"
} else {
"OneSignalImp initialization completed (took ${elapsed}ms)"
}
Logging.debug(message)

// Re-check state after waiting - init might have failed during the wait
if (initState == InitState.FAILED) {
throw IllegalStateException("Initialization failed. Cannot proceed.")
}
// initState is guaranteed to be SUCCESS here - consistent state
}
InitState.FAILED -> {
throw IllegalStateException("Initialization failed. Cannot proceed.")
Expand All @@ -377,23 +414,7 @@ internal class OneSignalImp(
}

private fun <T> waitAndReturn(getter: () -> T): T {
when (initState) {
InitState.NOT_STARTED -> {
throw IllegalStateException("Must call 'initWithContext' before use")
}
InitState.IN_PROGRESS -> {
Logging.debug("Waiting for init to complete...")
waitForInit()
}
InitState.FAILED -> {
throw IllegalStateException("Initialization failed. Cannot proceed.")
}
else -> {
// SUCCESS
waitForInit()
}
}

waitForInit()
return getter()
}

Expand All @@ -407,8 +428,9 @@ internal class OneSignalImp(
// because Looper.getMainLooper() is not mocked. This is safe to ignore.
Logging.debug("Could not check main thread status (likely in test environment): ${e.message}")
}
// Call suspendAndReturn directly to avoid nested runBlocking (waitAndReturn -> waitForInit -> runBlocking)
return runBlocking(ioDispatcher) {
waitAndReturn(getter)
suspendAndReturn(getter)
}
}

Expand Down Expand Up @@ -508,7 +530,8 @@ internal class OneSignalImp(
) = withContext(ioDispatcher) {
Logging.log(LogLevel.DEBUG, "login(externalId: $externalId, jwtBearerToken: $jwtBearerToken)")

suspendUntilInit()
suspendUntilInit(operationName = "login")

if (!isInitialized) {
throw IllegalStateException("'initWithContext failed' before 'login'")
}
Expand All @@ -520,7 +543,7 @@ internal class OneSignalImp(
withContext(ioDispatcher) {
Logging.log(LogLevel.DEBUG, "logoutSuspend()")

suspendUntilInit()
suspendUntilInit(operationName = "logout")

if (!isInitialized) {
throw IllegalStateException("'initWithContext failed' before 'logout'")
Expand Down
Loading