diff options
| author | zousar <[email protected]> | 2025-03-02 00:15:35 -0700 |
|---|---|---|
| committer | zousar <[email protected]> | 2025-03-02 00:15:35 -0700 |
| commit | 6d07b0437ccb7800652708f76a7ee84e551f43cf (patch) | |
| tree | 15d4ef7c9b69da20ea5da3dc18240d75bb64d17e /src | |
| parent | Move utility methods in cachedisklayer (diff) | |
| download | zen-6d07b0437ccb7800652708f76a7ee84e551f43cf.tar.xz zen-6d07b0437ccb7800652708f76a7ee84e551f43cf.zip | |
Control overwrite enforcement with a config setting
Diffstat (limited to 'src')
| -rw-r--r-- | src/zenserver-test/zenserver-test.cpp | 185 | ||||
| -rw-r--r-- | src/zenserver/config.cpp | 8 | ||||
| -rw-r--r-- | src/zenserver/config.h | 1 | ||||
| -rw-r--r-- | src/zenserver/zenserver.cpp | 3 | ||||
| -rw-r--r-- | src/zenstore/cache/cachedisklayer.cpp | 3 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/cache/cachedisklayer.h | 1 |
6 files changed, 154 insertions, 47 deletions
diff --git a/src/zenserver-test/zenserver-test.cpp b/src/zenserver-test/zenserver-test.cpp index a41eaed75..f469fc60e 100644 --- a/src/zenserver-test/zenserver-test.cpp +++ b/src/zenserver-test/zenserver-test.cpp @@ -549,9 +549,13 @@ namespace utils { return ZenConfig{.DataDir = TestEnv.CreateNewTestDir(), .Port = Port, .Args = std::move(Args)}; } - static ZenConfig NewWithUpstream(uint16_t Port, uint16_t UpstreamPort) + static ZenConfig NewWithUpstream(uint16_t Port, uint16_t UpstreamPort, std::string Args = "") { - return New(Port, fmt::format("--debug --upstream-thread-count=0 --upstream-zen-url=http://localhost:{}", UpstreamPort)); + return New(Port, + fmt::format("{}{}--debug --upstream-thread-count=0 --upstream-zen-url=http://localhost:{}", + Args, + Args.length() > 0 ? " " : "", + UpstreamPort)); } static ZenConfig NewWithThreadedUpstreams(uint16_t NewPort, std::span<uint16_t> UpstreamPorts, bool Debug) @@ -1503,7 +1507,7 @@ TEST_CASE("zcache.rpc") } } - SUBCASE("policy - 'QueryLocal' on put denies overwrite with differing value") + SUBCASE("policy - 'QueryLocal' on put allows overwrite with differing value when not limiting overwrites") { using namespace utils; @@ -1519,7 +1523,7 @@ TEST_CASE("zcache.rpc") std::string_view Namespace("ue4.ddc"sv); std::string_view Bucket("mastodon"sv); const size_t NumRecords = 4; - std::vector<zen::CacheKey> Keys = PutCacheRecords(UpstreamCfg.BaseUri, Namespace, Bucket, NumRecords, PayloadSize); + std::vector<zen::CacheKey> Keys = PutCacheRecords(LocalCfg.BaseUri, Namespace, Bucket, NumRecords, PayloadSize); for (const zen::CacheKey& CacheKey : Keys) { @@ -1530,7 +1534,7 @@ TEST_CASE("zcache.rpc") CHECK(Request.Format(Package)); IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); - cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", UpstreamCfg.BaseUri)}, + cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", LocalCfg.BaseUri)}, cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}, cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); @@ -1541,28 +1545,103 @@ TEST_CASE("zcache.rpc") CHECK(ParsedResult.Parse(Response)); for (bool ResponseSuccess : ParsedResult.Success) { - CHECK(!ResponseSuccess); + CHECK(ResponseSuccess); } } - CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote); - GetCacheRecordResult Result = GetCacheRecords(LocalCfg.BaseUri, "ue4.ddc"sv, Keys, Policy); + auto CheckRecordCorrectness = [&](const ZenConfig& Cfg) { + CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote); + GetCacheRecordResult Result = GetCacheRecords(Cfg.BaseUri, "ue4.ddc"sv, Keys, Policy); - CHECK(Result.Result.Results.size() == Keys.size()); + CHECK(Result.Result.Results.size() == Keys.size()); - for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results) + for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results) + { + CHECK(Record); + const CacheKey& ExpectedKey = Keys[Index++]; + CHECK(Record->Key == ExpectedKey); + for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) + { + CHECK(Value.RawSize == PayloadSize * 2); + } + } + }; + + // Check that the records are present and overwritten in the local server + CheckRecordCorrectness(LocalCfg); + + // Check that the records are present and overwritten in the upstream server + CheckRecordCorrectness(UpstreamCfg); + } + + SUBCASE("policy - 'QueryLocal' on put denies overwrite with differing value when limiting overwrites") + { + using namespace utils; + + ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber()); + ZenServerInstance UpstreamServer(TestEnv); + SpawnServer(UpstreamServer, UpstreamCfg); + + ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port, "--cache-limit-overwrites"); + ZenServerInstance LocalServer(TestEnv); + SpawnServer(LocalServer, LocalCfg); + + size_t PayloadSize = 1024; + std::string_view Namespace("ue4.ddc"sv); + std::string_view Bucket("mastodon"sv); + const size_t NumRecords = 4; + std::vector<zen::CacheKey> Keys = PutCacheRecords(LocalCfg.BaseUri, Namespace, Bucket, NumRecords, PayloadSize); + + for (const zen::CacheKey& CacheKey : Keys) { - CHECK(Record); - const CacheKey& ExpectedKey = Keys[Index++]; - CHECK(Record->Key == ExpectedKey); - for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) + cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .Namespace = std::string(Namespace)}; + AppendCacheRecord(Request, CacheKey, PayloadSize * 2, CachePolicy::Default); + + CbPackage Package; + CHECK(Request.Format(Package)); + + IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); + cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", LocalCfg.BaseUri)}, + cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}, + cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); + + CHECK(Result.status_code == 200); + cacherequests::PutCacheRecordsResult ParsedResult; + CbPackage Response = ParsePackageMessage(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size())); + CHECK(!Response.IsNull()); + CHECK(ParsedResult.Parse(Response)); + for (bool ResponseSuccess : ParsedResult.Success) { - CHECK(Value.RawSize == PayloadSize); + CHECK(!ResponseSuccess); } } + + auto CheckRecordCorrectness = [&](const ZenConfig& Cfg) { + CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote); + GetCacheRecordResult Result = GetCacheRecords(Cfg.BaseUri, "ue4.ddc"sv, Keys, Policy); + + CHECK(Result.Result.Results.size() == Keys.size()); + + for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results) + { + CHECK(Record); + const CacheKey& ExpectedKey = Keys[Index++]; + CHECK(Record->Key == ExpectedKey); + for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) + { + CHECK(Value.RawSize == PayloadSize); + } + } + }; + + // Check that the records are present and not overwritten in the local server + CheckRecordCorrectness(LocalCfg); + + // Check that the records are present and not overwritten in the upstream server + CheckRecordCorrectness(UpstreamCfg); } - SUBCASE("policy - no 'QueryLocal' on put allows overwrite with differing value") + SUBCASE("policy - no 'QueryLocal' on put allows overwrite with differing value when limiting overwrites") { using namespace utils; @@ -1570,7 +1649,7 @@ TEST_CASE("zcache.rpc") ZenServerInstance UpstreamServer(TestEnv); SpawnServer(UpstreamServer, UpstreamCfg); - ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port); + ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port, "--cache-limit-overwrites"); ZenServerInstance LocalServer(TestEnv); SpawnServer(LocalServer, LocalCfg); @@ -1578,7 +1657,7 @@ TEST_CASE("zcache.rpc") std::string_view Namespace("ue4.ddc"sv); std::string_view Bucket("mastodon"sv); const size_t NumRecords = 4; - std::vector<zen::CacheKey> Keys = PutCacheRecords(UpstreamCfg.BaseUri, Namespace, Bucket, NumRecords, PayloadSize); + std::vector<zen::CacheKey> Keys = PutCacheRecords(LocalCfg.BaseUri, Namespace, Bucket, NumRecords, PayloadSize); for (const zen::CacheKey& CacheKey : Keys) { @@ -1589,7 +1668,7 @@ TEST_CASE("zcache.rpc") CHECK(Request.Format(Package)); IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); - cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", UpstreamCfg.BaseUri)}, + cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", LocalCfg.BaseUri)}, cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}, cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); @@ -1604,24 +1683,32 @@ TEST_CASE("zcache.rpc") } } - CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote); - GetCacheRecordResult Result = GetCacheRecords(LocalCfg.BaseUri, "ue4.ddc"sv, Keys, Policy); + auto CheckRecordCorrectness = [&](const ZenConfig& Cfg) { + CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote); + GetCacheRecordResult Result = GetCacheRecords(Cfg.BaseUri, "ue4.ddc"sv, Keys, Policy); - CHECK(Result.Result.Results.size() == Keys.size()); + CHECK(Result.Result.Results.size() == Keys.size()); - for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results) - { - CHECK(Record); - const CacheKey& ExpectedKey = Keys[Index++]; - CHECK(Record->Key == ExpectedKey); - for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) + for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results) { - CHECK(Value.RawSize == PayloadSize * 2); + CHECK(Record); + const CacheKey& ExpectedKey = Keys[Index++]; + CHECK(Record->Key == ExpectedKey); + for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) + { + CHECK(Value.RawSize == PayloadSize * 2); + } } - } + }; + + // Check that the records are present and overwritten in the local server + CheckRecordCorrectness(LocalCfg); + + // Check that the records are present and overwritten in the upstream server + CheckRecordCorrectness(UpstreamCfg); } - SUBCASE("policy - 'QueryLocal' on put allows overwrite with equivalent value") + SUBCASE("policy - 'QueryLocal' on put allows overwrite with equivalent value when limiting overwrites") { using namespace utils; @@ -1629,7 +1716,7 @@ TEST_CASE("zcache.rpc") ZenServerInstance UpstreamServer(TestEnv); SpawnServer(UpstreamServer, UpstreamCfg); - ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port); + ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port, "--cache-limit-overwrites"); ZenServerInstance LocalServer(TestEnv); SpawnServer(LocalServer, LocalCfg); @@ -1638,12 +1725,12 @@ TEST_CASE("zcache.rpc") std::string_view Bucket("mastodon"sv); const size_t NumRecords = 4; std::vector<CbPackage> Packages; - std::vector<zen::CacheKey> Keys = PutCacheRecords(UpstreamCfg.BaseUri, Namespace, Bucket, NumRecords, PayloadSize, 1, &Packages); + std::vector<zen::CacheKey> Keys = PutCacheRecords(LocalCfg.BaseUri, Namespace, Bucket, NumRecords, PayloadSize, 1, &Packages); for (const CbPackage& Package : Packages) { IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); - cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", UpstreamCfg.BaseUri)}, + cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", LocalCfg.BaseUri)}, cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}, cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); @@ -1658,21 +1745,29 @@ TEST_CASE("zcache.rpc") } } - CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote); - GetCacheRecordResult Result = GetCacheRecords(LocalCfg.BaseUri, "ue4.ddc"sv, Keys, Policy); + auto CheckRecordCorrectness = [&](const ZenConfig& Cfg) { + CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote); + GetCacheRecordResult Result = GetCacheRecords(Cfg.BaseUri, "ue4.ddc"sv, Keys, Policy); - CHECK(Result.Result.Results.size() == Keys.size()); + CHECK(Result.Result.Results.size() == Keys.size()); - for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results) - { - CHECK(Record); - const CacheKey& ExpectedKey = Keys[Index++]; - CHECK(Record->Key == ExpectedKey); - for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) + for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results) { - CHECK(Value.RawSize == PayloadSize); + CHECK(Record); + const CacheKey& ExpectedKey = Keys[Index++]; + CHECK(Record->Key == ExpectedKey); + for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) + { + CHECK(Value.RawSize == PayloadSize); + } } - } + }; + + // Check that the records are present and unchanged in the local server + CheckRecordCorrectness(LocalCfg); + + // Check that the records are present and unchanged in the upstream server + CheckRecordCorrectness(UpstreamCfg); } SUBCASE("RpcAcceptOptions") diff --git a/src/zenserver/config.cpp b/src/zenserver/config.cpp index 809092378..c8949f5fd 100644 --- a/src/zenserver/config.cpp +++ b/src/zenserver/config.cpp @@ -408,6 +408,7 @@ ParseConfigFile(const std::filesystem::path& Path, LuaOptions.AddOption("cache.enable"sv, ServerOptions.StructuredCacheConfig.Enabled); LuaOptions.AddOption("cache.writelog"sv, ServerOptions.StructuredCacheConfig.WriteLogEnabled, "cache-write-log"sv); LuaOptions.AddOption("cache.accesslog"sv, ServerOptions.StructuredCacheConfig.AccessLogEnabled, "cache-access-log"sv); + LuaOptions.AddOption("cache.limitoverwrites"sv, ServerOptions.StructuredCacheConfig.LimitOverwrites, "cache-limit-overwrites"sv); LuaOptions.AddOption("cache.memlayer.sizethreshold"sv, ServerOptions.StructuredCacheConfig.MemCacheSizeThreshold, @@ -859,6 +860,13 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions) cxxopts::value<bool>(ServerOptions.StructuredCacheConfig.AccessLogEnabled)->default_value("false"), ""); + options.add_option("cache", + "", + "cache-limit-overwrites", + "Whether to require policy flag pattern before allowing overwrites", + cxxopts::value<bool>(ServerOptions.StructuredCacheConfig.LimitOverwrites)->default_value("false"), + ""); + options.add_option( "cache", "", diff --git a/src/zenserver/config.h b/src/zenserver/config.h index c7781aada..4ce265c78 100644 --- a/src/zenserver/config.h +++ b/src/zenserver/config.h @@ -118,6 +118,7 @@ struct ZenStructuredCacheConfig bool Enabled = true; bool WriteLogEnabled = false; bool AccessLogEnabled = false; + bool LimitOverwrites = false; uint64_t MemCacheSizeThreshold = 1 * 1024; uint64_t MemTargetFootprintBytes = 512 * 1024 * 1024; uint64_t MemTrimIntervalSeconds = 60; diff --git a/src/zenserver/zenserver.cpp b/src/zenserver/zenserver.cpp index f84bc0b00..cc1be13e2 100644 --- a/src/zenserver/zenserver.cpp +++ b/src/zenserver/zenserver.cpp @@ -530,7 +530,8 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions) Config.AllowAutomaticCreationOfNamespaces = true; Config.Logging = {.EnableWriteLog = ServerOptions.StructuredCacheConfig.WriteLogEnabled, .EnableAccessLog = ServerOptions.StructuredCacheConfig.AccessLogEnabled}; - Config.NamespaceConfig.DiskLayerConfig.BucketConfig.MemCacheSizeThreshold = ServerOptions.StructuredCacheConfig.MemCacheSizeThreshold, + Config.NamespaceConfig.DiskLayerConfig.BucketConfig.MemCacheSizeThreshold = ServerOptions.StructuredCacheConfig.MemCacheSizeThreshold; + Config.NamespaceConfig.DiskLayerConfig.BucketConfig.LimitOverwrites = ServerOptions.StructuredCacheConfig.LimitOverwrites; Config.NamespaceConfig.DiskLayerConfig.MemCacheTargetFootprintBytes = ServerOptions.StructuredCacheConfig.MemTargetFootprintBytes; Config.NamespaceConfig.DiskLayerConfig.MemCacheTrimIntervalSeconds = ServerOptions.StructuredCacheConfig.MemTrimIntervalSeconds; Config.NamespaceConfig.DiskLayerConfig.MemCacheMaxAgeSeconds = ServerOptions.StructuredCacheConfig.MemMaxAgeSeconds; diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index 54f0c4bfc..d3748e70f 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -1840,7 +1840,8 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, metrics::RequestStats::Scope $(m_PutOps, Value.Value.Size()); - if (!Overwrite) + const bool CheckExisting = m_Configuration.LimitOverwrites && !Overwrite; + if (CheckExisting) { RwLock::SharedLockScope IndexLock(m_IndexLock); auto It = m_Index.find(HashKey); diff --git a/src/zenstore/include/zenstore/cache/cachedisklayer.h b/src/zenstore/include/zenstore/cache/cachedisklayer.h index 7de707a7f..239b0d1aa 100644 --- a/src/zenstore/include/zenstore/cache/cachedisklayer.h +++ b/src/zenstore/include/zenstore/cache/cachedisklayer.h @@ -113,6 +113,7 @@ public: uint32_t PayloadAlignment = 1u << 4; uint64_t MemCacheSizeThreshold = 1 * 1024; uint64_t LargeObjectThreshold = 128 * 1024; + bool LimitOverwrites = false; }; struct Configuration |