Skip to content
Merged
8 changes: 7 additions & 1 deletion Sources/AblyLiveObjects/Internal/CoreSDK.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,30 @@ internal final class DefaultCoreSDK: CoreSDK {
private let channel: AblyPlugin.RealtimeChannel
private let client: AblyPlugin.RealtimeClient
private let pluginAPI: PluginAPIProtocol
private let logger: AblyPlugin.Logger

internal init(
channel: AblyPlugin.RealtimeChannel,
client: AblyPlugin.RealtimeClient,
pluginAPI: PluginAPIProtocol
pluginAPI: PluginAPIProtocol,
logger: AblyPlugin.Logger
) {
self.channel = channel
self.client = client
self.pluginAPI = pluginAPI
self.logger = logger
}

// MARK: - CoreSDK conformance

internal func publish(objectMessages: [OutboundObjectMessage]) async throws(InternalError) {
logger.log("publish(objectMessages: \(LoggingUtilities.formatObjectMessagesForLogging(objectMessages)))", level: .debug)

// TODO: Implement the full spec of RTO15 (https://github.com/ably/ably-cocoa-liveobjects-plugin/issues/47)
try await DefaultInternalPlugin.sendObject(
objectMessages: objectMessages,
channel: channel,
client: client,
pluginAPI: pluginAPI,
)
}
Expand Down
21 changes: 13 additions & 8 deletions Sources/AblyLiveObjects/Internal/DefaultInternalPlugin.swift
Original file line number Diff line number Diff line change
Expand Up @@ -128,19 +128,24 @@ internal final class DefaultInternalPlugin: NSObject, AblyPlugin.LiveObjectsInte
internal static func sendObject(
objectMessages: [OutboundObjectMessage],
channel: AblyPlugin.RealtimeChannel,
client: AblyPlugin.RealtimeClient,
pluginAPI: PluginAPIProtocol,
) async throws(InternalError) {
let objectMessageBoxes: [ObjectMessageBox<OutboundObjectMessage>] = objectMessages.map { .init(objectMessage: $0) }

try await withCheckedContinuation { (continuation: CheckedContinuation<Result<Void, InternalError>, _>) in
pluginAPI.sendObject(
withObjectMessages: objectMessageBoxes,
channel: channel,
) { error in
if let error {
continuation.resume(returning: .failure(error.toInternalError()))
} else {
continuation.resume(returning: .success(()))
let internalQueue = pluginAPI.internalQueue(for: client)

internalQueue.async {
pluginAPI.sendObject(
withObjectMessages: objectMessageBoxes,
channel: channel,
) { error in
if let error {
continuation.resume(returning: .failure(error.toInternalError()))
} else {
continuation.resume(returning: .success(()))
}
}
}
}.get()
Expand Down
16 changes: 10 additions & 6 deletions Sources/AblyLiveObjects/Internal/InternalDefaultLiveCounter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,8 @@ internal final class InternalDefaultLiveCounter: Sendable {
// MARK: - Internal methods that back LiveCounter conformance

internal func value(coreSDK: CoreSDK) throws(ARTErrorInfo) -> Double {
// RTLC5b: If the channel is in the DETACHED or FAILED state, the library should indicate an error with code 90001
try coreSDK.validateChannelState(notIn: [.detached, .failed], operationDescription: "LiveCounter.value")

return mutex.withLock {
// RTLC5c
mutableState.data
try mutex.ablyLiveObjects_withLockWithTypedThrow { () throws(ARTErrorInfo) in
try mutableState.value(coreSDK: coreSDK)
}
}

Expand Down Expand Up @@ -419,5 +415,13 @@ internal final class InternalDefaultLiveCounter: Sendable {
// RTLC4
data = 0
}

internal func value(coreSDK: CoreSDK) throws(ARTErrorInfo) -> Double {
// RTLC5b: If the channel is in the DETACHED or FAILED state, the library should indicate an error with code 90001
try coreSDK.validateChannelState(notIn: [.detached, .failed], operationDescription: "LiveCounter.value")

// RTLC5c
return data
}
}
}
223 changes: 117 additions & 106 deletions Sources/AblyLiveObjects/Internal/InternalDefaultLiveMap.swift
Original file line number Diff line number Diff line change
Expand Up @@ -110,56 +110,20 @@ internal final class InternalDefaultLiveMap: Sendable {

/// Returns the value associated with a given key, following RTLM5d specification.
internal func get(key: String, coreSDK: CoreSDK, delegate: LiveMapObjectPoolDelegate) throws(ARTErrorInfo) -> InternalLiveMapValue? {
// RTLM5c: If the channel is in the DETACHED or FAILED state, the library should indicate an error with code 90001
try coreSDK.validateChannelState(notIn: [.detached, .failed], operationDescription: "LiveMap.get")

// RTLM5e - Return nil if self is tombstone
if isTombstone {
return nil
}

let entry = mutex.withLock {
mutableState.data[key]
}

// RTLM5d1: If no ObjectsMapEntry exists at the key, return undefined/null
guard let entry else {
return nil
try mutex.ablyLiveObjects_withLockWithTypedThrow { () throws(ARTErrorInfo) in
try mutableState.get(key: key, coreSDK: coreSDK, delegate: delegate)
}

// RTLM5d2: If a ObjectsMapEntry exists at the key, convert it using the shared logic
return convertEntryToLiveMapValue(entry, delegate: delegate)
}

internal func size(coreSDK: CoreSDK, delegate: LiveMapObjectPoolDelegate) throws(ARTErrorInfo) -> Int {
// RTLM10c: If the channel is in the DETACHED or FAILED state, the library should throw an ErrorInfo error with statusCode 400 and code 90001
try coreSDK.validateChannelState(notIn: [.detached, .failed], operationDescription: "LiveMap.size")

return mutex.withLock {
// RTLM10d: Returns the number of non-tombstoned entries (per RTLM14) in the internal data map
mutableState.data.values.count { entry in
!Self.isEntryTombstoned(entry, delegate: delegate)
}
try mutex.ablyLiveObjects_withLockWithTypedThrow { () throws(ARTErrorInfo) in
try mutableState.size(coreSDK: coreSDK, delegate: delegate)
}
}

internal func entries(coreSDK: CoreSDK, delegate: LiveMapObjectPoolDelegate) throws(ARTErrorInfo) -> [(key: String, value: InternalLiveMapValue)] {
// RTLM11c: If the channel is in the DETACHED or FAILED state, the library should throw an ErrorInfo error with statusCode 400 and code 90001
try coreSDK.validateChannelState(notIn: [.detached, .failed], operationDescription: "LiveMap.entries")

return mutex.withLock {
// RTLM11d: Returns key-value pairs from the internal data map
// RTLM11d1: Pairs with tombstoned entries (per RTLM14) are not returned
var result: [(key: String, value: InternalLiveMapValue)] = []

for (key, entry) in mutableState.data where !Self.isEntryTombstoned(entry, delegate: delegate) {
// Convert entry to LiveMapValue using the same logic as get(key:)
if let value = convertEntryToLiveMapValue(entry, delegate: delegate) {
result.append((key: key, value: value))
}
}

return result
try mutex.ablyLiveObjects_withLockWithTypedThrow { () throws(ARTErrorInfo) in
try mutableState.entries(coreSDK: coreSDK, delegate: delegate)
}
}

Expand Down Expand Up @@ -655,7 +619,7 @@ internal final class InternalDefaultLiveMap: Sendable {
internal mutating func applyMapSetOperation(
key: String,
operationTimeserial: String?,
operationData: ObjectData,
operationData: ObjectData?,
objectsPool: inout ObjectsPool,
logger: AblyPlugin.Logger,
userCallbackQueue: DispatchQueue,
Expand Down Expand Up @@ -684,7 +648,7 @@ internal final class InternalDefaultLiveMap: Sendable {
}

// RTLM7c: If the operation has a non-empty ObjectData.objectId attribute
if let objectId = operationData.objectId, !objectId.isEmpty {
if let objectId = operationData?.objectId, !objectId.isEmpty {
// RTLM7c1: Create a zero-value LiveObject in the internal ObjectsPool per RTO6
_ = objectsPool.createZeroValueObject(forObjectID: objectId, logger: logger, userCallbackQueue: userCallbackQueue, clock: clock)
}
Expand Down Expand Up @@ -721,7 +685,7 @@ internal final class InternalDefaultLiveMap: Sendable {
// RTLM8a2c: Set ObjectsMapEntry.tombstone to true (equivalent to next point)
// RTLM8a2d: Set ObjectsMapEntry.tombstonedAt per RTLM8a2d
var updatedEntry = existingEntry
updatedEntry.data = ObjectData()
updatedEntry.data = nil
updatedEntry.timeserial = operationTimeserial
updatedEntry.tombstonedAt = tombstonedAt
data[key] = updatedEntry
Expand All @@ -730,7 +694,7 @@ internal final class InternalDefaultLiveMap: Sendable {
// RTLM8b1: Create a new entry in data for the specified key, with ObjectsMapEntry.data set to undefined/null and the operation's serial
// RTLM8b2: Set ObjectsMapEntry.tombstone for the new entry to true
// RTLM8b3: Set ObjectsMapEntry.tombstonedAt per RTLM8f
data[key] = InternalObjectsMapEntry(tombstonedAt: tombstonedAt, timeserial: operationTimeserial, data: ObjectData())
data[key] = InternalObjectsMapEntry(tombstonedAt: tombstonedAt, timeserial: operationTimeserial, data: nil)
}

return .update(DefaultLiveMapUpdate(update: [key: .removed]))
Expand Down Expand Up @@ -843,90 +807,137 @@ internal final class InternalDefaultLiveMap: Sendable {
return !shouldRelease
}
}
}

// MARK: - Helper Methods
/// Returns the value associated with a given key, following RTLM5d specification.
internal func get(key: String, coreSDK: CoreSDK, delegate: LiveMapObjectPoolDelegate) throws(ARTErrorInfo) -> InternalLiveMapValue? {
// RTLM5c: If the channel is in the DETACHED or FAILED state, the library should indicate an error with code 90001
try coreSDK.validateChannelState(notIn: [.detached, .failed], operationDescription: "LiveMap.get")

/// Returns whether a map entry should be considered tombstoned, per the check described in RTLM14.
private static func isEntryTombstoned(_ entry: InternalObjectsMapEntry, delegate: LiveMapObjectPoolDelegate) -> Bool {
// RTLM14a
if entry.tombstone {
return true
}
// RTLM5e - Return nil if self is tombstone
if liveObjectMutableState.isTombstone {
return nil
}

// RTLM14c
if let objectId = entry.data.objectId {
if let poolEntry = delegate.getObjectFromPool(id: objectId), poolEntry.isTombstone {
return true
// RTLM5d1: If no ObjectsMapEntry exists at the key, return undefined/null
guard let entry = data[key] else {
return nil
}

// RTLM5d2: If a ObjectsMapEntry exists at the key, convert it using the shared logic
return convertEntryToLiveMapValue(entry, delegate: delegate)
}

// RTLM14b
return false
}
internal func size(coreSDK: CoreSDK, delegate: LiveMapObjectPoolDelegate) throws(ARTErrorInfo) -> Int {
// RTLM10c: If the channel is in the DETACHED or FAILED state, the library should throw an ErrorInfo error with statusCode 400 and code 90001
try coreSDK.validateChannelState(notIn: [.detached, .failed], operationDescription: "LiveMap.size")

/// Converts an InternalObjectsMapEntry to LiveMapValue using the same logic as get(key:)
/// This is used by entries to ensure consistent value conversion
private func convertEntryToLiveMapValue(_ entry: InternalObjectsMapEntry, delegate: LiveMapObjectPoolDelegate) -> InternalLiveMapValue? {
// RTLM5d2a: If ObjectsMapEntry.tombstone is true, return undefined/null
if entry.tombstone == true {
return nil
// RTLM10d: Returns the number of non-tombstoned entries (per RTLM14) in the internal data map
return data.values.count { entry in
!Self.isEntryTombstoned(entry, delegate: delegate)
}
}

// Handle primitive values in the order specified by RTLM5d2b through RTLM5d2e
internal func entries(coreSDK: CoreSDK, delegate: LiveMapObjectPoolDelegate) throws(ARTErrorInfo) -> [(key: String, value: InternalLiveMapValue)] {
// RTLM11c: If the channel is in the DETACHED or FAILED state, the library should throw an ErrorInfo error with statusCode 400 and code 90001
try coreSDK.validateChannelState(notIn: [.detached, .failed], operationDescription: "LiveMap.entries")

// RTLM5d2b: If ObjectsMapEntry.data.boolean exists, return it
if let boolean = entry.data.boolean {
return .primitive(.bool(boolean))
}
// RTLM11d: Returns key-value pairs from the internal data map
// RTLM11d1: Pairs with tombstoned entries (per RTLM14) are not returned
var result: [(key: String, value: InternalLiveMapValue)] = []

// RTLM5d2c: If ObjectsMapEntry.data.bytes exists, return it
if let bytes = entry.data.bytes {
return .primitive(.data(bytes))
}
for (key, entry) in data where !Self.isEntryTombstoned(entry, delegate: delegate) {
// Convert entry to LiveMapValue using the same logic as get(key:)
if let value = convertEntryToLiveMapValue(entry, delegate: delegate) {
result.append((key: key, value: value))
}
}

// RTLM5d2d: If ObjectsMapEntry.data.number exists, return it
if let number = entry.data.number {
return .primitive(.number(number.doubleValue))
return result
}

// RTLM5d2e: If ObjectsMapEntry.data.string exists, return it
if let string = entry.data.string {
return .primitive(.string(string))
}
// MARK: - Helper Methods

// TODO: Needs specification (see https://github.com/ably/ably-cocoa-liveobjects-plugin/issues/46)
if let json = entry.data.json {
switch json {
case let .array(array):
return .primitive(.jsonArray(array))
case let .object(object):
return .primitive(.jsonObject(object))
/// Returns whether a map entry should be considered tombstoned, per the check described in RTLM14.
private static func isEntryTombstoned(_ entry: InternalObjectsMapEntry, delegate: LiveMapObjectPoolDelegate) -> Bool {
// RTLM14a
if entry.tombstone {
return true
}

// RTLM14c
if let objectId = entry.data?.objectId {
if let poolEntry = delegate.getObjectFromPool(id: objectId), poolEntry.isTombstone {
return true
}
}

// RTLM14b
return false
}

// RTLM5d2f: If ObjectsMapEntry.data.objectId exists, get the object stored at that objectId from the internal ObjectsPool
if let objectId = entry.data.objectId {
// RTLM5d2f1: If an object with id objectId does not exist, return undefined/null
guard let poolEntry = delegate.getObjectFromPool(id: objectId) else {
/// Converts an InternalObjectsMapEntry to LiveMapValue using the same logic as get(key:)
/// This is used by entries to ensure consistent value conversion
private func convertEntryToLiveMapValue(_ entry: InternalObjectsMapEntry, delegate: LiveMapObjectPoolDelegate) -> InternalLiveMapValue? {
// RTLM5d2a: If ObjectsMapEntry.tombstone is true, return undefined/null
if entry.tombstone == true {
return nil
}

// RTLM5d2f3: If referenced object is tombstoned, return nil
if poolEntry.isTombstone {
return nil
// Handle primitive values in the order specified by RTLM5d2b through RTLM5d2e

// RTLM5d2b: If ObjectsMapEntry.data.boolean exists, return it
if let boolean = entry.data?.boolean {
return .primitive(.bool(boolean))
}

// RTLM5d2f2: Return referenced object
switch poolEntry {
case let .map(map):
return .liveMap(map)
case let .counter(counter):
return .liveCounter(counter)
// RTLM5d2c: If ObjectsMapEntry.data.bytes exists, return it
if let bytes = entry.data?.bytes {
return .primitive(.data(bytes))
}

// RTLM5d2d: If ObjectsMapEntry.data.number exists, return it
if let number = entry.data?.number {
return .primitive(.number(number.doubleValue))
}

// RTLM5d2e: If ObjectsMapEntry.data.string exists, return it
if let string = entry.data?.string {
return .primitive(.string(string))
}

// TODO: Needs specification (see https://github.com/ably/ably-cocoa-liveobjects-plugin/issues/46)
if let json = entry.data?.json {
switch json {
case let .array(array):
return .primitive(.jsonArray(array))
case let .object(object):
return .primitive(.jsonObject(object))
}
}
}

// RTLM5d2g: Otherwise, return undefined/null
return nil
// RTLM5d2f: If ObjectsMapEntry.data.objectId exists, get the object stored at that objectId from the internal ObjectsPool
if let objectId = entry.data?.objectId {
// RTLM5d2f1: If an object with id objectId does not exist, return undefined/null
guard let poolEntry = delegate.getObjectFromPool(id: objectId) else {
return nil
}

// RTLM5d2f3: If referenced object is tombstoned, return nil
if poolEntry.isTombstone {
return nil
}

// RTLM5d2f2: Return referenced object
switch poolEntry {
case let .map(map):
return .liveMap(map)
case let .counter(counter):
return .liveCounter(counter)
}
}

// RTLM5d2g: Otherwise, return undefined/null
return nil
}
}
}
Loading