diff options
| author | Stefan Boberg <[email protected]> | 2023-05-02 10:01:47 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-05-02 10:01:47 +0200 |
| commit | 075d17f8ada47e990fe94606c3d21df409223465 (patch) | |
| tree | e50549b766a2f3c354798a54ff73404217b4c9af /zenserver-test/zenserver-test.cpp | |
| parent | fix: bundle shouldn't append content zip to zen (diff) | |
| download | zen-075d17f8ada47e990fe94606c3d21df409223465.tar.xz zen-075d17f8ada47e990fe94606c3d21df409223465.zip | |
moved source directories into `/src` (#264)
* moved source directories into `/src`
* updated bundle.lua for new `src` path
* moved some docs, icon
* removed old test trees
Diffstat (limited to 'zenserver-test/zenserver-test.cpp')
| -rw-r--r-- | zenserver-test/zenserver-test.cpp | 3323 |
1 files changed, 0 insertions, 3323 deletions
diff --git a/zenserver-test/zenserver-test.cpp b/zenserver-test/zenserver-test.cpp deleted file mode 100644 index 3195181d1..000000000 --- a/zenserver-test/zenserver-test.cpp +++ /dev/null @@ -1,3323 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#define _SILENCE_CXX17_C_HEADER_DEPRECATION_WARNING - -#include <zencore/compactbinary.h> -#include <zencore/compactbinarybuilder.h> -#include <zencore/compactbinarypackage.h> -#include <zencore/compress.h> -#include <zencore/except.h> -#include <zencore/filesystem.h> -#include <zencore/fmtutils.h> -#include <zencore/iohash.h> -#include <zencore/logging.h> -#include <zencore/memory.h> -#include <zencore/refcount.h> -#include <zencore/stream.h> -#include <zencore/string.h> -#include <zencore/testutils.h> -#include <zencore/thread.h> -#include <zencore/timer.h> -#include <zencore/xxhash.h> -#include <zenhttp/httpclient.h> -#include <zenhttp/httpshared.h> -#include <zenhttp/websocket.h> -#include <zenhttp/zenhttp.h> -#include <zenutil/cache/cache.h> -#include <zenutil/cache/cacherequests.h> -#include <zenutil/zenserverprocess.h> - -#if ZEN_USE_MIMALLOC -ZEN_THIRD_PARTY_INCLUDES_START -# include <mimalloc.h> -ZEN_THIRD_PARTY_INCLUDES_END -#endif - -#include <http_parser.h> - -#if ZEN_PLATFORM_WINDOWS -# pragma comment(lib, "Crypt32.lib") -# pragma comment(lib, "Wldap32.lib") -#endif - -ZEN_THIRD_PARTY_INCLUDES_START -#include <cpr/cpr.h> -#undef GetObject -ZEN_THIRD_PARTY_INCLUDES_END - -#include <atomic> -#include <filesystem> -#include <map> -#include <random> -#include <span> -#include <thread> -#include <typeindex> -#include <unordered_map> - -#if ZEN_PLATFORM_WINDOWS -# include <ppl.h> -# include <atlbase.h> -# include <process.h> -#endif - -#include <asio.hpp> - -////////////////////////////////////////////////////////////////////////// - -#include "projectclient.h" - -////////////////////////////////////////////////////////////////////////// - -#if ZEN_WITH_TESTS -# define ZEN_TEST_WITH_RUNNER 1 -# include <zencore/testing.h> -# include <zencore/workthreadpool.h> -#endif - -using namespace std::literals; - -#if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC -struct Concurrency -{ - template<typename... T> - static void parallel_invoke(T&&... t) - { - constexpr size_t NumTs = sizeof...(t); - std::thread Threads[NumTs] = { - std::thread(std::forward<T>(t))..., - }; - - for (std::thread& Thread : Threads) - { - Thread.join(); - } - } -}; -#endif - -////////////////////////////////////////////////////////////////////////// -// -// Custom logging -- test code, this should be tweaked -// - -namespace logging { -using namespace spdlog; -using namespace spdlog::details; -using namespace std::literals; - -class full_test_formatter final : public spdlog::formatter -{ -public: - full_test_formatter(std::string_view LogId, std::chrono::time_point<std::chrono::system_clock> Epoch) : m_Epoch(Epoch), m_LogId(LogId) - { - } - - virtual std::unique_ptr<formatter> clone() const override { return std::make_unique<full_test_formatter>(m_LogId, m_Epoch); } - - static constexpr bool UseDate = false; - - virtual void format(const details::log_msg& msg, memory_buf_t& dest) override - { - using std::chrono::duration_cast; - using std::chrono::milliseconds; - using std::chrono::seconds; - - if constexpr (UseDate) - { - auto secs = std::chrono::duration_cast<seconds>(msg.time.time_since_epoch()); - if (secs != m_LastLogSecs) - { - m_CachedTm = os::localtime(log_clock::to_time_t(msg.time)); - m_LastLogSecs = secs; - } - } - - const auto& tm_time = m_CachedTm; - - // cache the date/time part for the next second. - auto duration = msg.time - m_Epoch; - auto secs = duration_cast<seconds>(duration); - - if (m_CacheTimestamp != secs || m_CachedDatetime.size() == 0) - { - m_CachedDatetime.clear(); - m_CachedDatetime.push_back('['); - - if constexpr (UseDate) - { - fmt_helper::append_int(tm_time.tm_year + 1900, m_CachedDatetime); - m_CachedDatetime.push_back('-'); - - fmt_helper::pad2(tm_time.tm_mon + 1, m_CachedDatetime); - m_CachedDatetime.push_back('-'); - - fmt_helper::pad2(tm_time.tm_mday, m_CachedDatetime); - m_CachedDatetime.push_back(' '); - - fmt_helper::pad2(tm_time.tm_hour, m_CachedDatetime); - m_CachedDatetime.push_back(':'); - - fmt_helper::pad2(tm_time.tm_min, m_CachedDatetime); - m_CachedDatetime.push_back(':'); - - fmt_helper::pad2(tm_time.tm_sec, m_CachedDatetime); - } - else - { - int Count = int(secs.count()); - - const int LogSecs = Count % 60; - Count /= 60; - - const int LogMins = Count % 60; - Count /= 60; - - const int LogHours = Count; - - fmt_helper::pad2(LogHours, m_CachedDatetime); - m_CachedDatetime.push_back(':'); - fmt_helper::pad2(LogMins, m_CachedDatetime); - m_CachedDatetime.push_back(':'); - fmt_helper::pad2(LogSecs, m_CachedDatetime); - } - - m_CachedDatetime.push_back('.'); - - m_CacheTimestamp = secs; - } - - dest.append(m_CachedDatetime.begin(), m_CachedDatetime.end()); - - auto millis = fmt_helper::time_fraction<milliseconds>(msg.time); - fmt_helper::pad3(static_cast<uint32_t>(millis.count()), dest); - dest.push_back(']'); - dest.push_back(' '); - - if (!m_LogId.empty()) - { - dest.push_back('['); - fmt_helper::append_string_view(m_LogId, dest); - dest.push_back(']'); - dest.push_back(' '); - } - - // append logger name if exists - if (msg.logger_name.size() > 0) - { - dest.push_back('['); - fmt_helper::append_string_view(msg.logger_name, dest); - dest.push_back(']'); - dest.push_back(' '); - } - - dest.push_back('['); - // wrap the level name with color - msg.color_range_start = dest.size(); - fmt_helper::append_string_view(level::to_string_view(msg.level), dest); - msg.color_range_end = dest.size(); - dest.push_back(']'); - dest.push_back(' '); - - // add source location if present - if (!msg.source.empty()) - { - dest.push_back('['); - const char* filename = details::short_filename_formatter<details::null_scoped_padder>::basename(msg.source.filename); - fmt_helper::append_string_view(filename, dest); - dest.push_back(':'); - fmt_helper::append_int(msg.source.line, dest); - dest.push_back(']'); - dest.push_back(' '); - } - - fmt_helper::append_string_view(msg.payload, dest); - fmt_helper::append_string_view("\n"sv, dest); - } - -private: - std::chrono::time_point<std::chrono::system_clock> m_Epoch; - std::tm m_CachedTm; - std::chrono::seconds m_LastLogSecs; - std::chrono::seconds m_CacheTimestamp{0}; - memory_buf_t m_CachedDatetime; - std::string m_LogId; -}; -} // namespace logging - -////////////////////////////////////////////////////////////////////////// - -#if 0 - -int -main() -{ - mi_version(); - - zen::Sleep(1000); - - zen::Stopwatch timer; - - const int RequestCount = 100000; - - cpr::Session Sessions[10]; - - for (auto& Session : Sessions) - { - Session.SetUrl(cpr::Url{"http://localhost:1337/test/hello"}); - //Session.SetUrl(cpr::Url{ "http://arn-wd-l0182:1337/test/hello" }); - } - - auto Run = [](cpr::Session& Session) { - for (int i = 0; i < 10000; ++i) - { - cpr::Response Result = Session.Get(); - - if (Result.status_code != 200) - { - ZEN_WARN("request response: {}", Result.status_code); - } - } - }; - - Concurrency::parallel_invoke([&] { Run(Sessions[0]); }, - [&] { Run(Sessions[1]); }, - [&] { Run(Sessions[2]); }, - [&] { Run(Sessions[3]); }, - [&] { Run(Sessions[4]); }, - [&] { Run(Sessions[5]); }, - [&] { Run(Sessions[6]); }, - [&] { Run(Sessions[7]); }, - [&] { Run(Sessions[8]); }, - [&] { Run(Sessions[9]); }); - - // cpr::Response r = cpr::Get(cpr::Url{ "http://localhost:1337/test/hello" }); - - ZEN_INFO("{} requests in {} ({})", - RequestCount, - zen::NiceTimeSpanMs(timer.GetElapsedTimeMs()), - zen::NiceRate(RequestCount, (uint32_t)timer.GetElapsedTimeMs(), "req")); - - return 0; -} -#elif 0 -// #include <restinio/all.hpp> - -int -main() -{ - mi_version(); - restinio::run(restinio::on_thread_pool(32).port(8080).request_handler( - [](auto req) { return req->create_response().set_body("Hello, World!").done(); })); - return 0; -} -#elif ZEN_WITH_TESTS - -zen::ZenServerEnvironment TestEnv; - -int -main(int argc, char** argv) -{ - using namespace std::literals; - -# if ZEN_USE_MIMALLOC - mi_version(); -# endif - - zen::zencore_forcelinktests(); - zen::zenhttp_forcelinktests(); - zen::cacherequests_forcelink(); - - zen::logging::InitializeLogging(); - - spdlog::set_level(spdlog::level::debug); - spdlog::set_formatter(std::make_unique< ::logging::full_test_formatter>("test", std::chrono::system_clock::now())); - - std::filesystem::path ProgramBaseDir = std::filesystem::path(argv[0]).parent_path(); - std::filesystem::path TestBaseDir = ProgramBaseDir.parent_path().parent_path() / ".test"; - - // This is pretty janky because we're passing most of the options through to the test - // framework, so we can't just use cxxopts (I think). This should ideally be cleaned up - // somehow in the future - - std::string ServerClass; - - for (int i = 1; i < argc; ++i) - { - if (argv[i] == "--http"sv) - { - if ((i + 1) < argc) - { - ServerClass = argv[++i]; - } - } - } - - TestEnv.InitializeForTest(ProgramBaseDir, TestBaseDir, ServerClass); - - ZEN_INFO("Running tests...(base dir: '{}')", TestBaseDir); - - zen::testing::TestRunner Runner; - Runner.ApplyCommandLine(argc, argv); - - return Runner.Run(); -} - -namespace zen::tests { - -TEST_CASE("default.single") -{ - std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); - - ZenServerInstance Instance(TestEnv); - Instance.SetTestDir(TestDir); - Instance.SpawnServer(13337); - - ZEN_INFO("Waiting..."); - - Instance.WaitUntilReady(); - - std::atomic<uint64_t> RequestCount{0}; - std::atomic<uint64_t> BatchCounter{0}; - - ZEN_INFO("Running single server test..."); - - auto IssueTestRequests = [&] { - const uint64_t BatchNo = BatchCounter.fetch_add(1); - const int ThreadId = zen::GetCurrentThreadId(); - - ZEN_INFO("query batch {} started (thread {})", BatchNo, ThreadId); - cpr::Session cli; - cli.SetUrl(cpr::Url{"http://localhost:13337/test/hello"}); - - for (int i = 0; i < 10000; ++i) - { - auto res = cli.Get(); - ++RequestCount; - } - ZEN_INFO("query batch {} ended (thread {})", BatchNo, ThreadId); - }; - - zen::Stopwatch timer; - - Concurrency::parallel_invoke(IssueTestRequests, - IssueTestRequests, - IssueTestRequests, - IssueTestRequests, - IssueTestRequests, - IssueTestRequests, - IssueTestRequests, - IssueTestRequests, - IssueTestRequests, - IssueTestRequests); - - uint64_t Elapsed = timer.GetElapsedTimeMs(); - - ZEN_INFO("{} requests in {} ({})", RequestCount, zen::NiceTimeSpanMs(Elapsed), zen::NiceRate(RequestCount, (uint32_t)Elapsed, "req")); -} - -TEST_CASE("multi.basic") -{ - ZenServerInstance Instance1(TestEnv); - std::filesystem::path TestDir1 = TestEnv.CreateNewTestDir(); - Instance1.SetTestDir(TestDir1); - Instance1.SpawnServer(13337); - - ZenServerInstance Instance2(TestEnv); - std::filesystem::path TestDir2 = TestEnv.CreateNewTestDir(); - Instance2.SetTestDir(TestDir2); - Instance2.SpawnServer(13338); - - ZEN_INFO("Waiting..."); - - Instance1.WaitUntilReady(); - Instance2.WaitUntilReady(); - - std::atomic<uint64_t> RequestCount{0}; - std::atomic<uint64_t> BatchCounter{0}; - - auto IssueTestRequests = [&](int PortNumber) { - const uint64_t BatchNo = BatchCounter.fetch_add(1); - const int ThreadId = zen::GetCurrentThreadId(); - - ZEN_INFO("query batch {} started (thread {}) for port {}", BatchNo, ThreadId, PortNumber); - - cpr::Session cli; - cli.SetUrl(cpr::Url{fmt::format("http://localhost:{}/test/hello", PortNumber)}); - - for (int i = 0; i < 10000; ++i) - { - auto res = cli.Get(); - ++RequestCount; - } - ZEN_INFO("query batch {} ended (thread {})", BatchNo, ThreadId); - }; - - zen::Stopwatch timer; - - ZEN_INFO("Running multi-server test..."); - - Concurrency::parallel_invoke([&] { IssueTestRequests(13337); }, - [&] { IssueTestRequests(13338); }, - [&] { IssueTestRequests(13337); }, - [&] { IssueTestRequests(13338); }); - - uint64_t Elapsed = timer.GetElapsedTimeMs(); - - ZEN_INFO("{} requests in {} ({})", RequestCount, zen::NiceTimeSpanMs(Elapsed), zen::NiceRate(RequestCount, (uint32_t)Elapsed, "req")); -} - -TEST_CASE("project.basic") -{ - using namespace std::literals; - - std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); - - const uint16_t PortNumber = 13337; - - ZenServerInstance Instance1(TestEnv); - Instance1.SetTestDir(TestDir); - Instance1.SpawnServer(PortNumber); - Instance1.WaitUntilReady(); - - std::atomic<uint64_t> RequestCount{0}; - - zen::Stopwatch timer; - - std::mt19937_64 mt; - - zen::StringBuilder<64> BaseUri; - BaseUri << fmt::format("http://localhost:{}/prj/test", PortNumber); - - std::filesystem::path BinPath = zen::GetRunningExecutablePath(); - std::filesystem::path RootPath = BinPath.parent_path().parent_path(); - BinPath = BinPath.lexically_relative(RootPath); - - SUBCASE("build store init") - { - { - { - zen::CbObjectWriter Body; - Body << "id" - << "test"; - Body << "root" << RootPath.c_str(); - Body << "project" - << "/zooom"; - Body << "engine" - << "/zooom"; - - zen::BinaryWriter MemOut; - Body.Save(MemOut); - - auto Response = cpr::Post(cpr::Url{BaseUri.c_str()}, cpr::Body{(const char*)MemOut.Data(), MemOut.Size()}); - CHECK(Response.status_code == 201); - } - - { - auto Response = cpr::Get(cpr::Url{BaseUri.c_str()}); - CHECK(Response.status_code == 200); - - zen::CbObjectView ResponseObject = zen::CbFieldView(Response.text.data()).AsObjectView(); - - CHECK(ResponseObject["id"].AsString() == "test"sv); - CHECK(ResponseObject["root"].AsString() == PathToUtf8(RootPath.c_str())); - } - } - - BaseUri << "/oplog/foobar"; - - { - { - zen::StringBuilder<64> PostUri; - PostUri << BaseUri; - auto Response = cpr::Post(cpr::Url{PostUri.c_str()}); - CHECK(Response.status_code == 201); - } - - { - auto Response = cpr::Get(cpr::Url{BaseUri.c_str()}); - CHECK(Response.status_code == 200); - - zen::CbObjectView ResponseObject = zen::CbFieldView(Response.text.data()).AsObjectView(); - - CHECK(ResponseObject["id"].AsString() == "foobar"sv); - CHECK(ResponseObject["project"].AsString() == "test"sv); - } - } - - SUBCASE("build store persistence") - { - uint8_t AttachData[] = {1, 2, 3}; - - zen::CompressedBuffer Attachment = zen::CompressedBuffer::Compress(zen::SharedBuffer::Clone(zen::MemoryView{AttachData, 3})); - zen::CbAttachment Attach{Attachment, Attachment.DecodeRawHash()}; - - zen::CbObjectWriter OpWriter; - OpWriter << "key" - << "foo" - << "attachment" << Attach; - - const std::string_view ChunkId{ - "00000000" - "00000000" - "00010000"}; - auto FileOid = zen::Oid::FromHexString(ChunkId); - - OpWriter.BeginArray("files"); - OpWriter.BeginObject(); - OpWriter << "id" << FileOid; - OpWriter << "clientpath" - << "/{engine}/client/side/path"; - OpWriter << "serverpath" << BinPath.c_str(); - OpWriter.EndObject(); - OpWriter.EndArray(); - - zen::CbObject Op = OpWriter.Save(); - - zen::CbPackage OpPackage(Op); - OpPackage.AddAttachment(Attach); - - zen::BinaryWriter MemOut; - legacy::SaveCbPackage(OpPackage, MemOut); - - { - zen::StringBuilder<64> PostUri; - PostUri << BaseUri << "/new"; - auto Response = cpr::Post(cpr::Url{PostUri.c_str()}, cpr::Body{(const char*)MemOut.Data(), MemOut.Size()}); - - REQUIRE(!Response.error); - CHECK(Response.status_code == 201); - } - - // Read file data - - { - zen::StringBuilder<128> ChunkGetUri; - ChunkGetUri << BaseUri << "/" << ChunkId; - auto Response = cpr::Get(cpr::Url{ChunkGetUri.c_str()}); - - REQUIRE(!Response.error); - CHECK(Response.status_code == 200); - } - - { - zen::StringBuilder<128> ChunkGetUri; - ChunkGetUri << BaseUri << "/" << ChunkId << "?offset=1&size=10"; - auto Response = cpr::Get(cpr::Url{ChunkGetUri.c_str()}); - - REQUIRE(!Response.error); - CHECK(Response.status_code == 200); - CHECK(Response.text.size() == 10); - } - - ZEN_INFO("+++++++"); - } - SUBCASE("build store op commit") { ZEN_INFO("-------"); } - SUBCASE("test chunk not found error") - { - for (size_t I = 0; I < 65; I++) - { - zen::StringBuilder<128> PostUri; - PostUri << BaseUri << "/f77c781846caead318084604/info"; - auto Response = cpr::Get(cpr::Url{PostUri.c_str()}); - - REQUIRE(!Response.error); - CHECK(Response.status_code == 404); - } - } - } - - const uint64_t Elapsed = timer.GetElapsedTimeMs(); - - ZEN_INFO("{} requests in {} ({})", RequestCount, zen::NiceTimeSpanMs(Elapsed), zen::NiceRate(RequestCount, (uint32_t)Elapsed, "req")); -} - -# if 0 // this is extremely WIP -TEST_CASE("project.pipe") -{ - using namespace std::literals; - - std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); - - const uint16_t PortNumber = 13337; - - ZenServerInstance Instance1(TestEnv); - Instance1.SetTestDir(TestDir); - Instance1.SpawnServer(PortNumber); - Instance1.WaitUntilReady(); - - zen::LocalProjectClient LocalClient(PortNumber); - - zen::CbObjectWriter Cbow; - Cbow << "hey" << 42; - - zen::CbObject Response = LocalClient.MessageTransaction(Cbow.Save()); -} -# endif - -namespace utils { - - struct ZenConfig - { - std::filesystem::path DataDir; - uint16_t Port; - std::string BaseUri; - std::string Args; - - static ZenConfig New(uint16_t Port = 13337, std::string Args = "") - { - return ZenConfig{.DataDir = TestEnv.CreateNewTestDir(), - .Port = Port, - .BaseUri = fmt::format("http://localhost:{}/z$", Port), - .Args = std::move(Args)}; - } - - static ZenConfig NewWithUpstream(uint16_t UpstreamPort) - { - return New(13337, fmt::format("--debug --upstream-thread-count=0 --upstream-zen-url=http://localhost:{}", UpstreamPort)); - } - - static ZenConfig NewWithThreadedUpstreams(std::span<uint16_t> UpstreamPorts, bool Debug) - { - std::string Args = Debug ? "--debug" : ""; - for (uint16_t Port : UpstreamPorts) - { - Args = fmt::format("{}{}--upstream-zen-url=http://localhost:{}", Args, Args.length() > 0 ? " " : "", Port); - } - return New(13337, Args); - } - - void Spawn(ZenServerInstance& Inst) - { - Inst.SetTestDir(DataDir); - Inst.SpawnServer(Port, Args); - Inst.WaitUntilReady(); - } - }; - - void SpawnServer(ZenServerInstance& Server, ZenConfig& Cfg) - { - Server.SetTestDir(Cfg.DataDir); - Server.SpawnServer(Cfg.Port, Cfg.Args); - Server.WaitUntilReady(); - } - -} // namespace utils - -TEST_CASE("zcache.basic") -{ - using namespace std::literals; - - std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); - - const uint16_t PortNumber = 13337; - - const int kIterationCount = 100; - const auto BaseUri = fmt::format("http://localhost:{}/z$", PortNumber); - - auto HashKey = [](int i) -> zen::IoHash { return zen::IoHash::HashBuffer(&i, sizeof i); }; - - { - ZenServerInstance Instance1(TestEnv); - Instance1.SetTestDir(TestDir); - Instance1.SpawnServer(PortNumber); - Instance1.WaitUntilReady(); - - // Populate with some simple data - - for (int i = 0; i < kIterationCount; ++i) - { - zen::CbObjectWriter Cbo; - Cbo << "index" << i; - - zen::BinaryWriter MemOut; - Cbo.Save(MemOut); - - zen::IoHash Key = HashKey(i); - - cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}", BaseUri, "test", Key)}, - cpr::Body{(const char*)MemOut.Data(), MemOut.Size()}, - cpr::Header{{"Content-Type", "application/x-ue-cb"}}); - - CHECK(Result.status_code == 201); - } - - // Retrieve data - - for (int i = 0; i < kIterationCount; ++i) - { - zen::IoHash Key = zen::IoHash::HashBuffer(&i, sizeof i); - - cpr::Response Result = - cpr::Get(cpr::Url{fmt::format("{}/{}/{}", BaseUri, "test", Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); - - CHECK(Result.status_code == 200); - } - - // Ensure bad bucket identifiers are rejected - - { - zen::CbObjectWriter Cbo; - Cbo << "index" << 42; - - zen::BinaryWriter MemOut; - Cbo.Save(MemOut); - - zen::IoHash Key = HashKey(442); - - cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}", BaseUri, "te!st", Key)}, - cpr::Body{(const char*)MemOut.Data(), MemOut.Size()}, - cpr::Header{{"Content-Type", "application/x-ue-cb"}}); - - CHECK(Result.status_code == 400); - } - } - - // Verify that the data persists between process runs (the previous server has exited at this point) - - { - ZenServerInstance Instance1(TestEnv); - Instance1.SetTestDir(TestDir); - Instance1.SpawnServer(PortNumber); - Instance1.WaitUntilReady(); - - // Retrieve data again - - for (int i = 0; i < kIterationCount; ++i) - { - zen::IoHash Key = HashKey(i); - - cpr::Response Result = - cpr::Get(cpr::Url{fmt::format("{}/{}/{}", BaseUri, "test", Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); - - CHECK(Result.status_code == 200); - } - } -} - -TEST_CASE("zcache.cbpackage") -{ - using namespace std::literals; - - auto CreateTestPackage = [](zen::IoHash& OutAttachmentKey) -> zen::CbPackage { - auto Data = zen::SharedBuffer::Clone(zen::MakeMemoryView<uint8_t>({1, 2, 3, 4, 5, 6, 7, 8, 9})); - auto CompressedData = zen::CompressedBuffer::Compress(Data); - - OutAttachmentKey = CompressedData.DecodeRawHash(); - - zen::CbWriter Obj; - Obj.BeginObject("obj"sv); - Obj.AddBinaryAttachment("data", OutAttachmentKey); - Obj.EndObject(); - - zen::CbPackage Package; - Package.SetObject(Obj.Save().AsObject()); - Package.AddAttachment(zen::CbAttachment(CompressedData, OutAttachmentKey)); - - return Package; - }; - - auto SerializeToBuffer = [](zen::CbPackage Package) -> zen::IoBuffer { - zen::BinaryWriter MemStream; - - Package.Save(MemStream); - - return zen::IoBuffer(zen::IoBuffer::Clone, MemStream.Data(), MemStream.Size()); - }; - - auto IsEqual = [](zen::CbPackage Lhs, zen::CbPackage Rhs) -> bool { - std::span<const zen::CbAttachment> LhsAttachments = Lhs.GetAttachments(); - std::span<const zen::CbAttachment> RhsAttachments = Rhs.GetAttachments(); - - if (LhsAttachments.size() != RhsAttachments.size()) - { - return false; - } - - for (const zen::CbAttachment& LhsAttachment : LhsAttachments) - { - const zen::CbAttachment* RhsAttachment = Rhs.FindAttachment(LhsAttachment.GetHash()); - CHECK(RhsAttachment); - - zen::SharedBuffer LhsBuffer = LhsAttachment.AsCompressedBinary().Decompress(); - CHECK(!LhsBuffer.IsNull()); - - zen::SharedBuffer RhsBuffer = RhsAttachment->AsCompressedBinary().Decompress(); - CHECK(!RhsBuffer.IsNull()); - - if (!LhsBuffer.GetView().EqualBytes(RhsBuffer.GetView())) - { - return false; - } - } - - return true; - }; - - SUBCASE("PUT/GET returns correct package") - { - std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); - const uint16_t PortNumber = 13337; - const auto BaseUri = fmt::format("http://localhost:{}/z$", PortNumber); - - ZenServerInstance Instance1(TestEnv); - Instance1.SetTestDir(TestDir); - Instance1.SpawnServer(PortNumber); - Instance1.WaitUntilReady(); - - const std::string_view Bucket = "mosdef"sv; - zen::IoHash Key; - zen::CbPackage ExpectedPackage = CreateTestPackage(Key); - - // PUT - { - zen::IoBuffer Body = SerializeToBuffer(ExpectedPackage); - cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}", BaseUri, Bucket, Key)}, - cpr::Body{(const char*)Body.Data(), Body.Size()}, - cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}}); - CHECK(Result.status_code == 201); - } - - // GET - { - cpr::Response Result = - cpr::Get(cpr::Url{fmt::format("{}/{}/{}", BaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); - CHECK(Result.status_code == 200); - - zen::IoBuffer Response(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()); - - zen::CbPackage Package; - const bool Ok = Package.TryLoad(Response); - CHECK(Ok); - CHECK(IsEqual(Package, ExpectedPackage)); - } - } - - SUBCASE("PUT propagates upstream") - { - // Setup local and remote server - std::filesystem::path LocalDataDir = TestEnv.CreateNewTestDir(); - std::filesystem::path RemoteDataDir = TestEnv.CreateNewTestDir(); - const uint16_t LocalPortNumber = 13337; - const uint16_t RemotePortNumber = 13338; - - const auto LocalBaseUri = fmt::format("http://localhost:{}/z$", LocalPortNumber); - const auto RemoteBaseUri = fmt::format("http://localhost:{}/z$", RemotePortNumber); - - ZenServerInstance RemoteInstance(TestEnv); - RemoteInstance.SetTestDir(RemoteDataDir); - RemoteInstance.SpawnServer(RemotePortNumber); - RemoteInstance.WaitUntilReady(); - - ZenServerInstance LocalInstance(TestEnv); - LocalInstance.SetTestDir(LocalDataDir); - LocalInstance.SpawnServer(LocalPortNumber, - fmt::format("--upstream-thread-count=0 --upstream-zen-url=http://localhost:{}", RemotePortNumber)); - LocalInstance.WaitUntilReady(); - - const std::string_view Bucket = "mosdef"sv; - zen::IoHash Key; - zen::CbPackage ExpectedPackage = CreateTestPackage(Key); - - // Store the cache record package in the local instance - { - zen::IoBuffer Body = SerializeToBuffer(ExpectedPackage); - cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}", LocalBaseUri, Bucket, Key)}, - cpr::Body{(const char*)Body.Data(), Body.Size()}, - cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}}); - - CHECK(Result.status_code == 201); - } - - // The cache record can be retrieved as a package from the local instance - { - cpr::Response Result = - cpr::Get(cpr::Url{fmt::format("{}/{}/{}", LocalBaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); - CHECK(Result.status_code == 200); - - zen::IoBuffer Body(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()); - zen::CbPackage Package; - const bool Ok = Package.TryLoad(Body); - CHECK(Ok); - CHECK(IsEqual(Package, ExpectedPackage)); - } - - // The cache record can be retrieved as a package from the remote instance - { - cpr::Response Result = - cpr::Get(cpr::Url{fmt::format("{}/{}/{}", RemoteBaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); - CHECK(Result.status_code == 200); - - zen::IoBuffer Body(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()); - zen::CbPackage Package; - const bool Ok = Package.TryLoad(Body); - CHECK(Ok); - CHECK(IsEqual(Package, ExpectedPackage)); - } - } - - SUBCASE("GET finds upstream when missing in local") - { - // Setup local and remote server - std::filesystem::path LocalDataDir = TestEnv.CreateNewTestDir(); - std::filesystem::path RemoteDataDir = TestEnv.CreateNewTestDir(); - const uint16_t LocalPortNumber = 13337; - const uint16_t RemotePortNumber = 13338; - - const auto LocalBaseUri = fmt::format("http://localhost:{}/z$", LocalPortNumber); - const auto RemoteBaseUri = fmt::format("http://localhost:{}/z$", RemotePortNumber); - - ZenServerInstance RemoteInstance(TestEnv); - RemoteInstance.SetTestDir(RemoteDataDir); - RemoteInstance.SpawnServer(RemotePortNumber); - RemoteInstance.WaitUntilReady(); - - ZenServerInstance LocalInstance(TestEnv); - LocalInstance.SetTestDir(LocalDataDir); - LocalInstance.SpawnServer(LocalPortNumber, - fmt::format("--upstream-thread-count=0 --upstream-zen-url=http://localhost:{}", RemotePortNumber)); - LocalInstance.WaitUntilReady(); - - const std::string_view Bucket = "mosdef"sv; - zen::IoHash Key; - zen::CbPackage ExpectedPackage = CreateTestPackage(Key); - - // Store the cache record package in upstream cache - { - zen::IoBuffer Body = SerializeToBuffer(ExpectedPackage); - cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}", RemoteBaseUri, Bucket, Key)}, - cpr::Body{(const char*)Body.Data(), Body.Size()}, - cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}}); - - CHECK(Result.status_code == 201); - } - - // The cache record can be retrieved as a package from the local cache - { - cpr::Response Result = - cpr::Get(cpr::Url{fmt::format("{}/{}/{}", LocalBaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); - CHECK(Result.status_code == 200); - - zen::IoBuffer Body(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()); - zen::CbPackage Package; - const bool Ok = Package.TryLoad(Body); - CHECK(Ok); - CHECK(IsEqual(Package, ExpectedPackage)); - } - } -} - -TEST_CASE("zcache.policy") -{ - using namespace std::literals; - using namespace utils; - - auto GenerateData = [](uint64_t Size, zen::IoHash& OutHash) -> zen::UniqueBuffer { - auto Buf = zen::UniqueBuffer::Alloc(Size); - uint8_t* Data = reinterpret_cast<uint8_t*>(Buf.GetData()); - for (uint64_t Idx = 0; Idx < Size; Idx++) - { - Data[Idx] = Idx % 256; - } - OutHash = zen::IoHash::HashBuffer(Data, Size); - return Buf; - }; - - auto GeneratePackage = [](zen::IoHash& OutRecordKey, zen::IoHash& OutAttachmentKey) -> zen::CbPackage { - auto Data = zen::SharedBuffer::Clone(zen::MakeMemoryView<uint8_t>({1, 2, 3, 4, 5, 6, 7, 8, 9})); - auto CompressedData = zen::CompressedBuffer::Compress(Data); - OutAttachmentKey = CompressedData.DecodeRawHash(); - - zen::CbWriter Writer; - Writer.BeginObject("obj"sv); - Writer.AddBinaryAttachment("data", OutAttachmentKey); - Writer.EndObject(); - CbObject CacheRecord = Writer.Save().AsObject(); - - OutRecordKey = IoHash::HashBuffer(CacheRecord.GetBuffer().GetView()); - - zen::CbPackage Package; - Package.SetObject(CacheRecord); - Package.AddAttachment(zen::CbAttachment(CompressedData, OutAttachmentKey)); - - return Package; - }; - - auto ToBuffer = [](zen::CbPackage Package) -> zen::IoBuffer { - zen::BinaryWriter MemStream; - Package.Save(MemStream); - - return zen::IoBuffer(zen::IoBuffer::Clone, MemStream.Data(), MemStream.Size()); - }; - - SUBCASE("query - 'local' does not query upstream (binary)") - { - ZenConfig UpstreamCfg = ZenConfig::New(13338); - ZenServerInstance UpstreamInst(TestEnv); - ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338); - ZenServerInstance LocalInst(TestEnv); - const auto Bucket = "legacy"sv; - - UpstreamCfg.Spawn(UpstreamInst); - LocalCfg.Spawn(LocalInst); - - zen::IoHash Key; - auto BinaryValue = GenerateData(1024, Key); - - // Store binary cache value upstream - { - cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}", UpstreamCfg.BaseUri, Bucket, Key)}, - cpr::Body{(const char*)BinaryValue.GetData(), BinaryValue.GetSize()}, - cpr::Header{{"Content-Type", "application/octet-stream"}}); - CHECK(Result.status_code == 201); - } - - { - cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?Policy=QueryLocal,Store", LocalCfg.BaseUri, Bucket, Key)}, - cpr::Header{{"Accept", "application/octet-stream"}}); - CHECK(Result.status_code == 404); - } - - { - cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?Policy=Query,Store", LocalCfg.BaseUri, Bucket, Key)}, - cpr::Header{{"Accept", "application/octet-stream"}}); - CHECK(Result.status_code == 200); - } - } - - SUBCASE("store - 'local' does not store upstream (binary)") - { - ZenConfig UpstreamCfg = ZenConfig::New(13338); - ZenServerInstance UpstreamInst(TestEnv); - ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338); - ZenServerInstance LocalInst(TestEnv); - const auto Bucket = "legacy"sv; - - UpstreamCfg.Spawn(UpstreamInst); - LocalCfg.Spawn(LocalInst); - - zen::IoHash Key; - auto BinaryValue = GenerateData(1024, Key); - - // Store binary cache value locally - { - cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}?Policy=Query,StoreLocal", LocalCfg.BaseUri, Bucket, Key)}, - cpr::Body{(const char*)BinaryValue.GetData(), BinaryValue.GetSize()}, - cpr::Header{{"Content-Type", "application/octet-stream"}}); - CHECK(Result.status_code == 201); - } - - { - cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}", UpstreamCfg.BaseUri, Bucket, Key)}, - cpr::Header{{"Accept", "application/octet-stream"}}); - CHECK(Result.status_code == 404); - } - - { - cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}", LocalCfg.BaseUri, Bucket, Key)}, - cpr::Header{{"Accept", "application/octet-stream"}}); - CHECK(Result.status_code == 200); - } - } - - SUBCASE("store - 'local/remote' stores local and upstream (binary)") - { - ZenConfig UpstreamCfg = ZenConfig::New(13338); - ZenServerInstance UpstreamInst(TestEnv); - ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338); - ZenServerInstance LocalInst(TestEnv); - const auto Bucket = "legacy"sv; - - UpstreamCfg.Spawn(UpstreamInst); - LocalCfg.Spawn(LocalInst); - - zen::IoHash Key; - auto BinaryValue = GenerateData(1024, Key); - - // Store binary cache value locally and upstream - { - cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}?Policy=Query,Store", LocalCfg.BaseUri, Bucket, Key)}, - cpr::Body{(const char*)BinaryValue.GetData(), BinaryValue.GetSize()}, - cpr::Header{{"Content-Type", "application/octet-stream"}}); - CHECK(Result.status_code == 201); - } - - { - cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}", UpstreamCfg.BaseUri, Bucket, Key)}, - cpr::Header{{"Accept", "application/octet-stream"}}); - CHECK(Result.status_code == 200); - } - - { - cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}", LocalCfg.BaseUri, Bucket, Key)}, - cpr::Header{{"Accept", "application/octet-stream"}}); - CHECK(Result.status_code == 200); - } - } - - SUBCASE("query - 'local' does not query upstream (cppackage)") - { - ZenConfig UpstreamCfg = ZenConfig::New(13338); - ZenServerInstance UpstreamInst(TestEnv); - ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338); - ZenServerInstance LocalInst(TestEnv); - const auto Bucket = "legacy"sv; - - UpstreamCfg.Spawn(UpstreamInst); - LocalCfg.Spawn(LocalInst); - - zen::IoHash Key; - zen::IoHash PayloadId; - zen::CbPackage Package = GeneratePackage(Key, PayloadId); - auto Buf = ToBuffer(Package); - - // Store package upstream - { - cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}", UpstreamCfg.BaseUri, Bucket, Key)}, - cpr::Body{(const char*)Buf.GetData(), Buf.GetSize()}, - cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}}); - CHECK(Result.status_code == 201); - } - - { - cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?Policy=QueryLocal,Store", LocalCfg.BaseUri, Bucket, Key)}, - cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); - CHECK(Result.status_code == 404); - } - - { - cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?Policy=Query,Store", LocalCfg.BaseUri, Bucket, Key)}, - cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); - CHECK(Result.status_code == 200); - } - } - - SUBCASE("store - 'local' does not store upstream (cbpackge)") - { - ZenConfig UpstreamCfg = ZenConfig::New(13338); - ZenServerInstance UpstreamInst(TestEnv); - ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338); - ZenServerInstance LocalInst(TestEnv); - const auto Bucket = "legacy"sv; - - UpstreamCfg.Spawn(UpstreamInst); - LocalCfg.Spawn(LocalInst); - - zen::IoHash Key; - zen::IoHash PayloadId; - zen::CbPackage Package = GeneratePackage(Key, PayloadId); - auto Buf = ToBuffer(Package); - - // Store packge locally - { - cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}?Policy=Query,StoreLocal", LocalCfg.BaseUri, Bucket, Key)}, - cpr::Body{(const char*)Buf.GetData(), Buf.GetSize()}, - cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}}); - CHECK(Result.status_code == 201); - } - - { - cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}", UpstreamCfg.BaseUri, Bucket, Key)}, - cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); - CHECK(Result.status_code == 404); - } - - { - cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}", LocalCfg.BaseUri, Bucket, Key)}, - cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); - CHECK(Result.status_code == 200); - } - } - - SUBCASE("store - 'local/remote' stores local and upstream (cbpackage)") - { - ZenConfig UpstreamCfg = ZenConfig::New(13338); - ZenServerInstance UpstreamInst(TestEnv); - ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338); - ZenServerInstance LocalInst(TestEnv); - const auto Bucket = "legacy"sv; - - UpstreamCfg.Spawn(UpstreamInst); - LocalCfg.Spawn(LocalInst); - - zen::IoHash Key; - zen::IoHash PayloadId; - zen::CbPackage Package = GeneratePackage(Key, PayloadId); - auto Buf = ToBuffer(Package); - - // Store package locally and upstream - { - cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}?Policy=Query,Store", LocalCfg.BaseUri, Bucket, Key)}, - cpr::Body{(const char*)Buf.GetData(), Buf.GetSize()}, - cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}}); - CHECK(Result.status_code == 201); - } - - { - cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}", UpstreamCfg.BaseUri, Bucket, Key)}, - cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); - CHECK(Result.status_code == 200); - } - - { - cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}", LocalCfg.BaseUri, Bucket, Key)}, - cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); - CHECK(Result.status_code == 200); - } - } - - SUBCASE("skip - 'data' returns cache record without attachments/empty payload") - { - ZenConfig Cfg = ZenConfig::New(); - ZenServerInstance Instance(TestEnv); - const auto Bucket = "test"sv; - - Cfg.Spawn(Instance); - - zen::IoHash Key; - zen::IoHash PayloadId; - zen::CbPackage Package = GeneratePackage(Key, PayloadId); - auto Buf = ToBuffer(Package); - - // Store package - { - cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}", Cfg.BaseUri, Bucket, Key)}, - cpr::Body{(const char*)Buf.GetData(), Buf.GetSize()}, - cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}}); - CHECK(Result.status_code == 201); - } - - // Get package - { - cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?Policy=Default,SkipData", Cfg.BaseUri, Bucket, Key)}, - cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); - CHECK(IsHttpSuccessCode(Result.status_code)); - IoBuffer Buffer(IoBuffer::Wrap, Result.text.c_str(), Result.text.size()); - CbPackage ResponsePackage; - CHECK(ResponsePackage.TryLoad(Buffer)); - CHECK(ResponsePackage.GetAttachments().size() == 0); - } - - // Get record - { - cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?Policy=Default,SkipData", Cfg.BaseUri, Bucket, Key)}, - cpr::Header{{"Accept", "application/x-ue-cb"}}); - CHECK(IsHttpSuccessCode(Result.status_code)); - IoBuffer Buffer(IoBuffer::Wrap, Result.text.c_str(), Result.text.size()); - CbObject ResponseObject = zen::LoadCompactBinaryObject(Buffer); - CHECK((bool)ResponseObject); - } - - // Get payload - { - cpr::Response Result = - cpr::Get(cpr::Url{fmt::format("{}/{}/{}/{}?Policy=Default,SkipData", Cfg.BaseUri, Bucket, Key, PayloadId)}, - cpr::Header{{"Accept", "application/x-ue-comp"}}); - CHECK(IsHttpSuccessCode(Result.status_code)); - CHECK(Result.text.size() == 0); - } - } - - SUBCASE("skip - 'data' returns empty binary value") - { - ZenConfig Cfg = ZenConfig::New(); - ZenServerInstance Instance(TestEnv); - const auto Bucket = "test"sv; - - Cfg.Spawn(Instance); - - zen::IoHash Key; - auto BinaryValue = GenerateData(1024, Key); - - // Store binary cache value - { - cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}", Cfg.BaseUri, Bucket, Key)}, - cpr::Body{(const char*)BinaryValue.GetData(), BinaryValue.GetSize()}, - cpr::Header{{"Content-Type", "application/octet-stream"}}); - CHECK(Result.status_code == 201); - } - - // Get package - { - cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?Policy=Default,SkipData", Cfg.BaseUri, Bucket, Key)}, - cpr::Header{{"Accept", "application/octet-stream"}}); - CHECK(IsHttpSuccessCode(Result.status_code)); - CHECK(Result.text.size() == 0); - } - } -} - -TEST_CASE("zcache.rpc") -{ - using namespace std::literals; - - auto AppendCacheRecord = [](cacherequests::PutCacheRecordsRequest& Request, - const zen::CacheKey& CacheKey, - size_t PayloadSize, - CachePolicy RecordPolicy) { - std::vector<uint8_t> Data; - Data.resize(PayloadSize); - uint32_t DataSeed = *reinterpret_cast<const uint32_t*>(&CacheKey.Hash.Hash[0]); - uint16_t* DataPtr = reinterpret_cast<uint16_t*>(Data.data()); - for (size_t Idx = 0; Idx < PayloadSize / 2; ++Idx) - { - DataPtr[Idx] = static_cast<uint16_t>((Idx + DataSeed) % 0xffffu); - } - if (PayloadSize & 1) - { - Data[PayloadSize - 1] = static_cast<uint8_t>((PayloadSize - 1) & 0xff); - } - CompressedBuffer Value = zen::CompressedBuffer::Compress(SharedBuffer::MakeView(Data.data(), Data.size())); - Request.Requests.push_back({.Key = CacheKey, .Values = {{.Id = Oid::NewOid(), .Body = std::move(Value)}}, .Policy = RecordPolicy}); - }; - - auto PutCacheRecords = [&AppendCacheRecord](std::string_view BaseUri, - std::string_view Namespace, - std::string_view Bucket, - size_t Num, - size_t PayloadSize = 1024, - size_t KeyOffset = 1) -> std::vector<CacheKey> { - std::vector<zen::CacheKey> OutKeys; - - for (uint32_t Key = 1; Key <= Num; ++Key) - { - zen::IoHash KeyHash; - ((uint32_t*)(KeyHash.Hash))[0] = gsl::narrow<uint32_t>(KeyOffset + Key); - const zen::CacheKey CacheKey = zen::CacheKey::Create(Bucket, KeyHash); - - cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .Namespace = std::string(Namespace)}; - AppendCacheRecord(Request, CacheKey, PayloadSize, CachePolicy::Default); - OutKeys.push_back(CacheKey); - - CbPackage Package; - CHECK(Request.Format(Package)); - - IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); - cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)}, - cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}, - cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); - - CHECK(Result.status_code == 200); - } - - return OutKeys; - }; - - struct GetCacheRecordResult - { - zen::CbPackage Response; - cacherequests::GetCacheRecordsResult Result; - bool Success; - }; - - auto GetCacheRecords = [](std::string_view BaseUri, - std::string_view Namespace, - std::span<zen::CacheKey> Keys, - zen::CachePolicy Policy, - zen::RpcAcceptOptions AcceptOptions = zen::RpcAcceptOptions::kNone, - int Pid = 0) -> GetCacheRecordResult { - cacherequests::GetCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, - .AcceptOptions = static_cast<uint16_t>(AcceptOptions), - .ProcessPid = Pid, - .DefaultPolicy = Policy, - .Namespace = std::string(Namespace)}; - for (const CacheKey& Key : Keys) - { - Request.Requests.push_back({.Key = Key}); - } - - CbObjectWriter RequestWriter; - CHECK(Request.Format(RequestWriter)); - - BinaryWriter Body; - RequestWriter.Save(Body); - - cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)}, - cpr::Header{{"Content-Type", "application/x-ue-cb"}, {"Accept", "application/x-ue-cbpkg"}}, - cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); - - GetCacheRecordResult OutResult; - - if (Result.status_code == 200) - { - CbPackage Response = ParsePackageMessage(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size())); - if (!Response.IsNull()) - { - OutResult.Response = std::move(Response); - CHECK(OutResult.Result.Parse(OutResult.Response)); - OutResult.Success = true; - } - } - - return OutResult; - }; - - SUBCASE("get cache records") - { - std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); - const uint16_t PortNumber = 13337; - const auto BaseUri = fmt::format("http://localhost:{}/z$", PortNumber); - - ZenServerInstance Inst(TestEnv); - Inst.SetTestDir(TestDir); - Inst.SpawnServer(PortNumber); - Inst.WaitUntilReady(); - - CachePolicy Policy = CachePolicy::Default; - std::vector<zen::CacheKey> Keys = PutCacheRecords(BaseUri, "ue4.ddc"sv, "mastodon"sv, 128); - GetCacheRecordResult Result = GetCacheRecords(BaseUri, "ue4.ddc"sv, Keys, Policy); - - CHECK(Result.Result.Results.size() == Keys.size()); - - for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results) - { - const CacheKey& ExpectedKey = Keys[Index++]; - CHECK(Record); - CHECK(Record->Key == ExpectedKey); - CHECK(Record->Values.size() == 1); - - for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) - { - CHECK(Value.Body); - } - } - } - - SUBCASE("get missing cache records") - { - std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); - const uint16_t PortNumber = 13337; - const auto BaseUri = fmt::format("http://localhost:{}/z$", PortNumber); - - ZenServerInstance Inst(TestEnv); - Inst.SetTestDir(TestDir); - Inst.SpawnServer(PortNumber); - Inst.WaitUntilReady(); - - CachePolicy Policy = CachePolicy::Default; - std::vector<zen::CacheKey> ExistingKeys = PutCacheRecords(BaseUri, "ue4.ddc"sv, "mastodon"sv, 128); - std::vector<zen::CacheKey> Keys; - - for (const zen::CacheKey& Key : ExistingKeys) - { - Keys.push_back(Key); - Keys.push_back(CacheKey::Create("missing"sv, IoHash::Zero)); - } - - GetCacheRecordResult Result = GetCacheRecords(BaseUri, "ue4.ddc"sv, Keys, Policy); - - CHECK(Result.Result.Results.size() == Keys.size()); - - size_t KeyIndex = 0; - for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results) - { - const bool Missing = Index++ % 2 != 0; - - if (Missing) - { - CHECK(!Record); - } - else - { - const CacheKey& ExpectedKey = ExistingKeys[KeyIndex++]; - CHECK(Record->Key == ExpectedKey); - for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) - { - CHECK(Value.Body); - } - } - } - } - - SUBCASE("policy - 'QueryLocal' does not query upstream") - { - using namespace utils; - - ZenConfig UpstreamCfg = ZenConfig::New(13338); - ZenServerInstance UpstreamServer(TestEnv); - ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338); - ZenServerInstance LocalServer(TestEnv); - - SpawnServer(UpstreamServer, UpstreamCfg); - SpawnServer(LocalServer, LocalCfg); - - std::vector<zen::CacheKey> Keys = PutCacheRecords(UpstreamCfg.BaseUri, "ue4.ddc"sv, "mastodon"sv, 4); - - CachePolicy Policy = CachePolicy::QueryLocal; - GetCacheRecordResult Result = GetCacheRecords(LocalCfg.BaseUri, "ue4.ddc"sv, Keys, Policy); - - CHECK(Result.Result.Results.size() == Keys.size()); - - for (const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results) - { - CHECK(!Record); - } - } - - SUBCASE("policy - 'QueryRemote' does query upstream") - { - using namespace utils; - - ZenConfig UpstreamCfg = ZenConfig::New(13338); - ZenServerInstance UpstreamServer(TestEnv); - ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338); - ZenServerInstance LocalServer(TestEnv); - - SpawnServer(UpstreamServer, UpstreamCfg); - SpawnServer(LocalServer, LocalCfg); - - std::vector<zen::CacheKey> Keys = PutCacheRecords(UpstreamCfg.BaseUri, "ue4.ddc"sv, "mastodon"sv, 4); - - CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote); - GetCacheRecordResult Result = GetCacheRecords(LocalCfg.BaseUri, "ue4.ddc"sv, Keys, Policy); - - CHECK(Result.Result.Results.size() == Keys.size()); - - for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results) - { - CHECK(Record); - const CacheKey& ExpectedKey = Keys[Index++]; - CHECK(Record->Key == ExpectedKey); - } - } - - SUBCASE("RpcAcceptOptions") - { - using namespace utils; - - std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); - const uint16_t PortNumber = 13337; - const auto BaseUri = fmt::format("http://localhost:{}/z$", PortNumber); - - ZenServerInstance Inst(TestEnv); - Inst.SetTestDir(TestDir); - Inst.SpawnServer(PortNumber); - Inst.WaitUntilReady(); - - std::vector<zen::CacheKey> SmallKeys = PutCacheRecords(BaseUri, "ue4.ddc"sv, "mastodon"sv, 4, 1024); - std::vector<zen::CacheKey> LargeKeys = PutCacheRecords(BaseUri, "ue4.ddc"sv, "mastodon"sv, 4, 1024 * 1024 * 16, SmallKeys.size()); - - std::vector<zen::CacheKey> Keys(SmallKeys.begin(), SmallKeys.end()); - Keys.insert(Keys.end(), LargeKeys.begin(), LargeKeys.end()); - - { - GetCacheRecordResult Result = GetCacheRecords(BaseUri, "ue4.ddc"sv, Keys, CachePolicy::Default); - - CHECK(Result.Result.Results.size() == Keys.size()); - - for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results) - { - CHECK(Record); - const CacheKey& ExpectedKey = Keys[Index++]; - CHECK(Record->Key == ExpectedKey); - for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) - { - const IoBuffer& Body = Value.Body.GetCompressed().Flatten().AsIoBuffer(); - IoBufferFileReference Ref; - bool IsFileRef = Body.GetFileReference(Ref); - CHECK(!IsFileRef); - } - } - } - - // File path, but only for large files - { - GetCacheRecordResult Result = - GetCacheRecords(BaseUri, "ue4.ddc"sv, Keys, CachePolicy::Default, RpcAcceptOptions::kAllowLocalReferences); - - CHECK(Result.Result.Results.size() == Keys.size()); - - for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results) - { - CHECK(Record); - const CacheKey& ExpectedKey = Keys[Index++]; - CHECK(Record->Key == ExpectedKey); - for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) - { - const IoBuffer& Body = Value.Body.GetCompressed().Flatten().AsIoBuffer(); - IoBufferFileReference Ref; - bool IsFileRef = Body.GetFileReference(Ref); - CHECK(IsFileRef == (Body.Size() > 1024)); - } - } - } - - // File path, for all files - { - GetCacheRecordResult Result = - GetCacheRecords(BaseUri, - "ue4.ddc"sv, - Keys, - CachePolicy::Default, - RpcAcceptOptions::kAllowLocalReferences | RpcAcceptOptions::kAllowPartialLocalReferences); - - CHECK(Result.Result.Results.size() == Keys.size()); - - for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results) - { - CHECK(Record); - const CacheKey& ExpectedKey = Keys[Index++]; - CHECK(Record->Key == ExpectedKey); - for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) - { - const IoBuffer& Body = Value.Body.GetCompressed().Flatten().AsIoBuffer(); - IoBufferFileReference Ref; - bool IsFileRef = Body.GetFileReference(Ref); - CHECK(IsFileRef); - } - } - } - - // File handle, but only for large files - { - GetCacheRecordResult Result = GetCacheRecords(BaseUri, - "ue4.ddc"sv, - Keys, - CachePolicy::Default, - RpcAcceptOptions::kAllowLocalReferences, - GetCurrentProcessId()); - - CHECK(Result.Result.Results.size() == Keys.size()); - - for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results) - { - CHECK(Record); - const CacheKey& ExpectedKey = Keys[Index++]; - CHECK(Record->Key == ExpectedKey); - for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) - { - const IoBuffer& Body = Value.Body.GetCompressed().Flatten().AsIoBuffer(); - IoBufferFileReference Ref; - bool IsFileRef = Body.GetFileReference(Ref); - CHECK(IsFileRef == (Body.Size() > 1024)); - } - } - } - - // File handle, for all files - { - GetCacheRecordResult Result = - GetCacheRecords(BaseUri, - "ue4.ddc"sv, - Keys, - CachePolicy::Default, - RpcAcceptOptions::kAllowLocalReferences | RpcAcceptOptions::kAllowPartialLocalReferences, - GetCurrentProcessId()); - - CHECK(Result.Result.Results.size() == Keys.size()); - - for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results) - { - CHECK(Record); - const CacheKey& ExpectedKey = Keys[Index++]; - CHECK(Record->Key == ExpectedKey); - for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) - { - const IoBuffer& Body = Value.Body.GetCompressed().Flatten().AsIoBuffer(); - IoBufferFileReference Ref; - bool IsFileRef = Body.GetFileReference(Ref); - CHECK(IsFileRef); - } - } - } - } -} - -TEST_CASE("zcache.failing.upstream") -{ - // This is an exploratory test that takes a long time to run, so lets skip it by default - if (true) - { - return; - } - - using namespace std::literals; - using namespace utils; - - const uint16_t Upstream1PortNumber = 13338; - ZenConfig Upstream1Cfg = ZenConfig::New(Upstream1PortNumber); - ZenServerInstance Upstream1Server(TestEnv); - - const uint16_t Upstream2PortNumber = 13339; - ZenConfig Upstream2Cfg = ZenConfig::New(Upstream2PortNumber); - ZenServerInstance Upstream2Server(TestEnv); - - std::vector<std::uint16_t> UpstreamPorts = {Upstream1PortNumber, Upstream2PortNumber}; - ZenConfig LocalCfg = ZenConfig::NewWithThreadedUpstreams(UpstreamPorts, false); - LocalCfg.Args += (" --upstream-thread-count 2"); - ZenServerInstance LocalServer(TestEnv); - const uint16_t LocalPortNumber = 13337; - const auto LocalUri = fmt::format("http://localhost:{}/z$", LocalPortNumber); - const auto Upstream1Uri = fmt::format("http://localhost:{}/z$", Upstream1PortNumber); - const auto Upstream2Uri = fmt::format("http://localhost:{}/z$", Upstream2PortNumber); - - SpawnServer(Upstream1Server, Upstream1Cfg); - SpawnServer(Upstream2Server, Upstream2Cfg); - SpawnServer(LocalServer, LocalCfg); - bool Upstream1Running = true; - bool Upstream2Running = true; - - using namespace std::literals; - - auto AppendCacheRecord = [](cacherequests::PutCacheRecordsRequest& Request, - const zen::CacheKey& CacheKey, - size_t PayloadSize, - CachePolicy RecordPolicy) { - std::vector<uint32_t> Data; - Data.resize(PayloadSize / 4); - for (uint32_t Idx = 0; Idx < PayloadSize / 4; ++Idx) - { - Data[Idx] = (*reinterpret_cast<const uint32_t*>(&CacheKey.Hash.Hash[0])) + Idx; - } - - CompressedBuffer Value = zen::CompressedBuffer::Compress(SharedBuffer::MakeView(Data.data(), Data.size() * 4)); - Request.Requests.push_back({.Key = CacheKey, .Values = {{.Id = Oid::NewOid(), .Body = std::move(Value)}}, .Policy = RecordPolicy}); - }; - - auto PutCacheRecords = [&AppendCacheRecord](std::string_view BaseUri, - std::string_view Namespace, - std::string_view Bucket, - size_t Num, - size_t KeyOffset, - size_t PayloadSize = 8192) -> std::vector<CacheKey> { - std::vector<zen::CacheKey> OutKeys; - - cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .Namespace = std::string(Namespace)}; - for (size_t Key = 1; Key <= Num; ++Key) - { - zen::IoHash KeyHash; - ((size_t*)(KeyHash.Hash))[0] = KeyOffset + Key; - const zen::CacheKey CacheKey = zen::CacheKey::Create(Bucket, KeyHash); - - AppendCacheRecord(Request, CacheKey, PayloadSize, CachePolicy::Default); - OutKeys.push_back(CacheKey); - } - - CbPackage Package; - CHECK(Request.Format(Package)); - - IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); - cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)}, - cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}, - cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); - - if (Result.status_code != 200) - { - ZEN_DEBUG("PutCacheRecords failed with {}, reason '{}'", Result.status_code, Result.reason); - OutKeys.clear(); - } - - return OutKeys; - }; - - struct GetCacheRecordResult - { - zen::CbPackage Response; - cacherequests::GetCacheRecordsResult Result; - bool Success = false; - }; - - auto GetCacheRecords = [](std::string_view BaseUri, - std::string_view Namespace, - std::span<zen::CacheKey> Keys, - zen::CachePolicy Policy) -> GetCacheRecordResult { - cacherequests::GetCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, - .DefaultPolicy = Policy, - .Namespace = std::string(Namespace)}; - for (const CacheKey& Key : Keys) - { - Request.Requests.push_back({.Key = Key}); - } - - CbObjectWriter RequestWriter; - CHECK(Request.Format(RequestWriter)); - - BinaryWriter Body; - RequestWriter.Save(Body); - - cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)}, - cpr::Header{{"Content-Type", "application/x-ue-cb"}, {"Accept", "application/x-ue-cbpkg"}}, - cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); - - GetCacheRecordResult OutResult; - - if (Result.status_code == 200) - { - CbPackage Response = ParsePackageMessage(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size())); - if (!Response.IsNull()) - { - OutResult.Response = std::move(Response); - CHECK(OutResult.Result.Parse(OutResult.Response)); - OutResult.Success = true; - } - } - else - { - ZEN_DEBUG("GetCacheRecords with {}, reason '{}'", Result.reason, Result.status_code); - } - - return OutResult; - }; - - // Populate with some simple data - - CachePolicy Policy = CachePolicy::Default; - - const size_t ThreadCount = 128; - const size_t KeyMultiplier = 16384; - const size_t RecordsPerRequest = 64; - WorkerThreadPool Pool(ThreadCount); - - std::atomic_size_t Completed = 0; - - auto Keys = new std::vector<CacheKey>[ThreadCount * KeyMultiplier]; - RwLock KeysLock; - - for (size_t I = 0; I < ThreadCount * KeyMultiplier; I++) - { - size_t Iteration = I; - Pool.ScheduleWork([&] { - std::vector<CacheKey> NewKeys = PutCacheRecords(LocalUri, "ue4.ddc"sv, "mastodon"sv, RecordsPerRequest, I * RecordsPerRequest); - if (NewKeys.size() != RecordsPerRequest) - { - ZEN_DEBUG("PutCacheRecords iteration {} failed", Iteration); - Completed.fetch_add(1); - return; - } - { - RwLock::ExclusiveLockScope _(KeysLock); - Keys[Iteration].swap(NewKeys); - } - Completed.fetch_add(1); - }); - } - bool UseUpstream1 = false; - while (Completed < ThreadCount * KeyMultiplier) - { - Sleep(8000); - - if (UseUpstream1) - { - if (Upstream2Running) - { - Upstream2Server.EnableTermination(); - Upstream2Server.Shutdown(); - Sleep(100); - Upstream2Running = false; - } - if (!Upstream1Running) - { - SpawnServer(Upstream1Server, Upstream1Cfg); - Upstream1Running = true; - } - UseUpstream1 = !UseUpstream1; - } - else - { - if (Upstream1Running) - { - Upstream1Server.EnableTermination(); - Upstream1Server.Shutdown(); - Sleep(100); - Upstream1Running = false; - } - if (!Upstream2Running) - { - SpawnServer(Upstream2Server, Upstream2Cfg); - Upstream2Running = true; - } - UseUpstream1 = !UseUpstream1; - } - } - - Completed = 0; - for (size_t I = 0; I < ThreadCount * KeyMultiplier; I++) - { - size_t Iteration = I; - std::vector<CacheKey>& LocalKeys = Keys[Iteration]; - if (LocalKeys.empty()) - { - Completed.fetch_add(1); - continue; - } - Pool.ScheduleWork([&] { - GetCacheRecordResult Result = GetCacheRecords(LocalUri, "ue4.ddc"sv, LocalKeys, Policy); - - if (!Result.Success) - { - ZEN_DEBUG("GetCacheRecords iteration {} failed", Iteration); - Completed.fetch_add(1); - return; - } - - if (Result.Result.Results.size() != LocalKeys.size()) - { - ZEN_DEBUG("GetCacheRecords iteration {} empty records", Iteration); - Completed.fetch_add(1); - return; - } - for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results) - { - const CacheKey& ExpectedKey = LocalKeys[Index++]; - if (!Record) - { - continue; - } - if (Record->Key != ExpectedKey) - { - continue; - } - if (Record->Values.size() != 1) - { - continue; - } - - for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) - { - if (!Value.Body) - { - continue; - } - } - } - Completed.fetch_add(1); - }); - } - while (Completed < ThreadCount * KeyMultiplier) - { - Sleep(10); - } -} - -TEST_CASE("zcache.rpc.allpolicies") -{ - using namespace std::literals; - using namespace utils; - - ZenConfig UpstreamCfg = ZenConfig::New(13338); - ZenServerInstance UpstreamServer(TestEnv); - ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338); - ZenServerInstance LocalServer(TestEnv); - const uint16_t LocalPortNumber = 13337; - const auto BaseUri = fmt::format("http://localhost:{}/z$", LocalPortNumber); - - SpawnServer(UpstreamServer, UpstreamCfg); - SpawnServer(LocalServer, LocalCfg); - - std::string_view TestVersion = "F72150A02AE34B57A9EC91D36BA1CE08"sv; - std::string_view TestBucket = "allpoliciestest"sv; - std::string_view TestNamespace = "ue4.ddc"sv; - - // NumKeys = (2 Value vs Record)*(2 SkipData vs Default)*(2 ForceMiss vs Not)*(2 use local) - // *(2 use remote)*(2 UseValue Policy vs not)*(4 cases per type) - constexpr int NumKeys = 256; - constexpr int NumValues = 4; - Oid ValueIds[NumValues]; - IoHash Hash; - for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex) - { - ExtendableStringBuilder<16> ValueName; - ValueName << "ValueId_"sv << ValueIndex; - static_assert(sizeof(IoHash) >= sizeof(Oid)); - ValueIds[ValueIndex] = Oid::FromMemory(IoHash::HashBuffer(ValueName.Data(), ValueName.Size() * sizeof(ValueName.Data()[0])).Hash); - } - - struct KeyData; - struct UserData - { - UserData& Set(KeyData* InKeyData, int InValueIndex) - { - Data = InKeyData; - ValueIndex = InValueIndex; - return *this; - } - KeyData* Data = nullptr; - int ValueIndex = 0; - }; - struct KeyData - { - CompressedBuffer BufferValues[NumValues]; - uint64_t IntValues[NumValues]; - UserData ValueUserData[NumValues]; - bool ReceivedChunk[NumValues]; - CacheKey Key; - UserData KeyUserData; - uint32_t KeyIndex = 0; - bool GetRequestsData = true; - bool UseValueAPI = false; - bool UseValuePolicy = false; - bool ForceMiss = false; - bool UseLocal = true; - bool UseRemote = true; - bool ShouldBeHit = true; - bool ReceivedPut = false; - bool ReceivedGet = false; - bool ReceivedPutValue = false; - bool ReceivedGetValue = false; - }; - struct CachePutRequest - { - CacheKey Key; - CbObject Record; - CacheRecordPolicy Policy; - KeyData* Values; - UserData* Data; - }; - struct CachePutValueRequest - { - CacheKey Key; - CompressedBuffer Value; - CachePolicy Policy; - UserData* Data; - }; - struct CacheGetRequest - { - CacheKey Key; - CacheRecordPolicy Policy; - UserData* Data; - }; - struct CacheGetValueRequest - { - CacheKey Key; - CachePolicy Policy; - UserData* Data; - }; - struct CacheGetChunkRequest - { - CacheKey Key; - Oid ValueId; - uint64_t RawOffset; - uint64_t RawSize; - IoHash RawHash; - CachePolicy Policy; - UserData* Data; - }; - - KeyData KeyDatas[NumKeys]; - std::vector<CachePutRequest> PutRequests; - std::vector<CachePutValueRequest> PutValueRequests; - std::vector<CacheGetRequest> GetRequests; - std::vector<CacheGetValueRequest> GetValueRequests; - std::vector<CacheGetChunkRequest> ChunkRequests; - - for (uint32_t KeyIndex = 0; KeyIndex < NumKeys; ++KeyIndex) - { - IoHashStream KeyWriter; - KeyWriter.Append(TestVersion.data(), TestVersion.length() * sizeof(TestVersion.data()[0])); - KeyWriter.Append(&KeyIndex, sizeof(KeyIndex)); - IoHash KeyHash = KeyWriter.GetHash(); - KeyData& KeyData = KeyDatas[KeyIndex]; - - KeyData.Key = CacheKey::Create(TestBucket, KeyHash); - KeyData.KeyIndex = KeyIndex; - KeyData.GetRequestsData = (KeyIndex & (1 << 1)) == 0; - KeyData.UseValueAPI = (KeyIndex & (1 << 2)) != 0; - KeyData.UseValuePolicy = (KeyIndex & (1 << 3)) != 0; - KeyData.ForceMiss = (KeyIndex & (1 << 4)) == 0; - KeyData.UseLocal = (KeyIndex & (1 << 5)) == 0; - KeyData.UseRemote = (KeyIndex & (1 << 6)) == 0; - KeyData.ShouldBeHit = !KeyData.ForceMiss && (KeyData.UseLocal || KeyData.UseRemote); - CachePolicy SharedPolicy = KeyData.UseLocal ? CachePolicy::Local : CachePolicy::None; - SharedPolicy |= KeyData.UseRemote ? CachePolicy::Remote : CachePolicy::None; - CachePolicy PutPolicy = SharedPolicy; - CachePolicy GetPolicy = SharedPolicy; - GetPolicy |= !KeyData.GetRequestsData ? CachePolicy::SkipData : CachePolicy::None; - CacheKey& Key = KeyData.Key; - - for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex) - { - KeyData.IntValues[ValueIndex] = static_cast<uint64_t>(KeyIndex) | (static_cast<uint64_t>(ValueIndex) << 32); - KeyData.BufferValues[ValueIndex] = - CompressedBuffer::Compress(SharedBuffer::MakeView(&KeyData.IntValues[ValueIndex], sizeof(KeyData.IntValues[ValueIndex]))); - KeyData.ReceivedChunk[ValueIndex] = false; - } - - UserData& KeyUserData = KeyData.KeyUserData.Set(&KeyData, -1); - for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex) - { - KeyData.ValueUserData[ValueIndex].Set(&KeyData, ValueIndex); - } - if (!KeyData.UseValueAPI) - { - CbObjectWriter Builder; - Builder.BeginObject("key"sv); - Builder << "Bucket"sv << Key.Bucket << "Hash"sv << Key.Hash; - Builder.EndObject(); - Builder.BeginArray("Values"sv); - for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex) - { - Builder.BeginObject(); - Builder.AddObjectId("Id"sv, ValueIds[ValueIndex]); - Builder.AddBinaryAttachment("RawHash"sv, KeyData.BufferValues[ValueIndex].DecodeRawHash()); - Builder.AddInteger("RawSize"sv, KeyData.BufferValues[ValueIndex].DecodeRawSize()); - Builder.EndObject(); - } - Builder.EndArray(); - - CacheRecordPolicy PutRecordPolicy; - CacheRecordPolicy GetRecordPolicy; - if (!KeyData.UseValuePolicy) - { - PutRecordPolicy = CacheRecordPolicy(PutPolicy); - GetRecordPolicy = CacheRecordPolicy(GetPolicy); - } - else - { - // Switch the SkipData field in the Record policy so that if the CacheStore ignores the ValuePolicies - // it will use the wrong value for SkipData and fail our tests. - CacheRecordPolicyBuilder PutBuilder(PutPolicy ^ CachePolicy::SkipData); - CacheRecordPolicyBuilder GetBuilder(GetPolicy ^ CachePolicy::SkipData); - for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex) - { - PutBuilder.AddValuePolicy(ValueIds[ValueIndex], PutPolicy); - GetBuilder.AddValuePolicy(ValueIds[ValueIndex], GetPolicy); - } - PutRecordPolicy = PutBuilder.Build(); - GetRecordPolicy = GetBuilder.Build(); - } - if (!KeyData.ForceMiss) - { - PutRequests.push_back({Key, Builder.Save(), PutRecordPolicy, &KeyData, &KeyUserData}); - } - GetRequests.push_back({Key, GetRecordPolicy, &KeyUserData}); - for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex) - { - UserData& ValueUserData = KeyData.ValueUserData[ValueIndex]; - ChunkRequests.push_back({Key, ValueIds[ValueIndex], 0, UINT64_MAX, IoHash(), GetPolicy, &ValueUserData}); - } - } - else - { - if (!KeyData.ForceMiss) - { - PutValueRequests.push_back({Key, KeyData.BufferValues[0], PutPolicy, &KeyUserData}); - } - GetValueRequests.push_back({Key, GetPolicy, &KeyUserData}); - ChunkRequests.push_back({Key, Oid::Zero, 0, UINT64_MAX, IoHash(), GetPolicy, &KeyUserData}); - } - } - - // PutCacheRecords - { - CachePolicy BatchDefaultPolicy = CachePolicy::Default; - cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, - .DefaultPolicy = BatchDefaultPolicy, - .Namespace = std::string(TestNamespace)}; - Request.Requests.reserve(PutRequests.size()); - for (CachePutRequest& PutRequest : PutRequests) - { - cacherequests::PutCacheRecordRequest& RecordRequest = Request.Requests.emplace_back(); - RecordRequest.Key = PutRequest.Key; - RecordRequest.Policy = PutRequest.Policy; - RecordRequest.Values.reserve(NumValues); - for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex) - { - RecordRequest.Values.push_back({.Id = ValueIds[ValueIndex], .Body = PutRequest.Values->BufferValues[ValueIndex]}); - } - PutRequest.Data->Data->ReceivedPut = true; - } - - CbPackage Package; - CHECK(Request.Format(Package)); - IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); - cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)}, - cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}, - cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); - CHECK_MESSAGE(Result.status_code == 200, "PutCacheRecords unexpectedly failed."); - } - - // PutCacheValues - { - CachePolicy BatchDefaultPolicy = CachePolicy::Default; - - cacherequests::PutCacheValuesRequest Request = {.AcceptMagic = kCbPkgMagic, - .DefaultPolicy = BatchDefaultPolicy, - .Namespace = std::string(TestNamespace)}; - Request.Requests.reserve(PutValueRequests.size()); - for (CachePutValueRequest& PutRequest : PutValueRequests) - { - Request.Requests.push_back({.Key = PutRequest.Key, .Body = PutRequest.Value, .Policy = PutRequest.Policy}); - PutRequest.Data->Data->ReceivedPutValue = true; - } - - CbPackage Package; - CHECK(Request.Format(Package)); - - IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); - cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)}, - cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}, - cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); - CHECK_MESSAGE(Result.status_code == 200, "PutCacheValues unexpectedly failed."); - } - - for (KeyData& KeyData : KeyDatas) - { - if (!KeyData.ForceMiss) - { - if (!KeyData.UseValueAPI) - { - CHECK_MESSAGE(KeyData.ReceivedPut, WriteToString<32>("Key ", KeyData.KeyIndex, " was unexpectedly not put.").c_str()); - } - else - { - CHECK_MESSAGE(KeyData.ReceivedPutValue, - WriteToString<32>("Key ", KeyData.KeyIndex, " was unexpectedly not put to ValueAPI.").c_str()); - } - } - } - - // GetCacheRecords - { - CachePolicy BatchDefaultPolicy = CachePolicy::Default; - cacherequests::GetCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, - .DefaultPolicy = BatchDefaultPolicy, - .Namespace = std::string(TestNamespace)}; - Request.Requests.reserve(GetRequests.size()); - for (CacheGetRequest& GetRequest : GetRequests) - { - Request.Requests.push_back({.Key = GetRequest.Key, .Policy = GetRequest.Policy}); - } - - CbPackage Package; - CHECK(Request.Format(Package)); - IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); - cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)}, - cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}, - cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); - CHECK_MESSAGE(Result.status_code == 200, "GetCacheRecords unexpectedly failed."); - CbPackage Response = ParsePackageMessage(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size())); - bool Loaded = !Response.IsNull(); - CHECK_MESSAGE(Loaded, "GetCacheRecords response failed to load."); - cacherequests::GetCacheRecordsResult RequestResult; - CHECK(RequestResult.Parse(Response)); - CHECK_MESSAGE(RequestResult.Results.size() == GetRequests.size(), "GetCacheRecords response count did not match request count."); - for (int Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& RecordResult : RequestResult.Results) - { - bool Succeeded = RecordResult.has_value(); - CacheGetRequest& GetRequest = GetRequests[Index++]; - KeyData* KeyData = GetRequest.Data->Data; - KeyData->ReceivedGet = true; - WriteToString<32> Name("Get(", KeyData->KeyIndex, ")"); - if (KeyData->ShouldBeHit) - { - CHECK_MESSAGE(Succeeded, WriteToString<32>(Name, " unexpectedly failed.").c_str()); - } - else if (KeyData->ForceMiss) - { - CHECK_MESSAGE(!Succeeded, WriteToString<32>(Name, " unexpectedly succeeded.").c_str()); - } - if (!KeyData->ForceMiss && Succeeded) - { - CHECK_MESSAGE(RecordResult->Values.size() == NumValues, - WriteToString<32>(Name, " number of values did not match.").c_str()); - for (const cacherequests::GetCacheRecordResultValue& Value : RecordResult->Values) - { - int ExpectedValueIndex = 0; - for (; ExpectedValueIndex < NumValues; ++ExpectedValueIndex) - { - if (ValueIds[ExpectedValueIndex] == Value.Id) - { - break; - } - } - CHECK_MESSAGE(ExpectedValueIndex < NumValues, WriteToString<32>(Name, " could not find matching ValueId.").c_str()); - - WriteToString<32> ValueName("Get(", KeyData->KeyIndex, ",", ExpectedValueIndex, ")"); - - CompressedBuffer ExpectedValue = KeyData->BufferValues[ExpectedValueIndex]; - CHECK_MESSAGE(Value.RawHash == ExpectedValue.DecodeRawHash(), - WriteToString<32>(ValueName, " RawHash did not match.").c_str()); - CHECK_MESSAGE(Value.RawSize == ExpectedValue.DecodeRawSize(), - WriteToString<32>(ValueName, " RawSize did not match.").c_str()); - - if (KeyData->GetRequestsData) - { - SharedBuffer Buffer = Value.Body.Decompress(); - CHECK_MESSAGE(Buffer.GetSize() == Value.RawSize, - WriteToString<32>(ValueName, " BufferSize did not match RawSize.").c_str()); - uint64_t ActualIntValue = ((const uint64_t*)Buffer.GetData())[0]; - uint64_t ExpectedIntValue = KeyData->IntValues[ExpectedValueIndex]; - CHECK_MESSAGE(ActualIntValue == ExpectedIntValue, WriteToString<32>(ValueName, " had unexpected data.").c_str()); - } - } - } - } - } - - // GetCacheValues - { - CachePolicy BatchDefaultPolicy = CachePolicy::Default; - - cacherequests::GetCacheValuesRequest GetCacheValuesRequest = {.AcceptMagic = kCbPkgMagic, - .DefaultPolicy = BatchDefaultPolicy, - .Namespace = std::string(TestNamespace)}; - GetCacheValuesRequest.Requests.reserve(GetValueRequests.size()); - for (CacheGetValueRequest& GetRequest : GetValueRequests) - { - GetCacheValuesRequest.Requests.push_back({.Key = GetRequest.Key, .Policy = GetRequest.Policy}); - } - - CbPackage Package; - CHECK(GetCacheValuesRequest.Format(Package)); - - IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); - cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)}, - cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}, - cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); - CHECK_MESSAGE(Result.status_code == 200, "GetCacheValues unexpectedly failed."); - IoBuffer MessageBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()); - CbPackage Response = ParsePackageMessage(MessageBuffer); - bool Loaded = !Response.IsNull(); - CHECK_MESSAGE(Loaded, "GetCacheValues response failed to load."); - cacherequests::GetCacheValuesResult GetCacheValuesResult; - CHECK(GetCacheValuesResult.Parse(Response)); - for (int Index = 0; const cacherequests::CacheValueResult& ValueResult : GetCacheValuesResult.Results) - { - bool Succeeded = ValueResult.RawHash != IoHash::Zero; - CacheGetValueRequest& Request = GetValueRequests[Index++]; - KeyData* KeyData = Request.Data->Data; - KeyData->ReceivedGetValue = true; - WriteToString<32> Name("GetValue("sv, KeyData->KeyIndex, ")"sv); - - if (KeyData->ShouldBeHit) - { - CHECK_MESSAGE(Succeeded, WriteToString<32>(Name, " unexpectedly failed.").c_str()); - } - else if (KeyData->ForceMiss) - { - CHECK_MESSAGE(!Succeeded, WriteToString<32>(Name, "unexpectedly succeeded.").c_str()); - } - if (!KeyData->ForceMiss && Succeeded) - { - CompressedBuffer ExpectedValue = KeyData->BufferValues[0]; - CHECK_MESSAGE(ValueResult.RawHash == ExpectedValue.DecodeRawHash(), - WriteToString<32>(Name, " RawHash did not match.").c_str()); - CHECK_MESSAGE(ValueResult.RawSize == ExpectedValue.DecodeRawSize(), - WriteToString<32>(Name, " RawSize did not match.").c_str()); - - if (KeyData->GetRequestsData) - { - SharedBuffer Buffer = ValueResult.Body.Decompress(); - CHECK_MESSAGE(Buffer.GetSize() == ValueResult.RawSize, - WriteToString<32>(Name, " BufferSize did not match RawSize.").c_str()); - uint64_t ActualIntValue = ((const uint64_t*)Buffer.GetData())[0]; - uint64_t ExpectedIntValue = KeyData->IntValues[0]; - CHECK_MESSAGE(ActualIntValue == ExpectedIntValue, WriteToString<32>(Name, " had unexpected data.").c_str()); - } - } - } - } - - // GetCacheChunks - { - std::sort(ChunkRequests.begin(), ChunkRequests.end(), [](CacheGetChunkRequest& A, CacheGetChunkRequest& B) { - return A.Key.Hash < B.Key.Hash; - }); - CachePolicy BatchDefaultPolicy = CachePolicy::Default; - cacherequests::GetCacheChunksRequest GetCacheChunksRequest = {.AcceptMagic = kCbPkgMagic, - .DefaultPolicy = BatchDefaultPolicy, - .Namespace = std::string(TestNamespace)}; - GetCacheChunksRequest.Requests.reserve(ChunkRequests.size()); - for (CacheGetChunkRequest& ChunkRequest : ChunkRequests) - { - GetCacheChunksRequest.Requests.push_back({.Key = ChunkRequest.Key, - .ValueId = ChunkRequest.ValueId, - .ChunkId = IoHash(), - .RawOffset = ChunkRequest.RawOffset, - .RawSize = ChunkRequest.RawSize, - .Policy = ChunkRequest.Policy}); - } - CbPackage Package; - CHECK(GetCacheChunksRequest.Format(Package)); - - IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); - cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)}, - cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}, - cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); - CHECK_MESSAGE(Result.status_code == 200, "GetCacheChunks unexpectedly failed."); - CbPackage Response = ParsePackageMessage(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size())); - bool Loaded = !Response.IsNull(); - CHECK_MESSAGE(Loaded, "GetCacheChunks response failed to load."); - cacherequests::GetCacheChunksResult GetCacheChunksResult; - CHECK(GetCacheChunksResult.Parse(Response)); - CHECK_MESSAGE(GetCacheChunksResult.Results.size() == ChunkRequests.size(), - "GetCacheChunks response count did not match request count."); - - for (int Index = 0; const cacherequests::CacheValueResult& ValueResult : GetCacheChunksResult.Results) - { - bool Succeeded = ValueResult.RawHash != IoHash::Zero; - - CacheGetChunkRequest& Request = ChunkRequests[Index++]; - KeyData* KeyData = Request.Data->Data; - int ValueIndex = Request.Data->ValueIndex >= 0 ? Request.Data->ValueIndex : 0; - KeyData->ReceivedChunk[ValueIndex] = true; - WriteToString<32> Name("GetChunks("sv, KeyData->KeyIndex, ","sv, ValueIndex, ")"sv); - - if (KeyData->ShouldBeHit) - { - CHECK_MESSAGE(Succeeded, WriteToString<256>(Name, " unexpectedly failed."sv).c_str()); - } - else if (KeyData->ForceMiss) - { - CHECK_MESSAGE(!Succeeded, WriteToString<256>(Name, " unexpectedly succeeded."sv).c_str()); - } - if (KeyData->ShouldBeHit && Succeeded) - { - CompressedBuffer ExpectedValue = KeyData->BufferValues[ValueIndex]; - CHECK_MESSAGE(ValueResult.RawHash == ExpectedValue.DecodeRawHash(), - WriteToString<32>(Name, " had unexpected RawHash.").c_str()); - CHECK_MESSAGE(ValueResult.RawSize == ExpectedValue.DecodeRawSize(), - WriteToString<32>(Name, " had unexpected RawSize.").c_str()); - - if (KeyData->GetRequestsData) - { - SharedBuffer Buffer = ValueResult.Body.Decompress(); - CHECK_MESSAGE(Buffer.GetSize() == ValueResult.RawSize, - WriteToString<32>(Name, " BufferSize did not match RawSize.").c_str()); - uint64_t ActualIntValue = ((const uint64_t*)Buffer.GetData())[0]; - uint64_t ExpectedIntValue = KeyData->IntValues[ValueIndex]; - CHECK_MESSAGE(ActualIntValue == ExpectedIntValue, WriteToString<32>(Name, " had unexpected data.").c_str()); - } - } - } - } - - for (KeyData& KeyData : KeyDatas) - { - if (!KeyData.UseValueAPI) - { - CHECK_MESSAGE(KeyData.ReceivedGet, WriteToString<32>("Get(", KeyData.KeyIndex, ") was unexpectedly not received.").c_str()); - for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex) - { - CHECK_MESSAGE( - KeyData.ReceivedChunk[ValueIndex], - WriteToString<32>("GetChunks(", KeyData.KeyIndex, ",", ValueIndex, ") was unexpectedly not received.").c_str()); - } - } - else - { - CHECK_MESSAGE(KeyData.ReceivedGetValue, - WriteToString<32>("GetValue(", KeyData.KeyIndex, ") was unexpectedly not received.").c_str()); - CHECK_MESSAGE(KeyData.ReceivedChunk[0], - WriteToString<32>("GetChunks(", KeyData.KeyIndex, ") was unexpectedly not received.").c_str()); - } - } -} - -class ZenServerTestHelper -{ -public: - ZenServerTestHelper(std::string_view HelperId, int ServerCount) : m_HelperId{HelperId}, m_ServerCount{ServerCount} {} - ~ZenServerTestHelper() {} - - void SpawnServers(std::string_view AdditionalServerArgs = std::string_view()) - { - SpawnServers([](ZenServerInstance&) {}, AdditionalServerArgs); - } - - void SpawnServers(auto&& Callback, std::string_view AdditionalServerArgs) - { - ZEN_INFO("{}: spawning {} server instances", m_HelperId, m_ServerCount); - - m_Instances.resize(m_ServerCount); - - for (int i = 0; i < m_ServerCount; ++i) - { - auto& Instance = m_Instances[i]; - - Instance = std::make_unique<ZenServerInstance>(TestEnv); - Instance->SetTestDir(TestEnv.CreateNewTestDir()); - - Callback(*Instance); - - Instance->SpawnServer(13337 + i, AdditionalServerArgs); - } - - for (int i = 0; i < m_ServerCount; ++i) - { - auto& Instance = m_Instances[i]; - - Instance->WaitUntilReady(); - } - } - - ZenServerInstance& GetInstance(int Index) { return *m_Instances[Index]; } - -private: - std::string m_HelperId; - int m_ServerCount = 0; - std::vector<std::unique_ptr<ZenServerInstance> > m_Instances; -}; - -class IoDispatcher -{ -public: - IoDispatcher(asio::io_context& IoCtx) : m_IoCtx(IoCtx) {} - ~IoDispatcher() { Stop(); } - - void Run() - { - Stop(); - - m_Running = true; - - m_IoThread = std::thread([this]() { - try - { - m_IoCtx.run(); - } - catch (std::exception& Error) - { - m_Error = Error; - } - }); - } - - void Stop() - { - if (m_Running) - { - m_Running = false; - - if (m_IoThread.joinable()) - { - m_IoThread.join(); - } - } - } - - bool IsRunning() const { return m_Running; } - - const std::exception& Error() { return m_Error; } - -private: - asio::io_context& m_IoCtx; - std::thread m_IoThread; - std::exception m_Error; - std::atomic_bool m_Running{false}; -}; - -TEST_CASE("http.basics") -{ - using namespace std::literals; - - ZenServerTestHelper Servers{"http.basics"sv, 1}; - Servers.SpawnServers(); - - ZenServerInstance& Instance = Servers.GetInstance(0); - const std::string BaseUri = Instance.GetBaseUri(); - - { - cpr::Response r = cpr::Get(cpr::Url{fmt::format("{}/testing/hello", BaseUri)}); - CHECK(IsHttpSuccessCode(r.status_code)); - } - - { - cpr::Response r = cpr::Post(cpr::Url{fmt::format("{}/testing/hello", BaseUri)}); - CHECK_EQ(r.status_code, 404); - } - - { - cpr::Response r = cpr::Post(cpr::Url{fmt::format("{}/testing/echo", BaseUri)}, cpr::Body{"yoyoyoyo"}); - CHECK_EQ(r.status_code, 200); - CHECK_EQ(r.text, "yoyoyoyo"); - } -} - -TEST_CASE("http.package") -{ - using namespace std::literals; - - ZenServerTestHelper Servers{"http.package"sv, 1}; - Servers.SpawnServers(); - - ZenServerInstance& Instance = Servers.GetInstance(0); - const std::string BaseUri = Instance.GetBaseUri(); - - static const uint8_t Data1[] = {0, 1, 2, 3}; - static const uint8_t Data2[] = {0, 1, 2, 3, 4, 5, 6, 7, 8}; - - zen::CompressedBuffer AttachmentData1 = zen::CompressedBuffer::Compress(zen::SharedBuffer::Clone({Data1, 4}), - zen::OodleCompressor::NotSet, - zen::OodleCompressionLevel::None); - zen::CbAttachment Attach1{AttachmentData1, AttachmentData1.DecodeRawHash()}; - zen::CompressedBuffer AttachmentData2 = zen::CompressedBuffer::Compress(zen::SharedBuffer::Clone({Data2, 8}), - zen::OodleCompressor::NotSet, - zen::OodleCompressionLevel::None); - zen::CbAttachment Attach2{AttachmentData2, AttachmentData2.DecodeRawHash()}; - - zen::CbObjectWriter Writer; - - Writer.AddAttachment("attach1", Attach1); - Writer.AddAttachment("attach2", Attach2); - - zen::CbObject CoreObject = Writer.Save(); - - zen::CbPackage TestPackage; - TestPackage.SetObject(CoreObject); - TestPackage.AddAttachment(Attach1); - TestPackage.AddAttachment(Attach2); - - zen::HttpClient TestClient(BaseUri); - zen::HttpClient::Response Response = TestClient.TransactPackage("/testing/package"sv, TestPackage); - - zen::CbPackage ResponsePackage = ParsePackageMessage(Response.ResponsePayload); - - CHECK_EQ(ResponsePackage, TestPackage); -} - -TEST_CASE("websocket.basic") -{ - if (true) - { - return; - } - - std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); - const uint16_t PortNumber = 13337; - const auto MaxWaitTime = std::chrono::seconds(5); - - ZenServerInstance Inst(TestEnv); - Inst.SetTestDir(TestDir); - Inst.SpawnServer(PortNumber, "--websocket-port=8848"sv); - Inst.WaitUntilReady(); - - asio::io_context IoCtx; - IoDispatcher IoDispatcher(IoCtx); - auto WebSocket = WebSocketClient::Create(IoCtx); - - auto ConnectFuture = WebSocket->Connect({.Host = "127.0.0.1", .Port = 8848, .Endpoint = "/zen"}); - IoDispatcher.Run(); - - ConnectFuture.wait_for(MaxWaitTime); - CHECK(ConnectFuture.get()); - - for (size_t Idx = 0; Idx < 10; Idx++) - { - CbObjectWriter Request; - Request << "Method"sv - << "SayHello"sv; - - WebSocketMessage RequestMsg; - RequestMsg.SetMessageType(WebSocketMessageType::kRequest); - RequestMsg.SetBody(Request.Save()); - - auto ResponseFuture = WebSocket->SendRequest(std::move(RequestMsg)); - ResponseFuture.wait_for(MaxWaitTime); - - CbObject Response = ResponseFuture.get().Body().GetObject(); - std::string_view Message = Response["Result"].AsString(); - - CHECK(Message == "Hello Friend!!"sv); - } - - WebSocket->Disconnect(); - - IoCtx.stop(); - IoDispatcher.Stop(); -} - -std::string -OidAsString(const Oid& Id) -{ - StringBuilder<25> OidStringBuilder; - Id.ToString(OidStringBuilder); - return OidStringBuilder.ToString(); -} - -CbPackage -CreateOplogPackage(const Oid& Id, const std::span<const std::pair<Oid, CompressedBuffer> >& Attachments) -{ - CbPackage Package; - CbObjectWriter Object; - Object << "key"sv << OidAsString(Id); - if (!Attachments.empty()) - { - Object.BeginArray("bulkdata"); - for (const auto& Attachment : Attachments) - { - CbAttachment Attach(Attachment.second, Attachment.second.DecodeRawHash()); - Object.BeginObject(); - Object << "id"sv << Attachment.first; - Object << "type"sv - << "Standard"sv; - Object << "data"sv << Attach; - Object.EndObject(); - - Package.AddAttachment(Attach); - ZEN_DEBUG("Added attachment {}", Attach.GetHash()); - } - Object.EndArray(); - } - Package.SetObject(Object.Save()); - return Package; -}; - -std::vector<std::pair<Oid, CompressedBuffer> > -CreateAttachments(const std::span<const size_t>& Sizes) -{ - std::vector<std::pair<Oid, CompressedBuffer> > Result; - Result.reserve(Sizes.size()); - for (size_t Size : Sizes) - { - std::vector<uint8_t> Data; - Data.resize(Size); - uint16_t* DataPtr = reinterpret_cast<uint16_t*>(Data.data()); - for (size_t Idx = 0; Idx < Size / 2; ++Idx) - { - DataPtr[Idx] = static_cast<uint16_t>(Idx % 0xffffu); - } - if (Size & 1) - { - Data[Size - 1] = static_cast<uint8_t>((Size - 1) & 0xff); - } - CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer::MakeView(Data.data(), Data.size())); - Result.emplace_back(std::pair<Oid, CompressedBuffer>(Oid::NewOid(), Compressed)); - } - return Result; -} - -cpr::Body -AsBody(const IoBuffer& Payload) -{ - return cpr::Body{(const char*)Payload.GetData(), Payload.Size()}; -}; - -enum CbWriterMeta -{ - BeginObject, - EndObject, - BeginArray, - EndArray -}; - -inline CbWriter& -operator<<(CbWriter& Writer, CbWriterMeta Meta) -{ - switch (Meta) - { - case BeginObject: - Writer.BeginObject(); - break; - case EndObject: - Writer.EndObject(); - break; - case BeginArray: - Writer.BeginArray(); - break; - case EndArray: - Writer.EndArray(); - break; - default: - ZEN_ASSERT(false); - } - return Writer; -} - -TEST_CASE("project.remote") -{ - using namespace std::literals; - - ZenServerTestHelper Servers("remote", 3); - Servers.SpawnServers("--debug"); - - std::vector<Oid> OpIds; - OpIds.reserve(24); - for (size_t I = 0; I < 24; ++I) - { - OpIds.emplace_back(Oid::NewOid()); - } - - std::unordered_map<Oid, std::vector<std::pair<Oid, CompressedBuffer> >, Oid::Hasher> Attachments; - { - std::vector<std::size_t> AttachmentSizes({7633, 6825, 5738, 8031, 7225, 566, 3656, 6006, 24, 3466, 1093, 4269, - 2257, 3685, 3489, 7194, 6151, 5482, 6217, 3511, 6738, 5061, 7537, 2759, - 1916, 8210, 2235, 4024, 1582, 5251, 491, 5464, 4607, 8135, 3767, 4045, - 4415, 5007, 8876, 6761, 3359, 8526, 4097, 4855, 8225}); - auto It = AttachmentSizes.begin(); - Attachments[OpIds[0]] = {}; - Attachments[OpIds[1]] = CreateAttachments(std::initializer_list<size_t>{*It++}); - Attachments[OpIds[2]] = CreateAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++}); - Attachments[OpIds[3]] = CreateAttachments(std::initializer_list<size_t>{*It++}); - Attachments[OpIds[4]] = CreateAttachments(std::initializer_list<size_t>{*It++, *It++, *It++}); - Attachments[OpIds[5]] = CreateAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++}); - Attachments[OpIds[6]] = CreateAttachments(std::initializer_list<size_t>{*It++}); - Attachments[OpIds[7]] = CreateAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++}); - Attachments[OpIds[8]] = CreateAttachments(std::initializer_list<size_t>{}); - Attachments[OpIds[9]] = CreateAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++}); - Attachments[OpIds[10]] = CreateAttachments(std::initializer_list<size_t>{*It++}); - Attachments[OpIds[11]] = CreateAttachments(std::initializer_list<size_t>{*It++, *It++, *It++}); - Attachments[OpIds[12]] = CreateAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++}); - Attachments[OpIds[13]] = CreateAttachments(std::initializer_list<size_t>{*It++}); - Attachments[OpIds[14]] = CreateAttachments(std::initializer_list<size_t>{*It++, *It++}); - Attachments[OpIds[15]] = CreateAttachments(std::initializer_list<size_t>{*It++, *It++}); - Attachments[OpIds[16]] = CreateAttachments(std::initializer_list<size_t>{}); - Attachments[OpIds[17]] = CreateAttachments(std::initializer_list<size_t>{*It++, *It++}); - Attachments[OpIds[18]] = CreateAttachments(std::initializer_list<size_t>{*It++, *It++}); - Attachments[OpIds[19]] = CreateAttachments(std::initializer_list<size_t>{}); - Attachments[OpIds[20]] = CreateAttachments(std::initializer_list<size_t>{*It++}); - Attachments[OpIds[21]] = CreateAttachments(std::initializer_list<size_t>{*It++}); - Attachments[OpIds[22]] = CreateAttachments(std::initializer_list<size_t>{*It++, *It++, *It++}); - Attachments[OpIds[23]] = CreateAttachments(std::initializer_list<size_t>{*It++}); - ZEN_ASSERT(It == AttachmentSizes.end()); - } - - auto AddOp = [](const CbObject& Op, std::unordered_map<Oid, uint32_t, Oid::Hasher>& Ops) { - XXH3_128Stream KeyHasher; - Op["key"sv].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); }); - XXH3_128 KeyHash = KeyHasher.GetHash(); - Oid Id; - memcpy(Id.OidBits, &KeyHash, sizeof Id.OidBits); - IoBuffer Buffer = Op.GetBuffer().AsIoBuffer(); - const uint32_t OpCoreHash = uint32_t(XXH3_64bits(Buffer.GetData(), Buffer.GetSize()) & 0xffffFFFF); - Ops.insert({Id, OpCoreHash}); - }; - - auto MakeProject = [](cpr::Session& Session, std::string_view UrlBase, std::string_view ProjectName) { - CbObjectWriter Project; - Project.AddString("id"sv, ProjectName); - Project.AddString("root"sv, ""sv); - Project.AddString("engine"sv, ""sv); - Project.AddString("project"sv, ""sv); - Project.AddString("projectfile"sv, ""sv); - IoBuffer ProjectPayload = Project.Save().GetBuffer().AsIoBuffer(); - std::string ProjectRequest = fmt::format("{}/prj/{}", UrlBase, ProjectName); - Session.SetUrl({ProjectRequest}); - Session.SetBody(cpr::Body{(const char*)ProjectPayload.GetData(), ProjectPayload.GetSize()}); - cpr::Response Response = Session.Post(); - CHECK(IsHttpSuccessCode(Response.status_code)); - }; - - auto MakeOplog = [](cpr::Session& Session, std::string_view UrlBase, std::string_view ProjectName, std::string_view OplogName) { - std::string CreateOplogRequest = fmt::format("{}/prj/{}/oplog/{}", UrlBase, ProjectName, OplogName); - Session.SetUrl({CreateOplogRequest}); - Session.SetBody(cpr::Body{}); - cpr::Response Response = Session.Post(); - CHECK(IsHttpSuccessCode(Response.status_code)); - }; - - auto MakeOp = [](cpr::Session& Session, - std::string_view UrlBase, - std::string_view ProjectName, - std::string_view OplogName, - const CbPackage& OpPackage) { - std::string CreateOpRequest = fmt::format("{}/prj/{}/oplog/{}/new", UrlBase, ProjectName, OplogName); - Session.SetUrl({CreateOpRequest}); - zen::BinaryWriter MemOut; - legacy::SaveCbPackage(OpPackage, MemOut); - Session.SetBody(cpr::Body{(const char*)MemOut.Data(), MemOut.Size()}); - cpr::Response Response = Session.Post(); - CHECK(IsHttpSuccessCode(Response.status_code)); - }; - - cpr::Session Session; - MakeProject(Session, Servers.GetInstance(0).GetBaseUri(), "proj0"); - MakeOplog(Session, Servers.GetInstance(0).GetBaseUri(), "proj0", "oplog0"); - - std::unordered_map<Oid, uint32_t, Oid::Hasher> SourceOps; - for (const Oid& OpId : OpIds) - { - CbPackage OpPackage = CreateOplogPackage(OpId, Attachments[OpId]); - CHECK(OpPackage.GetAttachments().size() == Attachments[OpId].size()); - AddOp(OpPackage.GetObject(), SourceOps); - MakeOp(Session, Servers.GetInstance(0).GetBaseUri(), "proj0", "oplog0", OpPackage); - } - - std::vector<IoHash> AttachmentHashes; - AttachmentHashes.reserve(Attachments.size()); - for (const auto& AttachmentOplog : Attachments) - { - for (const auto& Attachment : AttachmentOplog.second) - { - AttachmentHashes.emplace_back(Attachment.second.DecodeRawHash()); - } - } - - auto MakeCbObjectPayload = [](std::function<void(CbObjectWriter & Writer)> Write) -> IoBuffer { - CbObjectWriter Writer; - Write(Writer); - IoBuffer Result = Writer.Save().GetBuffer().AsIoBuffer(); - Result.MakeOwned(); - return Result; - }; - - auto ValidateAttachments = [&MakeCbObjectPayload, &AttachmentHashes, &Servers, &Session](int ServerIndex, - std::string_view Project, - std::string_view Oplog) { - std::string GetChunksRequest = fmt::format("{}/prj/{}/oplog/{}/rpc", Servers.GetInstance(ServerIndex).GetBaseUri(), Project, Oplog); - Session.SetUrl({GetChunksRequest}); - IoBuffer Payload = MakeCbObjectPayload([&AttachmentHashes](CbObjectWriter& Writer) { - Writer << "method"sv - << "getchunks"sv; - Writer << "chunks"sv << BeginArray; - for (const IoHash& Chunk : AttachmentHashes) - { - Writer << Chunk; - } - Writer << EndArray; // chunks - }); - Session.SetBody(AsBody(Payload)); - Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}, {"Accept", "application/x-ue-cbpkg"}}); - cpr::Response Response = Session.Post(); - CHECK(IsHttpSuccessCode(Response.status_code)); - CbPackage ResponsePackage = ParsePackageMessage(IoBuffer(IoBuffer::Wrap, Response.text.data(), Response.text.size())); - CHECK(ResponsePackage.GetAttachments().size() == AttachmentHashes.size()); - }; - - auto ValidateOplog = [&SourceOps, &AddOp, &Servers, &Session](int ServerIndex, std::string_view Project, std::string_view Oplog) { - std::unordered_map<Oid, uint32_t, Oid::Hasher> TargetOps; - std::vector<CbObject> ResultingOplog; - - std::string GetOpsRequest = - fmt::format("{}/prj/{}/oplog/{}/entries", Servers.GetInstance(ServerIndex).GetBaseUri(), Project, Oplog); - Session.SetUrl({GetOpsRequest}); - cpr::Response Response = Session.Get(); - CHECK(IsHttpSuccessCode(Response.status_code)); - - IoBuffer Payload(IoBuffer::Wrap, Response.text.data(), Response.text.size()); - CbObject OplogResonse = LoadCompactBinaryObject(Payload); - CbArrayView EntriesArray = OplogResonse["entries"sv].AsArrayView(); - - for (CbFieldView OpEntry : EntriesArray) - { - CbObjectView Core = OpEntry.AsObjectView(); - BinaryWriter Writer; - Core.CopyTo(Writer); - MemoryView OpView = Writer.GetView(); - IoBuffer OpBuffer(IoBuffer::Wrap, OpView.GetData(), OpView.GetSize()); - CbObject Op(SharedBuffer(OpBuffer), CbFieldType::HasFieldType); - AddOp(Op, TargetOps); - } - CHECK(SourceOps == TargetOps); - }; - - SUBCASE("File") - { - ScopedTemporaryDirectory TempDir; - { - std::string SaveOplogRequest = fmt::format("{}/prj/{}/oplog/{}/rpc", Servers.GetInstance(0).GetBaseUri(), "proj0", "oplog0"); - Session.SetUrl({SaveOplogRequest}); - - IoBuffer Payload = MakeCbObjectPayload([&AttachmentHashes, path = TempDir.Path().string()](CbObjectWriter& Writer) { - Writer << "method"sv - << "export"sv; - Writer << "params" << BeginObject; - { - Writer << "maxblocksize"sv << 3072u; - Writer << "maxchunkembedsize"sv << 1296u; - Writer << "force"sv << false; - Writer << "file"sv << BeginObject; - { - Writer << "path"sv << path; - Writer << "name"sv - << "proj0_oplog0"sv; - } - Writer << EndObject; // "file" - } - Writer << EndObject; // "params" - }); - Session.SetBody(AsBody(Payload)); - Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}}); - cpr::Response Response = Session.Post(); - CHECK(IsHttpSuccessCode(Response.status_code)); - } - { - MakeProject(Session, Servers.GetInstance(1).GetBaseUri(), "proj0_copy"); - MakeOplog(Session, Servers.GetInstance(1).GetBaseUri(), "proj0_copy", "oplog0_copy"); - std::string LoadOplogRequest = - fmt::format("{}/prj/{}/oplog/{}/rpc", Servers.GetInstance(1).GetBaseUri(), "proj0_copy", "oplog0_copy"); - Session.SetUrl({LoadOplogRequest}); - - IoBuffer Payload = MakeCbObjectPayload([&AttachmentHashes, path = TempDir.Path().string()](CbObjectWriter& Writer) { - Writer << "method"sv - << "import"sv; - Writer << "params" << BeginObject; - { - Writer << "force"sv << false; - Writer << "file"sv << BeginObject; - { - Writer << "path"sv << path; - Writer << "name"sv - << "proj0_oplog0"sv; - } - Writer << EndObject; // "file" - } - Writer << EndObject; // "params" - }); - Session.SetBody(AsBody(Payload)); - - Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}}); - cpr::Response Response = Session.Post(); - CHECK(IsHttpSuccessCode(Response.status_code)); - } - ValidateAttachments(1, "proj0_copy", "oplog0_copy"); - ValidateOplog(1, "proj0_copy", "oplog0_copy"); - } - - SUBCASE("File disable blocks") - { - ScopedTemporaryDirectory TempDir; - { - std::string SaveOplogRequest = fmt::format("{}/prj/{}/oplog/{}/rpc", Servers.GetInstance(0).GetBaseUri(), "proj0", "oplog0"); - Session.SetUrl({SaveOplogRequest}); - - IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) { - Writer << "method"sv - << "export"sv; - Writer << "params" << BeginObject; - { - Writer << "maxblocksize"sv << 3072u; - Writer << "maxchunkembedsize"sv << 1296u; - Writer << "force"sv << false; - Writer << "file"sv << BeginObject; - { - Writer << "path"sv << TempDir.Path().string(); - Writer << "name"sv - << "proj0_oplog0"sv; - Writer << "disableblocks"sv << true; - } - Writer << EndObject; // "file" - } - Writer << EndObject; // "params" - }); - Session.SetBody(AsBody(Payload)); - Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}}); - cpr::Response Response = Session.Post(); - CHECK(IsHttpSuccessCode(Response.status_code)); - } - { - MakeProject(Session, Servers.GetInstance(1).GetBaseUri(), "proj0_copy"); - MakeOplog(Session, Servers.GetInstance(1).GetBaseUri(), "proj0_copy", "oplog0_copy"); - std::string LoadOplogRequest = - fmt::format("{}/prj/{}/oplog/{}/rpc", Servers.GetInstance(1).GetBaseUri(), "proj0_copy", "oplog0_copy"); - Session.SetUrl({LoadOplogRequest}); - IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) { - Writer << "method"sv - << "import"sv; - Writer << "params" << BeginObject; - { - Writer << "force"sv << false; - Writer << "file"sv << BeginObject; - { - Writer << "path"sv << TempDir.Path().string(); - Writer << "name"sv - << "proj0_oplog0"sv; - } - Writer << EndObject; // "file" - } - Writer << EndObject; // "params" - }); - Session.SetBody(AsBody(Payload)); - Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}}); - cpr::Response Response = Session.Post(); - CHECK(IsHttpSuccessCode(Response.status_code)); - } - ValidateAttachments(1, "proj0_copy", "oplog0_copy"); - ValidateOplog(1, "proj0_copy", "oplog0_copy"); - } - - SUBCASE("File force temp blocks") - { - ScopedTemporaryDirectory TempDir; - { - std::string SaveOplogRequest = fmt::format("{}/prj/{}/oplog/{}/rpc", Servers.GetInstance(0).GetBaseUri(), "proj0", "oplog0"); - Session.SetUrl({SaveOplogRequest}); - IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) { - Writer << "method"sv - << "export"sv; - Writer << "params" << BeginObject; - { - Writer << "maxblocksize"sv << 3072u; - Writer << "maxchunkembedsize"sv << 1296u; - Writer << "force"sv << false; - Writer << "file"sv << BeginObject; - { - Writer << "path"sv << TempDir.Path().string(); - Writer << "name"sv - << "proj0_oplog0"sv; - Writer << "enabletempblocks"sv << true; - } - Writer << EndObject; // "file" - } - Writer << EndObject; // "params" - }); - Session.SetBody(AsBody(Payload)); - Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}}); - cpr::Response Response = Session.Post(); - CHECK(IsHttpSuccessCode(Response.status_code)); - } - { - MakeProject(Session, Servers.GetInstance(1).GetBaseUri(), "proj0_copy"); - MakeOplog(Session, Servers.GetInstance(1).GetBaseUri(), "proj0_copy", "oplog0_copy"); - std::string LoadOplogRequest = - fmt::format("{}/prj/{}/oplog/{}/rpc", Servers.GetInstance(1).GetBaseUri(), "proj0_copy", "oplog0_copy"); - Session.SetUrl({LoadOplogRequest}); - IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) { - Writer << "method"sv - << "import"sv; - Writer << "params" << BeginObject; - { - Writer << "force"sv << false; - Writer << "file"sv << BeginObject; - { - Writer << "path"sv << TempDir.Path().string(); - Writer << "name"sv - << "proj0_oplog0"sv; - } - Writer << EndObject; // "file" - } - Writer << EndObject; // "params" - }); - Session.SetBody(AsBody(Payload)); - Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}}); - cpr::Response Response = Session.Post(); - CHECK(IsHttpSuccessCode(Response.status_code)); - } - ValidateAttachments(1, "proj0_copy", "oplog0_copy"); - ValidateOplog(1, "proj0_copy", "oplog0_copy"); - } - - SUBCASE("Zen") - { - ScopedTemporaryDirectory TempDir; - { - std::string ExportSourceUri = Servers.GetInstance(0).GetBaseUri(); - std::string ExportTargetUri = Servers.GetInstance(1).GetBaseUri(); - MakeProject(Session, ExportTargetUri, "proj0_copy"); - MakeOplog(Session, ExportTargetUri, "proj0_copy", "oplog0_copy"); - - std::string SaveOplogRequest = fmt::format("{}/prj/{}/oplog/{}/rpc", ExportSourceUri, "proj0", "oplog0"); - Session.SetUrl({SaveOplogRequest}); - - IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) { - Writer << "method"sv - << "export"sv; - Writer << "params" << BeginObject; - { - Writer << "maxblocksize"sv << 3072u; - Writer << "maxchunkembedsize"sv << 1296u; - Writer << "force"sv << false; - Writer << "zen"sv << BeginObject; - { - Writer << "url"sv << ExportTargetUri.substr(7); - Writer << "project" - << "proj0_copy"; - Writer << "oplog" - << "oplog0_copy"; - } - Writer << EndObject; // "file" - } - Writer << EndObject; // "params" - }); - Session.SetBody(AsBody(Payload)); - Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}}); - cpr::Response Response = Session.Post(); - CHECK(IsHttpSuccessCode(Response.status_code)); - } - ValidateAttachments(1, "proj0_copy", "oplog0_copy"); - ValidateOplog(1, "proj0_copy", "oplog0_copy"); - - { - std::string ImportSourceUri = Servers.GetInstance(1).GetBaseUri(); - std::string ImportTargetUri = Servers.GetInstance(2).GetBaseUri(); - MakeProject(Session, ImportTargetUri, "proj1"); - MakeOplog(Session, ImportTargetUri, "proj1", "oplog1"); - std::string LoadOplogRequest = fmt::format("{}/prj/{}/oplog/{}/rpc", ImportTargetUri, "proj1", "oplog1"); - Session.SetUrl({LoadOplogRequest}); - - IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) { - Writer << "method"sv - << "import"sv; - Writer << "params" << BeginObject; - { - Writer << "force"sv << false; - Writer << "zen"sv << BeginObject; - { - Writer << "url"sv << ImportSourceUri.substr(7); - Writer << "project" - << "proj0_copy"; - Writer << "oplog" - << "oplog0_copy"; - } - Writer << EndObject; // "file" - } - Writer << EndObject; // "params" - }); - Session.SetBody(AsBody(Payload)); - Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}}); - cpr::Response Response = Session.Post(); - CHECK(IsHttpSuccessCode(Response.status_code)); - } - ValidateAttachments(2, "proj1", "oplog1"); - ValidateOplog(2, "proj1", "oplog1"); - } -} - -# if 0 -TEST_CASE("lifetime.owner") -{ - // This test is designed to verify that the hand-over of sponsor processes is handled - // correctly for the case when a second or third process is launched on the same port - // - // Due to the nature of it, it cannot be - - const uint16_t PortNumber = 23456; - - ZenServerInstance Zen1(TestEnv); - std::filesystem::path TestDir1 = TestEnv.CreateNewTestDir(); - Zen1.SetTestDir(TestDir1); - Zen1.SpawnServer(PortNumber); - Zen1.WaitUntilReady(); - Zen1.Detach(); - - ZenServerInstance Zen2(TestEnv); - std::filesystem::path TestDir2 = TestEnv.CreateNewTestDir(); - Zen2.SetTestDir(TestDir2); - Zen2.SpawnServer(PortNumber); - Zen2.WaitUntilReady(); - Zen2.Detach(); -} - -TEST_CASE("lifetime.owner.2") -{ - // This test is designed to verify that the hand-over of sponsor processes is handled - // correctly for the case when a second or third process is launched on the same port - // - // Due to the nature of it, it cannot be - - const uint16_t PortNumber = 13456; - - std::filesystem::path TestDir1 = TestEnv.CreateNewTestDir(); - std::filesystem::path TestDir2 = TestEnv.CreateNewTestDir(); - - ZenServerInstance Zen1(TestEnv); - Zen1.SetTestDir(TestDir1); - Zen1.SpawnServer(PortNumber); - Zen1.WaitUntilReady(); - - ZenServerInstance Zen2(TestEnv); - Zen2.SetTestDir(TestDir2); - Zen2.SetOwnerPid(Zen1.GetPid()); - Zen2.SpawnServer(PortNumber + 1); - Zen2.Detach(); - - ZenServerInstance Zen3(TestEnv); - Zen3.SetTestDir(TestDir2); - Zen3.SetOwnerPid(Zen1.GetPid()); - Zen3.SpawnServer(PortNumber + 1); - Zen3.Detach(); - - ZenServerInstance Zen4(TestEnv); - Zen4.SetTestDir(TestDir2); - Zen4.SetOwnerPid(Zen1.GetPid()); - Zen4.SpawnServer(PortNumber + 1); - Zen4.Detach(); -} -# endif - -} // namespace zen::tests -#else -int -main() -{ -} -#endif |