aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2024-01-31 15:55:45 +0100
committerGitHub <[email protected]>2024-01-31 15:55:45 +0100
commit8cb2b93f31c59b3261951a70c48e5da548194002 (patch)
treeb4e981b148e2cb1c31ade0fcabb1901caf13ae6b /src
parentonly try to traverse an objectstore bucket if it really exists (#646) (diff)
downloadzen-8cb2b93f31c59b3261951a70c48e5da548194002.tar.xz
zen-8cb2b93f31c59b3261951a70c48e5da548194002.zip
changed RPC recording to MPSC setup (#638)
fixes rare race condition when using RPC recording for long periods of time
Diffstat (limited to 'src')
-rw-r--r--src/zencore/include/zencore/iobuffer.h2
-rw-r--r--src/zenserver/cache/httpstructuredcache.cpp22
-rw-r--r--src/zenutil/cache/rpcrecording.cpp235
-rw-r--r--src/zenutil/include/zenutil/cache/rpcrecording.h9
4 files changed, 176 insertions, 92 deletions
diff --git a/src/zencore/include/zencore/iobuffer.h b/src/zencore/include/zencore/iobuffer.h
index b9e503354..dcf1b4db8 100644
--- a/src/zencore/include/zencore/iobuffer.h
+++ b/src/zencore/include/zencore/iobuffer.h
@@ -379,7 +379,7 @@ public:
inline explicit operator bool() const { return !m_Core->IsNull(); }
inline operator MemoryView() const& { return MemoryView(m_Core->DataPointer(), m_Core->DataBytes()); }
- inline void MakeOwned() { return m_Core->MakeOwned(); }
+ inline void MakeOwned() const { return m_Core->MakeOwned(); }
[[nodiscard]] inline bool IsOwned() const { return m_Core->IsOwned(); }
[[nodiscard]] inline bool IsWholeFile() const { return m_Core->IsWholeFile(); }
[[nodiscard]] void* MutableData() const { return m_Core->MutableDataPointer(); }
diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp
index c62b5325e..95c85d6c8 100644
--- a/src/zenserver/cache/httpstructuredcache.cpp
+++ b/src/zenserver/cache/httpstructuredcache.cpp
@@ -1665,14 +1665,12 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request)
auto HandleRpc =
[this, RequestContext, Body = Request.ReadPayload(), ContentType, AcceptType](HttpServerRequest& AsyncRequest) mutable {
- uint64_t RequestIndex = ~0ull;
-
if (m_RequestRecordingEnabled)
{
RwLock::SharedLockScope _(m_RequestRecordingLock);
if (m_RequestRecorder)
{
- RequestIndex = m_RequestRecorder->RecordRequest(
+ m_RequestRecorder->RecordRequest(
{.ContentType = ContentType, .AcceptType = AcceptType, .SessionId = RequestContext.SessionId},
Body);
}
@@ -1712,14 +1710,6 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request)
TargetProcessHandle = m_OpenProcessCache.GetProcessHandle(RequestContext.SessionId, TargetProcessId);
}
CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResult, Flags, TargetProcessHandle);
- if (RequestIndex != ~0ull)
- {
- RwLock::SharedLockScope _(m_RequestRecordingLock);
- if (m_RequestRecorder)
- {
- m_RequestRecorder->RecordResponse(RequestIndex, HttpContentType::kCbPackage, RpcResponseBuffer);
- }
- }
AsyncRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer);
}
else
@@ -1727,16 +1717,6 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request)
BinaryWriter MemStream;
RpcResult.Save(MemStream);
- if (RequestIndex != ~0ull)
- {
- RwLock::SharedLockScope _(m_RequestRecordingLock);
- if (m_RequestRecorder)
- {
- m_RequestRecorder->RecordResponse(RequestIndex,
- HttpContentType::kCbPackage,
- IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
- }
- }
AsyncRequest.WriteResponse(HttpResponseCode::OK,
HttpContentType::kCbPackage,
IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
diff --git a/src/zenutil/cache/rpcrecording.cpp b/src/zenutil/cache/rpcrecording.cpp
index 5e00e8852..759af792d 100644
--- a/src/zenutil/cache/rpcrecording.cpp
+++ b/src/zenutil/cache/rpcrecording.cpp
@@ -16,6 +16,9 @@ ZEN_THIRD_PARTY_INCLUDES_START
#include <gsl/gsl-lite.hpp>
ZEN_THIRD_PARTY_INCLUDES_END
+#include <deque>
+#include <thread>
+
namespace zen::cache {
const RecordedRequestInfo RecordedRequestInfo::NullRequest = {.ContentType = ZenContentType::kUnknownContentType,
@@ -84,7 +87,7 @@ struct RecordedRequestsWriter
m_Entries.clear();
}
- uint64_t WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer)
+ void WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer)
{
RwLock::ExclusiveLockScope Lock(m_Lock);
uint64_t RequestIndex = m_Entries.size();
@@ -115,22 +118,21 @@ struct RecordedRequestsWriter
if (Ec)
{
Entry.Length = 0;
- return ~0ull;
}
- return RequestIndex;
}
- Lock.ReleaseNow();
-
- BasicFile RequestFile;
- RequestFile.Open(m_BasePath / fmt::format("request{}.bin", RequestIndex), BasicFile::Mode::kTruncate);
- std::error_code Ec;
- RequestFile.WriteAll(RequestBuffer, Ec);
- if (Ec)
+ else
{
- Entry.Length = 0;
- return ~0ull;
+ Lock.ReleaseNow();
+
+ BasicFile RequestFile;
+ RequestFile.Open(m_BasePath / fmt::format("request{}.bin", RequestIndex), BasicFile::Mode::kTruncate);
+ std::error_code Ec;
+ RequestFile.WriteAll(RequestBuffer, Ec);
+ if (Ec)
+ {
+ Entry.Length = 0;
+ }
}
- return RequestIndex;
}
std::filesystem::path m_BasePath;
@@ -214,12 +216,10 @@ public:
virtual ~DiskRequestRecorder() { m_RecordedRequests.EndWrite(); }
private:
- virtual uint64_t RecordRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) override
+ virtual void RecordRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) override
{
- return m_RecordedRequests.WriteRequest(RequestInfo, RequestBuffer);
+ m_RecordedRequests.WriteRequest(RequestInfo, RequestBuffer);
}
- virtual void RecordResponse(uint64_t, const ZenContentType, const IoBuffer&) override {}
- virtual void RecordResponse(uint64_t, const ZenContentType, const CompositeBuffer&) override {}
RecordedRequestsWriter m_RecordedRequests;
};
@@ -239,7 +239,6 @@ public:
{
return m_RequestBuffer.ReadRequest(RequestIndex, OutBuffer);
}
- virtual ZenContentType GetResponse(uint64_t, IoBuffer&) override { return ZenContentType::kUnknownContentType; }
private:
std::uint64_t m_RequestCount;
@@ -282,6 +281,7 @@ const uint64_t LooseFileThreshold = 5000; // Somewhat arbitrary, but we try
// for performance
const uint64_t SegmentByteThreshold = 16ull * 1024 * 1024 * 1024;
const TimeSpan SegmentTimeThreshold{/* hours */ 1, /* minutes */ 0, /* seconds */ 0};
+const int64_t MaximumBacklogCount = 2000;
std::string
MakeSegmentPath(uint64_t SegmentIndex)
@@ -297,9 +297,9 @@ struct RecordedRequestsSegmentWriter
RecordedRequestsSegmentWriter(const RecordedRequestsSegmentWriter&) = delete;
RecordedRequestsSegmentWriter& operator=(const RecordedRequestsSegmentWriter&) = delete;
- void BeginWrite(const std::filesystem::path& BasePath, uint64_t SegmentIndex, uint64_t RequestBaseIndex);
- uint64_t WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer);
- void EndWrite();
+ void BeginWrite(const std::filesystem::path& BasePath, uint64_t SegmentIndex, uint64_t RequestBaseIndex);
+ void WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer);
+ void EndWrite();
inline uint64_t GetRequestCount() const
{
@@ -335,12 +335,12 @@ private:
struct RecordedRequestsWriter
{
- void BeginWrite(const std::filesystem::path& BasePath);
- RecordedRequestsSegmentWriter& EnsureCurrentSegment();
- void CommitCurrentSegment(RwLock::ExclusiveLockScope&);
- void EndWrite();
- void WriteRecordingMetadata();
- uint64_t WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer);
+ RecordedRequestsWriter();
+ ~RecordedRequestsWriter();
+
+ void BeginWrite(const std::filesystem::path& BasePath);
+ void EndWrite();
+ void WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer);
private:
std::filesystem::path m_BasePath;
@@ -351,6 +351,26 @@ private:
std::vector<std::unique_ptr<RecordedRequestsSegmentWriter>> m_FinishedSegments;
std::vector<uint64_t> m_SegmentBaseIndex;
uint64_t m_NextSegmentBaseIndex = 0;
+
+ RecordedRequestsSegmentWriter& EnsureCurrentSegment();
+ void CommitCurrentSegment(RwLock::ExclusiveLockScope&);
+ void WriteRecordingMetadata();
+
+ // I/O thread state
+
+ struct QueuedRequest
+ {
+ RecordedRequestInfo RequestInfo;
+ IoBuffer RequestBuffer;
+ };
+
+ std::unique_ptr<std::thread> m_WriterThread;
+ std::atomic_bool m_IsActive{false};
+ std::atomic_int64_t m_PendingRequests{0};
+ RwLock m_RequestQueueLock;
+ std::deque<QueuedRequest> m_RequestQueue;
+
+ void WriterThreadMain();
};
//////////////////////////////////////////////////////////////////////////
@@ -454,7 +474,7 @@ RecordedRequestsSegmentWriter::EndWrite()
swap(m_Entries, EmptyEntries);
}
-uint64_t
+void
RecordedRequestsSegmentWriter::WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer)
{
const uint64_t RequestBufferSize = RequestBuffer.GetSize();
@@ -502,12 +522,12 @@ RecordedRequestsSegmentWriter::WriteRequest(const RecordedRequestInfo& RequestIn
// location
RwLock::ExclusiveLockScope _(m_Lock);
m_Entries[RequestIndex].Length = 0;
- return ~0ull;
+ return;
}
m_RequestsByteCount.fetch_add(RequestBuffer.GetSize());
- return RequestIndex;
+ return;
}
}
@@ -536,18 +556,16 @@ RecordedRequestsSegmentWriter::WriteRequest(const RecordedRequestInfo& RequestIn
{
RwLock::ExclusiveLockScope _(m_Lock);
m_Entries[RequestIndex].Length = 0;
- return ~0ull;
}
-
- m_RequestsByteCount.fetch_add(RequestBuffer.GetSize());
-
- return RequestIndex;
+ else
+ {
+ m_RequestsByteCount.fetch_add(RequestBuffer.GetSize());
+ }
}
catch (std::exception&)
{
RwLock::ExclusiveLockScope _(m_Lock);
m_Entries[RequestIndex].Length = 0;
- return ~0ull;
}
}
@@ -621,11 +639,125 @@ RecordedRequestsSegmentReader::ReadRequest(uint64_t RequestIndex, IoBuffer& OutB
return RequestInfo;
}
+//////////////////////////////////////////////////////////////////////////
+
+RecordedRequestsWriter::RecordedRequestsWriter()
+{
+}
+
+RecordedRequestsWriter::~RecordedRequestsWriter()
+{
+ EndWrite();
+}
+
void
RecordedRequestsWriter::BeginWrite(const std::filesystem::path& BasePath)
{
m_BasePath = BasePath;
+ m_IsActive = true;
+
+ m_WriterThread.reset(new std::thread(&RecordedRequestsWriter::WriterThreadMain, this));
+}
+
+void
+RecordedRequestsWriter::EndWrite()
+{
+ if (m_WriterThread)
+ {
+ m_IsActive = false;
+ const int64_t PendingCount = m_PendingRequests.fetch_add(1);
+ m_PendingRequests.notify_all();
+
+ if (PendingCount)
+ {
+ ZEN_INFO("waiting for RPC recorder writing thread to drain {} pending items", PendingCount);
+ }
+
+ if (m_WriterThread->joinable())
+ {
+ m_WriterThread->join();
+ }
+
+ m_WriterThread.reset();
+ }
+}
+
+void
+RecordedRequestsWriter::WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer)
+{
+ if (m_IsActive)
+ {
+ IoBuffer OwnedRequest = RequestBuffer;
+ OwnedRequest.MakeOwned();
+
+ {
+ RwLock::ExclusiveLockScope _(m_RequestQueueLock);
+ m_RequestQueue.push_back(QueuedRequest{RequestInfo, std::move(OwnedRequest)});
+ m_PendingRequests.fetch_add(1);
+ }
+
+ m_PendingRequests.notify_all();
+ }
+}
+
+void
+RecordedRequestsWriter::WriterThreadMain()
+{
+ SetCurrentThreadName("rpc_writer");
EnsureCurrentSegment();
+
+ while (m_IsActive)
+ {
+ m_PendingRequests.wait(0);
+
+ while (m_PendingRequests)
+ {
+ RwLock::ExclusiveLockScope _(m_RequestQueueLock);
+ if (!m_RequestQueue.empty())
+ {
+ bool DrainBacklog = false;
+
+ do
+ {
+ QueuedRequest Request = m_RequestQueue.front();
+
+ m_RequestQueue.pop_front();
+ m_PendingRequests.fetch_sub(1);
+
+ // For a sufficiently large backlog, keep blocking queueing operations
+ // until we get below the threshold
+ DrainBacklog = m_RequestQueue.size() >= MaximumBacklogCount;
+
+ if (!DrainBacklog)
+ {
+ _.ReleaseNow();
+ }
+
+ try
+ {
+ RecordedRequestsSegmentWriter& Writer = EnsureCurrentSegment();
+ Writer.WriteRequest(Request.RequestInfo, Request.RequestBuffer);
+ }
+ catch (std::exception&)
+ {
+ // TODO: what's the right behaviour here? The most likely cause would
+ // be some I/O error and we probably ought to just shut down recording
+ // at that point
+ }
+ } while (DrainBacklog);
+ }
+ else
+ {
+ // shutdown increments this counter so we need to decrement it
+ // here even though we didn't process any request
+ m_PendingRequests.fetch_sub(1);
+ }
+ }
+ }
+
+ RwLock::ExclusiveLockScope _(m_Lock);
+ CommitCurrentSegment(_);
+ WriteRecordingMetadata();
}
RecordedRequestsSegmentWriter&
@@ -693,26 +825,6 @@ RecordedRequestsWriter::CommitCurrentSegment(RwLock::ExclusiveLockScope&)
}
void
-RecordedRequestsWriter::EndWrite()
-{
- RwLock::ExclusiveLockScope _(m_Lock);
-
- CommitCurrentSegment(_);
-
- WriteRecordingMetadata();
-}
-
-uint64_t
-RecordedRequestsWriter::WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer)
-{
- RecordedRequestsSegmentWriter& Writer = EnsureCurrentSegment();
-
- const uint64_t SegmentLocalIndex = Writer.WriteRequest(RequestInfo, RequestBuffer);
-
- return Writer.GetBaseRequestIndex() + SegmentLocalIndex;
-}
-
-void
RecordedRequestsWriter::WriteRecordingMetadata()
{
try
@@ -918,12 +1030,10 @@ public:
DiskRequestRecorder(const std::filesystem::path& BasePath) { m_RecordedRequests.BeginWrite(BasePath); }
virtual ~DiskRequestRecorder() { m_RecordedRequests.EndWrite(); }
- virtual uint64_t RecordRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) override
+ virtual void RecordRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) override
{
- return m_RecordedRequests.WriteRequest(RequestInfo, RequestBuffer);
+ m_RecordedRequests.WriteRequest(RequestInfo, RequestBuffer);
}
- virtual void RecordResponse(uint64_t, const ZenContentType, const IoBuffer&) override {}
- virtual void RecordResponse(uint64_t, const ZenContentType, const CompositeBuffer&) override {}
private:
RecordedRequestsWriter m_RecordedRequests;
@@ -964,7 +1074,6 @@ public:
{
return m_RequestReader.ReadRequest(RequestIndex, OutBuffer);
}
- virtual ZenContentType GetResponse(uint64_t, IoBuffer&) override { return ZenContentType::kUnknownContentType; }
private:
std::uint64_t m_RequestCount;
@@ -1029,9 +1138,7 @@ TEST_CASE("rpc.record")
CbObject Req = RequestPayload.Save();
IoBuffer RequestBuffer = Req.GetBuffer().AsIoBuffer();
- const uint64_t Index = Recorder.RecordRequest(RequestInfo, RequestBuffer);
-
- CHECK(Index == i);
+ Recorder.RecordRequest(RequestInfo, RequestBuffer);
}
}
diff --git a/src/zenutil/include/zenutil/cache/rpcrecording.h b/src/zenutil/include/zenutil/cache/rpcrecording.h
index ab9b92dd3..f1ad35413 100644
--- a/src/zenutil/include/zenutil/cache/rpcrecording.h
+++ b/src/zenutil/include/zenutil/cache/rpcrecording.h
@@ -24,18 +24,15 @@ class IRpcRequestRecorder
{
public:
virtual ~IRpcRequestRecorder() {}
- virtual uint64_t RecordRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) = 0;
- virtual void RecordResponse(uint64_t RequestIndex, const ZenContentType ContentType, const IoBuffer& ResponseBuffer) = 0;
- virtual void RecordResponse(uint64_t RequestIndex, const ZenContentType ContentType, const CompositeBuffer& ResponseBuffer) = 0;
+ virtual void RecordRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) = 0;
};
class IRpcRequestReplayer
{
public:
virtual ~IRpcRequestReplayer() {}
- virtual uint64_t GetRequestCount() const = 0;
- virtual RecordedRequestInfo GetRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) = 0;
- virtual ZenContentType GetResponse(uint64_t RequestIndex, IoBuffer& OutBuffer) = 0;
+ virtual uint64_t GetRequestCount() const = 0;
+ virtual RecordedRequestInfo GetRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) = 0;
};
std::unique_ptr<cache::IRpcRequestRecorder> MakeDiskRequestRecorder(const std::filesystem::path& BasePath);