From 4d4ee4daf0419b42220d0c9bce1471006c8976ed Mon Sep 17 00:00:00 2001 From: Lawrence Forooghian Date: Thu, 15 Jan 2026 10:07:48 +0000 Subject: [PATCH 1/2] refactor: Simplify handling of object sync Grab the buffered ops from the SYNCING associated data, dedupe creation of syncObjectsPoolEntries, and handle changed sync sequence upfront. --- .../InternalDefaultRealtimeObjects.swift | 79 +++++++++---------- 1 file changed, 38 insertions(+), 41 deletions(-) diff --git a/Sources/AblyLiveObjects/Internal/InternalDefaultRealtimeObjects.swift b/Sources/AblyLiveObjects/Internal/InternalDefaultRealtimeObjects.swift index 86f1757..4a03672 100644 --- a/Sources/AblyLiveObjects/Internal/InternalDefaultRealtimeObjects.swift +++ b/Sources/AblyLiveObjects/Internal/InternalDefaultRealtimeObjects.swift @@ -540,15 +540,8 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo receivedObjectSyncProtocolMessagesContinuation.yield(objectMessages) - // If populated, this contains a full set of sync data for the channel, and should be applied to the ObjectsPool. - let completedSyncObjectsPool: [SyncObjectsPoolEntry]? - // If populated, this contains a set of buffered inbound OBJECT messages that should be applied. - let completedSyncBufferedObjectOperations: [InboundObjectMessage]? - // The SyncSequence, if any, to store in the SYNCING state that results from this OBJECT_SYNC. - let syncSequenceForSyncingState: SyncSequence? - + let syncCursor: SyncCursor? if let protocolMessageChannelSerial { - let syncCursor: SyncCursor do { // RTO5a syncCursor = try SyncCursor(channelSerial: protocolMessageChannelSerial) @@ -556,47 +549,47 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo logger.log("Failed to parse sync cursor: \(error)", level: .error) return } + } else { + syncCursor = nil + } + if case let .syncing(syncingData) = state { // Figure out whether to continue any existing sync sequence or start a new one - var updatedSyncSequence: SyncSequence = if case let .syncing(syncingData) = state, let syncSequence = syncingData.syncSequence { - if syncCursor.sequenceID == syncSequence.id { - // RTO5a3: Continue existing sync sequence - syncSequence - } else { - // RTO5a2a, RTO5a2b: new sequence started, discard previous - .init(id: syncCursor.sequenceID, syncObjectsPool: [], bufferedObjectOperations: []) - } + let isNewSyncSequence = syncCursor == nil || syncingData.syncSequence?.id != syncCursor?.sequenceID + if isNewSyncSequence { + // RTO5a2a, RTO5a2b: new sequence started, discard previous. Else we continue the existing sequence per RTO5a3 + syncingData.syncSequence = nil + } + } + + let syncObjectsPoolEntries = objectMessages.compactMap { objectMessage in + if let object = objectMessage.object { + SyncObjectsPoolEntry(state: object, objectMessageSerialTimestamp: objectMessage.serialTimestamp) } else { - // There's no current sync sequence; start one - .init(id: syncCursor.sequenceID, syncObjectsPool: [], bufferedObjectOperations: []) + nil } + } - // RTO5b - updatedSyncSequence.syncObjectsPool.append(contentsOf: objectMessages.compactMap { objectMessage in - if let object = objectMessage.object { - .init(state: object, objectMessageSerialTimestamp: objectMessage.serialTimestamp) - } else { - nil - } - }) + // If populated, this contains a full set of sync data for the channel, and should be applied to the ObjectsPool. + let completedSyncObjectsPool: [SyncObjectsPoolEntry]? + // The SyncSequence, if any, to store in the SYNCING state that results from this OBJECT_SYNC. + let syncSequenceForSyncingState: SyncSequence? - (completedSyncObjectsPool, completedSyncBufferedObjectOperations) = if syncCursor.isEndOfSequence { - (updatedSyncSequence.syncObjectsPool, updatedSyncSequence.bufferedObjectOperations) + if let syncCursor { + let syncSequenceToContinue: SyncSequence? = if case let .syncing(syncingData) = state { + syncingData.syncSequence } else { - (nil, nil) + nil } - + var updatedSyncSequence = syncSequenceToContinue ?? .init(id: syncCursor.sequenceID, syncObjectsPool: [], bufferedObjectOperations: []) + // RTO5b + updatedSyncSequence.syncObjectsPool.append(contentsOf: syncObjectsPoolEntries) syncSequenceForSyncingState = updatedSyncSequence + + completedSyncObjectsPool = syncCursor.isEndOfSequence ? updatedSyncSequence.syncObjectsPool : nil } else { // RTO5a5: The sync data is contained entirely within this single OBJECT_SYNC - completedSyncObjectsPool = objectMessages.compactMap { objectMessage in - if let object = objectMessage.object { - .init(state: object, objectMessageSerialTimestamp: objectMessage.serialTimestamp) - } else { - nil - } - } - completedSyncBufferedObjectOperations = nil + completedSyncObjectsPool = syncObjectsPoolEntries syncSequenceForSyncingState = nil } @@ -618,9 +611,13 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo ) // RTO5c6 - if let completedSyncBufferedObjectOperations, !completedSyncBufferedObjectOperations.isEmpty { - logger.log("Applying \(completedSyncBufferedObjectOperations.count) buffered OBJECT ObjectMessages", level: .debug) - for objectMessage in completedSyncBufferedObjectOperations { + guard case let .syncing(syncingData) = state else { + // We put ourselves into SYNCING above + preconditionFailure() + } + if let bufferedObjectOperations = syncingData.syncSequence?.bufferedObjectOperations, !bufferedObjectOperations.isEmpty { + logger.log("Applying \(bufferedObjectOperations.count) buffered OBJECT ObjectMessages", level: .debug) + for objectMessage in bufferedObjectOperations { nosync_applyObjectProtocolMessageObjectMessage( objectMessage, logger: logger, From 6a3493857fa11b2e495f91257319cc7c2e175453 Mon Sep 17 00:00:00 2001 From: Lawrence Forooghian Date: Thu, 15 Jan 2026 11:13:34 +0000 Subject: [PATCH 2/2] Buffer when not SYNCED per updated RTO8a Implement the behaviour described in the RTO8a modification made in spec commit 1956e2f. That is, buffer messages received from the point at which we become ATTACHED, not just whilst we're receiving a multi-OBJECT_SYNC sync sequence. Note that currently, these messages will incorrectly get dropped upon receipt of the sync sequence; this is a bug in all implementations and will be addressed in [1]. [1] https://ably.atlassian.net/browse/AIT-287 --- .../InternalDefaultRealtimeObjects.swift | 26 ++++++++++--------- 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/Sources/AblyLiveObjects/Internal/InternalDefaultRealtimeObjects.swift b/Sources/AblyLiveObjects/Internal/InternalDefaultRealtimeObjects.swift index 4a03672..373a9ea 100644 --- a/Sources/AblyLiveObjects/Internal/InternalDefaultRealtimeObjects.swift +++ b/Sources/AblyLiveObjects/Internal/InternalDefaultRealtimeObjects.swift @@ -85,9 +85,6 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo /// The `ObjectMessage`s gathered during this sync sequence. internal var syncObjectsPool: [SyncObjectsPoolEntry] - - /// `OBJECT` ProtocolMessages that were received during this sync sequence, to be applied once the sync sequence is complete, per RTO7a. - internal var bufferedObjectOperations: [InboundObjectMessage] } internal init( @@ -459,12 +456,16 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo /// Note: We follow the same pattern as used in the WIP ably-swift: a state's associated data is a class instance and the convention is that to update the associated data for the current state you mutate the existing instance instead of creating a new one. enum AssociatedData { class Syncing { + /// `OBJECT` ProtocolMessages that were received whilst SYNCING, to be applied once the sync sequence is complete, per RTO7a. + var bufferedObjectOperations: [InboundObjectMessage] + /// Note that we only ever populate this during a multi-`ProtocolMessage` sync sequence. It is not used in the RTO4b or RTO5a5 cases where the sync data is entirely contained within a single ProtocolMessage, because an individual ProtocolMessage is processed atomically and so no other operations that might wish to query this property can occur concurrently with the handling of these cases. /// /// It is optional because there are times that we transition to SYNCING even when the sync data is contained in a single ProtocolMessage. var syncSequence: SyncSequence? - init(syncSequence: SyncSequence?) { + init(bufferedObjectOperations: [InboundObjectMessage], syncSequence: SyncSequence?) { + self.bufferedObjectOperations = bufferedObjectOperations self.syncSequence = syncSequence } } @@ -509,7 +510,7 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo // We will subsequently transition to .synced either by the completion of the RTO4a OBJECT_SYNC, or by the RTO4b no-HAS_OBJECTS case below if state.toObjectsSyncState != .syncing { // RTO4c - transition(to: .syncing(.init(syncSequence: nil)), userCallbackQueue: userCallbackQueue) + transition(to: .syncing(.init(bufferedObjectOperations: [], syncSequence: nil)), userCallbackQueue: userCallbackQueue) } // We only care about the case where HAS_OBJECTS is not set (RTO4b); if it is set then we're going to shortly receive an OBJECT_SYNC instead (RTO4a) @@ -559,6 +560,7 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo if isNewSyncSequence { // RTO5a2a, RTO5a2b: new sequence started, discard previous. Else we continue the existing sequence per RTO5a3 syncingData.syncSequence = nil + syncingData.bufferedObjectOperations = [] } } @@ -581,7 +583,7 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo } else { nil } - var updatedSyncSequence = syncSequenceToContinue ?? .init(id: syncCursor.sequenceID, syncObjectsPool: [], bufferedObjectOperations: []) + var updatedSyncSequence = syncSequenceToContinue ?? .init(id: syncCursor.sequenceID, syncObjectsPool: []) // RTO5b updatedSyncSequence.syncObjectsPool.append(contentsOf: syncObjectsPoolEntries) syncSequenceForSyncingState = updatedSyncSequence @@ -597,7 +599,7 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo syncingData.syncSequence = syncSequenceForSyncingState } else { // RTO5e - transition(to: .syncing(.init(syncSequence: syncSequenceForSyncingState)), userCallbackQueue: userCallbackQueue) + transition(to: .syncing(.init(bufferedObjectOperations: [], syncSequence: syncSequenceForSyncingState)), userCallbackQueue: userCallbackQueue) } if let completedSyncObjectsPool { @@ -615,7 +617,8 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo // We put ourselves into SYNCING above preconditionFailure() } - if let bufferedObjectOperations = syncingData.syncSequence?.bufferedObjectOperations, !bufferedObjectOperations.isEmpty { + let bufferedObjectOperations = syncingData.bufferedObjectOperations + if !bufferedObjectOperations.isEmpty { logger.log("Applying \(bufferedObjectOperations.count) buffered OBJECT ObjectMessages", level: .debug) for objectMessage in bufferedObjectOperations { nosync_applyObjectProtocolMessageObjectMessage( @@ -646,12 +649,11 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo logger.log("handleObjectProtocolMessage(objectMessages: \(LoggingUtilities.formatObjectMessagesForLogging(objectMessages)))", level: .debug) - if case let .syncing(syncingData) = state, let existingSyncSequence = syncingData.syncSequence { + if case let .syncing(syncingData) = state { // RTO8a: Buffer the OBJECT message, to be handled once the sync completes + // Note that RTO8a says to buffer if "not SYNCED" (i.e. it includes the INITIALIZED state). But, "if SYNCING" is an equivalent check since we will only receive operations once attached, and we become SYNCING upon receipt of ATTACHED logger.log("Buffering OBJECT message due to in-progress sync", level: .debug) - var newSyncSequence = existingSyncSequence - newSyncSequence.bufferedObjectOperations.append(contentsOf: objectMessages) - syncingData.syncSequence = newSyncSequence + syncingData.bufferedObjectOperations.append(contentsOf: objectMessages) } else { // RTO8b: Handle the OBJECT message immediately for objectMessage in objectMessages {