aboutsummaryrefslogtreecommitdiff
path: root/zenserver-test/zenserver-test.cpp
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2023-05-02 10:01:47 +0200
committerGitHub <[email protected]>2023-05-02 10:01:47 +0200
commit075d17f8ada47e990fe94606c3d21df409223465 (patch)
treee50549b766a2f3c354798a54ff73404217b4c9af /zenserver-test/zenserver-test.cpp
parentfix: bundle shouldn't append content zip to zen (diff)
downloadzen-075d17f8ada47e990fe94606c3d21df409223465.tar.xz
zen-075d17f8ada47e990fe94606c3d21df409223465.zip
moved source directories into `/src` (#264)
* moved source directories into `/src` * updated bundle.lua for new `src` path * moved some docs, icon * removed old test trees
Diffstat (limited to 'zenserver-test/zenserver-test.cpp')
-rw-r--r--zenserver-test/zenserver-test.cpp3323
1 files changed, 0 insertions, 3323 deletions
diff --git a/zenserver-test/zenserver-test.cpp b/zenserver-test/zenserver-test.cpp
deleted file mode 100644
index 3195181d1..000000000
--- a/zenserver-test/zenserver-test.cpp
+++ /dev/null
@@ -1,3323 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#define _SILENCE_CXX17_C_HEADER_DEPRECATION_WARNING
-
-#include <zencore/compactbinary.h>
-#include <zencore/compactbinarybuilder.h>
-#include <zencore/compactbinarypackage.h>
-#include <zencore/compress.h>
-#include <zencore/except.h>
-#include <zencore/filesystem.h>
-#include <zencore/fmtutils.h>
-#include <zencore/iohash.h>
-#include <zencore/logging.h>
-#include <zencore/memory.h>
-#include <zencore/refcount.h>
-#include <zencore/stream.h>
-#include <zencore/string.h>
-#include <zencore/testutils.h>
-#include <zencore/thread.h>
-#include <zencore/timer.h>
-#include <zencore/xxhash.h>
-#include <zenhttp/httpclient.h>
-#include <zenhttp/httpshared.h>
-#include <zenhttp/websocket.h>
-#include <zenhttp/zenhttp.h>
-#include <zenutil/cache/cache.h>
-#include <zenutil/cache/cacherequests.h>
-#include <zenutil/zenserverprocess.h>
-
-#if ZEN_USE_MIMALLOC
-ZEN_THIRD_PARTY_INCLUDES_START
-# include <mimalloc.h>
-ZEN_THIRD_PARTY_INCLUDES_END
-#endif
-
-#include <http_parser.h>
-
-#if ZEN_PLATFORM_WINDOWS
-# pragma comment(lib, "Crypt32.lib")
-# pragma comment(lib, "Wldap32.lib")
-#endif
-
-ZEN_THIRD_PARTY_INCLUDES_START
-#include <cpr/cpr.h>
-#undef GetObject
-ZEN_THIRD_PARTY_INCLUDES_END
-
-#include <atomic>
-#include <filesystem>
-#include <map>
-#include <random>
-#include <span>
-#include <thread>
-#include <typeindex>
-#include <unordered_map>
-
-#if ZEN_PLATFORM_WINDOWS
-# include <ppl.h>
-# include <atlbase.h>
-# include <process.h>
-#endif
-
-#include <asio.hpp>
-
-//////////////////////////////////////////////////////////////////////////
-
-#include "projectclient.h"
-
-//////////////////////////////////////////////////////////////////////////
-
-#if ZEN_WITH_TESTS
-# define ZEN_TEST_WITH_RUNNER 1
-# include <zencore/testing.h>
-# include <zencore/workthreadpool.h>
-#endif
-
-using namespace std::literals;
-
-#if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC
-struct Concurrency
-{
- template<typename... T>
- static void parallel_invoke(T&&... t)
- {
- constexpr size_t NumTs = sizeof...(t);
- std::thread Threads[NumTs] = {
- std::thread(std::forward<T>(t))...,
- };
-
- for (std::thread& Thread : Threads)
- {
- Thread.join();
- }
- }
-};
-#endif
-
-//////////////////////////////////////////////////////////////////////////
-//
-// Custom logging -- test code, this should be tweaked
-//
-
-namespace logging {
-using namespace spdlog;
-using namespace spdlog::details;
-using namespace std::literals;
-
-class full_test_formatter final : public spdlog::formatter
-{
-public:
- full_test_formatter(std::string_view LogId, std::chrono::time_point<std::chrono::system_clock> Epoch) : m_Epoch(Epoch), m_LogId(LogId)
- {
- }
-
- virtual std::unique_ptr<formatter> clone() const override { return std::make_unique<full_test_formatter>(m_LogId, m_Epoch); }
-
- static constexpr bool UseDate = false;
-
- virtual void format(const details::log_msg& msg, memory_buf_t& dest) override
- {
- using std::chrono::duration_cast;
- using std::chrono::milliseconds;
- using std::chrono::seconds;
-
- if constexpr (UseDate)
- {
- auto secs = std::chrono::duration_cast<seconds>(msg.time.time_since_epoch());
- if (secs != m_LastLogSecs)
- {
- m_CachedTm = os::localtime(log_clock::to_time_t(msg.time));
- m_LastLogSecs = secs;
- }
- }
-
- const auto& tm_time = m_CachedTm;
-
- // cache the date/time part for the next second.
- auto duration = msg.time - m_Epoch;
- auto secs = duration_cast<seconds>(duration);
-
- if (m_CacheTimestamp != secs || m_CachedDatetime.size() == 0)
- {
- m_CachedDatetime.clear();
- m_CachedDatetime.push_back('[');
-
- if constexpr (UseDate)
- {
- fmt_helper::append_int(tm_time.tm_year + 1900, m_CachedDatetime);
- m_CachedDatetime.push_back('-');
-
- fmt_helper::pad2(tm_time.tm_mon + 1, m_CachedDatetime);
- m_CachedDatetime.push_back('-');
-
- fmt_helper::pad2(tm_time.tm_mday, m_CachedDatetime);
- m_CachedDatetime.push_back(' ');
-
- fmt_helper::pad2(tm_time.tm_hour, m_CachedDatetime);
- m_CachedDatetime.push_back(':');
-
- fmt_helper::pad2(tm_time.tm_min, m_CachedDatetime);
- m_CachedDatetime.push_back(':');
-
- fmt_helper::pad2(tm_time.tm_sec, m_CachedDatetime);
- }
- else
- {
- int Count = int(secs.count());
-
- const int LogSecs = Count % 60;
- Count /= 60;
-
- const int LogMins = Count % 60;
- Count /= 60;
-
- const int LogHours = Count;
-
- fmt_helper::pad2(LogHours, m_CachedDatetime);
- m_CachedDatetime.push_back(':');
- fmt_helper::pad2(LogMins, m_CachedDatetime);
- m_CachedDatetime.push_back(':');
- fmt_helper::pad2(LogSecs, m_CachedDatetime);
- }
-
- m_CachedDatetime.push_back('.');
-
- m_CacheTimestamp = secs;
- }
-
- dest.append(m_CachedDatetime.begin(), m_CachedDatetime.end());
-
- auto millis = fmt_helper::time_fraction<milliseconds>(msg.time);
- fmt_helper::pad3(static_cast<uint32_t>(millis.count()), dest);
- dest.push_back(']');
- dest.push_back(' ');
-
- if (!m_LogId.empty())
- {
- dest.push_back('[');
- fmt_helper::append_string_view(m_LogId, dest);
- dest.push_back(']');
- dest.push_back(' ');
- }
-
- // append logger name if exists
- if (msg.logger_name.size() > 0)
- {
- dest.push_back('[');
- fmt_helper::append_string_view(msg.logger_name, dest);
- dest.push_back(']');
- dest.push_back(' ');
- }
-
- dest.push_back('[');
- // wrap the level name with color
- msg.color_range_start = dest.size();
- fmt_helper::append_string_view(level::to_string_view(msg.level), dest);
- msg.color_range_end = dest.size();
- dest.push_back(']');
- dest.push_back(' ');
-
- // add source location if present
- if (!msg.source.empty())
- {
- dest.push_back('[');
- const char* filename = details::short_filename_formatter<details::null_scoped_padder>::basename(msg.source.filename);
- fmt_helper::append_string_view(filename, dest);
- dest.push_back(':');
- fmt_helper::append_int(msg.source.line, dest);
- dest.push_back(']');
- dest.push_back(' ');
- }
-
- fmt_helper::append_string_view(msg.payload, dest);
- fmt_helper::append_string_view("\n"sv, dest);
- }
-
-private:
- std::chrono::time_point<std::chrono::system_clock> m_Epoch;
- std::tm m_CachedTm;
- std::chrono::seconds m_LastLogSecs;
- std::chrono::seconds m_CacheTimestamp{0};
- memory_buf_t m_CachedDatetime;
- std::string m_LogId;
-};
-} // namespace logging
-
-//////////////////////////////////////////////////////////////////////////
-
-#if 0
-
-int
-main()
-{
- mi_version();
-
- zen::Sleep(1000);
-
- zen::Stopwatch timer;
-
- const int RequestCount = 100000;
-
- cpr::Session Sessions[10];
-
- for (auto& Session : Sessions)
- {
- Session.SetUrl(cpr::Url{"http://localhost:1337/test/hello"});
- //Session.SetUrl(cpr::Url{ "http://arn-wd-l0182:1337/test/hello" });
- }
-
- auto Run = [](cpr::Session& Session) {
- for (int i = 0; i < 10000; ++i)
- {
- cpr::Response Result = Session.Get();
-
- if (Result.status_code != 200)
- {
- ZEN_WARN("request response: {}", Result.status_code);
- }
- }
- };
-
- Concurrency::parallel_invoke([&] { Run(Sessions[0]); },
- [&] { Run(Sessions[1]); },
- [&] { Run(Sessions[2]); },
- [&] { Run(Sessions[3]); },
- [&] { Run(Sessions[4]); },
- [&] { Run(Sessions[5]); },
- [&] { Run(Sessions[6]); },
- [&] { Run(Sessions[7]); },
- [&] { Run(Sessions[8]); },
- [&] { Run(Sessions[9]); });
-
- // cpr::Response r = cpr::Get(cpr::Url{ "http://localhost:1337/test/hello" });
-
- ZEN_INFO("{} requests in {} ({})",
- RequestCount,
- zen::NiceTimeSpanMs(timer.GetElapsedTimeMs()),
- zen::NiceRate(RequestCount, (uint32_t)timer.GetElapsedTimeMs(), "req"));
-
- return 0;
-}
-#elif 0
-// #include <restinio/all.hpp>
-
-int
-main()
-{
- mi_version();
- restinio::run(restinio::on_thread_pool(32).port(8080).request_handler(
- [](auto req) { return req->create_response().set_body("Hello, World!").done(); }));
- return 0;
-}
-#elif ZEN_WITH_TESTS
-
-zen::ZenServerEnvironment TestEnv;
-
-int
-main(int argc, char** argv)
-{
- using namespace std::literals;
-
-# if ZEN_USE_MIMALLOC
- mi_version();
-# endif
-
- zen::zencore_forcelinktests();
- zen::zenhttp_forcelinktests();
- zen::cacherequests_forcelink();
-
- zen::logging::InitializeLogging();
-
- spdlog::set_level(spdlog::level::debug);
- spdlog::set_formatter(std::make_unique< ::logging::full_test_formatter>("test", std::chrono::system_clock::now()));
-
- std::filesystem::path ProgramBaseDir = std::filesystem::path(argv[0]).parent_path();
- std::filesystem::path TestBaseDir = ProgramBaseDir.parent_path().parent_path() / ".test";
-
- // This is pretty janky because we're passing most of the options through to the test
- // framework, so we can't just use cxxopts (I think). This should ideally be cleaned up
- // somehow in the future
-
- std::string ServerClass;
-
- for (int i = 1; i < argc; ++i)
- {
- if (argv[i] == "--http"sv)
- {
- if ((i + 1) < argc)
- {
- ServerClass = argv[++i];
- }
- }
- }
-
- TestEnv.InitializeForTest(ProgramBaseDir, TestBaseDir, ServerClass);
-
- ZEN_INFO("Running tests...(base dir: '{}')", TestBaseDir);
-
- zen::testing::TestRunner Runner;
- Runner.ApplyCommandLine(argc, argv);
-
- return Runner.Run();
-}
-
-namespace zen::tests {
-
-TEST_CASE("default.single")
-{
- std::filesystem::path TestDir = TestEnv.CreateNewTestDir();
-
- ZenServerInstance Instance(TestEnv);
- Instance.SetTestDir(TestDir);
- Instance.SpawnServer(13337);
-
- ZEN_INFO("Waiting...");
-
- Instance.WaitUntilReady();
-
- std::atomic<uint64_t> RequestCount{0};
- std::atomic<uint64_t> BatchCounter{0};
-
- ZEN_INFO("Running single server test...");
-
- auto IssueTestRequests = [&] {
- const uint64_t BatchNo = BatchCounter.fetch_add(1);
- const int ThreadId = zen::GetCurrentThreadId();
-
- ZEN_INFO("query batch {} started (thread {})", BatchNo, ThreadId);
- cpr::Session cli;
- cli.SetUrl(cpr::Url{"http://localhost:13337/test/hello"});
-
- for (int i = 0; i < 10000; ++i)
- {
- auto res = cli.Get();
- ++RequestCount;
- }
- ZEN_INFO("query batch {} ended (thread {})", BatchNo, ThreadId);
- };
-
- zen::Stopwatch timer;
-
- Concurrency::parallel_invoke(IssueTestRequests,
- IssueTestRequests,
- IssueTestRequests,
- IssueTestRequests,
- IssueTestRequests,
- IssueTestRequests,
- IssueTestRequests,
- IssueTestRequests,
- IssueTestRequests,
- IssueTestRequests);
-
- uint64_t Elapsed = timer.GetElapsedTimeMs();
-
- ZEN_INFO("{} requests in {} ({})", RequestCount, zen::NiceTimeSpanMs(Elapsed), zen::NiceRate(RequestCount, (uint32_t)Elapsed, "req"));
-}
-
-TEST_CASE("multi.basic")
-{
- ZenServerInstance Instance1(TestEnv);
- std::filesystem::path TestDir1 = TestEnv.CreateNewTestDir();
- Instance1.SetTestDir(TestDir1);
- Instance1.SpawnServer(13337);
-
- ZenServerInstance Instance2(TestEnv);
- std::filesystem::path TestDir2 = TestEnv.CreateNewTestDir();
- Instance2.SetTestDir(TestDir2);
- Instance2.SpawnServer(13338);
-
- ZEN_INFO("Waiting...");
-
- Instance1.WaitUntilReady();
- Instance2.WaitUntilReady();
-
- std::atomic<uint64_t> RequestCount{0};
- std::atomic<uint64_t> BatchCounter{0};
-
- auto IssueTestRequests = [&](int PortNumber) {
- const uint64_t BatchNo = BatchCounter.fetch_add(1);
- const int ThreadId = zen::GetCurrentThreadId();
-
- ZEN_INFO("query batch {} started (thread {}) for port {}", BatchNo, ThreadId, PortNumber);
-
- cpr::Session cli;
- cli.SetUrl(cpr::Url{fmt::format("http://localhost:{}/test/hello", PortNumber)});
-
- for (int i = 0; i < 10000; ++i)
- {
- auto res = cli.Get();
- ++RequestCount;
- }
- ZEN_INFO("query batch {} ended (thread {})", BatchNo, ThreadId);
- };
-
- zen::Stopwatch timer;
-
- ZEN_INFO("Running multi-server test...");
-
- Concurrency::parallel_invoke([&] { IssueTestRequests(13337); },
- [&] { IssueTestRequests(13338); },
- [&] { IssueTestRequests(13337); },
- [&] { IssueTestRequests(13338); });
-
- uint64_t Elapsed = timer.GetElapsedTimeMs();
-
- ZEN_INFO("{} requests in {} ({})", RequestCount, zen::NiceTimeSpanMs(Elapsed), zen::NiceRate(RequestCount, (uint32_t)Elapsed, "req"));
-}
-
-TEST_CASE("project.basic")
-{
- using namespace std::literals;
-
- std::filesystem::path TestDir = TestEnv.CreateNewTestDir();
-
- const uint16_t PortNumber = 13337;
-
- ZenServerInstance Instance1(TestEnv);
- Instance1.SetTestDir(TestDir);
- Instance1.SpawnServer(PortNumber);
- Instance1.WaitUntilReady();
-
- std::atomic<uint64_t> RequestCount{0};
-
- zen::Stopwatch timer;
-
- std::mt19937_64 mt;
-
- zen::StringBuilder<64> BaseUri;
- BaseUri << fmt::format("http://localhost:{}/prj/test", PortNumber);
-
- std::filesystem::path BinPath = zen::GetRunningExecutablePath();
- std::filesystem::path RootPath = BinPath.parent_path().parent_path();
- BinPath = BinPath.lexically_relative(RootPath);
-
- SUBCASE("build store init")
- {
- {
- {
- zen::CbObjectWriter Body;
- Body << "id"
- << "test";
- Body << "root" << RootPath.c_str();
- Body << "project"
- << "/zooom";
- Body << "engine"
- << "/zooom";
-
- zen::BinaryWriter MemOut;
- Body.Save(MemOut);
-
- auto Response = cpr::Post(cpr::Url{BaseUri.c_str()}, cpr::Body{(const char*)MemOut.Data(), MemOut.Size()});
- CHECK(Response.status_code == 201);
- }
-
- {
- auto Response = cpr::Get(cpr::Url{BaseUri.c_str()});
- CHECK(Response.status_code == 200);
-
- zen::CbObjectView ResponseObject = zen::CbFieldView(Response.text.data()).AsObjectView();
-
- CHECK(ResponseObject["id"].AsString() == "test"sv);
- CHECK(ResponseObject["root"].AsString() == PathToUtf8(RootPath.c_str()));
- }
- }
-
- BaseUri << "/oplog/foobar";
-
- {
- {
- zen::StringBuilder<64> PostUri;
- PostUri << BaseUri;
- auto Response = cpr::Post(cpr::Url{PostUri.c_str()});
- CHECK(Response.status_code == 201);
- }
-
- {
- auto Response = cpr::Get(cpr::Url{BaseUri.c_str()});
- CHECK(Response.status_code == 200);
-
- zen::CbObjectView ResponseObject = zen::CbFieldView(Response.text.data()).AsObjectView();
-
- CHECK(ResponseObject["id"].AsString() == "foobar"sv);
- CHECK(ResponseObject["project"].AsString() == "test"sv);
- }
- }
-
- SUBCASE("build store persistence")
- {
- uint8_t AttachData[] = {1, 2, 3};
-
- zen::CompressedBuffer Attachment = zen::CompressedBuffer::Compress(zen::SharedBuffer::Clone(zen::MemoryView{AttachData, 3}));
- zen::CbAttachment Attach{Attachment, Attachment.DecodeRawHash()};
-
- zen::CbObjectWriter OpWriter;
- OpWriter << "key"
- << "foo"
- << "attachment" << Attach;
-
- const std::string_view ChunkId{
- "00000000"
- "00000000"
- "00010000"};
- auto FileOid = zen::Oid::FromHexString(ChunkId);
-
- OpWriter.BeginArray("files");
- OpWriter.BeginObject();
- OpWriter << "id" << FileOid;
- OpWriter << "clientpath"
- << "/{engine}/client/side/path";
- OpWriter << "serverpath" << BinPath.c_str();
- OpWriter.EndObject();
- OpWriter.EndArray();
-
- zen::CbObject Op = OpWriter.Save();
-
- zen::CbPackage OpPackage(Op);
- OpPackage.AddAttachment(Attach);
-
- zen::BinaryWriter MemOut;
- legacy::SaveCbPackage(OpPackage, MemOut);
-
- {
- zen::StringBuilder<64> PostUri;
- PostUri << BaseUri << "/new";
- auto Response = cpr::Post(cpr::Url{PostUri.c_str()}, cpr::Body{(const char*)MemOut.Data(), MemOut.Size()});
-
- REQUIRE(!Response.error);
- CHECK(Response.status_code == 201);
- }
-
- // Read file data
-
- {
- zen::StringBuilder<128> ChunkGetUri;
- ChunkGetUri << BaseUri << "/" << ChunkId;
- auto Response = cpr::Get(cpr::Url{ChunkGetUri.c_str()});
-
- REQUIRE(!Response.error);
- CHECK(Response.status_code == 200);
- }
-
- {
- zen::StringBuilder<128> ChunkGetUri;
- ChunkGetUri << BaseUri << "/" << ChunkId << "?offset=1&size=10";
- auto Response = cpr::Get(cpr::Url{ChunkGetUri.c_str()});
-
- REQUIRE(!Response.error);
- CHECK(Response.status_code == 200);
- CHECK(Response.text.size() == 10);
- }
-
- ZEN_INFO("+++++++");
- }
- SUBCASE("build store op commit") { ZEN_INFO("-------"); }
- SUBCASE("test chunk not found error")
- {
- for (size_t I = 0; I < 65; I++)
- {
- zen::StringBuilder<128> PostUri;
- PostUri << BaseUri << "/f77c781846caead318084604/info";
- auto Response = cpr::Get(cpr::Url{PostUri.c_str()});
-
- REQUIRE(!Response.error);
- CHECK(Response.status_code == 404);
- }
- }
- }
-
- const uint64_t Elapsed = timer.GetElapsedTimeMs();
-
- ZEN_INFO("{} requests in {} ({})", RequestCount, zen::NiceTimeSpanMs(Elapsed), zen::NiceRate(RequestCount, (uint32_t)Elapsed, "req"));
-}
-
-# if 0 // this is extremely WIP
-TEST_CASE("project.pipe")
-{
- using namespace std::literals;
-
- std::filesystem::path TestDir = TestEnv.CreateNewTestDir();
-
- const uint16_t PortNumber = 13337;
-
- ZenServerInstance Instance1(TestEnv);
- Instance1.SetTestDir(TestDir);
- Instance1.SpawnServer(PortNumber);
- Instance1.WaitUntilReady();
-
- zen::LocalProjectClient LocalClient(PortNumber);
-
- zen::CbObjectWriter Cbow;
- Cbow << "hey" << 42;
-
- zen::CbObject Response = LocalClient.MessageTransaction(Cbow.Save());
-}
-# endif
-
-namespace utils {
-
- struct ZenConfig
- {
- std::filesystem::path DataDir;
- uint16_t Port;
- std::string BaseUri;
- std::string Args;
-
- static ZenConfig New(uint16_t Port = 13337, std::string Args = "")
- {
- return ZenConfig{.DataDir = TestEnv.CreateNewTestDir(),
- .Port = Port,
- .BaseUri = fmt::format("http://localhost:{}/z$", Port),
- .Args = std::move(Args)};
- }
-
- static ZenConfig NewWithUpstream(uint16_t UpstreamPort)
- {
- return New(13337, fmt::format("--debug --upstream-thread-count=0 --upstream-zen-url=http://localhost:{}", UpstreamPort));
- }
-
- static ZenConfig NewWithThreadedUpstreams(std::span<uint16_t> UpstreamPorts, bool Debug)
- {
- std::string Args = Debug ? "--debug" : "";
- for (uint16_t Port : UpstreamPorts)
- {
- Args = fmt::format("{}{}--upstream-zen-url=http://localhost:{}", Args, Args.length() > 0 ? " " : "", Port);
- }
- return New(13337, Args);
- }
-
- void Spawn(ZenServerInstance& Inst)
- {
- Inst.SetTestDir(DataDir);
- Inst.SpawnServer(Port, Args);
- Inst.WaitUntilReady();
- }
- };
-
- void SpawnServer(ZenServerInstance& Server, ZenConfig& Cfg)
- {
- Server.SetTestDir(Cfg.DataDir);
- Server.SpawnServer(Cfg.Port, Cfg.Args);
- Server.WaitUntilReady();
- }
-
-} // namespace utils
-
-TEST_CASE("zcache.basic")
-{
- using namespace std::literals;
-
- std::filesystem::path TestDir = TestEnv.CreateNewTestDir();
-
- const uint16_t PortNumber = 13337;
-
- const int kIterationCount = 100;
- const auto BaseUri = fmt::format("http://localhost:{}/z$", PortNumber);
-
- auto HashKey = [](int i) -> zen::IoHash { return zen::IoHash::HashBuffer(&i, sizeof i); };
-
- {
- ZenServerInstance Instance1(TestEnv);
- Instance1.SetTestDir(TestDir);
- Instance1.SpawnServer(PortNumber);
- Instance1.WaitUntilReady();
-
- // Populate with some simple data
-
- for (int i = 0; i < kIterationCount; ++i)
- {
- zen::CbObjectWriter Cbo;
- Cbo << "index" << i;
-
- zen::BinaryWriter MemOut;
- Cbo.Save(MemOut);
-
- zen::IoHash Key = HashKey(i);
-
- cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}", BaseUri, "test", Key)},
- cpr::Body{(const char*)MemOut.Data(), MemOut.Size()},
- cpr::Header{{"Content-Type", "application/x-ue-cb"}});
-
- CHECK(Result.status_code == 201);
- }
-
- // Retrieve data
-
- for (int i = 0; i < kIterationCount; ++i)
- {
- zen::IoHash Key = zen::IoHash::HashBuffer(&i, sizeof i);
-
- cpr::Response Result =
- cpr::Get(cpr::Url{fmt::format("{}/{}/{}", BaseUri, "test", Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}});
-
- CHECK(Result.status_code == 200);
- }
-
- // Ensure bad bucket identifiers are rejected
-
- {
- zen::CbObjectWriter Cbo;
- Cbo << "index" << 42;
-
- zen::BinaryWriter MemOut;
- Cbo.Save(MemOut);
-
- zen::IoHash Key = HashKey(442);
-
- cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}", BaseUri, "te!st", Key)},
- cpr::Body{(const char*)MemOut.Data(), MemOut.Size()},
- cpr::Header{{"Content-Type", "application/x-ue-cb"}});
-
- CHECK(Result.status_code == 400);
- }
- }
-
- // Verify that the data persists between process runs (the previous server has exited at this point)
-
- {
- ZenServerInstance Instance1(TestEnv);
- Instance1.SetTestDir(TestDir);
- Instance1.SpawnServer(PortNumber);
- Instance1.WaitUntilReady();
-
- // Retrieve data again
-
- for (int i = 0; i < kIterationCount; ++i)
- {
- zen::IoHash Key = HashKey(i);
-
- cpr::Response Result =
- cpr::Get(cpr::Url{fmt::format("{}/{}/{}", BaseUri, "test", Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}});
-
- CHECK(Result.status_code == 200);
- }
- }
-}
-
-TEST_CASE("zcache.cbpackage")
-{
- using namespace std::literals;
-
- auto CreateTestPackage = [](zen::IoHash& OutAttachmentKey) -> zen::CbPackage {
- auto Data = zen::SharedBuffer::Clone(zen::MakeMemoryView<uint8_t>({1, 2, 3, 4, 5, 6, 7, 8, 9}));
- auto CompressedData = zen::CompressedBuffer::Compress(Data);
-
- OutAttachmentKey = CompressedData.DecodeRawHash();
-
- zen::CbWriter Obj;
- Obj.BeginObject("obj"sv);
- Obj.AddBinaryAttachment("data", OutAttachmentKey);
- Obj.EndObject();
-
- zen::CbPackage Package;
- Package.SetObject(Obj.Save().AsObject());
- Package.AddAttachment(zen::CbAttachment(CompressedData, OutAttachmentKey));
-
- return Package;
- };
-
- auto SerializeToBuffer = [](zen::CbPackage Package) -> zen::IoBuffer {
- zen::BinaryWriter MemStream;
-
- Package.Save(MemStream);
-
- return zen::IoBuffer(zen::IoBuffer::Clone, MemStream.Data(), MemStream.Size());
- };
-
- auto IsEqual = [](zen::CbPackage Lhs, zen::CbPackage Rhs) -> bool {
- std::span<const zen::CbAttachment> LhsAttachments = Lhs.GetAttachments();
- std::span<const zen::CbAttachment> RhsAttachments = Rhs.GetAttachments();
-
- if (LhsAttachments.size() != RhsAttachments.size())
- {
- return false;
- }
-
- for (const zen::CbAttachment& LhsAttachment : LhsAttachments)
- {
- const zen::CbAttachment* RhsAttachment = Rhs.FindAttachment(LhsAttachment.GetHash());
- CHECK(RhsAttachment);
-
- zen::SharedBuffer LhsBuffer = LhsAttachment.AsCompressedBinary().Decompress();
- CHECK(!LhsBuffer.IsNull());
-
- zen::SharedBuffer RhsBuffer = RhsAttachment->AsCompressedBinary().Decompress();
- CHECK(!RhsBuffer.IsNull());
-
- if (!LhsBuffer.GetView().EqualBytes(RhsBuffer.GetView()))
- {
- return false;
- }
- }
-
- return true;
- };
-
- SUBCASE("PUT/GET returns correct package")
- {
- std::filesystem::path TestDir = TestEnv.CreateNewTestDir();
- const uint16_t PortNumber = 13337;
- const auto BaseUri = fmt::format("http://localhost:{}/z$", PortNumber);
-
- ZenServerInstance Instance1(TestEnv);
- Instance1.SetTestDir(TestDir);
- Instance1.SpawnServer(PortNumber);
- Instance1.WaitUntilReady();
-
- const std::string_view Bucket = "mosdef"sv;
- zen::IoHash Key;
- zen::CbPackage ExpectedPackage = CreateTestPackage(Key);
-
- // PUT
- {
- zen::IoBuffer Body = SerializeToBuffer(ExpectedPackage);
- cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}", BaseUri, Bucket, Key)},
- cpr::Body{(const char*)Body.Data(), Body.Size()},
- cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}});
- CHECK(Result.status_code == 201);
- }
-
- // GET
- {
- cpr::Response Result =
- cpr::Get(cpr::Url{fmt::format("{}/{}/{}", BaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}});
- CHECK(Result.status_code == 200);
-
- zen::IoBuffer Response(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size());
-
- zen::CbPackage Package;
- const bool Ok = Package.TryLoad(Response);
- CHECK(Ok);
- CHECK(IsEqual(Package, ExpectedPackage));
- }
- }
-
- SUBCASE("PUT propagates upstream")
- {
- // Setup local and remote server
- std::filesystem::path LocalDataDir = TestEnv.CreateNewTestDir();
- std::filesystem::path RemoteDataDir = TestEnv.CreateNewTestDir();
- const uint16_t LocalPortNumber = 13337;
- const uint16_t RemotePortNumber = 13338;
-
- const auto LocalBaseUri = fmt::format("http://localhost:{}/z$", LocalPortNumber);
- const auto RemoteBaseUri = fmt::format("http://localhost:{}/z$", RemotePortNumber);
-
- ZenServerInstance RemoteInstance(TestEnv);
- RemoteInstance.SetTestDir(RemoteDataDir);
- RemoteInstance.SpawnServer(RemotePortNumber);
- RemoteInstance.WaitUntilReady();
-
- ZenServerInstance LocalInstance(TestEnv);
- LocalInstance.SetTestDir(LocalDataDir);
- LocalInstance.SpawnServer(LocalPortNumber,
- fmt::format("--upstream-thread-count=0 --upstream-zen-url=http://localhost:{}", RemotePortNumber));
- LocalInstance.WaitUntilReady();
-
- const std::string_view Bucket = "mosdef"sv;
- zen::IoHash Key;
- zen::CbPackage ExpectedPackage = CreateTestPackage(Key);
-
- // Store the cache record package in the local instance
- {
- zen::IoBuffer Body = SerializeToBuffer(ExpectedPackage);
- cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}", LocalBaseUri, Bucket, Key)},
- cpr::Body{(const char*)Body.Data(), Body.Size()},
- cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}});
-
- CHECK(Result.status_code == 201);
- }
-
- // The cache record can be retrieved as a package from the local instance
- {
- cpr::Response Result =
- cpr::Get(cpr::Url{fmt::format("{}/{}/{}", LocalBaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}});
- CHECK(Result.status_code == 200);
-
- zen::IoBuffer Body(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size());
- zen::CbPackage Package;
- const bool Ok = Package.TryLoad(Body);
- CHECK(Ok);
- CHECK(IsEqual(Package, ExpectedPackage));
- }
-
- // The cache record can be retrieved as a package from the remote instance
- {
- cpr::Response Result =
- cpr::Get(cpr::Url{fmt::format("{}/{}/{}", RemoteBaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}});
- CHECK(Result.status_code == 200);
-
- zen::IoBuffer Body(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size());
- zen::CbPackage Package;
- const bool Ok = Package.TryLoad(Body);
- CHECK(Ok);
- CHECK(IsEqual(Package, ExpectedPackage));
- }
- }
-
- SUBCASE("GET finds upstream when missing in local")
- {
- // Setup local and remote server
- std::filesystem::path LocalDataDir = TestEnv.CreateNewTestDir();
- std::filesystem::path RemoteDataDir = TestEnv.CreateNewTestDir();
- const uint16_t LocalPortNumber = 13337;
- const uint16_t RemotePortNumber = 13338;
-
- const auto LocalBaseUri = fmt::format("http://localhost:{}/z$", LocalPortNumber);
- const auto RemoteBaseUri = fmt::format("http://localhost:{}/z$", RemotePortNumber);
-
- ZenServerInstance RemoteInstance(TestEnv);
- RemoteInstance.SetTestDir(RemoteDataDir);
- RemoteInstance.SpawnServer(RemotePortNumber);
- RemoteInstance.WaitUntilReady();
-
- ZenServerInstance LocalInstance(TestEnv);
- LocalInstance.SetTestDir(LocalDataDir);
- LocalInstance.SpawnServer(LocalPortNumber,
- fmt::format("--upstream-thread-count=0 --upstream-zen-url=http://localhost:{}", RemotePortNumber));
- LocalInstance.WaitUntilReady();
-
- const std::string_view Bucket = "mosdef"sv;
- zen::IoHash Key;
- zen::CbPackage ExpectedPackage = CreateTestPackage(Key);
-
- // Store the cache record package in upstream cache
- {
- zen::IoBuffer Body = SerializeToBuffer(ExpectedPackage);
- cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}", RemoteBaseUri, Bucket, Key)},
- cpr::Body{(const char*)Body.Data(), Body.Size()},
- cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}});
-
- CHECK(Result.status_code == 201);
- }
-
- // The cache record can be retrieved as a package from the local cache
- {
- cpr::Response Result =
- cpr::Get(cpr::Url{fmt::format("{}/{}/{}", LocalBaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}});
- CHECK(Result.status_code == 200);
-
- zen::IoBuffer Body(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size());
- zen::CbPackage Package;
- const bool Ok = Package.TryLoad(Body);
- CHECK(Ok);
- CHECK(IsEqual(Package, ExpectedPackage));
- }
- }
-}
-
-TEST_CASE("zcache.policy")
-{
- using namespace std::literals;
- using namespace utils;
-
- auto GenerateData = [](uint64_t Size, zen::IoHash& OutHash) -> zen::UniqueBuffer {
- auto Buf = zen::UniqueBuffer::Alloc(Size);
- uint8_t* Data = reinterpret_cast<uint8_t*>(Buf.GetData());
- for (uint64_t Idx = 0; Idx < Size; Idx++)
- {
- Data[Idx] = Idx % 256;
- }
- OutHash = zen::IoHash::HashBuffer(Data, Size);
- return Buf;
- };
-
- auto GeneratePackage = [](zen::IoHash& OutRecordKey, zen::IoHash& OutAttachmentKey) -> zen::CbPackage {
- auto Data = zen::SharedBuffer::Clone(zen::MakeMemoryView<uint8_t>({1, 2, 3, 4, 5, 6, 7, 8, 9}));
- auto CompressedData = zen::CompressedBuffer::Compress(Data);
- OutAttachmentKey = CompressedData.DecodeRawHash();
-
- zen::CbWriter Writer;
- Writer.BeginObject("obj"sv);
- Writer.AddBinaryAttachment("data", OutAttachmentKey);
- Writer.EndObject();
- CbObject CacheRecord = Writer.Save().AsObject();
-
- OutRecordKey = IoHash::HashBuffer(CacheRecord.GetBuffer().GetView());
-
- zen::CbPackage Package;
- Package.SetObject(CacheRecord);
- Package.AddAttachment(zen::CbAttachment(CompressedData, OutAttachmentKey));
-
- return Package;
- };
-
- auto ToBuffer = [](zen::CbPackage Package) -> zen::IoBuffer {
- zen::BinaryWriter MemStream;
- Package.Save(MemStream);
-
- return zen::IoBuffer(zen::IoBuffer::Clone, MemStream.Data(), MemStream.Size());
- };
-
- SUBCASE("query - 'local' does not query upstream (binary)")
- {
- ZenConfig UpstreamCfg = ZenConfig::New(13338);
- ZenServerInstance UpstreamInst(TestEnv);
- ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338);
- ZenServerInstance LocalInst(TestEnv);
- const auto Bucket = "legacy"sv;
-
- UpstreamCfg.Spawn(UpstreamInst);
- LocalCfg.Spawn(LocalInst);
-
- zen::IoHash Key;
- auto BinaryValue = GenerateData(1024, Key);
-
- // Store binary cache value upstream
- {
- cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}", UpstreamCfg.BaseUri, Bucket, Key)},
- cpr::Body{(const char*)BinaryValue.GetData(), BinaryValue.GetSize()},
- cpr::Header{{"Content-Type", "application/octet-stream"}});
- CHECK(Result.status_code == 201);
- }
-
- {
- cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?Policy=QueryLocal,Store", LocalCfg.BaseUri, Bucket, Key)},
- cpr::Header{{"Accept", "application/octet-stream"}});
- CHECK(Result.status_code == 404);
- }
-
- {
- cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?Policy=Query,Store", LocalCfg.BaseUri, Bucket, Key)},
- cpr::Header{{"Accept", "application/octet-stream"}});
- CHECK(Result.status_code == 200);
- }
- }
-
- SUBCASE("store - 'local' does not store upstream (binary)")
- {
- ZenConfig UpstreamCfg = ZenConfig::New(13338);
- ZenServerInstance UpstreamInst(TestEnv);
- ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338);
- ZenServerInstance LocalInst(TestEnv);
- const auto Bucket = "legacy"sv;
-
- UpstreamCfg.Spawn(UpstreamInst);
- LocalCfg.Spawn(LocalInst);
-
- zen::IoHash Key;
- auto BinaryValue = GenerateData(1024, Key);
-
- // Store binary cache value locally
- {
- cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}?Policy=Query,StoreLocal", LocalCfg.BaseUri, Bucket, Key)},
- cpr::Body{(const char*)BinaryValue.GetData(), BinaryValue.GetSize()},
- cpr::Header{{"Content-Type", "application/octet-stream"}});
- CHECK(Result.status_code == 201);
- }
-
- {
- cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}", UpstreamCfg.BaseUri, Bucket, Key)},
- cpr::Header{{"Accept", "application/octet-stream"}});
- CHECK(Result.status_code == 404);
- }
-
- {
- cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}", LocalCfg.BaseUri, Bucket, Key)},
- cpr::Header{{"Accept", "application/octet-stream"}});
- CHECK(Result.status_code == 200);
- }
- }
-
- SUBCASE("store - 'local/remote' stores local and upstream (binary)")
- {
- ZenConfig UpstreamCfg = ZenConfig::New(13338);
- ZenServerInstance UpstreamInst(TestEnv);
- ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338);
- ZenServerInstance LocalInst(TestEnv);
- const auto Bucket = "legacy"sv;
-
- UpstreamCfg.Spawn(UpstreamInst);
- LocalCfg.Spawn(LocalInst);
-
- zen::IoHash Key;
- auto BinaryValue = GenerateData(1024, Key);
-
- // Store binary cache value locally and upstream
- {
- cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}?Policy=Query,Store", LocalCfg.BaseUri, Bucket, Key)},
- cpr::Body{(const char*)BinaryValue.GetData(), BinaryValue.GetSize()},
- cpr::Header{{"Content-Type", "application/octet-stream"}});
- CHECK(Result.status_code == 201);
- }
-
- {
- cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}", UpstreamCfg.BaseUri, Bucket, Key)},
- cpr::Header{{"Accept", "application/octet-stream"}});
- CHECK(Result.status_code == 200);
- }
-
- {
- cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}", LocalCfg.BaseUri, Bucket, Key)},
- cpr::Header{{"Accept", "application/octet-stream"}});
- CHECK(Result.status_code == 200);
- }
- }
-
- SUBCASE("query - 'local' does not query upstream (cppackage)")
- {
- ZenConfig UpstreamCfg = ZenConfig::New(13338);
- ZenServerInstance UpstreamInst(TestEnv);
- ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338);
- ZenServerInstance LocalInst(TestEnv);
- const auto Bucket = "legacy"sv;
-
- UpstreamCfg.Spawn(UpstreamInst);
- LocalCfg.Spawn(LocalInst);
-
- zen::IoHash Key;
- zen::IoHash PayloadId;
- zen::CbPackage Package = GeneratePackage(Key, PayloadId);
- auto Buf = ToBuffer(Package);
-
- // Store package upstream
- {
- cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}", UpstreamCfg.BaseUri, Bucket, Key)},
- cpr::Body{(const char*)Buf.GetData(), Buf.GetSize()},
- cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}});
- CHECK(Result.status_code == 201);
- }
-
- {
- cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?Policy=QueryLocal,Store", LocalCfg.BaseUri, Bucket, Key)},
- cpr::Header{{"Accept", "application/x-ue-cbpkg"}});
- CHECK(Result.status_code == 404);
- }
-
- {
- cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?Policy=Query,Store", LocalCfg.BaseUri, Bucket, Key)},
- cpr::Header{{"Accept", "application/x-ue-cbpkg"}});
- CHECK(Result.status_code == 200);
- }
- }
-
- SUBCASE("store - 'local' does not store upstream (cbpackge)")
- {
- ZenConfig UpstreamCfg = ZenConfig::New(13338);
- ZenServerInstance UpstreamInst(TestEnv);
- ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338);
- ZenServerInstance LocalInst(TestEnv);
- const auto Bucket = "legacy"sv;
-
- UpstreamCfg.Spawn(UpstreamInst);
- LocalCfg.Spawn(LocalInst);
-
- zen::IoHash Key;
- zen::IoHash PayloadId;
- zen::CbPackage Package = GeneratePackage(Key, PayloadId);
- auto Buf = ToBuffer(Package);
-
- // Store packge locally
- {
- cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}?Policy=Query,StoreLocal", LocalCfg.BaseUri, Bucket, Key)},
- cpr::Body{(const char*)Buf.GetData(), Buf.GetSize()},
- cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}});
- CHECK(Result.status_code == 201);
- }
-
- {
- cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}", UpstreamCfg.BaseUri, Bucket, Key)},
- cpr::Header{{"Accept", "application/x-ue-cbpkg"}});
- CHECK(Result.status_code == 404);
- }
-
- {
- cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}", LocalCfg.BaseUri, Bucket, Key)},
- cpr::Header{{"Accept", "application/x-ue-cbpkg"}});
- CHECK(Result.status_code == 200);
- }
- }
-
- SUBCASE("store - 'local/remote' stores local and upstream (cbpackage)")
- {
- ZenConfig UpstreamCfg = ZenConfig::New(13338);
- ZenServerInstance UpstreamInst(TestEnv);
- ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338);
- ZenServerInstance LocalInst(TestEnv);
- const auto Bucket = "legacy"sv;
-
- UpstreamCfg.Spawn(UpstreamInst);
- LocalCfg.Spawn(LocalInst);
-
- zen::IoHash Key;
- zen::IoHash PayloadId;
- zen::CbPackage Package = GeneratePackage(Key, PayloadId);
- auto Buf = ToBuffer(Package);
-
- // Store package locally and upstream
- {
- cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}?Policy=Query,Store", LocalCfg.BaseUri, Bucket, Key)},
- cpr::Body{(const char*)Buf.GetData(), Buf.GetSize()},
- cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}});
- CHECK(Result.status_code == 201);
- }
-
- {
- cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}", UpstreamCfg.BaseUri, Bucket, Key)},
- cpr::Header{{"Accept", "application/x-ue-cbpkg"}});
- CHECK(Result.status_code == 200);
- }
-
- {
- cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}", LocalCfg.BaseUri, Bucket, Key)},
- cpr::Header{{"Accept", "application/x-ue-cbpkg"}});
- CHECK(Result.status_code == 200);
- }
- }
-
- SUBCASE("skip - 'data' returns cache record without attachments/empty payload")
- {
- ZenConfig Cfg = ZenConfig::New();
- ZenServerInstance Instance(TestEnv);
- const auto Bucket = "test"sv;
-
- Cfg.Spawn(Instance);
-
- zen::IoHash Key;
- zen::IoHash PayloadId;
- zen::CbPackage Package = GeneratePackage(Key, PayloadId);
- auto Buf = ToBuffer(Package);
-
- // Store package
- {
- cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}", Cfg.BaseUri, Bucket, Key)},
- cpr::Body{(const char*)Buf.GetData(), Buf.GetSize()},
- cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}});
- CHECK(Result.status_code == 201);
- }
-
- // Get package
- {
- cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?Policy=Default,SkipData", Cfg.BaseUri, Bucket, Key)},
- cpr::Header{{"Accept", "application/x-ue-cbpkg"}});
- CHECK(IsHttpSuccessCode(Result.status_code));
- IoBuffer Buffer(IoBuffer::Wrap, Result.text.c_str(), Result.text.size());
- CbPackage ResponsePackage;
- CHECK(ResponsePackage.TryLoad(Buffer));
- CHECK(ResponsePackage.GetAttachments().size() == 0);
- }
-
- // Get record
- {
- cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?Policy=Default,SkipData", Cfg.BaseUri, Bucket, Key)},
- cpr::Header{{"Accept", "application/x-ue-cb"}});
- CHECK(IsHttpSuccessCode(Result.status_code));
- IoBuffer Buffer(IoBuffer::Wrap, Result.text.c_str(), Result.text.size());
- CbObject ResponseObject = zen::LoadCompactBinaryObject(Buffer);
- CHECK((bool)ResponseObject);
- }
-
- // Get payload
- {
- cpr::Response Result =
- cpr::Get(cpr::Url{fmt::format("{}/{}/{}/{}?Policy=Default,SkipData", Cfg.BaseUri, Bucket, Key, PayloadId)},
- cpr::Header{{"Accept", "application/x-ue-comp"}});
- CHECK(IsHttpSuccessCode(Result.status_code));
- CHECK(Result.text.size() == 0);
- }
- }
-
- SUBCASE("skip - 'data' returns empty binary value")
- {
- ZenConfig Cfg = ZenConfig::New();
- ZenServerInstance Instance(TestEnv);
- const auto Bucket = "test"sv;
-
- Cfg.Spawn(Instance);
-
- zen::IoHash Key;
- auto BinaryValue = GenerateData(1024, Key);
-
- // Store binary cache value
- {
- cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}", Cfg.BaseUri, Bucket, Key)},
- cpr::Body{(const char*)BinaryValue.GetData(), BinaryValue.GetSize()},
- cpr::Header{{"Content-Type", "application/octet-stream"}});
- CHECK(Result.status_code == 201);
- }
-
- // Get package
- {
- cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?Policy=Default,SkipData", Cfg.BaseUri, Bucket, Key)},
- cpr::Header{{"Accept", "application/octet-stream"}});
- CHECK(IsHttpSuccessCode(Result.status_code));
- CHECK(Result.text.size() == 0);
- }
- }
-}
-
-TEST_CASE("zcache.rpc")
-{
- using namespace std::literals;
-
- auto AppendCacheRecord = [](cacherequests::PutCacheRecordsRequest& Request,
- const zen::CacheKey& CacheKey,
- size_t PayloadSize,
- CachePolicy RecordPolicy) {
- std::vector<uint8_t> Data;
- Data.resize(PayloadSize);
- uint32_t DataSeed = *reinterpret_cast<const uint32_t*>(&CacheKey.Hash.Hash[0]);
- uint16_t* DataPtr = reinterpret_cast<uint16_t*>(Data.data());
- for (size_t Idx = 0; Idx < PayloadSize / 2; ++Idx)
- {
- DataPtr[Idx] = static_cast<uint16_t>((Idx + DataSeed) % 0xffffu);
- }
- if (PayloadSize & 1)
- {
- Data[PayloadSize - 1] = static_cast<uint8_t>((PayloadSize - 1) & 0xff);
- }
- CompressedBuffer Value = zen::CompressedBuffer::Compress(SharedBuffer::MakeView(Data.data(), Data.size()));
- Request.Requests.push_back({.Key = CacheKey, .Values = {{.Id = Oid::NewOid(), .Body = std::move(Value)}}, .Policy = RecordPolicy});
- };
-
- auto PutCacheRecords = [&AppendCacheRecord](std::string_view BaseUri,
- std::string_view Namespace,
- std::string_view Bucket,
- size_t Num,
- size_t PayloadSize = 1024,
- size_t KeyOffset = 1) -> std::vector<CacheKey> {
- std::vector<zen::CacheKey> OutKeys;
-
- for (uint32_t Key = 1; Key <= Num; ++Key)
- {
- zen::IoHash KeyHash;
- ((uint32_t*)(KeyHash.Hash))[0] = gsl::narrow<uint32_t>(KeyOffset + Key);
- const zen::CacheKey CacheKey = zen::CacheKey::Create(Bucket, KeyHash);
-
- cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .Namespace = std::string(Namespace)};
- AppendCacheRecord(Request, CacheKey, PayloadSize, CachePolicy::Default);
- OutKeys.push_back(CacheKey);
-
- CbPackage Package;
- CHECK(Request.Format(Package));
-
- IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer();
- cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)},
- cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}},
- cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
-
- CHECK(Result.status_code == 200);
- }
-
- return OutKeys;
- };
-
- struct GetCacheRecordResult
- {
- zen::CbPackage Response;
- cacherequests::GetCacheRecordsResult Result;
- bool Success;
- };
-
- auto GetCacheRecords = [](std::string_view BaseUri,
- std::string_view Namespace,
- std::span<zen::CacheKey> Keys,
- zen::CachePolicy Policy,
- zen::RpcAcceptOptions AcceptOptions = zen::RpcAcceptOptions::kNone,
- int Pid = 0) -> GetCacheRecordResult {
- cacherequests::GetCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic,
- .AcceptOptions = static_cast<uint16_t>(AcceptOptions),
- .ProcessPid = Pid,
- .DefaultPolicy = Policy,
- .Namespace = std::string(Namespace)};
- for (const CacheKey& Key : Keys)
- {
- Request.Requests.push_back({.Key = Key});
- }
-
- CbObjectWriter RequestWriter;
- CHECK(Request.Format(RequestWriter));
-
- BinaryWriter Body;
- RequestWriter.Save(Body);
-
- cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)},
- cpr::Header{{"Content-Type", "application/x-ue-cb"}, {"Accept", "application/x-ue-cbpkg"}},
- cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
-
- GetCacheRecordResult OutResult;
-
- if (Result.status_code == 200)
- {
- CbPackage Response = ParsePackageMessage(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()));
- if (!Response.IsNull())
- {
- OutResult.Response = std::move(Response);
- CHECK(OutResult.Result.Parse(OutResult.Response));
- OutResult.Success = true;
- }
- }
-
- return OutResult;
- };
-
- SUBCASE("get cache records")
- {
- std::filesystem::path TestDir = TestEnv.CreateNewTestDir();
- const uint16_t PortNumber = 13337;
- const auto BaseUri = fmt::format("http://localhost:{}/z$", PortNumber);
-
- ZenServerInstance Inst(TestEnv);
- Inst.SetTestDir(TestDir);
- Inst.SpawnServer(PortNumber);
- Inst.WaitUntilReady();
-
- CachePolicy Policy = CachePolicy::Default;
- std::vector<zen::CacheKey> Keys = PutCacheRecords(BaseUri, "ue4.ddc"sv, "mastodon"sv, 128);
- GetCacheRecordResult Result = GetCacheRecords(BaseUri, "ue4.ddc"sv, Keys, Policy);
-
- CHECK(Result.Result.Results.size() == Keys.size());
-
- for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results)
- {
- const CacheKey& ExpectedKey = Keys[Index++];
- CHECK(Record);
- CHECK(Record->Key == ExpectedKey);
- CHECK(Record->Values.size() == 1);
-
- for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values)
- {
- CHECK(Value.Body);
- }
- }
- }
-
- SUBCASE("get missing cache records")
- {
- std::filesystem::path TestDir = TestEnv.CreateNewTestDir();
- const uint16_t PortNumber = 13337;
- const auto BaseUri = fmt::format("http://localhost:{}/z$", PortNumber);
-
- ZenServerInstance Inst(TestEnv);
- Inst.SetTestDir(TestDir);
- Inst.SpawnServer(PortNumber);
- Inst.WaitUntilReady();
-
- CachePolicy Policy = CachePolicy::Default;
- std::vector<zen::CacheKey> ExistingKeys = PutCacheRecords(BaseUri, "ue4.ddc"sv, "mastodon"sv, 128);
- std::vector<zen::CacheKey> Keys;
-
- for (const zen::CacheKey& Key : ExistingKeys)
- {
- Keys.push_back(Key);
- Keys.push_back(CacheKey::Create("missing"sv, IoHash::Zero));
- }
-
- GetCacheRecordResult Result = GetCacheRecords(BaseUri, "ue4.ddc"sv, Keys, Policy);
-
- CHECK(Result.Result.Results.size() == Keys.size());
-
- size_t KeyIndex = 0;
- for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results)
- {
- const bool Missing = Index++ % 2 != 0;
-
- if (Missing)
- {
- CHECK(!Record);
- }
- else
- {
- const CacheKey& ExpectedKey = ExistingKeys[KeyIndex++];
- CHECK(Record->Key == ExpectedKey);
- for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values)
- {
- CHECK(Value.Body);
- }
- }
- }
- }
-
- SUBCASE("policy - 'QueryLocal' does not query upstream")
- {
- using namespace utils;
-
- ZenConfig UpstreamCfg = ZenConfig::New(13338);
- ZenServerInstance UpstreamServer(TestEnv);
- ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338);
- ZenServerInstance LocalServer(TestEnv);
-
- SpawnServer(UpstreamServer, UpstreamCfg);
- SpawnServer(LocalServer, LocalCfg);
-
- std::vector<zen::CacheKey> Keys = PutCacheRecords(UpstreamCfg.BaseUri, "ue4.ddc"sv, "mastodon"sv, 4);
-
- CachePolicy Policy = CachePolicy::QueryLocal;
- GetCacheRecordResult Result = GetCacheRecords(LocalCfg.BaseUri, "ue4.ddc"sv, Keys, Policy);
-
- CHECK(Result.Result.Results.size() == Keys.size());
-
- for (const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results)
- {
- CHECK(!Record);
- }
- }
-
- SUBCASE("policy - 'QueryRemote' does query upstream")
- {
- using namespace utils;
-
- ZenConfig UpstreamCfg = ZenConfig::New(13338);
- ZenServerInstance UpstreamServer(TestEnv);
- ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338);
- ZenServerInstance LocalServer(TestEnv);
-
- SpawnServer(UpstreamServer, UpstreamCfg);
- SpawnServer(LocalServer, LocalCfg);
-
- std::vector<zen::CacheKey> Keys = PutCacheRecords(UpstreamCfg.BaseUri, "ue4.ddc"sv, "mastodon"sv, 4);
-
- CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote);
- GetCacheRecordResult Result = GetCacheRecords(LocalCfg.BaseUri, "ue4.ddc"sv, Keys, Policy);
-
- CHECK(Result.Result.Results.size() == Keys.size());
-
- for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results)
- {
- CHECK(Record);
- const CacheKey& ExpectedKey = Keys[Index++];
- CHECK(Record->Key == ExpectedKey);
- }
- }
-
- SUBCASE("RpcAcceptOptions")
- {
- using namespace utils;
-
- std::filesystem::path TestDir = TestEnv.CreateNewTestDir();
- const uint16_t PortNumber = 13337;
- const auto BaseUri = fmt::format("http://localhost:{}/z$", PortNumber);
-
- ZenServerInstance Inst(TestEnv);
- Inst.SetTestDir(TestDir);
- Inst.SpawnServer(PortNumber);
- Inst.WaitUntilReady();
-
- std::vector<zen::CacheKey> SmallKeys = PutCacheRecords(BaseUri, "ue4.ddc"sv, "mastodon"sv, 4, 1024);
- std::vector<zen::CacheKey> LargeKeys = PutCacheRecords(BaseUri, "ue4.ddc"sv, "mastodon"sv, 4, 1024 * 1024 * 16, SmallKeys.size());
-
- std::vector<zen::CacheKey> Keys(SmallKeys.begin(), SmallKeys.end());
- Keys.insert(Keys.end(), LargeKeys.begin(), LargeKeys.end());
-
- {
- GetCacheRecordResult Result = GetCacheRecords(BaseUri, "ue4.ddc"sv, Keys, CachePolicy::Default);
-
- CHECK(Result.Result.Results.size() == Keys.size());
-
- for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results)
- {
- CHECK(Record);
- const CacheKey& ExpectedKey = Keys[Index++];
- CHECK(Record->Key == ExpectedKey);
- for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values)
- {
- const IoBuffer& Body = Value.Body.GetCompressed().Flatten().AsIoBuffer();
- IoBufferFileReference Ref;
- bool IsFileRef = Body.GetFileReference(Ref);
- CHECK(!IsFileRef);
- }
- }
- }
-
- // File path, but only for large files
- {
- GetCacheRecordResult Result =
- GetCacheRecords(BaseUri, "ue4.ddc"sv, Keys, CachePolicy::Default, RpcAcceptOptions::kAllowLocalReferences);
-
- CHECK(Result.Result.Results.size() == Keys.size());
-
- for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results)
- {
- CHECK(Record);
- const CacheKey& ExpectedKey = Keys[Index++];
- CHECK(Record->Key == ExpectedKey);
- for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values)
- {
- const IoBuffer& Body = Value.Body.GetCompressed().Flatten().AsIoBuffer();
- IoBufferFileReference Ref;
- bool IsFileRef = Body.GetFileReference(Ref);
- CHECK(IsFileRef == (Body.Size() > 1024));
- }
- }
- }
-
- // File path, for all files
- {
- GetCacheRecordResult Result =
- GetCacheRecords(BaseUri,
- "ue4.ddc"sv,
- Keys,
- CachePolicy::Default,
- RpcAcceptOptions::kAllowLocalReferences | RpcAcceptOptions::kAllowPartialLocalReferences);
-
- CHECK(Result.Result.Results.size() == Keys.size());
-
- for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results)
- {
- CHECK(Record);
- const CacheKey& ExpectedKey = Keys[Index++];
- CHECK(Record->Key == ExpectedKey);
- for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values)
- {
- const IoBuffer& Body = Value.Body.GetCompressed().Flatten().AsIoBuffer();
- IoBufferFileReference Ref;
- bool IsFileRef = Body.GetFileReference(Ref);
- CHECK(IsFileRef);
- }
- }
- }
-
- // File handle, but only for large files
- {
- GetCacheRecordResult Result = GetCacheRecords(BaseUri,
- "ue4.ddc"sv,
- Keys,
- CachePolicy::Default,
- RpcAcceptOptions::kAllowLocalReferences,
- GetCurrentProcessId());
-
- CHECK(Result.Result.Results.size() == Keys.size());
-
- for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results)
- {
- CHECK(Record);
- const CacheKey& ExpectedKey = Keys[Index++];
- CHECK(Record->Key == ExpectedKey);
- for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values)
- {
- const IoBuffer& Body = Value.Body.GetCompressed().Flatten().AsIoBuffer();
- IoBufferFileReference Ref;
- bool IsFileRef = Body.GetFileReference(Ref);
- CHECK(IsFileRef == (Body.Size() > 1024));
- }
- }
- }
-
- // File handle, for all files
- {
- GetCacheRecordResult Result =
- GetCacheRecords(BaseUri,
- "ue4.ddc"sv,
- Keys,
- CachePolicy::Default,
- RpcAcceptOptions::kAllowLocalReferences | RpcAcceptOptions::kAllowPartialLocalReferences,
- GetCurrentProcessId());
-
- CHECK(Result.Result.Results.size() == Keys.size());
-
- for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results)
- {
- CHECK(Record);
- const CacheKey& ExpectedKey = Keys[Index++];
- CHECK(Record->Key == ExpectedKey);
- for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values)
- {
- const IoBuffer& Body = Value.Body.GetCompressed().Flatten().AsIoBuffer();
- IoBufferFileReference Ref;
- bool IsFileRef = Body.GetFileReference(Ref);
- CHECK(IsFileRef);
- }
- }
- }
- }
-}
-
-TEST_CASE("zcache.failing.upstream")
-{
- // This is an exploratory test that takes a long time to run, so lets skip it by default
- if (true)
- {
- return;
- }
-
- using namespace std::literals;
- using namespace utils;
-
- const uint16_t Upstream1PortNumber = 13338;
- ZenConfig Upstream1Cfg = ZenConfig::New(Upstream1PortNumber);
- ZenServerInstance Upstream1Server(TestEnv);
-
- const uint16_t Upstream2PortNumber = 13339;
- ZenConfig Upstream2Cfg = ZenConfig::New(Upstream2PortNumber);
- ZenServerInstance Upstream2Server(TestEnv);
-
- std::vector<std::uint16_t> UpstreamPorts = {Upstream1PortNumber, Upstream2PortNumber};
- ZenConfig LocalCfg = ZenConfig::NewWithThreadedUpstreams(UpstreamPorts, false);
- LocalCfg.Args += (" --upstream-thread-count 2");
- ZenServerInstance LocalServer(TestEnv);
- const uint16_t LocalPortNumber = 13337;
- const auto LocalUri = fmt::format("http://localhost:{}/z$", LocalPortNumber);
- const auto Upstream1Uri = fmt::format("http://localhost:{}/z$", Upstream1PortNumber);
- const auto Upstream2Uri = fmt::format("http://localhost:{}/z$", Upstream2PortNumber);
-
- SpawnServer(Upstream1Server, Upstream1Cfg);
- SpawnServer(Upstream2Server, Upstream2Cfg);
- SpawnServer(LocalServer, LocalCfg);
- bool Upstream1Running = true;
- bool Upstream2Running = true;
-
- using namespace std::literals;
-
- auto AppendCacheRecord = [](cacherequests::PutCacheRecordsRequest& Request,
- const zen::CacheKey& CacheKey,
- size_t PayloadSize,
- CachePolicy RecordPolicy) {
- std::vector<uint32_t> Data;
- Data.resize(PayloadSize / 4);
- for (uint32_t Idx = 0; Idx < PayloadSize / 4; ++Idx)
- {
- Data[Idx] = (*reinterpret_cast<const uint32_t*>(&CacheKey.Hash.Hash[0])) + Idx;
- }
-
- CompressedBuffer Value = zen::CompressedBuffer::Compress(SharedBuffer::MakeView(Data.data(), Data.size() * 4));
- Request.Requests.push_back({.Key = CacheKey, .Values = {{.Id = Oid::NewOid(), .Body = std::move(Value)}}, .Policy = RecordPolicy});
- };
-
- auto PutCacheRecords = [&AppendCacheRecord](std::string_view BaseUri,
- std::string_view Namespace,
- std::string_view Bucket,
- size_t Num,
- size_t KeyOffset,
- size_t PayloadSize = 8192) -> std::vector<CacheKey> {
- std::vector<zen::CacheKey> OutKeys;
-
- cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .Namespace = std::string(Namespace)};
- for (size_t Key = 1; Key <= Num; ++Key)
- {
- zen::IoHash KeyHash;
- ((size_t*)(KeyHash.Hash))[0] = KeyOffset + Key;
- const zen::CacheKey CacheKey = zen::CacheKey::Create(Bucket, KeyHash);
-
- AppendCacheRecord(Request, CacheKey, PayloadSize, CachePolicy::Default);
- OutKeys.push_back(CacheKey);
- }
-
- CbPackage Package;
- CHECK(Request.Format(Package));
-
- IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer();
- cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)},
- cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}},
- cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
-
- if (Result.status_code != 200)
- {
- ZEN_DEBUG("PutCacheRecords failed with {}, reason '{}'", Result.status_code, Result.reason);
- OutKeys.clear();
- }
-
- return OutKeys;
- };
-
- struct GetCacheRecordResult
- {
- zen::CbPackage Response;
- cacherequests::GetCacheRecordsResult Result;
- bool Success = false;
- };
-
- auto GetCacheRecords = [](std::string_view BaseUri,
- std::string_view Namespace,
- std::span<zen::CacheKey> Keys,
- zen::CachePolicy Policy) -> GetCacheRecordResult {
- cacherequests::GetCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic,
- .DefaultPolicy = Policy,
- .Namespace = std::string(Namespace)};
- for (const CacheKey& Key : Keys)
- {
- Request.Requests.push_back({.Key = Key});
- }
-
- CbObjectWriter RequestWriter;
- CHECK(Request.Format(RequestWriter));
-
- BinaryWriter Body;
- RequestWriter.Save(Body);
-
- cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)},
- cpr::Header{{"Content-Type", "application/x-ue-cb"}, {"Accept", "application/x-ue-cbpkg"}},
- cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
-
- GetCacheRecordResult OutResult;
-
- if (Result.status_code == 200)
- {
- CbPackage Response = ParsePackageMessage(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()));
- if (!Response.IsNull())
- {
- OutResult.Response = std::move(Response);
- CHECK(OutResult.Result.Parse(OutResult.Response));
- OutResult.Success = true;
- }
- }
- else
- {
- ZEN_DEBUG("GetCacheRecords with {}, reason '{}'", Result.reason, Result.status_code);
- }
-
- return OutResult;
- };
-
- // Populate with some simple data
-
- CachePolicy Policy = CachePolicy::Default;
-
- const size_t ThreadCount = 128;
- const size_t KeyMultiplier = 16384;
- const size_t RecordsPerRequest = 64;
- WorkerThreadPool Pool(ThreadCount);
-
- std::atomic_size_t Completed = 0;
-
- auto Keys = new std::vector<CacheKey>[ThreadCount * KeyMultiplier];
- RwLock KeysLock;
-
- for (size_t I = 0; I < ThreadCount * KeyMultiplier; I++)
- {
- size_t Iteration = I;
- Pool.ScheduleWork([&] {
- std::vector<CacheKey> NewKeys = PutCacheRecords(LocalUri, "ue4.ddc"sv, "mastodon"sv, RecordsPerRequest, I * RecordsPerRequest);
- if (NewKeys.size() != RecordsPerRequest)
- {
- ZEN_DEBUG("PutCacheRecords iteration {} failed", Iteration);
- Completed.fetch_add(1);
- return;
- }
- {
- RwLock::ExclusiveLockScope _(KeysLock);
- Keys[Iteration].swap(NewKeys);
- }
- Completed.fetch_add(1);
- });
- }
- bool UseUpstream1 = false;
- while (Completed < ThreadCount * KeyMultiplier)
- {
- Sleep(8000);
-
- if (UseUpstream1)
- {
- if (Upstream2Running)
- {
- Upstream2Server.EnableTermination();
- Upstream2Server.Shutdown();
- Sleep(100);
- Upstream2Running = false;
- }
- if (!Upstream1Running)
- {
- SpawnServer(Upstream1Server, Upstream1Cfg);
- Upstream1Running = true;
- }
- UseUpstream1 = !UseUpstream1;
- }
- else
- {
- if (Upstream1Running)
- {
- Upstream1Server.EnableTermination();
- Upstream1Server.Shutdown();
- Sleep(100);
- Upstream1Running = false;
- }
- if (!Upstream2Running)
- {
- SpawnServer(Upstream2Server, Upstream2Cfg);
- Upstream2Running = true;
- }
- UseUpstream1 = !UseUpstream1;
- }
- }
-
- Completed = 0;
- for (size_t I = 0; I < ThreadCount * KeyMultiplier; I++)
- {
- size_t Iteration = I;
- std::vector<CacheKey>& LocalKeys = Keys[Iteration];
- if (LocalKeys.empty())
- {
- Completed.fetch_add(1);
- continue;
- }
- Pool.ScheduleWork([&] {
- GetCacheRecordResult Result = GetCacheRecords(LocalUri, "ue4.ddc"sv, LocalKeys, Policy);
-
- if (!Result.Success)
- {
- ZEN_DEBUG("GetCacheRecords iteration {} failed", Iteration);
- Completed.fetch_add(1);
- return;
- }
-
- if (Result.Result.Results.size() != LocalKeys.size())
- {
- ZEN_DEBUG("GetCacheRecords iteration {} empty records", Iteration);
- Completed.fetch_add(1);
- return;
- }
- for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results)
- {
- const CacheKey& ExpectedKey = LocalKeys[Index++];
- if (!Record)
- {
- continue;
- }
- if (Record->Key != ExpectedKey)
- {
- continue;
- }
- if (Record->Values.size() != 1)
- {
- continue;
- }
-
- for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values)
- {
- if (!Value.Body)
- {
- continue;
- }
- }
- }
- Completed.fetch_add(1);
- });
- }
- while (Completed < ThreadCount * KeyMultiplier)
- {
- Sleep(10);
- }
-}
-
-TEST_CASE("zcache.rpc.allpolicies")
-{
- using namespace std::literals;
- using namespace utils;
-
- ZenConfig UpstreamCfg = ZenConfig::New(13338);
- ZenServerInstance UpstreamServer(TestEnv);
- ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338);
- ZenServerInstance LocalServer(TestEnv);
- const uint16_t LocalPortNumber = 13337;
- const auto BaseUri = fmt::format("http://localhost:{}/z$", LocalPortNumber);
-
- SpawnServer(UpstreamServer, UpstreamCfg);
- SpawnServer(LocalServer, LocalCfg);
-
- std::string_view TestVersion = "F72150A02AE34B57A9EC91D36BA1CE08"sv;
- std::string_view TestBucket = "allpoliciestest"sv;
- std::string_view TestNamespace = "ue4.ddc"sv;
-
- // NumKeys = (2 Value vs Record)*(2 SkipData vs Default)*(2 ForceMiss vs Not)*(2 use local)
- // *(2 use remote)*(2 UseValue Policy vs not)*(4 cases per type)
- constexpr int NumKeys = 256;
- constexpr int NumValues = 4;
- Oid ValueIds[NumValues];
- IoHash Hash;
- for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex)
- {
- ExtendableStringBuilder<16> ValueName;
- ValueName << "ValueId_"sv << ValueIndex;
- static_assert(sizeof(IoHash) >= sizeof(Oid));
- ValueIds[ValueIndex] = Oid::FromMemory(IoHash::HashBuffer(ValueName.Data(), ValueName.Size() * sizeof(ValueName.Data()[0])).Hash);
- }
-
- struct KeyData;
- struct UserData
- {
- UserData& Set(KeyData* InKeyData, int InValueIndex)
- {
- Data = InKeyData;
- ValueIndex = InValueIndex;
- return *this;
- }
- KeyData* Data = nullptr;
- int ValueIndex = 0;
- };
- struct KeyData
- {
- CompressedBuffer BufferValues[NumValues];
- uint64_t IntValues[NumValues];
- UserData ValueUserData[NumValues];
- bool ReceivedChunk[NumValues];
- CacheKey Key;
- UserData KeyUserData;
- uint32_t KeyIndex = 0;
- bool GetRequestsData = true;
- bool UseValueAPI = false;
- bool UseValuePolicy = false;
- bool ForceMiss = false;
- bool UseLocal = true;
- bool UseRemote = true;
- bool ShouldBeHit = true;
- bool ReceivedPut = false;
- bool ReceivedGet = false;
- bool ReceivedPutValue = false;
- bool ReceivedGetValue = false;
- };
- struct CachePutRequest
- {
- CacheKey Key;
- CbObject Record;
- CacheRecordPolicy Policy;
- KeyData* Values;
- UserData* Data;
- };
- struct CachePutValueRequest
- {
- CacheKey Key;
- CompressedBuffer Value;
- CachePolicy Policy;
- UserData* Data;
- };
- struct CacheGetRequest
- {
- CacheKey Key;
- CacheRecordPolicy Policy;
- UserData* Data;
- };
- struct CacheGetValueRequest
- {
- CacheKey Key;
- CachePolicy Policy;
- UserData* Data;
- };
- struct CacheGetChunkRequest
- {
- CacheKey Key;
- Oid ValueId;
- uint64_t RawOffset;
- uint64_t RawSize;
- IoHash RawHash;
- CachePolicy Policy;
- UserData* Data;
- };
-
- KeyData KeyDatas[NumKeys];
- std::vector<CachePutRequest> PutRequests;
- std::vector<CachePutValueRequest> PutValueRequests;
- std::vector<CacheGetRequest> GetRequests;
- std::vector<CacheGetValueRequest> GetValueRequests;
- std::vector<CacheGetChunkRequest> ChunkRequests;
-
- for (uint32_t KeyIndex = 0; KeyIndex < NumKeys; ++KeyIndex)
- {
- IoHashStream KeyWriter;
- KeyWriter.Append(TestVersion.data(), TestVersion.length() * sizeof(TestVersion.data()[0]));
- KeyWriter.Append(&KeyIndex, sizeof(KeyIndex));
- IoHash KeyHash = KeyWriter.GetHash();
- KeyData& KeyData = KeyDatas[KeyIndex];
-
- KeyData.Key = CacheKey::Create(TestBucket, KeyHash);
- KeyData.KeyIndex = KeyIndex;
- KeyData.GetRequestsData = (KeyIndex & (1 << 1)) == 0;
- KeyData.UseValueAPI = (KeyIndex & (1 << 2)) != 0;
- KeyData.UseValuePolicy = (KeyIndex & (1 << 3)) != 0;
- KeyData.ForceMiss = (KeyIndex & (1 << 4)) == 0;
- KeyData.UseLocal = (KeyIndex & (1 << 5)) == 0;
- KeyData.UseRemote = (KeyIndex & (1 << 6)) == 0;
- KeyData.ShouldBeHit = !KeyData.ForceMiss && (KeyData.UseLocal || KeyData.UseRemote);
- CachePolicy SharedPolicy = KeyData.UseLocal ? CachePolicy::Local : CachePolicy::None;
- SharedPolicy |= KeyData.UseRemote ? CachePolicy::Remote : CachePolicy::None;
- CachePolicy PutPolicy = SharedPolicy;
- CachePolicy GetPolicy = SharedPolicy;
- GetPolicy |= !KeyData.GetRequestsData ? CachePolicy::SkipData : CachePolicy::None;
- CacheKey& Key = KeyData.Key;
-
- for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex)
- {
- KeyData.IntValues[ValueIndex] = static_cast<uint64_t>(KeyIndex) | (static_cast<uint64_t>(ValueIndex) << 32);
- KeyData.BufferValues[ValueIndex] =
- CompressedBuffer::Compress(SharedBuffer::MakeView(&KeyData.IntValues[ValueIndex], sizeof(KeyData.IntValues[ValueIndex])));
- KeyData.ReceivedChunk[ValueIndex] = false;
- }
-
- UserData& KeyUserData = KeyData.KeyUserData.Set(&KeyData, -1);
- for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex)
- {
- KeyData.ValueUserData[ValueIndex].Set(&KeyData, ValueIndex);
- }
- if (!KeyData.UseValueAPI)
- {
- CbObjectWriter Builder;
- Builder.BeginObject("key"sv);
- Builder << "Bucket"sv << Key.Bucket << "Hash"sv << Key.Hash;
- Builder.EndObject();
- Builder.BeginArray("Values"sv);
- for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex)
- {
- Builder.BeginObject();
- Builder.AddObjectId("Id"sv, ValueIds[ValueIndex]);
- Builder.AddBinaryAttachment("RawHash"sv, KeyData.BufferValues[ValueIndex].DecodeRawHash());
- Builder.AddInteger("RawSize"sv, KeyData.BufferValues[ValueIndex].DecodeRawSize());
- Builder.EndObject();
- }
- Builder.EndArray();
-
- CacheRecordPolicy PutRecordPolicy;
- CacheRecordPolicy GetRecordPolicy;
- if (!KeyData.UseValuePolicy)
- {
- PutRecordPolicy = CacheRecordPolicy(PutPolicy);
- GetRecordPolicy = CacheRecordPolicy(GetPolicy);
- }
- else
- {
- // Switch the SkipData field in the Record policy so that if the CacheStore ignores the ValuePolicies
- // it will use the wrong value for SkipData and fail our tests.
- CacheRecordPolicyBuilder PutBuilder(PutPolicy ^ CachePolicy::SkipData);
- CacheRecordPolicyBuilder GetBuilder(GetPolicy ^ CachePolicy::SkipData);
- for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex)
- {
- PutBuilder.AddValuePolicy(ValueIds[ValueIndex], PutPolicy);
- GetBuilder.AddValuePolicy(ValueIds[ValueIndex], GetPolicy);
- }
- PutRecordPolicy = PutBuilder.Build();
- GetRecordPolicy = GetBuilder.Build();
- }
- if (!KeyData.ForceMiss)
- {
- PutRequests.push_back({Key, Builder.Save(), PutRecordPolicy, &KeyData, &KeyUserData});
- }
- GetRequests.push_back({Key, GetRecordPolicy, &KeyUserData});
- for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex)
- {
- UserData& ValueUserData = KeyData.ValueUserData[ValueIndex];
- ChunkRequests.push_back({Key, ValueIds[ValueIndex], 0, UINT64_MAX, IoHash(), GetPolicy, &ValueUserData});
- }
- }
- else
- {
- if (!KeyData.ForceMiss)
- {
- PutValueRequests.push_back({Key, KeyData.BufferValues[0], PutPolicy, &KeyUserData});
- }
- GetValueRequests.push_back({Key, GetPolicy, &KeyUserData});
- ChunkRequests.push_back({Key, Oid::Zero, 0, UINT64_MAX, IoHash(), GetPolicy, &KeyUserData});
- }
- }
-
- // PutCacheRecords
- {
- CachePolicy BatchDefaultPolicy = CachePolicy::Default;
- cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic,
- .DefaultPolicy = BatchDefaultPolicy,
- .Namespace = std::string(TestNamespace)};
- Request.Requests.reserve(PutRequests.size());
- for (CachePutRequest& PutRequest : PutRequests)
- {
- cacherequests::PutCacheRecordRequest& RecordRequest = Request.Requests.emplace_back();
- RecordRequest.Key = PutRequest.Key;
- RecordRequest.Policy = PutRequest.Policy;
- RecordRequest.Values.reserve(NumValues);
- for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex)
- {
- RecordRequest.Values.push_back({.Id = ValueIds[ValueIndex], .Body = PutRequest.Values->BufferValues[ValueIndex]});
- }
- PutRequest.Data->Data->ReceivedPut = true;
- }
-
- CbPackage Package;
- CHECK(Request.Format(Package));
- IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer();
- cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)},
- cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}},
- cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
- CHECK_MESSAGE(Result.status_code == 200, "PutCacheRecords unexpectedly failed.");
- }
-
- // PutCacheValues
- {
- CachePolicy BatchDefaultPolicy = CachePolicy::Default;
-
- cacherequests::PutCacheValuesRequest Request = {.AcceptMagic = kCbPkgMagic,
- .DefaultPolicy = BatchDefaultPolicy,
- .Namespace = std::string(TestNamespace)};
- Request.Requests.reserve(PutValueRequests.size());
- for (CachePutValueRequest& PutRequest : PutValueRequests)
- {
- Request.Requests.push_back({.Key = PutRequest.Key, .Body = PutRequest.Value, .Policy = PutRequest.Policy});
- PutRequest.Data->Data->ReceivedPutValue = true;
- }
-
- CbPackage Package;
- CHECK(Request.Format(Package));
-
- IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer();
- cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)},
- cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}},
- cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
- CHECK_MESSAGE(Result.status_code == 200, "PutCacheValues unexpectedly failed.");
- }
-
- for (KeyData& KeyData : KeyDatas)
- {
- if (!KeyData.ForceMiss)
- {
- if (!KeyData.UseValueAPI)
- {
- CHECK_MESSAGE(KeyData.ReceivedPut, WriteToString<32>("Key ", KeyData.KeyIndex, " was unexpectedly not put.").c_str());
- }
- else
- {
- CHECK_MESSAGE(KeyData.ReceivedPutValue,
- WriteToString<32>("Key ", KeyData.KeyIndex, " was unexpectedly not put to ValueAPI.").c_str());
- }
- }
- }
-
- // GetCacheRecords
- {
- CachePolicy BatchDefaultPolicy = CachePolicy::Default;
- cacherequests::GetCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic,
- .DefaultPolicy = BatchDefaultPolicy,
- .Namespace = std::string(TestNamespace)};
- Request.Requests.reserve(GetRequests.size());
- for (CacheGetRequest& GetRequest : GetRequests)
- {
- Request.Requests.push_back({.Key = GetRequest.Key, .Policy = GetRequest.Policy});
- }
-
- CbPackage Package;
- CHECK(Request.Format(Package));
- IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer();
- cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)},
- cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}},
- cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
- CHECK_MESSAGE(Result.status_code == 200, "GetCacheRecords unexpectedly failed.");
- CbPackage Response = ParsePackageMessage(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()));
- bool Loaded = !Response.IsNull();
- CHECK_MESSAGE(Loaded, "GetCacheRecords response failed to load.");
- cacherequests::GetCacheRecordsResult RequestResult;
- CHECK(RequestResult.Parse(Response));
- CHECK_MESSAGE(RequestResult.Results.size() == GetRequests.size(), "GetCacheRecords response count did not match request count.");
- for (int Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& RecordResult : RequestResult.Results)
- {
- bool Succeeded = RecordResult.has_value();
- CacheGetRequest& GetRequest = GetRequests[Index++];
- KeyData* KeyData = GetRequest.Data->Data;
- KeyData->ReceivedGet = true;
- WriteToString<32> Name("Get(", KeyData->KeyIndex, ")");
- if (KeyData->ShouldBeHit)
- {
- CHECK_MESSAGE(Succeeded, WriteToString<32>(Name, " unexpectedly failed.").c_str());
- }
- else if (KeyData->ForceMiss)
- {
- CHECK_MESSAGE(!Succeeded, WriteToString<32>(Name, " unexpectedly succeeded.").c_str());
- }
- if (!KeyData->ForceMiss && Succeeded)
- {
- CHECK_MESSAGE(RecordResult->Values.size() == NumValues,
- WriteToString<32>(Name, " number of values did not match.").c_str());
- for (const cacherequests::GetCacheRecordResultValue& Value : RecordResult->Values)
- {
- int ExpectedValueIndex = 0;
- for (; ExpectedValueIndex < NumValues; ++ExpectedValueIndex)
- {
- if (ValueIds[ExpectedValueIndex] == Value.Id)
- {
- break;
- }
- }
- CHECK_MESSAGE(ExpectedValueIndex < NumValues, WriteToString<32>(Name, " could not find matching ValueId.").c_str());
-
- WriteToString<32> ValueName("Get(", KeyData->KeyIndex, ",", ExpectedValueIndex, ")");
-
- CompressedBuffer ExpectedValue = KeyData->BufferValues[ExpectedValueIndex];
- CHECK_MESSAGE(Value.RawHash == ExpectedValue.DecodeRawHash(),
- WriteToString<32>(ValueName, " RawHash did not match.").c_str());
- CHECK_MESSAGE(Value.RawSize == ExpectedValue.DecodeRawSize(),
- WriteToString<32>(ValueName, " RawSize did not match.").c_str());
-
- if (KeyData->GetRequestsData)
- {
- SharedBuffer Buffer = Value.Body.Decompress();
- CHECK_MESSAGE(Buffer.GetSize() == Value.RawSize,
- WriteToString<32>(ValueName, " BufferSize did not match RawSize.").c_str());
- uint64_t ActualIntValue = ((const uint64_t*)Buffer.GetData())[0];
- uint64_t ExpectedIntValue = KeyData->IntValues[ExpectedValueIndex];
- CHECK_MESSAGE(ActualIntValue == ExpectedIntValue, WriteToString<32>(ValueName, " had unexpected data.").c_str());
- }
- }
- }
- }
- }
-
- // GetCacheValues
- {
- CachePolicy BatchDefaultPolicy = CachePolicy::Default;
-
- cacherequests::GetCacheValuesRequest GetCacheValuesRequest = {.AcceptMagic = kCbPkgMagic,
- .DefaultPolicy = BatchDefaultPolicy,
- .Namespace = std::string(TestNamespace)};
- GetCacheValuesRequest.Requests.reserve(GetValueRequests.size());
- for (CacheGetValueRequest& GetRequest : GetValueRequests)
- {
- GetCacheValuesRequest.Requests.push_back({.Key = GetRequest.Key, .Policy = GetRequest.Policy});
- }
-
- CbPackage Package;
- CHECK(GetCacheValuesRequest.Format(Package));
-
- IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer();
- cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)},
- cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}},
- cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
- CHECK_MESSAGE(Result.status_code == 200, "GetCacheValues unexpectedly failed.");
- IoBuffer MessageBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size());
- CbPackage Response = ParsePackageMessage(MessageBuffer);
- bool Loaded = !Response.IsNull();
- CHECK_MESSAGE(Loaded, "GetCacheValues response failed to load.");
- cacherequests::GetCacheValuesResult GetCacheValuesResult;
- CHECK(GetCacheValuesResult.Parse(Response));
- for (int Index = 0; const cacherequests::CacheValueResult& ValueResult : GetCacheValuesResult.Results)
- {
- bool Succeeded = ValueResult.RawHash != IoHash::Zero;
- CacheGetValueRequest& Request = GetValueRequests[Index++];
- KeyData* KeyData = Request.Data->Data;
- KeyData->ReceivedGetValue = true;
- WriteToString<32> Name("GetValue("sv, KeyData->KeyIndex, ")"sv);
-
- if (KeyData->ShouldBeHit)
- {
- CHECK_MESSAGE(Succeeded, WriteToString<32>(Name, " unexpectedly failed.").c_str());
- }
- else if (KeyData->ForceMiss)
- {
- CHECK_MESSAGE(!Succeeded, WriteToString<32>(Name, "unexpectedly succeeded.").c_str());
- }
- if (!KeyData->ForceMiss && Succeeded)
- {
- CompressedBuffer ExpectedValue = KeyData->BufferValues[0];
- CHECK_MESSAGE(ValueResult.RawHash == ExpectedValue.DecodeRawHash(),
- WriteToString<32>(Name, " RawHash did not match.").c_str());
- CHECK_MESSAGE(ValueResult.RawSize == ExpectedValue.DecodeRawSize(),
- WriteToString<32>(Name, " RawSize did not match.").c_str());
-
- if (KeyData->GetRequestsData)
- {
- SharedBuffer Buffer = ValueResult.Body.Decompress();
- CHECK_MESSAGE(Buffer.GetSize() == ValueResult.RawSize,
- WriteToString<32>(Name, " BufferSize did not match RawSize.").c_str());
- uint64_t ActualIntValue = ((const uint64_t*)Buffer.GetData())[0];
- uint64_t ExpectedIntValue = KeyData->IntValues[0];
- CHECK_MESSAGE(ActualIntValue == ExpectedIntValue, WriteToString<32>(Name, " had unexpected data.").c_str());
- }
- }
- }
- }
-
- // GetCacheChunks
- {
- std::sort(ChunkRequests.begin(), ChunkRequests.end(), [](CacheGetChunkRequest& A, CacheGetChunkRequest& B) {
- return A.Key.Hash < B.Key.Hash;
- });
- CachePolicy BatchDefaultPolicy = CachePolicy::Default;
- cacherequests::GetCacheChunksRequest GetCacheChunksRequest = {.AcceptMagic = kCbPkgMagic,
- .DefaultPolicy = BatchDefaultPolicy,
- .Namespace = std::string(TestNamespace)};
- GetCacheChunksRequest.Requests.reserve(ChunkRequests.size());
- for (CacheGetChunkRequest& ChunkRequest : ChunkRequests)
- {
- GetCacheChunksRequest.Requests.push_back({.Key = ChunkRequest.Key,
- .ValueId = ChunkRequest.ValueId,
- .ChunkId = IoHash(),
- .RawOffset = ChunkRequest.RawOffset,
- .RawSize = ChunkRequest.RawSize,
- .Policy = ChunkRequest.Policy});
- }
- CbPackage Package;
- CHECK(GetCacheChunksRequest.Format(Package));
-
- IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer();
- cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)},
- cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}},
- cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
- CHECK_MESSAGE(Result.status_code == 200, "GetCacheChunks unexpectedly failed.");
- CbPackage Response = ParsePackageMessage(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()));
- bool Loaded = !Response.IsNull();
- CHECK_MESSAGE(Loaded, "GetCacheChunks response failed to load.");
- cacherequests::GetCacheChunksResult GetCacheChunksResult;
- CHECK(GetCacheChunksResult.Parse(Response));
- CHECK_MESSAGE(GetCacheChunksResult.Results.size() == ChunkRequests.size(),
- "GetCacheChunks response count did not match request count.");
-
- for (int Index = 0; const cacherequests::CacheValueResult& ValueResult : GetCacheChunksResult.Results)
- {
- bool Succeeded = ValueResult.RawHash != IoHash::Zero;
-
- CacheGetChunkRequest& Request = ChunkRequests[Index++];
- KeyData* KeyData = Request.Data->Data;
- int ValueIndex = Request.Data->ValueIndex >= 0 ? Request.Data->ValueIndex : 0;
- KeyData->ReceivedChunk[ValueIndex] = true;
- WriteToString<32> Name("GetChunks("sv, KeyData->KeyIndex, ","sv, ValueIndex, ")"sv);
-
- if (KeyData->ShouldBeHit)
- {
- CHECK_MESSAGE(Succeeded, WriteToString<256>(Name, " unexpectedly failed."sv).c_str());
- }
- else if (KeyData->ForceMiss)
- {
- CHECK_MESSAGE(!Succeeded, WriteToString<256>(Name, " unexpectedly succeeded."sv).c_str());
- }
- if (KeyData->ShouldBeHit && Succeeded)
- {
- CompressedBuffer ExpectedValue = KeyData->BufferValues[ValueIndex];
- CHECK_MESSAGE(ValueResult.RawHash == ExpectedValue.DecodeRawHash(),
- WriteToString<32>(Name, " had unexpected RawHash.").c_str());
- CHECK_MESSAGE(ValueResult.RawSize == ExpectedValue.DecodeRawSize(),
- WriteToString<32>(Name, " had unexpected RawSize.").c_str());
-
- if (KeyData->GetRequestsData)
- {
- SharedBuffer Buffer = ValueResult.Body.Decompress();
- CHECK_MESSAGE(Buffer.GetSize() == ValueResult.RawSize,
- WriteToString<32>(Name, " BufferSize did not match RawSize.").c_str());
- uint64_t ActualIntValue = ((const uint64_t*)Buffer.GetData())[0];
- uint64_t ExpectedIntValue = KeyData->IntValues[ValueIndex];
- CHECK_MESSAGE(ActualIntValue == ExpectedIntValue, WriteToString<32>(Name, " had unexpected data.").c_str());
- }
- }
- }
- }
-
- for (KeyData& KeyData : KeyDatas)
- {
- if (!KeyData.UseValueAPI)
- {
- CHECK_MESSAGE(KeyData.ReceivedGet, WriteToString<32>("Get(", KeyData.KeyIndex, ") was unexpectedly not received.").c_str());
- for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex)
- {
- CHECK_MESSAGE(
- KeyData.ReceivedChunk[ValueIndex],
- WriteToString<32>("GetChunks(", KeyData.KeyIndex, ",", ValueIndex, ") was unexpectedly not received.").c_str());
- }
- }
- else
- {
- CHECK_MESSAGE(KeyData.ReceivedGetValue,
- WriteToString<32>("GetValue(", KeyData.KeyIndex, ") was unexpectedly not received.").c_str());
- CHECK_MESSAGE(KeyData.ReceivedChunk[0],
- WriteToString<32>("GetChunks(", KeyData.KeyIndex, ") was unexpectedly not received.").c_str());
- }
- }
-}
-
-class ZenServerTestHelper
-{
-public:
- ZenServerTestHelper(std::string_view HelperId, int ServerCount) : m_HelperId{HelperId}, m_ServerCount{ServerCount} {}
- ~ZenServerTestHelper() {}
-
- void SpawnServers(std::string_view AdditionalServerArgs = std::string_view())
- {
- SpawnServers([](ZenServerInstance&) {}, AdditionalServerArgs);
- }
-
- void SpawnServers(auto&& Callback, std::string_view AdditionalServerArgs)
- {
- ZEN_INFO("{}: spawning {} server instances", m_HelperId, m_ServerCount);
-
- m_Instances.resize(m_ServerCount);
-
- for (int i = 0; i < m_ServerCount; ++i)
- {
- auto& Instance = m_Instances[i];
-
- Instance = std::make_unique<ZenServerInstance>(TestEnv);
- Instance->SetTestDir(TestEnv.CreateNewTestDir());
-
- Callback(*Instance);
-
- Instance->SpawnServer(13337 + i, AdditionalServerArgs);
- }
-
- for (int i = 0; i < m_ServerCount; ++i)
- {
- auto& Instance = m_Instances[i];
-
- Instance->WaitUntilReady();
- }
- }
-
- ZenServerInstance& GetInstance(int Index) { return *m_Instances[Index]; }
-
-private:
- std::string m_HelperId;
- int m_ServerCount = 0;
- std::vector<std::unique_ptr<ZenServerInstance> > m_Instances;
-};
-
-class IoDispatcher
-{
-public:
- IoDispatcher(asio::io_context& IoCtx) : m_IoCtx(IoCtx) {}
- ~IoDispatcher() { Stop(); }
-
- void Run()
- {
- Stop();
-
- m_Running = true;
-
- m_IoThread = std::thread([this]() {
- try
- {
- m_IoCtx.run();
- }
- catch (std::exception& Error)
- {
- m_Error = Error;
- }
- });
- }
-
- void Stop()
- {
- if (m_Running)
- {
- m_Running = false;
-
- if (m_IoThread.joinable())
- {
- m_IoThread.join();
- }
- }
- }
-
- bool IsRunning() const { return m_Running; }
-
- const std::exception& Error() { return m_Error; }
-
-private:
- asio::io_context& m_IoCtx;
- std::thread m_IoThread;
- std::exception m_Error;
- std::atomic_bool m_Running{false};
-};
-
-TEST_CASE("http.basics")
-{
- using namespace std::literals;
-
- ZenServerTestHelper Servers{"http.basics"sv, 1};
- Servers.SpawnServers();
-
- ZenServerInstance& Instance = Servers.GetInstance(0);
- const std::string BaseUri = Instance.GetBaseUri();
-
- {
- cpr::Response r = cpr::Get(cpr::Url{fmt::format("{}/testing/hello", BaseUri)});
- CHECK(IsHttpSuccessCode(r.status_code));
- }
-
- {
- cpr::Response r = cpr::Post(cpr::Url{fmt::format("{}/testing/hello", BaseUri)});
- CHECK_EQ(r.status_code, 404);
- }
-
- {
- cpr::Response r = cpr::Post(cpr::Url{fmt::format("{}/testing/echo", BaseUri)}, cpr::Body{"yoyoyoyo"});
- CHECK_EQ(r.status_code, 200);
- CHECK_EQ(r.text, "yoyoyoyo");
- }
-}
-
-TEST_CASE("http.package")
-{
- using namespace std::literals;
-
- ZenServerTestHelper Servers{"http.package"sv, 1};
- Servers.SpawnServers();
-
- ZenServerInstance& Instance = Servers.GetInstance(0);
- const std::string BaseUri = Instance.GetBaseUri();
-
- static const uint8_t Data1[] = {0, 1, 2, 3};
- static const uint8_t Data2[] = {0, 1, 2, 3, 4, 5, 6, 7, 8};
-
- zen::CompressedBuffer AttachmentData1 = zen::CompressedBuffer::Compress(zen::SharedBuffer::Clone({Data1, 4}),
- zen::OodleCompressor::NotSet,
- zen::OodleCompressionLevel::None);
- zen::CbAttachment Attach1{AttachmentData1, AttachmentData1.DecodeRawHash()};
- zen::CompressedBuffer AttachmentData2 = zen::CompressedBuffer::Compress(zen::SharedBuffer::Clone({Data2, 8}),
- zen::OodleCompressor::NotSet,
- zen::OodleCompressionLevel::None);
- zen::CbAttachment Attach2{AttachmentData2, AttachmentData2.DecodeRawHash()};
-
- zen::CbObjectWriter Writer;
-
- Writer.AddAttachment("attach1", Attach1);
- Writer.AddAttachment("attach2", Attach2);
-
- zen::CbObject CoreObject = Writer.Save();
-
- zen::CbPackage TestPackage;
- TestPackage.SetObject(CoreObject);
- TestPackage.AddAttachment(Attach1);
- TestPackage.AddAttachment(Attach2);
-
- zen::HttpClient TestClient(BaseUri);
- zen::HttpClient::Response Response = TestClient.TransactPackage("/testing/package"sv, TestPackage);
-
- zen::CbPackage ResponsePackage = ParsePackageMessage(Response.ResponsePayload);
-
- CHECK_EQ(ResponsePackage, TestPackage);
-}
-
-TEST_CASE("websocket.basic")
-{
- if (true)
- {
- return;
- }
-
- std::filesystem::path TestDir = TestEnv.CreateNewTestDir();
- const uint16_t PortNumber = 13337;
- const auto MaxWaitTime = std::chrono::seconds(5);
-
- ZenServerInstance Inst(TestEnv);
- Inst.SetTestDir(TestDir);
- Inst.SpawnServer(PortNumber, "--websocket-port=8848"sv);
- Inst.WaitUntilReady();
-
- asio::io_context IoCtx;
- IoDispatcher IoDispatcher(IoCtx);
- auto WebSocket = WebSocketClient::Create(IoCtx);
-
- auto ConnectFuture = WebSocket->Connect({.Host = "127.0.0.1", .Port = 8848, .Endpoint = "/zen"});
- IoDispatcher.Run();
-
- ConnectFuture.wait_for(MaxWaitTime);
- CHECK(ConnectFuture.get());
-
- for (size_t Idx = 0; Idx < 10; Idx++)
- {
- CbObjectWriter Request;
- Request << "Method"sv
- << "SayHello"sv;
-
- WebSocketMessage RequestMsg;
- RequestMsg.SetMessageType(WebSocketMessageType::kRequest);
- RequestMsg.SetBody(Request.Save());
-
- auto ResponseFuture = WebSocket->SendRequest(std::move(RequestMsg));
- ResponseFuture.wait_for(MaxWaitTime);
-
- CbObject Response = ResponseFuture.get().Body().GetObject();
- std::string_view Message = Response["Result"].AsString();
-
- CHECK(Message == "Hello Friend!!"sv);
- }
-
- WebSocket->Disconnect();
-
- IoCtx.stop();
- IoDispatcher.Stop();
-}
-
-std::string
-OidAsString(const Oid& Id)
-{
- StringBuilder<25> OidStringBuilder;
- Id.ToString(OidStringBuilder);
- return OidStringBuilder.ToString();
-}
-
-CbPackage
-CreateOplogPackage(const Oid& Id, const std::span<const std::pair<Oid, CompressedBuffer> >& Attachments)
-{
- CbPackage Package;
- CbObjectWriter Object;
- Object << "key"sv << OidAsString(Id);
- if (!Attachments.empty())
- {
- Object.BeginArray("bulkdata");
- for (const auto& Attachment : Attachments)
- {
- CbAttachment Attach(Attachment.second, Attachment.second.DecodeRawHash());
- Object.BeginObject();
- Object << "id"sv << Attachment.first;
- Object << "type"sv
- << "Standard"sv;
- Object << "data"sv << Attach;
- Object.EndObject();
-
- Package.AddAttachment(Attach);
- ZEN_DEBUG("Added attachment {}", Attach.GetHash());
- }
- Object.EndArray();
- }
- Package.SetObject(Object.Save());
- return Package;
-};
-
-std::vector<std::pair<Oid, CompressedBuffer> >
-CreateAttachments(const std::span<const size_t>& Sizes)
-{
- std::vector<std::pair<Oid, CompressedBuffer> > Result;
- Result.reserve(Sizes.size());
- for (size_t Size : Sizes)
- {
- std::vector<uint8_t> Data;
- Data.resize(Size);
- uint16_t* DataPtr = reinterpret_cast<uint16_t*>(Data.data());
- for (size_t Idx = 0; Idx < Size / 2; ++Idx)
- {
- DataPtr[Idx] = static_cast<uint16_t>(Idx % 0xffffu);
- }
- if (Size & 1)
- {
- Data[Size - 1] = static_cast<uint8_t>((Size - 1) & 0xff);
- }
- CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer::MakeView(Data.data(), Data.size()));
- Result.emplace_back(std::pair<Oid, CompressedBuffer>(Oid::NewOid(), Compressed));
- }
- return Result;
-}
-
-cpr::Body
-AsBody(const IoBuffer& Payload)
-{
- return cpr::Body{(const char*)Payload.GetData(), Payload.Size()};
-};
-
-enum CbWriterMeta
-{
- BeginObject,
- EndObject,
- BeginArray,
- EndArray
-};
-
-inline CbWriter&
-operator<<(CbWriter& Writer, CbWriterMeta Meta)
-{
- switch (Meta)
- {
- case BeginObject:
- Writer.BeginObject();
- break;
- case EndObject:
- Writer.EndObject();
- break;
- case BeginArray:
- Writer.BeginArray();
- break;
- case EndArray:
- Writer.EndArray();
- break;
- default:
- ZEN_ASSERT(false);
- }
- return Writer;
-}
-
-TEST_CASE("project.remote")
-{
- using namespace std::literals;
-
- ZenServerTestHelper Servers("remote", 3);
- Servers.SpawnServers("--debug");
-
- std::vector<Oid> OpIds;
- OpIds.reserve(24);
- for (size_t I = 0; I < 24; ++I)
- {
- OpIds.emplace_back(Oid::NewOid());
- }
-
- std::unordered_map<Oid, std::vector<std::pair<Oid, CompressedBuffer> >, Oid::Hasher> Attachments;
- {
- std::vector<std::size_t> AttachmentSizes({7633, 6825, 5738, 8031, 7225, 566, 3656, 6006, 24, 3466, 1093, 4269,
- 2257, 3685, 3489, 7194, 6151, 5482, 6217, 3511, 6738, 5061, 7537, 2759,
- 1916, 8210, 2235, 4024, 1582, 5251, 491, 5464, 4607, 8135, 3767, 4045,
- 4415, 5007, 8876, 6761, 3359, 8526, 4097, 4855, 8225});
- auto It = AttachmentSizes.begin();
- Attachments[OpIds[0]] = {};
- Attachments[OpIds[1]] = CreateAttachments(std::initializer_list<size_t>{*It++});
- Attachments[OpIds[2]] = CreateAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++});
- Attachments[OpIds[3]] = CreateAttachments(std::initializer_list<size_t>{*It++});
- Attachments[OpIds[4]] = CreateAttachments(std::initializer_list<size_t>{*It++, *It++, *It++});
- Attachments[OpIds[5]] = CreateAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++});
- Attachments[OpIds[6]] = CreateAttachments(std::initializer_list<size_t>{*It++});
- Attachments[OpIds[7]] = CreateAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++});
- Attachments[OpIds[8]] = CreateAttachments(std::initializer_list<size_t>{});
- Attachments[OpIds[9]] = CreateAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++});
- Attachments[OpIds[10]] = CreateAttachments(std::initializer_list<size_t>{*It++});
- Attachments[OpIds[11]] = CreateAttachments(std::initializer_list<size_t>{*It++, *It++, *It++});
- Attachments[OpIds[12]] = CreateAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++});
- Attachments[OpIds[13]] = CreateAttachments(std::initializer_list<size_t>{*It++});
- Attachments[OpIds[14]] = CreateAttachments(std::initializer_list<size_t>{*It++, *It++});
- Attachments[OpIds[15]] = CreateAttachments(std::initializer_list<size_t>{*It++, *It++});
- Attachments[OpIds[16]] = CreateAttachments(std::initializer_list<size_t>{});
- Attachments[OpIds[17]] = CreateAttachments(std::initializer_list<size_t>{*It++, *It++});
- Attachments[OpIds[18]] = CreateAttachments(std::initializer_list<size_t>{*It++, *It++});
- Attachments[OpIds[19]] = CreateAttachments(std::initializer_list<size_t>{});
- Attachments[OpIds[20]] = CreateAttachments(std::initializer_list<size_t>{*It++});
- Attachments[OpIds[21]] = CreateAttachments(std::initializer_list<size_t>{*It++});
- Attachments[OpIds[22]] = CreateAttachments(std::initializer_list<size_t>{*It++, *It++, *It++});
- Attachments[OpIds[23]] = CreateAttachments(std::initializer_list<size_t>{*It++});
- ZEN_ASSERT(It == AttachmentSizes.end());
- }
-
- auto AddOp = [](const CbObject& Op, std::unordered_map<Oid, uint32_t, Oid::Hasher>& Ops) {
- XXH3_128Stream KeyHasher;
- Op["key"sv].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); });
- XXH3_128 KeyHash = KeyHasher.GetHash();
- Oid Id;
- memcpy(Id.OidBits, &KeyHash, sizeof Id.OidBits);
- IoBuffer Buffer = Op.GetBuffer().AsIoBuffer();
- const uint32_t OpCoreHash = uint32_t(XXH3_64bits(Buffer.GetData(), Buffer.GetSize()) & 0xffffFFFF);
- Ops.insert({Id, OpCoreHash});
- };
-
- auto MakeProject = [](cpr::Session& Session, std::string_view UrlBase, std::string_view ProjectName) {
- CbObjectWriter Project;
- Project.AddString("id"sv, ProjectName);
- Project.AddString("root"sv, ""sv);
- Project.AddString("engine"sv, ""sv);
- Project.AddString("project"sv, ""sv);
- Project.AddString("projectfile"sv, ""sv);
- IoBuffer ProjectPayload = Project.Save().GetBuffer().AsIoBuffer();
- std::string ProjectRequest = fmt::format("{}/prj/{}", UrlBase, ProjectName);
- Session.SetUrl({ProjectRequest});
- Session.SetBody(cpr::Body{(const char*)ProjectPayload.GetData(), ProjectPayload.GetSize()});
- cpr::Response Response = Session.Post();
- CHECK(IsHttpSuccessCode(Response.status_code));
- };
-
- auto MakeOplog = [](cpr::Session& Session, std::string_view UrlBase, std::string_view ProjectName, std::string_view OplogName) {
- std::string CreateOplogRequest = fmt::format("{}/prj/{}/oplog/{}", UrlBase, ProjectName, OplogName);
- Session.SetUrl({CreateOplogRequest});
- Session.SetBody(cpr::Body{});
- cpr::Response Response = Session.Post();
- CHECK(IsHttpSuccessCode(Response.status_code));
- };
-
- auto MakeOp = [](cpr::Session& Session,
- std::string_view UrlBase,
- std::string_view ProjectName,
- std::string_view OplogName,
- const CbPackage& OpPackage) {
- std::string CreateOpRequest = fmt::format("{}/prj/{}/oplog/{}/new", UrlBase, ProjectName, OplogName);
- Session.SetUrl({CreateOpRequest});
- zen::BinaryWriter MemOut;
- legacy::SaveCbPackage(OpPackage, MemOut);
- Session.SetBody(cpr::Body{(const char*)MemOut.Data(), MemOut.Size()});
- cpr::Response Response = Session.Post();
- CHECK(IsHttpSuccessCode(Response.status_code));
- };
-
- cpr::Session Session;
- MakeProject(Session, Servers.GetInstance(0).GetBaseUri(), "proj0");
- MakeOplog(Session, Servers.GetInstance(0).GetBaseUri(), "proj0", "oplog0");
-
- std::unordered_map<Oid, uint32_t, Oid::Hasher> SourceOps;
- for (const Oid& OpId : OpIds)
- {
- CbPackage OpPackage = CreateOplogPackage(OpId, Attachments[OpId]);
- CHECK(OpPackage.GetAttachments().size() == Attachments[OpId].size());
- AddOp(OpPackage.GetObject(), SourceOps);
- MakeOp(Session, Servers.GetInstance(0).GetBaseUri(), "proj0", "oplog0", OpPackage);
- }
-
- std::vector<IoHash> AttachmentHashes;
- AttachmentHashes.reserve(Attachments.size());
- for (const auto& AttachmentOplog : Attachments)
- {
- for (const auto& Attachment : AttachmentOplog.second)
- {
- AttachmentHashes.emplace_back(Attachment.second.DecodeRawHash());
- }
- }
-
- auto MakeCbObjectPayload = [](std::function<void(CbObjectWriter & Writer)> Write) -> IoBuffer {
- CbObjectWriter Writer;
- Write(Writer);
- IoBuffer Result = Writer.Save().GetBuffer().AsIoBuffer();
- Result.MakeOwned();
- return Result;
- };
-
- auto ValidateAttachments = [&MakeCbObjectPayload, &AttachmentHashes, &Servers, &Session](int ServerIndex,
- std::string_view Project,
- std::string_view Oplog) {
- std::string GetChunksRequest = fmt::format("{}/prj/{}/oplog/{}/rpc", Servers.GetInstance(ServerIndex).GetBaseUri(), Project, Oplog);
- Session.SetUrl({GetChunksRequest});
- IoBuffer Payload = MakeCbObjectPayload([&AttachmentHashes](CbObjectWriter& Writer) {
- Writer << "method"sv
- << "getchunks"sv;
- Writer << "chunks"sv << BeginArray;
- for (const IoHash& Chunk : AttachmentHashes)
- {
- Writer << Chunk;
- }
- Writer << EndArray; // chunks
- });
- Session.SetBody(AsBody(Payload));
- Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}, {"Accept", "application/x-ue-cbpkg"}});
- cpr::Response Response = Session.Post();
- CHECK(IsHttpSuccessCode(Response.status_code));
- CbPackage ResponsePackage = ParsePackageMessage(IoBuffer(IoBuffer::Wrap, Response.text.data(), Response.text.size()));
- CHECK(ResponsePackage.GetAttachments().size() == AttachmentHashes.size());
- };
-
- auto ValidateOplog = [&SourceOps, &AddOp, &Servers, &Session](int ServerIndex, std::string_view Project, std::string_view Oplog) {
- std::unordered_map<Oid, uint32_t, Oid::Hasher> TargetOps;
- std::vector<CbObject> ResultingOplog;
-
- std::string GetOpsRequest =
- fmt::format("{}/prj/{}/oplog/{}/entries", Servers.GetInstance(ServerIndex).GetBaseUri(), Project, Oplog);
- Session.SetUrl({GetOpsRequest});
- cpr::Response Response = Session.Get();
- CHECK(IsHttpSuccessCode(Response.status_code));
-
- IoBuffer Payload(IoBuffer::Wrap, Response.text.data(), Response.text.size());
- CbObject OplogResonse = LoadCompactBinaryObject(Payload);
- CbArrayView EntriesArray = OplogResonse["entries"sv].AsArrayView();
-
- for (CbFieldView OpEntry : EntriesArray)
- {
- CbObjectView Core = OpEntry.AsObjectView();
- BinaryWriter Writer;
- Core.CopyTo(Writer);
- MemoryView OpView = Writer.GetView();
- IoBuffer OpBuffer(IoBuffer::Wrap, OpView.GetData(), OpView.GetSize());
- CbObject Op(SharedBuffer(OpBuffer), CbFieldType::HasFieldType);
- AddOp(Op, TargetOps);
- }
- CHECK(SourceOps == TargetOps);
- };
-
- SUBCASE("File")
- {
- ScopedTemporaryDirectory TempDir;
- {
- std::string SaveOplogRequest = fmt::format("{}/prj/{}/oplog/{}/rpc", Servers.GetInstance(0).GetBaseUri(), "proj0", "oplog0");
- Session.SetUrl({SaveOplogRequest});
-
- IoBuffer Payload = MakeCbObjectPayload([&AttachmentHashes, path = TempDir.Path().string()](CbObjectWriter& Writer) {
- Writer << "method"sv
- << "export"sv;
- Writer << "params" << BeginObject;
- {
- Writer << "maxblocksize"sv << 3072u;
- Writer << "maxchunkembedsize"sv << 1296u;
- Writer << "force"sv << false;
- Writer << "file"sv << BeginObject;
- {
- Writer << "path"sv << path;
- Writer << "name"sv
- << "proj0_oplog0"sv;
- }
- Writer << EndObject; // "file"
- }
- Writer << EndObject; // "params"
- });
- Session.SetBody(AsBody(Payload));
- Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}});
- cpr::Response Response = Session.Post();
- CHECK(IsHttpSuccessCode(Response.status_code));
- }
- {
- MakeProject(Session, Servers.GetInstance(1).GetBaseUri(), "proj0_copy");
- MakeOplog(Session, Servers.GetInstance(1).GetBaseUri(), "proj0_copy", "oplog0_copy");
- std::string LoadOplogRequest =
- fmt::format("{}/prj/{}/oplog/{}/rpc", Servers.GetInstance(1).GetBaseUri(), "proj0_copy", "oplog0_copy");
- Session.SetUrl({LoadOplogRequest});
-
- IoBuffer Payload = MakeCbObjectPayload([&AttachmentHashes, path = TempDir.Path().string()](CbObjectWriter& Writer) {
- Writer << "method"sv
- << "import"sv;
- Writer << "params" << BeginObject;
- {
- Writer << "force"sv << false;
- Writer << "file"sv << BeginObject;
- {
- Writer << "path"sv << path;
- Writer << "name"sv
- << "proj0_oplog0"sv;
- }
- Writer << EndObject; // "file"
- }
- Writer << EndObject; // "params"
- });
- Session.SetBody(AsBody(Payload));
-
- Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}});
- cpr::Response Response = Session.Post();
- CHECK(IsHttpSuccessCode(Response.status_code));
- }
- ValidateAttachments(1, "proj0_copy", "oplog0_copy");
- ValidateOplog(1, "proj0_copy", "oplog0_copy");
- }
-
- SUBCASE("File disable blocks")
- {
- ScopedTemporaryDirectory TempDir;
- {
- std::string SaveOplogRequest = fmt::format("{}/prj/{}/oplog/{}/rpc", Servers.GetInstance(0).GetBaseUri(), "proj0", "oplog0");
- Session.SetUrl({SaveOplogRequest});
-
- IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) {
- Writer << "method"sv
- << "export"sv;
- Writer << "params" << BeginObject;
- {
- Writer << "maxblocksize"sv << 3072u;
- Writer << "maxchunkembedsize"sv << 1296u;
- Writer << "force"sv << false;
- Writer << "file"sv << BeginObject;
- {
- Writer << "path"sv << TempDir.Path().string();
- Writer << "name"sv
- << "proj0_oplog0"sv;
- Writer << "disableblocks"sv << true;
- }
- Writer << EndObject; // "file"
- }
- Writer << EndObject; // "params"
- });
- Session.SetBody(AsBody(Payload));
- Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}});
- cpr::Response Response = Session.Post();
- CHECK(IsHttpSuccessCode(Response.status_code));
- }
- {
- MakeProject(Session, Servers.GetInstance(1).GetBaseUri(), "proj0_copy");
- MakeOplog(Session, Servers.GetInstance(1).GetBaseUri(), "proj0_copy", "oplog0_copy");
- std::string LoadOplogRequest =
- fmt::format("{}/prj/{}/oplog/{}/rpc", Servers.GetInstance(1).GetBaseUri(), "proj0_copy", "oplog0_copy");
- Session.SetUrl({LoadOplogRequest});
- IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) {
- Writer << "method"sv
- << "import"sv;
- Writer << "params" << BeginObject;
- {
- Writer << "force"sv << false;
- Writer << "file"sv << BeginObject;
- {
- Writer << "path"sv << TempDir.Path().string();
- Writer << "name"sv
- << "proj0_oplog0"sv;
- }
- Writer << EndObject; // "file"
- }
- Writer << EndObject; // "params"
- });
- Session.SetBody(AsBody(Payload));
- Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}});
- cpr::Response Response = Session.Post();
- CHECK(IsHttpSuccessCode(Response.status_code));
- }
- ValidateAttachments(1, "proj0_copy", "oplog0_copy");
- ValidateOplog(1, "proj0_copy", "oplog0_copy");
- }
-
- SUBCASE("File force temp blocks")
- {
- ScopedTemporaryDirectory TempDir;
- {
- std::string SaveOplogRequest = fmt::format("{}/prj/{}/oplog/{}/rpc", Servers.GetInstance(0).GetBaseUri(), "proj0", "oplog0");
- Session.SetUrl({SaveOplogRequest});
- IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) {
- Writer << "method"sv
- << "export"sv;
- Writer << "params" << BeginObject;
- {
- Writer << "maxblocksize"sv << 3072u;
- Writer << "maxchunkembedsize"sv << 1296u;
- Writer << "force"sv << false;
- Writer << "file"sv << BeginObject;
- {
- Writer << "path"sv << TempDir.Path().string();
- Writer << "name"sv
- << "proj0_oplog0"sv;
- Writer << "enabletempblocks"sv << true;
- }
- Writer << EndObject; // "file"
- }
- Writer << EndObject; // "params"
- });
- Session.SetBody(AsBody(Payload));
- Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}});
- cpr::Response Response = Session.Post();
- CHECK(IsHttpSuccessCode(Response.status_code));
- }
- {
- MakeProject(Session, Servers.GetInstance(1).GetBaseUri(), "proj0_copy");
- MakeOplog(Session, Servers.GetInstance(1).GetBaseUri(), "proj0_copy", "oplog0_copy");
- std::string LoadOplogRequest =
- fmt::format("{}/prj/{}/oplog/{}/rpc", Servers.GetInstance(1).GetBaseUri(), "proj0_copy", "oplog0_copy");
- Session.SetUrl({LoadOplogRequest});
- IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) {
- Writer << "method"sv
- << "import"sv;
- Writer << "params" << BeginObject;
- {
- Writer << "force"sv << false;
- Writer << "file"sv << BeginObject;
- {
- Writer << "path"sv << TempDir.Path().string();
- Writer << "name"sv
- << "proj0_oplog0"sv;
- }
- Writer << EndObject; // "file"
- }
- Writer << EndObject; // "params"
- });
- Session.SetBody(AsBody(Payload));
- Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}});
- cpr::Response Response = Session.Post();
- CHECK(IsHttpSuccessCode(Response.status_code));
- }
- ValidateAttachments(1, "proj0_copy", "oplog0_copy");
- ValidateOplog(1, "proj0_copy", "oplog0_copy");
- }
-
- SUBCASE("Zen")
- {
- ScopedTemporaryDirectory TempDir;
- {
- std::string ExportSourceUri = Servers.GetInstance(0).GetBaseUri();
- std::string ExportTargetUri = Servers.GetInstance(1).GetBaseUri();
- MakeProject(Session, ExportTargetUri, "proj0_copy");
- MakeOplog(Session, ExportTargetUri, "proj0_copy", "oplog0_copy");
-
- std::string SaveOplogRequest = fmt::format("{}/prj/{}/oplog/{}/rpc", ExportSourceUri, "proj0", "oplog0");
- Session.SetUrl({SaveOplogRequest});
-
- IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) {
- Writer << "method"sv
- << "export"sv;
- Writer << "params" << BeginObject;
- {
- Writer << "maxblocksize"sv << 3072u;
- Writer << "maxchunkembedsize"sv << 1296u;
- Writer << "force"sv << false;
- Writer << "zen"sv << BeginObject;
- {
- Writer << "url"sv << ExportTargetUri.substr(7);
- Writer << "project"
- << "proj0_copy";
- Writer << "oplog"
- << "oplog0_copy";
- }
- Writer << EndObject; // "file"
- }
- Writer << EndObject; // "params"
- });
- Session.SetBody(AsBody(Payload));
- Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}});
- cpr::Response Response = Session.Post();
- CHECK(IsHttpSuccessCode(Response.status_code));
- }
- ValidateAttachments(1, "proj0_copy", "oplog0_copy");
- ValidateOplog(1, "proj0_copy", "oplog0_copy");
-
- {
- std::string ImportSourceUri = Servers.GetInstance(1).GetBaseUri();
- std::string ImportTargetUri = Servers.GetInstance(2).GetBaseUri();
- MakeProject(Session, ImportTargetUri, "proj1");
- MakeOplog(Session, ImportTargetUri, "proj1", "oplog1");
- std::string LoadOplogRequest = fmt::format("{}/prj/{}/oplog/{}/rpc", ImportTargetUri, "proj1", "oplog1");
- Session.SetUrl({LoadOplogRequest});
-
- IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) {
- Writer << "method"sv
- << "import"sv;
- Writer << "params" << BeginObject;
- {
- Writer << "force"sv << false;
- Writer << "zen"sv << BeginObject;
- {
- Writer << "url"sv << ImportSourceUri.substr(7);
- Writer << "project"
- << "proj0_copy";
- Writer << "oplog"
- << "oplog0_copy";
- }
- Writer << EndObject; // "file"
- }
- Writer << EndObject; // "params"
- });
- Session.SetBody(AsBody(Payload));
- Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}});
- cpr::Response Response = Session.Post();
- CHECK(IsHttpSuccessCode(Response.status_code));
- }
- ValidateAttachments(2, "proj1", "oplog1");
- ValidateOplog(2, "proj1", "oplog1");
- }
-}
-
-# if 0
-TEST_CASE("lifetime.owner")
-{
- // This test is designed to verify that the hand-over of sponsor processes is handled
- // correctly for the case when a second or third process is launched on the same port
- //
- // Due to the nature of it, it cannot be
-
- const uint16_t PortNumber = 23456;
-
- ZenServerInstance Zen1(TestEnv);
- std::filesystem::path TestDir1 = TestEnv.CreateNewTestDir();
- Zen1.SetTestDir(TestDir1);
- Zen1.SpawnServer(PortNumber);
- Zen1.WaitUntilReady();
- Zen1.Detach();
-
- ZenServerInstance Zen2(TestEnv);
- std::filesystem::path TestDir2 = TestEnv.CreateNewTestDir();
- Zen2.SetTestDir(TestDir2);
- Zen2.SpawnServer(PortNumber);
- Zen2.WaitUntilReady();
- Zen2.Detach();
-}
-
-TEST_CASE("lifetime.owner.2")
-{
- // This test is designed to verify that the hand-over of sponsor processes is handled
- // correctly for the case when a second or third process is launched on the same port
- //
- // Due to the nature of it, it cannot be
-
- const uint16_t PortNumber = 13456;
-
- std::filesystem::path TestDir1 = TestEnv.CreateNewTestDir();
- std::filesystem::path TestDir2 = TestEnv.CreateNewTestDir();
-
- ZenServerInstance Zen1(TestEnv);
- Zen1.SetTestDir(TestDir1);
- Zen1.SpawnServer(PortNumber);
- Zen1.WaitUntilReady();
-
- ZenServerInstance Zen2(TestEnv);
- Zen2.SetTestDir(TestDir2);
- Zen2.SetOwnerPid(Zen1.GetPid());
- Zen2.SpawnServer(PortNumber + 1);
- Zen2.Detach();
-
- ZenServerInstance Zen3(TestEnv);
- Zen3.SetTestDir(TestDir2);
- Zen3.SetOwnerPid(Zen1.GetPid());
- Zen3.SpawnServer(PortNumber + 1);
- Zen3.Detach();
-
- ZenServerInstance Zen4(TestEnv);
- Zen4.SetTestDir(TestDir2);
- Zen4.SetOwnerPid(Zen1.GetPid());
- Zen4.SpawnServer(PortNumber + 1);
- Zen4.Detach();
-}
-# endif
-
-} // namespace zen::tests
-#else
-int
-main()
-{
-}
-#endif