diff options
| author | Dan Engelbrecht <[email protected]> | 2025-12-04 16:20:58 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-12-04 16:20:58 +0100 |
| commit | ec69a0d3d8febf15aa5df347e514c5071c19ff71 (patch) | |
| tree | 51a38b7394243281d1378f803a75a277c098edf9 /src/zenstore/cache | |
| parent | add checks to protect against access violation due to failed disk read (#675) (diff) | |
| download | zen-ec69a0d3d8febf15aa5df347e514c5071c19ff71.tar.xz zen-ec69a0d3d8febf15aa5df347e514c5071c19ff71.zip | |
batch op not in destructor (#676)
* use fixed vectors for batch requests
* refactor cache batch value put/get to not execute code that can throw execeptions in destructor
* extend test with multi-bucket requests
Diffstat (limited to 'src/zenstore/cache')
| -rw-r--r-- | src/zenstore/cache/cachedisklayer.cpp | 589 | ||||
| -rw-r--r-- | src/zenstore/cache/cacherpc.cpp | 19 | ||||
| -rw-r--r-- | src/zenstore/cache/structuredcachestore.cpp | 122 |
3 files changed, 402 insertions, 328 deletions
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index 46dfbd912..57712a706 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -1355,28 +1355,35 @@ ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const DiskLocation& Loc, struct ZenCacheDiskLayer::CacheBucket::PutBatchHandle { - PutBatchHandle(std::vector<ZenCacheDiskLayer::PutResult>& OutResults) : OutResults(OutResults) {} + PutBatchHandle(ZenCachePutResultVec_t& OutResults) : OutResults(OutResults) {} struct Entry { - std::vector<IoHash> HashKeyAndReferences; - bool Overwrite; + eastl::fixed_vector<IoHash, 8> HashKeyAndReferences; + bool Overwrite; }; - std::vector<ZenCacheValue> Buffers; - std::vector<Entry> Entries; - std::vector<size_t> EntryResultIndexes; - std::vector<ZenCacheDiskLayer::PutResult>& OutResults; + eastl::fixed_vector<ZenCacheValue, 32> Buffers; + eastl::fixed_vector<Entry, 32> Entries; + eastl::fixed_vector<size_t, 32> EntryResultIndexes; + + ZenCachePutResultVec_t& OutResults; }; ZenCacheDiskLayer::CacheBucket::PutBatchHandle* -ZenCacheDiskLayer::CacheBucket::BeginPutBatch(std::vector<ZenCacheDiskLayer::PutResult>& OutResults) +ZenCacheDiskLayer::CacheBucket::CreatePutBatch(ZenCachePutResultVec_t& OutResults) { - ZEN_TRACE_CPU("Z$::Bucket::BeginPutBatch"); + ZEN_TRACE_CPU("Z$::Bucket::CreatePutBatch"); return new PutBatchHandle(OutResults); } void -ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept +ZenCacheDiskLayer::CacheBucket::DeletePutBatch(PutBatchHandle* Batch) noexcept +{ + delete Batch; +} + +void +ZenCacheDiskLayer::CacheBucket::CommitPutBatch(PutBatchHandle* Batch) { ZEN_TRACE_CPU("Z$::Bucket::EndPutBatch"); @@ -1392,7 +1399,7 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept BuffersToCommit.reserve(Batch->Buffers.size()); for (size_t Index = 0; Index < Batch->Entries.size(); Index++) { - const std::vector<IoHash>& HashKeyAndReferences = Batch->Entries[Index].HashKeyAndReferences; + const eastl::fixed_vector<IoHash, 8>& HashKeyAndReferences = Batch->Entries[Index].HashKeyAndReferences; ZEN_ASSERT(HashKeyAndReferences.size() >= 1); ZenCacheValue& Value = Batch->Buffers[Index]; @@ -1426,7 +1433,7 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept for (size_t Index = 0; Index < Locations.size(); Index++) { DiskLocation Location(Locations[Index], m_Configuration.PayloadAlignment, EntryFlags[IndexOffset + Index]); - const std::vector<IoHash>& HashKeyAndReferences = + const eastl::fixed_vector<IoHash, 8>& HashKeyAndReferences = Batch->Entries[BufferToEntryIndexes[IndexOffset + Index]].HashKeyAndReferences; ZEN_ASSERT(HashKeyAndReferences.size() >= 1); const IoHash HashKey = HashKeyAndReferences[0]; @@ -1466,7 +1473,6 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept IndexOffset += Locations.size(); }); } - delete Batch; } catch (const std::exception& Ex) { @@ -1491,85 +1497,64 @@ struct ZenCacheDiskLayer::CacheBucket::GetBatchHandle }; ZenCacheDiskLayer::CacheBucket::GetBatchHandle* -ZenCacheDiskLayer::CacheBucket::BeginGetBatch(ZenCacheValueVec_t& OutResult) +ZenCacheDiskLayer::CacheBucket::CreateGetBatch(ZenCacheValueVec_t& OutResult) { - ZEN_TRACE_CPU("Z$::Bucket::BeginGetBatch"); + ZEN_TRACE_CPU("Z$::Bucket::CreateGetBatch"); return new GetBatchHandle(OutResult); } void -ZenCacheDiskLayer::CacheBucket::EndGetBatch(GetBatchHandle* Batch) noexcept +ZenCacheDiskLayer::CacheBucket::CommitGetBatch(GetBatchHandle* Batch) { - ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch"); + ZEN_TRACE_CPU("Z$::Bucket::CommitGetBatch"); - try - { - ZEN_ASSERT(Batch); - ZEN_ASSERT(Batch->Keys.size() == Batch->ResultIndexes.size()); + ZEN_ASSERT(Batch); + ZEN_ASSERT(Batch->Keys.size() == Batch->ResultIndexes.size()); - metrics::RequestStats::Scope StatsScope(m_GetOps, 0); + metrics::RequestStats::Scope StatsScope(m_GetOps, 0); - if (!Batch->ResultIndexes.empty()) + if (!Batch->ResultIndexes.empty()) + { + eastl::fixed_vector<DiskLocation, 16> StandaloneDiskLocations; + eastl::fixed_vector<size_t, 16> StandaloneKeyIndexes; + eastl::fixed_vector<size_t, 16> MemCachedKeyIndexes; + eastl::fixed_vector<DiskLocation, 16> InlineDiskLocations; + eastl::fixed_vector<BlockStoreLocation, 16> InlineBlockLocations; + eastl::fixed_vector<size_t, 16> InlineKeyIndexes; + eastl::fixed_vector<bool, 16> FillRawHashAndRawSize(Batch->Keys.size(), false); { - eastl::fixed_vector<DiskLocation, 16> StandaloneDiskLocations; - eastl::fixed_vector<size_t, 16> StandaloneKeyIndexes; - eastl::fixed_vector<size_t, 16> MemCachedKeyIndexes; - eastl::fixed_vector<DiskLocation, 16> InlineDiskLocations; - eastl::fixed_vector<BlockStoreLocation, 16> InlineBlockLocations; - eastl::fixed_vector<size_t, 16> InlineKeyIndexes; - eastl::fixed_vector<bool, 16> FillRawHashAndRawSize(Batch->Keys.size(), false); + RwLock::SharedLockScope IndexLock(m_IndexLock); + for (size_t KeyIndex = 0; KeyIndex < Batch->Keys.size(); KeyIndex++) { - RwLock::SharedLockScope IndexLock(m_IndexLock); - for (size_t KeyIndex = 0; KeyIndex < Batch->Keys.size(); KeyIndex++) + const IoHash& HashKey = Batch->Keys[KeyIndex]; + auto It = m_Index.find(HashKey); + if (It != m_Index.end()) { - const IoHash& HashKey = Batch->Keys[KeyIndex]; - auto It = m_Index.find(HashKey); - if (It != m_Index.end()) - { - size_t ResultIndex = Batch->ResultIndexes[KeyIndex]; - ZenCacheValue& OutValue = Batch->OutResults[ResultIndex]; + size_t ResultIndex = Batch->ResultIndexes[KeyIndex]; + ZenCacheValue& OutValue = Batch->OutResults[ResultIndex]; - const PayloadIndex PayloadIdx = It.value(); - m_AccessTimes[PayloadIdx] = GcClock::TickCount(); - const BucketPayload& Payload = m_Payloads[PayloadIdx]; - const DiskLocation& Location = Payload.Location; + const PayloadIndex PayloadIdx = It.value(); + m_AccessTimes[PayloadIdx] = GcClock::TickCount(); + const BucketPayload& Payload = m_Payloads[PayloadIdx]; + const DiskLocation& Location = Payload.Location; - FillRawHashAndRawSize[KeyIndex] = (!Location.IsFlagSet(DiskLocation::kStructured)) && (Location.Size() > 0); - if (Payload.MetaData) - { - const BucketMetaData& MetaData = m_MetaDatas[Payload.MetaData]; - OutValue.RawHash = MetaData.RawHash; - OutValue.RawSize = MetaData.RawSize; - FillRawHashAndRawSize[KeyIndex] = false; - } + FillRawHashAndRawSize[KeyIndex] = (!Location.IsFlagSet(DiskLocation::kStructured)) && (Location.Size() > 0); + if (Payload.MetaData) + { + const BucketMetaData& MetaData = m_MetaDatas[Payload.MetaData]; + OutValue.RawHash = MetaData.RawHash; + OutValue.RawSize = MetaData.RawSize; + FillRawHashAndRawSize[KeyIndex] = false; + } - if (Payload.MemCached) - { - OutValue.Value = m_MemCachedPayloads[Payload.MemCached].Payload; - if (FillRawHashAndRawSize[KeyIndex]) - { - MemCachedKeyIndexes.push_back(KeyIndex); - } - m_MemoryHitCount++; - } - else + if (Payload.MemCached) + { + OutValue.Value = m_MemCachedPayloads[Payload.MemCached].Payload; + if (FillRawHashAndRawSize[KeyIndex]) { - if (m_Configuration.MemCacheSizeThreshold > 0) - { - m_MemoryMissCount++; - } - if (Location.IsFlagSet(DiskLocation::kStandaloneFile)) - { - StandaloneDiskLocations.push_back(Location); - StandaloneKeyIndexes.push_back(KeyIndex); - } - else - { - InlineDiskLocations.push_back(Location); - InlineBlockLocations.emplace_back(Location.GetBlockLocation(m_Configuration.PayloadAlignment)); - InlineKeyIndexes.push_back(KeyIndex); - } + MemCachedKeyIndexes.push_back(KeyIndex); } + m_MemoryHitCount++; } else { @@ -1577,45 +1562,197 @@ ZenCacheDiskLayer::CacheBucket::EndGetBatch(GetBatchHandle* Batch) noexcept { m_MemoryMissCount++; } + if (Location.IsFlagSet(DiskLocation::kStandaloneFile)) + { + StandaloneDiskLocations.push_back(Location); + StandaloneKeyIndexes.push_back(KeyIndex); + } + else + { + InlineDiskLocations.push_back(Location); + InlineBlockLocations.emplace_back(Location.GetBlockLocation(m_Configuration.PayloadAlignment)); + InlineKeyIndexes.push_back(KeyIndex); + } + } + } + else + { + if (m_Configuration.MemCacheSizeThreshold > 0) + { + m_MemoryMissCount++; } } } + } - // MemCached and MetaData are set independently so we need to check if meta data has been set or if we need to set it even - // if we found the data as memcached. - // Often we will find the metadata due to the thread setting the mem cached part doing it before us so it is worth - // checking if it is present once more before spending time fetching and setting the RawHash and RawSize in metadata + // MemCached and MetaData are set independently so we need to check if meta data has been set or if we need to set it even + // if we found the data as memcached. + // Often we will find the metadata due to the thread setting the mem cached part doing it before us so it is worth + // checking if it is present once more before spending time fetching and setting the RawHash and RawSize in metadata - auto FillOne = [&](const DiskLocation& Location, size_t KeyIndex, IoBuffer&& Value, bool UsesTemporaryMemory) { - if (!Value) + auto FillOne = [&](const DiskLocation& Location, size_t KeyIndex, IoBuffer&& Value, bool UsesTemporaryMemory) { + if (!Value) + { + return; + } + const IoHash& Key = Batch->Keys[KeyIndex]; + size_t ResultIndex = Batch->ResultIndexes[KeyIndex]; + ZenCacheValue& OutValue = Batch->OutResults[ResultIndex]; + OutValue.Value = std::move(Value); + OutValue.Value.SetContentType(Location.GetContentType()); + + bool AddToMemCache = false; + bool SetMetaInfo = FillRawHashAndRawSize[KeyIndex]; + if (m_Configuration.MemCacheSizeThreshold > 0) + { + size_t ValueSize = OutValue.Value.GetSize(); + if (OutValue.Value && ValueSize <= m_Configuration.MemCacheSizeThreshold) { - return; + OutValue.Value = IoBufferBuilder::ReadFromFileMaybe(OutValue.Value); + AddToMemCache = true; } - const IoHash& Key = Batch->Keys[KeyIndex]; - size_t ResultIndex = Batch->ResultIndexes[KeyIndex]; - ZenCacheValue& OutValue = Batch->OutResults[ResultIndex]; - OutValue.Value = std::move(Value); - OutValue.Value.SetContentType(Location.GetContentType()); + } - bool AddToMemCache = false; - bool SetMetaInfo = FillRawHashAndRawSize[KeyIndex]; - if (m_Configuration.MemCacheSizeThreshold > 0) + if (AddToMemCache || UsesTemporaryMemory) + { + // We need to own it if we want to add it to the memcache or the buffer is just a range of the block iteration buffer + OutValue.Value.MakeOwned(); + } + + if (SetMetaInfo) + { + // See ZenCacheDiskLayer::CacheBucket::Get - it sets the memcache part first and then if it needs to it set the + // metadata separately, check if it had time to set the metadata + RwLock::SharedLockScope UpdateIndexLock(m_IndexLock); + if (auto UpdateIt = m_Index.find(Key); UpdateIt != m_Index.end()) { - size_t ValueSize = OutValue.Value.GetSize(); - if (OutValue.Value && ValueSize <= m_Configuration.MemCacheSizeThreshold) + BucketPayload& Payload = m_Payloads[UpdateIt->second]; + if (Payload.MetaData) { - OutValue.Value = IoBufferBuilder::ReadFromFileMaybe(OutValue.Value); - AddToMemCache = true; + const BucketMetaData& MetaData = m_MetaDatas[Payload.MetaData]; + OutValue.RawHash = MetaData.RawHash; + OutValue.RawSize = MetaData.RawSize; + SetMetaInfo = false; } } + } - if (AddToMemCache || UsesTemporaryMemory) + if (SetMetaInfo) + { + ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::MetaData"); + if (Location.IsFlagSet(DiskLocation::kCompressed)) + { + if (!CompressedBuffer::ValidateCompressedHeader(OutValue.Value, + OutValue.RawHash, + OutValue.RawSize, + /*OutOptionalTotalCompressedSize*/ nullptr)) + { + OutValue = ZenCacheValue{}; + AddToMemCache = false; + SetMetaInfo = false; + } + } + else + { + OutValue.RawHash = IoHash::HashBuffer(OutValue.Value); + OutValue.RawSize = OutValue.Value.GetSize(); + } + if (OutValue.RawSize > std::numeric_limits<std::uint32_t>::max()) { - // We need to own it if we want to add it to the memcache or the buffer is just a range of the block iteration buffer - OutValue.Value.MakeOwned(); + SetMetaInfo = false; } + } - if (SetMetaInfo) + if (SetMetaInfo || AddToMemCache) + { + ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::MemCache"); + RwLock::ExclusiveLockScope UpdateIndexLock(m_IndexLock); + { + if (auto UpdateIt = m_Index.find(Key); UpdateIt != m_Index.end()) + { + BucketPayload& Payload = m_Payloads[UpdateIt->second]; + + // Only update if it has not already been updated by other thread + if (!Payload.MetaData && SetMetaInfo) + { + SetMetaData(UpdateIndexLock, + Payload, + {.RawSize = gsl::narrow<uint32_t>(OutValue.RawSize), .RawHash = OutValue.RawHash}); + } + if (!Payload.MemCached && AddToMemCache) + { + SetMemCachedData(UpdateIndexLock, UpdateIt->second, OutValue.Value); + } + } + } + } + }; + + // We don't want to read into memory if they are to big since we might only want to touch the compressed + // header before sending it along + if (!InlineDiskLocations.empty()) + { + ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::ReadInline"); + m_BlockStore.IterateChunks(std::span{begin(InlineBlockLocations), end(InlineBlockLocations)}, + [&](uint32_t, std::span<const size_t> ChunkIndexes) -> bool { + // Up to 8KB or m_Configuration.MemCacheSizeThreshold depending on configuration + const uint64_t LargeChunkSizeLimit = + m_Configuration.MemCacheSizeThreshold == 0 + ? Min(m_Configuration.LargeObjectThreshold, 8u * 1024u) + : Max(m_Configuration.MemCacheSizeThreshold, 8u * 1024u); + + m_BlockStore.IterateBlock( + std::span{begin(InlineBlockLocations), end(InlineBlockLocations)}, + ChunkIndexes, + [this, &FillOne, &InlineDiskLocations, &InlineKeyIndexes](size_t ChunkIndex, + const void* Data, + uint64_t Size) -> bool { + if (Data != nullptr) + { + FillOne(InlineDiskLocations[ChunkIndex], + InlineKeyIndexes[ChunkIndex], + IoBufferBuilder::MakeFromMemory(MemoryView(Data, Size)), + /*UsesTemporaryMemory*/ true); + } + return true; + }, + [this, &FillOne, &InlineDiskLocations, &InlineKeyIndexes](size_t ChunkIndex, + BlockStoreFile& File, + uint64_t Offset, + uint64_t Size) -> bool { + FillOne(InlineDiskLocations[ChunkIndex], + InlineKeyIndexes[ChunkIndex], + File.GetChunk(Offset, Size), + /*UsesTemporaryMemory*/ false); + return true; + }, + LargeChunkSizeLimit); + return true; + }); + } + + if (!StandaloneDiskLocations.empty()) + { + ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::ReadStandalone"); + for (size_t Index = 0; Index < StandaloneDiskLocations.size(); Index++) + { + size_t KeyIndex = StandaloneKeyIndexes[Index]; + const DiskLocation& Location = StandaloneDiskLocations[Index]; + FillOne(Location, KeyIndex, GetStandaloneCacheValue(Location, Batch->Keys[KeyIndex]), /*UsesTemporaryMemory*/ false); + } + } + + if (!MemCachedKeyIndexes.empty()) + { + ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::MemCached"); + for (size_t KeyIndex : MemCachedKeyIndexes) + { + const IoHash& Key = Batch->Keys[KeyIndex]; + bool SetMetaInfo = FillRawHashAndRawSize[KeyIndex]; + ZEN_ASSERT(SetMetaInfo); + ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::MetaData"); + size_t ResultIndex = Batch->ResultIndexes[KeyIndex]; + ZenCacheValue& OutValue = Batch->OutResults[ResultIndex]; { // See ZenCacheDiskLayer::CacheBucket::Get - it sets the memcache part first and then if it needs to it set the // metadata separately, check if it had time to set the metadata @@ -1635,17 +1772,14 @@ ZenCacheDiskLayer::CacheBucket::EndGetBatch(GetBatchHandle* Batch) noexcept if (SetMetaInfo) { - ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::MetaData"); - if (Location.IsFlagSet(DiskLocation::kCompressed)) + if (OutValue.Value.GetContentType() == ZenContentType::kCompressedBinary) { if (!CompressedBuffer::ValidateCompressedHeader(OutValue.Value, OutValue.RawHash, OutValue.RawSize, /*OutOptionalTotalCompressedSize*/ nullptr)) { - OutValue = ZenCacheValue{}; - AddToMemCache = false; - SetMetaInfo = false; + OutValue = ZenCacheValue{}; } } else @@ -1653,179 +1787,57 @@ ZenCacheDiskLayer::CacheBucket::EndGetBatch(GetBatchHandle* Batch) noexcept OutValue.RawHash = IoHash::HashBuffer(OutValue.Value); OutValue.RawSize = OutValue.Value.GetSize(); } - if (OutValue.RawSize > std::numeric_limits<std::uint32_t>::max()) - { - SetMetaInfo = false; - } - } - if (SetMetaInfo || AddToMemCache) - { - ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::MemCache"); - RwLock::ExclusiveLockScope UpdateIndexLock(m_IndexLock); + if (OutValue.RawSize <= std::numeric_limits<std::uint32_t>::max()) { - if (auto UpdateIt = m_Index.find(Key); UpdateIt != m_Index.end()) + RwLock::ExclusiveLockScope UpdateIndexLock(m_IndexLock); { - BucketPayload& Payload = m_Payloads[UpdateIt->second]; - - // Only update if it has not already been updated by other thread - if (!Payload.MetaData && SetMetaInfo) - { - SetMetaData(UpdateIndexLock, - Payload, - {.RawSize = gsl::narrow<uint32_t>(OutValue.RawSize), .RawHash = OutValue.RawHash}); - } - if (!Payload.MemCached && AddToMemCache) + if (auto UpdateIt = m_Index.find(Key); UpdateIt != m_Index.end()) { - SetMemCachedData(UpdateIndexLock, UpdateIt->second, OutValue.Value); - } - } - } - } - }; - - // We don't want to read into memory if they are to big since we might only want to touch the compressed - // header before sending it along - if (!InlineDiskLocations.empty()) - { - ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::ReadInline"); - m_BlockStore.IterateChunks(std::span{begin(InlineBlockLocations), end(InlineBlockLocations)}, - [&](uint32_t, std::span<const size_t> ChunkIndexes) -> bool { - // Up to 8KB or m_Configuration.MemCacheSizeThreshold depending on configuration - const uint64_t LargeChunkSizeLimit = - m_Configuration.MemCacheSizeThreshold == 0 - ? Min(m_Configuration.LargeObjectThreshold, 8u * 1024u) - : Max(m_Configuration.MemCacheSizeThreshold, 8u * 1024u); - - m_BlockStore.IterateBlock( - std::span{begin(InlineBlockLocations), end(InlineBlockLocations)}, - ChunkIndexes, - [this, &FillOne, &InlineDiskLocations, &InlineKeyIndexes](size_t ChunkIndex, - const void* Data, - uint64_t Size) -> bool { - if (Data != nullptr) - { - FillOne(InlineDiskLocations[ChunkIndex], - InlineKeyIndexes[ChunkIndex], - IoBufferBuilder::MakeFromMemory(MemoryView(Data, Size)), - /*UsesTemporaryMemory*/ true); - } - return true; - }, - [this, &FillOne, &InlineDiskLocations, &InlineKeyIndexes](size_t ChunkIndex, - BlockStoreFile& File, - uint64_t Offset, - uint64_t Size) -> bool { - FillOne(InlineDiskLocations[ChunkIndex], - InlineKeyIndexes[ChunkIndex], - File.GetChunk(Offset, Size), - /*UsesTemporaryMemory*/ false); - return true; - }, - LargeChunkSizeLimit); - return true; - }); - } - - if (!StandaloneDiskLocations.empty()) - { - ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::ReadStandalone"); - for (size_t Index = 0; Index < StandaloneDiskLocations.size(); Index++) - { - size_t KeyIndex = StandaloneKeyIndexes[Index]; - const DiskLocation& Location = StandaloneDiskLocations[Index]; - FillOne(Location, KeyIndex, GetStandaloneCacheValue(Location, Batch->Keys[KeyIndex]), /*UsesTemporaryMemory*/ false); - } - } - - if (!MemCachedKeyIndexes.empty()) - { - ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::MemCached"); - for (size_t KeyIndex : MemCachedKeyIndexes) - { - const IoHash& Key = Batch->Keys[KeyIndex]; - bool SetMetaInfo = FillRawHashAndRawSize[KeyIndex]; - ZEN_ASSERT(SetMetaInfo); - ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::MetaData"); - size_t ResultIndex = Batch->ResultIndexes[KeyIndex]; - ZenCacheValue& OutValue = Batch->OutResults[ResultIndex]; - { - // See ZenCacheDiskLayer::CacheBucket::Get - it sets the memcache part first and then if it needs to it set the - // metadata separately, check if it had time to set the metadata - RwLock::SharedLockScope UpdateIndexLock(m_IndexLock); - if (auto UpdateIt = m_Index.find(Key); UpdateIt != m_Index.end()) - { - BucketPayload& Payload = m_Payloads[UpdateIt->second]; - if (Payload.MetaData) - { - const BucketMetaData& MetaData = m_MetaDatas[Payload.MetaData]; - OutValue.RawHash = MetaData.RawHash; - OutValue.RawSize = MetaData.RawSize; - SetMetaInfo = false; - } - } - } - - if (SetMetaInfo) - { - if (OutValue.Value.GetContentType() == ZenContentType::kCompressedBinary) - { - if (!CompressedBuffer::ValidateCompressedHeader(OutValue.Value, - OutValue.RawHash, - OutValue.RawSize, - /*OutOptionalTotalCompressedSize*/ nullptr)) - { - OutValue = ZenCacheValue{}; - } - } - else - { - OutValue.RawHash = IoHash::HashBuffer(OutValue.Value); - OutValue.RawSize = OutValue.Value.GetSize(); - } + BucketPayload& Payload = m_Payloads[UpdateIt->second]; - if (OutValue.RawSize <= std::numeric_limits<std::uint32_t>::max()) - { - RwLock::ExclusiveLockScope UpdateIndexLock(m_IndexLock); - { - if (auto UpdateIt = m_Index.find(Key); UpdateIt != m_Index.end()) + // Only update if it has not already been updated by other thread + if (!Payload.MetaData) { - BucketPayload& Payload = m_Payloads[UpdateIt->second]; - - // Only update if it has not already been updated by other thread - if (!Payload.MetaData) - { - SetMetaData(UpdateIndexLock, - Payload, - {.RawSize = static_cast<std::uint32_t>(OutValue.RawSize), .RawHash = OutValue.RawHash}); - } + SetMetaData(UpdateIndexLock, + Payload, + {.RawSize = static_cast<std::uint32_t>(OutValue.RawSize), .RawHash = OutValue.RawHash}); } } } } } } + } - for (size_t ResultIndex : Batch->ResultIndexes) + for (size_t ResultIndex : Batch->ResultIndexes) + { + bool Hit = !!Batch->OutResults[ResultIndex].Value; + if (Hit) { - bool Hit = !!Batch->OutResults[ResultIndex].Value; - if (Hit) - { - m_DiskHitCount++; - StatsScope.SetBytes(Batch->OutResults[ResultIndex].Value.GetSize()); - } - else - { - m_DiskMissCount++; - } + m_DiskHitCount++; + StatsScope.SetBytes(Batch->OutResults[ResultIndex].Value.GetSize()); + } + else + { + m_DiskMissCount++; } } + } +} +void +ZenCacheDiskLayer::CacheBucket::DeleteGetBatch(GetBatchHandle* Batch) noexcept +{ + ZEN_TRACE_CPU("Z$::Bucket::DeleteGetBatch"); + + try + { delete Batch; } catch (const std::exception& Ex) { - ZEN_ERROR("Exception in ZenCacheDiskLayer::CacheBucket::EndGetBatch: '{}'", Ex.what()); + ZEN_ERROR("Exception in ZenCacheDiskLayer::CacheBucket::DeleteGetBatch: '{}'", Ex.what()); } } @@ -2954,9 +2966,9 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, OptionalBatchHandle->Entries.push_back({}); OptionalBatchHandle->EntryResultIndexes.push_back(OptionalBatchHandle->OutResults.size()); OptionalBatchHandle->OutResults.push_back(PutResult{zen::PutStatus::Fail}); - PutBatchHandle::Entry& CurrentEntry = OptionalBatchHandle->Entries.back(); - CurrentEntry.Overwrite = Overwrite; - std::vector<IoHash>& HashKeyAndReferences = OptionalBatchHandle->Entries.back().HashKeyAndReferences; + PutBatchHandle::Entry& CurrentEntry = OptionalBatchHandle->Entries.back(); + CurrentEntry.Overwrite = Overwrite; + eastl::fixed_vector<IoHash, 8>& HashKeyAndReferences = OptionalBatchHandle->Entries.back().HashKeyAndReferences; HashKeyAndReferences.reserve(1 + References.size()); HashKeyAndReferences.push_back(HashKey); HashKeyAndReferences.insert(HashKeyAndReferences.end(), References.begin(), References.end()); @@ -3969,7 +3981,7 @@ ZenCacheDiskLayer::GetOrCreateBucket(std::string_view InBucket) struct ZenCacheDiskLayer::PutBatchHandle { - PutBatchHandle(std::vector<ZenCacheDiskLayer::PutResult>& OutResults) : OutResults(OutResults) {} + PutBatchHandle(ZenCachePutResultVec_t& OutResults) : OutResults(OutResults) {} struct BucketHandle { CacheBucket* Bucket; @@ -4006,7 +4018,7 @@ struct ZenCacheDiskLayer::PutBatchHandle } } - CacheBucket::PutBatchHandle* NewBucketHandle = Bucket->BeginPutBatch(OutResults); + CacheBucket::PutBatchHandle* NewBucketHandle = Bucket->CreatePutBatch(OutResults); if (NewBucketHandle == nullptr) { return nullptr; @@ -4020,7 +4032,7 @@ struct ZenCacheDiskLayer::PutBatchHandle CacheBucket::PutBatchHandle* Result = It->Handle; ZEN_ASSERT(Result != nullptr); _.ReleaseNow(); - Bucket->EndPutBatch(NewBucketHandle); + Bucket->DeletePutBatch(NewBucketHandle); return Result; } @@ -4028,30 +4040,35 @@ struct ZenCacheDiskLayer::PutBatchHandle return NewBucketHandle; } - RwLock Lock; - std::vector<BucketHandle> BucketHandles; - std::vector<ZenCacheDiskLayer::PutResult>& OutResults; + RwLock Lock; + eastl::fixed_vector<BucketHandle, 4> BucketHandles; + ZenCachePutResultVec_t& OutResults; }; ZenCacheDiskLayer::PutBatchHandle* -ZenCacheDiskLayer::BeginPutBatch(std::vector<PutResult>& OutResults) +ZenCacheDiskLayer::CreatePutBatch(ZenCachePutResultVec_t& OutResults) { + ZEN_TRACE_CPU("Z$::CreatePutBatch"); return new PutBatchHandle(OutResults); } void -ZenCacheDiskLayer::EndPutBatch(PutBatchHandle* Batch) noexcept +ZenCacheDiskLayer::CommitPutBatch(PutBatchHandle* Batch) { - try + ZEN_TRACE_CPU("Z$::CommitPutBatch"); + + ZEN_ASSERT(Batch); + Batch->ForEach([&](CacheBucket* Bucket, CacheBucket::PutBatchHandle* Handle) { Bucket->CommitPutBatch(Handle); }); +} + +void +ZenCacheDiskLayer::DeletePutBatch(PutBatchHandle* Batch) noexcept +{ + if (Batch) { - ZEN_ASSERT(Batch); - Batch->ForEach([&](CacheBucket* Bucket, CacheBucket::PutBatchHandle* Handle) { Bucket->EndPutBatch(Handle); }); + Batch->ForEach([&](CacheBucket* Bucket, CacheBucket::PutBatchHandle* Handle) { Bucket->DeletePutBatch(Handle); }); delete Batch; } - catch (const std::exception& Ex) - { - ZEN_ERROR("Exception in ZenCacheDiskLayer::EndPutBatch: '{}'", Ex.what()); - } } struct ZenCacheDiskLayer::GetBatchHandle @@ -4093,7 +4110,7 @@ struct ZenCacheDiskLayer::GetBatchHandle } } - CacheBucket::GetBatchHandle* NewBucketHandle = Bucket->BeginGetBatch(OutResults); + CacheBucket::GetBatchHandle* NewBucketHandle = Bucket->CreateGetBatch(OutResults); if (NewBucketHandle == nullptr) { return nullptr; @@ -4107,7 +4124,7 @@ struct ZenCacheDiskLayer::GetBatchHandle CacheBucket::GetBatchHandle* Result = It->Handle; ZEN_ASSERT(Result != nullptr); _.ReleaseNow(); - Bucket->EndGetBatch(NewBucketHandle); + Bucket->DeleteGetBatch(NewBucketHandle); return Result; } @@ -4122,25 +4139,35 @@ struct ZenCacheDiskLayer::GetBatchHandle }; ZenCacheDiskLayer::GetBatchHandle* -ZenCacheDiskLayer::BeginGetBatch(ZenCacheValueVec_t& OutResults) +ZenCacheDiskLayer::CreateGetBatch(ZenCacheValueVec_t& OutResults) { + ZEN_TRACE_CPU("Z$::CreateGetBatch"); return new GetBatchHandle(OutResults); } void -ZenCacheDiskLayer::EndGetBatch(GetBatchHandle* Batch) noexcept +ZenCacheDiskLayer::CommitGetBatch(GetBatchHandle* Batch) { - ZEN_TRACE_CPU("Z$::EndGetBatch"); + ZEN_TRACE_CPU("Z$::CommitGetBatch"); + + ZEN_ASSERT(Batch); + Batch->ForEach([&](CacheBucket* Bucket, CacheBucket::GetBatchHandle* Handle) { Bucket->CommitGetBatch(Handle); }); + TryMemCacheTrim(); +} + +void +ZenCacheDiskLayer::DeleteGetBatch(GetBatchHandle* Batch) noexcept +{ + ZEN_TRACE_CPU("Z$::DeleteGetBatch"); try { ZEN_ASSERT(Batch); - Batch->ForEach([&](CacheBucket* Bucket, CacheBucket::GetBatchHandle* Handle) { Bucket->EndGetBatch(Handle); }); - TryMemCacheTrim(); + Batch->ForEach([&](CacheBucket* Bucket, CacheBucket::GetBatchHandle* Handle) { Bucket->DeleteGetBatch(Handle); }); delete Batch; } catch (const std::exception& Ex) { - ZEN_ERROR("Exception in ZenCacheDiskLayer::EndGetBatch: '{}'", Ex.what()); + ZEN_ERROR("Exception in ZenCacheDiskLayer::DeleteGetBatch: '{}'", Ex.what()); } } diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp index 9e57b41c3..6469d6c31 100644 --- a/src/zenstore/cache/cacherpc.cpp +++ b/src/zenstore/cache/cacherpc.cpp @@ -820,6 +820,7 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb } Request.Complete = true; + for (ValueRequestData& Value : Request.Values) { if (Value.Exists) @@ -993,7 +994,7 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con const bool HasUpstream = m_UpstreamCache.IsActive(); CbArrayView RequestsArray = Params["Requests"sv].AsArrayView(); - std::vector<ZenCacheStore::PutResult> BatchResults; + ZenCacheStore::ZenCachePutResultVec_t BatchResults; eastl::fixed_vector<size_t, 32> BatchResultIndexes; eastl::fixed_vector<ZenCacheStore::PutResult, 32> Results; eastl::fixed_vector<CacheKey, 32> UpstreamCacheKeys; @@ -1124,6 +1125,11 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con Valid ? "Added"sv : "Invalid", NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); } + if (Batch) + { + Batch->Commit(); + Batch.reset(); + } } if (Results.empty()) { @@ -1268,7 +1274,11 @@ CacheRpcHandler::HandleRpcGetCacheValues(const CacheRequestContext& Context, CbO CacheValues.push_back({}); } } - Batch.reset(); + if (Batch) + { + Batch->Commit(); + Batch.reset(); + } ZEN_ASSERT(CacheValues.size() == RequestsArray.Num()); } for (size_t RequestIndex = 0; RequestIndex < CacheValues.size(); RequestIndex++) @@ -1818,6 +1828,11 @@ CacheRpcHandler::GetLocalCacheValues(const CacheRequestContext& Context, } Request->ElapsedTimeUs += Timer.GetElapsedTimeUs(); } + if (Batch) + { + Batch->Commit(); + Batch.reset(); + } } for (size_t RequestIndex = 0; RequestIndex < ValueRequests.size(); RequestIndex++) { diff --git a/src/zenstore/cache/structuredcachestore.cpp b/src/zenstore/cache/structuredcachestore.cpp index c0b433c51..f1cf6e4a4 100644 --- a/src/zenstore/cache/structuredcachestore.cpp +++ b/src/zenstore/cache/structuredcachestore.cpp @@ -142,25 +142,32 @@ struct ZenCacheNamespace::PutBatchHandle }; ZenCacheNamespace::PutBatchHandle* -ZenCacheNamespace::BeginPutBatch(std::vector<PutResult>& OutResult) +ZenCacheNamespace::CreatePutBatch(ZenCachePutResultVec_t& OutResult) { ZenCacheNamespace::PutBatchHandle* Handle = new ZenCacheNamespace::PutBatchHandle; - Handle->DiskLayerHandle = m_DiskLayer.BeginPutBatch(OutResult); + Handle->DiskLayerHandle = m_DiskLayer.CreatePutBatch(OutResult); return Handle; } void -ZenCacheNamespace::EndPutBatch(PutBatchHandle* Batch) noexcept +ZenCacheNamespace::CommitPutBatch(PutBatchHandle* Batch) +{ + ZEN_ASSERT(Batch); + m_DiskLayer.CommitPutBatch(Batch->DiskLayerHandle); +} + +void +ZenCacheNamespace::DeletePutBatch(PutBatchHandle* Batch) noexcept { try { ZEN_ASSERT(Batch); - m_DiskLayer.EndPutBatch(Batch->DiskLayerHandle); + m_DiskLayer.DeletePutBatch(Batch->DiskLayerHandle); delete Batch; } catch (const std::exception& Ex) { - ZEN_ERROR("Exception in ZenCacheNamespace::EndPutBatch: '{}'", Ex.what()); + ZEN_ERROR("Exception in ZenCacheNamespace::DeletePutBatch: '{}'", Ex.what()); } } @@ -172,39 +179,46 @@ struct ZenCacheNamespace::GetBatchHandle }; ZenCacheNamespace::GetBatchHandle* -ZenCacheNamespace::BeginGetBatch(ZenCacheValueVec_t& OutResult) +ZenCacheNamespace::CreateGetBatch(ZenCacheValueVec_t& OutResult) { ZenCacheNamespace::GetBatchHandle* Handle = new ZenCacheNamespace::GetBatchHandle(OutResult); - Handle->DiskLayerHandle = m_DiskLayer.BeginGetBatch(OutResult); + Handle->DiskLayerHandle = m_DiskLayer.CreateGetBatch(OutResult); return Handle; } void -ZenCacheNamespace::EndGetBatch(GetBatchHandle* Batch) noexcept +ZenCacheNamespace::CommitGetBatch(GetBatchHandle* Batch) { - try - { - ZEN_ASSERT(Batch); - m_DiskLayer.EndGetBatch(Batch->DiskLayerHandle); + ZEN_ASSERT(Batch); + m_DiskLayer.CommitGetBatch(Batch->DiskLayerHandle); - metrics::RequestStats::Scope StatsScope(m_GetOps, 0); - for (const ZenCacheValue& Result : Batch->Results) + metrics::RequestStats::Scope StatsScope(m_GetOps, 0); + for (const ZenCacheValue& Result : Batch->Results) + { + if (Result.Value) { - if (Result.Value) - { - m_HitCount++; - StatsScope.SetBytes(Result.Value.Size()); - } - else - { - m_MissCount++; - } + m_HitCount++; + StatsScope.SetBytes(Result.Value.Size()); } + else + { + m_MissCount++; + } + } +} + +void +ZenCacheNamespace::DeleteGetBatch(GetBatchHandle* Batch) noexcept +{ + try + { + ZEN_ASSERT(Batch); + m_DiskLayer.DeleteGetBatch(Batch->DiskLayerHandle); delete Batch; } catch (const std::exception& Ex) { - ZEN_ERROR("Exception in ZenCacheNamespace::EndGetBatch: '{}'", Ex.what()); + ZEN_ERROR("Exception in ZenCacheNamespace::DeleteGetBatch: '{}'", Ex.what()); } } @@ -543,13 +557,24 @@ ZenCacheStore::LogWorker() } } -ZenCacheStore::PutBatch::PutBatch(ZenCacheStore& CacheStore, std::string_view InNamespace, std::vector<PutResult>& OutResult) +ZenCacheStore::PutBatch::PutBatch(ZenCacheStore& CacheStore, std::string_view InNamespace, ZenCachePutResultVec_t& OutResult) : m_CacheStore(CacheStore) { ZEN_MEMSCOPE(GetCacheStoreTag()); if (m_Store = m_CacheStore.GetNamespace(InNamespace); m_Store) { - m_NamespaceBatchHandle = m_Store->BeginPutBatch(OutResult); + m_NamespaceBatchHandle = m_Store->CreatePutBatch(OutResult); + } +} + +void +ZenCacheStore::PutBatch::Commit() +{ + if (m_Store) + { + ZEN_MEMSCOPE(GetCacheStoreTag()); + ZEN_ASSERT(m_NamespaceBatchHandle); + m_Store->CommitPutBatch(m_NamespaceBatchHandle); } } @@ -557,12 +582,8 @@ ZenCacheStore::PutBatch::~PutBatch() { try { - if (m_Store) - { - ZEN_MEMSCOPE(GetCacheStoreTag()); - ZEN_ASSERT(m_NamespaceBatchHandle); - m_Store->EndPutBatch(m_NamespaceBatchHandle); - } + ZEN_ASSERT(m_NamespaceBatchHandle); + m_Store->DeletePutBatch(m_NamespaceBatchHandle); } catch (const std::exception& Ex) { @@ -577,7 +598,29 @@ ZenCacheStore::GetBatch::GetBatch(ZenCacheStore& CacheStore, std::string_view In ZEN_MEMSCOPE(GetCacheStoreTag()); if (m_Store = m_CacheStore.GetNamespace(InNamespace); m_Store) { - m_NamespaceBatchHandle = m_Store->BeginGetBatch(OutResult); + m_NamespaceBatchHandle = m_Store->CreateGetBatch(OutResult); + } +} + +void +ZenCacheStore::GetBatch::Commit() +{ + if (m_Store) + { + ZEN_MEMSCOPE(GetCacheStoreTag()); + ZEN_ASSERT(m_NamespaceBatchHandle); + m_Store->CommitGetBatch(m_NamespaceBatchHandle); + + metrics::RequestStats::Scope OpScope(m_CacheStore.m_GetOps, 0); + for (const ZenCacheValue& Result : Results) + { + if (Result.Value) + { + m_CacheStore.m_HitCount++; + OpScope.SetBytes(Result.Value.GetSize()); + } + m_CacheStore.m_MissCount++; + } } } @@ -589,18 +632,7 @@ ZenCacheStore::GetBatch::~GetBatch() { ZEN_MEMSCOPE(GetCacheStoreTag()); ZEN_ASSERT(m_NamespaceBatchHandle); - m_Store->EndGetBatch(m_NamespaceBatchHandle); - - metrics::RequestStats::Scope OpScope(m_CacheStore.m_GetOps, 0); - for (const ZenCacheValue& Result : Results) - { - if (Result.Value) - { - m_CacheStore.m_HitCount++; - OpScope.SetBytes(Result.Value.GetSize()); - } - m_CacheStore.m_MissCount++; - } + m_Store->DeleteGetBatch(m_NamespaceBatchHandle); } } catch (const std::exception& Ex) |