diff options
| author | Liam Mitchell <[email protected]> | 2025-08-21 23:58:51 +0000 |
|---|---|---|
| committer | Liam Mitchell <[email protected]> | 2025-08-21 23:58:51 +0000 |
| commit | 33209bd6931f49362dfc2d62c6cb6b87a42c99e1 (patch) | |
| tree | cfc7914634088b3f4feac2d4cec0b5650dfdcc3c /src/zenstore/cache/cachedisklayer.cpp | |
| parent | Fix changelog merge issues (diff) | |
| parent | avoid new in static IoBuffer (#472) (diff) | |
| download | zen-33209bd6931f49362dfc2d62c6cb6b87a42c99e1.tar.xz zen-33209bd6931f49362dfc2d62c6cb6b87a42c99e1.zip | |
Merge remote-tracking branch 'origin/main' into de/zen-service-command
Diffstat (limited to 'src/zenstore/cache/cachedisklayer.cpp')
| -rw-r--r-- | src/zenstore/cache/cachedisklayer.cpp | 320 |
1 files changed, 250 insertions, 70 deletions
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index 15a1c9650..cacbbd966 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -233,6 +233,28 @@ using namespace std::literals; namespace zen::cache::impl { +static bool +UpdateValueWithRawSizeAndHash(ZenCacheValue& Value) +{ + if ((Value.RawSize == 0) && (Value.RawHash == IoHash::Zero)) + { + if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) + { + return CompressedBuffer::ValidateCompressedHeader(Value.Value, Value.RawHash, Value.RawSize); + } + else + { + Value.RawSize = Value.Value.GetSize(); + Value.RawHash = IoHash::HashBuffer(Value.Value); + return true; + } + } + else + { + return true; + } +} + class BucketManifestSerializer { using MetaDataIndex = ZenCacheDiskLayer::CacheBucket::MetaDataIndex; @@ -348,11 +370,20 @@ BucketManifestSerializer::ParseManifest(RwLock::ExclusiveLockScope& Buck Stopwatch Timer; const auto _ = MakeGuard([&] { ZEN_INFO("parsed store manifest '{}' in {}", ManifestPath, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); - const uint64_t Count = Manifest["Count"sv].AsUInt64(0); + const uint64_t Count = Manifest["Count"sv].AsUInt64(0); + CbArrayView KeyArray = Manifest["Keys"sv].AsArrayView(); + if (KeyArray.Num() != Count) + { + ZEN_WARN("Mismatch in size between 'Keys' ({}) array size and 'Count' ({}) in {}, skipping metadata", + KeyArray.Num(), + Count, + ManifestPath); + return; + } + std::vector<PayloadIndex> KeysIndexes; KeysIndexes.reserve(Count); - CbArrayView KeyArray = Manifest["Keys"sv].AsArrayView(); for (CbFieldView& KeyView : KeyArray) { if (auto It = Index.find(KeyView.AsHash()); It != Index.end()) @@ -367,19 +398,43 @@ BucketManifestSerializer::ParseManifest(RwLock::ExclusiveLockScope& Buck size_t KeyIndexOffset = 0; CbArrayView TimeStampArray = Manifest["Timestamps"].AsArrayView(); - for (CbFieldView& TimeStampView : TimeStampArray) + if (KeysIndexes.size() != TimeStampArray.Num()) { - const PayloadIndex KeyIndex = KeysIndexes[KeyIndexOffset++]; - if (KeyIndex) + ZEN_WARN("Mismatch in size between 'Keys' ({}) and 'Timestamps' ({}) arrays in {}, skipping timestamps", + KeysIndexes.size(), + TimeStampArray.Num(), + ManifestPath); + } + else + { + for (CbFieldView& TimeStampView : TimeStampArray) { - AccessTimes[KeyIndex] = TimeStampView.AsInt64(); + const PayloadIndex KeyIndex = KeysIndexes[KeyIndexOffset++]; + if (KeyIndex) + { + AccessTimes[KeyIndex] = TimeStampView.AsInt64(); + } } } KeyIndexOffset = 0; CbArrayView RawHashArray = Manifest["RawHash"].AsArrayView(); CbArrayView RawSizeArray = Manifest["RawSize"].AsArrayView(); - if (RawHashArray.Num() == RawSizeArray.Num()) + if (RawHashArray.Num() != KeysIndexes.size()) + { + ZEN_WARN("Mismatch in size between 'Keys' ({}) and 'RawHash' ({}) arrays in {}, skipping meta data", + KeysIndexes.size(), + RawHashArray.Num(), + ManifestPath); + } + else if (RawSizeArray.Num() != KeysIndexes.size()) + { + ZEN_WARN("Mismatch in size between 'Keys' ({}) and 'RawSize' ({}) arrays in {}, skipping meta data", + KeysIndexes.size(), + RawSizeArray.Num(), + ManifestPath); + } + else { auto RawHashIt = RawHashArray.CreateViewIterator(); auto RawSizeIt = RawSizeArray.CreateViewIterator(); @@ -404,10 +459,6 @@ BucketManifestSerializer::ParseManifest(RwLock::ExclusiveLockScope& Buck RawSizeIt++; } } - else - { - ZEN_WARN("Mismatch in size between 'RawHash' and 'RawSize' arrays in {}, skipping meta data", ManifestPath); - } } Oid @@ -747,6 +798,7 @@ ZenCacheDiskLayer::CacheBucket::CacheBucket(GcManager& Gc, m_Configuration.LargeObjectThreshold = Max(m_Configuration.LargeObjectThreshold, IoStoreDDCOverrideSize); } m_Gc.AddGcReferencer(*this); + m_Gc.AddGcStorage(this); } ZenCacheDiskLayer::CacheBucket::~CacheBucket() @@ -761,6 +813,7 @@ ZenCacheDiskLayer::CacheBucket::~CacheBucket() { ZEN_ERROR("~CacheBucket() failed with: ", Ex.what()); } + m_Gc.RemoveGcStorage(this); m_Gc.RemoveGcReferencer(*this); } @@ -1286,20 +1339,21 @@ ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const DiskLocation& Loc, struct ZenCacheDiskLayer::CacheBucket::PutBatchHandle { - PutBatchHandle(std::vector<bool>& OutResults) : OutResults(OutResults) {} + PutBatchHandle(std::vector<ZenCacheDiskLayer::PutResult>& OutResults) : OutResults(OutResults) {} struct Entry { std::vector<IoHash> HashKeyAndReferences; + bool Overwrite; }; - std::vector<IoBuffer> Buffers; - std::vector<Entry> Entries; - std::vector<size_t> EntryResultIndexes; + std::vector<ZenCacheValue> Buffers; + std::vector<Entry> Entries; + std::vector<size_t> EntryResultIndexes; - std::vector<bool>& OutResults; + std::vector<ZenCacheDiskLayer::PutResult>& OutResults; }; ZenCacheDiskLayer::CacheBucket::PutBatchHandle* -ZenCacheDiskLayer::CacheBucket::BeginPutBatch(std::vector<bool>& OutResults) +ZenCacheDiskLayer::CacheBucket::BeginPutBatch(std::vector<ZenCacheDiskLayer::PutResult>& OutResults) { ZEN_TRACE_CPU("Z$::Bucket::BeginPutBatch"); return new PutBatchHandle(OutResults); @@ -1315,23 +1369,40 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept ZEN_ASSERT(Batch); if (!Batch->Buffers.empty()) { - std::vector<uint8_t> EntryFlags; - for (const IoBuffer& Buffer : Batch->Buffers) + ZEN_ASSERT(Batch->Buffers.size() == Batch->Entries.size()); + std::vector<uint8_t> EntryFlags; + std::vector<size_t> BufferToEntryIndexes; + std::vector<IoBuffer> BuffersToCommit; + BuffersToCommit.reserve(Batch->Buffers.size()); + for (size_t Index = 0; Index < Batch->Entries.size(); Index++) { - uint8_t Flags = 0; - if (Buffer.GetContentType() == ZenContentType::kCbObject) + const std::vector<IoHash>& HashKeyAndReferences = Batch->Entries[Index].HashKeyAndReferences; + ZEN_ASSERT(HashKeyAndReferences.size() >= 1); + + ZenCacheValue& Value = Batch->Buffers[Index]; + std::span<const IoHash> ReferenceSpan(HashKeyAndReferences.begin() + 1, HashKeyAndReferences.end()); + PutResult& OutResult = Batch->OutResults[Batch->EntryResultIndexes[Index]]; + OutResult = PutResult{zen::PutStatus::Success}; + if (!ShouldRejectPut(HashKeyAndReferences[0], Value, Batch->Entries[Index].Overwrite, OutResult)) { - Flags |= DiskLocation::kStructured; - } - else if (Buffer.GetContentType() == ZenContentType::kCompressedBinary) - { - Flags |= DiskLocation::kCompressed; + BufferToEntryIndexes.push_back(Index); + BuffersToCommit.push_back(Value.Value); + + uint8_t Flags = 0; + if (Value.Value.GetContentType() == ZenContentType::kCbObject) + { + Flags |= DiskLocation::kStructured; + } + else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) + { + Flags |= DiskLocation::kCompressed; + } + EntryFlags.push_back(Flags); } - EntryFlags.push_back(Flags); } size_t IndexOffset = 0; - m_BlockStore.WriteChunks(Batch->Buffers, m_Configuration.PayloadAlignment, [&](std::span<BlockStoreLocation> Locations) { + m_BlockStore.WriteChunks(BuffersToCommit, m_Configuration.PayloadAlignment, [&](std::span<BlockStoreLocation> Locations) { ZEN_MEMSCOPE(GetCacheDiskTag()); std::vector<DiskIndexEntry> DiskEntries; { @@ -1339,8 +1410,9 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept for (size_t Index = 0; Index < Locations.size(); Index++) { DiskLocation Location(Locations[Index], m_Configuration.PayloadAlignment, EntryFlags[IndexOffset + Index]); - const std::vector<IoHash>& HashKeyAndReferences = Batch->Entries[IndexOffset + Index].HashKeyAndReferences; - ZEN_ASSERT(HashKeyAndReferences.size() > 1); + const std::vector<IoHash>& HashKeyAndReferences = + Batch->Entries[BufferToEntryIndexes[IndexOffset + Index]].HashKeyAndReferences; + ZEN_ASSERT(HashKeyAndReferences.size() >= 1); const IoHash HashKey = HashKeyAndReferences[0]; DiskEntries.push_back({.Key = HashKey, .Location = Location}); if (m_TrackedCacheKeys) @@ -1375,12 +1447,6 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept } } m_SlogFile.Append(DiskEntries); - for (size_t Index = 0; Index < Locations.size(); Index++) - { - size_t ResultIndex = Batch->EntryResultIndexes[IndexOffset + Index]; - ZEN_ASSERT(ResultIndex < Batch->OutResults.size()); - Batch->OutResults[ResultIndex] = true; - } IndexOffset += Locations.size(); }); } @@ -1876,30 +1942,128 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal } } -void +bool +ZenCacheDiskLayer::CacheBucket::ShouldRejectPut(const IoHash& HashKey, + ZenCacheValue& InOutValue, + bool Overwrite, + ZenCacheDiskLayer::PutResult& OutPutResult) +{ + const bool CheckExisting = m_Configuration.LimitOverwrites && !Overwrite; + if (CheckExisting) + { + RwLock::SharedLockScope IndexLock(m_IndexLock); + auto It = m_Index.find(HashKey); + if (It != m_Index.end()) + { + const PayloadIndex EntryIndex = It.value(); + m_AccessTimes[EntryIndex] = GcClock::TickCount(); + const DiskLocation Location = m_Payloads[EntryIndex].Location; + + const BucketPayload* Payload = &m_Payloads[EntryIndex]; + if (Payload->MetaData) + { + const BucketMetaData MetaData = m_MetaDatas[Payload->MetaData]; + if (MetaData) + { + IndexLock.ReleaseNow(); + if (!cache::impl::UpdateValueWithRawSizeAndHash(InOutValue)) + { + OutPutResult = PutResult{zen::PutStatus::Fail, "Value provided is of bad format"}; + return true; + } + else if (MetaData.RawSize != InOutValue.RawSize || MetaData.RawHash != InOutValue.RawHash) + { + OutPutResult = PutResult{ + zen::PutStatus::Conflict, + fmt::format("Value exists with different size '{}' or hash '{}'", MetaData.RawSize, MetaData.RawHash)}; + return true; + } + return false; + } + } + + ZenCacheValue ExistingValue; + if (Payload->MemCached) + { + ExistingValue.Value = m_MemCachedPayloads[Payload->MemCached].Payload; + IndexLock.ReleaseNow(); + } + else + { + IndexLock.ReleaseNow(); + + if (Location.IsFlagSet(DiskLocation::kStandaloneFile)) + { + ExistingValue.Value = GetStandaloneCacheValue(Location, HashKey); + } + else + { + ExistingValue.Value = GetInlineCacheValue(Location); + } + } + + if (ExistingValue.Value) + { + if (cache::impl::UpdateValueWithRawSizeAndHash(ExistingValue)) + { + if (!cache::impl::UpdateValueWithRawSizeAndHash(InOutValue)) + { + OutPutResult = PutResult{zen::PutStatus::Fail, "Value provided is of bad format"}; + return true; + } + + if (ExistingValue.RawSize != InOutValue.RawSize || ExistingValue.RawHash != InOutValue.RawHash) + { + OutPutResult = PutResult{zen::PutStatus::Conflict, + fmt::format("Value exists with different size '{}' or hash '{}'", + ExistingValue.RawSize, + ExistingValue.RawHash)}; + return true; + } + } + } + } + } + return false; +} + +ZenCacheDiskLayer::PutResult ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References, + bool Overwrite, PutBatchHandle* OptionalBatchHandle) { ZEN_TRACE_CPU("Z$::Bucket::Put"); metrics::RequestStats::Scope $(m_PutOps, Value.Value.Size()); + PutResult Result{zen::PutStatus::Success}; + if (Value.Value.Size() >= m_Configuration.LargeObjectThreshold) { - PutStandaloneCacheValue(HashKey, Value, References); + ZenCacheValue AcceptedValue = Value; + if (ShouldRejectPut(HashKey, AcceptedValue, Overwrite, Result)) + { + if (OptionalBatchHandle) + { + OptionalBatchHandle->OutResults.push_back(Result); + } + return Result; + } + PutStandaloneCacheValue(HashKey, AcceptedValue, References); if (OptionalBatchHandle) { - OptionalBatchHandle->OutResults.push_back(true); + OptionalBatchHandle->OutResults.push_back({zen::PutStatus::Success}); } } else { - PutInlineCacheValue(HashKey, Value, References, OptionalBatchHandle); + Result = PutInlineCacheValue(HashKey, Value, References, Overwrite, OptionalBatchHandle); } m_DiskWriteCount++; + return Result; } uint64_t @@ -2425,7 +2589,7 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) ZenCacheDiskLayer::BucketStats ZenCacheDiskLayer::CacheBucket::Stats() { - GcStorageSize Size = StorageSize(); + CacheStoreSize Size = TotalSize(); return ZenCacheDiskLayer::BucketStats{.DiskSize = Size.DiskSize, .MemorySize = Size.MemorySize, .DiskHitCount = m_DiskHitCount, @@ -2748,38 +2912,49 @@ ZenCacheDiskLayer::CacheBucket::GetMetaData(RwLock::SharedLockScope&, const Buck return {}; } -void +ZenCacheDiskLayer::PutResult ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References, + bool Overwrite, PutBatchHandle* OptionalBatchHandle) { ZEN_TRACE_CPU("Z$::Bucket::PutInlineCacheValue"); + PutResult Result{zen::PutStatus::Success}; if (OptionalBatchHandle != nullptr) { - OptionalBatchHandle->Buffers.push_back(Value.Value); + OptionalBatchHandle->Buffers.push_back(Value); OptionalBatchHandle->Entries.push_back({}); OptionalBatchHandle->EntryResultIndexes.push_back(OptionalBatchHandle->OutResults.size()); - OptionalBatchHandle->OutResults.push_back(false); + OptionalBatchHandle->OutResults.push_back(PutResult{zen::PutStatus::Fail}); + PutBatchHandle::Entry& CurrentEntry = OptionalBatchHandle->Entries.back(); + CurrentEntry.Overwrite = Overwrite; std::vector<IoHash>& HashKeyAndReferences = OptionalBatchHandle->Entries.back().HashKeyAndReferences; - HashKeyAndReferences.reserve(1 + HashKeyAndReferences.size()); + HashKeyAndReferences.reserve(1 + References.size()); HashKeyAndReferences.push_back(HashKey); - HashKeyAndReferences.insert(HashKeyAndReferences.end(), HashKeyAndReferences.begin(), HashKeyAndReferences.end()); - return; + HashKeyAndReferences.insert(HashKeyAndReferences.end(), References.begin(), References.end()); + return Result; } + + ZenCacheValue AcceptedValue = Value; + if (ShouldRejectPut(HashKey, AcceptedValue, Overwrite, Result)) + { + return Result; + } + uint8_t EntryFlags = 0; - if (Value.Value.GetContentType() == ZenContentType::kCbObject) + if (AcceptedValue.Value.GetContentType() == ZenContentType::kCbObject) { EntryFlags |= DiskLocation::kStructured; } - else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) + else if (AcceptedValue.Value.GetContentType() == ZenContentType::kCompressedBinary) { EntryFlags |= DiskLocation::kCompressed; } - m_BlockStore.WriteChunk(Value.Value.Data(), - Value.Value.Size(), + m_BlockStore.WriteChunk(AcceptedValue.Value.Data(), + AcceptedValue.Value.Size(), m_Configuration.PayloadAlignment, [&](const BlockStoreLocation& BlockStoreLocation) { ZEN_MEMSCOPE(GetCacheDiskTag()); @@ -2816,6 +2991,7 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, } m_SlogFile.Append({.Key = HashKey, .Location = Location}); }); + return Result; } std::string @@ -3752,7 +3928,7 @@ ZenCacheDiskLayer::GetOrCreateBucket(std::string_view InBucket) struct ZenCacheDiskLayer::PutBatchHandle { - PutBatchHandle(std::vector<bool>& OutResults) : OutResults(OutResults) {} + PutBatchHandle(std::vector<ZenCacheDiskLayer::PutResult>& OutResults) : OutResults(OutResults) {} struct BucketHandle { CacheBucket* Bucket; @@ -3811,13 +3987,13 @@ struct ZenCacheDiskLayer::PutBatchHandle return NewBucketHandle; } - RwLock Lock; - std::vector<BucketHandle> BucketHandles; - std::vector<bool>& OutResults; + RwLock Lock; + std::vector<BucketHandle> BucketHandles; + std::vector<ZenCacheDiskLayer::PutResult>& OutResults; }; ZenCacheDiskLayer::PutBatchHandle* -ZenCacheDiskLayer::BeginPutBatch(std::vector<bool>& OutResults) +ZenCacheDiskLayer::BeginPutBatch(std::vector<PutResult>& OutResults) { return new PutBatchHandle(OutResults); } @@ -3954,21 +4130,23 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, GetBatc } } -void +ZenCacheDiskLayer::PutResult ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References, + bool Overwrite, PutBatchHandle* OptionalBatchHandle) { ZEN_TRACE_CPU("Z$::Put"); - + PutResult RetVal = {zen::PutStatus::Fail}; if (CacheBucket* Bucket = GetOrCreateBucket(InBucket); Bucket != nullptr) { CacheBucket::PutBatchHandle* BucketBatchHandle = OptionalBatchHandle == nullptr ? nullptr : OptionalBatchHandle->GetHandle(Bucket); - Bucket->Put(HashKey, Value, References, BucketBatchHandle); + RetVal = Bucket->Put(HashKey, Value, References, Overwrite, BucketBatchHandle); TryMemCacheTrim(); } + return RetVal; } void @@ -4241,8 +4419,9 @@ ZenCacheDiskLayer::Flush() } } +#if ZEN_WITH_TESTS void -ZenCacheDiskLayer::ScrubStorage(ScrubContext& Ctx) +ZenCacheDiskLayer::Scrub(ScrubContext& Ctx) { ZEN_TRACE_CPU("Z$::ScrubStorage"); @@ -4253,13 +4432,13 @@ ZenCacheDiskLayer::ScrubStorage(ScrubContext& Ctx) for (auto& Kv : m_Buckets) { -#if 1 +# if 1 Results.push_back(Ctx.ThreadPool().EnqueueTask( std::packaged_task<void()>{[this, Bucket = Kv.second.get(), &Ctx] { Bucket->ScrubStorage(Ctx); }})); -#else +# else CacheBucket& Bucket = *Kv.second; Bucket.ScrubStorage(Ctx); -#endif +# endif } for (auto& Result : Results) @@ -4275,16 +4454,17 @@ ZenCacheDiskLayer::ScrubStorage(ScrubContext& Ctx) } } } +#endif // ZEN_WITH_TESTS -GcStorageSize -ZenCacheDiskLayer::StorageSize() const +CacheStoreSize +ZenCacheDiskLayer::TotalSize() const { - GcStorageSize StorageSize{}; + CacheStoreSize StorageSize{}; RwLock::SharedLockScope _(m_Lock); for (auto& Kv : m_Buckets) { - GcStorageSize BucketSize = Kv.second->StorageSize(); + CacheStoreSize BucketSize = Kv.second->TotalSize(); StorageSize.DiskSize += BucketSize.DiskSize; StorageSize.MemorySize += BucketSize.MemorySize; } @@ -4295,7 +4475,7 @@ ZenCacheDiskLayer::StorageSize() const ZenCacheDiskLayer::DiskStats ZenCacheDiskLayer::Stats() const { - GcStorageSize Size = StorageSize(); + CacheStoreSize Size = TotalSize(); ZenCacheDiskLayer::DiskStats Stats = {.DiskSize = Size.DiskSize, .MemorySize = Size.MemorySize}; { RwLock::SharedLockScope _(m_Lock); @@ -4319,7 +4499,7 @@ ZenCacheDiskLayer::GetInfo() const { Info.BucketNames.push_back(Kv.first); Info.EntryCount += Kv.second->EntryCount(); - GcStorageSize BucketSize = Kv.second->StorageSize(); + CacheStoreSize BucketSize = Kv.second->TotalSize(); Info.StorageSize.DiskSize += BucketSize.DiskSize; Info.StorageSize.MemorySize += BucketSize.MemorySize; } @@ -4334,7 +4514,7 @@ ZenCacheDiskLayer::GetBucketInfo(std::string_view Bucket) const if (auto It = m_Buckets.find(std::string(Bucket)); It != m_Buckets.end()) { - return ZenCacheDiskLayer::BucketInfo{.EntryCount = It->second->EntryCount(), .StorageSize = It->second->StorageSize()}; + return ZenCacheDiskLayer::BucketInfo{.EntryCount = It->second->EntryCount(), .StorageSize = It->second->TotalSize()}; } return {}; } |