Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 3 additions & 5 deletions Package.resolved

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@ let package = Package(
),
],
dependencies: [
// TODO: Unpin before release
.package(
url: "https://github.com/ably/ably-cocoa",
from: "1.2.55",
revision: "ee0b1fabf2e30377d6a1d90f3d2b6614d16951d8",
),
// TODO: Unpin before release
.package(
url: "https://github.com/ably/ably-cocoa-plugin-support",
from: "1.0.0",
revision: "2e640ce2ef422cd5ef693a9e1111400e9985e088",
),
.package(
url: "https://github.com/apple/swift-argument-parser",
Expand Down
8 changes: 6 additions & 2 deletions Sources/AblyLiveObjects/Internal/DefaultInternalPlugin.swift
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,12 @@ internal final class DefaultInternalPlugin: NSObject, _AblyPluginSupportPrivate.
return wireObjectMessage.toWireObject.toPluginSupportDataDictionary
}

internal func nosync_onChannelAttached(_ channel: _AblyPluginSupportPrivate.RealtimeChannel, hasObjects: Bool) {
nosync_realtimeObjects(for: channel).nosync_onChannelAttached(hasObjects: hasObjects)
internal func nosync_onChannelAttached(_: _AblyPluginSupportPrivate.RealtimeChannel, hasObjects _: Bool) {
preconditionFailure("Expected a version of ably-cocoa that calls the variant that accepts a `resumed` argument")
}

internal func nosync_onChannelAttached(_ channel: _AblyPluginSupportPrivate.RealtimeChannel, hasObjects: Bool, resumed: Bool) {
nosync_realtimeObjects(for: channel).nosync_onChannelAttached(hasObjects: hasObjects, resumed: resumed)
}

internal func nosync_handleObjectProtocolMessage(withObjectMessages publicObjectMessages: [any _AblyPluginSupportPrivate.ObjectMessageProtocol], channel: _AblyPluginSupportPrivate.RealtimeChannel) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,10 +302,11 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo
}
}

internal func nosync_onChannelAttached(hasObjects: Bool) {
internal func nosync_onChannelAttached(hasObjects: Bool, resumed: Bool) {
mutableStateMutex.withoutSync { mutableState in
mutableState.nosync_onChannelAttached(
hasObjects: hasObjects,
resumed: resumed,
logger: logger,
userCallbackQueue: userCallbackQueue,
)
Expand Down Expand Up @@ -500,13 +501,20 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo

internal mutating func nosync_onChannelAttached(
hasObjects: Bool,
resumed: Bool,
logger: Logger,
userCallbackQueue: DispatchQueue,
) {
logger.log("onChannelAttached(hasObjects: \(hasObjects)", level: .debug)
logger.log("onChannelAttached(hasObjects: \(hasObjects), resumed: \(resumed)", level: .debug)

onChannelAttachedHasObjects = hasObjects

// RTO4d: If the RESUMED flag is not set, clear any buffered operations
if !resumed, case let .syncing(syncingData) = state {
logger.log("Clearing buffered operations due to ATTACHED without RESUMED flag", level: .debug)
syncingData.bufferedObjectOperations = []
}

// We will subsequently transition to .synced either by the completion of the RTO4a OBJECT_SYNC, or by the RTO4b no-HAS_OBJECTS case below
if state.toObjectsSyncState != .syncing {
// RTO4c
Expand Down Expand Up @@ -558,9 +566,8 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo
// Figure out whether to continue any existing sync sequence or start a new one
let isNewSyncSequence = syncCursor == nil || syncingData.syncSequence?.id != syncCursor?.sequenceID
if isNewSyncSequence {
// RTO5a2a, RTO5a2b: new sequence started, discard previous. Else we continue the existing sequence per RTO5a3
// RTO5a2a: new sequence started, discard previous SyncObjectsPool. Else we continue the existing sequence per RTO5a3
syncingData.syncSequence = nil
syncingData.bufferedObjectOperations = []
}
}

Expand Down
3 changes: 3 additions & 0 deletions Tests/AblyLiveObjectsTests/AblyLiveObjectsTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ import Ably
@testable import AblyLiveObjects
import Testing

@Suite(
.tags(.integration),
)
struct AblyLiveObjectsTests {
@Test
func objectsProperty() async throws {
Expand Down
6 changes: 6 additions & 0 deletions Tests/AblyLiveObjectsTests/Helpers/Tags.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import Testing

extension Tag {
/// Any test that is not a unit test. This usually implies that it has a non-trivial execution time.
@Tag static var integration: Self
}
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ struct InternalDefaultRealtimeObjectsTests {

// @spec RTO5a2
// @spec RTO5a2a
// @spec RTO5a2b
@Test
func newSequenceIdDiscardsInFlightSync() async throws {
let internalQueue = TestFactories.createInternalQueue()
Expand All @@ -145,13 +144,6 @@ struct InternalDefaultRealtimeObjectsTests {

#expect(realtimeObjects.testsOnly_hasSyncSequence)

// Inject an OBJECT; it will get buffered per RTO8a and subsequently discarded per RTO5a2b
internalQueue.ably_syncNoDeadlock {
realtimeObjects.nosync_handleObjectProtocolMessage(objectMessages: [
TestFactories.mapCreateOperationMessage(objectId: "map:3@789"),
])
}

// Start new sequence with different ID (RTO5a2)
let secondMessages = [TestFactories.simpleMapMessage(objectId: "map:2@456")]
internalQueue.ably_syncNoDeadlock {
Expand All @@ -172,10 +164,9 @@ struct InternalDefaultRealtimeObjectsTests {
)
}

// Verify only the second sequence's objects were applied (RTO5a2a - previous cleared)
// Verify the second sequence's objects were applied (RTO5a2a - SyncObjectsPool was cleared)
let pool = realtimeObjects.testsOnly_objectsPool
#expect(pool.entries["map:1@123"] == nil) // From discarded first sequence
#expect(pool.entries["map:3@789"] == nil) // Check we discarded the OBJECT that was buffered during discarded first sequence (RTO5a2b)
#expect(pool.entries["map:1@123"] == nil) // From discarded first sequence's SyncObjectsPool
#expect(pool.entries["map:2@456"] != nil) // From completed second sequence
#expect(!realtimeObjects.testsOnly_hasSyncSequence)
}
Expand Down Expand Up @@ -373,7 +364,7 @@ struct InternalDefaultRealtimeObjectsTests {

// When: onChannelAttached is called with hasObjects = true
internalQueue.ably_syncNoDeadlock {
realtimeObjects.nosync_onChannelAttached(hasObjects: true)
realtimeObjects.nosync_onChannelAttached(hasObjects: true, resumed: false /* value of `resumed` is arbitrary */ )
}

// Then: Nothing should be modified
Expand All @@ -389,6 +380,61 @@ struct InternalDefaultRealtimeObjectsTests {
#expect(realtimeObjects.testsOnly_hasSyncSequence)
}

// MARK: - RTO4d Tests

// @spec RTO4d
@Test(arguments: [
// resumed = false: buffered operations should be cleared
(resumed: false, expectClearBuffer: true),
// resumed = true: buffered operations should be preserved
(resumed: true, expectClearBuffer: false),
])
func bufferedOperationsClearedBasedOnResumedFlag(resumed: Bool, expectClearBuffer: Bool) {
let internalQueue = TestFactories.createInternalQueue()
let realtimeObjects = InternalDefaultRealtimeObjectsTests.createDefaultRealtimeObjects(internalQueue: internalQueue)

// Start a sync sequence
internalQueue.ably_syncNoDeadlock {
realtimeObjects.nosync_handleObjectSyncProtocolMessage(
objectMessages: [
TestFactories.mapObjectMessage(objectId: "map:sync@123"),
],
protocolMessageChannelSerial: "seq1:cursor1",
)
}

#expect(realtimeObjects.testsOnly_hasSyncSequence)

// Buffer some OBJECT operations during the sync
internalQueue.ably_syncNoDeadlock {
realtimeObjects.nosync_handleObjectProtocolMessage(objectMessages: [
TestFactories.mapCreateOperationMessage(objectId: "map:buffered@456"),
])
}

// When: onChannelAttached is called with the given resumed flag
internalQueue.ably_syncNoDeadlock {
realtimeObjects.nosync_onChannelAttached(hasObjects: true, resumed: resumed)
}

// Continue and complete the sync sequence
internalQueue.ably_syncNoDeadlock {
realtimeObjects.nosync_handleObjectSyncProtocolMessage(
objectMessages: [],
protocolMessageChannelSerial: "seq1:",
)
}

// Then: The buffered OBJECT operation should be cleared or preserved based on the resumed flag
let pool = realtimeObjects.testsOnly_objectsPool
#expect(pool.entries["map:sync@123"] != nil) // From the sync sequence
if expectClearBuffer {
#expect(pool.entries["map:buffered@456"] == nil)
} else {
#expect(pool.entries["map:buffered@456"] != nil)
}
}

// MARK: - RTO4b Tests

// @spec RTO4b1
Expand Down Expand Up @@ -440,7 +486,7 @@ struct InternalDefaultRealtimeObjectsTests {

// When: onChannelAttached is called with hasObjects = false
internalQueue.ably_syncNoDeadlock {
realtimeObjects.nosync_onChannelAttached(hasObjects: false)
realtimeObjects.nosync_onChannelAttached(hasObjects: false, resumed: false /* value of `resumed` is arbitrary */ )
}

// Then: Verify the expected behavior per RTO4b
Expand Down Expand Up @@ -476,15 +522,15 @@ struct InternalDefaultRealtimeObjectsTests {

// First call with hasObjects = true (should do nothing)
internalQueue.ably_syncNoDeadlock {
realtimeObjects.nosync_onChannelAttached(hasObjects: true)
realtimeObjects.nosync_onChannelAttached(hasObjects: true, resumed: false /* value of `resumed` is arbitrary */ )
}
#expect(realtimeObjects.testsOnly_onChannelAttachedHasObjects == true)
let originalPool = realtimeObjects.testsOnly_objectsPool
let originalRoot = originalPool.root

// Second call with hasObjects = false (should reset)
internalQueue.ably_syncNoDeadlock {
realtimeObjects.nosync_onChannelAttached(hasObjects: false)
realtimeObjects.nosync_onChannelAttached(hasObjects: false, resumed: false /* value of `resumed` is arbitrary */ )
}
#expect(realtimeObjects.testsOnly_onChannelAttachedHasObjects == false)
let newPool = realtimeObjects.testsOnly_objectsPool
Expand All @@ -493,7 +539,7 @@ struct InternalDefaultRealtimeObjectsTests {

// Third call with hasObjects = true again (should do nothing)
internalQueue.ably_syncNoDeadlock {
realtimeObjects.nosync_onChannelAttached(hasObjects: true)
realtimeObjects.nosync_onChannelAttached(hasObjects: true, resumed: false /* value of `resumed` is arbitrary */ )
}
#expect(realtimeObjects.testsOnly_onChannelAttachedHasObjects == true)
let finalPool = realtimeObjects.testsOnly_objectsPool
Expand Down Expand Up @@ -530,7 +576,7 @@ struct InternalDefaultRealtimeObjectsTests {

// When: onChannelAttached is called with hasObjects = false
internalQueue.ably_syncNoDeadlock {
realtimeObjects.nosync_onChannelAttached(hasObjects: false)
realtimeObjects.nosync_onChannelAttached(hasObjects: false, resumed: false /* value of `resumed` is arbitrary */ )
}

// Then: All sync data should be discarded
Expand Down Expand Up @@ -565,7 +611,7 @@ struct InternalDefaultRealtimeObjectsTests {

// When: onChannelAttached is called with hasObjects = false
internalQueue.ably_syncNoDeadlock {
realtimeObjects.nosync_onChannelAttached(hasObjects: false)
realtimeObjects.nosync_onChannelAttached(hasObjects: false, resumed: false /* value of `resumed` is arbitrary */ )
}

// Then: Should still reset the pool correctly
Expand All @@ -583,7 +629,7 @@ struct InternalDefaultRealtimeObjectsTests {

// When: onChannelAttached is called with hasObjects = false
internalQueue.ably_syncNoDeadlock {
realtimeObjects.nosync_onChannelAttached(hasObjects: false)
realtimeObjects.nosync_onChannelAttached(hasObjects: false, resumed: false /* value of `resumed` is arbitrary */ )
}

// Then: The new root should be properly initialized
Expand Down Expand Up @@ -611,7 +657,7 @@ struct InternalDefaultRealtimeObjectsTests {

// Complete sync via ATTACHED with HAS_OBJECTS false (RTO4b)
internalQueue.ably_syncNoDeadlock {
realtimeObjects.nosync_onChannelAttached(hasObjects: false)
realtimeObjects.nosync_onChannelAttached(hasObjects: false, resumed: false /* value of `resumed` is arbitrary */ )
}

// getRoot should now complete
Expand Down Expand Up @@ -736,7 +782,7 @@ struct InternalDefaultRealtimeObjectsTests {

// Complete sync first
internalQueue.ably_syncNoDeadlock {
realtimeObjects.nosync_onChannelAttached(hasObjects: false)
realtimeObjects.nosync_onChannelAttached(hasObjects: false, resumed: false /* value of `resumed` is arbitrary */ )
}

// getRoot should return
Expand All @@ -761,7 +807,7 @@ struct InternalDefaultRealtimeObjectsTests {

// Complete sync first
internalQueue.ably_syncNoDeadlock {
realtimeObjects.nosync_onChannelAttached(hasObjects: false)
realtimeObjects.nosync_onChannelAttached(hasObjects: false, resumed: false /* value of `resumed` is arbitrary */ )
}

// Call getRoot
Expand Down Expand Up @@ -1602,7 +1648,8 @@ struct InternalDefaultRealtimeObjectsTests {
for channelEvent in scenario.channelEvents {
switch channelEvent {
case let .attached(hasObjects):
realtimeObjects.nosync_onChannelAttached(hasObjects: hasObjects)
// The value of `resumed` is arbitrary for these tests; they're focused on sync state transitions
realtimeObjects.nosync_onChannelAttached(hasObjects: hasObjects, resumed: true)
case let .objectSync(channelSerial):
realtimeObjects.nosync_handleObjectSyncProtocolMessage(
objectMessages: [],
Expand Down
Loading
Loading