diff options
Diffstat (limited to 'src/zenserver-test/zenserver-test.cpp')
| -rw-r--r-- | src/zenserver-test/zenserver-test.cpp | 354 |
1 files changed, 349 insertions, 5 deletions
diff --git a/src/zenserver-test/zenserver-test.cpp b/src/zenserver-test/zenserver-test.cpp index d0c73f91b..9ccdf3b5e 100644 --- a/src/zenserver-test/zenserver-test.cpp +++ b/src/zenserver-test/zenserver-test.cpp @@ -553,9 +553,13 @@ namespace utils { return ZenConfig{.DataDir = TestEnv.CreateNewTestDir(), .Port = Port, .Args = std::move(Args)}; } - static ZenConfig NewWithUpstream(uint16_t Port, uint16_t UpstreamPort) + static ZenConfig NewWithUpstream(uint16_t Port, uint16_t UpstreamPort, std::string Args = "") { - return New(Port, fmt::format("--debug --upstream-thread-count=0 --upstream-zen-url=http://localhost:{}", UpstreamPort)); + return New(Port, + fmt::format("{}{}--debug --upstream-thread-count=0 --upstream-zen-url=http://localhost:{}", + Args, + Args.length() > 0 ? " " : "", + UpstreamPort)); } static ZenConfig NewWithThreadedUpstreams(uint16_t NewPort, std::span<uint16_t> UpstreamPorts, bool Debug) @@ -1301,8 +1305,10 @@ TEST_CASE("zcache.rpc") std::string_view Namespace, std::string_view Bucket, size_t Num, - size_t PayloadSize = 1024, - size_t KeyOffset = 1) -> std::vector<CacheKey> { + size_t PayloadSize = 1024, + size_t KeyOffset = 1, + CachePolicy PutPolicy = CachePolicy::Default, + std::vector<CbPackage>* OutPackages = nullptr) -> std::vector<CacheKey> { std::vector<zen::CacheKey> OutKeys; for (uint32_t Key = 1; Key <= Num; ++Key) @@ -1312,7 +1318,7 @@ TEST_CASE("zcache.rpc") const zen::CacheKey CacheKey = zen::CacheKey::Create(Bucket, KeyHash); cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .Namespace = std::string(Namespace)}; - AppendCacheRecord(Request, CacheKey, PayloadSize, CachePolicy::Default); + AppendCacheRecord(Request, CacheKey, PayloadSize, PutPolicy); OutKeys.push_back(CacheKey); CbPackage Package; @@ -1324,6 +1330,10 @@ TEST_CASE("zcache.rpc") cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); CHECK(Result.status_code == 200); + if (OutPackages) + { + OutPackages->emplace_back(std::move(Package)); + } } return OutKeys; @@ -1502,6 +1512,340 @@ TEST_CASE("zcache.rpc") } } + SUBCASE("policy - 'QueryLocal' on put allows overwrite with differing value when not limiting overwrites") + { + using namespace utils; + + ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber()); + ZenServerInstance UpstreamServer(TestEnv); + SpawnServer(UpstreamServer, UpstreamCfg); + + ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port); + ZenServerInstance LocalServer(TestEnv); + SpawnServer(LocalServer, LocalCfg); + + size_t PayloadSize = 1024; + std::string_view Namespace("ue4.ddc"sv); + std::string_view Bucket("mastodon"sv); + const size_t NumRecords = 4; + std::vector<zen::CacheKey> Keys = PutCacheRecords(LocalCfg.BaseUri, Namespace, Bucket, NumRecords, PayloadSize); + + for (const zen::CacheKey& CacheKey : Keys) + { + cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .Namespace = std::string(Namespace)}; + AppendCacheRecord(Request, CacheKey, PayloadSize * 2, CachePolicy::Default); + + CbPackage Package; + CHECK(Request.Format(Package)); + + IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); + cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", LocalCfg.BaseUri)}, + cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}, + cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); + + CHECK(Result.status_code == 200); + cacherequests::PutCacheRecordsResult ParsedResult; + CbPackage Response = ParsePackageMessage(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size())); + CHECK(!Response.IsNull()); + CHECK(ParsedResult.Parse(Response)); + for (bool ResponseSuccess : ParsedResult.Success) + { + CHECK(ResponseSuccess); + } + } + + auto CheckRecordCorrectness = [&](const ZenConfig& Cfg) { + CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote); + GetCacheRecordResult Result = GetCacheRecords(Cfg.BaseUri, "ue4.ddc"sv, Keys, Policy); + + CHECK(Result.Result.Results.size() == Keys.size()); + + for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results) + { + CHECK(Record); + const CacheKey& ExpectedKey = Keys[Index++]; + CHECK(Record->Key == ExpectedKey); + for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) + { + CHECK(Value.RawSize == PayloadSize * 2); + } + } + }; + + // Check that the records are present and overwritten in the local server + CheckRecordCorrectness(LocalCfg); + + // Check that the records are present and overwritten in the upstream server + CheckRecordCorrectness(UpstreamCfg); + } + + SUBCASE("policy - 'QueryLocal' on put denies overwrite with differing value when limiting overwrites") + { + using namespace utils; + + ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber()); + ZenServerInstance UpstreamServer(TestEnv); + SpawnServer(UpstreamServer, UpstreamCfg); + + ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port, "--cache-limit-overwrites"); + ZenServerInstance LocalServer(TestEnv); + SpawnServer(LocalServer, LocalCfg); + + size_t PayloadSize = 1024; + std::string_view Namespace("ue4.ddc"sv); + std::string_view Bucket("mastodon"sv); + const size_t NumRecords = 4; + std::vector<zen::CacheKey> Keys = PutCacheRecords(LocalCfg.BaseUri, Namespace, Bucket, NumRecords, PayloadSize); + + for (const zen::CacheKey& CacheKey : Keys) + { + cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .Namespace = std::string(Namespace)}; + AppendCacheRecord(Request, CacheKey, PayloadSize * 2, CachePolicy::Default); + + CbPackage Package; + CHECK(Request.Format(Package)); + + IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); + cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", LocalCfg.BaseUri)}, + cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}, + cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); + + CHECK(Result.status_code == 200); + cacherequests::PutCacheRecordsResult ParsedResult; + CbPackage Response = ParsePackageMessage(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size())); + CHECK(!Response.IsNull()); + CHECK(ParsedResult.Parse(Response)); + for (bool ResponseSuccess : ParsedResult.Success) + { + CHECK(!ResponseSuccess); + } + } + + auto CheckRecordCorrectness = [&](const ZenConfig& Cfg) { + CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote); + GetCacheRecordResult Result = GetCacheRecords(Cfg.BaseUri, "ue4.ddc"sv, Keys, Policy); + + CHECK(Result.Result.Results.size() == Keys.size()); + + for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results) + { + CHECK(Record); + const CacheKey& ExpectedKey = Keys[Index++]; + CHECK(Record->Key == ExpectedKey); + for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) + { + CHECK(Value.RawSize == PayloadSize); + } + } + }; + + // Check that the records are present and not overwritten in the local server + CheckRecordCorrectness(LocalCfg); + + // Check that the records are present and not overwritten in the upstream server + CheckRecordCorrectness(UpstreamCfg); + } + + SUBCASE("policy - no 'QueryLocal' on put allows overwrite with differing value when limiting overwrites") + { + using namespace utils; + + ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber()); + ZenServerInstance UpstreamServer(TestEnv); + SpawnServer(UpstreamServer, UpstreamCfg); + + ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port, "--cache-limit-overwrites"); + ZenServerInstance LocalServer(TestEnv); + SpawnServer(LocalServer, LocalCfg); + + size_t PayloadSize = 1024; + std::string_view Namespace("ue4.ddc"sv); + std::string_view Bucket("mastodon"sv); + const size_t NumRecords = 4; + std::vector<zen::CacheKey> Keys = PutCacheRecords(LocalCfg.BaseUri, Namespace, Bucket, NumRecords, PayloadSize); + + for (const zen::CacheKey& CacheKey : Keys) + { + cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .Namespace = std::string(Namespace)}; + AppendCacheRecord(Request, CacheKey, PayloadSize * 2, CachePolicy::Store); + + CbPackage Package; + CHECK(Request.Format(Package)); + + IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); + cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", LocalCfg.BaseUri)}, + cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}, + cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); + + CHECK(Result.status_code == 200); + cacherequests::PutCacheRecordsResult ParsedResult; + CbPackage Response = ParsePackageMessage(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size())); + CHECK(!Response.IsNull()); + CHECK(ParsedResult.Parse(Response)); + for (bool ResponseSuccess : ParsedResult.Success) + { + CHECK(ResponseSuccess); + } + } + + auto CheckRecordCorrectness = [&](const ZenConfig& Cfg) { + CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote); + GetCacheRecordResult Result = GetCacheRecords(Cfg.BaseUri, "ue4.ddc"sv, Keys, Policy); + + CHECK(Result.Result.Results.size() == Keys.size()); + + for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results) + { + CHECK(Record); + const CacheKey& ExpectedKey = Keys[Index++]; + CHECK(Record->Key == ExpectedKey); + for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) + { + CHECK(Value.RawSize == PayloadSize * 2); + } + } + }; + + // Check that the records are present and overwritten in the local server + CheckRecordCorrectness(LocalCfg); + + // Check that the records are present and overwritten in the upstream server + CheckRecordCorrectness(UpstreamCfg); + } + + SUBCASE("policy - 'QueryLocal' on put allows overwrite with equivalent value when limiting overwrites") + { + using namespace utils; + + ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber()); + ZenServerInstance UpstreamServer(TestEnv); + SpawnServer(UpstreamServer, UpstreamCfg); + + ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port, "--cache-limit-overwrites"); + ZenServerInstance LocalServer(TestEnv); + SpawnServer(LocalServer, LocalCfg); + + size_t PayloadSize = 1024; + std::string_view Namespace("ue4.ddc"sv); + std::string_view Bucket("mastodon"sv); + const size_t NumRecords = 4; + std::vector<CbPackage> Packages; + std::vector<zen::CacheKey> Keys = + PutCacheRecords(LocalCfg.BaseUri, Namespace, Bucket, NumRecords, PayloadSize, 1, CachePolicy::Default, &Packages); + + for (const CbPackage& Package : Packages) + { + IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); + cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", LocalCfg.BaseUri)}, + cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}, + cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); + + CHECK(Result.status_code == 200); + cacherequests::PutCacheRecordsResult ParsedResult; + CbPackage Response = ParsePackageMessage(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size())); + CHECK(!Response.IsNull()); + CHECK(ParsedResult.Parse(Response)); + for (bool ResponseSuccess : ParsedResult.Success) + { + CHECK(ResponseSuccess); + } + } + + auto CheckRecordCorrectness = [&](const ZenConfig& Cfg) { + CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote); + GetCacheRecordResult Result = GetCacheRecords(Cfg.BaseUri, "ue4.ddc"sv, Keys, Policy); + + CHECK(Result.Result.Results.size() == Keys.size()); + + for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results) + { + CHECK(Record); + const CacheKey& ExpectedKey = Keys[Index++]; + CHECK(Record->Key == ExpectedKey); + for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) + { + CHECK(Value.RawSize == PayloadSize); + } + } + }; + + // Check that the records are present and unchanged in the local server + CheckRecordCorrectness(LocalCfg); + + // Check that the records are present and unchanged in the upstream server + CheckRecordCorrectness(UpstreamCfg); + } + + // TODO: Propagation for rejected PUTs + // SUBCASE("policy - 'QueryLocal' on put denies overwrite with differing value when limiting overwrites but allows propagation to + // upstream") + // { + // using namespace utils; + + // ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber()); + // ZenServerInstance UpstreamServer(TestEnv); + // SpawnServer(UpstreamServer, UpstreamCfg); + + // ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port, "--cache-limit-overwrites"); + // ZenServerInstance LocalServer(TestEnv); + // SpawnServer(LocalServer, LocalCfg); + + // size_t PayloadSize = 1024; + // std::string_view Namespace("ue4.ddc"sv); + // std::string_view Bucket("mastodon"sv); + // const size_t NumRecords = 4; + // std::vector<zen::CacheKey> Keys = PutCacheRecords(LocalCfg.BaseUri, Namespace, Bucket, NumRecords, PayloadSize, 1, + // CachePolicy::Local); + + // for (const zen::CacheKey& CacheKey : Keys) + // { + // cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .Namespace = std::string(Namespace)}; + // AppendCacheRecord(Request, CacheKey, PayloadSize * 2, CachePolicy::Default); + + // CbPackage Package; + // CHECK(Request.Format(Package)); + + // IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); + // cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", LocalCfg.BaseUri)}, + // cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}, + // cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); + + // CHECK(Result.status_code == 200); + // cacherequests::PutCacheRecordsResult ParsedResult; + // CbPackage Response = ParsePackageMessage(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size())); + // CHECK(!Response.IsNull()); + // CHECK(ParsedResult.Parse(Response)); + // for (bool ResponseSuccess : ParsedResult.Success) + // { + // CHECK(!ResponseSuccess); + // } + // } + + // auto CheckRecordCorrectness = [&](const ZenConfig& Cfg, size_t ExpectedPayloadSize) { + // CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote); + // GetCacheRecordResult Result = GetCacheRecords(Cfg.BaseUri, "ue4.ddc"sv, Keys, Policy); + + // CHECK(Result.Result.Results.size() == Keys.size()); + + // for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results) + // { + // CHECK(Record); + // const CacheKey& ExpectedKey = Keys[Index++]; + // CHECK(Record->Key == ExpectedKey); + // for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) + // { + // CHECK(Value.RawSize == ExpectedPayloadSize); + // } + // } + // }; + + // // Check that the records are present and not overwritten in the local server + // CheckRecordCorrectness(LocalCfg, PayloadSize); + + // // Check that the records are present and are the newer size in the upstream server + // CheckRecordCorrectness(UpstreamCfg, PayloadSize*2); + // } + SUBCASE("RpcAcceptOptions") { using namespace utils; |