From d1438aa02d712908fdbdc73555e271b6da8577dd Mon Sep 17 00:00:00 2001
From: Paul Fox
Date: Wed, 7 Jun 2023 12:09:49 +1000
Subject: [PATCH 1/3] Added additional information to help with debugging
 tasks in worker pools

---
 Include/udWorkerPool.h  |   5 +-
 Source/udWorkerPool.cpp | 125 +++++++++++++++++++++++++++++-----------
 2 files changed, 90 insertions(+), 40 deletions(-)

diff --git a/Include/udWorkerPool.h b/Include/udWorkerPool.h
index d148a45b..78ee3b29 100644
--- a/Include/udWorkerPool.h
+++ b/Include/udWorkerPool.h
@@ -19,7 +19,8 @@ udResult udWorkerPool_Create(udWorkerPool **ppPool, uint8_t totalThreads, const
 void udWorkerPool_Destroy(udWorkerPool **ppPool);
 
 // Adds a function to run on a background thread, optionally with userdata. If clearMemory is true, it will call udFree on pUserData after running
-udResult udWorkerPool_AddTask(udWorkerPool *pPool, udWorkerPoolCallback func, void *pUserData = nullptr, bool clearMemory = true, udWorkerPoolCallback postFunction = nullptr);
+// pTaskName is duplicated internally and is only used for debugging
+udResult udWorkerPool_AddTask(udWorkerPool *pPool, const char *pTaskName, udWorkerPoolCallback func, void *pUserData = nullptr, bool clearMemory = true, udWorkerPoolCallback postFunction = nullptr);
 
 // This must be run on the main thread, handles marshalling work back from worker threads if required
 // The parameter can be used to limit how much work is done each time this is called
@@ -27,6 +28,6 @@ udResult udWorkerPool_AddTask(udWorkerPool *pPool, udWorkerPoolCallback func, vo
 udResult udWorkerPool_DoPostWork(udWorkerPool *pPool, int processLimit = 0);
 
 // Returns true if there are workers currently processing tasks or if workers should be processing tasks
-bool udWorkerPool_HasActiveWorkers(udWorkerPool *pPool, size_t *pActiveThreads = nullptr, size_t *pQueuedTasks = nullptr);
+bool udWorkerPool_HasActiveWorkers(udWorkerPool *pPool, size_t *pActiveThreads = nullptr, size_t *pQueuedWTTasks = nullptr, size_t *pQueuedMTTasks = nullptr);
 
 #endif // udWorkerPool_h__
diff --git a/Source/udWorkerPool.cpp b/Source/udWorkerPool.cpp
index e9e7b7d7..f1ec4c35 100644
--- a/Source/udWorkerPool.cpp
+++ b/Source/udWorkerPool.cpp
@@ -9,24 +9,31 @@
 #include <algorithm>
 #include <atomic>
 
-struct udWorkerPoolThread
-{
-  udWorkerPool *pPool;
-  udThread *pThread;
-};
-
 struct udWorkerPoolTask
 {
+  const char *pTaskName;
+  double startTime;
+
   udWorkerPoolCallback function;
   udWorkerPoolCallback postFunction; // runs on main thread
+
   void *pDataBlock;
   bool freeDataBlock;
 };
 
+struct udWorkerPoolThread
+{
+  udWorkerPool *pPool;
+  udThread *pThread;
+  udWorkerPoolTask currentTask; // Used mostly for debugging
+};
+
 struct udWorkerPool
 {
-  udSafeDeque<udWorkerPoolTask> *pQueuedTasks;
-  udSafeDeque<udWorkerPoolTask> *pQueuedPostTasks;
+  udRWLock *pRWLock;
+
+  udChunkedArray<udWorkerPoolTask> queuedTasks;
+  udChunkedArray<udWorkerPoolTask> queuedPostTasks;
 
   udSemaphore *pSemaphore;
   std::atomic<int32_t> activeThreads;
@@ -46,7 +53,6 @@ uint32_t udWorkerPool_DoWork(void *pPoolPtr)
   udWorkerPoolThread *pThreadData = (udWorkerPoolThread*)pPoolPtr;
   udWorkerPool *pPool = pThreadData->pPool;
 
-  udWorkerPoolTask currentTask;
   int waitValue;
 
   while (pPool->isRunning)
@@ -58,19 +64,32 @@ uint32_t udWorkerPool_DoWork(void *pPoolPtr)
 
     ++pPool->activeThreads;
 
-    if (udSafeDeque_PopFront(pPool->pQueuedTasks, &currentTask) != udR_Success)
+    udWriteLockRWLock(pPool->pRWLock);
+    bool poppedOK = pPool->queuedTasks.PopFront(&pThreadData->currentTask);
+    udWriteUnlockRWLock(pPool->pRWLock);
+
+    if (!poppedOK)
     {
       --pPool->activeThreads;
       continue;
     }
 
-    if (currentTask.function)
-      currentTask.function(currentTask.pDataBlock);
+    if (pThreadData->currentTask.function)
+      pThreadData->currentTask.function(pThreadData->currentTask.pDataBlock);
 
-    if (currentTask.postFunction)
-      udSafeDeque_PushBack(pPool->pQueuedPostTasks, currentTask);
-    else if (currentTask.freeDataBlock)
-      udFree(currentTask.pDataBlock);
+    if (pThreadData->currentTask.postFunction)
+    {
+      udWriteLockRWLock(pPool->pRWLock);
+      pPool->queuedPostTasks.PushBack(pThreadData->currentTask);
+      udWriteUnlockRWLock(pPool->pRWLock);
+    }
+    else
+    {
+      if (pThreadData->currentTask.freeDataBlock)
+        udFree(pThreadData->currentTask.pDataBlock);
+
+      udFree(pThreadData->currentTask.pTaskName);
+    }
 
     --pPool->activeThreads;
   }
@@ -91,11 +110,14 @@ udResult udWorkerPool_Create(udWorkerPool **ppPool, uint8_t totalThreads, const
   pPool = udAllocType(udWorkerPool, 1, udAF_Zero);
   UD_ERROR_NULL(pPool, udR_MemoryAllocationFailure);
 
+  pPool->pRWLock = udCreateRWLock();
+  UD_ERROR_NULL(pPool->pRWLock, udR_MemoryAllocationFailure);
+
   pPool->pSemaphore = udCreateSemaphore();
-  UD_ERROR_NULL(pPool, udR_MemoryAllocationFailure);
+  UD_ERROR_NULL(pPool->pSemaphore, udR_MemoryAllocationFailure);
 
-  UD_ERROR_CHECK(udSafeDeque_Create(&pPool->pQueuedTasks, 32));
-  UD_ERROR_CHECK(udSafeDeque_Create(&pPool->pQueuedPostTasks, 32));
+  UD_ERROR_CHECK(pPool->queuedTasks.Init(32));
+  UD_ERROR_CHECK(pPool->queuedPostTasks.Init(32));
 
   pPool->isRunning = true;
   pPool->totalThreads = totalThreads;
@@ -136,38 +156,44 @@ void udWorkerPool_Destroy(udWorkerPool **ppPool)
     udThread_Destroy(&pPool->pThreadData[i].pThread);
   }
 
+  udWriteLockRWLock(pPool->pRWLock);
+
   udWorkerPoolTask currentTask;
-  while (udSafeDeque_PopFront(pPool->pQueuedTasks, &currentTask) == udR_Success)
+  while (pPool->queuedTasks.PopFront(&currentTask))
   {
     if (currentTask.freeDataBlock)
       udFree(currentTask.pDataBlock);
+    udFree(currentTask.pTaskName);
   }
 
-  while (udSafeDeque_PopFront(pPool->pQueuedPostTasks, &currentTask) == udR_Success)
+  while (pPool->queuedPostTasks.PopFront(&currentTask))
   {
     if (currentTask.freeDataBlock)
       udFree(currentTask.pDataBlock);
+    udFree(currentTask.pTaskName);
   }
 
-  udSafeDeque_Destroy(&pPool->pQueuedTasks);
-  udSafeDeque_Destroy(&pPool->pQueuedPostTasks);
+  pPool->queuedTasks.Deinit();
+  pPool->queuedPostTasks.Deinit();
   udDestroySemaphore(&pPool->pSemaphore);
 
+  udWriteUnlockRWLock(pPool->pRWLock);
+  udDestroyRWLock(&pPool->pRWLock);
+
   udFree(pPool->pThreadData);
   udFree(pPool);
 }
 
 // ----------------------------------------------------------------------------
 // Author: Paul Fox, May 2015
-udResult udWorkerPool_AddTask(udWorkerPool *pPool, udWorkerPoolCallback func, void *pUserData /*= nullptr*/, bool clearMemory /*= true*/, udWorkerPoolCallback postFunction /*= nullptr*/)
+udResult udWorkerPool_AddTask(udWorkerPool *pPool, const char *pTaskName, udWorkerPoolCallback func, void *pUserData /*= nullptr*/, bool clearMemory /*= true*/, udWorkerPoolCallback postFunction /*= nullptr*/)
 {
   udResult result = udR_Failure;
   udWorkerPoolTask tempTask;
 
   UD_ERROR_NULL(pPool, udR_InvalidParameter);
-  UD_ERROR_NULL(pPool->pQueuedTasks, udR_NotInitialized);
-  UD_ERROR_NULL(pPool->pQueuedPostTasks, udR_NotInitialized);
   UD_ERROR_NULL(pPool->pSemaphore, udR_NotInitialized);
+  UD_ERROR_NULL(pPool->pRWLock, udR_NotInitialized);
   UD_ERROR_IF(!pPool->isRunning, udR_NotAllowed);
 
   tempTask.function = func;
@@ -175,10 +201,18 @@ udResult udWorkerPool_AddTask(udWorkerPool *pPool, udWorkerPoolCallback func, vo
   tempTask.pDataBlock = pUserData;
   tempTask.freeDataBlock = clearMemory;
 
+  tempTask.pTaskName = udStrdup(pTaskName);
+  tempTask.startTime = udGetEpochSecsUTCf();
+
+  udWriteLockRWLock(pPool->pRWLock);
   if (func == nullptr && postFunction != nullptr)
-    UD_ERROR_CHECK(udSafeDeque_PushBack(pPool->pQueuedPostTasks, tempTask));
+    result = pPool->queuedPostTasks.PushBack(tempTask);
   else
-    UD_ERROR_CHECK(udSafeDeque_PushBack(pPool->pQueuedTasks, tempTask));
+    result = pPool->queuedTasks.PushBack(tempTask);
+  udWriteUnlockRWLock(pPool->pRWLock);
+
+  // Checked outside the lock; UD_ERROR_CHECK jumps to epilogue and would otherwise leave the lock held
+  UD_ERROR_CHECK(result);
 
   udIncrementSemaphore(pPool->pSemaphore);
@@ -195,20 +228,29 @@ udResult udWorkerPool_DoPostWork(udWorkerPool *pPool, int processLimit /*= 0*/)
   udWorkerPoolTask currentTask;
   udResult result = udR_Success;
   int processedItems = 0;
+  bool popSuccess = false;
 
   UD_ERROR_NULL(pPool, udR_InvalidParameter);
-  UD_ERROR_NULL(pPool->pQueuedTasks, udR_NotInitialized);
-  UD_ERROR_NULL(pPool->pQueuedPostTasks, udR_NotInitialized);
+  UD_ERROR_NULL(pPool->pRWLock, udR_NotInitialized);
   UD_ERROR_NULL(pPool->pSemaphore, udR_NotInitialized);
   UD_ERROR_IF(!pPool->isRunning, udR_NotAllowed);
 
-  while (udSafeDeque_PopFront(pPool->pQueuedPostTasks, &currentTask) == udR_Success)
+  while (true)
   {
+    udWriteLockRWLock(pPool->pRWLock);
+    popSuccess = pPool->queuedPostTasks.PopFront(&currentTask);
+    udWriteUnlockRWLock(pPool->pRWLock);
+
+    if (!popSuccess)
+      break;
+
     currentTask.postFunction(currentTask.pDataBlock);
 
     if (currentTask.freeDataBlock)
       udFree(currentTask.pDataBlock);
 
+    udFree(currentTask.pTaskName);
+
     if (++processedItems == processLimit)
       break;
   }
@@ -222,21 +265,25 @@ udResult udWorkerPool_DoPostWork(udWorkerPool *pPool, int processLimit /*= 0*/)
 // ----------------------------------------------------------------------------
 // Author: Paul Fox, May 2015
-bool udWorkerPool_HasActiveWorkers(udWorkerPool *pPool, size_t *pActiveThreads /*= nullptr*/, size_t *pQueuedTasks /*= nullptr*/)
+bool udWorkerPool_HasActiveWorkers(udWorkerPool *pPool, size_t *pActiveThreads /*= nullptr*/, size_t *pQueuedWTTasks /*= nullptr*/, size_t *pQueuedMTTasks /*= nullptr*/)
 {
   if (pPool == nullptr)
     return false;
 
-  udLockMutex(pPool->pQueuedTasks->pMutex);
+  udReadLockRWLock(pPool->pRWLock);
   int32_t activeThreads = pPool->activeThreads;
-  size_t queuedTasks = pPool->pQueuedTasks->chunkedArray.length;
-  udReleaseMutex(pPool->pQueuedTasks->pMutex);
+  size_t queuedWTTasks = pPool->queuedTasks.length;
+  size_t queuedMTTasks = pPool->queuedPostTasks.length;
+  udReadUnlockRWLock(pPool->pRWLock);
 
   if (pActiveThreads)
     *pActiveThreads = (size_t)std::max(activeThreads, 0);
 
-  if (pQueuedTasks)
-    *pQueuedTasks = queuedTasks;
+  if (pQueuedWTTasks)
+    *pQueuedWTTasks = queuedWTTasks;
+
+  if (pQueuedMTTasks)
+    *pQueuedMTTasks = queuedMTTasks;
 
-  return (activeThreads > 0 || queuedTasks > 0);
+  return (activeThreads > 0 || queuedWTTasks > 0 || queuedMTTasks > 0);
 }
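A minimal sketch of how the reworked API above fits together, assuming the truncated udWorkerPool_Create signature takes a worker-thread name prefix as its third parameter; loadAsset and assetLoaded are hypothetical callbacks, not part of the patch:

#include "udWorkerPool.h"

void loadAsset(void *pUserData) { /* hypothetical: runs on a worker thread */ }
void assetLoaded(void *pUserData) { /* hypothetical: runs on the main thread via DoPostWork */ }

void exampleMainLoop()
{
  udWorkerPool *pPool = nullptr;
  if (udWorkerPool_Create(&pPool, 4, "assetPool") != udR_Success)
    return;

  // The new pTaskName parameter labels the task for debugging; it is udStrdup'd internally, so a temporary string is fine
  udWorkerPool_AddTask(pPool, "Load asset", loadAsset, nullptr, false, assetLoaded);

  size_t activeThreads = 0;
  size_t queuedWTTasks = 0; // tasks waiting for a worker thread
  size_t queuedMTTasks = 0; // completed tasks waiting for the main thread
  while (udWorkerPool_HasActiveWorkers(pPool, &activeThreads, &queuedWTTasks, &queuedMTTasks))
  {
    udWorkerPool_DoPostWork(pPool); // marshal post-functions back to this thread
  }

  udWorkerPool_Destroy(&pPool);
}

The WT/MT split mirrors the pool's two internal queues, so a debug overlay can report both numbers alongside the active thread count.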
From d2647d5ccf316946cbea68fdf05c65a1e6218995 Mon Sep 17 00:00:00 2001
From: Paul Fox
Date: Wed, 7 Jun 2023 13:31:32 +1000
Subject: [PATCH 2/3] Added worker pool item iterator

---
 Include/udWorkerPool.h  |  3 +++
 Source/udWorkerPool.cpp | 27 +++++++++++++++++++++++++++
 2 files changed, 30 insertions(+)

diff --git a/Include/udWorkerPool.h b/Include/udWorkerPool.h
index 78ee3b29..83e5593c 100644
--- a/Include/udWorkerPool.h
+++ b/Include/udWorkerPool.h
@@ -29,4 +29,7 @@ udResult udWorkerPool_DoPostWork(udWorkerPool *pPool, int processLimit = 0);
 // Returns true if there are workers currently processing tasks or if workers should be processing tasks
 bool udWorkerPool_HasActiveWorkers(udWorkerPool *pPool, size_t *pActiveThreads = nullptr, size_t *pQueuedWTTasks = nullptr, size_t *pQueuedMTTasks = nullptr);
 
+// Calls back with the name, start time and running state of every in-flight and queued task; intended for debugging
+void udWorkerPool_IterateItems(udWorkerPool *pPool, udCallback<void(const char *, double, bool)> callback);
+
 #endif // udWorkerPool_h__
diff --git a/Source/udWorkerPool.cpp b/Source/udWorkerPool.cpp
index f1ec4c35..1ac64de3 100644
--- a/Source/udWorkerPool.cpp
+++ b/Source/udWorkerPool.cpp
@@ -92,6 +92,12 @@ uint32_t udWorkerPool_DoWork(void *pPoolPtr)
     }
 
     --pPool->activeThreads;
+
+    // Cleared under the write lock so udWorkerPool_IterateItems never sees a stale task
+    udWriteLockRWLock(pPool->pRWLock);
+    pThreadData->currentTask.pTaskName = nullptr;
+    pThreadData->currentTask.startTime = udGetEpochSecsUTCf();
+    udWriteUnlockRWLock(pPool->pRWLock);
   }
 
   return 0;
@@ -287,3 +293,24 @@ bool udWorkerPool_HasActiveWorkers(udWorkerPool *pPool, size_t *pActiveThreads /
 
   return (activeThreads > 0 || queuedWTTasks > 0 || queuedMTTasks > 0);
 }
+
+void udWorkerPool_IterateItems(udWorkerPool *pPool, udCallback<void(const char *, double, bool)> callback)
+{
+  if (pPool == nullptr)
+    return;
+
+  udReadLockRWLock(pPool->pRWLock);
+
+  for (int i = 0; i < pPool->totalThreads; ++i)
+  {
+    // pTaskName is nullptr for idle workers
+    callback(pPool->pThreadData[i].currentTask.pTaskName, pPool->pThreadData[i].currentTask.startTime, true);
+  }
+
+  for (const auto &item : pPool->queuedTasks)
+  {
+    callback(item.pTaskName, item.startTime, false);
+  }
+
+  udReadUnlockRWLock(pPool->pRWLock);
+}
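A sketch of driving the new iterator, assuming udCallback accepts a capturing lambda the way std::function does; dumpPoolState is a hypothetical helper, and pTaskName is nullptr for workers that are currently idle:

#include <stdio.h>
#include "udWorkerPool.h"

void dumpPoolState(udWorkerPool *pPool)
{
  double now = udGetEpochSecsUTCf();

  udWorkerPool_IterateItems(pPool, [now](const char *pTaskName, double startTime, bool isRunning) {
    if (pTaskName == nullptr)
      return; // idle worker slot

    printf("%s: '%s' for %.1fs\n", isRunning ? "running" : "queued", pTaskName, now - startTime);
  });
}

Because the iterator holds the pool's read lock for the whole walk, the callback should stay cheap and must not call back into the pool, or it will deadlock against the write lock.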
From a8103a462abe67b527ddf658ad13946294a5c1b2 Mon Sep 17 00:00:00 2001
From: Paul Fox
Date: Wed, 7 Jun 2023 15:27:46 +1000
Subject: [PATCH 3/3] Added ability to bump a worker pool job to top of the
 queue and cancel jobs in queue

---
 Include/udWorkerPool.h  |   9 ++-
 Source/udWorkerPool.cpp | 127 +++++++++++++++++++++++++++++++--------
 2 files changed, 112 insertions(+), 24 deletions(-)

diff --git a/Include/udWorkerPool.h b/Include/udWorkerPool.h
index 83e5593c..61243845 100644
--- a/Include/udWorkerPool.h
+++ b/Include/udWorkerPool.h
@@ -19,8 +19,8 @@ udResult udWorkerPool_Create(udWorkerPool **ppPool, uint8_t totalThreads, const
 void udWorkerPool_Destroy(udWorkerPool **ppPool);
 
 // Adds a function to run on a background thread, optionally with userdata. If clearMemory is true, it will call udFree on pUserData after running
 // pTaskName is duplicated internally and is only used for debugging
-udResult udWorkerPool_AddTask(udWorkerPool *pPool, const char *pTaskName, udWorkerPoolCallback func, void *pUserData = nullptr, bool clearMemory = true, udWorkerPoolCallback postFunction = nullptr);
+udResult udWorkerPool_AddTask(udWorkerPool *pPool, const char *pTaskName, udWorkerPoolCallback func, void *pUserData = nullptr, bool clearMemory = true, udWorkerPoolCallback postFunction = nullptr, int32_t *pJobID = nullptr);
 
 // This must be run on the main thread, handles marshalling work back from worker threads if required
 // The parameter can be used to limit how much work is done each time this is called
@@ -29,8 +29,11 @@ udResult udWorkerPool_DoPostWork(udWorkerPool *pPool, int processLimit = 0);
 // Returns true if there are workers currently processing tasks or if workers should be processing tasks
 bool udWorkerPool_HasActiveWorkers(udWorkerPool *pPool, size_t *pActiveThreads = nullptr, size_t *pQueuedWTTasks = nullptr, size_t *pQueuedMTTasks = nullptr);
 
-// Calls back with the name, start time and running state of every in-flight and queued task; intended for debugging
-void udWorkerPool_IterateItems(udWorkerPool *pPool, udCallback<void(const char *, double, bool)> callback);
+// Calls back with the name, start time, running state and job ID of every in-flight and queued task; intended for debugging
+void udWorkerPool_IterateItems(udWorkerPool *pPool, udCallback<void(const char *, double, bool, int32_t)> callback);
+
+udResult udWorkerPool_TryCancelJob(udWorkerPool *pPool, int32_t jobID); // Cancel the job if it's still in the queue, otherwise do nothing
+udResult udWorkerPool_BumpJob(udWorkerPool *pPool, int32_t jobID); // Move the job to the start of the queue
 
 #endif // udWorkerPool_h__
diff --git a/Source/udWorkerPool.cpp b/Source/udWorkerPool.cpp
index 1ac64de3..ee78d3c6 100644
--- a/Source/udWorkerPool.cpp
+++ b/Source/udWorkerPool.cpp
@@ -11,6 +11,7 @@
 struct udWorkerPoolTask
 {
+  int32_t jobID;
   const char *pTaskName;
   double startTime;
 
@@ -42,8 +43,18 @@ struct udWorkerPool
   udWorkerPoolThread *pThreadData;
 
   std::atomic<bool> isRunning;
+  std::atomic<int32_t> nextJobID;
 };
 
+// Frees a task's name and, if owned, its userdata
+void udWorkerPool_CleanupTask(udWorkerPoolTask *pTask)
+{
+  if (pTask->freeDataBlock)
+    udFree(pTask->pDataBlock);
+
+  udFree(pTask->pTaskName);
+}
+
 // ----------------------------------------------------------------------------
 // Author: Paul Fox, May 2015
 uint32_t udWorkerPool_DoWork(void *pPoolPtr)
@@ -85,16 +96,14 @@ uint32_t udWorkerPool_DoWork(void *pPoolPtr)
     else
     {
-      if (pThreadData->currentTask.freeDataBlock)
-        udFree(pThreadData->currentTask.pDataBlock);
-
-      udFree(pThreadData->currentTask.pTaskName);
+      udWorkerPool_CleanupTask(&pThreadData->currentTask);
     }
 
     --pPool->activeThreads;
 
     // Cleared under the write lock so udWorkerPool_IterateItems never sees a stale task
     udWriteLockRWLock(pPool->pRWLock);
     pThreadData->currentTask.pTaskName = nullptr;
     pThreadData->currentTask.startTime = udGetEpochSecsUTCf();
+    pThreadData->currentTask.jobID = 0;
     udWriteUnlockRWLock(pPool->pRWLock);
   }
 
@@ -163,16 +172,12 @@ void udWorkerPool_Destroy(udWorkerPool **ppPool)
   udWorkerPoolTask currentTask;
   while (pPool->queuedTasks.PopFront(&currentTask))
   {
-    if (currentTask.freeDataBlock)
-      udFree(currentTask.pDataBlock);
-    udFree(currentTask.pTaskName);
+    udWorkerPool_CleanupTask(&currentTask);
   }
 
   while (pPool->queuedPostTasks.PopFront(&currentTask))
   {
-    if (currentTask.freeDataBlock)
-      udFree(currentTask.pDataBlock);
-    udFree(currentTask.pTaskName);
+    udWorkerPool_CleanupTask(&currentTask);
   }
 
   pPool->queuedTasks.Deinit();
@@ -192,8 +197,11 @@ void udWorkerPool_Destroy(udWorkerPool **ppPool)
 // ----------------------------------------------------------------------------
 // Author: Paul Fox, May 2015
-udResult udWorkerPool_AddTask(udWorkerPool *pPool, const char *pTaskName, udWorkerPoolCallback func, void *pUserData /*= nullptr*/, bool clearMemory /*= true*/, udWorkerPoolCallback postFunction /*= nullptr*/)
+udResult udWorkerPool_AddTask(udWorkerPool *pPool, const char *pTaskName, udWorkerPoolCallback func, void *pUserData /*= nullptr*/, bool clearMemory /*= true*/, udWorkerPoolCallback postFunction /*= nullptr*/, int32_t *pJobID /*= nullptr*/)
 {
+  if (func == nullptr && postFunction == nullptr)
+    return udR_NothingToDo;
+
   udResult result = udR_Failure;
   udWorkerPoolTask tempTask;
 
@@ -206,12 +214,18 @@ udResult udWorkerPool_AddTask(udWorkerPool *pPool, const char *pTaskName, udWork
   tempTask.pTaskName = udStrdup(pTaskName);
   tempTask.startTime = udGetEpochSecsUTCf();
+  tempTask.jobID = (++pPool->nextJobID);
+
+  if (pJobID != nullptr)
+    *pJobID = tempTask.jobID;
 
   udWriteLockRWLock(pPool->pRWLock);
-  if (func == nullptr && postFunction != nullptr)
-    result = pPool->queuedPostTasks.PushBack(tempTask);
+  if (func != nullptr)
+    result = pPool->queuedTasks.PushBack(tempTask);
   else
-    result = pPool->queuedTasks.PushBack(tempTask);
+    result = pPool->queuedPostTasks.PushBack(tempTask);
   udWriteUnlockRWLock(pPool->pRWLock);
 
   // Checked outside the lock; UD_ERROR_CHECK jumps to epilogue and would otherwise leave the lock held
   UD_ERROR_CHECK(result);
@@ -252,10 +266,7 @@ udResult udWorkerPool_DoPostWork(udWorkerPool *pPool, int processLimit /*= 0*/)
     currentTask.postFunction(currentTask.pDataBlock);
 
-    if (currentTask.freeDataBlock)
-      udFree(currentTask.pDataBlock);
-
-    udFree(currentTask.pTaskName);
+    udWorkerPool_CleanupTask(&currentTask);
 
     if (++processedItems == processLimit)
       break;
@@ -299,24 +310,100 @@ bool udWorkerPool_HasActiveWorkers(udWorkerPool *pPool, size_t *pActiveThreads /
   return (activeThreads > 0 || queuedWTTasks > 0 || queuedMTTasks > 0);
 }
 
-void udWorkerPool_IterateItems(udWorkerPool *pPool, udCallback<void(const char *, double, bool)> callback)
+void udWorkerPool_IterateItems(udWorkerPool *pPool, udCallback<void(const char *, double, bool, int32_t)> callback)
 {
   if (pPool == nullptr)
     return;
 
   udReadLockRWLock(pPool->pRWLock);
 
   for (int i = 0; i < pPool->totalThreads; ++i)
   {
     // pTaskName is nullptr for idle workers
-    callback(pPool->pThreadData[i].currentTask.pTaskName, pPool->pThreadData[i].currentTask.startTime, true);
+    callback(pPool->pThreadData[i].currentTask.pTaskName, pPool->pThreadData[i].currentTask.startTime, true, pPool->pThreadData[i].currentTask.jobID);
   }
 
   for (const auto &item : pPool->queuedTasks)
   {
-    callback(item.pTaskName, item.startTime, false);
+    callback(item.pTaskName, item.startTime, false, item.jobID);
   }
 
   udReadUnlockRWLock(pPool->pRWLock);
 }
+
+udResult udWorkerPool_TryCancelJob(udWorkerPool *pPool, int32_t jobID)
+{
+  udResult result = udR_Failure;
+
+  if (pPool == nullptr)
+    return udR_InvalidParameter;
+
+  udWriteLockRWLock(pPool->pRWLock);
+
+  // Job IDs start at 1; 0 is the "no job" marker on idle workers
+  UD_ERROR_IF(jobID <= 0, udR_InvalidParameter);
+
+  for (int i = 0; i < pPool->totalThreads; ++i)
+  {
+    UD_ERROR_IF(pPool->pThreadData[i].currentTask.jobID == jobID, udR_InProgress);
+  }
+
+  for (size_t i = 0; i < pPool->queuedTasks.length; ++i)
+  {
+    if (pPool->queuedTasks[i].jobID == jobID)
+    {
+      udWorkerPool_CleanupTask(&pPool->queuedTasks[i]);
+      pPool->queuedTasks.RemoveAt(i);
+      result = udR_Success;
+      break;
+    }
+  }
+
+epilogue:
+  udWriteUnlockRWLock(pPool->pRWLock);
+
+  return result;
+}
+
+udResult udWorkerPool_BumpJob(udWorkerPool *pPool, int32_t jobID)
+{
+  udResult result = udR_Failure;
+
+  bool foundTask = false;
+  udWorkerPoolTask currentTask = {};
+
+  if (pPool == nullptr)
+    return udR_InvalidParameter;
+
+  udWriteLockRWLock(pPool->pRWLock);
+
+  // Job IDs start at 1; 0 is the "no job" marker on idle workers
+  UD_ERROR_IF(jobID <= 0, udR_InvalidParameter);
+
+  for (int i = 0; i < pPool->totalThreads; ++i)
+  {
+    UD_ERROR_IF(pPool->pThreadData[i].currentTask.jobID == jobID, udR_InProgress);
+  }
+
+  for (size_t i = 0; i < pPool->queuedTasks.length; ++i)
+  {
+    if (pPool->queuedTasks[i].jobID == jobID)
+    {
+      currentTask = pPool->queuedTasks[i];
+      pPool->queuedTasks.RemoveAt(i);
+      foundTask = true;
+      break;
+    }
+  }
+
+  if (foundTask)
+  {
+    pPool->queuedTasks.PushFront(currentTask);
+    result = udR_Success;
+  }
+
+epilogue:
+  udWriteUnlockRWLock(pPool->pRWLock);
+
+  return result;
+}
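A sketch of the job-handle workflow these two entry points enable, assuming decodeMesh and generateThumbnail are hypothetical worker callbacks; both calls answer udR_InProgress once a worker has already picked the job up:

#include "udWorkerPool.h"

void decodeMesh(void *pUserData) { /* hypothetical */ }
void generateThumbnail(void *pUserData) { /* hypothetical */ }

void examplePrioritise(udWorkerPool *pPool)
{
  int32_t meshJob = 0;
  int32_t thumbnailJob = 0;

  // pJobID receives a handle that can be used to reorder or cancel the job later
  udWorkerPool_AddTask(pPool, "Decode mesh", decodeMesh, nullptr, false, nullptr, &meshJob);
  udWorkerPool_AddTask(pPool, "Generate thumbnail", generateThumbnail, nullptr, false, nullptr, &thumbnailJob);

  // The mesh just came into view; move it to the front of the worker queue
  if (udWorkerPool_BumpJob(pPool, meshJob) == udR_InProgress)
  {
    // Already running; nothing to reorder
  }

  // The thumbnail is off-screen now; drop it if a worker hasn't started it yet
  udResult result = udWorkerPool_TryCancelJob(pPool, thumbnailJob);
  if (result == udR_Success)
  {
    // Removed from the queue; the pool freed its name (and its userdata, had clearMemory been true)
  }
  else if (result == udR_InProgress)
  {
    // Too late to cancel; the task will run to completion
  }
}

Since job IDs start at 1 and idle workers report a jobID of 0, a zero-initialised handle can safely be treated as "no job".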