aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorZousar Shaker <[email protected]>2025-08-08 12:51:37 -0600
committerGitHub Enterprise <[email protected]>2025-08-08 12:51:37 -0600
commitb23e5165112848284fbc0c62fdeb2e6207693e9f (patch)
tree1dff0546e0ae7ae31a8cf0f36771639a4e68d19c /src
parent5.6.15 (diff)
parentMerge branch 'main' into zs/put-overwrite-policy (diff)
downloadzen-b23e5165112848284fbc0c62fdeb2e6207693e9f.tar.xz
zen-b23e5165112848284fbc0c62fdeb2e6207693e9f.zip
Merge pull request #434 from ue-foundation/zs/put-overwrite-policy
Zs/put overwrite policy
Diffstat (limited to 'src')
-rw-r--r--src/zenserver-test/zenserver-test.cpp354
-rw-r--r--src/zenserver/cache/httpstructuredcache.cpp116
-rw-r--r--src/zenserver/config.cpp8
-rw-r--r--src/zenserver/config.h1
-rw-r--r--src/zenserver/zenserver.cpp2
-rw-r--r--src/zenstore/cache/cachedisklayer.cpp227
-rw-r--r--src/zenstore/cache/cacherpc.cpp234
-rw-r--r--src/zenstore/cache/structuredcachestore.cpp66
-rw-r--r--src/zenstore/include/zenstore/cache/cachedisklayer.h46
-rw-r--r--src/zenstore/include/zenstore/cache/cacherpc.h10
-rw-r--r--src/zenstore/include/zenstore/cache/cacheshared.h8
-rw-r--r--src/zenstore/include/zenstore/cache/structuredcachestore.h38
12 files changed, 896 insertions, 214 deletions
diff --git a/src/zenserver-test/zenserver-test.cpp b/src/zenserver-test/zenserver-test.cpp
index d0c73f91b..9ccdf3b5e 100644
--- a/src/zenserver-test/zenserver-test.cpp
+++ b/src/zenserver-test/zenserver-test.cpp
@@ -553,9 +553,13 @@ namespace utils {
return ZenConfig{.DataDir = TestEnv.CreateNewTestDir(), .Port = Port, .Args = std::move(Args)};
}
- static ZenConfig NewWithUpstream(uint16_t Port, uint16_t UpstreamPort)
+ static ZenConfig NewWithUpstream(uint16_t Port, uint16_t UpstreamPort, std::string Args = "")
{
- return New(Port, fmt::format("--debug --upstream-thread-count=0 --upstream-zen-url=http://localhost:{}", UpstreamPort));
+ return New(Port,
+ fmt::format("{}{}--debug --upstream-thread-count=0 --upstream-zen-url=http://localhost:{}",
+ Args,
+ Args.length() > 0 ? " " : "",
+ UpstreamPort));
}
static ZenConfig NewWithThreadedUpstreams(uint16_t NewPort, std::span<uint16_t> UpstreamPorts, bool Debug)
@@ -1301,8 +1305,10 @@ TEST_CASE("zcache.rpc")
std::string_view Namespace,
std::string_view Bucket,
size_t Num,
- size_t PayloadSize = 1024,
- size_t KeyOffset = 1) -> std::vector<CacheKey> {
+ size_t PayloadSize = 1024,
+ size_t KeyOffset = 1,
+ CachePolicy PutPolicy = CachePolicy::Default,
+ std::vector<CbPackage>* OutPackages = nullptr) -> std::vector<CacheKey> {
std::vector<zen::CacheKey> OutKeys;
for (uint32_t Key = 1; Key <= Num; ++Key)
@@ -1312,7 +1318,7 @@ TEST_CASE("zcache.rpc")
const zen::CacheKey CacheKey = zen::CacheKey::Create(Bucket, KeyHash);
cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .Namespace = std::string(Namespace)};
- AppendCacheRecord(Request, CacheKey, PayloadSize, CachePolicy::Default);
+ AppendCacheRecord(Request, CacheKey, PayloadSize, PutPolicy);
OutKeys.push_back(CacheKey);
CbPackage Package;
@@ -1324,6 +1330,10 @@ TEST_CASE("zcache.rpc")
cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
CHECK(Result.status_code == 200);
+ if (OutPackages)
+ {
+ OutPackages->emplace_back(std::move(Package));
+ }
}
return OutKeys;
@@ -1502,6 +1512,340 @@ TEST_CASE("zcache.rpc")
}
}
+ SUBCASE("policy - 'QueryLocal' on put allows overwrite with differing value when not limiting overwrites")
+ {
+ using namespace utils;
+
+ ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber());
+ ZenServerInstance UpstreamServer(TestEnv);
+ SpawnServer(UpstreamServer, UpstreamCfg);
+
+ ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port);
+ ZenServerInstance LocalServer(TestEnv);
+ SpawnServer(LocalServer, LocalCfg);
+
+ size_t PayloadSize = 1024;
+ std::string_view Namespace("ue4.ddc"sv);
+ std::string_view Bucket("mastodon"sv);
+ const size_t NumRecords = 4;
+ std::vector<zen::CacheKey> Keys = PutCacheRecords(LocalCfg.BaseUri, Namespace, Bucket, NumRecords, PayloadSize);
+
+ for (const zen::CacheKey& CacheKey : Keys)
+ {
+ cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .Namespace = std::string(Namespace)};
+ AppendCacheRecord(Request, CacheKey, PayloadSize * 2, CachePolicy::Default);
+
+ CbPackage Package;
+ CHECK(Request.Format(Package));
+
+ IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer();
+ cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", LocalCfg.BaseUri)},
+ cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}},
+ cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
+
+ CHECK(Result.status_code == 200);
+ cacherequests::PutCacheRecordsResult ParsedResult;
+ CbPackage Response = ParsePackageMessage(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()));
+ CHECK(!Response.IsNull());
+ CHECK(ParsedResult.Parse(Response));
+ for (bool ResponseSuccess : ParsedResult.Success)
+ {
+ CHECK(ResponseSuccess);
+ }
+ }
+
+ auto CheckRecordCorrectness = [&](const ZenConfig& Cfg) {
+ CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote);
+ GetCacheRecordResult Result = GetCacheRecords(Cfg.BaseUri, "ue4.ddc"sv, Keys, Policy);
+
+ CHECK(Result.Result.Results.size() == Keys.size());
+
+ for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results)
+ {
+ CHECK(Record);
+ const CacheKey& ExpectedKey = Keys[Index++];
+ CHECK(Record->Key == ExpectedKey);
+ for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values)
+ {
+ CHECK(Value.RawSize == PayloadSize * 2);
+ }
+ }
+ };
+
+ // Check that the records are present and overwritten in the local server
+ CheckRecordCorrectness(LocalCfg);
+
+ // Check that the records are present and overwritten in the upstream server
+ CheckRecordCorrectness(UpstreamCfg);
+ }
+
+ SUBCASE("policy - 'QueryLocal' on put denies overwrite with differing value when limiting overwrites")
+ {
+ using namespace utils;
+
+ ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber());
+ ZenServerInstance UpstreamServer(TestEnv);
+ SpawnServer(UpstreamServer, UpstreamCfg);
+
+ ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port, "--cache-limit-overwrites");
+ ZenServerInstance LocalServer(TestEnv);
+ SpawnServer(LocalServer, LocalCfg);
+
+ size_t PayloadSize = 1024;
+ std::string_view Namespace("ue4.ddc"sv);
+ std::string_view Bucket("mastodon"sv);
+ const size_t NumRecords = 4;
+ std::vector<zen::CacheKey> Keys = PutCacheRecords(LocalCfg.BaseUri, Namespace, Bucket, NumRecords, PayloadSize);
+
+ for (const zen::CacheKey& CacheKey : Keys)
+ {
+ cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .Namespace = std::string(Namespace)};
+ AppendCacheRecord(Request, CacheKey, PayloadSize * 2, CachePolicy::Default);
+
+ CbPackage Package;
+ CHECK(Request.Format(Package));
+
+ IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer();
+ cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", LocalCfg.BaseUri)},
+ cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}},
+ cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
+
+ CHECK(Result.status_code == 200);
+ cacherequests::PutCacheRecordsResult ParsedResult;
+ CbPackage Response = ParsePackageMessage(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()));
+ CHECK(!Response.IsNull());
+ CHECK(ParsedResult.Parse(Response));
+ for (bool ResponseSuccess : ParsedResult.Success)
+ {
+ CHECK(!ResponseSuccess);
+ }
+ }
+
+ auto CheckRecordCorrectness = [&](const ZenConfig& Cfg) {
+ CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote);
+ GetCacheRecordResult Result = GetCacheRecords(Cfg.BaseUri, "ue4.ddc"sv, Keys, Policy);
+
+ CHECK(Result.Result.Results.size() == Keys.size());
+
+ for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results)
+ {
+ CHECK(Record);
+ const CacheKey& ExpectedKey = Keys[Index++];
+ CHECK(Record->Key == ExpectedKey);
+ for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values)
+ {
+ CHECK(Value.RawSize == PayloadSize);
+ }
+ }
+ };
+
+ // Check that the records are present and not overwritten in the local server
+ CheckRecordCorrectness(LocalCfg);
+
+ // Check that the records are present and not overwritten in the upstream server
+ CheckRecordCorrectness(UpstreamCfg);
+ }
+
+ SUBCASE("policy - no 'QueryLocal' on put allows overwrite with differing value when limiting overwrites")
+ {
+ using namespace utils;
+
+ ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber());
+ ZenServerInstance UpstreamServer(TestEnv);
+ SpawnServer(UpstreamServer, UpstreamCfg);
+
+ ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port, "--cache-limit-overwrites");
+ ZenServerInstance LocalServer(TestEnv);
+ SpawnServer(LocalServer, LocalCfg);
+
+ size_t PayloadSize = 1024;
+ std::string_view Namespace("ue4.ddc"sv);
+ std::string_view Bucket("mastodon"sv);
+ const size_t NumRecords = 4;
+ std::vector<zen::CacheKey> Keys = PutCacheRecords(LocalCfg.BaseUri, Namespace, Bucket, NumRecords, PayloadSize);
+
+ for (const zen::CacheKey& CacheKey : Keys)
+ {
+ cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .Namespace = std::string(Namespace)};
+ AppendCacheRecord(Request, CacheKey, PayloadSize * 2, CachePolicy::Store);
+
+ CbPackage Package;
+ CHECK(Request.Format(Package));
+
+ IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer();
+ cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", LocalCfg.BaseUri)},
+ cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}},
+ cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
+
+ CHECK(Result.status_code == 200);
+ cacherequests::PutCacheRecordsResult ParsedResult;
+ CbPackage Response = ParsePackageMessage(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()));
+ CHECK(!Response.IsNull());
+ CHECK(ParsedResult.Parse(Response));
+ for (bool ResponseSuccess : ParsedResult.Success)
+ {
+ CHECK(ResponseSuccess);
+ }
+ }
+
+ auto CheckRecordCorrectness = [&](const ZenConfig& Cfg) {
+ CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote);
+ GetCacheRecordResult Result = GetCacheRecords(Cfg.BaseUri, "ue4.ddc"sv, Keys, Policy);
+
+ CHECK(Result.Result.Results.size() == Keys.size());
+
+ for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results)
+ {
+ CHECK(Record);
+ const CacheKey& ExpectedKey = Keys[Index++];
+ CHECK(Record->Key == ExpectedKey);
+ for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values)
+ {
+ CHECK(Value.RawSize == PayloadSize * 2);
+ }
+ }
+ };
+
+ // Check that the records are present and overwritten in the local server
+ CheckRecordCorrectness(LocalCfg);
+
+ // Check that the records are present and overwritten in the upstream server
+ CheckRecordCorrectness(UpstreamCfg);
+ }
+
+ SUBCASE("policy - 'QueryLocal' on put allows overwrite with equivalent value when limiting overwrites")
+ {
+ using namespace utils;
+
+ ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber());
+ ZenServerInstance UpstreamServer(TestEnv);
+ SpawnServer(UpstreamServer, UpstreamCfg);
+
+ ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port, "--cache-limit-overwrites");
+ ZenServerInstance LocalServer(TestEnv);
+ SpawnServer(LocalServer, LocalCfg);
+
+ size_t PayloadSize = 1024;
+ std::string_view Namespace("ue4.ddc"sv);
+ std::string_view Bucket("mastodon"sv);
+ const size_t NumRecords = 4;
+ std::vector<CbPackage> Packages;
+ std::vector<zen::CacheKey> Keys =
+ PutCacheRecords(LocalCfg.BaseUri, Namespace, Bucket, NumRecords, PayloadSize, 1, CachePolicy::Default, &Packages);
+
+ for (const CbPackage& Package : Packages)
+ {
+ IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer();
+ cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", LocalCfg.BaseUri)},
+ cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}},
+ cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
+
+ CHECK(Result.status_code == 200);
+ cacherequests::PutCacheRecordsResult ParsedResult;
+ CbPackage Response = ParsePackageMessage(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()));
+ CHECK(!Response.IsNull());
+ CHECK(ParsedResult.Parse(Response));
+ for (bool ResponseSuccess : ParsedResult.Success)
+ {
+ CHECK(ResponseSuccess);
+ }
+ }
+
+ auto CheckRecordCorrectness = [&](const ZenConfig& Cfg) {
+ CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote);
+ GetCacheRecordResult Result = GetCacheRecords(Cfg.BaseUri, "ue4.ddc"sv, Keys, Policy);
+
+ CHECK(Result.Result.Results.size() == Keys.size());
+
+ for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results)
+ {
+ CHECK(Record);
+ const CacheKey& ExpectedKey = Keys[Index++];
+ CHECK(Record->Key == ExpectedKey);
+ for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values)
+ {
+ CHECK(Value.RawSize == PayloadSize);
+ }
+ }
+ };
+
+ // Check that the records are present and unchanged in the local server
+ CheckRecordCorrectness(LocalCfg);
+
+ // Check that the records are present and unchanged in the upstream server
+ CheckRecordCorrectness(UpstreamCfg);
+ }
+
+ // TODO: Propagation for rejected PUTs
+ // SUBCASE("policy - 'QueryLocal' on put denies overwrite with differing value when limiting overwrites but allows propagation to
+ // upstream")
+ // {
+ // using namespace utils;
+
+ // ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber());
+ // ZenServerInstance UpstreamServer(TestEnv);
+ // SpawnServer(UpstreamServer, UpstreamCfg);
+
+ // ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port, "--cache-limit-overwrites");
+ // ZenServerInstance LocalServer(TestEnv);
+ // SpawnServer(LocalServer, LocalCfg);
+
+ // size_t PayloadSize = 1024;
+ // std::string_view Namespace("ue4.ddc"sv);
+ // std::string_view Bucket("mastodon"sv);
+ // const size_t NumRecords = 4;
+ // std::vector<zen::CacheKey> Keys = PutCacheRecords(LocalCfg.BaseUri, Namespace, Bucket, NumRecords, PayloadSize, 1,
+ // CachePolicy::Local);
+
+ // for (const zen::CacheKey& CacheKey : Keys)
+ // {
+ // cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .Namespace = std::string(Namespace)};
+ // AppendCacheRecord(Request, CacheKey, PayloadSize * 2, CachePolicy::Default);
+
+ // CbPackage Package;
+ // CHECK(Request.Format(Package));
+
+ // IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer();
+ // cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", LocalCfg.BaseUri)},
+ // cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}},
+ // cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
+
+ // CHECK(Result.status_code == 200);
+ // cacherequests::PutCacheRecordsResult ParsedResult;
+ // CbPackage Response = ParsePackageMessage(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()));
+ // CHECK(!Response.IsNull());
+ // CHECK(ParsedResult.Parse(Response));
+ // for (bool ResponseSuccess : ParsedResult.Success)
+ // {
+ // CHECK(!ResponseSuccess);
+ // }
+ // }
+
+ // auto CheckRecordCorrectness = [&](const ZenConfig& Cfg, size_t ExpectedPayloadSize) {
+ // CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote);
+ // GetCacheRecordResult Result = GetCacheRecords(Cfg.BaseUri, "ue4.ddc"sv, Keys, Policy);
+
+ // CHECK(Result.Result.Results.size() == Keys.size());
+
+ // for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results)
+ // {
+ // CHECK(Record);
+ // const CacheKey& ExpectedKey = Keys[Index++];
+ // CHECK(Record->Key == ExpectedKey);
+ // for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values)
+ // {
+ // CHECK(Value.RawSize == ExpectedPayloadSize);
+ // }
+ // }
+ // };
+
+ // // Check that the records are present and not overwritten in the local server
+ // CheckRecordCorrectness(LocalCfg, PayloadSize);
+
+ // // Check that the records are present and are the newer size in the upstream server
+ // CheckRecordCorrectness(UpstreamCfg, PayloadSize*2);
+ // }
+
SUBCASE("RpcAcceptOptions")
{
using namespace utils;
diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp
index bb0c55618..acb6053d9 100644
--- a/src/zenserver/cache/httpstructuredcache.cpp
+++ b/src/zenserver/cache/httpstructuredcache.cpp
@@ -997,8 +997,14 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con
if (Success && StoreLocal)
{
- m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue, {}, nullptr);
- m_CacheStats.WriteCount++;
+ const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal);
+ ZenCacheStore::PutResult PutResult =
+ m_CacheStore
+ .Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue, {}, Overwrite, nullptr);
+ if (PutResult.Status == zen::PutStatus::Success)
+ {
+ m_CacheStats.WriteCount++;
+ }
}
}
else if (AcceptType == ZenContentType::kCbPackage)
@@ -1083,30 +1089,35 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con
if (StoreLocal)
{
- m_CacheStore.Put(RequestContext,
- Ref.Namespace,
- Ref.BucketSegment,
- Ref.HashKey,
- CacheValue,
- ReferencedAttachments,
- nullptr);
- m_CacheStats.WriteCount++;
-
- if (!WriteAttachmentBuffers.empty())
+ const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal);
+ ZenCacheStore::PutResult PutResult = m_CacheStore.Put(RequestContext,
+ Ref.Namespace,
+ Ref.BucketSegment,
+ Ref.HashKey,
+ CacheValue,
+ ReferencedAttachments,
+ Overwrite,
+ nullptr);
+ if (PutResult.Status == zen::PutStatus::Success)
{
- std::vector<CidStore::InsertResult> InsertResults =
- m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes);
- for (const CidStore::InsertResult& Result : InsertResults)
+ m_CacheStats.WriteCount++;
+
+ if (!WriteAttachmentBuffers.empty())
{
- if (Result.New)
+ std::vector<CidStore::InsertResult> InsertResults =
+ m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes);
+ for (const CidStore::InsertResult& Result : InsertResults)
{
- Count.New++;
+ if (Result.New)
+ {
+ Count.New++;
+ }
}
}
- }
- WriteAttachmentBuffers = {};
- WriteRawHashes = {};
+ WriteAttachmentBuffers = {};
+ WriteRawHashes = {};
+ }
}
BinaryWriter MemStream;
@@ -1197,6 +1208,24 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con
return Request.WriteResponse(HttpResponseCode::InsufficientStorage);
}
+ auto WriteFailureResponse = [&Request](const ZenCacheStore::PutResult& PutResult) {
+ ZEN_UNUSED(PutResult);
+
+ HttpResponseCode ResponseCode = HttpResponseCode::InternalServerError;
+ switch (PutResult.Status)
+ {
+ case zen::PutStatus::Conflict:
+ ResponseCode = HttpResponseCode::Conflict;
+ break;
+ case zen::PutStatus::Invalid:
+ ResponseCode = HttpResponseCode::BadRequest;
+ break;
+ }
+
+ return PutResult.Message.empty() ? Request.WriteResponse(ResponseCode)
+ : Request.WriteResponse(ResponseCode, zen::HttpContentType::kText, PutResult.Message);
+ };
+
const HttpContentType ContentType = Request.RequestContentType();
Body.SetContentType(ContentType);
@@ -1225,13 +1254,20 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con
{
RawHash = IoHash::HashBuffer(SharedBuffer(Body));
}
- m_CacheStore.Put(RequestContext,
- Ref.Namespace,
- Ref.BucketSegment,
- Ref.HashKey,
- {.Value = Body, .RawSize = RawSize, .RawHash = RawHash},
- {},
- nullptr);
+ const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal);
+ // TODO: Propagation for rejected PUTs
+ ZenCacheStore::PutResult PutResult = m_CacheStore.Put(RequestContext,
+ Ref.Namespace,
+ Ref.BucketSegment,
+ Ref.HashKey,
+ {.Value = Body, .RawSize = RawSize, .RawHash = RawHash},
+ {},
+ Overwrite,
+ nullptr);
+ if (PutResult.Status != zen::PutStatus::Success)
+ {
+ return WriteFailureResponse(PutResult);
+ }
m_CacheStats.WriteCount++;
if (HasUpstream && EnumHasAllFlags(PolicyFromUrl, CachePolicy::StoreRemote))
@@ -1280,7 +1316,21 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con
TotalCount++;
});
- m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, {.Value = Body}, ReferencedAttachments, nullptr);
+ const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal);
+
+ // TODO: Propagation for rejected PUTs
+ ZenCacheStore::PutResult PutResult = m_CacheStore.Put(RequestContext,
+ Ref.Namespace,
+ Ref.BucketSegment,
+ Ref.HashKey,
+ {.Value = Body},
+ ReferencedAttachments,
+ Overwrite,
+ nullptr);
+ if (PutResult.Status != zen::PutStatus::Success)
+ {
+ return WriteFailureResponse(PutResult);
+ }
m_CacheStats.WriteCount++;
ZEN_DEBUG("PUTCACHERECORD - '{}/{}/{}' {} '{}' attachments '{}/{}' (valid/total) in {}",
@@ -1373,10 +1423,18 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con
return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid attachment(s)"sv);
}
+ const bool Overwrite = !EnumHasAllFlags(Policy, CachePolicy::QueryLocal);
+
ZenCacheValue CacheValue;
CacheValue.Value = CacheRecord.GetBuffer().AsIoBuffer();
CacheValue.Value.SetContentType(ZenContentType::kCbObject);
- m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue, ReferencedAttachments);
+ // TODO: Propagation for rejected PUTs
+ ZenCacheStore::PutResult PutResult =
+ m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue, ReferencedAttachments, Overwrite);
+ if (PutResult.Status != zen::PutStatus::Success)
+ {
+ return WriteFailureResponse(PutResult);
+ }
m_CacheStats.WriteCount++;
if (!WriteAttachmentBuffers.empty())
diff --git a/src/zenserver/config.cpp b/src/zenserver/config.cpp
index 1f9ae5fb6..d53bedad0 100644
--- a/src/zenserver/config.cpp
+++ b/src/zenserver/config.cpp
@@ -518,6 +518,7 @@ ParseConfigFile(const std::filesystem::path& Path,
LuaOptions.AddOption("cache.enable"sv, ServerOptions.StructuredCacheConfig.Enabled);
LuaOptions.AddOption("cache.writelog"sv, ServerOptions.StructuredCacheConfig.WriteLogEnabled, "cache-write-log"sv);
LuaOptions.AddOption("cache.accesslog"sv, ServerOptions.StructuredCacheConfig.AccessLogEnabled, "cache-access-log"sv);
+ LuaOptions.AddOption("cache.limitoverwrites"sv, ServerOptions.StructuredCacheConfig.LimitOverwrites, "cache-limit-overwrites"sv);
LuaOptions.AddOption("cache.memlayer.sizethreshold"sv,
ServerOptions.StructuredCacheConfig.BucketConfig.MemCacheSizeThreshold,
@@ -1060,6 +1061,13 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions)
cxxopts::value<bool>(ServerOptions.StructuredCacheConfig.AccessLogEnabled)->default_value("false"),
"");
+ options.add_option("cache",
+ "",
+ "cache-limit-overwrites",
+ "Whether to require policy flag pattern before allowing overwrites",
+ cxxopts::value<bool>(ServerOptions.StructuredCacheConfig.LimitOverwrites)->default_value("false"),
+ "");
+
options.add_option(
"cache",
"",
diff --git a/src/zenserver/config.h b/src/zenserver/config.h
index 9753e3ae2..7d90aa4c1 100644
--- a/src/zenserver/config.h
+++ b/src/zenserver/config.h
@@ -132,6 +132,7 @@ struct ZenStructuredCacheConfig
bool Enabled = true;
bool WriteLogEnabled = false;
bool AccessLogEnabled = false;
+ bool LimitOverwrites = false;
std::vector<std::pair<std::string, ZenStructuredCacheBucketConfig>> PerBucketConfigs;
ZenStructuredCacheBucketConfig BucketConfig;
uint64_t MemTargetFootprintBytes = 512 * 1024 * 1024;
diff --git a/src/zenserver/zenserver.cpp b/src/zenserver/zenserver.cpp
index 54da51d77..6c9f2ed21 100644
--- a/src/zenserver/zenserver.cpp
+++ b/src/zenserver/zenserver.cpp
@@ -552,6 +552,7 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions)
Config.AllowAutomaticCreationOfNamespaces = true;
Config.Logging = {.EnableWriteLog = ServerOptions.StructuredCacheConfig.WriteLogEnabled,
.EnableAccessLog = ServerOptions.StructuredCacheConfig.AccessLogEnabled};
+
for (const auto& It : ServerOptions.StructuredCacheConfig.PerBucketConfigs)
{
const std::string& BucketName = It.first;
@@ -569,6 +570,7 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions)
ServerOptions.StructuredCacheConfig.BucketConfig.MemCacheSizeThreshold,
Config.NamespaceConfig.DiskLayerConfig.BucketConfig.LargeObjectThreshold =
ServerOptions.StructuredCacheConfig.BucketConfig.LargeObjectThreshold,
+ Config.NamespaceConfig.DiskLayerConfig.BucketConfig.LimitOverwrites = ServerOptions.StructuredCacheConfig.LimitOverwrites;
Config.NamespaceConfig.DiskLayerConfig.MemCacheTargetFootprintBytes = ServerOptions.StructuredCacheConfig.MemTargetFootprintBytes;
Config.NamespaceConfig.DiskLayerConfig.MemCacheTrimIntervalSeconds = ServerOptions.StructuredCacheConfig.MemTrimIntervalSeconds;
Config.NamespaceConfig.DiskLayerConfig.MemCacheMaxAgeSeconds = ServerOptions.StructuredCacheConfig.MemMaxAgeSeconds;
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp
index fdf879e1f..e0030230f 100644
--- a/src/zenstore/cache/cachedisklayer.cpp
+++ b/src/zenstore/cache/cachedisklayer.cpp
@@ -233,6 +233,61 @@ using namespace std::literals;
namespace zen::cache::impl {
+static bool
+GetValueRawSizeAndHash(const ZenCacheValue& Value, uint64_t& OutValueRawSize, IoHash& OutValueRawHash)
+{
+ if ((Value.RawSize != 0) || (Value.RawHash != IoHash::Zero))
+ {
+ OutValueRawSize = Value.RawSize;
+ OutValueRawHash = Value.RawHash;
+ return true;
+ }
+ else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary)
+ {
+ return CompressedBuffer::ValidateCompressedHeader(Value.Value, OutValueRawHash, OutValueRawSize);
+ }
+
+ OutValueRawSize = Value.Value.GetSize();
+ OutValueRawHash = IoHash::HashBuffer(Value.Value);
+ return true;
+}
+
+static bool
+ValueMatchesRawSizeAndHash(const ZenCacheValue& Value, uint64_t RawSize, const std::function<IoHash()>& RawHashProvider)
+{
+ if ((Value.RawSize != 0) || (Value.RawHash != IoHash::Zero))
+ {
+ return ((RawSize == Value.RawSize) && (RawHashProvider() == Value.RawHash));
+ }
+ else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary)
+ {
+ uint64_t ValueRawSize = 0;
+ IoHash ValueRawHash = IoHash::Zero;
+ return CompressedBuffer::ValidateCompressedHeader(Value.Value, ValueRawHash, ValueRawSize) && (RawSize == ValueRawSize) &&
+ (RawHashProvider() == ValueRawHash);
+ }
+
+ return (RawSize == Value.Value.GetSize()) && (RawHashProvider() == IoHash::HashBuffer(Value.Value));
+}
+
+static bool
+ValueMatchesValue(const ZenCacheValue& Value1, const ZenCacheValue& Value2)
+{
+ if ((Value1.RawSize != 0) || (Value1.RawHash != IoHash::Zero))
+ {
+ return ValueMatchesRawSizeAndHash(Value2, Value1.RawSize, [&Value1]() { return Value1.RawHash; });
+ }
+ else if (Value1.Value.GetContentType() == ZenContentType::kCompressedBinary)
+ {
+ uint64_t Value1RawSize = 0;
+ IoHash Value1RawHash = IoHash::Zero;
+ return CompressedBuffer::ValidateCompressedHeader(Value1.Value, Value1RawHash, Value1RawSize) &&
+ ValueMatchesRawSizeAndHash(Value2, Value1RawSize, [Value1RawHash]() { return Value1RawHash; });
+ }
+
+ return ValueMatchesRawSizeAndHash(Value2, Value1.Value.GetSize(), [&Value1]() { return IoHash::HashBuffer(Value1.Value); });
+}
+
class BucketManifestSerializer
{
using MetaDataIndex = ZenCacheDiskLayer::CacheBucket::MetaDataIndex;
@@ -1315,20 +1370,21 @@ ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const DiskLocation& Loc,
struct ZenCacheDiskLayer::CacheBucket::PutBatchHandle
{
- PutBatchHandle(std::vector<bool>& OutResults) : OutResults(OutResults) {}
+ PutBatchHandle(std::vector<ZenCacheDiskLayer::PutResult>& OutResults) : OutResults(OutResults) {}
struct Entry
{
std::vector<IoHash> HashKeyAndReferences;
+ bool Overwrite;
};
std::vector<IoBuffer> Buffers;
std::vector<Entry> Entries;
std::vector<size_t> EntryResultIndexes;
- std::vector<bool>& OutResults;
+ std::vector<ZenCacheDiskLayer::PutResult>& OutResults;
};
ZenCacheDiskLayer::CacheBucket::PutBatchHandle*
-ZenCacheDiskLayer::CacheBucket::BeginPutBatch(std::vector<bool>& OutResults)
+ZenCacheDiskLayer::CacheBucket::BeginPutBatch(std::vector<ZenCacheDiskLayer::PutResult>& OutResults)
{
ZEN_TRACE_CPU("Z$::Bucket::BeginPutBatch");
return new PutBatchHandle(OutResults);
@@ -1344,23 +1400,41 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept
ZEN_ASSERT(Batch);
if (!Batch->Buffers.empty())
{
- std::vector<uint8_t> EntryFlags;
- for (const IoBuffer& Buffer : Batch->Buffers)
+ ZEN_ASSERT(Batch->Buffers.size() == Batch->Entries.size());
+ std::vector<uint8_t> EntryFlags;
+ std::vector<size_t> BufferToEntryIndexes;
+ std::vector<IoBuffer> BuffersToCommit;
+ BuffersToCommit.reserve(Batch->Buffers.size());
+ for (size_t Index = 0; Index < Batch->Entries.size(); Index++)
{
- uint8_t Flags = 0;
- if (Buffer.GetContentType() == ZenContentType::kCbObject)
+ const std::vector<IoHash>& HashKeyAndReferences = Batch->Entries[Index].HashKeyAndReferences;
+ ZEN_ASSERT(HashKeyAndReferences.size() >= 1);
+
+ ZenCacheValue TemporaryValue;
+ TemporaryValue.Value = Batch->Buffers[Index];
+ std::span<const IoHash> ReferenceSpan(HashKeyAndReferences.begin() + 1, HashKeyAndReferences.end());
+ PutResult& OutResult = Batch->OutResults[Batch->EntryResultIndexes[Index]];
+ OutResult = PutResult{zen::PutStatus::Success};
+ if (!ShouldRejectPut(HashKeyAndReferences[0], TemporaryValue, ReferenceSpan, Batch->Entries[Index].Overwrite, OutResult))
{
- Flags |= DiskLocation::kStructured;
- }
- else if (Buffer.GetContentType() == ZenContentType::kCompressedBinary)
- {
- Flags |= DiskLocation::kCompressed;
+ BufferToEntryIndexes.push_back(Index);
+ BuffersToCommit.push_back(TemporaryValue.Value);
+
+ uint8_t Flags = 0;
+ if (TemporaryValue.Value.GetContentType() == ZenContentType::kCbObject)
+ {
+ Flags |= DiskLocation::kStructured;
+ }
+ else if (TemporaryValue.Value.GetContentType() == ZenContentType::kCompressedBinary)
+ {
+ Flags |= DiskLocation::kCompressed;
+ }
+ EntryFlags.push_back(Flags);
}
- EntryFlags.push_back(Flags);
}
size_t IndexOffset = 0;
- m_BlockStore.WriteChunks(Batch->Buffers, m_Configuration.PayloadAlignment, [&](std::span<BlockStoreLocation> Locations) {
+ m_BlockStore.WriteChunks(BuffersToCommit, m_Configuration.PayloadAlignment, [&](std::span<BlockStoreLocation> Locations) {
ZEN_MEMSCOPE(GetCacheDiskTag());
std::vector<DiskIndexEntry> DiskEntries;
{
@@ -1368,7 +1442,8 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept
for (size_t Index = 0; Index < Locations.size(); Index++)
{
DiskLocation Location(Locations[Index], m_Configuration.PayloadAlignment, EntryFlags[IndexOffset + Index]);
- const std::vector<IoHash>& HashKeyAndReferences = Batch->Entries[IndexOffset + Index].HashKeyAndReferences;
+ const std::vector<IoHash>& HashKeyAndReferences =
+ Batch->Entries[BufferToEntryIndexes[IndexOffset + Index]].HashKeyAndReferences;
ZEN_ASSERT(HashKeyAndReferences.size() >= 1);
const IoHash HashKey = HashKeyAndReferences[0];
DiskEntries.push_back({.Key = HashKey, .Location = Location});
@@ -1404,12 +1479,6 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept
}
}
m_SlogFile.Append(DiskEntries);
- for (size_t Index = 0; Index < Locations.size(); Index++)
- {
- size_t ResultIndex = Batch->EntryResultIndexes[IndexOffset + Index];
- ZEN_ASSERT(ResultIndex < Batch->OutResults.size());
- Batch->OutResults[ResultIndex] = true;
- }
IndexOffset += Locations.size();
});
}
@@ -1905,30 +1974,105 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal
}
}
-void
+bool
+ZenCacheDiskLayer::CacheBucket::ShouldRejectPut(const IoHash& HashKey,
+ const ZenCacheValue& Value,
+ std::span<const IoHash> References,
+ bool Overwrite,
+ ZenCacheDiskLayer::PutResult& OutPutResult)
+{
+ ZEN_UNUSED(References);
+
+ const bool CheckExisting = m_Configuration.LimitOverwrites && !Overwrite;
+ if (CheckExisting)
+ {
+ RwLock::SharedLockScope IndexLock(m_IndexLock);
+ auto It = m_Index.find(HashKey);
+ if (It != m_Index.end())
+ {
+ PayloadIndex EntryIndex = It.value();
+ m_AccessTimes[EntryIndex] = GcClock::TickCount();
+ DiskLocation Location = m_Payloads[EntryIndex].Location;
+
+ bool ComparisonComplete = false;
+ const BucketPayload* Payload = &m_Payloads[EntryIndex];
+ if (Payload->MetaData)
+ {
+ const BucketMetaData& MetaData = m_MetaDatas[Payload->MetaData];
+ if (MetaData)
+ {
+ if (!cache::impl::ValueMatchesRawSizeAndHash(Value, MetaData.RawSize, [&MetaData]() { return MetaData.RawHash; }))
+ {
+ OutPutResult = PutResult{
+ zen::PutStatus::Conflict,
+ fmt::format("Value exists with different size '{}' or hash '{}'", MetaData.RawSize, MetaData.RawHash)};
+ return true;
+ }
+ ComparisonComplete = true;
+ }
+ }
+
+ if (!ComparisonComplete)
+ {
+ // We must release the index lock before calling Get as that will acquire it.
+ // Having the Get method not acquire the index lock is not viable because Get has potentially multiple acquisitions
+ // of the index lock, first as a shared lock, then as an exclusive lock. If the caller has an exclusive lock, we
+ // could just accept that, and not have Get do any lock acquisitions, but it creates situations where Get is doing
+ // slow operations (eg: reading a file from disk) while under an exclusive index lock, and this would lead to
+ // responsiveness issues where zenserver could fail to respond to requests because of a long-held exclusive lock.
+ IndexLock.ReleaseNow();
+ ZenCacheValue ExistingValue;
+ if (Get(HashKey, ExistingValue) && !cache::impl::ValueMatchesValue(Value, ExistingValue))
+ {
+ uint64_t RawSize = 0;
+ IoHash RawHash = IoHash::Zero;
+ cache::impl::GetValueRawSizeAndHash(ExistingValue, RawSize, RawHash);
+ OutPutResult = PutResult{zen::PutStatus::Conflict,
+ fmt::format("Value exists with different size '{}' or hash '{}'", RawSize, RawHash)};
+ return true;
+ }
+ }
+ }
+ }
+ return false;
+}
+
+ZenCacheDiskLayer::PutResult
ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey,
const ZenCacheValue& Value,
std::span<IoHash> References,
+ bool Overwrite,
PutBatchHandle* OptionalBatchHandle)
{
ZEN_TRACE_CPU("Z$::Bucket::Put");
metrics::RequestStats::Scope $(m_PutOps, Value.Value.Size());
+ PutResult Result{zen::PutStatus::Success};
+
if (Value.Value.Size() >= m_Configuration.LargeObjectThreshold)
{
+ if (ShouldRejectPut(HashKey, Value, References, Overwrite, Result))
+ {
+ if (OptionalBatchHandle)
+ {
+ OptionalBatchHandle->OutResults.push_back(Result);
+ }
+ return Result;
+ }
PutStandaloneCacheValue(HashKey, Value, References);
if (OptionalBatchHandle)
{
- OptionalBatchHandle->OutResults.push_back(true);
+ OptionalBatchHandle->OutResults.push_back({zen::PutStatus::Success});
}
}
else
{
- PutInlineCacheValue(HashKey, Value, References, OptionalBatchHandle);
+ Result = PutInlineCacheValue(HashKey, Value, References, Overwrite, OptionalBatchHandle);
}
m_DiskWriteCount++;
+ return Result;
}
uint64_t
@@ -2777,25 +2921,35 @@ ZenCacheDiskLayer::CacheBucket::GetMetaData(RwLock::SharedLockScope&, const Buck
return {};
}
-void
+ZenCacheDiskLayer::PutResult
ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey,
const ZenCacheValue& Value,
std::span<IoHash> References,
+ bool Overwrite,
PutBatchHandle* OptionalBatchHandle)
{
ZEN_TRACE_CPU("Z$::Bucket::PutInlineCacheValue");
+ PutResult Result{zen::PutStatus::Success};
if (OptionalBatchHandle != nullptr)
{
OptionalBatchHandle->Buffers.push_back(Value.Value);
OptionalBatchHandle->Entries.push_back({});
OptionalBatchHandle->EntryResultIndexes.push_back(OptionalBatchHandle->OutResults.size());
- OptionalBatchHandle->OutResults.push_back(false);
+ OptionalBatchHandle->OutResults.push_back(PutResult{zen::PutStatus::Fail});
+ PutBatchHandle::Entry& CurrentEntry = OptionalBatchHandle->Entries.back();
+ CurrentEntry.Overwrite = Overwrite;
std::vector<IoHash>& HashKeyAndReferences = OptionalBatchHandle->Entries.back().HashKeyAndReferences;
HashKeyAndReferences.reserve(1 + References.size());
HashKeyAndReferences.push_back(HashKey);
HashKeyAndReferences.insert(HashKeyAndReferences.end(), References.begin(), References.end());
- return;
+ return Result;
+ }
+
+ if (ShouldRejectPut(HashKey, Value, References, Overwrite, Result))
+ {
+ return Result;
}
+
uint8_t EntryFlags = 0;
if (Value.Value.GetContentType() == ZenContentType::kCbObject)
@@ -2845,6 +2999,7 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey,
}
m_SlogFile.Append({.Key = HashKey, .Location = Location});
});
+ return Result;
}
std::string
@@ -3781,7 +3936,7 @@ ZenCacheDiskLayer::GetOrCreateBucket(std::string_view InBucket)
struct ZenCacheDiskLayer::PutBatchHandle
{
- PutBatchHandle(std::vector<bool>& OutResults) : OutResults(OutResults) {}
+ PutBatchHandle(std::vector<ZenCacheDiskLayer::PutResult>& OutResults) : OutResults(OutResults) {}
struct BucketHandle
{
CacheBucket* Bucket;
@@ -3840,13 +3995,13 @@ struct ZenCacheDiskLayer::PutBatchHandle
return NewBucketHandle;
}
- RwLock Lock;
- std::vector<BucketHandle> BucketHandles;
- std::vector<bool>& OutResults;
+ RwLock Lock;
+ std::vector<BucketHandle> BucketHandles;
+ std::vector<ZenCacheDiskLayer::PutResult>& OutResults;
};
ZenCacheDiskLayer::PutBatchHandle*
-ZenCacheDiskLayer::BeginPutBatch(std::vector<bool>& OutResults)
+ZenCacheDiskLayer::BeginPutBatch(std::vector<PutResult>& OutResults)
{
return new PutBatchHandle(OutResults);
}
@@ -3983,21 +4138,23 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, GetBatc
}
}
-void
+ZenCacheDiskLayer::PutResult
ZenCacheDiskLayer::Put(std::string_view InBucket,
const IoHash& HashKey,
const ZenCacheValue& Value,
std::span<IoHash> References,
+ bool Overwrite,
PutBatchHandle* OptionalBatchHandle)
{
ZEN_TRACE_CPU("Z$::Put");
-
+ PutResult RetVal = {zen::PutStatus::Fail};
if (CacheBucket* Bucket = GetOrCreateBucket(InBucket); Bucket != nullptr)
{
CacheBucket::PutBatchHandle* BucketBatchHandle = OptionalBatchHandle == nullptr ? nullptr : OptionalBatchHandle->GetHandle(Bucket);
- Bucket->Put(HashKey, Value, References, BucketBatchHandle);
+ RetVal = Bucket->Put(HashKey, Value, References, Overwrite, BucketBatchHandle);
TryMemCacheTrim();
}
+ return RetVal;
}
void
diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp
index de4b0a37c..5d9a68919 100644
--- a/src/zenstore/cache/cacherpc.cpp
+++ b/src/zenstore/cache/cacherpc.cpp
@@ -334,13 +334,13 @@ CacheRpcHandler::HandleRpcPutCacheRecords(const CacheRequestContext& Context, co
.Policy = std::move(Policy),
.Context = Context};
- PutResult Result = PutCacheRecord(PutRequest, &BatchRequest);
+ PutStatus Result = PutCacheRecord(PutRequest, &BatchRequest);
- if (Result == PutResult::Invalid)
+ if (Result == PutStatus::Invalid)
{
return CbPackage{};
}
- Results.push_back(Result == PutResult::Success);
+ Results.push_back(Result == PutStatus::Success);
}
if (Results.empty())
{
@@ -360,7 +360,7 @@ CacheRpcHandler::HandleRpcPutCacheRecords(const CacheRequestContext& Context, co
return RpcResponse;
}
-PutResult
+PutStatus
CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Package)
{
CbObjectView Record = Request.RecordObject;
@@ -422,14 +422,28 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag
if (Count.Invalid > 0)
{
- return PutResult::Invalid;
+ return PutStatus::Invalid;
}
ZenCacheValue CacheValue;
CacheValue.Value = IoBuffer(Record.GetSize());
Record.CopyTo(MutableMemoryView(CacheValue.Value.MutableData(), CacheValue.Value.GetSize()));
CacheValue.Value.SetContentType(ZenContentType::kCbObject);
- m_CacheStore.Put(Request.Context, Request.Namespace, Request.Key.Bucket, Request.Key.Hash, CacheValue, ReferencedAttachments, nullptr);
+ bool Overwrite = EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::StoreLocal) &&
+ !EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::QueryLocal);
+ // TODO: Propagation for rejected PUTs
+ ZenCacheStore::PutResult PutResult = m_CacheStore.Put(Request.Context,
+ Request.Namespace,
+ Request.Key.Bucket,
+ Request.Key.Hash,
+ CacheValue,
+ ReferencedAttachments,
+ Overwrite,
+ nullptr);
+ if (PutResult.Status != zen::PutStatus::Success)
+ {
+ return PutResult.Status;
+ }
m_CacheStats.WriteCount++;
if (!WriteAttachmentBuffers.empty())
@@ -466,7 +480,7 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag
.Key = Request.Key,
.ValueContentIds = std::move(ValidAttachments)});
}
- return PutResult::Success;
+ return PutStatus::Success;
}
CbPackage
@@ -765,18 +779,24 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::StoreLocal) && AreDiskWritesAllowed();
if (StoreLocal)
{
+ bool Overwrite = !EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::QueryLocal);
std::vector<IoHash> ReferencedAttachments;
ObjectBuffer.IterateAttachments([&ReferencedAttachments](CbFieldView HashView) {
const IoHash ValueHash = HashView.AsHash();
ReferencedAttachments.push_back(ValueHash);
});
- m_CacheStore.Put(Context,
- *Namespace,
- Key.Bucket,
- Key.Hash,
- {.Value = {Request.RecordCacheValue}},
- ReferencedAttachments,
- nullptr);
+ ZenCacheStore::PutResult PutResult = m_CacheStore.Put(Context,
+ *Namespace,
+ Key.Bucket,
+ Key.Hash,
+ {.Value = {Request.RecordCacheValue}},
+ ReferencedAttachments,
+ Overwrite,
+ nullptr);
+ if (PutResult.Status != zen::PutStatus::Success)
+ {
+ return;
+ }
m_CacheStats.WriteCount++;
}
ParseValues(Request);
@@ -924,10 +944,10 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con
const bool HasUpstream = m_UpstreamCache.IsActive();
CbArrayView RequestsArray = Params["Requests"sv].AsArrayView();
- std::vector<bool> BatchResults;
- eastl::fixed_vector<size_t, 32> BatchResultIndexes;
- eastl::fixed_vector<bool, 32> Results;
- eastl::fixed_vector<CacheKey, 32> UpstreamCacheKeys;
+ std::vector<ZenCacheStore::PutResult> BatchResults;
+ eastl::fixed_vector<size_t, 32> BatchResultIndexes;
+ eastl::fixed_vector<ZenCacheStore::PutResult, 32> Results;
+ eastl::fixed_vector<CacheKey, 32> UpstreamCacheKeys;
uint64_t RequestCount = RequestsArray.Num();
{
@@ -977,34 +997,39 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con
if (EnumHasAllFlags(Policy, CachePolicy::StoreLocal))
{
- IoBuffer Value = Chunk.GetCompressed().Flatten().AsIoBuffer();
+ bool Overwrite = !EnumHasAllFlags(Policy, CachePolicy::QueryLocal);
+ IoBuffer Value = Chunk.GetCompressed().Flatten().AsIoBuffer();
Value.SetContentType(ZenContentType::kCompressedBinary);
if (RawSize == 0)
{
RawSize = Chunk.DecodeRawSize();
}
- m_CacheStore.Put(Context,
- *Namespace,
- Key.Bucket,
- Key.Hash,
- {.Value = Value, .RawSize = RawSize, .RawHash = RawHash},
- {},
- Batch.get());
- m_CacheStats.WriteCount++;
+ ZenCacheStore::PutResult PutResult = m_CacheStore.Put(Context,
+ *Namespace,
+ Key.Bucket,
+ Key.Hash,
+ {.Value = Value, .RawSize = RawSize, .RawHash = RawHash},
+ {},
+ Overwrite,
+ Batch.get());
+ if (PutResult.Status == zen::PutStatus::Success)
+ {
+ m_CacheStats.WriteCount++;
+ }
if (Batch)
{
BatchResultIndexes.push_back(Results.size());
- Results.push_back(false);
+ Results.push_back({zen::PutStatus::Fail});
}
else
{
- Results.push_back(true);
+ Results.push_back(PutResult);
}
TransferredSize = Chunk.GetCompressedSize();
}
else
{
- Results.push_back(true);
+ Results.push_back({zen::PutStatus::Success});
}
Valid = true;
}
@@ -1020,12 +1045,12 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con
if (m_CacheStore.Get(Context, *Namespace, Key.Bucket, Key.Hash, ExistingValue) &&
IsCompressedBinary(ExistingValue.Value.GetContentType()))
{
- Results.push_back(true);
+ Results.push_back({zen::PutStatus::Success});
Valid = true;
}
else
{
- Results.push_back(false);
+ Results.push_back({zen::PutStatus::Fail, fmt::format("Missing attachment with raw hash {}", RawHash)});
}
}
// We do not search the Upstream. No data in a put means the caller is probing for whether they need to do a heavy put.
@@ -1060,13 +1085,13 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con
{
size_t BatchResultIndex = BatchResultIndexes[Index];
ZEN_ASSERT(BatchResultIndex < Results.size());
- ZEN_ASSERT(Results[BatchResultIndex] == false);
+ ZEN_ASSERT(Results[BatchResultIndex].Status != zen::PutStatus::Success);
Results[BatchResultIndex] = BatchResults[Index];
}
for (std::size_t Index = 0; Index < Results.size(); Index++)
{
- if (Results[Index] && UpstreamCacheKeys[Index] != CacheKey::Empty)
+ if ((Results[Index].Status == zen::PutStatus::Success) && UpstreamCacheKeys[Index] != CacheKey::Empty)
{
m_UpstreamCache.EnqueueUpstream(
{.Type = ZenContentType::kCompressedBinary, .Namespace = *Namespace, .Key = UpstreamCacheKeys[Index]});
@@ -1076,11 +1101,32 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con
ZEN_TRACE_CPU("Z$::RpcPutCacheValues::Response");
CbObjectWriter ResponseObject{1024};
ResponseObject.BeginArray("Result"sv);
- for (bool Value : Results)
+ bool bAnyErrors = false;
+ for (const ZenCacheStore::PutResult& Value : Results)
{
- ResponseObject.AddBool(Value);
+ if (Value.Status == zen::PutStatus::Success)
+ {
+ ResponseObject.AddBool(true);
+ }
+ else
+ {
+ bAnyErrors = true;
+ ResponseObject.AddBool(false);
+ }
}
ResponseObject.EndArray();
+ if (bAnyErrors)
+ {
+ ResponseObject.BeginArray("ErrorMessages"sv);
+ for (const ZenCacheStore::PutResult& Value : Results)
+ {
+ if (Value.Status != zen::PutStatus::Success)
+ {
+ ResponseObject.AddString(Value.Message);
+ }
+ }
+ ResponseObject.EndArray();
+ }
CbPackage RpcResponse;
RpcResponse.SetObject(ResponseObject.Save());
@@ -1239,6 +1285,7 @@ CacheRpcHandler::HandleRpcGetCacheValues(const CacheRequestContext& Context, CbO
const bool HasData = IsCompressedBinary(Params.Value.GetContentType());
const bool SkipData = EnumHasAllFlags(Request.Policy, CachePolicy::SkipData);
const bool StoreData = EnumHasAllFlags(Request.Policy, CachePolicy::StoreLocal) && AreDiskWritesAllowed();
+ const bool Overwrite = StoreData && !EnumHasAllFlags(Request.Policy, CachePolicy::QueryLocal);
const bool IsHit = SkipData || HasData;
if (IsHit)
{
@@ -1249,14 +1296,19 @@ CacheRpcHandler::HandleRpcGetCacheValues(const CacheRequestContext& Context, CbO
if (HasData && StoreData)
{
- m_CacheStore.Put(Context,
- *Namespace,
- Request.Key.Bucket,
- Request.Key.Hash,
- ZenCacheValue{.Value = Params.Value, .RawSize = Request.RawSize, .RawHash = Request.RawHash},
- {},
- nullptr);
- m_CacheStats.WriteCount++;
+ ZenCacheStore::PutResult PutResult = m_CacheStore.Put(
+ Context,
+ *Namespace,
+ Request.Key.Bucket,
+ Request.Key.Hash,
+ ZenCacheValue{.Value = Params.Value, .RawSize = Request.RawSize, .RawHash = Request.RawHash},
+ {},
+ Overwrite,
+ nullptr);
+ if (PutResult.Status == zen::PutStatus::Success)
+ {
+ m_CacheStats.WriteCount++;
+ }
}
ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}/{}' {} ({}) in {}",
@@ -1527,36 +1579,48 @@ CacheRpcHandler::GetLocalCacheRecords(const CacheRequestContext& Context,
if (!UpstreamRecordRequests.empty())
{
- const auto OnCacheRecordGetComplete = [this, Namespace, &RecordKeys, &Records, &RecordRequests, Context](
- CacheRecordGetCompleteParams&& Params) {
- if (!Params.Record)
- {
- return;
- }
- CacheKeyRequest& RecordKey = Params.Request;
- size_t RecordIndex = std::distance(RecordKeys.data(), &RecordKey);
- RecordRequests[RecordIndex]->ElapsedTimeUs += static_cast<uint64_t>(Params.ElapsedSeconds * 1000000.0);
- RecordBody& Record = Records[RecordIndex];
-
- const CacheKey& Key = RecordKey.Key;
- Record.Exists = true;
- CbObject ObjectBuffer = CbObject::Clone(Params.Record);
- Record.CacheValue = ObjectBuffer.GetBuffer().AsIoBuffer();
- Record.CacheValue.SetContentType(ZenContentType::kCbObject);
- Record.Source = Params.Source;
-
- bool StoreLocal = EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed();
- if (StoreLocal)
- {
- std::vector<IoHash> ReferencedAttachments;
- ObjectBuffer.IterateAttachments([&ReferencedAttachments](CbFieldView HashView) {
- const IoHash ValueHash = HashView.AsHash();
- ReferencedAttachments.push_back(ValueHash);
- });
- m_CacheStore.Put(Context, Namespace, Key.Bucket, Key.Hash, {.Value = Record.CacheValue}, ReferencedAttachments, nullptr);
- m_CacheStats.WriteCount++;
- }
- };
+ const auto OnCacheRecordGetComplete =
+ [this, Namespace, &RecordKeys, &Records, &RecordRequests, Context](CacheRecordGetCompleteParams&& Params) {
+ if (!Params.Record)
+ {
+ return;
+ }
+ CacheKeyRequest& RecordKey = Params.Request;
+ size_t RecordIndex = std::distance(RecordKeys.data(), &RecordKey);
+ RecordRequests[RecordIndex]->ElapsedTimeUs += static_cast<uint64_t>(Params.ElapsedSeconds * 1000000.0);
+ RecordBody& Record = Records[RecordIndex];
+
+ const CacheKey& Key = RecordKey.Key;
+ Record.Exists = true;
+ CbObject ObjectBuffer = CbObject::Clone(Params.Record);
+ Record.CacheValue = ObjectBuffer.GetBuffer().AsIoBuffer();
+ Record.CacheValue.SetContentType(ZenContentType::kCbObject);
+ Record.Source = Params.Source;
+
+ bool StoreLocal = EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed();
+ if (StoreLocal)
+ {
+ bool Overwrite = !EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::QueryLocal);
+ std::vector<IoHash> ReferencedAttachments;
+ ObjectBuffer.IterateAttachments([&ReferencedAttachments](CbFieldView HashView) {
+ const IoHash ValueHash = HashView.AsHash();
+ ReferencedAttachments.push_back(ValueHash);
+ });
+ ZenCacheStore::PutResult PutResult = m_CacheStore.Put(Context,
+ Namespace,
+ Key.Bucket,
+ Key.Hash,
+ {.Value = Record.CacheValue},
+ ReferencedAttachments,
+ Overwrite,
+ nullptr);
+ if (PutResult.Status != zen::PutStatus::Success)
+ {
+ return;
+ }
+ m_CacheStats.WriteCount++;
+ }
+ };
m_UpstreamCache.GetCacheRecords(Namespace, UpstreamRecordRequests, std::move(OnCacheRecordGetComplete));
}
@@ -1774,20 +1838,26 @@ CacheRpcHandler::GetUpstreamCacheChunks(const CacheRequestContext& Context,
bool StoreLocal = EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed();
if (StoreLocal)
{
+ bool Overwrite = !EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::QueryLocal);
if (Request.IsRecordRequest)
{
m_CidStore.AddChunk(Params.Value, Params.RawHash);
}
else
{
- m_CacheStore.Put(Context,
- Namespace,
- Key.Key.Bucket,
- Key.Key.Hash,
- {.Value = Params.Value, .RawSize = Params.RawSize, .RawHash = Params.RawHash},
- {},
- nullptr);
- m_CacheStats.WriteCount++;
+ ZenCacheStore::PutResult PutResult =
+ m_CacheStore.Put(Context,
+ Namespace,
+ Key.Key.Bucket,
+ Key.Key.Hash,
+ {.Value = Params.Value, .RawSize = Params.RawSize, .RawHash = Params.RawHash},
+ {},
+ Overwrite,
+ nullptr);
+ if (PutResult.Status == zen::PutStatus::Success)
+ {
+ m_CacheStats.WriteCount++;
+ }
}
}
if (!EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData))
diff --git a/src/zenstore/cache/structuredcachestore.cpp b/src/zenstore/cache/structuredcachestore.cpp
index d956384ca..973af52b2 100644
--- a/src/zenstore/cache/structuredcachestore.cpp
+++ b/src/zenstore/cache/structuredcachestore.cpp
@@ -154,7 +154,7 @@ struct ZenCacheNamespace::PutBatchHandle
};
ZenCacheNamespace::PutBatchHandle*
-ZenCacheNamespace::BeginPutBatch(std::vector<bool>& OutResult)
+ZenCacheNamespace::BeginPutBatch(std::vector<PutResult>& OutResult)
{
ZenCacheNamespace::PutBatchHandle* Handle = new ZenCacheNamespace::PutBatchHandle;
Handle->DiskLayerHandle = m_DiskLayer.BeginPutBatch(OutResult);
@@ -252,11 +252,12 @@ ZenCacheNamespace::Get(std::string_view InBucket, const IoHash& HashKey, GetBatc
return;
}
-void
+ZenCacheNamespace::PutResult
ZenCacheNamespace::Put(std::string_view InBucket,
const IoHash& HashKey,
const ZenCacheValue& Value,
std::span<IoHash> References,
+ bool Overwrite,
PutBatchHandle* OptionalBatchHandle)
{
ZEN_TRACE_CPU(OptionalBatchHandle ? "Z$::Namespace::Put(Batched)" : "Z$::Namespace::Put");
@@ -268,8 +269,12 @@ ZenCacheNamespace::Put(std::string_view InBucket,
ZEN_ASSERT(Value.Value.Size());
ZenCacheDiskLayer::PutBatchHandle* DiskLayerBatchHandle = OptionalBatchHandle ? OptionalBatchHandle->DiskLayerHandle : nullptr;
- m_DiskLayer.Put(InBucket, HashKey, Value, References, DiskLayerBatchHandle);
- m_WriteCount++;
+ PutResult RetVal = m_DiskLayer.Put(InBucket, HashKey, Value, References, Overwrite, DiskLayerBatchHandle);
+ if (RetVal.Status == zen::PutStatus::Success)
+ {
+ m_WriteCount++;
+ }
+ return RetVal;
}
bool
@@ -557,7 +562,7 @@ ZenCacheStore::LogWorker()
}
}
-ZenCacheStore::PutBatch::PutBatch(ZenCacheStore& CacheStore, std::string_view InNamespace, std::vector<bool>& OutResult)
+ZenCacheStore::PutBatch::PutBatch(ZenCacheStore& CacheStore, std::string_view InNamespace, std::vector<PutResult>& OutResult)
: m_CacheStore(CacheStore)
{
ZEN_MEMSCOPE(GetCacheStoreTag());
@@ -720,13 +725,14 @@ ZenCacheStore::Get(const CacheRequestContext& Context,
m_MissCount++;
}
-void
+ZenCacheStore::PutResult
ZenCacheStore::Put(const CacheRequestContext& Context,
std::string_view Namespace,
std::string_view Bucket,
const IoHash& HashKey,
const ZenCacheValue& Value,
std::span<IoHash> References,
+ bool Overwrite,
PutBatch* OptionalBatchHandle)
{
// Ad hoc rejection of known bad usage patterns for DDC bucket names
@@ -734,7 +740,7 @@ ZenCacheStore::Put(const CacheRequestContext& Context,
if (IsKnownBadBucketName(Bucket))
{
m_RejectedWriteCount++;
- return;
+ return PutResult{zen::PutStatus::Invalid, "Bad bucket name"};
}
ZEN_MEMSCOPE(GetCacheStoreTag());
@@ -764,9 +770,16 @@ ZenCacheStore::Put(const CacheRequestContext& Context,
if (ZenCacheNamespace* Store = GetNamespace(Namespace); Store)
{
ZenCacheNamespace::PutBatchHandle* BatchHandle = OptionalBatchHandle ? OptionalBatchHandle->m_NamespaceBatchHandle : nullptr;
- Store->Put(Bucket, HashKey, Value, References, BatchHandle);
- m_WriteCount++;
- return;
+ PutResult RetVal = Store->Put(Bucket, HashKey, Value, References, Overwrite, BatchHandle);
+ if (RetVal.Status == zen::PutStatus::Success)
+ {
+ m_WriteCount++;
+ }
+ else
+ {
+ m_RejectedWriteCount++;
+ }
+ return RetVal;
}
ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::Put [{}] bucket '{}', key '{}'",
@@ -774,6 +787,7 @@ ZenCacheStore::Put(const CacheRequestContext& Context,
Namespace,
Bucket,
HashKey.ToHexString());
+ return PutResult{zen::PutStatus::Fail, fmt::format("Unknown namespace '{}'", Namespace)};
}
bool
@@ -1378,7 +1392,7 @@ TEST_CASE("cachestore.store")
Value.Value = Obj.GetBuffer().AsIoBuffer();
Value.Value.SetContentType(ZenContentType::kCbObject);
- Zcs.Put("test_bucket"sv, Key, Value, {});
+ Zcs.Put("test_bucket"sv, Key, Value, {}, false);
}
for (int i = 0; i < kIterationCount; ++i)
@@ -1432,7 +1446,7 @@ TEST_CASE("cachestore.size")
const size_t Bucket = Key % 4;
std::string BucketName = fmt::format("test_bucket-{}", Bucket);
IoHash Hash = IoHash::HashBuffer(&Key, sizeof(uint32_t));
- Zcs.Put(BucketName, Hash, ZenCacheValue{.Value = Buffer}, {});
+ Zcs.Put(BucketName, Hash, ZenCacheValue{.Value = Buffer}, {}, false);
Keys.push_back({BucketName, Hash});
}
CacheSize = Zcs.StorageSize();
@@ -1486,7 +1500,7 @@ TEST_CASE("cachestore.size")
for (size_t Key = 0; Key < Count; ++Key)
{
const size_t Bucket = Key % 4;
- Zcs.Put(fmt::format("test_bucket-{}", Bucket), IoHash::HashBuffer(&Key, sizeof(uint32_t)), {.Value = Buffer}, {});
+ Zcs.Put(fmt::format("test_bucket-{}", Bucket), IoHash::HashBuffer(&Key, sizeof(uint32_t)), {.Value = Buffer}, {}, false);
}
CacheSize = Zcs.StorageSize();
@@ -1569,7 +1583,7 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true))
for (const auto& Chunk : Chunks)
{
ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, &Chunk]() {
- Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {});
+ Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {}, false);
WorkCompleted.fetch_add(1);
});
}
@@ -1650,7 +1664,7 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true))
for (const auto& Chunk : NewChunks)
{
ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk, &AddedChunkCount]() {
- Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {});
+ Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {}, false);
AddedChunkCount.fetch_add(1);
WorkCompleted.fetch_add(1);
});
@@ -1755,14 +1769,14 @@ TEST_CASE("cachestore.namespaces")
Buffer.SetContentType(ZenContentType::kCbObject);
ZenCacheValue PutValue = {.Value = Buffer};
- Zcs.Put(Context, ZenCacheStore::DefaultNamespace, Bucket, Key1, PutValue, {});
+ Zcs.Put(Context, ZenCacheStore::DefaultNamespace, Bucket, Key1, PutValue, {}, false);
ZenCacheValue GetValue;
CHECK(Zcs.Get(Context, ZenCacheStore::DefaultNamespace, Bucket, Key1, GetValue));
CHECK(!Zcs.Get(Context, CustomNamespace, Bucket, Key1, GetValue));
// This should just be dropped as we don't allow creating of namespaces on the fly
- Zcs.Put(Context, CustomNamespace, Bucket, Key1, PutValue, {});
+ Zcs.Put(Context, CustomNamespace, Bucket, Key1, PutValue, {}, false);
CHECK(!Zcs.Get(Context, CustomNamespace, Bucket, Key1, GetValue));
}
@@ -1778,7 +1792,7 @@ TEST_CASE("cachestore.namespaces")
IoBuffer Buffer2 = CacheValue2.GetBuffer().AsIoBuffer();
Buffer2.SetContentType(ZenContentType::kCbObject);
ZenCacheValue PutValue2 = {.Value = Buffer2};
- Zcs.Put(Context, CustomNamespace, Bucket, Key2, PutValue2, {});
+ Zcs.Put(Context, CustomNamespace, Bucket, Key2, PutValue2, {}, false);
ZenCacheValue GetValue;
CHECK(!Zcs.Get(Context, ZenCacheStore::DefaultNamespace, Bucket, Key2, GetValue));
@@ -1820,7 +1834,7 @@ TEST_CASE("cachestore.drop.bucket")
Buffer.SetContentType(ZenContentType::kCbObject);
ZenCacheValue PutValue = {.Value = Buffer};
- Zcs.Put(Context, Namespace, Bucket, Key, PutValue, {});
+ Zcs.Put(Context, Namespace, Bucket, Key, PutValue, {}, false);
return Key;
};
auto GetValue = [&Context](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, const IoHash& Key) {
@@ -1893,7 +1907,7 @@ TEST_CASE("cachestore.drop.namespace")
Buffer.SetContentType(ZenContentType::kCbObject);
ZenCacheValue PutValue = {.Value = Buffer};
- Zcs.Put(Context, Namespace, Bucket, Key, PutValue, {});
+ Zcs.Put(Context, Namespace, Bucket, Key, PutValue, {}, false);
return Key;
};
auto GetValue = [&Context](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, const IoHash& Key) {
@@ -1979,7 +1993,7 @@ TEST_CASE("cachestore.blocked.disklayer.put")
size_t Key = Buffer.Size();
IoHash HashKey = IoHash::HashBuffer(&Key, sizeof(uint32_t));
- Zcs.Put("test_bucket", HashKey, {.Value = Buffer}, {});
+ Zcs.Put("test_bucket", HashKey, {.Value = Buffer}, {}, false);
ZenCacheValue BufferGet;
CHECK(Zcs.Get("test_bucket", HashKey, BufferGet));
@@ -1989,7 +2003,7 @@ TEST_CASE("cachestore.blocked.disklayer.put")
Buffer2.SetContentType(ZenContentType::kCbObject);
// We should be able to overwrite even if the file is open for read
- Zcs.Put("test_bucket", HashKey, {.Value = Buffer2}, {});
+ Zcs.Put("test_bucket", HashKey, {.Value = Buffer2}, {}, false);
MemoryView OldView = BufferGet.Value.GetView();
@@ -2080,7 +2094,7 @@ TEST_CASE("cachestore.scrub")
AttachmentHashes.push_back(Attachment.DecodeRawHash());
CidStore.AddChunk(Attachment.GetCompressed().Flatten().AsIoBuffer(), AttachmentHashes.back());
}
- Zcs.Put("mybucket", Cid, {.Value = Record.Record}, AttachmentHashes);
+ Zcs.Put("mybucket", Cid, {.Value = Record.Record}, AttachmentHashes, false);
}
};
@@ -2129,7 +2143,8 @@ TEST_CASE("cachestore.newgc.basics")
{.Value = Record.second,
.RawSize = Record.second.GetSize(),
.RawHash = IoHash::HashBuffer(Record.second.GetData(), Record.second.GetSize())},
- AttachmentKeys);
+ AttachmentKeys,
+ false);
for (const auto& Attachment : Attachments)
{
CidStore.AddChunk(Attachment.second.GetCompressed().Flatten().AsIoBuffer(), Attachment.second.DecodeRawHash());
@@ -2145,7 +2160,8 @@ TEST_CASE("cachestore.newgc.basics")
{.Value = CacheValue.second,
.RawSize = CacheValue.second.GetSize(),
.RawHash = IoHash::HashBuffer(CacheValue.second.GetData(), CacheValue.second.GetSize())},
- {});
+ {},
+ false);
CacheEntries.insert({Key, CacheEntry{CacheValue.second, {}}});
return Key;
};
diff --git a/src/zenstore/include/zenstore/cache/cachedisklayer.h b/src/zenstore/include/zenstore/cache/cachedisklayer.h
index 3cd2d6423..023dd1ffa 100644
--- a/src/zenstore/include/zenstore/cache/cachedisklayer.h
+++ b/src/zenstore/include/zenstore/cache/cachedisklayer.h
@@ -115,6 +115,7 @@ public:
uint32_t PayloadAlignment = 1u << 4;
uint64_t MemCacheSizeThreshold = 1 * 1024;
uint64_t LargeObjectThreshold = 128 * 1024;
+ bool LimitOverwrites = false;
};
struct Configuration
@@ -170,6 +171,12 @@ public:
uint64_t MemorySize;
};
+ struct PutResult
+ {
+ zen::PutStatus Status;
+ std::string Message;
+ };
+
explicit ZenCacheDiskLayer(GcManager& Gc, JobQueue& JobQueue, const std::filesystem::path& RootDir, const Configuration& Config);
~ZenCacheDiskLayer();
@@ -180,13 +187,14 @@ public:
void Get(std::string_view Bucket, const IoHash& HashKey, GetBatchHandle& BatchHandle);
struct PutBatchHandle;
- PutBatchHandle* BeginPutBatch(std::vector<bool>& OutResult);
+ PutBatchHandle* BeginPutBatch(std::vector<PutResult>& OutResult);
void EndPutBatch(PutBatchHandle* Batch) noexcept;
- void Put(std::string_view Bucket,
+ PutResult Put(std::string_view Bucket,
const IoHash& HashKey,
const ZenCacheValue& Value,
std::span<IoHash> References,
+ bool Overwrite,
PutBatchHandle* OptionalBatchHandle);
std::function<void()> Drop();
std::function<void()> DropBucket(std::string_view Bucket);
@@ -236,10 +244,14 @@ public:
void Get(const IoHash& HashKey, GetBatchHandle& BatchHandle);
struct PutBatchHandle;
- PutBatchHandle* BeginPutBatch(std::vector<bool>& OutResult);
- void EndPutBatch(PutBatchHandle* Batch) noexcept;
- void Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References, PutBatchHandle* OptionalBatchHandle);
- uint64_t MemCacheTrim(GcClock::TimePoint ExpireTime);
+ PutBatchHandle* BeginPutBatch(std::vector<ZenCacheDiskLayer::PutResult>& OutResult);
+ void EndPutBatch(PutBatchHandle* Batch) noexcept;
+ PutResult Put(const IoHash& HashKey,
+ const ZenCacheValue& Value,
+ std::span<IoHash> References,
+ bool Overwrite,
+ PutBatchHandle* OptionalBatchHandle);
+ uint64_t MemCacheTrim(GcClock::TimePoint ExpireTime);
std::function<void()> Drop();
void Flush();
void ScrubStorage(ScrubContext& Ctx);
@@ -384,14 +396,20 @@ public:
virtual std::vector<GcReferenceChecker*> CreateReferenceCheckers(GcCtx& Ctx) override;
virtual std::vector<GcReferenceValidator*> CreateReferenceValidators(GcCtx& Ctx) override;
- void BuildPath(PathBuilderBase& Path, const IoHash& HashKey) const;
- void PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References);
- IoBuffer GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey) const;
- void PutInlineCacheValue(const IoHash& HashKey,
- const ZenCacheValue& Value,
- std::span<IoHash> References,
- PutBatchHandle* OptionalBatchHandle = nullptr);
- IoBuffer GetInlineCacheValue(const DiskLocation& Loc) const;
+ void BuildPath(PathBuilderBase& Path, const IoHash& HashKey) const;
+ bool ShouldRejectPut(const IoHash& HashKey,
+ const ZenCacheValue& Value,
+ std::span<const IoHash> References,
+ bool Overwrite,
+ ZenCacheDiskLayer::PutResult& OutPutResult);
+ void PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References);
+ IoBuffer GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey) const;
+ PutResult PutInlineCacheValue(const IoHash& HashKey,
+ const ZenCacheValue& Value,
+ std::span<IoHash> References,
+ bool Overwrite,
+ PutBatchHandle* OptionalBatchHandle = nullptr);
+ IoBuffer GetInlineCacheValue(const DiskLocation& Loc) const;
CacheValueDetails::ValueDetails GetValueDetails(RwLock::SharedLockScope&, const IoHash& Key, PayloadIndex Index) const;
void SetMetaData(RwLock::ExclusiveLockScope&,
diff --git a/src/zenstore/include/zenstore/cache/cacherpc.h b/src/zenstore/include/zenstore/cache/cacherpc.h
index da8cf69fe..104746aba 100644
--- a/src/zenstore/include/zenstore/cache/cacherpc.h
+++ b/src/zenstore/include/zenstore/cache/cacherpc.h
@@ -4,6 +4,7 @@
#include <zencore/iobuffer.h>
#include <zencore/logging.h>
+#include <zenstore/cache/cacheshared.h>
#include <zenutil/cache/cache.h>
#include <atomic>
@@ -56,13 +57,6 @@ struct CacheStats
std::atomic_uint64_t RpcChunkBatchRequests{};
};
-enum class PutResult
-{
- Success,
- Fail,
- Invalid,
-};
-
/** Recognize both kBinary and kCompressedBinary as kCompressedBinary for structured cache value keys.
We need this until the content type is preserved for kCompressedBinary when passing to and from upstream servers.
@@ -107,7 +101,7 @@ private:
CbPackage HandleRpcGetCacheValues(const CacheRequestContext& Context, CbObjectView BatchRequest);
CbPackage HandleRpcGetCacheChunks(const CacheRequestContext& Context, RpcAcceptOptions AcceptOptions, CbObjectView BatchRequest);
- PutResult PutCacheRecord(PutRequestData& Request, const CbPackage* Package);
+ PutStatus PutCacheRecord(PutRequestData& Request, const CbPackage* Package);
/** HandleRpcGetCacheChunks Helper: Parse the Body object into RecordValue Requests and Value Requests. */
bool ParseGetCacheChunksRequest(std::string& Namespace,
diff --git a/src/zenstore/include/zenstore/cache/cacheshared.h b/src/zenstore/include/zenstore/cache/cacheshared.h
index ef1b803de..8f40ae727 100644
--- a/src/zenstore/include/zenstore/cache/cacheshared.h
+++ b/src/zenstore/include/zenstore/cache/cacheshared.h
@@ -69,6 +69,14 @@ struct CacheContentStats
std::vector<IoHash> Attachments;
};
+enum class PutStatus
+{
+ Success,
+ Fail,
+ Conflict,
+ Invalid,
+};
+
bool IsKnownBadBucketName(std::string_view BucketName);
bool ValidateIoBuffer(ZenContentType ContentType, IoBuffer Buffer);
diff --git a/src/zenstore/include/zenstore/cache/structuredcachestore.h b/src/zenstore/include/zenstore/cache/structuredcachestore.h
index 48fc17960..b6e8e7565 100644
--- a/src/zenstore/include/zenstore/cache/structuredcachestore.h
+++ b/src/zenstore/include/zenstore/cache/structuredcachestore.h
@@ -78,24 +78,27 @@ public:
ZenCacheDiskLayer::DiskStats DiskStats;
};
+ using PutResult = ZenCacheDiskLayer::PutResult;
+
ZenCacheNamespace(GcManager& Gc, JobQueue& JobQueue, const std::filesystem::path& RootDir, const Configuration& Config);
~ZenCacheNamespace();
struct PutBatchHandle;
- PutBatchHandle* BeginPutBatch(std::vector<bool>& OutResults);
+ PutBatchHandle* BeginPutBatch(std::vector<PutResult>& OutResults);
void EndPutBatch(PutBatchHandle* Batch) noexcept;
struct GetBatchHandle;
GetBatchHandle* BeginGetBatch(ZenCacheValueVec_t& OutResults);
void EndGetBatch(GetBatchHandle* Batch) noexcept;
- bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue);
- void Get(std::string_view Bucket, const IoHash& HashKey, GetBatchHandle& OptionalBatchHandle);
- void Put(std::string_view Bucket,
- const IoHash& HashKey,
- const ZenCacheValue& Value,
- std::span<IoHash> References,
- PutBatchHandle* OptionalBatchHandle = nullptr);
+ bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue);
+ void Get(std::string_view Bucket, const IoHash& HashKey, GetBatchHandle& OptionalBatchHandle);
+ PutResult Put(std::string_view Bucket,
+ const IoHash& HashKey,
+ const ZenCacheValue& Value,
+ std::span<IoHash> References,
+ bool Overwrite,
+ PutBatchHandle* OptionalBatchHandle = nullptr);
bool DropBucket(std::string_view Bucket);
void EnumerateBucketContents(std::string_view Bucket,
@@ -196,6 +199,8 @@ public:
std::vector<NamedNamespaceStats> NamespaceStats;
};
+ using PutResult = ZenCacheNamespace::PutResult;
+
ZenCacheStore(GcManager& Gc,
JobQueue& JobQueue,
const std::filesystem::path& BasePath,
@@ -206,7 +211,7 @@ public:
class PutBatch
{
public:
- PutBatch(ZenCacheStore& CacheStore, std::string_view Namespace, std::vector<bool>& OutResult);
+ PutBatch(ZenCacheStore& CacheStore, std::string_view Namespace, std::vector<PutResult>& OutResult);
~PutBatch();
private:
@@ -243,13 +248,14 @@ public:
const IoHash& HashKey,
GetBatch& BatchHandle);
- void Put(const CacheRequestContext& Context,
- std::string_view Namespace,
- std::string_view Bucket,
- const IoHash& HashKey,
- const ZenCacheValue& Value,
- std::span<IoHash> References,
- PutBatch* OptionalBatchHandle = nullptr);
+ PutResult Put(const CacheRequestContext& Context,
+ std::string_view Namespace,
+ std::string_view Bucket,
+ const IoHash& HashKey,
+ const ZenCacheValue& Value,
+ std::span<IoHash> References,
+ bool Overwrite,
+ PutBatch* OptionalBatchHandle = nullptr);
bool DropBucket(std::string_view Namespace, std::string_view Bucket);
bool DropNamespace(std::string_view Namespace);