Skip to content

Commit f19b2bc

Browse files
author
Ruben Nine
committed
Improving reliability of MultipartUpload abort logic (addresses #18.)
1 parent 11570f6 commit f19b2bc

File tree

2 files changed

+46
-36
lines changed

2 files changed

+46
-36
lines changed

FilestackSDK/Internal/Operations/BaseOperation.swift

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,12 @@ class BaseOperation: Operation {
3838
}
3939
}
4040

41+
override func cancel() {
42+
if isExecuting {
43+
state = .finished
44+
}
45+
}
46+
4147
override var isReady: Bool {
4248
return state == .ready
4349
}

FilestackSDK/Internal/Uploaders/MultipartUpload.swift

Lines changed: 40 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,16 @@ class MultipartUpload: Uploader {
5757
private let security: Security?
5858
private let uploadQueue: DispatchQueue = DispatchQueue(label: "com.filestack.upload-queue")
5959
private let maxRetries = 5
60-
private let uploadOperationUnderlyingQueue = DispatchQueue(label: "com.filestack.upload-operation-queue",
61-
qos: .utility,
62-
attributes: .concurrent)
63-
private let uploadOperationQueue = OperationQueue()
60+
61+
private let masterOperationUnderlyingQueue = DispatchQueue(label: "com.filestack.master-upload-operation-queue",
62+
qos: .utility)
63+
64+
private let partOperationUnderlyingQueue = DispatchQueue(label: "com.filestack.part-upload-operation-queue",
65+
qos: .utility,
66+
attributes: .concurrent)
67+
68+
private let masterOperationQueue = OperationQueue()
69+
private let partOperationQueue = OperationQueue()
6470

6571
// MARK: - Lifecycle Functions
6672

@@ -77,8 +83,10 @@ class MultipartUpload: Uploader {
7783
self.shouldAbort = false
7884
self.masterProgress.totalUnitCount = Int64(uploadable.size ?? 0)
7985

80-
uploadOperationQueue.underlyingQueue = uploadOperationUnderlyingQueue
81-
uploadOperationQueue.maxConcurrentOperationCount = options.partUploadConcurrency
86+
masterOperationQueue.underlyingQueue = masterOperationUnderlyingQueue
87+
masterOperationQueue.maxConcurrentOperationCount = 1
88+
partOperationQueue.underlyingQueue = partOperationUnderlyingQueue
89+
partOperationQueue.maxConcurrentOperationCount = options.partUploadConcurrency
8290
}
8391

8492
// MARK: - Uploadable Protocol Implementation
@@ -103,7 +111,8 @@ class MultipartUpload: Uploader {
103111

104112
uploadQueue.sync {
105113
shouldAbort = true
106-
uploadOperationQueue.cancelAllOperations()
114+
partOperationQueue.cancelAllOperations()
115+
masterOperationQueue.cancelAllOperations()
107116
currentStatus = .cancelled
108117
}
109118

@@ -165,10 +174,10 @@ private extension MultipartUpload {
165174
fail(with: MultipartUploadError.aborted)
166175
return
167176
} else {
168-
uploadOperationQueue.addOperation(startOperation)
177+
masterOperationQueue.addOperation(startOperation)
169178
}
170179

171-
uploadOperationQueue.waitUntilAllOperationsAreFinished()
180+
masterOperationQueue.waitUntilAllOperationsAreFinished()
172181

173182
// Ensure that there's a response and JSON payload or fail.
174183
guard let response = startOperation.response, let json = response.json else {
@@ -205,24 +214,6 @@ private extension MultipartUpload {
205214
var partsAndEtags: [Int: String] = [:]
206215

207216
let chunkSize = (canUseIntelligentIngestion ? ChunkSize.ii : ChunkSize.regular).rawValue
208-
let beforeCompleteCheckPointOperation = BlockOperation()
209-
210-
beforeCompleteCheckPointOperation.completionBlock = {
211-
if self.shouldAbort {
212-
self.fail(with: MultipartUploadError.aborted)
213-
return
214-
} else {
215-
self.addCompleteOperation(fileName: fileName,
216-
fileSize: fileSize,
217-
mimeType: mimeType,
218-
uri: uri,
219-
region: region,
220-
uploadID: uploadID,
221-
partsAndEtags: partsAndEtags,
222-
usingIntelligentIngestion: canUseIntelligentIngestion,
223-
retriesLeft: self.maxRetries)
224-
}
225-
}
226217

227218
// Submit all parts
228219
while !shouldAbort, seekPoint < fileSize {
@@ -262,21 +253,34 @@ private extension MultipartUpload {
262253
}
263254

264255
if self.shouldAbort {
265-
self.uploadOperationQueue.cancelAllOperations()
256+
self.partOperationQueue.cancelAllOperations()
266257
}
267258
}
268259

269260
checkpointOperation.addDependency(partOperation)
270-
uploadOperationQueue.addOperation(partOperation)
271-
uploadOperationQueue.addOperation(checkpointOperation)
272-
273-
beforeCompleteCheckPointOperation.addDependency(partOperation)
274-
beforeCompleteCheckPointOperation.addDependency(checkpointOperation)
261+
partOperationQueue.addOperation(partOperation)
262+
partOperationQueue.addOperation(checkpointOperation)
275263

276264
seekPoint += UInt64(chunkSize)
277265
}
278266

279-
uploadOperationQueue.addOperation(beforeCompleteCheckPointOperation)
267+
masterOperationQueue.addOperation {
268+
self.partOperationQueue.waitUntilAllOperationsAreFinished()
269+
270+
if self.shouldAbort {
271+
self.fail(with: MultipartUploadError.aborted)
272+
} else {
273+
self.addCompleteOperation(fileName: fileName,
274+
fileSize: fileSize,
275+
mimeType: mimeType,
276+
uri: uri,
277+
region: region,
278+
uploadID: uploadID,
279+
partsAndEtags: partsAndEtags,
280+
usingIntelligentIngestion: canUseIntelligentIngestion,
281+
retriesLeft: self.maxRetries)
282+
}
283+
}
280284
}
281285

282286
func uploadSubmitPartOperation(usingIntelligentIngestion: Bool,
@@ -386,8 +390,8 @@ private extension MultipartUpload {
386390
}
387391

388392
checkpointOperation.addDependency(completeOperation)
389-
uploadOperationQueue.addOperation(completeOperation)
390-
uploadOperationQueue.addOperation(checkpointOperation)
393+
masterOperationQueue.addOperation(completeOperation)
394+
masterOperationQueue.addOperation(checkpointOperation)
391395
}
392396
}
393397

0 commit comments

Comments
 (0)