aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore/include
diff options
context:
space:
mode:
authorZousar Shaker <[email protected]>2025-08-08 12:51:37 -0600
committerGitHub Enterprise <[email protected]>2025-08-08 12:51:37 -0600
commitb23e5165112848284fbc0c62fdeb2e6207693e9f (patch)
tree1dff0546e0ae7ae31a8cf0f36771639a4e68d19c /src/zenstore/include
parent5.6.15 (diff)
parentMerge branch 'main' into zs/put-overwrite-policy (diff)
downloadzen-b23e5165112848284fbc0c62fdeb2e6207693e9f.tar.xz
zen-b23e5165112848284fbc0c62fdeb2e6207693e9f.zip
Merge pull request #434 from ue-foundation/zs/put-overwrite-policy
Zs/put overwrite policy
Diffstat (limited to 'src/zenstore/include')
-rw-r--r--src/zenstore/include/zenstore/cache/cachedisklayer.h46
-rw-r--r--src/zenstore/include/zenstore/cache/cacherpc.h10
-rw-r--r--src/zenstore/include/zenstore/cache/cacheshared.h8
-rw-r--r--src/zenstore/include/zenstore/cache/structuredcachestore.h38
4 files changed, 64 insertions, 38 deletions
diff --git a/src/zenstore/include/zenstore/cache/cachedisklayer.h b/src/zenstore/include/zenstore/cache/cachedisklayer.h
index 3cd2d6423..023dd1ffa 100644
--- a/src/zenstore/include/zenstore/cache/cachedisklayer.h
+++ b/src/zenstore/include/zenstore/cache/cachedisklayer.h
@@ -115,6 +115,7 @@ public:
uint32_t PayloadAlignment = 1u << 4;
uint64_t MemCacheSizeThreshold = 1 * 1024;
uint64_t LargeObjectThreshold = 128 * 1024;
+ bool LimitOverwrites = false;
};
struct Configuration
@@ -170,6 +171,12 @@ public:
uint64_t MemorySize;
};
+ struct PutResult
+ {
+ zen::PutStatus Status;
+ std::string Message;
+ };
+
explicit ZenCacheDiskLayer(GcManager& Gc, JobQueue& JobQueue, const std::filesystem::path& RootDir, const Configuration& Config);
~ZenCacheDiskLayer();
@@ -180,13 +187,14 @@ public:
void Get(std::string_view Bucket, const IoHash& HashKey, GetBatchHandle& BatchHandle);
struct PutBatchHandle;
- PutBatchHandle* BeginPutBatch(std::vector<bool>& OutResult);
+ PutBatchHandle* BeginPutBatch(std::vector<PutResult>& OutResult);
void EndPutBatch(PutBatchHandle* Batch) noexcept;
- void Put(std::string_view Bucket,
+ PutResult Put(std::string_view Bucket,
const IoHash& HashKey,
const ZenCacheValue& Value,
std::span<IoHash> References,
+ bool Overwrite,
PutBatchHandle* OptionalBatchHandle);
std::function<void()> Drop();
std::function<void()> DropBucket(std::string_view Bucket);
@@ -236,10 +244,14 @@ public:
void Get(const IoHash& HashKey, GetBatchHandle& BatchHandle);
struct PutBatchHandle;
- PutBatchHandle* BeginPutBatch(std::vector<bool>& OutResult);
- void EndPutBatch(PutBatchHandle* Batch) noexcept;
- void Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References, PutBatchHandle* OptionalBatchHandle);
- uint64_t MemCacheTrim(GcClock::TimePoint ExpireTime);
+ PutBatchHandle* BeginPutBatch(std::vector<ZenCacheDiskLayer::PutResult>& OutResult);
+ void EndPutBatch(PutBatchHandle* Batch) noexcept;
+ PutResult Put(const IoHash& HashKey,
+ const ZenCacheValue& Value,
+ std::span<IoHash> References,
+ bool Overwrite,
+ PutBatchHandle* OptionalBatchHandle);
+ uint64_t MemCacheTrim(GcClock::TimePoint ExpireTime);
std::function<void()> Drop();
void Flush();
void ScrubStorage(ScrubContext& Ctx);
@@ -384,14 +396,20 @@ public:
virtual std::vector<GcReferenceChecker*> CreateReferenceCheckers(GcCtx& Ctx) override;
virtual std::vector<GcReferenceValidator*> CreateReferenceValidators(GcCtx& Ctx) override;
- void BuildPath(PathBuilderBase& Path, const IoHash& HashKey) const;
- void PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References);
- IoBuffer GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey) const;
- void PutInlineCacheValue(const IoHash& HashKey,
- const ZenCacheValue& Value,
- std::span<IoHash> References,
- PutBatchHandle* OptionalBatchHandle = nullptr);
- IoBuffer GetInlineCacheValue(const DiskLocation& Loc) const;
+ void BuildPath(PathBuilderBase& Path, const IoHash& HashKey) const;
+ bool ShouldRejectPut(const IoHash& HashKey,
+ const ZenCacheValue& Value,
+ std::span<const IoHash> References,
+ bool Overwrite,
+ ZenCacheDiskLayer::PutResult& OutPutResult);
+ void PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References);
+ IoBuffer GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey) const;
+ PutResult PutInlineCacheValue(const IoHash& HashKey,
+ const ZenCacheValue& Value,
+ std::span<IoHash> References,
+ bool Overwrite,
+ PutBatchHandle* OptionalBatchHandle = nullptr);
+ IoBuffer GetInlineCacheValue(const DiskLocation& Loc) const;
CacheValueDetails::ValueDetails GetValueDetails(RwLock::SharedLockScope&, const IoHash& Key, PayloadIndex Index) const;
void SetMetaData(RwLock::ExclusiveLockScope&,
diff --git a/src/zenstore/include/zenstore/cache/cacherpc.h b/src/zenstore/include/zenstore/cache/cacherpc.h
index da8cf69fe..104746aba 100644
--- a/src/zenstore/include/zenstore/cache/cacherpc.h
+++ b/src/zenstore/include/zenstore/cache/cacherpc.h
@@ -4,6 +4,7 @@
#include <zencore/iobuffer.h>
#include <zencore/logging.h>
+#include <zenstore/cache/cacheshared.h>
#include <zenutil/cache/cache.h>
#include <atomic>
@@ -56,13 +57,6 @@ struct CacheStats
std::atomic_uint64_t RpcChunkBatchRequests{};
};
-enum class PutResult
-{
- Success,
- Fail,
- Invalid,
-};
-
/** Recognize both kBinary and kCompressedBinary as kCompressedBinary for structured cache value keys.
We need this until the content type is preserved for kCompressedBinary when passing to and from upstream servers.
@@ -107,7 +101,7 @@ private:
CbPackage HandleRpcGetCacheValues(const CacheRequestContext& Context, CbObjectView BatchRequest);
CbPackage HandleRpcGetCacheChunks(const CacheRequestContext& Context, RpcAcceptOptions AcceptOptions, CbObjectView BatchRequest);
- PutResult PutCacheRecord(PutRequestData& Request, const CbPackage* Package);
+ PutStatus PutCacheRecord(PutRequestData& Request, const CbPackage* Package);
/** HandleRpcGetCacheChunks Helper: Parse the Body object into RecordValue Requests and Value Requests. */
bool ParseGetCacheChunksRequest(std::string& Namespace,
diff --git a/src/zenstore/include/zenstore/cache/cacheshared.h b/src/zenstore/include/zenstore/cache/cacheshared.h
index ef1b803de..8f40ae727 100644
--- a/src/zenstore/include/zenstore/cache/cacheshared.h
+++ b/src/zenstore/include/zenstore/cache/cacheshared.h
@@ -69,6 +69,14 @@ struct CacheContentStats
std::vector<IoHash> Attachments;
};
+enum class PutStatus
+{
+ Success,
+ Fail,
+ Conflict,
+ Invalid,
+};
+
bool IsKnownBadBucketName(std::string_view BucketName);
bool ValidateIoBuffer(ZenContentType ContentType, IoBuffer Buffer);
diff --git a/src/zenstore/include/zenstore/cache/structuredcachestore.h b/src/zenstore/include/zenstore/cache/structuredcachestore.h
index 48fc17960..b6e8e7565 100644
--- a/src/zenstore/include/zenstore/cache/structuredcachestore.h
+++ b/src/zenstore/include/zenstore/cache/structuredcachestore.h
@@ -78,24 +78,27 @@ public:
ZenCacheDiskLayer::DiskStats DiskStats;
};
+ using PutResult = ZenCacheDiskLayer::PutResult;
+
ZenCacheNamespace(GcManager& Gc, JobQueue& JobQueue, const std::filesystem::path& RootDir, const Configuration& Config);
~ZenCacheNamespace();
struct PutBatchHandle;
- PutBatchHandle* BeginPutBatch(std::vector<bool>& OutResults);
+ PutBatchHandle* BeginPutBatch(std::vector<PutResult>& OutResults);
void EndPutBatch(PutBatchHandle* Batch) noexcept;
struct GetBatchHandle;
GetBatchHandle* BeginGetBatch(ZenCacheValueVec_t& OutResults);
void EndGetBatch(GetBatchHandle* Batch) noexcept;
- bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue);
- void Get(std::string_view Bucket, const IoHash& HashKey, GetBatchHandle& OptionalBatchHandle);
- void Put(std::string_view Bucket,
- const IoHash& HashKey,
- const ZenCacheValue& Value,
- std::span<IoHash> References,
- PutBatchHandle* OptionalBatchHandle = nullptr);
+ bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue);
+ void Get(std::string_view Bucket, const IoHash& HashKey, GetBatchHandle& OptionalBatchHandle);
+ PutResult Put(std::string_view Bucket,
+ const IoHash& HashKey,
+ const ZenCacheValue& Value,
+ std::span<IoHash> References,
+ bool Overwrite,
+ PutBatchHandle* OptionalBatchHandle = nullptr);
bool DropBucket(std::string_view Bucket);
void EnumerateBucketContents(std::string_view Bucket,
@@ -196,6 +199,8 @@ public:
std::vector<NamedNamespaceStats> NamespaceStats;
};
+ using PutResult = ZenCacheNamespace::PutResult;
+
ZenCacheStore(GcManager& Gc,
JobQueue& JobQueue,
const std::filesystem::path& BasePath,
@@ -206,7 +211,7 @@ public:
class PutBatch
{
public:
- PutBatch(ZenCacheStore& CacheStore, std::string_view Namespace, std::vector<bool>& OutResult);
+ PutBatch(ZenCacheStore& CacheStore, std::string_view Namespace, std::vector<PutResult>& OutResult);
~PutBatch();
private:
@@ -243,13 +248,14 @@ public:
const IoHash& HashKey,
GetBatch& BatchHandle);
- void Put(const CacheRequestContext& Context,
- std::string_view Namespace,
- std::string_view Bucket,
- const IoHash& HashKey,
- const ZenCacheValue& Value,
- std::span<IoHash> References,
- PutBatch* OptionalBatchHandle = nullptr);
+ PutResult Put(const CacheRequestContext& Context,
+ std::string_view Namespace,
+ std::string_view Bucket,
+ const IoHash& HashKey,
+ const ZenCacheValue& Value,
+ std::span<IoHash> References,
+ bool Overwrite,
+ PutBatch* OptionalBatchHandle = nullptr);
bool DropBucket(std::string_view Namespace, std::string_view Bucket);
bool DropNamespace(std::string_view Namespace);