From 507c3cd40af25c14dafbfce8bacf9985a854a029 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Niil=20=C3=96hlin?= Date: Thu, 26 Mar 2020 13:44:51 +0100 Subject: [PATCH 1/4] Remove warnings --- Flow/Callbacker.swift | 24 ++++++++------- Flow/Disposable.swift | 34 ++++++++++++--------- Flow/Future+Combiners.swift | 14 ++++++--- Flow/Future.swift | 16 +++++----- Flow/FutureQueue.swift | 43 ++++++++++++++------------ Flow/Locking.swift | 44 ++++++++++++--------------- Flow/OrderedCallbacker.swift | 30 ++++++++++-------- Flow/Signal+Construction.swift | 24 +++++++++------ Flow/Signal+Scheduling.swift | 29 ++++++++++-------- Flow/Signal+Transforms.swift | 18 ++++++++--- FlowTests/FutureSchedulingTests.swift | 4 ++- FlowTests/SignalProviderTests.swift | 28 +++++++++++------ 12 files changed, 178 insertions(+), 130 deletions(-) diff --git a/Flow/Callbacker.swift b/Flow/Callbacker.swift index 01966e2..a581fc4 100644 --- a/Flow/Callbacker.swift +++ b/Flow/Callbacker.swift @@ -21,20 +21,22 @@ public final class Callbacker { private var callbacks = Callbacks.none private var _mutex = pthread_mutex_t() - private var mutex: PThreadMutex { return PThreadMutex(&_mutex) } + private func withMutex(_ body: (PThreadMutex) throws -> T) rethrows -> T { + try withUnsafeMutablePointer(to: &_mutex, body) + } public init() { - mutex.initialize() + withMutex { $0.initialize() } } deinit { - mutex.deinitialize() + withMutex { $0.deinitialize() } } /// - Returns: True if no callbacks has been registered. public var isEmpty: Bool { - mutex.lock() - defer { mutex.unlock() } + withMutex { $0.lock() } + defer { withMutex { $0.unlock() } } switch callbacks { case .none: return true @@ -46,8 +48,8 @@ public final class Callbacker { /// Register a callback to be called when `callAll` is executed. /// - Returns: A `Disposable` to be disposed to unregister the callback. public func addCallback(_ callback: @escaping (Value) -> Void) -> Disposable { - mutex.lock() - defer { mutex.unlock() } + withMutex { $0.lock() } + defer { withMutex { $0.unlock() } } let key = generateKey() @@ -63,8 +65,8 @@ public final class Callbacker { } return NoLockKeyDisposer(key) { key in - self.mutex.lock() - defer { self.mutex.unlock() } + self.withMutex { $0.lock() } + defer { self.withMutex { $0.unlock() } } switch self.callbacks { case .single(let singleKey, _) where singleKey == key: @@ -82,9 +84,9 @@ public final class Callbacker { /// Will call all registered callbacks with `value` public func callAll(with value: Value) { - mutex.lock() + withMutex { $0.lock() } let callbacks = self.callbacks - mutex.unlock() + withMutex { $0.unlock() } switch callbacks { case .none: break diff --git a/Flow/Disposable.swift b/Flow/Disposable.swift index 9b3bcc3..5fa4b42 100644 --- a/Flow/Disposable.swift +++ b/Flow/Disposable.swift @@ -29,24 +29,26 @@ public struct NilDisposer: Disposable { public final class Disposer: Disposable { private var disposer: (() -> ())? private var _mutex = pthread_mutex_t() - private var mutex: PThreadMutex { return PThreadMutex(&_mutex) } + private func withMutex(_ body: (PThreadMutex) throws -> T) rethrows -> T { + try withUnsafeMutablePointer(to: &_mutex, body) + } /// Pass a closure to be called when being disposed public init(_ disposer: @escaping () -> () = {}) { self.disposer = disposer - mutex.initialize() + withMutex { $0.initialize() } } deinit { dispose() - mutex.deinitialize() + withMutex { $0.deinitialize() } } public func dispose() { - mutex.lock() + withMutex { $0.lock() } let disposer = self.disposer self.disposer = nil - mutex.unlock() + withMutex { $0.unlock() } disposer?() } } @@ -59,48 +61,50 @@ public final class Disposer: Disposable { public final class DisposeBag: Disposable { private var disposables: [Disposable] private var _mutex = pthread_mutex_t() - private var mutex: PThreadMutex { return PThreadMutex(&_mutex) } + private func withMutex(_ body: (PThreadMutex) throws -> T) rethrows -> T { + try withUnsafeMutablePointer(to: &_mutex, body) + } /// Create an empty instance public init() { self.disposables = [] - mutex.initialize() + withMutex { $0.initialize() } } /// Create an instance already containing `disposables` public init(_ disposables: S) where S.Iterator.Element == Disposable { self.disposables = Array(disposables) - mutex.initialize() + withMutex { $0.initialize() } } /// Create an instance already containing `disposables` public init(_ disposables: Disposable...) { self.disposables = disposables - mutex.initialize() + withMutex { $0.initialize() } } deinit { dispose() - mutex.deinitialize() + withMutex { $0.deinitialize() } } /// Returns true if there is currently no disposables to dispose. public var isEmpty: Bool { - return mutex.protect { disposables.isEmpty } + return withMutex { $0.protect { disposables.isEmpty } } } public func dispose() { - mutex.lock() + withMutex { $0.lock() } let disposables = self.disposables // make sure to make a copy in the case any call to dispose will recursivaly call us back. self.disposables = [] - mutex.unlock() + withMutex { $0.unlock() } for disposable in disposables { disposable.dispose() } } /// Add `disposable` to `self` public func add(_ disposable: Disposable) { - mutex.lock() - defer { mutex.unlock() } + withMutex { $0.lock() } + defer { withMutex { $0.unlock() } } disposables.append(disposable) } } diff --git a/Flow/Future+Combiners.swift b/Flow/Future+Combiners.swift index 26dcc41..1d39dd5 100644 --- a/Flow/Future+Combiners.swift +++ b/Flow/Future+Combiners.swift @@ -65,9 +65,9 @@ public func join(_ futures: [Future], cancelNonCompleted: Bool = true) -> var results = [T?](repeating: nil, count: futures.count) let mutex = Mutex() func onValue(_ i: Int, _ val: T) { - mutex.protect { - results[i] = val - } + mutex.lock() + results[i] = val + mutex.unlock() } var future = futures.first!.onValue(on: .none) { onValue(0, $0) } @@ -220,7 +220,9 @@ public final class SingleTaskPerformer { mutex.unlock() // unlock while calling out as we might either recurs or always might execute at once. let singleFuture = function().always(on: .none) { - self.mutex.protect { self.future = nil } + self.mutex.lock() + self.future = nil + self.mutex.unlock() } mutex.lock() @@ -233,7 +235,9 @@ public final class SingleTaskPerformer { } public var isPerforming: Bool { - return mutex.protect { self.future != nil } + mutex.lock() + defer { mutex.unlock() } + return self.future != nil } } diff --git a/Flow/Future.swift b/Flow/Future.swift index 19fad38..26a0b80 100644 --- a/Flow/Future.swift +++ b/Flow/Future.swift @@ -107,7 +107,7 @@ public final class Future { try onResult(completion, Mover(shouldClone: true)) } } - mutex.initialize() + withMutex { $0.initialize() } scheduler.async { do { @@ -143,13 +143,13 @@ public final class Future { state = .completed(result) clone = { Future(result: result) } - mutex.initialize() + withMutex { $0.initialize() } } deinit { OSAtomicDecrement32(&futureUnitTestAliveCount) memPrint("Future deinit", futureUnitTestAliveCount) - mutex.deinitialize() + withMutex { $0.deinitialize() } } } @@ -327,18 +327,20 @@ func memPrint(_ str: String, _ count: Int32) { } private extension Future { - var mutex: PThreadMutex { return PThreadMutex(&_mutex) } + private func withMutex(_ body: (PThreadMutex) throws -> T) rethrows -> T { + try withUnsafeMutablePointer(to: &_mutex, body) + } private var protectedState: State { - return mutex.protect { state } + return withMutex { $0.protect { state } } } func lock() { - mutex.lock() + withMutex { $0.lock() } } func unlock() { - mutex.unlock() + withMutex { $0.unlock() } } func completeWithResult(_ result: Result) { diff --git a/Flow/FutureQueue.swift b/Flow/FutureQueue.swift index 931f9e2..f058111 100644 --- a/Flow/FutureQueue.swift +++ b/Flow/FutureQueue.swift @@ -61,9 +61,9 @@ public extension FutureQueue { return Future { completion in let item = QueueItem(operation: operation, completion: completion) - self.mutex.protect { - self.items.append(item) - } + self.withMutex { $0.lock() } + self.items.append(item) + self.withMutex { $0.unlock() } self.executeNextItem() @@ -119,7 +119,7 @@ public extension FutureQueue { public extension FutureQueue { /// Do we have any enqueued operations? var isEmpty: Bool { - return mutex.protect { items.isEmpty } + return withMutex { $0.protect { items.isEmpty } } } /// Returns a signal that will signal when `isEmpty` is changed. @@ -164,19 +164,22 @@ public extension FutureQueue { /// The error passed to `abortQueuedExecutionWithError()` if called with `shouldCloseQueue` as true. var closedError: Error? { - return mutex.protect { _closedError } + return withMutex { $0.protect { _closedError } } } } private extension FutureQueue { - var mutex: PThreadMutex { return PThreadMutex(&_mutex) } - func lock() { mutex.lock() } - func unlock() { mutex.unlock() } + private func withMutex(_ body: (PThreadMutex) throws -> T) rethrows -> T { + try withUnsafeMutablePointer(to: &_mutex, body) + } + + func lock() { withMutex { $0.lock() } } + func unlock() { withMutex { $0.unlock() } } func removeItem(_ item: Executable) { - mutex.protect { - _ = items.firstIndex { $0 === item }.map { items.remove(at: $0) } - } + withMutex { $0.lock() } + _ = items.firstIndex { $0 === item }.map { items.remove(at: $0) } + withMutex { $0.unlock() } } func executeNextItem() { @@ -188,9 +191,9 @@ private extension FutureQueue { unlock() item.execute(on: queueScheduler) { - self.mutex.protect { - self.concurrentCount -= 1 - } + self.lock() + self.concurrentCount -= 1 + self.unlock() self.removeItem(item) self.executeNextItem() } @@ -215,25 +218,27 @@ private final class QueueItem: Executable { private weak var future: Future? private var hasBeenCancelled = false private var _mutex = pthread_mutex_t() + private func withMutex(_ body: (PThreadMutex) throws -> T) rethrows -> T { + try withUnsafeMutablePointer(to: &_mutex, body) + } init(operation: @escaping () throws -> Future, completion: @escaping (Result) -> ()) { self.completion = completion self.operation = operation - mutex.initialize() + withMutex { $0.initialize() } OSAtomicIncrement32(&queueItemUnitTestAliveCount) memPrint("Queue Item init", queueItemUnitTestAliveCount) } deinit { - mutex.deinitialize() + withMutex { $0.deinitialize() } OSAtomicDecrement32(&queueItemUnitTestAliveCount) memPrint("Queue Item deinit", queueItemUnitTestAliveCount) } - private var mutex: PThreadMutex { return PThreadMutex(&_mutex) } - private func lock() { mutex.lock() } - private func unlock() { mutex.unlock() } + private func lock() { withMutex { $0.lock() } } + private func unlock() { withMutex { $0.unlock() } } private func complete(_ result: (Result)) { lock() diff --git a/Flow/Locking.swift b/Flow/Locking.swift index 416be1b..8ca8472 100644 --- a/Flow/Locking.swift +++ b/Flow/Locking.swift @@ -11,32 +11,26 @@ import Foundation /// A reference wrapper around a POSIX thread mutex public final class Mutex { private var _mutex = pthread_mutex_t() - private var mutex: PThreadMutex { return PThreadMutex(&_mutex) } + private func withMutex(_ body: (PThreadMutex) throws -> T) rethrows -> T { + try withUnsafeMutablePointer(to: &_mutex, body) + } public init() { - mutex.initialize() + withMutex { $0.initialize() } } deinit { - mutex.deinitialize() + withMutex { $0.deinitialize() } } /// Attempt to acquire the lock, blocking a thread’s execution until the lock can be acquired. public func lock() { - mutex.lock() + withMutex { $0.lock() } } /// Releases a previously acquired lock. public func unlock() { - mutex.unlock() - } - - /// Will lock `self`, call `block`, then unlock `self` - @discardableResult - public func protect(_ block: () throws -> T) rethrows -> T { - mutex.lock() - defer { mutex.unlock() } - return try block() + withMutex { $0.unlock() } } } @@ -87,16 +81,18 @@ final class StateAndCallback: Disposable { var val: State fileprivate var disposables = [Disposable]() private var _mutex = pthread_mutex_t() - fileprivate var mutex: PThreadMutex { return PThreadMutex(&_mutex) } + fileprivate func withMutex(_ body: (PThreadMutex) throws -> T) rethrows -> T { + try withUnsafeMutablePointer(to: &_mutex, body) + } init(state: State, callback: @escaping (Value) -> ()) { val = state self.callback = callback - mutex.initialize() + withMutex { $0.initialize() } } deinit { - mutex.deinitialize() + withMutex { $0.deinitialize() } dispose() } @@ -106,27 +102,27 @@ final class StateAndCallback: Disposable { } func lock() { - mutex.lock() + withMutex { $0.lock() } } func unlock() { - mutex.unlock() + withMutex { $0.unlock() } } @discardableResult func protect(_ block: () throws -> T) rethrows -> T { - mutex.lock() - defer { mutex.unlock() } + withMutex { $0.lock() } + defer { withMutex { $0.unlock() } } return try block() } func dispose() { - mutex.lock() + withMutex { $0.lock() } let disposables = self.disposables // make sure to make a copy in the case any call to dispose will recursivaly call us back. callback = nil exclusiveQueue = [] self.disposables = [] - mutex.unlock() + withMutex { $0.unlock() } for disposable in disposables { disposable.dispose() } } @@ -192,12 +188,12 @@ extension StateAndCallback where Value == () { func +=(bag: StateAndCallback, disposable: Disposable?) { guard let disposable = disposable else { return } - bag.mutex.lock() + bag.withMutex { $0.lock() } let hasBeenDisposed = bag.callback == nil if !hasBeenDisposed { bag.disposables.append(disposable) } - bag.mutex.unlock() + bag.withMutex { $0.unlock() } if hasBeenDisposed { disposable.dispose() } diff --git a/Flow/OrderedCallbacker.swift b/Flow/OrderedCallbacker.swift index 04e63ac..6c83100 100644 --- a/Flow/OrderedCallbacker.swift +++ b/Flow/OrderedCallbacker.swift @@ -15,19 +15,21 @@ import Foundation public final class OrderedCallbacker { private var callbacks: [Key: (OrderedValue, (CallbackValue) -> Future<()>)] = [:] private var _mutex = pthread_mutex_t() - private var mutex: PThreadMutex { return PThreadMutex(&_mutex) } + private func withMutex(_ body: (PThreadMutex) throws -> T) rethrows -> T { + try withUnsafeMutablePointer(to: &_mutex, body) + } public init() { - mutex.initialize() + withMutex { $0.initialize() } } deinit { - mutex.deinitialize() + withMutex { $0.deinitialize() } } /// - Returns: True if no callbacks has been registered. public var isEmpty: Bool { - return mutex.protect { callbacks.isEmpty } + return withMutex { $0.protect { callbacks.isEmpty } } } /// Register a callback and orderedValue to be called when `callAll` is executed. @@ -35,11 +37,13 @@ public final class OrderedCallbacker { /// - Parameter orderedValue: The value used to order this callback /// - Returns: A `Disposable` to be disposed to unregister the callback. public func addCallback(_ callback: @escaping (CallbackValue) -> Future<()>, orderedBy orderedValue: OrderedValue) -> Disposable { - return mutex.protect { - let key = generateKey() - callbacks[key] = (orderedValue, callback) - return Disposer { - self.mutex.protect { self.callbacks[key] = nil } + return withMutex { + $0.protect { + let key = generateKey() + callbacks[key] = (orderedValue, callback) + return Disposer { + self.withMutex { $0.protect { self.callbacks[key] = nil } } + } } } } @@ -48,9 +52,11 @@ public final class OrderedCallbacker { /// - Returns: A `Future` that will complete when all callbacks has been called. @discardableResult public func callAll(with value: CallbackValue, isOrderedBefore: (OrderedValue, OrderedValue) -> Bool) -> Future<()> { - return mutex.protect { - callbacks.values.sorted { isOrderedBefore($0.0, $1.0) }.map { $1 } - }.mapToFuture { $0(value) }.toVoid() + return withMutex { + $0.protect { + callbacks.values.sorted { isOrderedBefore($0.0, $1.0) }.map { $1 } + }.mapToFuture { $0(value) }.toVoid() + } } } diff --git a/Flow/Signal+Construction.swift b/Flow/Signal+Construction.swift index ee9697e..fc98885 100644 --- a/Flow/Signal+Construction.swift +++ b/Flow/Signal+Construction.swift @@ -113,27 +113,29 @@ private final class CallbackState: Disposable { let sharedKey: Key private var _mutex = pthread_mutex_t() - private var mutex: PThreadMutex { return PThreadMutex(&_mutex) } + private func withMutex(_ body: (PThreadMutex) throws -> T) rethrows -> T { + try withUnsafeMutablePointer(to: &_mutex, body) + } init(shared: SharedState? = nil, getValue: (() -> Value)?, callback: @escaping (EventType) -> Void) { self.shared = shared self.sharedKey = shared == nil ? 0 : generateKey() self.getValue = getValue self.callback = callback - mutex.initialize() + withMutex { $0.initialize() } } deinit { - mutex.deinitialize() + withMutex { $0.deinitialize() } shared?.remove(key: sharedKey) } func lock() { - mutex.lock() + withMutex { $0.lock() } } func unlock() { - mutex.unlock() + withMutex { $0.unlock() } } // For efficiency `Self` could also also behave as a `NoLockKeyDisposer``, saving us an allocation for each listener. @@ -293,7 +295,9 @@ private final class CallbackState: Disposable { final class SharedState { private let getValue: (() -> Value)? private var _mutex = pthread_mutex_t() - private var mutex: PThreadMutex { return PThreadMutex(&_mutex) } + private func withMutex(_ body: (PThreadMutex) throws -> T) rethrows -> T { + try withUnsafeMutablePointer(to: &_mutex, body) + } typealias Callback = (EventType) -> Void var firstCallback: (key: Key, value: Callback)? @@ -303,19 +307,19 @@ final class SharedState { init(getValue: (() -> Value)? = nil) { self.getValue = getValue - mutex.initialize() + withMutex { $0.initialize() } } deinit { - mutex.deinitialize() + withMutex { $0.deinitialize() } } func lock() { - mutex.lock() + withMutex { $0.lock() } } func unlock() { - mutex.unlock() + withMutex { $0.unlock() } } func remove(key: Key) { diff --git a/Flow/Signal+Scheduling.swift b/Flow/Signal+Scheduling.swift index 36bc088..14e959b 100644 --- a/Flow/Signal+Scheduling.swift +++ b/Flow/Signal+Scheduling.swift @@ -120,48 +120,51 @@ internal extension CoreSignal { private final class OnEventTypeDisposer: Disposable { private var disposable: Disposable? private var _mutex = pthread_mutex_t() - private var mutex: PThreadMutex { return PThreadMutex(&_mutex) } + private func withMutex(_ body: (PThreadMutex) throws -> T) rethrows -> T { + try withUnsafeMutablePointer(to: &_mutex, body) + } + private let scheduler: Scheduler private var callback: ((EventType) -> Void)? init(on scheduler: Scheduler, callback: @escaping (EventType) -> Void, onEventType: @escaping (@escaping (EventType) -> Void) -> Disposable) { self.scheduler = scheduler self.callback = callback - mutex.initialize() + withMutex { $0.initialize() } let disposable = onEventType { [weak self] in self?.handleEventType($0) } - mutex.lock() + withMutex { $0.lock() } if self.callback == nil { disposable.dispose() } else { self.disposable = disposable } - mutex.unlock() + withMutex { $0.unlock() } } deinit { dispose() - mutex.deinitialize() + withMutex { $0.deinitialize() } } public func dispose() { - mutex.lock() + withMutex { $0.lock() } let disposable = self.disposable self.disposable = nil callback = nil - mutex.unlock() + withMutex { $0.unlock() } disposable?.dispose() } func handleEventType(_ eventType: EventType) { - mutex.lock() + withMutex { $0.lock() } guard let callback = self.callback else { - return mutex.unlock() + return withMutex { $0.unlock() } } - mutex.unlock() + withMutex { $0.unlock() } if scheduler.isImmediate { validate(eventType: eventType) @@ -176,14 +179,14 @@ private final class OnEventTypeDisposer: Disposable { scheduler.async { [weak self] in guard let `self` = self else { return } // At the time we are scheduled, we might already been disposed - self.mutex.lock() + self.withMutex { $0.lock() } guard let callback = self.callback else { - return self.mutex.unlock() + return self.withMutex { $0.unlock() } } self.validate(eventType: eventType) - self.mutex.unlock() + self.withMutex { $0.unlock() } callback(eventType) if case .event(.end) = eventType { self.dispose() diff --git a/Flow/Signal+Transforms.swift b/Flow/Signal+Transforms.swift index 8877b10..80359e3 100644 --- a/Flow/Signal+Transforms.swift +++ b/Flow/Signal+Transforms.swift @@ -741,14 +741,20 @@ private extension SignalProvider { let mutex = Mutex() var setter: ((T) -> ())? func setValue(_ value: T) { - let setValue = mutex.protect { setter ?? transform(signal.getter()!).setter! } + mutex.lock() + let setValue = setter ?? transform(signal.getter()!).setter! + mutex.unlock() setValue(value) } return CoreSignal(setValue: setValue, onEventType: { callback in let latestBag = DisposeBag() let bag = DisposeBag(latestBag) - bag += { mutex.protect { setter = nil } } + bag += { + mutex.lock() + setter = nil + mutex.unlock() + } bag += signal.onEventType(on: scheduler) { eventType in switch eventType { @@ -756,13 +762,17 @@ private extension SignalProvider { callback(.initial(nil)) case .initial(let val?): let signal = scheduler.sync { transform(val) } - mutex.protect { setter = signal.setter } + mutex.lock() + setter = signal.setter + mutex.unlock() latestBag += signal.onEventType(callback) case let .event(.value(val)): let isFirstEvent = latestBag.isEmpty latestBag.dispose() let signal = transform(val) - mutex.protect { setter = signal.setter } + mutex.lock() + setter = signal.setter + mutex.unlock() latestBag += signal.onEventType { eventType in switch eventType { case .initial(let val?) where KO.isReadable: diff --git a/FlowTests/FutureSchedulingTests.swift b/FlowTests/FutureSchedulingTests.swift index 17ecc64..16767ce 100644 --- a/FlowTests/FutureSchedulingTests.swift +++ b/FlowTests/FutureSchedulingTests.swift @@ -184,7 +184,9 @@ class FutureNewSchedulingTests: FutureTest { var f = Future(v).delay(by: delay) f = f.map(on: .concurrentBackground) { $0*2 } return f/*assertValue(v*2)*/.assert(on: .main).always(on: .concurrentBackground) { - mutex.protect { completeCount += 1 } + mutex.lock() + completeCount += 1 + mutex.unlock() } }).onCancel { e.fulfill() } diff --git a/FlowTests/SignalProviderTests.swift b/FlowTests/SignalProviderTests.swift index dcc2e31..590f856 100644 --- a/FlowTests/SignalProviderTests.swift +++ b/FlowTests/SignalProviderTests.swift @@ -1718,13 +1718,19 @@ class SignalProviderTests: XCTestCase { _ = Signal(callbacker: callbacker).start(with: 1).take(first: 2).onEvent(on: .concurrentBackground) { event in switch event { case .value(let val): - mutex.protect { result.append(val) } + mutex.lock() + result.append(val) + mutex.unlock() backgroundQueue.async { callbacker.callAll(with: val + 1) } - mutex.protect { result.append(val*10) } + mutex.lock() + result.append(val*10) + mutex.unlock() case .end: - XCTAssertEqual(mutex.protect { result }, [1, 10, 2, 20]) + mutex.lock() + XCTAssertEqual(result, [1, 10, 2, 20]) + mutex.unlock() } } } @@ -2718,13 +2724,15 @@ final class SimulatedTimer { func schedule(at time: TimeInterval, execute work: @escaping () -> ()) -> Disposable { let key = UUID() - mutex.protect { - assert(time >= self.time) - scheduledWork[key] = (time, work) - } + mutex.lock() + assert(time >= self.time) + scheduledWork[key] = (time, work) + mutex.unlock() return Disposer { - self.mutex.protect { self.scheduledWork[key] = nil } + self.mutex.lock() + self.scheduledWork[key] = nil + self.mutex.unlock() } } @@ -2748,7 +2756,9 @@ final class SimulatedTimer { //print("call", next) next.value.work() - mutex.protect { count -= 1 } + mutex.lock() + count -= 1 + mutex.unlock() mainQueue.async { self.release() } } } From 3aa11bcac843dc022b8c1884833620502c177fbd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Niil=20=C3=96hlin?= Date: Mon, 30 Mar 2020 11:40:18 +0200 Subject: [PATCH 2/4] create extension for pthread_mutex_t --- Flow/Callbacker.swift | 23 ++++++-------- Flow/Disposable.swift | 35 ++++++++++----------- Flow/Future.swift | 18 +++++------ Flow/FutureQueue.swift | 35 ++++++++++----------- Flow/Locking.swift | 56 ++++++++++++++++++++++------------ Flow/OrderedCallbacker.swift | 37 +++++++++++----------- Flow/Signal+Construction.swift | 22 +++++-------- Flow/Signal+Scheduling.swift | 27 ++++++++-------- 8 files changed, 124 insertions(+), 129 deletions(-) diff --git a/Flow/Callbacker.swift b/Flow/Callbacker.swift index a581fc4..c29209c 100644 --- a/Flow/Callbacker.swift +++ b/Flow/Callbacker.swift @@ -21,22 +21,19 @@ public final class Callbacker { private var callbacks = Callbacks.none private var _mutex = pthread_mutex_t() - private func withMutex(_ body: (PThreadMutex) throws -> T) rethrows -> T { - try withUnsafeMutablePointer(to: &_mutex, body) - } public init() { - withMutex { $0.initialize() } + _mutex.initialize() } deinit { - withMutex { $0.deinitialize() } + _mutex.deinitialize() } /// - Returns: True if no callbacks has been registered. public var isEmpty: Bool { - withMutex { $0.lock() } - defer { withMutex { $0.unlock() } } + _mutex.lock() + defer { _mutex.unlock() } switch callbacks { case .none: return true @@ -48,8 +45,8 @@ public final class Callbacker { /// Register a callback to be called when `callAll` is executed. /// - Returns: A `Disposable` to be disposed to unregister the callback. public func addCallback(_ callback: @escaping (Value) -> Void) -> Disposable { - withMutex { $0.lock() } - defer { withMutex { $0.unlock() } } + _mutex.lock() + defer { _mutex.unlock() } let key = generateKey() @@ -65,8 +62,8 @@ public final class Callbacker { } return NoLockKeyDisposer(key) { key in - self.withMutex { $0.lock() } - defer { self.withMutex { $0.unlock() } } + self._mutex.lock() + defer { self._mutex.unlock() } switch self.callbacks { case .single(let singleKey, _) where singleKey == key: @@ -84,9 +81,9 @@ public final class Callbacker { /// Will call all registered callbacks with `value` public func callAll(with value: Value) { - withMutex { $0.lock() } + _mutex.lock() let callbacks = self.callbacks - withMutex { $0.unlock() } + _mutex.unlock() switch callbacks { case .none: break diff --git a/Flow/Disposable.swift b/Flow/Disposable.swift index 5fa4b42..71c4149 100644 --- a/Flow/Disposable.swift +++ b/Flow/Disposable.swift @@ -29,26 +29,23 @@ public struct NilDisposer: Disposable { public final class Disposer: Disposable { private var disposer: (() -> ())? private var _mutex = pthread_mutex_t() - private func withMutex(_ body: (PThreadMutex) throws -> T) rethrows -> T { - try withUnsafeMutablePointer(to: &_mutex, body) - } /// Pass a closure to be called when being disposed public init(_ disposer: @escaping () -> () = {}) { self.disposer = disposer - withMutex { $0.initialize() } + _mutex.initialize() } deinit { dispose() - withMutex { $0.deinitialize() } + _mutex.deinitialize() } public func dispose() { - withMutex { $0.lock() } + _mutex.lock() let disposer = self.disposer self.disposer = nil - withMutex { $0.unlock() } + _mutex.unlock() disposer?() } } @@ -61,50 +58,50 @@ public final class Disposer: Disposable { public final class DisposeBag: Disposable { private var disposables: [Disposable] private var _mutex = pthread_mutex_t() - private func withMutex(_ body: (PThreadMutex) throws -> T) rethrows -> T { - try withUnsafeMutablePointer(to: &_mutex, body) - } /// Create an empty instance public init() { self.disposables = [] - withMutex { $0.initialize() } + _mutex.initialize() } /// Create an instance already containing `disposables` public init(_ disposables: S) where S.Iterator.Element == Disposable { self.disposables = Array(disposables) - withMutex { $0.initialize() } + _mutex.initialize() } /// Create an instance already containing `disposables` public init(_ disposables: Disposable...) { self.disposables = disposables - withMutex { $0.initialize() } + _mutex.initialize() } deinit { dispose() - withMutex { $0.deinitialize() } + _mutex.deinitialize() } /// Returns true if there is currently no disposables to dispose. public var isEmpty: Bool { - return withMutex { $0.protect { disposables.isEmpty } } + _mutex.lock() + let isEmpty = disposables.isEmpty + _mutex.unlock() + return isEmpty } public func dispose() { - withMutex { $0.lock() } + _mutex.lock() let disposables = self.disposables // make sure to make a copy in the case any call to dispose will recursivaly call us back. self.disposables = [] - withMutex { $0.unlock() } + _mutex.unlock() for disposable in disposables { disposable.dispose() } } /// Add `disposable` to `self` public func add(_ disposable: Disposable) { - withMutex { $0.lock() } - defer { withMutex { $0.unlock() } } + _mutex.lock() + defer { _mutex.unlock() } disposables.append(disposable) } } diff --git a/Flow/Future.swift b/Flow/Future.swift index 26a0b80..ce6d755 100644 --- a/Flow/Future.swift +++ b/Flow/Future.swift @@ -107,7 +107,7 @@ public final class Future { try onResult(completion, Mover(shouldClone: true)) } } - withMutex { $0.initialize() } + _mutex.initialize() scheduler.async { do { @@ -143,13 +143,13 @@ public final class Future { state = .completed(result) clone = { Future(result: result) } - withMutex { $0.initialize() } + _mutex.initialize() } deinit { OSAtomicDecrement32(&futureUnitTestAliveCount) memPrint("Future deinit", futureUnitTestAliveCount) - withMutex { $0.deinitialize() } + _mutex.deinitialize() } } @@ -327,20 +327,18 @@ func memPrint(_ str: String, _ count: Int32) { } private extension Future { - private func withMutex(_ body: (PThreadMutex) throws -> T) rethrows -> T { - try withUnsafeMutablePointer(to: &_mutex, body) - } - private var protectedState: State { - return withMutex { $0.protect { state } } + _mutex.lock() + defer { _mutex.unlock() } + return state } func lock() { - withMutex { $0.lock() } + _mutex.lock() } func unlock() { - withMutex { $0.unlock() } + _mutex.unlock() } func completeWithResult(_ result: Result) { diff --git a/Flow/FutureQueue.swift b/Flow/FutureQueue.swift index f058111..eb5b6ff 100644 --- a/Flow/FutureQueue.swift +++ b/Flow/FutureQueue.swift @@ -61,9 +61,9 @@ public extension FutureQueue { return Future { completion in let item = QueueItem(operation: operation, completion: completion) - self.withMutex { $0.lock() } + self._mutex.lock() self.items.append(item) - self.withMutex { $0.unlock() } + self._mutex.unlock() self.executeNextItem() @@ -119,7 +119,9 @@ public extension FutureQueue { public extension FutureQueue { /// Do we have any enqueued operations? var isEmpty: Bool { - return withMutex { $0.protect { items.isEmpty } } + _mutex.lock() + defer { _mutex.unlock() } + return items.isEmpty } /// Returns a signal that will signal when `isEmpty` is changed. @@ -164,22 +166,20 @@ public extension FutureQueue { /// The error passed to `abortQueuedExecutionWithError()` if called with `shouldCloseQueue` as true. var closedError: Error? { - return withMutex { $0.protect { _closedError } } + _mutex.lock() + defer { _mutex.unlock() } + return _closedError } } private extension FutureQueue { - private func withMutex(_ body: (PThreadMutex) throws -> T) rethrows -> T { - try withUnsafeMutablePointer(to: &_mutex, body) - } - - func lock() { withMutex { $0.lock() } } - func unlock() { withMutex { $0.unlock() } } + func lock() { _mutex.lock() } + func unlock() { _mutex.unlock() } func removeItem(_ item: Executable) { - withMutex { $0.lock() } + _mutex.lock() _ = items.firstIndex { $0 === item }.map { items.remove(at: $0) } - withMutex { $0.unlock() } + _mutex.unlock() } func executeNextItem() { @@ -218,27 +218,24 @@ private final class QueueItem: Executable { private weak var future: Future? private var hasBeenCancelled = false private var _mutex = pthread_mutex_t() - private func withMutex(_ body: (PThreadMutex) throws -> T) rethrows -> T { - try withUnsafeMutablePointer(to: &_mutex, body) - } init(operation: @escaping () throws -> Future, completion: @escaping (Result) -> ()) { self.completion = completion self.operation = operation - withMutex { $0.initialize() } + _mutex.initialize() OSAtomicIncrement32(&queueItemUnitTestAliveCount) memPrint("Queue Item init", queueItemUnitTestAliveCount) } deinit { - withMutex { $0.deinitialize() } + _mutex.deinitialize() OSAtomicDecrement32(&queueItemUnitTestAliveCount) memPrint("Queue Item deinit", queueItemUnitTestAliveCount) } - private func lock() { withMutex { $0.lock() } } - private func unlock() { withMutex { $0.unlock() } } + private func lock() { _mutex.lock() } + private func unlock() { _mutex.unlock() } private func complete(_ result: (Result)) { lock() diff --git a/Flow/Locking.swift b/Flow/Locking.swift index 8ca8472..874badf 100644 --- a/Flow/Locking.swift +++ b/Flow/Locking.swift @@ -11,26 +11,45 @@ import Foundation /// A reference wrapper around a POSIX thread mutex public final class Mutex { private var _mutex = pthread_mutex_t() - private func withMutex(_ body: (PThreadMutex) throws -> T) rethrows -> T { - try withUnsafeMutablePointer(to: &_mutex, body) - } public init() { - withMutex { $0.initialize() } + _mutex.initialize() } deinit { - withMutex { $0.deinitialize() } + _mutex.deinitialize() } /// Attempt to acquire the lock, blocking a thread’s execution until the lock can be acquired. public func lock() { - withMutex { $0.lock() } + _mutex.lock() } /// Releases a previously acquired lock. public func unlock() { - withMutex { $0.unlock() } + _mutex.unlock() + } +} + +extension pthread_mutex_t { + mutating func withPointer(_ body: (PThreadMutex) throws -> T) rethrows -> T { + try withUnsafeMutablePointer(to: &self, body) + } + + mutating func initialize() { + withPointer { $0.initialize() } + } + + mutating func deinitialize() { + withPointer { $0.deinitialize() } + } + + mutating func lock() { + withPointer { $0.lock() } + } + + mutating func unlock() { + withPointer { $0.unlock() } } } @@ -81,18 +100,15 @@ final class StateAndCallback: Disposable { var val: State fileprivate var disposables = [Disposable]() private var _mutex = pthread_mutex_t() - fileprivate func withMutex(_ body: (PThreadMutex) throws -> T) rethrows -> T { - try withUnsafeMutablePointer(to: &_mutex, body) - } init(state: State, callback: @escaping (Value) -> ()) { val = state self.callback = callback - withMutex { $0.initialize() } + _mutex.initialize() } deinit { - withMutex { $0.deinitialize() } + _mutex.deinitialize() dispose() } @@ -102,27 +118,27 @@ final class StateAndCallback: Disposable { } func lock() { - withMutex { $0.lock() } + _mutex.lock() } func unlock() { - withMutex { $0.unlock() } + _mutex.unlock() } @discardableResult func protect(_ block: () throws -> T) rethrows -> T { - withMutex { $0.lock() } - defer { withMutex { $0.unlock() } } + _mutex.lock() + defer { _mutex.unlock() } return try block() } func dispose() { - withMutex { $0.lock() } + _mutex.lock() let disposables = self.disposables // make sure to make a copy in the case any call to dispose will recursivaly call us back. callback = nil exclusiveQueue = [] self.disposables = [] - withMutex { $0.unlock() } + _mutex.unlock() for disposable in disposables { disposable.dispose() } } @@ -188,12 +204,12 @@ extension StateAndCallback where Value == () { func +=(bag: StateAndCallback, disposable: Disposable?) { guard let disposable = disposable else { return } - bag.withMutex { $0.lock() } + bag.lock() let hasBeenDisposed = bag.callback == nil if !hasBeenDisposed { bag.disposables.append(disposable) } - bag.withMutex { $0.unlock() } + bag.unlock() if hasBeenDisposed { disposable.dispose() } diff --git a/Flow/OrderedCallbacker.swift b/Flow/OrderedCallbacker.swift index 6c83100..3d557ab 100644 --- a/Flow/OrderedCallbacker.swift +++ b/Flow/OrderedCallbacker.swift @@ -15,21 +15,21 @@ import Foundation public final class OrderedCallbacker { private var callbacks: [Key: (OrderedValue, (CallbackValue) -> Future<()>)] = [:] private var _mutex = pthread_mutex_t() - private func withMutex(_ body: (PThreadMutex) throws -> T) rethrows -> T { - try withUnsafeMutablePointer(to: &_mutex, body) - } public init() { - withMutex { $0.initialize() } + _mutex.initialize() } deinit { - withMutex { $0.deinitialize() } + _mutex.deinitialize() } /// - Returns: True if no callbacks has been registered. public var isEmpty: Bool { - return withMutex { $0.protect { callbacks.isEmpty } } + _mutex.lock() + let isEmpty = callbacks.isEmpty + _mutex.unlock() + return isEmpty } /// Register a callback and orderedValue to be called when `callAll` is executed. @@ -37,14 +37,14 @@ public final class OrderedCallbacker { /// - Parameter orderedValue: The value used to order this callback /// - Returns: A `Disposable` to be disposed to unregister the callback. public func addCallback(_ callback: @escaping (CallbackValue) -> Future<()>, orderedBy orderedValue: OrderedValue) -> Disposable { - return withMutex { - $0.protect { - let key = generateKey() - callbacks[key] = (orderedValue, callback) - return Disposer { - self.withMutex { $0.protect { self.callbacks[key] = nil } } - } - } + _mutex.lock() + defer { _mutex.unlock() } + let key = generateKey() + callbacks[key] = (orderedValue, callback) + return Disposer { + self._mutex.lock() + self.callbacks[key] = nil + self._mutex.unlock() } } @@ -52,11 +52,10 @@ public final class OrderedCallbacker { /// - Returns: A `Future` that will complete when all callbacks has been called. @discardableResult public func callAll(with value: CallbackValue, isOrderedBefore: (OrderedValue, OrderedValue) -> Bool) -> Future<()> { - return withMutex { - $0.protect { - callbacks.values.sorted { isOrderedBefore($0.0, $1.0) }.map { $1 } - }.mapToFuture { $0(value) }.toVoid() - } + _mutex.lock() + let sortedCallbacks = callbacks.values.sorted { isOrderedBefore($0.0, $1.0) }.map { $1 } + _mutex.unlock() + return sortedCallbacks.mapToFuture { $0(value) }.toVoid() } } diff --git a/Flow/Signal+Construction.swift b/Flow/Signal+Construction.swift index fc98885..7230f4e 100644 --- a/Flow/Signal+Construction.swift +++ b/Flow/Signal+Construction.swift @@ -113,29 +113,26 @@ private final class CallbackState: Disposable { let sharedKey: Key private var _mutex = pthread_mutex_t() - private func withMutex(_ body: (PThreadMutex) throws -> T) rethrows -> T { - try withUnsafeMutablePointer(to: &_mutex, body) - } init(shared: SharedState? = nil, getValue: (() -> Value)?, callback: @escaping (EventType) -> Void) { self.shared = shared self.sharedKey = shared == nil ? 0 : generateKey() self.getValue = getValue self.callback = callback - withMutex { $0.initialize() } + _mutex.initialize() } deinit { - withMutex { $0.deinitialize() } + _mutex.deinitialize() shared?.remove(key: sharedKey) } func lock() { - withMutex { $0.lock() } + _mutex.lock() } func unlock() { - withMutex { $0.unlock() } + _mutex.unlock() } // For efficiency `Self` could also also behave as a `NoLockKeyDisposer``, saving us an allocation for each listener. @@ -295,9 +292,6 @@ private final class CallbackState: Disposable { final class SharedState { private let getValue: (() -> Value)? private var _mutex = pthread_mutex_t() - private func withMutex(_ body: (PThreadMutex) throws -> T) rethrows -> T { - try withUnsafeMutablePointer(to: &_mutex, body) - } typealias Callback = (EventType) -> Void var firstCallback: (key: Key, value: Callback)? @@ -307,19 +301,19 @@ final class SharedState { init(getValue: (() -> Value)? = nil) { self.getValue = getValue - withMutex { $0.initialize() } + _mutex.initialize() } deinit { - withMutex { $0.deinitialize() } + _mutex.deinitialize() } func lock() { - withMutex { $0.lock() } + _mutex.lock() } func unlock() { - withMutex { $0.unlock() } + _mutex.unlock() } func remove(key: Key) { diff --git a/Flow/Signal+Scheduling.swift b/Flow/Signal+Scheduling.swift index 14e959b..2209a7d 100644 --- a/Flow/Signal+Scheduling.swift +++ b/Flow/Signal+Scheduling.swift @@ -120,9 +120,6 @@ internal extension CoreSignal { private final class OnEventTypeDisposer: Disposable { private var disposable: Disposable? private var _mutex = pthread_mutex_t() - private func withMutex(_ body: (PThreadMutex) throws -> T) rethrows -> T { - try withUnsafeMutablePointer(to: &_mutex, body) - } private let scheduler: Scheduler private var callback: ((EventType) -> Void)? @@ -130,41 +127,41 @@ private final class OnEventTypeDisposer: Disposable { init(on scheduler: Scheduler, callback: @escaping (EventType) -> Void, onEventType: @escaping (@escaping (EventType) -> Void) -> Disposable) { self.scheduler = scheduler self.callback = callback - withMutex { $0.initialize() } + _mutex.initialize() let disposable = onEventType { [weak self] in self?.handleEventType($0) } - withMutex { $0.lock() } + _mutex.lock() if self.callback == nil { disposable.dispose() } else { self.disposable = disposable } - withMutex { $0.unlock() } + _mutex.unlock() } deinit { dispose() - withMutex { $0.deinitialize() } + _mutex.deinitialize() } public func dispose() { - withMutex { $0.lock() } + _mutex.lock() let disposable = self.disposable self.disposable = nil callback = nil - withMutex { $0.unlock() } + _mutex.unlock() disposable?.dispose() } func handleEventType(_ eventType: EventType) { - withMutex { $0.lock() } + _mutex.lock() guard let callback = self.callback else { - return withMutex { $0.unlock() } + return _mutex.unlock() } - withMutex { $0.unlock() } + _mutex.unlock() if scheduler.isImmediate { validate(eventType: eventType) @@ -179,14 +176,14 @@ private final class OnEventTypeDisposer: Disposable { scheduler.async { [weak self] in guard let `self` = self else { return } // At the time we are scheduled, we might already been disposed - self.withMutex { $0.lock() } + self._mutex.lock() guard let callback = self.callback else { - return self.withMutex { $0.unlock() } + return self._mutex.unlock() } self.validate(eventType: eventType) - self.withMutex { $0.unlock() } + self._mutex.unlock() callback(eventType) if case .event(.end) = eventType { self.dispose() From bfbf94bdb4e6b1edeb1d9e63967a7137d943f51b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Niil=20=C3=96hlin?= Date: Mon, 30 Mar 2020 12:46:10 +0200 Subject: [PATCH 3/4] rename _mutex to mutex --- Flow/Callbacker.swift | 22 +++++++++++----------- Flow/Disposable.swift | 32 ++++++++++++++++---------------- Flow/Future.swift | 16 ++++++++-------- Flow/FutureQueue.swift | 32 ++++++++++++++++---------------- Flow/Locking.swift | 30 +++++++++++++++--------------- Flow/OrderedCallbacker.swift | 22 +++++++++++----------- Flow/Signal+Construction.swift | 20 ++++++++++---------- Flow/Signal+Scheduling.swift | 26 +++++++++++++------------- 8 files changed, 100 insertions(+), 100 deletions(-) diff --git a/Flow/Callbacker.swift b/Flow/Callbacker.swift index c29209c..99bfccf 100644 --- a/Flow/Callbacker.swift +++ b/Flow/Callbacker.swift @@ -20,20 +20,20 @@ public final class Callbacker { } private var callbacks = Callbacks.none - private var _mutex = pthread_mutex_t() + private var mutex = pthread_mutex_t() public init() { - _mutex.initialize() + mutex.initialize() } deinit { - _mutex.deinitialize() + mutex.deinitialize() } /// - Returns: True if no callbacks has been registered. public var isEmpty: Bool { - _mutex.lock() - defer { _mutex.unlock() } + mutex.lock() + defer { mutex.unlock() } switch callbacks { case .none: return true @@ -45,8 +45,8 @@ public final class Callbacker { /// Register a callback to be called when `callAll` is executed. /// - Returns: A `Disposable` to be disposed to unregister the callback. public func addCallback(_ callback: @escaping (Value) -> Void) -> Disposable { - _mutex.lock() - defer { _mutex.unlock() } + mutex.lock() + defer { mutex.unlock() } let key = generateKey() @@ -62,8 +62,8 @@ public final class Callbacker { } return NoLockKeyDisposer(key) { key in - self._mutex.lock() - defer { self._mutex.unlock() } + self.mutex.lock() + defer { self.mutex.unlock() } switch self.callbacks { case .single(let singleKey, _) where singleKey == key: @@ -81,9 +81,9 @@ public final class Callbacker { /// Will call all registered callbacks with `value` public func callAll(with value: Value) { - _mutex.lock() + mutex.lock() let callbacks = self.callbacks - _mutex.unlock() + mutex.unlock() switch callbacks { case .none: break diff --git a/Flow/Disposable.swift b/Flow/Disposable.swift index 71c4149..5949b22 100644 --- a/Flow/Disposable.swift +++ b/Flow/Disposable.swift @@ -28,24 +28,24 @@ public struct NilDisposer: Disposable { /// - Note: Is thread safe and reentrant (dispose callback could call itself) public final class Disposer: Disposable { private var disposer: (() -> ())? - private var _mutex = pthread_mutex_t() + private var mutex = pthread_mutex_t() /// Pass a closure to be called when being disposed public init(_ disposer: @escaping () -> () = {}) { self.disposer = disposer - _mutex.initialize() + mutex.initialize() } deinit { dispose() - _mutex.deinitialize() + mutex.deinitialize() } public func dispose() { - _mutex.lock() + mutex.lock() let disposer = self.disposer self.disposer = nil - _mutex.unlock() + mutex.unlock() disposer?() } } @@ -57,51 +57,51 @@ public final class Disposer: Disposable { /// - Note: New disposables could be added after a disposal. public final class DisposeBag: Disposable { private var disposables: [Disposable] - private var _mutex = pthread_mutex_t() + private var mutex = pthread_mutex_t() /// Create an empty instance public init() { self.disposables = [] - _mutex.initialize() + mutex.initialize() } /// Create an instance already containing `disposables` public init(_ disposables: S) where S.Iterator.Element == Disposable { self.disposables = Array(disposables) - _mutex.initialize() + mutex.initialize() } /// Create an instance already containing `disposables` public init(_ disposables: Disposable...) { self.disposables = disposables - _mutex.initialize() + mutex.initialize() } deinit { dispose() - _mutex.deinitialize() + mutex.deinitialize() } /// Returns true if there is currently no disposables to dispose. public var isEmpty: Bool { - _mutex.lock() + mutex.lock() let isEmpty = disposables.isEmpty - _mutex.unlock() + mutex.unlock() return isEmpty } public func dispose() { - _mutex.lock() + mutex.lock() let disposables = self.disposables // make sure to make a copy in the case any call to dispose will recursivaly call us back. self.disposables = [] - _mutex.unlock() + mutex.unlock() for disposable in disposables { disposable.dispose() } } /// Add `disposable` to `self` public func add(_ disposable: Disposable) { - _mutex.lock() - defer { _mutex.unlock() } + mutex.lock() + defer { mutex.unlock() } disposables.append(disposable) } } diff --git a/Flow/Future.swift b/Flow/Future.swift index ce6d755..82bf33f 100644 --- a/Flow/Future.swift +++ b/Flow/Future.swift @@ -57,7 +57,7 @@ public final class Future { private var state: State private let clone: () -> Future - private var _mutex = pthread_mutex_t() + private var mutex = pthread_mutex_t() /// Helper used to move external futures inside `Future.init`'s `onComplete` closure. Needed for repetition to work properly. public struct Mover { @@ -107,7 +107,7 @@ public final class Future { try onResult(completion, Mover(shouldClone: true)) } } - _mutex.initialize() + mutex.initialize() scheduler.async { do { @@ -143,13 +143,13 @@ public final class Future { state = .completed(result) clone = { Future(result: result) } - _mutex.initialize() + mutex.initialize() } deinit { OSAtomicDecrement32(&futureUnitTestAliveCount) memPrint("Future deinit", futureUnitTestAliveCount) - _mutex.deinitialize() + mutex.deinitialize() } } @@ -328,17 +328,17 @@ func memPrint(_ str: String, _ count: Int32) { private extension Future { private var protectedState: State { - _mutex.lock() - defer { _mutex.unlock() } + mutex.lock() + defer { mutex.unlock() } return state } func lock() { - _mutex.lock() + mutex.lock() } func unlock() { - _mutex.unlock() + mutex.unlock() } func completeWithResult(_ result: Result) { diff --git a/Flow/FutureQueue.swift b/Flow/FutureQueue.swift index eb5b6ff..145ed53 100644 --- a/Flow/FutureQueue.swift +++ b/Flow/FutureQueue.swift @@ -18,7 +18,7 @@ public final class FutureQueue { private let queueScheduler: Scheduler private var _closedError: Error? private let isEmptyCallbacker = Callbacker() - private var _mutex = pthread_mutex_t() + private var mutex = pthread_mutex_t() // enqueued items. private var items: [Executable] = [] { @@ -61,9 +61,9 @@ public extension FutureQueue { return Future { completion in let item = QueueItem(operation: operation, completion: completion) - self._mutex.lock() + self.mutex.lock() self.items.append(item) - self._mutex.unlock() + self.mutex.unlock() self.executeNextItem() @@ -119,8 +119,8 @@ public extension FutureQueue { public extension FutureQueue { /// Do we have any enqueued operations? var isEmpty: Bool { - _mutex.lock() - defer { _mutex.unlock() } + mutex.lock() + defer { mutex.unlock() } return items.isEmpty } @@ -166,20 +166,20 @@ public extension FutureQueue { /// The error passed to `abortQueuedExecutionWithError()` if called with `shouldCloseQueue` as true. var closedError: Error? { - _mutex.lock() - defer { _mutex.unlock() } + mutex.lock() + defer { mutex.unlock() } return _closedError } } private extension FutureQueue { - func lock() { _mutex.lock() } - func unlock() { _mutex.unlock() } + func lock() { mutex.lock() } + func unlock() { mutex.unlock() } func removeItem(_ item: Executable) { - _mutex.lock() + mutex.lock() _ = items.firstIndex { $0 === item }.map { items.remove(at: $0) } - _mutex.unlock() + mutex.unlock() } func executeNextItem() { @@ -217,25 +217,25 @@ private final class QueueItem: Executable { private let completion: (Result) -> () private weak var future: Future? private var hasBeenCancelled = false - private var _mutex = pthread_mutex_t() + private var mutex = pthread_mutex_t() init(operation: @escaping () throws -> Future, completion: @escaping (Result) -> ()) { self.completion = completion self.operation = operation - _mutex.initialize() + mutex.initialize() OSAtomicIncrement32(&queueItemUnitTestAliveCount) memPrint("Queue Item init", queueItemUnitTestAliveCount) } deinit { - _mutex.deinitialize() + mutex.deinitialize() OSAtomicDecrement32(&queueItemUnitTestAliveCount) memPrint("Queue Item deinit", queueItemUnitTestAliveCount) } - private func lock() { _mutex.lock() } - private func unlock() { _mutex.unlock() } + private func lock() { mutex.lock() } + private func unlock() { mutex.unlock() } private func complete(_ result: (Result)) { lock() diff --git a/Flow/Locking.swift b/Flow/Locking.swift index 874badf..24af83c 100644 --- a/Flow/Locking.swift +++ b/Flow/Locking.swift @@ -10,24 +10,24 @@ import Foundation /// A reference wrapper around a POSIX thread mutex public final class Mutex { - private var _mutex = pthread_mutex_t() + private var mutex = pthread_mutex_t() public init() { - _mutex.initialize() + mutex.initialize() } deinit { - _mutex.deinitialize() + mutex.deinitialize() } /// Attempt to acquire the lock, blocking a thread’s execution until the lock can be acquired. public func lock() { - _mutex.lock() + mutex.lock() } /// Releases a previously acquired lock. public func unlock() { - _mutex.unlock() + mutex.unlock() } } @@ -35,7 +35,7 @@ extension pthread_mutex_t { mutating func withPointer(_ body: (PThreadMutex) throws -> T) rethrows -> T { try withUnsafeMutablePointer(to: &self, body) } - + mutating func initialize() { withPointer { $0.initialize() } } @@ -99,16 +99,16 @@ final class StateAndCallback: Disposable { var callback: ((Value) -> ())? var val: State fileprivate var disposables = [Disposable]() - private var _mutex = pthread_mutex_t() + private var mutex = pthread_mutex_t() init(state: State, callback: @escaping (Value) -> ()) { val = state self.callback = callback - _mutex.initialize() + mutex.initialize() } deinit { - _mutex.deinitialize() + mutex.deinitialize() dispose() } @@ -118,27 +118,27 @@ final class StateAndCallback: Disposable { } func lock() { - _mutex.lock() + mutex.lock() } func unlock() { - _mutex.unlock() + mutex.unlock() } @discardableResult func protect(_ block: () throws -> T) rethrows -> T { - _mutex.lock() - defer { _mutex.unlock() } + mutex.lock() + defer { mutex.unlock() } return try block() } func dispose() { - _mutex.lock() + mutex.lock() let disposables = self.disposables // make sure to make a copy in the case any call to dispose will recursivaly call us back. callback = nil exclusiveQueue = [] self.disposables = [] - _mutex.unlock() + mutex.unlock() for disposable in disposables { disposable.dispose() } } diff --git a/Flow/OrderedCallbacker.swift b/Flow/OrderedCallbacker.swift index 3d557ab..075db90 100644 --- a/Flow/OrderedCallbacker.swift +++ b/Flow/OrderedCallbacker.swift @@ -14,21 +14,21 @@ import Foundation /// - Note: Is thread safe. public final class OrderedCallbacker { private var callbacks: [Key: (OrderedValue, (CallbackValue) -> Future<()>)] = [:] - private var _mutex = pthread_mutex_t() + private var mutex = pthread_mutex_t() public init() { - _mutex.initialize() + mutex.initialize() } deinit { - _mutex.deinitialize() + mutex.deinitialize() } /// - Returns: True if no callbacks has been registered. public var isEmpty: Bool { - _mutex.lock() + mutex.lock() let isEmpty = callbacks.isEmpty - _mutex.unlock() + mutex.unlock() return isEmpty } @@ -37,14 +37,14 @@ public final class OrderedCallbacker { /// - Parameter orderedValue: The value used to order this callback /// - Returns: A `Disposable` to be disposed to unregister the callback. public func addCallback(_ callback: @escaping (CallbackValue) -> Future<()>, orderedBy orderedValue: OrderedValue) -> Disposable { - _mutex.lock() - defer { _mutex.unlock() } + mutex.lock() + defer { mutex.unlock() } let key = generateKey() callbacks[key] = (orderedValue, callback) return Disposer { - self._mutex.lock() + self.mutex.lock() self.callbacks[key] = nil - self._mutex.unlock() + self.mutex.unlock() } } @@ -52,9 +52,9 @@ public final class OrderedCallbacker { /// - Returns: A `Future` that will complete when all callbacks has been called. @discardableResult public func callAll(with value: CallbackValue, isOrderedBefore: (OrderedValue, OrderedValue) -> Bool) -> Future<()> { - _mutex.lock() + mutex.lock() let sortedCallbacks = callbacks.values.sorted { isOrderedBefore($0.0, $1.0) }.map { $1 } - _mutex.unlock() + mutex.unlock() return sortedCallbacks.mapToFuture { $0(value) }.toVoid() } } diff --git a/Flow/Signal+Construction.swift b/Flow/Signal+Construction.swift index 7230f4e..0bdfc15 100644 --- a/Flow/Signal+Construction.swift +++ b/Flow/Signal+Construction.swift @@ -112,27 +112,27 @@ private final class CallbackState: Disposable { private var shared: SharedState? let sharedKey: Key - private var _mutex = pthread_mutex_t() + private var mutex = pthread_mutex_t() init(shared: SharedState? = nil, getValue: (() -> Value)?, callback: @escaping (EventType) -> Void) { self.shared = shared self.sharedKey = shared == nil ? 0 : generateKey() self.getValue = getValue self.callback = callback - _mutex.initialize() + mutex.initialize() } deinit { - _mutex.deinitialize() + mutex.deinitialize() shared?.remove(key: sharedKey) } func lock() { - _mutex.lock() + mutex.lock() } func unlock() { - _mutex.unlock() + mutex.unlock() } // For efficiency `Self` could also also behave as a `NoLockKeyDisposer``, saving us an allocation for each listener. @@ -291,7 +291,7 @@ private final class CallbackState: Disposable { /// Helper to implement sharing of a single `onEvent` if more than one listner, see `SignalOption.shared` final class SharedState { private let getValue: (() -> Value)? - private var _mutex = pthread_mutex_t() + private var mutex = pthread_mutex_t() typealias Callback = (EventType) -> Void var firstCallback: (key: Key, value: Callback)? @@ -301,19 +301,19 @@ final class SharedState { init(getValue: (() -> Value)? = nil) { self.getValue = getValue - _mutex.initialize() + mutex.initialize() } deinit { - _mutex.deinitialize() + mutex.deinitialize() } func lock() { - _mutex.lock() + mutex.lock() } func unlock() { - _mutex.unlock() + mutex.unlock() } func remove(key: Key) { diff --git a/Flow/Signal+Scheduling.swift b/Flow/Signal+Scheduling.swift index 2209a7d..6db5972 100644 --- a/Flow/Signal+Scheduling.swift +++ b/Flow/Signal+Scheduling.swift @@ -119,7 +119,7 @@ internal extension CoreSignal { // Using custom Disposable instead of DisposeBag for efficiency (less allocations) private final class OnEventTypeDisposer: Disposable { private var disposable: Disposable? - private var _mutex = pthread_mutex_t() + private var mutex = pthread_mutex_t() private let scheduler: Scheduler private var callback: ((EventType) -> Void)? @@ -127,41 +127,41 @@ private final class OnEventTypeDisposer: Disposable { init(on scheduler: Scheduler, callback: @escaping (EventType) -> Void, onEventType: @escaping (@escaping (EventType) -> Void) -> Disposable) { self.scheduler = scheduler self.callback = callback - _mutex.initialize() + mutex.initialize() let disposable = onEventType { [weak self] in self?.handleEventType($0) } - _mutex.lock() + mutex.lock() if self.callback == nil { disposable.dispose() } else { self.disposable = disposable } - _mutex.unlock() + mutex.unlock() } deinit { dispose() - _mutex.deinitialize() + mutex.deinitialize() } public func dispose() { - _mutex.lock() + mutex.lock() let disposable = self.disposable self.disposable = nil callback = nil - _mutex.unlock() + mutex.unlock() disposable?.dispose() } func handleEventType(_ eventType: EventType) { - _mutex.lock() + mutex.lock() guard let callback = self.callback else { - return _mutex.unlock() + return mutex.unlock() } - _mutex.unlock() + mutex.unlock() if scheduler.isImmediate { validate(eventType: eventType) @@ -176,14 +176,14 @@ private final class OnEventTypeDisposer: Disposable { scheduler.async { [weak self] in guard let `self` = self else { return } // At the time we are scheduled, we might already been disposed - self._mutex.lock() + self.mutex.lock() guard let callback = self.callback else { - return self._mutex.unlock() + return self.mutex.unlock() } self.validate(eventType: eventType) - self._mutex.unlock() + self.mutex.unlock() callback(eventType) if case .event(.end) = eventType { self.dispose() From 797072b267fedd887d4549100e05390a8d625624 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Niil=20=C3=96hlin?= Date: Mon, 30 Mar 2020 14:49:40 +0200 Subject: [PATCH 4/4] Remove missing return statement --- Flow/Locking.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Flow/Locking.swift b/Flow/Locking.swift index 24af83c..4447da2 100644 --- a/Flow/Locking.swift +++ b/Flow/Locking.swift @@ -33,7 +33,7 @@ public final class Mutex { extension pthread_mutex_t { mutating func withPointer(_ body: (PThreadMutex) throws -> T) rethrows -> T { - try withUnsafeMutablePointer(to: &self, body) + return try withUnsafeMutablePointer(to: &self, body) } mutating func initialize() {