Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
51e3f9f
Switch to a new internal type for LiveMap data values
lawrence-forooghian Jul 18, 2025
1998057
Inject a clock into our LiveObjects
lawrence-forooghian Jul 18, 2025
94efe5a
Add new serialTimestamp fields
lawrence-forooghian Jul 18, 2025
4724a05
Add placeholder protocol for polymorphic LiveObject functionality
lawrence-forooghian Jul 18, 2025
ec37cca
Make initialValue a string
lawrence-forooghian Jul 24, 2025
842d9e4
Expose LiveObjects' ID internally
lawrence-forooghian Jul 24, 2025
c74a23b
Expose mergeInitialValue methods internally
lawrence-forooghian Jul 24, 2025
f62a457
Expose public types' proxied objects internally
lawrence-forooghian Jul 24, 2025
25455fd
Add an AsyncSequence API for subscribing to LiveObject updates
lawrence-forooghian Jul 29, 2025
37e14d7
Add Equatable conformance to (Internal)LiveMapValue
lawrence-forooghian Jul 29, 2025
76ca498
Add logging to PublicObjectsStore
lawrence-forooghian Jul 29, 2025
116ee63
Allow SyncObjectsPool entries to contain additional data
lawrence-forooghian Jul 21, 2025
29aa9fc
Accept a serialTimestamp in replaceData and applySyncObjectsPool
lawrence-forooghian Jul 21, 2025
951473c
Pull the RTLM14d "is map entry tombstoned?" check into method
lawrence-forooghian Jul 22, 2025
83de58a
Bump ably-cocoa to latest commit on LiveObjects integration branch
lawrence-forooghian Jul 29, 2025
b701b46
Create a PartialObjectOperation type
lawrence-forooghian Jul 30, 2025
2d132ef
Extract operations' channel state validation to helper
lawrence-forooghian Jul 30, 2025
bbf8229
Add placeholder for spec's #publish method
lawrence-forooghian Jul 30, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 29 additions & 2 deletions Sources/AblyLiveObjects/Internal/CoreSDK.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ internal import AblyPlugin
///
/// This provides us with a mockable interface to ably-cocoa, and it also allows internal components and their tests not to need to worry about some of the boring details of how we bridge Swift types to AblyPlugin's Objective-C API (i.e. boxing).
internal protocol CoreSDK: AnyObject, Sendable {
func sendObject(objectMessages: [OutboundObjectMessage]) async throws(InternalError)
/// Implements the internal `#publish` method of RTO15.
func publish(objectMessages: [OutboundObjectMessage]) async throws(InternalError)

/// Returns the current state of the Realtime channel that this wraps.
var channelState: ARTRealtimeChannelState { get }
Expand All @@ -28,7 +29,8 @@ internal final class DefaultCoreSDK: CoreSDK {

// MARK: - CoreSDK conformance

internal func sendObject(objectMessages: [OutboundObjectMessage]) async throws(InternalError) {
internal func publish(objectMessages: [OutboundObjectMessage]) async throws(InternalError) {
// TODO: Implement the full spec of RTO15 (https://github.com/ably/ably-cocoa-liveobjects-plugin/issues/47)
try await DefaultInternalPlugin.sendObject(
objectMessages: objectMessages,
channel: channel,
Expand All @@ -40,3 +42,28 @@ internal final class DefaultCoreSDK: CoreSDK {
channel.state
}
}

// MARK: - Channel State Validation

/// Extension on CoreSDK to provide channel state validation utilities.
internal extension CoreSDK {
/// Validates that the channel is not in any of the specified invalid states.
///
/// - Parameters:
/// - invalidStates: Array of channel states that are considered invalid for the operation
/// - operationDescription: A description of the operation being performed, used in error messages
/// - Throws: `ARTErrorInfo` with code 90001 and statusCode 400 if the channel is in any of the invalid states
func validateChannelState(
notIn invalidStates: [ARTRealtimeChannelState],
operationDescription: String,
) throws(ARTErrorInfo) {
let currentChannelState = channelState
if invalidStates.contains(currentChannelState) {
throw LiveObjectsError.objectsOperationFailedInvalidChannelState(
operationDescription: operationDescription,
channelState: currentChannelState,
)
.toARTErrorInfo()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ internal final class DefaultInternalPlugin: NSObject, AblyPlugin.LiveObjectsInte
let callbackQueue = pluginAPI.callbackQueue(for: client)

logger.log("LiveObjects.DefaultInternalPlugin received prepare(_:)", level: .debug)
let liveObjects = InternalDefaultRealtimeObjects(logger: logger, userCallbackQueue: callbackQueue)
let liveObjects = InternalDefaultRealtimeObjects(logger: logger, userCallbackQueue: callbackQueue, clock: DefaultSimpleClock())
pluginAPI.setPluginDataValue(liveObjects, forKey: Self.pluginDataKey, channel: channel)
}

Expand Down
94 changes: 55 additions & 39 deletions Sources/AblyLiveObjects/Internal/InternalDefaultLiveCounter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -11,45 +11,43 @@ internal final class InternalDefaultLiveCounter: Sendable {

internal var testsOnly_siteTimeserials: [String: String] {
mutex.withLock {
mutableState.liveObject.siteTimeserials
mutableState.liveObjectMutableState.siteTimeserials
}
}

internal var testsOnly_createOperationIsMerged: Bool {
mutex.withLock {
mutableState.liveObject.createOperationIsMerged
}
}

internal var testsOnly_objectID: String {
mutex.withLock {
mutableState.liveObject.objectID
mutableState.liveObjectMutableState.createOperationIsMerged
}
}

private let logger: AblyPlugin.Logger
private let userCallbackQueue: DispatchQueue
private let clock: SimpleClock

// MARK: - Initialization

internal convenience init(
testsOnly_data data: Double,
objectID: String,
logger: AblyPlugin.Logger,
userCallbackQueue: DispatchQueue
userCallbackQueue: DispatchQueue,
clock: SimpleClock
) {
self.init(data: data, objectID: objectID, logger: logger, userCallbackQueue: userCallbackQueue)
self.init(data: data, objectID: objectID, logger: logger, userCallbackQueue: userCallbackQueue, clock: clock)
}

private init(
data: Double,
objectID: String,
logger: AblyPlugin.Logger,
userCallbackQueue: DispatchQueue
userCallbackQueue: DispatchQueue,
clock: SimpleClock
) {
mutableState = .init(liveObject: .init(objectID: objectID), data: data)
mutableState = .init(liveObjectMutableState: .init(objectID: objectID), data: data)
self.logger = logger
self.userCallbackQueue = userCallbackQueue
self.clock = clock
}

/// Creates a "zero-value LiveCounter", per RTLC4.
Expand All @@ -60,27 +58,30 @@ internal final class InternalDefaultLiveCounter: Sendable {
objectID: String,
logger: AblyPlugin.Logger,
userCallbackQueue: DispatchQueue,
clock: SimpleClock,
) -> Self {
.init(
data: 0,
objectID: objectID,
logger: logger,
userCallbackQueue: userCallbackQueue,
clock: clock,
)
}

// MARK: - Data access

internal var objectID: String {
mutex.withLock {
mutableState.liveObjectMutableState.objectID
}
}

// MARK: - Internal methods that back LiveCounter conformance

internal func value(coreSDK: CoreSDK) throws(ARTErrorInfo) -> Double {
// RTLC5b: If the channel is in the DETACHED or FAILED state, the library should indicate an error with code 90001
let currentChannelState = coreSDK.channelState
if currentChannelState == .detached || currentChannelState == .failed {
throw LiveObjectsError.objectsOperationFailedInvalidChannelState(
operationDescription: "LiveCounter.value",
channelState: currentChannelState,
)
.toARTErrorInfo()
}
try coreSDK.validateChannelState(notIn: [.detached, .failed], operationDescription: "LiveCounter.value")

return mutex.withLock {
// RTLC5c
Expand All @@ -100,21 +101,21 @@ internal final class InternalDefaultLiveCounter: Sendable {
internal func subscribe(listener: @escaping LiveObjectUpdateCallback<DefaultLiveCounterUpdate>, coreSDK: CoreSDK) throws(ARTErrorInfo) -> any SubscribeResponse {
try mutex.ablyLiveObjects_withLockWithTypedThrow { () throws(ARTErrorInfo) in
// swiftlint:disable:next trailing_closure
try mutableState.liveObject.subscribe(listener: listener, coreSDK: coreSDK, updateSelfLater: { [weak self] action in
try mutableState.liveObjectMutableState.subscribe(listener: listener, coreSDK: coreSDK, updateSelfLater: { [weak self] action in
guard let self else {
return
}

mutex.withLock {
action(&mutableState.liveObject)
action(&mutableState.liveObjectMutableState)
}
})
}
}

internal func unsubscribeAll() {
mutex.withLock {
mutableState.liveObject.unsubscribeAll()
mutableState.liveObjectMutableState.unsubscribeAll()
}
}

Expand All @@ -134,21 +135,27 @@ internal final class InternalDefaultLiveCounter: Sendable {
/// This is used to instruct this counter to emit updates during an `OBJECT_SYNC`.
internal func emit(_ update: LiveObjectUpdate<DefaultLiveCounterUpdate>) {
mutex.withLock {
mutableState.liveObject.emit(update, on: userCallbackQueue)
mutableState.liveObjectMutableState.emit(update, on: userCallbackQueue)
}
}

// MARK: - Data manipulation

/// Replaces the internal data of this counter with the provided ObjectState, per RTLC6.
internal func replaceData(using state: ObjectState) -> LiveObjectUpdate<DefaultLiveCounterUpdate> {
///
/// - Parameters:
/// - objectMessageSerialTimestamp: The `serialTimestamp` of the containing `ObjectMessage`. Used if we need to tombstone this counter.
internal func replaceData(
using state: ObjectState,
objectMessageSerialTimestamp: Date?,
) -> LiveObjectUpdate<DefaultLiveCounterUpdate> {
mutex.withLock {
mutableState.replaceData(using: state)
mutableState.replaceData(using: state, objectMessageSerialTimestamp: objectMessageSerialTimestamp)
}
}

/// Test-only method to merge initial value from an ObjectOperation, per RTLC10.
internal func testsOnly_mergeInitialValue(from operation: ObjectOperation) -> LiveObjectUpdate<DefaultLiveCounterUpdate> {
/// Merges the initial value from an ObjectOperation into this LiveCounter, per RTLC10.
internal func mergeInitialValue(from operation: ObjectOperation) -> LiveObjectUpdate<DefaultLiveCounterUpdate> {
mutex.withLock {
mutableState.mergeInitialValue(from: operation)
}
Expand All @@ -173,13 +180,15 @@ internal final class InternalDefaultLiveCounter: Sendable {
_ operation: ObjectOperation,
objectMessageSerial: String?,
objectMessageSiteCode: String?,
objectMessageSerialTimestamp: Date?,
objectsPool: inout ObjectsPool,
) {
mutex.withLock {
mutableState.apply(
operation,
objectMessageSerial: objectMessageSerial,
objectMessageSiteCode: objectMessageSiteCode,
objectMessageSerialTimestamp: objectMessageSerialTimestamp,
objectsPool: &objectsPool,
logger: logger,
userCallbackQueue: userCallbackQueue,
Expand All @@ -189,20 +198,26 @@ internal final class InternalDefaultLiveCounter: Sendable {

// MARK: - Mutable state and the operations that affect it

private struct MutableState {
private struct MutableState: InternalLiveObject {
/// The mutable state common to all LiveObjects.
internal var liveObject: LiveObjectMutableState<DefaultLiveCounterUpdate>
internal var liveObjectMutableState: LiveObjectMutableState<DefaultLiveCounterUpdate>

/// The internal data that this map holds, per RTLC3.
internal var data: Double

/// Replaces the internal data of this counter with the provided ObjectState, per RTLC6.
internal mutating func replaceData(using state: ObjectState) -> LiveObjectUpdate<DefaultLiveCounterUpdate> {
///
/// - Parameters:
/// - objectMessageSerialTimestamp: The `serialTimestamp` of the containing `ObjectMessage`. Used if we need to tombstone this counter.
internal mutating func replaceData(
using state: ObjectState,
objectMessageSerialTimestamp: Date?,
) -> LiveObjectUpdate<DefaultLiveCounterUpdate> {
// RTLC6a: Replace the private siteTimeserials with the value from ObjectState.siteTimeserials
liveObject.siteTimeserials = state.siteTimeserials
liveObjectMutableState.siteTimeserials = state.siteTimeserials

// RTLC6b: Set the private flag createOperationIsMerged to false
liveObject.createOperationIsMerged = false
liveObjectMutableState.createOperationIsMerged = false

// RTLC6c: Set data to the value of ObjectState.counter.count, or to 0 if it does not exist
data = state.counter?.count?.doubleValue ?? 0
Expand Down Expand Up @@ -231,7 +246,7 @@ internal final class InternalDefaultLiveCounter: Sendable {
}

// RTLC10b: Set the private flag createOperationIsMerged to true
liveObject.createOperationIsMerged = true
liveObjectMutableState.createOperationIsMerged = true

return update
}
Expand All @@ -241,18 +256,19 @@ internal final class InternalDefaultLiveCounter: Sendable {
_ operation: ObjectOperation,
objectMessageSerial: String?,
objectMessageSiteCode: String?,
objectMessageSerialTimestamp: Date?,
objectsPool: inout ObjectsPool,
logger: Logger,
userCallbackQueue: DispatchQueue,
) {
guard let applicableOperation = liveObject.canApplyOperation(objectMessageSerial: objectMessageSerial, objectMessageSiteCode: objectMessageSiteCode, logger: logger) else {
guard let applicableOperation = liveObjectMutableState.canApplyOperation(objectMessageSerial: objectMessageSerial, objectMessageSiteCode: objectMessageSiteCode, logger: logger) else {
// RTLC7b
logger.log("Operation \(operation) (serial: \(String(describing: objectMessageSerial)), siteCode: \(String(describing: objectMessageSiteCode))) should not be applied; discarding", level: .debug)
return
}

// RTLC7c
liveObject.siteTimeserials[applicableOperation.objectMessageSiteCode] = applicableOperation.objectMessageSerial
liveObjectMutableState.siteTimeserials[applicableOperation.objectMessageSiteCode] = applicableOperation.objectMessageSerial

switch operation.action {
case .known(.counterCreate):
Expand All @@ -262,12 +278,12 @@ internal final class InternalDefaultLiveCounter: Sendable {
logger: logger,
)
// RTLC7d1a
liveObject.emit(update, on: userCallbackQueue)
liveObjectMutableState.emit(update, on: userCallbackQueue)
case .known(.counterInc):
// RTLC7d2
let update = applyCounterIncOperation(operation.counterOp)
// RTLC7d2a
liveObject.emit(update, on: userCallbackQueue)
liveObjectMutableState.emit(update, on: userCallbackQueue)
default:
// RTLC7d3
logger.log("Operation \(operation) has unsupported action for LiveCounter; discarding", level: .warn)
Expand All @@ -279,7 +295,7 @@ internal final class InternalDefaultLiveCounter: Sendable {
_ operation: ObjectOperation,
logger: Logger,
) -> LiveObjectUpdate<DefaultLiveCounterUpdate> {
if liveObject.createOperationIsMerged {
if liveObjectMutableState.createOperationIsMerged {
// RTLC8b
logger.log("Not applying COUNTER_CREATE because a COUNTER_CREATE has already been applied", level: .warn)
return .noop
Expand Down
Loading