aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore/cache/cachedisklayer.cpp
diff options
context:
space:
mode:
authorzousar <[email protected]>2025-08-07 01:06:03 -0600
committerzousar <[email protected]>2025-08-07 01:06:03 -0600
commitf9b2a1e64e4077925c4a57ac3fc2367c698dada1 (patch)
tree228955c39520e668e551a67520bf4740b24a61c7 /src/zenstore/cache/cachedisklayer.cpp
parentMoving put rejections to happen in batch handling (diff)
downloadzen-f9b2a1e64e4077925c4a57ac3fc2367c698dada1.tar.xz
zen-f9b2a1e64e4077925c4a57ac3fc2367c698dada1.zip
Avoid committing chunks for batch rejected puts
Previously rejected puts would put the chunks, but not write them to the index, which was wrong.
Diffstat (limited to 'src/zenstore/cache/cachedisklayer.cpp')
-rw-r--r--src/zenstore/cache/cachedisklayer.cpp77
1 files changed, 35 insertions, 42 deletions
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp
index 33d8489c6..4535f785b 100644
--- a/src/zenstore/cache/cachedisklayer.cpp
+++ b/src/zenstore/cache/cachedisklayer.cpp
@@ -1400,56 +1400,55 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept
ZEN_ASSERT(Batch);
if (!Batch->Buffers.empty())
{
- std::vector<uint8_t> EntryFlags;
- for (const IoBuffer& Buffer : Batch->Buffers)
+ ZEN_ASSERT(Batch->Buffers.size() == Batch->Entries.size());
+ std::vector<uint8_t> EntryFlags;
+ std::vector<size_t> BufferToEntryIndexes;
+ std::vector<IoBuffer> BuffersToCommit;
+ BuffersToCommit.reserve(Batch->Buffers.size());
+ for (size_t Index = 0; Index < Batch->Entries.size(); Index++)
{
- uint8_t Flags = 0;
- if (Buffer.GetContentType() == ZenContentType::kCbObject)
+ const std::vector<IoHash>& HashKeyAndReferences = Batch->Entries[Index].HashKeyAndReferences;
+ ZEN_ASSERT(HashKeyAndReferences.size() > 0);
+
+ ZenCacheValue TemporaryValue;
+ TemporaryValue.Value = Batch->Buffers[Index];
+ std::span<const IoHash> ReferenceSpan(HashKeyAndReferences.begin() + 1, HashKeyAndReferences.end());
+ PutResult& OutResult = Batch->OutResults[Batch->EntryResultIndexes[Index]];
+ OutResult = PutResult{zen::PutStatus::Success};
+ if (!ShouldRejectPut(HashKeyAndReferences[0],
+ TemporaryValue,
+ ReferenceSpan,
+ Batch->Entries[Index].Overwrite,
+ OutResult))
{
- Flags |= DiskLocation::kStructured;
- }
- else if (Buffer.GetContentType() == ZenContentType::kCompressedBinary)
- {
- Flags |= DiskLocation::kCompressed;
+ BufferToEntryIndexes.push_back(Index);
+ BuffersToCommit.push_back(TemporaryValue.Value);
+
+ uint8_t Flags = 0;
+ if (TemporaryValue.Value.GetContentType() == ZenContentType::kCbObject)
+ {
+ Flags |= DiskLocation::kStructured;
+ }
+ else if (TemporaryValue.Value.GetContentType() == ZenContentType::kCompressedBinary)
+ {
+ Flags |= DiskLocation::kCompressed;
+ }
+ EntryFlags.push_back(Flags);
}
- EntryFlags.push_back(Flags);
}
size_t IndexOffset = 0;
- m_BlockStore.WriteChunks(Batch->Buffers, m_Configuration.PayloadAlignment, [&](std::span<BlockStoreLocation> Locations) {
+ m_BlockStore.WriteChunks(BuffersToCommit, m_Configuration.PayloadAlignment, [&](std::span<BlockStoreLocation> Locations) {
ZEN_MEMSCOPE(GetCacheDiskTag());
std::vector<DiskIndexEntry> DiskEntries;
- std::vector<ZenCacheDiskLayer::PutResult> OutResults;
- OutResults.reserve(Locations.size());
{
- // Initial pass without an exclusive index lock to process put rejections
- for (size_t Index = 0; Index < Locations.size(); Index++)
- {
- const std::vector<IoHash>& HashKeyAndReferences = Batch->Entries[IndexOffset + Index].HashKeyAndReferences;
- ZEN_ASSERT(HashKeyAndReferences.size() > 0);
-
- ZenCacheValue TemporaryValue;
- TemporaryValue.Value = Batch->Buffers[IndexOffset + Index];
- std::span<const IoHash> ReferenceSpan(HashKeyAndReferences.begin() + 1, HashKeyAndReferences.end());
- OutResults.push_back({zen::PutStatus::Success});
- ShouldRejectPut(HashKeyAndReferences[0],
- TemporaryValue,
- ReferenceSpan,
- Batch->Entries[IndexOffset + Index].Overwrite,
- OutResults.back());
- }
RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
for (size_t Index = 0; Index < Locations.size(); Index++)
{
- if (OutResults[Index].Status != zen::PutStatus::Success)
- {
- // The put was rejected, skip any effort to commit it.
- continue;
- }
-
DiskLocation Location(Locations[Index], m_Configuration.PayloadAlignment, EntryFlags[IndexOffset + Index]);
- const std::vector<IoHash>& HashKeyAndReferences = Batch->Entries[IndexOffset + Index].HashKeyAndReferences;
+ const std::vector<IoHash>& HashKeyAndReferences =
+ Batch->Entries[BufferToEntryIndexes[IndexOffset + Index]].HashKeyAndReferences;
ZEN_ASSERT(HashKeyAndReferences.size() > 0);
const IoHash HashKey = HashKeyAndReferences[0];
DiskEntries.push_back({.Key = HashKey, .Location = Location});
@@ -1485,12 +1484,6 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept
}
}
m_SlogFile.Append(DiskEntries);
- for (size_t Index = 0; Index < Locations.size(); Index++)
- {
- size_t ResultIndex = Batch->EntryResultIndexes[IndexOffset + Index];
- ZEN_ASSERT(ResultIndex < Batch->OutResults.size());
- Batch->OutResults[ResultIndex] = std::move(OutResults[Index]);
- }
IndexOffset += Locations.size();
});
}