diff --git a/surf-redis-core/src/main/kotlin/dev/slne/surf/redis/cache/SimpleRedisCacheImpl.kt b/surf-redis-core/src/main/kotlin/dev/slne/surf/redis/cache/SimpleRedisCacheImpl.kt index 52efd2a..e50a2c7 100644 --- a/surf-redis-core/src/main/kotlin/dev/slne/surf/redis/cache/SimpleRedisCacheImpl.kt +++ b/surf-redis-core/src/main/kotlin/dev/slne/surf/redis/cache/SimpleRedisCacheImpl.kt @@ -16,7 +16,6 @@ import org.redisson.api.RStreamReactive import org.redisson.api.stream.StreamMessageId import reactor.core.Disposable import reactor.core.publisher.Mono -import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.atomic.AtomicReference import kotlin.time.Duration @@ -174,22 +173,17 @@ class SimpleRedisCacheImpl( return } - val shouldInvalidate = AtomicBoolean(false) - val oldVersion = lastVersion.getAndUpdate { currentVer -> + val prevVersion = lastVersion.getAndUpdate { cur -> when { - currentVer == 0L -> version - version <= currentVer -> currentVer - version == currentVer + 1 -> version - else -> { - shouldInvalidate.set(true) - version - } + cur == 0L -> version + version <= cur -> cur + else -> version } } - if (shouldInvalidate.get()) { + if (prevVersion != 0L && version > prevVersion + 1) { log.atWarning() - .log("Version gap detected in cache '$namespace': last=$oldVersion, new=$version. Clearing near-cache.") + .log("Version gap detected in cache '$namespace': last=$prevVersion, new=$version. Clearing near-cache.") clearNearCacheOnly() return } diff --git a/surf-redis-core/src/main/kotlin/dev/slne/surf/redis/cache/SimpleSetRedisCacheImpl.kt b/surf-redis-core/src/main/kotlin/dev/slne/surf/redis/cache/SimpleSetRedisCacheImpl.kt index 5100b72..27b070e 100644 --- a/surf-redis-core/src/main/kotlin/dev/slne/surf/redis/cache/SimpleSetRedisCacheImpl.kt +++ b/surf-redis-core/src/main/kotlin/dev/slne/surf/redis/cache/SimpleSetRedisCacheImpl.kt @@ -212,22 +212,17 @@ class SimpleSetRedisCacheImpl( return } - val shouldInvalidate = AtomicBoolean(false) - val oldVersion = lastVersion.getAndUpdate { currentVer -> + val prevVersion = lastVersion.getAndUpdate { cur -> when { - currentVer == 0L -> version - version <= currentVer -> currentVer - version == currentVer + 1 -> version - else -> { - shouldInvalidate.set(true) - version - } + cur == 0L -> version + version <= cur -> cur + else -> version } } - if (shouldInvalidate.get()) { + if (prevVersion != 0L && version > prevVersion + 1) { log.atWarning() - .log("Version gap detected in cache '$namespace': last=$oldVersion, new=$version. Clearing near-cache.") + .log("Version gap detected in cache '$namespace': last=$prevVersion, new=$version. Clearing near-cache.") clearNearCacheOnly() return } diff --git a/surf-redis-core/src/main/kotlin/dev/slne/surf/redis/sync/list/SyncListImpl.kt b/surf-redis-core/src/main/kotlin/dev/slne/surf/redis/sync/list/SyncListImpl.kt index 49cd832..c5ae1cf 100644 --- a/surf-redis-core/src/main/kotlin/dev/slne/surf/redis/sync/list/SyncListImpl.kt +++ b/surf-redis-core/src/main/kotlin/dev/slne/surf/redis/sync/list/SyncListImpl.kt @@ -167,10 +167,9 @@ class SyncListImpl( override fun overrideFromRemote(raw: SimpleVersionedSnapshot>) { - val elements = raw.value.map(::decodeValue) lock.write { list.clear() - list.addAll(elements) + raw.value.mapTo(list, ::decodeValue) } super.overrideFromRemote(raw) } diff --git a/surf-redis-core/src/main/kotlin/dev/slne/surf/redis/sync/map/SyncMapImpl.kt b/surf-redis-core/src/main/kotlin/dev/slne/surf/redis/sync/map/SyncMapImpl.kt index 3ce1a5e..f8a2f44 100644 --- a/surf-redis-core/src/main/kotlin/dev/slne/surf/redis/sync/map/SyncMapImpl.kt +++ b/surf-redis-core/src/main/kotlin/dev/slne/surf/redis/sync/map/SyncMapImpl.kt @@ -99,28 +99,26 @@ class SyncMapImpl( } override fun removeIf(predicate: (K, V) -> Boolean): Boolean { - val keysToRemove = lock.write { - val keys = ObjectArrayList() + val removedKeys = ObjectArrayList() + val removedVals = ObjectArrayList() + lock.write { val it = map.object2ObjectEntrySet().fastIterator() while (it.hasNext()) { val e = it.next() - if (predicate(e.key, e.value)) keys.add(e.key) + if (predicate(e.key, e.value)) { + removedKeys.add(e.key) + removedVals.add(e.value) + it.remove() + } } - keys } - if (keysToRemove.isEmpty) return false + if (removedKeys.isEmpty) return false - val removedLocal = ObjectArrayList>(keysToRemove.size) - lock.write { - for (k in keysToRemove) { - val old = map.remove(k) ?: continue - removedLocal.add(k to old) - } + removeManyRemote(removedKeys) + for (i in removedKeys.indices) { + notifyListeners(SyncMapChange.Removed(removedKeys[i], removedVals[i])) } - removeManyRemote(keysToRemove) - removedLocal.forEach { (k, v) -> notifyListeners(SyncMapChange.Removed(k, v)) } - return true } @@ -142,7 +140,10 @@ class SyncMapImpl( ).map { SimpleVersionedSnapshot.fromTuple(it) } override fun overrideFromRemote(raw: SimpleVersionedSnapshot>) { - val decoded = raw.value.map { (k, v) -> decodeKey(k) to decodeValue(v) }.toMap() + val decoded = Object2ObjectOpenHashMap(raw.value.size) + for ((k, v) in raw.value) { + decoded[decodeKey(k)] = decodeValue(v) + } lock.write { map.clear() map.putAll(decoded) diff --git a/surf-redis-core/src/main/kotlin/dev/slne/surf/redis/sync/set/SyncSetImpl.kt b/surf-redis-core/src/main/kotlin/dev/slne/surf/redis/sync/set/SyncSetImpl.kt index 1e19fcd..0e883ad 100644 --- a/surf-redis-core/src/main/kotlin/dev/slne/surf/redis/sync/set/SyncSetImpl.kt +++ b/surf-redis-core/src/main/kotlin/dev/slne/surf/redis/sync/set/SyncSetImpl.kt @@ -191,10 +191,9 @@ class SyncSetImpl( ).map { SimpleVersionedSnapshot.fromTuple(it) } override fun overrideFromRemote(raw: SimpleVersionedSnapshot>) { - val decoded = raw.value.map(::decodeValue) lock.write { set.clear() - set.addAll(decoded) + raw.value.mapTo(set, ::decodeValue) } super.overrideFromRemote(raw) }