aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2026-03-18 14:06:42 +0100
committerGitHub Enterprise <[email protected]>2026-03-18 14:06:42 +0100
commit69296eaebd26cc9e6b87480f3b25a705b8c5bcb9 (patch)
tree0641c4cb6c4c9736973fab5cbb1c55166dea7286 /src
parentadd --hub-instance-config option to set lua config path for hub instances (#854) (diff)
downloadzen-69296eaebd26cc9e6b87480f3b25a705b8c5bcb9.tar.xz
zen-69296eaebd26cc9e6b87480f3b25a705b8c5bcb9.zip
workaround for change in xmake behaviour around download file naming (#858)
xmake 3.0.7 has a different naming convention than 2.9.9 leading to issues in minio on_install also includes a fix for rpc.record test on Linux by replacing std::atomic wait/notify with condition_variable GCC's std::atomic<int64_t>::wait/notify on Linux uses a proxy hash table mechanism (futex only supports 32-bit words) with known issues (GCC Bug 98033, Bug 115955). Replace with std::mutex + std::condition_variable which is well-tested and consistent with the rest of the codebase.
Diffstat (limited to 'src')
-rw-r--r--src/zenserver/xmake.lua2
-rw-r--r--src/zenutil/rpcrecording.cpp100
2 files changed, 44 insertions, 58 deletions
diff --git a/src/zenserver/xmake.lua b/src/zenserver/xmake.lua
index 7bfc3575e..aa306190f 100644
--- a/src/zenserver/xmake.lua
+++ b/src/zenserver/xmake.lua
@@ -19,7 +19,7 @@ target("zenserver")
add_headerfiles("**.h")
add_rules("utils.bin2c", {extensions = {".zip"}})
add_files("**.cpp")
- add_files("$(buildir)/frontend/html.zip")
+ add_files(path.join(os.projectdir(), get_config("builddir") or get_config("buildir") or "build", "frontend/html.zip"))
add_files("zenserver.cpp", {unity_ignored = true })
if is_plat("linux") and not (get_config("toolchain") or ""):find("clang") then
diff --git a/src/zenutil/rpcrecording.cpp b/src/zenutil/rpcrecording.cpp
index 28a0091cb..a9e95b9ce 100644
--- a/src/zenutil/rpcrecording.cpp
+++ b/src/zenutil/rpcrecording.cpp
@@ -17,7 +17,9 @@ ZEN_THIRD_PARTY_INCLUDES_START
#include <gsl/gsl-lite.hpp>
ZEN_THIRD_PARTY_INCLUDES_END
+#include <condition_variable>
#include <deque>
+#include <mutex>
#include <thread>
namespace zen::cache {
@@ -282,7 +284,6 @@ const uint64_t LooseFileThreshold = 5000; // Somewhat arbitrary, but we try
// for performance
const uint64_t SegmentByteThreshold = 16ull * 1024 * 1024 * 1024;
const TimeSpan SegmentTimeThreshold{/* hours */ 1, /* minutes */ 0, /* seconds */ 0};
-const int64_t MaximumBacklogCount = 2000;
std::string
MakeSegmentPath(uint64_t SegmentIndex)
@@ -366,10 +367,10 @@ private:
};
std::unique_ptr<std::thread> m_WriterThread;
- std::atomic_bool m_IsWriterReady{false};
std::atomic_bool m_IsActive{false};
- std::atomic_int64_t m_PendingRequests{0};
- RwLock m_RequestQueueLock;
+ std::mutex m_QueueMutex;
+ std::condition_variable m_QueueCondition;
+ bool m_IsWriterReady = false;
std::deque<QueuedRequest> m_RequestQueue;
void WriterThreadMain();
@@ -660,7 +661,8 @@ RecordedRequestsWriter::BeginWrite(const std::filesystem::path& BasePath)
m_WriterThread.reset(new std::thread(&RecordedRequestsWriter::WriterThreadMain, this));
- m_IsWriterReady.wait(false);
+ std::unique_lock<std::mutex> Lock(m_QueueMutex);
+ m_QueueCondition.wait(Lock, [this] { return m_IsWriterReady; });
}
void
@@ -668,15 +670,19 @@ RecordedRequestsWriter::EndWrite()
{
if (m_WriterThread)
{
- m_IsActive = false;
- const int64_t PendingCount = m_PendingRequests.fetch_add(1);
- m_PendingRequests.notify_all();
-
- if (PendingCount)
{
- ZEN_INFO("waiting for RPC recorder writing thread to drain {} pending items", PendingCount);
+ std::lock_guard<std::mutex> Lock(m_QueueMutex);
+ m_IsActive = false;
+ const size_t PendingCount = m_RequestQueue.size();
+
+ if (PendingCount)
+ {
+ ZEN_INFO("waiting for RPC recorder writing thread to drain {} pending items", PendingCount);
+ }
}
+ m_QueueCondition.notify_all();
+
if (m_WriterThread->joinable())
{
m_WriterThread->join();
@@ -695,12 +701,11 @@ RecordedRequestsWriter::WriteRequest(const RecordedRequestInfo& RequestInfo, con
OwnedRequest.MakeOwned();
{
- RwLock::ExclusiveLockScope _(m_RequestQueueLock);
+ std::lock_guard<std::mutex> Lock(m_QueueMutex);
m_RequestQueue.push_back(QueuedRequest{RequestInfo, std::move(OwnedRequest)});
- m_PendingRequests.fetch_add(1);
}
- m_PendingRequests.notify_all();
+ m_QueueCondition.notify_one();
}
}
@@ -710,55 +715,36 @@ RecordedRequestsWriter::WriterThreadMain()
SetCurrentThreadName("rpc_writer");
EnsureCurrentSegment();
- m_IsWriterReady.store(true);
- m_IsWriterReady.notify_all();
+ {
+ std::lock_guard<std::mutex> Lock(m_QueueMutex);
+ m_IsWriterReady = true;
+ }
+ m_QueueCondition.notify_all();
- while (m_IsActive)
+ while (true)
{
- m_PendingRequests.wait(0);
+ QueuedRequest Request;
- while (m_PendingRequests)
{
- RwLock::ExclusiveLockScope _(m_RequestQueueLock);
- if (!m_RequestQueue.empty())
- {
- bool DrainBacklog = false;
-
- do
- {
- QueuedRequest Request = m_RequestQueue.front();
-
- m_RequestQueue.pop_front();
- m_PendingRequests.fetch_sub(1);
-
- // For a sufficiently large backlog, keep blocking queueing operations
- // until we get below the threshold
- DrainBacklog = m_RequestQueue.size() >= MaximumBacklogCount;
-
- if (!DrainBacklog)
- {
- _.ReleaseNow();
- }
-
- try
- {
- RecordedRequestsSegmentWriter& Writer = EnsureCurrentSegment();
- Writer.WriteRequest(Request.RequestInfo, Request.RequestBuffer);
- }
- catch (const std::exception&)
- {
- // TODO: what's the right behaviour here? The most likely cause would
- // be some I/O error and we probably ought to just shut down recording
- // at that point
- }
- } while (DrainBacklog);
- }
- else
+ std::unique_lock<std::mutex> Lock(m_QueueMutex);
+ m_QueueCondition.wait(Lock, [this] { return !m_RequestQueue.empty() || !m_IsActive; });
+
+ if (m_RequestQueue.empty())
{
- // shutdown increments this counter so we need to decrement it
- // here even though we didn't process any request
- m_PendingRequests.fetch_sub(1);
+ break;
}
+
+ Request = std::move(m_RequestQueue.front());
+ m_RequestQueue.pop_front();
+ }
+
+ try
+ {
+ RecordedRequestsSegmentWriter& Writer = EnsureCurrentSegment();
+ Writer.WriteRequest(Request.RequestInfo, Request.RequestBuffer);
+ }
+ catch (const std::exception&)
+ {
}
}