diff options
| author | zousar <[email protected]> | 2025-08-07 01:06:03 -0600 |
|---|---|---|
| committer | zousar <[email protected]> | 2025-08-07 01:06:03 -0600 |
| commit | f9b2a1e64e4077925c4a57ac3fc2367c698dada1 (patch) | |
| tree | 228955c39520e668e551a67520bf4740b24a61c7 | |
| parent | Moving put rejections to happen in batch handling (diff) | |
| download | zen-f9b2a1e64e4077925c4a57ac3fc2367c698dada1.tar.xz zen-f9b2a1e64e4077925c4a57ac3fc2367c698dada1.zip | |
Avoid committing chunks for batch rejected puts
Previously rejected puts would put the chunks, but not write them to the index, which was wrong.
| -rw-r--r-- | src/zenstore/cache/cachedisklayer.cpp | 77 |
1 files changed, 35 insertions, 42 deletions
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index 33d8489c6..4535f785b 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -1400,56 +1400,55 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept ZEN_ASSERT(Batch); if (!Batch->Buffers.empty()) { - std::vector<uint8_t> EntryFlags; - for (const IoBuffer& Buffer : Batch->Buffers) + ZEN_ASSERT(Batch->Buffers.size() == Batch->Entries.size()); + std::vector<uint8_t> EntryFlags; + std::vector<size_t> BufferToEntryIndexes; + std::vector<IoBuffer> BuffersToCommit; + BuffersToCommit.reserve(Batch->Buffers.size()); + for (size_t Index = 0; Index < Batch->Entries.size(); Index++) { - uint8_t Flags = 0; - if (Buffer.GetContentType() == ZenContentType::kCbObject) + const std::vector<IoHash>& HashKeyAndReferences = Batch->Entries[Index].HashKeyAndReferences; + ZEN_ASSERT(HashKeyAndReferences.size() > 0); + + ZenCacheValue TemporaryValue; + TemporaryValue.Value = Batch->Buffers[Index]; + std::span<const IoHash> ReferenceSpan(HashKeyAndReferences.begin() + 1, HashKeyAndReferences.end()); + PutResult& OutResult = Batch->OutResults[Batch->EntryResultIndexes[Index]]; + OutResult = PutResult{zen::PutStatus::Success}; + if (!ShouldRejectPut(HashKeyAndReferences[0], + TemporaryValue, + ReferenceSpan, + Batch->Entries[Index].Overwrite, + OutResult)) { - Flags |= DiskLocation::kStructured; - } - else if (Buffer.GetContentType() == ZenContentType::kCompressedBinary) - { - Flags |= DiskLocation::kCompressed; + BufferToEntryIndexes.push_back(Index); + BuffersToCommit.push_back(TemporaryValue.Value); + + uint8_t Flags = 0; + if (TemporaryValue.Value.GetContentType() == ZenContentType::kCbObject) + { + Flags |= DiskLocation::kStructured; + } + else if (TemporaryValue.Value.GetContentType() == ZenContentType::kCompressedBinary) + { + Flags |= DiskLocation::kCompressed; + } + EntryFlags.push_back(Flags); } - EntryFlags.push_back(Flags); } size_t IndexOffset = 0; - m_BlockStore.WriteChunks(Batch->Buffers, m_Configuration.PayloadAlignment, [&](std::span<BlockStoreLocation> Locations) { + m_BlockStore.WriteChunks(BuffersToCommit, m_Configuration.PayloadAlignment, [&](std::span<BlockStoreLocation> Locations) { ZEN_MEMSCOPE(GetCacheDiskTag()); std::vector<DiskIndexEntry> DiskEntries; - std::vector<ZenCacheDiskLayer::PutResult> OutResults; - OutResults.reserve(Locations.size()); { - // Initial pass without an exclusive index lock to process put rejections - for (size_t Index = 0; Index < Locations.size(); Index++) - { - const std::vector<IoHash>& HashKeyAndReferences = Batch->Entries[IndexOffset + Index].HashKeyAndReferences; - ZEN_ASSERT(HashKeyAndReferences.size() > 0); - - ZenCacheValue TemporaryValue; - TemporaryValue.Value = Batch->Buffers[IndexOffset + Index]; - std::span<const IoHash> ReferenceSpan(HashKeyAndReferences.begin() + 1, HashKeyAndReferences.end()); - OutResults.push_back({zen::PutStatus::Success}); - ShouldRejectPut(HashKeyAndReferences[0], - TemporaryValue, - ReferenceSpan, - Batch->Entries[IndexOffset + Index].Overwrite, - OutResults.back()); - } RwLock::ExclusiveLockScope IndexLock(m_IndexLock); for (size_t Index = 0; Index < Locations.size(); Index++) { - if (OutResults[Index].Status != zen::PutStatus::Success) - { - // The put was rejected, skip any effort to commit it. - continue; - } - DiskLocation Location(Locations[Index], m_Configuration.PayloadAlignment, EntryFlags[IndexOffset + Index]); - const std::vector<IoHash>& HashKeyAndReferences = Batch->Entries[IndexOffset + Index].HashKeyAndReferences; + const std::vector<IoHash>& HashKeyAndReferences = + Batch->Entries[BufferToEntryIndexes[IndexOffset + Index]].HashKeyAndReferences; ZEN_ASSERT(HashKeyAndReferences.size() > 0); const IoHash HashKey = HashKeyAndReferences[0]; DiskEntries.push_back({.Key = HashKey, .Location = Location}); @@ -1485,12 +1484,6 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept } } m_SlogFile.Append(DiskEntries); - for (size_t Index = 0; Index < Locations.size(); Index++) - { - size_t ResultIndex = Batch->EntryResultIndexes[IndexOffset + Index]; - ZEN_ASSERT(ResultIndex < Batch->OutResults.size()); - Batch->OutResults[ResultIndex] = std::move(OutResults[Index]); - } IndexOffset += Locations.size(); }); } |