Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions Sources/AblyLiveObjects/Internal/ARTClientOptions+Objects.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
internal import AblyPlugin

internal extension ARTClientOptions {
private class Box<T> {
internal let boxed: T

internal init(boxed: T) {
self.boxed = boxed
}
}

private static let garbageCollectionOptionsKey = "Objects.garbageCollectionOptions"

/// Can be overriden for testing purposes.
var garbageCollectionOptions: InternalDefaultRealtimeObjects.GarbageCollectionOptions? {
get {
let optionsValue = PluginAPI.sharedInstance().pluginOptionsValue(
forKey: Self.garbageCollectionOptionsKey,
clientOptions: self,
)

guard let optionsValue else {
return nil
}

guard let box = optionsValue as? Box<InternalDefaultRealtimeObjects.GarbageCollectionOptions> else {
preconditionFailure("Expected GarbageCollectionOptionsBox, got \(optionsValue)")
}

return box.boxed
}

set {
guard let newValue else {
preconditionFailure("Not implemented the ability to un-set GC options")
}

PluginAPI.sharedInstance().setPluginOptionsValue(
Box<InternalDefaultRealtimeObjects.GarbageCollectionOptions>(boxed: newValue),
forKey: Self.garbageCollectionOptionsKey,
clientOptions: self,
)
}
}
}
37 changes: 37 additions & 0 deletions Sources/AblyLiveObjects/Internal/CoreSDK.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,31 @@ internal protocol CoreSDK: AnyObject, Sendable {
/// Implements the internal `#publish` method of RTO15.
func publish(objectMessages: [OutboundObjectMessage]) async throws(InternalError)

/// Replaces the implementation of ``publish(objectMessages:)``.
///
/// Used by integration tests, for example to disable `ObjectMessage` publishing so that a test can verify that a behaviour is not a side effect of an `ObjectMessage` sent by the SDK.
func testsOnly_overridePublish(with newImplementation: @escaping ([OutboundObjectMessage]) async throws(InternalError) -> Void)

/// Returns the current state of the Realtime channel that this wraps.
var channelState: ARTRealtimeChannelState { get }
}

internal final class DefaultCoreSDK: CoreSDK {
/// Used to synchronize access to internal mutable state.
private let mutex = NSLock()

private let channel: AblyPlugin.RealtimeChannel
private let client: AblyPlugin.RealtimeClient
private let pluginAPI: PluginAPIProtocol
private let logger: AblyPlugin.Logger

/// If set to true, ``publish(objectMessages:)`` will behave like a no-op.
///
/// This enables the `testsOnly_overridePublish(with:)` test hook.
///
/// - Note: This should be `throws(InternalError)` but that causes a compilation error of "Runtime support for typed throws function types is only available in macOS 15.0.0 or newer".
private nonisolated(unsafe) var overriddenPublishImplementation: (([OutboundObjectMessage]) async throws -> Void)?

internal init(
channel: AblyPlugin.RealtimeChannel,
client: AblyPlugin.RealtimeClient,
Expand All @@ -35,6 +50,22 @@ internal final class DefaultCoreSDK: CoreSDK {
internal func publish(objectMessages: [OutboundObjectMessage]) async throws(InternalError) {
logger.log("publish(objectMessages: \(LoggingUtilities.formatObjectMessagesForLogging(objectMessages)))", level: .debug)

// Use the overridden implementation if supplied
let overriddenImplementation = mutex.withLock {
overriddenPublishImplementation
}
if let overriddenImplementation {
do {
try await overriddenImplementation(objectMessages)
} catch {
guard let internalError = error as? InternalError else {
preconditionFailure("Expected InternalError, got \(error)")
}
throw internalError
}
return
}

// TODO: Implement the full spec of RTO15 (https://github.com/ably/ably-cocoa-liveobjects-plugin/issues/47)
try await DefaultInternalPlugin.sendObject(
objectMessages: objectMessages,
Expand All @@ -44,6 +75,12 @@ internal final class DefaultCoreSDK: CoreSDK {
)
}

internal func testsOnly_overridePublish(with newImplementation: @escaping ([OutboundObjectMessage]) async throws(InternalError) -> Void) {
mutex.withLock {
overriddenPublishImplementation = newImplementation
}
}

internal var channelState: ARTRealtimeChannelState {
channel.state
}
Expand Down
8 changes: 7 additions & 1 deletion Sources/AblyLiveObjects/Internal/DefaultInternalPlugin.swift
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,15 @@ internal final class DefaultInternalPlugin: NSObject, AblyPlugin.LiveObjectsInte
internal func prepare(_ channel: AblyPlugin.RealtimeChannel, client: AblyPlugin.RealtimeClient) {
let logger = pluginAPI.logger(for: channel)
let callbackQueue = pluginAPI.callbackQueue(for: client)
let options = pluginAPI.options(for: client)

logger.log("LiveObjects.DefaultInternalPlugin received prepare(_:)", level: .debug)
let liveObjects = InternalDefaultRealtimeObjects(logger: logger, userCallbackQueue: callbackQueue, clock: DefaultSimpleClock())
let liveObjects = InternalDefaultRealtimeObjects(
logger: logger,
userCallbackQueue: callbackQueue,
clock: DefaultSimpleClock(),
garbageCollectionOptions: options.garbageCollectionOptions ?? .init(),
)
pluginAPI.setPluginDataValue(liveObjects, forKey: Self.pluginDataKey, channel: channel)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectPool
private nonisolated(unsafe) var garbageCollectionTask: Task<Void, Never>!

/// Parameters used to control the garbage collection of tombstoned objects and map entries, as described in RTO10.
internal struct GarbageCollectionOptions {
internal struct GarbageCollectionOptions: Encodable, Hashable {
/// The RTO10a interval at which we will perform garbage collection.
///
/// The default value comes from the suggestion in RTO10a.
Expand Down Expand Up @@ -98,6 +98,7 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectPool
(receivedObjectProtocolMessages, receivedObjectProtocolMessagesContinuation) = AsyncStream.makeStream()
(receivedObjectSyncProtocolMessages, receivedObjectSyncProtocolMessagesContinuation) = AsyncStream.makeStream()
(waitingForSyncEvents, waitingForSyncEventsContinuation) = AsyncStream.makeStream()
(completedGarbageCollectionEvents, completedGarbageCollectionsEventsContinuation) = AsyncStream.makeStream()
mutableState = .init(objectsPool: .init(logger: logger, userCallbackQueue: userCallbackQueue, clock: clock))
garbageCollectionInterval = garbageCollectionOptions.interval
garbageCollectionGracePeriod = garbageCollectionOptions.gracePeriod
Expand Down Expand Up @@ -331,21 +332,32 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectPool
gracePeriod: garbageCollectionGracePeriod,
clock: clock,
logger: logger,
eventsContinuation: completedGarbageCollectionsEventsContinuation,
)
}
}

// These drive the testsOnly_completedGarbageCollectionEvents property that informs the test suite when a garbage collection cycle has completed.
private let completedGarbageCollectionEvents: AsyncStream<Void>
private let completedGarbageCollectionsEventsContinuation: AsyncStream<Void>.Continuation
/// Emits an element whenever a garbage collection cycle has completed.
internal var testsOnly_completedGarbageCollectionEvents: AsyncStream<Void> {
completedGarbageCollectionEvents
}

// MARK: - Testing

/// Finishes the following streams, to allow a test to perform assertions about which elements the streams have emitted to this moment:
///
/// - testsOnly_receivedObjectProtocolMessages
/// - testsOnly_receivedObjectStateProtocolMessages
/// - testsOnly_waitingForSyncEvents
/// - testsOnly_completedGarbageCollectionEvents
internal func testsOnly_finishAllTestHelperStreams() {
receivedObjectProtocolMessagesContinuation.finish()
receivedObjectSyncProtocolMessagesContinuation.finish()
waitingForSyncEventsContinuation.finish()
completedGarbageCollectionsEventsContinuation.finish()
}

// MARK: - Mutable state and the operations that affect it
Expand Down
9 changes: 8 additions & 1 deletion Sources/AblyLiveObjects/Internal/ObjectsPool.swift
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,12 @@ internal struct ObjectsPool {
}

/// Performs garbage collection of tombstoned objects and map entries, per RTO10c.
internal mutating func performGarbageCollection(gracePeriod: TimeInterval, clock: SimpleClock, logger: Logger) {
internal mutating func performGarbageCollection(
gracePeriod: TimeInterval,
clock: SimpleClock,
logger: Logger,
eventsContinuation: AsyncStream<Void>.Continuation,
) {
logger.log("Performing garbage collection, grace period \(gracePeriod)s", level: .debug)

let now = clock.now
Expand All @@ -433,5 +438,7 @@ internal struct ObjectsPool {
}
return !shouldRelease
}

eventsContinuation.yield()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -117,4 +117,13 @@ internal final class PublicDefaultRealtimeObjects: RealtimeObjects {
internal var testsOnly_receivedObjectSyncProtocolMessages: AsyncStream<[InboundObjectMessage]> {
proxied.testsOnly_receivedObjectSyncProtocolMessages
}

// These are used by the integration tests.

/// Replaces the method that this `RealtimeObjects` uses to send any outbound `ObjectMessage`s.
///
/// Used by integration tests, for example to disable `ObjectMessage` publishing so that a test can verify that a behaviour is not a side effect of an `ObjectMessage` sent by the SDK.
internal func testsOnly_overridePublish(with newImplementation: @escaping ([OutboundObjectMessage]) async throws(InternalError) -> Void) {
coreSDK.testsOnly_overridePublish(with: newImplementation)
}
}
2 changes: 1 addition & 1 deletion Sources/AblyLiveObjects/Utility/ExtendedJSONValue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ internal extension ExtendedJSONValue {

/// Converts an `ExtendedJSONValue` to an object.
///
/// The contract for what this will return are the same as those of `JSONValue.toJSONSerializationInputElemtn`, with one addition: any values in the input of case `.extra` will be passed to the `serializeExtraValue` function, and the result of this function call will be inserted into the output object.
/// The contract for what this will return are the same as those of `JSONValue.toJSONSerializationInputElement`, with one addition: any values in the input of case `.extra` will be passed to the `serializeExtraValue` function, and the result of this function call will be inserted into the output object.
func serialized(serializeExtraValue: (Extra) -> Any) -> Any {
switch self {
case let .object(underlying):
Expand Down
6 changes: 5 additions & 1 deletion Tests/AblyLiveObjectsTests/Helpers/ClientHelper.swift
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import Ably
import AblyLiveObjects
@testable import AblyLiveObjects

/// Helper for creating ably-cocoa objects, for use in integration tests.
enum ClientHelper {
Expand All @@ -26,6 +26,9 @@ enum ClientHelper {
let logger = PrefixedLogger(prefix: "(\(logIdentifier)) ")
clientOptions.logHandler = logger
}
if let garbageCollectionOptions = options.garbageCollectionOptions {
clientOptions.garbageCollectionOptions = garbageCollectionOptions
}

return ARTRealtime(options: clientOptions)
}
Expand Down Expand Up @@ -69,6 +72,7 @@ enum ClientHelper {
struct PartialClientOptions: Encodable, Hashable {
var useBinaryProtocol: Bool?
var autoConnect: Bool?
var garbageCollectionOptions: InternalDefaultRealtimeObjects.GarbageCollectionOptions?

/// A prefix for all log messages emitted by the client. Allows clients to be distinguished in log messages for tests which use multiple clients.
var logIdentifier: String?
Expand Down
40 changes: 39 additions & 1 deletion Tests/AblyLiveObjectsTests/Helpers/Subscriber.swift
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ final class Subscriber<each CallbackArg: Sendable>: Sendable {
// Used to synchronize access to the nonisolated(unsafe) mutable state.
private let mutex = NSLock()
private nonisolated(unsafe) var invocations: [(repeat each CallbackArg)] = []
private nonisolated(unsafe) var listeners: [CallbackWrapper] = []

/// Creates a `Subscriber`.
///
Expand Down Expand Up @@ -39,13 +40,50 @@ final class Subscriber<each CallbackArg: Sendable>: Sendable {
guard let self else {
return
}
mutex.withLock {
let callListeners = mutex.withLock {
let invocation = (repeat each arg)
invocations.append(invocation)

return { [listeners] in
for listener in listeners {
listener.callAsFunction(repeat each invocation)
}
}
}
if let action {
action(repeat each arg)
}
callListeners()
}
}

/// A wrapper that allows us to store a callback that takes variadic args.
///
/// This allows us to avoid the error "Cannot fully abstract a value of variadic function type '@Sendable (repeat each CallbackArg) -> ()' because different contexts will not be able to reliably agree on a calling convention; try wrapping it in a struct" that we get if we try to directly store the callback in an array. Claude suggested this solution.
private struct CallbackWrapper {
let callback: @Sendable (repeat each CallbackArg) -> Void

func callAsFunction(_ args: repeat each CallbackArg) {
callback(repeat each args)
}
}

/// Adds a listener which replays all previously buffered and future invocations of any function previously created by ``createListener(_:)``.
///
/// This is useful for the scenario where you want to set up a subscription synchronously (so as not to miss any events) but then in an `async` context perform actions as a result of the invocation of the listener. (You could equally use the SDK's `AsyncSequence` interface but the approach here is a closer mapping of the ported JS integration tests that call `subscribe`.)
func addListener(_ listener: @escaping (@Sendable (repeat each CallbackArg) -> Void)) {
let performInvocations = mutex.withLock {
listeners.append(.init(callback: listener))

return { [invocations, callbackQueue] in
for invocation in invocations {
callbackQueue.async {
listener(repeat each invocation)
}
}
}
}

performInvocations()
}
}
Loading