aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2023-12-19 10:13:21 +0100
committerGitHub <[email protected]>2023-12-19 10:13:21 +0100
commit6aff05d23520dc8883973b9a29aa77b4a4638205 (patch)
treef88d6e1da180674962c802f8d42fffb9c17a8b09 /src
parentFix crash bug when trying to inspect non-open block file in GC (#614) (diff)
downloadzen-6aff05d23520dc8883973b9a29aa77b4a4638205.tar.xz
zen-6aff05d23520dc8883973b9a29aa77b4a4638205.zip
cache RPC recorder threading fixes (#617)
* ensure all access to m_Entries is done while holding lock * RPC recorder concurrency fixes - setup/teardown of recorder needs to be done while holding an exclusive lock. Calls into recorder should be done while holding a shared lock.
Diffstat (limited to 'src')
-rw-r--r--src/zenserver/cache/httpstructuredcache.cpp71
-rw-r--r--src/zenserver/cache/httpstructuredcache.h6
-rw-r--r--src/zenutil/cache/rpcrecording.cpp87
3 files changed, 110 insertions, 54 deletions
diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp
index 8db96f914..f61fbd8bc 100644
--- a/src/zenserver/cache/httpstructuredcache.cpp
+++ b/src/zenserver/cache/httpstructuredcache.cpp
@@ -338,7 +338,11 @@ HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCach
HttpStructuredCacheService::~HttpStructuredCacheService()
{
ZEN_INFO("closing structured cache");
- m_RequestRecorder.reset();
+ {
+ RwLock::ExclusiveLockScope _(m_RequestRecordingLock);
+ m_RequestRecordingEnabled.store(false);
+ m_RequestRecorder.reset();
+ }
m_StatsService.UnregisterHandler("z$", *this);
m_StatusService.UnregisterHandler("z$", *this);
@@ -615,24 +619,44 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request)
if (Key == HttpZCacheUtilStartRecording)
{
- m_RequestRecorder.reset();
HttpServerRequest::QueryParams Params = Request.GetQueryParams();
std::string RecordPath = cpr::util::urlDecode(std::string(Params.GetValue("path")));
- m_RequestRecorder = cache::MakeDiskRequestRecorder(RecordPath);
+
+ {
+ RwLock::ExclusiveLockScope _(m_RequestRecordingLock);
+ m_RequestRecordingEnabled.store(false);
+ m_RequestRecorder.reset();
+
+ m_RequestRecorder = cache::MakeDiskRequestRecorder(RecordPath);
+ m_RequestRecordingEnabled.store(true);
+ }
+ ZEN_INFO("cache RPC recording STARTED -> '{}'", RecordPath);
Request.WriteResponse(HttpResponseCode::OK);
return;
}
+
if (Key == HttpZCacheUtilStopRecording)
{
- m_RequestRecorder.reset();
+ {
+ RwLock::ExclusiveLockScope _(m_RequestRecordingLock);
+ m_RequestRecordingEnabled.store(false);
+ m_RequestRecorder.reset();
+ }
+ ZEN_INFO("cache RPC recording STOPPED");
Request.WriteResponse(HttpResponseCode::OK);
return;
}
+
if (Key == HttpZCacheUtilReplayRecording)
{
CacheRequestContext RequestContext = {.SessionId = Request.SessionId(), .RequestId = Request.RequestId()};
- m_RequestRecorder.reset();
+ {
+ RwLock::ExclusiveLockScope _(m_RequestRecordingLock);
+ m_RequestRecordingEnabled.store(false);
+ m_RequestRecorder.reset();
+ }
+
HttpServerRequest::QueryParams Params = Request.GetQueryParams();
std::string RecordPath = cpr::util::urlDecode(std::string(Params.GetValue("path")));
uint32_t ThreadCount = std::thread::hardware_concurrency();
@@ -643,11 +667,18 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request)
ThreadCount = gsl::narrow<uint32_t>(Value.value());
}
}
+
+ ZEN_INFO("initiating cache RPC replay using {} threads, from '{}'", ThreadCount, RecordPath);
+
std::unique_ptr<cache::IRpcRequestReplayer> Replayer(cache::MakeDiskRequestReplayer(RecordPath, false));
ReplayRequestRecorder(RequestContext, *Replayer, ThreadCount < 1 ? 1 : ThreadCount);
+
+ ZEN_INFO("cache RPC replay STARTED");
+
Request.WriteResponse(HttpResponseCode::OK);
return;
}
+
if (Key.starts_with(HttpZCacheDetailsPrefix))
{
HandleDetailsRequest(Request);
@@ -1776,11 +1807,15 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request)
[this, RequestContext, Body = Request.ReadPayload(), ContentType, AcceptType](HttpServerRequest& AsyncRequest) mutable {
uint64_t RequestIndex = ~0ull;
- if (m_RequestRecorder)
+ if (m_RequestRecordingEnabled)
{
- RequestIndex = m_RequestRecorder->RecordRequest(
- {.ContentType = ContentType, .AcceptType = AcceptType, .SessionId = RequestContext.SessionId},
- Body);
+ RwLock::SharedLockScope _(m_RequestRecordingLock);
+ if (m_RequestRecorder)
+ {
+ RequestIndex = m_RequestRecorder->RecordRequest(
+ {.ContentType = ContentType, .AcceptType = AcceptType, .SessionId = RequestContext.SessionId},
+ Body);
+ }
}
uint32_t AcceptMagic = 0;
@@ -1816,8 +1851,11 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request)
CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResult, Flags, TargetProcessHandle);
if (RequestIndex != ~0ull)
{
- ZEN_ASSERT(m_RequestRecorder);
- m_RequestRecorder->RecordResponse(RequestIndex, HttpContentType::kCbPackage, RpcResponseBuffer);
+ RwLock::SharedLockScope _(m_RequestRecordingLock);
+ if (m_RequestRecorder)
+ {
+ m_RequestRecorder->RecordResponse(RequestIndex, HttpContentType::kCbPackage, RpcResponseBuffer);
+ }
}
AsyncRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer);
}
@@ -1828,10 +1866,13 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request)
if (RequestIndex != ~0ull)
{
- ZEN_ASSERT(m_RequestRecorder);
- m_RequestRecorder->RecordResponse(RequestIndex,
- HttpContentType::kCbPackage,
- IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
+ RwLock::SharedLockScope _(m_RequestRecordingLock);
+ if (m_RequestRecorder)
+ {
+ m_RequestRecorder->RecordResponse(RequestIndex,
+ HttpContentType::kCbPackage,
+ IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
+ }
}
AsyncRequest.WriteResponse(HttpResponseCode::OK,
HttpContentType::kCbPackage,
diff --git a/src/zenserver/cache/httpstructuredcache.h b/src/zenserver/cache/httpstructuredcache.h
index 57a533029..2feaaead8 100644
--- a/src/zenserver/cache/httpstructuredcache.h
+++ b/src/zenserver/cache/httpstructuredcache.h
@@ -190,6 +190,12 @@ private:
void ReplayRequestRecorder(const CacheRequestContext& Context, cache::IRpcRequestReplayer& Replayer, uint32_t ThreadCount);
+ // This exists to avoid taking locks when recording is not enabled
+ std::atomic_bool m_RequestRecordingEnabled{false};
+
+ // This lock should be taken in SHARED mode when calling into the recorder,
+ // and taken in EXCLUSIVE mode whenever the recorder is created or destroyed
+ RwLock m_RequestRecordingLock;
std::unique_ptr<cache::IRpcRequestRecorder> m_RequestRecorder;
};
diff --git a/src/zenutil/cache/rpcrecording.cpp b/src/zenutil/cache/rpcrecording.cpp
index c782f0920..b8f9d65ef 100644
--- a/src/zenutil/cache/rpcrecording.cpp
+++ b/src/zenutil/cache/rpcrecording.cpp
@@ -308,6 +308,7 @@ struct RecordedRequestsSegmentWriter
return m_RequestCount;
}
+ RwLock::SharedLockScope _(m_Lock);
return m_Entries.size();
}
inline uint64_t GetBaseRequestIndex() const { return m_RequestBaseIndex; }
@@ -452,53 +453,58 @@ uint64_t
RecordedRequestsSegmentWriter::WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer)
{
const uint64_t RequestBufferSize = RequestBuffer.GetSize();
+ uint64_t RequestIndex = ~0ull;
- RwLock::ExclusiveLockScope Lock(m_Lock);
- uint64_t RequestIndex = m_Entries.size();
- RecordedRequest& Entry = m_Entries.emplace_back(RecordedRequest{.Offset = ~0u,
- .Length = uint32_t(RequestBufferSize & 0xffffFFFFu),
- .ContentType = RequestInfo.ContentType,
- .AcceptType = RequestInfo.AcceptType,
- .OffsetHigh = 0,
- .Padding2 = 0,
- .SessionId = RequestInfo.SessionId});
-
- if (Entry.Length < StandaloneFileSizeThreshold)
{
- const uint32_t BlockIndex = gsl::narrow<uint32_t>((m_ChunkOffset + Entry.Length) / RecordedRequestBlockSize);
+ RwLock::ExclusiveLockScope Lock(m_Lock);
+ RequestIndex = m_Entries.size();
+ RecordedRequest& Entry = m_Entries.emplace_back(RecordedRequest{.Offset = ~0u,
+ .Length = uint32_t(RequestBufferSize & 0xffffFFFFu),
+ .ContentType = RequestInfo.ContentType,
+ .AcceptType = RequestInfo.AcceptType,
+ .OffsetHigh = 0,
+ .Padding2 = 0,
+ .SessionId = RequestInfo.SessionId});
- if (BlockIndex == m_BlockFiles.size())
+ if (Entry.Length < StandaloneFileSizeThreshold)
{
- std::unique_ptr<BasicFile>& NewBlockFile = m_BlockFiles.emplace_back(std::make_unique<BasicFile>());
- NewBlockFile->Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kTruncate);
- m_ChunkOffset = BlockIndex * RecordedRequestBlockSize;
- ++m_FileCount;
- }
+ const uint32_t BlockIndex = gsl::narrow<uint32_t>((m_ChunkOffset + Entry.Length) / RecordedRequestBlockSize);
- ZEN_ASSERT(BlockIndex < m_BlockFiles.size());
- BasicFile* BlockFile = m_BlockFiles[BlockIndex].get();
- ZEN_ASSERT(BlockFile != nullptr);
+ if (BlockIndex == m_BlockFiles.size())
+ {
+ std::unique_ptr<BasicFile>& NewBlockFile = m_BlockFiles.emplace_back(std::make_unique<BasicFile>());
+ NewBlockFile->Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kTruncate);
+ m_ChunkOffset = BlockIndex * RecordedRequestBlockSize;
+ ++m_FileCount;
+ }
- // Note that this is the overall logical offset, not the offset within a single file
- const uint64_t ChunkWriteOffset = m_ChunkOffset;
- m_ChunkOffset = RoundUp(ChunkWriteOffset + Entry.Length, 1u << 4u);
- Lock.ReleaseNow();
+ ZEN_ASSERT(BlockIndex < m_BlockFiles.size());
+ BasicFile* BlockFile = m_BlockFiles[BlockIndex].get();
+ ZEN_ASSERT(BlockFile != nullptr);
- Entry.SetOffset(ChunkWriteOffset);
+ // Note that this is the overall logical offset, not the offset within a single file
+ const uint64_t ChunkWriteOffset = m_ChunkOffset;
+ m_ChunkOffset = RoundUp(ChunkWriteOffset + Entry.Length, 1u << 4u);
+ Entry.SetOffset(ChunkWriteOffset);
+ Lock.ReleaseNow();
- std::error_code Ec;
- BlockFile->Write(RequestBuffer.Data(), RequestBuffer.Size(), ChunkWriteOffset - (BlockIndex * RecordedRequestBlockSize), Ec);
- if (Ec)
- {
- Entry.Length = 0;
- return ~0ull;
- }
+ std::error_code Ec;
+ BlockFile->Write(RequestBuffer.Data(), RequestBuffer.Size(), ChunkWriteOffset - (BlockIndex * RecordedRequestBlockSize), Ec);
+ if (Ec)
+ {
+ // We cannot simply use `Entry` here because the vector may
+ // have been reallocated causing the entry to be in a different
+ // location
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_Entries[RequestIndex].Length = 0;
+ return ~0ull;
+ }
- m_RequestsByteCount.fetch_add(RequestBuffer.GetSize());
+ m_RequestsByteCount.fetch_add(RequestBuffer.GetSize());
- return RequestIndex;
+ return RequestIndex;
+ }
}
- Lock.ReleaseNow();
// Write request data to standalone file
@@ -512,7 +518,8 @@ RecordedRequestsSegmentWriter::WriteRequest(const RecordedRequestInfo& RequestIn
// The exact value of the entry is not important, we will use
// the size of the standalone file regardless when performing
// the read
- Entry.Length = std::numeric_limits<uint32_t>::max();
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_Entries[RequestIndex].Length = std::numeric_limits<uint32_t>::max();
}
++m_FileCount;
@@ -522,7 +529,8 @@ RecordedRequestsSegmentWriter::WriteRequest(const RecordedRequestInfo& RequestIn
if (Ec)
{
- Entry.Length = 0;
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_Entries[RequestIndex].Length = 0;
return ~0ull;
}
@@ -532,7 +540,8 @@ RecordedRequestsSegmentWriter::WriteRequest(const RecordedRequestInfo& RequestIn
}
catch (std::exception&)
{
- Entry.Length = 0;
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_Entries[RequestIndex].Length = 0;
return ~0ull;
}
}