Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions Include/udWorkerPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,19 @@ udResult udWorkerPool_Create(udWorkerPool **ppPool, uint8_t totalThreads, const
void udWorkerPool_Destroy(udWorkerPool **ppPool);

// Adds a function to run on a background thread, optionally with userdata. If clearMemory is true, it will call udFree on pUserData after running
udResult udWorkerPool_AddTask(udWorkerPool *pPool, udWorkerPoolCallback func, void *pUserData = nullptr, bool clearMemory = true, udWorkerPoolCallback postFunction = nullptr);
udResult udWorkerPool_AddTask(udWorkerPool *pPool, const char *pTaskName, udWorkerPoolCallback func, void *pUserData = nullptr, bool clearMemory = true, udWorkerPoolCallback postFunction = nullptr, int32_t *pJobID = nullptr);

// This must be run on the main thread, handles marshalling work back from worker threads if required
// The parameter can be used to limit how much work is done each time this is called
// Returns udR_NothingToDo if no work was done - otherwise udR_Success
udResult udWorkerPool_DoPostWork(udWorkerPool *pPool, int processLimit = 0);

// Returns true if there are workers currently processing tasks or if workers should be processing tasks
bool udWorkerPool_HasActiveWorkers(udWorkerPool *pPool, size_t *pActiveThreads = nullptr, size_t *pQueuedTasks = nullptr);
bool udWorkerPool_HasActiveWorkers(udWorkerPool *pPool, size_t *pActiveThreads = nullptr, size_t *pQueuedWTTasks = nullptr, size_t *pQueuedMTTasks = nullptr);

void udWorkerPool_IterateItems(udWorkerPool *pPool, udCallback<void(const char *taskName, double queuedAt, bool isActive, int32_t jobID)> callback);

udResult udWorkerPool_TryCancelJob(udWorkerPool *pPool, int32_t jobID); // Cancel the job if it's in the queue, otherwise do nothing
udResult udWorkerPool_BumpJob(udWorkerPool *pPool, int32_t jobID); // Put the job at the start of the queue

#endif // udWorkerPool_h__
225 changes: 181 additions & 44 deletions Source/udWorkerPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,32 @@
#include <atomic>
#include <algorithm>

struct udWorkerPoolThread
{
udWorkerPool *pPool;
udThread *pThread;
};

// A single unit of work queued on the pool. Owns pTaskName (duplicated on
// AddTask) and, when freeDataBlock is set, pDataBlock as well; both are
// released by udWorkerPool_CleanupTask.
struct udWorkerPoolTask
{
  int32_t jobID;         // Unique ID assigned from the pool's incrementing counter on AddTask
  const char *pTaskName; // Duplicated copy of the caller-supplied name; freed in CleanupTask
  double startTime;      // Epoch seconds (UTC float) captured when the task was queued

  udWorkerPoolCallback function;
  udWorkerPoolCallback postFunction; // runs on main thread

  void *pDataBlock;    // Optional userdata handed to function/postFunction
  bool freeDataBlock;  // When true, pDataBlock is udFree'd after the task completes
};

// Per-worker-thread bookkeeping; one of these exists per thread in the pool.
struct udWorkerPoolThread
{
  udWorkerPool *pPool;          // Back-pointer to the owning pool
  udThread *pThread;            // Handle to the OS worker thread
  udWorkerPoolTask currentTask; // Used mostly for debugging
};

struct udWorkerPool
{
udSafeDeque<udWorkerPoolTask> *pQueuedTasks;
udSafeDeque<udWorkerPoolTask> *pQueuedPostTasks;
udRWLock *pRWLock;

udChunkedArray<udWorkerPoolTask> queuedTasks;
udChunkedArray<udWorkerPoolTask> queuedPostTasks;

udSemaphore *pSemaphore;
std::atomic<int32_t> activeThreads;
Expand All @@ -35,8 +43,17 @@ struct udWorkerPool
udWorkerPoolThread *pThreadData;

std::atomic<bool> isRunning;
std::atomic<int32_t> nextJobID;
};

// Releases the resources owned by a completed or cancelled task: the
// duplicated task name always, and the user data block only when the task
// was flagged to free it.
void udWorkerPool_CleanupTask(udWorkerPoolTask *pTask)
{
  udFree(pTask->pTaskName);

  if (pTask->freeDataBlock)
    udFree(pTask->pDataBlock);
}

// ----------------------------------------------------------------------------
// Author: Paul Fox, May 2015
uint32_t udWorkerPool_DoWork(void *pPoolPtr)
Expand All @@ -46,7 +63,6 @@ uint32_t udWorkerPool_DoWork(void *pPoolPtr)
udWorkerPoolThread *pThreadData = (udWorkerPoolThread*)pPoolPtr;
udWorkerPool *pPool = pThreadData->pPool;

udWorkerPoolTask currentTask;
int waitValue;

while (pPool->isRunning)
Expand All @@ -58,21 +74,34 @@ uint32_t udWorkerPool_DoWork(void *pPoolPtr)

++pPool->activeThreads;

if (udSafeDeque_PopFront(pPool->pQueuedTasks, &currentTask) != udR_Success)
udWriteLockRWLock(pPool->pRWLock);
bool poppedOK = pPool->queuedTasks.PopFront(&pThreadData->currentTask);
udWriteUnlockRWLock(pPool->pRWLock);

if (!poppedOK)
{
--pPool->activeThreads;
continue;
}

if (currentTask.function)
currentTask.function(currentTask.pDataBlock);
if (pThreadData->currentTask.function)
pThreadData->currentTask.function(pThreadData->currentTask.pDataBlock);

if (currentTask.postFunction)
udSafeDeque_PushBack(pPool->pQueuedPostTasks, currentTask);
else if (currentTask.freeDataBlock)
udFree(currentTask.pDataBlock);
if (pThreadData->currentTask.postFunction)
{
udWriteLockRWLock(pPool->pRWLock);
pPool->queuedPostTasks.PushBack(pThreadData->currentTask);
udWriteUnlockRWLock(pPool->pRWLock);
}
else
{
udWorkerPool_CleanupTask(&pThreadData->currentTask);
}

--pPool->activeThreads;
pThreadData->currentTask.pTaskName = nullptr;
pThreadData->currentTask.startTime = udGetEpochSecsUTCf();
pThreadData->currentTask.jobID = 0;
}

return 0;
Expand All @@ -91,11 +120,12 @@ udResult udWorkerPool_Create(udWorkerPool **ppPool, uint8_t totalThreads, const
pPool = udAllocType(udWorkerPool, 1, udAF_Zero);
UD_ERROR_NULL(pPool, udR_MemoryAllocationFailure);

pPool->pRWLock = udCreateRWLock();
pPool->pSemaphore = udCreateSemaphore();
UD_ERROR_NULL(pPool, udR_MemoryAllocationFailure);

UD_ERROR_CHECK(udSafeDeque_Create(&pPool->pQueuedTasks, 32));
UD_ERROR_CHECK(udSafeDeque_Create(&pPool->pQueuedPostTasks, 32));
UD_ERROR_CHECK(pPool->queuedTasks.Init(32));
UD_ERROR_CHECK(pPool->queuedPostTasks.Init(32));

pPool->isRunning = true;
pPool->totalThreads = totalThreads;
Expand Down Expand Up @@ -136,49 +166,65 @@ void udWorkerPool_Destroy(udWorkerPool **ppPool)
udThread_Destroy(&pPool->pThreadData[i].pThread);
}

udWriteLockRWLock(pPool->pRWLock);

udWorkerPoolTask currentTask;
while (udSafeDeque_PopFront(pPool->pQueuedTasks, &currentTask) == udR_Success)
while (pPool->queuedTasks.PopFront(&currentTask))
{
if (currentTask.freeDataBlock)
udFree(currentTask.pDataBlock);
udWorkerPool_CleanupTask(&currentTask);
}

while (udSafeDeque_PopFront(pPool->pQueuedPostTasks, &currentTask) == udR_Success)
while (pPool->queuedPostTasks.PopFront(&currentTask))
{
if (currentTask.freeDataBlock)
udFree(currentTask.pDataBlock);
udWorkerPool_CleanupTask(&currentTask);
}

udSafeDeque_Destroy(&pPool->pQueuedTasks);
udSafeDeque_Destroy(&pPool->pQueuedPostTasks);
pPool->queuedTasks.Deinit();
pPool->queuedPostTasks.Deinit();
udDestroySemaphore(&pPool->pSemaphore);

udWriteUnlockRWLock(pPool->pRWLock);
udDestroyRWLock(&pPool->pRWLock);

udFree(pPool->pThreadData);
udFree(pPool);
}

// ----------------------------------------------------------------------------
// Author: Paul Fox, May 2015
udResult udWorkerPool_AddTask(udWorkerPool *pPool, udWorkerPoolCallback func, void *pUserData /*= nullptr*/, bool clearMemory /*= true*/, udWorkerPoolCallback postFunction /*= nullptr*/)
udResult udWorkerPool_AddTask(udWorkerPool *pPool, const char *pTaskName, udWorkerPoolCallback func, void *pUserData /*= nullptr*/, bool clearMemory /*= true*/, udWorkerPoolCallback postFunction /*= nullptr*/, int32_t *pJobID /*= nullptr*/)
{
if (func == nullptr && postFunction == nullptr)
return udR_NothingToDo;

udResult result = udR_Failure;
udWorkerPoolTask tempTask;

UD_ERROR_NULL(pPool, udR_InvalidParameter);
UD_ERROR_NULL(pPool->pQueuedTasks, udR_NotInitialized);
UD_ERROR_NULL(pPool->pQueuedPostTasks, udR_NotInitialized);
UD_ERROR_NULL(pPool->pSemaphore, udR_NotInitialized);
UD_ERROR_NULL(pPool->pRWLock, udR_NotInitialized);
UD_ERROR_IF(!pPool->isRunning, udR_NotAllowed);

tempTask.function = func;
tempTask.postFunction = postFunction;
tempTask.pDataBlock = pUserData;
tempTask.freeDataBlock = clearMemory;

if (func == nullptr && postFunction != nullptr)
UD_ERROR_CHECK(udSafeDeque_PushBack(pPool->pQueuedPostTasks, tempTask));

tempTask.pTaskName = udStrdup(pTaskName);
tempTask.startTime = udGetEpochSecsUTCf();
tempTask.jobID = (++pPool->nextJobID);

if (pJobID != nullptr)
*pJobID = tempTask.jobID;

udWriteLockRWLock(pPool->pRWLock);
if (func != nullptr)
UD_ERROR_CHECK(pPool->queuedTasks.PushBack(tempTask));
else
UD_ERROR_CHECK(udSafeDeque_PushBack(pPool->pQueuedTasks, tempTask));
UD_ERROR_CHECK(pPool->queuedPostTasks.PushBack(tempTask));
udWriteUnlockRWLock(pPool->pRWLock);


udIncrementSemaphore(pPool->pSemaphore);

Expand All @@ -195,19 +241,25 @@ udResult udWorkerPool_DoPostWork(udWorkerPool *pPool, int processLimit /*= 0*/)
udWorkerPoolTask currentTask;
udResult result = udR_Success;
int processedItems = 0;
bool popSuccess = false;

UD_ERROR_NULL(pPool, udR_InvalidParameter);
UD_ERROR_NULL(pPool->pQueuedTasks, udR_NotInitialized);
UD_ERROR_NULL(pPool->pQueuedPostTasks, udR_NotInitialized);
UD_ERROR_NULL(pPool->pRWLock, udR_NotInitialized);
UD_ERROR_NULL(pPool->pSemaphore, udR_NotInitialized);
UD_ERROR_IF(!pPool->isRunning, udR_NotAllowed);

while (udSafeDeque_PopFront(pPool->pQueuedPostTasks, &currentTask) == udR_Success)
while (true)
{
udWriteLockRWLock(pPool->pRWLock);
popSuccess = pPool->queuedPostTasks.PopFront(&currentTask);
udWriteUnlockRWLock(pPool->pRWLock);

if (!popSuccess)
break;

currentTask.postFunction(currentTask.pDataBlock);

if (currentTask.freeDataBlock)
udFree(currentTask.pDataBlock);
udWorkerPool_CleanupTask(&currentTask);

if (++processedItems == processLimit)
break;
Expand All @@ -222,21 +274,106 @@ udResult udWorkerPool_DoPostWork(udWorkerPool *pPool, int processLimit /*= 0*/)

// ----------------------------------------------------------------------------
// Author: Paul Fox, May 2015
bool udWorkerPool_HasActiveWorkers(udWorkerPool *pPool, size_t *pActiveThreads /*= nullptr*/, size_t *pQueuedTasks /*= nullptr*/)
bool udWorkerPool_HasActiveWorkers(udWorkerPool *pPool, size_t *pActiveThreads /*= nullptr*/, size_t *pQueuedWTTasks /*= nullptr*/, size_t *pQueuedMTTasks /*= nullptr*/)
{
if (pPool == nullptr)
return false;

udLockMutex(pPool->pQueuedTasks->pMutex);
udReadLockRWLock(pPool->pRWLock);
int32_t activeThreads = pPool->activeThreads;
size_t queuedTasks = pPool->pQueuedTasks->chunkedArray.length;
udReleaseMutex(pPool->pQueuedTasks->pMutex);
size_t queuedWTTasks = pPool->queuedTasks.length;
size_t queuedMTTasks = pPool->queuedPostTasks.length;
udReadUnlockRWLock(pPool->pRWLock);

if (pActiveThreads)
*pActiveThreads = (size_t)std::max(activeThreads, 0);

if (pQueuedTasks)
*pQueuedTasks = queuedTasks;
if (pQueuedWTTasks)
*pQueuedWTTasks = queuedWTTasks;

if (pQueuedMTTasks)
*pQueuedMTTasks = queuedMTTasks;

return (activeThreads > 0 || queuedWTTasks > 0 || queuedMTTasks > 0);
}

// Invokes 'callback' once for every task the pool knows about, under the
// pool's read lock: first each worker thread's current task (isActive ==
// true), then every still-queued task (isActive == false).
// Safe no-op when pPool is null or uninitialized — consistent with the
// null-tolerance of udWorkerPool_HasActiveWorkers.
void udWorkerPool_IterateItems(udWorkerPool *pPool, udCallback<void(const char *taskName, double queuedAt, bool isActive, int32_t jobID)> callback)
{
  if (pPool == nullptr || pPool->pRWLock == nullptr)
    return;

  udReadLockRWLock(pPool->pRWLock);

  // Tasks currently being executed (or idle slots — pTaskName is nulled
  // between tasks by the worker loop)
  for (int i = 0; i < pPool->totalThreads; ++i)
  {
    const udWorkerPoolTask &task = pPool->pThreadData[i].currentTask;
    callback(task.pTaskName, task.startTime, true, task.jobID);
  }

  // Tasks still waiting for a worker
  for (const auto &item : pPool->queuedTasks)
  {
    callback(item.pTaskName, item.startTime, false, item.jobID);
  }

  udReadUnlockRWLock(pPool->pRWLock);
}

// Cancels a job while it is still waiting in the queue; a job a worker has
// already picked up cannot be cancelled.
// Returns udR_Success if the job was removed from the queue,
//         udR_InProgress if a worker thread is currently running it,
//         udR_InvalidParameter if pPool is null/uninitialized,
//         udR_Failure if no queued job with that ID exists.
udResult udWorkerPool_TryCancelJob(udWorkerPool *pPool, int32_t jobID)
{
  udResult result = udR_Failure;

  // Guard before taking the lock; jumping to epilogue here would unlock an
  // unheld lock, so return directly instead of using UD_ERROR_NULL
  if (pPool == nullptr || pPool->pRWLock == nullptr)
    return udR_InvalidParameter;

  udWriteLockRWLock(pPool->pRWLock);

  // A job already claimed by a worker thread is in progress — not cancellable
  for (int i = 0; i < pPool->totalThreads; ++i)
  {
    UD_ERROR_IF(pPool->pThreadData[i].currentTask.jobID == jobID, udR_InProgress);
  }

  // Remove the task from the pending queue, freeing its owned resources first
  for (size_t i = 0; i < pPool->queuedTasks.length; ++i)
  {
    if (pPool->queuedTasks[i].jobID == jobID)
    {
      udWorkerPool_CleanupTask(&pPool->queuedTasks[i]);
      pPool->queuedTasks.RemoveAt(i);
      result = udR_Success;
      break;
    }
  }

epilogue:
  udWriteUnlockRWLock(pPool->pRWLock);

  return result;
}

// Moves a still-queued job to the front of the queue so the next free worker
// picks it up first.
// Returns udR_Success if the job was moved,
//         udR_InProgress if a worker thread is already running it,
//         udR_InvalidParameter if pPool is null/uninitialized,
//         udR_Failure if no queued job with that ID exists.
udResult udWorkerPool_BumpJob(udWorkerPool *pPool, int32_t jobID)
{
  udResult result = udR_Failure;

  bool foundTask = false;
  udWorkerPoolTask currentTask = {};

  // Guard before taking the lock; jumping to epilogue here would unlock an
  // unheld lock, so return directly
  if (pPool == nullptr || pPool->pRWLock == nullptr)
    return udR_InvalidParameter;

  udWriteLockRWLock(pPool->pRWLock);

  // A job already claimed by a worker thread cannot be re-queued
  for (int i = 0; i < pPool->totalThreads; ++i)
  {
    UD_ERROR_IF(pPool->pThreadData[i].currentTask.jobID == jobID, udR_InProgress);
  }

  for (size_t i = 0; i < pPool->queuedTasks.length; ++i)
  {
    if (pPool->queuedTasks[i].jobID == jobID)
    {
      currentTask = pPool->queuedTasks[i];
      pPool->queuedTasks.RemoveAt(i);
      foundTask = true;
      break;
    }
  }

  if (foundTask)
  {
    pPool->queuedTasks.PushFront(currentTask);
    result = udR_Success; // BUG FIX: original never set result, reporting udR_Failure on success
  }

epilogue:
  udWriteUnlockRWLock(pPool->pRWLock);

  return result;
}