aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver-test/zenserver-test.cpp
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2023-05-02 10:01:47 +0200
committerGitHub <[email protected]>2023-05-02 10:01:47 +0200
commit075d17f8ada47e990fe94606c3d21df409223465 (patch)
treee50549b766a2f3c354798a54ff73404217b4c9af /src/zenserver-test/zenserver-test.cpp
parentfix: bundle shouldn't append content zip to zen (diff)
downloadzen-075d17f8ada47e990fe94606c3d21df409223465.tar.xz
zen-075d17f8ada47e990fe94606c3d21df409223465.zip
moved source directories into `/src` (#264)
* moved source directories into `/src` * updated bundle.lua for new `src` path * moved some docs, icon * removed old test trees
Diffstat (limited to 'src/zenserver-test/zenserver-test.cpp')
-rw-r--r--src/zenserver-test/zenserver-test.cpp3323
1 files changed, 3323 insertions, 0 deletions
diff --git a/src/zenserver-test/zenserver-test.cpp b/src/zenserver-test/zenserver-test.cpp
new file mode 100644
index 000000000..3195181d1
--- /dev/null
+++ b/src/zenserver-test/zenserver-test.cpp
@@ -0,0 +1,3323 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#define _SILENCE_CXX17_C_HEADER_DEPRECATION_WARNING
+
+#include <zencore/compactbinary.h>
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/compactbinarypackage.h>
+#include <zencore/compress.h>
+#include <zencore/except.h>
+#include <zencore/filesystem.h>
+#include <zencore/fmtutils.h>
+#include <zencore/iohash.h>
+#include <zencore/logging.h>
+#include <zencore/memory.h>
+#include <zencore/refcount.h>
+#include <zencore/stream.h>
+#include <zencore/string.h>
+#include <zencore/testutils.h>
+#include <zencore/thread.h>
+#include <zencore/timer.h>
+#include <zencore/xxhash.h>
+#include <zenhttp/httpclient.h>
+#include <zenhttp/httpshared.h>
+#include <zenhttp/websocket.h>
+#include <zenhttp/zenhttp.h>
+#include <zenutil/cache/cache.h>
+#include <zenutil/cache/cacherequests.h>
+#include <zenutil/zenserverprocess.h>
+
+#if ZEN_USE_MIMALLOC
+ZEN_THIRD_PARTY_INCLUDES_START
+# include <mimalloc.h>
+ZEN_THIRD_PARTY_INCLUDES_END
+#endif
+
+#include <http_parser.h>
+
+#if ZEN_PLATFORM_WINDOWS
+# pragma comment(lib, "Crypt32.lib")
+# pragma comment(lib, "Wldap32.lib")
+#endif
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <cpr/cpr.h>
+#undef GetObject
+ZEN_THIRD_PARTY_INCLUDES_END
+
#include <array>
#include <atomic>
#include <filesystem>
#include <map>
#include <random>
#include <span>
#include <thread>
#include <typeindex>
#include <unordered_map>
+
+#if ZEN_PLATFORM_WINDOWS
+# include <ppl.h>
+# include <atlbase.h>
+# include <process.h>
+#endif
+
+#include <asio.hpp>
+
+//////////////////////////////////////////////////////////////////////////
+
+#include "projectclient.h"
+
+//////////////////////////////////////////////////////////////////////////
+
+#if ZEN_WITH_TESTS
+# define ZEN_TEST_WITH_RUNNER 1
+# include <zencore/testing.h>
+# include <zencore/workthreadpool.h>
+#endif
+
+using namespace std::literals;
+
+#if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC
struct Concurrency
{
    // Minimal stand-in for Microsoft PPL's Concurrency::parallel_invoke on
    // non-Windows platforms: runs each callable on its own std::thread and
    // joins them all before returning. Exceptions escaping a callable will
    // terminate the process (std::thread semantics), same as the original.
    template<typename... T>
    static void parallel_invoke(T&&... t)
    {
        constexpr size_t NumTs = sizeof...(t);

        // std::array instead of a raw C array: `std::thread Threads[0]` would
        // be ill-formed for a zero-argument instantiation, while
        // std::array<std::thread, 0> is valid.
        std::array<std::thread, NumTs> Threads = {
            std::thread(std::forward<T>(t))...,
        };

        for (std::thread& Thread : Threads)
        {
            Thread.join();
        }
    }
};
+#endif
+
+//////////////////////////////////////////////////////////////////////////
+//
+// Custom logging -- test code, this should be tweaked
+//
+
+namespace logging {
+using namespace spdlog;
+using namespace spdlog::details;
+using namespace std::literals;
+
+class full_test_formatter final : public spdlog::formatter
+{
+public:
+ full_test_formatter(std::string_view LogId, std::chrono::time_point<std::chrono::system_clock> Epoch) : m_Epoch(Epoch), m_LogId(LogId)
+ {
+ }
+
+ virtual std::unique_ptr<formatter> clone() const override { return std::make_unique<full_test_formatter>(m_LogId, m_Epoch); }
+
+ static constexpr bool UseDate = false;
+
+ virtual void format(const details::log_msg& msg, memory_buf_t& dest) override
+ {
+ using std::chrono::duration_cast;
+ using std::chrono::milliseconds;
+ using std::chrono::seconds;
+
+ if constexpr (UseDate)
+ {
+ auto secs = std::chrono::duration_cast<seconds>(msg.time.time_since_epoch());
+ if (secs != m_LastLogSecs)
+ {
+ m_CachedTm = os::localtime(log_clock::to_time_t(msg.time));
+ m_LastLogSecs = secs;
+ }
+ }
+
+ const auto& tm_time = m_CachedTm;
+
+ // cache the date/time part for the next second.
+ auto duration = msg.time - m_Epoch;
+ auto secs = duration_cast<seconds>(duration);
+
+ if (m_CacheTimestamp != secs || m_CachedDatetime.size() == 0)
+ {
+ m_CachedDatetime.clear();
+ m_CachedDatetime.push_back('[');
+
+ if constexpr (UseDate)
+ {
+ fmt_helper::append_int(tm_time.tm_year + 1900, m_CachedDatetime);
+ m_CachedDatetime.push_back('-');
+
+ fmt_helper::pad2(tm_time.tm_mon + 1, m_CachedDatetime);
+ m_CachedDatetime.push_back('-');
+
+ fmt_helper::pad2(tm_time.tm_mday, m_CachedDatetime);
+ m_CachedDatetime.push_back(' ');
+
+ fmt_helper::pad2(tm_time.tm_hour, m_CachedDatetime);
+ m_CachedDatetime.push_back(':');
+
+ fmt_helper::pad2(tm_time.tm_min, m_CachedDatetime);
+ m_CachedDatetime.push_back(':');
+
+ fmt_helper::pad2(tm_time.tm_sec, m_CachedDatetime);
+ }
+ else
+ {
+ int Count = int(secs.count());
+
+ const int LogSecs = Count % 60;
+ Count /= 60;
+
+ const int LogMins = Count % 60;
+ Count /= 60;
+
+ const int LogHours = Count;
+
+ fmt_helper::pad2(LogHours, m_CachedDatetime);
+ m_CachedDatetime.push_back(':');
+ fmt_helper::pad2(LogMins, m_CachedDatetime);
+ m_CachedDatetime.push_back(':');
+ fmt_helper::pad2(LogSecs, m_CachedDatetime);
+ }
+
+ m_CachedDatetime.push_back('.');
+
+ m_CacheTimestamp = secs;
+ }
+
+ dest.append(m_CachedDatetime.begin(), m_CachedDatetime.end());
+
+ auto millis = fmt_helper::time_fraction<milliseconds>(msg.time);
+ fmt_helper::pad3(static_cast<uint32_t>(millis.count()), dest);
+ dest.push_back(']');
+ dest.push_back(' ');
+
+ if (!m_LogId.empty())
+ {
+ dest.push_back('[');
+ fmt_helper::append_string_view(m_LogId, dest);
+ dest.push_back(']');
+ dest.push_back(' ');
+ }
+
+ // append logger name if exists
+ if (msg.logger_name.size() > 0)
+ {
+ dest.push_back('[');
+ fmt_helper::append_string_view(msg.logger_name, dest);
+ dest.push_back(']');
+ dest.push_back(' ');
+ }
+
+ dest.push_back('[');
+ // wrap the level name with color
+ msg.color_range_start = dest.size();
+ fmt_helper::append_string_view(level::to_string_view(msg.level), dest);
+ msg.color_range_end = dest.size();
+ dest.push_back(']');
+ dest.push_back(' ');
+
+ // add source location if present
+ if (!msg.source.empty())
+ {
+ dest.push_back('[');
+ const char* filename = details::short_filename_formatter<details::null_scoped_padder>::basename(msg.source.filename);
+ fmt_helper::append_string_view(filename, dest);
+ dest.push_back(':');
+ fmt_helper::append_int(msg.source.line, dest);
+ dest.push_back(']');
+ dest.push_back(' ');
+ }
+
+ fmt_helper::append_string_view(msg.payload, dest);
+ fmt_helper::append_string_view("\n"sv, dest);
+ }
+
+private:
+ std::chrono::time_point<std::chrono::system_clock> m_Epoch;
+ std::tm m_CachedTm;
+ std::chrono::seconds m_LastLogSecs;
+ std::chrono::seconds m_CacheTimestamp{0};
+ memory_buf_t m_CachedDatetime;
+ std::string m_LogId;
+};
+} // namespace logging
+
+//////////////////////////////////////////////////////////////////////////
+
+#if 0
+
// Disabled (#if 0) ad-hoc benchmark: fires 100k GET requests at a locally
// running server from 10 concurrent cpr sessions and reports the request rate.
int
main()
{
    mi_version();

    // Give the server under test a moment before hammering it.
    zen::Sleep(1000);

    zen::Stopwatch timer;

    const int RequestCount = 100000;  // 10 sessions x 10000 requests each

    cpr::Session Sessions[10];

    for (auto& Session : Sessions)
    {
        Session.SetUrl(cpr::Url{"http://localhost:1337/test/hello"});
        //Session.SetUrl(cpr::Url{ "http://arn-wd-l0182:1337/test/hello" });
    }

    // Each worker owns one session (cpr::Session is used single-threaded here)
    // and issues 10k sequential GETs against it.
    auto Run = [](cpr::Session& Session) {
        for (int i = 0; i < 10000; ++i)
        {
            cpr::Response Result = Session.Get();

            if (Result.status_code != 200)
            {
                ZEN_WARN("request response: {}", Result.status_code);
            }
        }
    };

    Concurrency::parallel_invoke([&] { Run(Sessions[0]); },
                                 [&] { Run(Sessions[1]); },
                                 [&] { Run(Sessions[2]); },
                                 [&] { Run(Sessions[3]); },
                                 [&] { Run(Sessions[4]); },
                                 [&] { Run(Sessions[5]); },
                                 [&] { Run(Sessions[6]); },
                                 [&] { Run(Sessions[7]); },
                                 [&] { Run(Sessions[8]); },
                                 [&] { Run(Sessions[9]); });

    // cpr::Response r = cpr::Get(cpr::Url{ "http://localhost:1337/test/hello" });

    ZEN_INFO("{} requests in {} ({})",
             RequestCount,
             zen::NiceTimeSpanMs(timer.GetElapsedTimeMs()),
             zen::NiceRate(RequestCount, (uint32_t)timer.GetElapsedTimeMs(), "req"));

    return 0;
}
+#elif 0
+// #include <restinio/all.hpp>
+
// Disabled (#elif 0) ad-hoc server benchmark counterpart: serves "Hello,
// World!" on port 8080 from a 32-thread restinio pool. Requires the
// restinio header above to be re-enabled.
int
main()
{
    mi_version();
    restinio::run(restinio::on_thread_pool(32).port(8080).request_handler(
        [](auto req) { return req->create_response().set_body("Hello, World!").done(); }));
    return 0;
}
+#elif ZEN_WITH_TESTS
+
+zen::ZenServerEnvironment TestEnv;
+
+int
+main(int argc, char** argv)
+{
+ using namespace std::literals;
+
+# if ZEN_USE_MIMALLOC
+ mi_version();
+# endif
+
+ zen::zencore_forcelinktests();
+ zen::zenhttp_forcelinktests();
+ zen::cacherequests_forcelink();
+
+ zen::logging::InitializeLogging();
+
+ spdlog::set_level(spdlog::level::debug);
+ spdlog::set_formatter(std::make_unique< ::logging::full_test_formatter>("test", std::chrono::system_clock::now()));
+
+ std::filesystem::path ProgramBaseDir = std::filesystem::path(argv[0]).parent_path();
+ std::filesystem::path TestBaseDir = ProgramBaseDir.parent_path().parent_path() / ".test";
+
+ // This is pretty janky because we're passing most of the options through to the test
+ // framework, so we can't just use cxxopts (I think). This should ideally be cleaned up
+ // somehow in the future
+
+ std::string ServerClass;
+
+ for (int i = 1; i < argc; ++i)
+ {
+ if (argv[i] == "--http"sv)
+ {
+ if ((i + 1) < argc)
+ {
+ ServerClass = argv[++i];
+ }
+ }
+ }
+
+ TestEnv.InitializeForTest(ProgramBaseDir, TestBaseDir, ServerClass);
+
+ ZEN_INFO("Running tests...(base dir: '{}')", TestBaseDir);
+
+ zen::testing::TestRunner Runner;
+ Runner.ApplyCommandLine(argc, argv);
+
+ return Runner.Run();
+}
+
+namespace zen::tests {
+
+TEST_CASE("default.single")
+{
+ std::filesystem::path TestDir = TestEnv.CreateNewTestDir();
+
+ ZenServerInstance Instance(TestEnv);
+ Instance.SetTestDir(TestDir);
+ Instance.SpawnServer(13337);
+
+ ZEN_INFO("Waiting...");
+
+ Instance.WaitUntilReady();
+
+ std::atomic<uint64_t> RequestCount{0};
+ std::atomic<uint64_t> BatchCounter{0};
+
+ ZEN_INFO("Running single server test...");
+
+ auto IssueTestRequests = [&] {
+ const uint64_t BatchNo = BatchCounter.fetch_add(1);
+ const int ThreadId = zen::GetCurrentThreadId();
+
+ ZEN_INFO("query batch {} started (thread {})", BatchNo, ThreadId);
+ cpr::Session cli;
+ cli.SetUrl(cpr::Url{"http://localhost:13337/test/hello"});
+
+ for (int i = 0; i < 10000; ++i)
+ {
+ auto res = cli.Get();
+ ++RequestCount;
+ }
+ ZEN_INFO("query batch {} ended (thread {})", BatchNo, ThreadId);
+ };
+
+ zen::Stopwatch timer;
+
+ Concurrency::parallel_invoke(IssueTestRequests,
+ IssueTestRequests,
+ IssueTestRequests,
+ IssueTestRequests,
+ IssueTestRequests,
+ IssueTestRequests,
+ IssueTestRequests,
+ IssueTestRequests,
+ IssueTestRequests,
+ IssueTestRequests);
+
+ uint64_t Elapsed = timer.GetElapsedTimeMs();
+
+ ZEN_INFO("{} requests in {} ({})", RequestCount, zen::NiceTimeSpanMs(Elapsed), zen::NiceRate(RequestCount, (uint32_t)Elapsed, "req"));
+}
+
+TEST_CASE("multi.basic")
+{
+ ZenServerInstance Instance1(TestEnv);
+ std::filesystem::path TestDir1 = TestEnv.CreateNewTestDir();
+ Instance1.SetTestDir(TestDir1);
+ Instance1.SpawnServer(13337);
+
+ ZenServerInstance Instance2(TestEnv);
+ std::filesystem::path TestDir2 = TestEnv.CreateNewTestDir();
+ Instance2.SetTestDir(TestDir2);
+ Instance2.SpawnServer(13338);
+
+ ZEN_INFO("Waiting...");
+
+ Instance1.WaitUntilReady();
+ Instance2.WaitUntilReady();
+
+ std::atomic<uint64_t> RequestCount{0};
+ std::atomic<uint64_t> BatchCounter{0};
+
+ auto IssueTestRequests = [&](int PortNumber) {
+ const uint64_t BatchNo = BatchCounter.fetch_add(1);
+ const int ThreadId = zen::GetCurrentThreadId();
+
+ ZEN_INFO("query batch {} started (thread {}) for port {}", BatchNo, ThreadId, PortNumber);
+
+ cpr::Session cli;
+ cli.SetUrl(cpr::Url{fmt::format("http://localhost:{}/test/hello", PortNumber)});
+
+ for (int i = 0; i < 10000; ++i)
+ {
+ auto res = cli.Get();
+ ++RequestCount;
+ }
+ ZEN_INFO("query batch {} ended (thread {})", BatchNo, ThreadId);
+ };
+
+ zen::Stopwatch timer;
+
+ ZEN_INFO("Running multi-server test...");
+
+ Concurrency::parallel_invoke([&] { IssueTestRequests(13337); },
+ [&] { IssueTestRequests(13338); },
+ [&] { IssueTestRequests(13337); },
+ [&] { IssueTestRequests(13338); });
+
+ uint64_t Elapsed = timer.GetElapsedTimeMs();
+
+ ZEN_INFO("{} requests in {} ({})", RequestCount, zen::NiceTimeSpanMs(Elapsed), zen::NiceRate(RequestCount, (uint32_t)Elapsed, "req"));
+}
+
// Exercises the project ("prj") endpoint: project creation, oplog creation,
// op submission with an attachment + file references, and chunk retrieval.
// NOTE(review): the SUBCASEs below are *nested* -- "build store persistence",
// "build store op commit" and "test chunk not found error" live inside
// "build store init", and the oplog setup between them runs on the shared
// BaseUri builder. Statement order is load-bearing. The subcase semantics
// (doctest-style re-execution per leaf subcase?) should be confirmed against
// zencore/testing.h.
TEST_CASE("project.basic")
{
    using namespace std::literals;

    std::filesystem::path TestDir = TestEnv.CreateNewTestDir();

    const uint16_t PortNumber = 13337;

    ZenServerInstance Instance1(TestEnv);
    Instance1.SetTestDir(TestDir);
    Instance1.SpawnServer(PortNumber);
    Instance1.WaitUntilReady();

    // NOTE(review): never incremented below -- the final log line always
    // reports 0 requests.
    std::atomic<uint64_t> RequestCount{0};

    zen::Stopwatch timer;

    std::mt19937_64 mt;

    // Mutated below: first the project URI, later extended with the oplog path.
    zen::StringBuilder<64> BaseUri;
    BaseUri << fmt::format("http://localhost:{}/prj/test", PortNumber);

    // Server-side path of this test binary, made relative to its grandparent
    // so it can double as a "serverpath" file reference in an op.
    std::filesystem::path BinPath = zen::GetRunningExecutablePath();
    std::filesystem::path RootPath = BinPath.parent_path().parent_path();
    BinPath = BinPath.lexically_relative(RootPath);

    SUBCASE("build store init")
    {
        {
            // Create the project via POST and verify it reads back via GET.
            {
                zen::CbObjectWriter Body;
                Body << "id"
                     << "test";
                Body << "root" << RootPath.c_str();
                Body << "project"
                     << "/zooom";
                Body << "engine"
                     << "/zooom";

                zen::BinaryWriter MemOut;
                Body.Save(MemOut);

                auto Response = cpr::Post(cpr::Url{BaseUri.c_str()}, cpr::Body{(const char*)MemOut.Data(), MemOut.Size()});
                CHECK(Response.status_code == 201);
            }

            {
                auto Response = cpr::Get(cpr::Url{BaseUri.c_str()});
                CHECK(Response.status_code == 200);

                zen::CbObjectView ResponseObject = zen::CbFieldView(Response.text.data()).AsObjectView();

                CHECK(ResponseObject["id"].AsString() == "test"sv);
                CHECK(ResponseObject["root"].AsString() == PathToUtf8(RootPath.c_str()));
            }
        }

        // From here on BaseUri addresses the "foobar" oplog within the project.
        BaseUri << "/oplog/foobar";

        {
            // Create the oplog and verify it reads back with its parent project.
            {
                zen::StringBuilder<64> PostUri;
                PostUri << BaseUri;
                auto Response = cpr::Post(cpr::Url{PostUri.c_str()});
                CHECK(Response.status_code == 201);
            }

            {
                auto Response = cpr::Get(cpr::Url{BaseUri.c_str()});
                CHECK(Response.status_code == 200);

                zen::CbObjectView ResponseObject = zen::CbFieldView(Response.text.data()).AsObjectView();

                CHECK(ResponseObject["id"].AsString() == "foobar"sv);
                CHECK(ResponseObject["project"].AsString() == "test"sv);
            }
        }

        SUBCASE("build store persistence")
        {
            // Build an op carrying one compressed attachment and one file
            // reference (this test binary), POST it to <oplog>/new, then read
            // the referenced chunk back whole and as an offset/size slice.
            uint8_t AttachData[] = {1, 2, 3};

            zen::CompressedBuffer Attachment = zen::CompressedBuffer::Compress(zen::SharedBuffer::Clone(zen::MemoryView{AttachData, 3}));
            zen::CbAttachment Attach{Attachment, Attachment.DecodeRawHash()};

            zen::CbObjectWriter OpWriter;
            OpWriter << "key"
                     << "foo"
                     << "attachment" << Attach;

            // 24 hex chars -> chunk oid; split for readability.
            const std::string_view ChunkId{
                "00000000"
                "00000000"
                "00010000"};
            auto FileOid = zen::Oid::FromHexString(ChunkId);

            OpWriter.BeginArray("files");
            OpWriter.BeginObject();
            OpWriter << "id" << FileOid;
            OpWriter << "clientpath"
                     << "/{engine}/client/side/path";
            OpWriter << "serverpath" << BinPath.c_str();
            OpWriter.EndObject();
            OpWriter.EndArray();

            zen::CbObject Op = OpWriter.Save();

            zen::CbPackage OpPackage(Op);
            OpPackage.AddAttachment(Attach);

            zen::BinaryWriter MemOut;
            legacy::SaveCbPackage(OpPackage, MemOut);

            {
                zen::StringBuilder<64> PostUri;
                PostUri << BaseUri << "/new";
                auto Response = cpr::Post(cpr::Url{PostUri.c_str()}, cpr::Body{(const char*)MemOut.Data(), MemOut.Size()});

                REQUIRE(!Response.error);
                CHECK(Response.status_code == 201);
            }

            // Read file data

            {
                zen::StringBuilder<128> ChunkGetUri;
                ChunkGetUri << BaseUri << "/" << ChunkId;
                auto Response = cpr::Get(cpr::Url{ChunkGetUri.c_str()});

                REQUIRE(!Response.error);
                CHECK(Response.status_code == 200);
            }

            {
                // Ranged read: 10 bytes starting at offset 1.
                zen::StringBuilder<128> ChunkGetUri;
                ChunkGetUri << BaseUri << "/" << ChunkId << "?offset=1&size=10";
                auto Response = cpr::Get(cpr::Url{ChunkGetUri.c_str()});

                REQUIRE(!Response.error);
                CHECK(Response.status_code == 200);
                CHECK(Response.text.size() == 10);
            }

            ZEN_INFO("+++++++");
        }
        SUBCASE("build store op commit") { ZEN_INFO("-------"); }
        SUBCASE("test chunk not found error")
        {
            // Repeatedly query a chunk id that does not exist; every attempt
            // must come back 404 (and not, e.g., trip an internal error).
            for (size_t I = 0; I < 65; I++)
            {
                zen::StringBuilder<128> PostUri;
                PostUri << BaseUri << "/f77c781846caead318084604/info";
                auto Response = cpr::Get(cpr::Url{PostUri.c_str()});

                REQUIRE(!Response.error);
                CHECK(Response.status_code == 404);
            }
        }
    }

    const uint64_t Elapsed = timer.GetElapsedTimeMs();

    ZEN_INFO("{} requests in {} ({})", RequestCount, zen::NiceTimeSpanMs(Elapsed), zen::NiceRate(RequestCount, (uint32_t)Elapsed, "req"));
}
+
+# if 0 // this is extremely WIP
// Disabled (# if 0) work-in-progress test for the local project client
// message pipe: spawns a server and sends one compact-binary message
// transaction through LocalProjectClient. The response is currently ignored.
TEST_CASE("project.pipe")
{
    using namespace std::literals;

    std::filesystem::path TestDir = TestEnv.CreateNewTestDir();

    const uint16_t PortNumber = 13337;

    ZenServerInstance Instance1(TestEnv);
    Instance1.SetTestDir(TestDir);
    Instance1.SpawnServer(PortNumber);
    Instance1.WaitUntilReady();

    zen::LocalProjectClient LocalClient(PortNumber);

    zen::CbObjectWriter Cbow;
    Cbow << "hey" << 42;

    zen::CbObject Response = LocalClient.MessageTransaction(Cbow.Save());
}
+# endif
+
+namespace utils {
+
+ struct ZenConfig
+ {
+ std::filesystem::path DataDir;
+ uint16_t Port;
+ std::string BaseUri;
+ std::string Args;
+
+ static ZenConfig New(uint16_t Port = 13337, std::string Args = "")
+ {
+ return ZenConfig{.DataDir = TestEnv.CreateNewTestDir(),
+ .Port = Port,
+ .BaseUri = fmt::format("http://localhost:{}/z$", Port),
+ .Args = std::move(Args)};
+ }
+
+ static ZenConfig NewWithUpstream(uint16_t UpstreamPort)
+ {
+ return New(13337, fmt::format("--debug --upstream-thread-count=0 --upstream-zen-url=http://localhost:{}", UpstreamPort));
+ }
+
+ static ZenConfig NewWithThreadedUpstreams(std::span<uint16_t> UpstreamPorts, bool Debug)
+ {
+ std::string Args = Debug ? "--debug" : "";
+ for (uint16_t Port : UpstreamPorts)
+ {
+ Args = fmt::format("{}{}--upstream-zen-url=http://localhost:{}", Args, Args.length() > 0 ? " " : "", Port);
+ }
+ return New(13337, Args);
+ }
+
+ void Spawn(ZenServerInstance& Inst)
+ {
+ Inst.SetTestDir(DataDir);
+ Inst.SpawnServer(Port, Args);
+ Inst.WaitUntilReady();
+ }
+ };
+
+ void SpawnServer(ZenServerInstance& Server, ZenConfig& Cfg)
+ {
+ Server.SetTestDir(Cfg.DataDir);
+ Server.SpawnServer(Cfg.Port, Cfg.Args);
+ Server.WaitUntilReady();
+ }
+
+} // namespace utils
+
+TEST_CASE("zcache.basic")
+{
+ using namespace std::literals;
+
+ std::filesystem::path TestDir = TestEnv.CreateNewTestDir();
+
+ const uint16_t PortNumber = 13337;
+
+ const int kIterationCount = 100;
+ const auto BaseUri = fmt::format("http://localhost:{}/z$", PortNumber);
+
+ auto HashKey = [](int i) -> zen::IoHash { return zen::IoHash::HashBuffer(&i, sizeof i); };
+
+ {
+ ZenServerInstance Instance1(TestEnv);
+ Instance1.SetTestDir(TestDir);
+ Instance1.SpawnServer(PortNumber);
+ Instance1.WaitUntilReady();
+
+ // Populate with some simple data
+
+ for (int i = 0; i < kIterationCount; ++i)
+ {
+ zen::CbObjectWriter Cbo;
+ Cbo << "index" << i;
+
+ zen::BinaryWriter MemOut;
+ Cbo.Save(MemOut);
+
+ zen::IoHash Key = HashKey(i);
+
+ cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}", BaseUri, "test", Key)},
+ cpr::Body{(const char*)MemOut.Data(), MemOut.Size()},
+ cpr::Header{{"Content-Type", "application/x-ue-cb"}});
+
+ CHECK(Result.status_code == 201);
+ }
+
+ // Retrieve data
+
+ for (int i = 0; i < kIterationCount; ++i)
+ {
+ zen::IoHash Key = zen::IoHash::HashBuffer(&i, sizeof i);
+
+ cpr::Response Result =
+ cpr::Get(cpr::Url{fmt::format("{}/{}/{}", BaseUri, "test", Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}});
+
+ CHECK(Result.status_code == 200);
+ }
+
+ // Ensure bad bucket identifiers are rejected
+
+ {
+ zen::CbObjectWriter Cbo;
+ Cbo << "index" << 42;
+
+ zen::BinaryWriter MemOut;
+ Cbo.Save(MemOut);
+
+ zen::IoHash Key = HashKey(442);
+
+ cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}", BaseUri, "te!st", Key)},
+ cpr::Body{(const char*)MemOut.Data(), MemOut.Size()},
+ cpr::Header{{"Content-Type", "application/x-ue-cb"}});
+
+ CHECK(Result.status_code == 400);
+ }
+ }
+
+ // Verify that the data persists between process runs (the previous server has exited at this point)
+
+ {
+ ZenServerInstance Instance1(TestEnv);
+ Instance1.SetTestDir(TestDir);
+ Instance1.SpawnServer(PortNumber);
+ Instance1.WaitUntilReady();
+
+ // Retrieve data again
+
+ for (int i = 0; i < kIterationCount; ++i)
+ {
+ zen::IoHash Key = HashKey(i);
+
+ cpr::Response Result =
+ cpr::Get(cpr::Url{fmt::format("{}/{}/{}", BaseUri, "test", Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}});
+
+ CHECK(Result.status_code == 200);
+ }
+ }
+}
+
// Structured-cache package tests: round-trips a CbPackage (object + one
// compressed binary attachment) through PUT/GET, and verifies PUT write-through
// to an upstream instance and GET read-through from an upstream on local miss.
TEST_CASE("zcache.cbpackage")
{
    using namespace std::literals;

    // Builds a small package whose object references one compressed binary
    // attachment (bytes 1..9); the attachment hash is returned via
    // OutAttachmentKey.
    auto CreateTestPackage = [](zen::IoHash& OutAttachmentKey) -> zen::CbPackage {
        auto Data = zen::SharedBuffer::Clone(zen::MakeMemoryView<uint8_t>({1, 2, 3, 4, 5, 6, 7, 8, 9}));
        auto CompressedData = zen::CompressedBuffer::Compress(Data);

        OutAttachmentKey = CompressedData.DecodeRawHash();

        zen::CbWriter Obj;
        Obj.BeginObject("obj"sv);
        Obj.AddBinaryAttachment("data", OutAttachmentKey);
        Obj.EndObject();

        zen::CbPackage Package;
        Package.SetObject(Obj.Save().AsObject());
        Package.AddAttachment(zen::CbAttachment(CompressedData, OutAttachmentKey));

        return Package;
    };

    // Serializes a package into a contiguous buffer (used as the PUT body).
    auto SerializeToBuffer = [](zen::CbPackage Package) -> zen::IoBuffer {
        zen::BinaryWriter MemStream;

        Package.Save(MemStream);

        return zen::IoBuffer(zen::IoBuffer::Clone, MemStream.Data(), MemStream.Size());
    };

    // Compares two packages by attachment set: same count, and every Lhs
    // attachment must exist in Rhs with byte-identical decompressed payload.
    // NOTE(review): the root objects themselves are not compared.
    auto IsEqual = [](zen::CbPackage Lhs, zen::CbPackage Rhs) -> bool {
        std::span<const zen::CbAttachment> LhsAttachments = Lhs.GetAttachments();
        std::span<const zen::CbAttachment> RhsAttachments = Rhs.GetAttachments();

        if (LhsAttachments.size() != RhsAttachments.size())
        {
            return false;
        }

        for (const zen::CbAttachment& LhsAttachment : LhsAttachments)
        {
            const zen::CbAttachment* RhsAttachment = Rhs.FindAttachment(LhsAttachment.GetHash());
            CHECK(RhsAttachment);

            zen::SharedBuffer LhsBuffer = LhsAttachment.AsCompressedBinary().Decompress();
            CHECK(!LhsBuffer.IsNull());

            zen::SharedBuffer RhsBuffer = RhsAttachment->AsCompressedBinary().Decompress();
            CHECK(!RhsBuffer.IsNull());

            if (!LhsBuffer.GetView().EqualBytes(RhsBuffer.GetView()))
            {
                return false;
            }
        }

        return true;
    };

    SUBCASE("PUT/GET returns correct package")
    {
        // Single instance: store a package and read the same package back.
        std::filesystem::path TestDir = TestEnv.CreateNewTestDir();
        const uint16_t PortNumber = 13337;
        const auto BaseUri = fmt::format("http://localhost:{}/z$", PortNumber);

        ZenServerInstance Instance1(TestEnv);
        Instance1.SetTestDir(TestDir);
        Instance1.SpawnServer(PortNumber);
        Instance1.WaitUntilReady();

        const std::string_view Bucket = "mosdef"sv;
        zen::IoHash Key;
        zen::CbPackage ExpectedPackage = CreateTestPackage(Key);

        // PUT
        {
            zen::IoBuffer Body = SerializeToBuffer(ExpectedPackage);
            cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}", BaseUri, Bucket, Key)},
                                            cpr::Body{(const char*)Body.Data(), Body.Size()},
                                            cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}});
            CHECK(Result.status_code == 201);
        }

        // GET
        {
            cpr::Response Result =
                cpr::Get(cpr::Url{fmt::format("{}/{}/{}", BaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}});
            CHECK(Result.status_code == 200);

            // Wrap (not copy) the response body for parsing.
            zen::IoBuffer Response(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size());

            zen::CbPackage Package;
            const bool Ok = Package.TryLoad(Response);
            CHECK(Ok);
            CHECK(IsEqual(Package, ExpectedPackage));
        }
    }

    SUBCASE("PUT propagates upstream")
    {
        // Setup local and remote server
        std::filesystem::path LocalDataDir = TestEnv.CreateNewTestDir();
        std::filesystem::path RemoteDataDir = TestEnv.CreateNewTestDir();
        const uint16_t LocalPortNumber = 13337;
        const uint16_t RemotePortNumber = 13338;

        const auto LocalBaseUri = fmt::format("http://localhost:{}/z$", LocalPortNumber);
        const auto RemoteBaseUri = fmt::format("http://localhost:{}/z$", RemotePortNumber);

        ZenServerInstance RemoteInstance(TestEnv);
        RemoteInstance.SetTestDir(RemoteDataDir);
        RemoteInstance.SpawnServer(RemotePortNumber);
        RemoteInstance.WaitUntilReady();

        // Local instance forwards to the remote; --upstream-thread-count=0
        // makes the propagation synchronous so the checks below don't race.
        ZenServerInstance LocalInstance(TestEnv);
        LocalInstance.SetTestDir(LocalDataDir);
        LocalInstance.SpawnServer(LocalPortNumber,
                                  fmt::format("--upstream-thread-count=0 --upstream-zen-url=http://localhost:{}", RemotePortNumber));
        LocalInstance.WaitUntilReady();

        const std::string_view Bucket = "mosdef"sv;
        zen::IoHash Key;
        zen::CbPackage ExpectedPackage = CreateTestPackage(Key);

        // Store the cache record package in the local instance
        {
            zen::IoBuffer Body = SerializeToBuffer(ExpectedPackage);
            cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}", LocalBaseUri, Bucket, Key)},
                                            cpr::Body{(const char*)Body.Data(), Body.Size()},
                                            cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}});

            CHECK(Result.status_code == 201);
        }

        // The cache record can be retrieved as a package from the local instance
        {
            cpr::Response Result =
                cpr::Get(cpr::Url{fmt::format("{}/{}/{}", LocalBaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}});
            CHECK(Result.status_code == 200);

            zen::IoBuffer Body(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size());
            zen::CbPackage Package;
            const bool Ok = Package.TryLoad(Body);
            CHECK(Ok);
            CHECK(IsEqual(Package, ExpectedPackage));
        }

        // The cache record can be retrieved as a package from the remote instance
        {
            cpr::Response Result =
                cpr::Get(cpr::Url{fmt::format("{}/{}/{}", RemoteBaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}});
            CHECK(Result.status_code == 200);

            zen::IoBuffer Body(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size());
            zen::CbPackage Package;
            const bool Ok = Package.TryLoad(Body);
            CHECK(Ok);
            CHECK(IsEqual(Package, ExpectedPackage));
        }
    }

    SUBCASE("GET finds upstream when missing in local")
    {
        // Setup local and remote server
        std::filesystem::path LocalDataDir = TestEnv.CreateNewTestDir();
        std::filesystem::path RemoteDataDir = TestEnv.CreateNewTestDir();
        const uint16_t LocalPortNumber = 13337;
        const uint16_t RemotePortNumber = 13338;

        const auto LocalBaseUri = fmt::format("http://localhost:{}/z$", LocalPortNumber);
        const auto RemoteBaseUri = fmt::format("http://localhost:{}/z$", RemotePortNumber);

        ZenServerInstance RemoteInstance(TestEnv);
        RemoteInstance.SetTestDir(RemoteDataDir);
        RemoteInstance.SpawnServer(RemotePortNumber);
        RemoteInstance.WaitUntilReady();

        ZenServerInstance LocalInstance(TestEnv);
        LocalInstance.SetTestDir(LocalDataDir);
        LocalInstance.SpawnServer(LocalPortNumber,
                                  fmt::format("--upstream-thread-count=0 --upstream-zen-url=http://localhost:{}", RemotePortNumber));
        LocalInstance.WaitUntilReady();

        const std::string_view Bucket = "mosdef"sv;
        zen::IoHash Key;
        zen::CbPackage ExpectedPackage = CreateTestPackage(Key);

        // Store the cache record package in upstream cache
        {
            zen::IoBuffer Body = SerializeToBuffer(ExpectedPackage);
            cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}", RemoteBaseUri, Bucket, Key)},
                                            cpr::Body{(const char*)Body.Data(), Body.Size()},
                                            cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}});

            CHECK(Result.status_code == 201);
        }

        // The cache record can be retrieved as a package from the local cache
        {
            cpr::Response Result =
                cpr::Get(cpr::Url{fmt::format("{}/{}/{}", LocalBaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}});
            CHECK(Result.status_code == 200);

            zen::IoBuffer Body(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size());
            zen::CbPackage Package;
            const bool Ok = Package.TryLoad(Body);
            CHECK(Ok);
            CHECK(IsEqual(Package, ExpectedPackage));
        }
    }
}
+
+TEST_CASE("zcache.policy")
+{
+ using namespace std::literals;
+ using namespace utils;
+
+ auto GenerateData = [](uint64_t Size, zen::IoHash& OutHash) -> zen::UniqueBuffer {
+ auto Buf = zen::UniqueBuffer::Alloc(Size);
+ uint8_t* Data = reinterpret_cast<uint8_t*>(Buf.GetData());
+ for (uint64_t Idx = 0; Idx < Size; Idx++)
+ {
+ Data[Idx] = Idx % 256;
+ }
+ OutHash = zen::IoHash::HashBuffer(Data, Size);
+ return Buf;
+ };
+
+ auto GeneratePackage = [](zen::IoHash& OutRecordKey, zen::IoHash& OutAttachmentKey) -> zen::CbPackage {
+ auto Data = zen::SharedBuffer::Clone(zen::MakeMemoryView<uint8_t>({1, 2, 3, 4, 5, 6, 7, 8, 9}));
+ auto CompressedData = zen::CompressedBuffer::Compress(Data);
+ OutAttachmentKey = CompressedData.DecodeRawHash();
+
+ zen::CbWriter Writer;
+ Writer.BeginObject("obj"sv);
+ Writer.AddBinaryAttachment("data", OutAttachmentKey);
+ Writer.EndObject();
+ CbObject CacheRecord = Writer.Save().AsObject();
+
+ OutRecordKey = IoHash::HashBuffer(CacheRecord.GetBuffer().GetView());
+
+ zen::CbPackage Package;
+ Package.SetObject(CacheRecord);
+ Package.AddAttachment(zen::CbAttachment(CompressedData, OutAttachmentKey));
+
+ return Package;
+ };
+
+ auto ToBuffer = [](zen::CbPackage Package) -> zen::IoBuffer {
+ zen::BinaryWriter MemStream;
+ Package.Save(MemStream);
+
+ return zen::IoBuffer(zen::IoBuffer::Clone, MemStream.Data(), MemStream.Size());
+ };
+
+ SUBCASE("query - 'local' does not query upstream (binary)")
+ {
+ ZenConfig UpstreamCfg = ZenConfig::New(13338);
+ ZenServerInstance UpstreamInst(TestEnv);
+ ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338);
+ ZenServerInstance LocalInst(TestEnv);
+ const auto Bucket = "legacy"sv;
+
+ UpstreamCfg.Spawn(UpstreamInst);
+ LocalCfg.Spawn(LocalInst);
+
+ zen::IoHash Key;
+ auto BinaryValue = GenerateData(1024, Key);
+
+ // Store binary cache value upstream
+ {
+ cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}", UpstreamCfg.BaseUri, Bucket, Key)},
+ cpr::Body{(const char*)BinaryValue.GetData(), BinaryValue.GetSize()},
+ cpr::Header{{"Content-Type", "application/octet-stream"}});
+ CHECK(Result.status_code == 201);
+ }
+
+ {
+ cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?Policy=QueryLocal,Store", LocalCfg.BaseUri, Bucket, Key)},
+ cpr::Header{{"Accept", "application/octet-stream"}});
+ CHECK(Result.status_code == 404);
+ }
+
+ {
+ cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?Policy=Query,Store", LocalCfg.BaseUri, Bucket, Key)},
+ cpr::Header{{"Accept", "application/octet-stream"}});
+ CHECK(Result.status_code == 200);
+ }
+ }
+
+    SUBCASE("store - 'local' does not store upstream (binary)")
+    {
+        // One upstream server plus a local server configured to chain to it.
+        ZenConfig UpstreamCfg = ZenConfig::New(13338);
+        ZenServerInstance UpstreamInst(TestEnv);
+        ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338);
+        ZenServerInstance LocalInst(TestEnv);
+        const auto Bucket = "legacy"sv;
+
+        UpstreamCfg.Spawn(UpstreamInst);
+        LocalCfg.Spawn(LocalInst);
+
+        zen::IoHash Key;
+        auto BinaryValue = GenerateData(1024, Key);
+
+        // PUT with 'StoreLocal': the value must be written to the local server only.
+        {
+            cpr::Response PutResponse =
+                cpr::Put(cpr::Url{fmt::format("{}/{}/{}?Policy=Query,StoreLocal", LocalCfg.BaseUri, Bucket, Key)},
+                         cpr::Body{(const char*)BinaryValue.GetData(), BinaryValue.GetSize()},
+                         cpr::Header{{"Content-Type", "application/octet-stream"}});
+            CHECK(PutResponse.status_code == 201);
+        }
+
+        // The upstream server must not have received the value...
+        {
+            cpr::Response UpstreamGet = cpr::Get(cpr::Url{fmt::format("{}/{}/{}", UpstreamCfg.BaseUri, Bucket, Key)},
+                                                 cpr::Header{{"Accept", "application/octet-stream"}});
+            CHECK(UpstreamGet.status_code == 404);
+        }
+
+        // ...while the local server serves it.
+        {
+            cpr::Response LocalGet = cpr::Get(cpr::Url{fmt::format("{}/{}/{}", LocalCfg.BaseUri, Bucket, Key)},
+                                              cpr::Header{{"Accept", "application/octet-stream"}});
+            CHECK(LocalGet.status_code == 200);
+        }
+    }
+
+    SUBCASE("store - 'local/remote' stores local and upstream (binary)")
+    {
+        // One upstream server plus a local server configured to chain to it.
+        ZenConfig UpstreamCfg = ZenConfig::New(13338);
+        ZenServerInstance UpstreamInst(TestEnv);
+        ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338);
+        ZenServerInstance LocalInst(TestEnv);
+        const auto Bucket = "legacy"sv;
+
+        UpstreamCfg.Spawn(UpstreamInst);
+        LocalCfg.Spawn(LocalInst);
+
+        zen::IoHash Key;
+        auto BinaryValue = GenerateData(1024, Key);
+
+        // PUT with the full 'Store' policy: the value is written locally and propagated upstream.
+        {
+            cpr::Response PutResponse =
+                cpr::Put(cpr::Url{fmt::format("{}/{}/{}?Policy=Query,Store", LocalCfg.BaseUri, Bucket, Key)},
+                         cpr::Body{(const char*)BinaryValue.GetData(), BinaryValue.GetSize()},
+                         cpr::Header{{"Content-Type", "application/octet-stream"}});
+            CHECK(PutResponse.status_code == 201);
+        }
+
+        // Both servers must now serve the value directly.
+        {
+            cpr::Response UpstreamGet = cpr::Get(cpr::Url{fmt::format("{}/{}/{}", UpstreamCfg.BaseUri, Bucket, Key)},
+                                                 cpr::Header{{"Accept", "application/octet-stream"}});
+            CHECK(UpstreamGet.status_code == 200);
+        }
+
+        {
+            cpr::Response LocalGet = cpr::Get(cpr::Url{fmt::format("{}/{}/{}", LocalCfg.BaseUri, Bucket, Key)},
+                                              cpr::Header{{"Accept", "application/octet-stream"}});
+            CHECK(LocalGet.status_code == 200);
+        }
+    }
+
+ // Verifies that a GET with the 'QueryLocal' policy is answered only from the
+ // local store (404, never forwarded upstream), while the full 'Query' policy
+ // does reach the upstream server -- here for compact-binary package values.
+ // NOTE(review): "cppackage" in the subcase name is presumably a typo for
+ // "cbpackage" -- confirm before renaming, since test filters match on it.
+ SUBCASE("query - 'local' does not query upstream (cppackage)")
+ {
+ ZenConfig UpstreamCfg = ZenConfig::New(13338);
+ ZenServerInstance UpstreamInst(TestEnv);
+ ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338);
+ ZenServerInstance LocalInst(TestEnv);
+ const auto Bucket = "legacy"sv;
+
+ UpstreamCfg.Spawn(UpstreamInst);
+ LocalCfg.Spawn(LocalInst);
+
+ zen::IoHash Key;
+ zen::IoHash PayloadId;
+ zen::CbPackage Package = GeneratePackage(Key, PayloadId);
+ auto Buf = ToBuffer(Package);
+
+ // Store package upstream only, so a local hit can only come via the upstream link
+ {
+ cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}", UpstreamCfg.BaseUri, Bucket, Key)},
+ cpr::Body{(const char*)Buf.GetData(), Buf.GetSize()},
+ cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}});
+ CHECK(Result.status_code == 201);
+ }
+
+ // QueryLocal only: the local server must answer 404 without consulting upstream
+ {
+ cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?Policy=QueryLocal,Store", LocalCfg.BaseUri, Bucket, Key)},
+ cpr::Header{{"Accept", "application/x-ue-cbpkg"}});
+ CHECK(Result.status_code == 404);
+ }
+
+ // Full Query: the record is now found via the upstream
+ {
+ cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?Policy=Query,Store", LocalCfg.BaseUri, Bucket, Key)},
+ cpr::Header{{"Accept", "application/x-ue-cbpkg"}});
+ CHECK(Result.status_code == 200);
+ }
+ }
+
+ // Verifies that a PUT with the 'StoreLocal' policy keeps a compact-binary
+ // package on the local server only: the upstream must report 404 while the
+ // local server serves the package.
+ // NOTE(review): "cbpackge" in the subcase name is presumably a typo for
+ // "cbpackage" -- confirm before renaming, since test filters match on it.
+ SUBCASE("store - 'local' does not store upstream (cbpackge)")
+ {
+ ZenConfig UpstreamCfg = ZenConfig::New(13338);
+ ZenServerInstance UpstreamInst(TestEnv);
+ ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338);
+ ZenServerInstance LocalInst(TestEnv);
+ const auto Bucket = "legacy"sv;
+
+ UpstreamCfg.Spawn(UpstreamInst);
+ LocalCfg.Spawn(LocalInst);
+
+ zen::IoHash Key;
+ zen::IoHash PayloadId;
+ zen::CbPackage Package = GeneratePackage(Key, PayloadId);
+ auto Buf = ToBuffer(Package);
+
+ // Store the package locally only ('StoreLocal' must not propagate it upstream)
+ {
+ cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}?Policy=Query,StoreLocal", LocalCfg.BaseUri, Bucket, Key)},
+ cpr::Body{(const char*)Buf.GetData(), Buf.GetSize()},
+ cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}});
+ CHECK(Result.status_code == 201);
+ }
+
+ // Upstream must not have received the package
+ {
+ cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}", UpstreamCfg.BaseUri, Bucket, Key)},
+ cpr::Header{{"Accept", "application/x-ue-cbpkg"}});
+ CHECK(Result.status_code == 404);
+ }
+
+ // The local server serves it
+ {
+ cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}", LocalCfg.BaseUri, Bucket, Key)},
+ cpr::Header{{"Accept", "application/x-ue-cbpkg"}});
+ CHECK(Result.status_code == 200);
+ }
+ }
+
+ // Verifies that a PUT with the full 'Store' policy writes a compact-binary
+ // package both to the local server and to its configured upstream.
+ SUBCASE("store - 'local/remote' stores local and upstream (cbpackage)")
+ {
+ ZenConfig UpstreamCfg = ZenConfig::New(13338);
+ ZenServerInstance UpstreamInst(TestEnv);
+ ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338);
+ ZenServerInstance LocalInst(TestEnv);
+ const auto Bucket = "legacy"sv;
+
+ UpstreamCfg.Spawn(UpstreamInst);
+ LocalCfg.Spawn(LocalInst);
+
+ zen::IoHash Key;
+ zen::IoHash PayloadId;
+ zen::CbPackage Package = GeneratePackage(Key, PayloadId);
+ auto Buf = ToBuffer(Package);
+
+ // Store package locally and upstream
+ {
+ cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}?Policy=Query,Store", LocalCfg.BaseUri, Bucket, Key)},
+ cpr::Body{(const char*)Buf.GetData(), Buf.GetSize()},
+ cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}});
+ CHECK(Result.status_code == 201);
+ }
+
+ // The upstream server must now have the package
+ {
+ cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}", UpstreamCfg.BaseUri, Bucket, Key)},
+ cpr::Header{{"Accept", "application/x-ue-cbpkg"}});
+ CHECK(Result.status_code == 200);
+ }
+
+ // ...and the local server serves it as well
+ {
+ cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}", LocalCfg.BaseUri, Bucket, Key)},
+ cpr::Header{{"Accept", "application/x-ue-cbpkg"}});
+ CHECK(Result.status_code == 200);
+ }
+ }
+
+ // Verifies the 'SkipData' policy on GETs against a stored package: the
+ // response still succeeds, but attachment bodies are omitted -- the package
+ // view has zero attachments and a direct payload fetch returns an empty body.
+ SUBCASE("skip - 'data' returns cache record without attachments/empty payload")
+ {
+ ZenConfig Cfg = ZenConfig::New();
+ ZenServerInstance Instance(TestEnv);
+ const auto Bucket = "test"sv;
+
+ Cfg.Spawn(Instance);
+
+ zen::IoHash Key;
+ zen::IoHash PayloadId;
+ zen::CbPackage Package = GeneratePackage(Key, PayloadId);
+ auto Buf = ToBuffer(Package);
+
+ // Store package
+ {
+ cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}", Cfg.BaseUri, Bucket, Key)},
+ cpr::Body{(const char*)Buf.GetData(), Buf.GetSize()},
+ cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}});
+ CHECK(Result.status_code == 201);
+ }
+
+ // Get as package: parses, but SkipData leaves it with no attachments
+ {
+ cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?Policy=Default,SkipData", Cfg.BaseUri, Bucket, Key)},
+ cpr::Header{{"Accept", "application/x-ue-cbpkg"}});
+ CHECK(IsHttpSuccessCode(Result.status_code));
+ IoBuffer Buffer(IoBuffer::Wrap, Result.text.c_str(), Result.text.size());
+ CbPackage ResponsePackage;
+ CHECK(ResponsePackage.TryLoad(Buffer));
+ CHECK(ResponsePackage.GetAttachments().size() == 0);
+ }
+
+ // Get as compact-binary record: the record object itself is still returned
+ {
+ cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?Policy=Default,SkipData", Cfg.BaseUri, Bucket, Key)},
+ cpr::Header{{"Accept", "application/x-ue-cb"}});
+ CHECK(IsHttpSuccessCode(Result.status_code));
+ IoBuffer Buffer(IoBuffer::Wrap, Result.text.c_str(), Result.text.size());
+ CbObject ResponseObject = zen::LoadCompactBinaryObject(Buffer);
+ CHECK((bool)ResponseObject);
+ }
+
+ // Get the individual payload: success, but with an empty body
+ {
+ cpr::Response Result =
+ cpr::Get(cpr::Url{fmt::format("{}/{}/{}/{}?Policy=Default,SkipData", Cfg.BaseUri, Bucket, Key, PayloadId)},
+ cpr::Header{{"Accept", "application/x-ue-comp"}});
+ CHECK(IsHttpSuccessCode(Result.status_code));
+ CHECK(Result.text.size() == 0);
+ }
+ }
+
+    SUBCASE("skip - 'data' returns empty binary value")
+    {
+        // Single stand-alone server, no upstream involved.
+        ZenConfig Cfg = ZenConfig::New();
+        ZenServerInstance Instance(TestEnv);
+        const auto Bucket = "test"sv;
+
+        Cfg.Spawn(Instance);
+
+        zen::IoHash Key;
+        auto BinaryValue = GenerateData(1024, Key);
+
+        // Seed the cache with a 1 KiB binary value.
+        {
+            cpr::Response PutResponse = cpr::Put(cpr::Url{fmt::format("{}/{}/{}", Cfg.BaseUri, Bucket, Key)},
+                                                 cpr::Body{(const char*)BinaryValue.GetData(), BinaryValue.GetSize()},
+                                                 cpr::Header{{"Content-Type", "application/octet-stream"}});
+            CHECK(PutResponse.status_code == 201);
+        }
+
+        // Fetch with 'SkipData': the request succeeds but the body is empty.
+        {
+            cpr::Response GetResponse = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?Policy=Default,SkipData", Cfg.BaseUri, Bucket, Key)},
+                                                 cpr::Header{{"Accept", "application/octet-stream"}});
+            CHECK(IsHttpSuccessCode(GetResponse.status_code));
+            CHECK(GetResponse.text.size() == 0);
+        }
+    }
+}
+
+// Exercises the $rpc cache endpoints: batched record put/get, miss handling,
+// upstream query policies, and RpcAcceptOptions (local file references/handles).
+TEST_CASE("zcache.rpc")
+{
+    using namespace std::literals;
+
+    // Appends one cache record to `Request` whose payload is a deterministic
+    // 16-bit pattern seeded from the key hash (so contents vary per key).
+    auto AppendCacheRecord = [](cacherequests::PutCacheRecordsRequest& Request,
+                                const zen::CacheKey& CacheKey,
+                                size_t PayloadSize,
+                                CachePolicy RecordPolicy) {
+        std::vector<uint8_t> Data;
+        Data.resize(PayloadSize);
+        uint32_t DataSeed = *reinterpret_cast<const uint32_t*>(&CacheKey.Hash.Hash[0]);
+        uint16_t* DataPtr = reinterpret_cast<uint16_t*>(Data.data());
+        for (size_t Idx = 0; Idx < PayloadSize / 2; ++Idx)
+        {
+            DataPtr[Idx] = static_cast<uint16_t>((Idx + DataSeed) % 0xffffu);
+        }
+        if (PayloadSize & 1)
+        {
+            // The 16-bit fill above leaves the trailing byte of odd-sized payloads unset.
+            Data[PayloadSize - 1] = static_cast<uint8_t>((PayloadSize - 1) & 0xff);
+        }
+        CompressedBuffer Value = zen::CompressedBuffer::Compress(SharedBuffer::MakeView(Data.data(), Data.size()));
+        Request.Requests.push_back({.Key = CacheKey, .Values = {{.Id = Oid::NewOid(), .Body = std::move(Value)}}, .Policy = RecordPolicy});
+    };
+
+    // Stores `Num` cache records (one rpc request per record) whose key hashes
+    // are synthesized from KeyOffset + index; returns the keys that were used.
+    auto PutCacheRecords = [&AppendCacheRecord](std::string_view BaseUri,
+                                                std::string_view Namespace,
+                                                std::string_view Bucket,
+                                                size_t Num,
+                                                size_t PayloadSize = 1024,
+                                                size_t KeyOffset = 1) -> std::vector<CacheKey> {
+        std::vector<zen::CacheKey> OutKeys;
+
+        for (uint32_t Key = 1; Key <= Num; ++Key)
+        {
+            zen::IoHash KeyHash;
+            ((uint32_t*)(KeyHash.Hash))[0] = gsl::narrow<uint32_t>(KeyOffset + Key);
+            const zen::CacheKey CacheKey = zen::CacheKey::Create(Bucket, KeyHash);
+
+            cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .Namespace = std::string(Namespace)};
+            AppendCacheRecord(Request, CacheKey, PayloadSize, CachePolicy::Default);
+            OutKeys.push_back(CacheKey);
+
+            CbPackage Package;
+            CHECK(Request.Format(Package));
+
+            IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer();
+            cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)},
+                                             cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}},
+                                             cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
+
+            CHECK(Result.status_code == 200);
+        }
+
+        return OutKeys;
+    };
+
+    struct GetCacheRecordResult
+    {
+        zen::CbPackage Response;
+        cacherequests::GetCacheRecordsResult Result;
+        // Initialized to false so a failed/unparsable request reports failure
+        // instead of an indeterminate value (matches the sibling test case).
+        bool Success = false;
+    };
+
+    // Fetches the given keys through the GetCacheRecords rpc and parses the
+    // package response. Success stays false on transport or parse failure.
+    auto GetCacheRecords = [](std::string_view BaseUri,
+                              std::string_view Namespace,
+                              std::span<zen::CacheKey> Keys,
+                              zen::CachePolicy Policy,
+                              zen::RpcAcceptOptions AcceptOptions = zen::RpcAcceptOptions::kNone,
+                              int Pid = 0) -> GetCacheRecordResult {
+        cacherequests::GetCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic,
+                                                         .AcceptOptions = static_cast<uint16_t>(AcceptOptions),
+                                                         .ProcessPid = Pid,
+                                                         .DefaultPolicy = Policy,
+                                                         .Namespace = std::string(Namespace)};
+        for (const CacheKey& Key : Keys)
+        {
+            Request.Requests.push_back({.Key = Key});
+        }
+
+        CbObjectWriter RequestWriter;
+        CHECK(Request.Format(RequestWriter));
+
+        BinaryWriter Body;
+        RequestWriter.Save(Body);
+
+        cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)},
+                                         cpr::Header{{"Content-Type", "application/x-ue-cb"}, {"Accept", "application/x-ue-cbpkg"}},
+                                         cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
+
+        GetCacheRecordResult OutResult;
+
+        if (Result.status_code == 200)
+        {
+            CbPackage Response = ParsePackageMessage(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()));
+            if (!Response.IsNull())
+            {
+                OutResult.Response = std::move(Response);
+                CHECK(OutResult.Result.Parse(OutResult.Response));
+                OutResult.Success = true;
+            }
+        }
+
+        return OutResult;
+    };
+
+    SUBCASE("get cache records")
+    {
+        std::filesystem::path TestDir = TestEnv.CreateNewTestDir();
+        const uint16_t PortNumber = 13337;
+        const auto BaseUri = fmt::format("http://localhost:{}/z$", PortNumber);
+
+        ZenServerInstance Inst(TestEnv);
+        Inst.SetTestDir(TestDir);
+        Inst.SpawnServer(PortNumber);
+        Inst.WaitUntilReady();
+
+        CachePolicy Policy = CachePolicy::Default;
+        std::vector<zen::CacheKey> Keys = PutCacheRecords(BaseUri, "ue4.ddc"sv, "mastodon"sv, 128);
+        GetCacheRecordResult Result = GetCacheRecords(BaseUri, "ue4.ddc"sv, Keys, Policy);
+
+        CHECK(Result.Result.Results.size() == Keys.size());
+
+        // Every stored record must come back in order with a non-empty body.
+        for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results)
+        {
+            const CacheKey& ExpectedKey = Keys[Index++];
+            CHECK(Record);
+            CHECK(Record->Key == ExpectedKey);
+            CHECK(Record->Values.size() == 1);
+
+            for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values)
+            {
+                CHECK(Value.Body);
+            }
+        }
+    }
+
+    SUBCASE("get missing cache records")
+    {
+        std::filesystem::path TestDir = TestEnv.CreateNewTestDir();
+        const uint16_t PortNumber = 13337;
+        const auto BaseUri = fmt::format("http://localhost:{}/z$", PortNumber);
+
+        ZenServerInstance Inst(TestEnv);
+        Inst.SetTestDir(TestDir);
+        Inst.SpawnServer(PortNumber);
+        Inst.WaitUntilReady();
+
+        CachePolicy Policy = CachePolicy::Default;
+        std::vector<zen::CacheKey> ExistingKeys = PutCacheRecords(BaseUri, "ue4.ddc"sv, "mastodon"sv, 128);
+        std::vector<zen::CacheKey> Keys;
+
+        // Interleave every existing key with a key that was never stored.
+        for (const zen::CacheKey& Key : ExistingKeys)
+        {
+            Keys.push_back(Key);
+            Keys.push_back(CacheKey::Create("missing"sv, IoHash::Zero));
+        }
+
+        GetCacheRecordResult Result = GetCacheRecords(BaseUri, "ue4.ddc"sv, Keys, Policy);
+
+        CHECK(Result.Result.Results.size() == Keys.size());
+
+        size_t KeyIndex = 0;
+        for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results)
+        {
+            const bool Missing = Index++ % 2 != 0;
+
+            if (Missing)
+            {
+                CHECK(!Record);
+            }
+            else
+            {
+                const CacheKey& ExpectedKey = ExistingKeys[KeyIndex++];
+                CHECK(Record);
+                // Guard the dereference: CHECK does not abort on failure, and
+                // dereferencing an empty optional is undefined behavior.
+                if (Record)
+                {
+                    CHECK(Record->Key == ExpectedKey);
+                    for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values)
+                    {
+                        CHECK(Value.Body);
+                    }
+                }
+            }
+        }
+    }
+
+    SUBCASE("policy - 'QueryLocal' does not query upstream")
+    {
+        using namespace utils;
+
+        ZenConfig UpstreamCfg = ZenConfig::New(13338);
+        ZenServerInstance UpstreamServer(TestEnv);
+        ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338);
+        ZenServerInstance LocalServer(TestEnv);
+
+        SpawnServer(UpstreamServer, UpstreamCfg);
+        SpawnServer(LocalServer, LocalCfg);
+
+        // Records exist only upstream; a local-only query must miss them all.
+        std::vector<zen::CacheKey> Keys = PutCacheRecords(UpstreamCfg.BaseUri, "ue4.ddc"sv, "mastodon"sv, 4);
+
+        CachePolicy Policy = CachePolicy::QueryLocal;
+        GetCacheRecordResult Result = GetCacheRecords(LocalCfg.BaseUri, "ue4.ddc"sv, Keys, Policy);
+
+        CHECK(Result.Result.Results.size() == Keys.size());
+
+        for (const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results)
+        {
+            CHECK(!Record);
+        }
+    }
+
+    SUBCASE("policy - 'QueryRemote' does query upstream")
+    {
+        using namespace utils;
+
+        ZenConfig UpstreamCfg = ZenConfig::New(13338);
+        ZenServerInstance UpstreamServer(TestEnv);
+        ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338);
+        ZenServerInstance LocalServer(TestEnv);
+
+        SpawnServer(UpstreamServer, UpstreamCfg);
+        SpawnServer(LocalServer, LocalCfg);
+
+        // Records exist only upstream; allowing remote queries must find them.
+        std::vector<zen::CacheKey> Keys = PutCacheRecords(UpstreamCfg.BaseUri, "ue4.ddc"sv, "mastodon"sv, 4);
+
+        CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote);
+        GetCacheRecordResult Result = GetCacheRecords(LocalCfg.BaseUri, "ue4.ddc"sv, Keys, Policy);
+
+        CHECK(Result.Result.Results.size() == Keys.size());
+
+        for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results)
+        {
+            CHECK(Record);
+            const CacheKey& ExpectedKey = Keys[Index++];
+            CHECK(Record->Key == ExpectedKey);
+        }
+    }
+
+    SUBCASE("RpcAcceptOptions")
+    {
+        using namespace utils;
+
+        std::filesystem::path TestDir = TestEnv.CreateNewTestDir();
+        const uint16_t PortNumber = 13337;
+        const auto BaseUri = fmt::format("http://localhost:{}/z$", PortNumber);
+
+        ZenServerInstance Inst(TestEnv);
+        Inst.SetTestDir(TestDir);
+        Inst.SpawnServer(PortNumber);
+        Inst.WaitUntilReady();
+
+        // Mix of small (1 KiB) and large (16 MiB) payloads so the server can
+        // choose different transfer strategies per size.
+        std::vector<zen::CacheKey> SmallKeys = PutCacheRecords(BaseUri, "ue4.ddc"sv, "mastodon"sv, 4, 1024);
+        std::vector<zen::CacheKey> LargeKeys = PutCacheRecords(BaseUri, "ue4.ddc"sv, "mastodon"sv, 4, 1024 * 1024 * 16, SmallKeys.size());
+
+        std::vector<zen::CacheKey> Keys(SmallKeys.begin(), SmallKeys.end());
+        Keys.insert(Keys.end(), LargeKeys.begin(), LargeKeys.end());
+
+        // Default options: all bodies are inlined, never file references.
+        {
+            GetCacheRecordResult Result = GetCacheRecords(BaseUri, "ue4.ddc"sv, Keys, CachePolicy::Default);
+
+            CHECK(Result.Result.Results.size() == Keys.size());
+
+            for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results)
+            {
+                CHECK(Record);
+                const CacheKey& ExpectedKey = Keys[Index++];
+                CHECK(Record->Key == ExpectedKey);
+                for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values)
+                {
+                    const IoBuffer& Body = Value.Body.GetCompressed().Flatten().AsIoBuffer();
+                    IoBufferFileReference Ref;
+                    bool IsFileRef = Body.GetFileReference(Ref);
+                    CHECK(!IsFileRef);
+                }
+            }
+        }
+
+        // File path, but only for large files
+        {
+            GetCacheRecordResult Result =
+                GetCacheRecords(BaseUri, "ue4.ddc"sv, Keys, CachePolicy::Default, RpcAcceptOptions::kAllowLocalReferences);
+
+            CHECK(Result.Result.Results.size() == Keys.size());
+
+            for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results)
+            {
+                CHECK(Record);
+                const CacheKey& ExpectedKey = Keys[Index++];
+                CHECK(Record->Key == ExpectedKey);
+                for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values)
+                {
+                    const IoBuffer& Body = Value.Body.GetCompressed().Flatten().AsIoBuffer();
+                    IoBufferFileReference Ref;
+                    bool IsFileRef = Body.GetFileReference(Ref);
+                    CHECK(IsFileRef == (Body.Size() > 1024));
+                }
+            }
+        }
+
+        // File path, for all files
+        {
+            GetCacheRecordResult Result =
+                GetCacheRecords(BaseUri,
+                                "ue4.ddc"sv,
+                                Keys,
+                                CachePolicy::Default,
+                                RpcAcceptOptions::kAllowLocalReferences | RpcAcceptOptions::kAllowPartialLocalReferences);
+
+            CHECK(Result.Result.Results.size() == Keys.size());
+
+            for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results)
+            {
+                CHECK(Record);
+                const CacheKey& ExpectedKey = Keys[Index++];
+                CHECK(Record->Key == ExpectedKey);
+                for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values)
+                {
+                    const IoBuffer& Body = Value.Body.GetCompressed().Flatten().AsIoBuffer();
+                    IoBufferFileReference Ref;
+                    bool IsFileRef = Body.GetFileReference(Ref);
+                    CHECK(IsFileRef);
+                }
+            }
+        }
+
+        // File handle (pid supplied), but only for large files
+        {
+            GetCacheRecordResult Result = GetCacheRecords(BaseUri,
+                                                          "ue4.ddc"sv,
+                                                          Keys,
+                                                          CachePolicy::Default,
+                                                          RpcAcceptOptions::kAllowLocalReferences,
+                                                          GetCurrentProcessId());
+
+            CHECK(Result.Result.Results.size() == Keys.size());
+
+            for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results)
+            {
+                CHECK(Record);
+                const CacheKey& ExpectedKey = Keys[Index++];
+                CHECK(Record->Key == ExpectedKey);
+                for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values)
+                {
+                    const IoBuffer& Body = Value.Body.GetCompressed().Flatten().AsIoBuffer();
+                    IoBufferFileReference Ref;
+                    bool IsFileRef = Body.GetFileReference(Ref);
+                    CHECK(IsFileRef == (Body.Size() > 1024));
+                }
+            }
+        }
+
+        // File handle (pid supplied), for all files
+        {
+            GetCacheRecordResult Result =
+                GetCacheRecords(BaseUri,
+                                "ue4.ddc"sv,
+                                Keys,
+                                CachePolicy::Default,
+                                RpcAcceptOptions::kAllowLocalReferences | RpcAcceptOptions::kAllowPartialLocalReferences,
+                                GetCurrentProcessId());
+
+            CHECK(Result.Result.Results.size() == Keys.size());
+
+            for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results)
+            {
+                CHECK(Record);
+                const CacheKey& ExpectedKey = Keys[Index++];
+                CHECK(Record->Key == ExpectedKey);
+                for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values)
+                {
+                    const IoBuffer& Body = Value.Body.GetCompressed().Flatten().AsIoBuffer();
+                    IoBufferFileReference Ref;
+                    bool IsFileRef = Body.GetFileReference(Ref);
+                    CHECK(IsFileRef);
+                }
+            }
+        }
+    }
+}
+
+// Stress test: hammers a local server whose two upstreams are alternately
+// killed and restarted, to exercise upstream-failure handling under load.
+TEST_CASE("zcache.failing.upstream")
+{
+    // This is an exploratory test that takes a long time to run, so lets skip it by default
+    if (true)
+    {
+        return;
+    }
+
+    using namespace std::literals;
+    using namespace utils;
+
+    const uint16_t Upstream1PortNumber = 13338;
+    ZenConfig Upstream1Cfg = ZenConfig::New(Upstream1PortNumber);
+    ZenServerInstance Upstream1Server(TestEnv);
+
+    const uint16_t Upstream2PortNumber = 13339;
+    ZenConfig Upstream2Cfg = ZenConfig::New(Upstream2PortNumber);
+    ZenServerInstance Upstream2Server(TestEnv);
+
+    std::vector<std::uint16_t> UpstreamPorts = {Upstream1PortNumber, Upstream2PortNumber};
+    ZenConfig LocalCfg = ZenConfig::NewWithThreadedUpstreams(UpstreamPorts, false);
+    LocalCfg.Args += (" --upstream-thread-count 2");
+    ZenServerInstance LocalServer(TestEnv);
+    const uint16_t LocalPortNumber = 13337;
+    const auto LocalUri = fmt::format("http://localhost:{}/z$", LocalPortNumber);
+    const auto Upstream1Uri = fmt::format("http://localhost:{}/z$", Upstream1PortNumber);
+    const auto Upstream2Uri = fmt::format("http://localhost:{}/z$", Upstream2PortNumber);
+
+    SpawnServer(Upstream1Server, Upstream1Cfg);
+    SpawnServer(Upstream2Server, Upstream2Cfg);
+    SpawnServer(LocalServer, LocalCfg);
+    bool Upstream1Running = true;
+    bool Upstream2Running = true;
+
+    // Appends one cache record to `Request` with a 32-bit counting pattern
+    // seeded from the key hash.
+    auto AppendCacheRecord = [](cacherequests::PutCacheRecordsRequest& Request,
+                                const zen::CacheKey& CacheKey,
+                                size_t PayloadSize,
+                                CachePolicy RecordPolicy) {
+        std::vector<uint32_t> Data;
+        Data.resize(PayloadSize / 4);
+        for (uint32_t Idx = 0; Idx < PayloadSize / 4; ++Idx)
+        {
+            Data[Idx] = (*reinterpret_cast<const uint32_t*>(&CacheKey.Hash.Hash[0])) + Idx;
+        }
+
+        CompressedBuffer Value = zen::CompressedBuffer::Compress(SharedBuffer::MakeView(Data.data(), Data.size() * 4));
+        Request.Requests.push_back({.Key = CacheKey, .Values = {{.Id = Oid::NewOid(), .Body = std::move(Value)}}, .Policy = RecordPolicy});
+    };
+
+    // Stores `Num` records in a single batched rpc request. Returns the keys
+    // used, or an empty vector if the request failed (best-effort under churn).
+    auto PutCacheRecords = [&AppendCacheRecord](std::string_view BaseUri,
+                                                std::string_view Namespace,
+                                                std::string_view Bucket,
+                                                size_t Num,
+                                                size_t KeyOffset,
+                                                size_t PayloadSize = 8192) -> std::vector<CacheKey> {
+        std::vector<zen::CacheKey> OutKeys;
+
+        cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .Namespace = std::string(Namespace)};
+        for (size_t Key = 1; Key <= Num; ++Key)
+        {
+            zen::IoHash KeyHash;
+            ((size_t*)(KeyHash.Hash))[0] = KeyOffset + Key;
+            const zen::CacheKey CacheKey = zen::CacheKey::Create(Bucket, KeyHash);
+
+            AppendCacheRecord(Request, CacheKey, PayloadSize, CachePolicy::Default);
+            OutKeys.push_back(CacheKey);
+        }
+
+        CbPackage Package;
+        CHECK(Request.Format(Package));
+
+        IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer();
+        cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)},
+                                         cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}},
+                                         cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
+
+        if (Result.status_code != 200)
+        {
+            ZEN_DEBUG("PutCacheRecords failed with {}, reason '{}'", Result.status_code, Result.reason);
+            OutKeys.clear();
+        }
+
+        return OutKeys;
+    };
+
+    struct GetCacheRecordResult
+    {
+        zen::CbPackage Response;
+        cacherequests::GetCacheRecordsResult Result;
+        bool Success = false;
+    };
+
+    // Fetches the given keys via the GetCacheRecords rpc; Success stays false
+    // on transport or parse failure (expected while upstreams are bouncing).
+    auto GetCacheRecords = [](std::string_view BaseUri,
+                              std::string_view Namespace,
+                              std::span<zen::CacheKey> Keys,
+                              zen::CachePolicy Policy) -> GetCacheRecordResult {
+        cacherequests::GetCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic,
+                                                         .DefaultPolicy = Policy,
+                                                         .Namespace = std::string(Namespace)};
+        for (const CacheKey& Key : Keys)
+        {
+            Request.Requests.push_back({.Key = Key});
+        }
+
+        CbObjectWriter RequestWriter;
+        CHECK(Request.Format(RequestWriter));
+
+        BinaryWriter Body;
+        RequestWriter.Save(Body);
+
+        cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)},
+                                         cpr::Header{{"Content-Type", "application/x-ue-cb"}, {"Accept", "application/x-ue-cbpkg"}},
+                                         cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
+
+        GetCacheRecordResult OutResult;
+
+        if (Result.status_code == 200)
+        {
+            CbPackage Response = ParsePackageMessage(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()));
+            if (!Response.IsNull())
+            {
+                OutResult.Response = std::move(Response);
+                CHECK(OutResult.Result.Parse(OutResult.Response));
+                OutResult.Success = true;
+            }
+        }
+        else
+        {
+            // Status code first, then reason text -- keep arguments in the
+            // order the format string names them.
+            ZEN_DEBUG("GetCacheRecords failed with {}, reason '{}'", Result.status_code, Result.reason);
+        }
+
+        return OutResult;
+    };
+
+    // Populate with some simple data
+
+    CachePolicy Policy = CachePolicy::Default;
+
+    const size_t ThreadCount = 128;
+    const size_t KeyMultiplier = 16384;
+    const size_t RecordsPerRequest = 64;
+    WorkerThreadPool Pool(ThreadCount);
+
+    std::atomic_size_t Completed = 0;
+
+    // Owned by the test body and destroyed automatically (the previous raw
+    // `new[]` was never deleted). Outlives all scheduled pool work.
+    std::vector<std::vector<CacheKey>> Keys(ThreadCount * KeyMultiplier);
+    RwLock KeysLock;
+
+    for (size_t I = 0; I < ThreadCount * KeyMultiplier; I++)
+    {
+        const size_t Iteration = I;
+        // Capture the iteration index BY VALUE: the worker may run after this
+        // loop iteration has ended, so referencing the loop-locals by
+        // reference would be a dangling capture / data race on `I`.
+        Pool.ScheduleWork([&, Iteration] {
+            std::vector<CacheKey> NewKeys =
+                PutCacheRecords(LocalUri, "ue4.ddc"sv, "mastodon"sv, RecordsPerRequest, Iteration * RecordsPerRequest);
+            if (NewKeys.size() != RecordsPerRequest)
+            {
+                ZEN_DEBUG("PutCacheRecords iteration {} failed", Iteration);
+                Completed.fetch_add(1);
+                return;
+            }
+            {
+                RwLock::ExclusiveLockScope _(KeysLock);
+                Keys[Iteration].swap(NewKeys);
+            }
+            Completed.fetch_add(1);
+        });
+    }
+    // While the puts are in flight, alternately kill one upstream and revive
+    // the other so the local server keeps hitting a failing upstream.
+    bool UseUpstream1 = false;
+    while (Completed < ThreadCount * KeyMultiplier)
+    {
+        Sleep(8000);
+
+        if (UseUpstream1)
+        {
+            if (Upstream2Running)
+            {
+                Upstream2Server.EnableTermination();
+                Upstream2Server.Shutdown();
+                Sleep(100);
+                Upstream2Running = false;
+            }
+            if (!Upstream1Running)
+            {
+                SpawnServer(Upstream1Server, Upstream1Cfg);
+                Upstream1Running = true;
+            }
+            UseUpstream1 = !UseUpstream1;
+        }
+        else
+        {
+            if (Upstream1Running)
+            {
+                Upstream1Server.EnableTermination();
+                Upstream1Server.Shutdown();
+                Sleep(100);
+                Upstream1Running = false;
+            }
+            if (!Upstream2Running)
+            {
+                SpawnServer(Upstream2Server, Upstream2Cfg);
+                Upstream2Running = true;
+            }
+            UseUpstream1 = !UseUpstream1;
+        }
+    }
+
+    // Second phase: read everything back (best-effort, results are logged
+    // rather than asserted since upstream churn makes misses legitimate).
+    Completed = 0;
+    for (size_t I = 0; I < ThreadCount * KeyMultiplier; I++)
+    {
+        const size_t Iteration = I;
+        if (Keys[Iteration].empty())
+        {
+            Completed.fetch_add(1);
+            continue;
+        }
+        // Capture `Iteration` by value (see above); `Keys` itself outlives the
+        // pool work, so index into it inside the worker.
+        Pool.ScheduleWork([&, Iteration] {
+            std::vector<CacheKey>& LocalKeys = Keys[Iteration];
+            GetCacheRecordResult Result = GetCacheRecords(LocalUri, "ue4.ddc"sv, LocalKeys, Policy);
+
+            if (!Result.Success)
+            {
+                ZEN_DEBUG("GetCacheRecords iteration {} failed", Iteration);
+                Completed.fetch_add(1);
+                return;
+            }
+
+            if (Result.Result.Results.size() != LocalKeys.size())
+            {
+                ZEN_DEBUG("GetCacheRecords iteration {} empty records", Iteration);
+                Completed.fetch_add(1);
+                return;
+            }
+            for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results)
+            {
+                const CacheKey& ExpectedKey = LocalKeys[Index++];
+                if (!Record)
+                {
+                    continue;
+                }
+                if (Record->Key != ExpectedKey)
+                {
+                    continue;
+                }
+                if (Record->Values.size() != 1)
+                {
+                    continue;
+                }
+
+                for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values)
+                {
+                    if (!Value.Body)
+                    {
+                        continue;
+                    }
+                }
+            }
+            Completed.fetch_add(1);
+        });
+    }
+    while (Completed < ThreadCount * KeyMultiplier)
+    {
+        Sleep(10);
+    }
+}
+
+TEST_CASE("zcache.rpc.allpolicies")
+{
+ using namespace std::literals;
+ using namespace utils;
+
+ ZenConfig UpstreamCfg = ZenConfig::New(13338);
+ ZenServerInstance UpstreamServer(TestEnv);
+ ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338);
+ ZenServerInstance LocalServer(TestEnv);
+ const uint16_t LocalPortNumber = 13337;
+ const auto BaseUri = fmt::format("http://localhost:{}/z$", LocalPortNumber);
+
+ SpawnServer(UpstreamServer, UpstreamCfg);
+ SpawnServer(LocalServer, LocalCfg);
+
+ std::string_view TestVersion = "F72150A02AE34B57A9EC91D36BA1CE08"sv;
+ std::string_view TestBucket = "allpoliciestest"sv;
+ std::string_view TestNamespace = "ue4.ddc"sv;
+
+ // NumKeys = (2 Value vs Record)*(2 SkipData vs Default)*(2 ForceMiss vs Not)*(2 use local)
+ // *(2 use remote)*(2 UseValue Policy vs not)*(4 cases per type)
+ constexpr int NumKeys = 256;
+ constexpr int NumValues = 4;
+ Oid ValueIds[NumValues];
+ IoHash Hash;
+ for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex)
+ {
+ ExtendableStringBuilder<16> ValueName;
+ ValueName << "ValueId_"sv << ValueIndex;
+ static_assert(sizeof(IoHash) >= sizeof(Oid));
+ ValueIds[ValueIndex] = Oid::FromMemory(IoHash::HashBuffer(ValueName.Data(), ValueName.Size() * sizeof(ValueName.Data()[0])).Hash);
+ }
+
+ struct KeyData;
+ struct UserData
+ {
+ UserData& Set(KeyData* InKeyData, int InValueIndex)
+ {
+ Data = InKeyData;
+ ValueIndex = InValueIndex;
+ return *this;
+ }
+ KeyData* Data = nullptr;
+ int ValueIndex = 0;
+ };
+ struct KeyData
+ {
+ CompressedBuffer BufferValues[NumValues];
+ uint64_t IntValues[NumValues];
+ UserData ValueUserData[NumValues];
+ bool ReceivedChunk[NumValues];
+ CacheKey Key;
+ UserData KeyUserData;
+ uint32_t KeyIndex = 0;
+ bool GetRequestsData = true;
+ bool UseValueAPI = false;
+ bool UseValuePolicy = false;
+ bool ForceMiss = false;
+ bool UseLocal = true;
+ bool UseRemote = true;
+ bool ShouldBeHit = true;
+ bool ReceivedPut = false;
+ bool ReceivedGet = false;
+ bool ReceivedPutValue = false;
+ bool ReceivedGetValue = false;
+ };
+ struct CachePutRequest
+ {
+ CacheKey Key;
+ CbObject Record;
+ CacheRecordPolicy Policy;
+ KeyData* Values;
+ UserData* Data;
+ };
+ struct CachePutValueRequest
+ {
+ CacheKey Key;
+ CompressedBuffer Value;
+ CachePolicy Policy;
+ UserData* Data;
+ };
+ struct CacheGetRequest
+ {
+ CacheKey Key;
+ CacheRecordPolicy Policy;
+ UserData* Data;
+ };
+ struct CacheGetValueRequest
+ {
+ CacheKey Key;
+ CachePolicy Policy;
+ UserData* Data;
+ };
+ struct CacheGetChunkRequest
+ {
+ CacheKey Key;
+ Oid ValueId;
+ uint64_t RawOffset;
+ uint64_t RawSize;
+ IoHash RawHash;
+ CachePolicy Policy;
+ UserData* Data;
+ };
+
+ KeyData KeyDatas[NumKeys];
+ std::vector<CachePutRequest> PutRequests;
+ std::vector<CachePutValueRequest> PutValueRequests;
+ std::vector<CacheGetRequest> GetRequests;
+ std::vector<CacheGetValueRequest> GetValueRequests;
+ std::vector<CacheGetChunkRequest> ChunkRequests;
+
+ for (uint32_t KeyIndex = 0; KeyIndex < NumKeys; ++KeyIndex)
+ {
+ IoHashStream KeyWriter;
+ KeyWriter.Append(TestVersion.data(), TestVersion.length() * sizeof(TestVersion.data()[0]));
+ KeyWriter.Append(&KeyIndex, sizeof(KeyIndex));
+ IoHash KeyHash = KeyWriter.GetHash();
+ KeyData& KeyData = KeyDatas[KeyIndex];
+
+ KeyData.Key = CacheKey::Create(TestBucket, KeyHash);
+ KeyData.KeyIndex = KeyIndex;
+ KeyData.GetRequestsData = (KeyIndex & (1 << 1)) == 0;
+ KeyData.UseValueAPI = (KeyIndex & (1 << 2)) != 0;
+ KeyData.UseValuePolicy = (KeyIndex & (1 << 3)) != 0;
+ KeyData.ForceMiss = (KeyIndex & (1 << 4)) == 0;
+ KeyData.UseLocal = (KeyIndex & (1 << 5)) == 0;
+ KeyData.UseRemote = (KeyIndex & (1 << 6)) == 0;
+ KeyData.ShouldBeHit = !KeyData.ForceMiss && (KeyData.UseLocal || KeyData.UseRemote);
+ CachePolicy SharedPolicy = KeyData.UseLocal ? CachePolicy::Local : CachePolicy::None;
+ SharedPolicy |= KeyData.UseRemote ? CachePolicy::Remote : CachePolicy::None;
+ CachePolicy PutPolicy = SharedPolicy;
+ CachePolicy GetPolicy = SharedPolicy;
+ GetPolicy |= !KeyData.GetRequestsData ? CachePolicy::SkipData : CachePolicy::None;
+ CacheKey& Key = KeyData.Key;
+
+ for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex)
+ {
+ KeyData.IntValues[ValueIndex] = static_cast<uint64_t>(KeyIndex) | (static_cast<uint64_t>(ValueIndex) << 32);
+ KeyData.BufferValues[ValueIndex] =
+ CompressedBuffer::Compress(SharedBuffer::MakeView(&KeyData.IntValues[ValueIndex], sizeof(KeyData.IntValues[ValueIndex])));
+ KeyData.ReceivedChunk[ValueIndex] = false;
+ }
+
+ UserData& KeyUserData = KeyData.KeyUserData.Set(&KeyData, -1);
+ for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex)
+ {
+ KeyData.ValueUserData[ValueIndex].Set(&KeyData, ValueIndex);
+ }
+ if (!KeyData.UseValueAPI)
+ {
+ CbObjectWriter Builder;
+ Builder.BeginObject("key"sv);
+ Builder << "Bucket"sv << Key.Bucket << "Hash"sv << Key.Hash;
+ Builder.EndObject();
+ Builder.BeginArray("Values"sv);
+ for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex)
+ {
+ Builder.BeginObject();
+ Builder.AddObjectId("Id"sv, ValueIds[ValueIndex]);
+ Builder.AddBinaryAttachment("RawHash"sv, KeyData.BufferValues[ValueIndex].DecodeRawHash());
+ Builder.AddInteger("RawSize"sv, KeyData.BufferValues[ValueIndex].DecodeRawSize());
+ Builder.EndObject();
+ }
+ Builder.EndArray();
+
+ CacheRecordPolicy PutRecordPolicy;
+ CacheRecordPolicy GetRecordPolicy;
+ if (!KeyData.UseValuePolicy)
+ {
+ PutRecordPolicy = CacheRecordPolicy(PutPolicy);
+ GetRecordPolicy = CacheRecordPolicy(GetPolicy);
+ }
+ else
+ {
+ // Switch the SkipData field in the Record policy so that if the CacheStore ignores the ValuePolicies
+ // it will use the wrong value for SkipData and fail our tests.
+ CacheRecordPolicyBuilder PutBuilder(PutPolicy ^ CachePolicy::SkipData);
+ CacheRecordPolicyBuilder GetBuilder(GetPolicy ^ CachePolicy::SkipData);
+ for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex)
+ {
+ PutBuilder.AddValuePolicy(ValueIds[ValueIndex], PutPolicy);
+ GetBuilder.AddValuePolicy(ValueIds[ValueIndex], GetPolicy);
+ }
+ PutRecordPolicy = PutBuilder.Build();
+ GetRecordPolicy = GetBuilder.Build();
+ }
+ if (!KeyData.ForceMiss)
+ {
+ PutRequests.push_back({Key, Builder.Save(), PutRecordPolicy, &KeyData, &KeyUserData});
+ }
+ GetRequests.push_back({Key, GetRecordPolicy, &KeyUserData});
+ for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex)
+ {
+ UserData& ValueUserData = KeyData.ValueUserData[ValueIndex];
+ ChunkRequests.push_back({Key, ValueIds[ValueIndex], 0, UINT64_MAX, IoHash(), GetPolicy, &ValueUserData});
+ }
+ }
+ else
+ {
+ if (!KeyData.ForceMiss)
+ {
+ PutValueRequests.push_back({Key, KeyData.BufferValues[0], PutPolicy, &KeyUserData});
+ }
+ GetValueRequests.push_back({Key, GetPolicy, &KeyUserData});
+ ChunkRequests.push_back({Key, Oid::Zero, 0, UINT64_MAX, IoHash(), GetPolicy, &KeyUserData});
+ }
+ }
+
+ // PutCacheRecords
+ {
+ CachePolicy BatchDefaultPolicy = CachePolicy::Default;
+ cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic,
+ .DefaultPolicy = BatchDefaultPolicy,
+ .Namespace = std::string(TestNamespace)};
+ Request.Requests.reserve(PutRequests.size());
+ for (CachePutRequest& PutRequest : PutRequests)
+ {
+ cacherequests::PutCacheRecordRequest& RecordRequest = Request.Requests.emplace_back();
+ RecordRequest.Key = PutRequest.Key;
+ RecordRequest.Policy = PutRequest.Policy;
+ RecordRequest.Values.reserve(NumValues);
+ for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex)
+ {
+ RecordRequest.Values.push_back({.Id = ValueIds[ValueIndex], .Body = PutRequest.Values->BufferValues[ValueIndex]});
+ }
+ PutRequest.Data->Data->ReceivedPut = true;
+ }
+
+ CbPackage Package;
+ CHECK(Request.Format(Package));
+ IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer();
+ cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)},
+ cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}},
+ cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
+ CHECK_MESSAGE(Result.status_code == 200, "PutCacheRecords unexpectedly failed.");
+ }
+
+ // PutCacheValues
+ {
+ CachePolicy BatchDefaultPolicy = CachePolicy::Default;
+
+ cacherequests::PutCacheValuesRequest Request = {.AcceptMagic = kCbPkgMagic,
+ .DefaultPolicy = BatchDefaultPolicy,
+ .Namespace = std::string(TestNamespace)};
+ Request.Requests.reserve(PutValueRequests.size());
+ for (CachePutValueRequest& PutRequest : PutValueRequests)
+ {
+ Request.Requests.push_back({.Key = PutRequest.Key, .Body = PutRequest.Value, .Policy = PutRequest.Policy});
+ PutRequest.Data->Data->ReceivedPutValue = true;
+ }
+
+ CbPackage Package;
+ CHECK(Request.Format(Package));
+
+ IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer();
+ cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)},
+ cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}},
+ cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
+ CHECK_MESSAGE(Result.status_code == 200, "PutCacheValues unexpectedly failed.");
+ }
+
+ for (KeyData& KeyData : KeyDatas)
+ {
+ if (!KeyData.ForceMiss)
+ {
+ if (!KeyData.UseValueAPI)
+ {
+ CHECK_MESSAGE(KeyData.ReceivedPut, WriteToString<32>("Key ", KeyData.KeyIndex, " was unexpectedly not put.").c_str());
+ }
+ else
+ {
+ CHECK_MESSAGE(KeyData.ReceivedPutValue,
+ WriteToString<32>("Key ", KeyData.KeyIndex, " was unexpectedly not put to ValueAPI.").c_str());
+ }
+ }
+ }
+
+ // GetCacheRecords
+ {
+ CachePolicy BatchDefaultPolicy = CachePolicy::Default;
+ cacherequests::GetCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic,
+ .DefaultPolicy = BatchDefaultPolicy,
+ .Namespace = std::string(TestNamespace)};
+ Request.Requests.reserve(GetRequests.size());
+ for (CacheGetRequest& GetRequest : GetRequests)
+ {
+ Request.Requests.push_back({.Key = GetRequest.Key, .Policy = GetRequest.Policy});
+ }
+
+ CbPackage Package;
+ CHECK(Request.Format(Package));
+ IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer();
+ cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)},
+ cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}},
+ cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
+ CHECK_MESSAGE(Result.status_code == 200, "GetCacheRecords unexpectedly failed.");
+ CbPackage Response = ParsePackageMessage(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()));
+ bool Loaded = !Response.IsNull();
+ CHECK_MESSAGE(Loaded, "GetCacheRecords response failed to load.");
+ cacherequests::GetCacheRecordsResult RequestResult;
+ CHECK(RequestResult.Parse(Response));
+ CHECK_MESSAGE(RequestResult.Results.size() == GetRequests.size(), "GetCacheRecords response count did not match request count.");
+ for (int Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& RecordResult : RequestResult.Results)
+ {
+ bool Succeeded = RecordResult.has_value();
+ CacheGetRequest& GetRequest = GetRequests[Index++];
+ KeyData* KeyData = GetRequest.Data->Data;
+ KeyData->ReceivedGet = true;
+ WriteToString<32> Name("Get(", KeyData->KeyIndex, ")");
+ if (KeyData->ShouldBeHit)
+ {
+ CHECK_MESSAGE(Succeeded, WriteToString<32>(Name, " unexpectedly failed.").c_str());
+ }
+ else if (KeyData->ForceMiss)
+ {
+ CHECK_MESSAGE(!Succeeded, WriteToString<32>(Name, " unexpectedly succeeded.").c_str());
+ }
+ if (!KeyData->ForceMiss && Succeeded)
+ {
+ CHECK_MESSAGE(RecordResult->Values.size() == NumValues,
+ WriteToString<32>(Name, " number of values did not match.").c_str());
+ for (const cacherequests::GetCacheRecordResultValue& Value : RecordResult->Values)
+ {
+ int ExpectedValueIndex = 0;
+ for (; ExpectedValueIndex < NumValues; ++ExpectedValueIndex)
+ {
+ if (ValueIds[ExpectedValueIndex] == Value.Id)
+ {
+ break;
+ }
+ }
+ CHECK_MESSAGE(ExpectedValueIndex < NumValues, WriteToString<32>(Name, " could not find matching ValueId.").c_str());
+
+ WriteToString<32> ValueName("Get(", KeyData->KeyIndex, ",", ExpectedValueIndex, ")");
+
+ CompressedBuffer ExpectedValue = KeyData->BufferValues[ExpectedValueIndex];
+ CHECK_MESSAGE(Value.RawHash == ExpectedValue.DecodeRawHash(),
+ WriteToString<32>(ValueName, " RawHash did not match.").c_str());
+ CHECK_MESSAGE(Value.RawSize == ExpectedValue.DecodeRawSize(),
+ WriteToString<32>(ValueName, " RawSize did not match.").c_str());
+
+ if (KeyData->GetRequestsData)
+ {
+ SharedBuffer Buffer = Value.Body.Decompress();
+ CHECK_MESSAGE(Buffer.GetSize() == Value.RawSize,
+ WriteToString<32>(ValueName, " BufferSize did not match RawSize.").c_str());
+ uint64_t ActualIntValue = ((const uint64_t*)Buffer.GetData())[0];
+ uint64_t ExpectedIntValue = KeyData->IntValues[ExpectedValueIndex];
+ CHECK_MESSAGE(ActualIntValue == ExpectedIntValue, WriteToString<32>(ValueName, " had unexpected data.").c_str());
+ }
+ }
+ }
+ }
+ }
+
+ // GetCacheValues
+ {
+ CachePolicy BatchDefaultPolicy = CachePolicy::Default;
+
+ cacherequests::GetCacheValuesRequest GetCacheValuesRequest = {.AcceptMagic = kCbPkgMagic,
+ .DefaultPolicy = BatchDefaultPolicy,
+ .Namespace = std::string(TestNamespace)};
+ GetCacheValuesRequest.Requests.reserve(GetValueRequests.size());
+ for (CacheGetValueRequest& GetRequest : GetValueRequests)
+ {
+ GetCacheValuesRequest.Requests.push_back({.Key = GetRequest.Key, .Policy = GetRequest.Policy});
+ }
+
+ CbPackage Package;
+ CHECK(GetCacheValuesRequest.Format(Package));
+
+ IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer();
+ cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)},
+ cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}},
+ cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
+ CHECK_MESSAGE(Result.status_code == 200, "GetCacheValues unexpectedly failed.");
+ IoBuffer MessageBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size());
+ CbPackage Response = ParsePackageMessage(MessageBuffer);
+ bool Loaded = !Response.IsNull();
+ CHECK_MESSAGE(Loaded, "GetCacheValues response failed to load.");
+ cacherequests::GetCacheValuesResult GetCacheValuesResult;
+ CHECK(GetCacheValuesResult.Parse(Response));
+ for (int Index = 0; const cacherequests::CacheValueResult& ValueResult : GetCacheValuesResult.Results)
+ {
+ bool Succeeded = ValueResult.RawHash != IoHash::Zero;
+ CacheGetValueRequest& Request = GetValueRequests[Index++];
+ KeyData* KeyData = Request.Data->Data;
+ KeyData->ReceivedGetValue = true;
+ WriteToString<32> Name("GetValue("sv, KeyData->KeyIndex, ")"sv);
+
+ if (KeyData->ShouldBeHit)
+ {
+ CHECK_MESSAGE(Succeeded, WriteToString<32>(Name, " unexpectedly failed.").c_str());
+ }
+ else if (KeyData->ForceMiss)
+ {
+ CHECK_MESSAGE(!Succeeded, WriteToString<32>(Name, "unexpectedly succeeded.").c_str());
+ }
+ if (!KeyData->ForceMiss && Succeeded)
+ {
+ CompressedBuffer ExpectedValue = KeyData->BufferValues[0];
+ CHECK_MESSAGE(ValueResult.RawHash == ExpectedValue.DecodeRawHash(),
+ WriteToString<32>(Name, " RawHash did not match.").c_str());
+ CHECK_MESSAGE(ValueResult.RawSize == ExpectedValue.DecodeRawSize(),
+ WriteToString<32>(Name, " RawSize did not match.").c_str());
+
+ if (KeyData->GetRequestsData)
+ {
+ SharedBuffer Buffer = ValueResult.Body.Decompress();
+ CHECK_MESSAGE(Buffer.GetSize() == ValueResult.RawSize,
+ WriteToString<32>(Name, " BufferSize did not match RawSize.").c_str());
+ uint64_t ActualIntValue = ((const uint64_t*)Buffer.GetData())[0];
+ uint64_t ExpectedIntValue = KeyData->IntValues[0];
+ CHECK_MESSAGE(ActualIntValue == ExpectedIntValue, WriteToString<32>(Name, " had unexpected data.").c_str());
+ }
+ }
+ }
+ }
+
+ // GetCacheChunks
+ {
+ std::sort(ChunkRequests.begin(), ChunkRequests.end(), [](CacheGetChunkRequest& A, CacheGetChunkRequest& B) {
+ return A.Key.Hash < B.Key.Hash;
+ });
+ CachePolicy BatchDefaultPolicy = CachePolicy::Default;
+ cacherequests::GetCacheChunksRequest GetCacheChunksRequest = {.AcceptMagic = kCbPkgMagic,
+ .DefaultPolicy = BatchDefaultPolicy,
+ .Namespace = std::string(TestNamespace)};
+ GetCacheChunksRequest.Requests.reserve(ChunkRequests.size());
+ for (CacheGetChunkRequest& ChunkRequest : ChunkRequests)
+ {
+ GetCacheChunksRequest.Requests.push_back({.Key = ChunkRequest.Key,
+ .ValueId = ChunkRequest.ValueId,
+ .ChunkId = IoHash(),
+ .RawOffset = ChunkRequest.RawOffset,
+ .RawSize = ChunkRequest.RawSize,
+ .Policy = ChunkRequest.Policy});
+ }
+ CbPackage Package;
+ CHECK(GetCacheChunksRequest.Format(Package));
+
+ IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer();
+ cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)},
+ cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}},
+ cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
+ CHECK_MESSAGE(Result.status_code == 200, "GetCacheChunks unexpectedly failed.");
+ CbPackage Response = ParsePackageMessage(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()));
+ bool Loaded = !Response.IsNull();
+ CHECK_MESSAGE(Loaded, "GetCacheChunks response failed to load.");
+ cacherequests::GetCacheChunksResult GetCacheChunksResult;
+ CHECK(GetCacheChunksResult.Parse(Response));
+ CHECK_MESSAGE(GetCacheChunksResult.Results.size() == ChunkRequests.size(),
+ "GetCacheChunks response count did not match request count.");
+
+ for (int Index = 0; const cacherequests::CacheValueResult& ValueResult : GetCacheChunksResult.Results)
+ {
+ bool Succeeded = ValueResult.RawHash != IoHash::Zero;
+
+ CacheGetChunkRequest& Request = ChunkRequests[Index++];
+ KeyData* KeyData = Request.Data->Data;
+ int ValueIndex = Request.Data->ValueIndex >= 0 ? Request.Data->ValueIndex : 0;
+ KeyData->ReceivedChunk[ValueIndex] = true;
+ WriteToString<32> Name("GetChunks("sv, KeyData->KeyIndex, ","sv, ValueIndex, ")"sv);
+
+ if (KeyData->ShouldBeHit)
+ {
+ CHECK_MESSAGE(Succeeded, WriteToString<256>(Name, " unexpectedly failed."sv).c_str());
+ }
+ else if (KeyData->ForceMiss)
+ {
+ CHECK_MESSAGE(!Succeeded, WriteToString<256>(Name, " unexpectedly succeeded."sv).c_str());
+ }
+ if (KeyData->ShouldBeHit && Succeeded)
+ {
+ CompressedBuffer ExpectedValue = KeyData->BufferValues[ValueIndex];
+ CHECK_MESSAGE(ValueResult.RawHash == ExpectedValue.DecodeRawHash(),
+ WriteToString<32>(Name, " had unexpected RawHash.").c_str());
+ CHECK_MESSAGE(ValueResult.RawSize == ExpectedValue.DecodeRawSize(),
+ WriteToString<32>(Name, " had unexpected RawSize.").c_str());
+
+ if (KeyData->GetRequestsData)
+ {
+ SharedBuffer Buffer = ValueResult.Body.Decompress();
+ CHECK_MESSAGE(Buffer.GetSize() == ValueResult.RawSize,
+ WriteToString<32>(Name, " BufferSize did not match RawSize.").c_str());
+ uint64_t ActualIntValue = ((const uint64_t*)Buffer.GetData())[0];
+ uint64_t ExpectedIntValue = KeyData->IntValues[ValueIndex];
+ CHECK_MESSAGE(ActualIntValue == ExpectedIntValue, WriteToString<32>(Name, " had unexpected data.").c_str());
+ }
+ }
+ }
+ }
+
+ for (KeyData& KeyData : KeyDatas)
+ {
+ if (!KeyData.UseValueAPI)
+ {
+ CHECK_MESSAGE(KeyData.ReceivedGet, WriteToString<32>("Get(", KeyData.KeyIndex, ") was unexpectedly not received.").c_str());
+ for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex)
+ {
+ CHECK_MESSAGE(
+ KeyData.ReceivedChunk[ValueIndex],
+ WriteToString<32>("GetChunks(", KeyData.KeyIndex, ",", ValueIndex, ") was unexpectedly not received.").c_str());
+ }
+ }
+ else
+ {
+ CHECK_MESSAGE(KeyData.ReceivedGetValue,
+ WriteToString<32>("GetValue(", KeyData.KeyIndex, ") was unexpectedly not received.").c_str());
+ CHECK_MESSAGE(KeyData.ReceivedChunk[0],
+ WriteToString<32>("GetChunks(", KeyData.KeyIndex, ") was unexpectedly not received.").c_str());
+ }
+ }
+}
+
+class ZenServerTestHelper
+{
+public:
+ ZenServerTestHelper(std::string_view HelperId, int ServerCount) : m_HelperId{HelperId}, m_ServerCount{ServerCount} {}
+ ~ZenServerTestHelper() {}
+
+ void SpawnServers(std::string_view AdditionalServerArgs = std::string_view())
+ {
+ SpawnServers([](ZenServerInstance&) {}, AdditionalServerArgs);
+ }
+
+ void SpawnServers(auto&& Callback, std::string_view AdditionalServerArgs)
+ {
+ ZEN_INFO("{}: spawning {} server instances", m_HelperId, m_ServerCount);
+
+ m_Instances.resize(m_ServerCount);
+
+ for (int i = 0; i < m_ServerCount; ++i)
+ {
+ auto& Instance = m_Instances[i];
+
+ Instance = std::make_unique<ZenServerInstance>(TestEnv);
+ Instance->SetTestDir(TestEnv.CreateNewTestDir());
+
+ Callback(*Instance);
+
+ Instance->SpawnServer(13337 + i, AdditionalServerArgs);
+ }
+
+ for (int i = 0; i < m_ServerCount; ++i)
+ {
+ auto& Instance = m_Instances[i];
+
+ Instance->WaitUntilReady();
+ }
+ }
+
+ ZenServerInstance& GetInstance(int Index) { return *m_Instances[Index]; }
+
+private:
+ std::string m_HelperId;
+ int m_ServerCount = 0;
+ std::vector<std::unique_ptr<ZenServerInstance> > m_Instances;
+};
+
+class IoDispatcher
+{
+public:
+ IoDispatcher(asio::io_context& IoCtx) : m_IoCtx(IoCtx) {}
+ ~IoDispatcher() { Stop(); }
+
+ void Run()
+ {
+ Stop();
+
+ m_Running = true;
+
+ m_IoThread = std::thread([this]() {
+ try
+ {
+ m_IoCtx.run();
+ }
+ catch (std::exception& Error)
+ {
+ m_Error = Error;
+ }
+ });
+ }
+
+ void Stop()
+ {
+ if (m_Running)
+ {
+ m_Running = false;
+
+ if (m_IoThread.joinable())
+ {
+ m_IoThread.join();
+ }
+ }
+ }
+
+ bool IsRunning() const { return m_Running; }
+
+ const std::exception& Error() { return m_Error; }
+
+private:
+ asio::io_context& m_IoCtx;
+ std::thread m_IoThread;
+ std::exception m_Error;
+ std::atomic_bool m_Running{false};
+};
+
+TEST_CASE("http.basics")
+{
+ using namespace std::literals;
+
+ ZenServerTestHelper Servers{"http.basics"sv, 1};
+ Servers.SpawnServers();
+
+ ZenServerInstance& Instance = Servers.GetInstance(0);
+ const std::string BaseUri = Instance.GetBaseUri();
+
+ {
+ cpr::Response r = cpr::Get(cpr::Url{fmt::format("{}/testing/hello", BaseUri)});
+ CHECK(IsHttpSuccessCode(r.status_code));
+ }
+
+ {
+ cpr::Response r = cpr::Post(cpr::Url{fmt::format("{}/testing/hello", BaseUri)});
+ CHECK_EQ(r.status_code, 404);
+ }
+
+ {
+ cpr::Response r = cpr::Post(cpr::Url{fmt::format("{}/testing/echo", BaseUri)}, cpr::Body{"yoyoyoyo"});
+ CHECK_EQ(r.status_code, 200);
+ CHECK_EQ(r.text, "yoyoyoyo");
+ }
+}
+
+TEST_CASE("http.package")
+{
+ using namespace std::literals;
+
+ ZenServerTestHelper Servers{"http.package"sv, 1};
+ Servers.SpawnServers();
+
+ ZenServerInstance& Instance = Servers.GetInstance(0);
+ const std::string BaseUri = Instance.GetBaseUri();
+
+ static const uint8_t Data1[] = {0, 1, 2, 3};
+ static const uint8_t Data2[] = {0, 1, 2, 3, 4, 5, 6, 7, 8};
+
+ zen::CompressedBuffer AttachmentData1 = zen::CompressedBuffer::Compress(zen::SharedBuffer::Clone({Data1, 4}),
+ zen::OodleCompressor::NotSet,
+ zen::OodleCompressionLevel::None);
+ zen::CbAttachment Attach1{AttachmentData1, AttachmentData1.DecodeRawHash()};
+ zen::CompressedBuffer AttachmentData2 = zen::CompressedBuffer::Compress(zen::SharedBuffer::Clone({Data2, 8}),
+ zen::OodleCompressor::NotSet,
+ zen::OodleCompressionLevel::None);
+ zen::CbAttachment Attach2{AttachmentData2, AttachmentData2.DecodeRawHash()};
+
+ zen::CbObjectWriter Writer;
+
+ Writer.AddAttachment("attach1", Attach1);
+ Writer.AddAttachment("attach2", Attach2);
+
+ zen::CbObject CoreObject = Writer.Save();
+
+ zen::CbPackage TestPackage;
+ TestPackage.SetObject(CoreObject);
+ TestPackage.AddAttachment(Attach1);
+ TestPackage.AddAttachment(Attach2);
+
+ zen::HttpClient TestClient(BaseUri);
+ zen::HttpClient::Response Response = TestClient.TransactPackage("/testing/package"sv, TestPackage);
+
+ zen::CbPackage ResponsePackage = ParsePackageMessage(Response.ResponsePayload);
+
+ CHECK_EQ(ResponsePackage, TestPackage);
+}
+
+TEST_CASE("websocket.basic")
+{
+ if (true)
+ {
+ return;
+ }
+
+ std::filesystem::path TestDir = TestEnv.CreateNewTestDir();
+ const uint16_t PortNumber = 13337;
+ const auto MaxWaitTime = std::chrono::seconds(5);
+
+ ZenServerInstance Inst(TestEnv);
+ Inst.SetTestDir(TestDir);
+ Inst.SpawnServer(PortNumber, "--websocket-port=8848"sv);
+ Inst.WaitUntilReady();
+
+ asio::io_context IoCtx;
+ IoDispatcher IoDispatcher(IoCtx);
+ auto WebSocket = WebSocketClient::Create(IoCtx);
+
+ auto ConnectFuture = WebSocket->Connect({.Host = "127.0.0.1", .Port = 8848, .Endpoint = "/zen"});
+ IoDispatcher.Run();
+
+ ConnectFuture.wait_for(MaxWaitTime);
+ CHECK(ConnectFuture.get());
+
+ for (size_t Idx = 0; Idx < 10; Idx++)
+ {
+ CbObjectWriter Request;
+ Request << "Method"sv
+ << "SayHello"sv;
+
+ WebSocketMessage RequestMsg;
+ RequestMsg.SetMessageType(WebSocketMessageType::kRequest);
+ RequestMsg.SetBody(Request.Save());
+
+ auto ResponseFuture = WebSocket->SendRequest(std::move(RequestMsg));
+ ResponseFuture.wait_for(MaxWaitTime);
+
+ CbObject Response = ResponseFuture.get().Body().GetObject();
+ std::string_view Message = Response["Result"].AsString();
+
+ CHECK(Message == "Hello Friend!!"sv);
+ }
+
+ WebSocket->Disconnect();
+
+ IoCtx.stop();
+ IoDispatcher.Stop();
+}
+
+std::string
+OidAsString(const Oid& Id)
+{
+ StringBuilder<25> OidStringBuilder;
+ Id.ToString(OidStringBuilder);
+ return OidStringBuilder.ToString();
+}
+
+CbPackage
+CreateOplogPackage(const Oid& Id, const std::span<const std::pair<Oid, CompressedBuffer> >& Attachments)
+{
+ CbPackage Package;
+ CbObjectWriter Object;
+ Object << "key"sv << OidAsString(Id);
+ if (!Attachments.empty())
+ {
+ Object.BeginArray("bulkdata");
+ for (const auto& Attachment : Attachments)
+ {
+ CbAttachment Attach(Attachment.second, Attachment.second.DecodeRawHash());
+ Object.BeginObject();
+ Object << "id"sv << Attachment.first;
+ Object << "type"sv
+ << "Standard"sv;
+ Object << "data"sv << Attach;
+ Object.EndObject();
+
+ Package.AddAttachment(Attach);
+ ZEN_DEBUG("Added attachment {}", Attach.GetHash());
+ }
+ Object.EndArray();
+ }
+ Package.SetObject(Object.Save());
+ return Package;
+};
+
+std::vector<std::pair<Oid, CompressedBuffer> >
+CreateAttachments(const std::span<const size_t>& Sizes)
+{
+ std::vector<std::pair<Oid, CompressedBuffer> > Result;
+ Result.reserve(Sizes.size());
+ for (size_t Size : Sizes)
+ {
+ std::vector<uint8_t> Data;
+ Data.resize(Size);
+ uint16_t* DataPtr = reinterpret_cast<uint16_t*>(Data.data());
+ for (size_t Idx = 0; Idx < Size / 2; ++Idx)
+ {
+ DataPtr[Idx] = static_cast<uint16_t>(Idx % 0xffffu);
+ }
+ if (Size & 1)
+ {
+ Data[Size - 1] = static_cast<uint8_t>((Size - 1) & 0xff);
+ }
+ CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer::MakeView(Data.data(), Data.size()));
+ Result.emplace_back(std::pair<Oid, CompressedBuffer>(Oid::NewOid(), Compressed));
+ }
+ return Result;
+}
+
+cpr::Body
+AsBody(const IoBuffer& Payload)
+{
+ return cpr::Body{(const char*)Payload.GetData(), Payload.Size()};
+};
+
// Structural markers that can be streamed into a CbWriter via the
// operator<< overload below, so object/array scopes can be opened and
// closed inline within a chain of insertions.
// NOTE(review): kept as a plain enum (not enum class) on purpose -- call
// sites stream the bare names, e.g. `Writer << BeginObject`.
enum CbWriterMeta
{
	BeginObject,
	EndObject,
	BeginArray,
	EndArray
};
+
+inline CbWriter&
+operator<<(CbWriter& Writer, CbWriterMeta Meta)
+{
+ switch (Meta)
+ {
+ case BeginObject:
+ Writer.BeginObject();
+ break;
+ case EndObject:
+ Writer.EndObject();
+ break;
+ case BeginArray:
+ Writer.BeginArray();
+ break;
+ case EndArray:
+ Writer.EndArray();
+ break;
+ default:
+ ZEN_ASSERT(false);
+ }
+ return Writer;
+}
+
+TEST_CASE("project.remote")
+{
+ using namespace std::literals;
+
+ ZenServerTestHelper Servers("remote", 3);
+ Servers.SpawnServers("--debug");
+
+ std::vector<Oid> OpIds;
+ OpIds.reserve(24);
+ for (size_t I = 0; I < 24; ++I)
+ {
+ OpIds.emplace_back(Oid::NewOid());
+ }
+
+ std::unordered_map<Oid, std::vector<std::pair<Oid, CompressedBuffer> >, Oid::Hasher> Attachments;
+ {
+ std::vector<std::size_t> AttachmentSizes({7633, 6825, 5738, 8031, 7225, 566, 3656, 6006, 24, 3466, 1093, 4269,
+ 2257, 3685, 3489, 7194, 6151, 5482, 6217, 3511, 6738, 5061, 7537, 2759,
+ 1916, 8210, 2235, 4024, 1582, 5251, 491, 5464, 4607, 8135, 3767, 4045,
+ 4415, 5007, 8876, 6761, 3359, 8526, 4097, 4855, 8225});
+ auto It = AttachmentSizes.begin();
+ Attachments[OpIds[0]] = {};
+ Attachments[OpIds[1]] = CreateAttachments(std::initializer_list<size_t>{*It++});
+ Attachments[OpIds[2]] = CreateAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++});
+ Attachments[OpIds[3]] = CreateAttachments(std::initializer_list<size_t>{*It++});
+ Attachments[OpIds[4]] = CreateAttachments(std::initializer_list<size_t>{*It++, *It++, *It++});
+ Attachments[OpIds[5]] = CreateAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++});
+ Attachments[OpIds[6]] = CreateAttachments(std::initializer_list<size_t>{*It++});
+ Attachments[OpIds[7]] = CreateAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++});
+ Attachments[OpIds[8]] = CreateAttachments(std::initializer_list<size_t>{});
+ Attachments[OpIds[9]] = CreateAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++});
+ Attachments[OpIds[10]] = CreateAttachments(std::initializer_list<size_t>{*It++});
+ Attachments[OpIds[11]] = CreateAttachments(std::initializer_list<size_t>{*It++, *It++, *It++});
+ Attachments[OpIds[12]] = CreateAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++});
+ Attachments[OpIds[13]] = CreateAttachments(std::initializer_list<size_t>{*It++});
+ Attachments[OpIds[14]] = CreateAttachments(std::initializer_list<size_t>{*It++, *It++});
+ Attachments[OpIds[15]] = CreateAttachments(std::initializer_list<size_t>{*It++, *It++});
+ Attachments[OpIds[16]] = CreateAttachments(std::initializer_list<size_t>{});
+ Attachments[OpIds[17]] = CreateAttachments(std::initializer_list<size_t>{*It++, *It++});
+ Attachments[OpIds[18]] = CreateAttachments(std::initializer_list<size_t>{*It++, *It++});
+ Attachments[OpIds[19]] = CreateAttachments(std::initializer_list<size_t>{});
+ Attachments[OpIds[20]] = CreateAttachments(std::initializer_list<size_t>{*It++});
+ Attachments[OpIds[21]] = CreateAttachments(std::initializer_list<size_t>{*It++});
+ Attachments[OpIds[22]] = CreateAttachments(std::initializer_list<size_t>{*It++, *It++, *It++});
+ Attachments[OpIds[23]] = CreateAttachments(std::initializer_list<size_t>{*It++});
+ ZEN_ASSERT(It == AttachmentSizes.end());
+ }
+
+ auto AddOp = [](const CbObject& Op, std::unordered_map<Oid, uint32_t, Oid::Hasher>& Ops) {
+ XXH3_128Stream KeyHasher;
+ Op["key"sv].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); });
+ XXH3_128 KeyHash = KeyHasher.GetHash();
+ Oid Id;
+ memcpy(Id.OidBits, &KeyHash, sizeof Id.OidBits);
+ IoBuffer Buffer = Op.GetBuffer().AsIoBuffer();
+ const uint32_t OpCoreHash = uint32_t(XXH3_64bits(Buffer.GetData(), Buffer.GetSize()) & 0xffffFFFF);
+ Ops.insert({Id, OpCoreHash});
+ };
+
+ auto MakeProject = [](cpr::Session& Session, std::string_view UrlBase, std::string_view ProjectName) {
+ CbObjectWriter Project;
+ Project.AddString("id"sv, ProjectName);
+ Project.AddString("root"sv, ""sv);
+ Project.AddString("engine"sv, ""sv);
+ Project.AddString("project"sv, ""sv);
+ Project.AddString("projectfile"sv, ""sv);
+ IoBuffer ProjectPayload = Project.Save().GetBuffer().AsIoBuffer();
+ std::string ProjectRequest = fmt::format("{}/prj/{}", UrlBase, ProjectName);
+ Session.SetUrl({ProjectRequest});
+ Session.SetBody(cpr::Body{(const char*)ProjectPayload.GetData(), ProjectPayload.GetSize()});
+ cpr::Response Response = Session.Post();
+ CHECK(IsHttpSuccessCode(Response.status_code));
+ };
+
+ auto MakeOplog = [](cpr::Session& Session, std::string_view UrlBase, std::string_view ProjectName, std::string_view OplogName) {
+ std::string CreateOplogRequest = fmt::format("{}/prj/{}/oplog/{}", UrlBase, ProjectName, OplogName);
+ Session.SetUrl({CreateOplogRequest});
+ Session.SetBody(cpr::Body{});
+ cpr::Response Response = Session.Post();
+ CHECK(IsHttpSuccessCode(Response.status_code));
+ };
+
+ auto MakeOp = [](cpr::Session& Session,
+ std::string_view UrlBase,
+ std::string_view ProjectName,
+ std::string_view OplogName,
+ const CbPackage& OpPackage) {
+ std::string CreateOpRequest = fmt::format("{}/prj/{}/oplog/{}/new", UrlBase, ProjectName, OplogName);
+ Session.SetUrl({CreateOpRequest});
+ zen::BinaryWriter MemOut;
+ legacy::SaveCbPackage(OpPackage, MemOut);
+ Session.SetBody(cpr::Body{(const char*)MemOut.Data(), MemOut.Size()});
+ cpr::Response Response = Session.Post();
+ CHECK(IsHttpSuccessCode(Response.status_code));
+ };
+
+ cpr::Session Session;
+ MakeProject(Session, Servers.GetInstance(0).GetBaseUri(), "proj0");
+ MakeOplog(Session, Servers.GetInstance(0).GetBaseUri(), "proj0", "oplog0");
+
+ std::unordered_map<Oid, uint32_t, Oid::Hasher> SourceOps;
+ for (const Oid& OpId : OpIds)
+ {
+ CbPackage OpPackage = CreateOplogPackage(OpId, Attachments[OpId]);
+ CHECK(OpPackage.GetAttachments().size() == Attachments[OpId].size());
+ AddOp(OpPackage.GetObject(), SourceOps);
+ MakeOp(Session, Servers.GetInstance(0).GetBaseUri(), "proj0", "oplog0", OpPackage);
+ }
+
+ std::vector<IoHash> AttachmentHashes;
+ AttachmentHashes.reserve(Attachments.size());
+ for (const auto& AttachmentOplog : Attachments)
+ {
+ for (const auto& Attachment : AttachmentOplog.second)
+ {
+ AttachmentHashes.emplace_back(Attachment.second.DecodeRawHash());
+ }
+ }
+
+ auto MakeCbObjectPayload = [](std::function<void(CbObjectWriter & Writer)> Write) -> IoBuffer {
+ CbObjectWriter Writer;
+ Write(Writer);
+ IoBuffer Result = Writer.Save().GetBuffer().AsIoBuffer();
+ Result.MakeOwned();
+ return Result;
+ };
+
+ auto ValidateAttachments = [&MakeCbObjectPayload, &AttachmentHashes, &Servers, &Session](int ServerIndex,
+ std::string_view Project,
+ std::string_view Oplog) {
+ std::string GetChunksRequest = fmt::format("{}/prj/{}/oplog/{}/rpc", Servers.GetInstance(ServerIndex).GetBaseUri(), Project, Oplog);
+ Session.SetUrl({GetChunksRequest});
+ IoBuffer Payload = MakeCbObjectPayload([&AttachmentHashes](CbObjectWriter& Writer) {
+ Writer << "method"sv
+ << "getchunks"sv;
+ Writer << "chunks"sv << BeginArray;
+ for (const IoHash& Chunk : AttachmentHashes)
+ {
+ Writer << Chunk;
+ }
+ Writer << EndArray; // chunks
+ });
+ Session.SetBody(AsBody(Payload));
+ Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}, {"Accept", "application/x-ue-cbpkg"}});
+ cpr::Response Response = Session.Post();
+ CHECK(IsHttpSuccessCode(Response.status_code));
+ CbPackage ResponsePackage = ParsePackageMessage(IoBuffer(IoBuffer::Wrap, Response.text.data(), Response.text.size()));
+ CHECK(ResponsePackage.GetAttachments().size() == AttachmentHashes.size());
+ };
+
+ auto ValidateOplog = [&SourceOps, &AddOp, &Servers, &Session](int ServerIndex, std::string_view Project, std::string_view Oplog) {
+ std::unordered_map<Oid, uint32_t, Oid::Hasher> TargetOps;
+ std::vector<CbObject> ResultingOplog;
+
+ std::string GetOpsRequest =
+ fmt::format("{}/prj/{}/oplog/{}/entries", Servers.GetInstance(ServerIndex).GetBaseUri(), Project, Oplog);
+ Session.SetUrl({GetOpsRequest});
+ cpr::Response Response = Session.Get();
+ CHECK(IsHttpSuccessCode(Response.status_code));
+
+ IoBuffer Payload(IoBuffer::Wrap, Response.text.data(), Response.text.size());
+ CbObject OplogResonse = LoadCompactBinaryObject(Payload);
+ CbArrayView EntriesArray = OplogResonse["entries"sv].AsArrayView();
+
+ for (CbFieldView OpEntry : EntriesArray)
+ {
+ CbObjectView Core = OpEntry.AsObjectView();
+ BinaryWriter Writer;
+ Core.CopyTo(Writer);
+ MemoryView OpView = Writer.GetView();
+ IoBuffer OpBuffer(IoBuffer::Wrap, OpView.GetData(), OpView.GetSize());
+ CbObject Op(SharedBuffer(OpBuffer), CbFieldType::HasFieldType);
+ AddOp(Op, TargetOps);
+ }
+ CHECK(SourceOps == TargetOps);
+ };
+
+ // Round-trips an oplog through an on-disk export/import: server 0 exports
+ // "proj0/oplog0" into a temporary directory, server 1 imports it into a fresh
+ // "proj0_copy/oplog0_copy", and the captured validators confirm that the
+ // attachments and op entries survived the round trip unchanged.
+ SUBCASE("File")
+ {
+ ScopedTemporaryDirectory TempDir;
+ {
+ // Export step: POST an "export" rpc to server 0 that writes the oplog
+ // package into TempDir under the name "proj0_oplog0".
+ std::string SaveOplogRequest = fmt::format("{}/prj/{}/oplog/{}/rpc", Servers.GetInstance(0).GetBaseUri(), "proj0", "oplog0");
+ Session.SetUrl({SaveOplogRequest});
+
+ // Small block/embed limits force the exporter to split data across
+ // multiple blocks even for small test payloads.
+ // NOTE(review): the AttachmentHashes capture appears unused inside this
+ // lambda -- confirm whether it can be dropped.
+ IoBuffer Payload = MakeCbObjectPayload([&AttachmentHashes, path = TempDir.Path().string()](CbObjectWriter& Writer) {
+ Writer << "method"sv
+ << "export"sv;
+ Writer << "params" << BeginObject;
+ {
+ Writer << "maxblocksize"sv << 3072u;
+ Writer << "maxchunkembedsize"sv << 1296u;
+ Writer << "force"sv << false;
+ Writer << "file"sv << BeginObject;
+ {
+ Writer << "path"sv << path;
+ Writer << "name"sv
+ << "proj0_oplog0"sv;
+ }
+ Writer << EndObject; // "file"
+ }
+ Writer << EndObject; // "params"
+ });
+ Session.SetBody(AsBody(Payload));
+ Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}});
+ cpr::Response Response = Session.Post();
+ CHECK(IsHttpSuccessCode(Response.status_code));
+ }
+ {
+ // Import step: create the destination project/oplog on server 1, then
+ // POST an "import" rpc pointing at the file exported above.
+ MakeProject(Session, Servers.GetInstance(1).GetBaseUri(), "proj0_copy");
+ MakeOplog(Session, Servers.GetInstance(1).GetBaseUri(), "proj0_copy", "oplog0_copy");
+ std::string LoadOplogRequest =
+ fmt::format("{}/prj/{}/oplog/{}/rpc", Servers.GetInstance(1).GetBaseUri(), "proj0_copy", "oplog0_copy");
+ Session.SetUrl({LoadOplogRequest});
+
+ IoBuffer Payload = MakeCbObjectPayload([&AttachmentHashes, path = TempDir.Path().string()](CbObjectWriter& Writer) {
+ Writer << "method"sv
+ << "import"sv;
+ Writer << "params" << BeginObject;
+ {
+ Writer << "force"sv << false;
+ Writer << "file"sv << BeginObject;
+ {
+ Writer << "path"sv << path;
+ Writer << "name"sv
+ << "proj0_oplog0"sv;
+ }
+ Writer << EndObject; // "file"
+ }
+ Writer << EndObject; // "params"
+ });
+ Session.SetBody(AsBody(Payload));
+
+ Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}});
+ cpr::Response Response = Session.Post();
+ CHECK(IsHttpSuccessCode(Response.status_code));
+ }
+ // Compare server 1's imported attachments and ops against server 0's originals.
+ ValidateAttachments(1, "proj0_copy", "oplog0_copy");
+ ValidateOplog(1, "proj0_copy", "oplog0_copy");
+ }
+
+ // Same file export/import round trip as SUBCASE("File"), but with
+ // "disableblocks" set on the export, exercising the non-block file layout.
+ SUBCASE("File disable blocks")
+ {
+ ScopedTemporaryDirectory TempDir;
+ {
+ // Export "proj0/oplog0" from server 0 into TempDir with blocks disabled.
+ std::string SaveOplogRequest = fmt::format("{}/prj/{}/oplog/{}/rpc", Servers.GetInstance(0).GetBaseUri(), "proj0", "oplog0");
+ Session.SetUrl({SaveOplogRequest});
+
+ IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) {
+ Writer << "method"sv
+ << "export"sv;
+ Writer << "params" << BeginObject;
+ {
+ Writer << "maxblocksize"sv << 3072u;
+ Writer << "maxchunkembedsize"sv << 1296u;
+ Writer << "force"sv << false;
+ Writer << "file"sv << BeginObject;
+ {
+ Writer << "path"sv << TempDir.Path().string();
+ Writer << "name"sv
+ << "proj0_oplog0"sv;
+ // The variation under test: opt out of block-based packaging.
+ Writer << "disableblocks"sv << true;
+ }
+ Writer << EndObject; // "file"
+ }
+ Writer << EndObject; // "params"
+ });
+ Session.SetBody(AsBody(Payload));
+ Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}});
+ cpr::Response Response = Session.Post();
+ CHECK(IsHttpSuccessCode(Response.status_code));
+ }
+ {
+ // Import the exported file into a fresh project/oplog on server 1.
+ MakeProject(Session, Servers.GetInstance(1).GetBaseUri(), "proj0_copy");
+ MakeOplog(Session, Servers.GetInstance(1).GetBaseUri(), "proj0_copy", "oplog0_copy");
+ std::string LoadOplogRequest =
+ fmt::format("{}/prj/{}/oplog/{}/rpc", Servers.GetInstance(1).GetBaseUri(), "proj0_copy", "oplog0_copy");
+ Session.SetUrl({LoadOplogRequest});
+ IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) {
+ Writer << "method"sv
+ << "import"sv;
+ Writer << "params" << BeginObject;
+ {
+ Writer << "force"sv << false;
+ Writer << "file"sv << BeginObject;
+ {
+ Writer << "path"sv << TempDir.Path().string();
+ Writer << "name"sv
+ << "proj0_oplog0"sv;
+ }
+ Writer << EndObject; // "file"
+ }
+ Writer << EndObject; // "params"
+ });
+ Session.SetBody(AsBody(Payload));
+ Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}});
+ cpr::Response Response = Session.Post();
+ CHECK(IsHttpSuccessCode(Response.status_code));
+ }
+ // Verify the imported copy matches the original oplog contents.
+ ValidateAttachments(1, "proj0_copy", "oplog0_copy");
+ ValidateOplog(1, "proj0_copy", "oplog0_copy");
+ }
+
+ // Same file export/import round trip again, this time with
+ // "enabletempblocks" set on the export, exercising the temp-block code path.
+ SUBCASE("File force temp blocks")
+ {
+ ScopedTemporaryDirectory TempDir;
+ {
+ // Export "proj0/oplog0" from server 0 into TempDir using temp blocks.
+ std::string SaveOplogRequest = fmt::format("{}/prj/{}/oplog/{}/rpc", Servers.GetInstance(0).GetBaseUri(), "proj0", "oplog0");
+ Session.SetUrl({SaveOplogRequest});
+ IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) {
+ Writer << "method"sv
+ << "export"sv;
+ Writer << "params" << BeginObject;
+ {
+ Writer << "maxblocksize"sv << 3072u;
+ Writer << "maxchunkembedsize"sv << 1296u;
+ Writer << "force"sv << false;
+ Writer << "file"sv << BeginObject;
+ {
+ Writer << "path"sv << TempDir.Path().string();
+ Writer << "name"sv
+ << "proj0_oplog0"sv;
+ // The variation under test: force temporary block usage.
+ Writer << "enabletempblocks"sv << true;
+ }
+ Writer << EndObject; // "file"
+ }
+ Writer << EndObject; // "params"
+ });
+ Session.SetBody(AsBody(Payload));
+ Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}});
+ cpr::Response Response = Session.Post();
+ CHECK(IsHttpSuccessCode(Response.status_code));
+ }
+ {
+ // Import the exported file into a fresh project/oplog on server 1.
+ MakeProject(Session, Servers.GetInstance(1).GetBaseUri(), "proj0_copy");
+ MakeOplog(Session, Servers.GetInstance(1).GetBaseUri(), "proj0_copy", "oplog0_copy");
+ std::string LoadOplogRequest =
+ fmt::format("{}/prj/{}/oplog/{}/rpc", Servers.GetInstance(1).GetBaseUri(), "proj0_copy", "oplog0_copy");
+ Session.SetUrl({LoadOplogRequest});
+ IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) {
+ Writer << "method"sv
+ << "import"sv;
+ Writer << "params" << BeginObject;
+ {
+ Writer << "force"sv << false;
+ Writer << "file"sv << BeginObject;
+ {
+ Writer << "path"sv << TempDir.Path().string();
+ Writer << "name"sv
+ << "proj0_oplog0"sv;
+ }
+ Writer << EndObject; // "file"
+ }
+ Writer << EndObject; // "params"
+ });
+ Session.SetBody(AsBody(Payload));
+ Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}});
+ cpr::Response Response = Session.Post();
+ CHECK(IsHttpSuccessCode(Response.status_code));
+ }
+ // Verify the imported copy matches the original oplog contents.
+ ValidateAttachments(1, "proj0_copy", "oplog0_copy");
+ ValidateOplog(1, "proj0_copy", "oplog0_copy");
+ }
+
+ // Server-to-server transfer: server 0 exports "proj0/oplog0" directly into
+ // server 1 ("zen" target instead of "file"), then server 2 imports from
+ // server 1. Both hops are validated against the original oplog.
+ SUBCASE("Zen")
+ {
+ ScopedTemporaryDirectory TempDir;
+ {
+ // Hop 1: server 0 pushes the oplog into server 1's proj0_copy/oplog0_copy.
+ std::string ExportSourceUri = Servers.GetInstance(0).GetBaseUri();
+ std::string ExportTargetUri = Servers.GetInstance(1).GetBaseUri();
+ MakeProject(Session, ExportTargetUri, "proj0_copy");
+ MakeOplog(Session, ExportTargetUri, "proj0_copy", "oplog0_copy");
+
+ std::string SaveOplogRequest = fmt::format("{}/prj/{}/oplog/{}/rpc", ExportSourceUri, "proj0", "oplog0");
+ Session.SetUrl({SaveOplogRequest});
+
+ IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) {
+ Writer << "method"sv
+ << "export"sv;
+ Writer << "params" << BeginObject;
+ {
+ Writer << "maxblocksize"sv << 3072u;
+ Writer << "maxchunkembedsize"sv << 1296u;
+ Writer << "force"sv << false;
+ Writer << "zen"sv << BeginObject;
+ {
+ // substr(7) presumably strips a "http://" scheme prefix so the rpc
+ // receives a bare host:port -- TODO(review) confirm GetBaseUri format.
+ Writer << "url"sv << ExportTargetUri.substr(7);
+ // NOTE(review): these literals lack the sv suffix used elsewhere in
+ // this file; behavior is identical (const char* overload).
+ Writer << "project"
+ << "proj0_copy";
+ Writer << "oplog"
+ << "oplog0_copy";
+ }
+ Writer << EndObject; // "zen"
+ }
+ Writer << EndObject; // "params"
+ });
+ Session.SetBody(AsBody(Payload));
+ Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}});
+ cpr::Response Response = Session.Post();
+ CHECK(IsHttpSuccessCode(Response.status_code));
+ }
+ // Validate the first hop landed intact on server 1.
+ ValidateAttachments(1, "proj0_copy", "oplog0_copy");
+ ValidateOplog(1, "proj0_copy", "oplog0_copy");
+
+ {
+ // Hop 2: server 2 pulls the copy from server 1 into proj1/oplog1.
+ std::string ImportSourceUri = Servers.GetInstance(1).GetBaseUri();
+ std::string ImportTargetUri = Servers.GetInstance(2).GetBaseUri();
+ MakeProject(Session, ImportTargetUri, "proj1");
+ MakeOplog(Session, ImportTargetUri, "proj1", "oplog1");
+ std::string LoadOplogRequest = fmt::format("{}/prj/{}/oplog/{}/rpc", ImportTargetUri, "proj1", "oplog1");
+ Session.SetUrl({LoadOplogRequest});
+
+ IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) {
+ Writer << "method"sv
+ << "import"sv;
+ Writer << "params" << BeginObject;
+ {
+ Writer << "force"sv << false;
+ Writer << "zen"sv << BeginObject;
+ {
+ Writer << "url"sv << ImportSourceUri.substr(7);
+ Writer << "project"
+ << "proj0_copy";
+ Writer << "oplog"
+ << "oplog0_copy";
+ }
+ Writer << EndObject; // "zen"
+ }
+ Writer << EndObject; // "params"
+ });
+ Session.SetBody(AsBody(Payload));
+ Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}});
+ cpr::Response Response = Session.Post();
+ CHECK(IsHttpSuccessCode(Response.status_code));
+ }
+ // Validate the second hop landed intact on server 2.
+ ValidateAttachments(2, "proj1", "oplog1");
+ ValidateOplog(2, "proj1", "oplog1");
+ }
+}
+
+# if 0
+TEST_CASE("lifetime.owner")
+{
+ // This test is designed to verify that the hand-over of sponsor processes is handled
+ // correctly for the case when a second or third process is launched on the same port
+ //
+ // Due to the nature of it, it cannot be
+ // NOTE(review): the sentence above is truncated -- presumably "...cannot be run
+ // as part of the automated suite", which would explain the enclosing `#if 0`.
+ // Confirm intent before re-enabling.
+
+ const uint16_t PortNumber = 23456;
+
+ // First server claims the port, then is detached so it keeps running
+ // independently of this test's scope.
+ ZenServerInstance Zen1(TestEnv);
+ std::filesystem::path TestDir1 = TestEnv.CreateNewTestDir();
+ Zen1.SetTestDir(TestDir1);
+ Zen1.SpawnServer(PortNumber);
+ Zen1.WaitUntilReady();
+ Zen1.Detach();
+
+ // Second server is launched on the SAME port with a different data
+ // directory, exercising the port hand-over path.
+ ZenServerInstance Zen2(TestEnv);
+ std::filesystem::path TestDir2 = TestEnv.CreateNewTestDir();
+ Zen2.SetTestDir(TestDir2);
+ Zen2.SpawnServer(PortNumber);
+ Zen2.WaitUntilReady();
+ Zen2.Detach();
+}
+
+TEST_CASE("lifetime.owner.2")
+{
+ // This test is designed to verify that the hand-over of sponsor processes is handled
+ // correctly for the case when a second or third process is launched on the same port
+ //
+ // Due to the nature of it, it cannot be
+ // NOTE(review): the sentence above is truncated -- presumably "...cannot be run
+ // as part of the automated suite", which would explain the enclosing `#if 0`.
+ // Confirm intent before re-enabling.
+
+ const uint16_t PortNumber = 13456;
+
+ std::filesystem::path TestDir1 = TestEnv.CreateNewTestDir();
+ std::filesystem::path TestDir2 = TestEnv.CreateNewTestDir();
+
+ // Zen1 acts as the owner/sponsor process; note it is NOT detached here.
+ ZenServerInstance Zen1(TestEnv);
+ Zen1.SetTestDir(TestDir1);
+ Zen1.SpawnServer(PortNumber);
+ Zen1.WaitUntilReady();
+
+ // Zen2..Zen4 all share TestDir2 and port PortNumber + 1, each naming Zen1
+ // as their owner pid, so successive launches must hand over cleanly.
+ ZenServerInstance Zen2(TestEnv);
+ Zen2.SetTestDir(TestDir2);
+ Zen2.SetOwnerPid(Zen1.GetPid());
+ Zen2.SpawnServer(PortNumber + 1);
+ Zen2.Detach();
+
+ ZenServerInstance Zen3(TestEnv);
+ Zen3.SetTestDir(TestDir2);
+ Zen3.SetOwnerPid(Zen1.GetPid());
+ Zen3.SpawnServer(PortNumber + 1);
+ Zen3.Detach();
+
+ ZenServerInstance Zen4(TestEnv);
+ Zen4.SetTestDir(TestDir2);
+ Zen4.SetOwnerPid(Zen1.GetPid());
+ Zen4.SpawnServer(PortNumber + 1);
+ Zen4.Detach();
+}
+# endif
+
+} // namespace zen::tests
+#else
+// Fallback no-op entry point used when the mimalloc-gated test build
+// (see the surrounding #if/#else) is compiled out.
+int main()
+{
+	return 0;
+}
+#endif