diff options
| author | Stefan Boberg <[email protected]> | 2023-05-02 10:01:47 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-05-02 10:01:47 +0200 |
| commit | 075d17f8ada47e990fe94606c3d21df409223465 (patch) | |
| tree | e50549b766a2f3c354798a54ff73404217b4c9af /src/zenserver-test/zenserver-test.cpp | |
| parent | fix: bundle shouldn't append content zip to zen (diff) | |
| download | zen-075d17f8ada47e990fe94606c3d21df409223465.tar.xz zen-075d17f8ada47e990fe94606c3d21df409223465.zip | |
moved source directories into `/src` (#264)
* moved source directories into `/src`
* updated bundle.lua for new `src` path
* moved some docs, icon
* removed old test trees
Diffstat (limited to 'src/zenserver-test/zenserver-test.cpp')
| -rw-r--r-- | src/zenserver-test/zenserver-test.cpp | 3323 |
1 files changed, 3323 insertions, 0 deletions
diff --git a/src/zenserver-test/zenserver-test.cpp b/src/zenserver-test/zenserver-test.cpp new file mode 100644 index 000000000..3195181d1 --- /dev/null +++ b/src/zenserver-test/zenserver-test.cpp @@ -0,0 +1,3323 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#define _SILENCE_CXX17_C_HEADER_DEPRECATION_WARNING + +#include <zencore/compactbinary.h> +#include <zencore/compactbinarybuilder.h> +#include <zencore/compactbinarypackage.h> +#include <zencore/compress.h> +#include <zencore/except.h> +#include <zencore/filesystem.h> +#include <zencore/fmtutils.h> +#include <zencore/iohash.h> +#include <zencore/logging.h> +#include <zencore/memory.h> +#include <zencore/refcount.h> +#include <zencore/stream.h> +#include <zencore/string.h> +#include <zencore/testutils.h> +#include <zencore/thread.h> +#include <zencore/timer.h> +#include <zencore/xxhash.h> +#include <zenhttp/httpclient.h> +#include <zenhttp/httpshared.h> +#include <zenhttp/websocket.h> +#include <zenhttp/zenhttp.h> +#include <zenutil/cache/cache.h> +#include <zenutil/cache/cacherequests.h> +#include <zenutil/zenserverprocess.h> + +#if ZEN_USE_MIMALLOC +ZEN_THIRD_PARTY_INCLUDES_START +# include <mimalloc.h> +ZEN_THIRD_PARTY_INCLUDES_END +#endif + +#include <http_parser.h> + +#if ZEN_PLATFORM_WINDOWS +# pragma comment(lib, "Crypt32.lib") +# pragma comment(lib, "Wldap32.lib") +#endif + +ZEN_THIRD_PARTY_INCLUDES_START +#include <cpr/cpr.h> +#undef GetObject +ZEN_THIRD_PARTY_INCLUDES_END + +#include <atomic> +#include <filesystem> +#include <map> +#include <random> +#include <span> +#include <thread> +#include <typeindex> +#include <unordered_map> + +#if ZEN_PLATFORM_WINDOWS +# include <ppl.h> +# include <atlbase.h> +# include <process.h> +#endif + +#include <asio.hpp> + +////////////////////////////////////////////////////////////////////////// + +#include "projectclient.h" + +////////////////////////////////////////////////////////////////////////// + +#if ZEN_WITH_TESTS +# define ZEN_TEST_WITH_RUNNER 1 +# include <zencore/testing.h> +# include <zencore/workthreadpool.h> +#endif + +using namespace std::literals; + +#if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC +struct Concurrency +{ + template<typename... T> + static void parallel_invoke(T&&... t) + { + constexpr size_t NumTs = sizeof...(t); + std::thread Threads[NumTs] = { + std::thread(std::forward<T>(t))..., + }; + + for (std::thread& Thread : Threads) + { + Thread.join(); + } + } +}; +#endif + +////////////////////////////////////////////////////////////////////////// +// +// Custom logging -- test code, this should be tweaked +// + +namespace logging { +using namespace spdlog; +using namespace spdlog::details; +using namespace std::literals; + +class full_test_formatter final : public spdlog::formatter +{ +public: + full_test_formatter(std::string_view LogId, std::chrono::time_point<std::chrono::system_clock> Epoch) : m_Epoch(Epoch), m_LogId(LogId) + { + } + + virtual std::unique_ptr<formatter> clone() const override { return std::make_unique<full_test_formatter>(m_LogId, m_Epoch); } + + static constexpr bool UseDate = false; + + virtual void format(const details::log_msg& msg, memory_buf_t& dest) override + { + using std::chrono::duration_cast; + using std::chrono::milliseconds; + using std::chrono::seconds; + + if constexpr (UseDate) + { + auto secs = std::chrono::duration_cast<seconds>(msg.time.time_since_epoch()); + if (secs != m_LastLogSecs) + { + m_CachedTm = os::localtime(log_clock::to_time_t(msg.time)); + m_LastLogSecs = secs; + } + } + + const auto& tm_time = m_CachedTm; + + // cache the date/time part for the next second. + auto duration = msg.time - m_Epoch; + auto secs = duration_cast<seconds>(duration); + + if (m_CacheTimestamp != secs || m_CachedDatetime.size() == 0) + { + m_CachedDatetime.clear(); + m_CachedDatetime.push_back('['); + + if constexpr (UseDate) + { + fmt_helper::append_int(tm_time.tm_year + 1900, m_CachedDatetime); + m_CachedDatetime.push_back('-'); + + fmt_helper::pad2(tm_time.tm_mon + 1, m_CachedDatetime); + m_CachedDatetime.push_back('-'); + + fmt_helper::pad2(tm_time.tm_mday, m_CachedDatetime); + m_CachedDatetime.push_back(' '); + + fmt_helper::pad2(tm_time.tm_hour, m_CachedDatetime); + m_CachedDatetime.push_back(':'); + + fmt_helper::pad2(tm_time.tm_min, m_CachedDatetime); + m_CachedDatetime.push_back(':'); + + fmt_helper::pad2(tm_time.tm_sec, m_CachedDatetime); + } + else + { + int Count = int(secs.count()); + + const int LogSecs = Count % 60; + Count /= 60; + + const int LogMins = Count % 60; + Count /= 60; + + const int LogHours = Count; + + fmt_helper::pad2(LogHours, m_CachedDatetime); + m_CachedDatetime.push_back(':'); + fmt_helper::pad2(LogMins, m_CachedDatetime); + m_CachedDatetime.push_back(':'); + fmt_helper::pad2(LogSecs, m_CachedDatetime); + } + + m_CachedDatetime.push_back('.'); + + m_CacheTimestamp = secs; + } + + dest.append(m_CachedDatetime.begin(), m_CachedDatetime.end()); + + auto millis = fmt_helper::time_fraction<milliseconds>(msg.time); + fmt_helper::pad3(static_cast<uint32_t>(millis.count()), dest); + dest.push_back(']'); + dest.push_back(' '); + + if (!m_LogId.empty()) + { + dest.push_back('['); + fmt_helper::append_string_view(m_LogId, dest); + dest.push_back(']'); + dest.push_back(' '); + } + + // append logger name if exists + if (msg.logger_name.size() > 0) + { + dest.push_back('['); + fmt_helper::append_string_view(msg.logger_name, dest); + dest.push_back(']'); + dest.push_back(' '); + } + + dest.push_back('['); + // wrap the level name with color + msg.color_range_start = dest.size(); + fmt_helper::append_string_view(level::to_string_view(msg.level), dest); + msg.color_range_end = dest.size(); + dest.push_back(']'); + dest.push_back(' '); + + // add source location if present + if (!msg.source.empty()) + { + dest.push_back('['); + const char* filename = details::short_filename_formatter<details::null_scoped_padder>::basename(msg.source.filename); + fmt_helper::append_string_view(filename, dest); + dest.push_back(':'); + fmt_helper::append_int(msg.source.line, dest); + dest.push_back(']'); + dest.push_back(' '); + } + + fmt_helper::append_string_view(msg.payload, dest); + fmt_helper::append_string_view("\n"sv, dest); + } + +private: + std::chrono::time_point<std::chrono::system_clock> m_Epoch; + std::tm m_CachedTm; + std::chrono::seconds m_LastLogSecs; + std::chrono::seconds m_CacheTimestamp{0}; + memory_buf_t m_CachedDatetime; + std::string m_LogId; +}; +} // namespace logging + +////////////////////////////////////////////////////////////////////////// + +#if 0 + +int +main() +{ + mi_version(); + + zen::Sleep(1000); + + zen::Stopwatch timer; + + const int RequestCount = 100000; + + cpr::Session Sessions[10]; + + for (auto& Session : Sessions) + { + Session.SetUrl(cpr::Url{"http://localhost:1337/test/hello"}); + //Session.SetUrl(cpr::Url{ "http://arn-wd-l0182:1337/test/hello" }); + } + + auto Run = [](cpr::Session& Session) { + for (int i = 0; i < 10000; ++i) + { + cpr::Response Result = Session.Get(); + + if (Result.status_code != 200) + { + ZEN_WARN("request response: {}", Result.status_code); + } + } + }; + + Concurrency::parallel_invoke([&] { Run(Sessions[0]); }, + [&] { Run(Sessions[1]); }, + [&] { Run(Sessions[2]); }, + [&] { Run(Sessions[3]); }, + [&] { Run(Sessions[4]); }, + [&] { Run(Sessions[5]); }, + [&] { Run(Sessions[6]); }, + [&] { Run(Sessions[7]); }, + [&] { Run(Sessions[8]); }, + [&] { Run(Sessions[9]); }); + + // cpr::Response r = cpr::Get(cpr::Url{ "http://localhost:1337/test/hello" }); + + ZEN_INFO("{} requests in {} ({})", + RequestCount, + zen::NiceTimeSpanMs(timer.GetElapsedTimeMs()), + zen::NiceRate(RequestCount, (uint32_t)timer.GetElapsedTimeMs(), "req")); + + return 0; +} +#elif 0 +// #include <restinio/all.hpp> + +int +main() +{ + mi_version(); + restinio::run(restinio::on_thread_pool(32).port(8080).request_handler( + [](auto req) { return req->create_response().set_body("Hello, World!").done(); })); + return 0; +} +#elif ZEN_WITH_TESTS + +zen::ZenServerEnvironment TestEnv; + +int +main(int argc, char** argv) +{ + using namespace std::literals; + +# if ZEN_USE_MIMALLOC + mi_version(); +# endif + + zen::zencore_forcelinktests(); + zen::zenhttp_forcelinktests(); + zen::cacherequests_forcelink(); + + zen::logging::InitializeLogging(); + + spdlog::set_level(spdlog::level::debug); + spdlog::set_formatter(std::make_unique< ::logging::full_test_formatter>("test", std::chrono::system_clock::now())); + + std::filesystem::path ProgramBaseDir = std::filesystem::path(argv[0]).parent_path(); + std::filesystem::path TestBaseDir = ProgramBaseDir.parent_path().parent_path() / ".test"; + + // This is pretty janky because we're passing most of the options through to the test + // framework, so we can't just use cxxopts (I think). This should ideally be cleaned up + // somehow in the future + + std::string ServerClass; + + for (int i = 1; i < argc; ++i) + { + if (argv[i] == "--http"sv) + { + if ((i + 1) < argc) + { + ServerClass = argv[++i]; + } + } + } + + TestEnv.InitializeForTest(ProgramBaseDir, TestBaseDir, ServerClass); + + ZEN_INFO("Running tests...(base dir: '{}')", TestBaseDir); + + zen::testing::TestRunner Runner; + Runner.ApplyCommandLine(argc, argv); + + return Runner.Run(); +} + +namespace zen::tests { + +TEST_CASE("default.single") +{ + std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); + + ZenServerInstance Instance(TestEnv); + Instance.SetTestDir(TestDir); + Instance.SpawnServer(13337); + + ZEN_INFO("Waiting..."); + + Instance.WaitUntilReady(); + + std::atomic<uint64_t> RequestCount{0}; + std::atomic<uint64_t> BatchCounter{0}; + + ZEN_INFO("Running single server test..."); + + auto IssueTestRequests = [&] { + const uint64_t BatchNo = BatchCounter.fetch_add(1); + const int ThreadId = zen::GetCurrentThreadId(); + + ZEN_INFO("query batch {} started (thread {})", BatchNo, ThreadId); + cpr::Session cli; + cli.SetUrl(cpr::Url{"http://localhost:13337/test/hello"}); + + for (int i = 0; i < 10000; ++i) + { + auto res = cli.Get(); + ++RequestCount; + } + ZEN_INFO("query batch {} ended (thread {})", BatchNo, ThreadId); + }; + + zen::Stopwatch timer; + + Concurrency::parallel_invoke(IssueTestRequests, + IssueTestRequests, + IssueTestRequests, + IssueTestRequests, + IssueTestRequests, + IssueTestRequests, + IssueTestRequests, + IssueTestRequests, + IssueTestRequests, + IssueTestRequests); + + uint64_t Elapsed = timer.GetElapsedTimeMs(); + + ZEN_INFO("{} requests in {} ({})", RequestCount, zen::NiceTimeSpanMs(Elapsed), zen::NiceRate(RequestCount, (uint32_t)Elapsed, "req")); +} + +TEST_CASE("multi.basic") +{ + ZenServerInstance Instance1(TestEnv); + std::filesystem::path TestDir1 = TestEnv.CreateNewTestDir(); + Instance1.SetTestDir(TestDir1); + Instance1.SpawnServer(13337); + + ZenServerInstance Instance2(TestEnv); + std::filesystem::path TestDir2 = TestEnv.CreateNewTestDir(); + Instance2.SetTestDir(TestDir2); + Instance2.SpawnServer(13338); + + ZEN_INFO("Waiting..."); + + Instance1.WaitUntilReady(); + Instance2.WaitUntilReady(); + + std::atomic<uint64_t> RequestCount{0}; + std::atomic<uint64_t> BatchCounter{0}; + + auto IssueTestRequests = [&](int PortNumber) { + const uint64_t BatchNo = BatchCounter.fetch_add(1); + const int ThreadId = zen::GetCurrentThreadId(); + + ZEN_INFO("query batch {} started (thread {}) for port {}", BatchNo, ThreadId, PortNumber); + + cpr::Session cli; + cli.SetUrl(cpr::Url{fmt::format("http://localhost:{}/test/hello", PortNumber)}); + + for (int i = 0; i < 10000; ++i) + { + auto res = cli.Get(); + ++RequestCount; + } + ZEN_INFO("query batch {} ended (thread {})", BatchNo, ThreadId); + }; + + zen::Stopwatch timer; + + ZEN_INFO("Running multi-server test..."); + + Concurrency::parallel_invoke([&] { IssueTestRequests(13337); }, + [&] { IssueTestRequests(13338); }, + [&] { IssueTestRequests(13337); }, + [&] { IssueTestRequests(13338); }); + + uint64_t Elapsed = timer.GetElapsedTimeMs(); + + ZEN_INFO("{} requests in {} ({})", RequestCount, zen::NiceTimeSpanMs(Elapsed), zen::NiceRate(RequestCount, (uint32_t)Elapsed, "req")); +} + +TEST_CASE("project.basic") +{ + using namespace std::literals; + + std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); + + const uint16_t PortNumber = 13337; + + ZenServerInstance Instance1(TestEnv); + Instance1.SetTestDir(TestDir); + Instance1.SpawnServer(PortNumber); + Instance1.WaitUntilReady(); + + std::atomic<uint64_t> RequestCount{0}; + + zen::Stopwatch timer; + + std::mt19937_64 mt; + + zen::StringBuilder<64> BaseUri; + BaseUri << fmt::format("http://localhost:{}/prj/test", PortNumber); + + std::filesystem::path BinPath = zen::GetRunningExecutablePath(); + std::filesystem::path RootPath = BinPath.parent_path().parent_path(); + BinPath = BinPath.lexically_relative(RootPath); + + SUBCASE("build store init") + { + { + { + zen::CbObjectWriter Body; + Body << "id" + << "test"; + Body << "root" << RootPath.c_str(); + Body << "project" + << "/zooom"; + Body << "engine" + << "/zooom"; + + zen::BinaryWriter MemOut; + Body.Save(MemOut); + + auto Response = cpr::Post(cpr::Url{BaseUri.c_str()}, cpr::Body{(const char*)MemOut.Data(), MemOut.Size()}); + CHECK(Response.status_code == 201); + } + + { + auto Response = cpr::Get(cpr::Url{BaseUri.c_str()}); + CHECK(Response.status_code == 200); + + zen::CbObjectView ResponseObject = zen::CbFieldView(Response.text.data()).AsObjectView(); + + CHECK(ResponseObject["id"].AsString() == "test"sv); + CHECK(ResponseObject["root"].AsString() == PathToUtf8(RootPath.c_str())); + } + } + + BaseUri << "/oplog/foobar"; + + { + { + zen::StringBuilder<64> PostUri; + PostUri << BaseUri; + auto Response = cpr::Post(cpr::Url{PostUri.c_str()}); + CHECK(Response.status_code == 201); + } + + { + auto Response = cpr::Get(cpr::Url{BaseUri.c_str()}); + CHECK(Response.status_code == 200); + + zen::CbObjectView ResponseObject = zen::CbFieldView(Response.text.data()).AsObjectView(); + + CHECK(ResponseObject["id"].AsString() == "foobar"sv); + CHECK(ResponseObject["project"].AsString() == "test"sv); + } + } + + SUBCASE("build store persistence") + { + uint8_t AttachData[] = {1, 2, 3}; + + zen::CompressedBuffer Attachment = zen::CompressedBuffer::Compress(zen::SharedBuffer::Clone(zen::MemoryView{AttachData, 3})); + zen::CbAttachment Attach{Attachment, Attachment.DecodeRawHash()}; + + zen::CbObjectWriter OpWriter; + OpWriter << "key" + << "foo" + << "attachment" << Attach; + + const std::string_view ChunkId{ + "00000000" + "00000000" + "00010000"}; + auto FileOid = zen::Oid::FromHexString(ChunkId); + + OpWriter.BeginArray("files"); + OpWriter.BeginObject(); + OpWriter << "id" << FileOid; + OpWriter << "clientpath" + << "/{engine}/client/side/path"; + OpWriter << "serverpath" << BinPath.c_str(); + OpWriter.EndObject(); + OpWriter.EndArray(); + + zen::CbObject Op = OpWriter.Save(); + + zen::CbPackage OpPackage(Op); + OpPackage.AddAttachment(Attach); + + zen::BinaryWriter MemOut; + legacy::SaveCbPackage(OpPackage, MemOut); + + { + zen::StringBuilder<64> PostUri; + PostUri << BaseUri << "/new"; + auto Response = cpr::Post(cpr::Url{PostUri.c_str()}, cpr::Body{(const char*)MemOut.Data(), MemOut.Size()}); + + REQUIRE(!Response.error); + CHECK(Response.status_code == 201); + } + + // Read file data + + { + zen::StringBuilder<128> ChunkGetUri; + ChunkGetUri << BaseUri << "/" << ChunkId; + auto Response = cpr::Get(cpr::Url{ChunkGetUri.c_str()}); + + REQUIRE(!Response.error); + CHECK(Response.status_code == 200); + } + + { + zen::StringBuilder<128> ChunkGetUri; + ChunkGetUri << BaseUri << "/" << ChunkId << "?offset=1&size=10"; + auto Response = cpr::Get(cpr::Url{ChunkGetUri.c_str()}); + + REQUIRE(!Response.error); + CHECK(Response.status_code == 200); + CHECK(Response.text.size() == 10); + } + + ZEN_INFO("+++++++"); + } + SUBCASE("build store op commit") { ZEN_INFO("-------"); } + SUBCASE("test chunk not found error") + { + for (size_t I = 0; I < 65; I++) + { + zen::StringBuilder<128> PostUri; + PostUri << BaseUri << "/f77c781846caead318084604/info"; + auto Response = cpr::Get(cpr::Url{PostUri.c_str()}); + + REQUIRE(!Response.error); + CHECK(Response.status_code == 404); + } + } + } + + const uint64_t Elapsed = timer.GetElapsedTimeMs(); + + ZEN_INFO("{} requests in {} ({})", RequestCount, zen::NiceTimeSpanMs(Elapsed), zen::NiceRate(RequestCount, (uint32_t)Elapsed, "req")); +} + +# if 0 // this is extremely WIP +TEST_CASE("project.pipe") +{ + using namespace std::literals; + + std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); + + const uint16_t PortNumber = 13337; + + ZenServerInstance Instance1(TestEnv); + Instance1.SetTestDir(TestDir); + Instance1.SpawnServer(PortNumber); + Instance1.WaitUntilReady(); + + zen::LocalProjectClient LocalClient(PortNumber); + + zen::CbObjectWriter Cbow; + Cbow << "hey" << 42; + + zen::CbObject Response = LocalClient.MessageTransaction(Cbow.Save()); +} +# endif + +namespace utils { + + struct ZenConfig + { + std::filesystem::path DataDir; + uint16_t Port; + std::string BaseUri; + std::string Args; + + static ZenConfig New(uint16_t Port = 13337, std::string Args = "") + { + return ZenConfig{.DataDir = TestEnv.CreateNewTestDir(), + .Port = Port, + .BaseUri = fmt::format("http://localhost:{}/z$", Port), + .Args = std::move(Args)}; + } + + static ZenConfig NewWithUpstream(uint16_t UpstreamPort) + { + return New(13337, fmt::format("--debug --upstream-thread-count=0 --upstream-zen-url=http://localhost:{}", UpstreamPort)); + } + + static ZenConfig NewWithThreadedUpstreams(std::span<uint16_t> UpstreamPorts, bool Debug) + { + std::string Args = Debug ? "--debug" : ""; + for (uint16_t Port : UpstreamPorts) + { + Args = fmt::format("{}{}--upstream-zen-url=http://localhost:{}", Args, Args.length() > 0 ? " " : "", Port); + } + return New(13337, Args); + } + + void Spawn(ZenServerInstance& Inst) + { + Inst.SetTestDir(DataDir); + Inst.SpawnServer(Port, Args); + Inst.WaitUntilReady(); + } + }; + + void SpawnServer(ZenServerInstance& Server, ZenConfig& Cfg) + { + Server.SetTestDir(Cfg.DataDir); + Server.SpawnServer(Cfg.Port, Cfg.Args); + Server.WaitUntilReady(); + } + +} // namespace utils + +TEST_CASE("zcache.basic") +{ + using namespace std::literals; + + std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); + + const uint16_t PortNumber = 13337; + + const int kIterationCount = 100; + const auto BaseUri = fmt::format("http://localhost:{}/z$", PortNumber); + + auto HashKey = [](int i) -> zen::IoHash { return zen::IoHash::HashBuffer(&i, sizeof i); }; + + { + ZenServerInstance Instance1(TestEnv); + Instance1.SetTestDir(TestDir); + Instance1.SpawnServer(PortNumber); + Instance1.WaitUntilReady(); + + // Populate with some simple data + + for (int i = 0; i < kIterationCount; ++i) + { + zen::CbObjectWriter Cbo; + Cbo << "index" << i; + + zen::BinaryWriter MemOut; + Cbo.Save(MemOut); + + zen::IoHash Key = HashKey(i); + + cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}", BaseUri, "test", Key)}, + cpr::Body{(const char*)MemOut.Data(), MemOut.Size()}, + cpr::Header{{"Content-Type", "application/x-ue-cb"}}); + + CHECK(Result.status_code == 201); + } + + // Retrieve data + + for (int i = 0; i < kIterationCount; ++i) + { + zen::IoHash Key = zen::IoHash::HashBuffer(&i, sizeof i); + + cpr::Response Result = + cpr::Get(cpr::Url{fmt::format("{}/{}/{}", BaseUri, "test", Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); + + CHECK(Result.status_code == 200); + } + + // Ensure bad bucket identifiers are rejected + + { + zen::CbObjectWriter Cbo; + Cbo << "index" << 42; + + zen::BinaryWriter MemOut; + Cbo.Save(MemOut); + + zen::IoHash Key = HashKey(442); + + cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}", BaseUri, "te!st", Key)}, + cpr::Body{(const char*)MemOut.Data(), MemOut.Size()}, + cpr::Header{{"Content-Type", "application/x-ue-cb"}}); + + CHECK(Result.status_code == 400); + } + } + + // Verify that the data persists between process runs (the previous server has exited at this point) + + { + ZenServerInstance Instance1(TestEnv); + Instance1.SetTestDir(TestDir); + Instance1.SpawnServer(PortNumber); + Instance1.WaitUntilReady(); + + // Retrieve data again + + for (int i = 0; i < kIterationCount; ++i) + { + zen::IoHash Key = HashKey(i); + + cpr::Response Result = + cpr::Get(cpr::Url{fmt::format("{}/{}/{}", BaseUri, "test", Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); + + CHECK(Result.status_code == 200); + } + } +} + +TEST_CASE("zcache.cbpackage") +{ + using namespace std::literals; + + auto CreateTestPackage = [](zen::IoHash& OutAttachmentKey) -> zen::CbPackage { + auto Data = zen::SharedBuffer::Clone(zen::MakeMemoryView<uint8_t>({1, 2, 3, 4, 5, 6, 7, 8, 9})); + auto CompressedData = zen::CompressedBuffer::Compress(Data); + + OutAttachmentKey = CompressedData.DecodeRawHash(); + + zen::CbWriter Obj; + Obj.BeginObject("obj"sv); + Obj.AddBinaryAttachment("data", OutAttachmentKey); + Obj.EndObject(); + + zen::CbPackage Package; + Package.SetObject(Obj.Save().AsObject()); + Package.AddAttachment(zen::CbAttachment(CompressedData, OutAttachmentKey)); + + return Package; + }; + + auto SerializeToBuffer = [](zen::CbPackage Package) -> zen::IoBuffer { + zen::BinaryWriter MemStream; + + Package.Save(MemStream); + + return zen::IoBuffer(zen::IoBuffer::Clone, MemStream.Data(), MemStream.Size()); + }; + + auto IsEqual = [](zen::CbPackage Lhs, zen::CbPackage Rhs) -> bool { + std::span<const zen::CbAttachment> LhsAttachments = Lhs.GetAttachments(); + std::span<const zen::CbAttachment> RhsAttachments = Rhs.GetAttachments(); + + if (LhsAttachments.size() != RhsAttachments.size()) + { + return false; + } + + for (const zen::CbAttachment& LhsAttachment : LhsAttachments) + { + const zen::CbAttachment* RhsAttachment = Rhs.FindAttachment(LhsAttachment.GetHash()); + CHECK(RhsAttachment); + + zen::SharedBuffer LhsBuffer = LhsAttachment.AsCompressedBinary().Decompress(); + CHECK(!LhsBuffer.IsNull()); + + zen::SharedBuffer RhsBuffer = RhsAttachment->AsCompressedBinary().Decompress(); + CHECK(!RhsBuffer.IsNull()); + + if (!LhsBuffer.GetView().EqualBytes(RhsBuffer.GetView())) + { + return false; + } + } + + return true; + }; + + SUBCASE("PUT/GET returns correct package") + { + std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); + const uint16_t PortNumber = 13337; + const auto BaseUri = fmt::format("http://localhost:{}/z$", PortNumber); + + ZenServerInstance Instance1(TestEnv); + Instance1.SetTestDir(TestDir); + Instance1.SpawnServer(PortNumber); + Instance1.WaitUntilReady(); + + const std::string_view Bucket = "mosdef"sv; + zen::IoHash Key; + zen::CbPackage ExpectedPackage = CreateTestPackage(Key); + + // PUT + { + zen::IoBuffer Body = SerializeToBuffer(ExpectedPackage); + cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}", BaseUri, Bucket, Key)}, + cpr::Body{(const char*)Body.Data(), Body.Size()}, + cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}}); + CHECK(Result.status_code == 201); + } + + // GET + { + cpr::Response Result = + cpr::Get(cpr::Url{fmt::format("{}/{}/{}", BaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); + CHECK(Result.status_code == 200); + + zen::IoBuffer Response(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()); + + zen::CbPackage Package; + const bool Ok = Package.TryLoad(Response); + CHECK(Ok); + CHECK(IsEqual(Package, ExpectedPackage)); + } + } + + SUBCASE("PUT propagates upstream") + { + // Setup local and remote server + std::filesystem::path LocalDataDir = TestEnv.CreateNewTestDir(); + std::filesystem::path RemoteDataDir = TestEnv.CreateNewTestDir(); + const uint16_t LocalPortNumber = 13337; + const uint16_t RemotePortNumber = 13338; + + const auto LocalBaseUri = fmt::format("http://localhost:{}/z$", LocalPortNumber); + const auto RemoteBaseUri = fmt::format("http://localhost:{}/z$", RemotePortNumber); + + ZenServerInstance RemoteInstance(TestEnv); + RemoteInstance.SetTestDir(RemoteDataDir); + RemoteInstance.SpawnServer(RemotePortNumber); + RemoteInstance.WaitUntilReady(); + + ZenServerInstance LocalInstance(TestEnv); + LocalInstance.SetTestDir(LocalDataDir); + LocalInstance.SpawnServer(LocalPortNumber, + fmt::format("--upstream-thread-count=0 --upstream-zen-url=http://localhost:{}", RemotePortNumber)); + LocalInstance.WaitUntilReady(); + + const std::string_view Bucket = "mosdef"sv; + zen::IoHash Key; + zen::CbPackage ExpectedPackage = CreateTestPackage(Key); + + // Store the cache record package in the local instance + { + zen::IoBuffer Body = SerializeToBuffer(ExpectedPackage); + cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}", LocalBaseUri, Bucket, Key)}, + cpr::Body{(const char*)Body.Data(), Body.Size()}, + cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}}); + + CHECK(Result.status_code == 201); + } + + // The cache record can be retrieved as a package from the local instance + { + cpr::Response Result = + cpr::Get(cpr::Url{fmt::format("{}/{}/{}", LocalBaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); + CHECK(Result.status_code == 200); + + zen::IoBuffer Body(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()); + zen::CbPackage Package; + const bool Ok = Package.TryLoad(Body); + CHECK(Ok); + CHECK(IsEqual(Package, ExpectedPackage)); + } + + // The cache record can be retrieved as a package from the remote instance + { + cpr::Response Result = + cpr::Get(cpr::Url{fmt::format("{}/{}/{}", RemoteBaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); + CHECK(Result.status_code == 200); + + zen::IoBuffer Body(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()); + zen::CbPackage Package; + const bool Ok = Package.TryLoad(Body); + CHECK(Ok); + CHECK(IsEqual(Package, ExpectedPackage)); + } + } + + SUBCASE("GET finds upstream when missing in local") + { + // Setup local and remote server + std::filesystem::path LocalDataDir = TestEnv.CreateNewTestDir(); + std::filesystem::path RemoteDataDir = TestEnv.CreateNewTestDir(); + const uint16_t LocalPortNumber = 13337; + const uint16_t RemotePortNumber = 13338; + + const auto LocalBaseUri = fmt::format("http://localhost:{}/z$", LocalPortNumber); + const auto RemoteBaseUri = fmt::format("http://localhost:{}/z$", RemotePortNumber); + + ZenServerInstance RemoteInstance(TestEnv); + RemoteInstance.SetTestDir(RemoteDataDir); + RemoteInstance.SpawnServer(RemotePortNumber); + RemoteInstance.WaitUntilReady(); + + ZenServerInstance LocalInstance(TestEnv); + LocalInstance.SetTestDir(LocalDataDir); + LocalInstance.SpawnServer(LocalPortNumber, + fmt::format("--upstream-thread-count=0 --upstream-zen-url=http://localhost:{}", RemotePortNumber)); + LocalInstance.WaitUntilReady(); + + const std::string_view Bucket = "mosdef"sv; + zen::IoHash Key; + zen::CbPackage ExpectedPackage = CreateTestPackage(Key); + + // Store the cache record package in upstream cache + { + zen::IoBuffer Body = SerializeToBuffer(ExpectedPackage); + cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}", RemoteBaseUri, Bucket, Key)}, + cpr::Body{(const char*)Body.Data(), Body.Size()}, + cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}}); + + CHECK(Result.status_code == 201); + } + + // The cache record can be retrieved as a package from the local cache + { + cpr::Response Result = + cpr::Get(cpr::Url{fmt::format("{}/{}/{}", LocalBaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); + CHECK(Result.status_code == 200); + + zen::IoBuffer Body(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()); + zen::CbPackage Package; + const bool Ok = Package.TryLoad(Body); + CHECK(Ok); + CHECK(IsEqual(Package, ExpectedPackage)); + } + } +} + +TEST_CASE("zcache.policy") +{ + using namespace std::literals; + using namespace utils; + + auto GenerateData = [](uint64_t Size, zen::IoHash& OutHash) -> zen::UniqueBuffer { + auto Buf = zen::UniqueBuffer::Alloc(Size); + uint8_t* Data = reinterpret_cast<uint8_t*>(Buf.GetData()); + for (uint64_t Idx = 0; Idx < Size; Idx++) + { + Data[Idx] = Idx % 256; + } + OutHash = zen::IoHash::HashBuffer(Data, Size); + return Buf; + }; + + auto GeneratePackage = [](zen::IoHash& OutRecordKey, zen::IoHash& OutAttachmentKey) -> zen::CbPackage { + auto Data = zen::SharedBuffer::Clone(zen::MakeMemoryView<uint8_t>({1, 2, 3, 4, 5, 6, 7, 8, 9})); + auto CompressedData = zen::CompressedBuffer::Compress(Data); + OutAttachmentKey = CompressedData.DecodeRawHash(); + + zen::CbWriter Writer; + Writer.BeginObject("obj"sv); + Writer.AddBinaryAttachment("data", OutAttachmentKey); + Writer.EndObject(); + CbObject CacheRecord = Writer.Save().AsObject(); + + OutRecordKey = IoHash::HashBuffer(CacheRecord.GetBuffer().GetView()); + + zen::CbPackage Package; + Package.SetObject(CacheRecord); + Package.AddAttachment(zen::CbAttachment(CompressedData, OutAttachmentKey)); + + return Package; + }; + + auto ToBuffer = [](zen::CbPackage Package) -> zen::IoBuffer { + zen::BinaryWriter MemStream; + Package.Save(MemStream); + + return zen::IoBuffer(zen::IoBuffer::Clone, MemStream.Data(), MemStream.Size()); + }; + + SUBCASE("query - 'local' does not query upstream (binary)") + { + ZenConfig UpstreamCfg = ZenConfig::New(13338); + ZenServerInstance UpstreamInst(TestEnv); + ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338); + ZenServerInstance LocalInst(TestEnv); + const auto Bucket = "legacy"sv; + + UpstreamCfg.Spawn(UpstreamInst); + LocalCfg.Spawn(LocalInst); + + zen::IoHash Key; + auto BinaryValue = GenerateData(1024, Key); + + // Store binary cache value upstream + { + cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}", UpstreamCfg.BaseUri, Bucket, Key)}, + cpr::Body{(const char*)BinaryValue.GetData(), BinaryValue.GetSize()}, + cpr::Header{{"Content-Type", "application/octet-stream"}}); + CHECK(Result.status_code == 201); + } + + { + cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?Policy=QueryLocal,Store", LocalCfg.BaseUri, Bucket, Key)}, + cpr::Header{{"Accept", "application/octet-stream"}}); + CHECK(Result.status_code == 404); + } + + { + cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?Policy=Query,Store", LocalCfg.BaseUri, Bucket, Key)}, + cpr::Header{{"Accept", "application/octet-stream"}}); + CHECK(Result.status_code == 200); + } + } + + SUBCASE("store - 'local' does not store upstream (binary)") + { + ZenConfig UpstreamCfg = ZenConfig::New(13338); + ZenServerInstance UpstreamInst(TestEnv); + ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338); + ZenServerInstance LocalInst(TestEnv); + const auto Bucket = "legacy"sv; + + UpstreamCfg.Spawn(UpstreamInst); + LocalCfg.Spawn(LocalInst); + + zen::IoHash Key; + auto BinaryValue = GenerateData(1024, Key); + + // Store binary cache value locally + { + cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}?Policy=Query,StoreLocal", LocalCfg.BaseUri, Bucket, Key)}, + cpr::Body{(const char*)BinaryValue.GetData(), BinaryValue.GetSize()}, + cpr::Header{{"Content-Type", "application/octet-stream"}}); + CHECK(Result.status_code == 201); + } + + { + cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}", UpstreamCfg.BaseUri, Bucket, Key)}, + cpr::Header{{"Accept", "application/octet-stream"}}); + CHECK(Result.status_code == 404); + } + + { + cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}", LocalCfg.BaseUri, Bucket, Key)}, + cpr::Header{{"Accept", "application/octet-stream"}}); + CHECK(Result.status_code == 200); + } + } + + SUBCASE("store - 'local/remote' stores local and upstream (binary)") + { + ZenConfig UpstreamCfg = ZenConfig::New(13338); + ZenServerInstance UpstreamInst(TestEnv); + ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338); + ZenServerInstance LocalInst(TestEnv); + const auto Bucket = "legacy"sv; + + UpstreamCfg.Spawn(UpstreamInst); + LocalCfg.Spawn(LocalInst); + + zen::IoHash Key; + auto BinaryValue = GenerateData(1024, Key); + + // Store binary cache value locally and upstream + { + cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}?Policy=Query,Store", LocalCfg.BaseUri, Bucket, Key)}, + cpr::Body{(const char*)BinaryValue.GetData(), BinaryValue.GetSize()}, + cpr::Header{{"Content-Type", "application/octet-stream"}}); + CHECK(Result.status_code == 201); + } + + { + cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}", UpstreamCfg.BaseUri, Bucket, Key)}, + cpr::Header{{"Accept", "application/octet-stream"}}); + CHECK(Result.status_code == 200); + } + + { + cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}", LocalCfg.BaseUri, Bucket, Key)}, + cpr::Header{{"Accept", "application/octet-stream"}}); + CHECK(Result.status_code == 200); + } + } + + SUBCASE("query - 'local' does not query upstream (cppackage)") + { + ZenConfig UpstreamCfg = ZenConfig::New(13338); + ZenServerInstance UpstreamInst(TestEnv); + ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338); + ZenServerInstance LocalInst(TestEnv); + const auto Bucket = "legacy"sv; + + UpstreamCfg.Spawn(UpstreamInst); + LocalCfg.Spawn(LocalInst); + + zen::IoHash Key; + zen::IoHash PayloadId; + zen::CbPackage Package = GeneratePackage(Key, PayloadId); + auto Buf = ToBuffer(Package); + + // Store package upstream + { + cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}", UpstreamCfg.BaseUri, Bucket, Key)}, + cpr::Body{(const char*)Buf.GetData(), Buf.GetSize()}, + cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}}); + CHECK(Result.status_code == 201); + } + + { + cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?Policy=QueryLocal,Store", LocalCfg.BaseUri, Bucket, Key)}, + cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); + CHECK(Result.status_code == 404); + } + + { + cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?Policy=Query,Store", LocalCfg.BaseUri, Bucket, Key)}, + cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); + CHECK(Result.status_code == 200); + } + } + + SUBCASE("store - 'local' does not store upstream (cbpackge)") + { + ZenConfig UpstreamCfg = ZenConfig::New(13338); + ZenServerInstance UpstreamInst(TestEnv); + ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338); + ZenServerInstance LocalInst(TestEnv); + const auto Bucket = "legacy"sv; + + UpstreamCfg.Spawn(UpstreamInst); + LocalCfg.Spawn(LocalInst); + + zen::IoHash Key; + zen::IoHash PayloadId; + zen::CbPackage Package = GeneratePackage(Key, PayloadId); + auto Buf = ToBuffer(Package); + + // Store packge locally + { + cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}?Policy=Query,StoreLocal", LocalCfg.BaseUri, Bucket, Key)}, + cpr::Body{(const char*)Buf.GetData(), Buf.GetSize()}, + cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}}); + CHECK(Result.status_code == 201); + } + + { + cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}", UpstreamCfg.BaseUri, Bucket, Key)}, + cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); + CHECK(Result.status_code == 404); + } + + { + cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}", LocalCfg.BaseUri, Bucket, Key)}, + cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); + CHECK(Result.status_code == 200); + } + } + + SUBCASE("store - 'local/remote' stores local and upstream (cbpackage)") + { + ZenConfig UpstreamCfg = ZenConfig::New(13338); + ZenServerInstance UpstreamInst(TestEnv); + ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338); + ZenServerInstance LocalInst(TestEnv); + const auto Bucket = "legacy"sv; + + UpstreamCfg.Spawn(UpstreamInst); + LocalCfg.Spawn(LocalInst); + + zen::IoHash Key; + zen::IoHash PayloadId; + zen::CbPackage Package = GeneratePackage(Key, PayloadId); + auto Buf = ToBuffer(Package); + + // Store package locally and upstream + { + cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}?Policy=Query,Store", LocalCfg.BaseUri, Bucket, Key)}, + cpr::Body{(const char*)Buf.GetData(), Buf.GetSize()}, + cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}}); + CHECK(Result.status_code == 201); + } + + { + cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}", UpstreamCfg.BaseUri, Bucket, Key)}, + cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); + CHECK(Result.status_code == 200); + } + + { + cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}", LocalCfg.BaseUri, Bucket, Key)}, + cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); + CHECK(Result.status_code == 200); + } + } + + SUBCASE("skip - 'data' returns cache record without attachments/empty payload") + { + ZenConfig Cfg = ZenConfig::New(); + ZenServerInstance Instance(TestEnv); + const auto Bucket = "test"sv; + + Cfg.Spawn(Instance); + + zen::IoHash Key; + zen::IoHash PayloadId; + zen::CbPackage Package = GeneratePackage(Key, PayloadId); + auto Buf = ToBuffer(Package); + + // Store package + { + cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}", Cfg.BaseUri, Bucket, Key)}, + cpr::Body{(const char*)Buf.GetData(), Buf.GetSize()}, + cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}}); + CHECK(Result.status_code == 201); + } + + // Get package + { + cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?Policy=Default,SkipData", Cfg.BaseUri, Bucket, Key)}, + cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); + CHECK(IsHttpSuccessCode(Result.status_code)); + IoBuffer Buffer(IoBuffer::Wrap, Result.text.c_str(), Result.text.size()); + CbPackage ResponsePackage; + CHECK(ResponsePackage.TryLoad(Buffer)); + CHECK(ResponsePackage.GetAttachments().size() == 0); + } + + // Get record + { + cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?Policy=Default,SkipData", Cfg.BaseUri, Bucket, Key)}, + cpr::Header{{"Accept", "application/x-ue-cb"}}); + CHECK(IsHttpSuccessCode(Result.status_code)); + IoBuffer Buffer(IoBuffer::Wrap, Result.text.c_str(), Result.text.size()); + CbObject ResponseObject = zen::LoadCompactBinaryObject(Buffer); + CHECK((bool)ResponseObject); + } + + // Get payload + { + cpr::Response Result = + cpr::Get(cpr::Url{fmt::format("{}/{}/{}/{}?Policy=Default,SkipData", Cfg.BaseUri, Bucket, Key, PayloadId)}, + cpr::Header{{"Accept", "application/x-ue-comp"}}); + CHECK(IsHttpSuccessCode(Result.status_code)); + CHECK(Result.text.size() == 0); + } + } + + SUBCASE("skip - 'data' returns empty binary value") + { + ZenConfig Cfg = ZenConfig::New(); + ZenServerInstance Instance(TestEnv); + const auto Bucket = "test"sv; + + Cfg.Spawn(Instance); + + zen::IoHash Key; + auto BinaryValue = GenerateData(1024, Key); + + // Store binary cache value + { + cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}", Cfg.BaseUri, Bucket, Key)}, + cpr::Body{(const char*)BinaryValue.GetData(), BinaryValue.GetSize()}, + cpr::Header{{"Content-Type", "application/octet-stream"}}); + CHECK(Result.status_code == 201); + } + + // Get package + { + cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?Policy=Default,SkipData", Cfg.BaseUri, Bucket, Key)}, + cpr::Header{{"Accept", "application/octet-stream"}}); + CHECK(IsHttpSuccessCode(Result.status_code)); + CHECK(Result.text.size() == 0); + } + } +} + +TEST_CASE("zcache.rpc") +{ + using namespace std::literals; + + auto AppendCacheRecord = [](cacherequests::PutCacheRecordsRequest& Request, + const zen::CacheKey& CacheKey, + size_t PayloadSize, + CachePolicy RecordPolicy) { + std::vector<uint8_t> Data; + Data.resize(PayloadSize); + uint32_t DataSeed = *reinterpret_cast<const uint32_t*>(&CacheKey.Hash.Hash[0]); + uint16_t* DataPtr = reinterpret_cast<uint16_t*>(Data.data()); + for (size_t Idx = 0; Idx < PayloadSize / 2; ++Idx) + { + DataPtr[Idx] = static_cast<uint16_t>((Idx + DataSeed) % 0xffffu); + } + if (PayloadSize & 1) + { + Data[PayloadSize - 1] = static_cast<uint8_t>((PayloadSize - 1) & 0xff); + } + CompressedBuffer Value = zen::CompressedBuffer::Compress(SharedBuffer::MakeView(Data.data(), Data.size())); + Request.Requests.push_back({.Key = CacheKey, .Values = {{.Id = Oid::NewOid(), .Body = std::move(Value)}}, .Policy = RecordPolicy}); + }; + + auto PutCacheRecords = [&AppendCacheRecord](std::string_view BaseUri, + std::string_view Namespace, + std::string_view Bucket, + size_t Num, + size_t PayloadSize = 1024, + size_t KeyOffset = 1) -> std::vector<CacheKey> { + std::vector<zen::CacheKey> OutKeys; + + for (uint32_t Key = 1; Key <= Num; ++Key) + { + zen::IoHash KeyHash; + ((uint32_t*)(KeyHash.Hash))[0] = gsl::narrow<uint32_t>(KeyOffset + Key); + const zen::CacheKey CacheKey = zen::CacheKey::Create(Bucket, KeyHash); + + cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .Namespace = std::string(Namespace)}; + AppendCacheRecord(Request, CacheKey, PayloadSize, CachePolicy::Default); + OutKeys.push_back(CacheKey); + + CbPackage Package; + CHECK(Request.Format(Package)); + + IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); + cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)}, + cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}, + cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); + + CHECK(Result.status_code == 200); + } + + return OutKeys; + }; + + struct GetCacheRecordResult + { + zen::CbPackage Response; + cacherequests::GetCacheRecordsResult Result; + bool Success; + }; + + auto GetCacheRecords = [](std::string_view BaseUri, + std::string_view Namespace, + std::span<zen::CacheKey> Keys, + zen::CachePolicy Policy, + zen::RpcAcceptOptions AcceptOptions = zen::RpcAcceptOptions::kNone, + int Pid = 0) -> GetCacheRecordResult { + cacherequests::GetCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, + .AcceptOptions = static_cast<uint16_t>(AcceptOptions), + .ProcessPid = Pid, + .DefaultPolicy = Policy, + .Namespace = std::string(Namespace)}; + for (const CacheKey& Key : Keys) + { + Request.Requests.push_back({.Key = Key}); + } + + CbObjectWriter RequestWriter; + CHECK(Request.Format(RequestWriter)); + + BinaryWriter Body; + RequestWriter.Save(Body); + + cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)}, + cpr::Header{{"Content-Type", "application/x-ue-cb"}, {"Accept", "application/x-ue-cbpkg"}}, + cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); + + GetCacheRecordResult OutResult; + + if (Result.status_code == 200) + { + CbPackage Response = ParsePackageMessage(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size())); + if (!Response.IsNull()) + { + OutResult.Response = std::move(Response); + CHECK(OutResult.Result.Parse(OutResult.Response)); + OutResult.Success = true; + } + } + + return OutResult; + }; + + SUBCASE("get cache records") + { + std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); + const uint16_t PortNumber = 13337; + const auto BaseUri = fmt::format("http://localhost:{}/z$", PortNumber); + + ZenServerInstance Inst(TestEnv); + Inst.SetTestDir(TestDir); + Inst.SpawnServer(PortNumber); + Inst.WaitUntilReady(); + + CachePolicy Policy = CachePolicy::Default; + std::vector<zen::CacheKey> Keys = PutCacheRecords(BaseUri, "ue4.ddc"sv, "mastodon"sv, 128); + GetCacheRecordResult Result = GetCacheRecords(BaseUri, "ue4.ddc"sv, Keys, Policy); + + CHECK(Result.Result.Results.size() == Keys.size()); + + for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results) + { + const CacheKey& ExpectedKey = Keys[Index++]; + CHECK(Record); + CHECK(Record->Key == ExpectedKey); + CHECK(Record->Values.size() == 1); + + for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) + { + CHECK(Value.Body); + } + } + } + + SUBCASE("get missing cache records") + { + std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); + const uint16_t PortNumber = 13337; + const auto BaseUri = fmt::format("http://localhost:{}/z$", PortNumber); + + ZenServerInstance Inst(TestEnv); + Inst.SetTestDir(TestDir); + Inst.SpawnServer(PortNumber); + Inst.WaitUntilReady(); + + CachePolicy Policy = CachePolicy::Default; + std::vector<zen::CacheKey> ExistingKeys = PutCacheRecords(BaseUri, "ue4.ddc"sv, "mastodon"sv, 128); + std::vector<zen::CacheKey> Keys; + + for (const zen::CacheKey& Key : ExistingKeys) + { + Keys.push_back(Key); + Keys.push_back(CacheKey::Create("missing"sv, IoHash::Zero)); + } + + GetCacheRecordResult Result = GetCacheRecords(BaseUri, "ue4.ddc"sv, Keys, Policy); + + CHECK(Result.Result.Results.size() == Keys.size()); + + size_t KeyIndex = 0; + for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results) + { + const bool Missing = Index++ % 2 != 0; + + if (Missing) + { + CHECK(!Record); + } + else + { + const CacheKey& ExpectedKey = ExistingKeys[KeyIndex++]; + CHECK(Record->Key == ExpectedKey); + for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) + { + CHECK(Value.Body); + } + } + } + } + + SUBCASE("policy - 'QueryLocal' does not query upstream") + { + using namespace utils; + + ZenConfig UpstreamCfg = ZenConfig::New(13338); + ZenServerInstance UpstreamServer(TestEnv); + ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338); + ZenServerInstance LocalServer(TestEnv); + + SpawnServer(UpstreamServer, UpstreamCfg); + SpawnServer(LocalServer, LocalCfg); + + std::vector<zen::CacheKey> Keys = PutCacheRecords(UpstreamCfg.BaseUri, "ue4.ddc"sv, "mastodon"sv, 4); + + CachePolicy Policy = CachePolicy::QueryLocal; + GetCacheRecordResult Result = GetCacheRecords(LocalCfg.BaseUri, "ue4.ddc"sv, Keys, Policy); + + CHECK(Result.Result.Results.size() == Keys.size()); + + for (const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results) + { + CHECK(!Record); + } + } + + SUBCASE("policy - 'QueryRemote' does query upstream") + { + using namespace utils; + + ZenConfig UpstreamCfg = ZenConfig::New(13338); + ZenServerInstance UpstreamServer(TestEnv); + ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338); + ZenServerInstance LocalServer(TestEnv); + + SpawnServer(UpstreamServer, UpstreamCfg); + SpawnServer(LocalServer, LocalCfg); + + std::vector<zen::CacheKey> Keys = PutCacheRecords(UpstreamCfg.BaseUri, "ue4.ddc"sv, "mastodon"sv, 4); + + CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote); + GetCacheRecordResult Result = GetCacheRecords(LocalCfg.BaseUri, "ue4.ddc"sv, Keys, Policy); + + CHECK(Result.Result.Results.size() == Keys.size()); + + for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results) + { + CHECK(Record); + const CacheKey& ExpectedKey = Keys[Index++]; + CHECK(Record->Key == ExpectedKey); + } + } + + SUBCASE("RpcAcceptOptions") + { + using namespace utils; + + std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); + const uint16_t PortNumber = 13337; + const auto BaseUri = fmt::format("http://localhost:{}/z$", PortNumber); + + ZenServerInstance Inst(TestEnv); + Inst.SetTestDir(TestDir); + Inst.SpawnServer(PortNumber); + Inst.WaitUntilReady(); + + std::vector<zen::CacheKey> SmallKeys = PutCacheRecords(BaseUri, "ue4.ddc"sv, "mastodon"sv, 4, 1024); + std::vector<zen::CacheKey> LargeKeys = PutCacheRecords(BaseUri, "ue4.ddc"sv, "mastodon"sv, 4, 1024 * 1024 * 16, SmallKeys.size()); + + std::vector<zen::CacheKey> Keys(SmallKeys.begin(), SmallKeys.end()); + Keys.insert(Keys.end(), LargeKeys.begin(), LargeKeys.end()); + + { + GetCacheRecordResult Result = GetCacheRecords(BaseUri, "ue4.ddc"sv, Keys, CachePolicy::Default); + + CHECK(Result.Result.Results.size() == Keys.size()); + + for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results) + { + CHECK(Record); + const CacheKey& ExpectedKey = Keys[Index++]; + CHECK(Record->Key == ExpectedKey); + for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) + { + const IoBuffer& Body = Value.Body.GetCompressed().Flatten().AsIoBuffer(); + IoBufferFileReference Ref; + bool IsFileRef = Body.GetFileReference(Ref); + CHECK(!IsFileRef); + } + } + } + + // File path, but only for large files + { + GetCacheRecordResult Result = + GetCacheRecords(BaseUri, "ue4.ddc"sv, Keys, CachePolicy::Default, RpcAcceptOptions::kAllowLocalReferences); + + CHECK(Result.Result.Results.size() == Keys.size()); + + for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results) + { + CHECK(Record); + const CacheKey& ExpectedKey = Keys[Index++]; + CHECK(Record->Key == ExpectedKey); + for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) + { + const IoBuffer& Body = Value.Body.GetCompressed().Flatten().AsIoBuffer(); + IoBufferFileReference Ref; + bool IsFileRef = Body.GetFileReference(Ref); + CHECK(IsFileRef == (Body.Size() > 1024)); + } + } + } + + // File path, for all files + { + GetCacheRecordResult Result = + GetCacheRecords(BaseUri, + "ue4.ddc"sv, + Keys, + CachePolicy::Default, + RpcAcceptOptions::kAllowLocalReferences | RpcAcceptOptions::kAllowPartialLocalReferences); + + CHECK(Result.Result.Results.size() == Keys.size()); + + for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results) + { + CHECK(Record); + const CacheKey& ExpectedKey = Keys[Index++]; + CHECK(Record->Key == ExpectedKey); + for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) + { + const IoBuffer& Body = Value.Body.GetCompressed().Flatten().AsIoBuffer(); + IoBufferFileReference Ref; + bool IsFileRef = Body.GetFileReference(Ref); + CHECK(IsFileRef); + } + } + } + + // File handle, but only for large files + { + GetCacheRecordResult Result = GetCacheRecords(BaseUri, + "ue4.ddc"sv, + Keys, + CachePolicy::Default, + RpcAcceptOptions::kAllowLocalReferences, + GetCurrentProcessId()); + + CHECK(Result.Result.Results.size() == Keys.size()); + + for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results) + { + CHECK(Record); + const CacheKey& ExpectedKey = Keys[Index++]; + CHECK(Record->Key == ExpectedKey); + for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) + { + const IoBuffer& Body = Value.Body.GetCompressed().Flatten().AsIoBuffer(); + IoBufferFileReference Ref; + bool IsFileRef = Body.GetFileReference(Ref); + CHECK(IsFileRef == (Body.Size() > 1024)); + } + } + } + + // File handle, for all files + { + GetCacheRecordResult Result = + GetCacheRecords(BaseUri, + "ue4.ddc"sv, + Keys, + CachePolicy::Default, + RpcAcceptOptions::kAllowLocalReferences | RpcAcceptOptions::kAllowPartialLocalReferences, + GetCurrentProcessId()); + + CHECK(Result.Result.Results.size() == Keys.size()); + + for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results) + { + CHECK(Record); + const CacheKey& ExpectedKey = Keys[Index++]; + CHECK(Record->Key == ExpectedKey); + for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) + { + const IoBuffer& Body = Value.Body.GetCompressed().Flatten().AsIoBuffer(); + IoBufferFileReference Ref; + bool IsFileRef = Body.GetFileReference(Ref); + CHECK(IsFileRef); + } + } + } + } +} + +TEST_CASE("zcache.failing.upstream") +{ + // This is an exploratory test that takes a long time to run, so lets skip it by default + if (true) + { + return; + } + + using namespace std::literals; + using namespace utils; + + const uint16_t Upstream1PortNumber = 13338; + ZenConfig Upstream1Cfg = ZenConfig::New(Upstream1PortNumber); + ZenServerInstance Upstream1Server(TestEnv); + + const uint16_t Upstream2PortNumber = 13339; + ZenConfig Upstream2Cfg = ZenConfig::New(Upstream2PortNumber); + ZenServerInstance Upstream2Server(TestEnv); + + std::vector<std::uint16_t> UpstreamPorts = {Upstream1PortNumber, Upstream2PortNumber}; + ZenConfig LocalCfg = ZenConfig::NewWithThreadedUpstreams(UpstreamPorts, false); + LocalCfg.Args += (" --upstream-thread-count 2"); + ZenServerInstance LocalServer(TestEnv); + const uint16_t LocalPortNumber = 13337; + const auto LocalUri = fmt::format("http://localhost:{}/z$", LocalPortNumber); + const auto Upstream1Uri = fmt::format("http://localhost:{}/z$", Upstream1PortNumber); + const auto Upstream2Uri = fmt::format("http://localhost:{}/z$", Upstream2PortNumber); + + SpawnServer(Upstream1Server, Upstream1Cfg); + SpawnServer(Upstream2Server, Upstream2Cfg); + SpawnServer(LocalServer, LocalCfg); + bool Upstream1Running = true; + bool Upstream2Running = true; + + using namespace std::literals; + + auto AppendCacheRecord = [](cacherequests::PutCacheRecordsRequest& Request, + const zen::CacheKey& CacheKey, + size_t PayloadSize, + CachePolicy RecordPolicy) { + std::vector<uint32_t> Data; + Data.resize(PayloadSize / 4); + for (uint32_t Idx = 0; Idx < PayloadSize / 4; ++Idx) + { + Data[Idx] = (*reinterpret_cast<const uint32_t*>(&CacheKey.Hash.Hash[0])) + Idx; + } + + CompressedBuffer Value = zen::CompressedBuffer::Compress(SharedBuffer::MakeView(Data.data(), Data.size() * 4)); + Request.Requests.push_back({.Key = CacheKey, .Values = {{.Id = Oid::NewOid(), .Body = std::move(Value)}}, .Policy = RecordPolicy}); + }; + + auto PutCacheRecords = [&AppendCacheRecord](std::string_view BaseUri, + std::string_view Namespace, + std::string_view Bucket, + size_t Num, + size_t KeyOffset, + size_t PayloadSize = 8192) -> std::vector<CacheKey> { + std::vector<zen::CacheKey> OutKeys; + + cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .Namespace = std::string(Namespace)}; + for (size_t Key = 1; Key <= Num; ++Key) + { + zen::IoHash KeyHash; + ((size_t*)(KeyHash.Hash))[0] = KeyOffset + Key; + const zen::CacheKey CacheKey = zen::CacheKey::Create(Bucket, KeyHash); + + AppendCacheRecord(Request, CacheKey, PayloadSize, CachePolicy::Default); + OutKeys.push_back(CacheKey); + } + + CbPackage Package; + CHECK(Request.Format(Package)); + + IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); + cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)}, + cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}, + cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); + + if (Result.status_code != 200) + { + ZEN_DEBUG("PutCacheRecords failed with {}, reason '{}'", Result.status_code, Result.reason); + OutKeys.clear(); + } + + return OutKeys; + }; + + struct GetCacheRecordResult + { + zen::CbPackage Response; + cacherequests::GetCacheRecordsResult Result; + bool Success = false; + }; + + auto GetCacheRecords = [](std::string_view BaseUri, + std::string_view Namespace, + std::span<zen::CacheKey> Keys, + zen::CachePolicy Policy) -> GetCacheRecordResult { + cacherequests::GetCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, + .DefaultPolicy = Policy, + .Namespace = std::string(Namespace)}; + for (const CacheKey& Key : Keys) + { + Request.Requests.push_back({.Key = Key}); + } + + CbObjectWriter RequestWriter; + CHECK(Request.Format(RequestWriter)); + + BinaryWriter Body; + RequestWriter.Save(Body); + + cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)}, + cpr::Header{{"Content-Type", "application/x-ue-cb"}, {"Accept", "application/x-ue-cbpkg"}}, + cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); + + GetCacheRecordResult OutResult; + + if (Result.status_code == 200) + { + CbPackage Response = ParsePackageMessage(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size())); + if (!Response.IsNull()) + { + OutResult.Response = std::move(Response); + CHECK(OutResult.Result.Parse(OutResult.Response)); + OutResult.Success = true; + } + } + else + { + ZEN_DEBUG("GetCacheRecords with {}, reason '{}'", Result.reason, Result.status_code); + } + + return OutResult; + }; + + // Populate with some simple data + + CachePolicy Policy = CachePolicy::Default; + + const size_t ThreadCount = 128; + const size_t KeyMultiplier = 16384; + const size_t RecordsPerRequest = 64; + WorkerThreadPool Pool(ThreadCount); + + std::atomic_size_t Completed = 0; + + auto Keys = new std::vector<CacheKey>[ThreadCount * KeyMultiplier]; + RwLock KeysLock; + + for (size_t I = 0; I < ThreadCount * KeyMultiplier; I++) + { + size_t Iteration = I; + Pool.ScheduleWork([&] { + std::vector<CacheKey> NewKeys = PutCacheRecords(LocalUri, "ue4.ddc"sv, "mastodon"sv, RecordsPerRequest, I * RecordsPerRequest); + if (NewKeys.size() != RecordsPerRequest) + { + ZEN_DEBUG("PutCacheRecords iteration {} failed", Iteration); + Completed.fetch_add(1); + return; + } + { + RwLock::ExclusiveLockScope _(KeysLock); + Keys[Iteration].swap(NewKeys); + } + Completed.fetch_add(1); + }); + } + bool UseUpstream1 = false; + while (Completed < ThreadCount * KeyMultiplier) + { + Sleep(8000); + + if (UseUpstream1) + { + if (Upstream2Running) + { + Upstream2Server.EnableTermination(); + Upstream2Server.Shutdown(); + Sleep(100); + Upstream2Running = false; + } + if (!Upstream1Running) + { + SpawnServer(Upstream1Server, Upstream1Cfg); + Upstream1Running = true; + } + UseUpstream1 = !UseUpstream1; + } + else + { + if (Upstream1Running) + { + Upstream1Server.EnableTermination(); + Upstream1Server.Shutdown(); + Sleep(100); + Upstream1Running = false; + } + if (!Upstream2Running) + { + SpawnServer(Upstream2Server, Upstream2Cfg); + Upstream2Running = true; + } + UseUpstream1 = !UseUpstream1; + } + } + + Completed = 0; + for (size_t I = 0; I < ThreadCount * KeyMultiplier; I++) + { + size_t Iteration = I; + std::vector<CacheKey>& LocalKeys = Keys[Iteration]; + if (LocalKeys.empty()) + { + Completed.fetch_add(1); + continue; + } + Pool.ScheduleWork([&] { + GetCacheRecordResult Result = GetCacheRecords(LocalUri, "ue4.ddc"sv, LocalKeys, Policy); + + if (!Result.Success) + { + ZEN_DEBUG("GetCacheRecords iteration {} failed", Iteration); + Completed.fetch_add(1); + return; + } + + if (Result.Result.Results.size() != LocalKeys.size()) + { + ZEN_DEBUG("GetCacheRecords iteration {} empty records", Iteration); + Completed.fetch_add(1); + return; + } + for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results) + { + const CacheKey& ExpectedKey = LocalKeys[Index++]; + if (!Record) + { + continue; + } + if (Record->Key != ExpectedKey) + { + continue; + } + if (Record->Values.size() != 1) + { + continue; + } + + for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) + { + if (!Value.Body) + { + continue; + } + } + } + Completed.fetch_add(1); + }); + } + while (Completed < ThreadCount * KeyMultiplier) + { + Sleep(10); + } +} + +TEST_CASE("zcache.rpc.allpolicies") +{ + using namespace std::literals; + using namespace utils; + + ZenConfig UpstreamCfg = ZenConfig::New(13338); + ZenServerInstance UpstreamServer(TestEnv); + ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338); + ZenServerInstance LocalServer(TestEnv); + const uint16_t LocalPortNumber = 13337; + const auto BaseUri = fmt::format("http://localhost:{}/z$", LocalPortNumber); + + SpawnServer(UpstreamServer, UpstreamCfg); + SpawnServer(LocalServer, LocalCfg); + + std::string_view TestVersion = "F72150A02AE34B57A9EC91D36BA1CE08"sv; + std::string_view TestBucket = "allpoliciestest"sv; + std::string_view TestNamespace = "ue4.ddc"sv; + + // NumKeys = (2 Value vs Record)*(2 SkipData vs Default)*(2 ForceMiss vs Not)*(2 use local) + // *(2 use remote)*(2 UseValue Policy vs not)*(4 cases per type) + constexpr int NumKeys = 256; + constexpr int NumValues = 4; + Oid ValueIds[NumValues]; + IoHash Hash; + for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex) + { + ExtendableStringBuilder<16> ValueName; + ValueName << "ValueId_"sv << ValueIndex; + static_assert(sizeof(IoHash) >= sizeof(Oid)); + ValueIds[ValueIndex] = Oid::FromMemory(IoHash::HashBuffer(ValueName.Data(), ValueName.Size() * sizeof(ValueName.Data()[0])).Hash); + } + + struct KeyData; + struct UserData + { + UserData& Set(KeyData* InKeyData, int InValueIndex) + { + Data = InKeyData; + ValueIndex = InValueIndex; + return *this; + } + KeyData* Data = nullptr; + int ValueIndex = 0; + }; + struct KeyData + { + CompressedBuffer BufferValues[NumValues]; + uint64_t IntValues[NumValues]; + UserData ValueUserData[NumValues]; + bool ReceivedChunk[NumValues]; + CacheKey Key; + UserData KeyUserData; + uint32_t KeyIndex = 0; + bool GetRequestsData = true; + bool UseValueAPI = false; + bool UseValuePolicy = false; + bool ForceMiss = false; + bool UseLocal = true; + bool UseRemote = true; + bool ShouldBeHit = true; + bool ReceivedPut = false; + bool ReceivedGet = false; + bool ReceivedPutValue = false; + bool ReceivedGetValue = false; + }; + struct CachePutRequest + { + CacheKey Key; + CbObject Record; + CacheRecordPolicy Policy; + KeyData* Values; + UserData* Data; + }; + struct CachePutValueRequest + { + CacheKey Key; + CompressedBuffer Value; + CachePolicy Policy; + UserData* Data; + }; + struct CacheGetRequest + { + CacheKey Key; + CacheRecordPolicy Policy; + UserData* Data; + }; + struct CacheGetValueRequest + { + CacheKey Key; + CachePolicy Policy; + UserData* Data; + }; + struct CacheGetChunkRequest + { + CacheKey Key; + Oid ValueId; + uint64_t RawOffset; + uint64_t RawSize; + IoHash RawHash; + CachePolicy Policy; + UserData* Data; + }; + + KeyData KeyDatas[NumKeys]; + std::vector<CachePutRequest> PutRequests; + std::vector<CachePutValueRequest> PutValueRequests; + std::vector<CacheGetRequest> GetRequests; + std::vector<CacheGetValueRequest> GetValueRequests; + std::vector<CacheGetChunkRequest> ChunkRequests; + + for (uint32_t KeyIndex = 0; KeyIndex < NumKeys; ++KeyIndex) + { + IoHashStream KeyWriter; + KeyWriter.Append(TestVersion.data(), TestVersion.length() * sizeof(TestVersion.data()[0])); + KeyWriter.Append(&KeyIndex, sizeof(KeyIndex)); + IoHash KeyHash = KeyWriter.GetHash(); + KeyData& KeyData = KeyDatas[KeyIndex]; + + KeyData.Key = CacheKey::Create(TestBucket, KeyHash); + KeyData.KeyIndex = KeyIndex; + KeyData.GetRequestsData = (KeyIndex & (1 << 1)) == 0; + KeyData.UseValueAPI = (KeyIndex & (1 << 2)) != 0; + KeyData.UseValuePolicy = (KeyIndex & (1 << 3)) != 0; + KeyData.ForceMiss = (KeyIndex & (1 << 4)) == 0; + KeyData.UseLocal = (KeyIndex & (1 << 5)) == 0; + KeyData.UseRemote = (KeyIndex & (1 << 6)) == 0; + KeyData.ShouldBeHit = !KeyData.ForceMiss && (KeyData.UseLocal || KeyData.UseRemote); + CachePolicy SharedPolicy = KeyData.UseLocal ? CachePolicy::Local : CachePolicy::None; + SharedPolicy |= KeyData.UseRemote ? CachePolicy::Remote : CachePolicy::None; + CachePolicy PutPolicy = SharedPolicy; + CachePolicy GetPolicy = SharedPolicy; + GetPolicy |= !KeyData.GetRequestsData ? CachePolicy::SkipData : CachePolicy::None; + CacheKey& Key = KeyData.Key; + + for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex) + { + KeyData.IntValues[ValueIndex] = static_cast<uint64_t>(KeyIndex) | (static_cast<uint64_t>(ValueIndex) << 32); + KeyData.BufferValues[ValueIndex] = + CompressedBuffer::Compress(SharedBuffer::MakeView(&KeyData.IntValues[ValueIndex], sizeof(KeyData.IntValues[ValueIndex]))); + KeyData.ReceivedChunk[ValueIndex] = false; + } + + UserData& KeyUserData = KeyData.KeyUserData.Set(&KeyData, -1); + for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex) + { + KeyData.ValueUserData[ValueIndex].Set(&KeyData, ValueIndex); + } + if (!KeyData.UseValueAPI) + { + CbObjectWriter Builder; + Builder.BeginObject("key"sv); + Builder << "Bucket"sv << Key.Bucket << "Hash"sv << Key.Hash; + Builder.EndObject(); + Builder.BeginArray("Values"sv); + for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex) + { + Builder.BeginObject(); + Builder.AddObjectId("Id"sv, ValueIds[ValueIndex]); + Builder.AddBinaryAttachment("RawHash"sv, KeyData.BufferValues[ValueIndex].DecodeRawHash()); + Builder.AddInteger("RawSize"sv, KeyData.BufferValues[ValueIndex].DecodeRawSize()); + Builder.EndObject(); + } + Builder.EndArray(); + + CacheRecordPolicy PutRecordPolicy; + CacheRecordPolicy GetRecordPolicy; + if (!KeyData.UseValuePolicy) + { + PutRecordPolicy = CacheRecordPolicy(PutPolicy); + GetRecordPolicy = CacheRecordPolicy(GetPolicy); + } + else + { + // Switch the SkipData field in the Record policy so that if the CacheStore ignores the ValuePolicies + // it will use the wrong value for SkipData and fail our tests. + CacheRecordPolicyBuilder PutBuilder(PutPolicy ^ CachePolicy::SkipData); + CacheRecordPolicyBuilder GetBuilder(GetPolicy ^ CachePolicy::SkipData); + for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex) + { + PutBuilder.AddValuePolicy(ValueIds[ValueIndex], PutPolicy); + GetBuilder.AddValuePolicy(ValueIds[ValueIndex], GetPolicy); + } + PutRecordPolicy = PutBuilder.Build(); + GetRecordPolicy = GetBuilder.Build(); + } + if (!KeyData.ForceMiss) + { + PutRequests.push_back({Key, Builder.Save(), PutRecordPolicy, &KeyData, &KeyUserData}); + } + GetRequests.push_back({Key, GetRecordPolicy, &KeyUserData}); + for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex) + { + UserData& ValueUserData = KeyData.ValueUserData[ValueIndex]; + ChunkRequests.push_back({Key, ValueIds[ValueIndex], 0, UINT64_MAX, IoHash(), GetPolicy, &ValueUserData}); + } + } + else + { + if (!KeyData.ForceMiss) + { + PutValueRequests.push_back({Key, KeyData.BufferValues[0], PutPolicy, &KeyUserData}); + } + GetValueRequests.push_back({Key, GetPolicy, &KeyUserData}); + ChunkRequests.push_back({Key, Oid::Zero, 0, UINT64_MAX, IoHash(), GetPolicy, &KeyUserData}); + } + } + + // PutCacheRecords + { + CachePolicy BatchDefaultPolicy = CachePolicy::Default; + cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, + .DefaultPolicy = BatchDefaultPolicy, + .Namespace = std::string(TestNamespace)}; + Request.Requests.reserve(PutRequests.size()); + for (CachePutRequest& PutRequest : PutRequests) + { + cacherequests::PutCacheRecordRequest& RecordRequest = Request.Requests.emplace_back(); + RecordRequest.Key = PutRequest.Key; + RecordRequest.Policy = PutRequest.Policy; + RecordRequest.Values.reserve(NumValues); + for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex) + { + RecordRequest.Values.push_back({.Id = ValueIds[ValueIndex], .Body = PutRequest.Values->BufferValues[ValueIndex]}); + } + PutRequest.Data->Data->ReceivedPut = true; + } + + CbPackage Package; + CHECK(Request.Format(Package)); + IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); + cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)}, + cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}, + cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); + CHECK_MESSAGE(Result.status_code == 200, "PutCacheRecords unexpectedly failed."); + } + + // PutCacheValues + { + CachePolicy BatchDefaultPolicy = CachePolicy::Default; + + cacherequests::PutCacheValuesRequest Request = {.AcceptMagic = kCbPkgMagic, + .DefaultPolicy = BatchDefaultPolicy, + .Namespace = std::string(TestNamespace)}; + Request.Requests.reserve(PutValueRequests.size()); + for (CachePutValueRequest& PutRequest : PutValueRequests) + { + Request.Requests.push_back({.Key = PutRequest.Key, .Body = PutRequest.Value, .Policy = PutRequest.Policy}); + PutRequest.Data->Data->ReceivedPutValue = true; + } + + CbPackage Package; + CHECK(Request.Format(Package)); + + IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); + cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)}, + cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}, + cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); + CHECK_MESSAGE(Result.status_code == 200, "PutCacheValues unexpectedly failed."); + } + + for (KeyData& KeyData : KeyDatas) + { + if (!KeyData.ForceMiss) + { + if (!KeyData.UseValueAPI) + { + CHECK_MESSAGE(KeyData.ReceivedPut, WriteToString<32>("Key ", KeyData.KeyIndex, " was unexpectedly not put.").c_str()); + } + else + { + CHECK_MESSAGE(KeyData.ReceivedPutValue, + WriteToString<32>("Key ", KeyData.KeyIndex, " was unexpectedly not put to ValueAPI.").c_str()); + } + } + } + + // GetCacheRecords + { + CachePolicy BatchDefaultPolicy = CachePolicy::Default; + cacherequests::GetCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, + .DefaultPolicy = BatchDefaultPolicy, + .Namespace = std::string(TestNamespace)}; + Request.Requests.reserve(GetRequests.size()); + for (CacheGetRequest& GetRequest : GetRequests) + { + Request.Requests.push_back({.Key = GetRequest.Key, .Policy = GetRequest.Policy}); + } + + CbPackage Package; + CHECK(Request.Format(Package)); + IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); + cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)}, + cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}, + cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); + CHECK_MESSAGE(Result.status_code == 200, "GetCacheRecords unexpectedly failed."); + CbPackage Response = ParsePackageMessage(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size())); + bool Loaded = !Response.IsNull(); + CHECK_MESSAGE(Loaded, "GetCacheRecords response failed to load."); + cacherequests::GetCacheRecordsResult RequestResult; + CHECK(RequestResult.Parse(Response)); + CHECK_MESSAGE(RequestResult.Results.size() == GetRequests.size(), "GetCacheRecords response count did not match request count."); + for (int Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& RecordResult : RequestResult.Results) + { + bool Succeeded = RecordResult.has_value(); + CacheGetRequest& GetRequest = GetRequests[Index++]; + KeyData* KeyData = GetRequest.Data->Data; + KeyData->ReceivedGet = true; + WriteToString<32> Name("Get(", KeyData->KeyIndex, ")"); + if (KeyData->ShouldBeHit) + { + CHECK_MESSAGE(Succeeded, WriteToString<32>(Name, " unexpectedly failed.").c_str()); + } + else if (KeyData->ForceMiss) + { + CHECK_MESSAGE(!Succeeded, WriteToString<32>(Name, " unexpectedly succeeded.").c_str()); + } + if (!KeyData->ForceMiss && Succeeded) + { + CHECK_MESSAGE(RecordResult->Values.size() == NumValues, + WriteToString<32>(Name, " number of values did not match.").c_str()); + for (const cacherequests::GetCacheRecordResultValue& Value : RecordResult->Values) + { + int ExpectedValueIndex = 0; + for (; ExpectedValueIndex < NumValues; ++ExpectedValueIndex) + { + if (ValueIds[ExpectedValueIndex] == Value.Id) + { + break; + } + } + CHECK_MESSAGE(ExpectedValueIndex < NumValues, WriteToString<32>(Name, " could not find matching ValueId.").c_str()); + + WriteToString<32> ValueName("Get(", KeyData->KeyIndex, ",", ExpectedValueIndex, ")"); + + CompressedBuffer ExpectedValue = KeyData->BufferValues[ExpectedValueIndex]; + CHECK_MESSAGE(Value.RawHash == ExpectedValue.DecodeRawHash(), + WriteToString<32>(ValueName, " RawHash did not match.").c_str()); + CHECK_MESSAGE(Value.RawSize == ExpectedValue.DecodeRawSize(), + WriteToString<32>(ValueName, " RawSize did not match.").c_str()); + + if (KeyData->GetRequestsData) + { + SharedBuffer Buffer = Value.Body.Decompress(); + CHECK_MESSAGE(Buffer.GetSize() == Value.RawSize, + WriteToString<32>(ValueName, " BufferSize did not match RawSize.").c_str()); + uint64_t ActualIntValue = ((const uint64_t*)Buffer.GetData())[0]; + uint64_t ExpectedIntValue = KeyData->IntValues[ExpectedValueIndex]; + CHECK_MESSAGE(ActualIntValue == ExpectedIntValue, WriteToString<32>(ValueName, " had unexpected data.").c_str()); + } + } + } + } + } + + // GetCacheValues + { + CachePolicy BatchDefaultPolicy = CachePolicy::Default; + + cacherequests::GetCacheValuesRequest GetCacheValuesRequest = {.AcceptMagic = kCbPkgMagic, + .DefaultPolicy = BatchDefaultPolicy, + .Namespace = std::string(TestNamespace)}; + GetCacheValuesRequest.Requests.reserve(GetValueRequests.size()); + for (CacheGetValueRequest& GetRequest : GetValueRequests) + { + GetCacheValuesRequest.Requests.push_back({.Key = GetRequest.Key, .Policy = GetRequest.Policy}); + } + + CbPackage Package; + CHECK(GetCacheValuesRequest.Format(Package)); + + IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); + cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)}, + cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}, + cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); + CHECK_MESSAGE(Result.status_code == 200, "GetCacheValues unexpectedly failed."); + IoBuffer MessageBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()); + CbPackage Response = ParsePackageMessage(MessageBuffer); + bool Loaded = !Response.IsNull(); + CHECK_MESSAGE(Loaded, "GetCacheValues response failed to load."); + cacherequests::GetCacheValuesResult GetCacheValuesResult; + CHECK(GetCacheValuesResult.Parse(Response)); + for (int Index = 0; const cacherequests::CacheValueResult& ValueResult : GetCacheValuesResult.Results) + { + bool Succeeded = ValueResult.RawHash != IoHash::Zero; + CacheGetValueRequest& Request = GetValueRequests[Index++]; + KeyData* KeyData = Request.Data->Data; + KeyData->ReceivedGetValue = true; + WriteToString<32> Name("GetValue("sv, KeyData->KeyIndex, ")"sv); + + if (KeyData->ShouldBeHit) + { + CHECK_MESSAGE(Succeeded, WriteToString<32>(Name, " unexpectedly failed.").c_str()); + } + else if (KeyData->ForceMiss) + { + CHECK_MESSAGE(!Succeeded, WriteToString<32>(Name, "unexpectedly succeeded.").c_str()); + } + if (!KeyData->ForceMiss && Succeeded) + { + CompressedBuffer ExpectedValue = KeyData->BufferValues[0]; + CHECK_MESSAGE(ValueResult.RawHash == ExpectedValue.DecodeRawHash(), + WriteToString<32>(Name, " RawHash did not match.").c_str()); + CHECK_MESSAGE(ValueResult.RawSize == ExpectedValue.DecodeRawSize(), + WriteToString<32>(Name, " RawSize did not match.").c_str()); + + if (KeyData->GetRequestsData) + { + SharedBuffer Buffer = ValueResult.Body.Decompress(); + CHECK_MESSAGE(Buffer.GetSize() == ValueResult.RawSize, + WriteToString<32>(Name, " BufferSize did not match RawSize.").c_str()); + uint64_t ActualIntValue = ((const uint64_t*)Buffer.GetData())[0]; + uint64_t ExpectedIntValue = KeyData->IntValues[0]; + CHECK_MESSAGE(ActualIntValue == ExpectedIntValue, WriteToString<32>(Name, " had unexpected data.").c_str()); + } + } + } + } + + // GetCacheChunks + { + std::sort(ChunkRequests.begin(), ChunkRequests.end(), [](CacheGetChunkRequest& A, CacheGetChunkRequest& B) { + return A.Key.Hash < B.Key.Hash; + }); + CachePolicy BatchDefaultPolicy = CachePolicy::Default; + cacherequests::GetCacheChunksRequest GetCacheChunksRequest = {.AcceptMagic = kCbPkgMagic, + .DefaultPolicy = BatchDefaultPolicy, + .Namespace = std::string(TestNamespace)}; + GetCacheChunksRequest.Requests.reserve(ChunkRequests.size()); + for (CacheGetChunkRequest& ChunkRequest : ChunkRequests) + { + GetCacheChunksRequest.Requests.push_back({.Key = ChunkRequest.Key, + .ValueId = ChunkRequest.ValueId, + .ChunkId = IoHash(), + .RawOffset = ChunkRequest.RawOffset, + .RawSize = ChunkRequest.RawSize, + .Policy = ChunkRequest.Policy}); + } + CbPackage Package; + CHECK(GetCacheChunksRequest.Format(Package)); + + IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); + cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)}, + cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}, + cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); + CHECK_MESSAGE(Result.status_code == 200, "GetCacheChunks unexpectedly failed."); + CbPackage Response = ParsePackageMessage(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size())); + bool Loaded = !Response.IsNull(); + CHECK_MESSAGE(Loaded, "GetCacheChunks response failed to load."); + cacherequests::GetCacheChunksResult GetCacheChunksResult; + CHECK(GetCacheChunksResult.Parse(Response)); + CHECK_MESSAGE(GetCacheChunksResult.Results.size() == ChunkRequests.size(), + "GetCacheChunks response count did not match request count."); + + for (int Index = 0; const cacherequests::CacheValueResult& ValueResult : GetCacheChunksResult.Results) + { + bool Succeeded = ValueResult.RawHash != IoHash::Zero; + + CacheGetChunkRequest& Request = ChunkRequests[Index++]; + KeyData* KeyData = Request.Data->Data; + int ValueIndex = Request.Data->ValueIndex >= 0 ? Request.Data->ValueIndex : 0; + KeyData->ReceivedChunk[ValueIndex] = true; + WriteToString<32> Name("GetChunks("sv, KeyData->KeyIndex, ","sv, ValueIndex, ")"sv); + + if (KeyData->ShouldBeHit) + { + CHECK_MESSAGE(Succeeded, WriteToString<256>(Name, " unexpectedly failed."sv).c_str()); + } + else if (KeyData->ForceMiss) + { + CHECK_MESSAGE(!Succeeded, WriteToString<256>(Name, " unexpectedly succeeded."sv).c_str()); + } + if (KeyData->ShouldBeHit && Succeeded) + { + CompressedBuffer ExpectedValue = KeyData->BufferValues[ValueIndex]; + CHECK_MESSAGE(ValueResult.RawHash == ExpectedValue.DecodeRawHash(), + WriteToString<32>(Name, " had unexpected RawHash.").c_str()); + CHECK_MESSAGE(ValueResult.RawSize == ExpectedValue.DecodeRawSize(), + WriteToString<32>(Name, " had unexpected RawSize.").c_str()); + + if (KeyData->GetRequestsData) + { + SharedBuffer Buffer = ValueResult.Body.Decompress(); + CHECK_MESSAGE(Buffer.GetSize() == ValueResult.RawSize, + WriteToString<32>(Name, " BufferSize did not match RawSize.").c_str()); + uint64_t ActualIntValue = ((const uint64_t*)Buffer.GetData())[0]; + uint64_t ExpectedIntValue = KeyData->IntValues[ValueIndex]; + CHECK_MESSAGE(ActualIntValue == ExpectedIntValue, WriteToString<32>(Name, " had unexpected data.").c_str()); + } + } + } + } + + for (KeyData& KeyData : KeyDatas) + { + if (!KeyData.UseValueAPI) + { + CHECK_MESSAGE(KeyData.ReceivedGet, WriteToString<32>("Get(", KeyData.KeyIndex, ") was unexpectedly not received.").c_str()); + for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex) + { + CHECK_MESSAGE( + KeyData.ReceivedChunk[ValueIndex], + WriteToString<32>("GetChunks(", KeyData.KeyIndex, ",", ValueIndex, ") was unexpectedly not received.").c_str()); + } + } + else + { + CHECK_MESSAGE(KeyData.ReceivedGetValue, + WriteToString<32>("GetValue(", KeyData.KeyIndex, ") was unexpectedly not received.").c_str()); + CHECK_MESSAGE(KeyData.ReceivedChunk[0], + WriteToString<32>("GetChunks(", KeyData.KeyIndex, ") was unexpectedly not received.").c_str()); + } + } +} + +class ZenServerTestHelper +{ +public: + ZenServerTestHelper(std::string_view HelperId, int ServerCount) : m_HelperId{HelperId}, m_ServerCount{ServerCount} {} + ~ZenServerTestHelper() {} + + void SpawnServers(std::string_view AdditionalServerArgs = std::string_view()) + { + SpawnServers([](ZenServerInstance&) {}, AdditionalServerArgs); + } + + void SpawnServers(auto&& Callback, std::string_view AdditionalServerArgs) + { + ZEN_INFO("{}: spawning {} server instances", m_HelperId, m_ServerCount); + + m_Instances.resize(m_ServerCount); + + for (int i = 0; i < m_ServerCount; ++i) + { + auto& Instance = m_Instances[i]; + + Instance = std::make_unique<ZenServerInstance>(TestEnv); + Instance->SetTestDir(TestEnv.CreateNewTestDir()); + + Callback(*Instance); + + Instance->SpawnServer(13337 + i, AdditionalServerArgs); + } + + for (int i = 0; i < m_ServerCount; ++i) + { + auto& Instance = m_Instances[i]; + + Instance->WaitUntilReady(); + } + } + + ZenServerInstance& GetInstance(int Index) { return *m_Instances[Index]; } + +private: + std::string m_HelperId; + int m_ServerCount = 0; + std::vector<std::unique_ptr<ZenServerInstance> > m_Instances; +}; + +class IoDispatcher +{ +public: + IoDispatcher(asio::io_context& IoCtx) : m_IoCtx(IoCtx) {} + ~IoDispatcher() { Stop(); } + + void Run() + { + Stop(); + + m_Running = true; + + m_IoThread = std::thread([this]() { + try + { + m_IoCtx.run(); + } + catch (std::exception& Error) + { + m_Error = Error; + } + }); + } + + void Stop() + { + if (m_Running) + { + m_Running = false; + + if (m_IoThread.joinable()) + { + m_IoThread.join(); + } + } + } + + bool IsRunning() const { return m_Running; } + + const std::exception& Error() { return m_Error; } + +private: + asio::io_context& m_IoCtx; + std::thread m_IoThread; + std::exception m_Error; + std::atomic_bool m_Running{false}; +}; + +TEST_CASE("http.basics") +{ + using namespace std::literals; + + ZenServerTestHelper Servers{"http.basics"sv, 1}; + Servers.SpawnServers(); + + ZenServerInstance& Instance = Servers.GetInstance(0); + const std::string BaseUri = Instance.GetBaseUri(); + + { + cpr::Response r = cpr::Get(cpr::Url{fmt::format("{}/testing/hello", BaseUri)}); + CHECK(IsHttpSuccessCode(r.status_code)); + } + + { + cpr::Response r = cpr::Post(cpr::Url{fmt::format("{}/testing/hello", BaseUri)}); + CHECK_EQ(r.status_code, 404); + } + + { + cpr::Response r = cpr::Post(cpr::Url{fmt::format("{}/testing/echo", BaseUri)}, cpr::Body{"yoyoyoyo"}); + CHECK_EQ(r.status_code, 200); + CHECK_EQ(r.text, "yoyoyoyo"); + } +} + +TEST_CASE("http.package") +{ + using namespace std::literals; + + ZenServerTestHelper Servers{"http.package"sv, 1}; + Servers.SpawnServers(); + + ZenServerInstance& Instance = Servers.GetInstance(0); + const std::string BaseUri = Instance.GetBaseUri(); + + static const uint8_t Data1[] = {0, 1, 2, 3}; + static const uint8_t Data2[] = {0, 1, 2, 3, 4, 5, 6, 7, 8}; + + zen::CompressedBuffer AttachmentData1 = zen::CompressedBuffer::Compress(zen::SharedBuffer::Clone({Data1, 4}), + zen::OodleCompressor::NotSet, + zen::OodleCompressionLevel::None); + zen::CbAttachment Attach1{AttachmentData1, AttachmentData1.DecodeRawHash()}; + zen::CompressedBuffer AttachmentData2 = zen::CompressedBuffer::Compress(zen::SharedBuffer::Clone({Data2, 8}), + zen::OodleCompressor::NotSet, + zen::OodleCompressionLevel::None); + zen::CbAttachment Attach2{AttachmentData2, AttachmentData2.DecodeRawHash()}; + + zen::CbObjectWriter Writer; + + Writer.AddAttachment("attach1", Attach1); + Writer.AddAttachment("attach2", Attach2); + + zen::CbObject CoreObject = Writer.Save(); + + zen::CbPackage TestPackage; + TestPackage.SetObject(CoreObject); + TestPackage.AddAttachment(Attach1); + TestPackage.AddAttachment(Attach2); + + zen::HttpClient TestClient(BaseUri); + zen::HttpClient::Response Response = TestClient.TransactPackage("/testing/package"sv, TestPackage); + + zen::CbPackage ResponsePackage = ParsePackageMessage(Response.ResponsePayload); + + CHECK_EQ(ResponsePackage, TestPackage); +} + +TEST_CASE("websocket.basic") +{ + if (true) + { + return; + } + + std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); + const uint16_t PortNumber = 13337; + const auto MaxWaitTime = std::chrono::seconds(5); + + ZenServerInstance Inst(TestEnv); + Inst.SetTestDir(TestDir); + Inst.SpawnServer(PortNumber, "--websocket-port=8848"sv); + Inst.WaitUntilReady(); + + asio::io_context IoCtx; + IoDispatcher IoDispatcher(IoCtx); + auto WebSocket = WebSocketClient::Create(IoCtx); + + auto ConnectFuture = WebSocket->Connect({.Host = "127.0.0.1", .Port = 8848, .Endpoint = "/zen"}); + IoDispatcher.Run(); + + ConnectFuture.wait_for(MaxWaitTime); + CHECK(ConnectFuture.get()); + + for (size_t Idx = 0; Idx < 10; Idx++) + { + CbObjectWriter Request; + Request << "Method"sv + << "SayHello"sv; + + WebSocketMessage RequestMsg; + RequestMsg.SetMessageType(WebSocketMessageType::kRequest); + RequestMsg.SetBody(Request.Save()); + + auto ResponseFuture = WebSocket->SendRequest(std::move(RequestMsg)); + ResponseFuture.wait_for(MaxWaitTime); + + CbObject Response = ResponseFuture.get().Body().GetObject(); + std::string_view Message = Response["Result"].AsString(); + + CHECK(Message == "Hello Friend!!"sv); + } + + WebSocket->Disconnect(); + + IoCtx.stop(); + IoDispatcher.Stop(); +} + +std::string +OidAsString(const Oid& Id) +{ + StringBuilder<25> OidStringBuilder; + Id.ToString(OidStringBuilder); + return OidStringBuilder.ToString(); +} + +CbPackage +CreateOplogPackage(const Oid& Id, const std::span<const std::pair<Oid, CompressedBuffer> >& Attachments) +{ + CbPackage Package; + CbObjectWriter Object; + Object << "key"sv << OidAsString(Id); + if (!Attachments.empty()) + { + Object.BeginArray("bulkdata"); + for (const auto& Attachment : Attachments) + { + CbAttachment Attach(Attachment.second, Attachment.second.DecodeRawHash()); + Object.BeginObject(); + Object << "id"sv << Attachment.first; + Object << "type"sv + << "Standard"sv; + Object << "data"sv << Attach; + Object.EndObject(); + + Package.AddAttachment(Attach); + ZEN_DEBUG("Added attachment {}", Attach.GetHash()); + } + Object.EndArray(); + } + Package.SetObject(Object.Save()); + return Package; +}; + +std::vector<std::pair<Oid, CompressedBuffer> > +CreateAttachments(const std::span<const size_t>& Sizes) +{ + std::vector<std::pair<Oid, CompressedBuffer> > Result; + Result.reserve(Sizes.size()); + for (size_t Size : Sizes) + { + std::vector<uint8_t> Data; + Data.resize(Size); + uint16_t* DataPtr = reinterpret_cast<uint16_t*>(Data.data()); + for (size_t Idx = 0; Idx < Size / 2; ++Idx) + { + DataPtr[Idx] = static_cast<uint16_t>(Idx % 0xffffu); + } + if (Size & 1) + { + Data[Size - 1] = static_cast<uint8_t>((Size - 1) & 0xff); + } + CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer::MakeView(Data.data(), Data.size())); + Result.emplace_back(std::pair<Oid, CompressedBuffer>(Oid::NewOid(), Compressed)); + } + return Result; +} + +cpr::Body +AsBody(const IoBuffer& Payload) +{ + return cpr::Body{(const char*)Payload.GetData(), Payload.Size()}; +}; + +enum CbWriterMeta +{ + BeginObject, + EndObject, + BeginArray, + EndArray +}; + +inline CbWriter& +operator<<(CbWriter& Writer, CbWriterMeta Meta) +{ + switch (Meta) + { + case BeginObject: + Writer.BeginObject(); + break; + case EndObject: + Writer.EndObject(); + break; + case BeginArray: + Writer.BeginArray(); + break; + case EndArray: + Writer.EndArray(); + break; + default: + ZEN_ASSERT(false); + } + return Writer; +} + +TEST_CASE("project.remote") +{ + using namespace std::literals; + + ZenServerTestHelper Servers("remote", 3); + Servers.SpawnServers("--debug"); + + std::vector<Oid> OpIds; + OpIds.reserve(24); + for (size_t I = 0; I < 24; ++I) + { + OpIds.emplace_back(Oid::NewOid()); + } + + std::unordered_map<Oid, std::vector<std::pair<Oid, CompressedBuffer> >, Oid::Hasher> Attachments; + { + std::vector<std::size_t> AttachmentSizes({7633, 6825, 5738, 8031, 7225, 566, 3656, 6006, 24, 3466, 1093, 4269, + 2257, 3685, 3489, 7194, 6151, 5482, 6217, 3511, 6738, 5061, 7537, 2759, + 1916, 8210, 2235, 4024, 1582, 5251, 491, 5464, 4607, 8135, 3767, 4045, + 4415, 5007, 8876, 6761, 3359, 8526, 4097, 4855, 8225}); + auto It = AttachmentSizes.begin(); + Attachments[OpIds[0]] = {}; + Attachments[OpIds[1]] = CreateAttachments(std::initializer_list<size_t>{*It++}); + Attachments[OpIds[2]] = CreateAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++}); + Attachments[OpIds[3]] = CreateAttachments(std::initializer_list<size_t>{*It++}); + Attachments[OpIds[4]] = CreateAttachments(std::initializer_list<size_t>{*It++, *It++, *It++}); + Attachments[OpIds[5]] = CreateAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++}); + Attachments[OpIds[6]] = CreateAttachments(std::initializer_list<size_t>{*It++}); + Attachments[OpIds[7]] = CreateAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++}); + Attachments[OpIds[8]] = CreateAttachments(std::initializer_list<size_t>{}); + Attachments[OpIds[9]] = CreateAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++}); + Attachments[OpIds[10]] = CreateAttachments(std::initializer_list<size_t>{*It++}); + Attachments[OpIds[11]] = CreateAttachments(std::initializer_list<size_t>{*It++, *It++, *It++}); + Attachments[OpIds[12]] = CreateAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++}); + Attachments[OpIds[13]] = CreateAttachments(std::initializer_list<size_t>{*It++}); + Attachments[OpIds[14]] = CreateAttachments(std::initializer_list<size_t>{*It++, *It++}); + Attachments[OpIds[15]] = CreateAttachments(std::initializer_list<size_t>{*It++, *It++}); + Attachments[OpIds[16]] = CreateAttachments(std::initializer_list<size_t>{}); + Attachments[OpIds[17]] = CreateAttachments(std::initializer_list<size_t>{*It++, *It++}); + Attachments[OpIds[18]] = CreateAttachments(std::initializer_list<size_t>{*It++, *It++}); + Attachments[OpIds[19]] = CreateAttachments(std::initializer_list<size_t>{}); + Attachments[OpIds[20]] = CreateAttachments(std::initializer_list<size_t>{*It++}); + Attachments[OpIds[21]] = CreateAttachments(std::initializer_list<size_t>{*It++}); + Attachments[OpIds[22]] = CreateAttachments(std::initializer_list<size_t>{*It++, *It++, *It++}); + Attachments[OpIds[23]] = CreateAttachments(std::initializer_list<size_t>{*It++}); + ZEN_ASSERT(It == AttachmentSizes.end()); + } + + auto AddOp = [](const CbObject& Op, std::unordered_map<Oid, uint32_t, Oid::Hasher>& Ops) { + XXH3_128Stream KeyHasher; + Op["key"sv].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); }); + XXH3_128 KeyHash = KeyHasher.GetHash(); + Oid Id; + memcpy(Id.OidBits, &KeyHash, sizeof Id.OidBits); + IoBuffer Buffer = Op.GetBuffer().AsIoBuffer(); + const uint32_t OpCoreHash = uint32_t(XXH3_64bits(Buffer.GetData(), Buffer.GetSize()) & 0xffffFFFF); + Ops.insert({Id, OpCoreHash}); + }; + + auto MakeProject = [](cpr::Session& Session, std::string_view UrlBase, std::string_view ProjectName) { + CbObjectWriter Project; + Project.AddString("id"sv, ProjectName); + Project.AddString("root"sv, ""sv); + Project.AddString("engine"sv, ""sv); + Project.AddString("project"sv, ""sv); + Project.AddString("projectfile"sv, ""sv); + IoBuffer ProjectPayload = Project.Save().GetBuffer().AsIoBuffer(); + std::string ProjectRequest = fmt::format("{}/prj/{}", UrlBase, ProjectName); + Session.SetUrl({ProjectRequest}); + Session.SetBody(cpr::Body{(const char*)ProjectPayload.GetData(), ProjectPayload.GetSize()}); + cpr::Response Response = Session.Post(); + CHECK(IsHttpSuccessCode(Response.status_code)); + }; + + auto MakeOplog = [](cpr::Session& Session, std::string_view UrlBase, std::string_view ProjectName, std::string_view OplogName) { + std::string CreateOplogRequest = fmt::format("{}/prj/{}/oplog/{}", UrlBase, ProjectName, OplogName); + Session.SetUrl({CreateOplogRequest}); + Session.SetBody(cpr::Body{}); + cpr::Response Response = Session.Post(); + CHECK(IsHttpSuccessCode(Response.status_code)); + }; + + auto MakeOp = [](cpr::Session& Session, + std::string_view UrlBase, + std::string_view ProjectName, + std::string_view OplogName, + const CbPackage& OpPackage) { + std::string CreateOpRequest = fmt::format("{}/prj/{}/oplog/{}/new", UrlBase, ProjectName, OplogName); + Session.SetUrl({CreateOpRequest}); + zen::BinaryWriter MemOut; + legacy::SaveCbPackage(OpPackage, MemOut); + Session.SetBody(cpr::Body{(const char*)MemOut.Data(), MemOut.Size()}); + cpr::Response Response = Session.Post(); + CHECK(IsHttpSuccessCode(Response.status_code)); + }; + + cpr::Session Session; + MakeProject(Session, Servers.GetInstance(0).GetBaseUri(), "proj0"); + MakeOplog(Session, Servers.GetInstance(0).GetBaseUri(), "proj0", "oplog0"); + + std::unordered_map<Oid, uint32_t, Oid::Hasher> SourceOps; + for (const Oid& OpId : OpIds) + { + CbPackage OpPackage = CreateOplogPackage(OpId, Attachments[OpId]); + CHECK(OpPackage.GetAttachments().size() == Attachments[OpId].size()); + AddOp(OpPackage.GetObject(), SourceOps); + MakeOp(Session, Servers.GetInstance(0).GetBaseUri(), "proj0", "oplog0", OpPackage); + } + + std::vector<IoHash> AttachmentHashes; + AttachmentHashes.reserve(Attachments.size()); + for (const auto& AttachmentOplog : Attachments) + { + for (const auto& Attachment : AttachmentOplog.second) + { + AttachmentHashes.emplace_back(Attachment.second.DecodeRawHash()); + } + } + + auto MakeCbObjectPayload = [](std::function<void(CbObjectWriter & Writer)> Write) -> IoBuffer { + CbObjectWriter Writer; + Write(Writer); + IoBuffer Result = Writer.Save().GetBuffer().AsIoBuffer(); + Result.MakeOwned(); + return Result; + }; + + auto ValidateAttachments = [&MakeCbObjectPayload, &AttachmentHashes, &Servers, &Session](int ServerIndex, + std::string_view Project, + std::string_view Oplog) { + std::string GetChunksRequest = fmt::format("{}/prj/{}/oplog/{}/rpc", Servers.GetInstance(ServerIndex).GetBaseUri(), Project, Oplog); + Session.SetUrl({GetChunksRequest}); + IoBuffer Payload = MakeCbObjectPayload([&AttachmentHashes](CbObjectWriter& Writer) { + Writer << "method"sv + << "getchunks"sv; + Writer << "chunks"sv << BeginArray; + for (const IoHash& Chunk : AttachmentHashes) + { + Writer << Chunk; + } + Writer << EndArray; // chunks + }); + Session.SetBody(AsBody(Payload)); + Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}, {"Accept", "application/x-ue-cbpkg"}}); + cpr::Response Response = Session.Post(); + CHECK(IsHttpSuccessCode(Response.status_code)); + CbPackage ResponsePackage = ParsePackageMessage(IoBuffer(IoBuffer::Wrap, Response.text.data(), Response.text.size())); + CHECK(ResponsePackage.GetAttachments().size() == AttachmentHashes.size()); + }; + + auto ValidateOplog = [&SourceOps, &AddOp, &Servers, &Session](int ServerIndex, std::string_view Project, std::string_view Oplog) { + std::unordered_map<Oid, uint32_t, Oid::Hasher> TargetOps; + std::vector<CbObject> ResultingOplog; + + std::string GetOpsRequest = + fmt::format("{}/prj/{}/oplog/{}/entries", Servers.GetInstance(ServerIndex).GetBaseUri(), Project, Oplog); + Session.SetUrl({GetOpsRequest}); + cpr::Response Response = Session.Get(); + CHECK(IsHttpSuccessCode(Response.status_code)); + + IoBuffer Payload(IoBuffer::Wrap, Response.text.data(), Response.text.size()); + CbObject OplogResonse = LoadCompactBinaryObject(Payload); + CbArrayView EntriesArray = OplogResonse["entries"sv].AsArrayView(); + + for (CbFieldView OpEntry : EntriesArray) + { + CbObjectView Core = OpEntry.AsObjectView(); + BinaryWriter Writer; + Core.CopyTo(Writer); + MemoryView OpView = Writer.GetView(); + IoBuffer OpBuffer(IoBuffer::Wrap, OpView.GetData(), OpView.GetSize()); + CbObject Op(SharedBuffer(OpBuffer), CbFieldType::HasFieldType); + AddOp(Op, TargetOps); + } + CHECK(SourceOps == TargetOps); + }; + + SUBCASE("File") + { + ScopedTemporaryDirectory TempDir; + { + std::string SaveOplogRequest = fmt::format("{}/prj/{}/oplog/{}/rpc", Servers.GetInstance(0).GetBaseUri(), "proj0", "oplog0"); + Session.SetUrl({SaveOplogRequest}); + + IoBuffer Payload = MakeCbObjectPayload([&AttachmentHashes, path = TempDir.Path().string()](CbObjectWriter& Writer) { + Writer << "method"sv + << "export"sv; + Writer << "params" << BeginObject; + { + Writer << "maxblocksize"sv << 3072u; + Writer << "maxchunkembedsize"sv << 1296u; + Writer << "force"sv << false; + Writer << "file"sv << BeginObject; + { + Writer << "path"sv << path; + Writer << "name"sv + << "proj0_oplog0"sv; + } + Writer << EndObject; // "file" + } + Writer << EndObject; // "params" + }); + Session.SetBody(AsBody(Payload)); + Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}}); + cpr::Response Response = Session.Post(); + CHECK(IsHttpSuccessCode(Response.status_code)); + } + { + MakeProject(Session, Servers.GetInstance(1).GetBaseUri(), "proj0_copy"); + MakeOplog(Session, Servers.GetInstance(1).GetBaseUri(), "proj0_copy", "oplog0_copy"); + std::string LoadOplogRequest = + fmt::format("{}/prj/{}/oplog/{}/rpc", Servers.GetInstance(1).GetBaseUri(), "proj0_copy", "oplog0_copy"); + Session.SetUrl({LoadOplogRequest}); + + IoBuffer Payload = MakeCbObjectPayload([&AttachmentHashes, path = TempDir.Path().string()](CbObjectWriter& Writer) { + Writer << "method"sv + << "import"sv; + Writer << "params" << BeginObject; + { + Writer << "force"sv << false; + Writer << "file"sv << BeginObject; + { + Writer << "path"sv << path; + Writer << "name"sv + << "proj0_oplog0"sv; + } + Writer << EndObject; // "file" + } + Writer << EndObject; // "params" + }); + Session.SetBody(AsBody(Payload)); + + Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}}); + cpr::Response Response = Session.Post(); + CHECK(IsHttpSuccessCode(Response.status_code)); + } + ValidateAttachments(1, "proj0_copy", "oplog0_copy"); + ValidateOplog(1, "proj0_copy", "oplog0_copy"); + } + + SUBCASE("File disable blocks") + { + ScopedTemporaryDirectory TempDir; + { + std::string SaveOplogRequest = fmt::format("{}/prj/{}/oplog/{}/rpc", Servers.GetInstance(0).GetBaseUri(), "proj0", "oplog0"); + Session.SetUrl({SaveOplogRequest}); + + IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) { + Writer << "method"sv + << "export"sv; + Writer << "params" << BeginObject; + { + Writer << "maxblocksize"sv << 3072u; + Writer << "maxchunkembedsize"sv << 1296u; + Writer << "force"sv << false; + Writer << "file"sv << BeginObject; + { + Writer << "path"sv << TempDir.Path().string(); + Writer << "name"sv + << "proj0_oplog0"sv; + Writer << "disableblocks"sv << true; + } + Writer << EndObject; // "file" + } + Writer << EndObject; // "params" + }); + Session.SetBody(AsBody(Payload)); + Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}}); + cpr::Response Response = Session.Post(); + CHECK(IsHttpSuccessCode(Response.status_code)); + } + { + MakeProject(Session, Servers.GetInstance(1).GetBaseUri(), "proj0_copy"); + MakeOplog(Session, Servers.GetInstance(1).GetBaseUri(), "proj0_copy", "oplog0_copy"); + std::string LoadOplogRequest = + fmt::format("{}/prj/{}/oplog/{}/rpc", Servers.GetInstance(1).GetBaseUri(), "proj0_copy", "oplog0_copy"); + Session.SetUrl({LoadOplogRequest}); + IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) { + Writer << "method"sv + << "import"sv; + Writer << "params" << BeginObject; + { + Writer << "force"sv << false; + Writer << "file"sv << BeginObject; + { + Writer << "path"sv << TempDir.Path().string(); + Writer << "name"sv + << "proj0_oplog0"sv; + } + Writer << EndObject; // "file" + } + Writer << EndObject; // "params" + }); + Session.SetBody(AsBody(Payload)); + Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}}); + cpr::Response Response = Session.Post(); + CHECK(IsHttpSuccessCode(Response.status_code)); + } + ValidateAttachments(1, "proj0_copy", "oplog0_copy"); + ValidateOplog(1, "proj0_copy", "oplog0_copy"); + } + + SUBCASE("File force temp blocks") + { + ScopedTemporaryDirectory TempDir; + { + std::string SaveOplogRequest = fmt::format("{}/prj/{}/oplog/{}/rpc", Servers.GetInstance(0).GetBaseUri(), "proj0", "oplog0"); + Session.SetUrl({SaveOplogRequest}); + IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) { + Writer << "method"sv + << "export"sv; + Writer << "params" << BeginObject; + { + Writer << "maxblocksize"sv << 3072u; + Writer << "maxchunkembedsize"sv << 1296u; + Writer << "force"sv << false; + Writer << "file"sv << BeginObject; + { + Writer << "path"sv << TempDir.Path().string(); + Writer << "name"sv + << "proj0_oplog0"sv; + Writer << "enabletempblocks"sv << true; + } + Writer << EndObject; // "file" + } + Writer << EndObject; // "params" + }); + Session.SetBody(AsBody(Payload)); + Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}}); + cpr::Response Response = Session.Post(); + CHECK(IsHttpSuccessCode(Response.status_code)); + } + { + MakeProject(Session, Servers.GetInstance(1).GetBaseUri(), "proj0_copy"); + MakeOplog(Session, Servers.GetInstance(1).GetBaseUri(), "proj0_copy", "oplog0_copy"); + std::string LoadOplogRequest = + fmt::format("{}/prj/{}/oplog/{}/rpc", Servers.GetInstance(1).GetBaseUri(), "proj0_copy", "oplog0_copy"); + Session.SetUrl({LoadOplogRequest}); + IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) { + Writer << "method"sv + << "import"sv; + Writer << "params" << BeginObject; + { + Writer << "force"sv << false; + Writer << "file"sv << BeginObject; + { + Writer << "path"sv << TempDir.Path().string(); + Writer << "name"sv + << "proj0_oplog0"sv; + } + Writer << EndObject; // "file" + } + Writer << EndObject; // "params" + }); + Session.SetBody(AsBody(Payload)); + Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}}); + cpr::Response Response = Session.Post(); + CHECK(IsHttpSuccessCode(Response.status_code)); + } + ValidateAttachments(1, "proj0_copy", "oplog0_copy"); + ValidateOplog(1, "proj0_copy", "oplog0_copy"); + } + + SUBCASE("Zen") + { + ScopedTemporaryDirectory TempDir; + { + std::string ExportSourceUri = Servers.GetInstance(0).GetBaseUri(); + std::string ExportTargetUri = Servers.GetInstance(1).GetBaseUri(); + MakeProject(Session, ExportTargetUri, "proj0_copy"); + MakeOplog(Session, ExportTargetUri, "proj0_copy", "oplog0_copy"); + + std::string SaveOplogRequest = fmt::format("{}/prj/{}/oplog/{}/rpc", ExportSourceUri, "proj0", "oplog0"); + Session.SetUrl({SaveOplogRequest}); + + IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) { + Writer << "method"sv + << "export"sv; + Writer << "params" << BeginObject; + { + Writer << "maxblocksize"sv << 3072u; + Writer << "maxchunkembedsize"sv << 1296u; + Writer << "force"sv << false; + Writer << "zen"sv << BeginObject; + { + Writer << "url"sv << ExportTargetUri.substr(7); + Writer << "project" + << "proj0_copy"; + Writer << "oplog" + << "oplog0_copy"; + } + Writer << EndObject; // "file" + } + Writer << EndObject; // "params" + }); + Session.SetBody(AsBody(Payload)); + Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}}); + cpr::Response Response = Session.Post(); + CHECK(IsHttpSuccessCode(Response.status_code)); + } + ValidateAttachments(1, "proj0_copy", "oplog0_copy"); + ValidateOplog(1, "proj0_copy", "oplog0_copy"); + + { + std::string ImportSourceUri = Servers.GetInstance(1).GetBaseUri(); + std::string ImportTargetUri = Servers.GetInstance(2).GetBaseUri(); + MakeProject(Session, ImportTargetUri, "proj1"); + MakeOplog(Session, ImportTargetUri, "proj1", "oplog1"); + std::string LoadOplogRequest = fmt::format("{}/prj/{}/oplog/{}/rpc", ImportTargetUri, "proj1", "oplog1"); + Session.SetUrl({LoadOplogRequest}); + + IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) { + Writer << "method"sv + << "import"sv; + Writer << "params" << BeginObject; + { + Writer << "force"sv << false; + Writer << "zen"sv << BeginObject; + { + Writer << "url"sv << ImportSourceUri.substr(7); + Writer << "project" + << "proj0_copy"; + Writer << "oplog" + << "oplog0_copy"; + } + Writer << EndObject; // "file" + } + Writer << EndObject; // "params" + }); + Session.SetBody(AsBody(Payload)); + Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}}); + cpr::Response Response = Session.Post(); + CHECK(IsHttpSuccessCode(Response.status_code)); + } + ValidateAttachments(2, "proj1", "oplog1"); + ValidateOplog(2, "proj1", "oplog1"); + } +} + +# if 0 +TEST_CASE("lifetime.owner") +{ + // This test is designed to verify that the hand-over of sponsor processes is handled + // correctly for the case when a second or third process is launched on the same port + // + // Due to the nature of it, it cannot be + + const uint16_t PortNumber = 23456; + + ZenServerInstance Zen1(TestEnv); + std::filesystem::path TestDir1 = TestEnv.CreateNewTestDir(); + Zen1.SetTestDir(TestDir1); + Zen1.SpawnServer(PortNumber); + Zen1.WaitUntilReady(); + Zen1.Detach(); + + ZenServerInstance Zen2(TestEnv); + std::filesystem::path TestDir2 = TestEnv.CreateNewTestDir(); + Zen2.SetTestDir(TestDir2); + Zen2.SpawnServer(PortNumber); + Zen2.WaitUntilReady(); + Zen2.Detach(); +} + +TEST_CASE("lifetime.owner.2") +{ + // This test is designed to verify that the hand-over of sponsor processes is handled + // correctly for the case when a second or third process is launched on the same port + // + // Due to the nature of it, it cannot be + + const uint16_t PortNumber = 13456; + + std::filesystem::path TestDir1 = TestEnv.CreateNewTestDir(); + std::filesystem::path TestDir2 = TestEnv.CreateNewTestDir(); + + ZenServerInstance Zen1(TestEnv); + Zen1.SetTestDir(TestDir1); + Zen1.SpawnServer(PortNumber); + Zen1.WaitUntilReady(); + + ZenServerInstance Zen2(TestEnv); + Zen2.SetTestDir(TestDir2); + Zen2.SetOwnerPid(Zen1.GetPid()); + Zen2.SpawnServer(PortNumber + 1); + Zen2.Detach(); + + ZenServerInstance Zen3(TestEnv); + Zen3.SetTestDir(TestDir2); + Zen3.SetOwnerPid(Zen1.GetPid()); + Zen3.SpawnServer(PortNumber + 1); + Zen3.Detach(); + + ZenServerInstance Zen4(TestEnv); + Zen4.SetTestDir(TestDir2); + Zen4.SetOwnerPid(Zen1.GetPid()); + Zen4.SpawnServer(PortNumber + 1); + Zen4.Detach(); +} +# endif + +} // namespace zen::tests +#else +int +main() +{ +} +#endif |