aboutsummaryrefslogtreecommitdiff
path: root/src/zencore
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2023-11-16 18:50:27 +0100
committerGitHub <[email protected]>2023-11-16 18:50:27 +0100
commit573907447db3e19d49c0bcaf3f659cf2d599c738 (patch)
tree9374a390ad5ab89ce6991398e2eb11154a3dcb4e /src/zencore
parentadd wipe prevention via file in data root dir (#548) (diff)
downloadzen-573907447db3e19d49c0bcaf3f659cf2d599c738.tar.xz
zen-573907447db3e19d49c0bcaf3f659cf2d599c738.zip
blocking queue fix (#550)
* make BlockingQueue::m_CompleteAdding non-atomic * ZenCacheDiskLayer::Flush logging * name worker threads in ZenCacheDiskLayer::DiscoverBuckets * name worker threads in gcv2 * improved logging in ZenServerInstance * scrub threadpool naming * remove waitpid handling, we should just call wait to kill zombie processes
Diffstat (limited to 'src/zencore')
-rw-r--r--src/zencore/include/zencore/blockingqueue.h22
-rw-r--r--src/zencore/thread.cpp15
2 files changed, 11 insertions, 26 deletions
diff --git a/src/zencore/include/zencore/blockingqueue.h b/src/zencore/include/zencore/blockingqueue.h
index f92df5a54..995ba6bfb 100644
--- a/src/zencore/include/zencore/blockingqueue.h
+++ b/src/zencore/include/zencore/blockingqueue.h
@@ -22,7 +22,6 @@ public:
{
std::lock_guard Lock(m_Lock);
m_Queue.emplace_back(std::move(Item));
- m_Size++;
}
m_NewItemSignal.notify_one();
@@ -30,20 +29,16 @@ public:
bool WaitAndDequeue(T& Item)
{
- if (m_CompleteAdding.load())
+ std::unique_lock Lock(m_Lock);
+ if (m_Queue.empty() && !m_CompleteAdding)
{
- return false;
+ m_NewItemSignal.wait(Lock, [this]() { return !m_Queue.empty() || m_CompleteAdding; });
}
- std::unique_lock Lock(m_Lock);
- m_NewItemSignal.wait(Lock, [this]() { return !m_Queue.empty() || m_CompleteAdding.load(); });
-
if (!m_Queue.empty())
{
Item = std::move(m_Queue.front());
m_Queue.pop_front();
- m_Size--;
-
return true;
}
@@ -52,9 +47,13 @@ public:
void CompleteAdding()
{
- if (!m_CompleteAdding.load())
+ std::unique_lock Lock(m_Lock);
+ if (!m_CompleteAdding)
{
- m_CompleteAdding.store(true);
+ ZEN_ASSERT(m_Queue.empty());
+ m_CompleteAdding = true;
+
+ Lock.unlock();
m_NewItemSignal.notify_all();
}
}
@@ -69,8 +68,7 @@ private:
mutable std::mutex m_Lock;
std::condition_variable m_NewItemSignal;
std::deque<T> m_Queue;
- std::atomic_bool m_CompleteAdding{false};
- std::atomic_uint32_t m_Size;
+ bool m_CompleteAdding = false;
};
} // namespace zen
diff --git a/src/zencore/thread.cpp b/src/zencore/thread.cpp
index 1174f902f..a55bc5d69 100644
--- a/src/zencore/thread.cpp
+++ b/src/zencore/thread.cpp
@@ -668,20 +668,7 @@ ProcessHandle::Wait(int TimeoutMs)
for (int SleepedTimeMS = 0;; SleepedTimeMS += SleepMs)
{
int WaitState = 0;
- int Res = waitpid(m_Pid, &WaitState, WNOHANG | WCONTINUED | WUNTRACED);
-# if 1
- ZEN_UNUSED(Res);
-# else
- if (Res == -1)
- {
- int32_t LastError = zen::GetLastError();
- if (LastError == ECHILD || LastError == ESRCH)
- {
- return true;
- }
- ThrowSystemError(static_cast<uint32_t>(LastError), "Process::Wait waitpid failed"sv);
- }
-# endif
+ waitpid(m_Pid, &WaitState, WNOHANG | WCONTINUED | WUNTRACED);
if (kill(m_Pid, 0) < 0)
{