From 22c71cf6dbaceb70e86737d1b16514024c763007 Mon Sep 17 00:00:00 2001 From: Lawrence Forooghian Date: Wed, 9 Jul 2025 09:41:13 -0300 Subject: [PATCH] Buffer OBJECT ProtocolMessages during a sync Based on [1] at 29276a5. I wrote the implementation, and for the tests followed the development approach described in cb427d8. [1] https://github.com/ably/specification/pull/343 --- .../DefaultRealtimeObjects.swift | 65 ++++++++---- .../DefaultRealtimeObjectsTests.swift | 100 +++++++++++++++++- 2 files changed, 144 insertions(+), 21 deletions(-) diff --git a/Sources/AblyLiveObjects/DefaultRealtimeObjects.swift b/Sources/AblyLiveObjects/DefaultRealtimeObjects.swift index b96c82b0..730f519d 100644 --- a/Sources/AblyLiveObjects/DefaultRealtimeObjects.swift +++ b/Sources/AblyLiveObjects/DefaultRealtimeObjects.swift @@ -23,7 +23,7 @@ internal final class DefaultRealtimeObjects: RealtimeObjects, LiveMapObjectPoolD } } - /// If this returns false, it means that there is currently no stored sync sequence ID or SyncObjectsPool + /// If this returns false, it means that there is currently no stored sync sequence ID, SyncObjectsPool, or BufferedObjectOperations. internal var testsOnly_hasSyncSequence: Bool { mutex.withLock { mutableState.syncSequence != nil @@ -45,6 +45,9 @@ internal final class DefaultRealtimeObjects: RealtimeObjects, LiveMapObjectPoolD /// The `ObjectMessage`s gathered during this sync sequence. internal var syncObjectsPool: [ObjectState] + + /// `OBJECT` ProtocolMessages that were received during this sync sequence, to be applied once the sync sequence is complete, per RTO7a. + internal var bufferedObjectOperations: [InboundObjectMessage] } /// Tracks whether an object sync sequence has happened yet. This allows us to wait for a sync before returning from `getRoot()`, per RTO1c. @@ -230,6 +233,7 @@ internal final class DefaultRealtimeObjects: RealtimeObjects, LiveMapObjectPoolD private struct MutableState { internal var objectsPool: ObjectsPool + /// Note that we only ever populate this during a multi-`ProtocolMessage` sync sequence. It is not used in the RTO4b or RTO5a5 cases where the sync data is entirely contained within a single ProtocolMessage, because an individual ProtocolMessage is processed atomically and so no other operations that might wish to query this property can occur concurrently with the handling of these cases. internal var syncSequence: SyncSequence? internal var syncStatus = SyncStatus() internal var onChannelAttachedHasObjects: Bool? @@ -255,7 +259,7 @@ internal final class DefaultRealtimeObjects: RealtimeObjects, LiveMapObjectPoolD // I have, for now, not directly implemented the "perform the actions for object sync completion" of RTO4b4 since my implementation doesn't quite match the model given there; here you only have a SyncObjectsPool if you have an OBJECT_SYNC in progress, which you might not have upon receiving an ATTACHED. Instead I've just implemented what seem like the relevant side effects. Can revisit this if "the actions for object sync completion" get more complex. - // RTO4b3, RTO4b4, RTO5c3, RTO5c4 + // RTO4b3, RTO4b4, RTO4b5, RTO5c3, RTO5c4, RTO5c5 syncSequence = nil syncStatus.signalSyncComplete() } @@ -275,6 +279,8 @@ internal final class DefaultRealtimeObjects: RealtimeObjects, LiveMapObjectPoolD // If populated, this contains a full set of sync data for the channel, and should be applied to the ObjectsPool. let completedSyncObjectsPool: [ObjectState]? + // If populated, this contains a set of buffered inbound OBJECT messages that should be applied. + let completedSyncBufferedObjectOperations: [InboundObjectMessage]? if let protocolMessageChannelSerial { let syncCursor: SyncCursor @@ -292,12 +298,12 @@ internal final class DefaultRealtimeObjects: RealtimeObjects, LiveMapObjectPoolD // RTO5a3: Continue existing sync sequence syncSequence } else { - // RTO5a2: new sequence started, discard previous - .init(id: syncCursor.sequenceID, syncObjectsPool: []) + // RTO5a2a, RTO5a2b: new sequence started, discard previous + .init(id: syncCursor.sequenceID, syncObjectsPool: [], bufferedObjectOperations: []) } } else { // There's no current sync sequence; start one - .init(id: syncCursor.sequenceID, syncObjectsPool: []) + .init(id: syncCursor.sequenceID, syncObjectsPool: [], bufferedObjectOperations: []) } // RTO5b @@ -305,14 +311,15 @@ internal final class DefaultRealtimeObjects: RealtimeObjects, LiveMapObjectPoolD syncSequence = updatedSyncSequence - completedSyncObjectsPool = if syncCursor.isEndOfSequence { - updatedSyncSequence.syncObjectsPool + (completedSyncObjectsPool, completedSyncBufferedObjectOperations) = if syncCursor.isEndOfSequence { + (updatedSyncSequence.syncObjectsPool, updatedSyncSequence.bufferedObjectOperations) } else { - nil + (nil, nil) } } else { // RTO5a5: The sync data is contained entirely within this single OBJECT_SYNC completedSyncObjectsPool = objectMessages.compactMap(\.object) + completedSyncBufferedObjectOperations = nil } if let completedSyncObjectsPool { @@ -323,7 +330,21 @@ internal final class DefaultRealtimeObjects: RealtimeObjects, LiveMapObjectPoolD coreSDK: coreSDK, logger: logger, ) - // RTO5c3, RTO5c4 + + // RTO5c6 + if let completedSyncBufferedObjectOperations, !completedSyncBufferedObjectOperations.isEmpty { + logger.log("Applying \(completedSyncBufferedObjectOperations.count) buffered OBJECT ObjectMessages", level: .debug) + for objectMessage in completedSyncBufferedObjectOperations { + applyObjectProtocolMessageObjectMessage( + objectMessage, + logger: logger, + mapDelegate: mapDelegate, + coreSDK: coreSDK, + ) + } + } + + // RTO5c3, RTO5c4, RTO5c5 syncSequence = nil syncStatus.signalSyncComplete() @@ -342,16 +363,22 @@ internal final class DefaultRealtimeObjects: RealtimeObjects, LiveMapObjectPoolD logger.log("handleObjectProtocolMessage(objectMessages: \(objectMessages))", level: .debug) - // TODO: RTO8a's buffering - - // RTO8b - for objectMessage in objectMessages { - applyObjectProtocolMessageObjectMessage( - objectMessage, - logger: logger, - mapDelegate: mapDelegate, - coreSDK: coreSDK, - ) + if let existingSyncSequence = syncSequence { + // RTO8a: Buffer the OBJECT message, to be handled once the sync completes + logger.log("Buffering OBJECT message due to in-progress sync", level: .debug) + var newSyncSequence = existingSyncSequence + newSyncSequence.bufferedObjectOperations.append(contentsOf: objectMessages) + syncSequence = newSyncSequence + } else { + // RTO8b: Handle the OBJECT message immediately + for objectMessage in objectMessages { + applyObjectProtocolMessageObjectMessage( + objectMessage, + logger: logger, + mapDelegate: mapDelegate, + coreSDK: coreSDK, + ) + } } } diff --git a/Tests/AblyLiveObjectsTests/DefaultRealtimeObjectsTests.swift b/Tests/AblyLiveObjectsTests/DefaultRealtimeObjectsTests.swift index 35804f78..a7b6a5cd 100644 --- a/Tests/AblyLiveObjectsTests/DefaultRealtimeObjectsTests.swift +++ b/Tests/AblyLiveObjectsTests/DefaultRealtimeObjectsTests.swift @@ -53,6 +53,7 @@ struct DefaultRealtimeObjectsTests { // @spec RTO5b // @spec RTO5c3 // @spec RTO5c4 + // @spec RTO5c5 @Test func handlesMultiProtocolMessageSync() async throws { let realtimeObjects = DefaultRealtimeObjectsTests.createDefaultRealtimeObjects() @@ -94,7 +95,7 @@ struct DefaultRealtimeObjectsTests { protocolMessageChannelSerial: "\(sequenceId):", // Empty cursor indicates end ) - // Verify sync sequence is cleared and there is no SyncObjectsPool (RTO5c3, RTO5c4) + // Verify sync sequence is cleared and there is no SyncObjectsPool or BufferedObjectOperations (RTO5c3, RTO5c4, RTO5c5) #expect(!realtimeObjects.testsOnly_hasSyncSequence) // Verify all objects were applied to pool (side effect of applySyncObjectsPool per RTO5c1b1b) @@ -108,6 +109,7 @@ struct DefaultRealtimeObjectsTests { // @spec RTO5a2 // @spec RTO5a2a + // @spec RTO5a2b @Test func newSequenceIdDiscardsInFlightSync() async throws { let realtimeObjects = DefaultRealtimeObjectsTests.createDefaultRealtimeObjects() @@ -123,6 +125,11 @@ struct DefaultRealtimeObjectsTests { #expect(realtimeObjects.testsOnly_hasSyncSequence) + // Inject an OBJECT; it will get buffered per RTO8a and subsequently discarded per RTO5a2b + realtimeObjects.handleObjectProtocolMessage(objectMessages: [ + TestFactories.mapCreateOperationMessage(objectId: "map:3@789"), + ]) + // Start new sequence with different ID (RTO5a2) let secondMessages = [TestFactories.simpleMapMessage(objectId: "map:2@456")] realtimeObjects.handleObjectSyncProtocolMessage( @@ -142,6 +149,7 @@ struct DefaultRealtimeObjectsTests { // Verify only the second sequence's objects were applied (RTO5a2a - previous cleared) let pool = realtimeObjects.testsOnly_objectsPool #expect(pool.entries["map:1@123"] == nil) // From discarded first sequence + #expect(pool.entries["map:3@789"] == nil) // Check we discarded the OBJECT that was buffered during discarded first sequence (RTO5a2b) #expect(pool.entries["map:2@456"] != nil) // From completed second sequence #expect(!realtimeObjects.testsOnly_hasSyncSequence) } @@ -336,6 +344,7 @@ struct DefaultRealtimeObjectsTests { // @spec RTO4b2 // @spec RTO4b3 // @spec RTO4b4 + // @spec RTO4b5 @Test func handlesHasObjectsFalse() { let realtimeObjects = DefaultRealtimeObjectsTests.createDefaultRealtimeObjects() @@ -381,7 +390,7 @@ struct DefaultRealtimeObjectsTests { #expect(newRoot as AnyObject !== originalPool.root as AnyObject) // Should be a new instance #expect(newRoot.testsOnly_data.isEmpty) // Should be zero-valued (empty) - // RTO4b3, RTO4b4: SyncObjectsPool must be cleared, sync sequence cleared + // RTO4b3, RTO4b4, RTO4b5: SyncObjectsPool must be cleared, sync sequence cleared, BufferedObjectOperations cleared #expect(!realtimeObjects.testsOnly_hasSyncSequence) } @@ -922,5 +931,92 @@ struct DefaultRealtimeObjectsTests { #expect(counter.testsOnly_siteTimeserials["site1"] == "ts2") } } + + // Tests that when an OBJECT ProtocolMessage is received during a sync sequence, its operations are buffered per RTO8a and applied after sync completion per RTO5c6. + struct BufferOperationTests { + // @spec RTO8a + // @spec RTO5c6 + @Test + func buffersObjectOperationsDuringSyncAndAppliesAfterCompletion() async throws { + let realtimeObjects = DefaultRealtimeObjectsTests.createDefaultRealtimeObjects() + let sequenceId = "seq123" + + // Start sync sequence with first OBJECT_SYNC message + let (entryKey, entry) = TestFactories.stringMapEntry(key: "existingKey", value: "existingValue") + let firstSyncMessages = [ + TestFactories.mapObjectMessage( + objectId: "map:1@123", + siteTimeserials: ["site1": "ts1"], // Explicit sync data siteCode and serial + entries: [entryKey: entry], + ), + ] + realtimeObjects.handleObjectSyncProtocolMessage( + objectMessages: firstSyncMessages, + protocolMessageChannelSerial: "\(sequenceId):cursor1", + ) + + // Verify sync sequence is active + #expect(realtimeObjects.testsOnly_hasSyncSequence) + + // Inject first OBJECT ProtocolMessage during sync (RTO8a) + let firstObjectMessage = TestFactories.mapSetOperationMessage( + objectId: "map:1@123", + key: "key1", + value: "value1", + serial: "ts3", // Higher than sync data "ts1" + siteCode: "site1", + ) + realtimeObjects.handleObjectProtocolMessage(objectMessages: [firstObjectMessage]) + + // Verify the operation was buffered and not applied yet + let poolAfterFirstObject = realtimeObjects.testsOnly_objectsPool + #expect(poolAfterFirstObject.entries["map:1@123"] == nil) // Object not yet created from sync + + // Inject second OBJECT ProtocolMessage during sync (RTO8a) + let secondObjectMessage = TestFactories.counterIncOperationMessage( + objectId: "counter:1@456", + amount: 10, + serial: "ts4", // Higher than sync data "ts2" + siteCode: "site1", + ) + realtimeObjects.handleObjectProtocolMessage(objectMessages: [secondObjectMessage]) + + // Verify the second operation was also buffered and not applied yet + let poolAfterSecondObject = realtimeObjects.testsOnly_objectsPool + #expect(poolAfterSecondObject.entries["counter:1@456"] == nil) // Object not yet created from sync + + // Complete sync sequence with final OBJECT_SYNC message + let finalSyncMessages = [ + TestFactories.counterObjectMessage( + objectId: "counter:1@456", + siteTimeserials: ["site1": "ts2"], + count: 5, + ), + ] + realtimeObjects.handleObjectSyncProtocolMessage( + objectMessages: finalSyncMessages, + protocolMessageChannelSerial: "\(sequenceId):", // Empty cursor indicates end + ) + + // Verify sync sequence is cleared + #expect(!realtimeObjects.testsOnly_hasSyncSequence) + + // Verify all objects were applied to pool from sync + let finalPool = realtimeObjects.testsOnly_objectsPool + let map = try #require(finalPool.entries["map:1@123"]?.mapValue) + let counter = try #require(finalPool.entries["counter:1@456"]?.counterValue) + + // Verify the buffered operations were applied after sync completion (RTO5c6) + // Check that MAP_SET operation was applied to the map + let mapValue = try #require(map.get(key: "key1")?.stringValue) + #expect(mapValue == "value1") + #expect(map.testsOnly_siteTimeserials["site1"] == "ts3") + + // Check that COUNTER_INC operation was applied to the counter + let counterValue = try counter.value + #expect(counterValue == 15) // 5 (from sync) + 10 (from buffered operation) + #expect(counter.testsOnly_siteTimeserials["site1"] == "ts4") + } + } } }