// Copyright Epic Games, Inc. All Rights Reserved. #define _SILENCE_CXX17_C_HEADER_DEPRECATION_WARNING #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #if ZEN_PLATFORM_WINDOWS # pragma comment(lib, "Crypt32.lib") # pragma comment(lib, "Wldap32.lib") #endif #include #include #include #include #include #include #include #include #include #include ////////////////////////////////////////////////////////////////////////// #include "projectclient.h" ////////////////////////////////////////////////////////////////////////// #define DOCTEST_CONFIG_IMPLEMENT #include #undef DOCTEST_CONFIG_IMPLEMENT using namespace fmt::literals; /* ___ ___ _________ _________ ________ ________ ___ ___ _______ ________ _________ |\ \|\ \|\___ ___\\___ ___\\ __ \ |\ ____\|\ \ |\ \|\ ___ \ |\ ___ \|\___ ___\ \ \ \\\ \|___ \ \_\|___ \ \_\ \ \|\ \ \ \ \___|\ \ \ \ \ \ \ __/|\ \ \\ \ \|___ \ \_| \ \ __ \ \ \ \ \ \ \ \ \ ____\ \ \ \ \ \ \ \ \ \ \ \_|/_\ \ \\ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \___| \ \ \____\ \ \____\ \ \ \ \_|\ \ \ \\ \ \ \ \ \ \ \__\ \__\ \ \__\ \ \__\ \ \__\ \ \_______\ \_______\ \__\ \_______\ \__\\ \__\ \ \__\ \|__|\|__| \|__| \|__| \|__| \|_______|\|_______|\|__|\|_______|\|__| \|__| \|__| */ class HttpConnectionPool; /** * Http client connection * * Represents an established socket connection to a certain endpoint */ class HttpClientConnection { static HttpClientConnection* This(http_parser* Parser) { return (HttpClientConnection*)Parser->data; }; public: HttpClientConnection(asio::io_context& IoContext, HttpConnectionPool& Pool, asio::ip::tcp::socket&& InSocket) : m_IoContext(IoContext) , m_Pool(Pool) , m_Resolver(IoContext) , m_Socket(std::move(InSocket)) { } ~HttpClientConnection() {} HttpConnectionPool& ConnectionPool() { return m_Pool; } void SetKeepAlive(bool NewState) { m_KeepAlive = NewState; } void Get(const std::string_view Server, int Port, const std::string_view Path) { ZEN_UNUSED(Port); http_parser_init(&m_HttpParser, HTTP_RESPONSE); m_HttpParser.data = this; m_HttpParserSettings = http_parser_settings{ .on_message_begin = [](http_parser* p) -> int { return This(p)->OnMessageBegin(); }, .on_url = nullptr, .on_status = nullptr, .on_header_field = [](http_parser* p, const char* data, size_t size) { return This(p)->OnHeader(data, size); }, .on_header_value = [](http_parser* p, const char* data, size_t size) { return This(p)->OnHeaderValue(data, size); }, .on_headers_complete = [](http_parser* p) -> int { return This(p)->OnHeadersComplete(); }, .on_body = [](http_parser* p, const char* data, size_t size) { return This(p)->OnBody(data, size); }, .on_message_complete = [](http_parser* p) -> int { return This(p)->OnMessageComplete(); }, .on_chunk_header = nullptr, .on_chunk_complete = nullptr}; m_Headers.reserve(16); zen::ExtendableStringBuilder<256> RequestBody; RequestBody << "GET " << Path << " HTTP/1.1\r\n"; RequestBody << "Host: " << Server << "\r\n"; RequestBody << "Accept: */*\r\n"; RequestBody << "Connection: " << (m_KeepAlive ? "keep-alive" : "close") << "\r\n\r\n"; // TODO: support keep-alive m_RequestBody = RequestBody; OnConnected(); } private: void Reset() {} void OnError(const std::error_code& Error) { ZEN_ERROR("HTTP client error! '{}'", Error.message()); } int OnHeader(const char* Data, size_t Bytes) { m_CurrentHeaderName = std::string_view(Data, Bytes); return 0; } int OnHeaderValue(const char* Data, size_t Bytes) { m_Headers.emplace_back(HeaderEntry{m_CurrentHeaderName, {Data, Bytes}}); return 0; } int OnHeadersComplete() { ZEN_DEBUG("Headers complete"); return 0; } int OnMessageComplete() { if (http_should_keep_alive(&m_HttpParser)) { Reset(); } else { m_Socket.close(); m_RequestState = RequestState::Done; } return 0; } int OnMessageBegin() { return 0; } int OnBody(const char* Data, size_t Bytes) { ZEN_UNUSED(Data, Bytes); return 0; } void OnConnected() { // Send initial request payload asio::async_write(m_Socket, asio::const_buffer(m_RequestBody.data(), m_RequestBody.size()), [this](const std::error_code& Error, size_t Bytes) { ZEN_UNUSED(Bytes); if (Error) { return OnError(Error); } OnRequestWritten(); }); } void OnRequestWritten() { asio::async_read(m_Socket, m_ResponseBuffer, asio::transfer_at_least(1), [this](const std::error_code& Error, size_t Bytes) { if (Error) { return OnError(Error); } OnStatusLineRead(Bytes); }); } void OnStatusLineRead(size_t Bytes) { // Parse size_t rv = http_parser_execute(&m_HttpParser, &m_HttpParserSettings, (const char*)m_ResponseBuffer.data(), Bytes); ZEN_UNUSED(rv); if (m_HttpParser.http_errno != 0) { // Something bad! ZEN_ERROR("parse error {}", (uint32_t)m_HttpParser.http_errno); } switch (m_RequestState) { case RequestState::Init: asio::async_read(m_Socket, m_ResponseBuffer, asio::transfer_at_least(1), [this](const std::error_code& Error, size_t Bytes) { if (Error) { return OnError(Error); } OnStatusLineRead(Bytes); }); return; case RequestState::Done: break; } } private: asio::io_context& m_IoContext; HttpConnectionPool& m_Pool; asio::ip::tcp::resolver m_Resolver; asio::ip::tcp::socket m_Socket; std::string m_Uri; std::string m_RequestBody; // Initial request data http_parser m_HttpParser{}; http_parser_settings m_HttpParserSettings{}; uint8_t m_ResponseIoBuffer[4096]; asio::mutable_buffer m_ResponseBuffer{m_ResponseIoBuffer, sizeof m_ResponseIoBuffer}; enum class RequestState { Init, Done }; RequestState m_RequestState = RequestState::Init; struct HeaderEntry { std::string_view Name; std::string_view Value; }; std::string_view m_CurrentHeaderName; // Used while parsing headers std::vector m_Headers; bool m_KeepAlive = false; }; ////////////////////////////////////////////////////////////////////////// class HttpConnectionPool { public: HttpConnectionPool(asio::io_context& Context, std::string_view HostName, uint16_t Port); ~HttpConnectionPool(); std::unique_ptr GetConnection(); void ReturnConnection(std::unique_ptr&& Connection); private: zen::RwLock m_Lock; asio::io_context& m_Context; std::vector m_AvailableConnections; std::string m_HostName; uint16_t m_Port; }; HttpConnectionPool::HttpConnectionPool(asio::io_context& Context, std::string_view HostName, uint16_t Port) : m_Context(Context) , m_HostName(HostName) , m_Port(Port) { } HttpConnectionPool::~HttpConnectionPool() { zen::RwLock::ExclusiveLockScope ScopedLock(m_Lock); for (auto $ : m_AvailableConnections) { delete $; } } std::unique_ptr HttpConnectionPool::GetConnection() { zen::RwLock::ExclusiveLockScope ScopedLock(m_Lock); if (m_AvailableConnections.empty()) { zen::StringBuilder<16> Service; Service << int64_t(m_Port); asio::ip::tcp::resolver Resolver{m_Context}; std::error_code ErrCode; auto it = Resolver.resolve(m_HostName, Service, ErrCode); auto itEnd = asio::ip::tcp::resolver::iterator(); if (ErrCode) { return nullptr; } asio::ip::tcp::socket Socket{m_Context}; asio::connect(Socket, it, ErrCode); if (ErrCode) { return nullptr; } return std::make_unique(m_Context, *this, std::move(Socket)); } std::unique_ptr Connection{m_AvailableConnections.back()}; m_AvailableConnections.pop_back(); return std::move(Connection); } void HttpConnectionPool::ReturnConnection(std::unique_ptr&& Connection) { zen::RwLock::ExclusiveLockScope ScopedLock(m_Lock); m_AvailableConnections.emplace_back(Connection.release()); } ////////////////////////////////////////////////////////////////////////// class HttpContext { public: HttpContext(asio::io_context& Context) : m_Context(Context) {} ~HttpContext() = default; std::unique_ptr GetConnection(std::string_view HostName, uint16_t Port) { return ConnectionPool(HostName, Port).GetConnection(); } void ReturnConnection(std::unique_ptr Connection) { Connection->ConnectionPool().ReturnConnection(std::move(Connection)); } HttpConnectionPool& ConnectionPool(std::string_view HostName, uint16_t Port) { zen::RwLock::ExclusiveLockScope _(m_Lock); ConnectionId ConnId{std::string(HostName), Port}; if (auto It = m_ConnectionPools.find(ConnId); It == end(m_ConnectionPools)) { // Not found - create new entry auto In = m_ConnectionPools.insert({ConnId, std::move(HttpConnectionPool(m_Context, HostName, Port))}); return In.first->second; } else { return It->second; } } private: asio::io_context& m_Context; struct ConnectionId { inline bool operator<(const ConnectionId& Rhs) const { if (HostName != Rhs.HostName) { return HostName < Rhs.HostName; } return Port < Rhs.Port; } std::string HostName; uint16_t Port; }; zen::RwLock m_Lock; std::map m_ConnectionPools; }; ////////////////////////////////////////////////////////////////////////// class HttpClientRequest { public: HttpClientRequest(HttpContext& Context) : m_HttpContext(Context) {} ~HttpClientRequest() { if (m_Connection) { m_HttpContext.ReturnConnection(std::move(m_Connection)); } } void Get(const std::string_view Url) { http_parser_url ParsedUrl; int ErrCode = http_parser_parse_url(Url.data(), Url.size(), 0, &ParsedUrl); if (ErrCode) { ZEN_NOT_IMPLEMENTED(); } if ((ParsedUrl.field_set & (UF_HOST | UF_PORT | UF_PATH)) != (UF_HOST | UF_PORT | UF_PATH)) { // Bad URL } std::string_view HostName(Url.data() + ParsedUrl.field_data[UF_HOST].off, ParsedUrl.field_data[UF_HOST].len); std::string_view Path(Url.data() + ParsedUrl.field_data[UF_PATH].off); m_Connection = m_HttpContext.GetConnection(HostName, ParsedUrl.port); m_Connection->Get(HostName, ParsedUrl.port, Path); } private: HttpContext& m_HttpContext; std::unique_ptr m_Connection; }; ////////////////////////////////////////////////////////////////////////// // // Custom logging -- test code, this should be tweaked // namespace logging { using namespace spdlog; using namespace spdlog::details; using namespace std::literals; class full_formatter final : public spdlog::formatter { public: full_formatter(std::string_view LogId, std::chrono::time_point Epoch) : m_Epoch(Epoch), m_LogId(LogId) {} virtual std::unique_ptr clone() const override { return std::make_unique(m_LogId, m_Epoch); } static constexpr bool UseDate = false; virtual void format(const details::log_msg& msg, memory_buf_t& dest) override { using std::chrono::duration_cast; using std::chrono::milliseconds; using std::chrono::seconds; if constexpr (UseDate) { auto secs = std::chrono::duration_cast(msg.time.time_since_epoch()); if (secs != m_LastLogSecs) { m_CachedTm = os::localtime(log_clock::to_time_t(msg.time)); m_LastLogSecs = secs; } } const auto& tm_time = m_CachedTm; // cache the date/time part for the next second. auto duration = msg.time - m_Epoch; auto secs = duration_cast(duration); if (m_CacheTimestamp != secs || m_CachedDatetime.size() == 0) { m_CachedDatetime.clear(); m_CachedDatetime.push_back('['); if constexpr (UseDate) { fmt_helper::append_int(tm_time.tm_year + 1900, m_CachedDatetime); m_CachedDatetime.push_back('-'); fmt_helper::pad2(tm_time.tm_mon + 1, m_CachedDatetime); m_CachedDatetime.push_back('-'); fmt_helper::pad2(tm_time.tm_mday, m_CachedDatetime); m_CachedDatetime.push_back(' '); fmt_helper::pad2(tm_time.tm_hour, m_CachedDatetime); m_CachedDatetime.push_back(':'); fmt_helper::pad2(tm_time.tm_min, m_CachedDatetime); m_CachedDatetime.push_back(':'); fmt_helper::pad2(tm_time.tm_sec, m_CachedDatetime); } else { int Count = int(secs.count()); const int LogSecs = Count % 60; Count /= 60; const int LogMins = Count % 60; Count /= 60; const int LogHours = Count; fmt_helper::pad2(LogHours, m_CachedDatetime); m_CachedDatetime.push_back(':'); fmt_helper::pad2(LogMins, m_CachedDatetime); m_CachedDatetime.push_back(':'); fmt_helper::pad2(LogSecs, m_CachedDatetime); } m_CachedDatetime.push_back('.'); m_CacheTimestamp = secs; } dest.append(m_CachedDatetime.begin(), m_CachedDatetime.end()); auto millis = fmt_helper::time_fraction(msg.time); fmt_helper::pad3(static_cast(millis.count()), dest); dest.push_back(']'); dest.push_back(' '); if (!m_LogId.empty()) { dest.push_back('['); fmt_helper::append_string_view(m_LogId, dest); dest.push_back(']'); dest.push_back(' '); } // append logger name if exists if (msg.logger_name.size() > 0) { dest.push_back('['); fmt_helper::append_string_view(msg.logger_name, dest); dest.push_back(']'); dest.push_back(' '); } dest.push_back('['); // wrap the level name with color msg.color_range_start = dest.size(); fmt_helper::append_string_view(level::to_string_view(msg.level), dest); msg.color_range_end = dest.size(); dest.push_back(']'); dest.push_back(' '); // add source location if present if (!msg.source.empty()) { dest.push_back('['); const char* filename = details::short_filename_formatter::basename(msg.source.filename); fmt_helper::append_string_view(filename, dest); dest.push_back(':'); fmt_helper::append_int(msg.source.line, dest); dest.push_back(']'); dest.push_back(' '); } fmt_helper::append_string_view(msg.payload, dest); fmt_helper::append_string_view("\n"sv, dest); } private: std::chrono::time_point m_Epoch; std::tm m_CachedTm; std::chrono::seconds m_LastLogSecs; std::chrono::seconds m_CacheTimestamp{0}; memory_buf_t m_CachedDatetime; std::string m_LogId; }; } // namespace logging ////////////////////////////////////////////////////////////////////////// #if 0 # include # pragma comment(lib, "Crypt32.lib") # pragma comment(lib, "Wldap32.lib") int main() { mi_version(); zen::Sleep(1000); zen::Stopwatch timer; const int RequestCount = 100000; cpr::Session Sessions[10]; for (auto& Session : Sessions) { Session.SetUrl(cpr::Url{"http://localhost:1337/test/hello"}); //Session.SetUrl(cpr::Url{ "http://arn-wd-l0182:1337/test/hello" }); } auto Run = [](cpr::Session& Session) { for (int i = 0; i < 10000; ++i) { cpr::Response Result = Session.Get(); if (Result.status_code != 200) { ZEN_WARN("request response: {}", Result.status_code); } } }; Concurrency::parallel_invoke([&] { Run(Sessions[0]); }, [&] { Run(Sessions[1]); }, [&] { Run(Sessions[2]); }, [&] { Run(Sessions[3]); }, [&] { Run(Sessions[4]); }, [&] { Run(Sessions[5]); }, [&] { Run(Sessions[6]); }, [&] { Run(Sessions[7]); }, [&] { Run(Sessions[8]); }, [&] { Run(Sessions[9]); }); // cpr::Response r = cpr::Get(cpr::Url{ "http://localhost:1337/test/hello" }); ZEN_INFO("{} requests in {} ({})", RequestCount, zen::NiceTimeSpanMs(timer.getElapsedTimeMs()), zen::NiceRate(RequestCount, (uint32_t)timer.getElapsedTimeMs(), "req")); return 0; } #elif 0 //#include int main() { mi_version(); restinio::run(restinio::on_thread_pool(32).port(8080).request_handler( [](auto req) { return req->create_response().set_body("Hello, World!").done(); })); return 0; } #else zen::ZenServerEnvironment TestEnv; int main(int argc, char** argv) { mi_version(); zen::zencore_forcelinktests(); zen::zenhttp_forcelinktests(); zen::logging::InitializeLogging(); spdlog::set_level(spdlog::level::debug); spdlog::set_formatter(std::make_unique< ::logging::full_formatter>("test", std::chrono::system_clock::now())); std::filesystem::path ProgramBaseDir = std::filesystem::path(argv[0]).parent_path(); std::filesystem::path TestBaseDir = ProgramBaseDir.parent_path().parent_path() / ".test"; TestEnv.InitializeForTest(ProgramBaseDir, TestBaseDir); ZEN_INFO("Running tests...(base dir: '{}')", TestBaseDir); return doctest::Context(argc, argv).run(); } namespace zen::tests { # if 1 TEST_CASE("asio.http") { std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); ZenServerInstance Instance(TestEnv); Instance.SetTestDir(TestDir); Instance.SpawnServer(13337); ZEN_INFO("Waiting..."); Instance.WaitUntilReady(); // asio test asio::io_context IoContext; HttpContext HttpCtx(IoContext); HttpClientRequest Request(HttpCtx); Request.Get("http://localhost:13337/test/hello"); IoContext.run(); } # endif TEST_CASE("default.single") { std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); ZenServerInstance Instance(TestEnv); Instance.SetTestDir(TestDir); Instance.SpawnServer(13337); ZEN_INFO("Waiting..."); Instance.WaitUntilReady(); std::atomic RequestCount{0}; std::atomic BatchCounter{0}; ZEN_INFO("Running single server test..."); auto IssueTestRequests = [&] { const uint64_t BatchNo = BatchCounter.fetch_add(1); const DWORD ThreadId = GetCurrentThreadId(); ZEN_INFO("query batch {} started (thread {})", BatchNo, ThreadId); cpr::Session cli; cli.SetUrl(cpr::Url{"http://localhost:13337/test/hello"}); for (int i = 0; i < 10000; ++i) { auto res = cli.Get(); ++RequestCount; } ZEN_INFO("query batch {} ended (thread {})", BatchNo, ThreadId); }; auto fun10 = [&] { Concurrency::parallel_invoke(IssueTestRequests, IssueTestRequests, IssueTestRequests, IssueTestRequests, IssueTestRequests, IssueTestRequests, IssueTestRequests, IssueTestRequests, IssueTestRequests, IssueTestRequests); }; zen::Stopwatch timer; // Concurrency::parallel_invoke(fun10, fun10, fun, fun, fun, fun, fun, fun, fun, fun); Concurrency::parallel_invoke(IssueTestRequests, IssueTestRequests, IssueTestRequests, IssueTestRequests, IssueTestRequests, IssueTestRequests, IssueTestRequests, IssueTestRequests, IssueTestRequests, IssueTestRequests); uint64_t Elapsed = timer.getElapsedTimeMs(); ZEN_INFO("{} requests in {} ({})", RequestCount, zen::NiceTimeSpanMs(Elapsed), zen::NiceRate(RequestCount, (uint32_t)Elapsed, "req")); } TEST_CASE("multi.basic") { ZenServerInstance Instance1(TestEnv); std::filesystem::path TestDir1 = TestEnv.CreateNewTestDir(); Instance1.SetTestDir(TestDir1); Instance1.SpawnServer(13337); ZenServerInstance Instance2(TestEnv); std::filesystem::path TestDir2 = TestEnv.CreateNewTestDir(); Instance2.SetTestDir(TestDir2); Instance2.SpawnServer(13338); ZEN_INFO("Waiting..."); Instance1.WaitUntilReady(); Instance2.WaitUntilReady(); std::atomic RequestCount{0}; std::atomic BatchCounter{0}; auto IssueTestRequests = [&](int PortNumber) { const uint64_t BatchNo = BatchCounter.fetch_add(1); const DWORD ThreadId = GetCurrentThreadId(); ZEN_INFO("query batch {} started (thread {}) for port {}", BatchNo, ThreadId, PortNumber); cpr::Session cli; cli.SetUrl(cpr::Url{"http://localhost:{}/test/hello"_format(PortNumber)}); for (int i = 0; i < 10000; ++i) { auto res = cli.Get(); ++RequestCount; } ZEN_INFO("query batch {} ended (thread {})", BatchNo, ThreadId); }; zen::Stopwatch timer; ZEN_INFO("Running multi-server test..."); Concurrency::parallel_invoke([&] { IssueTestRequests(13337); }, [&] { IssueTestRequests(13338); }, [&] { IssueTestRequests(13337); }, [&] { IssueTestRequests(13338); }); uint64_t Elapsed = timer.getElapsedTimeMs(); ZEN_INFO("{} requests in {} ({})", RequestCount, zen::NiceTimeSpanMs(Elapsed), zen::NiceRate(RequestCount, (uint32_t)Elapsed, "req")); } TEST_CASE("cas.basic") { std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); const uint16_t PortNumber = 13337; const int IterationCount = 1000; std::vector ChunkSizes(IterationCount); std::vector ChunkHashes(IterationCount); { ZenServerInstance Instance1(TestEnv); Instance1.SetTestDir(TestDir); Instance1.SpawnServer(PortNumber); Instance1.WaitUntilReady(); std::atomic RequestCount{0}; std::atomic BatchCounter{0}; zen::Stopwatch timer; std::mt19937_64 mt; auto BaseUri = "http://localhost:{}/cas"_format(PortNumber); cpr::Session cli; cli.SetUrl(cpr::Url{BaseUri}); // Populate CAS with some generated data for (int i = 0; i < IterationCount; ++i) { const int ChunkSize = mt() % 10000 + 5; std::string body = fmt::format("{}", i); body.resize(ChunkSize, ' '); ChunkSizes[i] = ChunkSize; ChunkHashes[i] = zen::IoHash::HashBuffer(body.data(), body.size()); cli.SetBody(body); auto res = cli.Post(); CHECK(!res.error); ++RequestCount; } // Verify that the chunks persisted for (int i = 0; i < IterationCount; ++i) { zen::ExtendableStringBuilder<128> Uri; Uri << BaseUri << "/"; ChunkHashes[i].ToHexString(Uri); auto res = cpr::Get(cpr::Url{Uri.c_str()}); CHECK(!res.error); CHECK(res.status_code == 200); CHECK(res.text.size() == ChunkSizes[i]); zen::IoHash Hash = zen::IoHash::HashBuffer(res.text.data(), res.text.size()); CHECK(ChunkHashes[i] == Hash); ++RequestCount; } uint64_t Elapsed = timer.getElapsedTimeMs(); ZEN_INFO("{} requests in {} ({})", RequestCount, zen::NiceTimeSpanMs(Elapsed), zen::NiceRate(RequestCount, (uint32_t)Elapsed, "req")); } // Verify that the data persists between process runs (the previous server has exited at this point) { ZenServerInstance Instance2(TestEnv); Instance2.SetTestDir(TestDir); Instance2.SpawnServer(PortNumber); Instance2.WaitUntilReady(); for (int i = 0; i < IterationCount; ++i) { zen::ExtendableStringBuilder<128> Uri; Uri << "http://localhost:{}/cas/"_format(PortNumber); ChunkHashes[i].ToHexString(Uri); auto res = cpr::Get(cpr::Url{Uri.c_str()}); CHECK(res.status_code == 200); CHECK(res.text.size() == ChunkSizes[i]); zen::IoHash Hash = zen::IoHash::HashBuffer(res.text.data(), res.text.size()); CHECK(ChunkHashes[i] == Hash); } } } TEST_CASE("project.basic") { using namespace std::literals; std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); const uint16_t PortNumber = 13337; ZenServerInstance Instance1(TestEnv); Instance1.SetTestDir(TestDir); Instance1.SpawnServer(PortNumber); Instance1.WaitUntilReady(); std::atomic RequestCount{0}; zen::Stopwatch timer; std::mt19937_64 mt; zen::StringBuilder<64> BaseUri; BaseUri << "http://localhost:{}/prj/test"_format(PortNumber); SUBCASE("build store init") { { { zen::CbObjectWriter Body; Body << "id" << "test"; Body << "root" << "/zooom"; Body << "project" << "/zooom"; Body << "engine" << "/zooom"; zen::MemoryOutStream MemOut; zen::BinaryWriter Writer{MemOut}; Body.Save(Writer); auto Response = cpr::Post(cpr::Url{BaseUri.c_str()}, cpr::Body{(const char*)MemOut.Data(), MemOut.Size()}); CHECK(Response.status_code == 201); } { auto Response = cpr::Get(cpr::Url{BaseUri.c_str()}); CHECK(Response.status_code == 200); zen::CbObjectView ResponseObject = zen::CbFieldView(Response.text.data()).AsObjectView(); CHECK(ResponseObject["id"].AsString() == "test"sv); CHECK(ResponseObject["root"].AsString() == "/zooom"sv); } } BaseUri << "/oplog/ps5"; { { zen::StringBuilder<64> PostUri; PostUri << BaseUri; auto Response = cpr::Post(cpr::Url{PostUri.c_str()}); CHECK(Response.status_code == 201); } { auto Response = cpr::Get(cpr::Url{BaseUri.c_str()}); CHECK(Response.status_code == 200); zen::CbObjectView ResponseObject = zen::CbFieldView(Response.text.data()).AsObjectView(); CHECK(ResponseObject["id"].AsString() == "ps5"sv); CHECK(ResponseObject["project"].AsString() == "test"sv); } } SUBCASE("build store persistence") { uint8_t AttachData[] = {1, 2, 3}; zen::CbAttachment Attach{zen::SharedBuffer::Clone(zen::MemoryView{AttachData, 3})}; zen::CbObjectWriter OpWriter; OpWriter << "key" << "foo" << "attachment" << Attach; const std::string_view ChunkId{ "00000000" "00000000" "00010000"}; auto FileOid = zen::Oid::FromHexString(ChunkId); OpWriter.BeginArray("files"); OpWriter.BeginObject(); OpWriter << "id" << FileOid; OpWriter << "clientpath" << __FILE__; OpWriter << "serverpath" << __FILE__; OpWriter.EndObject(); OpWriter.EndArray(); zen::CbObject Op = OpWriter.Save(); zen::MemoryOutStream MemOut; zen::BinaryWriter Writer(MemOut); zen::CbPackage OpPackage(Op); OpPackage.AddAttachment(Attach); OpPackage.Save(Writer); { zen::StringBuilder<64> PostUri; PostUri << BaseUri << "/new"; auto Response = cpr::Post(cpr::Url{PostUri.c_str()}, cpr::Body{(const char*)MemOut.Data(), MemOut.Size()}); REQUIRE(!Response.error); CHECK(Response.status_code == 201); } // Read file data { zen::StringBuilder<128> ChunkGetUri; ChunkGetUri << BaseUri << "/" << ChunkId; auto Response = cpr::Get(cpr::Url{ChunkGetUri.c_str()}); REQUIRE(!Response.error); CHECK(Response.status_code == 200); } { zen::StringBuilder<128> ChunkGetUri; ChunkGetUri << BaseUri << "/" << ChunkId << "?offset=1&size=10"; auto Response = cpr::Get(cpr::Url{ChunkGetUri.c_str()}); REQUIRE(!Response.error); CHECK(Response.status_code == 200); CHECK(Response.text.size() == 10); } ZEN_INFO("+++++++"); } SUBCASE("build store op commit") { ZEN_INFO("-------"); } } const uint64_t Elapsed = timer.getElapsedTimeMs(); ZEN_INFO("{} requests in {} ({})", RequestCount, zen::NiceTimeSpanMs(Elapsed), zen::NiceRate(RequestCount, (uint32_t)Elapsed, "req")); } # if 0 // this is extremely WIP TEST_CASE("project.pipe") { using namespace std::literals; std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); const uint16_t PortNumber = 13337; ZenServerInstance Instance1(TestEnv); Instance1.SetTestDir(TestDir); Instance1.SpawnServer(PortNumber); Instance1.WaitUntilReady(); zen::LocalProjectClient LocalClient(PortNumber); zen::CbObjectWriter Cbow; Cbow << "hey" << 42; zen::CbObject Response = LocalClient.MessageTransaction(Cbow.Save()); } # endif TEST_CASE("zcache.basic") { using namespace std::literals; std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); const uint16_t PortNumber = 13337; const int kIterationCount = 100; const auto BaseUri = "http://localhost:{}/z$"_format(PortNumber); auto HashKey = [](int i) -> zen::IoHash { return zen::IoHash::HashBuffer(&i, sizeof i); }; { ZenServerInstance Instance1(TestEnv); Instance1.SetTestDir(TestDir); Instance1.SpawnServer(PortNumber); Instance1.WaitUntilReady(); // Populate with some simple data for (int i = 0; i < kIterationCount; ++i) { zen::CbObjectWriter Cbo; Cbo << "index" << i; zen::MemoryOutStream MemOut; zen::BinaryWriter Writer{MemOut}; Cbo.Save(Writer); zen::IoHash Key = HashKey(i); cpr::Response Result = cpr::Put(cpr::Url{"{}/{}/{}"_format(BaseUri, "test", Key)}, cpr::Body{(const char*)MemOut.Data(), MemOut.Size()}, cpr::Header{{"Content-Type", "application/x-ue-cb"}}); CHECK(Result.status_code == 201); } // Retrieve data for (int i = 0; i < kIterationCount; ++i) { zen::IoHash Key = zen::IoHash::HashBuffer(&i, sizeof i); cpr::Response Result = cpr::Get(cpr::Url{"{}/{}/{}"_format(BaseUri, "test", Key)}); CHECK(Result.status_code == 200); } // Ensure bad bucket identifiers are rejected { zen::CbObjectWriter Cbo; Cbo << "index" << 42; zen::MemoryOutStream MemOut; zen::BinaryWriter Writer{MemOut}; Cbo.Save(Writer); zen::IoHash Key = HashKey(442); cpr::Response Result = cpr::Put(cpr::Url{"{}/{}/{}"_format(BaseUri, "te!st", Key)}, cpr::Body{(const char*)MemOut.Data(), MemOut.Size()}, cpr::Header{{"Content-Type", "application/x-ue-cb"}}); CHECK(Result.status_code == 400); } } // Verify that the data persists between process runs (the previous server has exited at this point) { ZenServerInstance Instance1(TestEnv); Instance1.SetTestDir(TestDir); Instance1.SpawnServer(PortNumber); Instance1.WaitUntilReady(); // Retrieve data again for (int i = 0; i < kIterationCount; ++i) { zen::IoHash Key = HashKey(i); cpr::Response Result = cpr::Get(cpr::Url{"{}/{}/{}"_format(BaseUri, "test", Key)}); CHECK(Result.status_code == 200); } } } TEST_CASE("zcache.cbpackage") { using namespace std::literals; auto CreateTestPackage = [](zen::IoHash& OutAttachmentKey) -> zen::CbPackage { auto Data = zen::SharedBuffer::Clone(zen::MakeMemoryView({1, 2, 3, 4, 5, 6, 7, 8, 9})); auto CompressedData = zen::CompressedBuffer::Compress(Data); OutAttachmentKey = zen::IoHash::FromBLAKE3(CompressedData.GetRawHash()); zen::CbWriter Obj; Obj.BeginObject("obj"sv); Obj.AddBinaryAttachment("data", OutAttachmentKey); Obj.EndObject(); zen::CbPackage Package; Package.SetObject(Obj.Save().AsObject()); Package.AddAttachment(zen::CbAttachment(CompressedData)); return Package; }; auto SerializeToBuffer = [](zen::CbPackage Package) -> zen::IoBuffer { zen::MemoryOutStream MemStream; zen::BinaryWriter Writer(MemStream); Package.Save(Writer); return zen::IoBuffer(zen::IoBuffer::Clone, MemStream.Data(), MemStream.Size()); }; auto IsEqual = [](zen::CbPackage Lhs, zen::CbPackage Rhs) -> bool { std::span LhsAttachments = Lhs.GetAttachments(); std::span RhsAttachments = Rhs.GetAttachments(); if (LhsAttachments.size() != LhsAttachments.size()) { return false; } for (const zen::CbAttachment& LhsAttachment : LhsAttachments) { const zen::CbAttachment* RhsAttachment = Rhs.FindAttachment(LhsAttachment.GetHash()); CHECK(RhsAttachment); zen::SharedBuffer LhsBuffer = LhsAttachment.AsCompressedBinary().Decompress(); CHECK(!LhsBuffer.IsNull()); zen::SharedBuffer RhsBuffer = RhsAttachment->AsCompressedBinary().Decompress(); CHECK(!RhsBuffer.IsNull()); if (!LhsBuffer.GetView().EqualBytes(RhsBuffer.GetView())) { return false; } } return true; }; SUBCASE("PUT/GET returns correct package") { std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); const uint16_t PortNumber = 13337; const auto BaseUri = "http://localhost:{}/z$"_format(PortNumber); ZenServerInstance Instance1(TestEnv); Instance1.SetTestDir(TestDir); Instance1.SpawnServer(PortNumber); Instance1.WaitUntilReady(); const std::string_view Bucket = "mosdef"sv; zen::IoHash Key; zen::CbPackage ExpectedPackage = CreateTestPackage(Key); // PUT { zen::IoBuffer Body = SerializeToBuffer(ExpectedPackage); cpr::Response Result = cpr::Put(cpr::Url{"{}/{}/{}"_format(BaseUri, Bucket, Key)}, cpr::Body{(const char*)Body.Data(), Body.Size()}, cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}}); CHECK(Result.status_code == 201); } // GET { cpr::Response Result = cpr::Get(cpr::Url{"{}/{}/{}"_format(BaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); CHECK(Result.status_code == 200); zen::IoBuffer Response(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()); zen::CbPackage Package; const bool Ok = Package.TryLoad(Response); CHECK(Ok); CHECK(IsEqual(Package, ExpectedPackage)); } } SUBCASE("PUT propagates upstream") { // Setup local and remote server std::filesystem::path LocalDataDir = TestEnv.CreateNewTestDir(); std::filesystem::path RemoteDataDir = TestEnv.CreateNewTestDir(); const uint16_t LocalPortNumber = 13337; const uint16_t RemotePortNumber = 13338; const auto LocalBaseUri = "http://localhost:{}/z$"_format(LocalPortNumber); const auto RemoteBaseUri = "http://localhost:{}/z$"_format(RemotePortNumber); ZenServerInstance RemoteInstance(TestEnv); RemoteInstance.SetTestDir(RemoteDataDir); RemoteInstance.SpawnServer(RemotePortNumber); ZenServerInstance LocalInstance(TestEnv); LocalInstance.SetTestDir(LocalDataDir); LocalInstance.SpawnServer(LocalPortNumber, "--upstream-thread-count=0 --upstream-zen-url=http://localhost:{}"_format(RemotePortNumber)); LocalInstance.WaitUntilReady(); RemoteInstance.WaitUntilReady(); const std::string_view Bucket = "mosdef"sv; zen::IoHash Key; zen::CbPackage ExpectedPackage = CreateTestPackage(Key); // Store the cache record package in the local instance { zen::IoBuffer Body = SerializeToBuffer(ExpectedPackage); cpr::Response Result = cpr::Put(cpr::Url{"{}/{}/{}"_format(LocalBaseUri, Bucket, Key)}, cpr::Body{(const char*)Body.Data(), Body.Size()}, cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}}); CHECK(Result.status_code == 201); } // The cache record can be retrieved as a package from the local instance { cpr::Response Result = cpr::Get(cpr::Url{"{}/{}/{}"_format(LocalBaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); CHECK(Result.status_code == 200); zen::IoBuffer Body(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()); zen::CbPackage Package; const bool Ok = Package.TryLoad(Body); CHECK(Ok); CHECK(IsEqual(Package, ExpectedPackage)); } // The cache record can be retrieved as a package from the remote instance { cpr::Response Result = cpr::Get(cpr::Url{"{}/{}/{}"_format(RemoteBaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); CHECK(Result.status_code == 200); zen::IoBuffer Body(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()); zen::CbPackage Package; const bool Ok = Package.TryLoad(Body); CHECK(Ok); CHECK(IsEqual(Package, ExpectedPackage)); } } SUBCASE("GET finds upstream when missing in local") { // Setup local and remote server std::filesystem::path LocalDataDir = TestEnv.CreateNewTestDir(); std::filesystem::path RemoteDataDir = TestEnv.CreateNewTestDir(); const uint16_t LocalPortNumber = 13337; const uint16_t RemotePortNumber = 13338; const auto LocalBaseUri = "http://localhost:{}/z$"_format(LocalPortNumber); const auto RemoteBaseUri = "http://localhost:{}/z$"_format(RemotePortNumber); ZenServerInstance RemoteInstance(TestEnv); RemoteInstance.SetTestDir(RemoteDataDir); RemoteInstance.SpawnServer(RemotePortNumber); ZenServerInstance LocalInstance(TestEnv); LocalInstance.SetTestDir(LocalDataDir); LocalInstance.SpawnServer(LocalPortNumber, "--upstream-thread-count=0 --upstream-zen-url=http://localhost:{}"_format(RemotePortNumber)); LocalInstance.WaitUntilReady(); RemoteInstance.WaitUntilReady(); const std::string_view Bucket = "mosdef"sv; zen::IoHash Key; zen::CbPackage ExpectedPackage = CreateTestPackage(Key); // Store the cache record package in upstream cache { zen::IoBuffer Body = SerializeToBuffer(ExpectedPackage); cpr::Response Result = cpr::Put(cpr::Url{"{}/{}/{}"_format(RemoteBaseUri, Bucket, Key)}, cpr::Body{(const char*)Body.Data(), Body.Size()}, cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}}); CHECK(Result.status_code == 201); } // The cache record can be retrieved as a package from the local cache { cpr::Response Result = cpr::Get(cpr::Url{"{}/{}/{}"_format(LocalBaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); CHECK(Result.status_code == 200); zen::IoBuffer Body(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()); zen::CbPackage Package; const bool Ok = Package.TryLoad(Body); CHECK(Ok); CHECK(IsEqual(Package, ExpectedPackage)); } } } TEST_CASE("zcache.policy") { using namespace std::literals; struct ZenConfig { std::filesystem::path DataDir; uint16_t Port; std::string BaseUri; std::string Args; static ZenConfig New(uint16_t Port = 13337, std::string Args = "") { return ZenConfig{.DataDir = TestEnv.CreateNewTestDir(), .Port = Port, .BaseUri = "http://localhost:{}/z$"_format(Port), .Args = std::move(Args)}; } static ZenConfig NewWithUpstream(uint16_t UpstreamPort) { return New(13337, "--upstream-thread-count=0 --upstream-zen-url=http://localhost:{}"_format(UpstreamPort)); } void Spawn(ZenServerInstance& Inst) { Inst.SetTestDir(DataDir); Inst.SpawnServer(Port, Args); Inst.WaitUntilReady(); } }; auto GenerateData = [](uint64_t Size, zen::IoHash& OutHash) -> zen::UniqueBuffer { auto Buf = zen::UniqueBuffer::Alloc(Size); uint8_t* Data = reinterpret_cast(Buf.GetData()); for (uint64_t Idx = 0; Idx < Size; Idx++) { Data[Idx] = Idx % 256; } OutHash = zen::IoHash::HashBuffer(Data, Size); return Buf; }; auto GeneratePackage = [](zen::IoHash& OutAttachmentKey) -> zen::CbPackage { auto Data = zen::SharedBuffer::Clone(zen::MakeMemoryView({1, 2, 3, 4, 5, 6, 7, 8, 9})); auto CompressedData = zen::CompressedBuffer::Compress(Data); OutAttachmentKey = zen::IoHash::FromBLAKE3(CompressedData.GetRawHash()); zen::CbWriter Obj; Obj.BeginObject("obj"sv); Obj.AddBinaryAttachment("data", OutAttachmentKey); Obj.EndObject(); zen::CbPackage Package; Package.SetObject(Obj.Save().AsObject()); Package.AddAttachment(zen::CbAttachment(CompressedData)); return Package; }; auto ToBuffer = [](zen::CbPackage Package) -> zen::IoBuffer { zen::MemoryOutStream MemStream; zen::BinaryWriter Writer(MemStream); Package.Save(Writer); return zen::IoBuffer(zen::IoBuffer::Clone, MemStream.Data(), MemStream.Size()); }; SUBCASE("query - 'local' does not query upstream (binary)") { ZenConfig UpstreamCfg = ZenConfig::New(13338); ZenServerInstance UpstreamInst(TestEnv); ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338); ZenServerInstance LocalInst(TestEnv); const auto Bucket = "legacy"sv; UpstreamCfg.Spawn(UpstreamInst); LocalCfg.Spawn(LocalInst); zen::IoHash Key; auto BinaryValue = GenerateData(1024, Key); // Store binary cache value upstream { cpr::Response Result = cpr::Put(cpr::Url{"{}/{}/{}"_format(UpstreamCfg.BaseUri, Bucket, Key)}, cpr::Body{(const char*)BinaryValue.GetData(), BinaryValue.GetSize()}, cpr::Header{{"Content-Type", "application/octet-stream"}}); CHECK(Result.status_code == 201); } { cpr::Response Result = cpr::Get(cpr::Url{"{}/{}/{}?query=local"_format(LocalCfg.BaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/octet-stream"}}); CHECK(Result.status_code == 404); } { cpr::Response Result = cpr::Get(cpr::Url{"{}/{}/{}?query=local,remote"_format(LocalCfg.BaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/octet-stream"}}); CHECK(Result.status_code == 200); } } SUBCASE("store - 'local' does not store upstream (binary)") { ZenConfig UpstreamCfg = ZenConfig::New(13338); ZenServerInstance UpstreamInst(TestEnv); ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338); ZenServerInstance LocalInst(TestEnv); const auto Bucket = "legacy"sv; UpstreamCfg.Spawn(UpstreamInst); LocalCfg.Spawn(LocalInst); zen::IoHash Key; auto BinaryValue = GenerateData(1024, Key); // Store binary cache value locally { cpr::Response Result = cpr::Put(cpr::Url{"{}/{}/{}?store=local"_format(LocalCfg.BaseUri, Bucket, Key)}, cpr::Body{(const char*)BinaryValue.GetData(), BinaryValue.GetSize()}, cpr::Header{{"Content-Type", "application/octet-stream"}}); CHECK(Result.status_code == 201); } { cpr::Response Result = cpr::Get(cpr::Url{"{}/{}/{}"_format(UpstreamCfg.BaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/octet-stream"}}); CHECK(Result.status_code == 404); } { cpr::Response Result = cpr::Get(cpr::Url{"{}/{}/{}"_format(LocalCfg.BaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/octet-stream"}}); CHECK(Result.status_code == 200); } } SUBCASE("store - 'local/remote' stores local and upstream (binary)") { ZenConfig UpstreamCfg = ZenConfig::New(13338); ZenServerInstance UpstreamInst(TestEnv); ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338); ZenServerInstance LocalInst(TestEnv); const auto Bucket = "legacy"sv; UpstreamCfg.Spawn(UpstreamInst); LocalCfg.Spawn(LocalInst); zen::IoHash Key; auto BinaryValue = GenerateData(1024, Key); // Store binary cache value locally and upstream { cpr::Response Result = cpr::Put(cpr::Url{"{}/{}/{}?store=local,remote"_format(LocalCfg.BaseUri, Bucket, Key)}, cpr::Body{(const char*)BinaryValue.GetData(), BinaryValue.GetSize()}, cpr::Header{{"Content-Type", "application/octet-stream"}}); CHECK(Result.status_code == 201); } { cpr::Response Result = cpr::Get(cpr::Url{"{}/{}/{}"_format(UpstreamCfg.BaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/octet-stream"}}); CHECK(Result.status_code == 200); } { cpr::Response Result = cpr::Get(cpr::Url{"{}/{}/{}"_format(LocalCfg.BaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/octet-stream"}}); CHECK(Result.status_code == 200); } } SUBCASE("query - 'local' does not query upstream (cppackage)") { ZenConfig UpstreamCfg = ZenConfig::New(13338); ZenServerInstance UpstreamInst(TestEnv); ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338); ZenServerInstance LocalInst(TestEnv); const auto Bucket = "legacy"sv; UpstreamCfg.Spawn(UpstreamInst); LocalCfg.Spawn(LocalInst); zen::IoHash Key; zen::CbPackage Package = GeneratePackage(Key); auto Buf = ToBuffer(Package); // Store package upstream { cpr::Response Result = cpr::Put(cpr::Url{"{}/{}/{}"_format(UpstreamCfg.BaseUri, Bucket, Key)}, cpr::Body{(const char*)Buf.GetData(), Buf.GetSize()}, cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}}); CHECK(Result.status_code == 201); } { cpr::Response Result = cpr::Get(cpr::Url{"{}/{}/{}?query=local"_format(LocalCfg.BaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); CHECK(Result.status_code == 404); } { cpr::Response Result = cpr::Get(cpr::Url{"{}/{}/{}?query=local,remote"_format(LocalCfg.BaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); CHECK(Result.status_code == 200); } } SUBCASE("store - 'local' does not store upstream (cbpackge)") { ZenConfig UpstreamCfg = ZenConfig::New(13338); ZenServerInstance UpstreamInst(TestEnv); ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338); ZenServerInstance LocalInst(TestEnv); const auto Bucket = "legacy"sv; UpstreamCfg.Spawn(UpstreamInst); LocalCfg.Spawn(LocalInst); zen::IoHash Key; zen::CbPackage Package = GeneratePackage(Key); auto Buf = ToBuffer(Package); // Store packge locally { cpr::Response Result = cpr::Put(cpr::Url{"{}/{}/{}?store=local"_format(LocalCfg.BaseUri, Bucket, Key)}, cpr::Body{(const char*)Buf.GetData(), Buf.GetSize()}, cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}}); CHECK(Result.status_code == 201); } { cpr::Response Result = cpr::Get(cpr::Url{"{}/{}/{}"_format(UpstreamCfg.BaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); CHECK(Result.status_code == 404); } { cpr::Response Result = cpr::Get(cpr::Url{"{}/{}/{}"_format(LocalCfg.BaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); CHECK(Result.status_code == 200); } } SUBCASE("store - 'local/remote' stores local and upstream (cbpackage)") { ZenConfig UpstreamCfg = ZenConfig::New(13338); ZenServerInstance UpstreamInst(TestEnv); ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338); ZenServerInstance LocalInst(TestEnv); const auto Bucket = "legacy"sv; UpstreamCfg.Spawn(UpstreamInst); LocalCfg.Spawn(LocalInst); zen::IoHash Key; zen::CbPackage Package = GeneratePackage(Key); auto Buf = ToBuffer(Package); // Store package locally and upstream { cpr::Response Result = cpr::Put(cpr::Url{"{}/{}/{}?store=local,remote"_format(LocalCfg.BaseUri, Bucket, Key)}, cpr::Body{(const char*)Buf.GetData(), Buf.GetSize()}, cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}}); CHECK(Result.status_code == 201); } { cpr::Response Result = cpr::Get(cpr::Url{"{}/{}/{}"_format(UpstreamCfg.BaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); CHECK(Result.status_code == 200); } { cpr::Response Result = cpr::Get(cpr::Url{"{}/{}/{}"_format(LocalCfg.BaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); CHECK(Result.status_code == 200); } } } struct RemoteExecutionRequest { RemoteExecutionRequest(std::string_view Host, int Port, std::filesystem::path& TreePath) : m_HostName(Host) , m_PortNumber(Port) , m_TreePath(TreePath) { } void Build(std::string_view Command, std::string_view Arguments) { zen::FileSystemTraversal Traversal; Traversal.TraverseFileSystem(m_TreePath, m_Visit); zen::CbObjectWriter PrepReq; PrepReq << "cmd" << Command; PrepReq << "args" << Arguments; PrepReq.BeginArray("files"); for (const auto& Kv : m_Visit.m_Files) { PrepReq.BeginObject(); PrepReq << "file" << zen::WideToUtf8(Kv.first) << "size" << Kv.second.Size << "hash" << Kv.second.Hash; PrepReq.EndObject(); } PrepReq.EndArray(); zen::BinaryWriter MemWriter(m_MemOut); PrepReq.Save(MemWriter); } void Prep() { cpr::Response Response = cpr::Post(cpr::Url("{}/prep"_format(m_BaseUri)), cpr::Body((const char*)m_MemOut.Data(), m_MemOut.Size())); if (Response.status_code < 300) { zen::IoBuffer Payload(zen::IoBuffer::Clone, Response.text.data(), Response.text.size()); zen::CbObject Result = zen::LoadCompactBinaryObject(Payload); for (auto& Need : Result["need"]) { zen::IoHash NeedHash = Need.AsHash(); if (auto It = m_Visit.m_HashToFile.find(NeedHash); It != m_Visit.m_HashToFile.end()) { zen::IoBuffer FileData = zen::IoBufferBuilder::MakeFromFile(It->second.c_str()); cpr::Response CasResponse = cpr::Post(cpr::Url(m_CasUri), cpr::Body((const char*)FileData.Data(), FileData.Size())); if (CasResponse.status_code >= 300) { ZEN_ERROR("CAS put failed with {}", CasResponse.status_code); } } else { ZEN_ERROR("unknown hash in 'need' list: {}", NeedHash); } } } } zen::CbObject Exec() { cpr::Response JobResponse = cpr::Post(cpr::Url(m_BaseUri), cpr::Body((const char*)m_MemOut.Data(), m_MemOut.Size())); if (JobResponse.status_code < 300) { zen::IoBuffer Payload(zen::IoBuffer::Clone, JobResponse.text.data(), JobResponse.text.size()); return zen::LoadCompactBinaryObject(std::move(Payload)); } ZEN_INFO("job exec: {}", JobResponse.status_code); return {}; } private: struct Visitor : public zen::FileSystemTraversal::TreeVisitor { const std::filesystem::path& m_RootPath; Visitor(const std::filesystem::path& RootPath) : m_RootPath(RootPath) {} virtual void VisitFile(const std::filesystem::path& Parent, const std::wstring_view& FileName, uint64_t FileSize) override { std::filesystem::path FullPath = Parent / FileName; zen::IoHashStream Ios; zen::ScanFile(FullPath, 64 * 1024, [&](const void* Data, size_t Size) { Ios.Append(Data, Size); }); zen::IoHash Hash = Ios.GetHash(); std::wstring RelativePath = FullPath.lexically_relative(m_RootPath).native(); // ZEN_INFO("File: {:32} => {} ({})", zen::WideToUtf8(RelativePath), Hash, FileSize); FileEntry& Entry = m_Files[RelativePath]; Entry.Hash = Hash; Entry.Size = FileSize; m_HashToFile[Hash] = FullPath; } virtual bool VisitDirectory(const std::filesystem::path& Parent, const std::wstring_view& DirectoryName) override { std::filesystem::path FullPath = Parent / DirectoryName; if (DirectoryName.starts_with(L".")) { return false; } return true; } struct FileEntry { uint64_t Size; zen::IoHash Hash; }; std::map m_Files; std::unordered_map m_HashToFile; }; std::string m_HostName; int m_PortNumber; std::filesystem::path m_TreePath; const std::string m_BaseUri = "http://{}:{}/exec/jobs"_format(m_HostName, m_PortNumber); const std::string m_CasUri = "http://{}:{}/cas"_format(m_HostName, m_PortNumber); Visitor m_Visit{m_TreePath}; zen::MemoryOutStream m_MemOut; }; TEST_CASE("exec.basic") { using namespace std::literals; std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); const uint16_t PortNumber = 13337; ZenServerInstance Zen1(TestEnv); Zen1.SetTestDir(TestDir); Zen1.SpawnServer(PortNumber); Zen1.WaitUntilReady(); std::filesystem::path TreePath = TestEnv.GetTestRootDir("test/remote1"); { RemoteExecutionRequest RemoteRequest("localhost", PortNumber, TreePath); RemoteRequest.Build("zentest-appstub.exe", ""); RemoteRequest.Prep(); zen::CbObject Result = RemoteRequest.Exec(); CHECK(Result["exitcode"].AsInt32(-1) == 0); } { RemoteExecutionRequest RemoteRequest("localhost", PortNumber, TreePath); RemoteRequest.Build("zentest-appstub.exe", "-f=1"); RemoteRequest.Prep(); zen::CbObject Result = RemoteRequest.Exec(); CHECK(Result["exitcode"].AsInt32(-1) == 1); } } TEST_CASE("mesh.basic") { using namespace std::literals; const int kInstanceCount = 4; ZEN_INFO("spawning {} instances", kInstanceCount); std::unique_ptr Instances[kInstanceCount]; for (int i = 0; i < kInstanceCount; ++i) { auto& Instance = Instances[i]; Instance = std::make_unique(TestEnv); Instance->SetTestDir(TestEnv.CreateNewTestDir()); Instance->EnableMesh(); Instance->SpawnServer(13337 + i); } for (int i = 0; i < kInstanceCount; ++i) { auto& Instance = Instances[i]; Instance->WaitUntilReady(); } } class ZenServerTestHelper { public: ZenServerTestHelper(std::string_view HelperId, int ServerCount) : m_HelperId{HelperId}, m_ServerCount{ServerCount} {} ~ZenServerTestHelper() {} void SpawnServers() { SpawnServers([](ZenServerInstance&) {}); } void SpawnServers(auto&& Callback) { ZEN_INFO("{}: spawning {} server instances", m_HelperId, m_ServerCount); m_Instances.resize(m_ServerCount); for (int i = 0; i < m_ServerCount; ++i) { auto& Instance = m_Instances[i]; Instance = std::make_unique(TestEnv); Instance->SetTestDir(TestEnv.CreateNewTestDir()); Callback(*Instance); Instance->SpawnServer(13337 + i); } for (int i = 0; i < m_ServerCount; ++i) { auto& Instance = m_Instances[i]; Instance->WaitUntilReady(); } } ZenServerInstance& GetInstance(int Index) { return *m_Instances[Index]; } private: std::string m_HelperId; int m_ServerCount = 0; std::vector > m_Instances; }; TEST_CASE("http.basics") { using namespace std::literals; ZenServerTestHelper Servers{"http.basics"sv, 1}; Servers.SpawnServers(); ZenServerInstance& Instance = Servers.GetInstance(0); const std::string BaseUri = Instance.GetBaseUri(); { cpr::Response r = cpr::Get(cpr::Url{"{}/testing/hello"_format(BaseUri)}); CHECK_EQ(r.status_code, 200); } { cpr::Response r = cpr::Post(cpr::Url{"{}/testing/hello"_format(BaseUri)}); CHECK_EQ(r.status_code, 404); } { cpr::Response r = cpr::Post(cpr::Url{"{}/testing/echo"_format(BaseUri)}, cpr::Body{"yoyoyoyo"}); CHECK_EQ(r.status_code, 200); CHECK_EQ(r.text, "yoyoyoyo"); } } TEST_CASE("http.package") { using namespace std::literals; ZenServerTestHelper Servers{"http.package"sv, 1}; Servers.SpawnServers(); ZenServerInstance& Instance = Servers.GetInstance(0); const std::string BaseUri = Instance.GetBaseUri(); static const uint8_t Data1[] = {0, 1, 2, 3}; static const uint8_t Data2[] = {0, 1, 2, 3, 4, 5, 6, 7, 8}; zen::CbAttachment Attach1{zen::CompressedBuffer::Compress(zen::SharedBuffer::Clone({Data1, 4}), zen::OodleCompressor::NotSet, zen::OodleCompressionLevel::None)}; zen::CbAttachment Attach2{zen::CompressedBuffer::Compress(zen::SharedBuffer::Clone({Data2, 8}), zen::OodleCompressor::NotSet, zen::OodleCompressionLevel::None)}; zen::CbObjectWriter Writer; Writer.AddAttachment("attach1", Attach1); Writer.AddAttachment("attach2", Attach2); zen::CbObject CoreObject = Writer.Save(); zen::CbPackage TestPackage; TestPackage.SetObject(CoreObject); TestPackage.AddAttachment(Attach1); TestPackage.AddAttachment(Attach2); zen::HttpClient TestClient(BaseUri); zen::HttpClient::Response Response = TestClient.TransactPackage("/testing/package"sv, TestPackage); zen::CbPackage ResponsePackage = ParsePackageMessage(Response.ResponsePayload); CHECK_EQ(ResponsePackage, TestPackage); } # if 0 TEST_CASE("lifetime.owner") { // This test is designed to verify that the hand-over of sponsor processes is handled // correctly for the case when a second or third process is launched on the same port // // Due to the nature of it, it cannot be const uint16_t PortNumber = 23456; ZenServerInstance Zen1(TestEnv); std::filesystem::path TestDir1 = TestEnv.CreateNewTestDir(); Zen1.SetTestDir(TestDir1); Zen1.SpawnServer(PortNumber); Zen1.WaitUntilReady(); Zen1.Detach(); ZenServerInstance Zen2(TestEnv); std::filesystem::path TestDir2 = TestEnv.CreateNewTestDir(); Zen2.SetTestDir(TestDir2); Zen2.SpawnServer(PortNumber); Zen2.WaitUntilReady(); Zen2.Detach(); } TEST_CASE("lifetime.owner.2") { // This test is designed to verify that the hand-over of sponsor processes is handled // correctly for the case when a second or third process is launched on the same port // // Due to the nature of it, it cannot be const uint16_t PortNumber = 13456; std::filesystem::path TestDir1 = TestEnv.CreateNewTestDir(); std::filesystem::path TestDir2 = TestEnv.CreateNewTestDir(); ZenServerInstance Zen1(TestEnv); Zen1.SetTestDir(TestDir1); Zen1.SpawnServer(PortNumber); Zen1.WaitUntilReady(); ZenServerInstance Zen2(TestEnv); Zen2.SetTestDir(TestDir2); Zen2.SetOwnerPid(Zen1.GetPid()); Zen2.SpawnServer(PortNumber + 1); Zen2.Detach(); ZenServerInstance Zen3(TestEnv); Zen3.SetTestDir(TestDir2); Zen3.SetOwnerPid(Zen1.GetPid()); Zen3.SpawnServer(PortNumber + 1); Zen3.Detach(); ZenServerInstance Zen4(TestEnv); Zen4.SetTestDir(TestDir2); Zen4.SetOwnerPid(Zen1.GetPid()); Zen4.SpawnServer(PortNumber + 1); Zen4.Detach(); } # endif } // namespace zen::tests #endif