diff --git a/Sources/AblyLiveObjects/Internal/CoreSDK.swift b/Sources/AblyLiveObjects/Internal/CoreSDK.swift index 252e5b0e..bab61f69 100644 --- a/Sources/AblyLiveObjects/Internal/CoreSDK.swift +++ b/Sources/AblyLiveObjects/Internal/CoreSDK.swift @@ -16,24 +16,30 @@ internal final class DefaultCoreSDK: CoreSDK { private let channel: AblyPlugin.RealtimeChannel private let client: AblyPlugin.RealtimeClient private let pluginAPI: PluginAPIProtocol + private let logger: AblyPlugin.Logger internal init( channel: AblyPlugin.RealtimeChannel, client: AblyPlugin.RealtimeClient, - pluginAPI: PluginAPIProtocol + pluginAPI: PluginAPIProtocol, + logger: AblyPlugin.Logger ) { self.channel = channel self.client = client self.pluginAPI = pluginAPI + self.logger = logger } // MARK: - CoreSDK conformance internal func publish(objectMessages: [OutboundObjectMessage]) async throws(InternalError) { + logger.log("publish(objectMessages: \(LoggingUtilities.formatObjectMessagesForLogging(objectMessages)))", level: .debug) + // TODO: Implement the full spec of RTO15 (https://github.com/ably/ably-cocoa-liveobjects-plugin/issues/47) try await DefaultInternalPlugin.sendObject( objectMessages: objectMessages, channel: channel, + client: client, pluginAPI: pluginAPI, ) } diff --git a/Sources/AblyLiveObjects/Internal/DefaultInternalPlugin.swift b/Sources/AblyLiveObjects/Internal/DefaultInternalPlugin.swift index 9dd56e0f..3ca858be 100644 --- a/Sources/AblyLiveObjects/Internal/DefaultInternalPlugin.swift +++ b/Sources/AblyLiveObjects/Internal/DefaultInternalPlugin.swift @@ -128,19 +128,24 @@ internal final class DefaultInternalPlugin: NSObject, AblyPlugin.LiveObjectsInte internal static func sendObject( objectMessages: [OutboundObjectMessage], channel: AblyPlugin.RealtimeChannel, + client: AblyPlugin.RealtimeClient, pluginAPI: PluginAPIProtocol, ) async throws(InternalError) { let objectMessageBoxes: [ObjectMessageBox] = objectMessages.map { .init(objectMessage: $0) } try await withCheckedContinuation { (continuation: CheckedContinuation, _>) in - pluginAPI.sendObject( - withObjectMessages: objectMessageBoxes, - channel: channel, - ) { error in - if let error { - continuation.resume(returning: .failure(error.toInternalError())) - } else { - continuation.resume(returning: .success(())) + let internalQueue = pluginAPI.internalQueue(for: client) + + internalQueue.async { + pluginAPI.sendObject( + withObjectMessages: objectMessageBoxes, + channel: channel, + ) { error in + if let error { + continuation.resume(returning: .failure(error.toInternalError())) + } else { + continuation.resume(returning: .success(())) + } } } }.get() diff --git a/Sources/AblyLiveObjects/Internal/InternalDefaultLiveCounter.swift b/Sources/AblyLiveObjects/Internal/InternalDefaultLiveCounter.swift index 5cd3fd27..a5d7058a 100644 --- a/Sources/AblyLiveObjects/Internal/InternalDefaultLiveCounter.swift +++ b/Sources/AblyLiveObjects/Internal/InternalDefaultLiveCounter.swift @@ -80,12 +80,8 @@ internal final class InternalDefaultLiveCounter: Sendable { // MARK: - Internal methods that back LiveCounter conformance internal func value(coreSDK: CoreSDK) throws(ARTErrorInfo) -> Double { - // RTLC5b: If the channel is in the DETACHED or FAILED state, the library should indicate an error with code 90001 - try coreSDK.validateChannelState(notIn: [.detached, .failed], operationDescription: "LiveCounter.value") - - return mutex.withLock { - // RTLC5c - mutableState.data + try mutex.ablyLiveObjects_withLockWithTypedThrow { () throws(ARTErrorInfo) in + try mutableState.value(coreSDK: coreSDK) } } @@ -419,5 +415,13 @@ internal final class InternalDefaultLiveCounter: Sendable { // RTLC4 data = 0 } + + internal func value(coreSDK: CoreSDK) throws(ARTErrorInfo) -> Double { + // RTLC5b: If the channel is in the DETACHED or FAILED state, the library should indicate an error with code 90001 + try coreSDK.validateChannelState(notIn: [.detached, .failed], operationDescription: "LiveCounter.value") + + // RTLC5c + return data + } } } diff --git a/Sources/AblyLiveObjects/Internal/InternalDefaultLiveMap.swift b/Sources/AblyLiveObjects/Internal/InternalDefaultLiveMap.swift index 09546909..53321db9 100644 --- a/Sources/AblyLiveObjects/Internal/InternalDefaultLiveMap.swift +++ b/Sources/AblyLiveObjects/Internal/InternalDefaultLiveMap.swift @@ -110,56 +110,20 @@ internal final class InternalDefaultLiveMap: Sendable { /// Returns the value associated with a given key, following RTLM5d specification. internal func get(key: String, coreSDK: CoreSDK, delegate: LiveMapObjectPoolDelegate) throws(ARTErrorInfo) -> InternalLiveMapValue? { - // RTLM5c: If the channel is in the DETACHED or FAILED state, the library should indicate an error with code 90001 - try coreSDK.validateChannelState(notIn: [.detached, .failed], operationDescription: "LiveMap.get") - - // RTLM5e - Return nil if self is tombstone - if isTombstone { - return nil - } - - let entry = mutex.withLock { - mutableState.data[key] - } - - // RTLM5d1: If no ObjectsMapEntry exists at the key, return undefined/null - guard let entry else { - return nil + try mutex.ablyLiveObjects_withLockWithTypedThrow { () throws(ARTErrorInfo) in + try mutableState.get(key: key, coreSDK: coreSDK, delegate: delegate) } - - // RTLM5d2: If a ObjectsMapEntry exists at the key, convert it using the shared logic - return convertEntryToLiveMapValue(entry, delegate: delegate) } internal func size(coreSDK: CoreSDK, delegate: LiveMapObjectPoolDelegate) throws(ARTErrorInfo) -> Int { - // RTLM10c: If the channel is in the DETACHED or FAILED state, the library should throw an ErrorInfo error with statusCode 400 and code 90001 - try coreSDK.validateChannelState(notIn: [.detached, .failed], operationDescription: "LiveMap.size") - - return mutex.withLock { - // RTLM10d: Returns the number of non-tombstoned entries (per RTLM14) in the internal data map - mutableState.data.values.count { entry in - !Self.isEntryTombstoned(entry, delegate: delegate) - } + try mutex.ablyLiveObjects_withLockWithTypedThrow { () throws(ARTErrorInfo) in + try mutableState.size(coreSDK: coreSDK, delegate: delegate) } } internal func entries(coreSDK: CoreSDK, delegate: LiveMapObjectPoolDelegate) throws(ARTErrorInfo) -> [(key: String, value: InternalLiveMapValue)] { - // RTLM11c: If the channel is in the DETACHED or FAILED state, the library should throw an ErrorInfo error with statusCode 400 and code 90001 - try coreSDK.validateChannelState(notIn: [.detached, .failed], operationDescription: "LiveMap.entries") - - return mutex.withLock { - // RTLM11d: Returns key-value pairs from the internal data map - // RTLM11d1: Pairs with tombstoned entries (per RTLM14) are not returned - var result: [(key: String, value: InternalLiveMapValue)] = [] - - for (key, entry) in mutableState.data where !Self.isEntryTombstoned(entry, delegate: delegate) { - // Convert entry to LiveMapValue using the same logic as get(key:) - if let value = convertEntryToLiveMapValue(entry, delegate: delegate) { - result.append((key: key, value: value)) - } - } - - return result + try mutex.ablyLiveObjects_withLockWithTypedThrow { () throws(ARTErrorInfo) in + try mutableState.entries(coreSDK: coreSDK, delegate: delegate) } } @@ -655,7 +619,7 @@ internal final class InternalDefaultLiveMap: Sendable { internal mutating func applyMapSetOperation( key: String, operationTimeserial: String?, - operationData: ObjectData, + operationData: ObjectData?, objectsPool: inout ObjectsPool, logger: AblyPlugin.Logger, userCallbackQueue: DispatchQueue, @@ -684,7 +648,7 @@ internal final class InternalDefaultLiveMap: Sendable { } // RTLM7c: If the operation has a non-empty ObjectData.objectId attribute - if let objectId = operationData.objectId, !objectId.isEmpty { + if let objectId = operationData?.objectId, !objectId.isEmpty { // RTLM7c1: Create a zero-value LiveObject in the internal ObjectsPool per RTO6 _ = objectsPool.createZeroValueObject(forObjectID: objectId, logger: logger, userCallbackQueue: userCallbackQueue, clock: clock) } @@ -721,7 +685,7 @@ internal final class InternalDefaultLiveMap: Sendable { // RTLM8a2c: Set ObjectsMapEntry.tombstone to true (equivalent to next point) // RTLM8a2d: Set ObjectsMapEntry.tombstonedAt per RTLM8a2d var updatedEntry = existingEntry - updatedEntry.data = ObjectData() + updatedEntry.data = nil updatedEntry.timeserial = operationTimeserial updatedEntry.tombstonedAt = tombstonedAt data[key] = updatedEntry @@ -730,7 +694,7 @@ internal final class InternalDefaultLiveMap: Sendable { // RTLM8b1: Create a new entry in data for the specified key, with ObjectsMapEntry.data set to undefined/null and the operation's serial // RTLM8b2: Set ObjectsMapEntry.tombstone for the new entry to true // RTLM8b3: Set ObjectsMapEntry.tombstonedAt per RTLM8f - data[key] = InternalObjectsMapEntry(tombstonedAt: tombstonedAt, timeserial: operationTimeserial, data: ObjectData()) + data[key] = InternalObjectsMapEntry(tombstonedAt: tombstonedAt, timeserial: operationTimeserial, data: nil) } return .update(DefaultLiveMapUpdate(update: [key: .removed])) @@ -843,90 +807,137 @@ internal final class InternalDefaultLiveMap: Sendable { return !shouldRelease } } - } - // MARK: - Helper Methods + /// Returns the value associated with a given key, following RTLM5d specification. + internal func get(key: String, coreSDK: CoreSDK, delegate: LiveMapObjectPoolDelegate) throws(ARTErrorInfo) -> InternalLiveMapValue? { + // RTLM5c: If the channel is in the DETACHED or FAILED state, the library should indicate an error with code 90001 + try coreSDK.validateChannelState(notIn: [.detached, .failed], operationDescription: "LiveMap.get") - /// Returns whether a map entry should be considered tombstoned, per the check described in RTLM14. - private static func isEntryTombstoned(_ entry: InternalObjectsMapEntry, delegate: LiveMapObjectPoolDelegate) -> Bool { - // RTLM14a - if entry.tombstone { - return true - } + // RTLM5e - Return nil if self is tombstone + if liveObjectMutableState.isTombstone { + return nil + } - // RTLM14c - if let objectId = entry.data.objectId { - if let poolEntry = delegate.getObjectFromPool(id: objectId), poolEntry.isTombstone { - return true + // RTLM5d1: If no ObjectsMapEntry exists at the key, return undefined/null + guard let entry = data[key] else { + return nil } + + // RTLM5d2: If a ObjectsMapEntry exists at the key, convert it using the shared logic + return convertEntryToLiveMapValue(entry, delegate: delegate) } - // RTLM14b - return false - } + internal func size(coreSDK: CoreSDK, delegate: LiveMapObjectPoolDelegate) throws(ARTErrorInfo) -> Int { + // RTLM10c: If the channel is in the DETACHED or FAILED state, the library should throw an ErrorInfo error with statusCode 400 and code 90001 + try coreSDK.validateChannelState(notIn: [.detached, .failed], operationDescription: "LiveMap.size") - /// Converts an InternalObjectsMapEntry to LiveMapValue using the same logic as get(key:) - /// This is used by entries to ensure consistent value conversion - private func convertEntryToLiveMapValue(_ entry: InternalObjectsMapEntry, delegate: LiveMapObjectPoolDelegate) -> InternalLiveMapValue? { - // RTLM5d2a: If ObjectsMapEntry.tombstone is true, return undefined/null - if entry.tombstone == true { - return nil + // RTLM10d: Returns the number of non-tombstoned entries (per RTLM14) in the internal data map + return data.values.count { entry in + !Self.isEntryTombstoned(entry, delegate: delegate) + } } - // Handle primitive values in the order specified by RTLM5d2b through RTLM5d2e + internal func entries(coreSDK: CoreSDK, delegate: LiveMapObjectPoolDelegate) throws(ARTErrorInfo) -> [(key: String, value: InternalLiveMapValue)] { + // RTLM11c: If the channel is in the DETACHED or FAILED state, the library should throw an ErrorInfo error with statusCode 400 and code 90001 + try coreSDK.validateChannelState(notIn: [.detached, .failed], operationDescription: "LiveMap.entries") - // RTLM5d2b: If ObjectsMapEntry.data.boolean exists, return it - if let boolean = entry.data.boolean { - return .primitive(.bool(boolean)) - } + // RTLM11d: Returns key-value pairs from the internal data map + // RTLM11d1: Pairs with tombstoned entries (per RTLM14) are not returned + var result: [(key: String, value: InternalLiveMapValue)] = [] - // RTLM5d2c: If ObjectsMapEntry.data.bytes exists, return it - if let bytes = entry.data.bytes { - return .primitive(.data(bytes)) - } + for (key, entry) in data where !Self.isEntryTombstoned(entry, delegate: delegate) { + // Convert entry to LiveMapValue using the same logic as get(key:) + if let value = convertEntryToLiveMapValue(entry, delegate: delegate) { + result.append((key: key, value: value)) + } + } - // RTLM5d2d: If ObjectsMapEntry.data.number exists, return it - if let number = entry.data.number { - return .primitive(.number(number.doubleValue)) + return result } - // RTLM5d2e: If ObjectsMapEntry.data.string exists, return it - if let string = entry.data.string { - return .primitive(.string(string)) - } + // MARK: - Helper Methods - // TODO: Needs specification (see https://github.com/ably/ably-cocoa-liveobjects-plugin/issues/46) - if let json = entry.data.json { - switch json { - case let .array(array): - return .primitive(.jsonArray(array)) - case let .object(object): - return .primitive(.jsonObject(object)) + /// Returns whether a map entry should be considered tombstoned, per the check described in RTLM14. + private static func isEntryTombstoned(_ entry: InternalObjectsMapEntry, delegate: LiveMapObjectPoolDelegate) -> Bool { + // RTLM14a + if entry.tombstone { + return true } + + // RTLM14c + if let objectId = entry.data?.objectId { + if let poolEntry = delegate.getObjectFromPool(id: objectId), poolEntry.isTombstone { + return true + } + } + + // RTLM14b + return false } - // RTLM5d2f: If ObjectsMapEntry.data.objectId exists, get the object stored at that objectId from the internal ObjectsPool - if let objectId = entry.data.objectId { - // RTLM5d2f1: If an object with id objectId does not exist, return undefined/null - guard let poolEntry = delegate.getObjectFromPool(id: objectId) else { + /// Converts an InternalObjectsMapEntry to LiveMapValue using the same logic as get(key:) + /// This is used by entries to ensure consistent value conversion + private func convertEntryToLiveMapValue(_ entry: InternalObjectsMapEntry, delegate: LiveMapObjectPoolDelegate) -> InternalLiveMapValue? { + // RTLM5d2a: If ObjectsMapEntry.tombstone is true, return undefined/null + if entry.tombstone == true { return nil } - // RTLM5d2f3: If referenced object is tombstoned, return nil - if poolEntry.isTombstone { - return nil + // Handle primitive values in the order specified by RTLM5d2b through RTLM5d2e + + // RTLM5d2b: If ObjectsMapEntry.data.boolean exists, return it + if let boolean = entry.data?.boolean { + return .primitive(.bool(boolean)) } - // RTLM5d2f2: Return referenced object - switch poolEntry { - case let .map(map): - return .liveMap(map) - case let .counter(counter): - return .liveCounter(counter) + // RTLM5d2c: If ObjectsMapEntry.data.bytes exists, return it + if let bytes = entry.data?.bytes { + return .primitive(.data(bytes)) + } + + // RTLM5d2d: If ObjectsMapEntry.data.number exists, return it + if let number = entry.data?.number { + return .primitive(.number(number.doubleValue)) + } + + // RTLM5d2e: If ObjectsMapEntry.data.string exists, return it + if let string = entry.data?.string { + return .primitive(.string(string)) + } + + // TODO: Needs specification (see https://github.com/ably/ably-cocoa-liveobjects-plugin/issues/46) + if let json = entry.data?.json { + switch json { + case let .array(array): + return .primitive(.jsonArray(array)) + case let .object(object): + return .primitive(.jsonObject(object)) + } } - } - // RTLM5d2g: Otherwise, return undefined/null - return nil + // RTLM5d2f: If ObjectsMapEntry.data.objectId exists, get the object stored at that objectId from the internal ObjectsPool + if let objectId = entry.data?.objectId { + // RTLM5d2f1: If an object with id objectId does not exist, return undefined/null + guard let poolEntry = delegate.getObjectFromPool(id: objectId) else { + return nil + } + + // RTLM5d2f3: If referenced object is tombstoned, return nil + if poolEntry.isTombstone { + return nil + } + + // RTLM5d2f2: Return referenced object + switch poolEntry { + case let .map(map): + return .liveMap(map) + case let .counter(counter): + return .liveCounter(counter) + } + } + + // RTLM5d2g: Otherwise, return undefined/null + return nil + } } } diff --git a/Sources/AblyLiveObjects/Internal/InternalDefaultRealtimeObjects.swift b/Sources/AblyLiveObjects/Internal/InternalDefaultRealtimeObjects.swift index 91486f3d..2f8b494e 100644 --- a/Sources/AblyLiveObjects/Internal/InternalDefaultRealtimeObjects.swift +++ b/Sources/AblyLiveObjects/Internal/InternalDefaultRealtimeObjects.swift @@ -389,7 +389,7 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectPool clock: SimpleClock, receivedObjectSyncProtocolMessagesContinuation: AsyncStream<[InboundObjectMessage]>.Continuation, ) { - logger.log("handleObjectSyncProtocolMessage(objectMessages: \(objectMessages), protocolMessageChannelSerial: \(String(describing: protocolMessageChannelSerial)))", level: .debug) + logger.log("handleObjectSyncProtocolMessage(objectMessages: \(LoggingUtilities.formatObjectMessagesForLogging(objectMessages)), protocolMessageChannelSerial: \(String(describing: protocolMessageChannelSerial)))", level: .debug) receivedObjectSyncProtocolMessagesContinuation.yield(objectMessages) @@ -489,7 +489,7 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectPool ) { receivedObjectProtocolMessagesContinuation.yield(objectMessages) - logger.log("handleObjectProtocolMessage(objectMessages: \(objectMessages))", level: .debug) + logger.log("handleObjectProtocolMessage(objectMessages: \(LoggingUtilities.formatObjectMessagesForLogging(objectMessages)))", level: .debug) if let existingSyncSequence = syncSequence { // RTO8a: Buffer the OBJECT message, to be handled once the sync completes diff --git a/Sources/AblyLiveObjects/Internal/InternalObjectsMapEntry.swift b/Sources/AblyLiveObjects/Internal/InternalObjectsMapEntry.swift index 40478ee3..4922ddfc 100644 --- a/Sources/AblyLiveObjects/Internal/InternalObjectsMapEntry.swift +++ b/Sources/AblyLiveObjects/Internal/InternalObjectsMapEntry.swift @@ -9,7 +9,7 @@ internal struct InternalObjectsMapEntry: Equatable { } internal var timeserial: String? // OME2b - internal var data: ObjectData // OME2c + internal var data: ObjectData? // OME2c } internal extension InternalObjectsMapEntry { diff --git a/Sources/AblyLiveObjects/Protocol/ObjectMessage.swift b/Sources/AblyLiveObjects/Protocol/ObjectMessage.swift index 23719a19..0a240199 100644 --- a/Sources/AblyLiveObjects/Protocol/ObjectMessage.swift +++ b/Sources/AblyLiveObjects/Protocol/ObjectMessage.swift @@ -71,7 +71,7 @@ internal struct ObjectsMapOp: Equatable { internal struct ObjectsMapEntry: Equatable { internal var tombstone: Bool? // OME2a internal var timeserial: String? // OME2b - internal var data: ObjectData // OME2c + internal var data: ObjectData? // OME2c internal var serialTimestamp: Date? // OME2d } @@ -386,7 +386,11 @@ internal extension ObjectsMapEntry { ) throws(InternalError) { tombstone = wireObjectsMapEntry.tombstone timeserial = wireObjectsMapEntry.timeserial - data = try .init(wireObjectData: wireObjectsMapEntry.data, format: format) + data = if let wireObjectData = wireObjectsMapEntry.data { + try .init(wireObjectData: wireObjectData, format: format) + } else { + nil + } serialTimestamp = wireObjectsMapEntry.serialTimestamp } @@ -398,7 +402,7 @@ internal extension ObjectsMapEntry { .init( tombstone: tombstone, timeserial: timeserial, - data: data.toWire(format: format), + data: data?.toWire(format: format), ) } } @@ -468,3 +472,132 @@ internal extension ObjectState { ) } } + +// MARK: - CustomDebugStringConvertible + +extension InboundObjectMessage: CustomDebugStringConvertible { + internal var debugDescription: String { + var parts: [String] = [] + + if let id { parts.append("id: \(id)") } + if let clientId { parts.append("clientId: \(clientId)") } + if let connectionId { parts.append("connectionId: \(connectionId)") } + if let extras { parts.append("extras: \(extras)") } + if let timestamp { parts.append("timestamp: \(timestamp)") } + if let operation { parts.append("operation: \(operation)") } + if let object { parts.append("object: \(object)") } + if let serial { parts.append("serial: \(serial)") } + if let siteCode { parts.append("siteCode: \(siteCode)") } + if let serialTimestamp { parts.append("serialTimestamp: \(serialTimestamp)") } + + return "{ " + parts.joined(separator: ", ") + " }" + } +} + +extension OutboundObjectMessage: CustomDebugStringConvertible { + internal var debugDescription: String { + var parts: [String] = [] + + if let id { parts.append("id: \(id)") } + if let clientId { parts.append("clientId: \(clientId)") } + if let connectionId { parts.append("connectionId: \(connectionId)") } + if let extras { parts.append("extras: \(extras)") } + if let timestamp { parts.append("timestamp: \(timestamp)") } + if let operation { parts.append("operation: \(operation)") } + if let object { parts.append("object: \(object)") } + if let serial { parts.append("serial: \(serial)") } + if let siteCode { parts.append("siteCode: \(siteCode)") } + if let serialTimestamp { parts.append("serialTimestamp: \(serialTimestamp)") } + + return "{ " + parts.joined(separator: ", ") + " }" + } +} + +extension ObjectOperation: CustomDebugStringConvertible { + internal var debugDescription: String { + var parts: [String] = [] + + parts.append("action: \(action)") + parts.append("objectId: \(objectId)") + if let mapOp { parts.append("mapOp: \(mapOp)") } + if let counterOp { parts.append("counterOp: \(counterOp)") } + if let map { parts.append("map: \(map)") } + if let counter { parts.append("counter: \(counter)") } + if let nonce { parts.append("nonce: \(nonce)") } + if let initialValue { parts.append("initialValue: \(initialValue)") } + + return "{ " + parts.joined(separator: ", ") + " }" + } +} + +extension ObjectState: CustomDebugStringConvertible { + internal var debugDescription: String { + var parts: [String] = [] + + parts.append("objectId: \(objectId)") + parts.append("siteTimeserials: \(siteTimeserials)") + parts.append("tombstone: \(tombstone)") + if let createOp { parts.append("createOp: \(createOp)") } + if let map { parts.append("map: \(map)") } + if let counter { parts.append("counter: \(counter)") } + + return "{ " + parts.joined(separator: ", ") + " }" + } +} + +extension ObjectsMapOp: CustomDebugStringConvertible { + internal var debugDescription: String { + var parts: [String] = [] + + parts.append("key: \(key)") + if let data { parts.append("data: \(data)") } + + return "{ " + parts.joined(separator: ", ") + " }" + } +} + +extension ObjectsMap: CustomDebugStringConvertible { + internal var debugDescription: String { + var parts: [String] = [] + + parts.append("semantics: \(semantics)") + if let entries { + let formattedEntries = entries + .map { key, entry in + "\(key): \(entry)" + } + .joined(separator: ", ") + parts.append("entries: { \(formattedEntries) }") + } + + return "{ " + parts.joined(separator: ", ") + " }" + } +} + +extension ObjectsMapEntry: CustomDebugStringConvertible { + internal var debugDescription: String { + var parts: [String] = [] + + if let tombstone { parts.append("tombstone: \(tombstone)") } + if let timeserial { parts.append("timeserial: \(timeserial)") } + if let data { parts.append("data: \(data)") } + if let serialTimestamp { parts.append("serialTimestamp: \(serialTimestamp)") } + + return "{ " + parts.joined(separator: ", ") + " }" + } +} + +extension ObjectData: CustomDebugStringConvertible { + internal var debugDescription: String { + var parts: [String] = [] + + if let objectId { parts.append("objectId: \(objectId)") } + if let boolean { parts.append("boolean: \(boolean)") } + if let bytes { parts.append("bytes: \(bytes.count) bytes") } + if let number { parts.append("number: \(number)") } + if let string { parts.append("string: \(string)") } + if let json { parts.append("json: \(json)") } + + return "{ " + parts.joined(separator: ", ") + " }" + } +} diff --git a/Sources/AblyLiveObjects/Protocol/WireObjectMessage.swift b/Sources/AblyLiveObjects/Protocol/WireObjectMessage.swift index d7316fc5..fad740a3 100644 --- a/Sources/AblyLiveObjects/Protocol/WireObjectMessage.swift +++ b/Sources/AblyLiveObjects/Protocol/WireObjectMessage.swift @@ -431,7 +431,7 @@ extension WireObjectsCounter: WireObjectCodable { internal struct WireObjectsMapEntry { internal var tombstone: Bool? // OME2a internal var timeserial: String? // OME2b - internal var data: WireObjectData // OME2c + internal var data: WireObjectData? // OME2c internal var serialTimestamp: Date? // OME2d } @@ -446,15 +446,16 @@ extension WireObjectsMapEntry: WireObjectCodable { internal init(wireObject: [String: WireValue]) throws(InternalError) { tombstone = try wireObject.optionalBoolValueForKey(WireKey.tombstone.rawValue) timeserial = try wireObject.optionalStringValueForKey(WireKey.timeserial.rawValue) - data = try wireObject.decodableValueForKey(WireKey.data.rawValue) + data = try wireObject.optionalDecodableValueForKey(WireKey.data.rawValue) serialTimestamp = try wireObject.optionalAblyProtocolDateValueForKey(WireKey.serialTimestamp.rawValue) } internal var toWireObject: [String: WireValue] { - var result: [String: WireValue] = [ - WireKey.data.rawValue: .object(data.toWireObject), - ] + var result: [String: WireValue] = [:] + if let data { + result[WireKey.data.rawValue] = .object(data.toWireObject) + } if let tombstone { result[WireKey.tombstone.rawValue] = .bool(tombstone) } @@ -555,3 +556,49 @@ internal enum StringOrData: WireCodable { } } } + +// MARK: - CustomDebugStringConvertible + +extension WireObjectsCounter: CustomDebugStringConvertible { + internal var debugDescription: String { + if let count { + "{ count: \(count) }" + } else { + "{ count: nil }" + } + } +} + +extension WireObjectsCounterOp: CustomDebugStringConvertible { + internal var debugDescription: String { + "{ amount: \(amount) }" + } +} + +extension WireObjectsMapEntry: CustomDebugStringConvertible { + internal var debugDescription: String { + var parts: [String] = [] + + if let tombstone { parts.append("tombstone: \(tombstone)") } + if let timeserial { parts.append("timeserial: \(timeserial)") } + if let data { parts.append("data: \(data)") } + if let serialTimestamp { parts.append("serialTimestamp: \(serialTimestamp)") } + + return "{ " + parts.joined(separator: ", ") + " }" + } +} + +extension WireObjectData: CustomDebugStringConvertible { + internal var debugDescription: String { + var parts: [String] = [] + + if let objectId { parts.append("objectId: \(objectId)") } + if let boolean { parts.append("boolean: \(boolean)") } + if let bytes { parts.append("bytes: \(bytes)") } + if let number { parts.append("number: \(number)") } + if let string { parts.append("string: \(string)") } + if let json { parts.append("json: \(json)") } + + return "{ " + parts.joined(separator: ", ") + " }" + } +} diff --git a/Sources/AblyLiveObjects/Public/ARTRealtimeChannel+Objects.swift b/Sources/AblyLiveObjects/Public/ARTRealtimeChannel+Objects.swift index d36b5ccf..da479178 100644 --- a/Sources/AblyLiveObjects/Public/ARTRealtimeChannel+Objects.swift +++ b/Sources/AblyLiveObjects/Public/ARTRealtimeChannel+Objects.swift @@ -12,14 +12,15 @@ public extension ARTRealtimeChannel { let underlyingObjects = pluginAPI.underlyingObjects(forPublicRealtimeChannel: self) let internalObjects = DefaultInternalPlugin.realtimeObjects(for: underlyingObjects.channel, pluginAPI: pluginAPI) + let logger = pluginAPI.logger(for: underlyingObjects.channel) + let coreSDK = DefaultCoreSDK( channel: underlyingObjects.channel, client: underlyingObjects.client, pluginAPI: Plugin.defaultPluginAPI, + logger: logger, ) - let logger = pluginAPI.logger(for: underlyingObjects.channel) - return PublicObjectsStore.shared.getOrCreateRealtimeObjects( proxying: internalObjects, creationArgs: .init( diff --git a/Sources/AblyLiveObjects/Utility/LoggingUtilities.swift b/Sources/AblyLiveObjects/Utility/LoggingUtilities.swift new file mode 100644 index 00000000..c1939abd --- /dev/null +++ b/Sources/AblyLiveObjects/Utility/LoggingUtilities.swift @@ -0,0 +1,14 @@ +import Foundation + +internal enum LoggingUtilities { + /// Formats an array of object messages for logging with one message per line. + /// - Parameter objectMessages: The array of object messages to format + /// - Returns: A formatted string with one message per line + internal static func formatObjectMessagesForLogging(_ objectMessages: [some CustomDebugStringConvertible]) -> String { + guard !objectMessages.isEmpty else { + return "[]" + } + + return "[\n" + objectMessages.map { " \($0)" }.joined(separator: ",\n") + "\n]" + } +} diff --git a/Tests/AblyLiveObjectsTests/Helpers/ClientHelper.swift b/Tests/AblyLiveObjectsTests/Helpers/ClientHelper.swift index ff3dcb71..f1de7d64 100644 --- a/Tests/AblyLiveObjectsTests/Helpers/ClientHelper.swift +++ b/Tests/AblyLiveObjectsTests/Helpers/ClientHelper.swift @@ -22,10 +22,43 @@ enum ClientHelper { if let autoConnect = options.autoConnect { clientOptions.autoConnect = autoConnect } + if let logIdentifier = options.logIdentifier { + let logger = PrefixedLogger(prefix: "(\(logIdentifier)) ") + clientOptions.logHandler = logger + } return ARTRealtime(options: clientOptions) } + /// An ably-cocoa logger that adds a given prefix to all emitted log messages. + private class PrefixedLogger: ARTLog { + // This dance of using an implicitly unwrapped optional instead of a `let` is because we can't write a custom designated initializer (see comment below). + var _prefix: String! + var prefix: String { + get { + _prefix + } + + set { + if _prefix != nil { + fatalError("PrefixedLogger prefix cannot be changed after initialization") + } + _prefix = newValue + } + } + + // We use a convenience initializer because it's not clear to a consumer of the public API how to implement a custom designated initializer (super.init delegates to the non-public init(capturingOutput:). + convenience init(prefix: String) { + self.init() + self.prefix = prefix + } + + override public func log(_ message: String, with level: ARTLogLevel) { + let newMessage = "\(prefix)\(message)" + super.log(newMessage, with: level) + } + } + /// Creates channel options that include the channel modes needed for LiveObjects. static func channelOptionsWithObjects() -> ARTRealtimeChannelOptions { let options = ARTRealtimeChannelOptions() @@ -36,5 +69,8 @@ enum ClientHelper { struct PartialClientOptions: Encodable, Hashable { var useBinaryProtocol: Bool? var autoConnect: Bool? + + /// A prefix for all log messages emitted by the client. Allows clients to be distinguished in log messages for tests which use multiple clients. + var logIdentifier: String? } } diff --git a/Tests/AblyLiveObjectsTests/Helpers/TestFactories.swift b/Tests/AblyLiveObjectsTests/Helpers/TestFactories.swift index 3d6966c3..441bf12f 100644 --- a/Tests/AblyLiveObjectsTests/Helpers/TestFactories.swift +++ b/Tests/AblyLiveObjectsTests/Helpers/TestFactories.swift @@ -393,7 +393,7 @@ struct TestFactories { static func mapEntry( tombstone: Bool? = false, timeserial: String? = "ts1", - data: ObjectData, + data: ObjectData?, ) -> ObjectsMapEntry { ObjectsMapEntry( tombstone: tombstone, diff --git a/Tests/AblyLiveObjectsTests/InternalDefaultLiveMapTests.swift b/Tests/AblyLiveObjectsTests/InternalDefaultLiveMapTests.swift index 98982166..daa2f503 100644 --- a/Tests/AblyLiveObjectsTests/InternalDefaultLiveMapTests.swift +++ b/Tests/AblyLiveObjectsTests/InternalDefaultLiveMapTests.swift @@ -534,8 +534,8 @@ struct InternalDefaultLiveMapTests { } // RTLM7a2a: Set ObjectsMapEntry.data to the ObjectData from the operation - #expect(map.testsOnly_data["key1"]?.data.number == operationData.number) - #expect(map.testsOnly_data["key1"]?.data.objectId == operationData.objectId) + #expect(map.testsOnly_data["key1"]?.data?.number == operationData.number) + #expect(map.testsOnly_data["key1"]?.data?.objectId == operationData.objectId) // RTLM7a2b: Set ObjectsMapEntry.timeserial to the operation's serial #expect(map.testsOnly_data["key1"]?.timeserial == "ts2") @@ -717,12 +717,7 @@ struct InternalDefaultLiveMapTests { #expect(try map.get(key: "key1", coreSDK: coreSDK, delegate: delegate) == nil) // RTLM8a2a: Set ObjectsMapEntry.data to undefined/null - let entry = map.testsOnly_data["key1"] - #expect(entry?.data.string == nil) - #expect(entry?.data.number == nil) - #expect(entry?.data.boolean == nil) - #expect(entry?.data.bytes == nil) - #expect(entry?.data.objectId == nil) + #expect(map.testsOnly_data["key1"]?.data == nil) // RTLM8a2b: Set ObjectsMapEntry.timeserial to the operation's serial #expect(map.testsOnly_data["key1"]?.timeserial == "ts2") @@ -751,11 +746,7 @@ struct InternalDefaultLiveMapTests { let entry = map.testsOnly_data["newKey"] #expect(entry != nil) #expect(entry?.timeserial == "ts1") - #expect(entry?.data.string == nil) - #expect(entry?.data.number == nil) - #expect(entry?.data.boolean == nil) - #expect(entry?.data.bytes == nil) - #expect(entry?.data.objectId == nil) + #expect(entry?.data == nil) // RTLM8e: Check return value #expect(try #require(update.update).update == ["newKey": .removed]) diff --git a/Tests/AblyLiveObjectsTests/JS Integration Tests/ObjectsIntegrationTests.swift b/Tests/AblyLiveObjectsTests/JS Integration Tests/ObjectsIntegrationTests.swift index f4e388a4..32b095ed 100644 --- a/Tests/AblyLiveObjectsTests/JS Integration Tests/ObjectsIntegrationTests.swift +++ b/Tests/AblyLiveObjectsTests/JS Integration Tests/ObjectsIntegrationTests.swift @@ -88,32 +88,12 @@ func waitFixtureChannelIsReady(_: ARTRealtime) async throws { try await Task.sleep(nanoseconds: 5 * NSEC_PER_SEC) } -func waitForMapKeyUpdate(_ map: any LiveMap, _ key: String) async throws { - try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in - do { - try map.subscribe { update, subscription in - if update.update[key] != nil { - subscription.unsubscribe() - continuation.resume() - } - } - } catch { - continuation.resume(throwing: error) - } - } +func waitForMapKeyUpdate(_ updates: AsyncStream, _ key: String) async { + _ = await updates.first { $0.update[key] != nil } } -func waitForCounterUpdate(_ counter: any LiveCounter) async throws { - try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in - do { - try counter.subscribe { _, subscription in - subscription.unsubscribe() - continuation.resume() - } - } catch { - continuation.resume(throwing: error) - } - } +func waitForCounterUpdate(_ updates: AsyncStream) async { + _ = await updates.first { _ in true } } // I added this @MainActor as an "I don't understand what's going on there; let's try this" when observing that for some reason the setter of setListenerAfterProcessingIncomingMessage was hanging inside `-[ARTSRDelegateController dispatchQueue]`. This seems to avoid it and I have not investigated more deeply 🤷 @@ -177,17 +157,21 @@ private struct TestScenario { private func forScenarios(_ scenarios: [TestScenario]) -> [TestCase] { scenarios.map { scenario -> [TestCase] in + var clientOptions = ClientHelper.PartialClientOptions(logIdentifier: "client1") + if scenario.allTransportsAndProtocols { - [true, false].map { useBinaryProtocol -> TestCase in - .init( + return [true, false].map { useBinaryProtocol -> TestCase in + clientOptions.useBinaryProtocol = useBinaryProtocol + + return .init( disabled: scenario.disabled, scenario: scenario, - options: .init(useBinaryProtocol: useBinaryProtocol), + options: clientOptions, channelName: "\(scenario.description) \(useBinaryProtocol ? "binary" : "text")", ) } } else { - [.init(disabled: scenario.disabled, scenario: scenario, options: .init(), channelName: scenario.description)] + return [.init(disabled: scenario.disabled, scenario: scenario, options: clientOptions, channelName: scenario.description)] } } .flatMap(\.self) @@ -222,6 +206,7 @@ private actor ObjectsFixturesTrait: SuiteTrait, TestScoping { try await helper.initForChannel(objectsFixturesChannel) } } + self.setupTask = setupTask try await setupTask.value } @@ -308,7 +293,7 @@ private struct ObjectsIntegrationTests { }, ), .init( - disabled: true, // Uses LiveMap.set which we haven't implemented yet + disabled: false, allTransportsAndProtocols: true, description: "OBJECT_SYNC sequence builds object tree with all operations applied", action: { ctx in @@ -316,12 +301,14 @@ private struct ObjectsIntegrationTests { let objects = ctx.objects // Create the promise first, before the operations that will trigger it + let objectsCreatedPromiseUpdates1 = try root.updates() + let objectsCreatedPromiseUpdates2 = try root.updates() async let objectsCreatedPromise: Void = withThrowingTaskGroup(of: Void.self) { group in group.addTask { - try await waitForMapKeyUpdate(root, "counter") + await waitForMapKeyUpdate(objectsCreatedPromiseUpdates1, "counter") } group.addTask { - try await waitForMapKeyUpdate(root, "map") + await waitForMapKeyUpdate(objectsCreatedPromiseUpdates2, "map") } while try await group.next() != nil {} } @@ -337,15 +324,18 @@ private struct ObjectsIntegrationTests { _ = try await (setMapPromise, setCounterPromise, objectsCreatedPromise) // Create the promise first, before the operations that will trigger it + let operationsAppliedPromiseUpdates1 = try map.updates() + let operationsAppliedPromiseUpdates2 = try map.updates() + let operationsAppliedPromiseUpdates3 = try counter.updates() async let operationsAppliedPromise: Void = withThrowingTaskGroup(of: Void.self) { group in group.addTask { - try await waitForMapKeyUpdate(map, "anotherKey") + await waitForMapKeyUpdate(operationsAppliedPromiseUpdates1, "anotherKey") } group.addTask { - try await waitForMapKeyUpdate(map, "shouldDelete") + await waitForMapKeyUpdate(operationsAppliedPromiseUpdates2, "shouldDelete") } group.addTask { - try await waitForCounterUpdate(counter) + await waitForCounterUpdate(operationsAppliedPromiseUpdates3) } while try await group.next() != nil {} } @@ -378,7 +368,7 @@ private struct ObjectsIntegrationTests { }, ), .init( - disabled: true, // Uses LiveMap.set which we haven't implemented yet + disabled: false, allTransportsAndProtocols: false, description: "OBJECT_SYNC sequence does not change references to existing objects", action: { ctx in @@ -388,12 +378,14 @@ private struct ObjectsIntegrationTests { let client = ctx.client // Create the promise first, before the operations that will trigger it + let objectsCreatedPromiseUpdates1 = try root.updates() + let objectsCreatedPromiseUpdates2 = try root.updates() async let objectsCreatedPromise: Void = withThrowingTaskGroup(of: Void.self) { group in group.addTask { - try await waitForMapKeyUpdate(root, "counter") + await waitForMapKeyUpdate(objectsCreatedPromiseUpdates1, "counter") } group.addTask { - try await waitForMapKeyUpdate(root, "map") + await waitForMapKeyUpdate(objectsCreatedPromiseUpdates2, "map") } while try await group.next() != nil {} } @@ -518,7 +510,7 @@ private struct ObjectsIntegrationTests { }, ), .init( - disabled: true, // This relies on the LiveMap.get returning `nil` when the referenced object's internal `tombstone` flag is true; this is not yet specified, have asked in https://ably-real-time.slack.com/archives/D067YAXGYQ5/p1751376526929339 + disabled: false, allTransportsAndProtocols: false, description: "OBJECT_SYNC sequence with object state \"tombstone\" property creates tombstoned object", action: { ctx in @@ -574,7 +566,7 @@ private struct ObjectsIntegrationTests { }, ), .init( - disabled: true, // Uses LiveMap.subscribe (through waitForMapKeyUpdate) which we haven't implemented yet. It also seems to rely on the same internal `tombstone` flag as the previous test. + disabled: false, allTransportsAndProtocols: true, description: "OBJECT_SYNC sequence with object state \"tombstone\" property deletes existing object", action: { ctx in @@ -583,14 +575,15 @@ private struct ObjectsIntegrationTests { let channelName = ctx.channelName let channel = ctx.channel - async let counterCreatedPromise: Void = waitForMapKeyUpdate(root, "counter") + let counterCreatedPromiseUpdates = try root.updates() + async let counterCreatedPromise: Void = waitForMapKeyUpdate(counterCreatedPromiseUpdates, "counter") let counterResult = try await objectsHelper.createAndSetOnMap( channelName: channelName, mapObjectId: "root", key: "counter", createOp: objectsHelper.counterCreateRestOp(number: 1), ) - _ = try await counterCreatedPromise + _ = await counterCreatedPromise #expect(try root.get(key: "counter") != nil, "Check counter exists on root before OBJECT_SYNC sequence with \"tombstone=true\"") @@ -628,7 +621,7 @@ private struct ObjectsIntegrationTests { }, ), .init( - disabled: true, // Uses LiveMap.subscribe (through waitForMapKeyUpdate) which we haven't implemented yet + disabled: false, allTransportsAndProtocols: true, description: "OBJECT_SYNC sequence with object state \"tombstone\" property triggers subscription callback for existing object", action: { ctx in @@ -637,25 +630,21 @@ private struct ObjectsIntegrationTests { let channelName = ctx.channelName let channel = ctx.channel - async let counterCreatedPromise: Void = waitForMapKeyUpdate(root, "counter") + let counterCreatedPromiseUpdates = try root.updates() + async let counterCreatedPromise: Void = waitForMapKeyUpdate(counterCreatedPromiseUpdates, "counter") let counterResult = try await objectsHelper.createAndSetOnMap( channelName: channelName, mapObjectId: "root", key: "counter", createOp: objectsHelper.counterCreateRestOp(number: 1), ) - _ = try await counterCreatedPromise - - async let counterSubPromise: Void = withCheckedThrowingContinuation { continuation in - do { - try #require(root.get(key: "counter")?.liveCounterValue).subscribe { update, _ in - #expect(update.amount == -1, "Check counter subscription callback is called with an expected update object after OBJECT_SYNC sequence with \"tombstone=true\"") - continuation.resume() - } - } catch { - continuation.resume(throwing: error) - } - } + _ = await counterCreatedPromise + + let counterSubPromiseUpdates = try #require(root.get(key: "counter")?.liveCounterValue).updates() + async let counterSubPromise: Void = { + let update = try await #require(counterSubPromiseUpdates.first { _ in true }) + #expect(update.amount == -1, "Check counter subscription callback is called with an expected update object after OBJECT_SYNC sequence with \"tombstone=true\"") + }() // inject an OBJECT_SYNC message where a counter is now tombstoned try await objectsHelper.processObjectStateMessageOnChannel( diff --git a/Tests/AblyLiveObjectsTests/WireObjectMessageTests.swift b/Tests/AblyLiveObjectsTests/WireObjectMessageTests.swift index 7b63b9ba..16c28f12 100644 --- a/Tests/AblyLiveObjectsTests/WireObjectMessageTests.swift +++ b/Tests/AblyLiveObjectsTests/WireObjectMessageTests.swift @@ -180,7 +180,7 @@ enum WireObjectMessageTests { #expect(op.mapOp?.data?.string == "value1") #expect(op.counterOp?.amount == 42) #expect(op.map?.semantics == .known(.lww)) - #expect(op.map?.entries?["key1"]?.data.string == "value1") + #expect(op.map?.entries?["key1"]?.data?.string == "value1") #expect(op.map?.entries?["key1"]?.tombstone == false) #expect(op.counter?.count == 42) @@ -282,7 +282,7 @@ enum WireObjectMessageTests { #expect(state.createOp?.action == .known(.mapCreate)) #expect(state.createOp?.objectId == "obj1") #expect(state.map?.semantics == .known(.lww)) - #expect(state.map?.entries?["key1"]?.data.string == "value1") + #expect(state.map?.entries?["key1"]?.data?.string == "value1") #expect(state.map?.entries?["key1"]?.tombstone == false) #expect(state.counter?.count == 42) } @@ -488,10 +488,10 @@ enum WireObjectMessageTests { ] let map = try WireObjectsMap(wireObject: json) #expect(map.semantics == .known(.lww)) - #expect(map.entries?["key1"]?.data.string == "value1") + #expect(map.entries?["key1"]?.data?.string == "value1") #expect(map.entries?["key1"]?.tombstone == false) #expect(map.entries?["key1"]?.timeserial == "ts1") - #expect(map.entries?["key2"]?.data.string == "value2") + #expect(map.entries?["key2"]?.data?.string == "value2") #expect(map.entries?["key2"]?.tombstone == true) #expect(map.entries?["key2"]?.timeserial == nil) } @@ -584,7 +584,7 @@ enum WireObjectMessageTests { "timeserial": "ts1", ] let entry = try WireObjectsMapEntry(wireObject: json) - #expect(entry.data.string == "value1") + #expect(entry.data?.string == "value1") #expect(entry.tombstone == true) #expect(entry.timeserial == "ts1") } @@ -593,7 +593,7 @@ enum WireObjectMessageTests { func decodesWithOptionalFieldsAbsent() throws { let json: [String: WireValue] = ["data": ["string": "value1"]] let entry = try WireObjectsMapEntry(wireObject: json) - #expect(entry.data.string == "value1") + #expect(entry.data?.string == "value1") #expect(entry.tombstone == nil) #expect(entry.timeserial == nil) } diff --git a/ably-cocoa b/ably-cocoa index c72e1d74..bc193685 160000 --- a/ably-cocoa +++ b/ably-cocoa @@ -1 +1 @@ -Subproject commit c72e1d7498bfcd0562f67442601e5056c2592631 +Subproject commit bc19368559fe1b860d18f375e421e8fd2526572f