diff options
| author | Stefan Boberg <[email protected]> | 2023-12-19 10:13:21 +0100 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-12-19 10:13:21 +0100 |
| commit | 6aff05d23520dc8883973b9a29aa77b4a4638205 (patch) | |
| tree | f88d6e1da180674962c802f8d42fffb9c17a8b09 /src | |
| parent | Fix crash bug when trying to inspect non-open block file in GC (#614) (diff) | |
| download | zen-6aff05d23520dc8883973b9a29aa77b4a4638205.tar.xz zen-6aff05d23520dc8883973b9a29aa77b4a4638205.zip | |
cache RPC recorder threading fixes (#617)
* ensure all access to m_Entries is done while holding lock
* RPC recorder concurrency fixes - setup/teardown of recorder needs to be done while holding an exclusive lock. Calls into recorder should be done while holding a shared lock.
Diffstat (limited to 'src')
| -rw-r--r-- | src/zenserver/cache/httpstructuredcache.cpp | 71 | ||||
| -rw-r--r-- | src/zenserver/cache/httpstructuredcache.h | 6 | ||||
| -rw-r--r-- | src/zenutil/cache/rpcrecording.cpp | 87 |
3 files changed, 110 insertions, 54 deletions
diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp index 8db96f914..f61fbd8bc 100644 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ b/src/zenserver/cache/httpstructuredcache.cpp @@ -338,7 +338,11 @@ HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCach HttpStructuredCacheService::~HttpStructuredCacheService() { ZEN_INFO("closing structured cache"); - m_RequestRecorder.reset(); + { + RwLock::ExclusiveLockScope _(m_RequestRecordingLock); + m_RequestRecordingEnabled.store(false); + m_RequestRecorder.reset(); + } m_StatsService.UnregisterHandler("z$", *this); m_StatusService.UnregisterHandler("z$", *this); @@ -615,24 +619,44 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request) if (Key == HttpZCacheUtilStartRecording) { - m_RequestRecorder.reset(); HttpServerRequest::QueryParams Params = Request.GetQueryParams(); std::string RecordPath = cpr::util::urlDecode(std::string(Params.GetValue("path"))); - m_RequestRecorder = cache::MakeDiskRequestRecorder(RecordPath); + + { + RwLock::ExclusiveLockScope _(m_RequestRecordingLock); + m_RequestRecordingEnabled.store(false); + m_RequestRecorder.reset(); + + m_RequestRecorder = cache::MakeDiskRequestRecorder(RecordPath); + m_RequestRecordingEnabled.store(true); + } + ZEN_INFO("cache RPC recording STARTED -> '{}'", RecordPath); Request.WriteResponse(HttpResponseCode::OK); return; } + if (Key == HttpZCacheUtilStopRecording) { - m_RequestRecorder.reset(); + { + RwLock::ExclusiveLockScope _(m_RequestRecordingLock); + m_RequestRecordingEnabled.store(false); + m_RequestRecorder.reset(); + } + ZEN_INFO("cache RPC recording STOPPED"); Request.WriteResponse(HttpResponseCode::OK); return; } + if (Key == HttpZCacheUtilReplayRecording) { CacheRequestContext RequestContext = {.SessionId = Request.SessionId(), .RequestId = Request.RequestId()}; - m_RequestRecorder.reset(); + { + RwLock::ExclusiveLockScope _(m_RequestRecordingLock); + m_RequestRecordingEnabled.store(false); + m_RequestRecorder.reset(); + } + HttpServerRequest::QueryParams Params = Request.GetQueryParams(); std::string RecordPath = cpr::util::urlDecode(std::string(Params.GetValue("path"))); uint32_t ThreadCount = std::thread::hardware_concurrency(); @@ -643,11 +667,18 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request) ThreadCount = gsl::narrow<uint32_t>(Value.value()); } } + + ZEN_INFO("initiating cache RPC replay using {} threads, from '{}'", ThreadCount, RecordPath); + std::unique_ptr<cache::IRpcRequestReplayer> Replayer(cache::MakeDiskRequestReplayer(RecordPath, false)); ReplayRequestRecorder(RequestContext, *Replayer, ThreadCount < 1 ? 1 : ThreadCount); + + ZEN_INFO("cache RPC replay STARTED"); + Request.WriteResponse(HttpResponseCode::OK); return; } + if (Key.starts_with(HttpZCacheDetailsPrefix)) { HandleDetailsRequest(Request); @@ -1776,11 +1807,15 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request) [this, RequestContext, Body = Request.ReadPayload(), ContentType, AcceptType](HttpServerRequest& AsyncRequest) mutable { uint64_t RequestIndex = ~0ull; - if (m_RequestRecorder) + if (m_RequestRecordingEnabled) { - RequestIndex = m_RequestRecorder->RecordRequest( - {.ContentType = ContentType, .AcceptType = AcceptType, .SessionId = RequestContext.SessionId}, - Body); + RwLock::SharedLockScope _(m_RequestRecordingLock); + if (m_RequestRecorder) + { + RequestIndex = m_RequestRecorder->RecordRequest( + {.ContentType = ContentType, .AcceptType = AcceptType, .SessionId = RequestContext.SessionId}, + Body); + } } uint32_t AcceptMagic = 0; @@ -1816,8 +1851,11 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request) CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResult, Flags, TargetProcessHandle); if (RequestIndex != ~0ull) { - ZEN_ASSERT(m_RequestRecorder); - m_RequestRecorder->RecordResponse(RequestIndex, HttpContentType::kCbPackage, RpcResponseBuffer); + RwLock::SharedLockScope _(m_RequestRecordingLock); + if (m_RequestRecorder) + { + m_RequestRecorder->RecordResponse(RequestIndex, HttpContentType::kCbPackage, RpcResponseBuffer); + } } AsyncRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); } @@ -1828,10 +1866,13 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request) if (RequestIndex != ~0ull) { - ZEN_ASSERT(m_RequestRecorder); - m_RequestRecorder->RecordResponse(RequestIndex, - HttpContentType::kCbPackage, - IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); + RwLock::SharedLockScope _(m_RequestRecordingLock); + if (m_RequestRecorder) + { + m_RequestRecorder->RecordResponse(RequestIndex, + HttpContentType::kCbPackage, + IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); + } } AsyncRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, diff --git a/src/zenserver/cache/httpstructuredcache.h b/src/zenserver/cache/httpstructuredcache.h index 57a533029..2feaaead8 100644 --- a/src/zenserver/cache/httpstructuredcache.h +++ b/src/zenserver/cache/httpstructuredcache.h @@ -190,6 +190,12 @@ private: void ReplayRequestRecorder(const CacheRequestContext& Context, cache::IRpcRequestReplayer& Replayer, uint32_t ThreadCount); + // This exists to avoid taking locks when recording is not enabled + std::atomic_bool m_RequestRecordingEnabled{false}; + + // This lock should be taken in SHARED mode when calling into the recorder, + // and taken in EXCLUSIVE mode whenever the recorder is created or destroyed + RwLock m_RequestRecordingLock; std::unique_ptr<cache::IRpcRequestRecorder> m_RequestRecorder; }; diff --git a/src/zenutil/cache/rpcrecording.cpp b/src/zenutil/cache/rpcrecording.cpp index c782f0920..b8f9d65ef 100644 --- a/src/zenutil/cache/rpcrecording.cpp +++ b/src/zenutil/cache/rpcrecording.cpp @@ -308,6 +308,7 @@ struct RecordedRequestsSegmentWriter return m_RequestCount; } + RwLock::SharedLockScope _(m_Lock); return m_Entries.size(); } inline uint64_t GetBaseRequestIndex() const { return m_RequestBaseIndex; } @@ -452,53 +453,58 @@ uint64_t RecordedRequestsSegmentWriter::WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) { const uint64_t RequestBufferSize = RequestBuffer.GetSize(); + uint64_t RequestIndex = ~0ull; - RwLock::ExclusiveLockScope Lock(m_Lock); - uint64_t RequestIndex = m_Entries.size(); - RecordedRequest& Entry = m_Entries.emplace_back(RecordedRequest{.Offset = ~0u, - .Length = uint32_t(RequestBufferSize & 0xffffFFFFu), - .ContentType = RequestInfo.ContentType, - .AcceptType = RequestInfo.AcceptType, - .OffsetHigh = 0, - .Padding2 = 0, - .SessionId = RequestInfo.SessionId}); - - if (Entry.Length < StandaloneFileSizeThreshold) { - const uint32_t BlockIndex = gsl::narrow<uint32_t>((m_ChunkOffset + Entry.Length) / RecordedRequestBlockSize); + RwLock::ExclusiveLockScope Lock(m_Lock); + RequestIndex = m_Entries.size(); + RecordedRequest& Entry = m_Entries.emplace_back(RecordedRequest{.Offset = ~0u, + .Length = uint32_t(RequestBufferSize & 0xffffFFFFu), + .ContentType = RequestInfo.ContentType, + .AcceptType = RequestInfo.AcceptType, + .OffsetHigh = 0, + .Padding2 = 0, + .SessionId = RequestInfo.SessionId}); - if (BlockIndex == m_BlockFiles.size()) + if (Entry.Length < StandaloneFileSizeThreshold) { - std::unique_ptr<BasicFile>& NewBlockFile = m_BlockFiles.emplace_back(std::make_unique<BasicFile>()); - NewBlockFile->Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kTruncate); - m_ChunkOffset = BlockIndex * RecordedRequestBlockSize; - ++m_FileCount; - } + const uint32_t BlockIndex = gsl::narrow<uint32_t>((m_ChunkOffset + Entry.Length) / RecordedRequestBlockSize); - ZEN_ASSERT(BlockIndex < m_BlockFiles.size()); - BasicFile* BlockFile = m_BlockFiles[BlockIndex].get(); - ZEN_ASSERT(BlockFile != nullptr); + if (BlockIndex == m_BlockFiles.size()) + { + std::unique_ptr<BasicFile>& NewBlockFile = m_BlockFiles.emplace_back(std::make_unique<BasicFile>()); + NewBlockFile->Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kTruncate); + m_ChunkOffset = BlockIndex * RecordedRequestBlockSize; + ++m_FileCount; + } - // Note that this is the overall logical offset, not the offset within a single file - const uint64_t ChunkWriteOffset = m_ChunkOffset; - m_ChunkOffset = RoundUp(ChunkWriteOffset + Entry.Length, 1u << 4u); - Lock.ReleaseNow(); + ZEN_ASSERT(BlockIndex < m_BlockFiles.size()); + BasicFile* BlockFile = m_BlockFiles[BlockIndex].get(); + ZEN_ASSERT(BlockFile != nullptr); - Entry.SetOffset(ChunkWriteOffset); + // Note that this is the overall logical offset, not the offset within a single file + const uint64_t ChunkWriteOffset = m_ChunkOffset; + m_ChunkOffset = RoundUp(ChunkWriteOffset + Entry.Length, 1u << 4u); + Entry.SetOffset(ChunkWriteOffset); + Lock.ReleaseNow(); - std::error_code Ec; - BlockFile->Write(RequestBuffer.Data(), RequestBuffer.Size(), ChunkWriteOffset - (BlockIndex * RecordedRequestBlockSize), Ec); - if (Ec) - { - Entry.Length = 0; - return ~0ull; - } + std::error_code Ec; + BlockFile->Write(RequestBuffer.Data(), RequestBuffer.Size(), ChunkWriteOffset - (BlockIndex * RecordedRequestBlockSize), Ec); + if (Ec) + { + // We cannot simply use `Entry` here because the vector may + // have been reallocated causing the entry to be in a different + // location + RwLock::ExclusiveLockScope _(m_Lock); + m_Entries[RequestIndex].Length = 0; + return ~0ull; + } - m_RequestsByteCount.fetch_add(RequestBuffer.GetSize()); + m_RequestsByteCount.fetch_add(RequestBuffer.GetSize()); - return RequestIndex; + return RequestIndex; + } } - Lock.ReleaseNow(); // Write request data to standalone file @@ -512,7 +518,8 @@ RecordedRequestsSegmentWriter::WriteRequest(const RecordedRequestInfo& RequestIn // The exact value of the entry is not important, we will use // the size of the standalone file regardless when performing // the read - Entry.Length = std::numeric_limits<uint32_t>::max(); + RwLock::ExclusiveLockScope _(m_Lock); + m_Entries[RequestIndex].Length = std::numeric_limits<uint32_t>::max(); } ++m_FileCount; @@ -522,7 +529,8 @@ RecordedRequestsSegmentWriter::WriteRequest(const RecordedRequestInfo& RequestIn if (Ec) { - Entry.Length = 0; + RwLock::ExclusiveLockScope _(m_Lock); + m_Entries[RequestIndex].Length = 0; return ~0ull; } @@ -532,7 +540,8 @@ RecordedRequestsSegmentWriter::WriteRequest(const RecordedRequestInfo& RequestIn } catch (std::exception&) { - Entry.Length = 0; + RwLock::ExclusiveLockScope _(m_Lock); + m_Entries[RequestIndex].Length = 0; return ~0ull; } } |