aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver-test/zenserver-test.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenserver-test/zenserver-test.cpp')
-rw-r--r--src/zenserver-test/zenserver-test.cpp354
1 files changed, 349 insertions, 5 deletions
diff --git a/src/zenserver-test/zenserver-test.cpp b/src/zenserver-test/zenserver-test.cpp
index d0c73f91b..9ccdf3b5e 100644
--- a/src/zenserver-test/zenserver-test.cpp
+++ b/src/zenserver-test/zenserver-test.cpp
@@ -553,9 +553,13 @@ namespace utils {
return ZenConfig{.DataDir = TestEnv.CreateNewTestDir(), .Port = Port, .Args = std::move(Args)};
}
- static ZenConfig NewWithUpstream(uint16_t Port, uint16_t UpstreamPort)
+ static ZenConfig NewWithUpstream(uint16_t Port, uint16_t UpstreamPort, std::string Args = "")
{
- return New(Port, fmt::format("--debug --upstream-thread-count=0 --upstream-zen-url=http://localhost:{}", UpstreamPort));
+ return New(Port,
+ fmt::format("{}{}--debug --upstream-thread-count=0 --upstream-zen-url=http://localhost:{}",
+ Args,
+ Args.length() > 0 ? " " : "",
+ UpstreamPort));
}
static ZenConfig NewWithThreadedUpstreams(uint16_t NewPort, std::span<uint16_t> UpstreamPorts, bool Debug)
@@ -1301,8 +1305,10 @@ TEST_CASE("zcache.rpc")
std::string_view Namespace,
std::string_view Bucket,
size_t Num,
- size_t PayloadSize = 1024,
- size_t KeyOffset = 1) -> std::vector<CacheKey> {
+ size_t PayloadSize = 1024,
+ size_t KeyOffset = 1,
+ CachePolicy PutPolicy = CachePolicy::Default,
+ std::vector<CbPackage>* OutPackages = nullptr) -> std::vector<CacheKey> {
std::vector<zen::CacheKey> OutKeys;
for (uint32_t Key = 1; Key <= Num; ++Key)
@@ -1312,7 +1318,7 @@ TEST_CASE("zcache.rpc")
const zen::CacheKey CacheKey = zen::CacheKey::Create(Bucket, KeyHash);
cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .Namespace = std::string(Namespace)};
- AppendCacheRecord(Request, CacheKey, PayloadSize, CachePolicy::Default);
+ AppendCacheRecord(Request, CacheKey, PayloadSize, PutPolicy);
OutKeys.push_back(CacheKey);
CbPackage Package;
@@ -1324,6 +1330,10 @@ TEST_CASE("zcache.rpc")
cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
CHECK(Result.status_code == 200);
+ if (OutPackages)
+ {
+ OutPackages->emplace_back(std::move(Package));
+ }
}
return OutKeys;
@@ -1502,6 +1512,340 @@ TEST_CASE("zcache.rpc")
}
}
+ SUBCASE("policy - 'QueryLocal' on put allows overwrite with differing value when not limiting overwrites")
+ {
+ using namespace utils;
+
+ ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber());
+ ZenServerInstance UpstreamServer(TestEnv);
+ SpawnServer(UpstreamServer, UpstreamCfg);
+
+ ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port);
+ ZenServerInstance LocalServer(TestEnv);
+ SpawnServer(LocalServer, LocalCfg);
+
+ size_t PayloadSize = 1024;
+ std::string_view Namespace("ue4.ddc"sv);
+ std::string_view Bucket("mastodon"sv);
+ const size_t NumRecords = 4;
+ std::vector<zen::CacheKey> Keys = PutCacheRecords(LocalCfg.BaseUri, Namespace, Bucket, NumRecords, PayloadSize);
+
+ for (const zen::CacheKey& CacheKey : Keys)
+ {
+ cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .Namespace = std::string(Namespace)};
+ AppendCacheRecord(Request, CacheKey, PayloadSize * 2, CachePolicy::Default);
+
+ CbPackage Package;
+ CHECK(Request.Format(Package));
+
+ IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer();
+ cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", LocalCfg.BaseUri)},
+ cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}},
+ cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
+
+ CHECK(Result.status_code == 200);
+ cacherequests::PutCacheRecordsResult ParsedResult;
+ CbPackage Response = ParsePackageMessage(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()));
+ CHECK(!Response.IsNull());
+ CHECK(ParsedResult.Parse(Response));
+ for (bool ResponseSuccess : ParsedResult.Success)
+ {
+ CHECK(ResponseSuccess);
+ }
+ }
+
+ auto CheckRecordCorrectness = [&](const ZenConfig& Cfg) {
+ CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote);
+ GetCacheRecordResult Result = GetCacheRecords(Cfg.BaseUri, "ue4.ddc"sv, Keys, Policy);
+
+ CHECK(Result.Result.Results.size() == Keys.size());
+
+ for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results)
+ {
+ CHECK(Record);
+ const CacheKey& ExpectedKey = Keys[Index++];
+ CHECK(Record->Key == ExpectedKey);
+ for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values)
+ {
+ CHECK(Value.RawSize == PayloadSize * 2);
+ }
+ }
+ };
+
+ // Check that the records are present and overwritten in the local server
+ CheckRecordCorrectness(LocalCfg);
+
+ // Check that the records are present and overwritten in the upstream server
+ CheckRecordCorrectness(UpstreamCfg);
+ }
+
+ SUBCASE("policy - 'QueryLocal' on put denies overwrite with differing value when limiting overwrites")
+ {
+ using namespace utils;
+
+ ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber());
+ ZenServerInstance UpstreamServer(TestEnv);
+ SpawnServer(UpstreamServer, UpstreamCfg);
+
+ ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port, "--cache-limit-overwrites");
+ ZenServerInstance LocalServer(TestEnv);
+ SpawnServer(LocalServer, LocalCfg);
+
+ size_t PayloadSize = 1024;
+ std::string_view Namespace("ue4.ddc"sv);
+ std::string_view Bucket("mastodon"sv);
+ const size_t NumRecords = 4;
+ std::vector<zen::CacheKey> Keys = PutCacheRecords(LocalCfg.BaseUri, Namespace, Bucket, NumRecords, PayloadSize);
+
+ for (const zen::CacheKey& CacheKey : Keys)
+ {
+ cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .Namespace = std::string(Namespace)};
+ AppendCacheRecord(Request, CacheKey, PayloadSize * 2, CachePolicy::Default);
+
+ CbPackage Package;
+ CHECK(Request.Format(Package));
+
+ IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer();
+ cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", LocalCfg.BaseUri)},
+ cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}},
+ cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
+
+ CHECK(Result.status_code == 200);
+ cacherequests::PutCacheRecordsResult ParsedResult;
+ CbPackage Response = ParsePackageMessage(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()));
+ CHECK(!Response.IsNull());
+ CHECK(ParsedResult.Parse(Response));
+ for (bool ResponseSuccess : ParsedResult.Success)
+ {
+ CHECK(!ResponseSuccess);
+ }
+ }
+
+ auto CheckRecordCorrectness = [&](const ZenConfig& Cfg) {
+ CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote);
+ GetCacheRecordResult Result = GetCacheRecords(Cfg.BaseUri, "ue4.ddc"sv, Keys, Policy);
+
+ CHECK(Result.Result.Results.size() == Keys.size());
+
+ for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results)
+ {
+ CHECK(Record);
+ const CacheKey& ExpectedKey = Keys[Index++];
+ CHECK(Record->Key == ExpectedKey);
+ for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values)
+ {
+ CHECK(Value.RawSize == PayloadSize);
+ }
+ }
+ };
+
+ // Check that the records are present and not overwritten in the local server
+ CheckRecordCorrectness(LocalCfg);
+
+ // Check that the records are present and not overwritten in the upstream server
+ CheckRecordCorrectness(UpstreamCfg);
+ }
+
+ SUBCASE("policy - no 'QueryLocal' on put allows overwrite with differing value when limiting overwrites")
+ {
+ using namespace utils;
+
+ ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber());
+ ZenServerInstance UpstreamServer(TestEnv);
+ SpawnServer(UpstreamServer, UpstreamCfg);
+
+ ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port, "--cache-limit-overwrites");
+ ZenServerInstance LocalServer(TestEnv);
+ SpawnServer(LocalServer, LocalCfg);
+
+ size_t PayloadSize = 1024;
+ std::string_view Namespace("ue4.ddc"sv);
+ std::string_view Bucket("mastodon"sv);
+ const size_t NumRecords = 4;
+ std::vector<zen::CacheKey> Keys = PutCacheRecords(LocalCfg.BaseUri, Namespace, Bucket, NumRecords, PayloadSize);
+
+ for (const zen::CacheKey& CacheKey : Keys)
+ {
+ cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .Namespace = std::string(Namespace)};
+ AppendCacheRecord(Request, CacheKey, PayloadSize * 2, CachePolicy::Store);
+
+ CbPackage Package;
+ CHECK(Request.Format(Package));
+
+ IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer();
+ cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", LocalCfg.BaseUri)},
+ cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}},
+ cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
+
+ CHECK(Result.status_code == 200);
+ cacherequests::PutCacheRecordsResult ParsedResult;
+ CbPackage Response = ParsePackageMessage(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()));
+ CHECK(!Response.IsNull());
+ CHECK(ParsedResult.Parse(Response));
+ for (bool ResponseSuccess : ParsedResult.Success)
+ {
+ CHECK(ResponseSuccess);
+ }
+ }
+
+ auto CheckRecordCorrectness = [&](const ZenConfig& Cfg) {
+ CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote);
+ GetCacheRecordResult Result = GetCacheRecords(Cfg.BaseUri, "ue4.ddc"sv, Keys, Policy);
+
+ CHECK(Result.Result.Results.size() == Keys.size());
+
+ for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results)
+ {
+ CHECK(Record);
+ const CacheKey& ExpectedKey = Keys[Index++];
+ CHECK(Record->Key == ExpectedKey);
+ for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values)
+ {
+ CHECK(Value.RawSize == PayloadSize * 2);
+ }
+ }
+ };
+
+ // Check that the records are present and overwritten in the local server
+ CheckRecordCorrectness(LocalCfg);
+
+ // Check that the records are present and overwritten in the upstream server
+ CheckRecordCorrectness(UpstreamCfg);
+ }
+
+ SUBCASE("policy - 'QueryLocal' on put allows overwrite with equivalent value when limiting overwrites")
+ {
+ using namespace utils;
+
+ ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber());
+ ZenServerInstance UpstreamServer(TestEnv);
+ SpawnServer(UpstreamServer, UpstreamCfg);
+
+ ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port, "--cache-limit-overwrites");
+ ZenServerInstance LocalServer(TestEnv);
+ SpawnServer(LocalServer, LocalCfg);
+
+ size_t PayloadSize = 1024;
+ std::string_view Namespace("ue4.ddc"sv);
+ std::string_view Bucket("mastodon"sv);
+ const size_t NumRecords = 4;
+ std::vector<CbPackage> Packages;
+ std::vector<zen::CacheKey> Keys =
+ PutCacheRecords(LocalCfg.BaseUri, Namespace, Bucket, NumRecords, PayloadSize, 1, CachePolicy::Default, &Packages);
+
+ for (const CbPackage& Package : Packages)
+ {
+ IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer();
+ cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", LocalCfg.BaseUri)},
+ cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}},
+ cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
+
+ CHECK(Result.status_code == 200);
+ cacherequests::PutCacheRecordsResult ParsedResult;
+ CbPackage Response = ParsePackageMessage(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()));
+ CHECK(!Response.IsNull());
+ CHECK(ParsedResult.Parse(Response));
+ for (bool ResponseSuccess : ParsedResult.Success)
+ {
+ CHECK(ResponseSuccess);
+ }
+ }
+
+ auto CheckRecordCorrectness = [&](const ZenConfig& Cfg) {
+ CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote);
+ GetCacheRecordResult Result = GetCacheRecords(Cfg.BaseUri, "ue4.ddc"sv, Keys, Policy);
+
+ CHECK(Result.Result.Results.size() == Keys.size());
+
+ for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results)
+ {
+ CHECK(Record);
+ const CacheKey& ExpectedKey = Keys[Index++];
+ CHECK(Record->Key == ExpectedKey);
+ for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values)
+ {
+ CHECK(Value.RawSize == PayloadSize);
+ }
+ }
+ };
+
+ // Check that the records are present and unchanged in the local server
+ CheckRecordCorrectness(LocalCfg);
+
+ // Check that the records are present and unchanged in the upstream server
+ CheckRecordCorrectness(UpstreamCfg);
+ }
+
+ // TODO: Propagation for rejected PUTs
+ // SUBCASE("policy - 'QueryLocal' on put denies overwrite with differing value when limiting overwrites but allows propagation to
+ // upstream")
+ // {
+ // using namespace utils;
+
+ // ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber());
+ // ZenServerInstance UpstreamServer(TestEnv);
+ // SpawnServer(UpstreamServer, UpstreamCfg);
+
+ // ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port, "--cache-limit-overwrites");
+ // ZenServerInstance LocalServer(TestEnv);
+ // SpawnServer(LocalServer, LocalCfg);
+
+ // size_t PayloadSize = 1024;
+ // std::string_view Namespace("ue4.ddc"sv);
+ // std::string_view Bucket("mastodon"sv);
+ // const size_t NumRecords = 4;
+ // std::vector<zen::CacheKey> Keys = PutCacheRecords(LocalCfg.BaseUri, Namespace, Bucket, NumRecords, PayloadSize, 1,
+ // CachePolicy::Local);
+
+ // for (const zen::CacheKey& CacheKey : Keys)
+ // {
+ // cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .Namespace = std::string(Namespace)};
+ // AppendCacheRecord(Request, CacheKey, PayloadSize * 2, CachePolicy::Default);
+
+ // CbPackage Package;
+ // CHECK(Request.Format(Package));
+
+ // IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer();
+ // cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", LocalCfg.BaseUri)},
+ // cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}},
+ // cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
+
+ // CHECK(Result.status_code == 200);
+ // cacherequests::PutCacheRecordsResult ParsedResult;
+ // CbPackage Response = ParsePackageMessage(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()));
+ // CHECK(!Response.IsNull());
+ // CHECK(ParsedResult.Parse(Response));
+ // for (bool ResponseSuccess : ParsedResult.Success)
+ // {
+ // CHECK(!ResponseSuccess);
+ // }
+ // }
+
+ // auto CheckRecordCorrectness = [&](const ZenConfig& Cfg, size_t ExpectedPayloadSize) {
+ // CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote);
+ // GetCacheRecordResult Result = GetCacheRecords(Cfg.BaseUri, "ue4.ddc"sv, Keys, Policy);
+
+ // CHECK(Result.Result.Results.size() == Keys.size());
+
+ // for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results)
+ // {
+ // CHECK(Record);
+ // const CacheKey& ExpectedKey = Keys[Index++];
+ // CHECK(Record->Key == ExpectedKey);
+ // for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values)
+ // {
+ // CHECK(Value.RawSize == ExpectedPayloadSize);
+ // }
+ // }
+ // };
+
+ // // Check that the records are present and not overwritten in the local server
+ // CheckRecordCorrectness(LocalCfg, PayloadSize);
+
+ // // Check that the records are present and are the newer size in the upstream server
+ // CheckRecordCorrectness(UpstreamCfg, PayloadSize*2);
+ // }
+
SUBCASE("RpcAcceptOptions")
{
using namespace utils;