diff --git a/Sources/AblyLiveObjects/Internal/InternalDefaultLiveCounter.swift b/Sources/AblyLiveObjects/Internal/InternalDefaultLiveCounter.swift index d1149f26..fe9fba8a 100644 --- a/Sources/AblyLiveObjects/Internal/InternalDefaultLiveCounter.swift +++ b/Sources/AblyLiveObjects/Internal/InternalDefaultLiveCounter.swift @@ -150,7 +150,12 @@ internal final class InternalDefaultLiveCounter: Sendable { objectMessageSerialTimestamp: Date?, ) -> LiveObjectUpdate { mutex.withLock { - mutableState.replaceData(using: state, objectMessageSerialTimestamp: objectMessageSerialTimestamp) + mutableState.replaceData( + using: state, + objectMessageSerialTimestamp: objectMessageSerialTimestamp, + logger: logger, + clock: clock, + ) } } @@ -191,11 +196,28 @@ internal final class InternalDefaultLiveCounter: Sendable { objectMessageSerialTimestamp: objectMessageSerialTimestamp, objectsPool: &objectsPool, logger: logger, + clock: clock, userCallbackQueue: userCallbackQueue, ) } } + // MARK: - LiveObject + + /// Returns the object's RTLO3d `isTombstone` property. + internal var isTombstone: Bool { + mutex.withLock { + mutableState.liveObjectMutableState.isTombstone + } + } + + /// Returns the object's RTLO3e `tombstonedAt` property. + internal var tombstonedAt: Date? { + mutex.withLock { + mutableState.liveObjectMutableState.tombstonedAt + } + } + // MARK: - Mutable state and the operations that affect it private struct MutableState: InternalLiveObject { @@ -212,10 +234,31 @@ internal final class InternalDefaultLiveCounter: Sendable { internal mutating func replaceData( using state: ObjectState, objectMessageSerialTimestamp: Date?, + logger: Logger, + clock: SimpleClock, ) -> LiveObjectUpdate { // RTLC6a: Replace the private siteTimeserials with the value from ObjectState.siteTimeserials liveObjectMutableState.siteTimeserials = state.siteTimeserials + // RTLC6e, RTLC6e1: No-op if we're already tombstone + if liveObjectMutableState.isTombstone { + return .noop + } + + // RTLC6f: Tombstone if state indicates tombstoned + if state.tombstone { + let dataBeforeTombstoning = data + + tombstone( + objectMessageSerialTimestamp: objectMessageSerialTimestamp, + logger: logger, + clock: clock, + ) + + // RTLC6f1 + return .update(.init(amount: -dataBeforeTombstoning)) + } + // RTLC6b: Set the private flag createOperationIsMerged to false liveObjectMutableState.createOperationIsMerged = false @@ -259,6 +302,7 @@ internal final class InternalDefaultLiveCounter: Sendable { objectMessageSerialTimestamp: Date?, objectsPool: inout ObjectsPool, logger: Logger, + clock: SimpleClock, userCallbackQueue: DispatchQueue, ) { guard let applicableOperation = liveObjectMutableState.canApplyOperation(objectMessageSerial: objectMessageSerial, objectMessageSiteCode: objectMessageSiteCode, logger: logger) else { @@ -270,6 +314,12 @@ internal final class InternalDefaultLiveCounter: Sendable { // RTLC7c liveObjectMutableState.siteTimeserials[applicableOperation.objectMessageSiteCode] = applicableOperation.objectMessageSerial + // RTLC7e + // TODO: are we still meant to update siteTimeserials? https://github.com/ably/specification/pull/350/files#r2218718854 + if liveObjectMutableState.isTombstone { + return + } + switch operation.action { case .known(.counterCreate): // RTLC7d1 @@ -284,6 +334,18 @@ internal final class InternalDefaultLiveCounter: Sendable { let update = applyCounterIncOperation(operation.counterOp) // RTLC7d2a liveObjectMutableState.emit(update, on: userCallbackQueue) + case .known(.objectDelete): + let dataBeforeApplyingOperation = data + + // RTLC7d4 + applyObjectDeleteOperation( + objectMessageSerialTimestamp: objectMessageSerialTimestamp, + logger: logger, + clock: clock, + ) + + // RTLC7d4a + liveObjectMutableState.emit(.update(.init(amount: -dataBeforeApplyingOperation)), on: userCallbackQueue) default: // RTLC7d3 logger.log("Operation \(operation) has unsupported action for LiveCounter; discarding", level: .warn) @@ -317,5 +379,11 @@ internal final class InternalDefaultLiveCounter: Sendable { data += amount return .update(DefaultLiveCounterUpdate(amount: amount)) } + + /// Needed for ``InternalLiveObject`` conformance. + mutating func resetDataToZeroValued() { + // RTLC4 + data = 0 + } } } diff --git a/Sources/AblyLiveObjects/Internal/InternalDefaultLiveMap.swift b/Sources/AblyLiveObjects/Internal/InternalDefaultLiveMap.swift index 78033b51..427a09c6 100644 --- a/Sources/AblyLiveObjects/Internal/InternalDefaultLiveMap.swift +++ b/Sources/AblyLiveObjects/Internal/InternalDefaultLiveMap.swift @@ -113,6 +113,11 @@ internal final class InternalDefaultLiveMap: Sendable { // RTLM5c: If the channel is in the DETACHED or FAILED state, the library should indicate an error with code 90001 try coreSDK.validateChannelState(notIn: [.detached, .failed], operationDescription: "LiveMap.get") + // RTLM5e - Return nil if self is tombstone + if isTombstone { + return nil + } + let entry = mutex.withLock { mutableState.data[key] } @@ -126,14 +131,14 @@ internal final class InternalDefaultLiveMap: Sendable { return convertEntryToLiveMapValue(entry, delegate: delegate) } - internal func size(coreSDK: CoreSDK) throws(ARTErrorInfo) -> Int { + internal func size(coreSDK: CoreSDK, delegate: LiveMapObjectPoolDelegate) throws(ARTErrorInfo) -> Int { // RTLM10c: If the channel is in the DETACHED or FAILED state, the library should throw an ErrorInfo error with statusCode 400 and code 90001 try coreSDK.validateChannelState(notIn: [.detached, .failed], operationDescription: "LiveMap.size") return mutex.withLock { // RTLM10d: Returns the number of non-tombstoned entries (per RTLM14) in the internal data map mutableState.data.values.count { entry in - !Self.isEntryTombstoned(entry) + !Self.isEntryTombstoned(entry, delegate: delegate) } } } @@ -147,7 +152,7 @@ internal final class InternalDefaultLiveMap: Sendable { // RTLM11d1: Pairs with tombstoned entries (per RTLM14) are not returned var result: [(key: String, value: InternalLiveMapValue)] = [] - for (key, entry) in mutableState.data where !Self.isEntryTombstoned(entry) { + for (key, entry) in mutableState.data where !Self.isEntryTombstoned(entry, delegate: delegate) { // Convert entry to LiveMapValue using the same logic as get(key:) if let value = convertEntryToLiveMapValue(entry, delegate: delegate) { result.append((key: key, value: value)) @@ -315,11 +320,14 @@ internal final class InternalDefaultLiveMap: Sendable { /// Applies a `MAP_REMOVE` operation to a key, per RTLM8. /// /// This is currently exposed just so that the tests can test RTLM8 without having to go through a convoluted replaceData(…) call, but I _think_ that it's going to be used in further contexts when we introduce the handling of incoming object operations in a future spec PR. - internal func testsOnly_applyMapRemoveOperation(key: String, operationTimeserial: String?) -> LiveObjectUpdate { + internal func testsOnly_applyMapRemoveOperation(key: String, operationTimeserial: String?, operationSerialTimestamp: Date?) -> LiveObjectUpdate { mutex.withLock { mutableState.applyMapRemoveOperation( key: key, operationTimeserial: operationTimeserial, + operationSerialTimestamp: operationSerialTimestamp, + logger: logger, + clock: clock, ) } } @@ -331,6 +339,29 @@ internal final class InternalDefaultLiveMap: Sendable { } } + /// Releases entries that were tombstoned more than `gracePeriod` ago, per RTLM19. + internal func releaseTombstonedEntries(gracePeriod: TimeInterval, clock: SimpleClock) { + mutex.withLock { + mutableState.releaseTombstonedEntries(gracePeriod: gracePeriod, logger: logger, clock: clock) + } + } + + // MARK: - LiveObject + + /// Returns the object's RTLO3d `isTombstone` property. + internal var isTombstone: Bool { + mutex.withLock { + mutableState.liveObjectMutableState.isTombstone + } + } + + /// Returns the object's RTLO3e `tombstonedAt` property. + internal var tombstonedAt: Date? { + mutex.withLock { + mutableState.liveObjectMutableState.tombstonedAt + } + } + // MARK: - Mutable state and the operations that affect it private struct MutableState: InternalLiveObject { @@ -359,11 +390,48 @@ internal final class InternalDefaultLiveMap: Sendable { // RTLM6a: Replace the private siteTimeserials with the value from ObjectState.siteTimeserials liveObjectMutableState.siteTimeserials = state.siteTimeserials + // RTLM6e, RTLM6e1: No-op if we're already tombstone + if liveObjectMutableState.isTombstone { + return .noop + } + + // RTLM6f: Tombstone if state indicates tombstoned + if state.tombstone { + let dataBeforeTombstoning = data + + tombstone( + objectMessageSerialTimestamp: objectMessageSerialTimestamp, + logger: logger, + clock: clock, + ) + + // RTLM6f1 + return .update(.init(update: dataBeforeTombstoning.mapValues { _ in .removed })) + } + // RTLM6b: Set the private flag createOperationIsMerged to false liveObjectMutableState.createOperationIsMerged = false // RTLM6c: Set data to ObjectState.map.entries, or to an empty map if it does not exist - data = state.map?.entries?.mapValues { .init(objectsMapEntry: $0) } ?? [:] + data = state.map?.entries?.mapValues { entry in + // Set tombstonedAt for tombstoned entries + let tombstonedAt: Date? + if entry.tombstone == true { + // RTLM6c1a + if let serialTimestamp = entry.serialTimestamp { + tombstonedAt = serialTimestamp + } else { + // RTLM6c1b + logger.log("serialTimestamp not found in ObjectsMapEntry, using local clock for tombstone timestamp", level: .debug) + // RTLM6cb1 + tombstonedAt = clock.now + } + } else { + tombstonedAt = nil + } + + return .init(objectsMapEntry: entry, tombstonedAt: tombstonedAt) + } ?? [:] // RTLM6d: If ObjectState.createOp is present, merge the initial value into the LiveMap as described in RTLM17 return if let createOp = state.createOp { @@ -393,10 +461,13 @@ internal final class InternalDefaultLiveMap: Sendable { entries.map { key, entry in if entry.tombstone == true { // RTLM17a2: If ObjectsMapEntry.tombstone is true, apply the MAP_REMOVE operation - // as described in RTLM8, passing in the current key as ObjectsMapOp, and ObjectsMapEntry.timeserial as the operation's serial + // as described in RTLM8, passing in the current key as ObjectsMapOp, ObjectsMapEntry.timeserial as the operation's serial, and ObjectsMapEntry.serialTimestamp as the operation's serial timestamp applyMapRemoveOperation( key: key, operationTimeserial: entry.timeserial, + operationSerialTimestamp: entry.serialTimestamp, + logger: logger, + clock: clock, ) } else { // RTLM17a1: If ObjectsMapEntry.tombstone is false, apply the MAP_SET operation @@ -456,6 +527,12 @@ internal final class InternalDefaultLiveMap: Sendable { // RTLM15c liveObjectMutableState.siteTimeserials[applicableOperation.objectMessageSiteCode] = applicableOperation.objectMessageSerial + // RTLM15e + // TODO: are we still meant to update siteTimeserials? https://github.com/ably/specification/pull/350/files#r2218718854 + if liveObjectMutableState.isTombstone { + return + } + switch operation.action { case .known(.mapCreate): // RTLM15d1 @@ -499,9 +576,24 @@ internal final class InternalDefaultLiveMap: Sendable { let update = applyMapRemoveOperation( key: mapOp.key, operationTimeserial: applicableOperation.objectMessageSerial, + operationSerialTimestamp: objectMessageSerialTimestamp, + logger: logger, + clock: clock, ) // RTLM15d3a liveObjectMutableState.emit(update, on: userCallbackQueue) + case .known(.objectDelete): + let dataBeforeApplyingOperation = data + + // RTLM15d5 + applyObjectDeleteOperation( + objectMessageSerialTimestamp: objectMessageSerialTimestamp, + logger: logger, + clock: clock, + ) + + // RTLM15d5a + liveObjectMutableState.emit(.update(.init(update: dataBeforeApplyingOperation.mapValues { _ in .removed })), on: userCallbackQueue) default: // RTLM15d4 logger.log("Operation \(operation) has unsupported action for LiveMap; discarding", level: .warn) @@ -527,17 +619,17 @@ internal final class InternalDefaultLiveMap: Sendable { // RTLM7a2: Otherwise, apply the operation // RTLM7a2a: Set ObjectsMapEntry.data to the ObjectData from the operation // RTLM7a2b: Set ObjectsMapEntry.timeserial to the operation's serial - // RTLM7a2c: Set ObjectsMapEntry.tombstone to false + // RTLM7a2c: Set ObjectsMapEntry.tombstone to false (same as RTLM7a2d: Set ObjectsMapEntry.tombstonedAt to nil) var updatedEntry = existingEntry updatedEntry.data = operationData updatedEntry.timeserial = operationTimeserial - updatedEntry.tombstone = false + updatedEntry.tombstonedAt = nil data[key] = updatedEntry } else { // RTLM7b: If an entry does not exist in the private data for the specified key // RTLM7b1: Create a new entry in data for the specified key with the provided ObjectData and the operation's serial - // RTLM7b2: Set ObjectsMapEntry.tombstone for the new entry to false - data[key] = InternalObjectsMapEntry(tombstone: false, timeserial: operationTimeserial, data: operationData) + // RTLM7b2: Set ObjectsMapEntry.tombstone for the new entry to false (same as RTLM7b3: Set tombstonedAt to nil) + data[key] = InternalObjectsMapEntry(tombstonedAt: nil, timeserial: operationTimeserial, data: operationData) } // RTLM7c: If the operation has a non-empty ObjectData.objectId attribute @@ -551,9 +643,21 @@ internal final class InternalDefaultLiveMap: Sendable { } /// Applies a `MAP_REMOVE` operation to a key, per RTLM8. - internal mutating func applyMapRemoveOperation(key: String, operationTimeserial: String?) -> LiveObjectUpdate { + internal mutating func applyMapRemoveOperation(key: String, operationTimeserial: String?, operationSerialTimestamp: Date?, logger: Logger, clock: SimpleClock) -> LiveObjectUpdate { // (Note that, where the spec tells us to set ObjectsMapEntry.data to nil, we actually set it to an empty ObjectData, which is equivalent, since it contains no data) + // Calculate the tombstonedAt for the new or updated entry per RTLM8f + let tombstonedAt: Date? + if let operationSerialTimestamp { + // RTLM8f1 + tombstonedAt = operationSerialTimestamp + } else { + // RTLM8f2 + logger.log("serialTimestamp not provided for MAP_REMOVE, using local clock for tombstone timestamp", level: .debug) + // RTLM8f2a + tombstonedAt = clock.now + } + // RTLM8a: If an entry exists in the private data for the specified key if let existingEntry = data[key] { // RTLM8a1: If the operation cannot be applied as per RTLM9, discard the operation @@ -563,17 +667,19 @@ internal final class InternalDefaultLiveMap: Sendable { // RTLM8a2: Otherwise, apply the operation // RTLM8a2a: Set ObjectsMapEntry.data to undefined/null // RTLM8a2b: Set ObjectsMapEntry.timeserial to the operation's serial - // RTLM8a2c: Set ObjectsMapEntry.tombstone to true + // RTLM8a2c: Set ObjectsMapEntry.tombstone to true (equivalent to next point) + // RTLM8a2d: Set ObjectsMapEntry.tombstonedAt per RTLM8a2d var updatedEntry = existingEntry updatedEntry.data = ObjectData() updatedEntry.timeserial = operationTimeserial - updatedEntry.tombstone = true + updatedEntry.tombstonedAt = tombstonedAt data[key] = updatedEntry } else { // RTLM8b: If an entry does not exist in the private data for the specified key // RTLM8b1: Create a new entry in data for the specified key, with ObjectsMapEntry.data set to undefined/null and the operation's serial // RTLM8b2: Set ObjectsMapEntry.tombstone for the new entry to true - data[key] = InternalObjectsMapEntry(tombstone: true, timeserial: operationTimeserial, data: ObjectData()) + // RTLM8b3: Set ObjectsMapEntry.tombstonedAt per RTLM8f + data[key] = InternalObjectsMapEntry(tombstonedAt: tombstonedAt, timeserial: operationTimeserial, data: ObjectData()) } return .update(DefaultLiveMapUpdate(update: [key: .removed])) @@ -655,14 +761,57 @@ internal final class InternalDefaultLiveMap: Sendable { let mapUpdate = DefaultLiveMapUpdate(update: previousData.mapValues { _ in .removed }) liveObjectMutableState.emit(.update(mapUpdate), on: userCallbackQueue) } + + /// Needed for ``InternalLiveObject`` conformance. + mutating func resetDataToZeroValued() { + // RTLM4 + data = [:] + } + + /// Releases entries that were tombstoned more than `gracePeriod` ago, per RTLM19. + internal mutating func releaseTombstonedEntries( + gracePeriod: TimeInterval, + logger: Logger, + clock: SimpleClock, + ) { + let now = clock.now + + // RTLM19a, RTLM19a1 + data = data.filter { key, entry in + let shouldRelease = { + guard let tombstonedAt = entry.tombstonedAt else { + return false + } + + return now.timeIntervalSince(tombstonedAt) >= gracePeriod + }() + + if shouldRelease { + logger.log("Releasing tombstoned entry \(entry) for key \(key)", level: .debug) + } + return !shouldRelease + } + } } // MARK: - Helper Methods /// Returns whether a map entry should be considered tombstoned, per the check described in RTLM14. - private static func isEntryTombstoned(_ entry: InternalObjectsMapEntry) -> Bool { - // RTLM14a, RTLM14b - entry.tombstone == true + private static func isEntryTombstoned(_ entry: InternalObjectsMapEntry, delegate: LiveMapObjectPoolDelegate) -> Bool { + // RTLM14a + if entry.tombstone { + return true + } + + // RTLM14c + if let objectId = entry.data.objectId { + if let poolEntry = delegate.getObjectFromPool(id: objectId), poolEntry.isTombstone { + return true + } + } + + // RTLM14b + return false } /// Converts an InternalObjectsMapEntry to LiveMapValue using the same logic as get(key:) @@ -712,7 +861,12 @@ internal final class InternalDefaultLiveMap: Sendable { return nil } - // RTLM5d2f2: If an object with id objectId exists, return it + // RTLM5d2f3: If referenced object is tombstoned, return nil + if poolEntry.isTombstone { + return nil + } + + // RTLM5d2f2: Return referenced object switch poolEntry { case let .map(map): return .liveMap(map) diff --git a/Sources/AblyLiveObjects/Internal/InternalDefaultRealtimeObjects.swift b/Sources/AblyLiveObjects/Internal/InternalDefaultRealtimeObjects.swift index 6d413353..904265f7 100644 --- a/Sources/AblyLiveObjects/Internal/InternalDefaultRealtimeObjects.swift +++ b/Sources/AblyLiveObjects/Internal/InternalDefaultRealtimeObjects.swift @@ -18,6 +18,26 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectPool private let receivedObjectSyncProtocolMessages: AsyncStream<[InboundObjectMessage]> private let receivedObjectSyncProtocolMessagesContinuation: AsyncStream<[InboundObjectMessage]>.Continuation + /// The RTO10a interval at which we will perform garbage collection. + private let garbageCollectionInterval: TimeInterval + /// The RTO10b grace period for which we will retain tombstoned objects and map entries. + private nonisolated(unsafe) var garbageCollectionGracePeriod: TimeInterval + // The task that runs the periodic garbage collection described in RTO10. + private nonisolated(unsafe) var garbageCollectionTask: Task! + + /// Parameters used to control the garbage collection of tombstoned objects and map entries, as described in RTO10. + internal struct GarbageCollectionOptions { + /// The RTO10a interval at which we will perform garbage collection. + /// + /// The default value comes from the suggestion in RTO10a. + internal var interval: TimeInterval = 5 * 60 + + /// The initial RTO10b grace period for which we will retain tombstoned objects and map entries. This value may later get overridden by the `gcGracePeriod` of a `CONNECTED` `ProtocolMessage` from Realtime. + /// + /// This default value comes from RTO10b3; can be overridden for testing. + internal var gracePeriod: TimeInterval = 24 * 60 * 60 + } + internal var testsOnly_objectsPool: ObjectsPool { mutex.withLock { mutableState.objectsPool @@ -71,7 +91,7 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectPool } } - internal init(logger: AblyPlugin.Logger, userCallbackQueue: DispatchQueue, clock: SimpleClock) { + internal init(logger: AblyPlugin.Logger, userCallbackQueue: DispatchQueue, clock: SimpleClock, garbageCollectionOptions: GarbageCollectionOptions = .init()) { self.logger = logger self.userCallbackQueue = userCallbackQueue self.clock = clock @@ -79,6 +99,30 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectPool (receivedObjectSyncProtocolMessages, receivedObjectSyncProtocolMessagesContinuation) = AsyncStream.makeStream() (waitingForSyncEvents, waitingForSyncEventsContinuation) = AsyncStream.makeStream() mutableState = .init(objectsPool: .init(logger: logger, userCallbackQueue: userCallbackQueue, clock: clock)) + garbageCollectionInterval = garbageCollectionOptions.interval + garbageCollectionGracePeriod = garbageCollectionOptions.gracePeriod + + garbageCollectionTask = Task { [weak self, garbageCollectionInterval] in + do { + while true { + logger.log("Will perform garbage collection in \(garbageCollectionInterval)s", level: .debug) + try await Task.sleep(nanoseconds: UInt64(garbageCollectionInterval) * NSEC_PER_SEC) + + guard let self else { + return + } + + performGarbageCollection() + } + } catch { + precondition(error is CancellationError) + logger.log("Garbage collection task terminated due to cancellation", level: .debug) + } + } + } + + deinit { + garbageCollectionTask.cancel() } // MARK: - LiveMapObjectPoolDelegate @@ -210,6 +254,19 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectPool try await coreSDK.publish(objectMessages: objectMessages) } + // MARK: - Garbage collection of deleted objects and map entries + + /// Performs garbage collection of tombstoned objects and map entries, per RTO10c. + internal func performGarbageCollection() { + mutex.withLock { + mutableState.objectsPool.performGarbageCollection( + gracePeriod: garbageCollectionGracePeriod, + clock: clock, + logger: logger, + ) + } + } + // MARK: - Testing /// Finishes the following streams, to allow a test to perform assertions about which elements the streams have emitted to this moment: diff --git a/Sources/AblyLiveObjects/Internal/InternalLiveObject.swift b/Sources/AblyLiveObjects/Internal/InternalLiveObject.swift index e36dce1c..d80d53b4 100644 --- a/Sources/AblyLiveObjects/Internal/InternalLiveObject.swift +++ b/Sources/AblyLiveObjects/Internal/InternalLiveObject.swift @@ -1,3 +1,5 @@ +internal import AblyPlugin + /// Provides RTLO spec point functionality common to all LiveObjects. /// /// This exists in addition to ``LiveObjectMutableState`` to enable polymorphism. @@ -5,4 +7,44 @@ internal protocol InternalLiveObject { associatedtype Update: Sendable var liveObjectMutableState: LiveObjectMutableState { get set } + + /// Resets the LiveObject's internal data to that of a zero-value, per RTLO4e4. + mutating func resetDataToZeroValued() +} + +internal extension InternalLiveObject { + /// Convenience method for tombstoning a `LiveObject`, as specified in RTLO4e. + mutating func tombstone( + objectMessageSerialTimestamp: Date?, + logger: Logger, + clock: SimpleClock, + ) { + // RTLO4e2, RTLO4e3 + if let objectMessageSerialTimestamp { + // RTLO4e3a + liveObjectMutableState.tombstonedAt = objectMessageSerialTimestamp + } else { + // RTLO4e3b1 + logger.log("serialTimestamp not found in ObjectMessage, using local clock for tombstone timestamp", level: .debug) + // RTLO4e3b + liveObjectMutableState.tombstonedAt = clock.now + } + + // RTLO4e4 + resetDataToZeroValued() + } + + /// Applies an `OBJECT_DELETE` operation, per RTLO5. + mutating func applyObjectDeleteOperation( + objectMessageSerialTimestamp: Date?, + logger: Logger, + clock: SimpleClock, + ) { + // RTLO5b + tombstone( + objectMessageSerialTimestamp: objectMessageSerialTimestamp, + logger: logger, + clock: clock, + ) + } } diff --git a/Sources/AblyLiveObjects/Internal/InternalObjectsMapEntry.swift b/Sources/AblyLiveObjects/Internal/InternalObjectsMapEntry.swift index 435dec23..da969c3b 100644 --- a/Sources/AblyLiveObjects/Internal/InternalObjectsMapEntry.swift +++ b/Sources/AblyLiveObjects/Internal/InternalObjectsMapEntry.swift @@ -1,13 +1,20 @@ -/// The entries stored in a `LiveMap`'s data. Same as an `ObjectsMapEntry` but with an additional `tombstonedAt` property, per RTLM3a. (This property will be added in an upcoming commit.) +import Foundation + +/// The entries stored in a `LiveMap`'s data. Same as an `ObjectsMapEntry` but with an additional `tombstonedAt` property, per RTLM3a. internal struct InternalObjectsMapEntry { - internal var tombstone: Bool? // OME2a + internal var tombstonedAt: Date? // RTLM3a + internal var tombstone: Bool { + // TODO: Confirm that we don't need to store this (https://github.com/ably/specification/pull/350/files#r2213895661) + tombstonedAt != nil + } + internal var timeserial: String? // OME2b internal var data: ObjectData // OME2c } internal extension InternalObjectsMapEntry { - init(objectsMapEntry: ObjectsMapEntry) { - tombstone = objectsMapEntry.tombstone + init(objectsMapEntry: ObjectsMapEntry, tombstonedAt: Date?) { + self.tombstonedAt = tombstonedAt timeserial = objectsMapEntry.timeserial data = objectsMapEntry.data } diff --git a/Sources/AblyLiveObjects/Internal/LiveObjectMutableState.swift b/Sources/AblyLiveObjects/Internal/LiveObjectMutableState.swift index f35fc9ba..f3326208 100644 --- a/Sources/AblyLiveObjects/Internal/LiveObjectMutableState.swift +++ b/Sources/AblyLiveObjects/Internal/LiveObjectMutableState.swift @@ -10,6 +10,14 @@ internal struct LiveObjectMutableState { internal var siteTimeserials: [String: String] = [:] // RTLO3c internal var createOperationIsMerged = false + // RTLO3d + internal var isTombstone: Bool { + // TODO: Confirm that we don't need to store this (https://github.com/ably/specification/pull/350/files#r2213895661) + tombstonedAt != nil + } + + // RTLO3e + internal var tombstonedAt: Date? /// Internal bookkeeping for subscriptions. private var subscriptionsByID: [Subscription.ID: Subscription] = [:] @@ -17,9 +25,11 @@ internal struct LiveObjectMutableState { internal init( objectID: String, testsOnly_siteTimeserials siteTimeserials: [String: String]? = nil, + testsOnly_tombstonedAt tombstonedAt: Date? = nil, ) { self.objectID = objectID self.siteTimeserials = siteTimeserials ?? [:] + self.tombstonedAt = tombstonedAt } /// Represents parameters of an operation that `canApplyOperation` has decided can be applied to a `LiveObject`. diff --git a/Sources/AblyLiveObjects/Internal/ObjectsPool.swift b/Sources/AblyLiveObjects/Internal/ObjectsPool.swift index f047e9da..933198a5 100644 --- a/Sources/AblyLiveObjects/Internal/ObjectsPool.swift +++ b/Sources/AblyLiveObjects/Internal/ObjectsPool.swift @@ -104,6 +104,25 @@ internal struct ObjectsPool { ) } } + + /// Returns the object's RTLO3d `isTombstone` property. + internal var isTombstone: Bool { + switch self { + case let .counter(counter): + counter.isTombstone + case let .map(map): + map.isTombstone + } + } + + internal var tombstonedAt: Date? { + switch self { + case let .counter(counter): + counter.tombstonedAt + case let .map(map): + map.tombstonedAt + } + } } /// Keyed by `objectId`. @@ -298,4 +317,32 @@ internal struct ObjectsPool { // TODO: this one is unclear (are we meant to replace the root or just clear its data?) https://github.com/ably/specification/pull/333/files#r2183493458. I believe that the answer is that we should just clear its data but the spec point needs to be clearer, see https://github.com/ably/specification/pull/346/files#r2201434895. root.resetData() } + + /// Performs garbage collection of tombstoned objects and map entries, per RTO10c. + internal mutating func performGarbageCollection(gracePeriod: TimeInterval, clock: SimpleClock, logger: Logger) { + logger.log("Performing garbage collection, grace period \(gracePeriod)s", level: .debug) + + let now = clock.now + + entries = entries.filter { key, entry in + if case let .map(map) = entry { + // RTO10c1a + map.releaseTombstonedEntries(gracePeriod: gracePeriod, clock: clock) + } + + // RTO10c1b + let shouldRelease = { + guard let tombstonedAt = entry.tombstonedAt else { + return false + } + + return now.timeIntervalSince(tombstonedAt) >= gracePeriod + }() + + if shouldRelease { + logger.log("Releasing tombstoned entry \(entry) for key \(key)", level: .debug) + } + return !shouldRelease + } + } } diff --git a/Sources/AblyLiveObjects/Public/Public Proxy Objects/PublicDefaultLiveMap.swift b/Sources/AblyLiveObjects/Public/Public Proxy Objects/PublicDefaultLiveMap.swift index f3a9c9ef..3569a4cc 100644 --- a/Sources/AblyLiveObjects/Public/Public Proxy Objects/PublicDefaultLiveMap.swift +++ b/Sources/AblyLiveObjects/Public/Public Proxy Objects/PublicDefaultLiveMap.swift @@ -34,7 +34,7 @@ internal final class PublicDefaultLiveMap: LiveMap { internal var size: Int { get throws(ARTErrorInfo) { - try proxied.size(coreSDK: coreSDK) + try proxied.size(coreSDK: coreSDK, delegate: delegate) } } diff --git a/Tests/AblyLiveObjectsTests/Helpers/TestFactories.swift b/Tests/AblyLiveObjectsTests/Helpers/TestFactories.swift index dd7cc38d..3d6966c3 100644 --- a/Tests/AblyLiveObjectsTests/Helpers/TestFactories.swift +++ b/Tests/AblyLiveObjectsTests/Helpers/TestFactories.swift @@ -406,12 +406,12 @@ struct TestFactories { /// /// This should be kept in sync with ``mapEntry``. static func internalMapEntry( - tombstone: Bool? = false, + tombstonedAt: Date? = nil, timeserial: String? = "ts1", data: ObjectData, ) -> InternalObjectsMapEntry { InternalObjectsMapEntry( - tombstone: tombstone, + tombstonedAt: tombstonedAt, timeserial: timeserial, data: data, ) @@ -440,13 +440,13 @@ struct TestFactories { static func internalStringMapEntry( key: String = "testKey", value: String = "testValue", - tombstone: Bool? = false, + tombstonedAt: Date? = nil, timeserial: String? = "ts1", ) -> (key: String, entry: InternalObjectsMapEntry) { ( key: key, entry: internalMapEntry( - tombstone: tombstone, + tombstonedAt: tombstonedAt, timeserial: timeserial, data: ObjectData(string: value), ), diff --git a/Tests/AblyLiveObjectsTests/InternalDefaultLiveMapTests.swift b/Tests/AblyLiveObjectsTests/InternalDefaultLiveMapTests.swift index 80e22a98..d68e7ccd 100644 --- a/Tests/AblyLiveObjectsTests/InternalDefaultLiveMapTests.swift +++ b/Tests/AblyLiveObjectsTests/InternalDefaultLiveMapTests.swift @@ -39,7 +39,7 @@ struct InternalDefaultLiveMapTests { func returnsNilWhenEntryIsTombstoned() throws { let logger = TestLogger() let entry = TestFactories.internalMapEntry( - tombstone: true, + tombstonedAt: Date(), data: ObjectData(boolean: true), // Value doesn't matter as it's tombstoned ) let coreSDK = MockCoreSDK(channelState: .attaching) @@ -284,7 +284,7 @@ struct InternalDefaultLiveMapTests { // Define actions to test let actions: [(String, () throws -> Any)] = [ - ("size", { try map.size(coreSDK: coreSDK) }), + ("size", { try map.size(coreSDK: coreSDK, delegate: delegate) }), ("entries", { try map.entries(coreSDK: coreSDK, delegate: delegate) }), ("keys", { try map.keys(coreSDK: coreSDK, delegate: delegate) }), ("values", { try map.values(coreSDK: coreSDK, delegate: delegate) }), @@ -317,12 +317,11 @@ struct InternalDefaultLiveMapTests { let delegate = MockLiveMapObjectPoolDelegate() let map = InternalDefaultLiveMap( testsOnly_data: [ - // tombstone is nil, so not considered tombstoned + // tombstonedAt is nil, so not considered tombstoned "active1": TestFactories.internalMapEntry(data: ObjectData(string: "value1")), - // tombstone is false, so not considered tombstoned[ - "active2": TestFactories.internalMapEntry(tombstone: false, data: ObjectData(string: "value2")), - "tombstoned": TestFactories.internalMapEntry(tombstone: true, data: ObjectData(string: "tombstoned")), - "tombstoned2": TestFactories.internalMapEntry(tombstone: true, data: ObjectData(string: "tombstoned2")), + // tombstonedAt is false, so not considered tombstoned + "tombstoned": TestFactories.internalMapEntry(tombstonedAt: Date(), data: ObjectData(string: "tombstoned")), + "tombstoned2": TestFactories.internalMapEntry(tombstonedAt: Date(), data: ObjectData(string: "tombstoned2")), ], objectID: "arbitrary", logger: logger, @@ -331,25 +330,24 @@ struct InternalDefaultLiveMapTests { ) // Test size - should only count non-tombstoned entries - let size = try map.size(coreSDK: coreSDK) - #expect(size == 2) + let size = try map.size(coreSDK: coreSDK, delegate: delegate) + #expect(size == 1) // Test entries - should only return non-tombstoned entries let entries = try map.entries(coreSDK: coreSDK, delegate: delegate) - #expect(entries.count == 2) - #expect(Set(entries.map(\.key)) == ["active1", "active2"]) + #expect(entries.count == 1) + #expect(Set(entries.map(\.key)) == ["active1"]) #expect(entries.first { $0.key == "active1" }?.value.stringValue == "value1") - #expect(entries.first { $0.key == "active2" }?.value.stringValue == "value2") // Test keys - should only return keys from non-tombstoned entries let keys = try map.keys(coreSDK: coreSDK, delegate: delegate) - #expect(keys.count == 2) - #expect(Set(keys) == ["active1", "active2"]) + #expect(keys.count == 1) + #expect(Set(keys) == ["active1"]) // Test values - should only return values from non-tombstoned entries let values = try map.values(coreSDK: coreSDK, delegate: delegate) - #expect(values.count == 2) - #expect(Set(values.compactMap(\.stringValue)) == Set(["value1", "value2"])) + #expect(values.count == 1) + #expect(Set(values.compactMap(\.stringValue)) == Set(["value1"])) } // MARK: - Consistency Tests @@ -374,7 +372,7 @@ struct InternalDefaultLiveMapTests { clock: MockSimpleClock(), ) - let size = try map.size(coreSDK: coreSDK) + let size = try map.size(coreSDK: coreSDK, delegate: delegate) let entries = try map.entries(coreSDK: coreSDK, delegate: delegate) let keys = try map.keys(coreSDK: coreSDK, delegate: delegate) let values = try map.values(coreSDK: coreSDK, delegate: delegate) @@ -424,7 +422,7 @@ struct InternalDefaultLiveMapTests { clock: MockSimpleClock(), ) - let size = try map.size(coreSDK: coreSDK) + let size = try map.size(coreSDK: coreSDK, delegate: delegate) let entries = try map.entries(coreSDK: coreSDK, delegate: delegate) let keys = try map.keys(coreSDK: coreSDK, delegate: delegate) let values = try map.values(coreSDK: coreSDK, delegate: delegate) @@ -507,7 +505,7 @@ struct InternalDefaultLiveMapTests { let delegate = MockLiveMapObjectPoolDelegate() let coreSDK = MockCoreSDK(channelState: .attaching) let map = InternalDefaultLiveMap( - testsOnly_data: ["key1": TestFactories.internalMapEntry(tombstone: true, timeserial: "ts1", data: ObjectData(string: "existing"))], + testsOnly_data: ["key1": TestFactories.internalMapEntry(tombstonedAt: Date(), timeserial: "ts1", data: ObjectData(string: "existing"))], objectID: "arbitrary", logger: logger, userCallbackQueue: .main, @@ -687,7 +685,7 @@ struct InternalDefaultLiveMapTests { ) // Try to apply operation with lower timeserial (ts1 < ts2), cannot be applied per RTLM9 - let update = map.testsOnly_applyMapRemoveOperation(key: "key1", operationTimeserial: "ts1") + let update = map.testsOnly_applyMapRemoveOperation(key: "key1", operationTimeserial: "ts1", operationSerialTimestamp: nil) // Verify the operation was discarded - existing data unchanged #expect(try map.get(key: "key1", coreSDK: coreSDK, delegate: delegate)?.stringValue == "existing") @@ -705,7 +703,7 @@ struct InternalDefaultLiveMapTests { let delegate = MockLiveMapObjectPoolDelegate() let coreSDK = MockCoreSDK(channelState: .attaching) let map = InternalDefaultLiveMap( - testsOnly_data: ["key1": TestFactories.internalMapEntry(tombstone: false, timeserial: "ts1", data: ObjectData(string: "existing"))], + testsOnly_data: ["key1": TestFactories.internalMapEntry(tombstonedAt: nil, timeserial: "ts1", data: ObjectData(string: "existing"))], objectID: "arbitrary", logger: logger, userCallbackQueue: .main, @@ -713,7 +711,7 @@ struct InternalDefaultLiveMapTests { ) // Apply operation with higher timeserial (ts2 > ts1), so can be applied per RTLM9 - let update = map.testsOnly_applyMapRemoveOperation(key: "key1", operationTimeserial: "ts2") + let update = map.testsOnly_applyMapRemoveOperation(key: "key1", operationTimeserial: "ts2", operationSerialTimestamp: nil) // Verify the operation was applied #expect(try map.get(key: "key1", coreSDK: coreSDK, delegate: delegate) == nil) @@ -747,7 +745,7 @@ struct InternalDefaultLiveMapTests { let logger = TestLogger() let map = InternalDefaultLiveMap.createZeroValued(objectID: "arbitrary", logger: logger, userCallbackQueue: .main, clock: MockSimpleClock()) - let update = map.testsOnly_applyMapRemoveOperation(key: "newKey", operationTimeserial: "ts1") + let update = map.testsOnly_applyMapRemoveOperation(key: "newKey", operationTimeserial: "ts1", operationSerialTimestamp: nil) // Verify new entry was created let entry = map.testsOnly_data["newKey"] @@ -769,7 +767,7 @@ struct InternalDefaultLiveMapTests { let logger = TestLogger() let map = InternalDefaultLiveMap.createZeroValued(objectID: "arbitrary", logger: logger, userCallbackQueue: .main, clock: MockSimpleClock()) - _ = map.testsOnly_applyMapRemoveOperation(key: "newKey", operationTimeserial: "ts1") + _ = map.testsOnly_applyMapRemoveOperation(key: "newKey", operationTimeserial: "ts1", operationSerialTimestamp: nil) // Verify tombstone is true for new entry #expect(map.testsOnly_data["newKey"]?.tombstone == true)