diff --git a/AblyLiveObjects.xcworkspace/xcshareddata/swiftpm/Package.resolved b/AblyLiveObjects.xcworkspace/xcshareddata/swiftpm/Package.resolved index 4807fd4..82e1637 100644 --- a/AblyLiveObjects.xcworkspace/xcshareddata/swiftpm/Package.resolved +++ b/AblyLiveObjects.xcworkspace/xcshareddata/swiftpm/Package.resolved @@ -1,13 +1,12 @@ { - "originHash" : "d2622c7dbe5a5b55d2e9d411424ad63f4ff8431f36d6cdf3f0a79a62cafd7f3c", + "originHash" : "9ea403c00595358af830ab51461660f07512189e5befab77918ff271d7de6433", "pins" : [ { "identity" : "ably-cocoa", "kind" : "remoteSourceControl", "location" : "https://github.com/ably/ably-cocoa", "state" : { - "revision" : "aa7db56bfda49595835d7f9248dacd40c5f3faf0", - "version" : "1.2.55" + "revision" : "ee0b1fabf2e30377d6a1d90f3d2b6614d16951d8" } }, { @@ -15,8 +14,7 @@ "kind" : "remoteSourceControl", "location" : "https://github.com/ably/ably-cocoa-plugin-support", "state" : { - "revision" : "dd118432a6e023c3d2c8a051299e8081a06db036", - "version" : "1.0.0" + "revision" : "2e640ce2ef422cd5ef693a9e1111400e9985e088" } }, { diff --git a/Package.resolved b/Package.resolved index 09520cd..5cb7b47 100644 --- a/Package.resolved +++ b/Package.resolved @@ -1,13 +1,12 @@ { - "originHash" : "786c7c01be692ceb19bda5333dd720f2afb1cc13314dabce1d4c6101c9c6700b", + "originHash" : "3e9e0e2513db25869bbf8b6b55bc1e1d01a5782a2e5c7855f52ec47b1334c477", "pins" : [ { "identity" : "ably-cocoa", "kind" : "remoteSourceControl", "location" : "https://github.com/ably/ably-cocoa", "state" : { - "revision" : "aa7db56bfda49595835d7f9248dacd40c5f3faf0", - "version" : "1.2.55" + "revision" : "ee0b1fabf2e30377d6a1d90f3d2b6614d16951d8" } }, { @@ -15,8 +14,7 @@ "kind" : "remoteSourceControl", "location" : "https://github.com/ably/ably-cocoa-plugin-support", "state" : { - "revision" : "dd118432a6e023c3d2c8a051299e8081a06db036", - "version" : "1.0.0" + "revision" : "2e640ce2ef422cd5ef693a9e1111400e9985e088" } }, { diff --git a/Package.swift b/Package.swift index 294f0e8..f7901c3 100644 --- a/Package.swift +++ b/Package.swift @@ -18,13 +18,15 @@ let package = Package( ), ], dependencies: [ + // TODO: Unpin before release .package( url: "https://github.com/ably/ably-cocoa", - from: "1.2.55", + revision: "ee0b1fabf2e30377d6a1d90f3d2b6614d16951d8", ), + // TODO: Unpin before release .package( url: "https://github.com/ably/ably-cocoa-plugin-support", - from: "1.0.0", + revision: "2e640ce2ef422cd5ef693a9e1111400e9985e088", ), .package( url: "https://github.com/apple/swift-argument-parser", diff --git a/Sources/AblyLiveObjects/Internal/DefaultInternalPlugin.swift b/Sources/AblyLiveObjects/Internal/DefaultInternalPlugin.swift index e9f801d..0da597f 100644 --- a/Sources/AblyLiveObjects/Internal/DefaultInternalPlugin.swift +++ b/Sources/AblyLiveObjects/Internal/DefaultInternalPlugin.swift @@ -114,8 +114,12 @@ internal final class DefaultInternalPlugin: NSObject, _AblyPluginSupportPrivate. return wireObjectMessage.toWireObject.toPluginSupportDataDictionary } - internal func nosync_onChannelAttached(_ channel: _AblyPluginSupportPrivate.RealtimeChannel, hasObjects: Bool) { - nosync_realtimeObjects(for: channel).nosync_onChannelAttached(hasObjects: hasObjects) + internal func nosync_onChannelAttached(_: _AblyPluginSupportPrivate.RealtimeChannel, hasObjects _: Bool) { + preconditionFailure("Expected a version of ably-cocoa that calls the variant that accepts a `resumed` argument") + } + + internal func nosync_onChannelAttached(_ channel: _AblyPluginSupportPrivate.RealtimeChannel, hasObjects: Bool, resumed: Bool) { + nosync_realtimeObjects(for: channel).nosync_onChannelAttached(hasObjects: hasObjects, resumed: resumed) } internal func nosync_handleObjectProtocolMessage(withObjectMessages publicObjectMessages: [any _AblyPluginSupportPrivate.ObjectMessageProtocol], channel: _AblyPluginSupportPrivate.RealtimeChannel) { diff --git a/Sources/AblyLiveObjects/Internal/InternalDefaultRealtimeObjects.swift b/Sources/AblyLiveObjects/Internal/InternalDefaultRealtimeObjects.swift index 373a9ea..a86b99d 100644 --- a/Sources/AblyLiveObjects/Internal/InternalDefaultRealtimeObjects.swift +++ b/Sources/AblyLiveObjects/Internal/InternalDefaultRealtimeObjects.swift @@ -302,10 +302,11 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo } } - internal func nosync_onChannelAttached(hasObjects: Bool) { + internal func nosync_onChannelAttached(hasObjects: Bool, resumed: Bool) { mutableStateMutex.withoutSync { mutableState in mutableState.nosync_onChannelAttached( hasObjects: hasObjects, + resumed: resumed, logger: logger, userCallbackQueue: userCallbackQueue, ) @@ -500,13 +501,20 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo internal mutating func nosync_onChannelAttached( hasObjects: Bool, + resumed: Bool, logger: Logger, userCallbackQueue: DispatchQueue, ) { - logger.log("onChannelAttached(hasObjects: \(hasObjects)", level: .debug) + logger.log("onChannelAttached(hasObjects: \(hasObjects), resumed: \(resumed)", level: .debug) onChannelAttachedHasObjects = hasObjects + // RTO4d: If the RESUMED flag is not set, clear any buffered operations + if !resumed, case let .syncing(syncingData) = state { + logger.log("Clearing buffered operations due to ATTACHED without RESUMED flag", level: .debug) + syncingData.bufferedObjectOperations = [] + } + // We will subsequently transition to .synced either by the completion of the RTO4a OBJECT_SYNC, or by the RTO4b no-HAS_OBJECTS case below if state.toObjectsSyncState != .syncing { // RTO4c @@ -558,9 +566,8 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo // Figure out whether to continue any existing sync sequence or start a new one let isNewSyncSequence = syncCursor == nil || syncingData.syncSequence?.id != syncCursor?.sequenceID if isNewSyncSequence { - // RTO5a2a, RTO5a2b: new sequence started, discard previous. Else we continue the existing sequence per RTO5a3 + // RTO5a2a: new sequence started, discard previous SyncObjectsPool. Else we continue the existing sequence per RTO5a3 syncingData.syncSequence = nil - syncingData.bufferedObjectOperations = [] } } diff --git a/Tests/AblyLiveObjectsTests/AblyLiveObjectsTests.swift b/Tests/AblyLiveObjectsTests/AblyLiveObjectsTests.swift index bb5a419..e3b7630 100644 --- a/Tests/AblyLiveObjectsTests/AblyLiveObjectsTests.swift +++ b/Tests/AblyLiveObjectsTests/AblyLiveObjectsTests.swift @@ -3,6 +3,9 @@ import Ably @testable import AblyLiveObjects import Testing +@Suite( + .tags(.integration), +) struct AblyLiveObjectsTests { @Test func objectsProperty() async throws { diff --git a/Tests/AblyLiveObjectsTests/Helpers/Tags.swift b/Tests/AblyLiveObjectsTests/Helpers/Tags.swift new file mode 100644 index 0000000..106d79e --- /dev/null +++ b/Tests/AblyLiveObjectsTests/Helpers/Tags.swift @@ -0,0 +1,6 @@ +import Testing + +extension Tag { + /// Any test that is not a unit test. This usually implies that it has a non-trivial execution time. + @Tag static var integration: Self +} diff --git a/Tests/AblyLiveObjectsTests/InternalDefaultRealtimeObjectsTests.swift b/Tests/AblyLiveObjectsTests/InternalDefaultRealtimeObjectsTests.swift index 01975cc..dfaef69 100644 --- a/Tests/AblyLiveObjectsTests/InternalDefaultRealtimeObjectsTests.swift +++ b/Tests/AblyLiveObjectsTests/InternalDefaultRealtimeObjectsTests.swift @@ -126,7 +126,6 @@ struct InternalDefaultRealtimeObjectsTests { // @spec RTO5a2 // @spec RTO5a2a - // @spec RTO5a2b @Test func newSequenceIdDiscardsInFlightSync() async throws { let internalQueue = TestFactories.createInternalQueue() @@ -145,13 +144,6 @@ struct InternalDefaultRealtimeObjectsTests { #expect(realtimeObjects.testsOnly_hasSyncSequence) - // Inject an OBJECT; it will get buffered per RTO8a and subsequently discarded per RTO5a2b - internalQueue.ably_syncNoDeadlock { - realtimeObjects.nosync_handleObjectProtocolMessage(objectMessages: [ - TestFactories.mapCreateOperationMessage(objectId: "map:3@789"), - ]) - } - // Start new sequence with different ID (RTO5a2) let secondMessages = [TestFactories.simpleMapMessage(objectId: "map:2@456")] internalQueue.ably_syncNoDeadlock { @@ -172,10 +164,9 @@ struct InternalDefaultRealtimeObjectsTests { ) } - // Verify only the second sequence's objects were applied (RTO5a2a - previous cleared) + // Verify the second sequence's objects were applied (RTO5a2a - SyncObjectsPool was cleared) let pool = realtimeObjects.testsOnly_objectsPool - #expect(pool.entries["map:1@123"] == nil) // From discarded first sequence - #expect(pool.entries["map:3@789"] == nil) // Check we discarded the OBJECT that was buffered during discarded first sequence (RTO5a2b) + #expect(pool.entries["map:1@123"] == nil) // From discarded first sequence's SyncObjectsPool #expect(pool.entries["map:2@456"] != nil) // From completed second sequence #expect(!realtimeObjects.testsOnly_hasSyncSequence) } @@ -373,7 +364,7 @@ struct InternalDefaultRealtimeObjectsTests { // When: onChannelAttached is called with hasObjects = true internalQueue.ably_syncNoDeadlock { - realtimeObjects.nosync_onChannelAttached(hasObjects: true) + realtimeObjects.nosync_onChannelAttached(hasObjects: true, resumed: false /* value of `resumed` is arbitrary */ ) } // Then: Nothing should be modified @@ -389,6 +380,61 @@ struct InternalDefaultRealtimeObjectsTests { #expect(realtimeObjects.testsOnly_hasSyncSequence) } + // MARK: - RTO4d Tests + + // @spec RTO4d + @Test(arguments: [ + // resumed = false: buffered operations should be cleared + (resumed: false, expectClearBuffer: true), + // resumed = true: buffered operations should be preserved + (resumed: true, expectClearBuffer: false), + ]) + func bufferedOperationsClearedBasedOnResumedFlag(resumed: Bool, expectClearBuffer: Bool) { + let internalQueue = TestFactories.createInternalQueue() + let realtimeObjects = InternalDefaultRealtimeObjectsTests.createDefaultRealtimeObjects(internalQueue: internalQueue) + + // Start a sync sequence + internalQueue.ably_syncNoDeadlock { + realtimeObjects.nosync_handleObjectSyncProtocolMessage( + objectMessages: [ + TestFactories.mapObjectMessage(objectId: "map:sync@123"), + ], + protocolMessageChannelSerial: "seq1:cursor1", + ) + } + + #expect(realtimeObjects.testsOnly_hasSyncSequence) + + // Buffer some OBJECT operations during the sync + internalQueue.ably_syncNoDeadlock { + realtimeObjects.nosync_handleObjectProtocolMessage(objectMessages: [ + TestFactories.mapCreateOperationMessage(objectId: "map:buffered@456"), + ]) + } + + // When: onChannelAttached is called with the given resumed flag + internalQueue.ably_syncNoDeadlock { + realtimeObjects.nosync_onChannelAttached(hasObjects: true, resumed: resumed) + } + + // Continue and complete the sync sequence + internalQueue.ably_syncNoDeadlock { + realtimeObjects.nosync_handleObjectSyncProtocolMessage( + objectMessages: [], + protocolMessageChannelSerial: "seq1:", + ) + } + + // Then: The buffered OBJECT operation should be cleared or preserved based on the resumed flag + let pool = realtimeObjects.testsOnly_objectsPool + #expect(pool.entries["map:sync@123"] != nil) // From the sync sequence + if expectClearBuffer { + #expect(pool.entries["map:buffered@456"] == nil) + } else { + #expect(pool.entries["map:buffered@456"] != nil) + } + } + // MARK: - RTO4b Tests // @spec RTO4b1 @@ -440,7 +486,7 @@ struct InternalDefaultRealtimeObjectsTests { // When: onChannelAttached is called with hasObjects = false internalQueue.ably_syncNoDeadlock { - realtimeObjects.nosync_onChannelAttached(hasObjects: false) + realtimeObjects.nosync_onChannelAttached(hasObjects: false, resumed: false /* value of `resumed` is arbitrary */ ) } // Then: Verify the expected behavior per RTO4b @@ -476,7 +522,7 @@ struct InternalDefaultRealtimeObjectsTests { // First call with hasObjects = true (should do nothing) internalQueue.ably_syncNoDeadlock { - realtimeObjects.nosync_onChannelAttached(hasObjects: true) + realtimeObjects.nosync_onChannelAttached(hasObjects: true, resumed: false /* value of `resumed` is arbitrary */ ) } #expect(realtimeObjects.testsOnly_onChannelAttachedHasObjects == true) let originalPool = realtimeObjects.testsOnly_objectsPool @@ -484,7 +530,7 @@ struct InternalDefaultRealtimeObjectsTests { // Second call with hasObjects = false (should reset) internalQueue.ably_syncNoDeadlock { - realtimeObjects.nosync_onChannelAttached(hasObjects: false) + realtimeObjects.nosync_onChannelAttached(hasObjects: false, resumed: false /* value of `resumed` is arbitrary */ ) } #expect(realtimeObjects.testsOnly_onChannelAttachedHasObjects == false) let newPool = realtimeObjects.testsOnly_objectsPool @@ -493,7 +539,7 @@ struct InternalDefaultRealtimeObjectsTests { // Third call with hasObjects = true again (should do nothing) internalQueue.ably_syncNoDeadlock { - realtimeObjects.nosync_onChannelAttached(hasObjects: true) + realtimeObjects.nosync_onChannelAttached(hasObjects: true, resumed: false /* value of `resumed` is arbitrary */ ) } #expect(realtimeObjects.testsOnly_onChannelAttachedHasObjects == true) let finalPool = realtimeObjects.testsOnly_objectsPool @@ -530,7 +576,7 @@ struct InternalDefaultRealtimeObjectsTests { // When: onChannelAttached is called with hasObjects = false internalQueue.ably_syncNoDeadlock { - realtimeObjects.nosync_onChannelAttached(hasObjects: false) + realtimeObjects.nosync_onChannelAttached(hasObjects: false, resumed: false /* value of `resumed` is arbitrary */ ) } // Then: All sync data should be discarded @@ -565,7 +611,7 @@ struct InternalDefaultRealtimeObjectsTests { // When: onChannelAttached is called with hasObjects = false internalQueue.ably_syncNoDeadlock { - realtimeObjects.nosync_onChannelAttached(hasObjects: false) + realtimeObjects.nosync_onChannelAttached(hasObjects: false, resumed: false /* value of `resumed` is arbitrary */ ) } // Then: Should still reset the pool correctly @@ -583,7 +629,7 @@ struct InternalDefaultRealtimeObjectsTests { // When: onChannelAttached is called with hasObjects = false internalQueue.ably_syncNoDeadlock { - realtimeObjects.nosync_onChannelAttached(hasObjects: false) + realtimeObjects.nosync_onChannelAttached(hasObjects: false, resumed: false /* value of `resumed` is arbitrary */ ) } // Then: The new root should be properly initialized @@ -611,7 +657,7 @@ struct InternalDefaultRealtimeObjectsTests { // Complete sync via ATTACHED with HAS_OBJECTS false (RTO4b) internalQueue.ably_syncNoDeadlock { - realtimeObjects.nosync_onChannelAttached(hasObjects: false) + realtimeObjects.nosync_onChannelAttached(hasObjects: false, resumed: false /* value of `resumed` is arbitrary */ ) } // getRoot should now complete @@ -736,7 +782,7 @@ struct InternalDefaultRealtimeObjectsTests { // Complete sync first internalQueue.ably_syncNoDeadlock { - realtimeObjects.nosync_onChannelAttached(hasObjects: false) + realtimeObjects.nosync_onChannelAttached(hasObjects: false, resumed: false /* value of `resumed` is arbitrary */ ) } // getRoot should return @@ -761,7 +807,7 @@ struct InternalDefaultRealtimeObjectsTests { // Complete sync first internalQueue.ably_syncNoDeadlock { - realtimeObjects.nosync_onChannelAttached(hasObjects: false) + realtimeObjects.nosync_onChannelAttached(hasObjects: false, resumed: false /* value of `resumed` is arbitrary */ ) } // Call getRoot @@ -1602,7 +1648,8 @@ struct InternalDefaultRealtimeObjectsTests { for channelEvent in scenario.channelEvents { switch channelEvent { case let .attached(hasObjects): - realtimeObjects.nosync_onChannelAttached(hasObjects: hasObjects) + // The value of `resumed` is arbitrary for these tests; they're focused on sync state transitions + realtimeObjects.nosync_onChannelAttached(hasObjects: hasObjects, resumed: true) case let .objectSync(channelSerial): realtimeObjects.nosync_handleObjectSyncProtocolMessage( objectMessages: [], diff --git a/Tests/AblyLiveObjectsTests/JS Integration Tests/ObjectsIntegrationTests.swift b/Tests/AblyLiveObjectsTests/JS Integration Tests/ObjectsIntegrationTests.swift index fcab1b6..f2d7116 100644 --- a/Tests/AblyLiveObjectsTests/JS Integration Tests/ObjectsIntegrationTests.swift +++ b/Tests/AblyLiveObjectsTests/JS Integration Tests/ObjectsIntegrationTests.swift @@ -39,6 +39,20 @@ private func lexicoTimeserial(seriesId: String, timestamp: Int64, counter: Int, return result } +/// Helper function to inject an ATTACHED protocol message +private func injectAttachedMessage(channel: ARTRealtimeChannel, flags: Int64) async { + await withCheckedContinuation { continuation in + channel.internal.queue.async { + let pm = ARTProtocolMessage() + pm.action = .attached + pm.channel = channel.name + pm.flags = flags + channel.internal.onChannelMessage(pm) + continuation.resume() + } + } +} + func monitorConnectionThenCloseAndFinishAsync(_ realtime: ARTRealtime, action: @escaping @Sendable () async throws -> Void) async throws { defer { realtime.connection.close() } @@ -338,6 +352,7 @@ class MainActorStorage { // MARK: - Test suite @Suite( + .tags(.integration), .objectsFixtures, // These tests exhibit flakiness (hanging, timeouts, occasional Realtime // connection limits) when run concurrently, where I think that we had up to @@ -2064,12 +2079,11 @@ private struct ObjectsIntegrationTests { .init( disabled: false, allTransportsAndProtocols: false, - description: "buffered object operation messages are discarded when new OBJECT_SYNC sequence starts", - action: { ctx in + description: "buffered object operation messages are discarded on non-resume ATTACHED message", + action: { ctx throws in let root = ctx.root let objectsHelper = ctx.objectsHelper let channel = ctx.channel - let client = ctx.client // Start new sync sequence with a cursor so client will wait for the next OBJECT_SYNC messages await objectsHelper.processObjectStateMessageOnChannel( @@ -2077,51 +2091,179 @@ private struct ObjectsIntegrationTests { syncSerial: "serial:cursor", ) - // Inject operations, expect them to be discarded when sync with new sequence id starts - // Note that unlike in the JS test we do not perform this concurrently because if we were to do that in Swift Concurrency we would not be able to guarantee that the operations are applied in the correct order (if they're not then messages will be discarded due to serials being out of order) - for (i, keyData) in primitiveKeyData.enumerated() { - var wireData = keyData.data.mapValues { WireValue(jsonValue: $0) } + // Inject operation during first sync sequence, expect it to be discarded when non-resume ATTACHED arrives + await objectsHelper.processObjectOperationMessageOnChannel( + channel: channel, + serial: lexicoTimeserial(seriesId: "aaa", timestamp: 0, counter: 0), + siteCode: "aaa", + state: [objectsHelper.mapSetOp(objectId: "root", key: "foo", data: .object(["string": .string("bar")]))], + ) - if let bytesValue = wireData["bytes"], client.internal.options.useBinaryProtocol { - let bytesString = try #require(bytesValue.stringValue) - wireData["bytes"] = try .data(#require(.init(base64Encoded: bytesString))) - } + // Simulate non-resume ATTACHED message (RESUMED flag absent). + // The realtime server caches the object state at the moment of channel attachment, + // so buffered operations from before the attach must be discarded. + await injectAttachedMessage(channel: channel, flags: 1 << 7) // HAS_OBJECTS flag is bit 7 - await objectsHelper.processObjectOperationMessageOnChannel( - channel: channel, - serial: lexicoTimeserial(seriesId: "aaa", timestamp: Int64(i), counter: 0), - siteCode: "aaa", - state: [objectsHelper.mapSetOp(objectId: "root", key: keyData.key, data: .object(wireData))], - ) - } + // Inject another operation that should be applied when sync ends + await objectsHelper.processObjectOperationMessageOnChannel( + channel: channel, + serial: lexicoTimeserial(seriesId: "bbb", timestamp: 0, counter: 0), + siteCode: "bbb", + state: [objectsHelper.mapSetOp(objectId: "root", key: "baz", data: .object(["string": .string("qux")]))], + ) + + // End sync + await objectsHelper.processObjectStateMessageOnChannel( + channel: channel, + syncSerial: "serial:", + ) - // Start new sync with new sequence id + // Check root doesn't have data from operations received before non-resume ATTACHED + #expect(try root.get(key: "foo") == nil, "Check buffered ops before non-resume ATTACHED were discarded and not applied on root") + + // Check root has data from operations received after non-resume ATTACHED + #expect(try #require(root.get(key: "baz")?.stringValue) == "qux", "Check root has data from operations received after non-resume ATTACHED") + }, + ), + .init( + disabled: false, + allTransportsAndProtocols: false, + description: "buffered object operation messages are NOT discarded on ATTACHED message with RESUMED flag", + action: { ctx throws in + let root = ctx.root + let objectsHelper = ctx.objectsHelper + let channel = ctx.channel + + // Start new sync sequence with a cursor so client will wait for the next OBJECT_SYNC messages await objectsHelper.processObjectStateMessageOnChannel( channel: channel, - syncSerial: "otherserial:cursor", + syncSerial: "serial:cursor", + ) + + // Inject operations, expect them to be kept when resume ATTACHED arrives + await objectsHelper.processObjectOperationMessageOnChannel( + channel: channel, + serial: lexicoTimeserial(seriesId: "aaa", timestamp: 0, counter: 0), + siteCode: "aaa", + state: [objectsHelper.mapSetOp(objectId: "root", key: "foo", data: .object(["string": .string("bar")]))], ) - // Inject another operation that should be applied when latest sync ends + // Simulate resume ATTACHED message (RESUMED flag set). + // When resuming, the client continues from where it left off, so buffered operations + // must be preserved as they were received after the original attachment point. + await injectAttachedMessage(channel: channel, flags: (1 << 7) | (1 << 2)) // HAS_OBJECTS flag and RESUMED flag + + // Inject another operation after resume ATTACHED await objectsHelper.processObjectOperationMessageOnChannel( channel: channel, serial: lexicoTimeserial(seriesId: "bbb", timestamp: 0, counter: 0), siteCode: "bbb", + state: [objectsHelper.mapSetOp(objectId: "root", key: "baz", data: .object(["string": .string("qux")]))], + ) + + // End sync + await objectsHelper.processObjectStateMessageOnChannel( + channel: channel, + syncSerial: "serial:", + ) + + // Check root has data from both operations - before and after resume ATTACHED + #expect(try #require(root.get(key: "foo")?.stringValue) == "bar", "Check root has data from operations received and buffered before resume ATTACHED") + #expect(try #require(root.get(key: "baz")?.stringValue) == "qux", "Check root has data from operations received after resume ATTACHED") + }, + ), + .init( + disabled: false, + allTransportsAndProtocols: false, + description: "buffered object operation messages are NOT discarded on new OBJECT_SYNC sequence", + action: { ctx throws in + let root = ctx.root + let objectsHelper = ctx.objectsHelper + let channel = ctx.channel + + // Start new sync sequence with a cursor so client will wait for the next OBJECT_SYNC messages + await objectsHelper.processObjectStateMessageOnChannel( + channel: channel, + syncSerial: "serial:cursor", + ) + + // Inject operation during first sync sequence + await objectsHelper.processObjectOperationMessageOnChannel( + channel: channel, + serial: lexicoTimeserial(seriesId: "aaa", timestamp: 0, counter: 0), + siteCode: "aaa", state: [objectsHelper.mapSetOp(objectId: "root", key: "foo", data: .object(["string": .string("bar")]))], ) + // Start new sync with new sequence id - buffered operations should NOT be discarded. + await objectsHelper.processObjectStateMessageOnChannel( + channel: channel, + syncSerial: "otherserial:cursor", + ) + + // Inject another operation during second sync sequence + await objectsHelper.processObjectOperationMessageOnChannel( + channel: channel, + serial: lexicoTimeserial(seriesId: "bbb", timestamp: 0, counter: 0), + siteCode: "bbb", + state: [objectsHelper.mapSetOp(objectId: "root", key: "baz", data: .object(["string": .string("qux")]))], + ) + // End sync await objectsHelper.processObjectStateMessageOnChannel( channel: channel, syncSerial: "otherserial:", ) - // Check root doesn't have data from operations received during first sync - for keyData in primitiveKeyData { - #expect(try root.get(key: keyData.key) == nil, "Check \"\(keyData.key)\" key doesn't exist on root when OBJECT_SYNC has ended") - } + // Check root has data from operations received during first sync sequence + #expect(try #require(root.get(key: "foo")?.stringValue) == "bar", "Check root has data from operations received during first OBJECT_SYNC sequence") // Check root has data from operations received during second sync - #expect(try #require(root.get(key: "foo")?.stringValue) == "bar", "Check root has data from operations received during second OBJECT_SYNC sequence") + #expect(try #require(root.get(key: "baz")?.stringValue) == "qux", "Check root has data from operations received during second OBJECT_SYNC sequence") + }, + ), + .init( + disabled: false, + allTransportsAndProtocols: false, + description: "operations are buffered when OBJECT_SYNC is received after completed sync without expected preceding ATTACHED", + action: { ctx throws in + let root = ctx.root + let objectsHelper = ctx.objectsHelper + let channel = ctx.channel + + // Complete an initial sync sequence first + await objectsHelper.processObjectStateMessageOnChannel( + channel: channel, + syncSerial: "serial:", + ) + + // Simulate receiving OBJECT_SYNC without preceding ATTACHED. + // Normally, for server-initiated resync the server is expected to send ATTACHED with RESUMED=false first. + // However, if that doesn't happen, the client handles it as a best-effort case by starting to buffer from this point. + await objectsHelper.processObjectStateMessageOnChannel( + channel: channel, + syncSerial: "resync:cursor", + ) + + // Inject operations during this server-initiated resync - they should be buffered + await objectsHelper.processObjectOperationMessageOnChannel( + channel: channel, + serial: lexicoTimeserial(seriesId: "aaa", timestamp: 0, counter: 0), + siteCode: "aaa", + state: [objectsHelper.mapSetOp(objectId: "root", key: "foo", data: .object(["string": .string("bar")]))], + ) + + // Check root doesn't have data yet - operations should be buffered during resync + #expect(try root.get(key: "foo") == nil, "Check \"foo\" key doesn't exist during server-initiated resync") + + // End the resync + await objectsHelper.processObjectStateMessageOnChannel( + channel: channel, + syncSerial: "resync:", + ) + + // Check buffered operations are now applied + #expect(try #require(root.get(key: "foo")?.stringValue) == "bar", "Check root has correct value for \"foo\" key after server-initiated resync completed") }, ), .init( diff --git a/Tests/AblyLiveObjectsTests/ObjectLifetimesTests.swift b/Tests/AblyLiveObjectsTests/ObjectLifetimesTests.swift index 9c83e4d..db7b4ce 100644 --- a/Tests/AblyLiveObjectsTests/ObjectLifetimesTests.swift +++ b/Tests/AblyLiveObjectsTests/ObjectLifetimesTests.swift @@ -2,6 +2,9 @@ import Ably.Private @testable import AblyLiveObjects import Testing +@Suite( + .tags(.integration), +) struct ObjectLifetimesTests { @Test("LiveObjects functionality works with only a strong reference to channel's public objects property") func withStrongReferenceToPublicObjectsProperty() async throws {