diff options
| author | zousar <[email protected]> | 2025-06-24 00:42:13 -0600 |
|---|---|---|
| committer | zousar <[email protected]> | 2025-06-24 00:42:13 -0600 |
| commit | 081b55a5cf3d9af66d4d0be64fc38ea0055acede (patch) | |
| tree | bb192956333884a7d724e9d924980912d7cf668b /src | |
| parent | Establish TODOs and unit test for rejected PUT propagation (diff) | |
| download | zen-081b55a5cf3d9af66d4d0be64fc38ea0055acede.tar.xz zen-081b55a5cf3d9af66d4d0be64fc38ea0055acede.zip | |
Change to PutResult structure
Result structure contains status and a string message (may be empty)
Diffstat (limited to 'src')
| -rw-r--r-- | src/zenserver/cache/httpstructuredcache.cpp | 89 | ||||
| -rw-r--r-- | src/zenstore/cache/cachedisklayer.cpp | 67 | ||||
| -rw-r--r-- | src/zenstore/cache/cacherpc.cpp | 157 | ||||
| -rw-r--r-- | src/zenstore/cache/structuredcachestore.cpp | 20 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/cache/cachedisklayer.h | 32 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/cache/cacherpc.h | 11 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/cache/cacheshared.h | 8 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/cache/structuredcachestore.h | 40 |
8 files changed, 258 insertions, 166 deletions
diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp index 932e456d2..224cc6678 100644 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ b/src/zenserver/cache/httpstructuredcache.cpp @@ -996,9 +996,11 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con if (Success && StoreLocal) { - const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal); - if (m_CacheStore - .Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue, {}, Overwrite, nullptr)) + const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal); + ZenCacheStore::PutResult PutResult = + m_CacheStore + .Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue, {}, Overwrite, nullptr); + if (PutResult.Status == zen::PutStatus::Success) { m_CacheStats.WriteCount++; } @@ -1086,15 +1088,16 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con if (StoreLocal) { - const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal); - if (m_CacheStore.Put(RequestContext, - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - CacheValue, - ReferencedAttachments, - Overwrite, - nullptr)) + const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal); + ZenCacheStore::PutResult PutResult = m_CacheStore.Put(RequestContext, + Ref.Namespace, + Ref.BucketSegment, + Ref.HashKey, + CacheValue, + ReferencedAttachments, + Overwrite, + nullptr); + if (PutResult.Status == zen::PutStatus::Success) { m_CacheStats.WriteCount++; @@ -1204,6 +1207,24 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con return Request.WriteResponse(HttpResponseCode::InsufficientStorage); } + auto WriteFailureResponse = [&Request](const ZenCacheStore::PutResult& PutResult) { + ZEN_UNUSED(PutResult); + + HttpResponseCode ResponseCode = HttpResponseCode::InternalServerError; + switch (PutResult.Status) + { + case zen::PutStatus::Conflict: + ResponseCode = HttpResponseCode::Conflict; + break; + case zen::PutStatus::Invalid: + ResponseCode = HttpResponseCode::BadRequest; + break; + } + + return PutResult.Message.empty() ? Request.WriteResponse(ResponseCode) + : Request.WriteResponse(ResponseCode, zen::HttpContentType::kText, PutResult.Message); + }; + const HttpContentType ContentType = Request.RequestContentType(); Body.SetContentType(ContentType); @@ -1234,16 +1255,17 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con } const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal); // TODO: Propagation for rejected PUTs - if (!m_CacheStore.Put(RequestContext, - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - {.Value = Body, .RawSize = RawSize, .RawHash = RawHash}, - {}, - Overwrite, - nullptr)) + ZenCacheStore::PutResult PutResult = m_CacheStore.Put(RequestContext, + Ref.Namespace, + Ref.BucketSegment, + Ref.HashKey, + {.Value = Body, .RawSize = RawSize, .RawHash = RawHash}, + {}, + Overwrite, + nullptr); + if (PutResult.Status != zen::PutStatus::Success) { - return Request.WriteResponse(HttpResponseCode::Conflict); + return WriteFailureResponse(PutResult); } m_CacheStats.WriteCount++; @@ -1296,16 +1318,17 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal); // TODO: Propagation for rejected PUTs - if (!m_CacheStore.Put(RequestContext, - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - {.Value = Body}, - ReferencedAttachments, - Overwrite, - nullptr)) + ZenCacheStore::PutResult PutResult = m_CacheStore.Put(RequestContext, + Ref.Namespace, + Ref.BucketSegment, + Ref.HashKey, + {.Value = Body}, + ReferencedAttachments, + Overwrite, + nullptr); + if (PutResult.Status != zen::PutStatus::Success) { - return Request.WriteResponse(HttpResponseCode::Conflict); + return WriteFailureResponse(PutResult); } m_CacheStats.WriteCount++; @@ -1405,9 +1428,11 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con CacheValue.Value = CacheRecord.GetBuffer().AsIoBuffer(); CacheValue.Value.SetContentType(ZenContentType::kCbObject); // TODO: Propagation for rejected PUTs - if (!m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue, ReferencedAttachments, Overwrite)) + ZenCacheStore::PutResult PutResult = + m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue, ReferencedAttachments, Overwrite); + if (PutResult.Status != zen::PutStatus::Success) { - return Request.WriteResponse(HttpResponseCode::Conflict); + return WriteFailureResponse(PutResult); } m_CacheStats.WriteCount++; diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index d3748e70f..72a767645 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -234,6 +234,25 @@ using namespace std::literals; namespace zen::cache::impl { static bool +GetValueRawSizeAndHash(const ZenCacheValue& Value, uint64_t& OutValueRawSize, IoHash& OutValueRawHash) +{ + if ((Value.RawSize != 0) || (Value.RawHash != IoHash::Zero)) + { + OutValueRawSize = Value.RawSize; + OutValueRawHash = Value.RawHash; + return true; + } + else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) + { + return CompressedBuffer::ValidateCompressedHeader(Value.Value, OutValueRawHash, OutValueRawSize); + } + + OutValueRawSize = Value.Value.GetSize(); + OutValueRawHash = IoHash::HashBuffer(Value.Value); + return true; +} + +static bool ValueMatchesRawSizeAndHash(const ZenCacheValue& Value, uint64_t RawSize, const std::function<IoHash()>& RawHashProvider) { if ((Value.RawSize != 0) || (Value.RawHash != IoHash::Zero)) @@ -1257,7 +1276,7 @@ ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const DiskLocation& Loc, struct ZenCacheDiskLayer::CacheBucket::PutBatchHandle { - PutBatchHandle(std::vector<bool>& OutResults) : OutResults(OutResults) {} + PutBatchHandle(std::vector<ZenCacheDiskLayer::PutResult>& OutResults) : OutResults(OutResults) {} struct Entry { std::vector<IoHash> HashKeyAndReferences; @@ -1266,11 +1285,11 @@ struct ZenCacheDiskLayer::CacheBucket::PutBatchHandle std::vector<Entry> Entries; std::vector<size_t> EntryResultIndexes; - std::vector<bool>& OutResults; + std::vector<ZenCacheDiskLayer::PutResult>& OutResults; }; ZenCacheDiskLayer::CacheBucket::PutBatchHandle* -ZenCacheDiskLayer::CacheBucket::BeginPutBatch(std::vector<bool>& OutResults) +ZenCacheDiskLayer::CacheBucket::BeginPutBatch(std::vector<ZenCacheDiskLayer::PutResult>& OutResults) { ZEN_TRACE_CPU("Z$::Bucket::BeginPutBatch"); return new PutBatchHandle(OutResults); @@ -1350,7 +1369,7 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept { size_t ResultIndex = Batch->EntryResultIndexes[IndexOffset + Index]; ZEN_ASSERT(ResultIndex < Batch->OutResults.size()); - Batch->OutResults[ResultIndex] = true; + Batch->OutResults[ResultIndex] = {zen::PutStatus::Success}; } IndexOffset += Locations.size(); }); @@ -1829,7 +1848,7 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal } } -bool +ZenCacheDiskLayer::PutResult ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References, @@ -1862,9 +1881,13 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, { if (OptionalBatchHandle) { - OptionalBatchHandle->OutResults.push_back(false); + OptionalBatchHandle->OutResults.push_back(PutResult{ + zen::PutStatus::Conflict, + fmt::format("Value exists with different size '{}' or hash '{}'", MetaData.RawSize, MetaData.RawHash)}); } - return false; + return PutResult{ + zen::PutStatus::Conflict, + fmt::format("Value exists with different size '{}' or hash '{}'", MetaData.RawSize, MetaData.RawHash)}; } ComparisonComplete = true; } @@ -1876,11 +1899,17 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, ZenCacheValue ExistingValue; if (Get(HashKey, ExistingValue) && !cache::impl::ValueMatchesValue(Value, ExistingValue)) { + uint64_t RawSize = 0; + IoHash RawHash = IoHash::Zero; + cache::impl::GetValueRawSizeAndHash(ExistingValue, RawSize, RawHash); if (OptionalBatchHandle) { - OptionalBatchHandle->OutResults.push_back(false); + OptionalBatchHandle->OutResults.push_back( + PutResult{zen::PutStatus::Conflict, + fmt::format("Value exists with different size '{}' or hash '{}'", RawSize, RawHash)}); } - return false; + return PutResult{zen::PutStatus::Conflict, + fmt::format("Value exists with different size '{}' or hash '{}'", RawSize, RawHash)}; } } } @@ -1891,7 +1920,7 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, PutStandaloneCacheValue(HashKey, Value, References); if (OptionalBatchHandle) { - OptionalBatchHandle->OutResults.push_back(true); + OptionalBatchHandle->OutResults.push_back({zen::PutStatus::Success}); } } else @@ -1900,7 +1929,7 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, } m_DiskWriteCount++; - return true; + return PutResult{zen::PutStatus::Success}; } uint64_t @@ -2742,7 +2771,7 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, OptionalBatchHandle->Buffers.push_back(Value.Value); OptionalBatchHandle->Entries.push_back({}); OptionalBatchHandle->EntryResultIndexes.push_back(OptionalBatchHandle->OutResults.size()); - OptionalBatchHandle->OutResults.push_back(false); + OptionalBatchHandle->OutResults.push_back(PutResult{zen::PutStatus::Fail}); std::vector<IoHash>& HashKeyAndReferences = OptionalBatchHandle->Entries.back().HashKeyAndReferences; HashKeyAndReferences.reserve(1 + HashKeyAndReferences.size()); HashKeyAndReferences.push_back(HashKey); @@ -3717,7 +3746,7 @@ ZenCacheDiskLayer::GetOrCreateBucket(std::string_view InBucket) struct ZenCacheDiskLayer::PutBatchHandle { - PutBatchHandle(std::vector<bool>& OutResults) : OutResults(OutResults) {} + PutBatchHandle(std::vector<ZenCacheDiskLayer::PutResult>& OutResults) : OutResults(OutResults) {} struct BucketHandle { CacheBucket* Bucket; @@ -3776,13 +3805,13 @@ struct ZenCacheDiskLayer::PutBatchHandle return NewBucketHandle; } - RwLock Lock; - std::vector<BucketHandle> BucketHandles; - std::vector<bool>& OutResults; + RwLock Lock; + std::vector<BucketHandle> BucketHandles; + std::vector<ZenCacheDiskLayer::PutResult>& OutResults; }; ZenCacheDiskLayer::PutBatchHandle* -ZenCacheDiskLayer::BeginPutBatch(std::vector<bool>& OutResults) +ZenCacheDiskLayer::BeginPutBatch(std::vector<PutResult>& OutResults) { return new PutBatchHandle(OutResults); } @@ -3919,7 +3948,7 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, GetBatc } } -bool +ZenCacheDiskLayer::PutResult ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value, @@ -3928,7 +3957,7 @@ ZenCacheDiskLayer::Put(std::string_view InBucket, PutBatchHandle* OptionalBatchHandle) { ZEN_TRACE_CPU("Z$::Put"); - bool RetVal = false; + PutResult RetVal = {zen::PutStatus::Fail}; if (CacheBucket* Bucket = GetOrCreateBucket(InBucket); Bucket != nullptr) { CacheBucket::PutBatchHandle* BucketBatchHandle = OptionalBatchHandle == nullptr ? nullptr : OptionalBatchHandle->GetHandle(Bucket); diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp index 5b36437f2..20c244250 100644 --- a/src/zenstore/cache/cacherpc.cpp +++ b/src/zenstore/cache/cacherpc.cpp @@ -327,13 +327,13 @@ CacheRpcHandler::HandleRpcPutCacheRecords(const CacheRequestContext& Context, co .Policy = std::move(Policy), .Context = Context}; - PutResult Result = PutCacheRecord(PutRequest, &BatchRequest); + PutStatus Result = PutCacheRecord(PutRequest, &BatchRequest); - if (Result == PutResult::Invalid) + if (Result == PutStatus::Invalid) { return CbPackage{}; } - Results.push_back(Result == PutResult::Success); + Results.push_back(Result == PutStatus::Success); } if (Results.empty()) { @@ -353,7 +353,7 @@ CacheRpcHandler::HandleRpcPutCacheRecords(const CacheRequestContext& Context, co return RpcResponse; } -PutResult +PutStatus CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Package) { CbObjectView Record = Request.RecordObject; @@ -415,7 +415,7 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag if (Count.Invalid > 0) { - return PutResult::Invalid; + return PutStatus::Invalid; } ZenCacheValue CacheValue; @@ -425,16 +425,17 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag bool Overwrite = EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::StoreLocal) && !EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::QueryLocal); // TODO: Propagation for rejected PUTs - if (!m_CacheStore.Put(Request.Context, - Request.Namespace, - Request.Key.Bucket, - Request.Key.Hash, - CacheValue, - ReferencedAttachments, - Overwrite, - nullptr)) + ZenCacheStore::PutResult PutResult = m_CacheStore.Put(Request.Context, + Request.Namespace, + Request.Key.Bucket, + Request.Key.Hash, + CacheValue, + ReferencedAttachments, + Overwrite, + nullptr); + if (PutResult.Status != zen::PutStatus::Success) { - return PutResult::Conflict; + return PutResult.Status; } m_CacheStats.WriteCount++; @@ -472,7 +473,7 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag .Key = Request.Key, .ValueContentIds = std::move(ValidAttachments)}); } - return PutResult::Success; + return PutStatus::Success; } CbPackage @@ -772,14 +773,15 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb const IoHash ValueHash = HashView.AsHash(); ReferencedAttachments.push_back(ValueHash); }); - if (!m_CacheStore.Put(Context, - *Namespace, - Key.Bucket, - Key.Hash, - {.Value = {Request.RecordCacheValue}}, - ReferencedAttachments, - Overwrite, - nullptr)) + ZenCacheStore::PutResult PutResult = m_CacheStore.Put(Context, + *Namespace, + Key.Bucket, + Key.Hash, + {.Value = {Request.RecordCacheValue}}, + ReferencedAttachments, + Overwrite, + nullptr); + if (PutResult.Status != zen::PutStatus::Success) { return; } @@ -928,11 +930,11 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con const bool HasUpstream = m_UpstreamCache.IsActive(); CbArrayView RequestsArray = Params["Requests"sv].AsArrayView(); - std::vector<bool> BatchResults; - std::vector<size_t> BatchResultIndexes; - std::vector<bool> Results; - std::vector<CacheKey> UpstreamCacheKeys; - uint64_t RequestCount = RequestsArray.Num(); + std::vector<ZenCacheStore::PutResult> BatchResults; + std::vector<size_t> BatchResultIndexes; + std::vector<ZenCacheStore::PutResult> Results; + std::vector<CacheKey> UpstreamCacheKeys; + uint64_t RequestCount = RequestsArray.Num(); { Results.reserve(RequestCount); std::unique_ptr<ZenCacheStore::PutBatch> Batch; @@ -987,32 +989,32 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con { RawSize = Chunk.DecodeRawSize(); } - bool PutSucceeded = m_CacheStore.Put(Context, - *Namespace, - Key.Bucket, - Key.Hash, - {.Value = Value, .RawSize = RawSize, .RawHash = RawHash}, - {}, - Overwrite, - Batch.get()); - if (PutSucceeded) + ZenCacheStore::PutResult PutResult = m_CacheStore.Put(Context, + *Namespace, + Key.Bucket, + Key.Hash, + {.Value = Value, .RawSize = RawSize, .RawHash = RawHash}, + {}, + Overwrite, + Batch.get()); + if (PutResult.Status == zen::PutStatus::Success) { m_CacheStats.WriteCount++; } if (Batch) { BatchResultIndexes.push_back(Results.size()); - Results.push_back(false); + Results.push_back({zen::PutStatus::Fail}); } else { - Results.push_back(PutSucceeded); + Results.push_back(PutResult); } TransferredSize = Chunk.GetCompressedSize(); } else { - Results.push_back(true); + Results.push_back({zen::PutStatus::Success}); } Valid = true; } @@ -1028,12 +1030,12 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con if (m_CacheStore.Get(Context, *Namespace, Key.Bucket, Key.Hash, ExistingValue) && IsCompressedBinary(ExistingValue.Value.GetContentType())) { - Results.push_back(true); + Results.push_back({zen::PutStatus::Success}); Valid = true; } else { - Results.push_back(false); + Results.push_back({zen::PutStatus::Fail, fmt::format("Missing attachment with raw hash {}", RawHash)}); } } // We do not search the Upstream. No data in a put means the caller is probing for whether they need to do a heavy put. @@ -1068,13 +1070,13 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con { size_t BatchResultIndex = BatchResultIndexes[Index]; ZEN_ASSERT(BatchResultIndex < Results.size()); - ZEN_ASSERT(Results[BatchResultIndex] == false); + ZEN_ASSERT(Results[BatchResultIndex].Status != zen::PutStatus::Success); Results[BatchResultIndex] = BatchResults[Index]; } for (std::size_t Index = 0; Index < Results.size(); Index++) { - if (Results[Index] && UpstreamCacheKeys[Index] != CacheKey::Empty) + if ((Results[Index].Status == zen::PutStatus::Success) && UpstreamCacheKeys[Index] != CacheKey::Empty) { m_UpstreamCache.EnqueueUpstream( {.Type = ZenContentType::kCompressedBinary, .Namespace = *Namespace, .Key = UpstreamCacheKeys[Index]}); @@ -1084,11 +1086,32 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con ZEN_TRACE_CPU("Z$::RpcPutCacheValues::Response"); CbObjectWriter ResponseObject{1024}; ResponseObject.BeginArray("Result"sv); - for (bool Value : Results) + bool bAnyErrors = false; + for (const ZenCacheStore::PutResult& Value : Results) { - ResponseObject.AddBool(Value); + if (Value.Status == zen::PutStatus::Success) + { + ResponseObject.AddBool(true); + } + else + { + bAnyErrors = true; + ResponseObject.AddBool(false); + } } ResponseObject.EndArray(); + if (bAnyErrors) + { + ResponseObject.BeginArray("ErrorMessages"sv); + for (const ZenCacheStore::PutResult& Value : Results) + { + if (Value.Status != zen::PutStatus::Success) + { + ResponseObject.AddString(Value.Message); + } + } + ResponseObject.EndArray(); + } CbPackage RpcResponse; RpcResponse.SetObject(ResponseObject.Save()); @@ -1259,15 +1282,16 @@ CacheRpcHandler::HandleRpcGetCacheValues(const CacheRequestContext& Context, CbO if (HasData && StoreData) { - if (m_CacheStore.Put( - Context, - *Namespace, - Request.Key.Bucket, - Request.Key.Hash, - ZenCacheValue{.Value = Params.Value, .RawSize = Request.RawSize, .RawHash = Request.RawHash}, - {}, - Overwrite, - nullptr)) + ZenCacheStore::PutResult PutResult = m_CacheStore.Put( + Context, + *Namespace, + Request.Key.Bucket, + Request.Key.Hash, + ZenCacheValue{.Value = Params.Value, .RawSize = Request.RawSize, .RawHash = Request.RawHash}, + {}, + Overwrite, + nullptr); + if (PutResult.Status == zen::PutStatus::Success) { m_CacheStats.WriteCount++; } @@ -1565,14 +1589,15 @@ CacheRpcHandler::GetLocalCacheRecords(const CacheRequestContext& Context, const IoHash ValueHash = HashView.AsHash(); ReferencedAttachments.push_back(ValueHash); }); - if (!m_CacheStore.Put(Context, - Namespace, - Key.Bucket, - Key.Hash, - {.Value = Record.CacheValue}, - ReferencedAttachments, - Overwrite, - nullptr)) + ZenCacheStore::PutResult PutResult = m_CacheStore.Put(Context, + Namespace, + Key.Bucket, + Key.Hash, + {.Value = Record.CacheValue}, + ReferencedAttachments, + Overwrite, + nullptr); + if (PutResult.Status != zen::PutStatus::Success) { return; } @@ -1794,14 +1819,16 @@ CacheRpcHandler::GetUpstreamCacheChunks(const CacheRequestContext& Context, } else { - if (m_CacheStore.Put(Context, + ZenCacheStore::PutResult PutResult = + m_CacheStore.Put(Context, Namespace, Key.Key.Bucket, Key.Key.Hash, {.Value = Params.Value, .RawSize = Params.RawSize, .RawHash = Params.RawHash}, {}, Overwrite, - nullptr)) + nullptr); + if (PutResult.Status == zen::PutStatus::Success) { m_CacheStats.WriteCount++; } diff --git a/src/zenstore/cache/structuredcachestore.cpp b/src/zenstore/cache/structuredcachestore.cpp index 25fd23b93..a3f80099f 100644 --- a/src/zenstore/cache/structuredcachestore.cpp +++ b/src/zenstore/cache/structuredcachestore.cpp @@ -154,7 +154,7 @@ struct ZenCacheNamespace::PutBatchHandle }; ZenCacheNamespace::PutBatchHandle* -ZenCacheNamespace::BeginPutBatch(std::vector<bool>& OutResult) +ZenCacheNamespace::BeginPutBatch(std::vector<PutResult>& OutResult) { ZenCacheNamespace::PutBatchHandle* Handle = new ZenCacheNamespace::PutBatchHandle; Handle->DiskLayerHandle = m_DiskLayer.BeginPutBatch(OutResult); @@ -252,7 +252,7 @@ ZenCacheNamespace::Get(std::string_view InBucket, const IoHash& HashKey, GetBatc return; } -bool +ZenCacheNamespace::PutResult ZenCacheNamespace::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value, @@ -269,8 +269,8 @@ ZenCacheNamespace::Put(std::string_view InBucket, ZEN_ASSERT(Value.Value.Size()); ZenCacheDiskLayer::PutBatchHandle* DiskLayerBatchHandle = OptionalBatchHandle ? OptionalBatchHandle->DiskLayerHandle : nullptr; - bool RetVal = m_DiskLayer.Put(InBucket, HashKey, Value, References, Overwrite, DiskLayerBatchHandle); - if (RetVal) + PutResult RetVal = m_DiskLayer.Put(InBucket, HashKey, Value, References, Overwrite, DiskLayerBatchHandle); + if (RetVal.Status == zen::PutStatus::Success) { m_WriteCount++; } @@ -558,7 +558,7 @@ ZenCacheStore::LogWorker() } } -ZenCacheStore::PutBatch::PutBatch(ZenCacheStore& CacheStore, std::string_view InNamespace, std::vector<bool>& OutResult) +ZenCacheStore::PutBatch::PutBatch(ZenCacheStore& CacheStore, std::string_view InNamespace, std::vector<PutResult>& OutResult) : m_CacheStore(CacheStore) { ZEN_MEMSCOPE(GetCacheStoreTag()); @@ -721,7 +721,7 @@ ZenCacheStore::Get(const CacheRequestContext& Context, m_MissCount++; } -bool +ZenCacheStore::PutResult ZenCacheStore::Put(const CacheRequestContext& Context, std::string_view Namespace, std::string_view Bucket, @@ -736,7 +736,7 @@ ZenCacheStore::Put(const CacheRequestContext& Context, if (IsKnownBadBucketName(Bucket)) { m_RejectedWriteCount++; - return false; + return PutResult{zen::PutStatus::Invalid, "Bad bucket name"}; } ZEN_MEMSCOPE(GetCacheStoreTag()); @@ -766,8 +766,8 @@ ZenCacheStore::Put(const CacheRequestContext& Context, if (ZenCacheNamespace* Store = GetNamespace(Namespace); Store) { ZenCacheNamespace::PutBatchHandle* BatchHandle = OptionalBatchHandle ? OptionalBatchHandle->m_NamespaceBatchHandle : nullptr; - bool RetVal = Store->Put(Bucket, HashKey, Value, References, Overwrite, BatchHandle); - if (RetVal) + PutResult RetVal = Store->Put(Bucket, HashKey, Value, References, Overwrite, BatchHandle); + if (RetVal.Status == zen::PutStatus::Success) { m_WriteCount++; } @@ -783,7 +783,7 @@ ZenCacheStore::Put(const CacheRequestContext& Context, Namespace, Bucket, HashKey.ToHexString()); - return false; + return PutResult{zen::PutStatus::Fail, fmt::format("Unknown namespace '{}'", Namespace)}; } bool diff --git a/src/zenstore/include/zenstore/cache/cachedisklayer.h b/src/zenstore/include/zenstore/cache/cachedisklayer.h index 239b0d1aa..4f5c905ee 100644 --- a/src/zenstore/include/zenstore/cache/cachedisklayer.h +++ b/src/zenstore/include/zenstore/cache/cachedisklayer.h @@ -166,6 +166,12 @@ public: uint64_t MemorySize; }; + struct PutResult + { + zen::PutStatus Status; + std::string Message; + }; + explicit ZenCacheDiskLayer(GcManager& Gc, JobQueue& JobQueue, const std::filesystem::path& RootDir, const Configuration& Config); ~ZenCacheDiskLayer(); @@ -176,19 +182,19 @@ public: void Get(std::string_view Bucket, const IoHash& HashKey, GetBatchHandle& BatchHandle); struct PutBatchHandle; - PutBatchHandle* BeginPutBatch(std::vector<bool>& OutResult); + PutBatchHandle* BeginPutBatch(std::vector<PutResult>& OutResult); void EndPutBatch(PutBatchHandle* Batch) noexcept; - bool Put(std::string_view Bucket, - const IoHash& HashKey, - const ZenCacheValue& Value, - std::span<IoHash> References, - bool Overwrite, - PutBatchHandle* OptionalBatchHandle); - bool Drop(); - bool DropBucket(std::string_view Bucket); - void Flush(); - void ScrubStorage(ScrubContext& Ctx); + PutResult Put(std::string_view Bucket, + const IoHash& HashKey, + const ZenCacheValue& Value, + std::span<IoHash> References, + bool Overwrite, + PutBatchHandle* OptionalBatchHandle); + bool Drop(); + bool DropBucket(std::string_view Bucket); + void Flush(); + void ScrubStorage(ScrubContext& Ctx); void DiscoverBuckets(); GcStorageSize StorageSize() const; @@ -230,9 +236,9 @@ public: void Get(const IoHash& HashKey, GetBatchHandle& BatchHandle); struct PutBatchHandle; - PutBatchHandle* BeginPutBatch(std::vector<bool>& OutResult); + PutBatchHandle* BeginPutBatch(std::vector<ZenCacheDiskLayer::PutResult>& OutResult); void EndPutBatch(PutBatchHandle* Batch) noexcept; - bool Put(const IoHash& HashKey, + PutResult Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References, bool Overwrite, diff --git a/src/zenstore/include/zenstore/cache/cacherpc.h b/src/zenstore/include/zenstore/cache/cacherpc.h index d489a5386..104746aba 100644 --- a/src/zenstore/include/zenstore/cache/cacherpc.h +++ b/src/zenstore/include/zenstore/cache/cacherpc.h @@ -4,6 +4,7 @@ #include <zencore/iobuffer.h> #include <zencore/logging.h> +#include <zenstore/cache/cacheshared.h> #include <zenutil/cache/cache.h> #include <atomic> @@ -56,14 +57,6 @@ struct CacheStats std::atomic_uint64_t RpcChunkBatchRequests{}; }; -enum class PutResult -{ - Success, - Fail, - Conflict, - Invalid, -}; - /** Recognize both kBinary and kCompressedBinary as kCompressedBinary for structured cache value keys. We need this until the content type is preserved for kCompressedBinary when passing to and from upstream servers. @@ -108,7 +101,7 @@ private: CbPackage HandleRpcGetCacheValues(const CacheRequestContext& Context, CbObjectView BatchRequest); CbPackage HandleRpcGetCacheChunks(const CacheRequestContext& Context, RpcAcceptOptions AcceptOptions, CbObjectView BatchRequest); - PutResult PutCacheRecord(PutRequestData& Request, const CbPackage* Package); + PutStatus PutCacheRecord(PutRequestData& Request, const CbPackage* Package); /** HandleRpcGetCacheChunks Helper: Parse the Body object into RecordValue Requests and Value Requests. */ bool ParseGetCacheChunksRequest(std::string& Namespace, diff --git a/src/zenstore/include/zenstore/cache/cacheshared.h b/src/zenstore/include/zenstore/cache/cacheshared.h index 9b45c7b21..dc0c341d0 100644 --- a/src/zenstore/include/zenstore/cache/cacheshared.h +++ b/src/zenstore/include/zenstore/cache/cacheshared.h @@ -65,6 +65,14 @@ struct CacheContentStats std::vector<IoHash> Attachments; }; +enum class PutStatus +{ + Success, + Fail, + Conflict, + Invalid, +}; + bool IsKnownBadBucketName(std::string_view BucketName); bool ValidateIoBuffer(ZenContentType ContentType, IoBuffer Buffer); diff --git a/src/zenstore/include/zenstore/cache/structuredcachestore.h b/src/zenstore/include/zenstore/cache/structuredcachestore.h index 1b5e0b76b..581f7861b 100644 --- a/src/zenstore/include/zenstore/cache/structuredcachestore.h +++ b/src/zenstore/include/zenstore/cache/structuredcachestore.h @@ -78,25 +78,27 @@ public: ZenCacheDiskLayer::DiskStats DiskStats; }; + using PutResult = ZenCacheDiskLayer::PutResult; + ZenCacheNamespace(GcManager& Gc, JobQueue& JobQueue, const std::filesystem::path& RootDir, const Configuration& Config); ~ZenCacheNamespace(); struct PutBatchHandle; - PutBatchHandle* BeginPutBatch(std::vector<bool>& OutResults); + PutBatchHandle* BeginPutBatch(std::vector<PutResult>& OutResults); void EndPutBatch(PutBatchHandle* Batch) noexcept; struct GetBatchHandle; GetBatchHandle* BeginGetBatch(std::vector<ZenCacheValue>& OutResults); void EndGetBatch(GetBatchHandle* Batch) noexcept; - bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue); - void Get(std::string_view Bucket, const IoHash& HashKey, GetBatchHandle& OptionalBatchHandle); - bool Put(std::string_view Bucket, - const IoHash& HashKey, - const ZenCacheValue& Value, - std::span<IoHash> References, - bool Overwrite, - PutBatchHandle* OptionalBatchHandle = nullptr); + bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue); + void Get(std::string_view Bucket, const IoHash& HashKey, GetBatchHandle& OptionalBatchHandle); + PutResult Put(std::string_view Bucket, + const IoHash& HashKey, + const ZenCacheValue& Value, + std::span<IoHash> References, + bool Overwrite, + PutBatchHandle* OptionalBatchHandle = nullptr); bool DropBucket(std::string_view Bucket); void EnumerateBucketContents(std::string_view Bucket, @@ -197,6 +199,8 @@ public: std::vector<NamedNamespaceStats> NamespaceStats; }; + using PutResult = ZenCacheNamespace::PutResult; + ZenCacheStore(GcManager& Gc, JobQueue& JobQueue, const std::filesystem::path& BasePath, @@ -207,7 +211,7 @@ public: class PutBatch { public: - PutBatch(ZenCacheStore& CacheStore, std::string_view Namespace, std::vector<bool>& OutResult); + PutBatch(ZenCacheStore& CacheStore, std::string_view Namespace, std::vector<PutResult>& OutResult); ~PutBatch(); private: @@ -244,14 +248,14 @@ public: const IoHash& HashKey, GetBatch& BatchHandle); - bool Put(const CacheRequestContext& Context, - std::string_view Namespace, - std::string_view Bucket, - const IoHash& HashKey, - const ZenCacheValue& Value, - std::span<IoHash> References, - bool Overwrite, - PutBatch* OptionalBatchHandle = nullptr); + PutResult Put(const CacheRequestContext& Context, + std::string_view Namespace, + std::string_view Bucket, + const IoHash& HashKey, + const ZenCacheValue& Value, + std::span<IoHash> References, + bool Overwrite, + PutBatch* OptionalBatchHandle = nullptr); bool DropBucket(std::string_view Namespace, std::string_view Bucket); bool DropNamespace(std::string_view Namespace); |