aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore/cache
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-12-04 16:20:58 +0100
committerGitHub Enterprise <[email protected]>2025-12-04 16:20:58 +0100
commitec69a0d3d8febf15aa5df347e514c5071c19ff71 (patch)
tree51a38b7394243281d1378f803a75a277c098edf9 /src/zenstore/cache
parentadd checks to protect against access violation due to failed disk read (#675) (diff)
downloadzen-ec69a0d3d8febf15aa5df347e514c5071c19ff71.tar.xz
zen-ec69a0d3d8febf15aa5df347e514c5071c19ff71.zip
batch op not in destructor (#676)
* use fixed vectors for batch requests * refactor cache batch value put/get to not execute code that can throw execeptions in destructor * extend test with multi-bucket requests
Diffstat (limited to 'src/zenstore/cache')
-rw-r--r--src/zenstore/cache/cachedisklayer.cpp589
-rw-r--r--src/zenstore/cache/cacherpc.cpp19
-rw-r--r--src/zenstore/cache/structuredcachestore.cpp122
3 files changed, 402 insertions, 328 deletions
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp
index 46dfbd912..57712a706 100644
--- a/src/zenstore/cache/cachedisklayer.cpp
+++ b/src/zenstore/cache/cachedisklayer.cpp
@@ -1355,28 +1355,35 @@ ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const DiskLocation& Loc,
struct ZenCacheDiskLayer::CacheBucket::PutBatchHandle
{
- PutBatchHandle(std::vector<ZenCacheDiskLayer::PutResult>& OutResults) : OutResults(OutResults) {}
+ PutBatchHandle(ZenCachePutResultVec_t& OutResults) : OutResults(OutResults) {}
struct Entry
{
- std::vector<IoHash> HashKeyAndReferences;
- bool Overwrite;
+ eastl::fixed_vector<IoHash, 8> HashKeyAndReferences;
+ bool Overwrite;
};
- std::vector<ZenCacheValue> Buffers;
- std::vector<Entry> Entries;
- std::vector<size_t> EntryResultIndexes;
- std::vector<ZenCacheDiskLayer::PutResult>& OutResults;
+ eastl::fixed_vector<ZenCacheValue, 32> Buffers;
+ eastl::fixed_vector<Entry, 32> Entries;
+ eastl::fixed_vector<size_t, 32> EntryResultIndexes;
+
+ ZenCachePutResultVec_t& OutResults;
};
ZenCacheDiskLayer::CacheBucket::PutBatchHandle*
-ZenCacheDiskLayer::CacheBucket::BeginPutBatch(std::vector<ZenCacheDiskLayer::PutResult>& OutResults)
+ZenCacheDiskLayer::CacheBucket::CreatePutBatch(ZenCachePutResultVec_t& OutResults)
{
- ZEN_TRACE_CPU("Z$::Bucket::BeginPutBatch");
+ ZEN_TRACE_CPU("Z$::Bucket::CreatePutBatch");
return new PutBatchHandle(OutResults);
}
void
-ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept
+ZenCacheDiskLayer::CacheBucket::DeletePutBatch(PutBatchHandle* Batch) noexcept
+{
+ delete Batch;
+}
+
+void
+ZenCacheDiskLayer::CacheBucket::CommitPutBatch(PutBatchHandle* Batch)
{
ZEN_TRACE_CPU("Z$::Bucket::EndPutBatch");
@@ -1392,7 +1399,7 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept
BuffersToCommit.reserve(Batch->Buffers.size());
for (size_t Index = 0; Index < Batch->Entries.size(); Index++)
{
- const std::vector<IoHash>& HashKeyAndReferences = Batch->Entries[Index].HashKeyAndReferences;
+ const eastl::fixed_vector<IoHash, 8>& HashKeyAndReferences = Batch->Entries[Index].HashKeyAndReferences;
ZEN_ASSERT(HashKeyAndReferences.size() >= 1);
ZenCacheValue& Value = Batch->Buffers[Index];
@@ -1426,7 +1433,7 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept
for (size_t Index = 0; Index < Locations.size(); Index++)
{
DiskLocation Location(Locations[Index], m_Configuration.PayloadAlignment, EntryFlags[IndexOffset + Index]);
- const std::vector<IoHash>& HashKeyAndReferences =
+ const eastl::fixed_vector<IoHash, 8>& HashKeyAndReferences =
Batch->Entries[BufferToEntryIndexes[IndexOffset + Index]].HashKeyAndReferences;
ZEN_ASSERT(HashKeyAndReferences.size() >= 1);
const IoHash HashKey = HashKeyAndReferences[0];
@@ -1466,7 +1473,6 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept
IndexOffset += Locations.size();
});
}
- delete Batch;
}
catch (const std::exception& Ex)
{
@@ -1491,85 +1497,64 @@ struct ZenCacheDiskLayer::CacheBucket::GetBatchHandle
};
ZenCacheDiskLayer::CacheBucket::GetBatchHandle*
-ZenCacheDiskLayer::CacheBucket::BeginGetBatch(ZenCacheValueVec_t& OutResult)
+ZenCacheDiskLayer::CacheBucket::CreateGetBatch(ZenCacheValueVec_t& OutResult)
{
- ZEN_TRACE_CPU("Z$::Bucket::BeginGetBatch");
+ ZEN_TRACE_CPU("Z$::Bucket::CreateGetBatch");
return new GetBatchHandle(OutResult);
}
void
-ZenCacheDiskLayer::CacheBucket::EndGetBatch(GetBatchHandle* Batch) noexcept
+ZenCacheDiskLayer::CacheBucket::CommitGetBatch(GetBatchHandle* Batch)
{
- ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch");
+ ZEN_TRACE_CPU("Z$::Bucket::CommitGetBatch");
- try
- {
- ZEN_ASSERT(Batch);
- ZEN_ASSERT(Batch->Keys.size() == Batch->ResultIndexes.size());
+ ZEN_ASSERT(Batch);
+ ZEN_ASSERT(Batch->Keys.size() == Batch->ResultIndexes.size());
- metrics::RequestStats::Scope StatsScope(m_GetOps, 0);
+ metrics::RequestStats::Scope StatsScope(m_GetOps, 0);
- if (!Batch->ResultIndexes.empty())
+ if (!Batch->ResultIndexes.empty())
+ {
+ eastl::fixed_vector<DiskLocation, 16> StandaloneDiskLocations;
+ eastl::fixed_vector<size_t, 16> StandaloneKeyIndexes;
+ eastl::fixed_vector<size_t, 16> MemCachedKeyIndexes;
+ eastl::fixed_vector<DiskLocation, 16> InlineDiskLocations;
+ eastl::fixed_vector<BlockStoreLocation, 16> InlineBlockLocations;
+ eastl::fixed_vector<size_t, 16> InlineKeyIndexes;
+ eastl::fixed_vector<bool, 16> FillRawHashAndRawSize(Batch->Keys.size(), false);
{
- eastl::fixed_vector<DiskLocation, 16> StandaloneDiskLocations;
- eastl::fixed_vector<size_t, 16> StandaloneKeyIndexes;
- eastl::fixed_vector<size_t, 16> MemCachedKeyIndexes;
- eastl::fixed_vector<DiskLocation, 16> InlineDiskLocations;
- eastl::fixed_vector<BlockStoreLocation, 16> InlineBlockLocations;
- eastl::fixed_vector<size_t, 16> InlineKeyIndexes;
- eastl::fixed_vector<bool, 16> FillRawHashAndRawSize(Batch->Keys.size(), false);
+ RwLock::SharedLockScope IndexLock(m_IndexLock);
+ for (size_t KeyIndex = 0; KeyIndex < Batch->Keys.size(); KeyIndex++)
{
- RwLock::SharedLockScope IndexLock(m_IndexLock);
- for (size_t KeyIndex = 0; KeyIndex < Batch->Keys.size(); KeyIndex++)
+ const IoHash& HashKey = Batch->Keys[KeyIndex];
+ auto It = m_Index.find(HashKey);
+ if (It != m_Index.end())
{
- const IoHash& HashKey = Batch->Keys[KeyIndex];
- auto It = m_Index.find(HashKey);
- if (It != m_Index.end())
- {
- size_t ResultIndex = Batch->ResultIndexes[KeyIndex];
- ZenCacheValue& OutValue = Batch->OutResults[ResultIndex];
+ size_t ResultIndex = Batch->ResultIndexes[KeyIndex];
+ ZenCacheValue& OutValue = Batch->OutResults[ResultIndex];
- const PayloadIndex PayloadIdx = It.value();
- m_AccessTimes[PayloadIdx] = GcClock::TickCount();
- const BucketPayload& Payload = m_Payloads[PayloadIdx];
- const DiskLocation& Location = Payload.Location;
+ const PayloadIndex PayloadIdx = It.value();
+ m_AccessTimes[PayloadIdx] = GcClock::TickCount();
+ const BucketPayload& Payload = m_Payloads[PayloadIdx];
+ const DiskLocation& Location = Payload.Location;
- FillRawHashAndRawSize[KeyIndex] = (!Location.IsFlagSet(DiskLocation::kStructured)) && (Location.Size() > 0);
- if (Payload.MetaData)
- {
- const BucketMetaData& MetaData = m_MetaDatas[Payload.MetaData];
- OutValue.RawHash = MetaData.RawHash;
- OutValue.RawSize = MetaData.RawSize;
- FillRawHashAndRawSize[KeyIndex] = false;
- }
+ FillRawHashAndRawSize[KeyIndex] = (!Location.IsFlagSet(DiskLocation::kStructured)) && (Location.Size() > 0);
+ if (Payload.MetaData)
+ {
+ const BucketMetaData& MetaData = m_MetaDatas[Payload.MetaData];
+ OutValue.RawHash = MetaData.RawHash;
+ OutValue.RawSize = MetaData.RawSize;
+ FillRawHashAndRawSize[KeyIndex] = false;
+ }
- if (Payload.MemCached)
- {
- OutValue.Value = m_MemCachedPayloads[Payload.MemCached].Payload;
- if (FillRawHashAndRawSize[KeyIndex])
- {
- MemCachedKeyIndexes.push_back(KeyIndex);
- }
- m_MemoryHitCount++;
- }
- else
+ if (Payload.MemCached)
+ {
+ OutValue.Value = m_MemCachedPayloads[Payload.MemCached].Payload;
+ if (FillRawHashAndRawSize[KeyIndex])
{
- if (m_Configuration.MemCacheSizeThreshold > 0)
- {
- m_MemoryMissCount++;
- }
- if (Location.IsFlagSet(DiskLocation::kStandaloneFile))
- {
- StandaloneDiskLocations.push_back(Location);
- StandaloneKeyIndexes.push_back(KeyIndex);
- }
- else
- {
- InlineDiskLocations.push_back(Location);
- InlineBlockLocations.emplace_back(Location.GetBlockLocation(m_Configuration.PayloadAlignment));
- InlineKeyIndexes.push_back(KeyIndex);
- }
+ MemCachedKeyIndexes.push_back(KeyIndex);
}
+ m_MemoryHitCount++;
}
else
{
@@ -1577,45 +1562,197 @@ ZenCacheDiskLayer::CacheBucket::EndGetBatch(GetBatchHandle* Batch) noexcept
{
m_MemoryMissCount++;
}
+ if (Location.IsFlagSet(DiskLocation::kStandaloneFile))
+ {
+ StandaloneDiskLocations.push_back(Location);
+ StandaloneKeyIndexes.push_back(KeyIndex);
+ }
+ else
+ {
+ InlineDiskLocations.push_back(Location);
+ InlineBlockLocations.emplace_back(Location.GetBlockLocation(m_Configuration.PayloadAlignment));
+ InlineKeyIndexes.push_back(KeyIndex);
+ }
+ }
+ }
+ else
+ {
+ if (m_Configuration.MemCacheSizeThreshold > 0)
+ {
+ m_MemoryMissCount++;
}
}
}
+ }
- // MemCached and MetaData are set independently so we need to check if meta data has been set or if we need to set it even
- // if we found the data as memcached.
- // Often we will find the metadata due to the thread setting the mem cached part doing it before us so it is worth
- // checking if it is present once more before spending time fetching and setting the RawHash and RawSize in metadata
+ // MemCached and MetaData are set independently so we need to check if meta data has been set or if we need to set it even
+ // if we found the data as memcached.
+ // Often we will find the metadata due to the thread setting the mem cached part doing it before us so it is worth
+ // checking if it is present once more before spending time fetching and setting the RawHash and RawSize in metadata
- auto FillOne = [&](const DiskLocation& Location, size_t KeyIndex, IoBuffer&& Value, bool UsesTemporaryMemory) {
- if (!Value)
+ auto FillOne = [&](const DiskLocation& Location, size_t KeyIndex, IoBuffer&& Value, bool UsesTemporaryMemory) {
+ if (!Value)
+ {
+ return;
+ }
+ const IoHash& Key = Batch->Keys[KeyIndex];
+ size_t ResultIndex = Batch->ResultIndexes[KeyIndex];
+ ZenCacheValue& OutValue = Batch->OutResults[ResultIndex];
+ OutValue.Value = std::move(Value);
+ OutValue.Value.SetContentType(Location.GetContentType());
+
+ bool AddToMemCache = false;
+ bool SetMetaInfo = FillRawHashAndRawSize[KeyIndex];
+ if (m_Configuration.MemCacheSizeThreshold > 0)
+ {
+ size_t ValueSize = OutValue.Value.GetSize();
+ if (OutValue.Value && ValueSize <= m_Configuration.MemCacheSizeThreshold)
{
- return;
+ OutValue.Value = IoBufferBuilder::ReadFromFileMaybe(OutValue.Value);
+ AddToMemCache = true;
}
- const IoHash& Key = Batch->Keys[KeyIndex];
- size_t ResultIndex = Batch->ResultIndexes[KeyIndex];
- ZenCacheValue& OutValue = Batch->OutResults[ResultIndex];
- OutValue.Value = std::move(Value);
- OutValue.Value.SetContentType(Location.GetContentType());
+ }
- bool AddToMemCache = false;
- bool SetMetaInfo = FillRawHashAndRawSize[KeyIndex];
- if (m_Configuration.MemCacheSizeThreshold > 0)
+ if (AddToMemCache || UsesTemporaryMemory)
+ {
+ // We need to own it if we want to add it to the memcache or the buffer is just a range of the block iteration buffer
+ OutValue.Value.MakeOwned();
+ }
+
+ if (SetMetaInfo)
+ {
+ // See ZenCacheDiskLayer::CacheBucket::Get - it sets the memcache part first and then if it needs to it set the
+ // metadata separately, check if it had time to set the metadata
+ RwLock::SharedLockScope UpdateIndexLock(m_IndexLock);
+ if (auto UpdateIt = m_Index.find(Key); UpdateIt != m_Index.end())
{
- size_t ValueSize = OutValue.Value.GetSize();
- if (OutValue.Value && ValueSize <= m_Configuration.MemCacheSizeThreshold)
+ BucketPayload& Payload = m_Payloads[UpdateIt->second];
+ if (Payload.MetaData)
{
- OutValue.Value = IoBufferBuilder::ReadFromFileMaybe(OutValue.Value);
- AddToMemCache = true;
+ const BucketMetaData& MetaData = m_MetaDatas[Payload.MetaData];
+ OutValue.RawHash = MetaData.RawHash;
+ OutValue.RawSize = MetaData.RawSize;
+ SetMetaInfo = false;
}
}
+ }
- if (AddToMemCache || UsesTemporaryMemory)
+ if (SetMetaInfo)
+ {
+ ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::MetaData");
+ if (Location.IsFlagSet(DiskLocation::kCompressed))
+ {
+ if (!CompressedBuffer::ValidateCompressedHeader(OutValue.Value,
+ OutValue.RawHash,
+ OutValue.RawSize,
+ /*OutOptionalTotalCompressedSize*/ nullptr))
+ {
+ OutValue = ZenCacheValue{};
+ AddToMemCache = false;
+ SetMetaInfo = false;
+ }
+ }
+ else
+ {
+ OutValue.RawHash = IoHash::HashBuffer(OutValue.Value);
+ OutValue.RawSize = OutValue.Value.GetSize();
+ }
+ if (OutValue.RawSize > std::numeric_limits<std::uint32_t>::max())
{
- // We need to own it if we want to add it to the memcache or the buffer is just a range of the block iteration buffer
- OutValue.Value.MakeOwned();
+ SetMetaInfo = false;
}
+ }
- if (SetMetaInfo)
+ if (SetMetaInfo || AddToMemCache)
+ {
+ ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::MemCache");
+ RwLock::ExclusiveLockScope UpdateIndexLock(m_IndexLock);
+ {
+ if (auto UpdateIt = m_Index.find(Key); UpdateIt != m_Index.end())
+ {
+ BucketPayload& Payload = m_Payloads[UpdateIt->second];
+
+ // Only update if it has not already been updated by other thread
+ if (!Payload.MetaData && SetMetaInfo)
+ {
+ SetMetaData(UpdateIndexLock,
+ Payload,
+ {.RawSize = gsl::narrow<uint32_t>(OutValue.RawSize), .RawHash = OutValue.RawHash});
+ }
+ if (!Payload.MemCached && AddToMemCache)
+ {
+ SetMemCachedData(UpdateIndexLock, UpdateIt->second, OutValue.Value);
+ }
+ }
+ }
+ }
+ };
+
+ // We don't want to read into memory if they are to big since we might only want to touch the compressed
+ // header before sending it along
+ if (!InlineDiskLocations.empty())
+ {
+ ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::ReadInline");
+ m_BlockStore.IterateChunks(std::span{begin(InlineBlockLocations), end(InlineBlockLocations)},
+ [&](uint32_t, std::span<const size_t> ChunkIndexes) -> bool {
+ // Up to 8KB or m_Configuration.MemCacheSizeThreshold depending on configuration
+ const uint64_t LargeChunkSizeLimit =
+ m_Configuration.MemCacheSizeThreshold == 0
+ ? Min(m_Configuration.LargeObjectThreshold, 8u * 1024u)
+ : Max(m_Configuration.MemCacheSizeThreshold, 8u * 1024u);
+
+ m_BlockStore.IterateBlock(
+ std::span{begin(InlineBlockLocations), end(InlineBlockLocations)},
+ ChunkIndexes,
+ [this, &FillOne, &InlineDiskLocations, &InlineKeyIndexes](size_t ChunkIndex,
+ const void* Data,
+ uint64_t Size) -> bool {
+ if (Data != nullptr)
+ {
+ FillOne(InlineDiskLocations[ChunkIndex],
+ InlineKeyIndexes[ChunkIndex],
+ IoBufferBuilder::MakeFromMemory(MemoryView(Data, Size)),
+ /*UsesTemporaryMemory*/ true);
+ }
+ return true;
+ },
+ [this, &FillOne, &InlineDiskLocations, &InlineKeyIndexes](size_t ChunkIndex,
+ BlockStoreFile& File,
+ uint64_t Offset,
+ uint64_t Size) -> bool {
+ FillOne(InlineDiskLocations[ChunkIndex],
+ InlineKeyIndexes[ChunkIndex],
+ File.GetChunk(Offset, Size),
+ /*UsesTemporaryMemory*/ false);
+ return true;
+ },
+ LargeChunkSizeLimit);
+ return true;
+ });
+ }
+
+ if (!StandaloneDiskLocations.empty())
+ {
+ ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::ReadStandalone");
+ for (size_t Index = 0; Index < StandaloneDiskLocations.size(); Index++)
+ {
+ size_t KeyIndex = StandaloneKeyIndexes[Index];
+ const DiskLocation& Location = StandaloneDiskLocations[Index];
+ FillOne(Location, KeyIndex, GetStandaloneCacheValue(Location, Batch->Keys[KeyIndex]), /*UsesTemporaryMemory*/ false);
+ }
+ }
+
+ if (!MemCachedKeyIndexes.empty())
+ {
+ ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::MemCached");
+ for (size_t KeyIndex : MemCachedKeyIndexes)
+ {
+ const IoHash& Key = Batch->Keys[KeyIndex];
+ bool SetMetaInfo = FillRawHashAndRawSize[KeyIndex];
+ ZEN_ASSERT(SetMetaInfo);
+ ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::MetaData");
+ size_t ResultIndex = Batch->ResultIndexes[KeyIndex];
+ ZenCacheValue& OutValue = Batch->OutResults[ResultIndex];
{
// See ZenCacheDiskLayer::CacheBucket::Get - it sets the memcache part first and then if it needs to it set the
// metadata separately, check if it had time to set the metadata
@@ -1635,17 +1772,14 @@ ZenCacheDiskLayer::CacheBucket::EndGetBatch(GetBatchHandle* Batch) noexcept
if (SetMetaInfo)
{
- ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::MetaData");
- if (Location.IsFlagSet(DiskLocation::kCompressed))
+ if (OutValue.Value.GetContentType() == ZenContentType::kCompressedBinary)
{
if (!CompressedBuffer::ValidateCompressedHeader(OutValue.Value,
OutValue.RawHash,
OutValue.RawSize,
/*OutOptionalTotalCompressedSize*/ nullptr))
{
- OutValue = ZenCacheValue{};
- AddToMemCache = false;
- SetMetaInfo = false;
+ OutValue = ZenCacheValue{};
}
}
else
@@ -1653,179 +1787,57 @@ ZenCacheDiskLayer::CacheBucket::EndGetBatch(GetBatchHandle* Batch) noexcept
OutValue.RawHash = IoHash::HashBuffer(OutValue.Value);
OutValue.RawSize = OutValue.Value.GetSize();
}
- if (OutValue.RawSize > std::numeric_limits<std::uint32_t>::max())
- {
- SetMetaInfo = false;
- }
- }
- if (SetMetaInfo || AddToMemCache)
- {
- ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::MemCache");
- RwLock::ExclusiveLockScope UpdateIndexLock(m_IndexLock);
+ if (OutValue.RawSize <= std::numeric_limits<std::uint32_t>::max())
{
- if (auto UpdateIt = m_Index.find(Key); UpdateIt != m_Index.end())
+ RwLock::ExclusiveLockScope UpdateIndexLock(m_IndexLock);
{
- BucketPayload& Payload = m_Payloads[UpdateIt->second];
-
- // Only update if it has not already been updated by other thread
- if (!Payload.MetaData && SetMetaInfo)
- {
- SetMetaData(UpdateIndexLock,
- Payload,
- {.RawSize = gsl::narrow<uint32_t>(OutValue.RawSize), .RawHash = OutValue.RawHash});
- }
- if (!Payload.MemCached && AddToMemCache)
+ if (auto UpdateIt = m_Index.find(Key); UpdateIt != m_Index.end())
{
- SetMemCachedData(UpdateIndexLock, UpdateIt->second, OutValue.Value);
- }
- }
- }
- }
- };
-
- // We don't want to read into memory if they are to big since we might only want to touch the compressed
- // header before sending it along
- if (!InlineDiskLocations.empty())
- {
- ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::ReadInline");
- m_BlockStore.IterateChunks(std::span{begin(InlineBlockLocations), end(InlineBlockLocations)},
- [&](uint32_t, std::span<const size_t> ChunkIndexes) -> bool {
- // Up to 8KB or m_Configuration.MemCacheSizeThreshold depending on configuration
- const uint64_t LargeChunkSizeLimit =
- m_Configuration.MemCacheSizeThreshold == 0
- ? Min(m_Configuration.LargeObjectThreshold, 8u * 1024u)
- : Max(m_Configuration.MemCacheSizeThreshold, 8u * 1024u);
-
- m_BlockStore.IterateBlock(
- std::span{begin(InlineBlockLocations), end(InlineBlockLocations)},
- ChunkIndexes,
- [this, &FillOne, &InlineDiskLocations, &InlineKeyIndexes](size_t ChunkIndex,
- const void* Data,
- uint64_t Size) -> bool {
- if (Data != nullptr)
- {
- FillOne(InlineDiskLocations[ChunkIndex],
- InlineKeyIndexes[ChunkIndex],
- IoBufferBuilder::MakeFromMemory(MemoryView(Data, Size)),
- /*UsesTemporaryMemory*/ true);
- }
- return true;
- },
- [this, &FillOne, &InlineDiskLocations, &InlineKeyIndexes](size_t ChunkIndex,
- BlockStoreFile& File,
- uint64_t Offset,
- uint64_t Size) -> bool {
- FillOne(InlineDiskLocations[ChunkIndex],
- InlineKeyIndexes[ChunkIndex],
- File.GetChunk(Offset, Size),
- /*UsesTemporaryMemory*/ false);
- return true;
- },
- LargeChunkSizeLimit);
- return true;
- });
- }
-
- if (!StandaloneDiskLocations.empty())
- {
- ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::ReadStandalone");
- for (size_t Index = 0; Index < StandaloneDiskLocations.size(); Index++)
- {
- size_t KeyIndex = StandaloneKeyIndexes[Index];
- const DiskLocation& Location = StandaloneDiskLocations[Index];
- FillOne(Location, KeyIndex, GetStandaloneCacheValue(Location, Batch->Keys[KeyIndex]), /*UsesTemporaryMemory*/ false);
- }
- }
-
- if (!MemCachedKeyIndexes.empty())
- {
- ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::MemCached");
- for (size_t KeyIndex : MemCachedKeyIndexes)
- {
- const IoHash& Key = Batch->Keys[KeyIndex];
- bool SetMetaInfo = FillRawHashAndRawSize[KeyIndex];
- ZEN_ASSERT(SetMetaInfo);
- ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::MetaData");
- size_t ResultIndex = Batch->ResultIndexes[KeyIndex];
- ZenCacheValue& OutValue = Batch->OutResults[ResultIndex];
- {
- // See ZenCacheDiskLayer::CacheBucket::Get - it sets the memcache part first and then if it needs to it set the
- // metadata separately, check if it had time to set the metadata
- RwLock::SharedLockScope UpdateIndexLock(m_IndexLock);
- if (auto UpdateIt = m_Index.find(Key); UpdateIt != m_Index.end())
- {
- BucketPayload& Payload = m_Payloads[UpdateIt->second];
- if (Payload.MetaData)
- {
- const BucketMetaData& MetaData = m_MetaDatas[Payload.MetaData];
- OutValue.RawHash = MetaData.RawHash;
- OutValue.RawSize = MetaData.RawSize;
- SetMetaInfo = false;
- }
- }
- }
-
- if (SetMetaInfo)
- {
- if (OutValue.Value.GetContentType() == ZenContentType::kCompressedBinary)
- {
- if (!CompressedBuffer::ValidateCompressedHeader(OutValue.Value,
- OutValue.RawHash,
- OutValue.RawSize,
- /*OutOptionalTotalCompressedSize*/ nullptr))
- {
- OutValue = ZenCacheValue{};
- }
- }
- else
- {
- OutValue.RawHash = IoHash::HashBuffer(OutValue.Value);
- OutValue.RawSize = OutValue.Value.GetSize();
- }
+ BucketPayload& Payload = m_Payloads[UpdateIt->second];
- if (OutValue.RawSize <= std::numeric_limits<std::uint32_t>::max())
- {
- RwLock::ExclusiveLockScope UpdateIndexLock(m_IndexLock);
- {
- if (auto UpdateIt = m_Index.find(Key); UpdateIt != m_Index.end())
+ // Only update if it has not already been updated by other thread
+ if (!Payload.MetaData)
{
- BucketPayload& Payload = m_Payloads[UpdateIt->second];
-
- // Only update if it has not already been updated by other thread
- if (!Payload.MetaData)
- {
- SetMetaData(UpdateIndexLock,
- Payload,
- {.RawSize = static_cast<std::uint32_t>(OutValue.RawSize), .RawHash = OutValue.RawHash});
- }
+ SetMetaData(UpdateIndexLock,
+ Payload,
+ {.RawSize = static_cast<std::uint32_t>(OutValue.RawSize), .RawHash = OutValue.RawHash});
}
}
}
}
}
}
+ }
- for (size_t ResultIndex : Batch->ResultIndexes)
+ for (size_t ResultIndex : Batch->ResultIndexes)
+ {
+ bool Hit = !!Batch->OutResults[ResultIndex].Value;
+ if (Hit)
{
- bool Hit = !!Batch->OutResults[ResultIndex].Value;
- if (Hit)
- {
- m_DiskHitCount++;
- StatsScope.SetBytes(Batch->OutResults[ResultIndex].Value.GetSize());
- }
- else
- {
- m_DiskMissCount++;
- }
+ m_DiskHitCount++;
+ StatsScope.SetBytes(Batch->OutResults[ResultIndex].Value.GetSize());
+ }
+ else
+ {
+ m_DiskMissCount++;
}
}
+ }
+}
+void
+ZenCacheDiskLayer::CacheBucket::DeleteGetBatch(GetBatchHandle* Batch) noexcept
+{
+ ZEN_TRACE_CPU("Z$::Bucket::DeleteGetBatch");
+
+ try
+ {
delete Batch;
}
catch (const std::exception& Ex)
{
- ZEN_ERROR("Exception in ZenCacheDiskLayer::CacheBucket::EndGetBatch: '{}'", Ex.what());
+ ZEN_ERROR("Exception in ZenCacheDiskLayer::CacheBucket::DeleteGetBatch: '{}'", Ex.what());
}
}
@@ -2954,9 +2966,9 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey,
OptionalBatchHandle->Entries.push_back({});
OptionalBatchHandle->EntryResultIndexes.push_back(OptionalBatchHandle->OutResults.size());
OptionalBatchHandle->OutResults.push_back(PutResult{zen::PutStatus::Fail});
- PutBatchHandle::Entry& CurrentEntry = OptionalBatchHandle->Entries.back();
- CurrentEntry.Overwrite = Overwrite;
- std::vector<IoHash>& HashKeyAndReferences = OptionalBatchHandle->Entries.back().HashKeyAndReferences;
+ PutBatchHandle::Entry& CurrentEntry = OptionalBatchHandle->Entries.back();
+ CurrentEntry.Overwrite = Overwrite;
+ eastl::fixed_vector<IoHash, 8>& HashKeyAndReferences = OptionalBatchHandle->Entries.back().HashKeyAndReferences;
HashKeyAndReferences.reserve(1 + References.size());
HashKeyAndReferences.push_back(HashKey);
HashKeyAndReferences.insert(HashKeyAndReferences.end(), References.begin(), References.end());
@@ -3969,7 +3981,7 @@ ZenCacheDiskLayer::GetOrCreateBucket(std::string_view InBucket)
struct ZenCacheDiskLayer::PutBatchHandle
{
- PutBatchHandle(std::vector<ZenCacheDiskLayer::PutResult>& OutResults) : OutResults(OutResults) {}
+ PutBatchHandle(ZenCachePutResultVec_t& OutResults) : OutResults(OutResults) {}
struct BucketHandle
{
CacheBucket* Bucket;
@@ -4006,7 +4018,7 @@ struct ZenCacheDiskLayer::PutBatchHandle
}
}
- CacheBucket::PutBatchHandle* NewBucketHandle = Bucket->BeginPutBatch(OutResults);
+ CacheBucket::PutBatchHandle* NewBucketHandle = Bucket->CreatePutBatch(OutResults);
if (NewBucketHandle == nullptr)
{
return nullptr;
@@ -4020,7 +4032,7 @@ struct ZenCacheDiskLayer::PutBatchHandle
CacheBucket::PutBatchHandle* Result = It->Handle;
ZEN_ASSERT(Result != nullptr);
_.ReleaseNow();
- Bucket->EndPutBatch(NewBucketHandle);
+ Bucket->DeletePutBatch(NewBucketHandle);
return Result;
}
@@ -4028,30 +4040,35 @@ struct ZenCacheDiskLayer::PutBatchHandle
return NewBucketHandle;
}
- RwLock Lock;
- std::vector<BucketHandle> BucketHandles;
- std::vector<ZenCacheDiskLayer::PutResult>& OutResults;
+ RwLock Lock;
+ eastl::fixed_vector<BucketHandle, 4> BucketHandles;
+ ZenCachePutResultVec_t& OutResults;
};
ZenCacheDiskLayer::PutBatchHandle*
-ZenCacheDiskLayer::BeginPutBatch(std::vector<PutResult>& OutResults)
+ZenCacheDiskLayer::CreatePutBatch(ZenCachePutResultVec_t& OutResults)
{
+ ZEN_TRACE_CPU("Z$::CreatePutBatch");
return new PutBatchHandle(OutResults);
}
void
-ZenCacheDiskLayer::EndPutBatch(PutBatchHandle* Batch) noexcept
+ZenCacheDiskLayer::CommitPutBatch(PutBatchHandle* Batch)
{
- try
+ ZEN_TRACE_CPU("Z$::CommitPutBatch");
+
+ ZEN_ASSERT(Batch);
+ Batch->ForEach([&](CacheBucket* Bucket, CacheBucket::PutBatchHandle* Handle) { Bucket->CommitPutBatch(Handle); });
+}
+
+void
+ZenCacheDiskLayer::DeletePutBatch(PutBatchHandle* Batch) noexcept
+{
+ if (Batch)
{
- ZEN_ASSERT(Batch);
- Batch->ForEach([&](CacheBucket* Bucket, CacheBucket::PutBatchHandle* Handle) { Bucket->EndPutBatch(Handle); });
+ Batch->ForEach([&](CacheBucket* Bucket, CacheBucket::PutBatchHandle* Handle) { Bucket->DeletePutBatch(Handle); });
delete Batch;
}
- catch (const std::exception& Ex)
- {
- ZEN_ERROR("Exception in ZenCacheDiskLayer::EndPutBatch: '{}'", Ex.what());
- }
}
struct ZenCacheDiskLayer::GetBatchHandle
@@ -4093,7 +4110,7 @@ struct ZenCacheDiskLayer::GetBatchHandle
}
}
- CacheBucket::GetBatchHandle* NewBucketHandle = Bucket->BeginGetBatch(OutResults);
+ CacheBucket::GetBatchHandle* NewBucketHandle = Bucket->CreateGetBatch(OutResults);
if (NewBucketHandle == nullptr)
{
return nullptr;
@@ -4107,7 +4124,7 @@ struct ZenCacheDiskLayer::GetBatchHandle
CacheBucket::GetBatchHandle* Result = It->Handle;
ZEN_ASSERT(Result != nullptr);
_.ReleaseNow();
- Bucket->EndGetBatch(NewBucketHandle);
+ Bucket->DeleteGetBatch(NewBucketHandle);
return Result;
}
@@ -4122,25 +4139,35 @@ struct ZenCacheDiskLayer::GetBatchHandle
};
ZenCacheDiskLayer::GetBatchHandle*
-ZenCacheDiskLayer::BeginGetBatch(ZenCacheValueVec_t& OutResults)
+ZenCacheDiskLayer::CreateGetBatch(ZenCacheValueVec_t& OutResults)
{
+ ZEN_TRACE_CPU("Z$::CreateGetBatch");
return new GetBatchHandle(OutResults);
}
void
-ZenCacheDiskLayer::EndGetBatch(GetBatchHandle* Batch) noexcept
+ZenCacheDiskLayer::CommitGetBatch(GetBatchHandle* Batch)
{
- ZEN_TRACE_CPU("Z$::EndGetBatch");
+ ZEN_TRACE_CPU("Z$::CommitGetBatch");
+
+ ZEN_ASSERT(Batch);
+ Batch->ForEach([&](CacheBucket* Bucket, CacheBucket::GetBatchHandle* Handle) { Bucket->CommitGetBatch(Handle); });
+ TryMemCacheTrim();
+}
+
+void
+ZenCacheDiskLayer::DeleteGetBatch(GetBatchHandle* Batch) noexcept
+{
+ ZEN_TRACE_CPU("Z$::DeleteGetBatch");
try
{
ZEN_ASSERT(Batch);
- Batch->ForEach([&](CacheBucket* Bucket, CacheBucket::GetBatchHandle* Handle) { Bucket->EndGetBatch(Handle); });
- TryMemCacheTrim();
+ Batch->ForEach([&](CacheBucket* Bucket, CacheBucket::GetBatchHandle* Handle) { Bucket->DeleteGetBatch(Handle); });
delete Batch;
}
catch (const std::exception& Ex)
{
- ZEN_ERROR("Exception in ZenCacheDiskLayer::EndGetBatch: '{}'", Ex.what());
+ ZEN_ERROR("Exception in ZenCacheDiskLayer::DeleteGetBatch: '{}'", Ex.what());
}
}
diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp
index 9e57b41c3..6469d6c31 100644
--- a/src/zenstore/cache/cacherpc.cpp
+++ b/src/zenstore/cache/cacherpc.cpp
@@ -820,6 +820,7 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
}
Request.Complete = true;
+
for (ValueRequestData& Value : Request.Values)
{
if (Value.Exists)
@@ -993,7 +994,7 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con
const bool HasUpstream = m_UpstreamCache.IsActive();
CbArrayView RequestsArray = Params["Requests"sv].AsArrayView();
- std::vector<ZenCacheStore::PutResult> BatchResults;
+ ZenCacheStore::ZenCachePutResultVec_t BatchResults;
eastl::fixed_vector<size_t, 32> BatchResultIndexes;
eastl::fixed_vector<ZenCacheStore::PutResult, 32> Results;
eastl::fixed_vector<CacheKey, 32> UpstreamCacheKeys;
@@ -1124,6 +1125,11 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con
Valid ? "Added"sv : "Invalid",
NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
}
+ if (Batch)
+ {
+ Batch->Commit();
+ Batch.reset();
+ }
}
if (Results.empty())
{
@@ -1268,7 +1274,11 @@ CacheRpcHandler::HandleRpcGetCacheValues(const CacheRequestContext& Context, CbO
CacheValues.push_back({});
}
}
- Batch.reset();
+ if (Batch)
+ {
+ Batch->Commit();
+ Batch.reset();
+ }
ZEN_ASSERT(CacheValues.size() == RequestsArray.Num());
}
for (size_t RequestIndex = 0; RequestIndex < CacheValues.size(); RequestIndex++)
@@ -1818,6 +1828,11 @@ CacheRpcHandler::GetLocalCacheValues(const CacheRequestContext& Context,
}
Request->ElapsedTimeUs += Timer.GetElapsedTimeUs();
}
+ if (Batch)
+ {
+ Batch->Commit();
+ Batch.reset();
+ }
}
for (size_t RequestIndex = 0; RequestIndex < ValueRequests.size(); RequestIndex++)
{
diff --git a/src/zenstore/cache/structuredcachestore.cpp b/src/zenstore/cache/structuredcachestore.cpp
index c0b433c51..f1cf6e4a4 100644
--- a/src/zenstore/cache/structuredcachestore.cpp
+++ b/src/zenstore/cache/structuredcachestore.cpp
@@ -142,25 +142,32 @@ struct ZenCacheNamespace::PutBatchHandle
};
ZenCacheNamespace::PutBatchHandle*
-ZenCacheNamespace::BeginPutBatch(std::vector<PutResult>& OutResult)
+ZenCacheNamespace::CreatePutBatch(ZenCachePutResultVec_t& OutResult)
{
ZenCacheNamespace::PutBatchHandle* Handle = new ZenCacheNamespace::PutBatchHandle;
- Handle->DiskLayerHandle = m_DiskLayer.BeginPutBatch(OutResult);
+ Handle->DiskLayerHandle = m_DiskLayer.CreatePutBatch(OutResult);
return Handle;
}
void
-ZenCacheNamespace::EndPutBatch(PutBatchHandle* Batch) noexcept
+ZenCacheNamespace::CommitPutBatch(PutBatchHandle* Batch)
+{
+ ZEN_ASSERT(Batch);
+ m_DiskLayer.CommitPutBatch(Batch->DiskLayerHandle);
+}
+
+void
+ZenCacheNamespace::DeletePutBatch(PutBatchHandle* Batch) noexcept
{
try
{
ZEN_ASSERT(Batch);
- m_DiskLayer.EndPutBatch(Batch->DiskLayerHandle);
+ m_DiskLayer.DeletePutBatch(Batch->DiskLayerHandle);
delete Batch;
}
catch (const std::exception& Ex)
{
- ZEN_ERROR("Exception in ZenCacheNamespace::EndPutBatch: '{}'", Ex.what());
+ ZEN_ERROR("Exception in ZenCacheNamespace::DeletePutBatch: '{}'", Ex.what());
}
}
@@ -172,39 +179,46 @@ struct ZenCacheNamespace::GetBatchHandle
};
ZenCacheNamespace::GetBatchHandle*
-ZenCacheNamespace::BeginGetBatch(ZenCacheValueVec_t& OutResult)
+ZenCacheNamespace::CreateGetBatch(ZenCacheValueVec_t& OutResult)
{
ZenCacheNamespace::GetBatchHandle* Handle = new ZenCacheNamespace::GetBatchHandle(OutResult);
- Handle->DiskLayerHandle = m_DiskLayer.BeginGetBatch(OutResult);
+ Handle->DiskLayerHandle = m_DiskLayer.CreateGetBatch(OutResult);
return Handle;
}
void
-ZenCacheNamespace::EndGetBatch(GetBatchHandle* Batch) noexcept
+ZenCacheNamespace::CommitGetBatch(GetBatchHandle* Batch)
{
- try
- {
- ZEN_ASSERT(Batch);
- m_DiskLayer.EndGetBatch(Batch->DiskLayerHandle);
+ ZEN_ASSERT(Batch);
+ m_DiskLayer.CommitGetBatch(Batch->DiskLayerHandle);
- metrics::RequestStats::Scope StatsScope(m_GetOps, 0);
- for (const ZenCacheValue& Result : Batch->Results)
+ metrics::RequestStats::Scope StatsScope(m_GetOps, 0);
+ for (const ZenCacheValue& Result : Batch->Results)
+ {
+ if (Result.Value)
{
- if (Result.Value)
- {
- m_HitCount++;
- StatsScope.SetBytes(Result.Value.Size());
- }
- else
- {
- m_MissCount++;
- }
+ m_HitCount++;
+ StatsScope.SetBytes(Result.Value.Size());
}
+ else
+ {
+ m_MissCount++;
+ }
+ }
+}
+
+void
+ZenCacheNamespace::DeleteGetBatch(GetBatchHandle* Batch) noexcept
+{
+ try
+ {
+ ZEN_ASSERT(Batch);
+ m_DiskLayer.DeleteGetBatch(Batch->DiskLayerHandle);
delete Batch;
}
catch (const std::exception& Ex)
{
- ZEN_ERROR("Exception in ZenCacheNamespace::EndGetBatch: '{}'", Ex.what());
+ ZEN_ERROR("Exception in ZenCacheNamespace::DeleteGetBatch: '{}'", Ex.what());
}
}
@@ -543,13 +557,24 @@ ZenCacheStore::LogWorker()
}
}
-ZenCacheStore::PutBatch::PutBatch(ZenCacheStore& CacheStore, std::string_view InNamespace, std::vector<PutResult>& OutResult)
+ZenCacheStore::PutBatch::PutBatch(ZenCacheStore& CacheStore, std::string_view InNamespace, ZenCachePutResultVec_t& OutResult)
: m_CacheStore(CacheStore)
{
ZEN_MEMSCOPE(GetCacheStoreTag());
if (m_Store = m_CacheStore.GetNamespace(InNamespace); m_Store)
{
- m_NamespaceBatchHandle = m_Store->BeginPutBatch(OutResult);
+ m_NamespaceBatchHandle = m_Store->CreatePutBatch(OutResult);
+ }
+}
+
+void
+ZenCacheStore::PutBatch::Commit()
+{
+ if (m_Store)
+ {
+ ZEN_MEMSCOPE(GetCacheStoreTag());
+ ZEN_ASSERT(m_NamespaceBatchHandle);
+ m_Store->CommitPutBatch(m_NamespaceBatchHandle);
}
}
@@ -557,12 +582,8 @@ ZenCacheStore::PutBatch::~PutBatch()
{
try
{
- if (m_Store)
- {
- ZEN_MEMSCOPE(GetCacheStoreTag());
- ZEN_ASSERT(m_NamespaceBatchHandle);
- m_Store->EndPutBatch(m_NamespaceBatchHandle);
- }
+ ZEN_ASSERT(m_NamespaceBatchHandle);
+ m_Store->DeletePutBatch(m_NamespaceBatchHandle);
}
catch (const std::exception& Ex)
{
@@ -577,7 +598,29 @@ ZenCacheStore::GetBatch::GetBatch(ZenCacheStore& CacheStore, std::string_view In
ZEN_MEMSCOPE(GetCacheStoreTag());
if (m_Store = m_CacheStore.GetNamespace(InNamespace); m_Store)
{
- m_NamespaceBatchHandle = m_Store->BeginGetBatch(OutResult);
+ m_NamespaceBatchHandle = m_Store->CreateGetBatch(OutResult);
+ }
+}
+
+void
+ZenCacheStore::GetBatch::Commit()
+{
+ if (m_Store)
+ {
+ ZEN_MEMSCOPE(GetCacheStoreTag());
+ ZEN_ASSERT(m_NamespaceBatchHandle);
+ m_Store->CommitGetBatch(m_NamespaceBatchHandle);
+
+ metrics::RequestStats::Scope OpScope(m_CacheStore.m_GetOps, 0);
+ for (const ZenCacheValue& Result : Results)
+ {
+ if (Result.Value)
+ {
+ m_CacheStore.m_HitCount++;
+ OpScope.SetBytes(Result.Value.GetSize());
+ }
+ m_CacheStore.m_MissCount++;
+ }
}
}
@@ -589,18 +632,7 @@ ZenCacheStore::GetBatch::~GetBatch()
{
ZEN_MEMSCOPE(GetCacheStoreTag());
ZEN_ASSERT(m_NamespaceBatchHandle);
- m_Store->EndGetBatch(m_NamespaceBatchHandle);
-
- metrics::RequestStats::Scope OpScope(m_CacheStore.m_GetOps, 0);
- for (const ZenCacheValue& Result : Results)
- {
- if (Result.Value)
- {
- m_CacheStore.m_HitCount++;
- OpScope.SetBytes(Result.Value.GetSize());
- }
- m_CacheStore.m_MissCount++;
- }
+ m_Store->DeleteGetBatch(m_NamespaceBatchHandle);
}
}
catch (const std::exception& Ex)