diff options
Diffstat (limited to 'src/zenstore/cache/cachedisklayer.cpp')
| -rw-r--r-- | src/zenstore/cache/cachedisklayer.cpp | 227 |
1 files changed, 192 insertions, 35 deletions
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index fdf879e1f..e0030230f 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -233,6 +233,61 @@ using namespace std::literals; namespace zen::cache::impl { +static bool +GetValueRawSizeAndHash(const ZenCacheValue& Value, uint64_t& OutValueRawSize, IoHash& OutValueRawHash) +{ + if ((Value.RawSize != 0) || (Value.RawHash != IoHash::Zero)) + { + OutValueRawSize = Value.RawSize; + OutValueRawHash = Value.RawHash; + return true; + } + else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) + { + return CompressedBuffer::ValidateCompressedHeader(Value.Value, OutValueRawHash, OutValueRawSize); + } + + OutValueRawSize = Value.Value.GetSize(); + OutValueRawHash = IoHash::HashBuffer(Value.Value); + return true; +} + +static bool +ValueMatchesRawSizeAndHash(const ZenCacheValue& Value, uint64_t RawSize, const std::function<IoHash()>& RawHashProvider) +{ + if ((Value.RawSize != 0) || (Value.RawHash != IoHash::Zero)) + { + return ((RawSize == Value.RawSize) && (RawHashProvider() == Value.RawHash)); + } + else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) + { + uint64_t ValueRawSize = 0; + IoHash ValueRawHash = IoHash::Zero; + return CompressedBuffer::ValidateCompressedHeader(Value.Value, ValueRawHash, ValueRawSize) && (RawSize == ValueRawSize) && + (RawHashProvider() == ValueRawHash); + } + + return (RawSize == Value.Value.GetSize()) && (RawHashProvider() == IoHash::HashBuffer(Value.Value)); +} + +static bool +ValueMatchesValue(const ZenCacheValue& Value1, const ZenCacheValue& Value2) +{ + if ((Value1.RawSize != 0) || (Value1.RawHash != IoHash::Zero)) + { + return ValueMatchesRawSizeAndHash(Value2, Value1.RawSize, [&Value1]() { return Value1.RawHash; }); + } + else if (Value1.Value.GetContentType() == ZenContentType::kCompressedBinary) + { + uint64_t Value1RawSize = 0; + IoHash Value1RawHash = IoHash::Zero; + return CompressedBuffer::ValidateCompressedHeader(Value1.Value, Value1RawHash, Value1RawSize) && + ValueMatchesRawSizeAndHash(Value2, Value1RawSize, [Value1RawHash]() { return Value1RawHash; }); + } + + return ValueMatchesRawSizeAndHash(Value2, Value1.Value.GetSize(), [&Value1]() { return IoHash::HashBuffer(Value1.Value); }); +} + class BucketManifestSerializer { using MetaDataIndex = ZenCacheDiskLayer::CacheBucket::MetaDataIndex; @@ -1315,20 +1370,21 @@ ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const DiskLocation& Loc, struct ZenCacheDiskLayer::CacheBucket::PutBatchHandle { - PutBatchHandle(std::vector<bool>& OutResults) : OutResults(OutResults) {} + PutBatchHandle(std::vector<ZenCacheDiskLayer::PutResult>& OutResults) : OutResults(OutResults) {} struct Entry { std::vector<IoHash> HashKeyAndReferences; + bool Overwrite; }; std::vector<IoBuffer> Buffers; std::vector<Entry> Entries; std::vector<size_t> EntryResultIndexes; - std::vector<bool>& OutResults; + std::vector<ZenCacheDiskLayer::PutResult>& OutResults; }; ZenCacheDiskLayer::CacheBucket::PutBatchHandle* -ZenCacheDiskLayer::CacheBucket::BeginPutBatch(std::vector<bool>& OutResults) +ZenCacheDiskLayer::CacheBucket::BeginPutBatch(std::vector<ZenCacheDiskLayer::PutResult>& OutResults) { ZEN_TRACE_CPU("Z$::Bucket::BeginPutBatch"); return new PutBatchHandle(OutResults); @@ -1344,23 +1400,41 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept ZEN_ASSERT(Batch); if (!Batch->Buffers.empty()) { - std::vector<uint8_t> EntryFlags; - for (const IoBuffer& Buffer : Batch->Buffers) + ZEN_ASSERT(Batch->Buffers.size() == Batch->Entries.size()); + std::vector<uint8_t> EntryFlags; + std::vector<size_t> BufferToEntryIndexes; + std::vector<IoBuffer> BuffersToCommit; + BuffersToCommit.reserve(Batch->Buffers.size()); + for (size_t Index = 0; Index < Batch->Entries.size(); Index++) { - uint8_t Flags = 0; - if (Buffer.GetContentType() == ZenContentType::kCbObject) + const std::vector<IoHash>& HashKeyAndReferences = Batch->Entries[Index].HashKeyAndReferences; + ZEN_ASSERT(HashKeyAndReferences.size() >= 1); + + ZenCacheValue TemporaryValue; + TemporaryValue.Value = Batch->Buffers[Index]; + std::span<const IoHash> ReferenceSpan(HashKeyAndReferences.begin() + 1, HashKeyAndReferences.end()); + PutResult& OutResult = Batch->OutResults[Batch->EntryResultIndexes[Index]]; + OutResult = PutResult{zen::PutStatus::Success}; + if (!ShouldRejectPut(HashKeyAndReferences[0], TemporaryValue, ReferenceSpan, Batch->Entries[Index].Overwrite, OutResult)) { - Flags |= DiskLocation::kStructured; - } - else if (Buffer.GetContentType() == ZenContentType::kCompressedBinary) - { - Flags |= DiskLocation::kCompressed; + BufferToEntryIndexes.push_back(Index); + BuffersToCommit.push_back(TemporaryValue.Value); + + uint8_t Flags = 0; + if (TemporaryValue.Value.GetContentType() == ZenContentType::kCbObject) + { + Flags |= DiskLocation::kStructured; + } + else if (TemporaryValue.Value.GetContentType() == ZenContentType::kCompressedBinary) + { + Flags |= DiskLocation::kCompressed; + } + EntryFlags.push_back(Flags); } - EntryFlags.push_back(Flags); } size_t IndexOffset = 0; - m_BlockStore.WriteChunks(Batch->Buffers, m_Configuration.PayloadAlignment, [&](std::span<BlockStoreLocation> Locations) { + m_BlockStore.WriteChunks(BuffersToCommit, m_Configuration.PayloadAlignment, [&](std::span<BlockStoreLocation> Locations) { ZEN_MEMSCOPE(GetCacheDiskTag()); std::vector<DiskIndexEntry> DiskEntries; { @@ -1368,7 +1442,8 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept for (size_t Index = 0; Index < Locations.size(); Index++) { DiskLocation Location(Locations[Index], m_Configuration.PayloadAlignment, EntryFlags[IndexOffset + Index]); - const std::vector<IoHash>& HashKeyAndReferences = Batch->Entries[IndexOffset + Index].HashKeyAndReferences; + const std::vector<IoHash>& HashKeyAndReferences = + Batch->Entries[BufferToEntryIndexes[IndexOffset + Index]].HashKeyAndReferences; ZEN_ASSERT(HashKeyAndReferences.size() >= 1); const IoHash HashKey = HashKeyAndReferences[0]; DiskEntries.push_back({.Key = HashKey, .Location = Location}); @@ -1404,12 +1479,6 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept } } m_SlogFile.Append(DiskEntries); - for (size_t Index = 0; Index < Locations.size(); Index++) - { - size_t ResultIndex = Batch->EntryResultIndexes[IndexOffset + Index]; - ZEN_ASSERT(ResultIndex < Batch->OutResults.size()); - Batch->OutResults[ResultIndex] = true; - } IndexOffset += Locations.size(); }); } @@ -1905,30 +1974,105 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal } } -void +bool +ZenCacheDiskLayer::CacheBucket::ShouldRejectPut(const IoHash& HashKey, + const ZenCacheValue& Value, + std::span<const IoHash> References, + bool Overwrite, + ZenCacheDiskLayer::PutResult& OutPutResult) +{ + ZEN_UNUSED(References); + + const bool CheckExisting = m_Configuration.LimitOverwrites && !Overwrite; + if (CheckExisting) + { + RwLock::SharedLockScope IndexLock(m_IndexLock); + auto It = m_Index.find(HashKey); + if (It != m_Index.end()) + { + PayloadIndex EntryIndex = It.value(); + m_AccessTimes[EntryIndex] = GcClock::TickCount(); + DiskLocation Location = m_Payloads[EntryIndex].Location; + + bool ComparisonComplete = false; + const BucketPayload* Payload = &m_Payloads[EntryIndex]; + if (Payload->MetaData) + { + const BucketMetaData& MetaData = m_MetaDatas[Payload->MetaData]; + if (MetaData) + { + if (!cache::impl::ValueMatchesRawSizeAndHash(Value, MetaData.RawSize, [&MetaData]() { return MetaData.RawHash; })) + { + OutPutResult = PutResult{ + zen::PutStatus::Conflict, + fmt::format("Value exists with different size '{}' or hash '{}'", MetaData.RawSize, MetaData.RawHash)}; + return true; + } + ComparisonComplete = true; + } + } + + if (!ComparisonComplete) + { + // We must release the index lock before calling Get as that will acquire it. + // Having the Get method not acquire the index lock is not viable because Get has potentially multiple acquisitions + // of the index lock, first as a shared lock, then as an exclusive lock. If the caller has an exclusive lock, we + // could just accept that, and not have Get do any lock acquisitions, but it creates situations where Get is doing + // slow operations (eg: reading a file from disk) while under an exclusive index lock, and this would lead to + // responsiveness issues where zenserver could fail to respond to requests because of a long-held exclusive lock. + IndexLock.ReleaseNow(); + ZenCacheValue ExistingValue; + if (Get(HashKey, ExistingValue) && !cache::impl::ValueMatchesValue(Value, ExistingValue)) + { + uint64_t RawSize = 0; + IoHash RawHash = IoHash::Zero; + cache::impl::GetValueRawSizeAndHash(ExistingValue, RawSize, RawHash); + OutPutResult = PutResult{zen::PutStatus::Conflict, + fmt::format("Value exists with different size '{}' or hash '{}'", RawSize, RawHash)}; + return true; + } + } + } + } + return false; +} + +ZenCacheDiskLayer::PutResult ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References, + bool Overwrite, PutBatchHandle* OptionalBatchHandle) { ZEN_TRACE_CPU("Z$::Bucket::Put"); metrics::RequestStats::Scope $(m_PutOps, Value.Value.Size()); + PutResult Result{zen::PutStatus::Success}; + if (Value.Value.Size() >= m_Configuration.LargeObjectThreshold) { + if (ShouldRejectPut(HashKey, Value, References, Overwrite, Result)) + { + if (OptionalBatchHandle) + { + OptionalBatchHandle->OutResults.push_back(Result); + } + return Result; + } PutStandaloneCacheValue(HashKey, Value, References); if (OptionalBatchHandle) { - OptionalBatchHandle->OutResults.push_back(true); + OptionalBatchHandle->OutResults.push_back({zen::PutStatus::Success}); } } else { - PutInlineCacheValue(HashKey, Value, References, OptionalBatchHandle); + Result = PutInlineCacheValue(HashKey, Value, References, Overwrite, OptionalBatchHandle); } m_DiskWriteCount++; + return Result; } uint64_t @@ -2777,25 +2921,35 @@ ZenCacheDiskLayer::CacheBucket::GetMetaData(RwLock::SharedLockScope&, const Buck return {}; } -void +ZenCacheDiskLayer::PutResult ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References, + bool Overwrite, PutBatchHandle* OptionalBatchHandle) { ZEN_TRACE_CPU("Z$::Bucket::PutInlineCacheValue"); + PutResult Result{zen::PutStatus::Success}; if (OptionalBatchHandle != nullptr) { OptionalBatchHandle->Buffers.push_back(Value.Value); OptionalBatchHandle->Entries.push_back({}); OptionalBatchHandle->EntryResultIndexes.push_back(OptionalBatchHandle->OutResults.size()); - OptionalBatchHandle->OutResults.push_back(false); + OptionalBatchHandle->OutResults.push_back(PutResult{zen::PutStatus::Fail}); + PutBatchHandle::Entry& CurrentEntry = OptionalBatchHandle->Entries.back(); + CurrentEntry.Overwrite = Overwrite; std::vector<IoHash>& HashKeyAndReferences = OptionalBatchHandle->Entries.back().HashKeyAndReferences; HashKeyAndReferences.reserve(1 + References.size()); HashKeyAndReferences.push_back(HashKey); HashKeyAndReferences.insert(HashKeyAndReferences.end(), References.begin(), References.end()); - return; + return Result; + } + + if (ShouldRejectPut(HashKey, Value, References, Overwrite, Result)) + { + return Result; } + uint8_t EntryFlags = 0; if (Value.Value.GetContentType() == ZenContentType::kCbObject) @@ -2845,6 +2999,7 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, } m_SlogFile.Append({.Key = HashKey, .Location = Location}); }); + return Result; } std::string @@ -3781,7 +3936,7 @@ ZenCacheDiskLayer::GetOrCreateBucket(std::string_view InBucket) struct ZenCacheDiskLayer::PutBatchHandle { - PutBatchHandle(std::vector<bool>& OutResults) : OutResults(OutResults) {} + PutBatchHandle(std::vector<ZenCacheDiskLayer::PutResult>& OutResults) : OutResults(OutResults) {} struct BucketHandle { CacheBucket* Bucket; @@ -3840,13 +3995,13 @@ struct ZenCacheDiskLayer::PutBatchHandle return NewBucketHandle; } - RwLock Lock; - std::vector<BucketHandle> BucketHandles; - std::vector<bool>& OutResults; + RwLock Lock; + std::vector<BucketHandle> BucketHandles; + std::vector<ZenCacheDiskLayer::PutResult>& OutResults; }; ZenCacheDiskLayer::PutBatchHandle* -ZenCacheDiskLayer::BeginPutBatch(std::vector<bool>& OutResults) +ZenCacheDiskLayer::BeginPutBatch(std::vector<PutResult>& OutResults) { return new PutBatchHandle(OutResults); } @@ -3983,21 +4138,23 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, GetBatc } } -void +ZenCacheDiskLayer::PutResult ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References, + bool Overwrite, PutBatchHandle* OptionalBatchHandle) { ZEN_TRACE_CPU("Z$::Put"); - + PutResult RetVal = {zen::PutStatus::Fail}; if (CacheBucket* Bucket = GetOrCreateBucket(InBucket); Bucket != nullptr) { CacheBucket::PutBatchHandle* BucketBatchHandle = OptionalBatchHandle == nullptr ? nullptr : OptionalBatchHandle->GetHandle(Bucket); - Bucket->Put(HashKey, Value, References, BucketBatchHandle); + RetVal = Bucket->Put(HashKey, Value, References, Overwrite, BucketBatchHandle); TryMemCacheTrim(); } + return RetVal; } void |