Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 88 additions & 4 deletions Sources/AblyLiveObjects/DefaultRealtimeObjects.swift
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ internal final class DefaultRealtimeObjects: RealtimeObjects, LiveMapObjectPoolD
(receivedObjectProtocolMessages, receivedObjectProtocolMessagesContinuation) = AsyncStream.makeStream()
(receivedObjectSyncProtocolMessages, receivedObjectSyncProtocolMessagesContinuation) = AsyncStream.makeStream()
(waitingForSyncEvents, waitingForSyncEventsContinuation) = AsyncStream.makeStream()
mutableState = .init(objectsPool: .init(rootDelegate: self, rootCoreSDK: coreSDK))
mutableState = .init(objectsPool: .init(rootDelegate: self, rootCoreSDK: coreSDK, logger: logger))
}

// MARK: - LiveMapObjectPoolDelegate
Expand Down Expand Up @@ -166,8 +166,17 @@ internal final class DefaultRealtimeObjects: RealtimeObjects, LiveMapObjectPoolD
receivedObjectProtocolMessages
}

/// Implements the `OBJECT` handling of RTO8.
internal func handleObjectProtocolMessage(objectMessages: [InboundObjectMessage]) {
receivedObjectProtocolMessagesContinuation.yield(objectMessages)
mutex.withLock {
mutableState.handleObjectProtocolMessage(
objectMessages: objectMessages,
logger: logger,
receivedObjectProtocolMessagesContinuation: receivedObjectProtocolMessagesContinuation,
mapDelegate: self,
coreSDK: coreSDK,
)
}
}

internal var testsOnly_receivedObjectSyncProtocolMessages: AsyncStream<[InboundObjectMessage]> {
Expand All @@ -193,7 +202,7 @@ internal final class DefaultRealtimeObjects: RealtimeObjects, LiveMapObjectPoolD
/// Intended as a way for tests to populate the object pool.
internal func testsOnly_createZeroValueLiveObject(forObjectID objectID: String, coreSDK: CoreSDK) -> ObjectsPool.Entry? {
mutex.withLock {
mutableState.objectsPool.createZeroValueObject(forObjectID: objectID, mapDelegate: self, coreSDK: coreSDK)
mutableState.objectsPool.createZeroValueObject(forObjectID: objectID, mapDelegate: self, coreSDK: coreSDK, logger: logger)
}
}

Expand Down Expand Up @@ -242,7 +251,7 @@ internal final class DefaultRealtimeObjects: RealtimeObjects, LiveMapObjectPoolD

// RTO4b1, RTO4b2: Reset the ObjectsPool to have a single empty root object
// TODO: this one is unclear (are we meant to replace the root or just clear its data?) https://github.com/ably/specification/pull/333/files#r2183493458
objectsPool = .init(rootDelegate: mapDelegate, rootCoreSDK: coreSDK)
objectsPool = .init(rootDelegate: mapDelegate, rootCoreSDK: coreSDK, logger: logger)

// I have, for now, not directly implemented the "perform the actions for object sync completion" of RTO4b4 since my implementation doesn't quite match the model given there; here you only have a SyncObjectsPool if you have an OBJECT_SYNC in progress, which you might not have upon receiving an ATTACHED. Instead I've just implemented what seem like the relevant side effects. Can revisit this if "the actions for object sync completion" get more complex.

Expand Down Expand Up @@ -320,5 +329,80 @@ internal final class DefaultRealtimeObjects: RealtimeObjects, LiveMapObjectPoolD
syncStatus.signalSyncComplete()
}
}

/// Implements the `OBJECT` handling of RTO8.
internal mutating func handleObjectProtocolMessage(
objectMessages: [InboundObjectMessage],
logger: Logger,
receivedObjectProtocolMessagesContinuation: AsyncStream<[InboundObjectMessage]>.Continuation,
mapDelegate: LiveMapObjectPoolDelegate,
coreSDK: CoreSDK,
) {
receivedObjectProtocolMessagesContinuation.yield(objectMessages)

logger.log("handleObjectProtocolMessage(objectMessages: \(objectMessages))", level: .debug)

// TODO: RTO8a's buffering

// RTO8b
for objectMessage in objectMessages {
applyObjectProtocolMessageObjectMessage(
objectMessage,
logger: logger,
mapDelegate: mapDelegate,
coreSDK: coreSDK,
)
}
}

/// Implements the `OBJECT` application of RTO9.
private mutating func applyObjectProtocolMessageObjectMessage(
_ objectMessage: InboundObjectMessage,
logger: Logger,
mapDelegate: LiveMapObjectPoolDelegate,
coreSDK: CoreSDK,
) {
guard let operation = objectMessage.operation else {
// RTO9a1
logger.log("Unsupported OBJECT message received (no operation); \(objectMessage)", level: .warn)
return
}

// RTO9a2a1, RTO9a2a2
let entry: ObjectsPool.Entry
if let existingEntry = objectsPool.entries[operation.objectId] {
entry = existingEntry
} else {
guard let newEntry = objectsPool.createZeroValueObject(
forObjectID: operation.objectId,
mapDelegate: mapDelegate,
coreSDK: coreSDK,
logger: logger,
) else {
logger.log("Unable to create zero-value object for \(operation.objectId) when processing OBJECT message; dropping", level: .warn)
return
}

entry = newEntry
}

switch operation.action {
case let .known(action):
switch action {
case .mapCreate, .mapSet, .mapRemove, .counterCreate, .counterInc, .objectDelete:
// RTO9a2a3
entry.apply(
operation,
objectMessageSerial: objectMessage.serial,
objectMessageSiteCode: objectMessage.siteCode,
objectsPool: &objectsPool,
)
}
case let .unknown(rawValue):
// RTO9a2b
logger.log("Unsupported OBJECT operation action \(rawValue) received", level: .warn)
return
}
}
}
}
165 changes: 134 additions & 31 deletions Sources/AblyLiveObjects/Internal/DefaultLiveCounter.swift
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import Ably
internal import AblyPlugin
import Foundation

/// Our default implementation of ``LiveCounter``.
Expand All @@ -8,57 +9,63 @@ internal final class DefaultLiveCounter: LiveCounter {

private nonisolated(unsafe) var mutableState: MutableState

internal var testsOnly_siteTimeserials: [String: String]? {
internal var testsOnly_siteTimeserials: [String: String] {
mutex.withLock {
mutableState.siteTimeserials
mutableState.liveObject.siteTimeserials
}
}

internal var testsOnly_createOperationIsMerged: Bool? {
internal var testsOnly_createOperationIsMerged: Bool {
mutex.withLock {
mutableState.createOperationIsMerged
mutableState.liveObject.createOperationIsMerged
}
}

internal var testsOnly_objectID: String? {
internal var testsOnly_objectID: String {
mutex.withLock {
mutableState.objectID
mutableState.liveObject.objectID
}
}

private let coreSDK: CoreSDK
private let logger: AblyPlugin.Logger

// MARK: - Initialization

internal convenience init(
testsOnly_data data: Double,
objectID: String?,
coreSDK: CoreSDK
objectID: String,
coreSDK: CoreSDK,
logger: AblyPlugin.Logger
) {
self.init(data: data, objectID: objectID, coreSDK: coreSDK)
self.init(data: data, objectID: objectID, coreSDK: coreSDK, logger: logger)
}

private init(
data: Double,
objectID: String?,
coreSDK: CoreSDK
objectID: String,
coreSDK: CoreSDK,
logger: AblyPlugin.Logger
) {
mutableState = .init(data: data, objectID: objectID)
mutableState = .init(liveObject: .init(objectID: objectID), data: data)
self.coreSDK = coreSDK
self.logger = logger
}

/// Creates a "zero-value LiveCounter", per RTLC4.
///
/// - Parameters:
/// - objectID: The value for the "private objectId field" of RTO5c1b1a.
internal static func createZeroValued(
objectID: String? = nil,
objectID: String,
coreSDK: CoreSDK,
logger: AblyPlugin.Logger,
) -> Self {
.init(
data: 0,
objectID: objectID,
coreSDK: coreSDK,
logger: logger,
)
}

Expand Down Expand Up @@ -116,41 +123,137 @@ internal final class DefaultLiveCounter: LiveCounter {
}
}

/// Test-only method to merge initial value from an ObjectOperation, per RTLC10.
internal func testsOnly_mergeInitialValue(from operation: ObjectOperation) {
mutex.withLock {
mutableState.mergeInitialValue(from: operation)
}
}

/// Test-only method to apply a COUNTER_CREATE operation, per RTLC8.
internal func testsOnly_applyCounterCreateOperation(_ operation: ObjectOperation) {
mutex.withLock {
mutableState.applyCounterCreateOperation(operation, logger: logger)
}
}

/// Test-only method to apply a COUNTER_INC operation, per RTLC9.
internal func testsOnly_applyCounterIncOperation(_ operation: WireObjectsCounterOp?) {
mutex.withLock {
mutableState.applyCounterIncOperation(operation)
}
}

/// Attempts to apply an operation from an inbound `ObjectMessage`, per RTLC7.
internal func apply(
_ operation: ObjectOperation,
objectMessageSerial: String?,
objectMessageSiteCode: String?,
objectsPool: inout ObjectsPool,
) {
mutex.withLock {
mutableState.apply(
operation,
objectMessageSerial: objectMessageSerial,
objectMessageSiteCode: objectMessageSiteCode,
objectsPool: &objectsPool,
logger: logger,
)
}
}

// MARK: - Mutable state and the operations that affect it

private struct MutableState {
/// The mutable state common to all LiveObjects.
internal var liveObject: LiveObjectMutableState

/// The internal data that this map holds, per RTLC3.
internal var data: Double

/// The site timeserials for this counter, per RTLC6a.
internal var siteTimeserials: [String: String]?

/// Whether the create operation has been merged, per RTLC6b and RTLC6d2.
internal var createOperationIsMerged: Bool?

/// The "private `objectId` field" of RTO5c1b1a.
internal var objectID: String?

/// Replaces the internal data of this counter with the provided ObjectState, per RTLC6.
internal mutating func replaceData(using state: ObjectState) {
// RTLC6a: Replace the private siteTimeserials with the value from ObjectState.siteTimeserials
siteTimeserials = state.siteTimeserials
liveObject.siteTimeserials = state.siteTimeserials

// RTLC6b: Set the private flag createOperationIsMerged to false
createOperationIsMerged = false
liveObject.createOperationIsMerged = false

// RTLC6c: Set data to the value of ObjectState.counter.count, or to 0 if it does not exist
data = state.counter?.count?.doubleValue ?? 0

// RTLC6d: If ObjectState.createOp is present
// RTLC6d: If ObjectState.createOp is present, merge the initial value into the LiveCounter as described in RTLC10
if let createOp = state.createOp {
// RTLC6d1: Add ObjectState.createOp.counter.count to data, if it exists
if let createOpCount = createOp.counter?.count?.doubleValue {
data += createOpCount
}
// RTLC6d2: Set the private flag createOperationIsMerged to true
createOperationIsMerged = true
mergeInitialValue(from: createOp)
}
}

/// Merges the initial value from an ObjectOperation into this LiveCounter, per RTLC10.
internal mutating func mergeInitialValue(from operation: ObjectOperation) {
// RTLC10a: Add ObjectOperation.counter.count to data, if it exists
if let operationCount = operation.counter?.count?.doubleValue {
data += operationCount
}
// RTLC10b: Set the private flag createOperationIsMerged to true
liveObject.createOperationIsMerged = true
}

/// Attempts to apply an operation from an inbound `ObjectMessage`, per RTLC7.
internal mutating func apply(
_ operation: ObjectOperation,
objectMessageSerial: String?,
objectMessageSiteCode: String?,
objectsPool: inout ObjectsPool,
logger: Logger,
) {
guard let applicableOperation = liveObject.canApplyOperation(objectMessageSerial: objectMessageSerial, objectMessageSiteCode: objectMessageSiteCode, logger: logger) else {
// RTLC7b
logger.log("Operation \(operation) (serial: \(String(describing: objectMessageSerial)), siteCode: \(String(describing: objectMessageSiteCode))) should not be applied; discarding", level: .debug)
return
}

// RTLC7c
liveObject.siteTimeserials[applicableOperation.objectMessageSiteCode] = applicableOperation.objectMessageSerial

switch operation.action {
case .known(.counterCreate):
// RTLC7d1
applyCounterCreateOperation(
operation,
logger: logger,
)
case .known(.counterInc):
// RTLC7d2
applyCounterIncOperation(operation.counterOp)
default:
// RTLC7d3
logger.log("Operation \(operation) has unsupported action for LiveCounter; discarding", level: .warn)
}
}

/// Applies a `COUNTER_CREATE` operation, per RTLC8.
internal mutating func applyCounterCreateOperation(
_ operation: ObjectOperation,
logger: Logger,
) {
if liveObject.createOperationIsMerged {
// RTLC8b
logger.log("Not applying COUNTER_CREATE because a COUNTER_CREATE has already been applied", level: .warn)
return
}

// RTLC8c
mergeInitialValue(from: operation)
}

/// Applies a `COUNTER_INC` operation, per RTLC9.
internal mutating func applyCounterIncOperation(_ operation: WireObjectsCounterOp?) {
guard let operation else {
return
}

// RTLC9b
data += operation.amount.doubleValue
}
}
}
Loading