aboutsummaryrefslogtreecommitdiff
path: root/src/zenutil/rpcrecording.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenutil/rpcrecording.cpp')
-rw-r--r--src/zenutil/rpcrecording.cpp106
1 files changed, 46 insertions, 60 deletions
diff --git a/src/zenutil/rpcrecording.cpp b/src/zenutil/rpcrecording.cpp
index 54f27dee7..3c273abb6 100644
--- a/src/zenutil/rpcrecording.cpp
+++ b/src/zenutil/rpcrecording.cpp
@@ -13,11 +13,13 @@
#include <zencore/testutils.h>
ZEN_THIRD_PARTY_INCLUDES_START
+#include <EASTL/deque.h>
#include <fmt/format.h>
#include <gsl/gsl-lite.hpp>
ZEN_THIRD_PARTY_INCLUDES_END
-#include <deque>
+#include <condition_variable>
+#include <mutex>
#include <thread>
namespace zen::cache {
@@ -282,7 +284,6 @@ const uint64_t LooseFileThreshold = 5000; // Somewhat arbitrary, but we try
// for performance
const uint64_t SegmentByteThreshold = 16ull * 1024 * 1024 * 1024;
const TimeSpan SegmentTimeThreshold{/* hours */ 1, /* minutes */ 0, /* seconds */ 0};
-const int64_t MaximumBacklogCount = 2000;
std::string
MakeSegmentPath(uint64_t SegmentIndex)
@@ -366,11 +367,11 @@ private:
};
std::unique_ptr<std::thread> m_WriterThread;
- std::atomic_bool m_IsWriterReady{false};
std::atomic_bool m_IsActive{false};
- std::atomic_int64_t m_PendingRequests{0};
- RwLock m_RequestQueueLock;
- std::deque<QueuedRequest> m_RequestQueue;
+ std::mutex m_QueueMutex;
+ std::condition_variable m_QueueCondition;
+ bool m_IsWriterReady = false;
+ eastl::deque<QueuedRequest> m_RequestQueue;
void WriterThreadMain();
};
@@ -660,7 +661,8 @@ RecordedRequestsWriter::BeginWrite(const std::filesystem::path& BasePath)
m_WriterThread.reset(new std::thread(&RecordedRequestsWriter::WriterThreadMain, this));
- m_IsWriterReady.wait(false);
+ std::unique_lock<std::mutex> Lock(m_QueueMutex);
+ m_QueueCondition.wait(Lock, [this] { return m_IsWriterReady; });
}
void
@@ -668,15 +670,19 @@ RecordedRequestsWriter::EndWrite()
{
if (m_WriterThread)
{
- m_IsActive = false;
- const int64_t PendingCount = m_PendingRequests.fetch_add(1);
- m_PendingRequests.notify_all();
-
- if (PendingCount)
{
- ZEN_INFO("waiting for RPC recorder writing thread to drain {} pending items", PendingCount);
+ std::lock_guard<std::mutex> Lock(m_QueueMutex);
+ m_IsActive = false;
+ const size_t PendingCount = m_RequestQueue.size();
+
+ if (PendingCount)
+ {
+ ZEN_INFO("waiting for RPC recorder writing thread to drain {} pending items", PendingCount);
+ }
}
+ m_QueueCondition.notify_all();
+
if (m_WriterThread->joinable())
{
m_WriterThread->join();
@@ -695,12 +701,11 @@ RecordedRequestsWriter::WriteRequest(const RecordedRequestInfo& RequestInfo, con
OwnedRequest.MakeOwned();
{
- RwLock::ExclusiveLockScope _(m_RequestQueueLock);
+ std::lock_guard<std::mutex> Lock(m_QueueMutex);
m_RequestQueue.push_back(QueuedRequest{RequestInfo, std::move(OwnedRequest)});
- m_PendingRequests.fetch_add(1);
}
- m_PendingRequests.notify_all();
+ m_QueueCondition.notify_one();
}
}
@@ -710,55 +715,36 @@ RecordedRequestsWriter::WriterThreadMain()
SetCurrentThreadName("rpc_writer");
EnsureCurrentSegment();
- m_IsWriterReady.store(true);
- m_IsWriterReady.notify_all();
+ {
+ std::lock_guard<std::mutex> Lock(m_QueueMutex);
+ m_IsWriterReady = true;
+ }
+ m_QueueCondition.notify_all();
- while (m_IsActive)
+ while (true)
{
- m_PendingRequests.wait(0);
+ QueuedRequest Request;
- while (m_PendingRequests)
{
- RwLock::ExclusiveLockScope _(m_RequestQueueLock);
- if (!m_RequestQueue.empty())
- {
- bool DrainBacklog = false;
-
- do
- {
- QueuedRequest Request = m_RequestQueue.front();
-
- m_RequestQueue.pop_front();
- m_PendingRequests.fetch_sub(1);
-
- // For a sufficiently large backlog, keep blocking queueing operations
- // until we get below the threshold
- DrainBacklog = m_RequestQueue.size() >= MaximumBacklogCount;
-
- if (!DrainBacklog)
- {
- _.ReleaseNow();
- }
-
- try
- {
- RecordedRequestsSegmentWriter& Writer = EnsureCurrentSegment();
- Writer.WriteRequest(Request.RequestInfo, Request.RequestBuffer);
- }
- catch (const std::exception&)
- {
- // TODO: what's the right behaviour here? The most likely cause would
- // be some I/O error and we probably ought to just shut down recording
- // at that point
- }
- } while (DrainBacklog);
- }
- else
+ std::unique_lock<std::mutex> Lock(m_QueueMutex);
+ m_QueueCondition.wait(Lock, [this] { return !m_RequestQueue.empty() || !m_IsActive; });
+
+ if (m_RequestQueue.empty())
{
- // shutdown increments this counter so we need to decrement it
- // here even though we didn't process any request
- m_PendingRequests.fetch_sub(1);
+ break;
}
+
+ Request = std::move(m_RequestQueue.front());
+ m_RequestQueue.pop_front();
+ }
+
+ try
+ {
+ RecordedRequestsSegmentWriter& Writer = EnsureCurrentSegment();
+ Writer.WriteRequest(Request.RequestInfo, Request.RequestBuffer);
+ }
+ catch (const std::exception&)
+ {
}
}
@@ -1119,7 +1105,7 @@ rpcrecord_forcelink()
{
}
-TEST_SUITE_BEGIN("rpc.recording");
+TEST_SUITE_BEGIN("util.rpcrecording");
TEST_CASE("rpc.record")
{