aboutsummaryrefslogtreecommitdiff
path: root/zenserver-test/zenserver-test.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2022-08-19 03:30:54 -0700
committerGitHub <[email protected]>2022-08-19 03:30:54 -0700
commit606274a83d71928f8621c1d23648a26e8f79fa7d (patch)
treed951d51e1a7919463aeec398612e97b47f99f788 /zenserver-test/zenserver-test.cpp
parentbump vcpkg version to 2022.08.15 (#146) (diff)
downloadzen-606274a83d71928f8621c1d23648a26e8f79fa7d.tar.xz
zen-606274a83d71928f8621c1d23648a26e8f79fa7d.zip
De/fix crash on non responding upstream (#145)
* Fix ZenStructuredCacheClient lifetime issues
Diffstat (limited to 'zenserver-test/zenserver-test.cpp')
-rw-r--r--zenserver-test/zenserver-test.cpp280
1 files changed, 278 insertions, 2 deletions
diff --git a/zenserver-test/zenserver-test.cpp b/zenserver-test/zenserver-test.cpp
index a61c16916..0c519dc7e 100644
--- a/zenserver-test/zenserver-test.cpp
+++ b/zenserver-test/zenserver-test.cpp
@@ -68,6 +68,7 @@ ZEN_THIRD_PARTY_INCLUDES_END
#if ZEN_WITH_TESTS
# define ZEN_TEST_WITH_RUNNER 1
# include <zencore/testing.h>
+# include <zencore/workthreadpool.h>
#endif
using namespace std::literals;
@@ -637,6 +638,16 @@ namespace utils {
return New(13337, fmt::format("--debug --upstream-thread-count=0 --upstream-zen-url=http://localhost:{}", UpstreamPort));
}
+ static ZenConfig NewWithThreadedUpstreams(std::span<uint16_t> UpstreamPorts, bool Debug)
+ {
+ std::string Args = Debug ? "--debug" : "";
+ for (uint16_t Port : UpstreamPorts)
+ {
+ Args = fmt::format("{}{}--upstream-zen-url=http://localhost:{}", Args, Args.length() > 0 ? " " : "", Port);
+ }
+ return New(13337, Args);
+ }
+
void Spawn(ZenServerInstance& Inst)
{
Inst.SetTestDir(DataDir);
@@ -1351,8 +1362,7 @@ TEST_CASE("zcache.rpc")
{
zen::CbPackage Response;
cacherequests::GetCacheRecordsResult Result;
- // std::vector<zen::CbFieldView> Records;
- bool Success;
+ bool Success;
};
auto GetCacheRecords = [](std::string_view BaseUri,
@@ -1521,6 +1531,272 @@ TEST_CASE("zcache.rpc")
}
}
+TEST_CASE("zcache.failing.upstream")
+{
+ // This is an exploratory test that takes a long time to run, so lets skip it by default
+ if (true)
+ {
+ return;
+ }
+
+ using namespace std::literals;
+ using namespace utils;
+
+ const uint16_t Upstream1PortNumber = 13338;
+ ZenConfig Upstream1Cfg = ZenConfig::New(Upstream1PortNumber);
+ Upstream1Cfg.Args += (" --http asio");
+ ZenServerInstance Upstream1Server(TestEnv);
+
+ const uint16_t Upstream2PortNumber = 13339;
+ ZenConfig Upstream2Cfg = ZenConfig::New(Upstream2PortNumber);
+ Upstream2Cfg.Args += (" --http asio");
+ ZenServerInstance Upstream2Server(TestEnv);
+
+ std::vector<std::uint16_t> UpstreamPorts = {Upstream1PortNumber, Upstream2PortNumber};
+ ZenConfig LocalCfg = ZenConfig::NewWithThreadedUpstreams(UpstreamPorts, false);
+ LocalCfg.Args += (" --http asio --upstream-thread-count 2");
+ ZenServerInstance LocalServer(TestEnv);
+ const uint16_t LocalPortNumber = 13337;
+ const auto LocalUri = fmt::format("http://localhost:{}/z$", LocalPortNumber);
+ const auto Upstream1Uri = fmt::format("http://localhost:{}/z$", Upstream1PortNumber);
+ const auto Upstream2Uri = fmt::format("http://localhost:{}/z$", Upstream2PortNumber);
+
+ SpawnServer(Upstream1Server, Upstream1Cfg);
+ SpawnServer(Upstream2Server, Upstream2Cfg);
+ SpawnServer(LocalServer, LocalCfg);
+ bool Upstream1Running = true;
+ bool Upstream2Running = true;
+
+ using namespace std::literals;
+
+ auto AppendCacheRecord = [](cacherequests::PutCacheRecordsRequest& Request,
+ const zen::CacheKey& CacheKey,
+ size_t PayloadSize,
+ CachePolicy RecordPolicy) {
+ std::vector<uint32_t> Data;
+ Data.resize(PayloadSize / 4);
+ for (uint32_t Idx = 0; Idx < PayloadSize / 4; ++Idx)
+ {
+ Data[Idx] = (*reinterpret_cast<const uint32_t*>(&CacheKey.Hash.Hash[0])) + Idx;
+ }
+
+ CompressedBuffer Value = zen::CompressedBuffer::Compress(SharedBuffer::MakeView(Data.data(), Data.size() * 4));
+ Request.Requests.push_back({.Key = CacheKey, .Values = {{.Id = Oid::NewOid(), .Body = std::move(Value)}}, .Policy = RecordPolicy});
+ };
+
+ auto PutCacheRecords = [&AppendCacheRecord](std::string_view BaseUri,
+ std::string_view Namespace,
+ std::string_view Bucket,
+ size_t Num,
+ size_t KeyOffset,
+ size_t PayloadSize = 8192) -> std::vector<CacheKey> {
+ std::vector<zen::CacheKey> OutKeys;
+
+ cacherequests::PutCacheRecordsRequest Request = {.Namespace = std::string(Namespace)};
+ for (size_t Key = 1; Key <= Num; ++Key)
+ {
+ zen::IoHash KeyHash;
+ ((size_t*)(KeyHash.Hash))[0] = KeyOffset + Key;
+ const zen::CacheKey CacheKey = zen::CacheKey::Create(Bucket, KeyHash);
+
+ AppendCacheRecord(Request, CacheKey, PayloadSize, CachePolicy::Default);
+ OutKeys.push_back(CacheKey);
+ }
+
+ CbPackage Package;
+ CHECK(Request.Format(Package));
+
+ IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer();
+ cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)},
+ cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}},
+ cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
+
+ if (Result.status_code != 200)
+ {
+ ZEN_DEBUG("PutCacheRecords failed with {}, reason '{}'", Result.status_code, Result.reason);
+ OutKeys.clear();
+ }
+
+ return OutKeys;
+ };
+
+ struct GetCacheRecordResult
+ {
+ zen::CbPackage Response;
+ cacherequests::GetCacheRecordsResult Result;
+ bool Success = false;
+ };
+
+ auto GetCacheRecords = [](std::string_view BaseUri,
+ std::string_view Namespace,
+ std::span<zen::CacheKey> Keys,
+ zen::CachePolicy Policy) -> GetCacheRecordResult {
+ cacherequests::GetCacheRecordsRequest Request = {.DefaultPolicy = Policy, .Namespace = std::string(Namespace)};
+ for (const CacheKey& Key : Keys)
+ {
+ Request.Requests.push_back({.Key = Key});
+ }
+
+ CbObjectWriter RequestWriter;
+ CHECK(Request.Format(RequestWriter));
+
+ BinaryWriter Body;
+ RequestWriter.Save(Body);
+
+ cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)},
+ cpr::Header{{"Content-Type", "application/x-ue-cb"}, {"Accept", "application/x-ue-cbpkg"}},
+ cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
+
+ GetCacheRecordResult OutResult;
+
+ if (Result.status_code == 200)
+ {
+ CbPackage Response;
+ if (Response.TryLoad(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size())))
+ {
+ OutResult.Response = std::move(Response);
+ CHECK(OutResult.Result.Parse(OutResult.Response));
+ OutResult.Success = true;
+ }
+ }
+ else
+ {
+ ZEN_DEBUG("GetCacheRecords with {}, reason '{}'", Result.reason, Result.status_code);
+ }
+
+ return OutResult;
+ };
+
+ // Populate with some simple data
+
+ CachePolicy Policy = CachePolicy::Default;
+
+ const size_t ThreadCount = 128;
+ const size_t KeyMultiplier = 16384;
+ const size_t RecordsPerRequest = 64;
+ WorkerThreadPool Pool(ThreadCount);
+
+ std::atomic_size_t Completed = 0;
+
+ auto Keys = new std::vector<CacheKey>[ThreadCount * KeyMultiplier];
+ RwLock KeysLock;
+
+ for (size_t I = 0; I < ThreadCount * KeyMultiplier; I++)
+ {
+ size_t Iteration = I;
+ Pool.ScheduleWork([&] {
+ std::vector<CacheKey> NewKeys = PutCacheRecords(LocalUri, "ue4.ddc"sv, "mastodon"sv, RecordsPerRequest, I * RecordsPerRequest);
+ if (NewKeys.size() != RecordsPerRequest)
+ {
+ ZEN_DEBUG("PutCacheRecords iteration {} failed", Iteration);
+ Completed.fetch_add(1);
+ return;
+ }
+ {
+ RwLock::ExclusiveLockScope _(KeysLock);
+ Keys[Iteration].swap(NewKeys);
+ }
+ Completed.fetch_add(1);
+ });
+ }
+ bool UseUpstream1 = false;
+ while (Completed < ThreadCount * KeyMultiplier)
+ {
+ Sleep(8000);
+
+ if (UseUpstream1)
+ {
+ if (Upstream2Running)
+ {
+ Upstream2Server.EnableTermination();
+ Upstream2Server.Shutdown();
+ Sleep(100);
+ Upstream2Running = false;
+ }
+ if (!Upstream1Running)
+ {
+ SpawnServer(Upstream1Server, Upstream1Cfg);
+ Upstream1Running = true;
+ }
+ UseUpstream1 = !UseUpstream1;
+ }
+ else
+ {
+ if (Upstream1Running)
+ {
+ Upstream1Server.EnableTermination();
+ Upstream1Server.Shutdown();
+ Sleep(100);
+ Upstream1Running = false;
+ }
+ if (!Upstream2Running)
+ {
+ SpawnServer(Upstream2Server, Upstream2Cfg);
+ Upstream2Running = true;
+ }
+ UseUpstream1 = !UseUpstream1;
+ }
+ }
+
+ Completed = 0;
+ for (size_t I = 0; I < ThreadCount * KeyMultiplier; I++)
+ {
+ size_t Iteration = I;
+ std::vector<CacheKey>& LocalKeys = Keys[Iteration];
+ if (LocalKeys.empty())
+ {
+ Completed.fetch_add(1);
+ continue;
+ }
+ Pool.ScheduleWork([&] {
+ GetCacheRecordResult Result = GetCacheRecords(LocalUri, "ue4.ddc"sv, LocalKeys, Policy);
+
+ if (!Result.Success)
+ {
+ ZEN_DEBUG("GetCacheRecords iteration {} failed", Iteration);
+ Completed.fetch_add(1);
+ return;
+ }
+
+ if (Result.Result.Results.size() != LocalKeys.size())
+ {
+ ZEN_DEBUG("GetCacheRecords iteration {} empty records", Iteration);
+ Completed.fetch_add(1);
+ return;
+ }
+ for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results)
+ {
+ const CacheKey& ExpectedKey = LocalKeys[Index++];
+ if (!Record)
+ {
+ continue;
+ }
+ if (Record->Key != ExpectedKey)
+ {
+ continue;
+ }
+ if (Record->Values.size() != 1)
+ {
+ continue;
+ }
+
+ for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values)
+ {
+ if (!Value.Body)
+ {
+ continue;
+ }
+ }
+ }
+ Completed.fetch_add(1);
+ });
+ }
+ while (Completed < ThreadCount * KeyMultiplier)
+ {
+ Sleep(10);
+ }
+}
+
TEST_CASE("zcache.rpc.allpolicies")
{
using namespace std::literals;