diff options
| author | Stefan Boberg <[email protected]> | 2026-03-18 14:06:42 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-03-18 14:06:42 +0100 |
| commit | 69296eaebd26cc9e6b87480f3b25a705b8c5bcb9 (patch) | |
| tree | 0641c4cb6c4c9736973fab5cbb1c55166dea7286 /src | |
| parent | add --hub-instance-config option to set lua config path for hub instances (#854) (diff) | |
| download | zen-69296eaebd26cc9e6b87480f3b25a705b8c5bcb9.tar.xz zen-69296eaebd26cc9e6b87480f3b25a705b8c5bcb9.zip | |
workaround for change in xmake behaviour around download file naming (#858)
xmake 3.0.7 has a different naming convention than 2.9.9 leading to issues in minio on_install
also includes a fix for rpc.record test on Linux by replacing std::atomic wait/notify with condition_variable
GCC's std::atomic<int64_t>::wait/notify on Linux uses a proxy hash table
mechanism (futex only supports 32-bit words) with known issues (GCC Bug
98033, Bug 115955). Replace with std::mutex + std::condition_variable
which is well-tested and consistent with the rest of the codebase.
Diffstat (limited to 'src')
| -rw-r--r-- | src/zenserver/xmake.lua | 2 | ||||
| -rw-r--r-- | src/zenutil/rpcrecording.cpp | 100 |
2 files changed, 44 insertions, 58 deletions
diff --git a/src/zenserver/xmake.lua b/src/zenserver/xmake.lua index 7bfc3575e..aa306190f 100644 --- a/src/zenserver/xmake.lua +++ b/src/zenserver/xmake.lua @@ -19,7 +19,7 @@ target("zenserver") add_headerfiles("**.h") add_rules("utils.bin2c", {extensions = {".zip"}}) add_files("**.cpp") - add_files("$(buildir)/frontend/html.zip") + add_files(path.join(os.projectdir(), get_config("builddir") or get_config("buildir") or "build", "frontend/html.zip")) add_files("zenserver.cpp", {unity_ignored = true }) if is_plat("linux") and not (get_config("toolchain") or ""):find("clang") then diff --git a/src/zenutil/rpcrecording.cpp b/src/zenutil/rpcrecording.cpp index 28a0091cb..a9e95b9ce 100644 --- a/src/zenutil/rpcrecording.cpp +++ b/src/zenutil/rpcrecording.cpp @@ -17,7 +17,9 @@ ZEN_THIRD_PARTY_INCLUDES_START #include <gsl/gsl-lite.hpp> ZEN_THIRD_PARTY_INCLUDES_END +#include <condition_variable> #include <deque> +#include <mutex> #include <thread> namespace zen::cache { @@ -282,7 +284,6 @@ const uint64_t LooseFileThreshold = 5000; // Somewhat arbitrary, but we try // for performance const uint64_t SegmentByteThreshold = 16ull * 1024 * 1024 * 1024; const TimeSpan SegmentTimeThreshold{/* hours */ 1, /* minutes */ 0, /* seconds */ 0}; -const int64_t MaximumBacklogCount = 2000; std::string MakeSegmentPath(uint64_t SegmentIndex) @@ -366,10 +367,10 @@ private: }; std::unique_ptr<std::thread> m_WriterThread; - std::atomic_bool m_IsWriterReady{false}; std::atomic_bool m_IsActive{false}; - std::atomic_int64_t m_PendingRequests{0}; - RwLock m_RequestQueueLock; + std::mutex m_QueueMutex; + std::condition_variable m_QueueCondition; + bool m_IsWriterReady = false; std::deque<QueuedRequest> m_RequestQueue; void WriterThreadMain(); @@ -660,7 +661,8 @@ RecordedRequestsWriter::BeginWrite(const std::filesystem::path& BasePath) m_WriterThread.reset(new std::thread(&RecordedRequestsWriter::WriterThreadMain, this)); - m_IsWriterReady.wait(false); + std::unique_lock<std::mutex> Lock(m_QueueMutex); + m_QueueCondition.wait(Lock, [this] { return m_IsWriterReady; }); } void @@ -668,15 +670,19 @@ RecordedRequestsWriter::EndWrite() { if (m_WriterThread) { - m_IsActive = false; - const int64_t PendingCount = m_PendingRequests.fetch_add(1); - m_PendingRequests.notify_all(); - - if (PendingCount) { - ZEN_INFO("waiting for RPC recorder writing thread to drain {} pending items", PendingCount); + std::lock_guard<std::mutex> Lock(m_QueueMutex); + m_IsActive = false; + const size_t PendingCount = m_RequestQueue.size(); + + if (PendingCount) + { + ZEN_INFO("waiting for RPC recorder writing thread to drain {} pending items", PendingCount); + } } + m_QueueCondition.notify_all(); + if (m_WriterThread->joinable()) { m_WriterThread->join(); @@ -695,12 +701,11 @@ RecordedRequestsWriter::WriteRequest(const RecordedRequestInfo& RequestInfo, con OwnedRequest.MakeOwned(); { - RwLock::ExclusiveLockScope _(m_RequestQueueLock); + std::lock_guard<std::mutex> Lock(m_QueueMutex); m_RequestQueue.push_back(QueuedRequest{RequestInfo, std::move(OwnedRequest)}); - m_PendingRequests.fetch_add(1); } - m_PendingRequests.notify_all(); + m_QueueCondition.notify_one(); } } @@ -710,55 +715,36 @@ RecordedRequestsWriter::WriterThreadMain() SetCurrentThreadName("rpc_writer"); EnsureCurrentSegment(); - m_IsWriterReady.store(true); - m_IsWriterReady.notify_all(); + { + std::lock_guard<std::mutex> Lock(m_QueueMutex); + m_IsWriterReady = true; + } + m_QueueCondition.notify_all(); - while (m_IsActive) + while (true) { - m_PendingRequests.wait(0); + QueuedRequest Request; - while (m_PendingRequests) { - RwLock::ExclusiveLockScope _(m_RequestQueueLock); - if (!m_RequestQueue.empty()) - { - bool DrainBacklog = false; - - do - { - QueuedRequest Request = m_RequestQueue.front(); - - m_RequestQueue.pop_front(); - m_PendingRequests.fetch_sub(1); - - // For a sufficiently large backlog, keep blocking queueing operations - // until we get below the threshold - DrainBacklog = m_RequestQueue.size() >= MaximumBacklogCount; - - if (!DrainBacklog) - { - _.ReleaseNow(); - } - - try - { - RecordedRequestsSegmentWriter& Writer = EnsureCurrentSegment(); - Writer.WriteRequest(Request.RequestInfo, Request.RequestBuffer); - } - catch (const std::exception&) - { - // TODO: what's the right behaviour here? The most likely cause would - // be some I/O error and we probably ought to just shut down recording - // at that point - } - } while (DrainBacklog); - } - else + std::unique_lock<std::mutex> Lock(m_QueueMutex); + m_QueueCondition.wait(Lock, [this] { return !m_RequestQueue.empty() || !m_IsActive; }); + + if (m_RequestQueue.empty()) { - // shutdown increments this counter so we need to decrement it - // here even though we didn't process any request - m_PendingRequests.fetch_sub(1); + break; } + + Request = std::move(m_RequestQueue.front()); + m_RequestQueue.pop_front(); + } + + try + { + RecordedRequestsSegmentWriter& Writer = EnsureCurrentSegment(); + Writer.WriteRequest(Request.RequestInfo, Request.RequestBuffer); + } + catch (const std::exception&) + { } } |