From a38adbd74f70a931884affc00d321e7a2a7cf99b Mon Sep 17 00:00:00 2001 From: zousar <2936246+zousar@users.noreply.github.com> Date: Tue, 18 Feb 2025 17:29:47 -0700 Subject: Add unit test for overwrite behavior --- src/zenserver-test/zenserver-test.cpp | 102 ++++++++++++++++++++++++++++++++++ 1 file changed, 102 insertions(+) (limited to 'src/zenserver-test') diff --git a/src/zenserver-test/zenserver-test.cpp b/src/zenserver-test/zenserver-test.cpp index 6259c0f37..466f3d6ca 100644 --- a/src/zenserver-test/zenserver-test.cpp +++ b/src/zenserver-test/zenserver-test.cpp @@ -1498,6 +1498,108 @@ TEST_CASE("zcache.rpc") } } + SUBCASE("policy - 'QueryLocal' on put denies overwrite") + { + using namespace utils; + + ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber()); + ZenServerInstance UpstreamServer(TestEnv); + SpawnServer(UpstreamServer, UpstreamCfg); + + ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port); + ZenServerInstance LocalServer(TestEnv); + SpawnServer(LocalServer, LocalCfg); + + size_t PayloadSize = 1024; + std::string_view Namespace("ue4.ddc"sv); + std::string_view Bucket("mastodon"sv); + const size_t NumRecords = 4; + std::vector Keys = PutCacheRecords(UpstreamCfg.BaseUri, Namespace, Bucket, NumRecords, PayloadSize); + + for (const zen::CacheKey& CacheKey : Keys) + { + cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .Namespace = std::string(Namespace)}; + AppendCacheRecord(Request, CacheKey, PayloadSize*2, CachePolicy::Default); + + CbPackage Package; + CHECK(Request.Format(Package)); + + IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); + cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", UpstreamCfg.BaseUri)}, + cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}, + cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); + + CHECK(Result.status_code == 409); + } + + CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote); + GetCacheRecordResult Result = GetCacheRecords(LocalCfg.BaseUri, "ue4.ddc"sv, Keys, Policy); + + CHECK(Result.Result.Results.size() == Keys.size()); + + for (size_t Index = 0; const std::optional& Record : Result.Result.Results) + { + CHECK(Record); + const CacheKey& ExpectedKey = Keys[Index++]; + CHECK(Record->Key == ExpectedKey); + for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) + { + CHECK(Value.RawSize == PayloadSize); + } + } + } + + SUBCASE("policy - no 'QueryLocal' on put allows overwrite") + { + using namespace utils; + + ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber()); + ZenServerInstance UpstreamServer(TestEnv); + SpawnServer(UpstreamServer, UpstreamCfg); + + ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port); + ZenServerInstance LocalServer(TestEnv); + SpawnServer(LocalServer, LocalCfg); + + size_t PayloadSize = 1024; + std::string_view Namespace("ue4.ddc"sv); + std::string_view Bucket("mastodon"sv); + const size_t NumRecords = 4; + std::vector Keys = PutCacheRecords(UpstreamCfg.BaseUri, Namespace, Bucket, NumRecords, PayloadSize); + + for (const zen::CacheKey& CacheKey : Keys) + { + cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .Namespace = std::string(Namespace)}; + AppendCacheRecord(Request, CacheKey, PayloadSize * 2, CachePolicy::Store); + + CbPackage Package; + CHECK(Request.Format(Package)); + + IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); + cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", UpstreamCfg.BaseUri)}, + cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}, + cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); + + CHECK(Result.status_code == 200); + } + + CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote); + GetCacheRecordResult Result = GetCacheRecords(LocalCfg.BaseUri, "ue4.ddc"sv, Keys, Policy); + + CHECK(Result.Result.Results.size() == Keys.size()); + + for (size_t Index = 0; const std::optional& Record : Result.Result.Results) + { + CHECK(Record); + const CacheKey& ExpectedKey = Keys[Index++]; + CHECK(Record->Key == ExpectedKey); + for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) + { + CHECK(Value.RawSize == PayloadSize*2); + } + } + } + SUBCASE("RpcAcceptOptions") { using namespace utils; -- cgit v1.2.3 From 86c8b70f01090698d0d698840d6aad666747ee22 Mon Sep 17 00:00:00 2001 From: zousar <2936246+zousar@users.noreply.github.com> Date: Wed, 26 Feb 2025 10:46:37 -0700 Subject: Expand and fix unit tests for overwrite behavior --- src/zenserver-test/zenserver-test.cpp | 89 ++++++++++++++++++++++++++++++++--- 1 file changed, 82 insertions(+), 7 deletions(-) (limited to 'src/zenserver-test') diff --git a/src/zenserver-test/zenserver-test.cpp b/src/zenserver-test/zenserver-test.cpp index 466f3d6ca..a41eaed75 100644 --- a/src/zenserver-test/zenserver-test.cpp +++ b/src/zenserver-test/zenserver-test.cpp @@ -1297,8 +1297,9 @@ TEST_CASE("zcache.rpc") std::string_view Namespace, std::string_view Bucket, size_t Num, - size_t PayloadSize = 1024, - size_t KeyOffset = 1) -> std::vector { + size_t PayloadSize = 1024, + size_t KeyOffset = 1, + std::vector* OutPackages = nullptr) -> std::vector { std::vector OutKeys; for (uint32_t Key = 1; Key <= Num; ++Key) @@ -1320,6 +1321,10 @@ TEST_CASE("zcache.rpc") cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); CHECK(Result.status_code == 200); + if (OutPackages) + { + OutPackages->emplace_back(std::move(Package)); + } } return OutKeys; @@ -1498,7 +1503,7 @@ TEST_CASE("zcache.rpc") } } - SUBCASE("policy - 'QueryLocal' on put denies overwrite") + SUBCASE("policy - 'QueryLocal' on put denies overwrite with differing value") { using namespace utils; @@ -1519,7 +1524,7 @@ TEST_CASE("zcache.rpc") for (const zen::CacheKey& CacheKey : Keys) { cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .Namespace = std::string(Namespace)}; - AppendCacheRecord(Request, CacheKey, PayloadSize*2, CachePolicy::Default); + AppendCacheRecord(Request, CacheKey, PayloadSize * 2, CachePolicy::Default); CbPackage Package; CHECK(Request.Format(Package)); @@ -1529,7 +1534,15 @@ TEST_CASE("zcache.rpc") cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}, cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); - CHECK(Result.status_code == 409); + CHECK(Result.status_code == 200); + cacherequests::PutCacheRecordsResult ParsedResult; + CbPackage Response = ParsePackageMessage(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size())); + CHECK(!Response.IsNull()); + CHECK(ParsedResult.Parse(Response)); + for (bool ResponseSuccess : ParsedResult.Success) + { + CHECK(!ResponseSuccess); + } } CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote); @@ -1549,7 +1562,7 @@ TEST_CASE("zcache.rpc") } } - SUBCASE("policy - no 'QueryLocal' on put allows overwrite") + SUBCASE("policy - no 'QueryLocal' on put allows overwrite with differing value") { using namespace utils; @@ -1581,6 +1594,14 @@ TEST_CASE("zcache.rpc") cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); CHECK(Result.status_code == 200); + cacherequests::PutCacheRecordsResult ParsedResult; + CbPackage Response = ParsePackageMessage(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size())); + CHECK(!Response.IsNull()); + CHECK(ParsedResult.Parse(Response)); + for (bool ResponseSuccess : ParsedResult.Success) + { + CHECK(ResponseSuccess); + } } CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote); @@ -1595,7 +1616,61 @@ TEST_CASE("zcache.rpc") CHECK(Record->Key == ExpectedKey); for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) { - CHECK(Value.RawSize == PayloadSize*2); + CHECK(Value.RawSize == PayloadSize * 2); + } + } + } + + SUBCASE("policy - 'QueryLocal' on put allows overwrite with equivalent value") + { + using namespace utils; + + ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber()); + ZenServerInstance UpstreamServer(TestEnv); + SpawnServer(UpstreamServer, UpstreamCfg); + + ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port); + ZenServerInstance LocalServer(TestEnv); + SpawnServer(LocalServer, LocalCfg); + + size_t PayloadSize = 1024; + std::string_view Namespace("ue4.ddc"sv); + std::string_view Bucket("mastodon"sv); + const size_t NumRecords = 4; + std::vector Packages; + std::vector Keys = PutCacheRecords(UpstreamCfg.BaseUri, Namespace, Bucket, NumRecords, PayloadSize, 1, &Packages); + + for (const CbPackage& Package : Packages) + { + IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); + cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", UpstreamCfg.BaseUri)}, + cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}, + cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); + + CHECK(Result.status_code == 200); + cacherequests::PutCacheRecordsResult ParsedResult; + CbPackage Response = ParsePackageMessage(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size())); + CHECK(!Response.IsNull()); + CHECK(ParsedResult.Parse(Response)); + for (bool ResponseSuccess : ParsedResult.Success) + { + CHECK(ResponseSuccess); + } + } + + CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote); + GetCacheRecordResult Result = GetCacheRecords(LocalCfg.BaseUri, "ue4.ddc"sv, Keys, Policy); + + CHECK(Result.Result.Results.size() == Keys.size()); + + for (size_t Index = 0; const std::optional& Record : Result.Result.Results) + { + CHECK(Record); + const CacheKey& ExpectedKey = Keys[Index++]; + CHECK(Record->Key == ExpectedKey); + for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) + { + CHECK(Value.RawSize == PayloadSize); } } } -- cgit v1.2.3 From 6d07b0437ccb7800652708f76a7ee84e551f43cf Mon Sep 17 00:00:00 2001 From: zousar <2936246+zousar@users.noreply.github.com> Date: Sun, 2 Mar 2025 00:15:35 -0700 Subject: Control overwrite enforcement with a config setting --- src/zenserver-test/zenserver-test.cpp | 185 +++++++++++++++++++++++++--------- 1 file changed, 140 insertions(+), 45 deletions(-) (limited to 'src/zenserver-test') diff --git a/src/zenserver-test/zenserver-test.cpp b/src/zenserver-test/zenserver-test.cpp index a41eaed75..f469fc60e 100644 --- a/src/zenserver-test/zenserver-test.cpp +++ b/src/zenserver-test/zenserver-test.cpp @@ -549,9 +549,13 @@ namespace utils { return ZenConfig{.DataDir = TestEnv.CreateNewTestDir(), .Port = Port, .Args = std::move(Args)}; } - static ZenConfig NewWithUpstream(uint16_t Port, uint16_t UpstreamPort) + static ZenConfig NewWithUpstream(uint16_t Port, uint16_t UpstreamPort, std::string Args = "") { - return New(Port, fmt::format("--debug --upstream-thread-count=0 --upstream-zen-url=http://localhost:{}", UpstreamPort)); + return New(Port, + fmt::format("{}{}--debug --upstream-thread-count=0 --upstream-zen-url=http://localhost:{}", + Args, + Args.length() > 0 ? " " : "", + UpstreamPort)); } static ZenConfig NewWithThreadedUpstreams(uint16_t NewPort, std::span UpstreamPorts, bool Debug) @@ -1503,7 +1507,7 @@ TEST_CASE("zcache.rpc") } } - SUBCASE("policy - 'QueryLocal' on put denies overwrite with differing value") + SUBCASE("policy - 'QueryLocal' on put allows overwrite with differing value when not limiting overwrites") { using namespace utils; @@ -1519,7 +1523,7 @@ TEST_CASE("zcache.rpc") std::string_view Namespace("ue4.ddc"sv); std::string_view Bucket("mastodon"sv); const size_t NumRecords = 4; - std::vector Keys = PutCacheRecords(UpstreamCfg.BaseUri, Namespace, Bucket, NumRecords, PayloadSize); + std::vector Keys = PutCacheRecords(LocalCfg.BaseUri, Namespace, Bucket, NumRecords, PayloadSize); for (const zen::CacheKey& CacheKey : Keys) { @@ -1530,7 +1534,7 @@ TEST_CASE("zcache.rpc") CHECK(Request.Format(Package)); IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); - cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", UpstreamCfg.BaseUri)}, + cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", LocalCfg.BaseUri)}, cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}, cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); @@ -1541,28 +1545,103 @@ TEST_CASE("zcache.rpc") CHECK(ParsedResult.Parse(Response)); for (bool ResponseSuccess : ParsedResult.Success) { - CHECK(!ResponseSuccess); + CHECK(ResponseSuccess); } } - CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote); - GetCacheRecordResult Result = GetCacheRecords(LocalCfg.BaseUri, "ue4.ddc"sv, Keys, Policy); + auto CheckRecordCorrectness = [&](const ZenConfig& Cfg) { + CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote); + GetCacheRecordResult Result = GetCacheRecords(Cfg.BaseUri, "ue4.ddc"sv, Keys, Policy); - CHECK(Result.Result.Results.size() == Keys.size()); + CHECK(Result.Result.Results.size() == Keys.size()); - for (size_t Index = 0; const std::optional& Record : Result.Result.Results) + for (size_t Index = 0; const std::optional& Record : Result.Result.Results) + { + CHECK(Record); + const CacheKey& ExpectedKey = Keys[Index++]; + CHECK(Record->Key == ExpectedKey); + for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) + { + CHECK(Value.RawSize == PayloadSize * 2); + } + } + }; + + // Check that the records are present and overwritten in the local server + CheckRecordCorrectness(LocalCfg); + + // Check that the records are present and overwritten in the upstream server + CheckRecordCorrectness(UpstreamCfg); + } + + SUBCASE("policy - 'QueryLocal' on put denies overwrite with differing value when limiting overwrites") + { + using namespace utils; + + ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber()); + ZenServerInstance UpstreamServer(TestEnv); + SpawnServer(UpstreamServer, UpstreamCfg); + + ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port, "--cache-limit-overwrites"); + ZenServerInstance LocalServer(TestEnv); + SpawnServer(LocalServer, LocalCfg); + + size_t PayloadSize = 1024; + std::string_view Namespace("ue4.ddc"sv); + std::string_view Bucket("mastodon"sv); + const size_t NumRecords = 4; + std::vector Keys = PutCacheRecords(LocalCfg.BaseUri, Namespace, Bucket, NumRecords, PayloadSize); + + for (const zen::CacheKey& CacheKey : Keys) { - CHECK(Record); - const CacheKey& ExpectedKey = Keys[Index++]; - CHECK(Record->Key == ExpectedKey); - for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) + cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .Namespace = std::string(Namespace)}; + AppendCacheRecord(Request, CacheKey, PayloadSize * 2, CachePolicy::Default); + + CbPackage Package; + CHECK(Request.Format(Package)); + + IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); + cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", LocalCfg.BaseUri)}, + cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}, + cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); + + CHECK(Result.status_code == 200); + cacherequests::PutCacheRecordsResult ParsedResult; + CbPackage Response = ParsePackageMessage(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size())); + CHECK(!Response.IsNull()); + CHECK(ParsedResult.Parse(Response)); + for (bool ResponseSuccess : ParsedResult.Success) { - CHECK(Value.RawSize == PayloadSize); + CHECK(!ResponseSuccess); } } + + auto CheckRecordCorrectness = [&](const ZenConfig& Cfg) { + CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote); + GetCacheRecordResult Result = GetCacheRecords(Cfg.BaseUri, "ue4.ddc"sv, Keys, Policy); + + CHECK(Result.Result.Results.size() == Keys.size()); + + for (size_t Index = 0; const std::optional& Record : Result.Result.Results) + { + CHECK(Record); + const CacheKey& ExpectedKey = Keys[Index++]; + CHECK(Record->Key == ExpectedKey); + for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) + { + CHECK(Value.RawSize == PayloadSize); + } + } + }; + + // Check that the records are present and not overwritten in the local server + CheckRecordCorrectness(LocalCfg); + + // Check that the records are present and not overwritten in the upstream server + CheckRecordCorrectness(UpstreamCfg); } - SUBCASE("policy - no 'QueryLocal' on put allows overwrite with differing value") + SUBCASE("policy - no 'QueryLocal' on put allows overwrite with differing value when limiting overwrites") { using namespace utils; @@ -1570,7 +1649,7 @@ TEST_CASE("zcache.rpc") ZenServerInstance UpstreamServer(TestEnv); SpawnServer(UpstreamServer, UpstreamCfg); - ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port); + ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port, "--cache-limit-overwrites"); ZenServerInstance LocalServer(TestEnv); SpawnServer(LocalServer, LocalCfg); @@ -1578,7 +1657,7 @@ TEST_CASE("zcache.rpc") std::string_view Namespace("ue4.ddc"sv); std::string_view Bucket("mastodon"sv); const size_t NumRecords = 4; - std::vector Keys = PutCacheRecords(UpstreamCfg.BaseUri, Namespace, Bucket, NumRecords, PayloadSize); + std::vector Keys = PutCacheRecords(LocalCfg.BaseUri, Namespace, Bucket, NumRecords, PayloadSize); for (const zen::CacheKey& CacheKey : Keys) { @@ -1589,7 +1668,7 @@ TEST_CASE("zcache.rpc") CHECK(Request.Format(Package)); IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); - cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", UpstreamCfg.BaseUri)}, + cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", LocalCfg.BaseUri)}, cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}, cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); @@ -1604,24 +1683,32 @@ TEST_CASE("zcache.rpc") } } - CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote); - GetCacheRecordResult Result = GetCacheRecords(LocalCfg.BaseUri, "ue4.ddc"sv, Keys, Policy); + auto CheckRecordCorrectness = [&](const ZenConfig& Cfg) { + CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote); + GetCacheRecordResult Result = GetCacheRecords(Cfg.BaseUri, "ue4.ddc"sv, Keys, Policy); - CHECK(Result.Result.Results.size() == Keys.size()); + CHECK(Result.Result.Results.size() == Keys.size()); - for (size_t Index = 0; const std::optional& Record : Result.Result.Results) - { - CHECK(Record); - const CacheKey& ExpectedKey = Keys[Index++]; - CHECK(Record->Key == ExpectedKey); - for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) + for (size_t Index = 0; const std::optional& Record : Result.Result.Results) { - CHECK(Value.RawSize == PayloadSize * 2); + CHECK(Record); + const CacheKey& ExpectedKey = Keys[Index++]; + CHECK(Record->Key == ExpectedKey); + for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) + { + CHECK(Value.RawSize == PayloadSize * 2); + } } - } + }; + + // Check that the records are present and overwritten in the local server + CheckRecordCorrectness(LocalCfg); + + // Check that the records are present and overwritten in the upstream server + CheckRecordCorrectness(UpstreamCfg); } - SUBCASE("policy - 'QueryLocal' on put allows overwrite with equivalent value") + SUBCASE("policy - 'QueryLocal' on put allows overwrite with equivalent value when limiting overwrites") { using namespace utils; @@ -1629,7 +1716,7 @@ TEST_CASE("zcache.rpc") ZenServerInstance UpstreamServer(TestEnv); SpawnServer(UpstreamServer, UpstreamCfg); - ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port); + ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port, "--cache-limit-overwrites"); ZenServerInstance LocalServer(TestEnv); SpawnServer(LocalServer, LocalCfg); @@ -1638,12 +1725,12 @@ TEST_CASE("zcache.rpc") std::string_view Bucket("mastodon"sv); const size_t NumRecords = 4; std::vector Packages; - std::vector Keys = PutCacheRecords(UpstreamCfg.BaseUri, Namespace, Bucket, NumRecords, PayloadSize, 1, &Packages); + std::vector Keys = PutCacheRecords(LocalCfg.BaseUri, Namespace, Bucket, NumRecords, PayloadSize, 1, &Packages); for (const CbPackage& Package : Packages) { IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); - cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", UpstreamCfg.BaseUri)}, + cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", LocalCfg.BaseUri)}, cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}, cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); @@ -1658,21 +1745,29 @@ TEST_CASE("zcache.rpc") } } - CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote); - GetCacheRecordResult Result = GetCacheRecords(LocalCfg.BaseUri, "ue4.ddc"sv, Keys, Policy); + auto CheckRecordCorrectness = [&](const ZenConfig& Cfg) { + CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote); + GetCacheRecordResult Result = GetCacheRecords(Cfg.BaseUri, "ue4.ddc"sv, Keys, Policy); - CHECK(Result.Result.Results.size() == Keys.size()); + CHECK(Result.Result.Results.size() == Keys.size()); - for (size_t Index = 0; const std::optional& Record : Result.Result.Results) - { - CHECK(Record); - const CacheKey& ExpectedKey = Keys[Index++]; - CHECK(Record->Key == ExpectedKey); - for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) + for (size_t Index = 0; const std::optional& Record : Result.Result.Results) { - CHECK(Value.RawSize == PayloadSize); + CHECK(Record); + const CacheKey& ExpectedKey = Keys[Index++]; + CHECK(Record->Key == ExpectedKey); + for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) + { + CHECK(Value.RawSize == PayloadSize); + } } - } + }; + + // Check that the records are present and unchanged in the local server + CheckRecordCorrectness(LocalCfg); + + // Check that the records are present and unchanged in the upstream server + CheckRecordCorrectness(UpstreamCfg); } SUBCASE("RpcAcceptOptions") -- cgit v1.2.3 From 25985163796ba45b028b40662146e44e8eff47a8 Mon Sep 17 00:00:00 2001 From: zousar <2936246+zousar@users.noreply.github.com> Date: Mon, 24 Mar 2025 23:30:03 -0600 Subject: Establish TODOs and unit test for rejected PUT propagation --- src/zenserver-test/zenserver-test.cpp | 76 ++++++++++++++++++++++++++++++++++- 1 file changed, 74 insertions(+), 2 deletions(-) (limited to 'src/zenserver-test') diff --git a/src/zenserver-test/zenserver-test.cpp b/src/zenserver-test/zenserver-test.cpp index f469fc60e..01c3144ad 100644 --- a/src/zenserver-test/zenserver-test.cpp +++ b/src/zenserver-test/zenserver-test.cpp @@ -1303,6 +1303,7 @@ TEST_CASE("zcache.rpc") size_t Num, size_t PayloadSize = 1024, size_t KeyOffset = 1, + CachePolicy PutPolicy = CachePolicy::Default, std::vector* OutPackages = nullptr) -> std::vector { std::vector OutKeys; @@ -1313,7 +1314,7 @@ TEST_CASE("zcache.rpc") const zen::CacheKey CacheKey = zen::CacheKey::Create(Bucket, KeyHash); cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .Namespace = std::string(Namespace)}; - AppendCacheRecord(Request, CacheKey, PayloadSize, CachePolicy::Default); + AppendCacheRecord(Request, CacheKey, PayloadSize, PutPolicy); OutKeys.push_back(CacheKey); CbPackage Package; @@ -1725,7 +1726,8 @@ TEST_CASE("zcache.rpc") std::string_view Bucket("mastodon"sv); const size_t NumRecords = 4; std::vector Packages; - std::vector Keys = PutCacheRecords(LocalCfg.BaseUri, Namespace, Bucket, NumRecords, PayloadSize, 1, &Packages); + std::vector Keys = + PutCacheRecords(LocalCfg.BaseUri, Namespace, Bucket, NumRecords, PayloadSize, 1, CachePolicy::Default, &Packages); for (const CbPackage& Package : Packages) { @@ -1770,6 +1772,76 @@ TEST_CASE("zcache.rpc") CheckRecordCorrectness(UpstreamCfg); } + // TODO: Propagation for rejected PUTs + // SUBCASE("policy - 'QueryLocal' on put denies overwrite with differing value when limiting overwrites but allows propagation to + // upstream") + // { + // using namespace utils; + + // ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber()); + // ZenServerInstance UpstreamServer(TestEnv); + // SpawnServer(UpstreamServer, UpstreamCfg); + + // ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port, "--cache-limit-overwrites"); + // ZenServerInstance LocalServer(TestEnv); + // SpawnServer(LocalServer, LocalCfg); + + // size_t PayloadSize = 1024; + // std::string_view Namespace("ue4.ddc"sv); + // std::string_view Bucket("mastodon"sv); + // const size_t NumRecords = 4; + // std::vector Keys = PutCacheRecords(LocalCfg.BaseUri, Namespace, Bucket, NumRecords, PayloadSize, 1, + // CachePolicy::Local); + + // for (const zen::CacheKey& CacheKey : Keys) + // { + // cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .Namespace = std::string(Namespace)}; + // AppendCacheRecord(Request, CacheKey, PayloadSize * 2, CachePolicy::Default); + + // CbPackage Package; + // CHECK(Request.Format(Package)); + + // IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); + // cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", LocalCfg.BaseUri)}, + // cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}, + // cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); + + // CHECK(Result.status_code == 200); + // cacherequests::PutCacheRecordsResult ParsedResult; + // CbPackage Response = ParsePackageMessage(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size())); + // CHECK(!Response.IsNull()); + // CHECK(ParsedResult.Parse(Response)); + // for (bool ResponseSuccess : ParsedResult.Success) + // { + // CHECK(!ResponseSuccess); + // } + // } + + // auto CheckRecordCorrectness = [&](const ZenConfig& Cfg, size_t ExpectedPayloadSize) { + // CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote); + // GetCacheRecordResult Result = GetCacheRecords(Cfg.BaseUri, "ue4.ddc"sv, Keys, Policy); + + // CHECK(Result.Result.Results.size() == Keys.size()); + + // for (size_t Index = 0; const std::optional& Record : Result.Result.Results) + // { + // CHECK(Record); + // const CacheKey& ExpectedKey = Keys[Index++]; + // CHECK(Record->Key == ExpectedKey); + // for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) + // { + // CHECK(Value.RawSize == ExpectedPayloadSize); + // } + // } + // }; + + // // Check that the records are present and not overwritten in the local server + // CheckRecordCorrectness(LocalCfg, PayloadSize); + + // // Check that the records are present and are the newer size in the upstream server + // CheckRecordCorrectness(UpstreamCfg, PayloadSize*2); + // } + SUBCASE("RpcAcceptOptions") { using namespace utils; -- cgit v1.2.3 From 72c4cc46e1bdc147e64b5efca59de7f1560d4788 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Wed, 6 Aug 2025 10:52:49 +0200 Subject: refactor blobstore (#458) - Improvement: Refactored build store cache to use existing CidStore implementation instead of implementation specific blob storage - **CAUTION** This will clear any existing cache when updating as the manifest version and storage strategy has changed - Bugfix: BuildStorage cache return "true" for metadata existance for all blobs that had payloads regardless of actual existance for metadata --- src/zenserver-test/zenserver-test.cpp | 475 ++++++++++++++++++++++++++++++++++ 1 file changed, 475 insertions(+) (limited to 'src/zenserver-test') diff --git a/src/zenserver-test/zenserver-test.cpp b/src/zenserver-test/zenserver-test.cpp index ded40a486..d0c73f91b 100644 --- a/src/zenserver-test/zenserver-test.cpp +++ b/src/zenserver-test/zenserver-test.cpp @@ -13,6 +13,7 @@ #include #include #include +#include #include #include #include @@ -22,6 +23,7 @@ #include #include #include +#include #include #include #include @@ -37,6 +39,7 @@ ZEN_THIRD_PARTY_INCLUDES_START #include +#include #undef GetObject ZEN_THIRD_PARTY_INCLUDES_END @@ -3813,6 +3816,478 @@ TEST_CASE("workspaces.share") CHECK(Client.Get(fmt::format("/ws/{}", WorkspaceId)).StatusCode == HttpResponseCode::NotFound); } +TEST_CASE("buildstore.blobs") +{ + std::filesystem::path SystemRootPath = TestEnv.CreateNewTestDir(); + auto _ = MakeGuard([&SystemRootPath]() { DeleteDirectories(SystemRootPath); }); + + std::string_view Namespace = "ns"sv; + std::string_view Bucket = "bkt"sv; + Oid BuildId = Oid::NewOid(); + + std::vector CompressedBlobsHashes; + { + ZenServerInstance Instance(TestEnv); + + const uint16_t PortNumber = + Instance.SpawnServerAndWaitUntilReady(fmt::format("--buildstore-enabled --system-dir {}", SystemRootPath)); + CHECK(PortNumber != 0); + + HttpClient Client(Instance.GetBaseUri() + "/builds/"); + + for (size_t I = 0; I < 5; I++) + { + IoBuffer Blob = CreateSemiRandomBlob(4711 + I * 7); + CompressedBuffer CompressedBlob = CompressedBuffer::Compress(SharedBuffer(std::move(Blob))); + CompressedBlobsHashes.push_back(CompressedBlob.DecodeRawHash()); + IoBuffer Payload = std::move(CompressedBlob).GetCompressed().Flatten().AsIoBuffer(); + Payload.SetContentType(ZenContentType::kCompressedBinary); + + HttpClient::Response Result = + Client.Put(fmt::format("{}/{}/{}/blobs/{}", Namespace, Bucket, BuildId, CompressedBlobsHashes.back()), Payload); + CHECK(Result); + } + + for (const IoHash& RawHash : CompressedBlobsHashes) + { + HttpClient::Response Result = Client.Get(fmt::format("{}/{}/{}/blobs/{}", Namespace, Bucket, BuildId, RawHash), + HttpClient::Accept(ZenContentType::kCompressedBinary)); + CHECK(Result); + IoBuffer Payload = Result.ResponsePayload; + CHECK(Payload.GetContentType() == ZenContentType::kCompressedBinary); + IoHash VerifyRawHash; + uint64_t VerifyRawSize; + CompressedBuffer CompressedBlob = + CompressedBuffer::FromCompressed(SharedBuffer(std::move(Payload)), VerifyRawHash, VerifyRawSize); + CHECK(CompressedBlob); + CHECK(VerifyRawHash == RawHash); + IoBuffer Decompressed = CompressedBlob.Decompress().AsIoBuffer(); + CHECK(IoHash::HashBuffer(Decompressed) == RawHash); + } + } + { + ZenServerInstance Instance(TestEnv); + + const uint16_t PortNumber = + Instance.SpawnServerAndWaitUntilReady(fmt::format("--buildstore-enabled --system-dir {}", SystemRootPath)); + CHECK(PortNumber != 0); + + HttpClient Client(Instance.GetBaseUri() + "/builds/"); + + for (const IoHash& RawHash : CompressedBlobsHashes) + { + HttpClient::Response Result = Client.Get(fmt::format("{}/{}/{}/blobs/{}", Namespace, Bucket, BuildId, RawHash), + HttpClient::Accept(ZenContentType::kCompressedBinary)); + CHECK(Result); + IoBuffer Payload = Result.ResponsePayload; + CHECK(Payload.GetContentType() == ZenContentType::kCompressedBinary); + IoHash VerifyRawHash; + uint64_t VerifyRawSize; + CompressedBuffer CompressedBlob = + CompressedBuffer::FromCompressed(SharedBuffer(std::move(Payload)), VerifyRawHash, VerifyRawSize); + CHECK(CompressedBlob); + CHECK(VerifyRawHash == RawHash); + IoBuffer Decompressed = CompressedBlob.Decompress().AsIoBuffer(); + CHECK(IoHash::HashBuffer(Decompressed) == RawHash); + } + + for (size_t I = 0; I < 5; I++) + { + IoBuffer Blob = CreateSemiRandomBlob(5713 + I * 7); + CompressedBuffer CompressedBlob = CompressedBuffer::Compress(SharedBuffer(std::move(Blob))); + CompressedBlobsHashes.push_back(CompressedBlob.DecodeRawHash()); + IoBuffer Payload = std::move(CompressedBlob).GetCompressed().Flatten().AsIoBuffer(); + Payload.SetContentType(ZenContentType::kCompressedBinary); + + HttpClient::Response Result = + Client.Put(fmt::format("{}/{}/{}/blobs/{}", Namespace, Bucket, BuildId, CompressedBlobsHashes.back()), Payload); + CHECK(Result); + } + } + { + ZenServerInstance Instance(TestEnv); + + const uint16_t PortNumber = + Instance.SpawnServerAndWaitUntilReady(fmt::format("--buildstore-enabled --system-dir {}", SystemRootPath)); + CHECK(PortNumber != 0); + + HttpClient Client(Instance.GetBaseUri() + "/builds/"); + + for (const IoHash& RawHash : CompressedBlobsHashes) + { + HttpClient::Response Result = Client.Get(fmt::format("{}/{}/{}/blobs/{}", Namespace, Bucket, BuildId, RawHash), + HttpClient::Accept(ZenContentType::kCompressedBinary)); + CHECK(Result); + IoBuffer Payload = Result.ResponsePayload; + CHECK(Payload.GetContentType() == ZenContentType::kCompressedBinary); + IoHash VerifyRawHash; + uint64_t VerifyRawSize; + CompressedBuffer CompressedBlob = + CompressedBuffer::FromCompressed(SharedBuffer(std::move(Payload)), VerifyRawHash, VerifyRawSize); + CHECK(CompressedBlob); + CHECK(VerifyRawHash == RawHash); + IoBuffer Decompressed = CompressedBlob.Decompress().AsIoBuffer(); + CHECK(IoHash::HashBuffer(Decompressed) == RawHash); + } + } +} + +namespace { + CbObject MakeMetadata(const IoHash& BlobHash, const std::vector>& KeyValues) + { + CbObjectWriter Writer; + Writer.AddHash("rawHash"sv, BlobHash); + Writer.BeginObject("values"); + { + for (const auto& V : KeyValues) + { + Writer.AddString(V.first, V.second); + } + } + Writer.EndObject(); // values + return Writer.Save(); + }; + +} // namespace + +TEST_CASE("buildstore.metadata") +{ + std::filesystem::path SystemRootPath = TestEnv.CreateNewTestDir(); + auto _ = MakeGuard([&SystemRootPath]() { DeleteDirectories(SystemRootPath); }); + + std::string_view Namespace = "ns"sv; + std::string_view Bucket = "bkt"sv; + Oid BuildId = Oid::NewOid(); + + std::vector BlobHashes; + std::vector Metadatas; + std::vector MetadataHashes; + + auto GetMetadatas = + [](HttpClient& Client, std::string_view Namespace, std::string_view Bucket, const Oid& BuildId, std::vector BlobHashes) { + CbObjectWriter Request; + + Request.BeginArray("blobHashes"sv); + for (const IoHash& BlobHash : BlobHashes) + { + Request.AddHash(BlobHash); + } + Request.EndArray(); + + IoBuffer Payload = Request.Save().GetBuffer().AsIoBuffer(); + Payload.SetContentType(ZenContentType::kCbObject); + + HttpClient::Response Result = Client.Post(fmt::format("{}/{}/{}/blobs/getBlobMetadata", Namespace, Bucket, BuildId), + Payload, + HttpClient::Accept(ZenContentType::kCbObject)); + CHECK(Result); + + std::vector ResultMetadatas; + + CbPackage ResponsePackage = ParsePackageMessage(Result.ResponsePayload); + CbObject ResponseObject = ResponsePackage.GetObject(); + + CbArrayView BlobHashArray = ResponseObject["blobHashes"sv].AsArrayView(); + CbArrayView MetadatasArray = ResponseObject["metadatas"sv].AsArrayView(); + ResultMetadatas.reserve(MetadatasArray.Num()); + auto BlobHashesIt = BlobHashes.begin(); + auto BlobHashArrayIt = begin(BlobHashArray); + auto MetadataArrayIt = begin(MetadatasArray); + while (MetadataArrayIt != end(MetadatasArray)) + { + const IoHash BlobHash = (*BlobHashArrayIt).AsHash(); + while (BlobHash != *BlobHashesIt) + { + ZEN_ASSERT(BlobHashesIt != BlobHashes.end()); + BlobHashesIt++; + } + + ZEN_ASSERT(BlobHash == *BlobHashesIt); + + const IoHash MetaHash = (*MetadataArrayIt).AsAttachment(); + const CbAttachment* MetaAttachment = ResponsePackage.FindAttachment(MetaHash); + ZEN_ASSERT(MetaAttachment); + + CbObject Metadata = MetaAttachment->AsObject(); + ResultMetadatas.emplace_back(std::move(Metadata)); + + BlobHashArrayIt++; + MetadataArrayIt++; + BlobHashesIt++; + } + return ResultMetadatas; + }; + + { + ZenServerInstance Instance(TestEnv); + + const uint16_t PortNumber = + Instance.SpawnServerAndWaitUntilReady(fmt::format("--buildstore-enabled --system-dir {}", SystemRootPath)); + CHECK(PortNumber != 0); + + HttpClient Client(Instance.GetBaseUri() + "/builds/"); + + const size_t BlobCount = 5; + + for (size_t I = 0; I < BlobCount; I++) + { + BlobHashes.push_back(IoHash::HashBuffer(&I, sizeof(I))); + Metadatas.push_back(MakeMetadata(BlobHashes.back(), {{"index", fmt::format("{}", I)}})); + MetadataHashes.push_back(IoHash::HashBuffer(Metadatas.back().GetBuffer().AsIoBuffer())); + } + + { + CbPackage RequestPackage; + std::vector Attachments; + tsl::robin_set AttachmentHashes; + Attachments.reserve(BlobCount); + AttachmentHashes.reserve(BlobCount); + { + CbObjectWriter RequestWriter; + RequestWriter.BeginArray("blobHashes"); + for (size_t BlockHashIndex = 0; BlockHashIndex < BlobHashes.size(); BlockHashIndex++) + { + RequestWriter.AddHash(BlobHashes[BlockHashIndex]); + } + RequestWriter.EndArray(); // blobHashes + + RequestWriter.BeginArray("metadatas"); + for (size_t BlockHashIndex = 0; BlockHashIndex < BlobHashes.size(); BlockHashIndex++) + { + const IoHash ObjectHash = Metadatas[BlockHashIndex].GetHash(); + RequestWriter.AddBinaryAttachment(ObjectHash); + if (!AttachmentHashes.contains(ObjectHash)) + { + Attachments.push_back(CbAttachment(Metadatas[BlockHashIndex], ObjectHash)); + AttachmentHashes.insert(ObjectHash); + } + } + + RequestWriter.EndArray(); // metadatas + + RequestPackage.SetObject(RequestWriter.Save()); + } + RequestPackage.AddAttachments(Attachments); + + CompositeBuffer RpcRequestBuffer = FormatPackageMessageBuffer(RequestPackage); + + HttpClient::Response Result = Client.Post(fmt::format("{}/{}/{}/blobs/putBlobMetadata", Namespace, Bucket, BuildId), + RpcRequestBuffer, + ZenContentType::kCbPackage); + CHECK(Result); + } + + { + std::vector ResultMetadatas = GetMetadatas(Client, Namespace, Bucket, BuildId, BlobHashes); + + for (size_t Index = 0; Index < MetadataHashes.size(); Index++) + { + const IoHash& ExpectedHash = MetadataHashes[Index]; + IoHash Hash = IoHash::HashBuffer(ResultMetadatas[Index].GetBuffer().AsIoBuffer()); + CHECK_EQ(ExpectedHash, Hash); + } + } + } + { + ZenServerInstance Instance(TestEnv); + + const uint16_t PortNumber = + Instance.SpawnServerAndWaitUntilReady(fmt::format("--buildstore-enabled --system-dir {}", SystemRootPath)); + CHECK(PortNumber != 0); + + HttpClient Client(Instance.GetBaseUri() + "/builds/"); + + std::vector ResultMetadatas = GetMetadatas(Client, Namespace, Bucket, BuildId, BlobHashes); + + for (size_t Index = 0; Index < MetadataHashes.size(); Index++) + { + const IoHash& ExpectedHash = MetadataHashes[Index]; + IoHash Hash = IoHash::HashBuffer(ResultMetadatas[Index].GetBuffer().AsIoBuffer()); + CHECK_EQ(ExpectedHash, Hash); + } + } +} + +TEST_CASE("buildstore.cache") +{ + std::filesystem::path SystemRootPath = TestEnv.CreateNewTestDir(); + std::filesystem::path TempDir = TestEnv.CreateNewTestDir(); + auto _ = MakeGuard([&SystemRootPath, &TempDir]() { + DeleteDirectories(SystemRootPath); + DeleteDirectories(TempDir); + }); + + std::string_view Namespace = "ns"sv; + std::string_view Bucket = "bkt"sv; + Oid BuildId = Oid::NewOid(); + + std::vector BlobHashes; + std::vector Metadatas; + std::vector MetadataHashes; + + const size_t BlobCount = 5; + { + ZenServerInstance Instance(TestEnv); + + const uint16_t PortNumber = + Instance.SpawnServerAndWaitUntilReady(fmt::format("--buildstore-enabled --system-dir {}", SystemRootPath)); + CHECK(PortNumber != 0); + + HttpClient Client(Instance.GetBaseUri()); + + BuildStorageCache::Statistics Stats; + std::unique_ptr Cache(CreateZenBuildStorageCache(Client, Stats, Namespace, Bucket, TempDir, false)); + + { + IoHash NoneBlob = IoHash::HashBuffer("data", 4); + std::vector NoneExists = Cache->BlobsExists(BuildId, std::vector{NoneBlob}); + CHECK(NoneExists.size() == 1); + CHECK(!NoneExists[0].HasBody); + CHECK(!NoneExists[0].HasMetadata); + } + + for (size_t I = 0; I < BlobCount; I++) + { + IoBuffer Blob = CreateSemiRandomBlob(4711 + I * 7); + CompressedBuffer CompressedBlob = CompressedBuffer::Compress(SharedBuffer(std::move(Blob))); + BlobHashes.push_back(CompressedBlob.DecodeRawHash()); + Cache->PutBuildBlob(BuildId, BlobHashes.back(), ZenContentType::kCompressedBinary, CompressedBlob.GetCompressed()); + } + + Cache->Flush(500); + Cache = CreateZenBuildStorageCache(Client, Stats, Namespace, Bucket, TempDir, false); + + { + std::vector Exists = Cache->BlobsExists(BuildId, BlobHashes); + CHECK(Exists.size() == BlobHashes.size()); + for (size_t I = 0; I < BlobCount; I++) + { + CHECK(Exists[I].HasBody); + CHECK(!Exists[I].HasMetadata); + } + + std::vector FetchedMetadatas = Cache->GetBlobMetadatas(BuildId, BlobHashes); + CHECK_EQ(0, FetchedMetadatas.size()); + } + + { + for (size_t I = 0; I < BlobCount; I++) + { + IoBuffer BuildBlob = Cache->GetBuildBlob(BuildId, BlobHashes[I]); + CHECK(BuildBlob); + CHECK_EQ(BlobHashes[I], + IoHash::HashBuffer(CompressedBuffer::FromCompressedNoValidate(std::move(BuildBlob)).Decompress().AsIoBuffer())); + } + } + + { + for (size_t I = 0; I < BlobCount; I++) + { + CbObject Metadata = MakeMetadata(BlobHashes[I], + {{"key", fmt::format("{}", I)}, + {"key_plus_one", fmt::format("{}", I + 1)}, + {"block_hash", fmt::format("{}", BlobHashes[I])}}); + Metadatas.push_back(Metadata); + MetadataHashes.push_back(IoHash::HashBuffer(Metadata.GetBuffer().AsIoBuffer())); + } + Cache->PutBlobMetadatas(BuildId, BlobHashes, Metadatas); + } + + Cache->Flush(500); + Cache = CreateZenBuildStorageCache(Client, Stats, Namespace, Bucket, TempDir, false); + + { + std::vector Exists = Cache->BlobsExists(BuildId, BlobHashes); + CHECK(Exists.size() == BlobHashes.size()); + for (size_t I = 0; I < BlobCount; I++) + { + CHECK(Exists[I].HasBody); + CHECK(Exists[I].HasMetadata); + } + + std::vector FetchedMetadatas = Cache->GetBlobMetadatas(BuildId, BlobHashes); + CHECK_EQ(BlobCount, FetchedMetadatas.size()); + + for (size_t I = 0; I < BlobCount; I++) + { + CHECK_EQ(MetadataHashes[I], IoHash::HashBuffer(FetchedMetadatas[I].GetBuffer().AsIoBuffer())); + } + } + + for (size_t I = 0; I < BlobCount; I++) + { + IoBuffer Blob = CreateSemiRandomBlob(4711 + I * 7); + CompressedBuffer CompressedBlob = CompressedBuffer::Compress(SharedBuffer(std::move(Blob))); + BlobHashes.push_back(CompressedBlob.DecodeRawHash()); + Cache->PutBuildBlob(BuildId, BlobHashes.back(), ZenContentType::kCompressedBinary, CompressedBlob.GetCompressed()); + } + + Cache->Flush(500); + Cache = CreateZenBuildStorageCache(Client, Stats, Namespace, Bucket, TempDir, false); + + { + std::vector Exists = Cache->BlobsExists(BuildId, BlobHashes); + CHECK(Exists.size() == BlobHashes.size()); + for (size_t I = 0; I < BlobCount * 2; I++) + { + CHECK(Exists[I].HasBody); + CHECK_EQ(I < BlobCount, Exists[I].HasMetadata); + } + + std::vector MetaDatas = Cache->GetBlobMetadatas(BuildId, BlobHashes); + CHECK_EQ(BlobCount, MetaDatas.size()); + + std::vector FetchedMetadatas = Cache->GetBlobMetadatas(BuildId, BlobHashes); + CHECK_EQ(BlobCount, FetchedMetadatas.size()); + + for (size_t I = 0; I < BlobCount; I++) + { + CHECK_EQ(MetadataHashes[I], IoHash::HashBuffer(FetchedMetadatas[I].GetBuffer().AsIoBuffer())); + } + } + } + + { + ZenServerInstance Instance(TestEnv); + + const uint16_t PortNumber = + Instance.SpawnServerAndWaitUntilReady(fmt::format("--buildstore-enabled --system-dir {}", SystemRootPath)); + CHECK(PortNumber != 0); + + HttpClient Client(Instance.GetBaseUri()); + + BuildStorageCache::Statistics Stats; + std::unique_ptr Cache(CreateZenBuildStorageCache(Client, Stats, Namespace, Bucket, TempDir, false)); + + std::vector Exists = Cache->BlobsExists(BuildId, BlobHashes); + CHECK(Exists.size() == BlobHashes.size()); + for (size_t I = 0; I < BlobCount * 2; I++) + { + CHECK(Exists[I].HasBody); + CHECK_EQ(I < BlobCount, Exists[I].HasMetadata); + } + + for (size_t I = 0; I < BlobCount * 2; I++) + { + IoBuffer BuildBlob = Cache->GetBuildBlob(BuildId, BlobHashes[I]); + CHECK(BuildBlob); + CHECK_EQ(BlobHashes[I], + IoHash::HashBuffer(CompressedBuffer::FromCompressedNoValidate(std::move(BuildBlob)).Decompress().AsIoBuffer())); + } + + std::vector MetaDatas = Cache->GetBlobMetadatas(BuildId, BlobHashes); + CHECK_EQ(BlobCount, MetaDatas.size()); + + std::vector FetchedMetadatas = Cache->GetBlobMetadatas(BuildId, BlobHashes); + CHECK_EQ(BlobCount, FetchedMetadatas.size()); + + for (size_t I = 0; I < BlobCount; I++) + { + CHECK_EQ(MetadataHashes[I], IoHash::HashBuffer(FetchedMetadatas[I].GetBuffer().AsIoBuffer())); + } + } +} + # if 0 TEST_CASE("lifetime.owner") { -- cgit v1.2.3 From 39e750f78b0944157f0179266b7593b2e492453f Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Tue, 12 Aug 2025 17:36:41 +0200 Subject: add limitoverwrites option per bucket (#466) - Feature: Added global zenserver option `--cache-bucket-limit-overwrites` controlling Whether to require policy flag pattern before allowing overwrites or not. Default `false` = overwrites always allowed - Feature: Add per bucket cache configuration option `limitoverwrites` (Lua options file only) cache = { bucket = { -- This is the default for all namespaces limitoverwrites = true }, buckets = { -- Here you can add matching per bucket name (matches accross namespaces) iostorecompression = { limitoverwrites = false }, }, } --- src/zenserver-test/zenserver-test.cpp | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) (limited to 'src/zenserver-test') diff --git a/src/zenserver-test/zenserver-test.cpp b/src/zenserver-test/zenserver-test.cpp index 9ccdf3b5e..f913c108c 100644 --- a/src/zenserver-test/zenserver-test.cpp +++ b/src/zenserver-test/zenserver-test.cpp @@ -1587,7 +1587,7 @@ TEST_CASE("zcache.rpc") ZenServerInstance UpstreamServer(TestEnv); SpawnServer(UpstreamServer, UpstreamCfg); - ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port, "--cache-limit-overwrites"); + ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port, "--cache-bucket-limit-overwrites"); ZenServerInstance LocalServer(TestEnv); SpawnServer(LocalServer, LocalCfg); @@ -1654,7 +1654,7 @@ TEST_CASE("zcache.rpc") ZenServerInstance UpstreamServer(TestEnv); SpawnServer(UpstreamServer, UpstreamCfg); - ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port, "--cache-limit-overwrites"); + ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port, "--cache-bucket-limit-overwrites"); ZenServerInstance LocalServer(TestEnv); SpawnServer(LocalServer, LocalCfg); @@ -1721,7 +1721,7 @@ TEST_CASE("zcache.rpc") ZenServerInstance UpstreamServer(TestEnv); SpawnServer(UpstreamServer, UpstreamCfg); - ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port, "--cache-limit-overwrites"); + ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port, "--cache-bucket-limit-overwrites"); ZenServerInstance LocalServer(TestEnv); SpawnServer(LocalServer, LocalCfg); @@ -1786,9 +1786,8 @@ TEST_CASE("zcache.rpc") // ZenServerInstance UpstreamServer(TestEnv); // SpawnServer(UpstreamServer, UpstreamCfg); - // ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port, "--cache-limit-overwrites"); - // ZenServerInstance LocalServer(TestEnv); - // SpawnServer(LocalServer, LocalCfg); + // ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port, + // "--cache-bucket-limit-overwrites"); ZenServerInstance LocalServer(TestEnv); SpawnServer(LocalServer, LocalCfg); // size_t PayloadSize = 1024; // std::string_view Namespace("ue4.ddc"sv); -- cgit v1.2.3