diff options
| author | Stefan Boberg <[email protected]> | 2024-01-31 15:55:45 +0100 |
|---|---|---|
| committer | GitHub <[email protected]> | 2024-01-31 15:55:45 +0100 |
| commit | 8cb2b93f31c59b3261951a70c48e5da548194002 (patch) | |
| tree | b4e981b148e2cb1c31ade0fcabb1901caf13ae6b /src | |
| parent | only try to traverse an objectstore bucket if it really exists (#646) (diff) | |
| download | zen-8cb2b93f31c59b3261951a70c48e5da548194002.tar.xz zen-8cb2b93f31c59b3261951a70c48e5da548194002.zip | |
changed RPC recording to MPSC setup (#638)
fixes rare race condition when using RPC recording for long periods of time
Diffstat (limited to 'src')
| -rw-r--r-- | src/zencore/include/zencore/iobuffer.h | 2 | ||||
| -rw-r--r-- | src/zenserver/cache/httpstructuredcache.cpp | 22 | ||||
| -rw-r--r-- | src/zenutil/cache/rpcrecording.cpp | 235 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/cache/rpcrecording.h | 9 |
4 files changed, 176 insertions, 92 deletions
diff --git a/src/zencore/include/zencore/iobuffer.h b/src/zencore/include/zencore/iobuffer.h index b9e503354..dcf1b4db8 100644 --- a/src/zencore/include/zencore/iobuffer.h +++ b/src/zencore/include/zencore/iobuffer.h @@ -379,7 +379,7 @@ public: inline explicit operator bool() const { return !m_Core->IsNull(); } inline operator MemoryView() const& { return MemoryView(m_Core->DataPointer(), m_Core->DataBytes()); } - inline void MakeOwned() { return m_Core->MakeOwned(); } + inline void MakeOwned() const { return m_Core->MakeOwned(); } [[nodiscard]] inline bool IsOwned() const { return m_Core->IsOwned(); } [[nodiscard]] inline bool IsWholeFile() const { return m_Core->IsWholeFile(); } [[nodiscard]] void* MutableData() const { return m_Core->MutableDataPointer(); } diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp index c62b5325e..95c85d6c8 100644 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ b/src/zenserver/cache/httpstructuredcache.cpp @@ -1665,14 +1665,12 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request) auto HandleRpc = [this, RequestContext, Body = Request.ReadPayload(), ContentType, AcceptType](HttpServerRequest& AsyncRequest) mutable { - uint64_t RequestIndex = ~0ull; - if (m_RequestRecordingEnabled) { RwLock::SharedLockScope _(m_RequestRecordingLock); if (m_RequestRecorder) { - RequestIndex = m_RequestRecorder->RecordRequest( + m_RequestRecorder->RecordRequest( {.ContentType = ContentType, .AcceptType = AcceptType, .SessionId = RequestContext.SessionId}, Body); } @@ -1712,14 +1710,6 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request) TargetProcessHandle = m_OpenProcessCache.GetProcessHandle(RequestContext.SessionId, TargetProcessId); } CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResult, Flags, TargetProcessHandle); - if (RequestIndex != ~0ull) - { - RwLock::SharedLockScope _(m_RequestRecordingLock); - if (m_RequestRecorder) - { - m_RequestRecorder->RecordResponse(RequestIndex, HttpContentType::kCbPackage, RpcResponseBuffer); - } - } AsyncRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); } else @@ -1727,16 +1717,6 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request) BinaryWriter MemStream; RpcResult.Save(MemStream); - if (RequestIndex != ~0ull) - { - RwLock::SharedLockScope _(m_RequestRecordingLock); - if (m_RequestRecorder) - { - m_RequestRecorder->RecordResponse(RequestIndex, - HttpContentType::kCbPackage, - IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); - } - } AsyncRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); diff --git a/src/zenutil/cache/rpcrecording.cpp b/src/zenutil/cache/rpcrecording.cpp index 5e00e8852..759af792d 100644 --- a/src/zenutil/cache/rpcrecording.cpp +++ b/src/zenutil/cache/rpcrecording.cpp @@ -16,6 +16,9 @@ ZEN_THIRD_PARTY_INCLUDES_START #include <gsl/gsl-lite.hpp> ZEN_THIRD_PARTY_INCLUDES_END +#include <deque> +#include <thread> + namespace zen::cache { const RecordedRequestInfo RecordedRequestInfo::NullRequest = {.ContentType = ZenContentType::kUnknownContentType, @@ -84,7 +87,7 @@ struct RecordedRequestsWriter m_Entries.clear(); } - uint64_t WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) + void WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) { RwLock::ExclusiveLockScope Lock(m_Lock); uint64_t RequestIndex = m_Entries.size(); @@ -115,22 +118,21 @@ struct RecordedRequestsWriter if (Ec) { Entry.Length = 0; - return ~0ull; } - return RequestIndex; } - Lock.ReleaseNow(); - - BasicFile RequestFile; - RequestFile.Open(m_BasePath / fmt::format("request{}.bin", RequestIndex), BasicFile::Mode::kTruncate); - std::error_code Ec; - RequestFile.WriteAll(RequestBuffer, Ec); - if (Ec) + else { - Entry.Length = 0; - return ~0ull; + Lock.ReleaseNow(); + + BasicFile RequestFile; + RequestFile.Open(m_BasePath / fmt::format("request{}.bin", RequestIndex), BasicFile::Mode::kTruncate); + std::error_code Ec; + RequestFile.WriteAll(RequestBuffer, Ec); + if (Ec) + { + Entry.Length = 0; + } } - return RequestIndex; } std::filesystem::path m_BasePath; @@ -214,12 +216,10 @@ public: virtual ~DiskRequestRecorder() { m_RecordedRequests.EndWrite(); } private: - virtual uint64_t RecordRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) override + virtual void RecordRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) override { - return m_RecordedRequests.WriteRequest(RequestInfo, RequestBuffer); + m_RecordedRequests.WriteRequest(RequestInfo, RequestBuffer); } - virtual void RecordResponse(uint64_t, const ZenContentType, const IoBuffer&) override {} - virtual void RecordResponse(uint64_t, const ZenContentType, const CompositeBuffer&) override {} RecordedRequestsWriter m_RecordedRequests; }; @@ -239,7 +239,6 @@ public: { return m_RequestBuffer.ReadRequest(RequestIndex, OutBuffer); } - virtual ZenContentType GetResponse(uint64_t, IoBuffer&) override { return ZenContentType::kUnknownContentType; } private: std::uint64_t m_RequestCount; @@ -282,6 +281,7 @@ const uint64_t LooseFileThreshold = 5000; // Somewhat arbitrary, but we try // for performance const uint64_t SegmentByteThreshold = 16ull * 1024 * 1024 * 1024; const TimeSpan SegmentTimeThreshold{/* hours */ 1, /* minutes */ 0, /* seconds */ 0}; +const int64_t MaximumBacklogCount = 2000; std::string MakeSegmentPath(uint64_t SegmentIndex) @@ -297,9 +297,9 @@ struct RecordedRequestsSegmentWriter RecordedRequestsSegmentWriter(const RecordedRequestsSegmentWriter&) = delete; RecordedRequestsSegmentWriter& operator=(const RecordedRequestsSegmentWriter&) = delete; - void BeginWrite(const std::filesystem::path& BasePath, uint64_t SegmentIndex, uint64_t RequestBaseIndex); - uint64_t WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer); - void EndWrite(); + void BeginWrite(const std::filesystem::path& BasePath, uint64_t SegmentIndex, uint64_t RequestBaseIndex); + void WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer); + void EndWrite(); inline uint64_t GetRequestCount() const { @@ -335,12 +335,12 @@ private: struct RecordedRequestsWriter { - void BeginWrite(const std::filesystem::path& BasePath); - RecordedRequestsSegmentWriter& EnsureCurrentSegment(); - void CommitCurrentSegment(RwLock::ExclusiveLockScope&); - void EndWrite(); - void WriteRecordingMetadata(); - uint64_t WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer); + RecordedRequestsWriter(); + ~RecordedRequestsWriter(); + + void BeginWrite(const std::filesystem::path& BasePath); + void EndWrite(); + void WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer); private: std::filesystem::path m_BasePath; @@ -351,6 +351,26 @@ private: std::vector<std::unique_ptr<RecordedRequestsSegmentWriter>> m_FinishedSegments; std::vector<uint64_t> m_SegmentBaseIndex; uint64_t m_NextSegmentBaseIndex = 0; + + RecordedRequestsSegmentWriter& EnsureCurrentSegment(); + void CommitCurrentSegment(RwLock::ExclusiveLockScope&); + void WriteRecordingMetadata(); + + // I/O thread state + + struct QueuedRequest + { + RecordedRequestInfo RequestInfo; + IoBuffer RequestBuffer; + }; + + std::unique_ptr<std::thread> m_WriterThread; + std::atomic_bool m_IsActive{false}; + std::atomic_int64_t m_PendingRequests{0}; + RwLock m_RequestQueueLock; + std::deque<QueuedRequest> m_RequestQueue; + + void WriterThreadMain(); }; ////////////////////////////////////////////////////////////////////////// @@ -454,7 +474,7 @@ RecordedRequestsSegmentWriter::EndWrite() swap(m_Entries, EmptyEntries); } -uint64_t +void RecordedRequestsSegmentWriter::WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) { const uint64_t RequestBufferSize = RequestBuffer.GetSize(); @@ -502,12 +522,12 @@ RecordedRequestsSegmentWriter::WriteRequest(const RecordedRequestInfo& RequestIn // location RwLock::ExclusiveLockScope _(m_Lock); m_Entries[RequestIndex].Length = 0; - return ~0ull; + return; } m_RequestsByteCount.fetch_add(RequestBuffer.GetSize()); - return RequestIndex; + return; } } @@ -536,18 +556,16 @@ RecordedRequestsSegmentWriter::WriteRequest(const RecordedRequestInfo& RequestIn { RwLock::ExclusiveLockScope _(m_Lock); m_Entries[RequestIndex].Length = 0; - return ~0ull; } - - m_RequestsByteCount.fetch_add(RequestBuffer.GetSize()); - - return RequestIndex; + else + { + m_RequestsByteCount.fetch_add(RequestBuffer.GetSize()); + } } catch (std::exception&) { RwLock::ExclusiveLockScope _(m_Lock); m_Entries[RequestIndex].Length = 0; - return ~0ull; } } @@ -621,11 +639,125 @@ RecordedRequestsSegmentReader::ReadRequest(uint64_t RequestIndex, IoBuffer& OutB return RequestInfo; } +////////////////////////////////////////////////////////////////////////// + +RecordedRequestsWriter::RecordedRequestsWriter() +{ +} + +RecordedRequestsWriter::~RecordedRequestsWriter() +{ + EndWrite(); +} + void RecordedRequestsWriter::BeginWrite(const std::filesystem::path& BasePath) { m_BasePath = BasePath; + m_IsActive = true; + + m_WriterThread.reset(new std::thread(&RecordedRequestsWriter::WriterThreadMain, this)); +} + +void +RecordedRequestsWriter::EndWrite() +{ + if (m_WriterThread) + { + m_IsActive = false; + const int64_t PendingCount = m_PendingRequests.fetch_add(1); + m_PendingRequests.notify_all(); + + if (PendingCount) + { + ZEN_INFO("waiting for RPC recorder writing thread to drain {} pending items", PendingCount); + } + + if (m_WriterThread->joinable()) + { + m_WriterThread->join(); + } + + m_WriterThread.reset(); + } +} + +void +RecordedRequestsWriter::WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) +{ + if (m_IsActive) + { + IoBuffer OwnedRequest = RequestBuffer; + OwnedRequest.MakeOwned(); + + { + RwLock::ExclusiveLockScope _(m_RequestQueueLock); + m_RequestQueue.push_back(QueuedRequest{RequestInfo, std::move(OwnedRequest)}); + m_PendingRequests.fetch_add(1); + } + + m_PendingRequests.notify_all(); + } +} + +void +RecordedRequestsWriter::WriterThreadMain() +{ + SetCurrentThreadName("rpc_writer"); EnsureCurrentSegment(); + + while (m_IsActive) + { + m_PendingRequests.wait(0); + + while (m_PendingRequests) + { + RwLock::ExclusiveLockScope _(m_RequestQueueLock); + if (!m_RequestQueue.empty()) + { + bool DrainBacklog = false; + + do + { + QueuedRequest Request = m_RequestQueue.front(); + + m_RequestQueue.pop_front(); + m_PendingRequests.fetch_sub(1); + + // For a sufficiently large backlog, keep blocking queueing operations + // until we get below the threshold + DrainBacklog = m_RequestQueue.size() >= MaximumBacklogCount; + + if (!DrainBacklog) + { + _.ReleaseNow(); + } + + try + { + RecordedRequestsSegmentWriter& Writer = EnsureCurrentSegment(); + Writer.WriteRequest(Request.RequestInfo, Request.RequestBuffer); + } + catch (std::exception&) + { + // TODO: what's the right behaviour here? The most likely cause would + // be some I/O error and we probably ought to just shut down recording + // at that point + } + } while (DrainBacklog); + } + else + { + // shutdown increments this counter so we need to decrement it + // here even though we didn't process any request + m_PendingRequests.fetch_sub(1); + } + } + } + + RwLock::ExclusiveLockScope _(m_Lock); + CommitCurrentSegment(_); + WriteRecordingMetadata(); } RecordedRequestsSegmentWriter& @@ -693,26 +825,6 @@ RecordedRequestsWriter::CommitCurrentSegment(RwLock::ExclusiveLockScope&) } void -RecordedRequestsWriter::EndWrite() -{ - RwLock::ExclusiveLockScope _(m_Lock); - - CommitCurrentSegment(_); - - WriteRecordingMetadata(); -} - -uint64_t -RecordedRequestsWriter::WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) -{ - RecordedRequestsSegmentWriter& Writer = EnsureCurrentSegment(); - - const uint64_t SegmentLocalIndex = Writer.WriteRequest(RequestInfo, RequestBuffer); - - return Writer.GetBaseRequestIndex() + SegmentLocalIndex; -} - -void RecordedRequestsWriter::WriteRecordingMetadata() { try @@ -918,12 +1030,10 @@ public: DiskRequestRecorder(const std::filesystem::path& BasePath) { m_RecordedRequests.BeginWrite(BasePath); } virtual ~DiskRequestRecorder() { m_RecordedRequests.EndWrite(); } - virtual uint64_t RecordRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) override + virtual void RecordRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) override { - return m_RecordedRequests.WriteRequest(RequestInfo, RequestBuffer); + m_RecordedRequests.WriteRequest(RequestInfo, RequestBuffer); } - virtual void RecordResponse(uint64_t, const ZenContentType, const IoBuffer&) override {} - virtual void RecordResponse(uint64_t, const ZenContentType, const CompositeBuffer&) override {} private: RecordedRequestsWriter m_RecordedRequests; @@ -964,7 +1074,6 @@ public: { return m_RequestReader.ReadRequest(RequestIndex, OutBuffer); } - virtual ZenContentType GetResponse(uint64_t, IoBuffer&) override { return ZenContentType::kUnknownContentType; } private: std::uint64_t m_RequestCount; @@ -1029,9 +1138,7 @@ TEST_CASE("rpc.record") CbObject Req = RequestPayload.Save(); IoBuffer RequestBuffer = Req.GetBuffer().AsIoBuffer(); - const uint64_t Index = Recorder.RecordRequest(RequestInfo, RequestBuffer); - - CHECK(Index == i); + Recorder.RecordRequest(RequestInfo, RequestBuffer); } } diff --git a/src/zenutil/include/zenutil/cache/rpcrecording.h b/src/zenutil/include/zenutil/cache/rpcrecording.h index ab9b92dd3..f1ad35413 100644 --- a/src/zenutil/include/zenutil/cache/rpcrecording.h +++ b/src/zenutil/include/zenutil/cache/rpcrecording.h @@ -24,18 +24,15 @@ class IRpcRequestRecorder { public: virtual ~IRpcRequestRecorder() {} - virtual uint64_t RecordRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) = 0; - virtual void RecordResponse(uint64_t RequestIndex, const ZenContentType ContentType, const IoBuffer& ResponseBuffer) = 0; - virtual void RecordResponse(uint64_t RequestIndex, const ZenContentType ContentType, const CompositeBuffer& ResponseBuffer) = 0; + virtual void RecordRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) = 0; }; class IRpcRequestReplayer { public: virtual ~IRpcRequestReplayer() {} - virtual uint64_t GetRequestCount() const = 0; - virtual RecordedRequestInfo GetRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) = 0; - virtual ZenContentType GetResponse(uint64_t RequestIndex, IoBuffer& OutBuffer) = 0; + virtual uint64_t GetRequestCount() const = 0; + virtual RecordedRequestInfo GetRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) = 0; }; std::unique_ptr<cache::IRpcRequestRecorder> MakeDiskRequestRecorder(const std::filesystem::path& BasePath); |