diff --git a/Sources/AblyLiveObjects/Internal/InternalDefaultRealtimeObjects.swift b/Sources/AblyLiveObjects/Internal/InternalDefaultRealtimeObjects.swift index 86f1757..373a9ea 100644 --- a/Sources/AblyLiveObjects/Internal/InternalDefaultRealtimeObjects.swift +++ b/Sources/AblyLiveObjects/Internal/InternalDefaultRealtimeObjects.swift @@ -85,9 +85,6 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo /// The `ObjectMessage`s gathered during this sync sequence. internal var syncObjectsPool: [SyncObjectsPoolEntry] - - /// `OBJECT` ProtocolMessages that were received during this sync sequence, to be applied once the sync sequence is complete, per RTO7a. - internal var bufferedObjectOperations: [InboundObjectMessage] } internal init( @@ -459,12 +456,16 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo /// Note: We follow the same pattern as used in the WIP ably-swift: a state's associated data is a class instance and the convention is that to update the associated data for the current state you mutate the existing instance instead of creating a new one. enum AssociatedData { class Syncing { + /// `OBJECT` ProtocolMessages that were received whilst SYNCING, to be applied once the sync sequence is complete, per RTO7a. + var bufferedObjectOperations: [InboundObjectMessage] + /// Note that we only ever populate this during a multi-`ProtocolMessage` sync sequence. It is not used in the RTO4b or RTO5a5 cases where the sync data is entirely contained within a single ProtocolMessage, because an individual ProtocolMessage is processed atomically and so no other operations that might wish to query this property can occur concurrently with the handling of these cases. /// /// It is optional because there are times that we transition to SYNCING even when the sync data is contained in a single ProtocolMessage. var syncSequence: SyncSequence? - init(syncSequence: SyncSequence?) { + init(bufferedObjectOperations: [InboundObjectMessage], syncSequence: SyncSequence?) { + self.bufferedObjectOperations = bufferedObjectOperations self.syncSequence = syncSequence } } @@ -509,7 +510,7 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo // We will subsequently transition to .synced either by the completion of the RTO4a OBJECT_SYNC, or by the RTO4b no-HAS_OBJECTS case below if state.toObjectsSyncState != .syncing { // RTO4c - transition(to: .syncing(.init(syncSequence: nil)), userCallbackQueue: userCallbackQueue) + transition(to: .syncing(.init(bufferedObjectOperations: [], syncSequence: nil)), userCallbackQueue: userCallbackQueue) } // We only care about the case where HAS_OBJECTS is not set (RTO4b); if it is set then we're going to shortly receive an OBJECT_SYNC instead (RTO4a) @@ -540,15 +541,8 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo receivedObjectSyncProtocolMessagesContinuation.yield(objectMessages) - // If populated, this contains a full set of sync data for the channel, and should be applied to the ObjectsPool. - let completedSyncObjectsPool: [SyncObjectsPoolEntry]? - // If populated, this contains a set of buffered inbound OBJECT messages that should be applied. - let completedSyncBufferedObjectOperations: [InboundObjectMessage]? - // The SyncSequence, if any, to store in the SYNCING state that results from this OBJECT_SYNC. - let syncSequenceForSyncingState: SyncSequence? - + let syncCursor: SyncCursor? if let protocolMessageChannelSerial { - let syncCursor: SyncCursor do { // RTO5a syncCursor = try SyncCursor(channelSerial: protocolMessageChannelSerial) @@ -556,47 +550,48 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo logger.log("Failed to parse sync cursor: \(error)", level: .error) return } + } else { + syncCursor = nil + } + if case let .syncing(syncingData) = state { // Figure out whether to continue any existing sync sequence or start a new one - var updatedSyncSequence: SyncSequence = if case let .syncing(syncingData) = state, let syncSequence = syncingData.syncSequence { - if syncCursor.sequenceID == syncSequence.id { - // RTO5a3: Continue existing sync sequence - syncSequence - } else { - // RTO5a2a, RTO5a2b: new sequence started, discard previous - .init(id: syncCursor.sequenceID, syncObjectsPool: [], bufferedObjectOperations: []) - } + let isNewSyncSequence = syncCursor == nil || syncingData.syncSequence?.id != syncCursor?.sequenceID + if isNewSyncSequence { + // RTO5a2a, RTO5a2b: new sequence started, discard previous. Else we continue the existing sequence per RTO5a3 + syncingData.syncSequence = nil + syncingData.bufferedObjectOperations = [] + } + } + + let syncObjectsPoolEntries = objectMessages.compactMap { objectMessage in + if let object = objectMessage.object { + SyncObjectsPoolEntry(state: object, objectMessageSerialTimestamp: objectMessage.serialTimestamp) } else { - // There's no current sync sequence; start one - .init(id: syncCursor.sequenceID, syncObjectsPool: [], bufferedObjectOperations: []) + nil } + } - // RTO5b - updatedSyncSequence.syncObjectsPool.append(contentsOf: objectMessages.compactMap { objectMessage in - if let object = objectMessage.object { - .init(state: object, objectMessageSerialTimestamp: objectMessage.serialTimestamp) - } else { - nil - } - }) + // If populated, this contains a full set of sync data for the channel, and should be applied to the ObjectsPool. + let completedSyncObjectsPool: [SyncObjectsPoolEntry]? + // The SyncSequence, if any, to store in the SYNCING state that results from this OBJECT_SYNC. + let syncSequenceForSyncingState: SyncSequence? - (completedSyncObjectsPool, completedSyncBufferedObjectOperations) = if syncCursor.isEndOfSequence { - (updatedSyncSequence.syncObjectsPool, updatedSyncSequence.bufferedObjectOperations) + if let syncCursor { + let syncSequenceToContinue: SyncSequence? = if case let .syncing(syncingData) = state { + syncingData.syncSequence } else { - (nil, nil) + nil } - + var updatedSyncSequence = syncSequenceToContinue ?? .init(id: syncCursor.sequenceID, syncObjectsPool: []) + // RTO5b + updatedSyncSequence.syncObjectsPool.append(contentsOf: syncObjectsPoolEntries) syncSequenceForSyncingState = updatedSyncSequence + + completedSyncObjectsPool = syncCursor.isEndOfSequence ? updatedSyncSequence.syncObjectsPool : nil } else { // RTO5a5: The sync data is contained entirely within this single OBJECT_SYNC - completedSyncObjectsPool = objectMessages.compactMap { objectMessage in - if let object = objectMessage.object { - .init(state: object, objectMessageSerialTimestamp: objectMessage.serialTimestamp) - } else { - nil - } - } - completedSyncBufferedObjectOperations = nil + completedSyncObjectsPool = syncObjectsPoolEntries syncSequenceForSyncingState = nil } @@ -604,7 +599,7 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo syncingData.syncSequence = syncSequenceForSyncingState } else { // RTO5e - transition(to: .syncing(.init(syncSequence: syncSequenceForSyncingState)), userCallbackQueue: userCallbackQueue) + transition(to: .syncing(.init(bufferedObjectOperations: [], syncSequence: syncSequenceForSyncingState)), userCallbackQueue: userCallbackQueue) } if let completedSyncObjectsPool { @@ -618,9 +613,14 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo ) // RTO5c6 - if let completedSyncBufferedObjectOperations, !completedSyncBufferedObjectOperations.isEmpty { - logger.log("Applying \(completedSyncBufferedObjectOperations.count) buffered OBJECT ObjectMessages", level: .debug) - for objectMessage in completedSyncBufferedObjectOperations { + guard case let .syncing(syncingData) = state else { + // We put ourselves into SYNCING above + preconditionFailure() + } + let bufferedObjectOperations = syncingData.bufferedObjectOperations + if !bufferedObjectOperations.isEmpty { + logger.log("Applying \(bufferedObjectOperations.count) buffered OBJECT ObjectMessages", level: .debug) + for objectMessage in bufferedObjectOperations { nosync_applyObjectProtocolMessageObjectMessage( objectMessage, logger: logger, @@ -649,12 +649,11 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo logger.log("handleObjectProtocolMessage(objectMessages: \(LoggingUtilities.formatObjectMessagesForLogging(objectMessages)))", level: .debug) - if case let .syncing(syncingData) = state, let existingSyncSequence = syncingData.syncSequence { + if case let .syncing(syncingData) = state { // RTO8a: Buffer the OBJECT message, to be handled once the sync completes + // Note that RTO8a says to buffer if "not SYNCED" (i.e. it includes the INITIALIZED state). But, "if SYNCING" is an equivalent check since we will only receive operations once attached, and we become SYNCING upon receipt of ATTACHED logger.log("Buffering OBJECT message due to in-progress sync", level: .debug) - var newSyncSequence = existingSyncSequence - newSyncSequence.bufferedObjectOperations.append(contentsOf: objectMessages) - syncingData.syncSequence = newSyncSequence + syncingData.bufferedObjectOperations.append(contentsOf: objectMessages) } else { // RTO8b: Handle the OBJECT message immediately for objectMessage in objectMessages {