diff options
| author | Stefan Boberg <[email protected]> | 2025-09-29 13:15:16 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-09-29 13:15:16 +0200 |
| commit | d4c6e547a7081b1562a69dc9839d24cb82681c5d (patch) | |
| tree | 3ffe43dcf09bb6d01c2fb860bb1f73882f44827d /src/zenserver-test/zenserver-test.cpp | |
| parent | gracefully handle missing chunks when exporting an oplog (#526) (diff) | |
| download | zen-d4c6e547a7081b1562a69dc9839d24cb82681c5d.tar.xz zen-d4c6e547a7081b1562a69dc9839d24cb82681c5d.zip | |
split zenserver-test monolith into multiple source files (#528)
Diffstat (limited to 'src/zenserver-test/zenserver-test.cpp')
| -rw-r--r-- | src/zenserver-test/zenserver-test.cpp | 4636 |
1 files changed, 34 insertions, 4602 deletions
diff --git a/src/zenserver-test/zenserver-test.cpp b/src/zenserver-test/zenserver-test.cpp index 827a4eb5a..773383954 100644 --- a/src/zenserver-test/zenserver-test.cpp +++ b/src/zenserver-test/zenserver-test.cpp @@ -2,73 +2,32 @@ #define _SILENCE_CXX17_C_HEADER_DEPRECATION_WARNING -#include <zenbase/refcount.h> -#include <zencore/compactbinary.h> -#include <zencore/compactbinarybuilder.h> -#include <zencore/compactbinarypackage.h> -#include <zencore/compress.h> -#include <zencore/except.h> -#include <zencore/filesystem.h> -#include <zencore/fmtutils.h> -#include <zencore/iohash.h> -#include <zencore/logging.h> -#include <zencore/memoryview.h> -#include <zencore/scopeguard.h> -#include <zencore/stream.h> -#include <zencore/string.h> -#include <zencore/testutils.h> -#include <zencore/thread.h> -#include <zencore/timer.h> -#include <zencore/xxhash.h> -#include <zenhttp/httpclient.h> -#include <zenhttp/packageformat.h> -#include <zenhttp/zenhttp.h> -#include <zenutil/buildstoragecache.h> -#include <zenutil/cache/cache.h> -#include <zenutil/cache/cacherequests.h> -#include <zenutil/chunkrequests.h> -#include <zenutil/logging/testformatter.h> -#include <zenutil/zenserverprocess.h> - -#include <http_parser.h> - -#if ZEN_PLATFORM_WINDOWS -# pragma comment(lib, "Crypt32.lib") -# pragma comment(lib, "Wldap32.lib") -#endif - -ZEN_THIRD_PARTY_INCLUDES_START -#include <tsl/robin_set.h> -#undef GetObject -ZEN_THIRD_PARTY_INCLUDES_END - -#include <atomic> -#include <filesystem> -#include <map> -#include <random> -#include <span> -#include <thread> -#include <typeindex> -#include <unordered_map> - -#if ZEN_PLATFORM_WINDOWS -# include <ppl.h> -# include <process.h> -#endif - -#include <zencore/memory/newdelete.h> - -////////////////////////////////////////////////////////////////////////// - #if ZEN_WITH_TESTS -# define ZEN_TEST_WITH_RUNNER 1 -# include <zencore/testing.h> -# include <zencore/workthreadpool.h> -#endif - -using namespace std::literals; -#if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC +# define ZEN_TEST_WITH_RUNNER 1 +# include "zenserver-test.h" + +# include <zencore/except.h> +# include <zencore/fmtutils.h> +# include <zencore/logging.h> +# include <zencore/stream.h> +# include <zencore/string.h> +# include <zencore/testutils.h> +# include <zencore/thread.h> +# include <zencore/timer.h> +# include <zenhttp/httpclient.h> +# include <zenhttp/packageformat.h> +# include <zenutil/logging/testformatter.h> +# include <zenutil/zenserverprocess.h> + +# include <atomic> +# include <filesystem> + +# if ZEN_PLATFORM_WINDOWS +# include <ppl.h> +# include <process.h> +# else +# include <thread> struct Concurrency { template<typename... T> @@ -85,12 +44,19 @@ struct Concurrency } } }; -#endif +# endif + +# include <zencore/memory/newdelete.h> ////////////////////////////////////////////////////////////////////////// -#if ZEN_WITH_TESTS +using namespace std::literals; + +////////////////////////////////////////////////////////////////////////// + +namespace zen::tests { zen::ZenServerEnvironment TestEnv; +} int main(int argc, char** argv) @@ -136,7 +102,7 @@ main(int argc, char** argv) } } - TestEnv.InitializeForTest(ProgramBaseDir, TestBaseDir, ServerClass); + zen::tests::TestEnv.InitializeForTest(ProgramBaseDir, TestBaseDir, ServerClass); ZEN_INFO("Running tests...(base dir: '{}')", TestBaseDir); @@ -148,16 +114,6 @@ main(int argc, char** argv) namespace zen::tests { -IoBuffer -MakeCbObjectPayload(std::function<void(CbObjectWriter& Writer)> WriteCB) -{ - CbObjectWriter Writer; - WriteCB(Writer); - IoBuffer Payload = Writer.Save().GetBuffer().AsIoBuffer(); - Payload.SetContentType(ZenContentType::kCbObject); - return Payload; -}; - TEST_CASE("default.single") { std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); @@ -292,2749 +248,6 @@ TEST_CASE("multi.basic") zen::NiceRate(RequestCount, (uint32_t)Elapsed, "req")); } -TEST_CASE("project.basic") -{ - using namespace std::literals; - - std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); - - ZenServerInstance Instance1(TestEnv); - Instance1.SetTestDir(TestDir); - - const uint16_t PortNumber = Instance1.SpawnServerAndWaitUntilReady(); - - std::mt19937_64 mt; - - zen::StringBuilder<64> BaseUri; - BaseUri << fmt::format("http://localhost:{}", PortNumber); - - std::filesystem::path BinPath = zen::GetRunningExecutablePath(); - std::filesystem::path RootPath = BinPath.parent_path().parent_path(); - BinPath = BinPath.lexically_relative(RootPath); - - SUBCASE("build store init") - { - { - HttpClient Http{BaseUri}; - - { - zen::CbObjectWriter Body; - Body << "id" - << "test"; - Body << "root" << RootPath.c_str(); - Body << "project" - << "/zooom"; - Body << "engine" - << "/zooom"; - - zen::BinaryWriter MemOut; - IoBuffer BodyBuf = Body.Save().GetBuffer().AsIoBuffer(); - - auto Response = Http.Post("/prj/test"sv, BodyBuf); - CHECK(Response.StatusCode == HttpResponseCode::Created); - } - - { - auto Response = Http.Get("/prj/test"sv); - CHECK(Response.StatusCode == HttpResponseCode::OK); - - CbObject ResponseObject = Response.AsObject(); - - CHECK(ResponseObject["id"].AsString() == "test"sv); - CHECK(ResponseObject["root"].AsString() == PathToUtf8(RootPath.c_str())); - } - } - - BaseUri << "/prj/test/oplog/foobar"; - - { - HttpClient Http{BaseUri}; - - { - auto Response = Http.Post(""sv); - CHECK(Response.StatusCode == HttpResponseCode::Created); - } - - { - auto Response = Http.Get(""sv); - CHECK(Response.StatusCode == HttpResponseCode::OK); - - CbObject ResponseObject = Response.AsObject(); - - CHECK(ResponseObject["id"].AsString() == "foobar"sv); - CHECK(ResponseObject["project"].AsString() == "test"sv); - } - } - - SUBCASE("build store persistence") - { - uint8_t AttachData[] = {1, 2, 3}; - - zen::CompressedBuffer Attachment = zen::CompressedBuffer::Compress(zen::SharedBuffer::Clone(zen::MemoryView{AttachData, 3})); - zen::CbAttachment Attach{Attachment, Attachment.DecodeRawHash()}; - - zen::CbObjectWriter OpWriter; - OpWriter << "key" - << "foo" - << "attachment" << Attach; - - const std::string_view ChunkId{ - "00000000" - "00000000" - "00010000"}; - auto FileOid = zen::Oid::FromHexString(ChunkId); - - OpWriter.BeginArray("files"); - OpWriter.BeginObject(); - OpWriter << "id" << FileOid; - OpWriter << "clientpath" - << "/{engine}/client/side/path"; - OpWriter << "serverpath" << BinPath.c_str(); - OpWriter.EndObject(); - OpWriter.EndArray(); - - zen::CbObject Op = OpWriter.Save(); - - zen::CbPackage OpPackage(Op); - OpPackage.AddAttachment(Attach); - - zen::BinaryWriter MemOut; - legacy::SaveCbPackage(OpPackage, MemOut); - - HttpClient Http{BaseUri}; - - { - auto Response = Http.Post("/new", IoBufferBuilder::MakeFromMemory(MemOut.GetView())); - - REQUIRE(Response); - CHECK(Response.StatusCode == HttpResponseCode::Created); - } - - // Read file data - - { - zen::StringBuilder<128> ChunkGetUri; - ChunkGetUri << "/" << ChunkId; - auto Response = Http.Get(ChunkGetUri); - - REQUIRE(Response); - CHECK(Response.StatusCode == HttpResponseCode::OK); - } - - { - zen::StringBuilder<128> ChunkGetUri; - ChunkGetUri << "/" << ChunkId << "?offset=1&size=10"; - auto Response = Http.Get(ChunkGetUri); - - REQUIRE(Response); - CHECK(Response.StatusCode == HttpResponseCode::OK); - CHECK(Response.ResponsePayload.GetSize() == 10); - } - - ZEN_INFO("+++++++"); - } - - SUBCASE("snapshot") - { - zen::CbObjectWriter OpWriter; - OpWriter << "key" - << "foo"; - - const std::string_view ChunkId{ - "00000000" - "00000000" - "00010000"}; - auto FileOid = zen::Oid::FromHexString(ChunkId); - - OpWriter.BeginArray("files"); - OpWriter.BeginObject(); - OpWriter << "id" << FileOid; - OpWriter << "clientpath" - << "/{engine}/client/side/path"; - OpWriter << "serverpath" << BinPath.c_str(); - OpWriter.EndObject(); - OpWriter.EndArray(); - - zen::CbObject Op = OpWriter.Save(); - - zen::CbPackage OpPackage(Op); - - zen::BinaryWriter MemOut; - legacy::SaveCbPackage(OpPackage, MemOut); - - HttpClient Http{BaseUri}; - - { - auto Response = Http.Post("/new", IoBufferBuilder::MakeFromMemory(MemOut.GetView())); - - REQUIRE(Response); - CHECK(Response.StatusCode == HttpResponseCode::Created); - } - - // Read file data, it is raw and uncompressed - { - zen::StringBuilder<128> ChunkGetUri; - ChunkGetUri << "/" << ChunkId; - auto Response = Http.Get(ChunkGetUri); - - REQUIRE(Response); - CHECK(Response.StatusCode == HttpResponseCode::OK); - - IoBuffer Data = Response.ResponsePayload; - IoBuffer ReferenceData = IoBufferBuilder::MakeFromFile(RootPath / BinPath); - CHECK(ReferenceData.GetSize() == Data.GetSize()); - CHECK(ReferenceData.GetView().EqualBytes(Data.GetView())); - } - - { - IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) { Writer.AddString("method"sv, "snapshot"sv); }); - auto Response = Http.Post("/rpc"sv, Payload, {{"Content-Type", "application/x-ue-cb"}}); - REQUIRE(Response); - CHECK(Response.StatusCode == HttpResponseCode::OK); - } - - // Read chunk data, it is now compressed - { - zen::StringBuilder<128> ChunkGetUri; - ChunkGetUri << "/" << ChunkId; - auto Response = Http.Get(ChunkGetUri, {{"Accept-Type", "application/x-ue-comp"}}); - - REQUIRE(Response); - CHECK(Response.StatusCode == HttpResponseCode::OK); - - IoBuffer Data = Response.ResponsePayload; - IoHash RawHash; - uint64_t RawSize; - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Data), RawHash, RawSize); - CHECK(Compressed); - IoBuffer DataDecompressed = Compressed.Decompress().AsIoBuffer(); - IoBuffer ReferenceData = IoBufferBuilder::MakeFromFile(RootPath / BinPath); - CHECK(RawSize == ReferenceData.GetSize()); - CHECK(ReferenceData.GetSize() == DataDecompressed.GetSize()); - CHECK(ReferenceData.GetView().EqualBytes(DataDecompressed.GetView())); - } - - ZEN_INFO("+++++++"); - } - - SUBCASE("test chunk not found error") - { - HttpClient Http{BaseUri}; - - for (size_t I = 0; I < 65; I++) - { - zen::StringBuilder<128> PostUri; - PostUri << "/f77c781846caead318084604/info"; - auto Response = Http.Get(PostUri); - - REQUIRE(!Response.Error); - CHECK(Response.StatusCode == HttpResponseCode::NotFound); - } - } - } -} - -namespace utils { - - struct ZenConfig - { - std::filesystem::path DataDir; - uint16_t Port; - std::string BaseUri; - std::string Args; - - static ZenConfig New(std::string Args = "") - { - return ZenConfig{.DataDir = TestEnv.CreateNewTestDir(), .Port = TestEnv.GetNewPortNumber(), .Args = std::move(Args)}; - } - - static ZenConfig New(uint16_t Port, std::string Args = "") - { - return ZenConfig{.DataDir = TestEnv.CreateNewTestDir(), .Port = Port, .Args = std::move(Args)}; - } - - static ZenConfig NewWithUpstream(uint16_t Port, uint16_t UpstreamPort, std::string Args = "") - { - return New(Port, - fmt::format("{}{}--debug --upstream-thread-count=0 --upstream-zen-url=http://localhost:{}", - Args, - Args.length() > 0 ? " " : "", - UpstreamPort)); - } - - static ZenConfig NewWithThreadedUpstreams(uint16_t NewPort, std::span<uint16_t> UpstreamPorts, bool Debug) - { - std::string Args = Debug ? "--debug" : ""; - for (uint16_t Port : UpstreamPorts) - { - Args = fmt::format("{}{}--upstream-zen-url=http://localhost:{}", Args, Args.length() > 0 ? " " : "", Port); - } - return New(NewPort, Args); - } - - void Spawn(ZenServerInstance& Inst) - { - Inst.SetTestDir(DataDir); - Inst.SpawnServer(Port, Args); - const uint16_t InstancePort = Inst.WaitUntilReady(); - CHECK_MESSAGE(InstancePort != 0, Inst.GetLogOutput()); - - if (Port != InstancePort) - ZEN_DEBUG("relocation detected from {} to {}", Port, InstancePort); - - Port = InstancePort; - BaseUri = fmt::format("http://localhost:{}/z$", Port); - } - }; - - void SpawnServer(ZenServerInstance& Server, ZenConfig& Cfg) { Cfg.Spawn(Server); } - - CompressedBuffer CreateSemiRandomBlob(size_t AttachmentSize, OodleCompressionLevel CompressionLevel = OodleCompressionLevel::VeryFast) - { - // Convoluted way to get a compressed buffer whose result it large enough to be a separate file - // but also does actually compress - const size_t PartCount = (AttachmentSize / (1u * 1024u * 64)) + 1; - const size_t PartSize = AttachmentSize / PartCount; - auto Part = SharedBuffer(CreateRandomBlob(PartSize)); - std::vector<SharedBuffer> Parts(PartCount, Part); - size_t RemainPartSize = AttachmentSize - (PartSize * PartCount); - if (RemainPartSize > 0) - { - Parts.push_back(SharedBuffer(CreateRandomBlob(RemainPartSize))); - } - CompressedBuffer Value = CompressedBuffer::Compress(CompositeBuffer(std::move(Parts)), OodleCompressor::Mermaid, CompressionLevel); - return Value; - }; - - std::vector<std::pair<Oid, CompressedBuffer>> CreateAttachments(const std::span<const size_t>& Sizes) - { - std::vector<std::pair<Oid, CompressedBuffer>> Result; - Result.reserve(Sizes.size()); - for (size_t Size : Sizes) - { - CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(CreateRandomBlob(Size))); - Result.emplace_back(std::pair<Oid, CompressedBuffer>(Oid::NewOid(), Compressed)); - } - return Result; - } - - std::vector<std::pair<Oid, CompressedBuffer>> CreateSemiRandomAttachments(const std::span<const size_t>& Sizes) - { - std::vector<std::pair<Oid, CompressedBuffer>> Result; - Result.reserve(Sizes.size()); - for (size_t Size : Sizes) - { - CompressedBuffer Compressed = - CreateSemiRandomBlob(Size, Size > 1024u * 1024u ? OodleCompressionLevel::None : OodleCompressionLevel::VeryFast); - Result.emplace_back(std::pair<Oid, CompressedBuffer>(Oid::NewOid(), Compressed)); - } - return Result; - } - -} // namespace utils - -TEST_CASE("zcache.basic") -{ - using namespace std::literals; - - std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); - - const int kIterationCount = 100; - - auto HashKey = [](int i) -> zen::IoHash { return zen::IoHash::HashBuffer(&i, sizeof i); }; - - { - ZenServerInstance Instance1(TestEnv); - Instance1.SetTestDir(TestDir); - - const uint16_t PortNumber = Instance1.SpawnServerAndWaitUntilReady(); - const std::string BaseUri = fmt::format("http://localhost:{}/z$", PortNumber); - - // Populate with some simple data - - HttpClient Http{BaseUri}; - - for (int i = 0; i < kIterationCount; ++i) - { - zen::CbObjectWriter Cbo; - Cbo << "index" << i; - - IoBuffer Payload = Cbo.Save().GetBuffer().AsIoBuffer(); - Payload.SetContentType(HttpContentType::kCbObject); - - zen::IoHash Key = HashKey(i); - - HttpClient::Response Result = Http.Put(fmt::format("/test/{}", Key), Payload); - - CHECK(Result.StatusCode == HttpResponseCode::Created); - } - - // Retrieve data - - for (int i = 0; i < kIterationCount; ++i) - { - zen::IoHash Key = HashKey(i); - - HttpClient::Response Result = Http.Get(fmt::format("/test/{}", Key), {{"Accept", "application/x-ue-cbpkg"}}); - - CHECK(Result.StatusCode == HttpResponseCode::OK); - } - - // Ensure bad bucket identifiers are rejected - - { - zen::CbObjectWriter Cbo; - Cbo << "index" << 42; - - IoBuffer Payload = Cbo.Save().GetBuffer().AsIoBuffer(); - Payload.SetContentType(HttpContentType::kCbObject); - - zen::IoHash Key = HashKey(442); - - HttpClient::Response Result = Http.Put(fmt::format("/te!st/{}", Key), Payload); - - CHECK(Result.StatusCode == HttpResponseCode::BadRequest); - } - } - - // Verify that the data persists between process runs (the previous server has exited at this point) - - { - ZenServerInstance Instance1(TestEnv); - Instance1.SetTestDir(TestDir); - const uint16_t PortNumber = Instance1.SpawnServerAndWaitUntilReady(); - - const std::string BaseUri = fmt::format("http://localhost:{}/z$", PortNumber); - - HttpClient Http{BaseUri}; - - // Retrieve data again - - for (int i = 0; i < kIterationCount; ++i) - { - zen::IoHash Key = HashKey(i); - - HttpClient::Response Result = Http.Get(fmt::format("/{}/{}", "test", Key), {{"Accept", "application/x-ue-cbpkg"}}); - - CHECK(Result.StatusCode == HttpResponseCode::OK); - } - } -} - -IoBuffer -SerializeToBuffer(const zen::CbPackage& Package) -{ - BinaryWriter MemStream; - - Package.Save(MemStream); - - IoBuffer Buffer = zen::IoBuffer(zen::IoBuffer::Clone, MemStream.Data(), MemStream.Size()); - Buffer.SetContentType(HttpContentType::kCbPackage); - return Buffer; -}; - -TEST_CASE("zcache.cbpackage") -{ - using namespace std::literals; - - auto CreateTestPackage = [](zen::IoHash& OutAttachmentKey) -> zen::CbPackage { - auto Data = zen::SharedBuffer::Clone(zen::MakeMemoryView<uint8_t>({1, 2, 3, 4, 5, 6, 7, 8, 9})); - auto CompressedData = zen::CompressedBuffer::Compress(Data); - - OutAttachmentKey = CompressedData.DecodeRawHash(); - - zen::CbWriter Obj; - Obj.BeginObject("obj"sv); - Obj.AddBinaryAttachment("data", OutAttachmentKey); - Obj.EndObject(); - - zen::CbPackage Package; - Package.SetObject(Obj.Save().AsObject()); - Package.AddAttachment(zen::CbAttachment(CompressedData, OutAttachmentKey)); - - return Package; - }; - - auto IsEqual = [](zen::CbPackage Lhs, zen::CbPackage Rhs) -> bool { - std::span<const zen::CbAttachment> LhsAttachments = Lhs.GetAttachments(); - std::span<const zen::CbAttachment> RhsAttachments = Rhs.GetAttachments(); - - if (LhsAttachments.size() != RhsAttachments.size()) - { - return false; - } - - for (const zen::CbAttachment& LhsAttachment : LhsAttachments) - { - const zen::CbAttachment* RhsAttachment = Rhs.FindAttachment(LhsAttachment.GetHash()); - CHECK(RhsAttachment); - - zen::SharedBuffer LhsBuffer = LhsAttachment.AsCompressedBinary().Decompress(); - CHECK(!LhsBuffer.IsNull()); - - zen::SharedBuffer RhsBuffer = RhsAttachment->AsCompressedBinary().Decompress(); - CHECK(!RhsBuffer.IsNull()); - - if (!LhsBuffer.GetView().EqualBytes(RhsBuffer.GetView())) - { - return false; - } - } - - return true; - }; - - SUBCASE("PUT/GET returns correct package") - { - std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); - - ZenServerInstance Instance1(TestEnv); - Instance1.SetTestDir(TestDir); - const uint16_t PortNumber = Instance1.SpawnServerAndWaitUntilReady(); - const std::string BaseUri = fmt::format("http://localhost:{}/z$", PortNumber); - - HttpClient Http{BaseUri}; - - const std::string_view Bucket = "mosdef"sv; - zen::IoHash Key; - zen::CbPackage ExpectedPackage = CreateTestPackage(Key); - - // PUT - { - zen::IoBuffer Body = SerializeToBuffer(ExpectedPackage); - HttpClient::Response Result = Http.Put(fmt::format("/{}/{}", Bucket, Key), Body); - CHECK(Result.StatusCode == HttpResponseCode::Created); - } - - // GET - { - HttpClient::Response Result = Http.Get(fmt::format("/{}/{}", Bucket, Key), {{"Accept", "application/x-ue-cbpkg"}}); - CHECK(Result.StatusCode == HttpResponseCode::OK); - - zen::CbPackage Package; - const bool Ok = Package.TryLoad(Result.ResponsePayload); - CHECK(Ok); - CHECK(IsEqual(Package, ExpectedPackage)); - } - } - - SUBCASE("PUT propagates upstream") - { - // Setup local and remote server - std::filesystem::path LocalDataDir = TestEnv.CreateNewTestDir(); - std::filesystem::path RemoteDataDir = TestEnv.CreateNewTestDir(); - - ZenServerInstance RemoteInstance(TestEnv); - RemoteInstance.SetTestDir(RemoteDataDir); - const uint16_t RemotePortNumber = RemoteInstance.SpawnServerAndWaitUntilReady(); - - ZenServerInstance LocalInstance(TestEnv); - LocalInstance.SetTestDir(LocalDataDir); - LocalInstance.SpawnServer(TestEnv.GetNewPortNumber(), - fmt::format("--upstream-thread-count=0 --upstream-zen-url=http://localhost:{}", RemotePortNumber)); - const uint16_t LocalPortNumber = LocalInstance.WaitUntilReady(); - CHECK_MESSAGE(LocalPortNumber != 0, LocalInstance.GetLogOutput()); - - const auto LocalBaseUri = fmt::format("http://localhost:{}/z$", LocalPortNumber); - const auto RemoteBaseUri = fmt::format("http://localhost:{}/z$", RemotePortNumber); - - const std::string_view Bucket = "mosdef"sv; - zen::IoHash Key; - zen::CbPackage ExpectedPackage = CreateTestPackage(Key); - - HttpClient LocalHttp{LocalBaseUri}; - HttpClient RemoteHttp{RemoteBaseUri}; - - // Store the cache record package in the local instance - { - zen::IoBuffer Body = SerializeToBuffer(ExpectedPackage); - HttpClient::Response Result = LocalHttp.Put(fmt::format("/{}/{}", Bucket, Key), Body); - - CHECK(Result.StatusCode == HttpResponseCode::Created); - } - - // The cache record can be retrieved as a package from the local instance - { - HttpClient::Response Result = LocalHttp.Get(fmt::format("/{}/{}", Bucket, Key), {{"Accept", "application/x-ue-cbpkg"}}); - CHECK(Result.StatusCode == HttpResponseCode::OK); - - zen::CbPackage Package; - const bool Ok = Package.TryLoad(Result.ResponsePayload); - CHECK(Ok); - CHECK(IsEqual(Package, ExpectedPackage)); - } - - // The cache record can be retrieved as a package from the remote instance - { - HttpClient::Response Result = RemoteHttp.Get(fmt::format("/{}/{}", Bucket, Key), {{"Accept", "application/x-ue-cbpkg"}}); - CHECK(Result.StatusCode == HttpResponseCode::OK); - - zen::CbPackage Package; - const bool Ok = Package.TryLoad(Result.ResponsePayload); - CHECK(Ok); - CHECK(IsEqual(Package, ExpectedPackage)); - } - } - - SUBCASE("GET finds upstream when missing in local") - { - // Setup local and remote server - std::filesystem::path LocalDataDir = TestEnv.CreateNewTestDir(); - std::filesystem::path RemoteDataDir = TestEnv.CreateNewTestDir(); - - ZenServerInstance RemoteInstance(TestEnv); - RemoteInstance.SetTestDir(RemoteDataDir); - const uint16_t RemotePortNumber = RemoteInstance.SpawnServerAndWaitUntilReady(); - - ZenServerInstance LocalInstance(TestEnv); - LocalInstance.SetTestDir(LocalDataDir); - LocalInstance.SpawnServer(TestEnv.GetNewPortNumber(), - fmt::format("--upstream-thread-count=0 --upstream-zen-url=http://localhost:{}", RemotePortNumber)); - const uint16_t LocalPortNumber = LocalInstance.WaitUntilReady(); - CHECK_MESSAGE(LocalPortNumber != 0, LocalInstance.GetLogOutput()); - - const auto LocalBaseUri = fmt::format("http://localhost:{}/z$", LocalPortNumber); - const auto RemoteBaseUri = fmt::format("http://localhost:{}/z$", RemotePortNumber); - - HttpClient LocalHttp{LocalBaseUri}; - HttpClient RemoteHttp{RemoteBaseUri}; - - const std::string_view Bucket = "mosdef"sv; - zen::IoHash Key; - zen::CbPackage ExpectedPackage = CreateTestPackage(Key); - - // Store the cache record package in upstream cache - { - zen::IoBuffer Body = SerializeToBuffer(ExpectedPackage); - HttpClient::Response Result = RemoteHttp.Put(fmt::format("/{}/{}", Bucket, Key), Body); - - CHECK(Result.StatusCode == HttpResponseCode::Created); - } - - // The cache record can be retrieved as a package from the local cache - { - HttpClient::Response Result = LocalHttp.Get(fmt::format("/{}/{}", Bucket, Key), {{"Accept", "application/x-ue-cbpkg"}}); - CHECK(Result.StatusCode == HttpResponseCode::OK); - - zen::CbPackage Package; - const bool Ok = Package.TryLoad(Result.ResponsePayload); - CHECK(Ok); - CHECK(IsEqual(Package, ExpectedPackage)); - } - } -} - -TEST_CASE("zcache.policy") -{ - using namespace std::literals; - using namespace utils; - - auto GenerateData = [](uint64_t Size, zen::IoHash& OutHash) -> zen::IoBuffer { - auto Buf = zen::UniqueBuffer::Alloc(Size); - uint8_t* Data = reinterpret_cast<uint8_t*>(Buf.GetData()); - for (uint64_t Idx = 0; Idx < Size; Idx++) - { - Data[Idx] = Idx % 256; - } - OutHash = zen::IoHash::HashBuffer(Data, Size); - return Buf.MoveToShared().AsIoBuffer(); - }; - - auto GeneratePackage = [](zen::IoHash& OutRecordKey, zen::IoHash& OutAttachmentKey) -> zen::CbPackage { - auto Data = zen::SharedBuffer::Clone(zen::MakeMemoryView<uint8_t>({1, 2, 3, 4, 5, 6, 7, 8, 9})); - auto CompressedData = zen::CompressedBuffer::Compress(Data); - OutAttachmentKey = CompressedData.DecodeRawHash(); - - zen::CbWriter Writer; - Writer.BeginObject("obj"sv); - Writer.AddBinaryAttachment("data", OutAttachmentKey); - Writer.EndObject(); - CbObject CacheRecord = Writer.Save().AsObject(); - - OutRecordKey = IoHash::HashBuffer(CacheRecord.GetBuffer().GetView()); - - zen::CbPackage Package; - Package.SetObject(CacheRecord); - Package.AddAttachment(zen::CbAttachment(CompressedData, OutAttachmentKey)); - - return Package; - }; - - SUBCASE("query - 'local' does not query upstream (binary)") - { - ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber()); - ZenServerInstance UpstreamInst(TestEnv); - UpstreamCfg.Spawn(UpstreamInst); - const uint16_t UpstreamPort = UpstreamCfg.Port; - - ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamPort); - ZenServerInstance LocalInst(TestEnv); - LocalCfg.Spawn(LocalInst); - - const std::string_view Bucket = "legacy"sv; - - zen::IoHash Key; - IoBuffer BinaryValue = GenerateData(1024, Key); - - HttpClient LocalHttp{LocalCfg.BaseUri}; - HttpClient RemoteHttp{UpstreamCfg.BaseUri}; - - { - HttpClient::Response Result = RemoteHttp.Put(fmt::format("/{}/{}", Bucket, Key), BinaryValue); - CHECK(Result.StatusCode == HttpResponseCode::Created); - } - - { - HttpClient::Response Result = - LocalHttp.Get(fmt::format("/{}/{}?Policy=QueryLocal,Store", Bucket, Key), {{"Accept", "application/octet-stream"}}); - CHECK(Result.StatusCode == HttpResponseCode::NotFound); - } - - { - HttpClient::Response Result = - LocalHttp.Get(fmt::format("/{}/{}?Policy=Query,Store", Bucket, Key), {{"Accept", "application/octet-stream"}}); - CHECK(Result.StatusCode == HttpResponseCode::OK); - } - } - - SUBCASE("store - 'local' does not store upstream (binary)") - { - ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber()); - ZenServerInstance UpstreamInst(TestEnv); - UpstreamCfg.Spawn(UpstreamInst); - const uint16_t UpstreamPort = UpstreamCfg.Port; - - ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamPort); - ZenServerInstance LocalInst(TestEnv); - LocalCfg.Spawn(LocalInst); - - const auto Bucket = "legacy"sv; - - zen::IoHash Key; - IoBuffer BinaryValue = GenerateData(1024, Key); - - HttpClient LocalHttp{LocalCfg.BaseUri}; - HttpClient RemoteHttp{UpstreamCfg.BaseUri}; - - // Store binary cache value locally - { - HttpClient::Response Result = LocalHttp.Put(fmt::format("/{}/{}?Policy=Query,StoreLocal", Bucket, Key), - BinaryValue, - {{"Content-Type", "application/octet-stream"}}); - CHECK(Result.StatusCode == HttpResponseCode::Created); - } - - { - HttpClient::Response Result = RemoteHttp.Get(fmt::format("/{}/{}", Bucket, Key), {{"Accept", "application/octet-stream"}}); - CHECK(Result.StatusCode == HttpResponseCode::NotFound); - } - - { - HttpClient::Response Result = LocalHttp.Get(fmt::format("/{}/{}", Bucket, Key), {{"Accept", "application/octet-stream"}}); - CHECK(Result.StatusCode == HttpResponseCode::OK); - } - } - - SUBCASE("store - 'local/remote' stores local and upstream (binary)") - { - ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber()); - ZenServerInstance UpstreamInst(TestEnv); - UpstreamCfg.Spawn(UpstreamInst); - - ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port); - ZenServerInstance LocalInst(TestEnv); - LocalCfg.Spawn(LocalInst); - - const auto Bucket = "legacy"sv; - - zen::IoHash Key; - IoBuffer BinaryValue = GenerateData(1024, Key); - - HttpClient LocalHttp{LocalCfg.BaseUri}; - HttpClient RemoteHttp{UpstreamCfg.BaseUri}; - - // Store binary cache value locally and upstream - { - HttpClient::Response Result = LocalHttp.Put(fmt::format("/{}/{}?Policy=Query,Store", Bucket, Key), - BinaryValue, - {{"Content-Type", "application/octet-stream"}}); - CHECK(Result.StatusCode == HttpResponseCode::Created); - } - - { - HttpClient::Response Result = RemoteHttp.Get(fmt::format("/{}/{}", Bucket, Key), {{"Accept", "application/octet-stream"}}); - CHECK(Result.StatusCode == HttpResponseCode::OK); - } - - { - HttpClient::Response Result = LocalHttp.Get(fmt::format("/{}/{}", Bucket, Key), {{"Accept", "application/octet-stream"}}); - CHECK(Result.StatusCode == HttpResponseCode::OK); - } - } - - SUBCASE("query - 'local' does not query upstream (cbpackage)") - { - ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber()); - ZenServerInstance UpstreamInst(TestEnv); - UpstreamCfg.Spawn(UpstreamInst); - - ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port); - ZenServerInstance LocalInst(TestEnv); - LocalCfg.Spawn(LocalInst); - - const auto Bucket = "legacy"sv; - - zen::IoHash Key; - zen::IoHash PayloadId; - zen::CbPackage Package = GeneratePackage(Key, PayloadId); - IoBuffer Buf = SerializeToBuffer(Package); - - HttpClient LocalHttp{LocalCfg.BaseUri}; - HttpClient RemoteHttp{UpstreamCfg.BaseUri}; - - // Store package upstream - { - HttpClient::Response Result = RemoteHttp.Put(fmt::format("/{}/{}", Bucket, Key), Buf); - CHECK(Result.StatusCode == HttpResponseCode::Created); - } - - { - HttpClient::Response Result = - LocalHttp.Get(fmt::format("/{}/{}?Policy=QueryLocal,Store", Bucket, Key), {{"Accept", "application/x-ue-cbpkg"}}); - CHECK(Result.StatusCode == HttpResponseCode::NotFound); - } - - { - HttpClient::Response Result = - LocalHttp.Get(fmt::format("/{}/{}?Policy=Query,Store", Bucket, Key), {{"Accept", "application/x-ue-cbpkg"}}); - CHECK(Result.StatusCode == HttpResponseCode::OK); - } - } - - SUBCASE("store - 'local' does not store upstream (cbpackage)") - { - ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber()); - ZenServerInstance UpstreamInst(TestEnv); - UpstreamCfg.Spawn(UpstreamInst); - - ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port); - ZenServerInstance LocalInst(TestEnv); - LocalCfg.Spawn(LocalInst); - - const auto Bucket = "legacy"sv; - - zen::IoHash Key; - zen::IoHash PayloadId; - zen::CbPackage Package = GeneratePackage(Key, PayloadId); - IoBuffer Buf = SerializeToBuffer(Package); - - HttpClient LocalHttp{LocalCfg.BaseUri}; - HttpClient RemoteHttp{UpstreamCfg.BaseUri}; - - // Store package locally - { - HttpClient::Response Result = LocalHttp.Put(fmt::format("/{}/{}?Policy=Query,StoreLocal", Bucket, Key), Buf); - CHECK(Result.StatusCode == HttpResponseCode::Created); - } - - { - HttpClient::Response Result = RemoteHttp.Get(fmt::format("/{}/{}", Bucket, Key), {{"Accept", "application/x-ue-cbpkg"}}); - CHECK(Result.StatusCode == HttpResponseCode::NotFound); - } - - { - HttpClient::Response Result = LocalHttp.Get(fmt::format("/{}/{}", Bucket, Key), {{"Accept", "application/x-ue-cbpkg"}}); - CHECK(Result.StatusCode == HttpResponseCode::OK); - } - } - - SUBCASE("store - 'local/remote' stores local and upstream (cbpackage)") - { - ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber()); - ZenServerInstance UpstreamInst(TestEnv); - UpstreamCfg.Spawn(UpstreamInst); - - ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port); - ZenServerInstance LocalInst(TestEnv); - LocalCfg.Spawn(LocalInst); - - const auto Bucket = "legacy"sv; - - zen::IoHash Key; - zen::IoHash PayloadId; - zen::CbPackage Package = GeneratePackage(Key, PayloadId); - IoBuffer Buf = SerializeToBuffer(Package); - - HttpClient LocalHttp{LocalCfg.BaseUri}; - HttpClient RemoteHttp{UpstreamCfg.BaseUri}; - - // Store package locally and upstream - { - HttpClient::Response Result = LocalHttp.Put(fmt::format("/{}/{}?Policy=Query,Store", Bucket, Key), Buf); - CHECK(Result.StatusCode == HttpResponseCode::Created); - } - - { - HttpClient::Response Result = RemoteHttp.Get(fmt::format("/{}/{}", Bucket, Key), {{"Accept", "application/x-ue-cbpkg"}}); - CHECK(Result.StatusCode == HttpResponseCode::OK); - } - - { - HttpClient::Response Result = LocalHttp.Get(fmt::format("/{}/{}", Bucket, Key), {{"Accept", "application/x-ue-cbpkg"}}); - CHECK(Result.StatusCode == HttpResponseCode::OK); - } - } - - SUBCASE("skip - 'data' returns cache record without attachments/empty payload") - { - ZenConfig Cfg = ZenConfig::New(TestEnv.GetNewPortNumber()); - ZenServerInstance Instance(TestEnv); - Cfg.Spawn(Instance); - - const auto Bucket = "test"sv; - - zen::IoHash Key; - zen::IoHash PayloadId; - zen::CbPackage Package = GeneratePackage(Key, PayloadId); - IoBuffer Buf = SerializeToBuffer(Package); - - HttpClient Http{Cfg.BaseUri}; - - // Store package - { - HttpClient::Response Result = Http.Put(fmt::format("/{}/{}", Bucket, Key), Buf); - CHECK(Result.StatusCode == HttpResponseCode::Created); - } - - // Get package - { - HttpClient::Response Result = - Http.Get(fmt::format("/{}/{}?Policy=Default,SkipData", Bucket, Key), {{"Accept", "application/x-ue-cbpkg"}}); - CHECK(Result); - CbPackage ResponsePackage; - CHECK(ResponsePackage.TryLoad(Result.ResponsePayload)); - CHECK(ResponsePackage.GetAttachments().size() == 0); - } - - // Get record - { - HttpClient::Response Result = - Http.Get(fmt::format("/{}/{}?Policy=Default,SkipData", Bucket, Key), {{"Accept", "application/x-ue-cb"}}); - CHECK(Result); - CbObject ResponseObject = zen::LoadCompactBinaryObject(Result.ResponsePayload); - CHECK(ResponseObject); - } - - // Get payload - { - HttpClient::Response Result = - Http.Get(fmt::format("/{}/{}/{}?Policy=Default,SkipData", Bucket, Key, PayloadId), {{"Accept", "application/x-ue-comp"}}); - CHECK(Result); - CHECK(Result.ResponsePayload.GetSize() == 0); - } - } - - SUBCASE("skip - 'data' returns empty binary value") - { - ZenConfig Cfg = ZenConfig::New(TestEnv.GetNewPortNumber()); - ZenServerInstance Instance(TestEnv); - Cfg.Spawn(Instance); - - const auto Bucket = "test"sv; - - zen::IoHash Key; - IoBuffer BinaryValue = GenerateData(1024, Key); - - HttpClient Http{Cfg.BaseUri}; - - // Store binary cache value - { - HttpClient::Response Result = Http.Put(fmt::format("/{}/{}", Bucket, Key), BinaryValue); - CHECK(Result.StatusCode == HttpResponseCode::Created); - } - - // Get package - { - HttpClient::Response Result = - Http.Get(fmt::format("/{}/{}?Policy=Default,SkipData", Bucket, Key), {{"Accept", "application/octet-stream"}}); - CHECK(Result); - CHECK(Result.ResponsePayload.GetSize() == 0); - } - } -} - -TEST_CASE("zcache.rpc") -{ - using namespace std::literals; - - auto AppendCacheRecord = [](cacherequests::PutCacheRecordsRequest& Request, - const zen::CacheKey& CacheKey, - size_t PayloadSize, - CachePolicy RecordPolicy) { - std::vector<uint8_t> Data; - Data.resize(PayloadSize); - uint32_t DataSeed = *reinterpret_cast<const uint32_t*>(&CacheKey.Hash.Hash[0]); - uint16_t* DataPtr = reinterpret_cast<uint16_t*>(Data.data()); - for (size_t Idx = 0; Idx < PayloadSize / 2; ++Idx) - { - DataPtr[Idx] = static_cast<uint16_t>((Idx + DataSeed) % 0xffffu); - } - if (PayloadSize & 1) - { - Data[PayloadSize - 1] = static_cast<uint8_t>((PayloadSize - 1) & 0xff); - } - CompressedBuffer Value = zen::CompressedBuffer::Compress(SharedBuffer::MakeView(Data.data(), Data.size())); - Request.Requests.push_back({.Key = CacheKey, .Values = {{.Id = Oid::NewOid(), .Body = std::move(Value)}}, .Policy = RecordPolicy}); - }; - - auto PutCacheRecords = [&AppendCacheRecord](std::string_view BaseUri, - std::string_view Namespace, - std::string_view Bucket, - size_t Num, - size_t PayloadSize = 1024, - size_t KeyOffset = 1, - CachePolicy PutPolicy = CachePolicy::Default, - std::vector<CbPackage>* OutPackages = nullptr) -> std::vector<CacheKey> { - std::vector<zen::CacheKey> OutKeys; - - HttpClient Http{BaseUri}; - - for (uint32_t Key = 1; Key <= Num; ++Key) - { - zen::IoHash KeyHash; - ((uint32_t*)(KeyHash.Hash))[0] = gsl::narrow<uint32_t>(KeyOffset + Key); - const zen::CacheKey CacheKey = zen::CacheKey::Create(Bucket, KeyHash); - - cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .Namespace = std::string(Namespace)}; - AppendCacheRecord(Request, CacheKey, PayloadSize, PutPolicy); - OutKeys.push_back(CacheKey); - - CbPackage Package; - CHECK(Request.Format(Package)); - - IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); - Body.SetContentType(HttpContentType::kCbPackage); - HttpClient::Response Result = Http.Post("/$rpc", Body, {{"Accept", "application/x-ue-cbpkg"}}); - - CHECK(Result.StatusCode == HttpResponseCode::OK); - if (OutPackages) - { - OutPackages->emplace_back(std::move(Package)); - } - } - - return OutKeys; - }; - - struct GetCacheRecordResult - { - zen::CbPackage Response; - cacherequests::GetCacheRecordsResult Result; - bool Success; - }; - - auto GetCacheRecords = [](std::string_view BaseUri, - std::string_view Namespace, - std::span<zen::CacheKey> Keys, - zen::CachePolicy Policy, - zen::RpcAcceptOptions AcceptOptions = zen::RpcAcceptOptions::kNone, - int Pid = 0) -> GetCacheRecordResult { - cacherequests::GetCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, - .AcceptOptions = static_cast<uint16_t>(AcceptOptions), - .ProcessPid = Pid, - .DefaultPolicy = Policy, - .Namespace = std::string(Namespace)}; - for (const CacheKey& Key : Keys) - { - Request.Requests.push_back({.Key = Key}); - } - - CbObjectWriter RequestWriter; - CHECK(Request.Format(RequestWriter)); - - IoBuffer Body = RequestWriter.Save().GetBuffer().AsIoBuffer(); - Body.SetContentType(HttpContentType::kCbObject); - - HttpClient Http{BaseUri}; - - HttpClient::Response Result = Http.Post("/$rpc", Body, {{"Accept", "application/x-ue-cbpkg"}}); - - GetCacheRecordResult OutResult; - - if (Result.StatusCode == HttpResponseCode::OK) - { - CbPackage Response = ParsePackageMessage(Result.ResponsePayload); - CHECK(!Response.IsNull()); - OutResult.Response = std::move(Response); - CHECK(OutResult.Result.Parse(OutResult.Response)); - OutResult.Success = true; - } - - return OutResult; - }; - - SUBCASE("get cache records") - { - std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); - - ZenServerInstance Inst(TestEnv); - Inst.SetTestDir(TestDir); - - const uint16_t BasePort = Inst.SpawnServerAndWaitUntilReady(); - const std::string BaseUri = fmt::format("http://localhost:{}/z$", BasePort); - - CachePolicy Policy = CachePolicy::Default; - std::vector<zen::CacheKey> Keys = PutCacheRecords(BaseUri, "ue4.ddc"sv, "mastodon"sv, 128); - GetCacheRecordResult Result = GetCacheRecords(BaseUri, "ue4.ddc"sv, Keys, Policy); - - CHECK(Result.Result.Results.size() == Keys.size()); - - for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results) - { - const CacheKey& ExpectedKey = Keys[Index++]; - CHECK(Record); - CHECK(Record->Key == ExpectedKey); - CHECK(Record->Values.size() == 1); - - for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) - { - CHECK(Value.Body); - } - } - } - - SUBCASE("get missing cache records") - { - std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); - - ZenServerInstance Inst(TestEnv); - Inst.SetTestDir(TestDir); - const uint16_t BasePort = Inst.SpawnServerAndWaitUntilReady(); - const std::string BaseUri = fmt::format("http://localhost:{}/z$", BasePort); - - CachePolicy Policy = CachePolicy::Default; - std::vector<zen::CacheKey> ExistingKeys = PutCacheRecords(BaseUri, "ue4.ddc"sv, "mastodon"sv, 128); - std::vector<zen::CacheKey> Keys; - - for (const zen::CacheKey& Key : ExistingKeys) - { - Keys.push_back(Key); - Keys.push_back(CacheKey::Create("missing"sv, IoHash::Zero)); - } - - GetCacheRecordResult Result = GetCacheRecords(BaseUri, "ue4.ddc"sv, Keys, Policy); - - CHECK(Result.Result.Results.size() == Keys.size()); - - size_t KeyIndex = 0; - for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results) - { - const bool Missing = Index++ % 2 != 0; - - if (Missing) - { - CHECK(!Record); - } - else - { - const CacheKey& ExpectedKey = ExistingKeys[KeyIndex++]; - CHECK(Record->Key == ExpectedKey); - for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) - { - CHECK(Value.Body); - } - } - } - } - - SUBCASE("policy - 'QueryLocal' does not query upstream") - { - using namespace utils; - - ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber()); - ZenServerInstance UpstreamServer(TestEnv); - SpawnServer(UpstreamServer, UpstreamCfg); - - ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port); - ZenServerInstance LocalServer(TestEnv); - SpawnServer(LocalServer, LocalCfg); - - std::vector<zen::CacheKey> Keys = PutCacheRecords(UpstreamCfg.BaseUri, "ue4.ddc"sv, "mastodon"sv, 4); - - CachePolicy Policy = CachePolicy::QueryLocal; - GetCacheRecordResult Result = GetCacheRecords(LocalCfg.BaseUri, "ue4.ddc"sv, Keys, Policy); - - CHECK(Result.Result.Results.size() == Keys.size()); - - for (const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results) - { - CHECK(!Record); - } - } - - SUBCASE("policy - 'QueryRemote' does query upstream") - { - using namespace utils; - - ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber()); - ZenServerInstance UpstreamServer(TestEnv); - SpawnServer(UpstreamServer, UpstreamCfg); - - ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port); - ZenServerInstance LocalServer(TestEnv); - SpawnServer(LocalServer, LocalCfg); - - std::vector<zen::CacheKey> Keys = PutCacheRecords(UpstreamCfg.BaseUri, "ue4.ddc"sv, "mastodon"sv, 4); - - CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote); - GetCacheRecordResult Result = GetCacheRecords(LocalCfg.BaseUri, "ue4.ddc"sv, Keys, Policy); - - CHECK(Result.Result.Results.size() == Keys.size()); - - for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results) - { - CHECK(Record); - const CacheKey& ExpectedKey = Keys[Index++]; - CHECK(Record->Key == ExpectedKey); - } - } - - SUBCASE("policy - 'QueryLocal' on put allows overwrite with differing value when not limiting overwrites") - { - using namespace utils; - - ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber()); - ZenServerInstance UpstreamServer(TestEnv); - SpawnServer(UpstreamServer, UpstreamCfg); - - ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port); - ZenServerInstance LocalServer(TestEnv); - SpawnServer(LocalServer, LocalCfg); - - size_t PayloadSize = 1024; - std::string_view Namespace("ue4.ddc"sv); - std::string_view Bucket("mastodon"sv); - const size_t NumRecords = 4; - std::vector<zen::CacheKey> Keys = PutCacheRecords(LocalCfg.BaseUri, Namespace, Bucket, NumRecords, PayloadSize); - - HttpClient LocalHttp{LocalCfg.BaseUri}; - HttpClient UpstreamHttp{UpstreamCfg.BaseUri}; - - for (const zen::CacheKey& CacheKey : Keys) - { - cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .Namespace = std::string(Namespace)}; - AppendCacheRecord(Request, CacheKey, PayloadSize * 2, CachePolicy::Default); - - CbPackage Package; - CHECK(Request.Format(Package)); - - IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); - Body.SetContentType(HttpContentType::kCbPackage); - HttpClient::Response Result = LocalHttp.Post("/$rpc", Body, {{"Accept", "application/x-ue-cbpkg"}}); - - CHECK(Result.StatusCode == HttpResponseCode::OK); - cacherequests::PutCacheRecordsResult ParsedResult; - CbPackage Response = ParsePackageMessage(Result.ResponsePayload); - CHECK(!Response.IsNull()); - CHECK(ParsedResult.Parse(Response)); - for (bool ResponseSuccess : ParsedResult.Success) - { - CHECK(ResponseSuccess); - } - CHECK(ParsedResult.Details.empty()); - } - - auto CheckRecordCorrectness = [&](const ZenConfig& Cfg) { - CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote); - GetCacheRecordResult Result = GetCacheRecords(Cfg.BaseUri, "ue4.ddc"sv, Keys, Policy); - - CHECK(Result.Result.Results.size() == Keys.size()); - - for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results) - { - CHECK(Record); - const CacheKey& ExpectedKey = Keys[Index++]; - CHECK(Record->Key == ExpectedKey); - for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) - { - CHECK(Value.RawSize == PayloadSize * 2); - } - } - }; - - // Check that the records are present and overwritten in the local server - CheckRecordCorrectness(LocalCfg); - - // Check that the records are present and overwritten in the upstream server - CheckRecordCorrectness(UpstreamCfg); - } - - SUBCASE("policy - 'QueryLocal' on put denies overwrite with differing value when limiting overwrites") - { - using namespace utils; - - ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber()); - ZenServerInstance UpstreamServer(TestEnv); - SpawnServer(UpstreamServer, UpstreamCfg); - - ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port, "--cache-bucket-limit-overwrites"); - ZenServerInstance LocalServer(TestEnv); - SpawnServer(LocalServer, LocalCfg); - - size_t PayloadSize = 1024; - std::string_view Namespace("ue4.ddc"sv); - std::string_view Bucket("mastodon"sv); - const size_t NumRecords = 4; - std::vector<zen::CacheKey> Keys = PutCacheRecords(LocalCfg.BaseUri, Namespace, Bucket, NumRecords, PayloadSize); - - HttpClient LocalHttp{LocalCfg.BaseUri}; - HttpClient UpstreamHttp{UpstreamCfg.BaseUri}; - - for (const zen::CacheKey& CacheKey : Keys) - { - cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .Namespace = std::string(Namespace)}; - AppendCacheRecord(Request, CacheKey, PayloadSize * 2, CachePolicy::Default); - - CbPackage Package; - CHECK(Request.Format(Package)); - - IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); - Body.SetContentType(HttpContentType::kCbPackage); - - HttpClient::Response Result = LocalHttp.Post("/$rpc", Body, {{"Accept", "application/x-ue-cbpkg"}}); - - CHECK(Result.StatusCode == HttpResponseCode::OK); - cacherequests::PutCacheRecordsResult ParsedResult; - CbPackage Response = ParsePackageMessage(Result.ResponsePayload); - CHECK(!Response.IsNull()); - CHECK(ParsedResult.Parse(Response)); - CHECK(Request.Requests.size() == ParsedResult.Success.size()); - for (bool ResponseSuccess : ParsedResult.Success) - { - CHECK(ResponseSuccess); - } - CHECK(Request.Requests.size() == ParsedResult.Details.size()); - for (const CbObjectView& Details : ParsedResult.Details) - { - CHECK(Details); - CHECK(Details["RawHash"sv].IsHash()); - CHECK(Details["RawSize"sv].IsInteger()); - CHECK(Details["Record"sv].IsObject()); - } - } - - auto CheckRecordCorrectness = [&](const ZenConfig& Cfg) { - CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote); - GetCacheRecordResult Result = GetCacheRecords(Cfg.BaseUri, "ue4.ddc"sv, Keys, Policy); - - CHECK(Result.Result.Results.size() == Keys.size()); - - for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results) - { - CHECK(Record); - const CacheKey& ExpectedKey = Keys[Index++]; - CHECK(Record->Key == ExpectedKey); - for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) - { - CHECK(Value.RawSize == PayloadSize); - } - } - }; - - // Check that the records are present and not overwritten in the local server - CheckRecordCorrectness(LocalCfg); - - // Check that the records are present and not overwritten in the upstream server - CheckRecordCorrectness(UpstreamCfg); - } - - SUBCASE("policy - no 'QueryLocal' on put allows overwrite with differing value when limiting overwrites") - { - using namespace utils; - - ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber()); - ZenServerInstance UpstreamServer(TestEnv); - SpawnServer(UpstreamServer, UpstreamCfg); - - ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port, "--cache-bucket-limit-overwrites"); - ZenServerInstance LocalServer(TestEnv); - SpawnServer(LocalServer, LocalCfg); - - HttpClient LocalHttp{LocalCfg.BaseUri}; - HttpClient UpstreamHttp{UpstreamCfg.BaseUri}; - - size_t PayloadSize = 1024; - std::string_view Namespace("ue4.ddc"sv); - std::string_view Bucket("mastodon"sv); - const size_t NumRecords = 4; - std::vector<zen::CacheKey> Keys = PutCacheRecords(LocalCfg.BaseUri, Namespace, Bucket, NumRecords, PayloadSize); - - for (const zen::CacheKey& CacheKey : Keys) - { - cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .Namespace = std::string(Namespace)}; - AppendCacheRecord(Request, CacheKey, PayloadSize * 2, CachePolicy::Store); - - CbPackage Package; - CHECK(Request.Format(Package)); - - IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); - Body.SetContentType(HttpContentType::kCbPackage); - HttpClient::Response Result = LocalHttp.Post("/$rpc", Body, {{"Accept", "application/x-ue-cbpkg"}}); - - CHECK(Result.StatusCode == HttpResponseCode::OK); - cacherequests::PutCacheRecordsResult ParsedResult; - CbPackage Response = ParsePackageMessage(Result.ResponsePayload); - CHECK(!Response.IsNull()); - CHECK(ParsedResult.Parse(Response)); - for (bool ResponseSuccess : ParsedResult.Success) - { - CHECK(ResponseSuccess); - } - CHECK(ParsedResult.Details.empty()); - } - - auto CheckRecordCorrectness = [&](const ZenConfig& Cfg) { - CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote); - GetCacheRecordResult Result = GetCacheRecords(Cfg.BaseUri, "ue4.ddc"sv, Keys, Policy); - - CHECK(Result.Result.Results.size() == Keys.size()); - - for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results) - { - CHECK(Record); - const CacheKey& ExpectedKey = Keys[Index++]; - CHECK(Record->Key == ExpectedKey); - for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) - { - CHECK(Value.RawSize == PayloadSize * 2); - } - } - }; - - // Check that the records are present and overwritten in the local server - CheckRecordCorrectness(LocalCfg); - - // Check that the records are present and overwritten in the upstream server - CheckRecordCorrectness(UpstreamCfg); - } - - SUBCASE("policy - 'QueryLocal' on put allows overwrite with equivalent value when limiting overwrites") - { - using namespace utils; - - ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber()); - ZenServerInstance UpstreamServer(TestEnv); - SpawnServer(UpstreamServer, UpstreamCfg); - - ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port, "--cache-bucket-limit-overwrites"); - ZenServerInstance LocalServer(TestEnv); - SpawnServer(LocalServer, LocalCfg); - - HttpClient LocalHttp{LocalCfg.BaseUri}; - HttpClient UpstreamHttp{UpstreamCfg.BaseUri}; - - size_t PayloadSize = 1024; - std::string_view Namespace("ue4.ddc"sv); - std::string_view Bucket("mastodon"sv); - const size_t NumRecords = 4; - std::vector<CbPackage> Packages; - std::vector<zen::CacheKey> Keys = - PutCacheRecords(LocalCfg.BaseUri, Namespace, Bucket, NumRecords, PayloadSize, 1, CachePolicy::Default, &Packages); - - for (const CbPackage& Package : Packages) - { - IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); - Body.SetContentType(HttpContentType::kCbPackage); - HttpClient::Response Result = LocalHttp.Post("/$rpc", Body, {{"Accept", "application/x-ue-cbpkg"}}); - - CHECK(Result.StatusCode == HttpResponseCode::OK); - cacherequests::PutCacheRecordsResult ParsedResult; - CbPackage Response = ParsePackageMessage(Result.ResponsePayload); - CHECK(!Response.IsNull()); - CHECK(ParsedResult.Parse(Response)); - for (bool ResponseSuccess : ParsedResult.Success) - { - CHECK(ResponseSuccess); - } - CHECK(ParsedResult.Details.empty()); - } - - auto CheckRecordCorrectness = [&](const ZenConfig& Cfg) { - CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote); - GetCacheRecordResult Result = GetCacheRecords(Cfg.BaseUri, "ue4.ddc"sv, Keys, Policy); - - CHECK(Result.Result.Results.size() == Keys.size()); - - for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results) - { - CHECK(Record); - const CacheKey& ExpectedKey = Keys[Index++]; - CHECK(Record->Key == ExpectedKey); - for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) - { - CHECK(Value.RawSize == PayloadSize); - } - } - }; - - // Check that the records are present and unchanged in the local server - CheckRecordCorrectness(LocalCfg); - - // Check that the records are present and unchanged in the upstream server - CheckRecordCorrectness(UpstreamCfg); - } - - // TODO: Propagation for rejected PUTs - // SUBCASE("policy - 'QueryLocal' on put denies overwrite with differing value when limiting overwrites but allows propagation to - // upstream") - // { - // using namespace utils; - - // ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber()); - // ZenServerInstance UpstreamServer(TestEnv); - // SpawnServer(UpstreamServer, UpstreamCfg); - - // ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port, - // "--cache-bucket-limit-overwrites"); ZenServerInstance LocalServer(TestEnv); SpawnServer(LocalServer, LocalCfg); - - // size_t PayloadSize = 1024; - // std::string_view Namespace("ue4.ddc"sv); - // std::string_view Bucket("mastodon"sv); - // const size_t NumRecords = 4; - // std::vector<zen::CacheKey> Keys = PutCacheRecords(LocalCfg.BaseUri, Namespace, Bucket, NumRecords, PayloadSize, 1, - // CachePolicy::Local); - - // for (const zen::CacheKey& CacheKey : Keys) - // { - // cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .Namespace = std::string(Namespace)}; - // AppendCacheRecord(Request, CacheKey, PayloadSize * 2, CachePolicy::Default); - - // CbPackage Package; - // CHECK(Request.Format(Package)); - - // IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); - // cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", LocalCfg.BaseUri)}, - // cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}, - // cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); - - // CHECK(Result.status_code == 200); - // cacherequests::PutCacheRecordsResult ParsedResult; - // CbPackage Response = ParsePackageMessage(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size())); - // CHECK(!Response.IsNull()); - // CHECK(ParsedResult.Parse(Response)); - // for (bool ResponseSuccess : ParsedResult.Success) - // { - // CHECK(!ResponseSuccess); - // } - // } - - // auto CheckRecordCorrectness = [&](const ZenConfig& Cfg, size_t ExpectedPayloadSize) { - // CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote); - // GetCacheRecordResult Result = GetCacheRecords(Cfg.BaseUri, "ue4.ddc"sv, Keys, Policy); - - // CHECK(Result.Result.Results.size() == Keys.size()); - - // for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results) - // { - // CHECK(Record); - // const CacheKey& ExpectedKey = Keys[Index++]; - // CHECK(Record->Key == ExpectedKey); - // for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) - // { - // CHECK(Value.RawSize == ExpectedPayloadSize); - // } - // } - // }; - - // // Check that the records are present and not overwritten in the local server - // CheckRecordCorrectness(LocalCfg, PayloadSize); - - // // Check that the records are present and are the newer size in the upstream server - // CheckRecordCorrectness(UpstreamCfg, PayloadSize*2); - // } - - SUBCASE("RpcAcceptOptions") - { - using namespace utils; - - std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); - - ZenServerInstance Inst(TestEnv); - Inst.SetTestDir(TestDir); - - const uint16_t BasePort = Inst.SpawnServerAndWaitUntilReady(); - const std::string BaseUri = fmt::format("http://localhost:{}/z$", BasePort); - - std::vector<zen::CacheKey> SmallKeys = PutCacheRecords(BaseUri, "ue4.ddc"sv, "mastodon"sv, 4, 1024); - std::vector<zen::CacheKey> LargeKeys = PutCacheRecords(BaseUri, "ue4.ddc"sv, "mastodon"sv, 4, 1024 * 1024 * 16, SmallKeys.size()); - - std::vector<zen::CacheKey> Keys(SmallKeys.begin(), SmallKeys.end()); - Keys.insert(Keys.end(), LargeKeys.begin(), LargeKeys.end()); - - { - GetCacheRecordResult Result = GetCacheRecords(BaseUri, "ue4.ddc"sv, Keys, CachePolicy::Default); - - CHECK(Result.Result.Results.size() == Keys.size()); - - for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results) - { - CHECK(Record); - const CacheKey& ExpectedKey = Keys[Index++]; - CHECK(Record->Key == ExpectedKey); - for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) - { - const IoBuffer& Body = Value.Body.GetCompressed().Flatten().AsIoBuffer(); - IoBufferFileReference Ref; - bool IsFileRef = Body.GetFileReference(Ref); - CHECK(!IsFileRef); - } - } - } - - // File path, but only for large files - { - GetCacheRecordResult Result = - GetCacheRecords(BaseUri, "ue4.ddc"sv, Keys, CachePolicy::Default, RpcAcceptOptions::kAllowLocalReferences); - - CHECK(Result.Result.Results.size() == Keys.size()); - - for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results) - { - CHECK(Record); - const CacheKey& ExpectedKey = Keys[Index++]; - CHECK(Record->Key == ExpectedKey); - for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) - { - const IoBuffer& Body = Value.Body.GetCompressed().Flatten().AsIoBuffer(); - IoBufferFileReference Ref; - bool IsFileRef = Body.GetFileReference(Ref); - CHECK(IsFileRef == (Body.Size() > 1024)); - } - } - } - - // File path, for all files - { - GetCacheRecordResult Result = - GetCacheRecords(BaseUri, - "ue4.ddc"sv, - Keys, - CachePolicy::Default, - RpcAcceptOptions::kAllowLocalReferences | RpcAcceptOptions::kAllowPartialLocalReferences); - - CHECK(Result.Result.Results.size() == Keys.size()); - - for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results) - { - CHECK(Record); - const CacheKey& ExpectedKey = Keys[Index++]; - CHECK(Record->Key == ExpectedKey); - for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) - { - const IoBuffer& Body = Value.Body.GetCompressed().Flatten().AsIoBuffer(); - IoBufferFileReference Ref; - bool IsFileRef = Body.GetFileReference(Ref); - CHECK(IsFileRef); - } - } - } - - // File handle, but only for large files - { - GetCacheRecordResult Result = GetCacheRecords(BaseUri, - "ue4.ddc"sv, - Keys, - CachePolicy::Default, - RpcAcceptOptions::kAllowLocalReferences, - GetCurrentProcessId()); - - CHECK(Result.Result.Results.size() == Keys.size()); - - for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results) - { - CHECK(Record); - const CacheKey& ExpectedKey = Keys[Index++]; - CHECK(Record->Key == ExpectedKey); - for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) - { - const IoBuffer& Body = Value.Body.GetCompressed().Flatten().AsIoBuffer(); - IoBufferFileReference Ref; - bool IsFileRef = Body.GetFileReference(Ref); - CHECK(IsFileRef == (Body.Size() > 1024)); - } - } - } - - // File handle, for all files - { - GetCacheRecordResult Result = - GetCacheRecords(BaseUri, - "ue4.ddc"sv, - Keys, - CachePolicy::Default, - RpcAcceptOptions::kAllowLocalReferences | RpcAcceptOptions::kAllowPartialLocalReferences, - GetCurrentProcessId()); - - CHECK(Result.Result.Results.size() == Keys.size()); - - for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results) - { - CHECK(Record); - const CacheKey& ExpectedKey = Keys[Index++]; - CHECK(Record->Key == ExpectedKey); - for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) - { - const IoBuffer& Body = Value.Body.GetCompressed().Flatten().AsIoBuffer(); - IoBufferFileReference Ref; - bool IsFileRef = Body.GetFileReference(Ref); - CHECK(IsFileRef); - } - } - } - } -} - -TEST_CASE("zcache.failing.upstream") -{ - // This is an exploratory test that takes a long time to run, so lets skip it by default - if (true) - { - return; - } - - using namespace std::literals; - using namespace utils; - - ZenConfig Upstream1Cfg = ZenConfig::New(TestEnv.GetNewPortNumber()); - ZenServerInstance Upstream1Server(TestEnv); - SpawnServer(Upstream1Server, Upstream1Cfg); - - ZenConfig Upstream2Cfg = ZenConfig::New(TestEnv.GetNewPortNumber()); - ZenServerInstance Upstream2Server(TestEnv); - SpawnServer(Upstream2Server, Upstream2Cfg); - - std::vector<std::uint16_t> UpstreamPorts = {Upstream1Cfg.Port, Upstream2Cfg.Port}; - ZenConfig LocalCfg = ZenConfig::NewWithThreadedUpstreams(TestEnv.GetNewPortNumber(), UpstreamPorts, false); - LocalCfg.Args += (" --upstream-thread-count 2"); - ZenServerInstance LocalServer(TestEnv); - SpawnServer(LocalServer, LocalCfg); - - const uint16_t LocalPortNumber = LocalCfg.Port; - const auto LocalUri = fmt::format("http://localhost:{}/z$", LocalPortNumber); - const auto Upstream1Uri = fmt::format("http://localhost:{}/z$", Upstream1Cfg.Port); - const auto Upstream2Uri = fmt::format("http://localhost:{}/z$", Upstream2Cfg.Port); - - bool Upstream1Running = true; - bool Upstream2Running = true; - - using namespace std::literals; - - auto AppendCacheRecord = [](cacherequests::PutCacheRecordsRequest& Request, - const zen::CacheKey& CacheKey, - size_t PayloadSize, - CachePolicy RecordPolicy) { - std::vector<uint32_t> Data; - Data.resize(PayloadSize / 4); - for (uint32_t Idx = 0; Idx < PayloadSize / 4; ++Idx) - { - Data[Idx] = (*reinterpret_cast<const uint32_t*>(&CacheKey.Hash.Hash[0])) + Idx; - } - - CompressedBuffer Value = zen::CompressedBuffer::Compress(SharedBuffer::MakeView(Data.data(), Data.size() * 4)); - Request.Requests.push_back({.Key = CacheKey, .Values = {{.Id = Oid::NewOid(), .Body = std::move(Value)}}, .Policy = RecordPolicy}); - }; - - auto PutCacheRecords = [&AppendCacheRecord](std::string_view BaseUri, - std::string_view Namespace, - std::string_view Bucket, - size_t Num, - size_t KeyOffset, - size_t PayloadSize = 8192) -> std::vector<CacheKey> { - std::vector<zen::CacheKey> OutKeys; - - cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .Namespace = std::string(Namespace)}; - for (size_t Key = 1; Key <= Num; ++Key) - { - zen::IoHash KeyHash; - ((size_t*)(KeyHash.Hash))[0] = KeyOffset + Key; - const zen::CacheKey CacheKey = zen::CacheKey::Create(Bucket, KeyHash); - - AppendCacheRecord(Request, CacheKey, PayloadSize, CachePolicy::Default); - OutKeys.push_back(CacheKey); - } - - CbPackage Package; - CHECK(Request.Format(Package)); - - HttpClient Http{BaseUri}; - - IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); - HttpClient::Response Result = Http.Post("/$rpc", Body, {{"Accept", "application/x-ue-cbpkg"}}); - - if (Result.StatusCode != HttpResponseCode::OK) - { - ZEN_DEBUG("PutCacheRecords failed with {}, reason '{}'", ToString(Result.StatusCode), Result.ErrorMessage("")); - OutKeys.clear(); - } - - return OutKeys; - }; - - struct GetCacheRecordResult - { - zen::CbPackage Response; - cacherequests::GetCacheRecordsResult Result; - bool Success = false; - }; - - auto GetCacheRecords = [](std::string_view BaseUri, - std::string_view Namespace, - std::span<zen::CacheKey> Keys, - zen::CachePolicy Policy) -> GetCacheRecordResult { - cacherequests::GetCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, - .DefaultPolicy = Policy, - .Namespace = std::string(Namespace)}; - for (const CacheKey& Key : Keys) - { - Request.Requests.push_back({.Key = Key}); - } - - CbObjectWriter RequestWriter; - CHECK(Request.Format(RequestWriter)); - - IoBuffer Body = RequestWriter.Save().GetBuffer().AsIoBuffer(); - Body.SetContentType(HttpContentType::kCbObject); - - HttpClient Http{BaseUri}; - - HttpClient::Response Result = Http.Post("/$rpc", Body, {{"Accept", "application/x-ue-cbpkg"}}); - - GetCacheRecordResult OutResult; - - if (Result.StatusCode == HttpResponseCode::OK) - { - CbPackage Response = ParsePackageMessage(Result.ResponsePayload); - if (!Response.IsNull()) - { - OutResult.Response = std::move(Response); - CHECK(OutResult.Result.Parse(OutResult.Response)); - OutResult.Success = true; - } - } - else - { - ZEN_DEBUG("GetCacheRecords with {}, reason '{}'", ToString(Result.StatusCode), Result.ErrorMessage("")); - } - - return OutResult; - }; - - // Populate with some simple data - - CachePolicy Policy = CachePolicy::Default; - - const size_t ThreadCount = 128; - const size_t KeyMultiplier = 16384; - const size_t RecordsPerRequest = 64; - WorkerThreadPool Pool(ThreadCount); - - std::atomic_size_t Completed = 0; - - auto Keys = new std::vector<CacheKey>[ThreadCount * KeyMultiplier]; - RwLock KeysLock; - - for (size_t I = 0; I < ThreadCount * KeyMultiplier; I++) - { - size_t Iteration = I; - Pool.ScheduleWork( - [&] { - std::vector<CacheKey> NewKeys = - PutCacheRecords(LocalUri, "ue4.ddc"sv, "mastodon"sv, RecordsPerRequest, I * RecordsPerRequest); - if (NewKeys.size() != RecordsPerRequest) - { - ZEN_DEBUG("PutCacheRecords iteration {} failed", Iteration); - Completed.fetch_add(1); - return; - } - { - RwLock::ExclusiveLockScope _(KeysLock); - Keys[Iteration].swap(NewKeys); - } - Completed.fetch_add(1); - }, - WorkerThreadPool::EMode::EnableBacklog); - } - bool UseUpstream1 = false; - while (Completed < ThreadCount * KeyMultiplier) - { - Sleep(8000); - - if (UseUpstream1) - { - if (Upstream2Running) - { - Upstream2Server.EnableTermination(); - Upstream2Server.Shutdown(); - Sleep(100); - Upstream2Running = false; - } - if (!Upstream1Running) - { - SpawnServer(Upstream1Server, Upstream1Cfg); - Upstream1Running = true; - } - UseUpstream1 = !UseUpstream1; - } - else - { - if (Upstream1Running) - { - Upstream1Server.EnableTermination(); - Upstream1Server.Shutdown(); - Sleep(100); - Upstream1Running = false; - } - if (!Upstream2Running) - { - SpawnServer(Upstream2Server, Upstream2Cfg); - Upstream2Running = true; - } - UseUpstream1 = !UseUpstream1; - } - } - - Completed = 0; - for (size_t I = 0; I < ThreadCount * KeyMultiplier; I++) - { - size_t Iteration = I; - std::vector<CacheKey>& LocalKeys = Keys[Iteration]; - if (LocalKeys.empty()) - { - Completed.fetch_add(1); - continue; - } - Pool.ScheduleWork( - [&] { - GetCacheRecordResult Result = GetCacheRecords(LocalUri, "ue4.ddc"sv, LocalKeys, Policy); - - if (!Result.Success) - { - ZEN_DEBUG("GetCacheRecords iteration {} failed", Iteration); - Completed.fetch_add(1); - return; - } - - if (Result.Result.Results.size() != LocalKeys.size()) - { - ZEN_DEBUG("GetCacheRecords iteration {} empty records", Iteration); - Completed.fetch_add(1); - return; - } - for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results) - { - const CacheKey& ExpectedKey = LocalKeys[Index++]; - if (!Record) - { - continue; - } - if (Record->Key != ExpectedKey) - { - continue; - } - if (Record->Values.size() != 1) - { - continue; - } - - for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) - { - if (!Value.Body) - { - continue; - } - } - } - Completed.fetch_add(1); - }, - WorkerThreadPool::EMode::EnableBacklog); - } - while (Completed < ThreadCount * KeyMultiplier) - { - Sleep(10); - } -} - -TEST_CASE("zcache.rpc.partialchunks") -{ - using namespace std::literals; - using namespace utils; - - ZenConfig LocalCfg = ZenConfig::New(TestEnv.GetNewPortNumber()); - ZenServerInstance Server(TestEnv); - SpawnServer(Server, LocalCfg); - - std::vector<CompressedBuffer> Attachments; - - const auto BaseUri = fmt::format("http://localhost:{}/z$", Server.GetBasePort()); - - auto GenerateKey = [](std::string_view Bucket, size_t KeyIndex) -> CacheKey { - IoHash KeyHash; - ((size_t*)(KeyHash.Hash))[0] = KeyIndex; - return CacheKey::Create(Bucket, KeyHash); - }; - - auto AppendCacheRecord = [](cacherequests::PutCacheRecordsRequest& Request, - const CacheKey& CacheKey, - size_t AttachmentCount, - size_t AttachmentsSize, - CachePolicy RecordPolicy) -> std::vector<std::pair<Oid, CompressedBuffer>> { - std::vector<std::pair<Oid, CompressedBuffer>> AttachmentBuffers; - std::vector<cacherequests::PutCacheRecordRequestValue> Attachments; - for (size_t AttachmentIndex = 0; AttachmentIndex < AttachmentCount; AttachmentIndex++) - { - CompressedBuffer Value = CreateSemiRandomBlob(AttachmentsSize); - AttachmentBuffers.push_back(std::make_pair(Oid::NewOid(), Value)); - Attachments.push_back({.Id = AttachmentBuffers.back().first, .Body = std::move(Value)}); - } - Request.Requests.push_back({.Key = CacheKey, .Values = Attachments, .Policy = RecordPolicy}); - return AttachmentBuffers; - }; - - auto PutCacheRecords = [&AppendCacheRecord, &GenerateKey]( - std::string_view BaseUri, - std::string_view Namespace, - std::string_view Bucket, - size_t KeyOffset, - size_t Num, - size_t AttachmentCount, - size_t AttachmentsSize = - 8192) -> std::vector<std::pair<CacheKey, std::vector<std::pair<Oid, CompressedBuffer>>>> { - std::vector<std::pair<CacheKey, std::vector<std::pair<Oid, CompressedBuffer>>>> Keys; - - cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .Namespace = std::string(Namespace)}; - for (size_t Key = 1; Key <= Num; ++Key) - { - const CacheKey NewCacheKey = GenerateKey(Bucket, KeyOffset + Key); - std::vector<std::pair<Oid, CompressedBuffer>> Attachments = - AppendCacheRecord(Request, NewCacheKey, AttachmentCount, AttachmentsSize, CachePolicy::Default); - Keys.push_back(std::make_pair(NewCacheKey, std::move(Attachments))); - } - - CbPackage Package; - CHECK(Request.Format(Package)); - - HttpClient Http{BaseUri}; - - IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); - Body.SetContentType(HttpContentType::kCbPackage); - - HttpClient::Response Result = Http.Post("/$rpc", Body, {{"Accept", "application/x-ue-cbpkg"}}); - - if (Result.StatusCode != HttpResponseCode::OK) - { - ZEN_DEBUG("PutCacheRecords failed with {}, reason '{}'", ToString(Result.StatusCode), Result.ErrorMessage("")); - Keys.clear(); - } - - return Keys; - }; - - std::string_view TestBucket = "partialcachevaluetests"sv; - std::string_view TestNamespace = "ue4.ddc"sv; - auto RecordsWithSmallAttachments = PutCacheRecords(BaseUri, TestNamespace, TestBucket, 0, 3, 2, 4096u); - CHECK(RecordsWithSmallAttachments.size() == 3); - auto RecordsWithLargeAttachments = PutCacheRecords(BaseUri, TestNamespace, TestBucket, 10, 1, 2, 8u * 1024u * 1024u); - CHECK(RecordsWithLargeAttachments.size() == 1); - - struct PartialOptions - { - uint64_t Offset = 0ull; - uint64_t Size = ~0ull; - RpcAcceptOptions AcceptOptions = RpcAcceptOptions::kNone; - }; - - auto GetCacheChunk = [](std::string_view BaseUri, - std::string_view Namespace, - const CacheKey& Key, - const Oid& ValueId, - const PartialOptions& Options = {}) -> cacherequests::GetCacheChunksResult { - cacherequests::GetCacheChunksRequest Request = { - .AcceptMagic = kCbPkgMagic, - .AcceptOptions = (uint16_t)Options.AcceptOptions, - .Namespace = std::string(Namespace), - .Requests = {{.Key = Key, .ValueId = ValueId, .RawOffset = Options.Offset, .RawSize = Options.Size}}}; - CbPackage Package; - CHECK(Request.Format(Package)); - IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); - Body.SetContentType(HttpContentType::kCbPackage); - - HttpClient Http{BaseUri}; - - HttpClient::Response Result = Http.Post("/$rpc", Body, {{"Accept", "application/x-ue-cbpkg"}}); - - CHECK(Result.StatusCode == HttpResponseCode::OK); - - CbPackage Response = ParsePackageMessage(Result.ResponsePayload); - bool Loaded = !Response.IsNull(); - CHECK_MESSAGE(Loaded, "GetCacheChunks response failed to load."); - cacherequests::GetCacheChunksResult GetCacheChunksResult; - CHECK(GetCacheChunksResult.Parse(Response)); - return GetCacheChunksResult; - }; - - auto GetAndVerifyChunk = [&GetCacheChunk](std::string_view BaseUri, - std::string_view Namespace, - const CacheKey& Key, - const Oid& ChunkId, - const CompressedBuffer& VerifyData, - const PartialOptions& Options = {}) { - cacherequests::GetCacheChunksResult Result = GetCacheChunk(BaseUri, Namespace, Key, ChunkId, Options); - CHECK(Result.Results.size() == 1); - bool CanGetPartial = ((uint16_t)Options.AcceptOptions & (uint16_t)RpcAcceptOptions::kAllowPartialCacheChunks); - if (!CanGetPartial) - { - CHECK(Result.Results[0].FragmentOffset == 0); - CHECK(Result.Results[0].Body.GetCompressedSize() == VerifyData.GetCompressedSize()); - } - IoBuffer SourceDecompressed = VerifyData.Decompress(Options.Offset, Options.Size).AsIoBuffer(); - IoBuffer ReceivedDecompressed = - Result.Results[0].Body.Decompress(Options.Offset - Result.Results[0].FragmentOffset, Options.Size).AsIoBuffer(); - CHECK(SourceDecompressed.GetView().EqualBytes(ReceivedDecompressed.GetView())); - }; - - GetAndVerifyChunk(BaseUri, - TestNamespace, - RecordsWithSmallAttachments[0].first, - RecordsWithSmallAttachments[0].second[0].first, - RecordsWithSmallAttachments[0].second[0].second); - GetAndVerifyChunk(BaseUri, - TestNamespace, - RecordsWithSmallAttachments[0].first, - RecordsWithSmallAttachments[0].second[0].first, - RecordsWithSmallAttachments[0].second[0].second, - PartialOptions{.Offset = 378, .Size = 519, .AcceptOptions = RpcAcceptOptions::kAllowLocalReferences}); - GetAndVerifyChunk( - BaseUri, - TestNamespace, - RecordsWithSmallAttachments[0].first, - RecordsWithSmallAttachments[0].second[0].first, - RecordsWithSmallAttachments[0].second[0].second, - PartialOptions{.Offset = 378, - .Size = 519, - .AcceptOptions = RpcAcceptOptions::kAllowLocalReferences | RpcAcceptOptions::kAllowPartialCacheChunks}); - GetAndVerifyChunk(BaseUri, - TestNamespace, - RecordsWithLargeAttachments[0].first, - RecordsWithLargeAttachments[0].second[0].first, - RecordsWithLargeAttachments[0].second[0].second, - PartialOptions{.AcceptOptions = RpcAcceptOptions::kAllowLocalReferences}); - GetAndVerifyChunk(BaseUri, - TestNamespace, - RecordsWithLargeAttachments[0].first, - RecordsWithLargeAttachments[0].second[0].first, - RecordsWithLargeAttachments[0].second[0].second, - PartialOptions{.Offset = 1024u * 1024u, .Size = 512u * 1024u}); - GetAndVerifyChunk( - BaseUri, - TestNamespace, - RecordsWithLargeAttachments[0].first, - RecordsWithLargeAttachments[0].second[0].first, - RecordsWithLargeAttachments[0].second[0].second, - PartialOptions{.Offset = 1024u * 1024u, .Size = 512u * 1024u, .AcceptOptions = RpcAcceptOptions::kAllowLocalReferences}); - GetAndVerifyChunk( - BaseUri, - TestNamespace, - RecordsWithLargeAttachments[0].first, - RecordsWithLargeAttachments[0].second[0].first, - RecordsWithLargeAttachments[0].second[0].second, - PartialOptions{.Offset = 1024u * 1024u, .Size = 512u * 1024u, .AcceptOptions = RpcAcceptOptions::kAllowPartialCacheChunks}); - GetAndVerifyChunk( - BaseUri, - TestNamespace, - RecordsWithLargeAttachments[0].first, - RecordsWithLargeAttachments[0].second[0].first, - RecordsWithLargeAttachments[0].second[0].second, - PartialOptions{.Offset = 1024u * 1024u, - .Size = 512u * 1024u, - .AcceptOptions = RpcAcceptOptions::kAllowLocalReferences | RpcAcceptOptions::kAllowPartialCacheChunks}); - GetAndVerifyChunk( - BaseUri, - TestNamespace, - RecordsWithLargeAttachments[0].first, - RecordsWithLargeAttachments[0].second[0].first, - RecordsWithLargeAttachments[0].second[0].second, - PartialOptions{.Offset = 1024u * 1024u, - .Size = 512u * 1024u, - .AcceptOptions = RpcAcceptOptions::kAllowLocalReferences | RpcAcceptOptions::kAllowPartialLocalReferences | - RpcAcceptOptions::kAllowPartialCacheChunks}); -} - -IoBuffer -FormatPackageBody(const CbPackage& Package) -{ - IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); - Body.SetContentType(HttpContentType::kCbPackage); - return Body; -} - -TEST_CASE("zcache.rpc.allpolicies") -{ - using namespace std::literals; - using namespace utils; - - ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber()); - ZenServerInstance UpstreamServer(TestEnv); - SpawnServer(UpstreamServer, UpstreamCfg); - - ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port); - ZenServerInstance LocalServer(TestEnv); - SpawnServer(LocalServer, LocalCfg); - - const auto BaseUri = fmt::format("http://localhost:{}/z$", LocalServer.GetBasePort()); - HttpClient Http{BaseUri}; - - std::string_view TestVersion = "F72150A02AE34B57A9EC91D36BA1CE08"sv; - std::string_view TestBucket = "allpoliciestest"sv; - std::string_view TestNamespace = "ue4.ddc"sv; - - // NumKeys = (2 Value vs Record)*(2 SkipData vs Default)*(2 ForceMiss vs Not)*(2 use local) - // *(2 use remote)*(2 UseValue Policy vs not)*(4 cases per type) - constexpr int NumKeys = 256; - constexpr int NumValues = 4; - Oid ValueIds[NumValues]; - IoHash Hash; - for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex) - { - ExtendableStringBuilder<16> ValueName; - ValueName << "ValueId_"sv << ValueIndex; - static_assert(sizeof(IoHash) >= sizeof(Oid)); - ValueIds[ValueIndex] = Oid::FromMemory(IoHash::HashBuffer(ValueName.Data(), ValueName.Size() * sizeof(ValueName.Data()[0])).Hash); - } - - struct KeyData; - struct UserData - { - UserData& Set(KeyData* InKeyData, int InValueIndex) - { - Data = InKeyData; - ValueIndex = InValueIndex; - return *this; - } - KeyData* Data = nullptr; - int ValueIndex = 0; - }; - struct KeyData - { - CompressedBuffer BufferValues[NumValues]; - uint64_t IntValues[NumValues]; - UserData ValueUserData[NumValues]; - bool ReceivedChunk[NumValues]; - CacheKey Key; - UserData KeyUserData; - uint32_t KeyIndex = 0; - bool GetRequestsData = true; - bool UseValueAPI = false; - bool UseValuePolicy = false; - bool ForceMiss = false; - bool UseLocal = true; - bool UseRemote = true; - bool ShouldBeHit = true; - bool ReceivedPut = false; - bool ReceivedGet = false; - bool ReceivedPutValue = false; - bool ReceivedGetValue = false; - }; - struct CachePutRequest - { - CacheKey Key; - CbObject Record; - CacheRecordPolicy Policy; - KeyData* Values; - UserData* Data; - }; - struct CachePutValueRequest - { - CacheKey Key; - CompressedBuffer Value; - CachePolicy Policy; - UserData* Data; - }; - struct CacheGetRequest - { - CacheKey Key; - CacheRecordPolicy Policy; - UserData* Data; - }; - struct CacheGetValueRequest - { - CacheKey Key; - CachePolicy Policy; - UserData* Data; - }; - struct CacheGetChunkRequest - { - CacheKey Key; - Oid ValueId; - uint64_t RawOffset; - uint64_t RawSize; - IoHash RawHash; - CachePolicy Policy; - UserData* Data; - }; - - KeyData KeyDatas[NumKeys]; - std::vector<CachePutRequest> PutRequests; - std::vector<CachePutValueRequest> PutValueRequests; - std::vector<CacheGetRequest> GetRequests; - std::vector<CacheGetValueRequest> GetValueRequests; - std::vector<CacheGetChunkRequest> ChunkRequests; - - for (uint32_t KeyIndex = 0; KeyIndex < NumKeys; ++KeyIndex) - { - IoHashStream KeyWriter; - KeyWriter.Append(TestVersion.data(), TestVersion.length() * sizeof(TestVersion.data()[0])); - KeyWriter.Append(&KeyIndex, sizeof(KeyIndex)); - IoHash KeyHash = KeyWriter.GetHash(); - KeyData& KeyData = KeyDatas[KeyIndex]; - - KeyData.Key = CacheKey::Create(TestBucket, KeyHash); - KeyData.KeyIndex = KeyIndex; - KeyData.GetRequestsData = (KeyIndex & (1 << 1)) == 0; - KeyData.UseValueAPI = (KeyIndex & (1 << 2)) != 0; - KeyData.UseValuePolicy = (KeyIndex & (1 << 3)) != 0; - KeyData.ForceMiss = (KeyIndex & (1 << 4)) == 0; - KeyData.UseLocal = (KeyIndex & (1 << 5)) == 0; - KeyData.UseRemote = (KeyIndex & (1 << 6)) == 0; - KeyData.ShouldBeHit = !KeyData.ForceMiss && (KeyData.UseLocal || KeyData.UseRemote); - CachePolicy SharedPolicy = KeyData.UseLocal ? CachePolicy::Local : CachePolicy::None; - SharedPolicy |= KeyData.UseRemote ? CachePolicy::Remote : CachePolicy::None; - CachePolicy PutPolicy = SharedPolicy; - CachePolicy GetPolicy = SharedPolicy; - GetPolicy |= !KeyData.GetRequestsData ? CachePolicy::SkipData : CachePolicy::None; - CacheKey& Key = KeyData.Key; - - for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex) - { - KeyData.IntValues[ValueIndex] = static_cast<uint64_t>(KeyIndex) | (static_cast<uint64_t>(ValueIndex) << 32); - KeyData.BufferValues[ValueIndex] = - CompressedBuffer::Compress(SharedBuffer::MakeView(&KeyData.IntValues[ValueIndex], sizeof(KeyData.IntValues[ValueIndex]))); - KeyData.ReceivedChunk[ValueIndex] = false; - } - - UserData& KeyUserData = KeyData.KeyUserData.Set(&KeyData, -1); - for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex) - { - KeyData.ValueUserData[ValueIndex].Set(&KeyData, ValueIndex); - } - if (!KeyData.UseValueAPI) - { - CbObjectWriter Builder; - Builder.BeginObject("key"sv); - Builder << "Bucket"sv << Key.Bucket << "Hash"sv << Key.Hash; - Builder.EndObject(); - Builder.BeginArray("Values"sv); - for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex) - { - Builder.BeginObject(); - Builder.AddObjectId("Id"sv, ValueIds[ValueIndex]); - Builder.AddBinaryAttachment("RawHash"sv, KeyData.BufferValues[ValueIndex].DecodeRawHash()); - Builder.AddInteger("RawSize"sv, KeyData.BufferValues[ValueIndex].DecodeRawSize()); - Builder.EndObject(); - } - Builder.EndArray(); - - CacheRecordPolicy PutRecordPolicy; - CacheRecordPolicy GetRecordPolicy; - if (!KeyData.UseValuePolicy) - { - PutRecordPolicy = CacheRecordPolicy(PutPolicy); - GetRecordPolicy = CacheRecordPolicy(GetPolicy); - } - else - { - // Switch the SkipData field in the Record policy so that if the CacheStore ignores the ValuePolicies - // it will use the wrong value for SkipData and fail our tests. - CacheRecordPolicyBuilder PutBuilder(PutPolicy ^ CachePolicy::SkipData); - CacheRecordPolicyBuilder GetBuilder(GetPolicy ^ CachePolicy::SkipData); - for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex) - { - PutBuilder.AddValuePolicy(ValueIds[ValueIndex], PutPolicy); - GetBuilder.AddValuePolicy(ValueIds[ValueIndex], GetPolicy); - } - PutRecordPolicy = PutBuilder.Build(); - GetRecordPolicy = GetBuilder.Build(); - } - if (!KeyData.ForceMiss) - { - PutRequests.push_back({Key, Builder.Save(), PutRecordPolicy, &KeyData, &KeyUserData}); - } - GetRequests.push_back({Key, GetRecordPolicy, &KeyUserData}); - for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex) - { - UserData& ValueUserData = KeyData.ValueUserData[ValueIndex]; - ChunkRequests.push_back({Key, ValueIds[ValueIndex], 0, UINT64_MAX, IoHash(), GetPolicy, &ValueUserData}); - } - } - else - { - if (!KeyData.ForceMiss) - { - PutValueRequests.push_back({Key, KeyData.BufferValues[0], PutPolicy, &KeyUserData}); - } - GetValueRequests.push_back({Key, GetPolicy, &KeyUserData}); - ChunkRequests.push_back({Key, Oid::Zero, 0, UINT64_MAX, IoHash(), GetPolicy, &KeyUserData}); - } - } - - // PutCacheRecords - { - CachePolicy BatchDefaultPolicy = CachePolicy::Default; - cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, - .DefaultPolicy = BatchDefaultPolicy, - .Namespace = std::string(TestNamespace)}; - Request.Requests.reserve(PutRequests.size()); - for (CachePutRequest& PutRequest : PutRequests) - { - cacherequests::PutCacheRecordRequest& RecordRequest = Request.Requests.emplace_back(); - RecordRequest.Key = PutRequest.Key; - RecordRequest.Policy = PutRequest.Policy; - RecordRequest.Values.reserve(NumValues); - for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex) - { - RecordRequest.Values.push_back({.Id = ValueIds[ValueIndex], .Body = PutRequest.Values->BufferValues[ValueIndex]}); - } - PutRequest.Data->Data->ReceivedPut = true; - } - - CbPackage Package; - CHECK(Request.Format(Package)); - IoBuffer Body = FormatPackageBody(Package); - HttpClient::Response Result = Http.Post("/$rpc", Body, {{"Accept", "application/x-ue-cbpkg"}}); - CHECK_MESSAGE(Result.StatusCode == HttpResponseCode::OK, "PutCacheRecords unexpectedly failed."); - } - - // PutCacheValues - { - CachePolicy BatchDefaultPolicy = CachePolicy::Default; - - cacherequests::PutCacheValuesRequest Request = {.AcceptMagic = kCbPkgMagic, - .DefaultPolicy = BatchDefaultPolicy, - .Namespace = std::string(TestNamespace)}; - Request.Requests.reserve(PutValueRequests.size()); - for (CachePutValueRequest& PutRequest : PutValueRequests) - { - Request.Requests.push_back({.Key = PutRequest.Key, .Body = PutRequest.Value, .Policy = PutRequest.Policy}); - PutRequest.Data->Data->ReceivedPutValue = true; - } - - CbPackage Package; - CHECK(Request.Format(Package)); - - IoBuffer Body = FormatPackageBody(Package); - HttpClient::Response Result = Http.Post("/$rpc", Body, {{"Accept", "application/x-ue-cbpkg"}}); - CHECK_MESSAGE(Result.StatusCode == HttpResponseCode::OK, "PutCacheValues unexpectedly failed."); - } - - for (KeyData& KeyData : KeyDatas) - { - if (!KeyData.ForceMiss) - { - if (!KeyData.UseValueAPI) - { - CHECK_MESSAGE(KeyData.ReceivedPut, WriteToString<32>("Key ", KeyData.KeyIndex, " was unexpectedly not put.").c_str()); - } - else - { - CHECK_MESSAGE(KeyData.ReceivedPutValue, - WriteToString<32>("Key ", KeyData.KeyIndex, " was unexpectedly not put to ValueAPI.").c_str()); - } - } - } - - // GetCacheRecords - { - CachePolicy BatchDefaultPolicy = CachePolicy::Default; - cacherequests::GetCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, - .DefaultPolicy = BatchDefaultPolicy, - .Namespace = std::string(TestNamespace)}; - Request.Requests.reserve(GetRequests.size()); - for (CacheGetRequest& GetRequest : GetRequests) - { - Request.Requests.push_back({.Key = GetRequest.Key, .Policy = GetRequest.Policy}); - } - - CbPackage Package; - CHECK(Request.Format(Package)); - IoBuffer Body = FormatPackageBody(Package); - HttpClient::Response Result = Http.Post("/$rpc", Body, {{"Accept", "application/x-ue-cbpkg"}}); - CHECK_MESSAGE(Result.StatusCode == HttpResponseCode::OK, "GetCacheRecords unexpectedly failed."); - CbPackage Response = ParsePackageMessage(Result.ResponsePayload); - bool Loaded = !Response.IsNull(); - CHECK_MESSAGE(Loaded, "GetCacheRecords response failed to load."); - cacherequests::GetCacheRecordsResult RequestResult; - CHECK(RequestResult.Parse(Response)); - CHECK_MESSAGE(RequestResult.Results.size() == GetRequests.size(), "GetCacheRecords response count did not match request count."); - for (int Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& RecordResult : RequestResult.Results) - { - bool Succeeded = RecordResult.has_value(); - CacheGetRequest& GetRequest = GetRequests[Index++]; - KeyData* KeyData = GetRequest.Data->Data; - KeyData->ReceivedGet = true; - WriteToString<32> Name("Get(", KeyData->KeyIndex, ")"); - if (KeyData->ShouldBeHit) - { - CHECK_MESSAGE(Succeeded, WriteToString<32>(Name, " unexpectedly failed.").c_str()); - } - else if (KeyData->ForceMiss) - { - CHECK_MESSAGE(!Succeeded, WriteToString<32>(Name, " unexpectedly succeeded.").c_str()); - } - if (!KeyData->ForceMiss && Succeeded) - { - CHECK_MESSAGE(RecordResult->Values.size() == NumValues, - WriteToString<32>(Name, " number of values did not match.").c_str()); - for (const cacherequests::GetCacheRecordResultValue& Value : RecordResult->Values) - { - int ExpectedValueIndex = 0; - for (; ExpectedValueIndex < NumValues; ++ExpectedValueIndex) - { - if (ValueIds[ExpectedValueIndex] == Value.Id) - { - break; - } - } - CHECK_MESSAGE(ExpectedValueIndex < NumValues, WriteToString<32>(Name, " could not find matching ValueId.").c_str()); - - WriteToString<32> ValueName("Get(", KeyData->KeyIndex, ",", ExpectedValueIndex, ")"); - - CompressedBuffer ExpectedValue = KeyData->BufferValues[ExpectedValueIndex]; - CHECK_MESSAGE(Value.RawHash == ExpectedValue.DecodeRawHash(), - WriteToString<32>(ValueName, " RawHash did not match.").c_str()); - CHECK_MESSAGE(Value.RawSize == ExpectedValue.DecodeRawSize(), - WriteToString<32>(ValueName, " RawSize did not match.").c_str()); - - if (KeyData->GetRequestsData) - { - SharedBuffer Buffer = Value.Body.Decompress(); - CHECK_MESSAGE(Buffer.GetSize() == Value.RawSize, - WriteToString<32>(ValueName, " BufferSize did not match RawSize.").c_str()); - uint64_t ActualIntValue = ((const uint64_t*)Buffer.GetData())[0]; - uint64_t ExpectedIntValue = KeyData->IntValues[ExpectedValueIndex]; - CHECK_MESSAGE(ActualIntValue == ExpectedIntValue, WriteToString<32>(ValueName, " had unexpected data.").c_str()); - } - } - } - } - } - - // GetCacheValues - { - CachePolicy BatchDefaultPolicy = CachePolicy::Default; - - cacherequests::GetCacheValuesRequest GetCacheValuesRequest = {.AcceptMagic = kCbPkgMagic, - .DefaultPolicy = BatchDefaultPolicy, - .Namespace = std::string(TestNamespace)}; - GetCacheValuesRequest.Requests.reserve(GetValueRequests.size()); - for (CacheGetValueRequest& GetRequest : GetValueRequests) - { - GetCacheValuesRequest.Requests.push_back({.Key = GetRequest.Key, .Policy = GetRequest.Policy}); - } - - CbPackage Package; - CHECK(GetCacheValuesRequest.Format(Package)); - - IoBuffer Body = FormatPackageBody(Package); - HttpClient::Response Result = Http.Post("/$rpc", Body, {{"Accept", "application/x-ue-cbpkg"}}); - CHECK_MESSAGE(Result.StatusCode == HttpResponseCode::OK, "GetCacheValues unexpectedly failed."); - IoBuffer MessageBuffer(Result.ResponsePayload); - CbPackage Response = ParsePackageMessage(MessageBuffer); - bool Loaded = !Response.IsNull(); - CHECK_MESSAGE(Loaded, "GetCacheValues response failed to load."); - cacherequests::GetCacheValuesResult GetCacheValuesResult; - CHECK(GetCacheValuesResult.Parse(Response)); - for (int Index = 0; const cacherequests::CacheValueResult& ValueResult : GetCacheValuesResult.Results) - { - bool Succeeded = ValueResult.RawHash != IoHash::Zero; - CacheGetValueRequest& Request = GetValueRequests[Index++]; - KeyData* KeyData = Request.Data->Data; - KeyData->ReceivedGetValue = true; - WriteToString<32> Name("GetValue("sv, KeyData->KeyIndex, ")"sv); - - if (KeyData->ShouldBeHit) - { - CHECK_MESSAGE(Succeeded, WriteToString<32>(Name, " unexpectedly failed.").c_str()); - } - else if (KeyData->ForceMiss) - { - CHECK_MESSAGE(!Succeeded, WriteToString<32>(Name, "unexpectedly succeeded.").c_str()); - } - if (!KeyData->ForceMiss && Succeeded) - { - CompressedBuffer ExpectedValue = KeyData->BufferValues[0]; - CHECK_MESSAGE(ValueResult.RawHash == ExpectedValue.DecodeRawHash(), - WriteToString<32>(Name, " RawHash did not match.").c_str()); - CHECK_MESSAGE(ValueResult.RawSize == ExpectedValue.DecodeRawSize(), - WriteToString<32>(Name, " RawSize did not match.").c_str()); - - if (KeyData->GetRequestsData) - { - SharedBuffer Buffer = ValueResult.Body.Decompress(); - CHECK_MESSAGE(Buffer.GetSize() == ValueResult.RawSize, - WriteToString<32>(Name, " BufferSize did not match RawSize.").c_str()); - uint64_t ActualIntValue = ((const uint64_t*)Buffer.GetData())[0]; - uint64_t ExpectedIntValue = KeyData->IntValues[0]; - CHECK_MESSAGE(ActualIntValue == ExpectedIntValue, WriteToString<32>(Name, " had unexpected data.").c_str()); - } - } - } - } - - // GetCacheChunks - { - std::sort(ChunkRequests.begin(), ChunkRequests.end(), [](CacheGetChunkRequest& A, CacheGetChunkRequest& B) { - return A.Key.Hash < B.Key.Hash; - }); - CachePolicy BatchDefaultPolicy = CachePolicy::Default; - cacherequests::GetCacheChunksRequest GetCacheChunksRequest = {.AcceptMagic = kCbPkgMagic, - .DefaultPolicy = BatchDefaultPolicy, - .Namespace = std::string(TestNamespace)}; - GetCacheChunksRequest.Requests.reserve(ChunkRequests.size()); - for (CacheGetChunkRequest& ChunkRequest : ChunkRequests) - { - GetCacheChunksRequest.Requests.push_back({.Key = ChunkRequest.Key, - .ValueId = ChunkRequest.ValueId, - .ChunkId = IoHash(), - .RawOffset = ChunkRequest.RawOffset, - .RawSize = ChunkRequest.RawSize, - .Policy = ChunkRequest.Policy}); - } - CbPackage Package; - CHECK(GetCacheChunksRequest.Format(Package)); - - IoBuffer Body = FormatPackageBody(Package); - HttpClient::Response Result = Http.Post("/$rpc", Body, {{"Accept", "application/x-ue-cbpkg"}}); - CHECK_MESSAGE(Result.StatusCode == HttpResponseCode::OK, "GetCacheChunks unexpectedly failed."); - CbPackage Response = ParsePackageMessage(Result.ResponsePayload); - bool Loaded = !Response.IsNull(); - CHECK_MESSAGE(Loaded, "GetCacheChunks response failed to load."); - cacherequests::GetCacheChunksResult GetCacheChunksResult; - CHECK(GetCacheChunksResult.Parse(Response)); - CHECK_MESSAGE(GetCacheChunksResult.Results.size() == ChunkRequests.size(), - "GetCacheChunks response count did not match request count."); - - for (int Index = 0; const cacherequests::CacheValueResult& ValueResult : GetCacheChunksResult.Results) - { - bool Succeeded = ValueResult.RawHash != IoHash::Zero; - - CacheGetChunkRequest& Request = ChunkRequests[Index++]; - KeyData* KeyData = Request.Data->Data; - int ValueIndex = Request.Data->ValueIndex >= 0 ? Request.Data->ValueIndex : 0; - KeyData->ReceivedChunk[ValueIndex] = true; - WriteToString<32> Name("GetChunks("sv, KeyData->KeyIndex, ","sv, ValueIndex, ")"sv); - - if (KeyData->ShouldBeHit) - { - CHECK_MESSAGE(Succeeded, WriteToString<256>(Name, " unexpectedly failed."sv).c_str()); - } - else if (KeyData->ForceMiss) - { - CHECK_MESSAGE(!Succeeded, WriteToString<256>(Name, " unexpectedly succeeded."sv).c_str()); - } - if (KeyData->ShouldBeHit && Succeeded) - { - CompressedBuffer ExpectedValue = KeyData->BufferValues[ValueIndex]; - CHECK_MESSAGE(ValueResult.RawHash == ExpectedValue.DecodeRawHash(), - WriteToString<32>(Name, " had unexpected RawHash.").c_str()); - CHECK_MESSAGE(ValueResult.RawSize == ExpectedValue.DecodeRawSize(), - WriteToString<32>(Name, " had unexpected RawSize.").c_str()); - - if (KeyData->GetRequestsData) - { - SharedBuffer Buffer = ValueResult.Body.Decompress(); - CHECK_MESSAGE(Buffer.GetSize() == ValueResult.RawSize, - WriteToString<32>(Name, " BufferSize did not match RawSize.").c_str()); - uint64_t ActualIntValue = ((const uint64_t*)Buffer.GetData())[0]; - uint64_t ExpectedIntValue = KeyData->IntValues[ValueIndex]; - CHECK_MESSAGE(ActualIntValue == ExpectedIntValue, WriteToString<32>(Name, " had unexpected data.").c_str()); - } - } - } - } - - for (KeyData& KeyData : KeyDatas) - { - if (!KeyData.UseValueAPI) - { - CHECK_MESSAGE(KeyData.ReceivedGet, WriteToString<32>("Get(", KeyData.KeyIndex, ") was unexpectedly not received.").c_str()); - for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex) - { - CHECK_MESSAGE( - KeyData.ReceivedChunk[ValueIndex], - WriteToString<32>("GetChunks(", KeyData.KeyIndex, ",", ValueIndex, ") was unexpectedly not received.").c_str()); - } - } - else - { - CHECK_MESSAGE(KeyData.ReceivedGetValue, - WriteToString<32>("GetValue(", KeyData.KeyIndex, ") was unexpectedly not received.").c_str()); - CHECK_MESSAGE(KeyData.ReceivedChunk[0], - WriteToString<32>("GetChunks(", KeyData.KeyIndex, ") was unexpectedly not received.").c_str()); - } - } -} - -class ZenServerTestHelper -{ -public: - ZenServerTestHelper(std::string_view HelperId, int ServerCount) : m_HelperId{HelperId}, m_ServerCount{ServerCount} {} - ~ZenServerTestHelper() {} - - void SpawnServers(std::string_view AdditionalServerArgs = std::string_view()) - { - SpawnServers([](ZenServerInstance&) {}, AdditionalServerArgs); - } - - void SpawnServers(auto&& Callback, std::string_view AdditionalServerArgs) - { - ZEN_INFO("{}: spawning {} server instances", m_HelperId, m_ServerCount); - - m_Instances.resize(m_ServerCount); - - for (int i = 0; i < m_ServerCount; ++i) - { - auto& Instance = m_Instances[i]; - Instance = std::make_unique<ZenServerInstance>(TestEnv); - Instance->SetTestDir(TestEnv.CreateNewTestDir()); - } - - for (int i = 0; i < m_ServerCount; ++i) - { - auto& Instance = m_Instances[i]; - Callback(*Instance); - } - - for (int i = 0; i < m_ServerCount; ++i) - { - auto& Instance = m_Instances[i]; - Instance->SpawnServer(TestEnv.GetNewPortNumber(), AdditionalServerArgs); - } - - for (int i = 0; i < m_ServerCount; ++i) - { - auto& Instance = m_Instances[i]; - uint16_t PortNumber = Instance->WaitUntilReady(); - CHECK_MESSAGE(PortNumber != 0, Instance->GetLogOutput()); - } - } - - ZenServerInstance& GetInstance(int Index) { return *m_Instances[Index]; } - -private: - std::string m_HelperId; - int m_ServerCount = 0; - std::vector<std::unique_ptr<ZenServerInstance>> m_Instances; -}; - TEST_CASE("http.basics") { using namespace std::literals; @@ -3107,1787 +320,6 @@ TEST_CASE("http.package") CHECK_EQ(ResponsePackage, TestPackage); } -std::string -OidAsString(const Oid& Id) -{ - StringBuilder<25> OidStringBuilder; - Id.ToString(OidStringBuilder); - return OidStringBuilder.ToString(); -} - -CbPackage -CreateOplogPackage(const Oid& Id, const std::span<const std::pair<Oid, CompressedBuffer>>& Attachments) -{ - CbPackage Package; - CbObjectWriter Object; - Object << "key"sv << OidAsString(Id); - if (!Attachments.empty()) - { - Object.BeginArray("bulkdata"); - for (const auto& Attachment : Attachments) - { - CbAttachment Attach(Attachment.second, Attachment.second.DecodeRawHash()); - Object.BeginObject(); - Object << "id"sv << Attachment.first; - Object << "type"sv - << "Standard"sv; - Object << "data"sv << Attach; - Object.EndObject(); - - Package.AddAttachment(Attach); - ZEN_DEBUG("Added attachment {}", Attach.GetHash()); - } - Object.EndArray(); - } - Package.SetObject(Object.Save()); - return Package; -}; - -CbObject -CreateOplogOp(const Oid& Id, const std::span<const std::pair<Oid, CompressedBuffer>>& Attachments) -{ - CbObjectWriter Object; - Object << "key"sv << OidAsString(Id); - if (!Attachments.empty()) - { - Object.BeginArray("bulkdata"); - for (const auto& Attachment : Attachments) - { - CbAttachment Attach(Attachment.second, Attachment.second.DecodeRawHash()); - Object.BeginObject(); - Object << "id"sv << Attachment.first; - Object << "type"sv - << "Standard"sv; - Object << "data"sv << Attach; - Object.EndObject(); - - ZEN_DEBUG("Added attachment {}", Attach.GetHash()); - } - Object.EndArray(); - } - return Object.Save(); -}; - -enum CbWriterMeta -{ - BeginObject, - EndObject, - BeginArray, - EndArray -}; - -inline CbWriter& -operator<<(CbWriter& Writer, CbWriterMeta Meta) -{ - switch (Meta) - { - case BeginObject: - Writer.BeginObject(); - break; - case EndObject: - Writer.EndObject(); - break; - case BeginArray: - Writer.BeginArray(); - break; - case EndArray: - Writer.EndArray(); - break; - default: - ZEN_ASSERT(false); - } - return Writer; -} - -TEST_CASE("project.remote") -{ - using namespace std::literals; - using namespace utils; - - ZenServerTestHelper Servers("remote", 3); - Servers.SpawnServers("--debug"); - - std::vector<Oid> OpIds; - const size_t OpCount = 24; - OpIds.reserve(OpCount); - for (size_t I = 0; I < OpCount; ++I) - { - OpIds.emplace_back(Oid::NewOid()); - } - - std::unordered_map<Oid, std::vector<std::pair<Oid, CompressedBuffer>>, Oid::Hasher> Attachments; - { - std::vector<std::size_t> AttachmentSizes( - {7633, 6825, 5738, 8031, 7225, 566, 3656, 6006, 24, 33466, 1093, 4269, 2257, 3685, 13489, 97194, - 6151, 5482, 6217, 3511, 6738, 5061, 7537, 2759, 1916, 8210, 2235, 224024, 51582, 5251, 491, 2u * 1024u * 1024u + 124u, - 74607, 18135, 3767, 154045, 4415, 5007, 8876, 96761, 3359, 8526, 4097, 4855, 48225}); - auto It = AttachmentSizes.begin(); - Attachments[OpIds[0]] = {}; - Attachments[OpIds[1]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++}); - Attachments[OpIds[2]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++}); - Attachments[OpIds[3]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++}); - Attachments[OpIds[4]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++}); - Attachments[OpIds[5]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++}); - Attachments[OpIds[6]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++}); - Attachments[OpIds[7]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++}); - Attachments[OpIds[8]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{}); - Attachments[OpIds[9]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++}); - Attachments[OpIds[10]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++}); - Attachments[OpIds[11]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++}); - Attachments[OpIds[12]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++}); - Attachments[OpIds[13]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++}); - Attachments[OpIds[14]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++}); - Attachments[OpIds[15]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++}); - Attachments[OpIds[16]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{}); - Attachments[OpIds[17]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++}); - Attachments[OpIds[18]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++}); - Attachments[OpIds[19]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{}); - Attachments[OpIds[20]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++}); - Attachments[OpIds[21]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++}); - Attachments[OpIds[22]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++}); - Attachments[OpIds[23]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++}); - ZEN_ASSERT(It == AttachmentSizes.end()); - } - - // Note: This is a clone of the function in projectstore.cpp - auto ComputeOpKey = [](const CbObjectView& Op) -> Oid { - using namespace std::literals; - - XXH3_128Stream_deprecated KeyHasher; - Op["key"sv].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); }); - XXH3_128 KeyHash128 = KeyHasher.GetHash(); - - Oid KeyHash; - memcpy(&KeyHash, KeyHash128.Hash, sizeof KeyHash); - - return KeyHash; - }; - - auto AddOp = [ComputeOpKey](const CbObject& Op, std::unordered_map<Oid, uint32_t, Oid::Hasher>& Ops) { - const Oid Id = ComputeOpKey(Op); - IoBuffer Buffer = Op.GetBuffer().AsIoBuffer(); - const uint32_t OpCoreHash = uint32_t(XXH3_64bits(Buffer.GetData(), Buffer.GetSize()) & 0xffffFFFF); - Ops.insert({Id, OpCoreHash}); - }; - - auto MakeProject = [](std::string_view UrlBase, std::string_view ProjectName) { - CbObjectWriter Project; - Project.AddString("id"sv, ProjectName); - Project.AddString("root"sv, ""sv); - Project.AddString("engine"sv, ""sv); - Project.AddString("project"sv, ""sv); - Project.AddString("projectfile"sv, ""sv); - IoBuffer ProjectPayload = Project.Save().GetBuffer().AsIoBuffer(); - ProjectPayload.SetContentType(HttpContentType::kCbObject); - - HttpClient Http{UrlBase}; - HttpClient::Response Response = Http.Post(fmt::format("/prj/{}", ProjectName), ProjectPayload); - CHECK(Response); - }; - - auto MakeOplog = [](std::string_view UrlBase, std::string_view ProjectName, std::string_view OplogName) { - HttpClient Http{UrlBase}; - HttpClient::Response Response = Http.Post(fmt::format("/prj/{}/oplog/{}", ProjectName, OplogName), IoBuffer{}); - CHECK(Response); - }; - - auto MakeOp = [](std::string_view UrlBase, std::string_view ProjectName, std::string_view OplogName, const CbPackage& OpPackage) { - zen::BinaryWriter MemOut; - legacy::SaveCbPackage(OpPackage, MemOut); - IoBuffer Body{IoBuffer::Wrap, MemOut.GetData(), MemOut.GetSize()}; - Body.SetContentType(HttpContentType::kCbPackage); - - HttpClient Http{UrlBase}; - HttpClient::Response Response = Http.Post(fmt::format("/prj/{}/oplog/{}/new", ProjectName, OplogName), Body); - CHECK(Response); - }; - - MakeProject(Servers.GetInstance(0).GetBaseUri(), "proj0"); - MakeOplog(Servers.GetInstance(0).GetBaseUri(), "proj0", "oplog0"); - - std::unordered_map<Oid, uint32_t, Oid::Hasher> SourceOps; - for (const Oid& OpId : OpIds) - { - CbPackage OpPackage = CreateOplogPackage(OpId, Attachments[OpId]); - CHECK(OpPackage.GetAttachments().size() == Attachments[OpId].size()); - AddOp(OpPackage.GetObject(), SourceOps); - MakeOp(Servers.GetInstance(0).GetBaseUri(), "proj0", "oplog0", OpPackage); - } - - std::vector<IoHash> AttachmentHashes; - AttachmentHashes.reserve(Attachments.size()); - for (const auto& AttachmentOplog : Attachments) - { - for (const auto& Attachment : AttachmentOplog.second) - { - AttachmentHashes.emplace_back(Attachment.second.DecodeRawHash()); - } - } - - auto MakeCbObjectPayload = [](std::function<void(CbObjectWriter & Writer)> Write) -> IoBuffer { - CbObjectWriter Writer; - Write(Writer); - IoBuffer Result = Writer.Save().GetBuffer().AsIoBuffer(); - Result.MakeOwned(); - Result.SetContentType(HttpContentType::kCbObject); - return Result; - }; - - auto ValidateAttachments = - [&MakeCbObjectPayload, &AttachmentHashes, &Servers](int ServerIndex, std::string_view Project, std::string_view Oplog) { - HttpClient Http{Servers.GetInstance(ServerIndex).GetBaseUri()}; - - IoBuffer Payload = MakeCbObjectPayload([&AttachmentHashes](CbObjectWriter& Writer) { - Writer << "method"sv - << "getchunks"sv; - Writer << "chunks"sv << BeginArray; - for (const IoHash& Chunk : AttachmentHashes) - { - Writer << Chunk; - } - Writer << EndArray; // chunks - }); - - HttpClient::Response Response = - Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", Project, Oplog), Payload, {{"Accept", "application/x-ue-cbpkg"}}); - CHECK(Response); - CbPackage ResponsePackage = ParsePackageMessage(Response.ResponsePayload); - CHECK(ResponsePackage.GetAttachments().size() == AttachmentHashes.size()); - for (auto A : ResponsePackage.GetAttachments()) - { - CHECK(IoHash::HashBuffer(A.AsCompressedBinary().DecompressToComposite()) == A.GetHash()); - } - }; - - auto ValidateOplog = [&SourceOps, &AddOp, &Servers](int ServerIndex, std::string_view Project, std::string_view Oplog) { - std::unordered_map<Oid, uint32_t, Oid::Hasher> TargetOps; - std::vector<CbObject> ResultingOplog; - - HttpClient Http{Servers.GetInstance(ServerIndex).GetBaseUri()}; - HttpClient::Response Response = Http.Get(fmt::format("/prj/{}/oplog/{}/entries", Project, Oplog)); - CHECK(Response); - - IoBuffer Payload(Response.ResponsePayload); - CbObject OplogResonse = LoadCompactBinaryObject(Payload); - CbArrayView EntriesArray = OplogResonse["entries"sv].AsArrayView(); - - for (CbFieldView OpEntry : EntriesArray) - { - CbObjectView Core = OpEntry.AsObjectView(); - BinaryWriter Writer; - Core.CopyTo(Writer); - MemoryView OpView = Writer.GetView(); - IoBuffer OpBuffer(IoBuffer::Wrap, OpView.GetData(), OpView.GetSize()); - CbObject Op(SharedBuffer(OpBuffer), CbFieldType::HasFieldType); - AddOp(Op, TargetOps); - } - CHECK(SourceOps == TargetOps); - }; - - auto HttpWaitForCompletion = [](ZenServerInstance& Server, const HttpClient::Response& Response) { - REQUIRE(Response); - const uint64_t JobId = ParseInt<uint64_t>(Response.AsText()).value_or(0); - CHECK(JobId != 0); - - HttpClient Http{Server.GetBaseUri()}; - - while (true) - { - HttpClient::Response StatusResponse = - Http.Get(fmt::format("/admin/jobs/{}", JobId), {{"Accept", ToString(ZenContentType::kCbObject)}}); - CHECK(StatusResponse); - CbObject ResponseObject = StatusResponse.AsObject(); - std::string_view Status = ResponseObject["Status"sv].AsString(); - CHECK(Status != "Aborted"sv); - if (Status == "Complete"sv) - { - return; - } - Sleep(10); - } - }; - - SUBCASE("File") - { - ScopedTemporaryDirectory TempDir; - { - IoBuffer Payload = MakeCbObjectPayload([&AttachmentHashes, path = TempDir.Path().string()](CbObjectWriter& Writer) { - Writer << "method"sv - << "export"sv; - Writer << "params" << BeginObject; - { - Writer << "maxblocksize"sv << 3072u; - Writer << "maxchunkembedsize"sv << 1296u; - Writer << "chunkfilesizelimit"sv << 5u * 1024u; - Writer << "force"sv << false; - Writer << "file"sv << BeginObject; - { - Writer << "path"sv << path; - Writer << "name"sv - << "proj0_oplog0"sv; - } - Writer << EndObject; // "file" - } - Writer << EndObject; // "params" - }); - - HttpClient Http{Servers.GetInstance(0).GetBaseUri()}; - - HttpClient::Response Response = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", "proj0", "oplog0"), Payload); - HttpWaitForCompletion(Servers.GetInstance(0), Response); - } - { - MakeProject(Servers.GetInstance(1).GetBaseUri(), "proj0_copy"); - MakeOplog(Servers.GetInstance(1).GetBaseUri(), "proj0_copy", "oplog0_copy"); - - IoBuffer Payload = MakeCbObjectPayload([&AttachmentHashes, path = TempDir.Path().string()](CbObjectWriter& Writer) { - Writer << "method"sv - << "import"sv; - Writer << "params" << BeginObject; - { - Writer << "force"sv << false; - Writer << "file"sv << BeginObject; - { - Writer << "path"sv << path; - Writer << "name"sv - << "proj0_oplog0"sv; - } - Writer << EndObject; // "file" - } - Writer << EndObject; // "params" - }); - - HttpClient Http{Servers.GetInstance(1).GetBaseUri()}; - - HttpClient::Response Response = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", "proj0_copy", "oplog0_copy"), Payload); - HttpWaitForCompletion(Servers.GetInstance(1), Response); - } - ValidateAttachments(1, "proj0_copy", "oplog0_copy"); - ValidateOplog(1, "proj0_copy", "oplog0_copy"); - } - - SUBCASE("File disable blocks") - { - ScopedTemporaryDirectory TempDir; - { - IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) { - Writer << "method"sv - << "export"sv; - Writer << "params" << BeginObject; - { - Writer << "maxblocksize"sv << 3072u; - Writer << "maxchunkembedsize"sv << 1296u; - Writer << "chunkfilesizelimit"sv << 5u * 1024u; - Writer << "force"sv << false; - Writer << "file"sv << BeginObject; - { - Writer << "path"sv << TempDir.Path().string(); - Writer << "name"sv - << "proj0_oplog0"sv; - Writer << "disableblocks"sv << true; - } - Writer << EndObject; // "file" - } - Writer << EndObject; // "params" - }); - - HttpClient Http{Servers.GetInstance(0).GetBaseUri()}; - - HttpClient::Response Response = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", "proj0", "oplog0"), Payload); - HttpWaitForCompletion(Servers.GetInstance(0), Response); - } - { - MakeProject(Servers.GetInstance(1).GetBaseUri(), "proj0_copy"); - MakeOplog(Servers.GetInstance(1).GetBaseUri(), "proj0_copy", "oplog0_copy"); - IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) { - Writer << "method"sv - << "import"sv; - Writer << "params" << BeginObject; - { - Writer << "force"sv << false; - Writer << "file"sv << BeginObject; - { - Writer << "path"sv << TempDir.Path().string(); - Writer << "name"sv - << "proj0_oplog0"sv; - } - Writer << EndObject; // "file" - } - Writer << EndObject; // "params" - }); - - HttpClient Http{Servers.GetInstance(1).GetBaseUri()}; - - HttpClient::Response Response = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", "proj0_copy", "oplog0_copy"), Payload); - HttpWaitForCompletion(Servers.GetInstance(1), Response); - } - ValidateAttachments(1, "proj0_copy", "oplog0_copy"); - ValidateOplog(1, "proj0_copy", "oplog0_copy"); - } - - SUBCASE("File force temp blocks") - { - ScopedTemporaryDirectory TempDir; - { - IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) { - Writer << "method"sv - << "export"sv; - Writer << "params" << BeginObject; - { - Writer << "maxblocksize"sv << 3072u; - Writer << "maxchunkembedsize"sv << 1296u; - Writer << "chunkfilesizelimit"sv << 5u * 1024u; - Writer << "force"sv << false; - Writer << "file"sv << BeginObject; - { - Writer << "path"sv << TempDir.Path().string(); - Writer << "name"sv - << "proj0_oplog0"sv; - Writer << "enabletempblocks"sv << true; - } - Writer << EndObject; // "file" - } - Writer << EndObject; // "params" - }); - - HttpClient Http{Servers.GetInstance(0).GetBaseUri()}; - HttpClient::Response Response = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", "proj0", "oplog0"), Payload); - HttpWaitForCompletion(Servers.GetInstance(0), Response); - } - { - MakeProject(Servers.GetInstance(1).GetBaseUri(), "proj0_copy"); - MakeOplog(Servers.GetInstance(1).GetBaseUri(), "proj0_copy", "oplog0_copy"); - IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) { - Writer << "method"sv - << "import"sv; - Writer << "params" << BeginObject; - { - Writer << "force"sv << false; - Writer << "file"sv << BeginObject; - { - Writer << "path"sv << TempDir.Path().string(); - Writer << "name"sv - << "proj0_oplog0"sv; - } - Writer << EndObject; // "file" - } - Writer << EndObject; // "params" - }); - - HttpClient Http{Servers.GetInstance(1).GetBaseUri()}; - HttpClient::Response Response = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", "proj0_copy", "oplog0_copy"), Payload); - HttpWaitForCompletion(Servers.GetInstance(1), Response); - } - ValidateAttachments(1, "proj0_copy", "oplog0_copy"); - ValidateOplog(1, "proj0_copy", "oplog0_copy"); - } - - SUBCASE("Zen") - { - ScopedTemporaryDirectory TempDir; - { - std::string ExportSourceUri = Servers.GetInstance(0).GetBaseUri(); - std::string ExportTargetUri = Servers.GetInstance(1).GetBaseUri(); - MakeProject(ExportTargetUri, "proj0_copy"); - MakeOplog(ExportTargetUri, "proj0_copy", "oplog0_copy"); - - IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) { - Writer << "method"sv - << "export"sv; - Writer << "params" << BeginObject; - { - Writer << "maxblocksize"sv << 3072u; - Writer << "maxchunkembedsize"sv << 1296u; - Writer << "chunkfilesizelimit"sv << 5u * 1024u; - Writer << "force"sv << false; - Writer << "zen"sv << BeginObject; - { - Writer << "url"sv << ExportTargetUri.substr(7); - Writer << "project" - << "proj0_copy"; - Writer << "oplog" - << "oplog0_copy"; - } - Writer << EndObject; // "file" - } - Writer << EndObject; // "params" - }); - - HttpClient Http{Servers.GetInstance(0).GetBaseUri()}; - HttpClient::Response Response = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", "proj0", "oplog0"), Payload); - HttpWaitForCompletion(Servers.GetInstance(0), Response); - } - ValidateAttachments(1, "proj0_copy", "oplog0_copy"); - ValidateOplog(1, "proj0_copy", "oplog0_copy"); - - { - std::string ImportSourceUri = Servers.GetInstance(1).GetBaseUri(); - std::string ImportTargetUri = Servers.GetInstance(2).GetBaseUri(); - MakeProject(ImportTargetUri, "proj1"); - MakeOplog(ImportTargetUri, "proj1", "oplog1"); - - IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) { - Writer << "method"sv - << "import"sv; - Writer << "params" << BeginObject; - { - Writer << "force"sv << false; - Writer << "zen"sv << BeginObject; - { - Writer << "url"sv << ImportSourceUri.substr(7); - Writer << "project" - << "proj0_copy"; - Writer << "oplog" - << "oplog0_copy"; - } - Writer << EndObject; // "file" - } - Writer << EndObject; // "params" - }); - - HttpClient Http{Servers.GetInstance(2).GetBaseUri()}; - HttpClient::Response Response = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", "proj1", "oplog1"), Payload); - HttpWaitForCompletion(Servers.GetInstance(2), Response); - } - ValidateAttachments(2, "proj1", "oplog1"); - ValidateOplog(2, "proj1", "oplog1"); - } -} - -TEST_CASE("project.rpcappendop") -{ - using namespace std::literals; - using namespace utils; - - ZenServerTestHelper Servers("remote", 2); - Servers.SpawnServers("--debug"); - - std::vector<Oid> OpIds; - const size_t OpCount = 24; - OpIds.reserve(OpCount); - for (size_t I = 0; I < OpCount; ++I) - { - OpIds.emplace_back(Oid::NewOid()); - } - - std::unordered_map<Oid, std::vector<std::pair<Oid, CompressedBuffer>>, Oid::Hasher> Attachments; - { - std::vector<std::size_t> AttachmentSizes( - {7633, 6825, 5738, 8031, 7225, 566, 3656, 6006, 24, 33466, 1093, 4269, 2257, 3685, 13489, 97194, - 6151, 5482, 6217, 3511, 6738, 5061, 7537, 2759, 1916, 8210, 2235, 224024, 51582, 5251, 491, 2u * 1024u * 1024u + 124u, - 74607, 18135, 3767, 154045, 4415, 5007, 8876, 96761, 3359, 8526, 4097, 4855, 48225}); - auto It = AttachmentSizes.begin(); - Attachments[OpIds[0]] = {}; - Attachments[OpIds[1]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++}); - Attachments[OpIds[2]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++}); - Attachments[OpIds[3]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++}); - Attachments[OpIds[4]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++}); - Attachments[OpIds[5]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++}); - Attachments[OpIds[6]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++}); - Attachments[OpIds[7]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++}); - Attachments[OpIds[8]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{}); - Attachments[OpIds[9]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++}); - Attachments[OpIds[10]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++}); - Attachments[OpIds[11]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++}); - Attachments[OpIds[12]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++}); - Attachments[OpIds[13]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++}); - Attachments[OpIds[14]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++}); - Attachments[OpIds[15]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++}); - Attachments[OpIds[16]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{}); - Attachments[OpIds[17]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++}); - Attachments[OpIds[18]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++}); - Attachments[OpIds[19]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{}); - Attachments[OpIds[20]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++}); - Attachments[OpIds[21]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++}); - Attachments[OpIds[22]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++}); - Attachments[OpIds[23]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++}); - ZEN_ASSERT(It == AttachmentSizes.end()); - } - - // Note: This is a clone of the function in projectstore.cpp - auto ComputeOpKey = [](const CbObjectView& Op) -> Oid { - using namespace std::literals; - - XXH3_128Stream_deprecated KeyHasher; - Op["key"sv].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); }); - XXH3_128 KeyHash128 = KeyHasher.GetHash(); - - Oid KeyHash; - memcpy(&KeyHash, KeyHash128.Hash, sizeof KeyHash); - - return KeyHash; - }; - - auto AddOp = [ComputeOpKey](const CbObject& Op, std::unordered_map<Oid, uint32_t, Oid::Hasher>& Ops) { - const Oid Id = ComputeOpKey(Op); - IoBuffer Buffer = Op.GetBuffer().AsIoBuffer(); - const uint32_t OpCoreHash = uint32_t(XXH3_64bits(Buffer.GetData(), Buffer.GetSize()) & 0xffffFFFF); - Ops.insert({Id, OpCoreHash}); - }; - - auto MakeProject = [](HttpClient& Client, std::string_view ProjectName) { - CbObjectWriter Project; - Project.AddString("id"sv, ProjectName); - Project.AddString("root"sv, ""sv); - Project.AddString("engine"sv, ""sv); - Project.AddString("project"sv, ""sv); - Project.AddString("projectfile"sv, ""sv); - HttpClient::Response Response = Client.Post(fmt::format("/prj/{}", ProjectName), Project.Save()); - CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage("")); - }; - - auto MakeOplog = [](HttpClient& Client, std::string_view ProjectName, std::string_view OplogName) { - HttpClient::Response Response = Client.Post(fmt::format("/prj/{}/oplog/{}", ProjectName, OplogName)); - CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage("")); - }; - auto GetOplog = [](HttpClient& Client, std::string_view ProjectName, std::string_view OplogName) { - HttpClient::Response Response = Client.Get(fmt::format("/prj/{}/oplog/{}", ProjectName, OplogName)); - CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage("")); - return Response.AsObject(); - }; - - auto MakeOp = - [](HttpClient& Client, std::string_view ProjectName, std::string_view OplogName, const CbObjectView& Op) -> std::vector<IoHash> { - CbObjectWriter Request; - Request.AddString("method"sv, "appendops"sv); - Request.BeginArray("ops"sv); - { - Request.AddObject(Op); - } - Request.EndArray(); // "ops" - HttpClient::Response Response = Client.Post(fmt::format("/prj/{}/oplog/{}/rpc", ProjectName, OplogName), Request.Save()); - CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage("")); - - CbObjectView ResponsePayload = Response.AsPackage().GetObject(); - CbArrayView NeedArray = ResponsePayload["need"sv].AsArrayView(); - std::vector<IoHash> Needs; - Needs.reserve(NeedArray.Num()); - for (CbFieldView NeedView : NeedArray) - { - Needs.push_back(NeedView.AsHash()); - } - return Needs; - }; - - auto SendAttachments = [](HttpClient& Client, - std::string_view ProjectName, - std::string_view OplogName, - std::span<const CompressedBuffer> Attachments, - void* ServerProcessHandle, - const std::filesystem::path& TempPath) { - CompositeBuffer PackageMessage; - { - CbPackage RequestPackage; - CbObjectWriter Request; - Request.AddString("method"sv, "putchunks"sv); - Request.AddBool("usingtmpfiles"sv, true); - Request.BeginArray("chunks"sv); - for (CompressedBuffer AttachmentPayload : Attachments) - { - if (AttachmentPayload.DecodeRawSize() > 16u * 1024u) - { - std::filesystem::path TempAttachmentPath = TempPath / (Oid::NewOid().ToString() + ".tmp"); - WriteFile(TempAttachmentPath, AttachmentPayload.GetCompressed()); - IoBuffer OnDiskAttachment = IoBufferBuilder::MakeFromFile(TempAttachmentPath); - AttachmentPayload = CompressedBuffer::FromCompressedNoValidate(std::move(OnDiskAttachment)); - } - - CbAttachment Attachment(AttachmentPayload, AttachmentPayload.DecodeRawHash()); - - Request.AddAttachment(Attachment); - RequestPackage.AddAttachment(Attachment); - } - Request.EndArray(); // "chunks" - RequestPackage.SetObject(Request.Save()); - - PackageMessage = CompositeBuffer(FormatPackageMessage(RequestPackage, FormatFlags::kAllowLocalReferences, ServerProcessHandle)); - } - - HttpClient::Response Response = - Client.Post(fmt::format("/prj/{}/oplog/{}/rpc", ProjectName, OplogName), PackageMessage, HttpContentType::kCbPackage); - CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage("")); - }; - - { - HttpClient Client(Servers.GetInstance(0).GetBaseUri()); - void* ServerProcessHandle = Servers.GetInstance(0).GetProcessHandle(); - - MakeProject(Client, "proj0"); - MakeOplog(Client, "proj0", "oplog0"); - CbObject Oplog = GetOplog(Client, "proj0", "oplog0"); - std::filesystem::path TempPath = Oplog["tempdir"sv].AsU8String(); - - std::unordered_map<Oid, uint32_t, Oid::Hasher> SourceOps; - for (const Oid& OpId : OpIds) - { - CbObject Op = CreateOplogOp(OpId, Attachments[OpId]); - AddOp(Op, SourceOps); - std::vector<IoHash> MissingAttachments = MakeOp(Client, "proj0", "oplog0", Op); - - if (!MissingAttachments.empty()) - { - CHECK(MissingAttachments.size() <= Attachments[OpId].size()); - tsl::robin_set<IoHash, IoHash::Hasher> MissingAttachmentSet(MissingAttachments.begin(), MissingAttachments.end()); - std::vector<CompressedBuffer> PutAttachments; - for (const auto& Attachment : Attachments[OpId]) - { - CompressedBuffer Payload = Attachment.second; - const IoHash AttachmentHash = Payload.DecodeRawHash(); - if (auto It = MissingAttachmentSet.find(AttachmentHash); It != MissingAttachmentSet.end()) - { - PutAttachments.push_back(Payload); - } - } - SendAttachments(Client, "proj0", "oplog0", PutAttachments, ServerProcessHandle, TempPath); - } - } - - // Do it again, but now we should not need any attachments - - for (const Oid& OpId : OpIds) - { - CbObject Op = CreateOplogOp(OpId, Attachments[OpId]); - AddOp(Op, SourceOps); - std::vector<IoHash> MissingAttachments = MakeOp(Client, "proj0", "oplog0", Op); - CHECK(MissingAttachments.empty()); - } - } - - { - HttpClient Client(Servers.GetInstance(1).GetBaseUri()); - void* ServerProcessHandle = nullptr; // Force use of path for attachments passed on disk - - MakeProject(Client, "proj0"); - MakeOplog(Client, "proj0", "oplog0"); - CbObject Oplog = GetOplog(Client, "proj0", "oplog0"); - std::filesystem::path TempPath = Oplog["tempdir"sv].AsU8String(); - - std::unordered_map<Oid, uint32_t, Oid::Hasher> SourceOps; - for (const Oid& OpId : OpIds) - { - CbObject Op = CreateOplogOp(OpId, Attachments[OpId]); - AddOp(Op, SourceOps); - std::vector<IoHash> MissingAttachments = MakeOp(Client, "proj0", "oplog0", Op); - - if (!MissingAttachments.empty()) - { - CHECK(MissingAttachments.size() <= Attachments[OpId].size()); - tsl::robin_set<IoHash, IoHash::Hasher> MissingAttachmentSet(MissingAttachments.begin(), MissingAttachments.end()); - std::vector<CompressedBuffer> PutAttachments; - for (const auto& Attachment : Attachments[OpId]) - { - CompressedBuffer Payload = Attachment.second; - const IoHash AttachmentHash = Payload.DecodeRawHash(); - if (auto It = MissingAttachmentSet.find(AttachmentHash); It != MissingAttachmentSet.end()) - { - PutAttachments.push_back(Payload); - } - } - SendAttachments(Client, "proj0", "oplog0", PutAttachments, ServerProcessHandle, TempPath); - } - } - - // Do it again, but now we should not need any attachments - - for (const Oid& OpId : OpIds) - { - CbObject Op = CreateOplogOp(OpId, Attachments[OpId]); - AddOp(Op, SourceOps); - std::vector<IoHash> MissingAttachments = MakeOp(Client, "proj0", "oplog0", Op); - CHECK(MissingAttachments.empty()); - } - } -} - -std::vector<std::pair<std::filesystem::path, IoBuffer>> -GenerateFolderContent(const std::filesystem::path& RootPath) -{ - CreateDirectories(RootPath); - std::vector<std::pair<std::filesystem::path, IoBuffer>> Result; - Result.push_back(std::make_pair(RootPath / "root_blob_1.bin", CreateRandomBlob(4122))); - Result.push_back(std::make_pair(RootPath / "root_blob_2.bin", CreateRandomBlob(2122))); - - std::filesystem::path EmptyFolder(RootPath / "empty_folder"); - - std::filesystem::path FirstFolder(RootPath / "first_folder"); - CreateDirectories(FirstFolder); - Result.push_back(std::make_pair(FirstFolder / "first_folder_blob1.bin", CreateRandomBlob(22))); - Result.push_back(std::make_pair(FirstFolder / "first_folder_blob2.bin", CreateRandomBlob(122))); - - std::filesystem::path SecondFolder(RootPath / "second_folder"); - CreateDirectories(SecondFolder); - Result.push_back(std::make_pair(SecondFolder / "second_folder_blob1.bin", CreateRandomBlob(522))); - Result.push_back(std::make_pair(SecondFolder / "second_folder_blob2.bin", CreateRandomBlob(122))); - Result.push_back(std::make_pair(SecondFolder / "second_folder_blob3.bin", CreateRandomBlob(225))); - - std::filesystem::path SecondFolderChild(SecondFolder / "child_in_second"); - CreateDirectories(SecondFolderChild); - Result.push_back(std::make_pair(SecondFolderChild / "second_child_folder_blob1.bin", CreateRandomBlob(622))); - - for (const auto& It : Result) - { - WriteFile(It.first, It.second); - } - - return Result; -} - -std::vector<std::pair<std::filesystem::path, IoBuffer>> -GenerateFolderContent2(const std::filesystem::path& RootPath) -{ - std::vector<std::pair<std::filesystem::path, IoBuffer>> Result; - Result.push_back(std::make_pair(RootPath / "root_blob_3.bin", CreateRandomBlob(312))); - std::filesystem::path FirstFolder(RootPath / "first_folder"); - Result.push_back(std::make_pair(FirstFolder / "first_folder_blob3.bin", CreateRandomBlob(722))); - std::filesystem::path SecondFolder(RootPath / "second_folder"); - std::filesystem::path SecondFolderChild(SecondFolder / "child_in_second"); - Result.push_back(std::make_pair(SecondFolderChild / "second_child_folder_blob2.bin", CreateRandomBlob(962))); - Result.push_back(std::make_pair(SecondFolderChild / "second_child_folder_blob3.bin", CreateRandomBlob(561))); - - for (const auto& It : Result) - { - WriteFile(It.first, It.second); - } - - return Result; -} - -TEST_CASE("workspaces.create") -{ - using namespace std::literals; - - std::filesystem::path SystemRootPath = TestEnv.CreateNewTestDir(); - - std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); - ZenServerInstance Instance(TestEnv); - Instance.SetTestDir(TestDir); - const uint16_t PortNumber = Instance.SpawnServerAndWaitUntilReady( - fmt::format("--workspaces-enabled --workspaces-allow-changes --system-dir {}", SystemRootPath)); - CHECK(PortNumber != 0); - - ScopedTemporaryDirectory TempDir; - std::filesystem::path Root1Path = TempDir.Path() / "root1"; - std::filesystem::path Root2Path = TempDir.Path() / "root2"; - DeleteDirectories(Root1Path); - DeleteDirectories(Root2Path); - - std::filesystem::path Share1Path = "shared_1"; - std::filesystem::path Share2Path = "shared_2"; - CreateDirectories(Root1Path / Share1Path); - CreateDirectories(Root1Path / Share2Path); - CreateDirectories(Root2Path / Share1Path); - CreateDirectories(Root2Path / Share2Path); - - Oid Root1Id = Oid::Zero; - Oid Root2Id = Oid::NewOid(); - - HttpClient Client(Instance.GetBaseUri()); - - CHECK(Client.Put(fmt::format("/ws/{}", Root1Id)).StatusCode == HttpResponseCode::BadRequest); - - if (HttpClient::Response Root1Response = - Client.Put(fmt::format("/ws/{}", Oid::Zero), HttpClient::KeyValueMap{{"root_path", Root1Path.string()}}); - Root1Response.StatusCode == HttpResponseCode::Created) - { - Root1Id = Oid::TryFromHexString(Root1Response.AsText()); - CHECK(Root1Id != Oid::Zero); - } - else - { - CHECK(false); - } - if (HttpClient::Response Root1Response = - Client.Put(fmt::format("/ws/{}", Oid::Zero), HttpClient::KeyValueMap{{"root_path", Root1Path.string()}}); - Root1Response.StatusCode == HttpResponseCode::OK) - { - CHECK(Root1Id == Oid::TryFromHexString(Root1Response.AsText())); - } - else - { - CHECK(false); - } - if (HttpClient::Response Root1Response = - Client.Put(fmt::format("/ws/{}", Root1Id), HttpClient::KeyValueMap{{"root_path", Root1Path.string()}}); - Root1Response.StatusCode == HttpResponseCode::OK) - { - CHECK(Root1Id == Oid::TryFromHexString(Root1Response.AsText())); - } - else - { - CHECK(false); - } - CHECK(Client.Put(fmt::format("/ws/{}", Root1Id), HttpClient::KeyValueMap{{"root_path", Root2Path.string()}}).StatusCode == - HttpResponseCode::Conflict); - - CHECK( - Client.Put(fmt::format("/ws/{}/{}", Root1Id, Oid::Zero), HttpClient::KeyValueMap{{"share_path", Share2Path.string()}}).StatusCode == - HttpResponseCode::Created); - - CHECK( - Client.Put(fmt::format("/ws/{}/{}", Root2Id, Oid::Zero), HttpClient::KeyValueMap{{"share_path", Share2Path.string()}}).StatusCode == - HttpResponseCode::NotFound); - - CHECK(Client.Put(fmt::format("/ws/{}", Root2Id), HttpClient::KeyValueMap{{"root_path", Root1Path.string()}}).StatusCode == - HttpResponseCode::Conflict); - - if (HttpClient::Response Root2Response = - Client.Put(fmt::format("/ws/{}", Root2Id), HttpClient::KeyValueMap{{"root_path", Root2Path.string()}}); - Root2Response.StatusCode == HttpResponseCode::Created) - { - CHECK(Root2Id == Oid::TryFromHexString(Root2Response.AsText())); - } - else - { - CHECK(false); - } - - CHECK(Client.Put(fmt::format("/ws/{}/{}", Root2Id, Oid::Zero)).StatusCode == HttpResponseCode::BadRequest); - - Oid Share2Id = Oid::Zero; - if (HttpClient::Response Share2Response = - Client.Put(fmt::format("/ws/{}/{}", Root2Id, Share2Id), HttpClient::KeyValueMap{{"share_path", Share2Path.string()}}); - Share2Response.StatusCode == HttpResponseCode::Created) - { - Share2Id = Oid::TryFromHexString(Share2Response.AsText()); - CHECK(Share2Id != Oid::Zero); - } - else - { - CHECK(false); - } - - CHECK( - Client.Put(fmt::format("/ws/{}/{}", Root2Id, Oid::Zero), HttpClient::KeyValueMap{{"share_path", Share2Path.string()}}).StatusCode == - HttpResponseCode::OK); - - CHECK( - Client.Put(fmt::format("/ws/{}/{}", Root2Id, Share2Id), HttpClient::KeyValueMap{{"share_path", Share2Path.string()}}).StatusCode == - HttpResponseCode::OK); - - CHECK( - Client.Put(fmt::format("/ws/{}/{}", Root2Id, Share2Id), HttpClient::KeyValueMap{{"share_path", Share1Path.string()}}).StatusCode == - HttpResponseCode::Conflict); - - CHECK(Client.Put(fmt::format("/ws/{}/{}", Root2Id, Oid::NewOid()), HttpClient::KeyValueMap{{"share_path", Share2Path.string()}}) - .StatusCode == HttpResponseCode::Conflict); - - CHECK(Client.Put(fmt::format("/ws/{}/{}", Root2Id, Oid::Zero), HttpClient::KeyValueMap{{"share_path", "idonotexist"}}).StatusCode != - HttpResponseCode::OK); - - while (true) - { - std::error_code Ec; - DeleteDirectories(Root2Path / Share2Path, Ec); - if (!Ec) - break; - } - - CHECK(Client.Get(fmt::format("/ws/{}/{}/files", Root2Id, Share2Id)).StatusCode == HttpResponseCode::NotFound); -} - -TEST_CASE("workspaces.restricted") -{ - using namespace std::literals; - - std::filesystem::path SystemRootPath = TestEnv.CreateNewTestDir(); - - std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); - ZenServerInstance Instance(TestEnv); - Instance.SetTestDir(TestDir); - const uint16_t PortNumber = Instance.SpawnServerAndWaitUntilReady(fmt::format("--workspaces-enabled --system-dir {}", SystemRootPath)); - CHECK(PortNumber != 0); - - ScopedTemporaryDirectory TempDir; - std::filesystem::path Root1Path = TempDir.Path() / "root1"; - std::filesystem::path Root2Path = TempDir.Path() / "root2"; - DeleteDirectories(Root1Path); - DeleteDirectories(Root2Path); - - std::filesystem::path Share1Path = "shared_1"; - std::filesystem::path Share2Path = "shared_2"; - CreateDirectories(Root1Path / Share1Path); - CreateDirectories(Root1Path / Share2Path); - CreateDirectories(Root2Path / Share1Path); - CreateDirectories(Root2Path / Share2Path); - - Oid Root1Id = Oid::NewOid(); - Oid Root2Id = Oid::NewOid(); - Oid Share1Id = Oid::NewOid(); - Oid Share2Id = Oid::NewOid(); - - HttpClient Client(Instance.GetBaseUri()); - CHECK(Client.Put(fmt::format("/ws/{}", Oid::Zero), HttpClient::KeyValueMap{{"root_path", Root1Path.string()}}).StatusCode == - HttpResponseCode::Unauthorized); - - CHECK_EQ(Client.Get(fmt::format("/ws/{}", Root1Id)).StatusCode, HttpResponseCode::NotFound); - - std::string Config1; - { - CbObjectWriter Config; - Config.BeginArray("workspaces"); - Config.BeginObject(); - Config << "id"sv << Root1Id.ToString(); - Config << "root_path"sv << Root1Path.string(); - Config << "allow_share_creation_from_http"sv << false; - Config.EndObject(); - Config.EndArray(); - ExtendableStringBuilder<256> SB; - CompactBinaryToJson(Config.Save(), SB); - Config1 = SB.ToString(); - } - WriteFile(SystemRootPath / "workspaces" / "config.json", IoBuffer(IoBuffer::Wrap, Config1.data(), Config1.size())); - - CHECK(IsHttpSuccessCode(Client.Get("/ws/refresh").StatusCode)); - - CHECK_EQ(Client.Get(fmt::format("/ws/{}", Root1Id)).StatusCode, HttpResponseCode::OK); - - CHECK(Client.Get(fmt::format("/ws/{}/{}", Root1Id, Share1Id)).StatusCode == HttpResponseCode::NotFound); - CHECK( - Client.Put(fmt::format("/ws/{}/{}", Root1Id, Oid::Zero), HttpClient::KeyValueMap{{"share_path", Share1Path.string()}}).StatusCode == - HttpResponseCode::Unauthorized); - - std::string Config2; - { - CbObjectWriter Config; - Config.BeginArray("workspaces"); - Config.BeginObject(); - Config << "id"sv << Root1Id.ToString(); - Config << "root_path"sv << Root1Path.string(); - Config << "allow_share_creation_from_http"sv << false; - Config.EndObject(); - Config.BeginObject(); - Config << "id"sv << Root2Id.ToString(); - Config << "root_path"sv << Root2Path.string(); - Config << "allow_share_creation_from_http"sv << true; - Config.EndObject(); - Config.EndArray(); - ExtendableStringBuilder<256> SB; - CompactBinaryToJson(Config.Save(), SB); - Config2 = SB.ToString(); - } - WriteFile(SystemRootPath / "workspaces" / "config.json", IoBuffer(IoBuffer::Wrap, Config2.data(), Config2.size())); - - CHECK(IsHttpSuccessCode(Client.Get("/ws/refresh").StatusCode)); - - CHECK_EQ(Client.Get(fmt::format("/ws/{}", Root2Id)).StatusCode, HttpResponseCode::OK); - - CHECK(Client.Get(fmt::format("/ws/{}/{}", Root2Id, Share2Id)).StatusCode == HttpResponseCode::NotFound); - CHECK( - Client.Put(fmt::format("/ws/{}/{}", Root2Id, Share2Id), HttpClient::KeyValueMap{{"share_path", Share2Path.string()}}).StatusCode == - HttpResponseCode::Created); - CHECK(Client.Get(fmt::format("/ws/{}/{}", Root2Id, Share2Id)).StatusCode == HttpResponseCode::OK); - - CHECK(IsHttpSuccessCode(Client.Delete(fmt::format("/ws/{}/{}", Root2Id, Share2Id)).StatusCode)); -} - -TEST_CASE("workspaces.lifetimes") -{ - using namespace std::literals; - - std::filesystem::path SystemRootPath = TestEnv.CreateNewTestDir(); - - Oid WorkspaceId = Oid::NewOid(); - Oid ShareId = Oid::NewOid(); - - ScopedTemporaryDirectory TempDir; - std::filesystem::path RootPath = TempDir.Path(); - DeleteDirectories(RootPath); - std::filesystem::path SharePath = RootPath / "shared_folder"; - CreateDirectories(SharePath); - - { - std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); - ZenServerInstance Instance(TestEnv); - Instance.SetTestDir(TestDir); - const uint16_t PortNumber = Instance.SpawnServerAndWaitUntilReady( - fmt::format("--workspaces-enabled --workspaces-allow-changes --system-dir {}", SystemRootPath)); - CHECK(PortNumber != 0); - - HttpClient Client(Instance.GetBaseUri()); - CHECK(Client.Put(fmt::format("/ws/{}", WorkspaceId), HttpClient::KeyValueMap{{"root_path", RootPath.string()}}).StatusCode == - HttpResponseCode::Created); - CHECK(Client.Get(fmt::format("/ws/{}", WorkspaceId)).AsObject()["id"sv].AsObjectId() == WorkspaceId); - CHECK(Client.Put(fmt::format("/ws/{}", WorkspaceId), HttpClient::KeyValueMap{{"root_path", RootPath.string()}}).StatusCode == - HttpResponseCode::OK); - - CHECK(Client.Put(fmt::format("/ws/{}/{}", WorkspaceId, ShareId), HttpClient::KeyValueMap{{"share_path", "shared_folder"}}) - .StatusCode == HttpResponseCode::Created); - CHECK(Client.Get(fmt::format("/ws/{}/{}", WorkspaceId, ShareId)).AsObject()["id"sv].AsObjectId() == ShareId); - CHECK(Client.Put(fmt::format("/ws/{}/{}", WorkspaceId, ShareId), HttpClient::KeyValueMap{{"share_path", "shared_folder"}}) - .StatusCode == HttpResponseCode::OK); - } - - // Restart - - { - std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); - ZenServerInstance Instance(TestEnv); - Instance.SetTestDir(TestDir); - const uint16_t PortNumber = - Instance.SpawnServerAndWaitUntilReady(fmt::format("--workspaces-enabled --system-dir {}", SystemRootPath)); - CHECK(PortNumber != 0); - - HttpClient Client(Instance.GetBaseUri()); - CHECK(Client.Get(fmt::format("/ws/{}", WorkspaceId)).AsObject()["id"sv].AsObjectId() == WorkspaceId); - - CHECK(Client.Get(fmt::format("/ws/{}/{}", WorkspaceId, ShareId)).AsObject()["id"sv].AsObjectId() == ShareId); - } - - // Wipe system config - DeleteDirectories(SystemRootPath); - - // Restart - - { - std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); - ZenServerInstance Instance(TestEnv); - Instance.SetTestDir(TestDir); - const uint16_t PortNumber = - Instance.SpawnServerAndWaitUntilReady(fmt::format("--workspaces-enabled --system-dir {}", SystemRootPath)); - CHECK(PortNumber != 0); - - HttpClient Client(Instance.GetBaseUri()); - CHECK(Client.Get(fmt::format("/ws/{}", WorkspaceId)).StatusCode == HttpResponseCode::NotFound); - CHECK(Client.Get(fmt::format("/ws/{}/{}", WorkspaceId, ShareId)).StatusCode == HttpResponseCode::NotFound); - } -} - -TEST_CASE("workspaces.share") -{ - std::filesystem::path SystemRootPath = TestEnv.CreateNewTestDir(); - - ZenServerInstance Instance(TestEnv); - - const uint16_t PortNumber = Instance.SpawnServerAndWaitUntilReady( - fmt::format("--workspaces-enabled --workspaces-allow-changes --system-dir {}", SystemRootPath)); - CHECK(PortNumber != 0); - - ScopedTemporaryDirectory TempDir; - std::filesystem::path RootPath = TempDir.Path(); - DeleteDirectories(RootPath); - std::filesystem::path SharePath = RootPath / "shared_folder"; - GenerateFolderContent(SharePath); - - HttpClient Client(Instance.GetBaseUri()); - - Oid WorkspaceId = Oid::NewOid(); - CHECK(Client.Put(fmt::format("/ws/{}", WorkspaceId), HttpClient::KeyValueMap{{"root_path", RootPath.string()}}).StatusCode == - HttpResponseCode::Created); - CHECK(Client.Get(fmt::format("/ws/{}", WorkspaceId)).AsObject()["id"sv].AsObjectId() == WorkspaceId); - - Oid ShareId = Oid::NewOid(); - CHECK(Client.Put(fmt::format("/ws/{}/{}", WorkspaceId, ShareId), HttpClient::KeyValueMap{{"share_path", "shared_folder"}}).StatusCode == - HttpResponseCode::Created); - CHECK(Client.Get(fmt::format("/ws/{}/{}", WorkspaceId, ShareId)).AsObject()["id"sv].AsObjectId() == ShareId); - - CHECK(Client.Get(fmt::format("/ws/{}/{}/files", WorkspaceId, ShareId)).AsObject()["files"sv].AsArrayView().Num() == 8); - GenerateFolderContent2(SharePath); - CHECK(Client.Get(fmt::format("/ws/{}/{}/files", WorkspaceId, ShareId)).AsObject()["files"sv].AsArrayView().Num() == 8); - HttpClient::Response FilesResponse = - Client.Get(fmt::format("/ws/{}/{}/files", WorkspaceId, ShareId), - {}, - HttpClient::KeyValueMap{{"refresh", ToString(true)}, {"fieldnames", "id,clientpath,size"}}); - CHECK(FilesResponse); - std::unordered_map<Oid, std::pair<std::filesystem::path, uint64_t>, Oid::Hasher> Files; - { - CbArrayView FilesArray = FilesResponse.AsObject()["files"sv].AsArrayView(); - CHECK(FilesArray.Num() == 12); - for (CbFieldView Field : FilesArray) - { - CbObjectView FileObject = Field.AsObjectView(); - Oid ChunkId = FileObject["id"sv].AsObjectId(); - CHECK(ChunkId != Oid::Zero); - uint64_t Size = FileObject["size"sv].AsUInt64(); - std::u8string_view Path = FileObject["clientpath"sv].AsU8String(); - std::filesystem::path AbsFilePath = SharePath / Path; - CHECK(IsFile(AbsFilePath)); - CHECK(FileSizeFromPath(AbsFilePath) == Size); - Files.insert_or_assign(ChunkId, std::make_pair(AbsFilePath, Size)); - } - } - - HttpClient::Response EntriesResponse = - Client.Get(fmt::format("/ws/{}/{}/entries", WorkspaceId, ShareId), {}, HttpClient::KeyValueMap{{"fieldfilter", "id,clientpath"}}); - CHECK(EntriesResponse); - { - CbArrayView EntriesArray = EntriesResponse.AsObject()["entries"sv].AsArrayView(); - CHECK(EntriesArray.Num() == 1); - for (CbFieldView EntryField : EntriesArray) - { - CbObjectView EntryObject = EntryField.AsObjectView(); - CbArrayView FilesArray = EntryObject["files"sv].AsArrayView(); - CHECK(FilesArray.Num() == 12); - for (CbFieldView FileField : FilesArray) - { - CbObjectView FileObject = FileField.AsObjectView(); - Oid ChunkId = FileObject["id"sv].AsObjectId(); - CHECK(ChunkId != Oid::Zero); - std::u8string_view Path = FileObject["clientpath"sv].AsU8String(); - std::filesystem::path AbsFilePath = SharePath / Path; - CHECK(IsFile(AbsFilePath)); - } - } - } - - HttpClient::Response FileManifestResponse = - Client.Get(fmt::format("/ws/{}/{}/entries", WorkspaceId, ShareId), - {}, - HttpClient::KeyValueMap{{"opkey", "file_manifest"}, {"fieldfilter", "id,clientpath"}}); - CHECK(FileManifestResponse); - { - CbArrayView EntriesArray = FileManifestResponse.AsObject()["entry"sv].AsObjectView()["files"sv].AsArrayView(); - CHECK(EntriesArray.Num() == 12); - for (CbFieldView Field : EntriesArray) - { - CbObjectView FileObject = Field.AsObjectView(); - Oid ChunkId = FileObject["id"sv].AsObjectId(); - CHECK(ChunkId != Oid::Zero); - std::u8string_view Path = FileObject["clientpath"sv].AsU8String(); - std::filesystem::path AbsFilePath = SharePath / Path; - CHECK(IsFile(AbsFilePath)); - } - } - - for (auto It : Files) - { - const Oid& ChunkId = It.first; - const std::filesystem::path& Path = It.second.first; - const uint64_t Size = It.second.second; - - CHECK(Client.Get(fmt::format("/ws/{}/{}/{}/info", WorkspaceId, ShareId, ChunkId)).AsObject()["size"sv].AsUInt64() == Size); - - { - IoBuffer Payload = Client.Get(fmt::format("/ws/{}/{}/{}", WorkspaceId, ShareId, ChunkId)).ResponsePayload; - CHECK(Payload); - CHECK(Payload.GetSize() == Size); - IoBuffer FileContent = IoBufferBuilder::MakeFromFile(Path); - CHECK(FileContent); - CHECK(FileContent.GetView().EqualBytes(Payload.GetView())); - } - - { - IoBuffer Payload = - Client - .Get(fmt::format("/ws/{}/{}/{}", WorkspaceId, ShareId, ChunkId), - {}, - HttpClient::KeyValueMap{{"offset", fmt::format("{}", Size / 4)}, {"size", fmt::format("{}", Size / 2)}}) - .ResponsePayload; - CHECK(Payload); - CHECK(Payload.GetSize() == Size / 2); - IoBuffer FileContent = IoBufferBuilder::MakeFromFile(Path, Size / 4, Size / 2); - CHECK(FileContent); - CHECK(FileContent.GetView().EqualBytes(Payload.GetView())); - } - } - - { - uint32_t CorrelationId = gsl::narrow<uint32_t>(Files.size()); - std::vector<RequestChunkEntry> BatchEntries; - for (auto It : Files) - { - const Oid& ChunkId = It.first; - const uint64_t Size = It.second.second; - - BatchEntries.push_back( - RequestChunkEntry{.ChunkId = ChunkId, .CorrelationId = --CorrelationId, .Offset = Size / 4, .RequestBytes = Size / 2}); - } - IoBuffer BatchResponse = - Client.Post(fmt::format("/ws/{}/{}/batch", WorkspaceId, ShareId), BuildChunkBatchRequest(BatchEntries)).ResponsePayload; - CHECK(BatchResponse); - std::vector<IoBuffer> BatchResult = ParseChunkBatchResponse(BatchResponse); - CHECK(BatchResult.size() == Files.size()); - for (const RequestChunkEntry& Request : BatchEntries) - { - IoBuffer Result = BatchResult[Request.CorrelationId]; - auto It = Files.find(Request.ChunkId); - const std::filesystem::path& Path = It->second.first; - CHECK(Result.GetSize() == Request.RequestBytes); - IoBuffer FileContent = IoBufferBuilder::MakeFromFile(Path, Request.Offset, Request.RequestBytes); - CHECK(FileContent); - CHECK(FileContent.GetView().EqualBytes(Result.GetView())); - } - } - - CHECK(Client.Delete(fmt::format("/ws/{}/{}", WorkspaceId, ShareId))); - CHECK(Client.Get(fmt::format("/ws/{}/{}", WorkspaceId, ShareId)).StatusCode == HttpResponseCode::NotFound); - CHECK(Client.Get(fmt::format("/ws/{}", WorkspaceId))); - - CHECK(Client.Delete(fmt::format("/ws/{}", WorkspaceId))); - CHECK(Client.Get(fmt::format("/ws/{}", WorkspaceId)).StatusCode == HttpResponseCode::NotFound); -} - -TEST_CASE("buildstore.blobs") -{ - std::filesystem::path SystemRootPath = TestEnv.CreateNewTestDir(); - auto _ = MakeGuard([&SystemRootPath]() { DeleteDirectories(SystemRootPath); }); - - std::string_view Namespace = "ns"sv; - std::string_view Bucket = "bkt"sv; - Oid BuildId = Oid::NewOid(); - - std::vector<IoHash> CompressedBlobsHashes; - { - ZenServerInstance Instance(TestEnv); - - const uint16_t PortNumber = - Instance.SpawnServerAndWaitUntilReady(fmt::format("--buildstore-enabled --system-dir {}", SystemRootPath)); - CHECK(PortNumber != 0); - - HttpClient Client(Instance.GetBaseUri() + "/builds/"); - - for (size_t I = 0; I < 5; I++) - { - IoBuffer Blob = CreateSemiRandomBlob(4711 + I * 7); - CompressedBuffer CompressedBlob = CompressedBuffer::Compress(SharedBuffer(std::move(Blob))); - CompressedBlobsHashes.push_back(CompressedBlob.DecodeRawHash()); - IoBuffer Payload = std::move(CompressedBlob).GetCompressed().Flatten().AsIoBuffer(); - Payload.SetContentType(ZenContentType::kCompressedBinary); - - HttpClient::Response Result = - Client.Put(fmt::format("{}/{}/{}/blobs/{}", Namespace, Bucket, BuildId, CompressedBlobsHashes.back()), Payload); - CHECK(Result); - } - - for (const IoHash& RawHash : CompressedBlobsHashes) - { - HttpClient::Response Result = Client.Get(fmt::format("{}/{}/{}/blobs/{}", Namespace, Bucket, BuildId, RawHash), - HttpClient::Accept(ZenContentType::kCompressedBinary)); - CHECK(Result); - IoBuffer Payload = Result.ResponsePayload; - CHECK(Payload.GetContentType() == ZenContentType::kCompressedBinary); - IoHash VerifyRawHash; - uint64_t VerifyRawSize; - CompressedBuffer CompressedBlob = - CompressedBuffer::FromCompressed(SharedBuffer(std::move(Payload)), VerifyRawHash, VerifyRawSize); - CHECK(CompressedBlob); - CHECK(VerifyRawHash == RawHash); - IoBuffer Decompressed = CompressedBlob.Decompress().AsIoBuffer(); - CHECK(IoHash::HashBuffer(Decompressed) == RawHash); - } - } - { - ZenServerInstance Instance(TestEnv); - - const uint16_t PortNumber = - Instance.SpawnServerAndWaitUntilReady(fmt::format("--buildstore-enabled --system-dir {}", SystemRootPath)); - CHECK(PortNumber != 0); - - HttpClient Client(Instance.GetBaseUri() + "/builds/"); - - for (const IoHash& RawHash : CompressedBlobsHashes) - { - HttpClient::Response Result = Client.Get(fmt::format("{}/{}/{}/blobs/{}", Namespace, Bucket, BuildId, RawHash), - HttpClient::Accept(ZenContentType::kCompressedBinary)); - CHECK(Result); - IoBuffer Payload = Result.ResponsePayload; - CHECK(Payload.GetContentType() == ZenContentType::kCompressedBinary); - IoHash VerifyRawHash; - uint64_t VerifyRawSize; - CompressedBuffer CompressedBlob = - CompressedBuffer::FromCompressed(SharedBuffer(std::move(Payload)), VerifyRawHash, VerifyRawSize); - CHECK(CompressedBlob); - CHECK(VerifyRawHash == RawHash); - IoBuffer Decompressed = CompressedBlob.Decompress().AsIoBuffer(); - CHECK(IoHash::HashBuffer(Decompressed) == RawHash); - } - - for (size_t I = 0; I < 5; I++) - { - IoBuffer Blob = CreateSemiRandomBlob(5713 + I * 7); - CompressedBuffer CompressedBlob = CompressedBuffer::Compress(SharedBuffer(std::move(Blob))); - CompressedBlobsHashes.push_back(CompressedBlob.DecodeRawHash()); - IoBuffer Payload = std::move(CompressedBlob).GetCompressed().Flatten().AsIoBuffer(); - Payload.SetContentType(ZenContentType::kCompressedBinary); - - HttpClient::Response Result = - Client.Put(fmt::format("{}/{}/{}/blobs/{}", Namespace, Bucket, BuildId, CompressedBlobsHashes.back()), Payload); - CHECK(Result); - } - } - { - ZenServerInstance Instance(TestEnv); - - const uint16_t PortNumber = - Instance.SpawnServerAndWaitUntilReady(fmt::format("--buildstore-enabled --system-dir {}", SystemRootPath)); - CHECK(PortNumber != 0); - - HttpClient Client(Instance.GetBaseUri() + "/builds/"); - - for (const IoHash& RawHash : CompressedBlobsHashes) - { - HttpClient::Response Result = Client.Get(fmt::format("{}/{}/{}/blobs/{}", Namespace, Bucket, BuildId, RawHash), - HttpClient::Accept(ZenContentType::kCompressedBinary)); - CHECK(Result); - IoBuffer Payload = Result.ResponsePayload; - CHECK(Payload.GetContentType() == ZenContentType::kCompressedBinary); - IoHash VerifyRawHash; - uint64_t VerifyRawSize; - CompressedBuffer CompressedBlob = - CompressedBuffer::FromCompressed(SharedBuffer(std::move(Payload)), VerifyRawHash, VerifyRawSize); - CHECK(CompressedBlob); - CHECK(VerifyRawHash == RawHash); - IoBuffer Decompressed = CompressedBlob.Decompress().AsIoBuffer(); - CHECK(IoHash::HashBuffer(Decompressed) == RawHash); - } - } -} - -namespace { - CbObject MakeMetadata(const IoHash& BlobHash, const std::vector<std::pair<std::string, std::string>>& KeyValues) - { - CbObjectWriter Writer; - Writer.AddHash("rawHash"sv, BlobHash); - Writer.BeginObject("values"); - { - for (const auto& V : KeyValues) - { - Writer.AddString(V.first, V.second); - } - } - Writer.EndObject(); // values - return Writer.Save(); - }; - -} // namespace - -TEST_CASE("buildstore.metadata") -{ - std::filesystem::path SystemRootPath = TestEnv.CreateNewTestDir(); - auto _ = MakeGuard([&SystemRootPath]() { DeleteDirectories(SystemRootPath); }); - - std::string_view Namespace = "ns"sv; - std::string_view Bucket = "bkt"sv; - Oid BuildId = Oid::NewOid(); - - std::vector<IoHash> BlobHashes; - std::vector<CbObject> Metadatas; - std::vector<IoHash> MetadataHashes; - - auto GetMetadatas = - [](HttpClient& Client, std::string_view Namespace, std::string_view Bucket, const Oid& BuildId, std::vector<IoHash> BlobHashes) { - CbObjectWriter Request; - - Request.BeginArray("blobHashes"sv); - for (const IoHash& BlobHash : BlobHashes) - { - Request.AddHash(BlobHash); - } - Request.EndArray(); - - IoBuffer Payload = Request.Save().GetBuffer().AsIoBuffer(); - Payload.SetContentType(ZenContentType::kCbObject); - - HttpClient::Response Result = Client.Post(fmt::format("{}/{}/{}/blobs/getBlobMetadata", Namespace, Bucket, BuildId), - Payload, - HttpClient::Accept(ZenContentType::kCbObject)); - CHECK(Result); - - std::vector<CbObject> ResultMetadatas; - - CbPackage ResponsePackage = ParsePackageMessage(Result.ResponsePayload); - CbObject ResponseObject = ResponsePackage.GetObject(); - - CbArrayView BlobHashArray = ResponseObject["blobHashes"sv].AsArrayView(); - CbArrayView MetadatasArray = ResponseObject["metadatas"sv].AsArrayView(); - ResultMetadatas.reserve(MetadatasArray.Num()); - auto BlobHashesIt = BlobHashes.begin(); - auto BlobHashArrayIt = begin(BlobHashArray); - auto MetadataArrayIt = begin(MetadatasArray); - while (MetadataArrayIt != end(MetadatasArray)) - { - const IoHash BlobHash = (*BlobHashArrayIt).AsHash(); - while (BlobHash != *BlobHashesIt) - { - ZEN_ASSERT(BlobHashesIt != BlobHashes.end()); - BlobHashesIt++; - } - - ZEN_ASSERT(BlobHash == *BlobHashesIt); - - const IoHash MetaHash = (*MetadataArrayIt).AsAttachment(); - const CbAttachment* MetaAttachment = ResponsePackage.FindAttachment(MetaHash); - ZEN_ASSERT(MetaAttachment); - - CbObject Metadata = MetaAttachment->AsObject(); - ResultMetadatas.emplace_back(std::move(Metadata)); - - BlobHashArrayIt++; - MetadataArrayIt++; - BlobHashesIt++; - } - return ResultMetadatas; - }; - - { - ZenServerInstance Instance(TestEnv); - - const uint16_t PortNumber = - Instance.SpawnServerAndWaitUntilReady(fmt::format("--buildstore-enabled --system-dir {}", SystemRootPath)); - CHECK(PortNumber != 0); - - HttpClient Client(Instance.GetBaseUri() + "/builds/"); - - const size_t BlobCount = 5; - - for (size_t I = 0; I < BlobCount; I++) - { - BlobHashes.push_back(IoHash::HashBuffer(&I, sizeof(I))); - Metadatas.push_back(MakeMetadata(BlobHashes.back(), {{"index", fmt::format("{}", I)}})); - MetadataHashes.push_back(IoHash::HashBuffer(Metadatas.back().GetBuffer().AsIoBuffer())); - } - - { - CbPackage RequestPackage; - std::vector<CbAttachment> Attachments; - tsl::robin_set<IoHash, IoHash::Hasher> AttachmentHashes; - Attachments.reserve(BlobCount); - AttachmentHashes.reserve(BlobCount); - { - CbObjectWriter RequestWriter; - RequestWriter.BeginArray("blobHashes"); - for (size_t BlockHashIndex = 0; BlockHashIndex < BlobHashes.size(); BlockHashIndex++) - { - RequestWriter.AddHash(BlobHashes[BlockHashIndex]); - } - RequestWriter.EndArray(); // blobHashes - - RequestWriter.BeginArray("metadatas"); - for (size_t BlockHashIndex = 0; BlockHashIndex < BlobHashes.size(); BlockHashIndex++) - { - const IoHash ObjectHash = Metadatas[BlockHashIndex].GetHash(); - RequestWriter.AddBinaryAttachment(ObjectHash); - if (!AttachmentHashes.contains(ObjectHash)) - { - Attachments.push_back(CbAttachment(Metadatas[BlockHashIndex], ObjectHash)); - AttachmentHashes.insert(ObjectHash); - } - } - - RequestWriter.EndArray(); // metadatas - - RequestPackage.SetObject(RequestWriter.Save()); - } - RequestPackage.AddAttachments(Attachments); - - CompositeBuffer RpcRequestBuffer = FormatPackageMessageBuffer(RequestPackage); - - HttpClient::Response Result = Client.Post(fmt::format("{}/{}/{}/blobs/putBlobMetadata", Namespace, Bucket, BuildId), - RpcRequestBuffer, - ZenContentType::kCbPackage); - CHECK(Result); - } - - { - std::vector<CbObject> ResultMetadatas = GetMetadatas(Client, Namespace, Bucket, BuildId, BlobHashes); - - for (size_t Index = 0; Index < MetadataHashes.size(); Index++) - { - const IoHash& ExpectedHash = MetadataHashes[Index]; - IoHash Hash = IoHash::HashBuffer(ResultMetadatas[Index].GetBuffer().AsIoBuffer()); - CHECK_EQ(ExpectedHash, Hash); - } - } - } - { - ZenServerInstance Instance(TestEnv); - - const uint16_t PortNumber = - Instance.SpawnServerAndWaitUntilReady(fmt::format("--buildstore-enabled --system-dir {}", SystemRootPath)); - CHECK(PortNumber != 0); - - HttpClient Client(Instance.GetBaseUri() + "/builds/"); - - std::vector<CbObject> ResultMetadatas = GetMetadatas(Client, Namespace, Bucket, BuildId, BlobHashes); - - for (size_t Index = 0; Index < MetadataHashes.size(); Index++) - { - const IoHash& ExpectedHash = MetadataHashes[Index]; - IoHash Hash = IoHash::HashBuffer(ResultMetadatas[Index].GetBuffer().AsIoBuffer()); - CHECK_EQ(ExpectedHash, Hash); - } - } -} - -TEST_CASE("buildstore.cache") -{ - std::filesystem::path SystemRootPath = TestEnv.CreateNewTestDir(); - std::filesystem::path TempDir = TestEnv.CreateNewTestDir(); - auto _ = MakeGuard([&SystemRootPath, &TempDir]() { - DeleteDirectories(SystemRootPath); - DeleteDirectories(TempDir); - }); - - std::string_view Namespace = "ns"sv; - std::string_view Bucket = "bkt"sv; - Oid BuildId = Oid::NewOid(); - - std::vector<IoHash> BlobHashes; - std::vector<CbObject> Metadatas; - std::vector<IoHash> MetadataHashes; - - const size_t BlobCount = 5; - { - ZenServerInstance Instance(TestEnv); - - const uint16_t PortNumber = - Instance.SpawnServerAndWaitUntilReady(fmt::format("--buildstore-enabled --system-dir {}", SystemRootPath)); - CHECK(PortNumber != 0); - - HttpClient Client(Instance.GetBaseUri()); - - BuildStorageCache::Statistics Stats; - std::unique_ptr<BuildStorageCache> Cache(CreateZenBuildStorageCache(Client, Stats, Namespace, Bucket, TempDir, false)); - - { - IoHash NoneBlob = IoHash::HashBuffer("data", 4); - std::vector<BuildStorageCache::BlobExistsResult> NoneExists = Cache->BlobsExists(BuildId, std::vector<IoHash>{NoneBlob}); - CHECK(NoneExists.size() == 1); - CHECK(!NoneExists[0].HasBody); - CHECK(!NoneExists[0].HasMetadata); - } - - for (size_t I = 0; I < BlobCount; I++) - { - IoBuffer Blob = CreateSemiRandomBlob(4711 + I * 7); - CompressedBuffer CompressedBlob = CompressedBuffer::Compress(SharedBuffer(std::move(Blob))); - BlobHashes.push_back(CompressedBlob.DecodeRawHash()); - Cache->PutBuildBlob(BuildId, BlobHashes.back(), ZenContentType::kCompressedBinary, CompressedBlob.GetCompressed()); - } - - Cache->Flush(500); - Cache = CreateZenBuildStorageCache(Client, Stats, Namespace, Bucket, TempDir, false); - - { - std::vector<BuildStorageCache::BlobExistsResult> Exists = Cache->BlobsExists(BuildId, BlobHashes); - CHECK(Exists.size() == BlobHashes.size()); - for (size_t I = 0; I < BlobCount; I++) - { - CHECK(Exists[I].HasBody); - CHECK(!Exists[I].HasMetadata); - } - - std::vector<CbObject> FetchedMetadatas = Cache->GetBlobMetadatas(BuildId, BlobHashes); - CHECK_EQ(0, FetchedMetadatas.size()); - } - - { - for (size_t I = 0; I < BlobCount; I++) - { - IoBuffer BuildBlob = Cache->GetBuildBlob(BuildId, BlobHashes[I]); - CHECK(BuildBlob); - CHECK_EQ(BlobHashes[I], - IoHash::HashBuffer(CompressedBuffer::FromCompressedNoValidate(std::move(BuildBlob)).Decompress().AsIoBuffer())); - } - } - - { - for (size_t I = 0; I < BlobCount; I++) - { - CbObject Metadata = MakeMetadata(BlobHashes[I], - {{"key", fmt::format("{}", I)}, - {"key_plus_one", fmt::format("{}", I + 1)}, - {"block_hash", fmt::format("{}", BlobHashes[I])}}); - Metadatas.push_back(Metadata); - MetadataHashes.push_back(IoHash::HashBuffer(Metadata.GetBuffer().AsIoBuffer())); - } - Cache->PutBlobMetadatas(BuildId, BlobHashes, Metadatas); - } - - Cache->Flush(500); - Cache = CreateZenBuildStorageCache(Client, Stats, Namespace, Bucket, TempDir, false); - - { - std::vector<BuildStorageCache::BlobExistsResult> Exists = Cache->BlobsExists(BuildId, BlobHashes); - CHECK(Exists.size() == BlobHashes.size()); - for (size_t I = 0; I < BlobCount; I++) - { - CHECK(Exists[I].HasBody); - CHECK(Exists[I].HasMetadata); - } - - std::vector<CbObject> FetchedMetadatas = Cache->GetBlobMetadatas(BuildId, BlobHashes); - CHECK_EQ(BlobCount, FetchedMetadatas.size()); - - for (size_t I = 0; I < BlobCount; I++) - { - CHECK_EQ(MetadataHashes[I], IoHash::HashBuffer(FetchedMetadatas[I].GetBuffer().AsIoBuffer())); - } - } - - for (size_t I = 0; I < BlobCount; I++) - { - IoBuffer Blob = CreateSemiRandomBlob(4711 + I * 7); - CompressedBuffer CompressedBlob = CompressedBuffer::Compress(SharedBuffer(std::move(Blob))); - BlobHashes.push_back(CompressedBlob.DecodeRawHash()); - Cache->PutBuildBlob(BuildId, BlobHashes.back(), ZenContentType::kCompressedBinary, CompressedBlob.GetCompressed()); - } - - Cache->Flush(500); - Cache = CreateZenBuildStorageCache(Client, Stats, Namespace, Bucket, TempDir, false); - - { - std::vector<BuildStorageCache::BlobExistsResult> Exists = Cache->BlobsExists(BuildId, BlobHashes); - CHECK(Exists.size() == BlobHashes.size()); - for (size_t I = 0; I < BlobCount * 2; I++) - { - CHECK(Exists[I].HasBody); - CHECK_EQ(I < BlobCount, Exists[I].HasMetadata); - } - - std::vector<CbObject> MetaDatas = Cache->GetBlobMetadatas(BuildId, BlobHashes); - CHECK_EQ(BlobCount, MetaDatas.size()); - - std::vector<CbObject> FetchedMetadatas = Cache->GetBlobMetadatas(BuildId, BlobHashes); - CHECK_EQ(BlobCount, FetchedMetadatas.size()); - - for (size_t I = 0; I < BlobCount; I++) - { - CHECK_EQ(MetadataHashes[I], IoHash::HashBuffer(FetchedMetadatas[I].GetBuffer().AsIoBuffer())); - } - } - } - - { - ZenServerInstance Instance(TestEnv); - - const uint16_t PortNumber = - Instance.SpawnServerAndWaitUntilReady(fmt::format("--buildstore-enabled --system-dir {}", SystemRootPath)); - CHECK(PortNumber != 0); - - HttpClient Client(Instance.GetBaseUri()); - - BuildStorageCache::Statistics Stats; - std::unique_ptr<BuildStorageCache> Cache(CreateZenBuildStorageCache(Client, Stats, Namespace, Bucket, TempDir, false)); - - std::vector<BuildStorageCache::BlobExistsResult> Exists = Cache->BlobsExists(BuildId, BlobHashes); - CHECK(Exists.size() == BlobHashes.size()); - for (size_t I = 0; I < BlobCount * 2; I++) - { - CHECK(Exists[I].HasBody); - CHECK_EQ(I < BlobCount, Exists[I].HasMetadata); - } - - for (size_t I = 0; I < BlobCount * 2; I++) - { - IoBuffer BuildBlob = Cache->GetBuildBlob(BuildId, BlobHashes[I]); - CHECK(BuildBlob); - CHECK_EQ(BlobHashes[I], - IoHash::HashBuffer(CompressedBuffer::FromCompressedNoValidate(std::move(BuildBlob)).Decompress().AsIoBuffer())); - } - - std::vector<CbObject> MetaDatas = Cache->GetBlobMetadatas(BuildId, BlobHashes); - CHECK_EQ(BlobCount, MetaDatas.size()); - - std::vector<CbObject> FetchedMetadatas = Cache->GetBlobMetadatas(BuildId, BlobHashes); - CHECK_EQ(BlobCount, FetchedMetadatas.size()); - - for (size_t I = 0; I < BlobCount; I++) - { - CHECK_EQ(MetadataHashes[I], IoHash::HashBuffer(FetchedMetadatas[I].GetBuffer().AsIoBuffer())); - } - } -} - # if 0 TEST_CASE("lifetime.owner") { |