// Copyright Epic Games, Inc. All Rights Reserved. #define _SILENCE_CXX17_C_HEADER_DEPRECATION_WARNING #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #if ZEN_USE_MIMALLOC ZEN_THIRD_PARTY_INCLUDES_START # include ZEN_THIRD_PARTY_INCLUDES_END #endif #include #if ZEN_PLATFORM_WINDOWS # pragma comment(lib, "Crypt32.lib") # pragma comment(lib, "Wldap32.lib") #endif ZEN_THIRD_PARTY_INCLUDES_START #include #undef GetObject ZEN_THIRD_PARTY_INCLUDES_END #include #include #include #include #include #include #include #if ZEN_PLATFORM_WINDOWS # include # include # include #endif #include ////////////////////////////////////////////////////////////////////////// #include "projectclient.h" ////////////////////////////////////////////////////////////////////////// #if ZEN_WITH_TESTS # define ZEN_TEST_WITH_RUNNER 1 # include # include #endif using namespace std::literals; #if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC struct Concurrency { template static void parallel_invoke(T&&... 
t) { constexpr size_t NumTs = sizeof...(t); std::thread Threads[NumTs] = { std::thread(std::forward(t))..., }; for (std::thread& Thread : Threads) { Thread.join(); } } }; #endif ////////////////////////////////////////////////////////////////////////// // // Custom logging -- test code, this should be tweaked // namespace logging { using namespace spdlog; using namespace spdlog::details; using namespace std::literals; class full_test_formatter final : public spdlog::formatter { public: full_test_formatter(std::string_view LogId, std::chrono::time_point Epoch) : m_Epoch(Epoch), m_LogId(LogId) { } virtual std::unique_ptr clone() const override { return std::make_unique(m_LogId, m_Epoch); } static constexpr bool UseDate = false; virtual void format(const details::log_msg& msg, memory_buf_t& dest) override { using std::chrono::duration_cast; using std::chrono::milliseconds; using std::chrono::seconds; if constexpr (UseDate) { auto secs = std::chrono::duration_cast(msg.time.time_since_epoch()); if (secs != m_LastLogSecs) { m_CachedTm = os::localtime(log_clock::to_time_t(msg.time)); m_LastLogSecs = secs; } } const auto& tm_time = m_CachedTm; // cache the date/time part for the next second. 
auto duration = msg.time - m_Epoch; auto secs = duration_cast(duration); if (m_CacheTimestamp != secs || m_CachedDatetime.size() == 0) { m_CachedDatetime.clear(); m_CachedDatetime.push_back('['); if constexpr (UseDate) { fmt_helper::append_int(tm_time.tm_year + 1900, m_CachedDatetime); m_CachedDatetime.push_back('-'); fmt_helper::pad2(tm_time.tm_mon + 1, m_CachedDatetime); m_CachedDatetime.push_back('-'); fmt_helper::pad2(tm_time.tm_mday, m_CachedDatetime); m_CachedDatetime.push_back(' '); fmt_helper::pad2(tm_time.tm_hour, m_CachedDatetime); m_CachedDatetime.push_back(':'); fmt_helper::pad2(tm_time.tm_min, m_CachedDatetime); m_CachedDatetime.push_back(':'); fmt_helper::pad2(tm_time.tm_sec, m_CachedDatetime); } else { int Count = int(secs.count()); const int LogSecs = Count % 60; Count /= 60; const int LogMins = Count % 60; Count /= 60; const int LogHours = Count; fmt_helper::pad2(LogHours, m_CachedDatetime); m_CachedDatetime.push_back(':'); fmt_helper::pad2(LogMins, m_CachedDatetime); m_CachedDatetime.push_back(':'); fmt_helper::pad2(LogSecs, m_CachedDatetime); } m_CachedDatetime.push_back('.'); m_CacheTimestamp = secs; } dest.append(m_CachedDatetime.begin(), m_CachedDatetime.end()); auto millis = fmt_helper::time_fraction(msg.time); fmt_helper::pad3(static_cast(millis.count()), dest); dest.push_back(']'); dest.push_back(' '); if (!m_LogId.empty()) { dest.push_back('['); fmt_helper::append_string_view(m_LogId, dest); dest.push_back(']'); dest.push_back(' '); } // append logger name if exists if (msg.logger_name.size() > 0) { dest.push_back('['); fmt_helper::append_string_view(msg.logger_name, dest); dest.push_back(']'); dest.push_back(' '); } dest.push_back('['); // wrap the level name with color msg.color_range_start = dest.size(); fmt_helper::append_string_view(level::to_string_view(msg.level), dest); msg.color_range_end = dest.size(); dest.push_back(']'); dest.push_back(' '); // add source location if present if (!msg.source.empty()) { dest.push_back('['); const 
char* filename = details::short_filename_formatter::basename(msg.source.filename); fmt_helper::append_string_view(filename, dest); dest.push_back(':'); fmt_helper::append_int(msg.source.line, dest); dest.push_back(']'); dest.push_back(' '); } fmt_helper::append_string_view(msg.payload, dest); fmt_helper::append_string_view("\n"sv, dest); } private: std::chrono::time_point m_Epoch; std::tm m_CachedTm; std::chrono::seconds m_LastLogSecs; std::chrono::seconds m_CacheTimestamp{0}; memory_buf_t m_CachedDatetime; std::string m_LogId; }; } // namespace logging ////////////////////////////////////////////////////////////////////////// #if 0 int main() { mi_version(); zen::Sleep(1000); zen::Stopwatch timer; const int RequestCount = 100000; cpr::Session Sessions[10]; for (auto& Session : Sessions) { Session.SetUrl(cpr::Url{"http://localhost:1337/test/hello"}); //Session.SetUrl(cpr::Url{ "http://arn-wd-l0182:1337/test/hello" }); } auto Run = [](cpr::Session& Session) { for (int i = 0; i < 10000; ++i) { cpr::Response Result = Session.Get(); if (Result.status_code != 200) { ZEN_WARN("request response: {}", Result.status_code); } } }; Concurrency::parallel_invoke([&] { Run(Sessions[0]); }, [&] { Run(Sessions[1]); }, [&] { Run(Sessions[2]); }, [&] { Run(Sessions[3]); }, [&] { Run(Sessions[4]); }, [&] { Run(Sessions[5]); }, [&] { Run(Sessions[6]); }, [&] { Run(Sessions[7]); }, [&] { Run(Sessions[8]); }, [&] { Run(Sessions[9]); }); // cpr::Response r = cpr::Get(cpr::Url{ "http://localhost:1337/test/hello" }); ZEN_INFO("{} requests in {} ({})", RequestCount, zen::NiceTimeSpanMs(timer.GetElapsedTimeMs()), zen::NiceRate(RequestCount, (uint32_t)timer.GetElapsedTimeMs(), "req")); return 0; } #elif 0 //#include int main() { mi_version(); restinio::run(restinio::on_thread_pool(32).port(8080).request_handler( [](auto req) { return req->create_response().set_body("Hello, World!").done(); })); return 0; } #elif ZEN_WITH_TESTS zen::ZenServerEnvironment TestEnv; int main(int argc, char** argv) 
{
    // Test runner entry point: force-links the test translation units, sets up
    // logging with the custom test formatter, initializes the shared test
    // environment and then hands over to the test framework.
# if ZEN_USE_MIMALLOC
    mi_version();
# endif

    zen::zencore_forcelinktests();
    zen::zenhttp_forcelinktests();
    zen::cacherequests_forcelink();

    zen::logging::InitializeLogging();

    spdlog::set_level(spdlog::level::debug);
    // Timestamps in the log output are relative to this point in time
    spdlog::set_formatter(std::make_unique< ::logging::full_test_formatter>("test", std::chrono::system_clock::now()));

    // The test base directory is "<two levels above the executable's directory>/.test"
    std::filesystem::path ProgramBaseDir = std::filesystem::path(argv[0]).parent_path();
    std::filesystem::path TestBaseDir = ProgramBaseDir.parent_path().parent_path() / ".test";

    TestEnv.InitializeForTest(ProgramBaseDir, TestBaseDir);

    ZEN_INFO("Running tests...(base dir: '{}')", TestBaseDir);

    return ZEN_RUN_TESTS(argc, argv);
}

namespace zen::tests {

// Spawns one server on port 13337 and issues 10 x 10000 GET requests against
// /test/hello from ten concurrent threads, then logs the request rate.
// NOTE(review): the responses are not inspected, so this test only fails if
// spawning the server or issuing a request throws/crashes -- confirm that this is intended.
TEST_CASE("default.single")
{
    std::filesystem::path TestDir = TestEnv.CreateNewTestDir();

    ZenServerInstance Instance(TestEnv);
    Instance.SetTestDir(TestDir);
    Instance.SpawnServer(13337);

    ZEN_INFO("Waiting...");

    Instance.WaitUntilReady();

    // Shared between the worker threads below
    std::atomic RequestCount{0};
    std::atomic BatchCounter{0};

    ZEN_INFO("Running single server test...");

    // One batch = 10000 sequential GET requests on a private session
    auto IssueTestRequests = [&] {
        const uint64_t BatchNo = BatchCounter.fetch_add(1);
        const int ThreadId = zen::GetCurrentThreadId();

        ZEN_INFO("query batch {} started (thread {})", BatchNo, ThreadId);
        cpr::Session cli;
        cli.SetUrl(cpr::Url{"http://localhost:13337/test/hello"});

        for (int i = 0; i < 10000; ++i)
        {
            auto res = cli.Get();
            ++RequestCount;
        }
        ZEN_INFO("query batch {} ended (thread {})", BatchNo, ThreadId);
    };

    zen::Stopwatch timer;

    // Run ten batches in parallel
    Concurrency::parallel_invoke(IssueTestRequests,
                                 IssueTestRequests,
                                 IssueTestRequests,
                                 IssueTestRequests,
                                 IssueTestRequests,
                                 IssueTestRequests,
                                 IssueTestRequests,
                                 IssueTestRequests,
                                 IssueTestRequests,
                                 IssueTestRequests);

    uint64_t Elapsed = timer.GetElapsedTimeMs();

    ZEN_INFO("{} requests in {} ({})", RequestCount, zen::NiceTimeSpanMs(Elapsed), zen::NiceRate(RequestCount, (uint32_t)Elapsed, "req"));
}

// Same request pattern as "default.single", but against two server instances
// (ports 13337 and 13338) running side by side.
TEST_CASE("multi.basic")
{
    ZenServerInstance Instance1(TestEnv);
    std::filesystem::path TestDir1 = TestEnv.CreateNewTestDir();
Instance1.SetTestDir(TestDir1); Instance1.SpawnServer(13337); ZenServerInstance Instance2(TestEnv); std::filesystem::path TestDir2 = TestEnv.CreateNewTestDir(); Instance2.SetTestDir(TestDir2); Instance2.SpawnServer(13338); ZEN_INFO("Waiting..."); Instance1.WaitUntilReady(); Instance2.WaitUntilReady(); std::atomic RequestCount{0}; std::atomic BatchCounter{0}; auto IssueTestRequests = [&](int PortNumber) { const uint64_t BatchNo = BatchCounter.fetch_add(1); const int ThreadId = zen::GetCurrentThreadId(); ZEN_INFO("query batch {} started (thread {}) for port {}", BatchNo, ThreadId, PortNumber); cpr::Session cli; cli.SetUrl(cpr::Url{fmt::format("http://localhost:{}/test/hello", PortNumber)}); for (int i = 0; i < 10000; ++i) { auto res = cli.Get(); ++RequestCount; } ZEN_INFO("query batch {} ended (thread {})", BatchNo, ThreadId); }; zen::Stopwatch timer; ZEN_INFO("Running multi-server test..."); Concurrency::parallel_invoke([&] { IssueTestRequests(13337); }, [&] { IssueTestRequests(13338); }, [&] { IssueTestRequests(13337); }, [&] { IssueTestRequests(13338); }); uint64_t Elapsed = timer.GetElapsedTimeMs(); ZEN_INFO("{} requests in {} ({})", RequestCount, zen::NiceTimeSpanMs(Elapsed), zen::NiceRate(RequestCount, (uint32_t)Elapsed, "req")); } TEST_CASE("project.basic") { using namespace std::literals; std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); const uint16_t PortNumber = 13337; ZenServerInstance Instance1(TestEnv); Instance1.SetTestDir(TestDir); Instance1.SpawnServer(PortNumber); Instance1.WaitUntilReady(); std::atomic RequestCount{0}; zen::Stopwatch timer; std::mt19937_64 mt; zen::StringBuilder<64> BaseUri; BaseUri << fmt::format("http://localhost:{}/prj/test", PortNumber); std::filesystem::path BinPath = zen::GetRunningExecutablePath(); std::filesystem::path RootPath = BinPath.parent_path().parent_path(); BinPath = BinPath.lexically_relative(RootPath); SUBCASE("build store init") { { { zen::CbObjectWriter Body; Body << "id" << "test"; Body << "root" << 
RootPath.c_str(); Body << "project" << "/zooom"; Body << "engine" << "/zooom"; zen::BinaryWriter MemOut; Body.Save(MemOut); auto Response = cpr::Post(cpr::Url{BaseUri.c_str()}, cpr::Body{(const char*)MemOut.Data(), MemOut.Size()}); CHECK(Response.status_code == 201); } { auto Response = cpr::Get(cpr::Url{BaseUri.c_str()}); CHECK(Response.status_code == 200); zen::CbObjectView ResponseObject = zen::CbFieldView(Response.text.data()).AsObjectView(); CHECK(ResponseObject["id"].AsString() == "test"sv); CHECK(ResponseObject["root"].AsString() == PathToUtf8(RootPath.c_str())); } } BaseUri << "/oplog/foobar"; { { zen::StringBuilder<64> PostUri; PostUri << BaseUri; auto Response = cpr::Post(cpr::Url{PostUri.c_str()}); CHECK(Response.status_code == 201); } { auto Response = cpr::Get(cpr::Url{BaseUri.c_str()}); CHECK(Response.status_code == 200); zen::CbObjectView ResponseObject = zen::CbFieldView(Response.text.data()).AsObjectView(); CHECK(ResponseObject["id"].AsString() == "foobar"sv); CHECK(ResponseObject["project"].AsString() == "test"sv); } } SUBCASE("build store persistence") { uint8_t AttachData[] = {1, 2, 3}; zen::CbAttachment Attach{zen::CompressedBuffer::Compress(zen::SharedBuffer::Clone(zen::MemoryView{AttachData, 3}))}; zen::CbObjectWriter OpWriter; OpWriter << "key" << "foo" << "attachment" << Attach; const std::string_view ChunkId{ "00000000" "00000000" "00010000"}; auto FileOid = zen::Oid::FromHexString(ChunkId); OpWriter.BeginArray("files"); OpWriter.BeginObject(); OpWriter << "id" << FileOid; OpWriter << "clientpath" << "/{engine}/client/side/path"; OpWriter << "serverpath" << BinPath.c_str(); OpWriter.EndObject(); OpWriter.EndArray(); zen::CbObject Op = OpWriter.Save(); zen::CbPackage OpPackage(Op); OpPackage.AddAttachment(Attach); zen::BinaryWriter MemOut; legacy::SaveCbPackage(OpPackage, MemOut); { zen::StringBuilder<64> PostUri; PostUri << BaseUri << "/new"; auto Response = cpr::Post(cpr::Url{PostUri.c_str()}, cpr::Body{(const char*)MemOut.Data(), 
MemOut.Size()}); REQUIRE(!Response.error); CHECK(Response.status_code == 201); } // Read file data { zen::StringBuilder<128> ChunkGetUri; ChunkGetUri << BaseUri << "/" << ChunkId; auto Response = cpr::Get(cpr::Url{ChunkGetUri.c_str()}); REQUIRE(!Response.error); CHECK(Response.status_code == 200); } { zen::StringBuilder<128> ChunkGetUri; ChunkGetUri << BaseUri << "/" << ChunkId << "?offset=1&size=10"; auto Response = cpr::Get(cpr::Url{ChunkGetUri.c_str()}); REQUIRE(!Response.error); CHECK(Response.status_code == 200); CHECK(Response.text.size() == 10); } ZEN_INFO("+++++++"); } SUBCASE("build store op commit") { ZEN_INFO("-------"); } } const uint64_t Elapsed = timer.GetElapsedTimeMs(); ZEN_INFO("{} requests in {} ({})", RequestCount, zen::NiceTimeSpanMs(Elapsed), zen::NiceRate(RequestCount, (uint32_t)Elapsed, "req")); } # if 0 // this is extremely WIP TEST_CASE("project.pipe") { using namespace std::literals; std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); const uint16_t PortNumber = 13337; ZenServerInstance Instance1(TestEnv); Instance1.SetTestDir(TestDir); Instance1.SpawnServer(PortNumber); Instance1.WaitUntilReady(); zen::LocalProjectClient LocalClient(PortNumber); zen::CbObjectWriter Cbow; Cbow << "hey" << 42; zen::CbObject Response = LocalClient.MessageTransaction(Cbow.Save()); } # endif namespace utils { struct ZenConfig { std::filesystem::path DataDir; uint16_t Port; std::string BaseUri; std::string Args; static ZenConfig New(uint16_t Port = 13337, std::string Args = "") { return ZenConfig{.DataDir = TestEnv.CreateNewTestDir(), .Port = Port, .BaseUri = fmt::format("http://localhost:{}/z$", Port), .Args = std::move(Args)}; } static ZenConfig NewWithUpstream(uint16_t UpstreamPort) { return New(13337, fmt::format("--debug --upstream-thread-count=0 --upstream-zen-url=http://localhost:{}", UpstreamPort)); } static ZenConfig NewWithThreadedUpstreams(std::span UpstreamPorts, bool Debug) { std::string Args = Debug ? 
"--debug" : ""; for (uint16_t Port : UpstreamPorts) { Args = fmt::format("{}{}--upstream-zen-url=http://localhost:{}", Args, Args.length() > 0 ? " " : "", Port); } return New(13337, Args); } void Spawn(ZenServerInstance& Inst) { Inst.SetTestDir(DataDir); Inst.SpawnServer(Port, Args); Inst.WaitUntilReady(); } }; void SpawnServer(ZenServerInstance& Server, ZenConfig& Cfg) { Server.SetTestDir(Cfg.DataDir); Server.SpawnServer(Cfg.Port, Cfg.Args); Server.WaitUntilReady(); } } // namespace utils TEST_CASE("zcache.basic") { using namespace std::literals; std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); const uint16_t PortNumber = 13337; const int kIterationCount = 100; const auto BaseUri = fmt::format("http://localhost:{}/z$", PortNumber); auto HashKey = [](int i) -> zen::IoHash { return zen::IoHash::HashBuffer(&i, sizeof i); }; { ZenServerInstance Instance1(TestEnv); Instance1.SetTestDir(TestDir); Instance1.SpawnServer(PortNumber); Instance1.WaitUntilReady(); // Populate with some simple data for (int i = 0; i < kIterationCount; ++i) { zen::CbObjectWriter Cbo; Cbo << "index" << i; zen::BinaryWriter MemOut; Cbo.Save(MemOut); zen::IoHash Key = HashKey(i); cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}", BaseUri, "test", Key)}, cpr::Body{(const char*)MemOut.Data(), MemOut.Size()}, cpr::Header{{"Content-Type", "application/x-ue-cb"}}); CHECK(Result.status_code == 201); } // Retrieve data for (int i = 0; i < kIterationCount; ++i) { zen::IoHash Key = zen::IoHash::HashBuffer(&i, sizeof i); cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}", BaseUri, "test", Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); CHECK(Result.status_code == 200); } // Ensure bad bucket identifiers are rejected { zen::CbObjectWriter Cbo; Cbo << "index" << 42; zen::BinaryWriter MemOut; Cbo.Save(MemOut); zen::IoHash Key = HashKey(442); cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}", BaseUri, "te!st", Key)}, cpr::Body{(const 
char*)MemOut.Data(), MemOut.Size()}, cpr::Header{{"Content-Type", "application/x-ue-cb"}}); CHECK(Result.status_code == 400); } } // Verify that the data persists between process runs (the previous server has exited at this point) { ZenServerInstance Instance1(TestEnv); Instance1.SetTestDir(TestDir); Instance1.SpawnServer(PortNumber); Instance1.WaitUntilReady(); // Retrieve data again for (int i = 0; i < kIterationCount; ++i) { zen::IoHash Key = HashKey(i); cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}", BaseUri, "test", Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); CHECK(Result.status_code == 200); } } } TEST_CASE("zcache.cbpackage") { using namespace std::literals; auto CreateTestPackage = [](zen::IoHash& OutAttachmentKey) -> zen::CbPackage { auto Data = zen::SharedBuffer::Clone(zen::MakeMemoryView({1, 2, 3, 4, 5, 6, 7, 8, 9})); auto CompressedData = zen::CompressedBuffer::Compress(Data); OutAttachmentKey = zen::IoHash::FromBLAKE3(CompressedData.GetRawHash()); zen::CbWriter Obj; Obj.BeginObject("obj"sv); Obj.AddBinaryAttachment("data", OutAttachmentKey); Obj.EndObject(); zen::CbPackage Package; Package.SetObject(Obj.Save().AsObject()); Package.AddAttachment(zen::CbAttachment(CompressedData)); return Package; }; auto SerializeToBuffer = [](zen::CbPackage Package) -> zen::IoBuffer { zen::BinaryWriter MemStream; Package.Save(MemStream); return zen::IoBuffer(zen::IoBuffer::Clone, MemStream.Data(), MemStream.Size()); }; auto IsEqual = [](zen::CbPackage Lhs, zen::CbPackage Rhs) -> bool { std::span LhsAttachments = Lhs.GetAttachments(); std::span RhsAttachments = Rhs.GetAttachments(); if (LhsAttachments.size() != RhsAttachments.size()) { return false; } for (const zen::CbAttachment& LhsAttachment : LhsAttachments) { const zen::CbAttachment* RhsAttachment = Rhs.FindAttachment(LhsAttachment.GetHash()); CHECK(RhsAttachment); zen::SharedBuffer LhsBuffer = LhsAttachment.AsCompressedBinary().Decompress(); CHECK(!LhsBuffer.IsNull()); 
zen::SharedBuffer RhsBuffer = RhsAttachment->AsCompressedBinary().Decompress(); CHECK(!RhsBuffer.IsNull()); if (!LhsBuffer.GetView().EqualBytes(RhsBuffer.GetView())) { return false; } } return true; }; SUBCASE("PUT/GET returns correct package") { std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); const uint16_t PortNumber = 13337; const auto BaseUri = fmt::format("http://localhost:{}/z$", PortNumber); ZenServerInstance Instance1(TestEnv); Instance1.SetTestDir(TestDir); Instance1.SpawnServer(PortNumber); Instance1.WaitUntilReady(); const std::string_view Bucket = "mosdef"sv; zen::IoHash Key; zen::CbPackage ExpectedPackage = CreateTestPackage(Key); // PUT { zen::IoBuffer Body = SerializeToBuffer(ExpectedPackage); cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}", BaseUri, Bucket, Key)}, cpr::Body{(const char*)Body.Data(), Body.Size()}, cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}}); CHECK(Result.status_code == 201); } // GET { cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}", BaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); CHECK(Result.status_code == 200); zen::IoBuffer Response(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()); zen::CbPackage Package; const bool Ok = Package.TryLoad(Response); CHECK(Ok); CHECK(IsEqual(Package, ExpectedPackage)); } } SUBCASE("PUT propagates upstream") { // Setup local and remote server std::filesystem::path LocalDataDir = TestEnv.CreateNewTestDir(); std::filesystem::path RemoteDataDir = TestEnv.CreateNewTestDir(); const uint16_t LocalPortNumber = 13337; const uint16_t RemotePortNumber = 13338; const auto LocalBaseUri = fmt::format("http://localhost:{}/z$", LocalPortNumber); const auto RemoteBaseUri = fmt::format("http://localhost:{}/z$", RemotePortNumber); ZenServerInstance RemoteInstance(TestEnv); RemoteInstance.SetTestDir(RemoteDataDir); RemoteInstance.SpawnServer(RemotePortNumber); RemoteInstance.WaitUntilReady(); ZenServerInstance 
LocalInstance(TestEnv); LocalInstance.SetTestDir(LocalDataDir); LocalInstance.SpawnServer(LocalPortNumber, fmt::format("--upstream-thread-count=0 --upstream-zen-url=http://localhost:{}", RemotePortNumber)); LocalInstance.WaitUntilReady(); const std::string_view Bucket = "mosdef"sv; zen::IoHash Key; zen::CbPackage ExpectedPackage = CreateTestPackage(Key); // Store the cache record package in the local instance { zen::IoBuffer Body = SerializeToBuffer(ExpectedPackage); cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}", LocalBaseUri, Bucket, Key)}, cpr::Body{(const char*)Body.Data(), Body.Size()}, cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}}); CHECK(Result.status_code == 201); } // The cache record can be retrieved as a package from the local instance { cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}", LocalBaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); CHECK(Result.status_code == 200); zen::IoBuffer Body(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()); zen::CbPackage Package; const bool Ok = Package.TryLoad(Body); CHECK(Ok); CHECK(IsEqual(Package, ExpectedPackage)); } // The cache record can be retrieved as a package from the remote instance { cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}", RemoteBaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); CHECK(Result.status_code == 200); zen::IoBuffer Body(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()); zen::CbPackage Package; const bool Ok = Package.TryLoad(Body); CHECK(Ok); CHECK(IsEqual(Package, ExpectedPackage)); } } SUBCASE("GET finds upstream when missing in local") { // Setup local and remote server std::filesystem::path LocalDataDir = TestEnv.CreateNewTestDir(); std::filesystem::path RemoteDataDir = TestEnv.CreateNewTestDir(); const uint16_t LocalPortNumber = 13337; const uint16_t RemotePortNumber = 13338; const auto LocalBaseUri = fmt::format("http://localhost:{}/z$", 
LocalPortNumber); const auto RemoteBaseUri = fmt::format("http://localhost:{}/z$", RemotePortNumber); ZenServerInstance RemoteInstance(TestEnv); RemoteInstance.SetTestDir(RemoteDataDir); RemoteInstance.SpawnServer(RemotePortNumber); RemoteInstance.WaitUntilReady(); ZenServerInstance LocalInstance(TestEnv); LocalInstance.SetTestDir(LocalDataDir); LocalInstance.SpawnServer(LocalPortNumber, fmt::format("--upstream-thread-count=0 --upstream-zen-url=http://localhost:{}", RemotePortNumber)); LocalInstance.WaitUntilReady(); const std::string_view Bucket = "mosdef"sv; zen::IoHash Key; zen::CbPackage ExpectedPackage = CreateTestPackage(Key); // Store the cache record package in upstream cache { zen::IoBuffer Body = SerializeToBuffer(ExpectedPackage); cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}", RemoteBaseUri, Bucket, Key)}, cpr::Body{(const char*)Body.Data(), Body.Size()}, cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}}); CHECK(Result.status_code == 201); } // The cache record can be retrieved as a package from the local cache { cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}", LocalBaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); CHECK(Result.status_code == 200); zen::IoBuffer Body(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()); zen::CbPackage Package; const bool Ok = Package.TryLoad(Body); CHECK(Ok); CHECK(IsEqual(Package, ExpectedPackage)); } } } TEST_CASE("zcache.policy") { using namespace std::literals; using namespace utils; auto GenerateData = [](uint64_t Size, zen::IoHash& OutHash) -> zen::UniqueBuffer { auto Buf = zen::UniqueBuffer::Alloc(Size); uint8_t* Data = reinterpret_cast(Buf.GetData()); for (uint64_t Idx = 0; Idx < Size; Idx++) { Data[Idx] = Idx % 256; } OutHash = zen::IoHash::HashBuffer(Data, Size); return Buf; }; auto GeneratePackage = [](zen::IoHash& OutRecordKey, zen::IoHash& OutAttachmentKey) -> zen::CbPackage { auto Data = 
zen::SharedBuffer::Clone(zen::MakeMemoryView({1, 2, 3, 4, 5, 6, 7, 8, 9})); auto CompressedData = zen::CompressedBuffer::Compress(Data); OutAttachmentKey = zen::IoHash::FromBLAKE3(CompressedData.GetRawHash()); zen::CbWriter Writer; Writer.BeginObject("obj"sv); Writer.AddBinaryAttachment("data", OutAttachmentKey); Writer.EndObject(); CbObject CacheRecord = Writer.Save().AsObject(); OutRecordKey = IoHash::HashBuffer(CacheRecord.GetBuffer().GetView()); zen::CbPackage Package; Package.SetObject(CacheRecord); Package.AddAttachment(zen::CbAttachment(CompressedData)); return Package; }; auto ToBuffer = [](zen::CbPackage Package) -> zen::IoBuffer { zen::BinaryWriter MemStream; Package.Save(MemStream); return zen::IoBuffer(zen::IoBuffer::Clone, MemStream.Data(), MemStream.Size()); }; SUBCASE("query - 'local' does not query upstream (binary)") { ZenConfig UpstreamCfg = ZenConfig::New(13338); ZenServerInstance UpstreamInst(TestEnv); ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338); ZenServerInstance LocalInst(TestEnv); const auto Bucket = "legacy"sv; UpstreamCfg.Spawn(UpstreamInst); LocalCfg.Spawn(LocalInst); zen::IoHash Key; auto BinaryValue = GenerateData(1024, Key); // Store binary cache value upstream { cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}", UpstreamCfg.BaseUri, Bucket, Key)}, cpr::Body{(const char*)BinaryValue.GetData(), BinaryValue.GetSize()}, cpr::Header{{"Content-Type", "application/octet-stream"}}); CHECK(Result.status_code == 201); } { cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?Policy=QueryLocal,Store", LocalCfg.BaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/octet-stream"}}); CHECK(Result.status_code == 404); } { cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?Policy=Query,Store", LocalCfg.BaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/octet-stream"}}); CHECK(Result.status_code == 200); } } SUBCASE("store - 'local' does not store upstream (binary)") { ZenConfig UpstreamCfg = 
ZenConfig::New(13338); ZenServerInstance UpstreamInst(TestEnv); ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338); ZenServerInstance LocalInst(TestEnv); const auto Bucket = "legacy"sv; UpstreamCfg.Spawn(UpstreamInst); LocalCfg.Spawn(LocalInst); zen::IoHash Key; auto BinaryValue = GenerateData(1024, Key); // Store binary cache value locally { cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}?Policy=Query,StoreLocal", LocalCfg.BaseUri, Bucket, Key)}, cpr::Body{(const char*)BinaryValue.GetData(), BinaryValue.GetSize()}, cpr::Header{{"Content-Type", "application/octet-stream"}}); CHECK(Result.status_code == 201); } { cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}", UpstreamCfg.BaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/octet-stream"}}); CHECK(Result.status_code == 404); } { cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}", LocalCfg.BaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/octet-stream"}}); CHECK(Result.status_code == 200); } } SUBCASE("store - 'local/remote' stores local and upstream (binary)") { ZenConfig UpstreamCfg = ZenConfig::New(13338); ZenServerInstance UpstreamInst(TestEnv); ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338); ZenServerInstance LocalInst(TestEnv); const auto Bucket = "legacy"sv; UpstreamCfg.Spawn(UpstreamInst); LocalCfg.Spawn(LocalInst); zen::IoHash Key; auto BinaryValue = GenerateData(1024, Key); // Store binary cache value locally and upstream { cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}?Policy=Query,Store", LocalCfg.BaseUri, Bucket, Key)}, cpr::Body{(const char*)BinaryValue.GetData(), BinaryValue.GetSize()}, cpr::Header{{"Content-Type", "application/octet-stream"}}); CHECK(Result.status_code == 201); } { cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}", UpstreamCfg.BaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/octet-stream"}}); CHECK(Result.status_code == 200); } { cpr::Response Result = 
cpr::Get(cpr::Url{fmt::format("{}/{}/{}", LocalCfg.BaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/octet-stream"}}); CHECK(Result.status_code == 200); } } SUBCASE("query - 'local' does not query upstream (cppackage)") { ZenConfig UpstreamCfg = ZenConfig::New(13338); ZenServerInstance UpstreamInst(TestEnv); ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338); ZenServerInstance LocalInst(TestEnv); const auto Bucket = "legacy"sv; UpstreamCfg.Spawn(UpstreamInst); LocalCfg.Spawn(LocalInst); zen::IoHash Key; zen::IoHash PayloadId; zen::CbPackage Package = GeneratePackage(Key, PayloadId); auto Buf = ToBuffer(Package); // Store package upstream { cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}", UpstreamCfg.BaseUri, Bucket, Key)}, cpr::Body{(const char*)Buf.GetData(), Buf.GetSize()}, cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}}); CHECK(Result.status_code == 201); } { cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?Policy=QueryLocal,Store", LocalCfg.BaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); CHECK(Result.status_code == 404); } { cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?Policy=Query,Store", LocalCfg.BaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); CHECK(Result.status_code == 200); } } SUBCASE("store - 'local' does not store upstream (cbpackge)") { ZenConfig UpstreamCfg = ZenConfig::New(13338); ZenServerInstance UpstreamInst(TestEnv); ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338); ZenServerInstance LocalInst(TestEnv); const auto Bucket = "legacy"sv; UpstreamCfg.Spawn(UpstreamInst); LocalCfg.Spawn(LocalInst); zen::IoHash Key; zen::IoHash PayloadId; zen::CbPackage Package = GeneratePackage(Key, PayloadId); auto Buf = ToBuffer(Package); // Store packge locally { cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}?Policy=Query,StoreLocal", LocalCfg.BaseUri, Bucket, Key)}, cpr::Body{(const char*)Buf.GetData(), Buf.GetSize()}, 
cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}}); CHECK(Result.status_code == 201); } { cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}", UpstreamCfg.BaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); CHECK(Result.status_code == 404); } { cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}", LocalCfg.BaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); CHECK(Result.status_code == 200); } } SUBCASE("store - 'local/remote' stores local and upstream (cbpackage)") { ZenConfig UpstreamCfg = ZenConfig::New(13338); ZenServerInstance UpstreamInst(TestEnv); ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338); ZenServerInstance LocalInst(TestEnv); const auto Bucket = "legacy"sv; UpstreamCfg.Spawn(UpstreamInst); LocalCfg.Spawn(LocalInst); zen::IoHash Key; zen::IoHash PayloadId; zen::CbPackage Package = GeneratePackage(Key, PayloadId); auto Buf = ToBuffer(Package); // Store package locally and upstream { cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}?Policy=Query,Store", LocalCfg.BaseUri, Bucket, Key)}, cpr::Body{(const char*)Buf.GetData(), Buf.GetSize()}, cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}}); CHECK(Result.status_code == 201); } { cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}", UpstreamCfg.BaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); CHECK(Result.status_code == 200); } { cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}", LocalCfg.BaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); CHECK(Result.status_code == 200); } } SUBCASE("skip - 'data' returns cache record without attachments/empty payload") { ZenConfig Cfg = ZenConfig::New(); ZenServerInstance Instance(TestEnv); const auto Bucket = "test"sv; Cfg.Spawn(Instance); zen::IoHash Key; zen::IoHash PayloadId; zen::CbPackage Package = GeneratePackage(Key, PayloadId); auto Buf = ToBuffer(Package); // Store package { 
cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}", Cfg.BaseUri, Bucket, Key)}, cpr::Body{(const char*)Buf.GetData(), Buf.GetSize()}, cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}}); CHECK(Result.status_code == 201); } // Get package { cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?Policy=Default,SkipData", Cfg.BaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); CHECK(IsHttpSuccessCode(Result.status_code)); IoBuffer Buffer(IoBuffer::Wrap, Result.text.c_str(), Result.text.size()); CbPackage ResponsePackage; CHECK(ResponsePackage.TryLoad(Buffer)); CHECK(ResponsePackage.GetAttachments().size() == 0); } // Get record { cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?Policy=Default,SkipData", Cfg.BaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cb"}}); CHECK(IsHttpSuccessCode(Result.status_code)); IoBuffer Buffer(IoBuffer::Wrap, Result.text.c_str(), Result.text.size()); CbObject ResponseObject = zen::LoadCompactBinaryObject(Buffer); CHECK((bool)ResponseObject); } // Get payload { cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}/{}?Policy=Default,SkipData", Cfg.BaseUri, Bucket, Key, PayloadId)}, cpr::Header{{"Accept", "application/x-ue-comp"}}); CHECK(IsHttpSuccessCode(Result.status_code)); CHECK(Result.text.size() == 0); } } SUBCASE("skip - 'data' returns empty binary value") { ZenConfig Cfg = ZenConfig::New(); ZenServerInstance Instance(TestEnv); const auto Bucket = "test"sv; Cfg.Spawn(Instance); zen::IoHash Key; auto BinaryValue = GenerateData(1024, Key); // Store binary cache value { cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}", Cfg.BaseUri, Bucket, Key)}, cpr::Body{(const char*)BinaryValue.GetData(), BinaryValue.GetSize()}, cpr::Header{{"Content-Type", "application/octet-stream"}}); CHECK(Result.status_code == 201); } // Get package { cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?Policy=Default,SkipData", Cfg.BaseUri, Bucket, 
Key)}, cpr::Header{{"Accept", "application/octet-stream"}}); CHECK(IsHttpSuccessCode(Result.status_code)); CHECK(Result.text.size() == 0); } } } TEST_CASE("zcache.rpc") { using namespace std::literals; auto AppendCacheRecord = [](cacherequests::PutCacheRecordsRequest& Request, cacherequests::RecordsRequestPolicy& Policy, const zen::CacheKey& CacheKey, size_t PayloadSize, CachePolicy RecordPolicy) { std::vector Data; Data.resize(PayloadSize); for (size_t Idx = 0; Idx < PayloadSize; ++Idx) { Data[Idx] = Idx % 255; } CompressedBuffer Value = zen::CompressedBuffer::Compress(SharedBuffer::MakeView(Data.data(), Data.size())); Request.Requests.push_back({.Key = CacheKey, .Values = {{.Id = Oid::NewOid(), .Body = std::move(Value)}}}); Policy.RecordPolicies.push_back(RecordPolicy); }; auto PutCacheRecords = [&AppendCacheRecord](std::string_view BaseUri, std::string_view Namespace, std::string_view Bucket, size_t Num, size_t PayloadSize = 1024) -> std::vector { std::vector OutKeys; for (uint32_t Key = 1; Key <= Num; ++Key) { zen::IoHash KeyHash; ((uint32_t*)(KeyHash.Hash))[0] = Key; const zen::CacheKey CacheKey = zen::CacheKey::Create(Bucket, KeyHash); cacherequests::PutCacheRecordsRequest Request = {.Namespace = std::string(Namespace)}; cacherequests::RecordsRequestPolicy Policy = {.DefaultPolicy = CachePolicy::Default}; AppendCacheRecord(Request, Policy, CacheKey, PayloadSize, CachePolicy::Default); OutKeys.push_back(CacheKey); CbPackage Package; CHECK(Request.Format(Package, Policy)); IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)}, cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}, cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); CHECK(Result.status_code == 200); } return OutKeys; }; struct GetCacheRecordResult { zen::CbPackage Response; cacherequests::GetCacheRecordsResult Result; bool Success; }; auto GetCacheRecords 
= [](std::string_view BaseUri, std::string_view Namespace, std::span Keys, zen::CachePolicy Policy) -> GetCacheRecordResult { cacherequests::GetCacheRecordsRequest Request = {.Namespace = std::string(Namespace)}; cacherequests::RecordsRequestPolicy RequestPolicy = {.DefaultPolicy = Policy}; for (const CacheKey& Key : Keys) { Request.Requests.push_back(Key); RequestPolicy.RecordPolicies.push_back({}); } CbObjectWriter RequestWriter; CHECK(Request.Format(RequestWriter, RequestPolicy)); BinaryWriter Body; RequestWriter.Save(Body); cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)}, cpr::Header{{"Content-Type", "application/x-ue-cb"}, {"Accept", "application/x-ue-cbpkg"}}, cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); GetCacheRecordResult OutResult; if (Result.status_code == 200) { CbPackage Response; if (Response.TryLoad(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()))) { OutResult.Response = std::move(Response); CHECK(OutResult.Result.Parse(OutResult.Response)); OutResult.Success = true; } } return OutResult; }; SUBCASE("get cache records") { std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); const uint16_t PortNumber = 13337; const auto BaseUri = fmt::format("http://localhost:{}/z$", PortNumber); ZenServerInstance Inst(TestEnv); Inst.SetTestDir(TestDir); Inst.SpawnServer(PortNumber); Inst.WaitUntilReady(); CachePolicy Policy = CachePolicy::Default; std::vector Keys = PutCacheRecords(BaseUri, "ue4.ddc"sv, "mastodon"sv, 128); GetCacheRecordResult Result = GetCacheRecords(BaseUri, "ue4.ddc"sv, Keys, Policy); CHECK(Result.Result.Results.size() == Keys.size()); for (size_t Index = 0; const std::optional& Record : Result.Result.Results) { const CacheKey& ExpectedKey = Keys[Index++]; CHECK(Record); CHECK(Record->Key == ExpectedKey); CHECK(Record->Values.size() == 1); for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) { CHECK(Value.Body); } } } SUBCASE("get missing cache 
records") { std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); const uint16_t PortNumber = 13337; const auto BaseUri = fmt::format("http://localhost:{}/z$", PortNumber); ZenServerInstance Inst(TestEnv); Inst.SetTestDir(TestDir); Inst.SpawnServer(PortNumber); Inst.WaitUntilReady(); CachePolicy Policy = CachePolicy::Default; std::vector ExistingKeys = PutCacheRecords(BaseUri, "ue4.ddc"sv, "mastodon"sv, 128); std::vector Keys; for (const zen::CacheKey& Key : ExistingKeys) { Keys.push_back(Key); Keys.push_back(CacheKey::Create("missing"sv, IoHash::Zero)); } GetCacheRecordResult Result = GetCacheRecords(BaseUri, "ue4.ddc"sv, Keys, Policy); CHECK(Result.Result.Results.size() == Keys.size()); size_t KeyIndex = 0; for (size_t Index = 0; const std::optional& Record : Result.Result.Results) { const bool Missing = Index++ % 2 != 0; if (Missing) { CHECK(!Record); } else { const CacheKey& ExpectedKey = ExistingKeys[KeyIndex++]; CHECK(Record->Key == ExpectedKey); for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) { CHECK(Value.Body); } } } } SUBCASE("policy - 'QueryLocal' does not query upstream") { using namespace utils; ZenConfig UpstreamCfg = ZenConfig::New(13338); ZenServerInstance UpstreamServer(TestEnv); ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338); ZenServerInstance LocalServer(TestEnv); SpawnServer(UpstreamServer, UpstreamCfg); SpawnServer(LocalServer, LocalCfg); std::vector Keys = PutCacheRecords(UpstreamCfg.BaseUri, "ue4.ddc"sv, "mastodon"sv, 4); CachePolicy Policy = CachePolicy::QueryLocal; GetCacheRecordResult Result = GetCacheRecords(LocalCfg.BaseUri, "ue4.ddc"sv, Keys, Policy); CHECK(Result.Result.Results.size() == Keys.size()); for (const std::optional& Record : Result.Result.Results) { CHECK(!Record); } } SUBCASE("policy - 'QueryRemote' does query upstream") { using namespace utils; ZenConfig UpstreamCfg = ZenConfig::New(13338); ZenServerInstance UpstreamServer(TestEnv); ZenConfig LocalCfg = 
ZenConfig::NewWithUpstream(13338); ZenServerInstance LocalServer(TestEnv); SpawnServer(UpstreamServer, UpstreamCfg); SpawnServer(LocalServer, LocalCfg); std::vector Keys = PutCacheRecords(UpstreamCfg.BaseUri, "ue4.ddc"sv, "mastodon"sv, 4); CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote); GetCacheRecordResult Result = GetCacheRecords(LocalCfg.BaseUri, "ue4.ddc"sv, Keys, Policy); CHECK(Result.Result.Results.size() == Keys.size()); for (size_t Index = 0; const std::optional& Record : Result.Result.Results) { CHECK(Record); const CacheKey& ExpectedKey = Keys[Index++]; CHECK(Record->Key == ExpectedKey); } } } TEST_CASE("zcache.failing.upstream") { // This is an exploratory test that takes a long time to run, so lets skip it by default if (true) { return; } using namespace std::literals; using namespace utils; const uint16_t Upstream1PortNumber = 13338; ZenConfig Upstream1Cfg = ZenConfig::New(Upstream1PortNumber); Upstream1Cfg.Args += (" --http asio"); ZenServerInstance Upstream1Server(TestEnv); const uint16_t Upstream2PortNumber = 13339; ZenConfig Upstream2Cfg = ZenConfig::New(Upstream2PortNumber); Upstream2Cfg.Args += (" --http asio"); ZenServerInstance Upstream2Server(TestEnv); std::vector UpstreamPorts = {Upstream1PortNumber, Upstream2PortNumber}; ZenConfig LocalCfg = ZenConfig::NewWithThreadedUpstreams(UpstreamPorts, false); LocalCfg.Args += (" --http asio --upstream-thread-count 2"); ZenServerInstance LocalServer(TestEnv); const uint16_t LocalPortNumber = 13337; const auto LocalUri = fmt::format("http://localhost:{}/z$", LocalPortNumber); const auto Upstream1Uri = fmt::format("http://localhost:{}/z$", Upstream1PortNumber); const auto Upstream2Uri = fmt::format("http://localhost:{}/z$", Upstream2PortNumber); SpawnServer(Upstream1Server, Upstream1Cfg); SpawnServer(Upstream2Server, Upstream2Cfg); SpawnServer(LocalServer, LocalCfg); bool Upstream1Running = true; bool Upstream2Running = true; using namespace std::literals; auto 
AppendCacheRecord = [](cacherequests::PutCacheRecordsRequest& Request, cacherequests::RecordsRequestPolicy& Policy, const zen::CacheKey& CacheKey, size_t PayloadSize, CachePolicy RecordPolicy) { std::vector Data; Data.resize(PayloadSize / 4); for (uint32_t Idx = 0; Idx < PayloadSize / 4; ++Idx) { Data[Idx] = (*reinterpret_cast(&CacheKey.Hash.Hash[0])) + Idx; } CompressedBuffer Value = zen::CompressedBuffer::Compress(SharedBuffer::MakeView(Data.data(), Data.size() * 4)); Request.Requests.push_back({.Key = CacheKey, .Values = {{.Id = Oid::NewOid(), .Body = std::move(Value)}}}); Policy.RecordPolicies.push_back(RecordPolicy); }; auto PutCacheRecords = [&AppendCacheRecord](std::string_view BaseUri, std::string_view Namespace, std::string_view Bucket, size_t Num, size_t KeyOffset, size_t PayloadSize = 8192) -> std::vector { std::vector OutKeys; cacherequests::PutCacheRecordsRequest Request = {.Namespace = std::string(Namespace)}; cacherequests::RecordsRequestPolicy Policy = {.DefaultPolicy = CachePolicy::Default}; for (size_t Key = 1; Key <= Num; ++Key) { zen::IoHash KeyHash; ((size_t*)(KeyHash.Hash))[0] = KeyOffset + Key; const zen::CacheKey CacheKey = zen::CacheKey::Create(Bucket, KeyHash); AppendCacheRecord(Request, Policy, CacheKey, PayloadSize, CachePolicy::Default); OutKeys.push_back(CacheKey); } CbPackage Package; CHECK(Request.Format(Package, Policy)); IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)}, cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}, cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); if (Result.status_code != 200) { ZEN_DEBUG("PutCacheRecords failed with {}, reason '{}'", Result.status_code, Result.reason); OutKeys.clear(); } return OutKeys; }; struct GetCacheRecordResult { zen::CbPackage Response; cacherequests::GetCacheRecordsResult Result; bool Success = false; }; auto GetCacheRecords = 
[](std::string_view BaseUri, std::string_view Namespace, std::span Keys, zen::CachePolicy Policy) -> GetCacheRecordResult { cacherequests::GetCacheRecordsRequest Request = {.Namespace = std::string(Namespace)}; cacherequests::RecordsRequestPolicy RequestPolicy = {.DefaultPolicy = Policy}; for (const CacheKey& Key : Keys) { Request.Requests.push_back(Key); RequestPolicy.RecordPolicies.push_back({}); } CbObjectWriter RequestWriter; CHECK(Request.Format(RequestWriter, RequestPolicy)); BinaryWriter Body; RequestWriter.Save(Body); cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)}, cpr::Header{{"Content-Type", "application/x-ue-cb"}, {"Accept", "application/x-ue-cbpkg"}}, cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); GetCacheRecordResult OutResult; if (Result.status_code == 200) { CbPackage Response; if (Response.TryLoad(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()))) { OutResult.Response = std::move(Response); CHECK(OutResult.Result.Parse(OutResult.Response)); OutResult.Success = true; } } else { ZEN_DEBUG("GetCacheRecords with {}, reason '{}'", Result.reason, Result.status_code); } return OutResult; }; // Populate with some simple data CachePolicy Policy = CachePolicy::Default; const size_t ThreadCount = 128; const size_t KeyMultiplier = 16384; const size_t RecordsPerRequest = 64; WorkerThreadPool Pool(ThreadCount); std::atomic_size_t Completed = 0; auto Keys = new std::vector[ThreadCount * KeyMultiplier]; RwLock KeysLock; for (size_t I = 0; I < ThreadCount * KeyMultiplier; I++) { size_t Iteration = I; Pool.ScheduleWork([&] { std::vector NewKeys = PutCacheRecords(LocalUri, "ue4.ddc"sv, "mastodon"sv, RecordsPerRequest, I * RecordsPerRequest); if (NewKeys.size() != RecordsPerRequest) { ZEN_DEBUG("PutCacheRecords iteration {} failed", Iteration); Completed.fetch_add(1); return; } { RwLock::ExclusiveLockScope _(KeysLock); Keys[Iteration].swap(NewKeys); } Completed.fetch_add(1); }); } bool UseUpstream1 = 
false; while (Completed < ThreadCount * KeyMultiplier) { Sleep(8000); if (UseUpstream1) { if (Upstream2Running) { Upstream2Server.EnableTermination(); Upstream2Server.Shutdown(); Sleep(100); Upstream2Running = false; } if (!Upstream1Running) { SpawnServer(Upstream1Server, Upstream1Cfg); Upstream1Running = true; } UseUpstream1 = !UseUpstream1; } else { if (Upstream1Running) { Upstream1Server.EnableTermination(); Upstream1Server.Shutdown(); Sleep(100); Upstream1Running = false; } if (!Upstream2Running) { SpawnServer(Upstream2Server, Upstream2Cfg); Upstream2Running = true; } UseUpstream1 = !UseUpstream1; } } Completed = 0; for (size_t I = 0; I < ThreadCount * KeyMultiplier; I++) { size_t Iteration = I; std::vector& LocalKeys = Keys[Iteration]; if (LocalKeys.empty()) { Completed.fetch_add(1); continue; } Pool.ScheduleWork([&] { GetCacheRecordResult Result = GetCacheRecords(LocalUri, "ue4.ddc"sv, LocalKeys, Policy); if (!Result.Success) { ZEN_DEBUG("GetCacheRecords iteration {} failed", Iteration); Completed.fetch_add(1); return; } if (Result.Result.Results.size() != LocalKeys.size()) { ZEN_DEBUG("GetCacheRecords iteration {} empty records", Iteration); Completed.fetch_add(1); return; } for (size_t Index = 0; const std::optional& Record : Result.Result.Results) { const CacheKey& ExpectedKey = LocalKeys[Index++]; if (!Record) { continue; } if (Record->Key != ExpectedKey) { continue; } if (Record->Values.size() != 1) { continue; } for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) { if (!Value.Body) { continue; } } } Completed.fetch_add(1); }); } while (Completed < ThreadCount * KeyMultiplier) { Sleep(10); } } TEST_CASE("zcache.rpc.allpolicies") { using namespace std::literals; using namespace utils; ZenConfig UpstreamCfg = ZenConfig::New(13338); ZenServerInstance UpstreamServer(TestEnv); ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338); ZenServerInstance LocalServer(TestEnv); const uint16_t LocalPortNumber = 13337; const auto BaseUri = 
fmt::format("http://localhost:{}/z$", LocalPortNumber); SpawnServer(UpstreamServer, UpstreamCfg); SpawnServer(LocalServer, LocalCfg); char TestVersion[20 - sizeof(uint32_t) + 1] = "F72150A02AE34B57"; std::string_view TestBucket = "allpoliciestest"sv; std::string_view TestNamespace = "ue4.ddc"sv; // NumKeys = (2 Value vs Record)*(2 SkipData vs Default)*(2 ForceMiss vs Not)*(2 use local) // *(2 use remote)*(2 UseValue Policy vs not)*(4 cases per type) constexpr int NumKeys = 256; constexpr int NumValues = 4; Oid ValueIds[NumValues]; IoHash Hash; for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex) { ExtendableStringBuilder<16> ValueName; ValueName << "ValueId_"sv << ValueIndex; static_assert(sizeof(IoHash) >= sizeof(Oid)); ValueIds[ValueIndex] = Oid::FromMemory(IoHash::HashBuffer(ValueName.Data(), ValueName.Size() * sizeof(ValueName.Data()[0])).Hash); } struct KeyData; struct UserData { UserData& Set(KeyData* InKeyData, int InValueIndex) { Data = InKeyData; ValueIndex = InValueIndex; return *this; } KeyData* Data = nullptr; int ValueIndex = 0; }; struct KeyData { CompressedBuffer BufferValues[NumValues]; uint64_t IntValues[NumValues]; UserData ValueUserData[NumValues]; bool ReceivedChunk[NumValues]; CacheKey Key; UserData KeyUserData; uint32_t KeyIndex = 0; bool GetRequestsData = true; bool UseValueAPI = false; bool UseValuePolicy = false; bool ForceMiss = false; bool UseLocal = true; bool UseRemote = true; bool ShouldBeHit = true; bool ReceivedPut = false; bool ReceivedGet = false; bool ReceivedPutValue = false; bool ReceivedGetValue = false; }; struct CachePutRequest { CacheKey Key; CbObject Record; CacheRecordPolicy Policy; KeyData* Values; UserData* Data; }; struct CachePutValueRequest { CacheKey Key; CompressedBuffer Value; CachePolicy Policy; UserData* Data; }; struct CacheGetRequest { CacheKey Key; CacheRecordPolicy Policy; UserData* Data; }; struct CacheGetValueRequest { CacheKey Key; CachePolicy Policy; UserData* Data; }; struct 
CacheGetChunkRequest { CacheKey Key; Oid ValueId; uint64_t RawOffset; uint64_t RawSize; IoHash RawHash; CachePolicy Policy; UserData* Data; }; KeyData KeyDatas[NumKeys]; std::vector PutRequests; std::vector PutValueRequests; std::vector GetRequests; std::vector GetValueRequests; std::vector ChunkRequests; for (uint32_t KeyIndex = 0; KeyIndex < NumKeys; ++KeyIndex) { IoHash KeyHash; memcpy(&KeyHash.Hash[0], &KeyIndex, sizeof(uint32_t)); memcpy(&KeyHash.Hash[sizeof(uint32_t)], &TestVersion[0], 20 - sizeof(uint32_t)); KeyData& KeyData = KeyDatas[KeyIndex]; if (KeyIndex == 185) { std::string FailingKeyHash = KeyHash.ToHexString(); CHECK(KeyIndex < NumKeys); } KeyData.Key = CacheKey::Create(TestBucket, KeyHash); KeyData.KeyIndex = KeyIndex; KeyData.GetRequestsData = (KeyIndex & (1 << 1)) == 0; KeyData.UseValueAPI = (KeyIndex & (1 << 2)) != 0; KeyData.UseValuePolicy = (KeyIndex & (1 << 3)) != 0; KeyData.ForceMiss = (KeyIndex & (1 << 4)) == 0; KeyData.UseLocal = (KeyIndex & (1 << 5)) == 0; KeyData.UseRemote = (KeyIndex & (1 << 6)) == 0; KeyData.ShouldBeHit = !KeyData.ForceMiss && (KeyData.UseLocal || KeyData.UseRemote); CachePolicy SharedPolicy = KeyData.UseLocal ? CachePolicy::Local : CachePolicy::None; SharedPolicy |= KeyData.UseRemote ? CachePolicy::Remote : CachePolicy::None; CachePolicy PutPolicy = SharedPolicy; CachePolicy GetPolicy = SharedPolicy; GetPolicy |= !KeyData.GetRequestsData ? 
CachePolicy::SkipData : CachePolicy::None; CacheKey& Key = KeyData.Key; for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex) { KeyData.IntValues[ValueIndex] = static_cast(KeyIndex) | (static_cast(ValueIndex) << 32); KeyData.BufferValues[ValueIndex] = CompressedBuffer::Compress(SharedBuffer::MakeView(&KeyData.IntValues[ValueIndex], sizeof(KeyData.IntValues[ValueIndex]))); KeyData.ReceivedChunk[ValueIndex] = false; } UserData& KeyUserData = KeyData.KeyUserData.Set(&KeyData, -1); for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex) { KeyData.ValueUserData[ValueIndex].Set(&KeyData, ValueIndex); } if (!KeyData.UseValueAPI) { CbObjectWriter Builder; Builder.BeginObject("key"sv); Builder << "Bucket"sv << Key.Bucket << "Hash"sv << Key.Hash; Builder.EndObject(); Builder.BeginArray("Values"sv); for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex) { Builder.BeginObject(); Builder.AddObjectId("Id"sv, ValueIds[ValueIndex]); Builder.AddBinaryAttachment("RawHash"sv, IoHash::FromBLAKE3(KeyData.BufferValues[ValueIndex].GetRawHash())); Builder.AddInteger("RawSize"sv, KeyData.BufferValues[ValueIndex].GetRawSize()); Builder.EndObject(); } Builder.EndArray(); CacheRecordPolicy PutRecordPolicy; CacheRecordPolicy GetRecordPolicy; if (!KeyData.UseValuePolicy) { PutRecordPolicy = CacheRecordPolicy(PutPolicy); GetRecordPolicy = CacheRecordPolicy(GetPolicy); } else { // Switch the SkipData field in the Record policy so that if the CacheStore ignores the ValuePolicies // it will use the wrong value for SkipData and fail our tests. 
CacheRecordPolicyBuilder PutBuilder(PutPolicy ^ CachePolicy::SkipData); CacheRecordPolicyBuilder GetBuilder(GetPolicy ^ CachePolicy::SkipData); for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex) { PutBuilder.AddValuePolicy(ValueIds[ValueIndex], PutPolicy); GetBuilder.AddValuePolicy(ValueIds[ValueIndex], GetPolicy); } PutRecordPolicy = PutBuilder.Build(); GetRecordPolicy = GetBuilder.Build(); } if (!KeyData.ForceMiss) { PutRequests.push_back({Key, Builder.Save(), PutRecordPolicy, &KeyData, &KeyUserData}); } GetRequests.push_back({Key, GetRecordPolicy, &KeyUserData}); for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex) { UserData& ValueUserData = KeyData.ValueUserData[ValueIndex]; ChunkRequests.push_back({Key, ValueIds[ValueIndex], 0, UINT64_MAX, IoHash(), GetPolicy, &ValueUserData}); } } else { if (!KeyData.ForceMiss) { PutValueRequests.push_back({Key, KeyData.BufferValues[0], PutPolicy, &KeyUserData}); } GetValueRequests.push_back({Key, GetPolicy, &KeyUserData}); ChunkRequests.push_back({Key, Oid::Zero, 0, UINT64_MAX, IoHash(), GetPolicy, &KeyUserData}); } } // PutCacheRecords { CachePolicy BatchDefaultPolicy = CachePolicy::Default; cacherequests::PutCacheRecordsRequest Request = {.Namespace = std::string(TestNamespace)}; cacherequests::RecordsRequestPolicy Policy = {.DefaultPolicy = BatchDefaultPolicy}; Request.Requests.reserve(PutRequests.size()); Policy.RecordPolicies.reserve(PutRequests.size()); for (CachePutRequest& PutRequest : PutRequests) { cacherequests::PutCacheRecordRequest& RecordRequest = Request.Requests.emplace_back(); std::optional& RecordPolicy = Policy.RecordPolicies.emplace_back(); RecordRequest.Key = PutRequest.Key; RecordPolicy = PutRequest.Policy; RecordRequest.Values.reserve(NumValues); for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex) { RecordRequest.Values.push_back({.Id = ValueIds[ValueIndex], .Body = PutRequest.Values->BufferValues[ValueIndex]}); } PutRequest.Data->Data->ReceivedPut = true; } CbPackage 
Package; CHECK(Request.Format(Package, Policy)); IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)}, cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}, cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); CHECK_MESSAGE(Result.status_code == 200, "PutCacheRecords unexpectedly failed."); } // PutCacheValues { CachePolicy BatchDefaultPolicy = CachePolicy::Default; cacherequests::PutCacheValuesRequest Request = {.DefaultPolicy = BatchDefaultPolicy, .Namespace = std::string(TestNamespace)}; Request.Requests.reserve(PutValueRequests.size()); for (CachePutValueRequest& PutRequest : PutValueRequests) { Request.Requests.push_back({.Key = PutRequest.Key, .Body = PutRequest.Value, .Policy = PutRequest.Policy}); PutRequest.Data->Data->ReceivedPutValue = true; } CbPackage Package; CHECK(Request.Format(Package)); IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)}, cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}, cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); CHECK_MESSAGE(Result.status_code == 200, "PutCacheValues unexpectedly failed."); } for (KeyData& KeyData : KeyDatas) { if (!KeyData.ForceMiss) { if (!KeyData.UseValueAPI) { CHECK_MESSAGE(KeyData.ReceivedPut, WriteToString<32>("Key ", KeyData.KeyIndex, " was unexpectedly not put.").c_str()); } else { CHECK_MESSAGE(KeyData.ReceivedPutValue, WriteToString<32>("Key ", KeyData.KeyIndex, " was unexpectedly not put to ValueAPI.").c_str()); } } } // GetCacheRecords { CachePolicy BatchDefaultPolicy = CachePolicy::Default; cacherequests::GetCacheRecordsRequest Request = {.Namespace = std::string(TestNamespace)}; cacherequests::RecordsRequestPolicy RequestPolicy = {.DefaultPolicy = BatchDefaultPolicy}; 
Request.Requests.reserve(GetRequests.size()); RequestPolicy.RecordPolicies.reserve(GetRequests.size()); for (CacheGetRequest& GetRequest : GetRequests) { Request.Requests.push_back(GetRequest.Key); RequestPolicy.RecordPolicies.push_back(GetRequest.Policy); } CbPackage Package; CHECK(Request.Format(Package, RequestPolicy)); IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)}, cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}, cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); CHECK_MESSAGE(Result.status_code == 200, "GetCacheRecords unexpectedly failed."); CbPackage Response; bool Loaded = Response.TryLoad(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size())); CHECK_MESSAGE(Loaded, "GetCacheRecords response failed to load."); cacherequests::GetCacheRecordsResult RequestResult; CHECK(RequestResult.Parse(Response)); CHECK_MESSAGE(RequestResult.Results.size() == GetRequests.size(), "GetCacheRecords response count did not match request count."); for (int Index = 0; const std::optional& RecordResult : RequestResult.Results) { bool Succeeded = RecordResult.has_value(); CacheGetRequest& GetRequest = GetRequests[Index++]; KeyData* KeyData = GetRequest.Data->Data; KeyData->ReceivedGet = true; WriteToString<32> Name("Get(", KeyData->KeyIndex, ")"); if (KeyData->ShouldBeHit) { CHECK_MESSAGE(Succeeded, WriteToString<32>(Name, " unexpectedly failed.").c_str()); } else if (KeyData->ForceMiss) { CHECK_MESSAGE(!Succeeded, WriteToString<32>(Name, " unexpectedly succeeded.").c_str()); } if (!KeyData->ForceMiss && Succeeded) { CHECK_MESSAGE(RecordResult->Values.size() == NumValues, WriteToString<32>(Name, " number of values did not match.").c_str()); for (const cacherequests::GetCacheRecordResultValue& Value : RecordResult->Values) { int ExpectedValueIndex = 0; for (; ExpectedValueIndex < NumValues; 
++ExpectedValueIndex) { if (ValueIds[ExpectedValueIndex] == Value.Id) { break; } } CHECK_MESSAGE(ExpectedValueIndex < NumValues, WriteToString<32>(Name, " could not find matching ValueId.").c_str()); WriteToString<32> ValueName("Get(", KeyData->KeyIndex, ",", ExpectedValueIndex, ")"); CompressedBuffer ExpectedValue = KeyData->BufferValues[ExpectedValueIndex]; CHECK_MESSAGE(Value.RawHash == IoHash::FromBLAKE3(ExpectedValue.GetRawHash()), WriteToString<32>(ValueName, " RawHash did not match.").c_str()); CHECK_MESSAGE(Value.RawSize == ExpectedValue.GetRawSize(), WriteToString<32>(ValueName, " RawSize did not match.").c_str()); if (KeyData->GetRequestsData) { SharedBuffer Buffer = Value.Body.Decompress(); CHECK_MESSAGE(Buffer.GetSize() == Value.RawSize, WriteToString<32>(ValueName, " BufferSize did not match RawSize.").c_str()); uint64_t ActualIntValue = ((const uint64_t*)Buffer.GetData())[0]; uint64_t ExpectedIntValue = KeyData->IntValues[ExpectedValueIndex]; CHECK_MESSAGE(ActualIntValue == ExpectedIntValue, WriteToString<32>(ValueName, " had unexpected data.").c_str()); } } } } } // GetCacheValues { CachePolicy BatchDefaultPolicy = CachePolicy::Default; cacherequests::GetCacheValuesRequest GetCacheValuesRequest = {.DefaultPolicy = BatchDefaultPolicy, .Namespace = std::string(TestNamespace)}; GetCacheValuesRequest.Requests.reserve(GetValueRequests.size()); for (CacheGetValueRequest& GetRequest : GetValueRequests) { GetCacheValuesRequest.Requests.push_back({.Key = GetRequest.Key, .Policy = GetRequest.Policy}); } CbPackage Package; CHECK(GetCacheValuesRequest.Format(Package)); IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)}, cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}, cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); CHECK_MESSAGE(Result.status_code == 200, "GetCacheValues unexpectedly failed."); CbPackage 
Response; bool Loaded = Response.TryLoad(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size())); CHECK_MESSAGE(Loaded, "GetCacheValues response failed to load."); cacherequests::GetCacheValuesResult GetCacheValuesResult; CHECK(GetCacheValuesResult.Parse(Response)); for (int Index = 0; const cacherequests::CacheValueResult& ValueResult : GetCacheValuesResult.Results) { bool Succeeded = ValueResult.RawHash != IoHash::Zero; CacheGetValueRequest& Request = GetValueRequests[Index++]; KeyData* KeyData = Request.Data->Data; KeyData->ReceivedGetValue = true; WriteToString<32> Name("GetValue("sv, KeyData->KeyIndex, ")"sv); if (KeyData->ShouldBeHit) { CHECK_MESSAGE(Succeeded, WriteToString<32>(Name, " unexpectedly failed.").c_str()); } else if (KeyData->ForceMiss) { CHECK_MESSAGE(!Succeeded, WriteToString<32>(Name, "unexpectedly succeeded.").c_str()); } if (!KeyData->ForceMiss && Succeeded) { CompressedBuffer ExpectedValue = KeyData->BufferValues[0]; CHECK_MESSAGE(ValueResult.RawHash == IoHash::FromBLAKE3(ExpectedValue.GetRawHash()), WriteToString<32>(Name, " RawHash did not match.").c_str()); CHECK_MESSAGE(ValueResult.RawSize == ExpectedValue.GetRawSize(), WriteToString<32>(Name, " RawSize did not match.").c_str()); if (KeyData->GetRequestsData) { SharedBuffer Buffer = ValueResult.Body.Decompress(); CHECK_MESSAGE(Buffer.GetSize() == ValueResult.RawSize, WriteToString<32>(Name, " BufferSize did not match RawSize.").c_str()); uint64_t ActualIntValue = ((const uint64_t*)Buffer.GetData())[0]; uint64_t ExpectedIntValue = KeyData->IntValues[0]; CHECK_MESSAGE(ActualIntValue == ExpectedIntValue, WriteToString<32>(Name, " had unexpected data.").c_str()); } } } } // GetCacheChunks { std::sort(ChunkRequests.begin(), ChunkRequests.end(), [](CacheGetChunkRequest& A, CacheGetChunkRequest& B) { return A.Key.Hash < B.Key.Hash; }); CachePolicy BatchDefaultPolicy = CachePolicy::Default; cacherequests::GetCacheChunksRequest GetCacheChunksRequest = {.Namespace = 
std::string(TestNamespace)}; cacherequests::ChunksRequestPolicy GetCacheChunksPolicy = {.DefaultPolicy = BatchDefaultPolicy}; GetCacheChunksRequest.Requests.reserve(ChunkRequests.size()); GetCacheChunksPolicy.ChunkPolicies.reserve(ChunkRequests.size()); for (CacheGetChunkRequest& ChunkRequest : ChunkRequests) { GetCacheChunksRequest.Requests.push_back({.Key = ChunkRequest.Key, .ValueId = ChunkRequest.ValueId, .ChunkId = IoHash(), .RawOffset = ChunkRequest.RawOffset, .RawSize = ChunkRequest.RawSize}); GetCacheChunksPolicy.ChunkPolicies.push_back({ChunkRequest.Policy}); } CbPackage Package; CHECK(GetCacheChunksRequest.Format(Package, GetCacheChunksPolicy)); IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)}, cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}, cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); CHECK_MESSAGE(Result.status_code == 200, "GetCacheChunks unexpectedly failed."); CbPackage Response; bool Loaded = Response.TryLoad(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size())); CHECK_MESSAGE(Loaded, "GetCacheChunks response failed to load."); cacherequests::GetCacheChunksResult GetCacheChunksResult; CHECK(GetCacheChunksResult.Parse(Response)); CHECK_MESSAGE(GetCacheChunksResult.Results.size() == ChunkRequests.size(), "GetCacheChunks response count did not match request count."); for (int Index = 0; const cacherequests::CacheValueResult& ValueResult : GetCacheChunksResult.Results) { bool Succeeded = ValueResult.RawHash != IoHash::Zero; CacheGetChunkRequest& Request = ChunkRequests[Index++]; KeyData* KeyData = Request.Data->Data; int ValueIndex = Request.Data->ValueIndex >= 0 ? 
Request.Data->ValueIndex : 0; KeyData->ReceivedChunk[ValueIndex] = true; auto PolicyToString = [](CachePolicy Policy) { ExtendableStringBuilder<64> Builder; Builder << Policy; return Builder.ToString(); }; WriteToString<32> Name("GetChunks("sv, KeyData->KeyIndex, ","sv, ValueIndex, ","sv, PolicyToString(Request.Policy), ")"sv); if (KeyData->ShouldBeHit) { CHECK_MESSAGE(Succeeded, WriteToString<256>(Name, " unexpectedly failed."sv).c_str()); } else if (KeyData->ForceMiss) { CHECK_MESSAGE(!Succeeded, WriteToString<256>(Name, " unexpectedly succeeded."sv).c_str()); } if (KeyData->ShouldBeHit && Succeeded) { CompressedBuffer ExpectedValue = KeyData->BufferValues[ValueIndex]; CHECK_MESSAGE(ValueResult.RawHash == IoHash::FromBLAKE3(ExpectedValue.GetRawHash()), WriteToString<32>(Name, " had unexpected RawHash.").c_str()); CHECK_MESSAGE(ValueResult.RawSize == ExpectedValue.GetRawSize(), WriteToString<32>(Name, " had unexpected RawSize.").c_str()); if (KeyData->GetRequestsData) { SharedBuffer Buffer = ValueResult.Body.Decompress(); CHECK_MESSAGE(Buffer.GetSize() == ValueResult.RawSize, WriteToString<32>(Name, " BufferSize did not match RawSize.").c_str()); uint64_t ActualIntValue = ((const uint64_t*)Buffer.GetData())[0]; uint64_t ExpectedIntValue = KeyData->IntValues[ValueIndex]; CHECK_MESSAGE(ActualIntValue == ExpectedIntValue, WriteToString<32>(Name, " had unexpected data.").c_str()); } } } } for (KeyData& KeyData : KeyDatas) { if (!KeyData.UseValueAPI) { CHECK_MESSAGE(KeyData.ReceivedGet, WriteToString<32>("Get(", KeyData.KeyIndex, ") was unexpectedly not received.").c_str()); for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex) { CHECK_MESSAGE( KeyData.ReceivedChunk[ValueIndex], WriteToString<32>("GetChunks(", KeyData.KeyIndex, ",", ValueIndex, ") was unexpectedly not received.").c_str()); } } else { CHECK_MESSAGE(KeyData.ReceivedGetValue, WriteToString<32>("GetValue(", KeyData.KeyIndex, ") was unexpectedly not received.").c_str()); 
CHECK_MESSAGE(KeyData.ReceivedChunk[0], WriteToString<32>("GetChunks(", KeyData.KeyIndex, ") was unexpectedly not received.").c_str()); } } } # if ZEN_WITH_EXEC_SERVICES struct RemoteExecutionRequest { RemoteExecutionRequest(std::string_view Host, int Port, std::filesystem::path& TreePath) : m_HostName(Host) , m_PortNumber(Port) , m_TreePath(TreePath) { } void Build(std::string_view Command, std::string_view Arguments) { zen::FileSystemTraversal Traversal; Traversal.TraverseFileSystem(m_TreePath, m_Visit); zen::CbObjectWriter PrepReq; PrepReq << "cmd" << Command; PrepReq << "args" << Arguments; PrepReq.BeginArray("files"); for (const auto& Kv : m_Visit.m_Files) { PrepReq.BeginObject(); PrepReq << "file" << zen::PathToUtf8(Kv.first) << "size" << Kv.second.Size << "hash" << Kv.second.Hash; PrepReq.EndObject(); } PrepReq.EndArray(); PrepReq.Save(m_MemOut); } void Prep() { cpr::Response Response = cpr::Post(cpr::Url(fmt::format("{}/prep", m_BaseUri)), cpr::Body((const char*)m_MemOut.Data(), m_MemOut.Size())); if (Response.status_code < 300) { zen::IoBuffer Payload(zen::IoBuffer::Clone, Response.text.data(), Response.text.size()); zen::CbObject Result = zen::LoadCompactBinaryObject(Payload); for (auto& Need : Result["need"]) { zen::IoHash NeedHash = Need.AsHash(); if (auto It = m_Visit.m_HashToFile.find(NeedHash); It != m_Visit.m_HashToFile.end()) { zen::IoBuffer FileData = zen::IoBufferBuilder::MakeFromFile(It->second); cpr::Response CidResponse = cpr::Post(cpr::Url(m_CidUri), cpr::Body((const char*)FileData.Data(), FileData.Size())); if (CidResponse.status_code >= 300) { ZEN_ERROR("CID put failed with {}", CidResponse.status_code); } } else { ZEN_ERROR("unknown hash in 'need' list: {}", NeedHash); } } } } zen::CbObject Exec() { cpr::Response JobResponse = cpr::Post(cpr::Url(m_BaseUri), cpr::Body((const char*)m_MemOut.Data(), m_MemOut.Size())); if (JobResponse.status_code < 300) { zen::IoBuffer Payload(zen::IoBuffer::Clone, JobResponse.text.data(), 
JobResponse.text.size()); return zen::LoadCompactBinaryObject(std::move(Payload)); } ZEN_INFO("job exec: {}", JobResponse.status_code); return {}; } private: struct Visitor : public zen::FileSystemTraversal::TreeVisitor { const std::filesystem::path& m_RootPath; Visitor(const std::filesystem::path& RootPath) : m_RootPath(RootPath) {} virtual void VisitFile(const std::filesystem::path& Parent, const path_view& FileName, uint64_t FileSize) override { std::filesystem::path FullPath = Parent / FileName; zen::IoHashStream Ios; zen::ScanFile(FullPath, 64 * 1024, [&](const void* Data, size_t Size) { Ios.Append(Data, Size); }); zen::IoHash Hash = Ios.GetHash(); auto RelativePath = FullPath.lexically_relative(m_RootPath).native(); // ZEN_INFO("File: {:32} => {} ({})", zen::PathToUtf8(RelativePath), Hash, FileSize); FileEntry& Entry = m_Files[RelativePath]; Entry.Hash = Hash; Entry.Size = FileSize; m_HashToFile[Hash] = FullPath; } virtual bool VisitDirectory(const std::filesystem::path& Parent, const path_view& DirectoryName) override { std::filesystem::path FullPath = Parent / DirectoryName; if (DirectoryName.starts_with('.')) { return false; } return true; } struct FileEntry { uint64_t Size; zen::IoHash Hash; }; std::map m_Files; std::unordered_map m_HashToFile; }; std::string m_HostName; int m_PortNumber; std::filesystem::path m_TreePath; const std::string m_BaseUri = fmt::format("http://{}:{}/exec/jobs", m_HostName, m_PortNumber); const std::string m_CidUri = fmt::format("http://{}:{}/cid", m_HostName, m_PortNumber); Visitor m_Visit{m_TreePath}; zen::BinaryWriter m_MemOut; }; TEST_CASE(".exec.basic") { if (true) { return; } using namespace std::literals; std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); const uint16_t PortNumber = 13337; ZenServerInstance Zen1(TestEnv); Zen1.SetTestDir(TestDir); Zen1.SpawnServer(PortNumber); Zen1.WaitUntilReady(); std::filesystem::path TreePath = TestEnv.GetTestRootDir("test/remote1"); { RemoteExecutionRequest 
RemoteRequest("localhost", PortNumber, TreePath); RemoteRequest.Build("zentest-appstub.exe", ""); RemoteRequest.Prep(); zen::CbObject Result = RemoteRequest.Exec(); CHECK(Result["exitcode"sv].AsInt32(-1) == 0); } { RemoteExecutionRequest RemoteRequest("localhost", PortNumber, TreePath); RemoteRequest.Build("zentest-appstub.exe", "-f=1"); RemoteRequest.Prep(); zen::CbObject Result = RemoteRequest.Exec(); CHECK(Result["exitcode"sv].AsInt32(-1) == 1); } } # endif // ZEN_WITH_EXEC_SERVICES TEST_CASE("mesh.basic") { // --mesh option only available with ZEN_ENABLE_MESH # if ZEN_ENABLE_MESH using namespace std::literals; const int kInstanceCount = 4; ZEN_INFO("spawning {} instances", kInstanceCount); std::unique_ptr Instances[kInstanceCount]; for (int i = 0; i < kInstanceCount; ++i) { auto& Instance = Instances[i]; Instance = std::make_unique(TestEnv); Instance->SetTestDir(TestEnv.CreateNewTestDir()); Instance->EnableMesh(); Instance->SpawnServer(13337 + i); } for (int i = 0; i < kInstanceCount; ++i) { auto& Instance = Instances[i]; Instance->WaitUntilReady(); } # endif } class ZenServerTestHelper { public: ZenServerTestHelper(std::string_view HelperId, int ServerCount) : m_HelperId{HelperId}, m_ServerCount{ServerCount} {} ~ZenServerTestHelper() {} void SpawnServers() { SpawnServers([](ZenServerInstance&) {}); } void SpawnServers(auto&& Callback) { ZEN_INFO("{}: spawning {} server instances", m_HelperId, m_ServerCount); m_Instances.resize(m_ServerCount); for (int i = 0; i < m_ServerCount; ++i) { auto& Instance = m_Instances[i]; Instance = std::make_unique(TestEnv); Instance->SetTestDir(TestEnv.CreateNewTestDir()); Callback(*Instance); Instance->SpawnServer(13337 + i); } for (int i = 0; i < m_ServerCount; ++i) { auto& Instance = m_Instances[i]; Instance->WaitUntilReady(); } } ZenServerInstance& GetInstance(int Index) { return *m_Instances[Index]; } private: std::string m_HelperId; int m_ServerCount = 0; std::vector > m_Instances; }; class IoDispatcher { public: 
IoDispatcher(asio::io_context& IoCtx) : m_IoCtx(IoCtx) {} ~IoDispatcher() { Stop(); } void Run() { Stop(); m_Running = true; m_IoThread = std::thread([this]() { try { m_IoCtx.run(); } catch (std::exception& Error) { m_Error = Error; } }); } void Stop() { if (m_Running) { m_Running = false; if (m_IoThread.joinable()) { m_IoThread.join(); } } } bool IsRunning() const { return m_Running; } const std::exception& Error() { return m_Error; } private: asio::io_context& m_IoCtx; std::thread m_IoThread; std::exception m_Error; std::atomic_bool m_Running{false}; }; TEST_CASE("http.basics") { using namespace std::literals; ZenServerTestHelper Servers{"http.basics"sv, 1}; Servers.SpawnServers(); ZenServerInstance& Instance = Servers.GetInstance(0); const std::string BaseUri = Instance.GetBaseUri(); { cpr::Response r = cpr::Get(cpr::Url{fmt::format("{}/testing/hello", BaseUri)}); CHECK(IsHttpSuccessCode(r.status_code)); } { cpr::Response r = cpr::Post(cpr::Url{fmt::format("{}/testing/hello", BaseUri)}); CHECK_EQ(r.status_code, 404); } { cpr::Response r = cpr::Post(cpr::Url{fmt::format("{}/testing/echo", BaseUri)}, cpr::Body{"yoyoyoyo"}); CHECK_EQ(r.status_code, 200); CHECK_EQ(r.text, "yoyoyoyo"); } } TEST_CASE("http.package") { using namespace std::literals; ZenServerTestHelper Servers{"http.package"sv, 1}; Servers.SpawnServers(); ZenServerInstance& Instance = Servers.GetInstance(0); const std::string BaseUri = Instance.GetBaseUri(); static const uint8_t Data1[] = {0, 1, 2, 3}; static const uint8_t Data2[] = {0, 1, 2, 3, 4, 5, 6, 7, 8}; zen::CbAttachment Attach1{zen::CompressedBuffer::Compress(zen::SharedBuffer::Clone({Data1, 4}), zen::OodleCompressor::NotSet, zen::OodleCompressionLevel::None)}; zen::CbAttachment Attach2{zen::CompressedBuffer::Compress(zen::SharedBuffer::Clone({Data2, 8}), zen::OodleCompressor::NotSet, zen::OodleCompressionLevel::None)}; zen::CbObjectWriter Writer; Writer.AddAttachment("attach1", Attach1); Writer.AddAttachment("attach2", Attach2); zen::CbObject 
CoreObject = Writer.Save(); zen::CbPackage TestPackage; TestPackage.SetObject(CoreObject); TestPackage.AddAttachment(Attach1); TestPackage.AddAttachment(Attach2); zen::HttpClient TestClient(BaseUri); zen::HttpClient::Response Response = TestClient.TransactPackage("/testing/package"sv, TestPackage); zen::CbPackage ResponsePackage = ParsePackageMessage(Response.ResponsePayload); CHECK_EQ(ResponsePackage, TestPackage); } TEST_CASE("websocket.basic") { if (true) { return; } std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); const uint16_t PortNumber = 13337; const auto MaxWaitTime = std::chrono::seconds(5); ZenServerInstance Inst(TestEnv); Inst.SetTestDir(TestDir); Inst.SpawnServer(PortNumber, "--websocket-port=8848"sv); Inst.WaitUntilReady(); asio::io_context IoCtx; IoDispatcher IoDispatcher(IoCtx); auto WebSocket = WebSocketClient::Create(IoCtx); auto ConnectFuture = WebSocket->Connect({.Host = "127.0.0.1", .Port = 8848, .Endpoint = "/zen"}); IoDispatcher.Run(); ConnectFuture.wait_for(MaxWaitTime); CHECK(ConnectFuture.get()); for (size_t Idx = 0; Idx < 10; Idx++) { CbObjectWriter Request; Request << "Method"sv << "SayHello"sv; WebSocketMessage RequestMsg; RequestMsg.SetMessageType(WebSocketMessageType::kRequest); RequestMsg.SetBody(Request.Save()); auto ResponseFuture = WebSocket->SendRequest(std::move(RequestMsg)); ResponseFuture.wait_for(MaxWaitTime); CbObject Response = ResponseFuture.get().Body().GetObject(); std::string_view Message = Response["Result"].AsString(); CHECK(Message == "Hello Friend!!"sv); } WebSocket->Disconnect(); IoCtx.stop(); IoDispatcher.Stop(); } # if 0 TEST_CASE("lifetime.owner") { // This test is designed to verify that the hand-over of sponsor processes is handled // correctly for the case when a second or third process is launched on the same port // // Due to the nature of it, it cannot be const uint16_t PortNumber = 23456; ZenServerInstance Zen1(TestEnv); std::filesystem::path TestDir1 = TestEnv.CreateNewTestDir(); 
Zen1.SetTestDir(TestDir1); Zen1.SpawnServer(PortNumber); Zen1.WaitUntilReady(); Zen1.Detach(); ZenServerInstance Zen2(TestEnv); std::filesystem::path TestDir2 = TestEnv.CreateNewTestDir(); Zen2.SetTestDir(TestDir2); Zen2.SpawnServer(PortNumber); Zen2.WaitUntilReady(); Zen2.Detach(); } TEST_CASE("lifetime.owner.2") { // This test is designed to verify that the hand-over of sponsor processes is handled // correctly for the case when a second or third process is launched on the same port // // Due to the nature of it, it cannot be const uint16_t PortNumber = 13456; std::filesystem::path TestDir1 = TestEnv.CreateNewTestDir(); std::filesystem::path TestDir2 = TestEnv.CreateNewTestDir(); ZenServerInstance Zen1(TestEnv); Zen1.SetTestDir(TestDir1); Zen1.SpawnServer(PortNumber); Zen1.WaitUntilReady(); ZenServerInstance Zen2(TestEnv); Zen2.SetTestDir(TestDir2); Zen2.SetOwnerPid(Zen1.GetPid()); Zen2.SpawnServer(PortNumber + 1); Zen2.Detach(); ZenServerInstance Zen3(TestEnv); Zen3.SetTestDir(TestDir2); Zen3.SetOwnerPid(Zen1.GetPid()); Zen3.SpawnServer(PortNumber + 1); Zen3.Detach(); ZenServerInstance Zen4(TestEnv); Zen4.SetTestDir(TestDir2); Zen4.SetOwnerPid(Zen1.GetPid()); Zen4.SpawnServer(PortNumber + 1); Zen4.Detach(); } # endif } // namespace zen::tests #else int main() { } #endif