about | summary | refs | log | tree | commit | diff
path: root/src
diff options
context:
space:
mode:
author: Dan Engelbrecht <[email protected]> 2023-11-16 18:50:27 +0100
committer: GitHub <[email protected]> 2023-11-16 18:50:27 +0100
commit: 573907447db3e19d49c0bcaf3f659cf2d599c738 (patch)
tree: 9374a390ad5ab89ce6991398e2eb11154a3dcb4e /src
parent: add wipe prevention via file in data root dir (#548) (diff)
download: zen-573907447db3e19d49c0bcaf3f659cf2d599c738.tar.xz
download: zen-573907447db3e19d49c0bcaf3f659cf2d599c738.zip
blocking queue fix (#550)
* make BlockingQueue::m_CompleteAdding non-atomic
* ZenCacheDiskLayer::Flush logging
* name worker threads in ZenCacheDiskLayer::DiscoverBuckets
* name worker threads in gcv2
* improved logging in ZenServerInstance
* scrub threadpool naming
* remove waitpid handling, we should just call wait to kill zombie processes
Diffstat (limited to 'src')
-rw-r--r-- src/zencore/include/zencore/blockingqueue.h | 22
-rw-r--r-- src/zencore/thread.cpp | 15
-rw-r--r-- src/zenserver/cache/cachedisklayer.cpp | 43
-rw-r--r-- src/zenserver/zenserver.cpp | 2
-rw-r--r-- src/zenstore/gc.cpp | 2
-rw-r--r-- src/zenutil/include/zenutil/zenserverprocess.h | 2
-rw-r--r-- src/zenutil/zenserverprocess.cpp | 46
7 files changed, 78 insertions, 54 deletions
diff --git a/src/zencore/include/zencore/blockingqueue.h b/src/zencore/include/zencore/blockingqueue.h
index f92df5a54..995ba6bfb 100644
--- a/src/zencore/include/zencore/blockingqueue.h
+++ b/src/zencore/include/zencore/blockingqueue.h
@@ -22,7 +22,6 @@ public:
{
std::lock_guard Lock(m_Lock);
m_Queue.emplace_back(std::move(Item));
- m_Size++;
}
m_NewItemSignal.notify_one();
@@ -30,20 +29,16 @@ public:
bool WaitAndDequeue(T& Item)
{
- if (m_CompleteAdding.load())
+ std::unique_lock Lock(m_Lock);
+ if (m_Queue.empty() && !m_CompleteAdding)
{
- return false;
+ m_NewItemSignal.wait(Lock, [this]() { return !m_Queue.empty() || m_CompleteAdding; });
}
- std::unique_lock Lock(m_Lock);
- m_NewItemSignal.wait(Lock, [this]() { return !m_Queue.empty() || m_CompleteAdding.load(); });
-
if (!m_Queue.empty())
{
Item = std::move(m_Queue.front());
m_Queue.pop_front();
- m_Size--;
-
return true;
}
@@ -52,9 +47,13 @@ public:
void CompleteAdding()
{
- if (!m_CompleteAdding.load())
+ std::unique_lock Lock(m_Lock);
+ if (!m_CompleteAdding)
{
- m_CompleteAdding.store(true);
+ ZEN_ASSERT(m_Queue.empty());
+ m_CompleteAdding = true;
+
+ Lock.unlock();
m_NewItemSignal.notify_all();
}
}
@@ -69,8 +68,7 @@ private:
mutable std::mutex m_Lock;
std::condition_variable m_NewItemSignal;
std::deque<T> m_Queue;
- std::atomic_bool m_CompleteAdding{false};
- std::atomic_uint32_t m_Size;
+ bool m_CompleteAdding = false;
};
} // namespace zen
diff --git a/src/zencore/thread.cpp b/src/zencore/thread.cpp
index 1174f902f..a55bc5d69 100644
--- a/src/zencore/thread.cpp
+++ b/src/zencore/thread.cpp
@@ -668,20 +668,7 @@ ProcessHandle::Wait(int TimeoutMs)
for (int SleepedTimeMS = 0;; SleepedTimeMS += SleepMs)
{
int WaitState = 0;
- int Res = waitpid(m_Pid, &WaitState, WNOHANG | WCONTINUED | WUNTRACED);
-# if 1
- ZEN_UNUSED(Res);
-# else
- if (Res == -1)
- {
- int32_t LastError = zen::GetLastError();
- if (LastError == ECHILD || LastError == ESRCH)
- {
- return true;
- }
- ThrowSystemError(static_cast<uint32_t>(LastError), "Process::Wait waitpid failed"sv);
- }
-# endif
+ waitpid(m_Pid, &WaitState, WNOHANG | WCONTINUED | WUNTRACED);
if (kill(m_Pid, 0) < 0)
{
diff --git a/src/zenserver/cache/cachedisklayer.cpp b/src/zenserver/cache/cachedisklayer.cpp
index a6cb54444..afb974d76 100644
--- a/src/zenserver/cache/cachedisklayer.cpp
+++ b/src/zenserver/cache/cachedisklayer.cpp
@@ -3210,7 +3210,7 @@ ZenCacheDiskLayer::DiscoverBuckets()
const size_t MaxHwTreadUse = std::thread::hardware_concurrency();
const int WorkerThreadPoolCount = gsl::narrow<int>(Min(MaxHwTreadUse, FoundBucketDirectories.size()));
- WorkerThreadPool Pool(WorkerThreadPoolCount);
+ WorkerThreadPool Pool(WorkerThreadPoolCount, "CacheBucket::OpenOrCreate");
Latch WorkLatch(1);
for (auto& BucketPath : FoundBucketDirectories)
{
@@ -3300,9 +3300,17 @@ void
ZenCacheDiskLayer::Flush()
{
std::vector<CacheBucket*> Buckets;
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] {
+ if (Buckets.empty())
+ {
+ return;
+ }
+ ZEN_INFO("Flushed {} buckets at '{}' in {}", Buckets.size(), m_RootDir, NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
{
- RwLock::SharedLockScope _(m_Lock);
+ RwLock::SharedLockScope __(m_Lock);
if (m_Buckets.empty())
{
return;
@@ -3314,21 +3322,26 @@ ZenCacheDiskLayer::Flush()
Buckets.push_back(Bucket);
}
}
- const size_t MaxHwTreadUse = Max((std::thread::hardware_concurrency() / 4u), 1u);
- const int WorkerThreadPoolCount = gsl::narrow<int>(Min(MaxHwTreadUse, Buckets.size()));
-
- WorkerThreadPool Pool(WorkerThreadPoolCount);
- Latch WorkLatch(1);
- for (auto& Bucket : Buckets)
{
- WorkLatch.AddCount(1);
- Pool.ScheduleWork([&]() {
- auto _ = MakeGuard([&]() { WorkLatch.CountDown(); });
- Bucket->Flush();
- });
+ const size_t MaxHwTreadUse = Max((std::thread::hardware_concurrency() / 4u), 1u);
+ const int WorkerThreadPoolCount = gsl::narrow<int>(Min(MaxHwTreadUse, Buckets.size()));
+
+ WorkerThreadPool Pool(WorkerThreadPoolCount, "CacheBucket::Flush");
+ Latch WorkLatch(1);
+ for (auto& Bucket : Buckets)
+ {
+ WorkLatch.AddCount(1);
+ Pool.ScheduleWork([&]() {
+ auto _ = MakeGuard([&]() { WorkLatch.CountDown(); });
+ Bucket->Flush();
+ });
+ }
+ WorkLatch.CountDown();
+ while (!WorkLatch.Wait(1000))
+ {
+ ZEN_DEBUG("Waiting for {} buckets at '{}' to flush", WorkLatch.Remaining(), m_RootDir);
+ }
}
- WorkLatch.CountDown();
- WorkLatch.Wait();
}
void
diff --git a/src/zenserver/zenserver.cpp b/src/zenserver/zenserver.cpp
index 8359f5088..e6724e40a 100644
--- a/src/zenserver/zenserver.cpp
+++ b/src/zenserver/zenserver.cpp
@@ -782,7 +782,7 @@ ZenServer::ScrubStorage()
Stopwatch Timer;
ZEN_INFO("Storage validation STARTING");
- WorkerThreadPool ThreadPool{1};
+ WorkerThreadPool ThreadPool{1, "Scrub"};
ScrubContext Ctx{ThreadPool};
m_CidStore->ScrubStorage(Ctx);
m_ProjectStore->ScrubStorage(Ctx);
diff --git a/src/zenstore/gc.cpp b/src/zenstore/gc.cpp
index 778a47626..7f9ca5236 100644
--- a/src/zenstore/gc.cpp
+++ b/src/zenstore/gc.cpp
@@ -581,7 +581,7 @@ GcManager::CollectGarbage(const GcSettings& Settings)
Result.ReferencerStats.resize(m_GcReferencers.size());
- WorkerThreadPool ThreadPool(WorkerThreadPoolCount);
+ WorkerThreadPool ThreadPool(WorkerThreadPoolCount, "GCV2");
ZEN_INFO("GCV2: Removing expired data from {} referencers", m_GcReferencers.size());
if (!m_GcReferencers.empty())
diff --git a/src/zenutil/include/zenutil/zenserverprocess.h b/src/zenutil/include/zenutil/zenserverprocess.h
index 60adfba54..14d19ad39 100644
--- a/src/zenutil/include/zenutil/zenserverprocess.h
+++ b/src/zenutil/include/zenutil/zenserverprocess.h
@@ -70,6 +70,7 @@ struct ZenServerInstance
void Detach();
inline int GetPid() { return m_Process.Pid(); }
inline void SetOwnerPid(int Pid) { m_OwnerPid = Pid; }
+ bool IsRunning();
void SetTestDir(std::filesystem::path TestDir);
inline void SpawnServer(int BasePort = 0, std::string_view AdditionalServerArgs = std::string_view())
@@ -95,6 +96,7 @@ private:
std::filesystem::path m_TestDir;
int m_BasePort = 0;
std::optional<int> m_OwnerPid;
+ std::string m_Name;
void CreateShutdownEvent(int BasePort);
void SpawnServer(int BasePort, std::string_view AdditionalServerArgs, int WaitTimeoutMs);
diff --git a/src/zenutil/zenserverprocess.cpp b/src/zenutil/zenserverprocess.cpp
index 3e735dc07..2971b0fab 100644
--- a/src/zenutil/zenserverprocess.cpp
+++ b/src/zenutil/zenserverprocess.cpp
@@ -487,20 +487,33 @@ ZenServerInstance::SignalShutdown()
void
ZenServerInstance::Shutdown()
{
- if (m_Process.IsValid() && m_ShutdownOnDestroy)
+ if (m_Process.IsValid())
{
- if (m_Terminate)
+ if (m_ShutdownOnDestroy)
{
- ZEN_INFO("Terminating zenserver process");
- m_Process.Terminate(111);
- m_Process.Reset();
+ if (m_Terminate)
+ {
+ ZEN_INFO("Terminating zenserver process {}", m_Name);
+ m_Process.Terminate(111);
+ m_Process.Reset();
+ ZEN_DEBUG("zenserver process {} ({}) terminated", m_Name, m_Process.Pid());
+ }
+ else
+ {
+ ZEN_DEBUG("Requesting zenserver process {} ({}) to shut down", m_Name, m_Process.Pid());
+ SignalShutdown();
+ ZEN_DEBUG("Waiting for zenserver process {} ({}) to shut down", m_Name, m_Process.Pid());
+ while (!m_Process.Wait(5000))
+ {
+ ZEN_WARN("Waiting for zenserver process {} ({}) timed out", m_Name, m_Process.Pid());
+ }
+ m_Process.Reset();
+ }
+ ZEN_DEBUG("zenserver process {} ({}) exited", m_Name, m_Process.Pid());
}
else
{
- SignalShutdown();
- ZEN_DEBUG("Waiting for zenserver process {} to shut down", m_Process.Pid());
- m_Process.Wait();
- m_Process.Reset();
+ ZEN_DEBUG("Detached from zenserver process {} ({})", m_Name, m_Process.Pid());
}
}
}
@@ -521,6 +534,7 @@ ZenServerInstance::SpawnServer(int BasePort, std::string_view AdditionalServerAr
ExtendableStringBuilder<32> LogId;
LogId << "Zen" << ChildId;
+ m_Name = LogId.ToString();
ExtendableStringBuilder<512> CommandLine;
CommandLine << "zenserver" ZEN_EXE_SUFFIX_LITERAL; // see CreateProc() call for actual binary path
@@ -534,7 +548,7 @@ ZenServerInstance::SpawnServer(int BasePort, std::string_view AdditionalServerAr
m_OwnerPid = MyPid;
}
- CommandLine << " --test --log-id " << LogId;
+ CommandLine << " --test --log-id " << m_Name;
CommandLine << " --no-sentry";
}
@@ -613,7 +627,7 @@ ZenServerInstance::SpawnServer(int BasePort, std::string_view AdditionalServerAr
{
if (!WaitUntilReady(WaitTimeoutMs))
{
- throw std::runtime_error(fmt::format("server start timeout after {}", NiceTimeSpanMs(WaitTimeoutMs)));
+ throw std::runtime_error(fmt::format("server start of {} timeout after {}", m_Name, NiceTimeSpanMs(WaitTimeoutMs)));
}
// Determine effective base port
@@ -737,4 +751,14 @@ ZenServerInstance::SetTestDir(std::filesystem::path TestDir)
m_TestDir = TestDir;
}
+bool
+ZenServerInstance::IsRunning()
+{
+ if (!m_Process.IsValid())
+ {
+ return false;
+ }
+ return m_Process.IsRunning();
+}
+
} // namespace zen