diff options
| author | Stefan Boberg <[email protected]> | 2023-12-19 10:13:21 +0100 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-12-19 10:13:21 +0100 |
| commit | 6aff05d23520dc8883973b9a29aa77b4a4638205 (patch) | |
| tree | f88d6e1da180674962c802f8d42fffb9c17a8b09 /src/zenutil/cache | |
| parent | Fix crash bug when trying to inspect non-open block file in GC (#614) (diff) | |
| download | zen-6aff05d23520dc8883973b9a29aa77b4a4638205.tar.xz zen-6aff05d23520dc8883973b9a29aa77b4a4638205.zip | |
cache RPC recorder threading fixes (#617)
* ensure all access to m_Entries is done while holding lock
* RPC recorder concurrency fixes - setup/teardown of recorder needs to be done while holding an exclusive lock. Calls into recorder should be done while holding a shared lock.
Diffstat (limited to 'src/zenutil/cache')
| -rw-r--r-- | src/zenutil/cache/rpcrecording.cpp | 87 |
1 files changed, 48 insertions, 39 deletions
diff --git a/src/zenutil/cache/rpcrecording.cpp b/src/zenutil/cache/rpcrecording.cpp index c782f0920..b8f9d65ef 100644 --- a/src/zenutil/cache/rpcrecording.cpp +++ b/src/zenutil/cache/rpcrecording.cpp @@ -308,6 +308,7 @@ struct RecordedRequestsSegmentWriter return m_RequestCount; } + RwLock::SharedLockScope _(m_Lock); return m_Entries.size(); } inline uint64_t GetBaseRequestIndex() const { return m_RequestBaseIndex; } @@ -452,53 +453,58 @@ uint64_t RecordedRequestsSegmentWriter::WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) { const uint64_t RequestBufferSize = RequestBuffer.GetSize(); + uint64_t RequestIndex = ~0ull; - RwLock::ExclusiveLockScope Lock(m_Lock); - uint64_t RequestIndex = m_Entries.size(); - RecordedRequest& Entry = m_Entries.emplace_back(RecordedRequest{.Offset = ~0u, - .Length = uint32_t(RequestBufferSize & 0xffffFFFFu), - .ContentType = RequestInfo.ContentType, - .AcceptType = RequestInfo.AcceptType, - .OffsetHigh = 0, - .Padding2 = 0, - .SessionId = RequestInfo.SessionId}); - - if (Entry.Length < StandaloneFileSizeThreshold) { - const uint32_t BlockIndex = gsl::narrow<uint32_t>((m_ChunkOffset + Entry.Length) / RecordedRequestBlockSize); + RwLock::ExclusiveLockScope Lock(m_Lock); + RequestIndex = m_Entries.size(); + RecordedRequest& Entry = m_Entries.emplace_back(RecordedRequest{.Offset = ~0u, + .Length = uint32_t(RequestBufferSize & 0xffffFFFFu), + .ContentType = RequestInfo.ContentType, + .AcceptType = RequestInfo.AcceptType, + .OffsetHigh = 0, + .Padding2 = 0, + .SessionId = RequestInfo.SessionId}); - if (BlockIndex == m_BlockFiles.size()) + if (Entry.Length < StandaloneFileSizeThreshold) { - std::unique_ptr<BasicFile>& NewBlockFile = m_BlockFiles.emplace_back(std::make_unique<BasicFile>()); - NewBlockFile->Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kTruncate); - m_ChunkOffset = BlockIndex * RecordedRequestBlockSize; - ++m_FileCount; - } + const uint32_t BlockIndex = gsl::narrow<uint32_t>((m_ChunkOffset + Entry.Length) / RecordedRequestBlockSize); - ZEN_ASSERT(BlockIndex < m_BlockFiles.size()); - BasicFile* BlockFile = m_BlockFiles[BlockIndex].get(); - ZEN_ASSERT(BlockFile != nullptr); + if (BlockIndex == m_BlockFiles.size()) + { + std::unique_ptr<BasicFile>& NewBlockFile = m_BlockFiles.emplace_back(std::make_unique<BasicFile>()); + NewBlockFile->Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kTruncate); + m_ChunkOffset = BlockIndex * RecordedRequestBlockSize; + ++m_FileCount; + } - // Note that this is the overall logical offset, not the offset within a single file - const uint64_t ChunkWriteOffset = m_ChunkOffset; - m_ChunkOffset = RoundUp(ChunkWriteOffset + Entry.Length, 1u << 4u); - Lock.ReleaseNow(); + ZEN_ASSERT(BlockIndex < m_BlockFiles.size()); + BasicFile* BlockFile = m_BlockFiles[BlockIndex].get(); + ZEN_ASSERT(BlockFile != nullptr); - Entry.SetOffset(ChunkWriteOffset); + // Note that this is the overall logical offset, not the offset within a single file + const uint64_t ChunkWriteOffset = m_ChunkOffset; + m_ChunkOffset = RoundUp(ChunkWriteOffset + Entry.Length, 1u << 4u); + Entry.SetOffset(ChunkWriteOffset); + Lock.ReleaseNow(); - std::error_code Ec; - BlockFile->Write(RequestBuffer.Data(), RequestBuffer.Size(), ChunkWriteOffset - (BlockIndex * RecordedRequestBlockSize), Ec); - if (Ec) - { - Entry.Length = 0; - return ~0ull; - } + std::error_code Ec; + BlockFile->Write(RequestBuffer.Data(), RequestBuffer.Size(), ChunkWriteOffset - (BlockIndex * RecordedRequestBlockSize), Ec); + if (Ec) + { + // We cannot simply use `Entry` here because the vector may + // have been reallocated causing the entry to be in a different + // location + RwLock::ExclusiveLockScope _(m_Lock); + m_Entries[RequestIndex].Length = 0; + return ~0ull; + } - m_RequestsByteCount.fetch_add(RequestBuffer.GetSize()); + m_RequestsByteCount.fetch_add(RequestBuffer.GetSize()); - return RequestIndex; + return RequestIndex; + } } - Lock.ReleaseNow(); // Write request data to standalone file @@ -512,7 +518,8 @@ RecordedRequestsSegmentWriter::WriteRequest(const RecordedRequestInfo& RequestIn // The exact value of the entry is not important, we will use // the size of the standalone file regardless when performing // the read - Entry.Length = std::numeric_limits<uint32_t>::max(); + RwLock::ExclusiveLockScope _(m_Lock); + m_Entries[RequestIndex].Length = std::numeric_limits<uint32_t>::max(); } ++m_FileCount; @@ -522,7 +529,8 @@ RecordedRequestsSegmentWriter::WriteRequest(const RecordedRequestInfo& RequestIn if (Ec) { - Entry.Length = 0; + RwLock::ExclusiveLockScope _(m_Lock); + m_Entries[RequestIndex].Length = 0; return ~0ull; } @@ -532,7 +540,8 @@ RecordedRequestsSegmentWriter::WriteRequest(const RecordedRequestInfo& RequestIn } catch (std::exception&) { - Entry.Length = 0; + RwLock::ExclusiveLockScope _(m_Lock); + m_Entries[RequestIndex].Length = 0; return ~0ull; } } |