From 7044dfd819fc8d88fb3021cc959970e87a4b26a5 Mon Sep 17 00:00:00 2001 From: Lawrence Forooghian Date: Thu, 22 Jan 2026 17:19:17 -0300 Subject: [PATCH] feat: Implement apply-on-ACK (RTO20) prototype Implement the apply-on-ACK feature which applies LiveObjects operations locally upon receipt of an ACK message, rather than waiting for the operation echo. This reduces perceived latency for users. Key changes: - Add appliedOnAckSerials set and bufferedAcks array to MutableState - Add PublishResult type containing serials array from ACK res[i].serials - Implement RTO9a3: Skip OBJECT messages already applied on ACK - Implement RTO5c9-11: Clear/process buffered ACKs on sync completion - Add publishAndApply() method for the main apply-on-ACK flow - Add canApplyOperationForAck/applyForAck helpers to LiveMap/LiveCounter - Integrate apply-on-ACK into set/remove/increment/decrement operations PublishResult.serials is an array where each element corresponds 1:1 to the ObjectMessages that were published. publishAndApply processes each message with its corresponding serial at the matching index. Note: Full functionality requires ably-cocoa changes: - CD2j: Add siteCode to ConnectionDetails - Update nosync_sendObject callback to include serials from ACK Co-Authored-By: Claude Opus 4.5 --- .../AblyLiveObjects/Internal/CoreSDK.swift | 28 +++- .../Internal/DefaultInternalPlugin.swift | 21 ++- .../Internal/InternalDefaultLiveCounter.swift | 25 ++- .../Internal/InternalDefaultLiveMap.swift | 27 ++- .../InternalDefaultRealtimeObjects.swift | 156 ++++++++++++++++++ .../Internal/ObjectsPool.swift | 7 + .../Protocol/ObjectMessage.swift | 15 ++ .../InternalLiveMapValue+ToPublic.swift | 4 +- .../PublicDefaultLiveCounter.swift | 8 +- .../PublicDefaultLiveMap.swift | 10 +- .../PublicDefaultRealtimeObjects.swift | 4 +- .../PublicObjectsStore.swift | 2 + 12 files changed, 275 insertions(+), 32 deletions(-) diff --git a/Sources/AblyLiveObjects/Internal/CoreSDK.swift b/Sources/AblyLiveObjects/Internal/CoreSDK.swift index 07493cf0..2e8a64d9 100644 --- a/Sources/AblyLiveObjects/Internal/CoreSDK.swift +++ b/Sources/AblyLiveObjects/Internal/CoreSDK.swift @@ -1,12 +1,24 @@ internal import _AblyPluginSupportPrivate import Ably +/// Result of a successful publish operation (RTO15). +/// Contains information from the ACK needed for apply-on-ACK (RTO20). +/// Corresponds to a single `PublishResult` in the ACK's `res` array. +internal struct PublishResult: Sendable { + /// The message serials from the ACK `res[i].serials` property. + /// Each element corresponds 1:1 to the messages that were published. + /// A serial may be nil if the message was discarded due to a configured conflation rule. + let serials: [String?] +} + /// The API that the internal components of the SDK (that is, `DefaultLiveObjects` and down) use to interact with our core SDK (i.e. ably-cocoa). /// /// This provides us with a mockable interface to ably-cocoa, and it also allows internal components and their tests not to need to worry about some of the boring details of how we bridge Swift types to `_AblyPluginSupportPrivate`'s Objective-C API (i.e. boxing). internal protocol CoreSDK: AnyObject, Sendable { /// Implements the internal `#publish` method of RTO15. - func publish(objectMessages: [OutboundObjectMessage]) async throws(ARTErrorInfo) + /// Returns a `PublishResult` containing the serial from the ACK for apply-on-ACK (RTO20). + @discardableResult + func publish(objectMessages: [OutboundObjectMessage]) async throws(ARTErrorInfo) -> PublishResult /// Implements the server time fetch of RTO16, including the storing and usage of the local clock offset. func fetchServerTime() async throws(ARTErrorInfo) -> Date @@ -14,7 +26,7 @@ internal protocol CoreSDK: AnyObject, Sendable { /// Replaces the implementation of ``publish(objectMessages:)``. /// /// Used by integration tests, for example to disable `ObjectMessage` publishing so that a test can verify that a behaviour is not a side effect of an `ObjectMessage` sent by the SDK. - func testsOnly_overridePublish(with newImplementation: @escaping ([OutboundObjectMessage]) async throws(ARTErrorInfo) -> Void) + func testsOnly_overridePublish(with newImplementation: @escaping ([OutboundObjectMessage]) async throws(ARTErrorInfo) -> PublishResult) /// Returns the current state of the Realtime channel that this wraps. var nosync_channelState: _AblyPluginSupportPrivate.RealtimeChannelState { get } @@ -34,7 +46,7 @@ internal final class DefaultCoreSDK: CoreSDK { /// This enables the `testsOnly_overridePublish(with:)` test hook. /// /// - Note: This should be `throws(ARTErrorInfo)` but that causes a compilation error of "Runtime support for typed throws function types is only available in macOS 15.0.0 or newer". - private nonisolated(unsafe) var overriddenPublishImplementation: (([OutboundObjectMessage]) async throws -> Void)? + private nonisolated(unsafe) var overriddenPublishImplementation: (([OutboundObjectMessage]) async throws -> PublishResult)? internal init( channel: _AblyPluginSupportPrivate.RealtimeChannel, @@ -50,7 +62,8 @@ internal final class DefaultCoreSDK: CoreSDK { // MARK: - CoreSDK conformance - internal func publish(objectMessages: [OutboundObjectMessage]) async throws(ARTErrorInfo) { + @discardableResult + internal func publish(objectMessages: [OutboundObjectMessage]) async throws(ARTErrorInfo) -> PublishResult { logger.log("publish(objectMessages: \(LoggingUtilities.formatObjectMessagesForLogging(objectMessages)))", level: .debug) // Use the overridden implementation if supplied @@ -59,18 +72,17 @@ internal final class DefaultCoreSDK: CoreSDK { } if let overriddenImplementation { do { - try await overriddenImplementation(objectMessages) + return try await overriddenImplementation(objectMessages) } catch { guard let artErrorInfo = error as? ARTErrorInfo else { preconditionFailure("Expected ARTErrorInfo, got \(error)") } throw artErrorInfo } - return } // TODO: Implement message size checking (https://github.com/ably/ably-liveobjects-swift-plugin/issues/13) - try await DefaultInternalPlugin.sendObject( + return try await DefaultInternalPlugin.sendObject( objectMessages: objectMessages, channel: channel, client: client, @@ -78,7 +90,7 @@ internal final class DefaultCoreSDK: CoreSDK { ) } - internal func testsOnly_overridePublish(with newImplementation: @escaping ([OutboundObjectMessage]) async throws(ARTErrorInfo) -> Void) { + internal func testsOnly_overridePublish(with newImplementation: @escaping ([OutboundObjectMessage]) async throws(ARTErrorInfo) -> PublishResult) { mutex.withLock { overriddenPublishImplementation = newImplementation } diff --git a/Sources/AblyLiveObjects/Internal/DefaultInternalPlugin.swift b/Sources/AblyLiveObjects/Internal/DefaultInternalPlugin.swift index e9f801db..8f8ed24a 100644 --- a/Sources/AblyLiveObjects/Internal/DefaultInternalPlugin.swift +++ b/Sources/AblyLiveObjects/Internal/DefaultInternalPlugin.swift @@ -148,6 +148,13 @@ internal final class DefaultInternalPlugin: NSObject, _AblyPluginSupportPrivate. // RTO10b nosync_realtimeObjects(for: channel).nosync_setGarbageCollectionGracePeriod(gracePeriod) + + // CD2j: Store siteCode for apply-on-ACK (RTO20c) + // TODO: Uncomment once siteCode is added to ConnectionDetailsProtocol in ably-cocoa-plugin-support + // and parsed from connectionDetails in ably-cocoa. + // if let siteCode = connectionDetails?.siteCode { + // nosync_realtimeObjects(for: channel).nosync_setSiteCode(siteCode) + // } } // MARK: - Sending `OBJECT` ProtocolMessage @@ -157,13 +164,18 @@ internal final class DefaultInternalPlugin: NSObject, _AblyPluginSupportPrivate. channel: _AblyPluginSupportPrivate.RealtimeChannel, client: _AblyPluginSupportPrivate.RealtimeClient, pluginAPI: PluginAPIProtocol, - ) async throws(ARTErrorInfo) { + ) async throws(ARTErrorInfo) -> PublishResult { let objectMessageBoxes: [ObjectMessageBox] = objectMessages.map { .init(objectMessage: $0) } - try await withCheckedContinuation { (continuation: CheckedContinuation, _>) in + return try await withCheckedContinuation { (continuation: CheckedContinuation, _>) in let internalQueue = pluginAPI.internalQueue(for: client) internalQueue.async { + // RTO20b: The callback should include `serials` from the ACK `res[0].serials` property. + // TODO: Update ably-cocoa to pass the serials in the callback per the spec prereqs: + // - APPluginAPI.h: Change nosync_sendObject completion handler to include serials array + // - ARTPluginAPI.m: Parse serials from ACK res[0].serials and pass to callback + // Until those changes are made, serials will be an array of nils. pluginAPI.nosync_sendObject( withObjectMessages: objectMessageBoxes, channel: channel, @@ -174,7 +186,10 @@ internal final class DefaultInternalPlugin: NSObject, _AblyPluginSupportPrivate. if let error { continuation.resume(returning: .failure(ARTErrorInfo.castPluginPublicErrorInfo(error))) } else { - continuation.resume(returning: .success(())) + // TODO: Extract serials from callback once ably-cocoa is updated + // For now, return an array of nils with the same count as messages sent + let serials: [String?] = Array(repeating: nil, count: objectMessageBoxes.count) + continuation.resume(returning: .success(PublishResult(serials: serials))) } } } diff --git a/Sources/AblyLiveObjects/Internal/InternalDefaultLiveCounter.swift b/Sources/AblyLiveObjects/Internal/InternalDefaultLiveCounter.swift index d8713063..890041d3 100644 --- a/Sources/AblyLiveObjects/Internal/InternalDefaultLiveCounter.swift +++ b/Sources/AblyLiveObjects/Internal/InternalDefaultLiveCounter.swift @@ -103,7 +103,7 @@ internal final class InternalDefaultLiveCounter: Sendable { } } - internal func increment(amount: Double, coreSDK: CoreSDK) async throws(ARTErrorInfo) { + internal func increment(amount: Double, coreSDK: CoreSDK, realtimeObjects: InternalDefaultRealtimeObjects) async throws(ARTErrorInfo) { let objectMessage = try mutableStateMutex.withSync { mutableState throws(ARTErrorInfo) in // RTLC12c try coreSDK.nosync_validateChannelState( @@ -130,13 +130,13 @@ internal final class InternalDefaultLiveCounter: Sendable { ) } - // RTLC12f - try await coreSDK.publish(objectMessages: [objectMessage]) + // RTO20: Publish and apply locally on ACK (RTLC12f) + try await realtimeObjects.publishAndApply(objectMessages: [objectMessage], coreSDK: coreSDK) } - internal func decrement(amount: Double, coreSDK: CoreSDK) async throws(ARTErrorInfo) { + internal func decrement(amount: Double, coreSDK: CoreSDK, realtimeObjects: InternalDefaultRealtimeObjects) async throws(ARTErrorInfo) { // RTLC13b - try await increment(amount: -amount, coreSDK: coreSDK) + try await increment(amount: -amount, coreSDK: coreSDK, realtimeObjects: realtimeObjects) } @discardableResult @@ -237,12 +237,16 @@ internal final class InternalDefaultLiveCounter: Sendable { } /// Attempts to apply an operation from an inbound `ObjectMessage`, per RTLC7. + /// + /// - Parameters: + /// - skipSiteTimeserialsUpdate: If true, skip updating siteTimeserials (RTO20h for apply-on-ACK). internal func nosync_apply( _ operation: ObjectOperation, objectMessageSerial: String?, objectMessageSiteCode: String?, objectMessageSerialTimestamp: Date?, objectsPool: inout ObjectsPool, + skipSiteTimeserialsUpdate: Bool, ) { mutableStateMutex.withoutSync { mutableState in mutableState.apply( @@ -251,6 +255,7 @@ internal final class InternalDefaultLiveCounter: Sendable { objectMessageSiteCode: objectMessageSiteCode, objectMessageSerialTimestamp: objectMessageSerialTimestamp, objectsPool: &objectsPool, + skipSiteTimeserialsUpdate: skipSiteTimeserialsUpdate, logger: logger, clock: clock, userCallbackQueue: userCallbackQueue, @@ -371,12 +376,16 @@ internal final class InternalDefaultLiveCounter: Sendable { } /// Attempts to apply an operation from an inbound `ObjectMessage`, per RTLC7. + /// + /// - Parameters: + /// - skipSiteTimeserialsUpdate: If true, skip updating siteTimeserials (RTO20h for apply-on-ACK). internal mutating func apply( _ operation: ObjectOperation, objectMessageSerial: String?, objectMessageSiteCode: String?, objectMessageSerialTimestamp: Date?, objectsPool: inout ObjectsPool, + skipSiteTimeserialsUpdate: Bool, logger: Logger, clock: SimpleClock, userCallbackQueue: DispatchQueue, @@ -387,8 +396,10 @@ internal final class InternalDefaultLiveCounter: Sendable { return } - // RTLC7c - liveObjectMutableState.siteTimeserials[applicableOperation.objectMessageSiteCode] = applicableOperation.objectMessageSerial + // RTLC7c (RTO20h: skip for apply-on-ACK) + if !skipSiteTimeserialsUpdate { + liveObjectMutableState.siteTimeserials[applicableOperation.objectMessageSiteCode] = applicableOperation.objectMessageSerial + } // RTLC7e // TODO: are we still meant to update siteTimeserials? https://github.com/ably/specification/pull/350/files#r2218718854 diff --git a/Sources/AblyLiveObjects/Internal/InternalDefaultLiveMap.swift b/Sources/AblyLiveObjects/Internal/InternalDefaultLiveMap.swift index 788eaff5..07b8e5f4 100644 --- a/Sources/AblyLiveObjects/Internal/InternalDefaultLiveMap.swift +++ b/Sources/AblyLiveObjects/Internal/InternalDefaultLiveMap.swift @@ -159,7 +159,7 @@ internal final class InternalDefaultLiveMap: Sendable { try entries(coreSDK: coreSDK, delegate: delegate).map(\.value) } - internal func set(key: String, value: InternalLiveMapValue, coreSDK: CoreSDK) async throws(ARTErrorInfo) { + internal func set(key: String, value: InternalLiveMapValue, coreSDK: CoreSDK, realtimeObjects: InternalDefaultRealtimeObjects) async throws(ARTErrorInfo) { let objectMessage = try mutableStateMutex.withSync { mutableState throws(ARTErrorInfo) in // RTLM20c try coreSDK.nosync_validateChannelState(notIn: [.detached, .failed, .suspended], operationDescription: "LiveMap.set") @@ -180,10 +180,11 @@ internal final class InternalDefaultLiveMap: Sendable { ) } - try await coreSDK.publish(objectMessages: [objectMessage]) + // RTO20: Publish and apply locally on ACK + try await realtimeObjects.publishAndApply(objectMessages: [objectMessage], coreSDK: coreSDK) } - internal func remove(key: String, coreSDK: CoreSDK) async throws(ARTErrorInfo) { + internal func remove(key: String, coreSDK: CoreSDK, realtimeObjects: InternalDefaultRealtimeObjects) async throws(ARTErrorInfo) { let objectMessage = try mutableStateMutex.withSync { mutableState throws(ARTErrorInfo) in // RTLM21c try coreSDK.nosync_validateChannelState(notIn: [.detached, .failed, .suspended], operationDescription: "LiveMap.remove") @@ -202,8 +203,8 @@ internal final class InternalDefaultLiveMap: Sendable { ) } - // RTLM21f - try await coreSDK.publish(objectMessages: [objectMessage]) + // RTO20: Publish and apply locally on ACK (RTLM21f) + try await realtimeObjects.publishAndApply(objectMessages: [objectMessage], coreSDK: coreSDK) } @discardableResult @@ -315,12 +316,16 @@ internal final class InternalDefaultLiveMap: Sendable { } /// Attempts to apply an operation from an inbound `ObjectMessage`, per RTLM15. + /// + /// - Parameters: + /// - skipSiteTimeserialsUpdate: If true, skip updating siteTimeserials (RTO20h for apply-on-ACK). internal func nosync_apply( _ operation: ObjectOperation, objectMessageSerial: String?, objectMessageSiteCode: String?, objectMessageSerialTimestamp: Date?, objectsPool: inout ObjectsPool, + skipSiteTimeserialsUpdate: Bool, ) { mutableStateMutex.withoutSync { mutableState in mutableState.apply( @@ -329,6 +334,7 @@ internal final class InternalDefaultLiveMap: Sendable { objectMessageSiteCode: objectMessageSiteCode, objectMessageSerialTimestamp: objectMessageSerialTimestamp, objectsPool: &objectsPool, + skipSiteTimeserialsUpdate: skipSiteTimeserialsUpdate, logger: logger, internalQueue: mutableStateMutex.dispatchQueue, userCallbackQueue: userCallbackQueue, @@ -574,12 +580,16 @@ internal final class InternalDefaultLiveMap: Sendable { } /// Attempts to apply an operation from an inbound `ObjectMessage`, per RTLM15. + /// + /// - Parameters: + /// - skipSiteTimeserialsUpdate: If true, skip updating siteTimeserials (RTO20h for apply-on-ACK). internal mutating func apply( _ operation: ObjectOperation, objectMessageSerial: String?, objectMessageSiteCode: String?, objectMessageSerialTimestamp: Date?, objectsPool: inout ObjectsPool, + skipSiteTimeserialsUpdate: Bool, logger: Logger, internalQueue: DispatchQueue, userCallbackQueue: DispatchQueue, @@ -591,8 +601,10 @@ internal final class InternalDefaultLiveMap: Sendable { return } - // RTLM15c - liveObjectMutableState.siteTimeserials[applicableOperation.objectMessageSiteCode] = applicableOperation.objectMessageSerial + // RTLM15c (RTO20h: skip for apply-on-ACK) + if !skipSiteTimeserialsUpdate { + liveObjectMutableState.siteTimeserials[applicableOperation.objectMessageSiteCode] = applicableOperation.objectMessageSerial + } // RTLM15e // TODO: are we still meant to update siteTimeserials? https://github.com/ably/specification/pull/350/files#r2218718854 @@ -1003,5 +1015,6 @@ internal final class InternalDefaultLiveMap: Sendable { // RTLM5d2g: Otherwise, return undefined/null return nil } + } } diff --git a/Sources/AblyLiveObjects/Internal/InternalDefaultRealtimeObjects.swift b/Sources/AblyLiveObjects/Internal/InternalDefaultRealtimeObjects.swift index 373a9eae..fedb26dd 100644 --- a/Sources/AblyLiveObjects/Internal/InternalDefaultRealtimeObjects.swift +++ b/Sources/AblyLiveObjects/Internal/InternalDefaultRealtimeObjects.swift @@ -53,6 +53,17 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo } } + /// Represents ACK information received while sync state is not SYNCED (RTO7c). + /// Stores a synthetic InboundObjectMessage created from the outbound message + ACK serial/siteCode. + /// Includes a continuation to defer promise resolution per RTO20a/RTO20i. + internal struct BufferedAck: Sendable { + /// Synthetic InboundObjectMessage created from outbound message + ACK serial/siteCode. + let syntheticMessage: InboundObjectMessage + /// Continuation to resume when the operation is applied (RTO20i). + /// This defers the public API promise resolution until sync completes. + let continuation: CheckedContinuation + } + internal var testsOnly_objectsPool: ObjectsPool { mutableStateMutex.withSync { mutableState in mutableState.objectsPool @@ -312,6 +323,13 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo } } + /// Sets the siteCode from ConnectionDetails (CD2j) for use in apply-on-ACK (RTO20c). + internal func nosync_setSiteCode(_ siteCode: String) { + mutableStateMutex.withoutSync { mutableState in + mutableState.siteCode = siteCode + } + } + internal var testsOnly_receivedObjectProtocolMessages: AsyncStream<[InboundObjectMessage]> { receivedObjectProtocolMessages } @@ -371,6 +389,64 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo try await coreSDK.publish(objectMessages: objectMessages) } + /// Publishes ObjectMessages and applies them locally on ACK (RTO20). + /// Only returns once all operations have been applied to local state. + /// This encapsulates all ACK-related bookkeeping so callers don't need to handle it. + /// + /// Has the same signature as `publish` - accepts multiple ObjectMessages and processes + /// them in order, extracting the serial from `res[0].serials` at the correct index for each. + internal func publishAndApply(objectMessages: [OutboundObjectMessage], coreSDK: CoreSDK) async throws(ARTErrorInfo) { + // Publish and get the result with serials array + let publishResult = try await coreSDK.publish(objectMessages: objectMessages) + + // Process each message with its corresponding serial from the result + // The serials array corresponds 1:1 to the messages that were published + for (index, objectMessage) in objectMessages.enumerated() { + // Get the serial for this message (may be nil if discarded due to conflation) + let serial: String? = index < publishResult.serials.count ? publishResult.serials[index] : nil + + // Create a synthetic InboundObjectMessage from the outbound message + ACK info. + // Validation of serial/siteCode/operation is handled by the existing apply logic (RTLO4a, RTO9a1). + let syntheticMessage = mutableStateMutex.withSync { mutableState in + InboundObjectMessage( + fromOutbound: objectMessage, + serial: serial, + siteCode: mutableState.siteCode + ) + } + + // RTO20a: If not SYNCED, buffer and wait for sync to complete + let shouldWait = mutableStateMutex.withSync { mutableState -> Bool in + mutableState.state.toObjectsSyncState != .synced + } + + if shouldWait { + // Buffer the ACK with a continuation that will be resumed after sync + await withCheckedContinuation { (continuation: CheckedContinuation) in + mutableStateMutex.withSync { mutableState in + mutableState.bufferedAcks.append(BufferedAck( + syntheticMessage: syntheticMessage, + continuation: continuation + )) + logger.log("Buffering ACK for serial \(syntheticMessage.serial ?? "nil") - sync not complete (RTO20a)", level: .debug) + } + } + // When we resume, the operation has been applied by RTO5c10 + } else { + // SYNCED - apply immediately using existing logic (RTO20b-i) + mutableStateMutex.withSync { mutableState in + mutableState.nosync_applyAckOperation( + syntheticMessage: syntheticMessage, + logger: logger, + internalQueue: mutableStateMutex.dispatchQueue, + userCallbackQueue: userCallbackQueue, + clock: clock, + ) + } + } + } + } + // MARK: - Garbage collection of deleted objects and map entries /// Performs garbage collection of tombstoned objects and map entries, per RTO10c. @@ -444,6 +520,17 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo /// The RTO10b grace period for which we will retain tombstoned objects and map entries. internal var garbageCollectionGracePeriod: GarbageCollectionOptions.GracePeriod + /// RTO7b: Serial values of operations applied on ACK but whose echo hasn't been received. + /// RTO7b1: Empty on init. + internal var appliedOnAckSerials: Set = [] + + /// RTO7c: ACK information received while sync state is not SYNCED. + /// RTO7c1: Empty on init. + internal var bufferedAcks: [BufferedAck] = [] + + /// The siteCode from ConnectionDetails (CD2j), used for apply-on-ACK (RTO20c). + internal var siteCode: String? + /// The RTO17 sync state. Also stores the sync sequence data. internal var state = State.initialized @@ -627,10 +714,36 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo internalQueue: internalQueue, userCallbackQueue: userCallbackQueue, clock: clock, + skipSiteTimeserialsUpdate: false, ) } } + // RTO5c9: Clear appliedOnAckSerials + appliedOnAckSerials.removeAll() + + // RTO5c10: Process buffered ACKs + let acksToProcess = bufferedAcks + if !acksToProcess.isEmpty { + logger.log("Processing \(acksToProcess.count) buffered ACKs after sync completion", level: .debug) + for ack in acksToProcess { + // Apply the operation using existing logic (RTO20b-h) + nosync_applyAckOperation( + syntheticMessage: ack.syntheticMessage, + logger: logger, + internalQueue: internalQueue, + userCallbackQueue: userCallbackQueue, + clock: clock, + ) + + // RTO20i: Resume the continuation to resolve the public API promise + ack.continuation.resume() + } + } + + // RTO5c11: Clear bufferedAcks after processing + bufferedAcks.removeAll() + // RTO5c3, RTO5c4, RTO5c5, RTO5c8 transition(to: .synced, userCallbackQueue: userCallbackQueue) } @@ -663,19 +776,31 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo internalQueue: internalQueue, userCallbackQueue: userCallbackQueue, clock: clock, + skipSiteTimeserialsUpdate: false, ) } } } /// Implements the `OBJECT` application of RTO9. + /// + /// - Parameters: + /// - skipSiteTimeserialsUpdate: If true, skip updating siteTimeserials (RTO20h for apply-on-ACK). private mutating func nosync_applyObjectProtocolMessageObjectMessage( _ objectMessage: InboundObjectMessage, logger: Logger, internalQueue: DispatchQueue, userCallbackQueue: DispatchQueue, clock: SimpleClock, + skipSiteTimeserialsUpdate: Bool, ) { + // RTO9a3: Check if already applied on ACK + if let serial = objectMessage.serial, appliedOnAckSerials.contains(serial) { + appliedOnAckSerials.remove(serial) + logger.log("Skipping OBJECT message with serial \(serial) - already applied on ACK (RTO9a3)", level: .debug) + return + } + guard let operation = objectMessage.operation else { // RTO9a1 logger.log("Unsupported OBJECT message received (no operation); \(objectMessage)", level: .warn) @@ -712,6 +837,7 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo objectMessageSiteCode: objectMessage.siteCode, objectMessageSerialTimestamp: objectMessage.serialTimestamp, objectsPool: &objectsPool, + skipSiteTimeserialsUpdate: skipSiteTimeserialsUpdate, ) } case let .unknown(rawValue): @@ -782,5 +908,35 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo objectsEventSubscriptionStorage.emit(eventName: event, on: queue) internalObjectsEventSubscriptionStorage.emit(eventName: event, on: queue) } + + // MARK: - Apply-on-ACK (RTO20) + + /// Applies an ACK operation immediately (called when SYNCED). + /// Also called from RTO5c10 when processing buffered ACKs. + /// Uses a synthetic InboundObjectMessage and reuses the existing apply logic. + internal mutating func nosync_applyAckOperation( + syntheticMessage: InboundObjectMessage, + logger: Logger, + internalQueue: DispatchQueue, + userCallbackQueue: DispatchQueue, + clock: SimpleClock, + ) { + // Reuse existing apply logic with skipSiteTimeserialsUpdate=true (RTO20h) + nosync_applyObjectProtocolMessageObjectMessage( + syntheticMessage, + logger: logger, + internalQueue: internalQueue, + userCallbackQueue: userCallbackQueue, + clock: clock, + skipSiteTimeserialsUpdate: true, + ) + + // RTO20g: Track that we applied this serial on ACK, so the echo gets skipped (RTO9a3) + if let serial = syntheticMessage.serial { + appliedOnAckSerials.insert(serial) + } + + logger.log("Applied operation on ACK with serial \(syntheticMessage.serial ?? "nil") (RTO20)", level: .debug) + } } } diff --git a/Sources/AblyLiveObjects/Internal/ObjectsPool.swift b/Sources/AblyLiveObjects/Internal/ObjectsPool.swift index addfe833..271b957d 100644 --- a/Sources/AblyLiveObjects/Internal/ObjectsPool.swift +++ b/Sources/AblyLiveObjects/Internal/ObjectsPool.swift @@ -30,12 +30,16 @@ internal struct ObjectsPool { } /// Applies an operation to a LiveObject, per RTO9a2a3. + /// + /// - Parameters: + /// - skipSiteTimeserialsUpdate: If true, skip updating siteTimeserials (RTO20h for apply-on-ACK). internal func nosync_apply( _ operation: ObjectOperation, objectMessageSerial: String?, objectMessageSiteCode: String?, objectMessageSerialTimestamp: Date?, objectsPool: inout ObjectsPool, + skipSiteTimeserialsUpdate: Bool, ) { switch self { case let .map(map): @@ -45,6 +49,7 @@ internal struct ObjectsPool { objectMessageSiteCode: objectMessageSiteCode, objectMessageSerialTimestamp: objectMessageSerialTimestamp, objectsPool: &objectsPool, + skipSiteTimeserialsUpdate: skipSiteTimeserialsUpdate, ) case let .counter(counter): counter.nosync_apply( @@ -53,6 +58,7 @@ internal struct ObjectsPool { objectMessageSiteCode: objectMessageSiteCode, objectMessageSerialTimestamp: objectMessageSerialTimestamp, objectsPool: &objectsPool, + skipSiteTimeserialsUpdate: skipSiteTimeserialsUpdate, ) } } @@ -144,6 +150,7 @@ internal struct ObjectsPool { map.testsOnly_tombstonedAt } } + } /// Keyed by `objectId`. diff --git a/Sources/AblyLiveObjects/Protocol/ObjectMessage.swift b/Sources/AblyLiveObjects/Protocol/ObjectMessage.swift index f1411894..37d1cdb5 100644 --- a/Sources/AblyLiveObjects/Protocol/ObjectMessage.swift +++ b/Sources/AblyLiveObjects/Protocol/ObjectMessage.swift @@ -91,6 +91,21 @@ internal struct ObjectState: Equatable { } internal extension InboundObjectMessage { + /// Creates a synthetic inbound message from an outbound message, adding the serial and siteCode from the ACK. + /// Used for apply-on-ACK (RTO20) where we apply an operation locally before receiving its echo. + init(fromOutbound outbound: OutboundObjectMessage, serial: String?, siteCode: String?) { + self.id = outbound.id + self.clientId = outbound.clientId + self.connectionId = outbound.connectionId + self.extras = outbound.extras + self.timestamp = outbound.timestamp + self.operation = outbound.operation + self.object = outbound.object + self.serial = serial + self.siteCode = siteCode + self.serialTimestamp = outbound.serialTimestamp + } + /// Initializes an `InboundObjectMessage` from an `InboundWireObjectMessage`, applying the data decoding rules of OD5. /// /// - Parameters: diff --git a/Sources/AblyLiveObjects/Public/Public Proxy Objects/InternalLiveMapValue+ToPublic.swift b/Sources/AblyLiveObjects/Public/Public Proxy Objects/InternalLiveMapValue+ToPublic.swift index d669c0cd..e98e2a9b 100644 --- a/Sources/AblyLiveObjects/Public/Public Proxy Objects/InternalLiveMapValue+ToPublic.swift +++ b/Sources/AblyLiveObjects/Public/Public Proxy Objects/InternalLiveMapValue+ToPublic.swift @@ -9,7 +9,9 @@ internal extension InternalLiveMapValue { internal var logger: Logger internal var toCounterCreationArgs: PublicObjectsStore.CounterCreationArgs { - .init(coreSDK: coreSDK, logger: logger) + // TODO: Figure out how to properly pass realtimeObjects through the object graph + // swiftlint:disable:next force_cast + .init(coreSDK: coreSDK, logger: logger, realtimeObjects: mapDelegate as! InternalDefaultRealtimeObjects) } internal var toMapCreationArgs: PublicObjectsStore.MapCreationArgs { diff --git a/Sources/AblyLiveObjects/Public/Public Proxy Objects/PublicDefaultLiveCounter.swift b/Sources/AblyLiveObjects/Public/Public Proxy Objects/PublicDefaultLiveCounter.swift index 90a408da..cc2b677a 100644 --- a/Sources/AblyLiveObjects/Public/Public Proxy Objects/PublicDefaultLiveCounter.swift +++ b/Sources/AblyLiveObjects/Public/Public Proxy Objects/PublicDefaultLiveCounter.swift @@ -11,11 +11,13 @@ internal final class PublicDefaultLiveCounter: LiveCounter { private let coreSDK: CoreSDK private let logger: Logger + private let realtimeObjects: InternalDefaultRealtimeObjects - internal init(proxied: InternalDefaultLiveCounter, coreSDK: CoreSDK, logger: Logger) { + internal init(proxied: InternalDefaultLiveCounter, coreSDK: CoreSDK, logger: Logger, realtimeObjects: InternalDefaultRealtimeObjects) { self.proxied = proxied self.coreSDK = coreSDK self.logger = logger + self.realtimeObjects = realtimeObjects } // MARK: - `LiveCounter` protocol @@ -27,11 +29,11 @@ internal final class PublicDefaultLiveCounter: LiveCounter { } internal func increment(amount: Double) async throws(ARTErrorInfo) { - try await proxied.increment(amount: amount, coreSDK: coreSDK) + try await proxied.increment(amount: amount, coreSDK: coreSDK, realtimeObjects: realtimeObjects) } internal func decrement(amount: Double) async throws(ARTErrorInfo) { - try await proxied.decrement(amount: amount, coreSDK: coreSDK) + try await proxied.decrement(amount: amount, coreSDK: coreSDK, realtimeObjects: realtimeObjects) } internal func subscribe(listener: @escaping LiveObjectUpdateCallback) throws(ARTErrorInfo) -> any SubscribeResponse { diff --git a/Sources/AblyLiveObjects/Public/Public Proxy Objects/PublicDefaultLiveMap.swift b/Sources/AblyLiveObjects/Public/Public Proxy Objects/PublicDefaultLiveMap.swift index 58f2036d..d134e430 100644 --- a/Sources/AblyLiveObjects/Public/Public Proxy Objects/PublicDefaultLiveMap.swift +++ b/Sources/AblyLiveObjects/Public/Public Proxy Objects/PublicDefaultLiveMap.swift @@ -78,11 +78,17 @@ internal final class PublicDefaultLiveMap: LiveMap { internal func set(key: String, value: LiveMapValue) async throws(ARTErrorInfo) { let internalValue = InternalLiveMapValue(liveMapValue: value) - try await proxied.set(key: key, value: internalValue, coreSDK: coreSDK) + // TODO: Figure out how to properly pass realtimeObjects through the object graph + // swiftlint:disable:next force_cast + let realtimeObjects = delegate as! InternalDefaultRealtimeObjects + try await proxied.set(key: key, value: internalValue, coreSDK: coreSDK, realtimeObjects: realtimeObjects) } internal func remove(key: String) async throws(ARTErrorInfo) { - try await proxied.remove(key: key, coreSDK: coreSDK) + // TODO: Figure out how to properly pass realtimeObjects through the object graph + // swiftlint:disable:next force_cast + let realtimeObjects = delegate as! InternalDefaultRealtimeObjects + try await proxied.remove(key: key, coreSDK: coreSDK, realtimeObjects: realtimeObjects) } internal func subscribe(listener: @escaping LiveObjectUpdateCallback) throws(ARTErrorInfo) -> any SubscribeResponse { diff --git a/Sources/AblyLiveObjects/Public/Public Proxy Objects/PublicDefaultRealtimeObjects.swift b/Sources/AblyLiveObjects/Public/Public Proxy Objects/PublicDefaultRealtimeObjects.swift index 66cd808c..2f9228e8 100644 --- a/Sources/AblyLiveObjects/Public/Public Proxy Objects/PublicDefaultRealtimeObjects.swift +++ b/Sources/AblyLiveObjects/Public/Public Proxy Objects/PublicDefaultRealtimeObjects.swift @@ -70,6 +70,7 @@ internal final class PublicDefaultRealtimeObjects: RealtimeObjects { creationArgs: .init( coreSDK: coreSDK, logger: logger, + realtimeObjects: proxied, // RTO20: For apply-on-ACK support ), ) } @@ -82,6 +83,7 @@ internal final class PublicDefaultRealtimeObjects: RealtimeObjects { creationArgs: .init( coreSDK: coreSDK, logger: logger, + realtimeObjects: proxied, // RTO20: For apply-on-ACK support ), ) } @@ -119,7 +121,7 @@ internal final class PublicDefaultRealtimeObjects: RealtimeObjects { /// Replaces the method that this `RealtimeObjects` uses to send any outbound `ObjectMessage`s. /// /// Used by integration tests, for example to disable `ObjectMessage` publishing so that a test can verify that a behaviour is not a side effect of an `ObjectMessage` sent by the SDK. - internal func testsOnly_overridePublish(with newImplementation: @escaping ([OutboundObjectMessage]) async throws(ARTErrorInfo) -> Void) { + internal func testsOnly_overridePublish(with newImplementation: @escaping ([OutboundObjectMessage]) async throws(ARTErrorInfo) -> PublishResult) { coreSDK.testsOnly_overridePublish(with: newImplementation) } diff --git a/Sources/AblyLiveObjects/Public/Public Proxy Objects/PublicObjectsStore.swift b/Sources/AblyLiveObjects/Public/Public Proxy Objects/PublicObjectsStore.swift index d8d915b0..353aa56c 100644 --- a/Sources/AblyLiveObjects/Public/Public Proxy Objects/PublicObjectsStore.swift +++ b/Sources/AblyLiveObjects/Public/Public Proxy Objects/PublicObjectsStore.swift @@ -34,6 +34,7 @@ internal final class PublicObjectsStore: Sendable { internal struct CounterCreationArgs { internal var coreSDK: CoreSDK internal var logger: Logger + internal var realtimeObjects: InternalDefaultRealtimeObjects } /// Fetches the cached `PublicDefaultLiveCounter` that wraps a given `InternalDefaultLiveCounter`, creating a new public object if there isn't already one. @@ -133,6 +134,7 @@ internal final class PublicObjectsStore: Sendable { proxied: proxied, coreSDK: creationArgs.coreSDK, logger: creationArgs.logger, + realtimeObjects: creationArgs.realtimeObjects, ) } }