aboutsummaryrefslogtreecommitdiff
path: root/test/src/utils/TaskDispatcher.h
diff options
context:
space:
mode:
Diffstat (limited to 'test/src/utils/TaskDispatcher.h')
-rwxr-xr-x[-rw-r--r--]test/src/utils/TaskDispatcher.h438
1 files changed, 219 insertions, 219 deletions
diff --git a/test/src/utils/TaskDispatcher.h b/test/src/utils/TaskDispatcher.h
index e44434f..09df3b4 100644..100755
--- a/test/src/utils/TaskDispatcher.h
+++ b/test/src/utils/TaskDispatcher.h
@@ -1,219 +1,219 @@
-// This code contains NVIDIA Confidential Information and is disclosed to you
-// under a form of NVIDIA software license agreement provided separately to you.
-//
-// Notice
-// NVIDIA Corporation and its licensors retain all intellectual property and
-// proprietary rights in and to this software and related documentation and
-// any modifications thereto. Any use, reproduction, disclosure, or
-// distribution of this software and related documentation without an express
-// license agreement from NVIDIA Corporation is strictly prohibited.
-//
-// ALL NVIDIA DESIGN SPECIFICATIONS, CODE ARE PROVIDED "AS IS.". NVIDIA MAKES
-// NO WARRANTIES, EXPRESSED, IMPLIED, STATUTORY, OR OTHERWISE WITH RESPECT TO
-// THE MATERIALS, AND EXPRESSLY DISCLAIMS ALL IMPLIED WARRANTIES OF NONINFRINGEMENT,
-// MERCHANTABILITY, AND FITNESS FOR A PARTICULAR PURPOSE.
-//
-// Information and code furnished is believed to be accurate and reliable.
-// However, NVIDIA Corporation assumes no responsibility for the consequences of use of such
-// information or for any infringement of patents or other rights of third parties that may
-// result from its use. No license is granted by implication or otherwise under any patent
-// or patent rights of NVIDIA Corporation. Details are subject to change without notice.
-// This code supersedes and replaces all information previously supplied.
-// NVIDIA Corporation products are not authorized for use as critical
-// components in life support devices or systems without express written approval of
-// NVIDIA Corporation.
-//
-// Copyright (c) 2016-2018 NVIDIA Corporation. All rights reserved.
-
-
-#pragma once
-
-#include <thread>
-#include <mutex>
-#include <queue>
-#include <list>
-#include <future>
-#include <condition_variable>
-#include <memory>
-#include <atomic>
-
-class TaskDispatcher
-{
-public:
- class Task
- {
- public:
- virtual void process() = 0;
- virtual ~Task() {};
- };
-
- typedef std::function<void(TaskDispatcher& dispatcher, std::unique_ptr<Task>)> OnTaskFinishedFunction;
-
- TaskDispatcher(uint32_t threadCount, OnTaskFinishedFunction onTaskFinished) :
- m_workingThreadsCount(0), m_onTaskFinished(onTaskFinished)
- {
- m_threads.resize(threadCount);
- for (uint32_t i = 0; i < threadCount; i++)
- {
- m_threads[i] = std::unique_ptr<Thread>(new Thread(i, m_completionSemaphore));
- m_threads[i]->start();
- m_freeThreads.push(m_threads[i].get());
- }
- }
-
- void addTask(std::unique_ptr<Task> task)
- {
- m_tasks.push(std::move(task));
- }
-
- void process()
- {
- // main loop
- while (m_tasks.size() > 0 || m_workingThreadsCount > 0)
- {
- // assign tasks
- while (!(m_tasks.empty() || m_freeThreads.empty()))
- {
- auto task = std::move(m_tasks.front());
- m_tasks.pop();
-
- Thread* freeThread = m_freeThreads.front();
- m_freeThreads.pop();
-
- freeThread->processTask(std::move(task));
- m_workingThreadsCount++;
- }
-
- m_completionSemaphore.wait();
-
- // check for completion
- for (std::unique_ptr<Thread>& thread : m_threads)
- {
- if (thread->isTaskFinished())
- {
- std::unique_ptr<Task> task;
- thread->collectTask(task);
- m_onTaskFinished(*this, std::move(task));
-
- m_freeThreads.push(thread.get());
- m_workingThreadsCount--;
- break;
- }
- }
- }
- }
-
-private:
- class Semaphore
- {
- public:
- Semaphore(int count_ = 0)
- : m_count(count_) {}
-
- inline void notify()
- {
- std::unique_lock<std::mutex> lock(m_mutex);
- m_count++;
- m_cv.notify_one();
- }
-
- inline void wait()
- {
- std::unique_lock<std::mutex> lock(m_mutex);
-
- while (m_count == 0){
- m_cv.wait(lock);
- }
- m_count--;
- }
-
-
- private:
- std::mutex m_mutex;
- std::condition_variable m_cv;
- int m_count;
- };
-
- class Thread
- {
- public:
- Thread(uint32_t id_, Semaphore& completionSemaphore) : m_id(id_), m_completionSemaphore(completionSemaphore), m_running(false), m_taskFinished(false) {}
- virtual ~Thread() { stop(); }
-
- void start()
- {
- if (!m_running)
- {
- m_running = true;
- m_thread = std::thread(&Thread::body, this);
- }
- }
-
- void stop()
- {
- if (m_running)
- {
- m_running = false;
- m_newTaskSemaphore.notify();
- m_thread.join();
- }
- }
-
- void processTask(std::unique_ptr<Task> task)
- {
- m_task = std::move(task);
- m_taskFinished = false;
- m_newTaskSemaphore.notify();
- }
-
- void collectTask(std::unique_ptr<Task>& task)
- {
- task = std::move(m_task);
- m_task = nullptr;
- m_taskFinished = false;
- }
-
- bool hasTask() const { return m_task != nullptr; }
-
- bool isTaskFinished() const { return m_taskFinished; }
-
- private:
- void body()
- {
- while (1)
- {
- m_newTaskSemaphore.wait();
-
- if (!m_running)
- return;
-
- m_task->process();
- m_taskFinished = true;
-
- m_completionSemaphore.notify();
- }
- }
-
- uint32_t m_id;
- Semaphore& m_completionSemaphore;
- std::thread m_thread;
- bool m_running;
-
- std::unique_ptr<Task> m_task;
- std::atomic<bool> m_taskFinished;
-
- Semaphore m_newTaskSemaphore;
- };
-
-private:
- uint32_t m_workingThreadsCount;
-
- std::queue<std::unique_ptr<Task>> m_tasks;
- OnTaskFinishedFunction m_onTaskFinished;
-
- std::vector<std::unique_ptr<Thread>> m_threads;
- std::queue<Thread*> m_freeThreads;
-
- Semaphore m_completionSemaphore;
-};
-
+// This code contains NVIDIA Confidential Information and is disclosed to you
+// under a form of NVIDIA software license agreement provided separately to you.
+//
+// Notice
+// NVIDIA Corporation and its licensors retain all intellectual property and
+// proprietary rights in and to this software and related documentation and
+// any modifications thereto. Any use, reproduction, disclosure, or
+// distribution of this software and related documentation without an express
+// license agreement from NVIDIA Corporation is strictly prohibited.
+//
+// ALL NVIDIA DESIGN SPECIFICATIONS, CODE ARE PROVIDED "AS IS.". NVIDIA MAKES
+// NO WARRANTIES, EXPRESSED, IMPLIED, STATUTORY, OR OTHERWISE WITH RESPECT TO
+// THE MATERIALS, AND EXPRESSLY DISCLAIMS ALL IMPLIED WARRANTIES OF NONINFRINGEMENT,
+// MERCHANTABILITY, AND FITNESS FOR A PARTICULAR PURPOSE.
+//
+// Information and code furnished is believed to be accurate and reliable.
+// However, NVIDIA Corporation assumes no responsibility for the consequences of use of such
+// information or for any infringement of patents or other rights of third parties that may
+// result from its use. No license is granted by implication or otherwise under any patent
+// or patent rights of NVIDIA Corporation. Details are subject to change without notice.
+// This code supersedes and replaces all information previously supplied.
+// NVIDIA Corporation products are not authorized for use as critical
+// components in life support devices or systems without express written approval of
+// NVIDIA Corporation.
+//
+// Copyright (c) 2016-2018 NVIDIA Corporation. All rights reserved.
+
+
+#pragma once
+
+#include <thread>
+#include <mutex>
+#include <queue>
+#include <list>
+#include <future>
+#include <condition_variable>
+#include <memory>
+#include <atomic>
+
+class TaskDispatcher
+{
+public:
+ class Task
+ {
+ public:
+ virtual void process() = 0;
+ virtual ~Task() {};
+ };
+
+ typedef std::function<void(TaskDispatcher& dispatcher, std::unique_ptr<Task>)> OnTaskFinishedFunction;
+
+ TaskDispatcher(uint32_t threadCount, OnTaskFinishedFunction onTaskFinished) :
+ m_workingThreadsCount(0), m_onTaskFinished(onTaskFinished)
+ {
+ m_threads.resize(threadCount);
+ for (uint32_t i = 0; i < threadCount; i++)
+ {
+ m_threads[i] = std::unique_ptr<Thread>(new Thread(i, m_completionSemaphore));
+ m_threads[i]->start();
+ m_freeThreads.push(m_threads[i].get());
+ }
+ }
+
+ void addTask(std::unique_ptr<Task> task)
+ {
+ m_tasks.push(std::move(task));
+ }
+
+ void process()
+ {
+ // main loop
+ while (m_tasks.size() > 0 || m_workingThreadsCount > 0)
+ {
+ // assign tasks
+ while (!(m_tasks.empty() || m_freeThreads.empty()))
+ {
+ auto task = std::move(m_tasks.front());
+ m_tasks.pop();
+
+ Thread* freeThread = m_freeThreads.front();
+ m_freeThreads.pop();
+
+ freeThread->processTask(std::move(task));
+ m_workingThreadsCount++;
+ }
+
+ m_completionSemaphore.wait();
+
+ // check for completion
+ for (std::unique_ptr<Thread>& thread : m_threads)
+ {
+ if (thread->isTaskFinished())
+ {
+ std::unique_ptr<Task> task;
+ thread->collectTask(task);
+ m_onTaskFinished(*this, std::move(task));
+
+ m_freeThreads.push(thread.get());
+ m_workingThreadsCount--;
+ break;
+ }
+ }
+ }
+ }
+
+private:
+ class Semaphore
+ {
+ public:
+ Semaphore(int count_ = 0)
+ : m_count(count_) {}
+
+ inline void notify()
+ {
+ std::unique_lock<std::mutex> lock(m_mutex);
+ m_count++;
+ m_cv.notify_one();
+ }
+
+ inline void wait()
+ {
+ std::unique_lock<std::mutex> lock(m_mutex);
+
+ while (m_count == 0){
+ m_cv.wait(lock);
+ }
+ m_count--;
+ }
+
+
+ private:
+ std::mutex m_mutex;
+ std::condition_variable m_cv;
+ int m_count;
+ };
+
+ class Thread
+ {
+ public:
+ Thread(uint32_t id_, Semaphore& completionSemaphore) : m_id(id_), m_completionSemaphore(completionSemaphore), m_running(false), m_taskFinished(false) {}
+ virtual ~Thread() { stop(); }
+
+ void start()
+ {
+ if (!m_running)
+ {
+ m_running = true;
+ m_thread = std::thread(&Thread::body, this);
+ }
+ }
+
+ void stop()
+ {
+ if (m_running)
+ {
+ m_running = false;
+ m_newTaskSemaphore.notify();
+ m_thread.join();
+ }
+ }
+
+ void processTask(std::unique_ptr<Task> task)
+ {
+ m_task = std::move(task);
+ m_taskFinished = false;
+ m_newTaskSemaphore.notify();
+ }
+
+ void collectTask(std::unique_ptr<Task>& task)
+ {
+ task = std::move(m_task);
+ m_task = nullptr;
+ m_taskFinished = false;
+ }
+
+ bool hasTask() const { return m_task != nullptr; }
+
+ bool isTaskFinished() const { return m_taskFinished; }
+
+ private:
+ void body()
+ {
+ while (1)
+ {
+ m_newTaskSemaphore.wait();
+
+ if (!m_running)
+ return;
+
+ m_task->process();
+ m_taskFinished = true;
+
+ m_completionSemaphore.notify();
+ }
+ }
+
+ uint32_t m_id;
+ Semaphore& m_completionSemaphore;
+ std::thread m_thread;
+ bool m_running;
+
+ std::unique_ptr<Task> m_task;
+ std::atomic<bool> m_taskFinished;
+
+ Semaphore m_newTaskSemaphore;
+ };
+
+private:
+ uint32_t m_workingThreadsCount;
+
+ std::queue<std::unique_ptr<Task>> m_tasks;
+ OnTaskFinishedFunction m_onTaskFinished;
+
+ std::vector<std::unique_ptr<Thread>> m_threads;
+ std::queue<Thread*> m_freeThreads;
+
+ Semaphore m_completionSemaphore;
+};
+