diff options
| author | Dan Engelbrecht <[email protected]> | 2023-11-16 18:50:27 +0100 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-11-16 18:50:27 +0100 |
| commit | 573907447db3e19d49c0bcaf3f659cf2d599c738 (patch) | |
| tree | 9374a390ad5ab89ce6991398e2eb11154a3dcb4e /src/zencore/include | |
| parent | add wipe prevention via file in data root dir (#548) (diff) | |
| download | zen-573907447db3e19d49c0bcaf3f659cf2d599c738.tar.xz zen-573907447db3e19d49c0bcaf3f659cf2d599c738.zip | |
blocking queue fix (#550)
* make BlockingQueue::m_CompleteAdding non-atomic
* ZenCacheDiskLayer::Flush logging
* name worker threads in ZenCacheDiskLayer::DiscoverBuckets
* name worker threads in gcv2
* improved logging in ZenServerInstance
* scrub threadpool naming
* remove waitpid handling, we should just call wait to kill zombie processes
Diffstat (limited to 'src/zencore/include')
| -rw-r--r-- | src/zencore/include/zencore/blockingqueue.h | 22 |
1 files changed, 10 insertions, 12 deletions
diff --git a/src/zencore/include/zencore/blockingqueue.h b/src/zencore/include/zencore/blockingqueue.h index f92df5a54..995ba6bfb 100644 --- a/src/zencore/include/zencore/blockingqueue.h +++ b/src/zencore/include/zencore/blockingqueue.h @@ -22,7 +22,6 @@ public: { std::lock_guard Lock(m_Lock); m_Queue.emplace_back(std::move(Item)); - m_Size++; } m_NewItemSignal.notify_one(); @@ -30,20 +29,16 @@ public: bool WaitAndDequeue(T& Item) { - if (m_CompleteAdding.load()) + std::unique_lock Lock(m_Lock); + if (m_Queue.empty() && !m_CompleteAdding) { - return false; + m_NewItemSignal.wait(Lock, [this]() { return !m_Queue.empty() || m_CompleteAdding; }); } - std::unique_lock Lock(m_Lock); - m_NewItemSignal.wait(Lock, [this]() { return !m_Queue.empty() || m_CompleteAdding.load(); }); - if (!m_Queue.empty()) { Item = std::move(m_Queue.front()); m_Queue.pop_front(); - m_Size--; - return true; } @@ -52,9 +47,13 @@ public: void CompleteAdding() { - if (!m_CompleteAdding.load()) + std::unique_lock Lock(m_Lock); + if (!m_CompleteAdding) { - m_CompleteAdding.store(true); + ZEN_ASSERT(m_Queue.empty()); + m_CompleteAdding = true; + + Lock.unlock(); m_NewItemSignal.notify_all(); } } @@ -69,8 +68,7 @@ private: mutable std::mutex m_Lock; std::condition_variable m_NewItemSignal; std::deque<T> m_Queue; - std::atomic_bool m_CompleteAdding{false}; - std::atomic_uint32_t m_Size; + bool m_CompleteAdding = false; }; } // namespace zen |