diff options
| author | Dan Engelbrecht <[email protected]> | 2023-11-16 18:50:27 +0100 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-11-16 18:50:27 +0100 |
| commit | 573907447db3e19d49c0bcaf3f659cf2d599c738 (patch) | |
| tree | 9374a390ad5ab89ce6991398e2eb11154a3dcb4e /src | |
| parent | add wipe prevention via file in data root dir (#548) (diff) | |
| download | zen-573907447db3e19d49c0bcaf3f659cf2d599c738.tar.xz zen-573907447db3e19d49c0bcaf3f659cf2d599c738.zip | |
blocking queue fix (#550)
* make BlockingQueue::m_CompleteAdding non-atomic
* ZenCacheDiskLayer::Flush logging
* name worker threads in ZenCacheDiskLayer::DiscoverBuckets
* name worker threads in gcv2
* improved logging in ZenServerInstance
* scrub threadpool naming
* remove waitpid result handling; we should just call waitpid to reap zombie processes
Diffstat (limited to 'src')
| -rw-r--r-- | src/zencore/include/zencore/blockingqueue.h | 22 | ||||
| -rw-r--r-- | src/zencore/thread.cpp | 15 | ||||
| -rw-r--r-- | src/zenserver/cache/cachedisklayer.cpp | 43 | ||||
| -rw-r--r-- | src/zenserver/zenserver.cpp | 2 | ||||
| -rw-r--r-- | src/zenstore/gc.cpp | 2 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/zenserverprocess.h | 2 | ||||
| -rw-r--r-- | src/zenutil/zenserverprocess.cpp | 46 |
7 files changed, 78 insertions, 54 deletions
diff --git a/src/zencore/include/zencore/blockingqueue.h b/src/zencore/include/zencore/blockingqueue.h index f92df5a54..995ba6bfb 100644 --- a/src/zencore/include/zencore/blockingqueue.h +++ b/src/zencore/include/zencore/blockingqueue.h @@ -22,7 +22,6 @@ public: { std::lock_guard Lock(m_Lock); m_Queue.emplace_back(std::move(Item)); - m_Size++; } m_NewItemSignal.notify_one(); @@ -30,20 +29,16 @@ public: bool WaitAndDequeue(T& Item) { - if (m_CompleteAdding.load()) + std::unique_lock Lock(m_Lock); + if (m_Queue.empty() && !m_CompleteAdding) { - return false; + m_NewItemSignal.wait(Lock, [this]() { return !m_Queue.empty() || m_CompleteAdding; }); } - std::unique_lock Lock(m_Lock); - m_NewItemSignal.wait(Lock, [this]() { return !m_Queue.empty() || m_CompleteAdding.load(); }); - if (!m_Queue.empty()) { Item = std::move(m_Queue.front()); m_Queue.pop_front(); - m_Size--; - return true; } @@ -52,9 +47,13 @@ public: void CompleteAdding() { - if (!m_CompleteAdding.load()) + std::unique_lock Lock(m_Lock); + if (!m_CompleteAdding) { - m_CompleteAdding.store(true); + ZEN_ASSERT(m_Queue.empty()); + m_CompleteAdding = true; + + Lock.unlock(); m_NewItemSignal.notify_all(); } } @@ -69,8 +68,7 @@ private: mutable std::mutex m_Lock; std::condition_variable m_NewItemSignal; std::deque<T> m_Queue; - std::atomic_bool m_CompleteAdding{false}; - std::atomic_uint32_t m_Size; + bool m_CompleteAdding = false; }; } // namespace zen diff --git a/src/zencore/thread.cpp b/src/zencore/thread.cpp index 1174f902f..a55bc5d69 100644 --- a/src/zencore/thread.cpp +++ b/src/zencore/thread.cpp @@ -668,20 +668,7 @@ ProcessHandle::Wait(int TimeoutMs) for (int SleepedTimeMS = 0;; SleepedTimeMS += SleepMs) { int WaitState = 0; - int Res = waitpid(m_Pid, &WaitState, WNOHANG | WCONTINUED | WUNTRACED); -# if 1 - ZEN_UNUSED(Res); -# else - if (Res == -1) - { - int32_t LastError = zen::GetLastError(); - if (LastError == ECHILD || LastError == ESRCH) - { - return true; - } - 
ThrowSystemError(static_cast<uint32_t>(LastError), "Process::Wait waitpid failed"sv); - } -# endif + waitpid(m_Pid, &WaitState, WNOHANG | WCONTINUED | WUNTRACED); if (kill(m_Pid, 0) < 0) { diff --git a/src/zenserver/cache/cachedisklayer.cpp b/src/zenserver/cache/cachedisklayer.cpp index a6cb54444..afb974d76 100644 --- a/src/zenserver/cache/cachedisklayer.cpp +++ b/src/zenserver/cache/cachedisklayer.cpp @@ -3210,7 +3210,7 @@ ZenCacheDiskLayer::DiscoverBuckets() const size_t MaxHwTreadUse = std::thread::hardware_concurrency(); const int WorkerThreadPoolCount = gsl::narrow<int>(Min(MaxHwTreadUse, FoundBucketDirectories.size())); - WorkerThreadPool Pool(WorkerThreadPoolCount); + WorkerThreadPool Pool(WorkerThreadPoolCount, "CacheBucket::OpenOrCreate"); Latch WorkLatch(1); for (auto& BucketPath : FoundBucketDirectories) { @@ -3300,9 +3300,17 @@ void ZenCacheDiskLayer::Flush() { std::vector<CacheBucket*> Buckets; + Stopwatch Timer; + const auto _ = MakeGuard([&] { + if (Buckets.empty()) + { + return; + } + ZEN_INFO("Flushed {} buckets at '{}' in {}", Buckets.size(), m_RootDir, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); { - RwLock::SharedLockScope _(m_Lock); + RwLock::SharedLockScope __(m_Lock); if (m_Buckets.empty()) { return; @@ -3314,21 +3322,26 @@ ZenCacheDiskLayer::Flush() Buckets.push_back(Bucket); } } - const size_t MaxHwTreadUse = Max((std::thread::hardware_concurrency() / 4u), 1u); - const int WorkerThreadPoolCount = gsl::narrow<int>(Min(MaxHwTreadUse, Buckets.size())); - - WorkerThreadPool Pool(WorkerThreadPoolCount); - Latch WorkLatch(1); - for (auto& Bucket : Buckets) { - WorkLatch.AddCount(1); - Pool.ScheduleWork([&]() { - auto _ = MakeGuard([&]() { WorkLatch.CountDown(); }); - Bucket->Flush(); - }); + const size_t MaxHwTreadUse = Max((std::thread::hardware_concurrency() / 4u), 1u); + const int WorkerThreadPoolCount = gsl::narrow<int>(Min(MaxHwTreadUse, Buckets.size())); + + WorkerThreadPool Pool(WorkerThreadPoolCount, "CacheBucket::Flush"); + Latch 
WorkLatch(1); + for (auto& Bucket : Buckets) + { + WorkLatch.AddCount(1); + Pool.ScheduleWork([&]() { + auto _ = MakeGuard([&]() { WorkLatch.CountDown(); }); + Bucket->Flush(); + }); + } + WorkLatch.CountDown(); + while (!WorkLatch.Wait(1000)) + { + ZEN_DEBUG("Waiting for {} buckets at '{}' to flush", WorkLatch.Remaining(), m_RootDir); + } } - WorkLatch.CountDown(); - WorkLatch.Wait(); } void diff --git a/src/zenserver/zenserver.cpp b/src/zenserver/zenserver.cpp index 8359f5088..e6724e40a 100644 --- a/src/zenserver/zenserver.cpp +++ b/src/zenserver/zenserver.cpp @@ -782,7 +782,7 @@ ZenServer::ScrubStorage() Stopwatch Timer; ZEN_INFO("Storage validation STARTING"); - WorkerThreadPool ThreadPool{1}; + WorkerThreadPool ThreadPool{1, "Scrub"}; ScrubContext Ctx{ThreadPool}; m_CidStore->ScrubStorage(Ctx); m_ProjectStore->ScrubStorage(Ctx); diff --git a/src/zenstore/gc.cpp b/src/zenstore/gc.cpp index 778a47626..7f9ca5236 100644 --- a/src/zenstore/gc.cpp +++ b/src/zenstore/gc.cpp @@ -581,7 +581,7 @@ GcManager::CollectGarbage(const GcSettings& Settings) Result.ReferencerStats.resize(m_GcReferencers.size()); - WorkerThreadPool ThreadPool(WorkerThreadPoolCount); + WorkerThreadPool ThreadPool(WorkerThreadPoolCount, "GCV2"); ZEN_INFO("GCV2: Removing expired data from {} referencers", m_GcReferencers.size()); if (!m_GcReferencers.empty()) diff --git a/src/zenutil/include/zenutil/zenserverprocess.h b/src/zenutil/include/zenutil/zenserverprocess.h index 60adfba54..14d19ad39 100644 --- a/src/zenutil/include/zenutil/zenserverprocess.h +++ b/src/zenutil/include/zenutil/zenserverprocess.h @@ -70,6 +70,7 @@ struct ZenServerInstance void Detach(); inline int GetPid() { return m_Process.Pid(); } inline void SetOwnerPid(int Pid) { m_OwnerPid = Pid; } + bool IsRunning(); void SetTestDir(std::filesystem::path TestDir); inline void SpawnServer(int BasePort = 0, std::string_view AdditionalServerArgs = std::string_view()) @@ -95,6 +96,7 @@ private: std::filesystem::path m_TestDir; int 
m_BasePort = 0; std::optional<int> m_OwnerPid; + std::string m_Name; void CreateShutdownEvent(int BasePort); void SpawnServer(int BasePort, std::string_view AdditionalServerArgs, int WaitTimeoutMs); diff --git a/src/zenutil/zenserverprocess.cpp b/src/zenutil/zenserverprocess.cpp index 3e735dc07..2971b0fab 100644 --- a/src/zenutil/zenserverprocess.cpp +++ b/src/zenutil/zenserverprocess.cpp @@ -487,20 +487,33 @@ ZenServerInstance::SignalShutdown() void ZenServerInstance::Shutdown() { - if (m_Process.IsValid() && m_ShutdownOnDestroy) + if (m_Process.IsValid()) { - if (m_Terminate) + if (m_ShutdownOnDestroy) { - ZEN_INFO("Terminating zenserver process"); - m_Process.Terminate(111); - m_Process.Reset(); + if (m_Terminate) + { + ZEN_INFO("Terminating zenserver process {}", m_Name); + m_Process.Terminate(111); + m_Process.Reset(); + ZEN_DEBUG("zenserver process {} ({}) terminated", m_Name, m_Process.Pid()); + } + else + { + ZEN_DEBUG("Requesting zenserver process {} ({}) to shut down", m_Name, m_Process.Pid()); + SignalShutdown(); + ZEN_DEBUG("Waiting for zenserver process {} ({}) to shut down", m_Name, m_Process.Pid()); + while (!m_Process.Wait(5000)) + { + ZEN_WARN("Waiting for zenserver process {} ({}) timed out", m_Name, m_Process.Pid()); + } + m_Process.Reset(); + } + ZEN_DEBUG("zenserver process {} ({}) exited", m_Name, m_Process.Pid()); } else { - SignalShutdown(); - ZEN_DEBUG("Waiting for zenserver process {} to shut down", m_Process.Pid()); - m_Process.Wait(); - m_Process.Reset(); + ZEN_DEBUG("Detached from zenserver process {} ({})", m_Name, m_Process.Pid()); } } } @@ -521,6 +534,7 @@ ZenServerInstance::SpawnServer(int BasePort, std::string_view AdditionalServerAr ExtendableStringBuilder<32> LogId; LogId << "Zen" << ChildId; + m_Name = LogId.ToString(); ExtendableStringBuilder<512> CommandLine; CommandLine << "zenserver" ZEN_EXE_SUFFIX_LITERAL; // see CreateProc() call for actual binary path @@ -534,7 +548,7 @@ ZenServerInstance::SpawnServer(int BasePort, 
std::string_view AdditionalServerAr m_OwnerPid = MyPid; } - CommandLine << " --test --log-id " << LogId; + CommandLine << " --test --log-id " << m_Name; CommandLine << " --no-sentry"; } @@ -613,7 +627,7 @@ ZenServerInstance::SpawnServer(int BasePort, std::string_view AdditionalServerAr { if (!WaitUntilReady(WaitTimeoutMs)) { - throw std::runtime_error(fmt::format("server start timeout after {}", NiceTimeSpanMs(WaitTimeoutMs))); + throw std::runtime_error(fmt::format("server start of {} timeout after {}", m_Name, NiceTimeSpanMs(WaitTimeoutMs))); } // Determine effective base port @@ -737,4 +751,14 @@ ZenServerInstance::SetTestDir(std::filesystem::path TestDir) m_TestDir = TestDir; } +bool +ZenServerInstance::IsRunning() +{ + if (!m_Process.IsValid()) + { + return false; + } + return m_Process.IsRunning(); +} + } // namespace zen |