Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 50 additions & 51 deletions Sources/AblyLiveObjects/Internal/InternalDefaultRealtimeObjects.swift
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,6 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo

/// The `ObjectMessage`s gathered during this sync sequence.
internal var syncObjectsPool: [SyncObjectsPoolEntry]

/// `OBJECT` ProtocolMessages that were received during this sync sequence, to be applied once the sync sequence is complete, per RTO7a.
internal var bufferedObjectOperations: [InboundObjectMessage]
}

internal init(
Expand Down Expand Up @@ -459,12 +456,16 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo
/// Note: We follow the same pattern as used in the WIP ably-swift: a state's associated data is a class instance and the convention is that to update the associated data for the current state you mutate the existing instance instead of creating a new one.
enum AssociatedData {
class Syncing {
/// `OBJECT` ProtocolMessages that were received whilst SYNCING, to be applied once the sync sequence is complete, per RTO7a.
var bufferedObjectOperations: [InboundObjectMessage]

/// Note that we only ever populate this during a multi-`ProtocolMessage` sync sequence. It is not used in the RTO4b or RTO5a5 cases where the sync data is entirely contained within a single ProtocolMessage, because an individual ProtocolMessage is processed atomically and so no other operations that might wish to query this property can occur concurrently with the handling of these cases.
///
/// It is optional because there are times that we transition to SYNCING even when the sync data is contained in a single ProtocolMessage.
var syncSequence: SyncSequence?

init(syncSequence: SyncSequence?) {
init(bufferedObjectOperations: [InboundObjectMessage], syncSequence: SyncSequence?) {
self.bufferedObjectOperations = bufferedObjectOperations
self.syncSequence = syncSequence
}
}
Expand Down Expand Up @@ -509,7 +510,7 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo
// We will subsequently transition to .synced either by the completion of the RTO4a OBJECT_SYNC, or by the RTO4b no-HAS_OBJECTS case below
if state.toObjectsSyncState != .syncing {
// RTO4c
transition(to: .syncing(.init(syncSequence: nil)), userCallbackQueue: userCallbackQueue)
transition(to: .syncing(.init(bufferedObjectOperations: [], syncSequence: nil)), userCallbackQueue: userCallbackQueue)
}

// We only care about the case where HAS_OBJECTS is not set (RTO4b); if it is set then we're going to shortly receive an OBJECT_SYNC instead (RTO4a)
Expand Down Expand Up @@ -540,71 +541,65 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo

receivedObjectSyncProtocolMessagesContinuation.yield(objectMessages)

// If populated, this contains a full set of sync data for the channel, and should be applied to the ObjectsPool.
let completedSyncObjectsPool: [SyncObjectsPoolEntry]?
// If populated, this contains a set of buffered inbound OBJECT messages that should be applied.
let completedSyncBufferedObjectOperations: [InboundObjectMessage]?
// The SyncSequence, if any, to store in the SYNCING state that results from this OBJECT_SYNC.
let syncSequenceForSyncingState: SyncSequence?

let syncCursor: SyncCursor?
if let protocolMessageChannelSerial {
let syncCursor: SyncCursor
do {
// RTO5a
syncCursor = try SyncCursor(channelSerial: protocolMessageChannelSerial)
} catch {
logger.log("Failed to parse sync cursor: \(error)", level: .error)
return
}
} else {
syncCursor = nil
}

if case let .syncing(syncingData) = state {
// Figure out whether to continue any existing sync sequence or start a new one
var updatedSyncSequence: SyncSequence = if case let .syncing(syncingData) = state, let syncSequence = syncingData.syncSequence {
if syncCursor.sequenceID == syncSequence.id {
// RTO5a3: Continue existing sync sequence
syncSequence
} else {
// RTO5a2a, RTO5a2b: new sequence started, discard previous
.init(id: syncCursor.sequenceID, syncObjectsPool: [], bufferedObjectOperations: [])
}
let isNewSyncSequence = syncCursor == nil || syncingData.syncSequence?.id != syncCursor?.sequenceID
if isNewSyncSequence {
// RTO5a2a, RTO5a2b: new sequence started, discard previous. Else we continue the existing sequence per RTO5a3
syncingData.syncSequence = nil
syncingData.bufferedObjectOperations = []
}
}

let syncObjectsPoolEntries = objectMessages.compactMap { objectMessage in
if let object = objectMessage.object {
SyncObjectsPoolEntry(state: object, objectMessageSerialTimestamp: objectMessage.serialTimestamp)
} else {
// There's no current sync sequence; start one
.init(id: syncCursor.sequenceID, syncObjectsPool: [], bufferedObjectOperations: [])
nil
}
}

// RTO5b
updatedSyncSequence.syncObjectsPool.append(contentsOf: objectMessages.compactMap { objectMessage in
if let object = objectMessage.object {
.init(state: object, objectMessageSerialTimestamp: objectMessage.serialTimestamp)
} else {
nil
}
})
// If populated, this contains a full set of sync data for the channel, and should be applied to the ObjectsPool.
let completedSyncObjectsPool: [SyncObjectsPoolEntry]?
// The SyncSequence, if any, to store in the SYNCING state that results from this OBJECT_SYNC.
let syncSequenceForSyncingState: SyncSequence?

(completedSyncObjectsPool, completedSyncBufferedObjectOperations) = if syncCursor.isEndOfSequence {
(updatedSyncSequence.syncObjectsPool, updatedSyncSequence.bufferedObjectOperations)
if let syncCursor {
let syncSequenceToContinue: SyncSequence? = if case let .syncing(syncingData) = state {
syncingData.syncSequence
} else {
(nil, nil)
nil
}

var updatedSyncSequence = syncSequenceToContinue ?? .init(id: syncCursor.sequenceID, syncObjectsPool: [])
// RTO5b
updatedSyncSequence.syncObjectsPool.append(contentsOf: syncObjectsPoolEntries)
syncSequenceForSyncingState = updatedSyncSequence

completedSyncObjectsPool = syncCursor.isEndOfSequence ? updatedSyncSequence.syncObjectsPool : nil
} else {
// RTO5a5: The sync data is contained entirely within this single OBJECT_SYNC
completedSyncObjectsPool = objectMessages.compactMap { objectMessage in
if let object = objectMessage.object {
.init(state: object, objectMessageSerialTimestamp: objectMessage.serialTimestamp)
} else {
nil
}
}
completedSyncBufferedObjectOperations = nil
completedSyncObjectsPool = syncObjectsPoolEntries
syncSequenceForSyncingState = nil
}

if case let .syncing(syncingData) = state {
syncingData.syncSequence = syncSequenceForSyncingState
} else {
// RTO5e
transition(to: .syncing(.init(syncSequence: syncSequenceForSyncingState)), userCallbackQueue: userCallbackQueue)
transition(to: .syncing(.init(bufferedObjectOperations: [], syncSequence: syncSequenceForSyncingState)), userCallbackQueue: userCallbackQueue)
}

if let completedSyncObjectsPool {
Expand All @@ -618,9 +613,14 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo
)

// RTO5c6
if let completedSyncBufferedObjectOperations, !completedSyncBufferedObjectOperations.isEmpty {
logger.log("Applying \(completedSyncBufferedObjectOperations.count) buffered OBJECT ObjectMessages", level: .debug)
for objectMessage in completedSyncBufferedObjectOperations {
guard case let .syncing(syncingData) = state else {
// We put ourselves into SYNCING above
preconditionFailure()
}
let bufferedObjectOperations = syncingData.bufferedObjectOperations
if !bufferedObjectOperations.isEmpty {
logger.log("Applying \(bufferedObjectOperations.count) buffered OBJECT ObjectMessages", level: .debug)
for objectMessage in bufferedObjectOperations {
nosync_applyObjectProtocolMessageObjectMessage(
objectMessage,
logger: logger,
Expand Down Expand Up @@ -649,12 +649,11 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo

logger.log("handleObjectProtocolMessage(objectMessages: \(LoggingUtilities.formatObjectMessagesForLogging(objectMessages)))", level: .debug)

if case let .syncing(syncingData) = state, let existingSyncSequence = syncingData.syncSequence {
if case let .syncing(syncingData) = state {
// RTO8a: Buffer the OBJECT message, to be handled once the sync completes
// Note that RTO8a says to buffer if "not SYNCED" (i.e. it includes the INITIALIZED state). But, "if SYNCING" is an equivalent check since we will only receive operations once attached, and we become SYNCING upon receipt of ATTACHED
logger.log("Buffering OBJECT message due to in-progress sync", level: .debug)
var newSyncSequence = existingSyncSequence
newSyncSequence.bufferedObjectOperations.append(contentsOf: objectMessages)
syncingData.syncSequence = newSyncSequence
syncingData.bufferedObjectOperations.append(contentsOf: objectMessages)
} else {
// RTO8b: Handle the OBJECT message immediately
for objectMessage in objectMessages {
Expand Down
Loading