aboutsummaryrefslogtreecommitdiff
path: root/src/zencore/include
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2023-11-16 18:50:27 +0100
committerGitHub <[email protected]>2023-11-16 18:50:27 +0100
commit573907447db3e19d49c0bcaf3f659cf2d599c738 (patch)
tree9374a390ad5ab89ce6991398e2eb11154a3dcb4e /src/zencore/include
parentadd wipe prevention via file in data root dir (#548) (diff)
downloadzen-573907447db3e19d49c0bcaf3f659cf2d599c738.tar.xz
zen-573907447db3e19d49c0bcaf3f659cf2d599c738.zip
blocking queue fix (#550)
* make BlockingQueue::m_CompleteAdding non-atomic * ZenCacheDiskLayer::Flush logging * name worker threads in ZenCacheDiskLayer::DiscoverBuckets * name worker threads in gcv2 * improved logging in ZenServerInstance * scrub threadpool naming * remove waitpid handling, we should just call wait to kill zombie processes
Diffstat (limited to 'src/zencore/include')
-rw-r--r--src/zencore/include/zencore/blockingqueue.h22
1 files changed, 10 insertions, 12 deletions
diff --git a/src/zencore/include/zencore/blockingqueue.h b/src/zencore/include/zencore/blockingqueue.h
index f92df5a54..995ba6bfb 100644
--- a/src/zencore/include/zencore/blockingqueue.h
+++ b/src/zencore/include/zencore/blockingqueue.h
@@ -22,7 +22,6 @@ public:
{
std::lock_guard Lock(m_Lock);
m_Queue.emplace_back(std::move(Item));
- m_Size++;
}
m_NewItemSignal.notify_one();
@@ -30,20 +29,16 @@ public:
bool WaitAndDequeue(T& Item)
{
- if (m_CompleteAdding.load())
+ std::unique_lock Lock(m_Lock);
+ if (m_Queue.empty() && !m_CompleteAdding)
{
- return false;
+ m_NewItemSignal.wait(Lock, [this]() { return !m_Queue.empty() || m_CompleteAdding; });
}
- std::unique_lock Lock(m_Lock);
- m_NewItemSignal.wait(Lock, [this]() { return !m_Queue.empty() || m_CompleteAdding.load(); });
-
if (!m_Queue.empty())
{
Item = std::move(m_Queue.front());
m_Queue.pop_front();
- m_Size--;
-
return true;
}
@@ -52,9 +47,13 @@ public:
void CompleteAdding()
{
- if (!m_CompleteAdding.load())
+ std::unique_lock Lock(m_Lock);
+ if (!m_CompleteAdding)
{
- m_CompleteAdding.store(true);
+ ZEN_ASSERT(m_Queue.empty());
+ m_CompleteAdding = true;
+
+ Lock.unlock();
m_NewItemSignal.notify_all();
}
}
@@ -69,8 +68,7 @@ private:
mutable std::mutex m_Lock;
std::condition_variable m_NewItemSignal;
std::deque<T> m_Queue;
- std::atomic_bool m_CompleteAdding{false};
- std::atomic_uint32_t m_Size;
+ bool m_CompleteAdding = false;
};
} // namespace zen