diff options
| author | Stefan Boberg <[email protected]> | 2025-09-29 13:15:16 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-09-29 13:15:16 +0200 |
| commit | d4c6e547a7081b1562a69dc9839d24cb82681c5d (patch) | |
| tree | 3ffe43dcf09bb6d01c2fb860bb1f73882f44827d /src/zenserver-test/projectstore-tests.cpp | |
| parent | gracefully handle missing chunks when exporting an oplog (#526) (diff) | |
| download | zen-d4c6e547a7081b1562a69dc9839d24cb82681c5d.tar.xz zen-d4c6e547a7081b1562a69dc9839d24cb82681c5d.zip | |
split zenserver-test monolith into multiple source files (#528)
Diffstat (limited to 'src/zenserver-test/projectstore-tests.cpp')
| -rw-r--r-- | src/zenserver-test/projectstore-tests.cpp | 1055 |
1 files changed, 1055 insertions, 0 deletions
diff --git a/src/zenserver-test/projectstore-tests.cpp b/src/zenserver-test/projectstore-tests.cpp new file mode 100644 index 000000000..ce3e9dcd1 --- /dev/null +++ b/src/zenserver-test/projectstore-tests.cpp @@ -0,0 +1,1055 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#if ZEN_WITH_TESTS +# include "zenserver-test.h" +# include <zencore/testing.h> +# include <zencore/testutils.h> +# include <zencore/workthreadpool.h> +# include <zencore/compactbinarybuilder.h> +# include <zencore/compactbinarypackage.h> +# include <zencore/compress.h> +# include <zencore/filesystem.h> +# include <zencore/fmtutils.h> +# include <zencore/stream.h> +# include <zencore/string.h> +# include <zencore/xxhash.h> +# include <zenhttp/packageformat.h> +# include <zenutil/zenserverprocess.h> +# include <zenhttp/httpclient.h> + +# include <tsl/robin_set.h> +# include <random> + +namespace zen::tests { + +using namespace std::literals; + +TEST_CASE("project.basic") +{ + using namespace std::literals; + + std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); + + ZenServerInstance Instance1(TestEnv); + Instance1.SetTestDir(TestDir); + + const uint16_t PortNumber = Instance1.SpawnServerAndWaitUntilReady(); + + std::mt19937_64 mt; + + zen::StringBuilder<64> BaseUri; + BaseUri << fmt::format("http://localhost:{}", PortNumber); + + std::filesystem::path BinPath = zen::GetRunningExecutablePath(); + std::filesystem::path RootPath = BinPath.parent_path().parent_path(); + BinPath = BinPath.lexically_relative(RootPath); + + SUBCASE("build store init") + { + { + HttpClient Http{BaseUri}; + + { + zen::CbObjectWriter Body; + Body << "id" + << "test"; + Body << "root" << RootPath.c_str(); + Body << "project" + << "/zooom"; + Body << "engine" + << "/zooom"; + + zen::BinaryWriter MemOut; + IoBuffer BodyBuf = Body.Save().GetBuffer().AsIoBuffer(); + + auto Response = Http.Post("/prj/test"sv, BodyBuf); + CHECK(Response.StatusCode == HttpResponseCode::Created); + } + + { + auto Response = Http.Get("/prj/test"sv); + CHECK(Response.StatusCode == HttpResponseCode::OK); + + CbObject ResponseObject = Response.AsObject(); + + CHECK(ResponseObject["id"].AsString() == "test"sv); + CHECK(ResponseObject["root"].AsString() == PathToUtf8(RootPath.c_str())); + } + } + + BaseUri << "/prj/test/oplog/foobar"; + + { + HttpClient Http{BaseUri}; + + { + auto Response = Http.Post(""sv); + CHECK(Response.StatusCode == HttpResponseCode::Created); + } + + { + auto Response = Http.Get(""sv); + CHECK(Response.StatusCode == HttpResponseCode::OK); + + CbObject ResponseObject = Response.AsObject(); + + CHECK(ResponseObject["id"].AsString() == "foobar"sv); + CHECK(ResponseObject["project"].AsString() == "test"sv); + } + } + + SUBCASE("build store persistence") + { + uint8_t AttachData[] = {1, 2, 3}; + + zen::CompressedBuffer Attachment = zen::CompressedBuffer::Compress(zen::SharedBuffer::Clone(zen::MemoryView{AttachData, 3})); + zen::CbAttachment Attach{Attachment, Attachment.DecodeRawHash()}; + + zen::CbObjectWriter OpWriter; + OpWriter << "key" + << "foo" + << "attachment" << Attach; + + const std::string_view ChunkId{ + "00000000" + "00000000" + "00010000"}; + auto FileOid = zen::Oid::FromHexString(ChunkId); + + OpWriter.BeginArray("files"); + OpWriter.BeginObject(); + OpWriter << "id" << FileOid; + OpWriter << "clientpath" + << "/{engine}/client/side/path"; + OpWriter << "serverpath" << BinPath.c_str(); + OpWriter.EndObject(); + OpWriter.EndArray(); + + zen::CbObject Op = OpWriter.Save(); + + zen::CbPackage OpPackage(Op); + OpPackage.AddAttachment(Attach); + + zen::BinaryWriter MemOut; + legacy::SaveCbPackage(OpPackage, MemOut); + + HttpClient Http{BaseUri}; + + { + auto Response = Http.Post("/new", IoBufferBuilder::MakeFromMemory(MemOut.GetView())); + + REQUIRE(Response); + CHECK(Response.StatusCode == HttpResponseCode::Created); + } + + // Read file data + + { + zen::StringBuilder<128> ChunkGetUri; + ChunkGetUri << "/" << ChunkId; + auto Response = Http.Get(ChunkGetUri); + + REQUIRE(Response); + CHECK(Response.StatusCode == HttpResponseCode::OK); + } + + { + zen::StringBuilder<128> ChunkGetUri; + ChunkGetUri << "/" << ChunkId << "?offset=1&size=10"; + auto Response = Http.Get(ChunkGetUri); + + REQUIRE(Response); + CHECK(Response.StatusCode == HttpResponseCode::OK); + CHECK(Response.ResponsePayload.GetSize() == 10); + } + + ZEN_INFO("+++++++"); + } + + SUBCASE("snapshot") + { + zen::CbObjectWriter OpWriter; + OpWriter << "key" + << "foo"; + + const std::string_view ChunkId{ + "00000000" + "00000000" + "00010000"}; + auto FileOid = zen::Oid::FromHexString(ChunkId); + + OpWriter.BeginArray("files"); + OpWriter.BeginObject(); + OpWriter << "id" << FileOid; + OpWriter << "clientpath" + << "/{engine}/client/side/path"; + OpWriter << "serverpath" << BinPath.c_str(); + OpWriter.EndObject(); + OpWriter.EndArray(); + + zen::CbObject Op = OpWriter.Save(); + + zen::CbPackage OpPackage(Op); + + zen::BinaryWriter MemOut; + legacy::SaveCbPackage(OpPackage, MemOut); + + HttpClient Http{BaseUri}; + + { + auto Response = Http.Post("/new", IoBufferBuilder::MakeFromMemory(MemOut.GetView())); + + REQUIRE(Response); + CHECK(Response.StatusCode == HttpResponseCode::Created); + } + + // Read file data, it is raw and uncompressed + { + zen::StringBuilder<128> ChunkGetUri; + ChunkGetUri << "/" << ChunkId; + auto Response = Http.Get(ChunkGetUri); + + REQUIRE(Response); + CHECK(Response.StatusCode == HttpResponseCode::OK); + + IoBuffer Data = Response.ResponsePayload; + IoBuffer ReferenceData = IoBufferBuilder::MakeFromFile(RootPath / BinPath); + CHECK(ReferenceData.GetSize() == Data.GetSize()); + CHECK(ReferenceData.GetView().EqualBytes(Data.GetView())); + } + + { + IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) { Writer.AddString("method"sv, "snapshot"sv); }); + auto Response = Http.Post("/rpc"sv, Payload, {{"Content-Type", "application/x-ue-cb"}}); + REQUIRE(Response); + CHECK(Response.StatusCode == HttpResponseCode::OK); + } + + // Read chunk data, it is now compressed + { + zen::StringBuilder<128> ChunkGetUri; + ChunkGetUri << "/" << ChunkId; + auto Response = Http.Get(ChunkGetUri, {{"Accept-Type", "application/x-ue-comp"}}); + + REQUIRE(Response); + CHECK(Response.StatusCode == HttpResponseCode::OK); + + IoBuffer Data = Response.ResponsePayload; + IoHash RawHash; + uint64_t RawSize; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Data), RawHash, RawSize); + CHECK(Compressed); + IoBuffer DataDecompressed = Compressed.Decompress().AsIoBuffer(); + IoBuffer ReferenceData = IoBufferBuilder::MakeFromFile(RootPath / BinPath); + CHECK(RawSize == ReferenceData.GetSize()); + CHECK(ReferenceData.GetSize() == DataDecompressed.GetSize()); + CHECK(ReferenceData.GetView().EqualBytes(DataDecompressed.GetView())); + } + + ZEN_INFO("+++++++"); + } + + SUBCASE("test chunk not found error") + { + HttpClient Http{BaseUri}; + + for (size_t I = 0; I < 65; I++) + { + zen::StringBuilder<128> PostUri; + PostUri << "/f77c781846caead318084604/info"; + auto Response = Http.Get(PostUri); + + REQUIRE(!Response.Error); + CHECK(Response.StatusCode == HttpResponseCode::NotFound); + } + } + } +} + +CbPackage +CreateOplogPackage(const Oid& Id, const std::span<const std::pair<Oid, CompressedBuffer>>& Attachments) +{ + CbPackage Package; + CbObjectWriter Object; + Object << "key"sv << OidAsString(Id); + if (!Attachments.empty()) + { + Object.BeginArray("bulkdata"); + for (const auto& Attachment : Attachments) + { + CbAttachment Attach(Attachment.second, Attachment.second.DecodeRawHash()); + Object.BeginObject(); + Object << "id"sv << Attachment.first; + Object << "type"sv + << "Standard"sv; + Object << "data"sv << Attach; + Object.EndObject(); + + Package.AddAttachment(Attach); + ZEN_DEBUG("Added attachment {}", Attach.GetHash()); + } + Object.EndArray(); + } + Package.SetObject(Object.Save()); + return Package; +}; + +CbObject +CreateOplogOp(const Oid& Id, const std::span<const std::pair<Oid, CompressedBuffer>>& Attachments) +{ + CbObjectWriter Object; + Object << "key"sv << OidAsString(Id); + if (!Attachments.empty()) + { + Object.BeginArray("bulkdata"); + for (const auto& Attachment : Attachments) + { + CbAttachment Attach(Attachment.second, Attachment.second.DecodeRawHash()); + Object.BeginObject(); + Object << "id"sv << Attachment.first; + Object << "type"sv + << "Standard"sv; + Object << "data"sv << Attach; + Object.EndObject(); + + ZEN_DEBUG("Added attachment {}", Attach.GetHash()); + } + Object.EndArray(); + } + return Object.Save(); +}; + +enum CbWriterMeta +{ + BeginObject, + EndObject, + BeginArray, + EndArray +}; + +inline CbWriter& +operator<<(CbWriter& Writer, CbWriterMeta Meta) +{ + switch (Meta) + { + case BeginObject: + Writer.BeginObject(); + break; + case EndObject: + Writer.EndObject(); + break; + case BeginArray: + Writer.BeginArray(); + break; + case EndArray: + Writer.EndArray(); + break; + default: + ZEN_ASSERT(false); + } + return Writer; +} + +TEST_CASE("project.remote") +{ + using namespace std::literals; + using namespace utils; + + ZenServerTestHelper Servers("remote", 3); + Servers.SpawnServers("--debug"); + + std::vector<Oid> OpIds; + const size_t OpCount = 24; + OpIds.reserve(OpCount); + for (size_t I = 0; I < OpCount; ++I) + { + OpIds.emplace_back(Oid::NewOid()); + } + + std::unordered_map<Oid, std::vector<std::pair<Oid, CompressedBuffer>>, Oid::Hasher> Attachments; + { + std::vector<std::size_t> AttachmentSizes( + {7633, 6825, 5738, 8031, 7225, 566, 3656, 6006, 24, 33466, 1093, 4269, 2257, 3685, 13489, 97194, + 6151, 5482, 6217, 3511, 6738, 5061, 7537, 2759, 1916, 8210, 2235, 224024, 51582, 5251, 491, 2u * 1024u * 1024u + 124u, + 74607, 18135, 3767, 154045, 4415, 5007, 8876, 96761, 3359, 8526, 4097, 4855, 48225}); + auto It = AttachmentSizes.begin(); + Attachments[OpIds[0]] = {}; + Attachments[OpIds[1]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++}); + Attachments[OpIds[2]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++}); + Attachments[OpIds[3]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++}); + Attachments[OpIds[4]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++}); + Attachments[OpIds[5]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++}); + Attachments[OpIds[6]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++}); + Attachments[OpIds[7]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++}); + Attachments[OpIds[8]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{}); + Attachments[OpIds[9]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++}); + Attachments[OpIds[10]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++}); + Attachments[OpIds[11]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++}); + Attachments[OpIds[12]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++}); + Attachments[OpIds[13]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++}); + Attachments[OpIds[14]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++}); + Attachments[OpIds[15]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++}); + Attachments[OpIds[16]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{}); + Attachments[OpIds[17]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++}); + Attachments[OpIds[18]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++}); + Attachments[OpIds[19]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{}); + Attachments[OpIds[20]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++}); + Attachments[OpIds[21]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++}); + Attachments[OpIds[22]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++}); + Attachments[OpIds[23]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++}); + ZEN_ASSERT(It == AttachmentSizes.end()); + } + + // Note: This is a clone of the function in projectstore.cpp + auto ComputeOpKey = [](const CbObjectView& Op) -> Oid { + using namespace std::literals; + + XXH3_128Stream_deprecated KeyHasher; + Op["key"sv].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); }); + XXH3_128 KeyHash128 = KeyHasher.GetHash(); + + Oid KeyHash; + memcpy(&KeyHash, KeyHash128.Hash, sizeof KeyHash); + + return KeyHash; + }; + + auto AddOp = [ComputeOpKey](const CbObject& Op, std::unordered_map<Oid, uint32_t, Oid::Hasher>& Ops) { + const Oid Id = ComputeOpKey(Op); + IoBuffer Buffer = Op.GetBuffer().AsIoBuffer(); + const uint32_t OpCoreHash = uint32_t(XXH3_64bits(Buffer.GetData(), Buffer.GetSize()) & 0xffffFFFF); + Ops.insert({Id, OpCoreHash}); + }; + + auto MakeProject = [](std::string_view UrlBase, std::string_view ProjectName) { + CbObjectWriter Project; + Project.AddString("id"sv, ProjectName); + Project.AddString("root"sv, ""sv); + Project.AddString("engine"sv, ""sv); + Project.AddString("project"sv, ""sv); + Project.AddString("projectfile"sv, ""sv); + IoBuffer ProjectPayload = Project.Save().GetBuffer().AsIoBuffer(); + ProjectPayload.SetContentType(HttpContentType::kCbObject); + + HttpClient Http{UrlBase}; + HttpClient::Response Response = Http.Post(fmt::format("/prj/{}", ProjectName), ProjectPayload); + CHECK(Response); + }; + + auto MakeOplog = [](std::string_view UrlBase, std::string_view ProjectName, std::string_view OplogName) { + HttpClient Http{UrlBase}; + HttpClient::Response Response = Http.Post(fmt::format("/prj/{}/oplog/{}", ProjectName, OplogName), IoBuffer{}); + CHECK(Response); + }; + + auto MakeOp = [](std::string_view UrlBase, std::string_view ProjectName, std::string_view OplogName, const CbPackage& OpPackage) { + zen::BinaryWriter MemOut; + legacy::SaveCbPackage(OpPackage, MemOut); + IoBuffer Body{IoBuffer::Wrap, MemOut.GetData(), MemOut.GetSize()}; + Body.SetContentType(HttpContentType::kCbPackage); + + HttpClient Http{UrlBase}; + HttpClient::Response Response = Http.Post(fmt::format("/prj/{}/oplog/{}/new", ProjectName, OplogName), Body); + CHECK(Response); + }; + + MakeProject(Servers.GetInstance(0).GetBaseUri(), "proj0"); + MakeOplog(Servers.GetInstance(0).GetBaseUri(), "proj0", "oplog0"); + + std::unordered_map<Oid, uint32_t, Oid::Hasher> SourceOps; + for (const Oid& OpId : OpIds) + { + CbPackage OpPackage = CreateOplogPackage(OpId, Attachments[OpId]); + CHECK(OpPackage.GetAttachments().size() == Attachments[OpId].size()); + AddOp(OpPackage.GetObject(), SourceOps); + MakeOp(Servers.GetInstance(0).GetBaseUri(), "proj0", "oplog0", OpPackage); + } + + std::vector<IoHash> AttachmentHashes; + AttachmentHashes.reserve(Attachments.size()); + for (const auto& AttachmentOplog : Attachments) + { + for (const auto& Attachment : AttachmentOplog.second) + { + AttachmentHashes.emplace_back(Attachment.second.DecodeRawHash()); + } + } + + auto MakeCbObjectPayload = [](std::function<void(CbObjectWriter & Writer)> Write) -> IoBuffer { + CbObjectWriter Writer; + Write(Writer); + IoBuffer Result = Writer.Save().GetBuffer().AsIoBuffer(); + Result.MakeOwned(); + Result.SetContentType(HttpContentType::kCbObject); + return Result; + }; + + auto ValidateAttachments = + [&MakeCbObjectPayload, &AttachmentHashes, &Servers](int ServerIndex, std::string_view Project, std::string_view Oplog) { + HttpClient Http{Servers.GetInstance(ServerIndex).GetBaseUri()}; + + IoBuffer Payload = MakeCbObjectPayload([&AttachmentHashes](CbObjectWriter& Writer) { + Writer << "method"sv + << "getchunks"sv; + Writer << "chunks"sv << BeginArray; + for (const IoHash& Chunk : AttachmentHashes) + { + Writer << Chunk; + } + Writer << EndArray; // chunks + }); + + HttpClient::Response Response = + Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", Project, Oplog), Payload, {{"Accept", "application/x-ue-cbpkg"}}); + CHECK(Response); + CbPackage ResponsePackage = ParsePackageMessage(Response.ResponsePayload); + CHECK(ResponsePackage.GetAttachments().size() == AttachmentHashes.size()); + for (auto A : ResponsePackage.GetAttachments()) + { + CHECK(IoHash::HashBuffer(A.AsCompressedBinary().DecompressToComposite()) == A.GetHash()); + } + }; + + auto ValidateOplog = [&SourceOps, &AddOp, &Servers](int ServerIndex, std::string_view Project, std::string_view Oplog) { + std::unordered_map<Oid, uint32_t, Oid::Hasher> TargetOps; + std::vector<CbObject> ResultingOplog; + + HttpClient Http{Servers.GetInstance(ServerIndex).GetBaseUri()}; + HttpClient::Response Response = Http.Get(fmt::format("/prj/{}/oplog/{}/entries", Project, Oplog)); + CHECK(Response); + + IoBuffer Payload(Response.ResponsePayload); + CbObject OplogResonse = LoadCompactBinaryObject(Payload); + CbArrayView EntriesArray = OplogResonse["entries"sv].AsArrayView(); + + for (CbFieldView OpEntry : EntriesArray) + { + CbObjectView Core = OpEntry.AsObjectView(); + BinaryWriter Writer; + Core.CopyTo(Writer); + MemoryView OpView = Writer.GetView(); + IoBuffer OpBuffer(IoBuffer::Wrap, OpView.GetData(), OpView.GetSize()); + CbObject Op(SharedBuffer(OpBuffer), CbFieldType::HasFieldType); + AddOp(Op, TargetOps); + } + CHECK(SourceOps == TargetOps); + }; + + auto HttpWaitForCompletion = [](ZenServerInstance& Server, const HttpClient::Response& Response) { + REQUIRE(Response); + const uint64_t JobId = ParseInt<uint64_t>(Response.AsText()).value_or(0); + CHECK(JobId != 0); + + HttpClient Http{Server.GetBaseUri()}; + + while (true) + { + HttpClient::Response StatusResponse = + Http.Get(fmt::format("/admin/jobs/{}", JobId), {{"Accept", ToString(ZenContentType::kCbObject)}}); + CHECK(StatusResponse); + CbObject ResponseObject = StatusResponse.AsObject(); + std::string_view Status = ResponseObject["Status"sv].AsString(); + CHECK(Status != "Aborted"sv); + if (Status == "Complete"sv) + { + return; + } + Sleep(10); + } + }; + + SUBCASE("File") + { + ScopedTemporaryDirectory TempDir; + { + IoBuffer Payload = MakeCbObjectPayload([&AttachmentHashes, path = TempDir.Path().string()](CbObjectWriter& Writer) { + Writer << "method"sv + << "export"sv; + Writer << "params" << BeginObject; + { + Writer << "maxblocksize"sv << 3072u; + Writer << "maxchunkembedsize"sv << 1296u; + Writer << "chunkfilesizelimit"sv << 5u * 1024u; + Writer << "force"sv << false; + Writer << "file"sv << BeginObject; + { + Writer << "path"sv << path; + Writer << "name"sv + << "proj0_oplog0"sv; + } + Writer << EndObject; // "file" + } + Writer << EndObject; // "params" + }); + + HttpClient Http{Servers.GetInstance(0).GetBaseUri()}; + + HttpClient::Response Response = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", "proj0", "oplog0"), Payload); + HttpWaitForCompletion(Servers.GetInstance(0), Response); + } + { + MakeProject(Servers.GetInstance(1).GetBaseUri(), "proj0_copy"); + MakeOplog(Servers.GetInstance(1).GetBaseUri(), "proj0_copy", "oplog0_copy"); + + IoBuffer Payload = MakeCbObjectPayload([&AttachmentHashes, path = TempDir.Path().string()](CbObjectWriter& Writer) { + Writer << "method"sv + << "import"sv; + Writer << "params" << BeginObject; + { + Writer << "force"sv << false; + Writer << "file"sv << BeginObject; + { + Writer << "path"sv << path; + Writer << "name"sv + << "proj0_oplog0"sv; + } + Writer << EndObject; // "file" + } + Writer << EndObject; // "params" + }); + + HttpClient Http{Servers.GetInstance(1).GetBaseUri()}; + + HttpClient::Response Response = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", "proj0_copy", "oplog0_copy"), Payload); + HttpWaitForCompletion(Servers.GetInstance(1), Response); + } + ValidateAttachments(1, "proj0_copy", "oplog0_copy"); + ValidateOplog(1, "proj0_copy", "oplog0_copy"); + } + + SUBCASE("File disable blocks") + { + ScopedTemporaryDirectory TempDir; + { + IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) { + Writer << "method"sv + << "export"sv; + Writer << "params" << BeginObject; + { + Writer << "maxblocksize"sv << 3072u; + Writer << "maxchunkembedsize"sv << 1296u; + Writer << "chunkfilesizelimit"sv << 5u * 1024u; + Writer << "force"sv << false; + Writer << "file"sv << BeginObject; + { + Writer << "path"sv << TempDir.Path().string(); + Writer << "name"sv + << "proj0_oplog0"sv; + Writer << "disableblocks"sv << true; + } + Writer << EndObject; // "file" + } + Writer << EndObject; // "params" + }); + + HttpClient Http{Servers.GetInstance(0).GetBaseUri()}; + + HttpClient::Response Response = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", "proj0", "oplog0"), Payload); + HttpWaitForCompletion(Servers.GetInstance(0), Response); + } + { + MakeProject(Servers.GetInstance(1).GetBaseUri(), "proj0_copy"); + MakeOplog(Servers.GetInstance(1).GetBaseUri(), "proj0_copy", "oplog0_copy"); + IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) { + Writer << "method"sv + << "import"sv; + Writer << "params" << BeginObject; + { + Writer << "force"sv << false; + Writer << "file"sv << BeginObject; + { + Writer << "path"sv << TempDir.Path().string(); + Writer << "name"sv + << "proj0_oplog0"sv; + } + Writer << EndObject; // "file" + } + Writer << EndObject; // "params" + }); + + HttpClient Http{Servers.GetInstance(1).GetBaseUri()}; + + HttpClient::Response Response = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", "proj0_copy", "oplog0_copy"), Payload); + HttpWaitForCompletion(Servers.GetInstance(1), Response); + } + ValidateAttachments(1, "proj0_copy", "oplog0_copy"); + ValidateOplog(1, "proj0_copy", "oplog0_copy"); + } + + SUBCASE("File force temp blocks") + { + ScopedTemporaryDirectory TempDir; + { + IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) { + Writer << "method"sv + << "export"sv; + Writer << "params" << BeginObject; + { + Writer << "maxblocksize"sv << 3072u; + Writer << "maxchunkembedsize"sv << 1296u; + Writer << "chunkfilesizelimit"sv << 5u * 1024u; + Writer << "force"sv << false; + Writer << "file"sv << BeginObject; + { + Writer << "path"sv << TempDir.Path().string(); + Writer << "name"sv + << "proj0_oplog0"sv; + Writer << "enabletempblocks"sv << true; + } + Writer << EndObject; // "file" + } + Writer << EndObject; // "params" + }); + + HttpClient Http{Servers.GetInstance(0).GetBaseUri()}; + HttpClient::Response Response = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", "proj0", "oplog0"), Payload); + HttpWaitForCompletion(Servers.GetInstance(0), Response); + } + { + MakeProject(Servers.GetInstance(1).GetBaseUri(), "proj0_copy"); + MakeOplog(Servers.GetInstance(1).GetBaseUri(), "proj0_copy", "oplog0_copy"); + IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) { + Writer << "method"sv + << "import"sv; + Writer << "params" << BeginObject; + { + Writer << "force"sv << false; + Writer << "file"sv << BeginObject; + { + Writer << "path"sv << TempDir.Path().string(); + Writer << "name"sv + << "proj0_oplog0"sv; + } + Writer << EndObject; // "file" + } + Writer << EndObject; // "params" + }); + + HttpClient Http{Servers.GetInstance(1).GetBaseUri()}; + HttpClient::Response Response = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", "proj0_copy", "oplog0_copy"), Payload); + HttpWaitForCompletion(Servers.GetInstance(1), Response); + } + ValidateAttachments(1, "proj0_copy", "oplog0_copy"); + ValidateOplog(1, "proj0_copy", "oplog0_copy"); + } + + SUBCASE("Zen") + { + ScopedTemporaryDirectory TempDir; + { + std::string ExportSourceUri = Servers.GetInstance(0).GetBaseUri(); + std::string ExportTargetUri = Servers.GetInstance(1).GetBaseUri(); + MakeProject(ExportTargetUri, "proj0_copy"); + MakeOplog(ExportTargetUri, "proj0_copy", "oplog0_copy"); + + IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) { + Writer << "method"sv + << "export"sv; + Writer << "params" << BeginObject; + { + Writer << "maxblocksize"sv << 3072u; + Writer << "maxchunkembedsize"sv << 1296u; + Writer << "chunkfilesizelimit"sv << 5u * 1024u; + Writer << "force"sv << false; + Writer << "zen"sv << BeginObject; + { + Writer << "url"sv << ExportTargetUri.substr(7); + Writer << "project" + << "proj0_copy"; + Writer << "oplog" + << "oplog0_copy"; + } + Writer << EndObject; // "file" + } + Writer << EndObject; // "params" + }); + + HttpClient Http{Servers.GetInstance(0).GetBaseUri()}; + HttpClient::Response Response = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", "proj0", "oplog0"), Payload); + HttpWaitForCompletion(Servers.GetInstance(0), Response); + } + ValidateAttachments(1, "proj0_copy", "oplog0_copy"); + ValidateOplog(1, "proj0_copy", "oplog0_copy"); + + { + std::string ImportSourceUri = Servers.GetInstance(1).GetBaseUri(); + std::string ImportTargetUri = Servers.GetInstance(2).GetBaseUri(); + MakeProject(ImportTargetUri, "proj1"); + MakeOplog(ImportTargetUri, "proj1", "oplog1"); + + IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) { + Writer << "method"sv + << "import"sv; + Writer << "params" << BeginObject; + { + Writer << "force"sv << false; + Writer << "zen"sv << BeginObject; + { + Writer << "url"sv << ImportSourceUri.substr(7); + Writer << "project" + << "proj0_copy"; + Writer << "oplog" + << "oplog0_copy"; + } + Writer << EndObject; // "file" + } + Writer << EndObject; // "params" + }); + + HttpClient Http{Servers.GetInstance(2).GetBaseUri()}; + HttpClient::Response Response = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", "proj1", "oplog1"), Payload); + HttpWaitForCompletion(Servers.GetInstance(2), Response); + } + ValidateAttachments(2, "proj1", "oplog1"); + ValidateOplog(2, "proj1", "oplog1"); + } +} + +TEST_CASE("project.rpcappendop") +{ + using namespace std::literals; + using namespace utils; + + ZenServerTestHelper Servers("remote", 2); + Servers.SpawnServers("--debug"); + + std::vector<Oid> OpIds; + const size_t OpCount = 24; + OpIds.reserve(OpCount); + for (size_t I = 0; I < OpCount; ++I) + { + OpIds.emplace_back(Oid::NewOid()); + } + + std::unordered_map<Oid, std::vector<std::pair<Oid, CompressedBuffer>>, Oid::Hasher> Attachments; + { + std::vector<std::size_t> AttachmentSizes( + {7633, 6825, 5738, 8031, 7225, 566, 3656, 6006, 24, 33466, 1093, 4269, 2257, 3685, 13489, 97194, + 6151, 5482, 6217, 3511, 6738, 5061, 7537, 2759, 1916, 8210, 2235, 224024, 51582, 5251, 491, 2u * 1024u * 1024u + 124u, + 74607, 18135, 3767, 154045, 4415, 5007, 8876, 96761, 3359, 8526, 4097, 4855, 48225}); + auto It = AttachmentSizes.begin(); + Attachments[OpIds[0]] = {}; + Attachments[OpIds[1]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++}); + Attachments[OpIds[2]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++}); + Attachments[OpIds[3]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++}); + Attachments[OpIds[4]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++}); + Attachments[OpIds[5]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++}); + Attachments[OpIds[6]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++}); + Attachments[OpIds[7]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++}); + Attachments[OpIds[8]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{}); + Attachments[OpIds[9]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++}); + Attachments[OpIds[10]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++}); + Attachments[OpIds[11]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++}); + Attachments[OpIds[12]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++}); + Attachments[OpIds[13]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++}); + Attachments[OpIds[14]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++}); + Attachments[OpIds[15]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++}); + Attachments[OpIds[16]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{}); + Attachments[OpIds[17]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++}); + Attachments[OpIds[18]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++}); + Attachments[OpIds[19]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{}); + Attachments[OpIds[20]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++}); + Attachments[OpIds[21]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++}); + Attachments[OpIds[22]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++}); + Attachments[OpIds[23]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++}); + ZEN_ASSERT(It == AttachmentSizes.end()); + } + + // Note: This is a clone of the function in projectstore.cpp + auto ComputeOpKey = [](const CbObjectView& Op) -> Oid { + using namespace std::literals; + + XXH3_128Stream_deprecated KeyHasher; + Op["key"sv].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); }); + XXH3_128 KeyHash128 = KeyHasher.GetHash(); + + Oid KeyHash; + memcpy(&KeyHash, KeyHash128.Hash, sizeof KeyHash); + + return KeyHash; + }; + + auto AddOp = [ComputeOpKey](const CbObject& Op, std::unordered_map<Oid, uint32_t, Oid::Hasher>& Ops) { + const Oid Id = ComputeOpKey(Op); + IoBuffer Buffer = Op.GetBuffer().AsIoBuffer(); + const uint32_t OpCoreHash = uint32_t(XXH3_64bits(Buffer.GetData(), Buffer.GetSize()) & 0xffffFFFF); + Ops.insert({Id, OpCoreHash}); + }; + + auto MakeProject = [](HttpClient& Client, std::string_view ProjectName) { + CbObjectWriter Project; + Project.AddString("id"sv, ProjectName); + Project.AddString("root"sv, ""sv); + Project.AddString("engine"sv, ""sv); + Project.AddString("project"sv, ""sv); + Project.AddString("projectfile"sv, ""sv); + HttpClient::Response Response = Client.Post(fmt::format("/prj/{}", ProjectName), Project.Save()); + CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage("")); + }; + + auto MakeOplog = [](HttpClient& Client, std::string_view ProjectName, std::string_view OplogName) { + HttpClient::Response Response = Client.Post(fmt::format("/prj/{}/oplog/{}", ProjectName, OplogName)); + CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage("")); + }; + auto GetOplog = [](HttpClient& Client, std::string_view ProjectName, std::string_view OplogName) { + HttpClient::Response Response = Client.Get(fmt::format("/prj/{}/oplog/{}", ProjectName, OplogName)); + CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage("")); + return Response.AsObject(); + }; + + auto MakeOp = + [](HttpClient& Client, std::string_view ProjectName, std::string_view OplogName, const CbObjectView& Op) -> std::vector<IoHash> { + CbObjectWriter Request; + Request.AddString("method"sv, "appendops"sv); + Request.BeginArray("ops"sv); + { + Request.AddObject(Op); + } + Request.EndArray(); // "ops" + HttpClient::Response Response = Client.Post(fmt::format("/prj/{}/oplog/{}/rpc", ProjectName, OplogName), Request.Save()); + CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage("")); + + CbObjectView ResponsePayload = Response.AsPackage().GetObject(); + CbArrayView NeedArray = ResponsePayload["need"sv].AsArrayView(); + std::vector<IoHash> Needs; + Needs.reserve(NeedArray.Num()); + for (CbFieldView NeedView : NeedArray) + { + Needs.push_back(NeedView.AsHash()); + } + return Needs; + }; + + auto SendAttachments = [](HttpClient& Client, + std::string_view ProjectName, + std::string_view OplogName, + std::span<const CompressedBuffer> Attachments, + void* ServerProcessHandle, + const std::filesystem::path& TempPath) { + CompositeBuffer PackageMessage; + { + CbPackage RequestPackage; + CbObjectWriter Request; + Request.AddString("method"sv, "putchunks"sv); + Request.AddBool("usingtmpfiles"sv, true); + Request.BeginArray("chunks"sv); + for (CompressedBuffer AttachmentPayload : Attachments) + { + if (AttachmentPayload.DecodeRawSize() > 16u * 1024u) + { + std::filesystem::path TempAttachmentPath = TempPath / (Oid::NewOid().ToString() + ".tmp"); + WriteFile(TempAttachmentPath, AttachmentPayload.GetCompressed()); + IoBuffer OnDiskAttachment = IoBufferBuilder::MakeFromFile(TempAttachmentPath); + AttachmentPayload = CompressedBuffer::FromCompressedNoValidate(std::move(OnDiskAttachment)); + } + + CbAttachment Attachment(AttachmentPayload, AttachmentPayload.DecodeRawHash()); + + Request.AddAttachment(Attachment); + RequestPackage.AddAttachment(Attachment); + } + Request.EndArray(); // "chunks" + RequestPackage.SetObject(Request.Save()); + + PackageMessage = CompositeBuffer(FormatPackageMessage(RequestPackage, FormatFlags::kAllowLocalReferences, ServerProcessHandle)); + } + + HttpClient::Response Response = + Client.Post(fmt::format("/prj/{}/oplog/{}/rpc", ProjectName, OplogName), PackageMessage, HttpContentType::kCbPackage); + CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage("")); + }; + + { + HttpClient Client(Servers.GetInstance(0).GetBaseUri()); + void* ServerProcessHandle = Servers.GetInstance(0).GetProcessHandle(); + + MakeProject(Client, "proj0"); + MakeOplog(Client, "proj0", "oplog0"); + CbObject Oplog = GetOplog(Client, "proj0", "oplog0"); + std::filesystem::path TempPath = Oplog["tempdir"sv].AsU8String(); + + std::unordered_map<Oid, uint32_t, Oid::Hasher> SourceOps; + for (const Oid& OpId : OpIds) + { + CbObject Op = CreateOplogOp(OpId, Attachments[OpId]); + AddOp(Op, SourceOps); + std::vector<IoHash> MissingAttachments = MakeOp(Client, "proj0", "oplog0", Op); + + if (!MissingAttachments.empty()) + { + CHECK(MissingAttachments.size() <= Attachments[OpId].size()); + tsl::robin_set<IoHash, IoHash::Hasher> MissingAttachmentSet(MissingAttachments.begin(), MissingAttachments.end()); + std::vector<CompressedBuffer> PutAttachments; + for (const auto& Attachment : Attachments[OpId]) + { + CompressedBuffer Payload = Attachment.second; + const IoHash AttachmentHash = Payload.DecodeRawHash(); + if (auto It = MissingAttachmentSet.find(AttachmentHash); It != MissingAttachmentSet.end()) + { + PutAttachments.push_back(Payload); + } + } + SendAttachments(Client, "proj0", "oplog0", PutAttachments, ServerProcessHandle, TempPath); + } + } + + // Do it again, but now we should not need any attachments + + for (const Oid& OpId : OpIds) + { + CbObject Op = CreateOplogOp(OpId, Attachments[OpId]); + AddOp(Op, SourceOps); + std::vector<IoHash> MissingAttachments = MakeOp(Client, "proj0", "oplog0", Op); + CHECK(MissingAttachments.empty()); + } + } + + { + HttpClient Client(Servers.GetInstance(1).GetBaseUri()); + void* ServerProcessHandle = nullptr; // Force use of path for attachments passed on disk + + MakeProject(Client, "proj0"); + MakeOplog(Client, "proj0", "oplog0"); + CbObject Oplog = GetOplog(Client, "proj0", "oplog0"); + std::filesystem::path TempPath = Oplog["tempdir"sv].AsU8String(); + + std::unordered_map<Oid, uint32_t, Oid::Hasher> SourceOps; + for (const Oid& OpId : OpIds) + { + CbObject Op = CreateOplogOp(OpId, Attachments[OpId]); + AddOp(Op, SourceOps); + std::vector<IoHash> MissingAttachments = MakeOp(Client, "proj0", "oplog0", Op); + + if (!MissingAttachments.empty()) + { + CHECK(MissingAttachments.size() <= Attachments[OpId].size()); + tsl::robin_set<IoHash, IoHash::Hasher> MissingAttachmentSet(MissingAttachments.begin(), MissingAttachments.end()); + std::vector<CompressedBuffer> PutAttachments; + for (const auto& Attachment : Attachments[OpId]) + { + CompressedBuffer Payload = Attachment.second; + const IoHash AttachmentHash = Payload.DecodeRawHash(); + if (auto It = MissingAttachmentSet.find(AttachmentHash); It != MissingAttachmentSet.end()) + { + PutAttachments.push_back(Payload); + } + } + SendAttachments(Client, "proj0", "oplog0", PutAttachments, ServerProcessHandle, TempPath); + } + } + + // Do it again, but now we should not need any attachments + + for (const Oid& OpId : OpIds) + { + CbObject Op = CreateOplogOp(OpId, Attachments[OpId]); + AddOp(Op, SourceOps); + std::vector<IoHash> MissingAttachments = MakeOp(Client, "proj0", "oplog0", Op); + CHECK(MissingAttachments.empty()); + } + } +} + +} // namespace zen::tests + +#endif |