aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorzousar <[email protected]>2025-03-02 00:15:35 -0700
committerzousar <[email protected]>2025-03-02 00:15:35 -0700
commit6d07b0437ccb7800652708f76a7ee84e551f43cf (patch)
tree15d4ef7c9b69da20ea5da3dc18240d75bb64d17e /src
parentMove utility methods in cachedisklayer (diff)
downloadzen-6d07b0437ccb7800652708f76a7ee84e551f43cf.tar.xz
zen-6d07b0437ccb7800652708f76a7ee84e551f43cf.zip
Control overwrite enforcement with a config setting
Diffstat (limited to 'src')
-rw-r--r--src/zenserver-test/zenserver-test.cpp185
-rw-r--r--src/zenserver/config.cpp8
-rw-r--r--src/zenserver/config.h1
-rw-r--r--src/zenserver/zenserver.cpp3
-rw-r--r--src/zenstore/cache/cachedisklayer.cpp3
-rw-r--r--src/zenstore/include/zenstore/cache/cachedisklayer.h1
6 files changed, 154 insertions, 47 deletions
diff --git a/src/zenserver-test/zenserver-test.cpp b/src/zenserver-test/zenserver-test.cpp
index a41eaed75..f469fc60e 100644
--- a/src/zenserver-test/zenserver-test.cpp
+++ b/src/zenserver-test/zenserver-test.cpp
@@ -549,9 +549,13 @@ namespace utils {
return ZenConfig{.DataDir = TestEnv.CreateNewTestDir(), .Port = Port, .Args = std::move(Args)};
}
- static ZenConfig NewWithUpstream(uint16_t Port, uint16_t UpstreamPort)
+ static ZenConfig NewWithUpstream(uint16_t Port, uint16_t UpstreamPort, std::string Args = "")
{
- return New(Port, fmt::format("--debug --upstream-thread-count=0 --upstream-zen-url=http://localhost:{}", UpstreamPort));
+ return New(Port,
+ fmt::format("{}{}--debug --upstream-thread-count=0 --upstream-zen-url=http://localhost:{}",
+ Args,
+ Args.length() > 0 ? " " : "",
+ UpstreamPort));
}
static ZenConfig NewWithThreadedUpstreams(uint16_t NewPort, std::span<uint16_t> UpstreamPorts, bool Debug)
@@ -1503,7 +1507,7 @@ TEST_CASE("zcache.rpc")
}
}
- SUBCASE("policy - 'QueryLocal' on put denies overwrite with differing value")
+ SUBCASE("policy - 'QueryLocal' on put allows overwrite with differing value when not limiting overwrites")
{
using namespace utils;
@@ -1519,7 +1523,7 @@ TEST_CASE("zcache.rpc")
std::string_view Namespace("ue4.ddc"sv);
std::string_view Bucket("mastodon"sv);
const size_t NumRecords = 4;
- std::vector<zen::CacheKey> Keys = PutCacheRecords(UpstreamCfg.BaseUri, Namespace, Bucket, NumRecords, PayloadSize);
+ std::vector<zen::CacheKey> Keys = PutCacheRecords(LocalCfg.BaseUri, Namespace, Bucket, NumRecords, PayloadSize);
for (const zen::CacheKey& CacheKey : Keys)
{
@@ -1530,7 +1534,7 @@ TEST_CASE("zcache.rpc")
CHECK(Request.Format(Package));
IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer();
- cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", UpstreamCfg.BaseUri)},
+ cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", LocalCfg.BaseUri)},
cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}},
cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
@@ -1541,28 +1545,103 @@ TEST_CASE("zcache.rpc")
CHECK(ParsedResult.Parse(Response));
for (bool ResponseSuccess : ParsedResult.Success)
{
- CHECK(!ResponseSuccess);
+ CHECK(ResponseSuccess);
}
}
- CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote);
- GetCacheRecordResult Result = GetCacheRecords(LocalCfg.BaseUri, "ue4.ddc"sv, Keys, Policy);
+ auto CheckRecordCorrectness = [&](const ZenConfig& Cfg) {
+ CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote);
+ GetCacheRecordResult Result = GetCacheRecords(Cfg.BaseUri, "ue4.ddc"sv, Keys, Policy);
- CHECK(Result.Result.Results.size() == Keys.size());
+ CHECK(Result.Result.Results.size() == Keys.size());
- for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results)
+ for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results)
+ {
+ CHECK(Record);
+ const CacheKey& ExpectedKey = Keys[Index++];
+ CHECK(Record->Key == ExpectedKey);
+ for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values)
+ {
+ CHECK(Value.RawSize == PayloadSize * 2);
+ }
+ }
+ };
+
+ // Check that the records are present and overwritten in the local server
+ CheckRecordCorrectness(LocalCfg);
+
+ // Check that the records are present and overwritten in the upstream server
+ CheckRecordCorrectness(UpstreamCfg);
+ }
+
+ SUBCASE("policy - 'QueryLocal' on put denies overwrite with differing value when limiting overwrites")
+ {
+ using namespace utils;
+
+ ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber());
+ ZenServerInstance UpstreamServer(TestEnv);
+ SpawnServer(UpstreamServer, UpstreamCfg);
+
+ ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port, "--cache-limit-overwrites");
+ ZenServerInstance LocalServer(TestEnv);
+ SpawnServer(LocalServer, LocalCfg);
+
+ size_t PayloadSize = 1024;
+ std::string_view Namespace("ue4.ddc"sv);
+ std::string_view Bucket("mastodon"sv);
+ const size_t NumRecords = 4;
+ std::vector<zen::CacheKey> Keys = PutCacheRecords(LocalCfg.BaseUri, Namespace, Bucket, NumRecords, PayloadSize);
+
+ for (const zen::CacheKey& CacheKey : Keys)
{
- CHECK(Record);
- const CacheKey& ExpectedKey = Keys[Index++];
- CHECK(Record->Key == ExpectedKey);
- for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values)
+ cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .Namespace = std::string(Namespace)};
+ AppendCacheRecord(Request, CacheKey, PayloadSize * 2, CachePolicy::Default);
+
+ CbPackage Package;
+ CHECK(Request.Format(Package));
+
+ IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer();
+ cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", LocalCfg.BaseUri)},
+ cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}},
+ cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
+
+ CHECK(Result.status_code == 200);
+ cacherequests::PutCacheRecordsResult ParsedResult;
+ CbPackage Response = ParsePackageMessage(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()));
+ CHECK(!Response.IsNull());
+ CHECK(ParsedResult.Parse(Response));
+ for (bool ResponseSuccess : ParsedResult.Success)
{
- CHECK(Value.RawSize == PayloadSize);
+ CHECK(!ResponseSuccess);
}
}
+
+ auto CheckRecordCorrectness = [&](const ZenConfig& Cfg) {
+ CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote);
+ GetCacheRecordResult Result = GetCacheRecords(Cfg.BaseUri, "ue4.ddc"sv, Keys, Policy);
+
+ CHECK(Result.Result.Results.size() == Keys.size());
+
+ for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results)
+ {
+ CHECK(Record);
+ const CacheKey& ExpectedKey = Keys[Index++];
+ CHECK(Record->Key == ExpectedKey);
+ for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values)
+ {
+ CHECK(Value.RawSize == PayloadSize);
+ }
+ }
+ };
+
+ // Check that the records are present and not overwritten in the local server
+ CheckRecordCorrectness(LocalCfg);
+
+ // Check that the records are present and not overwritten in the upstream server
+ CheckRecordCorrectness(UpstreamCfg);
}
- SUBCASE("policy - no 'QueryLocal' on put allows overwrite with differing value")
+ SUBCASE("policy - no 'QueryLocal' on put allows overwrite with differing value when limiting overwrites")
{
using namespace utils;
@@ -1570,7 +1649,7 @@ TEST_CASE("zcache.rpc")
ZenServerInstance UpstreamServer(TestEnv);
SpawnServer(UpstreamServer, UpstreamCfg);
- ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port);
+ ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port, "--cache-limit-overwrites");
ZenServerInstance LocalServer(TestEnv);
SpawnServer(LocalServer, LocalCfg);
@@ -1578,7 +1657,7 @@ TEST_CASE("zcache.rpc")
std::string_view Namespace("ue4.ddc"sv);
std::string_view Bucket("mastodon"sv);
const size_t NumRecords = 4;
- std::vector<zen::CacheKey> Keys = PutCacheRecords(UpstreamCfg.BaseUri, Namespace, Bucket, NumRecords, PayloadSize);
+ std::vector<zen::CacheKey> Keys = PutCacheRecords(LocalCfg.BaseUri, Namespace, Bucket, NumRecords, PayloadSize);
for (const zen::CacheKey& CacheKey : Keys)
{
@@ -1589,7 +1668,7 @@ TEST_CASE("zcache.rpc")
CHECK(Request.Format(Package));
IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer();
- cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", UpstreamCfg.BaseUri)},
+ cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", LocalCfg.BaseUri)},
cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}},
cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
@@ -1604,24 +1683,32 @@ TEST_CASE("zcache.rpc")
}
}
- CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote);
- GetCacheRecordResult Result = GetCacheRecords(LocalCfg.BaseUri, "ue4.ddc"sv, Keys, Policy);
+ auto CheckRecordCorrectness = [&](const ZenConfig& Cfg) {
+ CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote);
+ GetCacheRecordResult Result = GetCacheRecords(Cfg.BaseUri, "ue4.ddc"sv, Keys, Policy);
- CHECK(Result.Result.Results.size() == Keys.size());
+ CHECK(Result.Result.Results.size() == Keys.size());
- for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results)
- {
- CHECK(Record);
- const CacheKey& ExpectedKey = Keys[Index++];
- CHECK(Record->Key == ExpectedKey);
- for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values)
+ for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results)
{
- CHECK(Value.RawSize == PayloadSize * 2);
+ CHECK(Record);
+ const CacheKey& ExpectedKey = Keys[Index++];
+ CHECK(Record->Key == ExpectedKey);
+ for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values)
+ {
+ CHECK(Value.RawSize == PayloadSize * 2);
+ }
}
- }
+ };
+
+ // Check that the records are present and overwritten in the local server
+ CheckRecordCorrectness(LocalCfg);
+
+ // Check that the records are present and overwritten in the upstream server
+ CheckRecordCorrectness(UpstreamCfg);
}
- SUBCASE("policy - 'QueryLocal' on put allows overwrite with equivalent value")
+ SUBCASE("policy - 'QueryLocal' on put allows overwrite with equivalent value when limiting overwrites")
{
using namespace utils;
@@ -1629,7 +1716,7 @@ TEST_CASE("zcache.rpc")
ZenServerInstance UpstreamServer(TestEnv);
SpawnServer(UpstreamServer, UpstreamCfg);
- ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port);
+ ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port, "--cache-limit-overwrites");
ZenServerInstance LocalServer(TestEnv);
SpawnServer(LocalServer, LocalCfg);
@@ -1638,12 +1725,12 @@ TEST_CASE("zcache.rpc")
std::string_view Bucket("mastodon"sv);
const size_t NumRecords = 4;
std::vector<CbPackage> Packages;
- std::vector<zen::CacheKey> Keys = PutCacheRecords(UpstreamCfg.BaseUri, Namespace, Bucket, NumRecords, PayloadSize, 1, &Packages);
+ std::vector<zen::CacheKey> Keys = PutCacheRecords(LocalCfg.BaseUri, Namespace, Bucket, NumRecords, PayloadSize, 1, &Packages);
for (const CbPackage& Package : Packages)
{
IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer();
- cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", UpstreamCfg.BaseUri)},
+ cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", LocalCfg.BaseUri)},
cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}},
cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
@@ -1658,21 +1745,29 @@ TEST_CASE("zcache.rpc")
}
}
- CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote);
- GetCacheRecordResult Result = GetCacheRecords(LocalCfg.BaseUri, "ue4.ddc"sv, Keys, Policy);
+ auto CheckRecordCorrectness = [&](const ZenConfig& Cfg) {
+ CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote);
+ GetCacheRecordResult Result = GetCacheRecords(Cfg.BaseUri, "ue4.ddc"sv, Keys, Policy);
- CHECK(Result.Result.Results.size() == Keys.size());
+ CHECK(Result.Result.Results.size() == Keys.size());
- for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results)
- {
- CHECK(Record);
- const CacheKey& ExpectedKey = Keys[Index++];
- CHECK(Record->Key == ExpectedKey);
- for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values)
+ for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results)
{
- CHECK(Value.RawSize == PayloadSize);
+ CHECK(Record);
+ const CacheKey& ExpectedKey = Keys[Index++];
+ CHECK(Record->Key == ExpectedKey);
+ for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values)
+ {
+ CHECK(Value.RawSize == PayloadSize);
+ }
}
- }
+ };
+
+ // Check that the records are present and unchanged in the local server
+ CheckRecordCorrectness(LocalCfg);
+
+ // Check that the records are present and unchanged in the upstream server
+ CheckRecordCorrectness(UpstreamCfg);
}
SUBCASE("RpcAcceptOptions")
diff --git a/src/zenserver/config.cpp b/src/zenserver/config.cpp
index 809092378..c8949f5fd 100644
--- a/src/zenserver/config.cpp
+++ b/src/zenserver/config.cpp
@@ -408,6 +408,7 @@ ParseConfigFile(const std::filesystem::path& Path,
LuaOptions.AddOption("cache.enable"sv, ServerOptions.StructuredCacheConfig.Enabled);
LuaOptions.AddOption("cache.writelog"sv, ServerOptions.StructuredCacheConfig.WriteLogEnabled, "cache-write-log"sv);
LuaOptions.AddOption("cache.accesslog"sv, ServerOptions.StructuredCacheConfig.AccessLogEnabled, "cache-access-log"sv);
+ LuaOptions.AddOption("cache.limitoverwrites"sv, ServerOptions.StructuredCacheConfig.LimitOverwrites, "cache-limit-overwrites"sv);
LuaOptions.AddOption("cache.memlayer.sizethreshold"sv,
ServerOptions.StructuredCacheConfig.MemCacheSizeThreshold,
@@ -859,6 +860,13 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions)
cxxopts::value<bool>(ServerOptions.StructuredCacheConfig.AccessLogEnabled)->default_value("false"),
"");
+ options.add_option("cache",
+ "",
+ "cache-limit-overwrites",
+ "Whether to require policy flag pattern before allowing overwrites",
+ cxxopts::value<bool>(ServerOptions.StructuredCacheConfig.LimitOverwrites)->default_value("false"),
+ "");
+
options.add_option(
"cache",
"",
diff --git a/src/zenserver/config.h b/src/zenserver/config.h
index c7781aada..4ce265c78 100644
--- a/src/zenserver/config.h
+++ b/src/zenserver/config.h
@@ -118,6 +118,7 @@ struct ZenStructuredCacheConfig
bool Enabled = true;
bool WriteLogEnabled = false;
bool AccessLogEnabled = false;
+ bool LimitOverwrites = false;
uint64_t MemCacheSizeThreshold = 1 * 1024;
uint64_t MemTargetFootprintBytes = 512 * 1024 * 1024;
uint64_t MemTrimIntervalSeconds = 60;
diff --git a/src/zenserver/zenserver.cpp b/src/zenserver/zenserver.cpp
index f84bc0b00..cc1be13e2 100644
--- a/src/zenserver/zenserver.cpp
+++ b/src/zenserver/zenserver.cpp
@@ -530,7 +530,8 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions)
Config.AllowAutomaticCreationOfNamespaces = true;
Config.Logging = {.EnableWriteLog = ServerOptions.StructuredCacheConfig.WriteLogEnabled,
.EnableAccessLog = ServerOptions.StructuredCacheConfig.AccessLogEnabled};
- Config.NamespaceConfig.DiskLayerConfig.BucketConfig.MemCacheSizeThreshold = ServerOptions.StructuredCacheConfig.MemCacheSizeThreshold,
+ Config.NamespaceConfig.DiskLayerConfig.BucketConfig.MemCacheSizeThreshold = ServerOptions.StructuredCacheConfig.MemCacheSizeThreshold;
+ Config.NamespaceConfig.DiskLayerConfig.BucketConfig.LimitOverwrites = ServerOptions.StructuredCacheConfig.LimitOverwrites;
Config.NamespaceConfig.DiskLayerConfig.MemCacheTargetFootprintBytes = ServerOptions.StructuredCacheConfig.MemTargetFootprintBytes;
Config.NamespaceConfig.DiskLayerConfig.MemCacheTrimIntervalSeconds = ServerOptions.StructuredCacheConfig.MemTrimIntervalSeconds;
Config.NamespaceConfig.DiskLayerConfig.MemCacheMaxAgeSeconds = ServerOptions.StructuredCacheConfig.MemMaxAgeSeconds;
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp
index 54f0c4bfc..d3748e70f 100644
--- a/src/zenstore/cache/cachedisklayer.cpp
+++ b/src/zenstore/cache/cachedisklayer.cpp
@@ -1840,7 +1840,8 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey,
metrics::RequestStats::Scope $(m_PutOps, Value.Value.Size());
- if (!Overwrite)
+ const bool CheckExisting = m_Configuration.LimitOverwrites && !Overwrite;
+ if (CheckExisting)
{
RwLock::SharedLockScope IndexLock(m_IndexLock);
auto It = m_Index.find(HashKey);
diff --git a/src/zenstore/include/zenstore/cache/cachedisklayer.h b/src/zenstore/include/zenstore/cache/cachedisklayer.h
index 7de707a7f..239b0d1aa 100644
--- a/src/zenstore/include/zenstore/cache/cachedisklayer.h
+++ b/src/zenstore/include/zenstore/cache/cachedisklayer.h
@@ -113,6 +113,7 @@ public:
uint32_t PayloadAlignment = 1u << 4;
uint64_t MemCacheSizeThreshold = 1 * 1024;
uint64_t LargeObjectThreshold = 128 * 1024;
+ bool LimitOverwrites = false;
};
struct Configuration