diff --git a/Sources/DesignAlgorithmsKit/Behavioral/Orchestration/TaskOrchestrator.swift b/Sources/DesignAlgorithmsKit/Behavioral/Orchestration/TaskOrchestrator.swift new file mode 100644 index 0000000..8e1a13f --- /dev/null +++ b/Sources/DesignAlgorithmsKit/Behavioral/Orchestration/TaskOrchestrator.swift @@ -0,0 +1,138 @@ +import Foundation + +/// Represents a unit of work in a dependency graph. +public struct DAGTask: Identifiable, Sendable { + public let id: String + public let operation: @Sendable () async throws -> Void + public let dependencies: Set + + public init(id: String, dependencies: Set = [], operation: @escaping @Sendable () async throws -> Void) { + self.id = id + self.dependencies = dependencies + self.operation = operation + } +} + +public enum OrchestrationError: Error { + case cycleDetected + case dependencyNotFound(taskId: String, dependencyId: String) + case taskFailure(taskId: String, error: Error) +} + +/// A Task Orchestrator that executes tasks based on a Directed Acyclic Graph (DAG). +/// It ensures that a task only runs when all its dependencies have successfully completed. +@available(macOS 12.0, iOS 15.0, *) +public actor TaskOrchestrator { + + private var tasks: [String: DAGTask] = [:] + + public init() {} + + /// Adds a task to the orchestrator. + public func addTask(_ task: DAGTask) { + tasks[task.id] = task + } + + /// Validates the graph for cycles and missing dependencies. + public func validate() throws { + // Check for missing dependencies + for task in tasks.values { + for dep in task.dependencies { + guard tasks[dep] != nil else { + throw OrchestrationError.dependencyNotFound(taskId: task.id, dependencyId: dep) + } + } + } + + // Detect Cycles (DFS) + var visited: Set = [] + var recursionStack: Set = [] + + func hasCycle(_ nodeId: String) -> Bool { + visited.insert(nodeId) + recursionStack.insert(nodeId) + + if let node = tasks[nodeId] { + for dep in node.dependencies { + if !visited.contains(dep) { + if hasCycle(dep) { return true } + } else if recursionStack.contains(dep) { + return true + } + } + } + + recursionStack.remove(nodeId) + return false + } + + for nodeId in tasks.keys { + if !visited.contains(nodeId) { + if hasCycle(nodeId) { + throw OrchestrationError.cycleDetected + } + } + } + } + + /// Executes all tasks in the graph, respecting dependencies. + /// Runs independent tasks in parallel where possible. + public func execute() async throws { + try validate() + + // Build adjacency list for "dependents" (upstream -> [downstream]) + var dependents: [String: [String]] = [:] + var inDegree: [String: Int] = [:] + + for task in tasks.values { + inDegree[task.id] = task.dependencies.count + for dep in task.dependencies { + dependents[dep, default: []].append(task.id) + } + } + + // Queue of tasks ready to run (in-degree 0) + let readyQueue: [String] = tasks.values.filter { ($0.dependencies.isEmpty) }.map { $0.id } + + // We use a task group to run ready tasks concurrently + try await withThrowingTaskGroup(of: String.self) { group in + + // Initial batch + for taskId in readyQueue { + guard let task = tasks[taskId] else { continue } + group.addTask { + try await task.operation() + return taskId + } + } + + // Loop as tasks finish + var remainingTasks = tasks.count + + while remainingTasks > 0 { + // Wait for any task to finish + guard let finishedTaskId = try await group.next() else { + break + } + + remainingTasks -= 1 + + // Unlock downstream + if let downstreamNodes = dependents[finishedTaskId] { + for downstreamId in downstreamNodes { + inDegree[downstreamId, default: 0] -= 1 + if inDegree[downstreamId] == 0 { + // Ready! + if let task = tasks[downstreamId] { + group.addTask { + try await task.operation() + return downstreamId + } + } + } + } + } + } + } + } +} diff --git a/Sources/DesignAlgorithmsKit/Behavioral/ResourceCoordinator/ResourceCoordinator.swift b/Sources/DesignAlgorithmsKit/Behavioral/ResourceCoordinator/ResourceCoordinator.swift new file mode 100644 index 0000000..3e059fd --- /dev/null +++ b/Sources/DesignAlgorithmsKit/Behavioral/ResourceCoordinator/ResourceCoordinator.swift @@ -0,0 +1,112 @@ +import Foundation + +/// A coordinator that manages safe concurrent access to resources identified by keys (e.g., file paths). +/// It implements a Read-Write Lock semantics where multiple readers can access a resource simultaneously, +/// but writers require exclusive access. +@available(macOS 12.0, iOS 15.0, *) +public actor ResourceCoordinator { + + private var locks: [String: PathLock] = [:] + + public init() {} + + /// Executes the given block with a shared (read) lock on the specified resource. + /// Other readers can execute concurrently, but writers will block. + public func withReadLock(for key: String, operation: () async throws -> T) async throws -> T { + return try await access(path: key, type: .read, operation: operation) + } + + /// Helper to just acquire and return a token? + /// No, closures are safer. + + public func access(path: String, type: AccessType, operation: () async throws -> T) async throws -> T { + let lock = getLock(for: path) + switch type { + case .read: + await lock.lockRead() + // We use standard do/defer pattern here since we are inside an async function + defer { Task { await lock.unlockRead() } } + return try await operation() + case .write: + await lock.lockWrite() + defer { Task { await lock.unlockWrite() } } + return try await operation() + } + } + + private func getLock(for key: String) -> PathLock { + if let lock = locks[key] { + return lock + } + let newLock = PathLock() + locks[key] = newLock + return newLock + } + + public enum AccessType { + case read + case write + } +} + +/// A standard Read-Write lock implemented as an Actor. +@available(macOS 12.0, iOS 15.0, *) +actor PathLock { + private var readers: Int = 0 + private var writers: Int = 0 // Should be 0 or 1 + private var writeWaiters: [CheckedContinuation] = [] + private var readWaiters: [CheckedContinuation] = [] + + func lockRead() async { + if writers > 0 || !writeWaiters.isEmpty { + // Writer has priority or active + await withCheckedContinuation { continuation in + readWaiters.append(continuation) + } + } else { + readers += 1 + } + } + + func unlockRead() { + readers -= 1 + if readers == 0 { + // If no more readers, wake one writer if any + if !writeWaiters.isEmpty { + let writer = writeWaiters.removeFirst() + writers = 1 // Pass ownership + writer.resume() + } + } + } + + func lockWrite() async { + if readers > 0 || writers > 0 { + await withCheckedContinuation { continuation in + writeWaiters.append(continuation) + } + } else { + writers = 1 + } + } + + func unlockWrite() { + writers = 0 + // Prefer writers? Or strict FIFO? + // Simple implementation: Wake one writer if present, else wake all readers. + + if !writeWaiters.isEmpty { + let nextWriter = writeWaiters.removeFirst() + writers = 1 + nextWriter.resume() + } else { + // Wake ALL readers + let currentReaders = readWaiters + readWaiters.removeAll() + readers += currentReaders.count + for reader in currentReaders { + reader.resume() + } + } + } +} diff --git a/Tests/DesignAlgorithmsKitTests/Behavioral/Orchestration/TaskOrchestratorTests.swift b/Tests/DesignAlgorithmsKitTests/Behavioral/Orchestration/TaskOrchestratorTests.swift new file mode 100644 index 0000000..2f90e20 --- /dev/null +++ b/Tests/DesignAlgorithmsKitTests/Behavioral/Orchestration/TaskOrchestratorTests.swift @@ -0,0 +1,92 @@ +import XCTest +@testable import DesignAlgorithmsKit + +@available(macOS 12.0, iOS 15.0, *) +final class TaskOrchestratorTests: XCTestCase { + + func testLinearDependency() async throws { + let orchestrator = TaskOrchestrator() + let result = ResultCollector() + + let taskA = DAGTask(id: "A") { + await result.append("A") + } + + // B depends on A + let taskB = DAGTask(id: "B", dependencies: ["A"]) { + await result.append("B") + } + + // C depends on B + let taskC = DAGTask(id: "C", dependencies: ["B"]) { + await result.append("C") + } + + await orchestrator.addTask(taskA) + await orchestrator.addTask(taskB) + await orchestrator.addTask(taskC) + + try await orchestrator.execute() + + let order = await result.items + XCTAssertEqual(order, ["A", "B", "C"]) + } + + func testParallelExecution() async throws { + let orchestrator = TaskOrchestrator() + let result = ResultCollector() + + // A and B independent. C depends on both. + let taskA = DAGTask(id: "A") { + try? await Task.sleep(nanoseconds: 10_000_000) + await result.append("A") + } + + let taskB = DAGTask(id: "B") { + try? await Task.sleep(nanoseconds: 10_000_000) + await result.append("B") + } + + let taskC = DAGTask(id: "C", dependencies: ["A", "B"]) { + await result.append("C") + } + + await orchestrator.addTask(taskA) + await orchestrator.addTask(taskB) + await orchestrator.addTask(taskC) + + try await orchestrator.execute() + + let order = await result.items + // A and B can be in any order, but both must be before C + XCTAssertTrue(order.contains("A")) + XCTAssertTrue(order.contains("B")) + XCTAssertEqual(order.last, "C") + XCTAssertEqual(order.count, 3) + } + + func testCycleDetection() async { + let orchestrator = TaskOrchestrator() + + let taskA = DAGTask(id: "A", dependencies: ["B"]) { } + let taskB = DAGTask(id: "B", dependencies: ["A"]) { } + + await orchestrator.addTask(taskA) + await orchestrator.addTask(taskB) + + do { + try await orchestrator.execute() + XCTFail("Should have thrown cycle detected error") + } catch OrchestrationError.cycleDetected { + // Success + } catch { + XCTFail("Wrong error: \(error)") + } + } + + // MARK: - Helpers + actor ResultCollector { + var items: [String] = [] + func append(_ item: String) { items.append(item) } + } +} diff --git a/Tests/DesignAlgorithmsKitTests/Behavioral/ResourceCoordinatorTests.swift b/Tests/DesignAlgorithmsKitTests/Behavioral/ResourceCoordinatorTests.swift new file mode 100644 index 0000000..60ba7c3 --- /dev/null +++ b/Tests/DesignAlgorithmsKitTests/Behavioral/ResourceCoordinatorTests.swift @@ -0,0 +1,74 @@ +import XCTest +@testable import DesignAlgorithmsKit + +@available(macOS 12.0, iOS 15.0, *) +final class ResourceCoordinatorTests: XCTestCase { + + // MARK: - Integration Tests + + func testConcurrentReads() async throws { + let coordinator = ResourceCoordinator() + let path = "test/file.txt" + + let counter = Counter() + + // Launch 10 concurrent readers + await withTaskGroup(of: Void.self) { group in + for _ in 0..<10 { + group.addTask { + try? await coordinator.access(path: path, type: .read) { + try? await Task.sleep(nanoseconds: 10_000_000) // 10ms + await counter.increment() + } + } + } + } + + // Assert all 10 ran + let count = await counter.value + XCTAssertEqual(count, 10) + } + + func testWriteExclusivity() async throws { + let coordinator = ResourceCoordinator() + let path = "test/shared.txt" + let state = SharedState() + + let writeExp = expectation(description: "Write finished") + let readExp = expectation(description: "Read finished") + + Task { + try await coordinator.access(path: path, type: .write) { + await state.set(100) + try await Task.sleep(nanoseconds: 200_000_000) // 200ms + await state.set(200) + } + writeExp.fulfill() + } + + // Delay slightly to ensure writer enters lock + try await Task.sleep(nanoseconds: 50_000_000) // 50ms + + Task { + let val = try await coordinator.access(path: path, type: .read) { + return await state.value + } + XCTAssertEqual(val, 200, "Reader should have waited for writer to finish") + readExp.fulfill() + } + + await fulfillment(of: [writeExp, readExp], timeout: 2.0) + } + + // MARK: - Helpers + + actor Counter { + var value = 0 + func increment() { value += 1 } + } + + actor SharedState { + var value = 0 + func set(_ v: Int) { value = v } + } +}