aboutsummaryrefslogtreecommitdiff
path: root/src/zenutil/cache
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2024-01-31 15:55:45 +0100
committerGitHub <[email protected]>2024-01-31 15:55:45 +0100
commit8cb2b93f31c59b3261951a70c48e5da548194002 (patch)
treeb4e981b148e2cb1c31ade0fcabb1901caf13ae6b /src/zenutil/cache
parentonly try to traverse an objectstore bucket if it really exists (#646) (diff)
downloadzen-8cb2b93f31c59b3261951a70c48e5da548194002.tar.xz
zen-8cb2b93f31c59b3261951a70c48e5da548194002.zip
changed RPC recording to MPSC setup (#638)
fixes rare race condition when using RPC recording for long periods of time
Diffstat (limited to 'src/zenutil/cache')
-rw-r--r--src/zenutil/cache/rpcrecording.cpp235
1 files changed, 171 insertions, 64 deletions
diff --git a/src/zenutil/cache/rpcrecording.cpp b/src/zenutil/cache/rpcrecording.cpp
index 5e00e8852..759af792d 100644
--- a/src/zenutil/cache/rpcrecording.cpp
+++ b/src/zenutil/cache/rpcrecording.cpp
@@ -16,6 +16,9 @@ ZEN_THIRD_PARTY_INCLUDES_START
#include <gsl/gsl-lite.hpp>
ZEN_THIRD_PARTY_INCLUDES_END
+#include <deque>
+#include <thread>
+
namespace zen::cache {
const RecordedRequestInfo RecordedRequestInfo::NullRequest = {.ContentType = ZenContentType::kUnknownContentType,
@@ -84,7 +87,7 @@ struct RecordedRequestsWriter
m_Entries.clear();
}
- uint64_t WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer)
+ void WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer)
{
RwLock::ExclusiveLockScope Lock(m_Lock);
uint64_t RequestIndex = m_Entries.size();
@@ -115,22 +118,21 @@ struct RecordedRequestsWriter
if (Ec)
{
Entry.Length = 0;
- return ~0ull;
}
- return RequestIndex;
}
- Lock.ReleaseNow();
-
- BasicFile RequestFile;
- RequestFile.Open(m_BasePath / fmt::format("request{}.bin", RequestIndex), BasicFile::Mode::kTruncate);
- std::error_code Ec;
- RequestFile.WriteAll(RequestBuffer, Ec);
- if (Ec)
+ else
{
- Entry.Length = 0;
- return ~0ull;
+ Lock.ReleaseNow();
+
+ BasicFile RequestFile;
+ RequestFile.Open(m_BasePath / fmt::format("request{}.bin", RequestIndex), BasicFile::Mode::kTruncate);
+ std::error_code Ec;
+ RequestFile.WriteAll(RequestBuffer, Ec);
+ if (Ec)
+ {
+ Entry.Length = 0;
+ }
}
- return RequestIndex;
}
std::filesystem::path m_BasePath;
@@ -214,12 +216,10 @@ public:
virtual ~DiskRequestRecorder() { m_RecordedRequests.EndWrite(); }
private:
- virtual uint64_t RecordRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) override
+ virtual void RecordRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) override
{
- return m_RecordedRequests.WriteRequest(RequestInfo, RequestBuffer);
+ m_RecordedRequests.WriteRequest(RequestInfo, RequestBuffer);
}
- virtual void RecordResponse(uint64_t, const ZenContentType, const IoBuffer&) override {}
- virtual void RecordResponse(uint64_t, const ZenContentType, const CompositeBuffer&) override {}
RecordedRequestsWriter m_RecordedRequests;
};
@@ -239,7 +239,6 @@ public:
{
return m_RequestBuffer.ReadRequest(RequestIndex, OutBuffer);
}
- virtual ZenContentType GetResponse(uint64_t, IoBuffer&) override { return ZenContentType::kUnknownContentType; }
private:
std::uint64_t m_RequestCount;
@@ -282,6 +281,7 @@ const uint64_t LooseFileThreshold = 5000; // Somewhat arbitrary, but we try
// for performance
const uint64_t SegmentByteThreshold = 16ull * 1024 * 1024 * 1024;
const TimeSpan SegmentTimeThreshold{/* hours */ 1, /* minutes */ 0, /* seconds */ 0};
+const int64_t MaximumBacklogCount = 2000;
std::string
MakeSegmentPath(uint64_t SegmentIndex)
@@ -297,9 +297,9 @@ struct RecordedRequestsSegmentWriter
RecordedRequestsSegmentWriter(const RecordedRequestsSegmentWriter&) = delete;
RecordedRequestsSegmentWriter& operator=(const RecordedRequestsSegmentWriter&) = delete;
- void BeginWrite(const std::filesystem::path& BasePath, uint64_t SegmentIndex, uint64_t RequestBaseIndex);
- uint64_t WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer);
- void EndWrite();
+ void BeginWrite(const std::filesystem::path& BasePath, uint64_t SegmentIndex, uint64_t RequestBaseIndex);
+ void WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer);
+ void EndWrite();
inline uint64_t GetRequestCount() const
{
@@ -335,12 +335,12 @@ private:
struct RecordedRequestsWriter
{
- void BeginWrite(const std::filesystem::path& BasePath);
- RecordedRequestsSegmentWriter& EnsureCurrentSegment();
- void CommitCurrentSegment(RwLock::ExclusiveLockScope&);
- void EndWrite();
- void WriteRecordingMetadata();
- uint64_t WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer);
+ RecordedRequestsWriter();
+ ~RecordedRequestsWriter();
+
+ void BeginWrite(const std::filesystem::path& BasePath);
+ void EndWrite();
+ void WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer);
private:
std::filesystem::path m_BasePath;
@@ -351,6 +351,26 @@ private:
std::vector<std::unique_ptr<RecordedRequestsSegmentWriter>> m_FinishedSegments;
std::vector<uint64_t> m_SegmentBaseIndex;
uint64_t m_NextSegmentBaseIndex = 0;
+
+ RecordedRequestsSegmentWriter& EnsureCurrentSegment();
+ void CommitCurrentSegment(RwLock::ExclusiveLockScope&);
+ void WriteRecordingMetadata();
+
+ // I/O thread state
+
+ struct QueuedRequest
+ {
+ RecordedRequestInfo RequestInfo;
+ IoBuffer RequestBuffer;
+ };
+
+ std::unique_ptr<std::thread> m_WriterThread;
+ std::atomic_bool m_IsActive{false};
+ std::atomic_int64_t m_PendingRequests{0};
+ RwLock m_RequestQueueLock;
+ std::deque<QueuedRequest> m_RequestQueue;
+
+ void WriterThreadMain();
};
//////////////////////////////////////////////////////////////////////////
@@ -454,7 +474,7 @@ RecordedRequestsSegmentWriter::EndWrite()
swap(m_Entries, EmptyEntries);
}
-uint64_t
+void
RecordedRequestsSegmentWriter::WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer)
{
const uint64_t RequestBufferSize = RequestBuffer.GetSize();
@@ -502,12 +522,12 @@ RecordedRequestsSegmentWriter::WriteRequest(const RecordedRequestInfo& RequestIn
// location
RwLock::ExclusiveLockScope _(m_Lock);
m_Entries[RequestIndex].Length = 0;
- return ~0ull;
+ return;
}
m_RequestsByteCount.fetch_add(RequestBuffer.GetSize());
- return RequestIndex;
+ return;
}
}
@@ -536,18 +556,16 @@ RecordedRequestsSegmentWriter::WriteRequest(const RecordedRequestInfo& RequestIn
{
RwLock::ExclusiveLockScope _(m_Lock);
m_Entries[RequestIndex].Length = 0;
- return ~0ull;
}
-
- m_RequestsByteCount.fetch_add(RequestBuffer.GetSize());
-
- return RequestIndex;
+ else
+ {
+ m_RequestsByteCount.fetch_add(RequestBuffer.GetSize());
+ }
}
catch (std::exception&)
{
RwLock::ExclusiveLockScope _(m_Lock);
m_Entries[RequestIndex].Length = 0;
- return ~0ull;
}
}
@@ -621,11 +639,125 @@ RecordedRequestsSegmentReader::ReadRequest(uint64_t RequestIndex, IoBuffer& OutB
return RequestInfo;
}
+//////////////////////////////////////////////////////////////////////////
+
+RecordedRequestsWriter::RecordedRequestsWriter()
+{
+}
+
+RecordedRequestsWriter::~RecordedRequestsWriter()
+{
+ EndWrite();
+}
+
void
RecordedRequestsWriter::BeginWrite(const std::filesystem::path& BasePath)
{
m_BasePath = BasePath;
+ m_IsActive = true;
+
+ m_WriterThread.reset(new std::thread(&RecordedRequestsWriter::WriterThreadMain, this));
+}
+
+void
+RecordedRequestsWriter::EndWrite()
+{
+ if (m_WriterThread)
+ {
+ m_IsActive = false;
+ const int64_t PendingCount = m_PendingRequests.fetch_add(1);
+ m_PendingRequests.notify_all();
+
+ if (PendingCount)
+ {
+ ZEN_INFO("waiting for RPC recorder writing thread to drain {} pending items", PendingCount);
+ }
+
+ if (m_WriterThread->joinable())
+ {
+ m_WriterThread->join();
+ }
+
+ m_WriterThread.reset();
+ }
+}
+
+void
+RecordedRequestsWriter::WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer)
+{
+ if (m_IsActive)
+ {
+ IoBuffer OwnedRequest = RequestBuffer;
+ OwnedRequest.MakeOwned();
+
+ {
+ RwLock::ExclusiveLockScope _(m_RequestQueueLock);
+ m_RequestQueue.push_back(QueuedRequest{RequestInfo, std::move(OwnedRequest)});
+ m_PendingRequests.fetch_add(1);
+ }
+
+ m_PendingRequests.notify_all();
+ }
+}
+
+void
+RecordedRequestsWriter::WriterThreadMain()
+{
+ SetCurrentThreadName("rpc_writer");
EnsureCurrentSegment();
+
+ while (m_IsActive)
+ {
+ m_PendingRequests.wait(0);
+
+ while (m_PendingRequests)
+ {
+ RwLock::ExclusiveLockScope _(m_RequestQueueLock);
+ if (!m_RequestQueue.empty())
+ {
+ bool DrainBacklog = false;
+
+ do
+ {
+ QueuedRequest Request = m_RequestQueue.front();
+
+ m_RequestQueue.pop_front();
+ m_PendingRequests.fetch_sub(1);
+
+ // For a sufficiently large backlog, keep blocking queueing operations
+ // until we get below the threshold
+ DrainBacklog = m_RequestQueue.size() >= MaximumBacklogCount;
+
+ if (!DrainBacklog)
+ {
+ _.ReleaseNow();
+ }
+
+ try
+ {
+ RecordedRequestsSegmentWriter& Writer = EnsureCurrentSegment();
+ Writer.WriteRequest(Request.RequestInfo, Request.RequestBuffer);
+ }
+ catch (std::exception&)
+ {
+ // TODO: what's the right behaviour here? The most likely cause would
+ // be some I/O error and we probably ought to just shut down recording
+ // at that point
+ }
+ } while (DrainBacklog);
+ }
+ else
+ {
+ // shutdown increments this counter so we need to decrement it
+ // here even though we didn't process any request
+ m_PendingRequests.fetch_sub(1);
+ }
+ }
+ }
+
+ RwLock::ExclusiveLockScope _(m_Lock);
+ CommitCurrentSegment(_);
+ WriteRecordingMetadata();
}
RecordedRequestsSegmentWriter&
@@ -693,26 +825,6 @@ RecordedRequestsWriter::CommitCurrentSegment(RwLock::ExclusiveLockScope&)
}
void
-RecordedRequestsWriter::EndWrite()
-{
- RwLock::ExclusiveLockScope _(m_Lock);
-
- CommitCurrentSegment(_);
-
- WriteRecordingMetadata();
-}
-
-uint64_t
-RecordedRequestsWriter::WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer)
-{
- RecordedRequestsSegmentWriter& Writer = EnsureCurrentSegment();
-
- const uint64_t SegmentLocalIndex = Writer.WriteRequest(RequestInfo, RequestBuffer);
-
- return Writer.GetBaseRequestIndex() + SegmentLocalIndex;
-}
-
-void
RecordedRequestsWriter::WriteRecordingMetadata()
{
try
@@ -918,12 +1030,10 @@ public:
DiskRequestRecorder(const std::filesystem::path& BasePath) { m_RecordedRequests.BeginWrite(BasePath); }
virtual ~DiskRequestRecorder() { m_RecordedRequests.EndWrite(); }
- virtual uint64_t RecordRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) override
+ virtual void RecordRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) override
{
- return m_RecordedRequests.WriteRequest(RequestInfo, RequestBuffer);
+ m_RecordedRequests.WriteRequest(RequestInfo, RequestBuffer);
}
- virtual void RecordResponse(uint64_t, const ZenContentType, const IoBuffer&) override {}
- virtual void RecordResponse(uint64_t, const ZenContentType, const CompositeBuffer&) override {}
private:
RecordedRequestsWriter m_RecordedRequests;
@@ -964,7 +1074,6 @@ public:
{
return m_RequestReader.ReadRequest(RequestIndex, OutBuffer);
}
- virtual ZenContentType GetResponse(uint64_t, IoBuffer&) override { return ZenContentType::kUnknownContentType; }
private:
std::uint64_t m_RequestCount;
@@ -1029,9 +1138,7 @@ TEST_CASE("rpc.record")
CbObject Req = RequestPayload.Save();
IoBuffer RequestBuffer = Req.GetBuffer().AsIoBuffer();
- const uint64_t Index = Recorder.RecordRequest(RequestInfo, RequestBuffer);
-
- CHECK(Index == i);
+ Recorder.RecordRequest(RequestInfo, RequestBuffer);
}
}