diff options
| author | zousar <[email protected]> | 2025-06-24 00:42:13 -0600 |
|---|---|---|
| committer | zousar <[email protected]> | 2025-06-24 00:42:13 -0600 |
| commit | 081b55a5cf3d9af66d4d0be64fc38ea0055acede (patch) | |
| tree | bb192956333884a7d724e9d924980912d7cf668b /src/zenstore/cache | |
| parent | Establish TODOs and unit test for rejected PUT propagation (diff) | |
| download | zen-081b55a5cf3d9af66d4d0be64fc38ea0055acede.tar.xz zen-081b55a5cf3d9af66d4d0be64fc38ea0055acede.zip | |
Change to PutResult structure
Result structure contains status and a string message (may be empty)
Diffstat (limited to 'src/zenstore/cache')
| -rw-r--r-- | src/zenstore/cache/cachedisklayer.cpp | 67 | ||||
| -rw-r--r-- | src/zenstore/cache/cacherpc.cpp | 157 | ||||
| -rw-r--r-- | src/zenstore/cache/structuredcachestore.cpp | 20 |
3 files changed, 150 insertions, 94 deletions
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index d3748e70f..72a767645 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -234,6 +234,25 @@ using namespace std::literals; namespace zen::cache::impl { static bool +GetValueRawSizeAndHash(const ZenCacheValue& Value, uint64_t& OutValueRawSize, IoHash& OutValueRawHash) +{ + if ((Value.RawSize != 0) || (Value.RawHash != IoHash::Zero)) + { + OutValueRawSize = Value.RawSize; + OutValueRawHash = Value.RawHash; + return true; + } + else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) + { + return CompressedBuffer::ValidateCompressedHeader(Value.Value, OutValueRawHash, OutValueRawSize); + } + + OutValueRawSize = Value.Value.GetSize(); + OutValueRawHash = IoHash::HashBuffer(Value.Value); + return true; +} + +static bool ValueMatchesRawSizeAndHash(const ZenCacheValue& Value, uint64_t RawSize, const std::function<IoHash()>& RawHashProvider) { if ((Value.RawSize != 0) || (Value.RawHash != IoHash::Zero)) @@ -1257,7 +1276,7 @@ ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const DiskLocation& Loc, struct ZenCacheDiskLayer::CacheBucket::PutBatchHandle { - PutBatchHandle(std::vector<bool>& OutResults) : OutResults(OutResults) {} + PutBatchHandle(std::vector<ZenCacheDiskLayer::PutResult>& OutResults) : OutResults(OutResults) {} struct Entry { std::vector<IoHash> HashKeyAndReferences; @@ -1266,11 +1285,11 @@ struct ZenCacheDiskLayer::CacheBucket::PutBatchHandle std::vector<Entry> Entries; std::vector<size_t> EntryResultIndexes; - std::vector<bool>& OutResults; + std::vector<ZenCacheDiskLayer::PutResult>& OutResults; }; ZenCacheDiskLayer::CacheBucket::PutBatchHandle* -ZenCacheDiskLayer::CacheBucket::BeginPutBatch(std::vector<bool>& OutResults) +ZenCacheDiskLayer::CacheBucket::BeginPutBatch(std::vector<ZenCacheDiskLayer::PutResult>& OutResults) { ZEN_TRACE_CPU("Z$::Bucket::BeginPutBatch"); return new PutBatchHandle(OutResults); @@ -1350,7 +1369,7 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept { size_t ResultIndex = Batch->EntryResultIndexes[IndexOffset + Index]; ZEN_ASSERT(ResultIndex < Batch->OutResults.size()); - Batch->OutResults[ResultIndex] = true; + Batch->OutResults[ResultIndex] = {zen::PutStatus::Success}; } IndexOffset += Locations.size(); }); @@ -1829,7 +1848,7 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal } } -bool +ZenCacheDiskLayer::PutResult ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References, @@ -1862,9 +1881,13 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, { if (OptionalBatchHandle) { - OptionalBatchHandle->OutResults.push_back(false); + OptionalBatchHandle->OutResults.push_back(PutResult{ + zen::PutStatus::Conflict, + fmt::format("Value exists with different size '{}' or hash '{}'", MetaData.RawSize, MetaData.RawHash)}); } - return false; + return PutResult{ + zen::PutStatus::Conflict, + fmt::format("Value exists with different size '{}' or hash '{}'", MetaData.RawSize, MetaData.RawHash)}; } ComparisonComplete = true; } @@ -1876,11 +1899,17 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, ZenCacheValue ExistingValue; if (Get(HashKey, ExistingValue) && !cache::impl::ValueMatchesValue(Value, ExistingValue)) { + uint64_t RawSize = 0; + IoHash RawHash = IoHash::Zero; + cache::impl::GetValueRawSizeAndHash(ExistingValue, RawSize, RawHash); if (OptionalBatchHandle) { - OptionalBatchHandle->OutResults.push_back(false); + OptionalBatchHandle->OutResults.push_back( + PutResult{zen::PutStatus::Conflict, + fmt::format("Value exists with different size '{}' or hash '{}'", RawSize, RawHash)}); } - return false; + return PutResult{zen::PutStatus::Conflict, + fmt::format("Value exists with different size '{}' or hash '{}'", RawSize, RawHash)}; } } } @@ -1891,7 +1920,7 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, PutStandaloneCacheValue(HashKey, Value, References); if (OptionalBatchHandle) { - OptionalBatchHandle->OutResults.push_back(true); + OptionalBatchHandle->OutResults.push_back({zen::PutStatus::Success}); } } else @@ -1900,7 +1929,7 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, } m_DiskWriteCount++; - return true; + return PutResult{zen::PutStatus::Success}; } uint64_t @@ -2742,7 +2771,7 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, OptionalBatchHandle->Buffers.push_back(Value.Value); OptionalBatchHandle->Entries.push_back({}); OptionalBatchHandle->EntryResultIndexes.push_back(OptionalBatchHandle->OutResults.size()); - OptionalBatchHandle->OutResults.push_back(false); + OptionalBatchHandle->OutResults.push_back(PutResult{zen::PutStatus::Fail}); std::vector<IoHash>& HashKeyAndReferences = OptionalBatchHandle->Entries.back().HashKeyAndReferences; HashKeyAndReferences.reserve(1 + HashKeyAndReferences.size()); HashKeyAndReferences.push_back(HashKey); @@ -3717,7 +3746,7 @@ ZenCacheDiskLayer::GetOrCreateBucket(std::string_view InBucket) struct ZenCacheDiskLayer::PutBatchHandle { - PutBatchHandle(std::vector<bool>& OutResults) : OutResults(OutResults) {} + PutBatchHandle(std::vector<ZenCacheDiskLayer::PutResult>& OutResults) : OutResults(OutResults) {} struct BucketHandle { CacheBucket* Bucket; @@ -3776,13 +3805,13 @@ struct ZenCacheDiskLayer::PutBatchHandle return NewBucketHandle; } - RwLock Lock; - std::vector<BucketHandle> BucketHandles; - std::vector<bool>& OutResults; + RwLock Lock; + std::vector<BucketHandle> BucketHandles; + std::vector<ZenCacheDiskLayer::PutResult>& OutResults; }; ZenCacheDiskLayer::PutBatchHandle* -ZenCacheDiskLayer::BeginPutBatch(std::vector<bool>& OutResults) +ZenCacheDiskLayer::BeginPutBatch(std::vector<PutResult>& OutResults) { return new PutBatchHandle(OutResults); } @@ -3919,7 +3948,7 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, GetBatc } } -bool +ZenCacheDiskLayer::PutResult ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value, @@ -3928,7 +3957,7 @@ ZenCacheDiskLayer::Put(std::string_view InBucket, PutBatchHandle* OptionalBatchHandle) { ZEN_TRACE_CPU("Z$::Put"); - bool RetVal = false; + PutResult RetVal = {zen::PutStatus::Fail}; if (CacheBucket* Bucket = GetOrCreateBucket(InBucket); Bucket != nullptr) { CacheBucket::PutBatchHandle* BucketBatchHandle = OptionalBatchHandle == nullptr ? nullptr : OptionalBatchHandle->GetHandle(Bucket); diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp index 5b36437f2..20c244250 100644 --- a/src/zenstore/cache/cacherpc.cpp +++ b/src/zenstore/cache/cacherpc.cpp @@ -327,13 +327,13 @@ CacheRpcHandler::HandleRpcPutCacheRecords(const CacheRequestContext& Context, co .Policy = std::move(Policy), .Context = Context}; - PutResult Result = PutCacheRecord(PutRequest, &BatchRequest); + PutStatus Result = PutCacheRecord(PutRequest, &BatchRequest); - if (Result == PutResult::Invalid) + if (Result == PutStatus::Invalid) { return CbPackage{}; } - Results.push_back(Result == PutResult::Success); + Results.push_back(Result == PutStatus::Success); } if (Results.empty()) { @@ -353,7 +353,7 @@ CacheRpcHandler::HandleRpcPutCacheRecords(const CacheRequestContext& Context, co return RpcResponse; } -PutResult +PutStatus CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Package) { CbObjectView Record = Request.RecordObject; @@ -415,7 +415,7 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag if (Count.Invalid > 0) { - return PutResult::Invalid; + return PutStatus::Invalid; } ZenCacheValue CacheValue; @@ -425,16 +425,17 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag bool Overwrite = EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::StoreLocal) && !EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::QueryLocal); // TODO: Propagation for rejected PUTs - if (!m_CacheStore.Put(Request.Context, - Request.Namespace, - Request.Key.Bucket, - Request.Key.Hash, - CacheValue, - ReferencedAttachments, - Overwrite, - nullptr)) + ZenCacheStore::PutResult PutResult = m_CacheStore.Put(Request.Context, + Request.Namespace, + Request.Key.Bucket, + Request.Key.Hash, + CacheValue, + ReferencedAttachments, + Overwrite, + nullptr); + if (PutResult.Status != zen::PutStatus::Success) { - return PutResult::Conflict; + return PutResult.Status; } m_CacheStats.WriteCount++; @@ -472,7 +473,7 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag .Key = Request.Key, .ValueContentIds = std::move(ValidAttachments)}); } - return PutResult::Success; + return PutStatus::Success; } CbPackage @@ -772,14 +773,15 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb const IoHash ValueHash = HashView.AsHash(); ReferencedAttachments.push_back(ValueHash); }); - if (!m_CacheStore.Put(Context, - *Namespace, - Key.Bucket, - Key.Hash, - {.Value = {Request.RecordCacheValue}}, - ReferencedAttachments, - Overwrite, - nullptr)) + ZenCacheStore::PutResult PutResult = m_CacheStore.Put(Context, + *Namespace, + Key.Bucket, + Key.Hash, + {.Value = {Request.RecordCacheValue}}, + ReferencedAttachments, + Overwrite, + nullptr); + if (PutResult.Status != zen::PutStatus::Success) { return; } @@ -928,11 +930,11 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con const bool HasUpstream = m_UpstreamCache.IsActive(); CbArrayView RequestsArray = Params["Requests"sv].AsArrayView(); - std::vector<bool> BatchResults; - std::vector<size_t> BatchResultIndexes; - std::vector<bool> Results; - std::vector<CacheKey> UpstreamCacheKeys; - uint64_t RequestCount = RequestsArray.Num(); + std::vector<ZenCacheStore::PutResult> BatchResults; + std::vector<size_t> BatchResultIndexes; + std::vector<ZenCacheStore::PutResult> Results; + std::vector<CacheKey> UpstreamCacheKeys; + uint64_t RequestCount = RequestsArray.Num(); { Results.reserve(RequestCount); std::unique_ptr<ZenCacheStore::PutBatch> Batch; @@ -987,32 +989,32 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con { RawSize = Chunk.DecodeRawSize(); } - bool PutSucceeded = m_CacheStore.Put(Context, - *Namespace, - Key.Bucket, - Key.Hash, - {.Value = Value, .RawSize = RawSize, .RawHash = RawHash}, - {}, - Overwrite, - Batch.get()); - if (PutSucceeded) + ZenCacheStore::PutResult PutResult = m_CacheStore.Put(Context, + *Namespace, + Key.Bucket, + Key.Hash, + {.Value = Value, .RawSize = RawSize, .RawHash = RawHash}, + {}, + Overwrite, + Batch.get()); + if (PutResult.Status == zen::PutStatus::Success) { m_CacheStats.WriteCount++; } if (Batch) { BatchResultIndexes.push_back(Results.size()); - Results.push_back(false); + Results.push_back({zen::PutStatus::Fail}); } else { - Results.push_back(PutSucceeded); + Results.push_back(PutResult); } TransferredSize = Chunk.GetCompressedSize(); } else { - Results.push_back(true); + Results.push_back({zen::PutStatus::Success}); } Valid = true; } @@ -1028,12 +1030,12 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con if (m_CacheStore.Get(Context, *Namespace, Key.Bucket, Key.Hash, ExistingValue) && IsCompressedBinary(ExistingValue.Value.GetContentType())) { - Results.push_back(true); + Results.push_back({zen::PutStatus::Success}); Valid = true; } else { - Results.push_back(false); + Results.push_back({zen::PutStatus::Fail, fmt::format("Missing attachment with raw hash {}", RawHash)}); } } // We do not search the Upstream. No data in a put means the caller is probing for whether they need to do a heavy put. @@ -1068,13 +1070,13 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con { size_t BatchResultIndex = BatchResultIndexes[Index]; ZEN_ASSERT(BatchResultIndex < Results.size()); - ZEN_ASSERT(Results[BatchResultIndex] == false); + ZEN_ASSERT(Results[BatchResultIndex].Status != zen::PutStatus::Success); Results[BatchResultIndex] = BatchResults[Index]; } for (std::size_t Index = 0; Index < Results.size(); Index++) { - if (Results[Index] && UpstreamCacheKeys[Index] != CacheKey::Empty) + if ((Results[Index].Status == zen::PutStatus::Success) && UpstreamCacheKeys[Index] != CacheKey::Empty) { m_UpstreamCache.EnqueueUpstream( {.Type = ZenContentType::kCompressedBinary, .Namespace = *Namespace, .Key = UpstreamCacheKeys[Index]}); @@ -1084,11 +1086,32 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con ZEN_TRACE_CPU("Z$::RpcPutCacheValues::Response"); CbObjectWriter ResponseObject{1024}; ResponseObject.BeginArray("Result"sv); - for (bool Value : Results) + bool bAnyErrors = false; + for (const ZenCacheStore::PutResult& Value : Results) { - ResponseObject.AddBool(Value); + if (Value.Status == zen::PutStatus::Success) + { + ResponseObject.AddBool(true); + } + else + { + bAnyErrors = true; + ResponseObject.AddBool(false); + } } ResponseObject.EndArray(); + if (bAnyErrors) + { + ResponseObject.BeginArray("ErrorMessages"sv); + for (const ZenCacheStore::PutResult& Value : Results) + { + if (Value.Status != zen::PutStatus::Success) + { + ResponseObject.AddString(Value.Message); + } + } + ResponseObject.EndArray(); + } CbPackage RpcResponse; RpcResponse.SetObject(ResponseObject.Save()); @@ -1259,15 +1282,16 @@ CacheRpcHandler::HandleRpcGetCacheValues(const CacheRequestContext& Context, CbO if (HasData && StoreData) { - if (m_CacheStore.Put( - Context, - *Namespace, - Request.Key.Bucket, - Request.Key.Hash, - ZenCacheValue{.Value = Params.Value, .RawSize = Request.RawSize, .RawHash = Request.RawHash}, - {}, - Overwrite, - nullptr)) + ZenCacheStore::PutResult PutResult = m_CacheStore.Put( + Context, + *Namespace, + Request.Key.Bucket, + Request.Key.Hash, + ZenCacheValue{.Value = Params.Value, .RawSize = Request.RawSize, .RawHash = Request.RawHash}, + {}, + Overwrite, + nullptr); + if (PutResult.Status == zen::PutStatus::Success) { m_CacheStats.WriteCount++; } @@ -1565,14 +1589,15 @@ CacheRpcHandler::GetLocalCacheRecords(const CacheRequestContext& Context, const IoHash ValueHash = HashView.AsHash(); ReferencedAttachments.push_back(ValueHash); }); - if (!m_CacheStore.Put(Context, - Namespace, - Key.Bucket, - Key.Hash, - {.Value = Record.CacheValue}, - ReferencedAttachments, - Overwrite, - nullptr)) + ZenCacheStore::PutResult PutResult = m_CacheStore.Put(Context, + Namespace, + Key.Bucket, + Key.Hash, + {.Value = Record.CacheValue}, + ReferencedAttachments, + Overwrite, + nullptr); + if (PutResult.Status != zen::PutStatus::Success) { return; } @@ -1794,14 +1819,16 @@ CacheRpcHandler::GetUpstreamCacheChunks(const CacheRequestContext& Context, } else { - if (m_CacheStore.Put(Context, + ZenCacheStore::PutResult PutResult = + m_CacheStore.Put(Context, Namespace, Key.Key.Bucket, Key.Key.Hash, {.Value = Params.Value, .RawSize = Params.RawSize, .RawHash = Params.RawHash}, {}, Overwrite, - nullptr)) + nullptr); + if (PutResult.Status == zen::PutStatus::Success) { m_CacheStats.WriteCount++; } diff --git a/src/zenstore/cache/structuredcachestore.cpp b/src/zenstore/cache/structuredcachestore.cpp index 25fd23b93..a3f80099f 100644 --- a/src/zenstore/cache/structuredcachestore.cpp +++ b/src/zenstore/cache/structuredcachestore.cpp @@ -154,7 +154,7 @@ struct ZenCacheNamespace::PutBatchHandle }; ZenCacheNamespace::PutBatchHandle* -ZenCacheNamespace::BeginPutBatch(std::vector<bool>& OutResult) +ZenCacheNamespace::BeginPutBatch(std::vector<PutResult>& OutResult) { ZenCacheNamespace::PutBatchHandle* Handle = new ZenCacheNamespace::PutBatchHandle; Handle->DiskLayerHandle = m_DiskLayer.BeginPutBatch(OutResult); @@ -252,7 +252,7 @@ ZenCacheNamespace::Get(std::string_view InBucket, const IoHash& HashKey, GetBatc return; } -bool +ZenCacheNamespace::PutResult ZenCacheNamespace::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value, @@ -269,8 +269,8 @@ ZenCacheNamespace::Put(std::string_view InBucket, ZEN_ASSERT(Value.Value.Size()); ZenCacheDiskLayer::PutBatchHandle* DiskLayerBatchHandle = OptionalBatchHandle ? OptionalBatchHandle->DiskLayerHandle : nullptr; - bool RetVal = m_DiskLayer.Put(InBucket, HashKey, Value, References, Overwrite, DiskLayerBatchHandle); - if (RetVal) + PutResult RetVal = m_DiskLayer.Put(InBucket, HashKey, Value, References, Overwrite, DiskLayerBatchHandle); + if (RetVal.Status == zen::PutStatus::Success) { m_WriteCount++; } @@ -558,7 +558,7 @@ ZenCacheStore::LogWorker() } } -ZenCacheStore::PutBatch::PutBatch(ZenCacheStore& CacheStore, std::string_view InNamespace, std::vector<bool>& OutResult) +ZenCacheStore::PutBatch::PutBatch(ZenCacheStore& CacheStore, std::string_view InNamespace, std::vector<PutResult>& OutResult) : m_CacheStore(CacheStore) { ZEN_MEMSCOPE(GetCacheStoreTag()); @@ -721,7 +721,7 @@ ZenCacheStore::Get(const CacheRequestContext& Context, m_MissCount++; } -bool +ZenCacheStore::PutResult ZenCacheStore::Put(const CacheRequestContext& Context, std::string_view Namespace, std::string_view Bucket, @@ -736,7 +736,7 @@ ZenCacheStore::Put(const CacheRequestContext& Context, if (IsKnownBadBucketName(Bucket)) { m_RejectedWriteCount++; - return false; + return PutResult{zen::PutStatus::Invalid, "Bad bucket name"}; } ZEN_MEMSCOPE(GetCacheStoreTag()); @@ -766,8 +766,8 @@ ZenCacheStore::Put(const CacheRequestContext& Context, if (ZenCacheNamespace* Store = GetNamespace(Namespace); Store) { ZenCacheNamespace::PutBatchHandle* BatchHandle = OptionalBatchHandle ? OptionalBatchHandle->m_NamespaceBatchHandle : nullptr; - bool RetVal = Store->Put(Bucket, HashKey, Value, References, Overwrite, BatchHandle); - if (RetVal) + PutResult RetVal = Store->Put(Bucket, HashKey, Value, References, Overwrite, BatchHandle); + if (RetVal.Status == zen::PutStatus::Success) { m_WriteCount++; } @@ -783,7 +783,7 @@ ZenCacheStore::Put(const CacheRequestContext& Context, Namespace, Bucket, HashKey.ToHexString()); - return false; + return PutResult{zen::PutStatus::Fail, fmt::format("Unknown namespace '{}'", Namespace)}; } bool |