diff options
| author | Dan Engelbrecht <[email protected]> | 2023-11-16 18:50:27 +0100 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-11-16 18:50:27 +0100 |
| commit | 573907447db3e19d49c0bcaf3f659cf2d599c738 (patch) | |
| tree | 9374a390ad5ab89ce6991398e2eb11154a3dcb4e /src/zencore | |
| parent | add wipe prevention via file in data root dir (#548) (diff) | |
| download | zen-573907447db3e19d49c0bcaf3f659cf2d599c738.tar.xz zen-573907447db3e19d49c0bcaf3f659cf2d599c738.zip | |
blocking queue fix (#550)
* make BlockingQueue::m_CompleteAdding non-atomic
* ZenCacheDiskLayer::Flush logging
* name worker threads in ZenCacheDiskLayer::DiscoverBuckets
* name worker threads in gcv2
* improved logging in ZenServerInstance
* scrub threadpool naming
* remove waitpid handling, we should just call wait to kill zombie processes
Diffstat (limited to 'src/zencore')
| -rw-r--r-- | src/zencore/include/zencore/blockingqueue.h | 22 | ||||
| -rw-r--r-- | src/zencore/thread.cpp | 15 |
2 files changed, 11 insertions, 26 deletions
diff --git a/src/zencore/include/zencore/blockingqueue.h b/src/zencore/include/zencore/blockingqueue.h index f92df5a54..995ba6bfb 100644 --- a/src/zencore/include/zencore/blockingqueue.h +++ b/src/zencore/include/zencore/blockingqueue.h @@ -22,7 +22,6 @@ public: { std::lock_guard Lock(m_Lock); m_Queue.emplace_back(std::move(Item)); - m_Size++; } m_NewItemSignal.notify_one(); @@ -30,20 +29,16 @@ public: bool WaitAndDequeue(T& Item) { - if (m_CompleteAdding.load()) + std::unique_lock Lock(m_Lock); + if (m_Queue.empty() && !m_CompleteAdding) { - return false; + m_NewItemSignal.wait(Lock, [this]() { return !m_Queue.empty() || m_CompleteAdding; }); } - std::unique_lock Lock(m_Lock); - m_NewItemSignal.wait(Lock, [this]() { return !m_Queue.empty() || m_CompleteAdding.load(); }); - if (!m_Queue.empty()) { Item = std::move(m_Queue.front()); m_Queue.pop_front(); - m_Size--; - return true; } @@ -52,9 +47,13 @@ public: void CompleteAdding() { - if (!m_CompleteAdding.load()) + std::unique_lock Lock(m_Lock); + if (!m_CompleteAdding) { - m_CompleteAdding.store(true); + ZEN_ASSERT(m_Queue.empty()); + m_CompleteAdding = true; + + Lock.unlock(); m_NewItemSignal.notify_all(); } } @@ -69,8 +68,7 @@ private: mutable std::mutex m_Lock; std::condition_variable m_NewItemSignal; std::deque<T> m_Queue; - std::atomic_bool m_CompleteAdding{false}; - std::atomic_uint32_t m_Size; + bool m_CompleteAdding = false; }; } // namespace zen diff --git a/src/zencore/thread.cpp b/src/zencore/thread.cpp index 1174f902f..a55bc5d69 100644 --- a/src/zencore/thread.cpp +++ b/src/zencore/thread.cpp @@ -668,20 +668,7 @@ ProcessHandle::Wait(int TimeoutMs) for (int SleepedTimeMS = 0;; SleepedTimeMS += SleepMs) { int WaitState = 0; - int Res = waitpid(m_Pid, &WaitState, WNOHANG | WCONTINUED | WUNTRACED); -# if 1 - ZEN_UNUSED(Res); -# else - if (Res == -1) - { - int32_t LastError = zen::GetLastError(); - if (LastError == ECHILD || LastError == ESRCH) - { - return true; - } - ThrowSystemError(static_cast<uint32_t>(LastError), "Process::Wait waitpid failed"sv); - } -# endif + waitpid(m_Pid, &WaitState, WNOHANG | WCONTINUED | WUNTRACED); if (kill(m_Pid, 0) < 0) { |