Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
import Foundation

/// Represents a unit of work in a dependency graph.
public struct DAGTask: Identifiable, Sendable {
public let id: String
public let operation: @Sendable () async throws -> Void
public let dependencies: Set<String>

public init(id: String, dependencies: Set<String> = [], operation: @escaping @Sendable () async throws -> Void) {
self.id = id
self.dependencies = dependencies
self.operation = operation
}
}

public enum OrchestrationError: Error {
case cycleDetected
case dependencyNotFound(taskId: String, dependencyId: String)
case taskFailure(taskId: String, error: Error)
}

/// A Task Orchestrator that executes tasks based on a Directed Acyclic Graph (DAG).
/// It ensures that a task only runs when all its dependencies have successfully completed.
@available(macOS 12.0, iOS 15.0, *)
public actor TaskOrchestrator {

private var tasks: [String: DAGTask] = [:]

public init() {}

/// Adds a task to the orchestrator.
public func addTask(_ task: DAGTask) {
tasks[task.id] = task
}

/// Validates the graph for cycles and missing dependencies.
public func validate() throws {
// Check for missing dependencies
for task in tasks.values {
for dep in task.dependencies {
guard tasks[dep] != nil else {
throw OrchestrationError.dependencyNotFound(taskId: task.id, dependencyId: dep)
}
}
}

// Detect Cycles (DFS)
var visited: Set<String> = []
var recursionStack: Set<String> = []

func hasCycle(_ nodeId: String) -> Bool {
visited.insert(nodeId)
recursionStack.insert(nodeId)

if let node = tasks[nodeId] {
for dep in node.dependencies {
if !visited.contains(dep) {
if hasCycle(dep) { return true }
} else if recursionStack.contains(dep) {
return true
}
}
}

recursionStack.remove(nodeId)
return false
}

for nodeId in tasks.keys {
if !visited.contains(nodeId) {
if hasCycle(nodeId) {
throw OrchestrationError.cycleDetected
}
}
}
}

/// Executes all tasks in the graph, respecting dependencies.
/// Runs independent tasks in parallel where possible.
public func execute() async throws {
try validate()

// Build adjacency list for "dependents" (upstream -> [downstream])
var dependents: [String: [String]] = [:]
var inDegree: [String: Int] = [:]

for task in tasks.values {
inDegree[task.id] = task.dependencies.count
for dep in task.dependencies {
dependents[dep, default: []].append(task.id)
}
}

// Queue of tasks ready to run (in-degree 0)
let readyQueue: [String] = tasks.values.filter { ($0.dependencies.isEmpty) }.map { $0.id }

// We use a task group to run ready tasks concurrently
try await withThrowingTaskGroup(of: String.self) { group in

// Initial batch
for taskId in readyQueue {
guard let task = tasks[taskId] else { continue }
group.addTask {
try await task.operation()
return taskId
}
}

// Loop as tasks finish
var remainingTasks = tasks.count

while remainingTasks > 0 {
// Wait for any task to finish
guard let finishedTaskId = try await group.next() else {
break
}

remainingTasks -= 1

// Unlock downstream
if let downstreamNodes = dependents[finishedTaskId] {
for downstreamId in downstreamNodes {
inDegree[downstreamId, default: 0] -= 1
if inDegree[downstreamId] == 0 {
// Ready!
if let task = tasks[downstreamId] {
group.addTask {
try await task.operation()
return downstreamId
}
}
}
}
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
import Foundation

/// A coordinator that manages safe concurrent access to resources identified by keys (e.g., file paths).
/// It implements a Read-Write Lock semantics where multiple readers can access a resource simultaneously,
/// but writers require exclusive access.
@available(macOS 12.0, iOS 15.0, *)
public actor ResourceCoordinator {

private var locks: [String: PathLock] = [:]

public init() {}

/// Executes the given block with a shared (read) lock on the specified resource.
/// Other readers can execute concurrently, but writers will block.
public func withReadLock<T>(for key: String, operation: () async throws -> T) async throws -> T {
return try await access(path: key, type: .read, operation: operation)
}

/// Helper to just acquire and return a token?
/// No, closures are safer.

public func access<T>(path: String, type: AccessType, operation: () async throws -> T) async throws -> T {
let lock = getLock(for: path)
switch type {
case .read:
await lock.lockRead()
// We use standard do/defer pattern here since we are inside an async function
defer { Task { await lock.unlockRead() } }
return try await operation()
case .write:
await lock.lockWrite()
defer { Task { await lock.unlockWrite() } }
return try await operation()
}
}

private func getLock(for key: String) -> PathLock {
if let lock = locks[key] {
return lock
}
let newLock = PathLock()
locks[key] = newLock
return newLock
}

public enum AccessType {
case read
case write
}
}

/// A standard Read-Write lock implemented as an Actor.
@available(macOS 12.0, iOS 15.0, *)
actor PathLock {
private var readers: Int = 0
private var writers: Int = 0 // Should be 0 or 1
private var writeWaiters: [CheckedContinuation<Void, Never>] = []
private var readWaiters: [CheckedContinuation<Void, Never>] = []

func lockRead() async {
if writers > 0 || !writeWaiters.isEmpty {
// Writer has priority or active
await withCheckedContinuation { continuation in
readWaiters.append(continuation)
}
} else {
readers += 1
}
}

func unlockRead() {
readers -= 1
if readers == 0 {
// If no more readers, wake one writer if any
if !writeWaiters.isEmpty {
let writer = writeWaiters.removeFirst()
writers = 1 // Pass ownership
writer.resume()
}
}
}

func lockWrite() async {
if readers > 0 || writers > 0 {
await withCheckedContinuation { continuation in
writeWaiters.append(continuation)
}
} else {
writers = 1
}
}

func unlockWrite() {
writers = 0
// Prefer writers? Or strict FIFO?
// Simple implementation: Wake one writer if present, else wake all readers.

if !writeWaiters.isEmpty {
let nextWriter = writeWaiters.removeFirst()
writers = 1
nextWriter.resume()
} else {
// Wake ALL readers
let currentReaders = readWaiters
readWaiters.removeAll()
readers += currentReaders.count
for reader in currentReaders {
reader.resume()
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import XCTest
@testable import DesignAlgorithmsKit

@available(macOS 12.0, iOS 15.0, *)
final class TaskOrchestratorTests: XCTestCase {

func testLinearDependency() async throws {
let orchestrator = TaskOrchestrator()
let result = ResultCollector()

let taskA = DAGTask(id: "A") {
await result.append("A")
}

// B depends on A
let taskB = DAGTask(id: "B", dependencies: ["A"]) {
await result.append("B")
}

// C depends on B
let taskC = DAGTask(id: "C", dependencies: ["B"]) {
await result.append("C")
}

await orchestrator.addTask(taskA)
await orchestrator.addTask(taskB)
await orchestrator.addTask(taskC)

try await orchestrator.execute()

let order = await result.items
XCTAssertEqual(order, ["A", "B", "C"])
}

func testParallelExecution() async throws {
let orchestrator = TaskOrchestrator()
let result = ResultCollector()

// A and B independent. C depends on both.
let taskA = DAGTask(id: "A") {
try? await Task.sleep(nanoseconds: 10_000_000)
await result.append("A")
}

let taskB = DAGTask(id: "B") {
try? await Task.sleep(nanoseconds: 10_000_000)
await result.append("B")
}

let taskC = DAGTask(id: "C", dependencies: ["A", "B"]) {
await result.append("C")
}

await orchestrator.addTask(taskA)
await orchestrator.addTask(taskB)
await orchestrator.addTask(taskC)

try await orchestrator.execute()

let order = await result.items
// A and B can be in any order, but both must be before C
XCTAssertTrue(order.contains("A"))
XCTAssertTrue(order.contains("B"))
XCTAssertEqual(order.last, "C")
XCTAssertEqual(order.count, 3)
}

func testCycleDetection() async {
let orchestrator = TaskOrchestrator()

let taskA = DAGTask(id: "A", dependencies: ["B"]) { }
let taskB = DAGTask(id: "B", dependencies: ["A"]) { }

await orchestrator.addTask(taskA)
await orchestrator.addTask(taskB)

do {
try await orchestrator.execute()
XCTFail("Should have thrown cycle detected error")
} catch OrchestrationError.cycleDetected {
// Success
} catch {
XCTFail("Wrong error: \(error)")
}
}

// MARK: - Helpers
actor ResultCollector {
var items: [String] = []
func append(_ item: String) { items.append(item) }
}
}
Loading