Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import org.redisson.api.RStreamReactive
import org.redisson.api.stream.StreamMessageId
import reactor.core.Disposable
import reactor.core.publisher.Mono
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.atomic.AtomicReference
import kotlin.time.Duration
Expand Down Expand Up @@ -174,22 +173,17 @@ class SimpleRedisCacheImpl<K : Any, V : Any>(
return
}

val shouldInvalidate = AtomicBoolean(false)
val oldVersion = lastVersion.getAndUpdate { currentVer ->
val prevVersion = lastVersion.getAndUpdate { cur ->
when {
currentVer == 0L -> version
version <= currentVer -> currentVer
version == currentVer + 1 -> version
else -> {
shouldInvalidate.set(true)
version
}
cur == 0L -> version
version <= cur -> cur
else -> version
}
}

if (shouldInvalidate.get()) {
if (prevVersion != 0L && version > prevVersion + 1) {
log.atWarning()
.log("Version gap detected in cache '$namespace': last=$oldVersion, new=$version. Clearing near-cache.")
.log("Version gap detected in cache '$namespace': last=$prevVersion, new=$version. Clearing near-cache.")
clearNearCacheOnly()
return
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,22 +212,17 @@ class SimpleSetRedisCacheImpl<T : Any>(
return
}

val shouldInvalidate = AtomicBoolean(false)
val oldVersion = lastVersion.getAndUpdate { currentVer ->
val prevVersion = lastVersion.getAndUpdate { cur ->
when {
currentVer == 0L -> version
version <= currentVer -> currentVer
version == currentVer + 1 -> version
else -> {
shouldInvalidate.set(true)
version
}
cur == 0L -> version
version <= cur -> cur
else -> version
}
}

if (shouldInvalidate.get()) {
if (prevVersion != 0L && version > prevVersion + 1) {
log.atWarning()
.log("Version gap detected in cache '$namespace': last=$oldVersion, new=$version. Clearing near-cache.")
.log("Version gap detected in cache '$namespace': last=$prevVersion, new=$version. Clearing near-cache.")
clearNearCacheOnly()
return
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,10 +167,9 @@ class SyncListImpl<T : Any>(


override fun overrideFromRemote(raw: SimpleVersionedSnapshot<List<String>>) {
val elements = raw.value.map(::decodeValue)
lock.write {
list.clear()
list.addAll(elements)
raw.value.mapTo(list, ::decodeValue)
}
super.overrideFromRemote(raw)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,28 +99,26 @@ class SyncMapImpl<K : Any, V : Any>(
}

override fun removeIf(predicate: (K, V) -> Boolean): Boolean {
val keysToRemove = lock.write {
val keys = ObjectArrayList<K>()
val removedKeys = ObjectArrayList<K>()
val removedVals = ObjectArrayList<V>()
lock.write {
val it = map.object2ObjectEntrySet().fastIterator()
while (it.hasNext()) {
val e = it.next()
if (predicate(e.key, e.value)) keys.add(e.key)
if (predicate(e.key, e.value)) {
removedKeys.add(e.key)
removedVals.add(e.value)
it.remove()
}
}
keys
}
if (keysToRemove.isEmpty) return false
if (removedKeys.isEmpty) return false

val removedLocal = ObjectArrayList<Pair<K, V>>(keysToRemove.size)
lock.write {
for (k in keysToRemove) {
val old = map.remove(k) ?: continue
removedLocal.add(k to old)
}
removeManyRemote(removedKeys)
for (i in removedKeys.indices) {
notifyListeners(SyncMapChange.Removed(removedKeys[i], removedVals[i]))
}

removeManyRemote(keysToRemove)
removedLocal.forEach { (k, v) -> notifyListeners(SyncMapChange.Removed(k, v)) }

return true
}

Expand All @@ -142,7 +140,10 @@ class SyncMapImpl<K : Any, V : Any>(
).map { SimpleVersionedSnapshot.fromTuple(it) }

override fun overrideFromRemote(raw: SimpleVersionedSnapshot<Map<String, String>>) {
val decoded = raw.value.map { (k, v) -> decodeKey(k) to decodeValue(v) }.toMap()
val decoded = Object2ObjectOpenHashMap<K, V>(raw.value.size)
for ((k, v) in raw.value) {
decoded[decodeKey(k)] = decodeValue(v)
}
lock.write {
map.clear()
map.putAll(decoded)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,10 +191,9 @@ class SyncSetImpl<T : Any>(
).map { SimpleVersionedSnapshot.fromTuple(it) }

override fun overrideFromRemote(raw: SimpleVersionedSnapshot<Set<String>>) {
val decoded = raw.value.map(::decodeValue)
lock.write {
set.clear()
set.addAll(decoded)
raw.value.mapTo(set, ::decodeValue)
}
super.overrideFromRemote(raw)
}
Expand Down