diff options
| author | Dan Engelbrecht <[email protected]> | 2023-11-17 14:38:13 +0100 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-11-17 14:38:13 +0100 |
| commit | b0082066596178f7b72d9963bffdec306a5b6250 (patch) | |
| tree | 83505561e50e8a09237b05bddb579a714344c219 /src | |
| parent | use dynamic port assignment for tests (#545) (diff) | |
| download | zen-b0082066596178f7b72d9963bffdec306a5b6250.tar.xz zen-b0082066596178f7b72d9963bffdec306a5b6250.zip | |
fix named event (#553)
* fix named event timout and test, fix blocking queue
Diffstat (limited to 'src')
| -rw-r--r-- | src/zencore/include/zencore/blockingqueue.h | 24 | ||||
| -rw-r--r-- | src/zencore/thread.cpp | 13 | ||||
| -rw-r--r-- | src/zencore/workthreadpool.cpp | 16 |
3 files changed, 34 insertions, 19 deletions
diff --git a/src/zencore/include/zencore/blockingqueue.h b/src/zencore/include/zencore/blockingqueue.h index 995ba6bfb..e91fdc659 100644 --- a/src/zencore/include/zencore/blockingqueue.h +++ b/src/zencore/include/zencore/blockingqueue.h @@ -30,19 +30,22 @@ public: bool WaitAndDequeue(T& Item) { std::unique_lock Lock(m_Lock); - if (m_Queue.empty() && !m_CompleteAdding) + if (m_Queue.empty()) { + if (m_CompleteAdding) + { + return false; + } m_NewItemSignal.wait(Lock, [this]() { return !m_Queue.empty() || m_CompleteAdding; }); + if (m_Queue.empty()) + { + ZEN_ASSERT(m_CompleteAdding); + return false; + } } - - if (!m_Queue.empty()) - { - Item = std::move(m_Queue.front()); - m_Queue.pop_front(); - return true; - } - - return false; + Item = std::move(m_Queue.front()); + m_Queue.pop_front(); + return true; } void CompleteAdding() @@ -50,7 +53,6 @@ public: std::unique_lock Lock(m_Lock); if (!m_CompleteAdding) { - ZEN_ASSERT(m_Queue.empty()); m_CompleteAdding = true; Lock.unlock(); diff --git a/src/zencore/thread.cpp b/src/zencore/thread.cpp index a55bc5d69..6092895b0 100644 --- a/src/zencore/thread.cpp +++ b/src/zencore/thread.cpp @@ -411,9 +411,10 @@ NamedEvent::Wait(int TimeoutMs) } # if defined(_GNU_SOURCE) + const int TimeoutSec = TimeoutMs / 1000; struct timespec TimeoutValue = { - .tv_sec = TimeoutMs >> 10, - .tv_nsec = (TimeoutMs & 0x3ff) << 20, + .tv_sec = TimeoutSec, + .tv_nsec = (TimeoutMs - (TimeoutSec * 1000)) * 1000000, }; Result = semtimedop(Sem, &SemOp, 1, &TimeoutValue); # else @@ -431,7 +432,6 @@ NamedEvent::Wait(int TimeoutMs) TimeoutMs -= SleepTimeMs; } while (TimeoutMs > 0); # endif // _GNU_SOURCE - return Result == 0; #endif } @@ -1225,19 +1225,20 @@ TEST_CASE("NamedEvent") CHECK(!bEventSet); } + NamedEvent ReadyEvent(Name + "_ready"); + // Thread check std::thread Waiter = std::thread([Name]() { NamedEvent ReadyEvent(Name + "_ready"); ReadyEvent.Set(); NamedEvent TestEvent(Name); - TestEvent.Wait(100); + TestEvent.Wait(1000); }); - NamedEvent ReadyEvent(Name + "_ready"); ReadyEvent.Wait(); - zen::Sleep(50); + zen::Sleep(100); TestEvent.Set(); Waiter.join(); diff --git a/src/zencore/workthreadpool.cpp b/src/zencore/workthreadpool.cpp index 3a4b1e6a1..bdb9de9dc 100644 --- a/src/zencore/workthreadpool.cpp +++ b/src/zencore/workthreadpool.cpp @@ -150,7 +150,10 @@ struct WorkerThreadPool::Impl for (std::thread& Thread : m_WorkerThreads) { - Thread.join(); + if (Thread.joinable()) + { + Thread.join(); + } } m_WorkerThreads.clear(); @@ -219,7 +222,16 @@ WorkerThreadPool::ScheduleWork(Ref<IWork> Work) } else { - Work->Execute(); + try + { + Work->Execute(); + } + catch (std::exception& e) + { + Work->m_Exception = std::current_exception(); + + ZEN_WARN("Caught exception when executing worker synchronously: {}", e.what()); + } } } |