diff options
Diffstat (limited to 'zenserver-test/zenserver-test.cpp')
| -rw-r--r-- | zenserver-test/zenserver-test.cpp | 229 |
1 files changed, 227 insertions, 2 deletions
diff --git a/zenserver-test/zenserver-test.cpp b/zenserver-test/zenserver-test.cpp index 213648319..1a41a5541 100644 --- a/zenserver-test/zenserver-test.cpp +++ b/zenserver-test/zenserver-test.cpp @@ -11,11 +11,14 @@ #include <zencore/fmtutils.h> #include <zencore/iohash.h> #include <zencore/logging.h> +#include <zencore/memory.h> +#include <zencore/stream.h> #include <zencore/string.h> #include <zencore/thread.h> #include <zencore/timer.h> -#include <zenhttp/zenhttp.h> #include <zenhttp/httpclient.h> +#include <zenhttp/httpshared.h> +#include <zenhttp/zenhttp.h> #include <zenserverprocess.h> #include <mimalloc.h> @@ -33,6 +36,7 @@ #include <filesystem> #include <map> #include <random> +#include <span> #include <atlbase.h> #include <process.h> @@ -194,6 +198,8 @@ private: size_t rv = http_parser_execute(&m_HttpParser, &m_HttpParserSettings, (const char*)m_ResponseBuffer.data(), Bytes); + ZEN_UNUSED(rv); + if (m_HttpParser.http_errno != 0) { // Something bad! @@ -1088,7 +1094,7 @@ TEST_CASE("project.pipe") } # endif -TEST_CASE("z$.basic") +TEST_CASE("zcache.basic") { using namespace std::literals; @@ -1179,6 +1185,221 @@ TEST_CASE("z$.basic") } } +TEST_CASE("zcache.cbpackage") +{ + using namespace std::literals; + + auto CreateTestPackage = [](zen::IoHash& OutAttachmentKey) -> zen::CbPackage { + auto Data = zen::SharedBuffer::Clone(zen::MakeMemoryView<uint8_t>({1, 2, 3, 4, 5, 6, 7, 8, 9})); + auto CompressedData = zen::CompressedBuffer::Compress(Data); + + OutAttachmentKey = zen::IoHash::FromBLAKE3(CompressedData.GetRawHash()); + + zen::CbWriter Obj; + Obj.BeginObject("obj"sv); + Obj.AddBinaryAttachment("data", OutAttachmentKey); + Obj.EndObject(); + + zen::CbPackage Package; + Package.SetObject(Obj.Save().AsObject()); + Package.AddAttachment(zen::CbAttachment(CompressedData)); + + return Package; + }; + + auto SerializeToBuffer = [](zen::CbPackage Package) -> zen::IoBuffer { + zen::MemoryOutStream MemStream; + zen::BinaryWriter Writer(MemStream); + + Package.Save(Writer); + + return zen::IoBuffer(zen::IoBuffer::Clone, MemStream.Data(), MemStream.Size()); + }; + + auto IsEqual = [](zen::CbPackage Lhs, zen::CbPackage Rhs) -> bool { + std::span<const zen::CbAttachment> LhsAttachments = Lhs.GetAttachments(); + std::span<const zen::CbAttachment> RhsAttachments = Rhs.GetAttachments(); + + if (LhsAttachments.size() != LhsAttachments.size()) + { + return false; + } + + for (const zen::CbAttachment& LhsAttachment : LhsAttachments) + { + const zen::CbAttachment* RhsAttachment = Rhs.FindAttachment(LhsAttachment.GetHash()); + CHECK(RhsAttachment); + + zen::SharedBuffer LhsBuffer = LhsAttachment.AsCompressedBinary().Decompress(); + CHECK(!LhsBuffer.IsNull()); + + zen::SharedBuffer RhsBuffer = RhsAttachment->AsCompressedBinary().Decompress(); + CHECK(!RhsBuffer.IsNull()); + + if (!LhsBuffer.GetView().EqualBytes(RhsBuffer.GetView())) + { + return false; + } + } + + return true; + }; + + SUBCASE("PUT/GET returns correct package") + { + std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); + const uint16_t PortNumber = 13337; + const auto BaseUri = "http://localhost:{}/z$"_format(PortNumber); + + ZenServerInstance Instance1(TestEnv); + Instance1.SetTestDir(TestDir); + Instance1.SpawnServer(PortNumber); + Instance1.WaitUntilReady(); + + const std::string_view Bucket = "mosdef"sv; + zen::IoHash Key; + zen::CbPackage ExpectedPackage = CreateTestPackage(Key); + + // PUT + { + zen::IoBuffer Body = SerializeToBuffer(ExpectedPackage); + cpr::Response Result = cpr::Put(cpr::Url{"{}/{}/{}"_format(BaseUri, Bucket, Key)}, + cpr::Body{(const char*)Body.Data(), Body.Size()}, + cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}}); + CHECK(Result.status_code == 201); + } + + // GET + { + cpr::Response Result = + cpr::Get(cpr::Url{"{}/{}/{}"_format(BaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); + CHECK(Result.status_code == 200); + + zen::IoBuffer Response(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()); + + zen::CbPackage Package; + const bool Ok = Package.TryLoad(Response); + CHECK(Ok); + CHECK(IsEqual(Package, ExpectedPackage)); + } + } + + SUBCASE("PUT propagates upstream") + { + // Setup local and remote server + std::filesystem::path LocalDataDir = TestEnv.CreateNewTestDir(); + std::filesystem::path RemoteDataDir = TestEnv.CreateNewTestDir(); + const uint16_t LocalPortNumber = 13337; + const uint16_t RemotePortNumber = 13338; + + const auto LocalBaseUri = "http://localhost:{}/z$"_format(LocalPortNumber); + const auto RemoteBaseUri = "http://localhost:{}/z$"_format(RemotePortNumber); + + ZenServerInstance RemoteInstance(TestEnv); + RemoteInstance.SetTestDir(RemoteDataDir); + RemoteInstance.SpawnServer(RemotePortNumber); + + ZenServerInstance LocalInstance(TestEnv); + LocalInstance.SetTestDir(LocalDataDir); + LocalInstance.SpawnServer(LocalPortNumber, + "--upstream-thread-count=0 --upstream-zen-url=http://localhost:{}"_format(RemotePortNumber)); + + LocalInstance.WaitUntilReady(); + RemoteInstance.WaitUntilReady(); + + const std::string_view Bucket = "mosdef"sv; + zen::IoHash Key; + zen::CbPackage ExpectedPackage = CreateTestPackage(Key); + + // Store the cache record package in the local instance + { + zen::IoBuffer Body = SerializeToBuffer(ExpectedPackage); + cpr::Response Result = cpr::Put(cpr::Url{"{}/{}/{}"_format(LocalBaseUri, Bucket, Key)}, + cpr::Body{(const char*)Body.Data(), Body.Size()}, + cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}}); + + CHECK(Result.status_code == 201); + } + + // The cache record can be retrieved as a package from the local instance + { + cpr::Response Result = + cpr::Get(cpr::Url{"{}/{}/{}"_format(LocalBaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); + CHECK(Result.status_code == 200); + + zen::IoBuffer Body(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()); + zen::CbPackage Package; + const bool Ok = Package.TryLoad(Body); + CHECK(Ok); + CHECK(IsEqual(Package, ExpectedPackage)); + } + + // The cache record can be retrieved as a package from the remote instance + { + cpr::Response Result = + cpr::Get(cpr::Url{"{}/{}/{}"_format(RemoteBaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); + CHECK(Result.status_code == 200); + + zen::IoBuffer Body(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()); + zen::CbPackage Package; + const bool Ok = Package.TryLoad(Body); + CHECK(Ok); + CHECK(IsEqual(Package, ExpectedPackage)); + } + } + + SUBCASE("GET finds upstream when missing in local") + { + // Setup local and remote server + std::filesystem::path LocalDataDir = TestEnv.CreateNewTestDir(); + std::filesystem::path RemoteDataDir = TestEnv.CreateNewTestDir(); + const uint16_t LocalPortNumber = 13337; + const uint16_t RemotePortNumber = 13338; + + const auto LocalBaseUri = "http://localhost:{}/z$"_format(LocalPortNumber); + const auto RemoteBaseUri = "http://localhost:{}/z$"_format(RemotePortNumber); + + ZenServerInstance RemoteInstance(TestEnv); + RemoteInstance.SetTestDir(RemoteDataDir); + RemoteInstance.SpawnServer(RemotePortNumber); + + ZenServerInstance LocalInstance(TestEnv); + LocalInstance.SetTestDir(LocalDataDir); + LocalInstance.SpawnServer(LocalPortNumber, + "--upstream-thread-count=0 --upstream-zen-url=http://localhost:{}"_format(RemotePortNumber)); + + LocalInstance.WaitUntilReady(); + RemoteInstance.WaitUntilReady(); + + const std::string_view Bucket = "mosdef"sv; + zen::IoHash Key; + zen::CbPackage ExpectedPackage = CreateTestPackage(Key); + + // Store the cache record package in upstream cache + { + zen::IoBuffer Body = SerializeToBuffer(ExpectedPackage); + cpr::Response Result = cpr::Put(cpr::Url{"{}/{}/{}"_format(RemoteBaseUri, Bucket, Key)}, + cpr::Body{(const char*)Body.Data(), Body.Size()}, + cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}}); + + CHECK(Result.status_code == 201); + } + + // The cache record can be retrieved as a package from the local cache + { + cpr::Response Result = + cpr::Get(cpr::Url{"{}/{}/{}"_format(LocalBaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); + CHECK(Result.status_code == 200); + + zen::IoBuffer Body(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()); + zen::CbPackage Package; + const bool Ok = Package.TryLoad(Body); + CHECK(Ok); + CHECK(IsEqual(Package, ExpectedPackage)); + } + } +} + struct RemoteExecutionRequest { RemoteExecutionRequest(std::string_view Host, int Port, std::filesystem::path& TreePath) @@ -1482,6 +1703,10 @@ TEST_CASE("http.package") zen::HttpClient TestClient(BaseUri); zen::HttpClient::Response Response = TestClient.TransactPackage("/testing/package"sv, TestPackage); + + zen::CbPackage ResponsePackage = ParsePackageMessage(Response.ResponsePayload); + + CHECK_EQ(ResponsePackage, TestPackage); } #endif |