From 7115f60b91b5717d90f643fd692010905c7004db Mon Sep 17 00:00:00 2001 From: Bryan Galdrikian Date: Thu, 31 May 2018 11:36:08 -0700 Subject: Blast 1.1.3. See docs/release_notes.txt. --- test/src/utils/TaskDispatcher.h | 438 ++++++++++++++++++++-------------------- 1 file changed, 219 insertions(+), 219 deletions(-) mode change 100644 => 100755 test/src/utils/TaskDispatcher.h (limited to 'test/src/utils/TaskDispatcher.h') diff --git a/test/src/utils/TaskDispatcher.h b/test/src/utils/TaskDispatcher.h old mode 100644 new mode 100755 index e44434f..09df3b4 --- a/test/src/utils/TaskDispatcher.h +++ b/test/src/utils/TaskDispatcher.h @@ -1,219 +1,219 @@ -// This code contains NVIDIA Confidential Information and is disclosed to you -// under a form of NVIDIA software license agreement provided separately to you. -// -// Notice -// NVIDIA Corporation and its licensors retain all intellectual property and -// proprietary rights in and to this software and related documentation and -// any modifications thereto. Any use, reproduction, disclosure, or -// distribution of this software and related documentation without an express -// license agreement from NVIDIA Corporation is strictly prohibited. -// -// ALL NVIDIA DESIGN SPECIFICATIONS, CODE ARE PROVIDED "AS IS.". NVIDIA MAKES -// NO WARRANTIES, EXPRESSED, IMPLIED, STATUTORY, OR OTHERWISE WITH RESPECT TO -// THE MATERIALS, AND EXPRESSLY DISCLAIMS ALL IMPLIED WARRANTIES OF NONINFRINGEMENT, -// MERCHANTABILITY, AND FITNESS FOR A PARTICULAR PURPOSE. -// -// Information and code furnished is believed to be accurate and reliable. -// However, NVIDIA Corporation assumes no responsibility for the consequences of use of such -// information or for any infringement of patents or other rights of third parties that may -// result from its use. No license is granted by implication or otherwise under any patent -// or patent rights of NVIDIA Corporation. Details are subject to change without notice. -// This code supersedes and replaces all information previously supplied. -// NVIDIA Corporation products are not authorized for use as critical -// components in life support devices or systems without express written approval of -// NVIDIA Corporation. -// -// Copyright (c) 2016-2018 NVIDIA Corporation. All rights reserved. - - -#pragma once - -#include -#include -#include -#include -#include -#include -#include -#include - -class TaskDispatcher -{ -public: - class Task - { - public: - virtual void process() = 0; - virtual ~Task() {}; - }; - - typedef std::function)> OnTaskFinishedFunction; - - TaskDispatcher(uint32_t threadCount, OnTaskFinishedFunction onTaskFinished) : - m_workingThreadsCount(0), m_onTaskFinished(onTaskFinished) - { - m_threads.resize(threadCount); - for (uint32_t i = 0; i < threadCount; i++) - { - m_threads[i] = std::unique_ptr(new Thread(i, m_completionSemaphore)); - m_threads[i]->start(); - m_freeThreads.push(m_threads[i].get()); - } - } - - void addTask(std::unique_ptr task) - { - m_tasks.push(std::move(task)); - } - - void process() - { - // main loop - while (m_tasks.size() > 0 || m_workingThreadsCount > 0) - { - // assign tasks - while (!(m_tasks.empty() || m_freeThreads.empty())) - { - auto task = std::move(m_tasks.front()); - m_tasks.pop(); - - Thread* freeThread = m_freeThreads.front(); - m_freeThreads.pop(); - - freeThread->processTask(std::move(task)); - m_workingThreadsCount++; - } - - m_completionSemaphore.wait(); - - // check for completion - for (std::unique_ptr& thread : m_threads) - { - if (thread->isTaskFinished()) - { - std::unique_ptr task; - thread->collectTask(task); - m_onTaskFinished(*this, std::move(task)); - - m_freeThreads.push(thread.get()); - m_workingThreadsCount--; - break; - } - } - } - } - -private: - class Semaphore - { - public: - Semaphore(int count_ = 0) - : m_count(count_) {} - - inline void notify() - { - std::unique_lock lock(m_mutex); - m_count++; - m_cv.notify_one(); - } - - inline void wait() - { - std::unique_lock lock(m_mutex); - - while (m_count == 0){ - m_cv.wait(lock); - } - m_count--; - } - - - private: - std::mutex m_mutex; - std::condition_variable m_cv; - int m_count; - }; - - class Thread - { - public: - Thread(uint32_t id_, Semaphore& completionSemaphore) : m_id(id_), m_completionSemaphore(completionSemaphore), m_running(false), m_taskFinished(false) {} - virtual ~Thread() { stop(); } - - void start() - { - if (!m_running) - { - m_running = true; - m_thread = std::thread(&Thread::body, this); - } - } - - void stop() - { - if (m_running) - { - m_running = false; - m_newTaskSemaphore.notify(); - m_thread.join(); - } - } - - void processTask(std::unique_ptr task) - { - m_task = std::move(task); - m_taskFinished = false; - m_newTaskSemaphore.notify(); - } - - void collectTask(std::unique_ptr& task) - { - task = std::move(m_task); - m_task = nullptr; - m_taskFinished = false; - } - - bool hasTask() const { return m_task != nullptr; } - - bool isTaskFinished() const { return m_taskFinished; } - - private: - void body() - { - while (1) - { - m_newTaskSemaphore.wait(); - - if (!m_running) - return; - - m_task->process(); - m_taskFinished = true; - - m_completionSemaphore.notify(); - } - } - - uint32_t m_id; - Semaphore& m_completionSemaphore; - std::thread m_thread; - bool m_running; - - std::unique_ptr m_task; - std::atomic m_taskFinished; - - Semaphore m_newTaskSemaphore; - }; - -private: - uint32_t m_workingThreadsCount; - - std::queue> m_tasks; - OnTaskFinishedFunction m_onTaskFinished; - - std::vector> m_threads; - std::queue m_freeThreads; - - Semaphore m_completionSemaphore; -}; - +// This code contains NVIDIA Confidential Information and is disclosed to you +// under a form of NVIDIA software license agreement provided separately to you. +// +// Notice +// NVIDIA Corporation and its licensors retain all intellectual property and +// proprietary rights in and to this software and related documentation and +// any modifications thereto. Any use, reproduction, disclosure, or +// distribution of this software and related documentation without an express +// license agreement from NVIDIA Corporation is strictly prohibited. +// +// ALL NVIDIA DESIGN SPECIFICATIONS, CODE ARE PROVIDED "AS IS.". NVIDIA MAKES +// NO WARRANTIES, EXPRESSED, IMPLIED, STATUTORY, OR OTHERWISE WITH RESPECT TO +// THE MATERIALS, AND EXPRESSLY DISCLAIMS ALL IMPLIED WARRANTIES OF NONINFRINGEMENT, +// MERCHANTABILITY, AND FITNESS FOR A PARTICULAR PURPOSE. +// +// Information and code furnished is believed to be accurate and reliable. +// However, NVIDIA Corporation assumes no responsibility for the consequences of use of such +// information or for any infringement of patents or other rights of third parties that may +// result from its use. No license is granted by implication or otherwise under any patent +// or patent rights of NVIDIA Corporation. Details are subject to change without notice. +// This code supersedes and replaces all information previously supplied. +// NVIDIA Corporation products are not authorized for use as critical +// components in life support devices or systems without express written approval of +// NVIDIA Corporation. +// +// Copyright (c) 2016-2018 NVIDIA Corporation. All rights reserved. + + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + +class TaskDispatcher +{ +public: + class Task + { + public: + virtual void process() = 0; + virtual ~Task() {}; + }; + + typedef std::function)> OnTaskFinishedFunction; + + TaskDispatcher(uint32_t threadCount, OnTaskFinishedFunction onTaskFinished) : + m_workingThreadsCount(0), m_onTaskFinished(onTaskFinished) + { + m_threads.resize(threadCount); + for (uint32_t i = 0; i < threadCount; i++) + { + m_threads[i] = std::unique_ptr(new Thread(i, m_completionSemaphore)); + m_threads[i]->start(); + m_freeThreads.push(m_threads[i].get()); + } + } + + void addTask(std::unique_ptr task) + { + m_tasks.push(std::move(task)); + } + + void process() + { + // main loop + while (m_tasks.size() > 0 || m_workingThreadsCount > 0) + { + // assign tasks + while (!(m_tasks.empty() || m_freeThreads.empty())) + { + auto task = std::move(m_tasks.front()); + m_tasks.pop(); + + Thread* freeThread = m_freeThreads.front(); + m_freeThreads.pop(); + + freeThread->processTask(std::move(task)); + m_workingThreadsCount++; + } + + m_completionSemaphore.wait(); + + // check for completion + for (std::unique_ptr& thread : m_threads) + { + if (thread->isTaskFinished()) + { + std::unique_ptr task; + thread->collectTask(task); + m_onTaskFinished(*this, std::move(task)); + + m_freeThreads.push(thread.get()); + m_workingThreadsCount--; + break; + } + } + } + } + +private: + class Semaphore + { + public: + Semaphore(int count_ = 0) + : m_count(count_) {} + + inline void notify() + { + std::unique_lock lock(m_mutex); + m_count++; + m_cv.notify_one(); + } + + inline void wait() + { + std::unique_lock lock(m_mutex); + + while (m_count == 0){ + m_cv.wait(lock); + } + m_count--; + } + + + private: + std::mutex m_mutex; + std::condition_variable m_cv; + int m_count; + }; + + class Thread + { + public: + Thread(uint32_t id_, Semaphore& completionSemaphore) : m_id(id_), m_completionSemaphore(completionSemaphore), m_running(false), m_taskFinished(false) {} + virtual ~Thread() { stop(); } + + void start() + { + if (!m_running) + { + m_running = true; + m_thread = std::thread(&Thread::body, this); + } + } + + void stop() + { + if (m_running) + { + m_running = false; + m_newTaskSemaphore.notify(); + m_thread.join(); + } + } + + void processTask(std::unique_ptr task) + { + m_task = std::move(task); + m_taskFinished = false; + m_newTaskSemaphore.notify(); + } + + void collectTask(std::unique_ptr& task) + { + task = std::move(m_task); + m_task = nullptr; + m_taskFinished = false; + } + + bool hasTask() const { return m_task != nullptr; } + + bool isTaskFinished() const { return m_taskFinished; } + + private: + void body() + { + while (1) + { + m_newTaskSemaphore.wait(); + + if (!m_running) + return; + + m_task->process(); + m_taskFinished = true; + + m_completionSemaphore.notify(); + } + } + + uint32_t m_id; + Semaphore& m_completionSemaphore; + std::thread m_thread; + bool m_running; + + std::unique_ptr m_task; + std::atomic m_taskFinished; + + Semaphore m_newTaskSemaphore; + }; + +private: + uint32_t m_workingThreadsCount; + + std::queue> m_tasks; + OnTaskFinishedFunction m_onTaskFinished; + + std::vector> m_threads; + std::queue m_freeThreads; + + Semaphore m_completionSemaphore; +}; + -- cgit v1.2.3