diff options
Diffstat (limited to 'src/zenutil/rpcrecording.cpp')
| -rw-r--r-- | src/zenutil/rpcrecording.cpp | 106 |
1 files changed, 46 insertions, 60 deletions
diff --git a/src/zenutil/rpcrecording.cpp b/src/zenutil/rpcrecording.cpp index 54f27dee7..3c273abb6 100644 --- a/src/zenutil/rpcrecording.cpp +++ b/src/zenutil/rpcrecording.cpp @@ -13,11 +13,13 @@ #include <zencore/testutils.h> ZEN_THIRD_PARTY_INCLUDES_START +#include <EASTL/deque.h> #include <fmt/format.h> #include <gsl/gsl-lite.hpp> ZEN_THIRD_PARTY_INCLUDES_END -#include <deque> +#include <condition_variable> +#include <mutex> #include <thread> namespace zen::cache { @@ -282,7 +284,6 @@ const uint64_t LooseFileThreshold = 5000; // Somewhat arbitrary, but we try // for performance const uint64_t SegmentByteThreshold = 16ull * 1024 * 1024 * 1024; const TimeSpan SegmentTimeThreshold{/* hours */ 1, /* minutes */ 0, /* seconds */ 0}; -const int64_t MaximumBacklogCount = 2000; std::string MakeSegmentPath(uint64_t SegmentIndex) @@ -366,11 +367,11 @@ private: }; std::unique_ptr<std::thread> m_WriterThread; - std::atomic_bool m_IsWriterReady{false}; std::atomic_bool m_IsActive{false}; - std::atomic_int64_t m_PendingRequests{0}; - RwLock m_RequestQueueLock; - std::deque<QueuedRequest> m_RequestQueue; + std::mutex m_QueueMutex; + std::condition_variable m_QueueCondition; + bool m_IsWriterReady = false; + eastl::deque<QueuedRequest> m_RequestQueue; void WriterThreadMain(); }; @@ -660,7 +661,8 @@ RecordedRequestsWriter::BeginWrite(const std::filesystem::path& BasePath) m_WriterThread.reset(new std::thread(&RecordedRequestsWriter::WriterThreadMain, this)); - m_IsWriterReady.wait(false); + std::unique_lock<std::mutex> Lock(m_QueueMutex); + m_QueueCondition.wait(Lock, [this] { return m_IsWriterReady; }); } void @@ -668,15 +670,19 @@ RecordedRequestsWriter::EndWrite() { if (m_WriterThread) { - m_IsActive = false; - const int64_t PendingCount = m_PendingRequests.fetch_add(1); - m_PendingRequests.notify_all(); - - if (PendingCount) { - ZEN_INFO("waiting for RPC recorder writing thread to drain {} pending items", PendingCount); + std::lock_guard<std::mutex> Lock(m_QueueMutex); + m_IsActive = false; + const size_t PendingCount = m_RequestQueue.size(); + + if (PendingCount) + { + ZEN_INFO("waiting for RPC recorder writing thread to drain {} pending items", PendingCount); + } } + m_QueueCondition.notify_all(); + if (m_WriterThread->joinable()) { m_WriterThread->join(); @@ -695,12 +701,11 @@ RecordedRequestsWriter::WriteRequest(const RecordedRequestInfo& RequestInfo, con OwnedRequest.MakeOwned(); { - RwLock::ExclusiveLockScope _(m_RequestQueueLock); + std::lock_guard<std::mutex> Lock(m_QueueMutex); m_RequestQueue.push_back(QueuedRequest{RequestInfo, std::move(OwnedRequest)}); - m_PendingRequests.fetch_add(1); } - m_PendingRequests.notify_all(); + m_QueueCondition.notify_one(); } } @@ -710,55 +715,36 @@ RecordedRequestsWriter::WriterThreadMain() SetCurrentThreadName("rpc_writer"); EnsureCurrentSegment(); - m_IsWriterReady.store(true); - m_IsWriterReady.notify_all(); + { + std::lock_guard<std::mutex> Lock(m_QueueMutex); + m_IsWriterReady = true; + } + m_QueueCondition.notify_all(); - while (m_IsActive) + while (true) { - m_PendingRequests.wait(0); + QueuedRequest Request; - while (m_PendingRequests) { - RwLock::ExclusiveLockScope _(m_RequestQueueLock); - if (!m_RequestQueue.empty()) - { - bool DrainBacklog = false; - - do - { - QueuedRequest Request = m_RequestQueue.front(); - - m_RequestQueue.pop_front(); - m_PendingRequests.fetch_sub(1); - - // For a sufficiently large backlog, keep blocking queueing operations - // until we get below the threshold - DrainBacklog = m_RequestQueue.size() >= MaximumBacklogCount; - - if (!DrainBacklog) - { - _.ReleaseNow(); - } - - try - { - RecordedRequestsSegmentWriter& Writer = EnsureCurrentSegment(); - Writer.WriteRequest(Request.RequestInfo, Request.RequestBuffer); - } - catch (const std::exception&) - { - // TODO: what's the right behaviour here? The most likely cause would - // be some I/O error and we probably ought to just shut down recording - // at that point - } - } while (DrainBacklog); - } - else + std::unique_lock<std::mutex> Lock(m_QueueMutex); + m_QueueCondition.wait(Lock, [this] { return !m_RequestQueue.empty() || !m_IsActive; }); + + if (m_RequestQueue.empty()) { - // shutdown increments this counter so we need to decrement it - // here even though we didn't process any request - m_PendingRequests.fetch_sub(1); + break; } + + Request = std::move(m_RequestQueue.front()); + m_RequestQueue.pop_front(); + } + + try + { + RecordedRequestsSegmentWriter& Writer = EnsureCurrentSegment(); + Writer.WriteRequest(Request.RequestInfo, Request.RequestBuffer); + } + catch (const std::exception&) + { } } @@ -1119,7 +1105,7 @@ rpcrecord_forcelink() { } -TEST_SUITE_BEGIN("rpc.recording"); +TEST_SUITE_BEGIN("util.rpcrecording"); TEST_CASE("rpc.record") { |