aboutsummaryrefslogtreecommitdiff
path: root/src/zenutil/cache
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2023-12-19 10:13:21 +0100
committerGitHub <[email protected]>2023-12-19 10:13:21 +0100
commit6aff05d23520dc8883973b9a29aa77b4a4638205 (patch)
treef88d6e1da180674962c802f8d42fffb9c17a8b09 /src/zenutil/cache
parentFix crash bug when trying to inspect non-open block file in GC (#614) (diff)
downloadzen-6aff05d23520dc8883973b9a29aa77b4a4638205.tar.xz
zen-6aff05d23520dc8883973b9a29aa77b4a4638205.zip
cache RPC recorder threading fixes (#617)
* ensure all access to m_Entries is done while holding lock * RPC recorder concurrency fixes - setup/teardown of recorder needs to be done while holding an exclusive lock. Calls into recorder should be done while holding a shared lock.
Diffstat (limited to 'src/zenutil/cache')
-rw-r--r--src/zenutil/cache/rpcrecording.cpp87
1 files changed, 48 insertions, 39 deletions
diff --git a/src/zenutil/cache/rpcrecording.cpp b/src/zenutil/cache/rpcrecording.cpp
index c782f0920..b8f9d65ef 100644
--- a/src/zenutil/cache/rpcrecording.cpp
+++ b/src/zenutil/cache/rpcrecording.cpp
@@ -308,6 +308,7 @@ struct RecordedRequestsSegmentWriter
return m_RequestCount;
}
+ RwLock::SharedLockScope _(m_Lock);
return m_Entries.size();
}
inline uint64_t GetBaseRequestIndex() const { return m_RequestBaseIndex; }
@@ -452,53 +453,58 @@ uint64_t
RecordedRequestsSegmentWriter::WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer)
{
const uint64_t RequestBufferSize = RequestBuffer.GetSize();
+ uint64_t RequestIndex = ~0ull;
- RwLock::ExclusiveLockScope Lock(m_Lock);
- uint64_t RequestIndex = m_Entries.size();
- RecordedRequest& Entry = m_Entries.emplace_back(RecordedRequest{.Offset = ~0u,
- .Length = uint32_t(RequestBufferSize & 0xffffFFFFu),
- .ContentType = RequestInfo.ContentType,
- .AcceptType = RequestInfo.AcceptType,
- .OffsetHigh = 0,
- .Padding2 = 0,
- .SessionId = RequestInfo.SessionId});
-
- if (Entry.Length < StandaloneFileSizeThreshold)
{
- const uint32_t BlockIndex = gsl::narrow<uint32_t>((m_ChunkOffset + Entry.Length) / RecordedRequestBlockSize);
+ RwLock::ExclusiveLockScope Lock(m_Lock);
+ RequestIndex = m_Entries.size();
+ RecordedRequest& Entry = m_Entries.emplace_back(RecordedRequest{.Offset = ~0u,
+ .Length = uint32_t(RequestBufferSize & 0xffffFFFFu),
+ .ContentType = RequestInfo.ContentType,
+ .AcceptType = RequestInfo.AcceptType,
+ .OffsetHigh = 0,
+ .Padding2 = 0,
+ .SessionId = RequestInfo.SessionId});
- if (BlockIndex == m_BlockFiles.size())
+ if (Entry.Length < StandaloneFileSizeThreshold)
{
- std::unique_ptr<BasicFile>& NewBlockFile = m_BlockFiles.emplace_back(std::make_unique<BasicFile>());
- NewBlockFile->Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kTruncate);
- m_ChunkOffset = BlockIndex * RecordedRequestBlockSize;
- ++m_FileCount;
- }
+ const uint32_t BlockIndex = gsl::narrow<uint32_t>((m_ChunkOffset + Entry.Length) / RecordedRequestBlockSize);
- ZEN_ASSERT(BlockIndex < m_BlockFiles.size());
- BasicFile* BlockFile = m_BlockFiles[BlockIndex].get();
- ZEN_ASSERT(BlockFile != nullptr);
+ if (BlockIndex == m_BlockFiles.size())
+ {
+ std::unique_ptr<BasicFile>& NewBlockFile = m_BlockFiles.emplace_back(std::make_unique<BasicFile>());
+ NewBlockFile->Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kTruncate);
+ m_ChunkOffset = BlockIndex * RecordedRequestBlockSize;
+ ++m_FileCount;
+ }
- // Note that this is the overall logical offset, not the offset within a single file
- const uint64_t ChunkWriteOffset = m_ChunkOffset;
- m_ChunkOffset = RoundUp(ChunkWriteOffset + Entry.Length, 1u << 4u);
- Lock.ReleaseNow();
+ ZEN_ASSERT(BlockIndex < m_BlockFiles.size());
+ BasicFile* BlockFile = m_BlockFiles[BlockIndex].get();
+ ZEN_ASSERT(BlockFile != nullptr);
- Entry.SetOffset(ChunkWriteOffset);
+ // Note that this is the overall logical offset, not the offset within a single file
+ const uint64_t ChunkWriteOffset = m_ChunkOffset;
+ m_ChunkOffset = RoundUp(ChunkWriteOffset + Entry.Length, 1u << 4u);
+ Entry.SetOffset(ChunkWriteOffset);
+ Lock.ReleaseNow();
- std::error_code Ec;
- BlockFile->Write(RequestBuffer.Data(), RequestBuffer.Size(), ChunkWriteOffset - (BlockIndex * RecordedRequestBlockSize), Ec);
- if (Ec)
- {
- Entry.Length = 0;
- return ~0ull;
- }
+ std::error_code Ec;
+ BlockFile->Write(RequestBuffer.Data(), RequestBuffer.Size(), ChunkWriteOffset - (BlockIndex * RecordedRequestBlockSize), Ec);
+ if (Ec)
+ {
+ // We cannot simply use `Entry` here because the vector may
+ // have been reallocated causing the entry to be in a different
+ // location
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_Entries[RequestIndex].Length = 0;
+ return ~0ull;
+ }
- m_RequestsByteCount.fetch_add(RequestBuffer.GetSize());
+ m_RequestsByteCount.fetch_add(RequestBuffer.GetSize());
- return RequestIndex;
+ return RequestIndex;
+ }
}
- Lock.ReleaseNow();
// Write request data to standalone file
@@ -512,7 +518,8 @@ RecordedRequestsSegmentWriter::WriteRequest(const RecordedRequestInfo& RequestIn
// The exact value of the entry is not important, we will use
// the size of the standalone file regardless when performing
// the read
- Entry.Length = std::numeric_limits<uint32_t>::max();
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_Entries[RequestIndex].Length = std::numeric_limits<uint32_t>::max();
}
++m_FileCount;
@@ -522,7 +529,8 @@ RecordedRequestsSegmentWriter::WriteRequest(const RecordedRequestInfo& RequestIn
if (Ec)
{
- Entry.Length = 0;
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_Entries[RequestIndex].Length = 0;
return ~0ull;
}
@@ -532,7 +540,8 @@ RecordedRequestsSegmentWriter::WriteRequest(const RecordedRequestInfo& RequestIn
}
catch (std::exception&)
{
- Entry.Length = 0;
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_Entries[RequestIndex].Length = 0;
return ~0ull;
}
}