diff options
| author | Zousar Shaker <[email protected]> | 2025-08-08 12:51:37 -0600 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-08-08 12:51:37 -0600 |
| commit | b23e5165112848284fbc0c62fdeb2e6207693e9f (patch) | |
| tree | 1dff0546e0ae7ae31a8cf0f36771639a4e68d19c /src/zenstore/include | |
| parent | 5.6.15 (diff) | |
| parent | Merge branch 'main' into zs/put-overwrite-policy (diff) | |
| download | zen-b23e5165112848284fbc0c62fdeb2e6207693e9f.tar.xz zen-b23e5165112848284fbc0c62fdeb2e6207693e9f.zip | |
Merge pull request #434 from ue-foundation/zs/put-overwrite-policy
Zs/put overwrite policy
Diffstat (limited to 'src/zenstore/include')
4 files changed, 64 insertions, 38 deletions
diff --git a/src/zenstore/include/zenstore/cache/cachedisklayer.h b/src/zenstore/include/zenstore/cache/cachedisklayer.h index 3cd2d6423..023dd1ffa 100644 --- a/src/zenstore/include/zenstore/cache/cachedisklayer.h +++ b/src/zenstore/include/zenstore/cache/cachedisklayer.h @@ -115,6 +115,7 @@ public: uint32_t PayloadAlignment = 1u << 4; uint64_t MemCacheSizeThreshold = 1 * 1024; uint64_t LargeObjectThreshold = 128 * 1024; + bool LimitOverwrites = false; }; struct Configuration @@ -170,6 +171,12 @@ public: uint64_t MemorySize; }; + struct PutResult + { + zen::PutStatus Status; + std::string Message; + }; + explicit ZenCacheDiskLayer(GcManager& Gc, JobQueue& JobQueue, const std::filesystem::path& RootDir, const Configuration& Config); ~ZenCacheDiskLayer(); @@ -180,13 +187,14 @@ public: void Get(std::string_view Bucket, const IoHash& HashKey, GetBatchHandle& BatchHandle); struct PutBatchHandle; - PutBatchHandle* BeginPutBatch(std::vector<bool>& OutResult); + PutBatchHandle* BeginPutBatch(std::vector<PutResult>& OutResult); void EndPutBatch(PutBatchHandle* Batch) noexcept; - void Put(std::string_view Bucket, + PutResult Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References, + bool Overwrite, PutBatchHandle* OptionalBatchHandle); std::function<void()> Drop(); std::function<void()> DropBucket(std::string_view Bucket); @@ -236,10 +244,14 @@ public: void Get(const IoHash& HashKey, GetBatchHandle& BatchHandle); struct PutBatchHandle; - PutBatchHandle* BeginPutBatch(std::vector<bool>& OutResult); - void EndPutBatch(PutBatchHandle* Batch) noexcept; - void Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References, PutBatchHandle* OptionalBatchHandle); - uint64_t MemCacheTrim(GcClock::TimePoint ExpireTime); + PutBatchHandle* BeginPutBatch(std::vector<ZenCacheDiskLayer::PutResult>& OutResult); + void EndPutBatch(PutBatchHandle* Batch) noexcept; + PutResult Put(const IoHash& HashKey, + const ZenCacheValue& Value, + std::span<IoHash> References, + bool Overwrite, + PutBatchHandle* OptionalBatchHandle); + uint64_t MemCacheTrim(GcClock::TimePoint ExpireTime); std::function<void()> Drop(); void Flush(); void ScrubStorage(ScrubContext& Ctx); @@ -384,14 +396,20 @@ public: virtual std::vector<GcReferenceChecker*> CreateReferenceCheckers(GcCtx& Ctx) override; virtual std::vector<GcReferenceValidator*> CreateReferenceValidators(GcCtx& Ctx) override; - void BuildPath(PathBuilderBase& Path, const IoHash& HashKey) const; - void PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References); - IoBuffer GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey) const; - void PutInlineCacheValue(const IoHash& HashKey, - const ZenCacheValue& Value, - std::span<IoHash> References, - PutBatchHandle* OptionalBatchHandle = nullptr); - IoBuffer GetInlineCacheValue(const DiskLocation& Loc) const; + void BuildPath(PathBuilderBase& Path, const IoHash& HashKey) const; + bool ShouldRejectPut(const IoHash& HashKey, + const ZenCacheValue& Value, + std::span<const IoHash> References, + bool Overwrite, + ZenCacheDiskLayer::PutResult& OutPutResult); + void PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References); + IoBuffer GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey) const; + PutResult PutInlineCacheValue(const IoHash& HashKey, + const ZenCacheValue& Value, + std::span<IoHash> References, + bool Overwrite, + PutBatchHandle* OptionalBatchHandle = nullptr); + IoBuffer GetInlineCacheValue(const DiskLocation& Loc) const; CacheValueDetails::ValueDetails GetValueDetails(RwLock::SharedLockScope&, const IoHash& Key, PayloadIndex Index) const; void SetMetaData(RwLock::ExclusiveLockScope&, diff --git a/src/zenstore/include/zenstore/cache/cacherpc.h b/src/zenstore/include/zenstore/cache/cacherpc.h index da8cf69fe..104746aba 100644 --- a/src/zenstore/include/zenstore/cache/cacherpc.h +++ b/src/zenstore/include/zenstore/cache/cacherpc.h @@ -4,6 +4,7 @@ #include <zencore/iobuffer.h> #include <zencore/logging.h> +#include <zenstore/cache/cacheshared.h> #include <zenutil/cache/cache.h> #include <atomic> @@ -56,13 +57,6 @@ struct CacheStats std::atomic_uint64_t RpcChunkBatchRequests{}; }; -enum class PutResult -{ - Success, - Fail, - Invalid, -}; - /** Recognize both kBinary and kCompressedBinary as kCompressedBinary for structured cache value keys. We need this until the content type is preserved for kCompressedBinary when passing to and from upstream servers. @@ -107,7 +101,7 @@ private: CbPackage HandleRpcGetCacheValues(const CacheRequestContext& Context, CbObjectView BatchRequest); CbPackage HandleRpcGetCacheChunks(const CacheRequestContext& Context, RpcAcceptOptions AcceptOptions, CbObjectView BatchRequest); - PutResult PutCacheRecord(PutRequestData& Request, const CbPackage* Package); + PutStatus PutCacheRecord(PutRequestData& Request, const CbPackage* Package); /** HandleRpcGetCacheChunks Helper: Parse the Body object into RecordValue Requests and Value Requests. */ bool ParseGetCacheChunksRequest(std::string& Namespace, diff --git a/src/zenstore/include/zenstore/cache/cacheshared.h b/src/zenstore/include/zenstore/cache/cacheshared.h index ef1b803de..8f40ae727 100644 --- a/src/zenstore/include/zenstore/cache/cacheshared.h +++ b/src/zenstore/include/zenstore/cache/cacheshared.h @@ -69,6 +69,14 @@ struct CacheContentStats std::vector<IoHash> Attachments; }; +enum class PutStatus +{ + Success, + Fail, + Conflict, + Invalid, +}; + bool IsKnownBadBucketName(std::string_view BucketName); bool ValidateIoBuffer(ZenContentType ContentType, IoBuffer Buffer); diff --git a/src/zenstore/include/zenstore/cache/structuredcachestore.h b/src/zenstore/include/zenstore/cache/structuredcachestore.h index 48fc17960..b6e8e7565 100644 --- a/src/zenstore/include/zenstore/cache/structuredcachestore.h +++ b/src/zenstore/include/zenstore/cache/structuredcachestore.h @@ -78,24 +78,27 @@ public: ZenCacheDiskLayer::DiskStats DiskStats; }; + using PutResult = ZenCacheDiskLayer::PutResult; + ZenCacheNamespace(GcManager& Gc, JobQueue& JobQueue, const std::filesystem::path& RootDir, const Configuration& Config); ~ZenCacheNamespace(); struct PutBatchHandle; - PutBatchHandle* BeginPutBatch(std::vector<bool>& OutResults); + PutBatchHandle* BeginPutBatch(std::vector<PutResult>& OutResults); void EndPutBatch(PutBatchHandle* Batch) noexcept; struct GetBatchHandle; GetBatchHandle* BeginGetBatch(ZenCacheValueVec_t& OutResults); void EndGetBatch(GetBatchHandle* Batch) noexcept; - bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue); - void Get(std::string_view Bucket, const IoHash& HashKey, GetBatchHandle& OptionalBatchHandle); - void Put(std::string_view Bucket, - const IoHash& HashKey, - const ZenCacheValue& Value, - std::span<IoHash> References, - PutBatchHandle* OptionalBatchHandle = nullptr); + bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue); + void Get(std::string_view Bucket, const IoHash& HashKey, GetBatchHandle& OptionalBatchHandle); + PutResult Put(std::string_view Bucket, + const IoHash& HashKey, + const ZenCacheValue& Value, + std::span<IoHash> References, + bool Overwrite, + PutBatchHandle* OptionalBatchHandle = nullptr); bool DropBucket(std::string_view Bucket); void EnumerateBucketContents(std::string_view Bucket, @@ -196,6 +199,8 @@ public: std::vector<NamedNamespaceStats> NamespaceStats; }; + using PutResult = ZenCacheNamespace::PutResult; + ZenCacheStore(GcManager& Gc, JobQueue& JobQueue, const std::filesystem::path& BasePath, @@ -206,7 +211,7 @@ public: class PutBatch { public: - PutBatch(ZenCacheStore& CacheStore, std::string_view Namespace, std::vector<bool>& OutResult); + PutBatch(ZenCacheStore& CacheStore, std::string_view Namespace, std::vector<PutResult>& OutResult); ~PutBatch(); private: @@ -243,13 +248,14 @@ public: const IoHash& HashKey, GetBatch& BatchHandle); - void Put(const CacheRequestContext& Context, - std::string_view Namespace, - std::string_view Bucket, - const IoHash& HashKey, - const ZenCacheValue& Value, - std::span<IoHash> References, - PutBatch* OptionalBatchHandle = nullptr); + PutResult Put(const CacheRequestContext& Context, + std::string_view Namespace, + std::string_view Bucket, + const IoHash& HashKey, + const ZenCacheValue& Value, + std::span<IoHash> References, + bool Overwrite, + PutBatch* OptionalBatchHandle = nullptr); bool DropBucket(std::string_view Namespace, std::string_view Bucket); bool DropNamespace(std::string_view Namespace); |