Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 20 additions & 8 deletions Sources/AblyLiveObjects/Internal/CoreSDK.swift
Original file line number Diff line number Diff line change
@@ -1,20 +1,32 @@
internal import _AblyPluginSupportPrivate
import Ably

/// Result of a successful publish operation (RTO15).
/// Contains information from the ACK needed for apply-on-ACK (RTO20).
/// Corresponds to a single `PublishResult` in the ACK's `res` array.
internal struct PublishResult: Sendable {
/// The message serials from the ACK `res[i].serials` property.
/// Each element corresponds 1:1 to the messages that were published.
/// A serial may be nil if the message was discarded due to a configured conflation rule.
let serials: [String?]
}

/// The API that the internal components of the SDK (that is, `DefaultLiveObjects` and down) use to interact with our core SDK (i.e. ably-cocoa).
///
/// This provides us with a mockable interface to ably-cocoa, and it also allows internal components and their tests not to need to worry about some of the boring details of how we bridge Swift types to `_AblyPluginSupportPrivate`'s Objective-C API (i.e. boxing).
internal protocol CoreSDK: AnyObject, Sendable {
/// Implements the internal `#publish` method of RTO15.
func publish(objectMessages: [OutboundObjectMessage]) async throws(ARTErrorInfo)
/// Returns a `PublishResult` containing the serial from the ACK for apply-on-ACK (RTO20).
@discardableResult
func publish(objectMessages: [OutboundObjectMessage]) async throws(ARTErrorInfo) -> PublishResult

/// Implements the server time fetch of RTO16, including the storing and usage of the local clock offset.
func fetchServerTime() async throws(ARTErrorInfo) -> Date

/// Replaces the implementation of ``publish(objectMessages:)``.
///
/// Used by integration tests, for example to disable `ObjectMessage` publishing so that a test can verify that a behaviour is not a side effect of an `ObjectMessage` sent by the SDK.
func testsOnly_overridePublish(with newImplementation: @escaping ([OutboundObjectMessage]) async throws(ARTErrorInfo) -> Void)
func testsOnly_overridePublish(with newImplementation: @escaping ([OutboundObjectMessage]) async throws(ARTErrorInfo) -> PublishResult)

/// Returns the current state of the Realtime channel that this wraps.
var nosync_channelState: _AblyPluginSupportPrivate.RealtimeChannelState { get }
Expand All @@ -34,7 +46,7 @@ internal final class DefaultCoreSDK: CoreSDK {
/// This enables the `testsOnly_overridePublish(with:)` test hook.
///
/// - Note: This should be `throws(ARTErrorInfo)` but that causes a compilation error of "Runtime support for typed throws function types is only available in macOS 15.0.0 or newer".
private nonisolated(unsafe) var overriddenPublishImplementation: (([OutboundObjectMessage]) async throws -> Void)?
private nonisolated(unsafe) var overriddenPublishImplementation: (([OutboundObjectMessage]) async throws -> PublishResult)?

internal init(
channel: _AblyPluginSupportPrivate.RealtimeChannel,
Expand All @@ -50,7 +62,8 @@ internal final class DefaultCoreSDK: CoreSDK {

// MARK: - CoreSDK conformance

internal func publish(objectMessages: [OutboundObjectMessage]) async throws(ARTErrorInfo) {
@discardableResult
internal func publish(objectMessages: [OutboundObjectMessage]) async throws(ARTErrorInfo) -> PublishResult {
logger.log("publish(objectMessages: \(LoggingUtilities.formatObjectMessagesForLogging(objectMessages)))", level: .debug)

// Use the overridden implementation if supplied
Expand All @@ -59,26 +72,25 @@ internal final class DefaultCoreSDK: CoreSDK {
}
if let overriddenImplementation {
do {
try await overriddenImplementation(objectMessages)
return try await overriddenImplementation(objectMessages)
} catch {
guard let artErrorInfo = error as? ARTErrorInfo else {
preconditionFailure("Expected ARTErrorInfo, got \(error)")
}
throw artErrorInfo
}
return
}

// TODO: Implement message size checking (https://github.com/ably/ably-liveobjects-swift-plugin/issues/13)
try await DefaultInternalPlugin.sendObject(
return try await DefaultInternalPlugin.sendObject(
objectMessages: objectMessages,
channel: channel,
client: client,
pluginAPI: pluginAPI,
)
}

internal func testsOnly_overridePublish(with newImplementation: @escaping ([OutboundObjectMessage]) async throws(ARTErrorInfo) -> Void) {
internal func testsOnly_overridePublish(with newImplementation: @escaping ([OutboundObjectMessage]) async throws(ARTErrorInfo) -> PublishResult) {
mutex.withLock {
overriddenPublishImplementation = newImplementation
}
Expand Down
21 changes: 18 additions & 3 deletions Sources/AblyLiveObjects/Internal/DefaultInternalPlugin.swift
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,13 @@ internal final class DefaultInternalPlugin: NSObject, _AblyPluginSupportPrivate.

// RTO10b
nosync_realtimeObjects(for: channel).nosync_setGarbageCollectionGracePeriod(gracePeriod)

// CD2j: Store siteCode for apply-on-ACK (RTO20c)
// TODO: Uncomment once siteCode is added to ConnectionDetailsProtocol in ably-cocoa-plugin-support
// and parsed from connectionDetails in ably-cocoa.
// if let siteCode = connectionDetails?.siteCode {
// nosync_realtimeObjects(for: channel).nosync_setSiteCode(siteCode)
// }
}

// MARK: - Sending `OBJECT` ProtocolMessage
Expand All @@ -157,13 +164,18 @@ internal final class DefaultInternalPlugin: NSObject, _AblyPluginSupportPrivate.
channel: _AblyPluginSupportPrivate.RealtimeChannel,
client: _AblyPluginSupportPrivate.RealtimeClient,
pluginAPI: PluginAPIProtocol,
) async throws(ARTErrorInfo) {
) async throws(ARTErrorInfo) -> PublishResult {
let objectMessageBoxes: [ObjectMessageBox<OutboundObjectMessage>] = objectMessages.map { .init(objectMessage: $0) }

try await withCheckedContinuation { (continuation: CheckedContinuation<Result<Void, ARTErrorInfo>, _>) in
return try await withCheckedContinuation { (continuation: CheckedContinuation<Result<PublishResult, ARTErrorInfo>, _>) in
let internalQueue = pluginAPI.internalQueue(for: client)

internalQueue.async {
// RTO20b: The callback should include `serials` from the ACK `res[0].serials` property.
// TODO: Update ably-cocoa to pass the serials in the callback per the spec prereqs:
// - APPluginAPI.h: Change nosync_sendObject completion handler to include serials array
// - ARTPluginAPI.m: Parse serials from ACK res[0].serials and pass to callback
// Until those changes are made, serials will be an array of nils.
pluginAPI.nosync_sendObject(
withObjectMessages: objectMessageBoxes,
channel: channel,
Expand All @@ -174,7 +186,10 @@ internal final class DefaultInternalPlugin: NSObject, _AblyPluginSupportPrivate.
if let error {
continuation.resume(returning: .failure(ARTErrorInfo.castPluginPublicErrorInfo(error)))
} else {
continuation.resume(returning: .success(()))
// TODO: Extract serials from callback once ably-cocoa is updated
// For now, return an array of nils with the same count as messages sent
let serials: [String?] = Array(repeating: nil, count: objectMessageBoxes.count)
continuation.resume(returning: .success(PublishResult(serials: serials)))
}
}
}
Expand Down
25 changes: 18 additions & 7 deletions Sources/AblyLiveObjects/Internal/InternalDefaultLiveCounter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ internal final class InternalDefaultLiveCounter: Sendable {
}
}

internal func increment(amount: Double, coreSDK: CoreSDK) async throws(ARTErrorInfo) {
internal func increment(amount: Double, coreSDK: CoreSDK, realtimeObjects: InternalDefaultRealtimeObjects) async throws(ARTErrorInfo) {
let objectMessage = try mutableStateMutex.withSync { mutableState throws(ARTErrorInfo) in
// RTLC12c
try coreSDK.nosync_validateChannelState(
Expand All @@ -130,13 +130,13 @@ internal final class InternalDefaultLiveCounter: Sendable {
)
}

// RTLC12f
try await coreSDK.publish(objectMessages: [objectMessage])
// RTO20: Publish and apply locally on ACK (RTLC12f)
try await realtimeObjects.publishAndApply(objectMessages: [objectMessage], coreSDK: coreSDK)
}

internal func decrement(amount: Double, coreSDK: CoreSDK) async throws(ARTErrorInfo) {
internal func decrement(amount: Double, coreSDK: CoreSDK, realtimeObjects: InternalDefaultRealtimeObjects) async throws(ARTErrorInfo) {
// RTLC13b
try await increment(amount: -amount, coreSDK: coreSDK)
try await increment(amount: -amount, coreSDK: coreSDK, realtimeObjects: realtimeObjects)
}

@discardableResult
Expand Down Expand Up @@ -237,12 +237,16 @@ internal final class InternalDefaultLiveCounter: Sendable {
}

/// Attempts to apply an operation from an inbound `ObjectMessage`, per RTLC7.
///
/// - Parameters:
/// - skipSiteTimeserialsUpdate: If true, skip updating siteTimeserials (RTO20h for apply-on-ACK).
internal func nosync_apply(
_ operation: ObjectOperation,
objectMessageSerial: String?,
objectMessageSiteCode: String?,
objectMessageSerialTimestamp: Date?,
objectsPool: inout ObjectsPool,
skipSiteTimeserialsUpdate: Bool,
) {
mutableStateMutex.withoutSync { mutableState in
mutableState.apply(
Expand All @@ -251,6 +255,7 @@ internal final class InternalDefaultLiveCounter: Sendable {
objectMessageSiteCode: objectMessageSiteCode,
objectMessageSerialTimestamp: objectMessageSerialTimestamp,
objectsPool: &objectsPool,
skipSiteTimeserialsUpdate: skipSiteTimeserialsUpdate,
logger: logger,
clock: clock,
userCallbackQueue: userCallbackQueue,
Expand Down Expand Up @@ -371,12 +376,16 @@ internal final class InternalDefaultLiveCounter: Sendable {
}

/// Attempts to apply an operation from an inbound `ObjectMessage`, per RTLC7.
///
/// - Parameters:
/// - skipSiteTimeserialsUpdate: If true, skip updating siteTimeserials (RTO20h for apply-on-ACK).
internal mutating func apply(
_ operation: ObjectOperation,
objectMessageSerial: String?,
objectMessageSiteCode: String?,
objectMessageSerialTimestamp: Date?,
objectsPool: inout ObjectsPool,
skipSiteTimeserialsUpdate: Bool,
logger: Logger,
clock: SimpleClock,
userCallbackQueue: DispatchQueue,
Expand All @@ -387,8 +396,10 @@ internal final class InternalDefaultLiveCounter: Sendable {
return
}

// RTLC7c
liveObjectMutableState.siteTimeserials[applicableOperation.objectMessageSiteCode] = applicableOperation.objectMessageSerial
// RTLC7c (RTO20h: skip for apply-on-ACK)
if !skipSiteTimeserialsUpdate {
liveObjectMutableState.siteTimeserials[applicableOperation.objectMessageSiteCode] = applicableOperation.objectMessageSerial
}

// RTLC7e
// TODO: are we still meant to update siteTimeserials? https://github.com/ably/specification/pull/350/files#r2218718854
Expand Down
27 changes: 20 additions & 7 deletions Sources/AblyLiveObjects/Internal/InternalDefaultLiveMap.swift
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ internal final class InternalDefaultLiveMap: Sendable {
try entries(coreSDK: coreSDK, delegate: delegate).map(\.value)
}

internal func set(key: String, value: InternalLiveMapValue, coreSDK: CoreSDK) async throws(ARTErrorInfo) {
internal func set(key: String, value: InternalLiveMapValue, coreSDK: CoreSDK, realtimeObjects: InternalDefaultRealtimeObjects) async throws(ARTErrorInfo) {
let objectMessage = try mutableStateMutex.withSync { mutableState throws(ARTErrorInfo) in
// RTLM20c
try coreSDK.nosync_validateChannelState(notIn: [.detached, .failed, .suspended], operationDescription: "LiveMap.set")
Expand All @@ -180,10 +180,11 @@ internal final class InternalDefaultLiveMap: Sendable {
)
}

try await coreSDK.publish(objectMessages: [objectMessage])
// RTO20: Publish and apply locally on ACK
try await realtimeObjects.publishAndApply(objectMessages: [objectMessage], coreSDK: coreSDK)
}

internal func remove(key: String, coreSDK: CoreSDK) async throws(ARTErrorInfo) {
internal func remove(key: String, coreSDK: CoreSDK, realtimeObjects: InternalDefaultRealtimeObjects) async throws(ARTErrorInfo) {
let objectMessage = try mutableStateMutex.withSync { mutableState throws(ARTErrorInfo) in
// RTLM21c
try coreSDK.nosync_validateChannelState(notIn: [.detached, .failed, .suspended], operationDescription: "LiveMap.remove")
Expand All @@ -202,8 +203,8 @@ internal final class InternalDefaultLiveMap: Sendable {
)
}

// RTLM21f
try await coreSDK.publish(objectMessages: [objectMessage])
// RTO20: Publish and apply locally on ACK (RTLM21f)
try await realtimeObjects.publishAndApply(objectMessages: [objectMessage], coreSDK: coreSDK)
}

@discardableResult
Expand Down Expand Up @@ -315,12 +316,16 @@ internal final class InternalDefaultLiveMap: Sendable {
}

/// Attempts to apply an operation from an inbound `ObjectMessage`, per RTLM15.
///
/// - Parameters:
/// - skipSiteTimeserialsUpdate: If true, skip updating siteTimeserials (RTO20h for apply-on-ACK).
internal func nosync_apply(
_ operation: ObjectOperation,
objectMessageSerial: String?,
objectMessageSiteCode: String?,
objectMessageSerialTimestamp: Date?,
objectsPool: inout ObjectsPool,
skipSiteTimeserialsUpdate: Bool,
) {
mutableStateMutex.withoutSync { mutableState in
mutableState.apply(
Expand All @@ -329,6 +334,7 @@ internal final class InternalDefaultLiveMap: Sendable {
objectMessageSiteCode: objectMessageSiteCode,
objectMessageSerialTimestamp: objectMessageSerialTimestamp,
objectsPool: &objectsPool,
skipSiteTimeserialsUpdate: skipSiteTimeserialsUpdate,
logger: logger,
internalQueue: mutableStateMutex.dispatchQueue,
userCallbackQueue: userCallbackQueue,
Expand Down Expand Up @@ -574,12 +580,16 @@ internal final class InternalDefaultLiveMap: Sendable {
}

/// Attempts to apply an operation from an inbound `ObjectMessage`, per RTLM15.
///
/// - Parameters:
/// - skipSiteTimeserialsUpdate: If true, skip updating siteTimeserials (RTO20h for apply-on-ACK).
internal mutating func apply(
_ operation: ObjectOperation,
objectMessageSerial: String?,
objectMessageSiteCode: String?,
objectMessageSerialTimestamp: Date?,
objectsPool: inout ObjectsPool,
skipSiteTimeserialsUpdate: Bool,
logger: Logger,
internalQueue: DispatchQueue,
userCallbackQueue: DispatchQueue,
Expand All @@ -591,8 +601,10 @@ internal final class InternalDefaultLiveMap: Sendable {
return
}

// RTLM15c
liveObjectMutableState.siteTimeserials[applicableOperation.objectMessageSiteCode] = applicableOperation.objectMessageSerial
// RTLM15c (RTO20h: skip for apply-on-ACK)
if !skipSiteTimeserialsUpdate {
liveObjectMutableState.siteTimeserials[applicableOperation.objectMessageSiteCode] = applicableOperation.objectMessageSerial
}

// RTLM15e
// TODO: are we still meant to update siteTimeserials? https://github.com/ably/specification/pull/350/files#r2218718854
Expand Down Expand Up @@ -1003,5 +1015,6 @@ internal final class InternalDefaultLiveMap: Sendable {
// RTLM5d2g: Otherwise, return undefined/null
return nil
}

}
}
Loading
Loading