diff options
| author | Dan Engelbrecht <[email protected]> | 2022-08-19 03:30:54 -0700 |
|---|---|---|
| committer | GitHub <[email protected]> | 2022-08-19 03:30:54 -0700 |
| commit | 606274a83d71928f8621c1d23648a26e8f79fa7d (patch) | |
| tree | d951d51e1a7919463aeec398612e97b47f99f788 /zenserver-test/zenserver-test.cpp | |
| parent | bump vcpkg version to 2022.08.15 (#146) (diff) | |
| download | zen-606274a83d71928f8621c1d23648a26e8f79fa7d.tar.xz zen-606274a83d71928f8621c1d23648a26e8f79fa7d.zip | |
De/fix crash on non responding upstream (#145)
* Fix ZenStructuredCacheClient lifetime issues
Diffstat (limited to 'zenserver-test/zenserver-test.cpp')
| -rw-r--r-- | zenserver-test/zenserver-test.cpp | 280 |
1 files changed, 278 insertions, 2 deletions
diff --git a/zenserver-test/zenserver-test.cpp b/zenserver-test/zenserver-test.cpp index a61c16916..0c519dc7e 100644 --- a/zenserver-test/zenserver-test.cpp +++ b/zenserver-test/zenserver-test.cpp @@ -68,6 +68,7 @@ ZEN_THIRD_PARTY_INCLUDES_END #if ZEN_WITH_TESTS # define ZEN_TEST_WITH_RUNNER 1 # include <zencore/testing.h> +# include <zencore/workthreadpool.h> #endif using namespace std::literals; @@ -637,6 +638,16 @@ namespace utils { return New(13337, fmt::format("--debug --upstream-thread-count=0 --upstream-zen-url=http://localhost:{}", UpstreamPort)); } + static ZenConfig NewWithThreadedUpstreams(std::span<uint16_t> UpstreamPorts, bool Debug) + { + std::string Args = Debug ? "--debug" : ""; + for (uint16_t Port : UpstreamPorts) + { + Args = fmt::format("{}{}--upstream-zen-url=http://localhost:{}", Args, Args.length() > 0 ? " " : "", Port); + } + return New(13337, Args); + } + void Spawn(ZenServerInstance& Inst) { Inst.SetTestDir(DataDir); @@ -1351,8 +1362,7 @@ TEST_CASE("zcache.rpc") { zen::CbPackage Response; cacherequests::GetCacheRecordsResult Result; - // std::vector<zen::CbFieldView> Records; - bool Success; + bool Success; }; auto GetCacheRecords = [](std::string_view BaseUri, @@ -1521,6 +1531,272 @@ TEST_CASE("zcache.rpc") } } +TEST_CASE("zcache.failing.upstream") +{ + // This is an exploratory test that takes a long time to run, so lets skip it by default + if (true) + { + return; + } + + using namespace std::literals; + using namespace utils; + + const uint16_t Upstream1PortNumber = 13338; + ZenConfig Upstream1Cfg = ZenConfig::New(Upstream1PortNumber); + Upstream1Cfg.Args += (" --http asio"); + ZenServerInstance Upstream1Server(TestEnv); + + const uint16_t Upstream2PortNumber = 13339; + ZenConfig Upstream2Cfg = ZenConfig::New(Upstream2PortNumber); + Upstream2Cfg.Args += (" --http asio"); + ZenServerInstance Upstream2Server(TestEnv); + + std::vector<std::uint16_t> UpstreamPorts = {Upstream1PortNumber, Upstream2PortNumber}; + ZenConfig LocalCfg = ZenConfig::NewWithThreadedUpstreams(UpstreamPorts, false); + LocalCfg.Args += (" --http asio --upstream-thread-count 2"); + ZenServerInstance LocalServer(TestEnv); + const uint16_t LocalPortNumber = 13337; + const auto LocalUri = fmt::format("http://localhost:{}/z$", LocalPortNumber); + const auto Upstream1Uri = fmt::format("http://localhost:{}/z$", Upstream1PortNumber); + const auto Upstream2Uri = fmt::format("http://localhost:{}/z$", Upstream2PortNumber); + + SpawnServer(Upstream1Server, Upstream1Cfg); + SpawnServer(Upstream2Server, Upstream2Cfg); + SpawnServer(LocalServer, LocalCfg); + bool Upstream1Running = true; + bool Upstream2Running = true; + + using namespace std::literals; + + auto AppendCacheRecord = [](cacherequests::PutCacheRecordsRequest& Request, + const zen::CacheKey& CacheKey, + size_t PayloadSize, + CachePolicy RecordPolicy) { + std::vector<uint32_t> Data; + Data.resize(PayloadSize / 4); + for (uint32_t Idx = 0; Idx < PayloadSize / 4; ++Idx) + { + Data[Idx] = (*reinterpret_cast<const uint32_t*>(&CacheKey.Hash.Hash[0])) + Idx; + } + + CompressedBuffer Value = zen::CompressedBuffer::Compress(SharedBuffer::MakeView(Data.data(), Data.size() * 4)); + Request.Requests.push_back({.Key = CacheKey, .Values = {{.Id = Oid::NewOid(), .Body = std::move(Value)}}, .Policy = RecordPolicy}); + }; + + auto PutCacheRecords = [&AppendCacheRecord](std::string_view BaseUri, + std::string_view Namespace, + std::string_view Bucket, + size_t Num, + size_t KeyOffset, + size_t PayloadSize = 8192) -> std::vector<CacheKey> { + std::vector<zen::CacheKey> OutKeys; + + cacherequests::PutCacheRecordsRequest Request = {.Namespace = std::string(Namespace)}; + for (size_t Key = 1; Key <= Num; ++Key) + { + zen::IoHash KeyHash; + ((size_t*)(KeyHash.Hash))[0] = KeyOffset + Key; + const zen::CacheKey CacheKey = zen::CacheKey::Create(Bucket, KeyHash); + + AppendCacheRecord(Request, CacheKey, PayloadSize, CachePolicy::Default); + OutKeys.push_back(CacheKey); + } + + CbPackage Package; + CHECK(Request.Format(Package)); + + IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); + cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)}, + cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}, + cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); + + if (Result.status_code != 200) + { + ZEN_DEBUG("PutCacheRecords failed with {}, reason '{}'", Result.status_code, Result.reason); + OutKeys.clear(); + } + + return OutKeys; + }; + + struct GetCacheRecordResult + { + zen::CbPackage Response; + cacherequests::GetCacheRecordsResult Result; + bool Success = false; + }; + + auto GetCacheRecords = [](std::string_view BaseUri, + std::string_view Namespace, + std::span<zen::CacheKey> Keys, + zen::CachePolicy Policy) -> GetCacheRecordResult { + cacherequests::GetCacheRecordsRequest Request = {.DefaultPolicy = Policy, .Namespace = std::string(Namespace)}; + for (const CacheKey& Key : Keys) + { + Request.Requests.push_back({.Key = Key}); + } + + CbObjectWriter RequestWriter; + CHECK(Request.Format(RequestWriter)); + + BinaryWriter Body; + RequestWriter.Save(Body); + + cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)}, + cpr::Header{{"Content-Type", "application/x-ue-cb"}, {"Accept", "application/x-ue-cbpkg"}}, + cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); + + GetCacheRecordResult OutResult; + + if (Result.status_code == 200) + { + CbPackage Response; + if (Response.TryLoad(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()))) + { + OutResult.Response = std::move(Response); + CHECK(OutResult.Result.Parse(OutResult.Response)); + OutResult.Success = true; + } + } + else + { + ZEN_DEBUG("GetCacheRecords with {}, reason '{}'", Result.reason, Result.status_code); + } + + return OutResult; + }; + + // Populate with some simple data + + CachePolicy Policy = CachePolicy::Default; + + const size_t ThreadCount = 128; + const size_t KeyMultiplier = 16384; + const size_t RecordsPerRequest = 64; + WorkerThreadPool Pool(ThreadCount); + + std::atomic_size_t Completed = 0; + + auto Keys = new std::vector<CacheKey>[ThreadCount * KeyMultiplier]; + RwLock KeysLock; + + for (size_t I = 0; I < ThreadCount * KeyMultiplier; I++) + { + size_t Iteration = I; + Pool.ScheduleWork([&] { + std::vector<CacheKey> NewKeys = PutCacheRecords(LocalUri, "ue4.ddc"sv, "mastodon"sv, RecordsPerRequest, I * RecordsPerRequest); + if (NewKeys.size() != RecordsPerRequest) + { + ZEN_DEBUG("PutCacheRecords iteration {} failed", Iteration); + Completed.fetch_add(1); + return; + } + { + RwLock::ExclusiveLockScope _(KeysLock); + Keys[Iteration].swap(NewKeys); + } + Completed.fetch_add(1); + }); + } + bool UseUpstream1 = false; + while (Completed < ThreadCount * KeyMultiplier) + { + Sleep(8000); + + if (UseUpstream1) + { + if (Upstream2Running) + { + Upstream2Server.EnableTermination(); + Upstream2Server.Shutdown(); + Sleep(100); + Upstream2Running = false; + } + if (!Upstream1Running) + { + SpawnServer(Upstream1Server, Upstream1Cfg); + Upstream1Running = true; + } + UseUpstream1 = !UseUpstream1; + } + else + { + if (Upstream1Running) + { + Upstream1Server.EnableTermination(); + Upstream1Server.Shutdown(); + Sleep(100); + Upstream1Running = false; + } + if (!Upstream2Running) + { + SpawnServer(Upstream2Server, Upstream2Cfg); + Upstream2Running = true; + } + UseUpstream1 = !UseUpstream1; + } + } + + Completed = 0; + for (size_t I = 0; I < ThreadCount * KeyMultiplier; I++) + { + size_t Iteration = I; + std::vector<CacheKey>& LocalKeys = Keys[Iteration]; + if (LocalKeys.empty()) + { + Completed.fetch_add(1); + continue; + } + Pool.ScheduleWork([&] { + GetCacheRecordResult Result = GetCacheRecords(LocalUri, "ue4.ddc"sv, LocalKeys, Policy); + + if (!Result.Success) + { + ZEN_DEBUG("GetCacheRecords iteration {} failed", Iteration); + Completed.fetch_add(1); + return; + } + + if (Result.Result.Results.size() != LocalKeys.size()) + { + ZEN_DEBUG("GetCacheRecords iteration {} empty records", Iteration); + Completed.fetch_add(1); + return; + } + for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results) + { + const CacheKey& ExpectedKey = LocalKeys[Index++]; + if (!Record) + { + continue; + } + if (Record->Key != ExpectedKey) + { + continue; + } + if (Record->Values.size() != 1) + { + continue; + } + + for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) + { + if (!Value.Body) + { + continue; + } + } + } + Completed.fetch_add(1); + }); + } + while (Completed < ThreadCount * KeyMultiplier) + { + Sleep(10); + } +} + TEST_CASE("zcache.rpc.allpolicies") { using namespace std::literals; |