// Copyright Epic Games, Inc. All Rights Reserved.

#if ZEN_WITH_TESTS

#	include "zenserver-test.h"

// NOTE(review): the header names of the directives below (15 project includes, 1 third-party
// include, 1 trailing include) are missing from the reviewed text. They are kept in place,
// count and order unchanged; restore the targets from version control before building.
#	include
#	include
#	include
#	include
#	include
#	include
#	include
#	include
#	include
#	include
#	include
#	include
#	include
#	include
#	include

ZEN_THIRD_PARTY_INCLUDES_START
#	include
ZEN_THIRD_PARTY_INCLUDES_END

#	include

namespace zen::tests {

using namespace std::literals;

// Spawns a single server and walks the project store HTTP API: project creation, oplog
// creation, and then (as nested subcases, so each one starts from a freshly created
// project and oplog) op posting, snapshotting and the not-found path.
TEST_CASE("project.basic")
{
	std::filesystem::path TestDir = TestEnv.CreateNewTestDir();

	ZenServerInstance Instance1(TestEnv);
	Instance1.SetDataDir(TestDir);

	const uint16_t PortNumber = Instance1.SpawnServerAndWaitUntilReady();

	zen::StringBuilder<64> BaseUri;
	BaseUri << fmt::format("http://localhost:{}", PortNumber);

	// RootPath is two levels above the running executable; BinPath is the executable's
	// path relative to that root, so RootPath / BinPath names the executable again.
	std::filesystem::path BinPath  = zen::GetRunningExecutablePath();
	std::filesystem::path RootPath = BinPath.parent_path().parent_path();
	BinPath						   = BinPath.lexically_relative(RootPath);

	SUBCASE("build store init")
	{
		// Create the project and read it back
		{
			HttpClient Http{BaseUri};

			{
				zen::CbObjectWriter Body;
				Body << "id" << "test";
				Body << "root" << RootPath.c_str();
				Body << "project" << "/zooom";
				Body << "engine" << "/zooom";

				IoBuffer BodyBuf = Body.Save().GetBuffer().AsIoBuffer();

				auto Response = Http.Post("/prj/test"sv, BodyBuf);
				CHECK(Response.StatusCode == HttpResponseCode::Created);
			}

			{
				auto Response = Http.Get("/prj/test"sv);
				CHECK(Response.StatusCode == HttpResponseCode::OK);

				CbObject ResponseObject = Response.AsObject();
				CHECK(ResponseObject["id"].AsString() == "test"sv);
				CHECK(ResponseObject["root"].AsString() == PathToUtf8(RootPath.c_str()));
			}
		}

		// From here on every request targets the oplog
		BaseUri << "/prj/test/oplog/foobar";

		// Create the oplog and read it back
		{
			HttpClient Http{BaseUri};

			{
				auto Response = Http.Post(""sv);
				CHECK(Response.StatusCode == HttpResponseCode::Created);
			}

			{
				auto Response = Http.Get(""sv);
				CHECK(Response.StatusCode == HttpResponseCode::OK);

				CbObject ResponseObject = Response.AsObject();
				CHECK(ResponseObject["id"].AsString() == "foobar"sv);
				CHECK(ResponseObject["project"].AsString() == "test"sv);
			}
		}

		// Id under which the op-posting subcases register the test executable as a file
		const std::string_view ChunkId{
			"00000000"
			"00000000"
			"00010000"};

		// Appends a one-entry "files" array that maps ChunkId to the test executable
		auto WriteFilesArray = [&](zen::CbObjectWriter& OpWriter) {
			auto FileOid = zen::Oid::FromHexString(ChunkId);

			OpWriter.BeginArray("files");
			OpWriter.BeginObject();
			OpWriter << "id" << FileOid;
			OpWriter << "clientpath" << "/{engine}/client/side/path";
			OpWriter << "serverpath" << BinPath.c_str();
			OpWriter.EndObject();
			OpWriter.EndArray();
		};

		// Serializes the package in the legacy format and posts it as a new oplog entry
		auto PostNewOp = [](HttpClient& Http, const zen::CbPackage& OpPackage) {
			zen::BinaryWriter MemOut;
			legacy::SaveCbPackage(OpPackage, MemOut);

			auto Response = Http.Post("/new", IoBufferBuilder::MakeFromMemory(MemOut.GetView()));
			REQUIRE(Response);
			CHECK(Response.StatusCode == HttpResponseCode::Created);
		};

		SUBCASE("build store persistence")
		{
			uint8_t				  AttachData[] = {1, 2, 3};
			zen::CompressedBuffer Attachment   = zen::CompressedBuffer::Compress(zen::SharedBuffer::Clone(zen::MemoryView{AttachData, 3}));
			zen::CbAttachment	  Attach{Attachment, Attachment.DecodeRawHash()};

			zen::CbObjectWriter OpWriter;
			OpWriter << "key" << "foo" << "attachment" << Attach;
			WriteFilesArray(OpWriter);

			zen::CbPackage OpPackage(OpWriter.Save());
			OpPackage.AddAttachment(Attach);

			HttpClient Http{BaseUri};
			PostNewOp(Http, OpPackage);

			// Read file data
			{
				zen::StringBuilder<128> ChunkGetUri;
				ChunkGetUri << "/" << ChunkId;
				auto Response = Http.Get(ChunkGetUri);

				REQUIRE(Response);
				CHECK(Response.StatusCode == HttpResponseCode::OK);
			}

			// Read a 10 byte range of the file data
			{
				zen::StringBuilder<128> ChunkGetUri;
				ChunkGetUri << "/" << ChunkId << "?offset=1&size=10";
				auto Response = Http.Get(ChunkGetUri);

				REQUIRE(Response);
				CHECK(Response.StatusCode == HttpResponseCode::OK);
				CHECK(Response.ResponsePayload.GetSize() == 10);
			}

			ZEN_INFO("+++++++");
		}

		SUBCASE("snapshot")
		{
			zen::CbObjectWriter OpWriter;
			OpWriter << "key" << "foo";
			WriteFilesArray(OpWriter);

			zen::CbPackage OpPackage(OpWriter.Save());

			HttpClient Http{BaseUri};
			PostNewOp(Http, OpPackage);

			// Read file data, it is raw and uncompressed
			{
				zen::StringBuilder<128> ChunkGetUri;
				ChunkGetUri << "/" << ChunkId;
				auto Response = Http.Get(ChunkGetUri);

				REQUIRE(Response);
				CHECK(Response.StatusCode == HttpResponseCode::OK);

				IoBuffer Data		   = Response.ResponsePayload;
				IoBuffer ReferenceData = IoBufferBuilder::MakeFromFile(RootPath / BinPath);
				CHECK(ReferenceData.GetSize() == Data.GetSize());
				CHECK(ReferenceData.GetView().EqualBytes(Data.GetView()));
			}

			// Ask the oplog to snapshot its files
			{
				IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) { Writer.AddString("method"sv, "snapshot"sv); });
				auto	 Response = Http.Post("/rpc"sv, Payload);
				REQUIRE(Response);
				CHECK(Response.StatusCode == HttpResponseCode::OK);
			}

			// Read chunk data, it is now compressed
			{
				zen::StringBuilder<128> ChunkGetUri;
				ChunkGetUri << "/" << ChunkId;
				// NOTE(review): "Accept-Type" differs from the "Accept" header every other request
				// in this file sends - confirm the server actually reads it before relying on it.
				auto Response = Http.Get(ChunkGetUri, {{"Accept-Type", "application/x-ue-comp"}});

				REQUIRE(Response);
				CHECK(Response.StatusCode == HttpResponseCode::OK);

				IoBuffer		 Data = Response.ResponsePayload;
				IoHash			 RawHash;
				uint64_t		 RawSize;
				CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Data), RawHash, RawSize);
				CHECK(Compressed);

				IoBuffer DataDecompressed = Compressed.Decompress().AsIoBuffer();
				IoBuffer ReferenceData	  = IoBufferBuilder::MakeFromFile(RootPath / BinPath);
				CHECK(RawSize == ReferenceData.GetSize());
				CHECK(ReferenceData.GetSize() == DataDecompressed.GetSize());
				CHECK(ReferenceData.GetView().EqualBytes(DataDecompressed.GetView()));
			}

			ZEN_INFO("+++++++");
		}

		SUBCASE("test chunk not found error")
		{
			HttpClient Http{BaseUri};

			// The same unknown chunk is requested repeatedly; each request must come back as
			// a clean 404 rather than a transport error.
			for (size_t I = 0; I < 65; I++)
			{
				zen::StringBuilder<128> InfoUri;
				InfoUri << "/f77c781846caead318084604/info";
				auto Response = Http.Get(InfoUri);

				REQUIRE(!Response.Error);
				CHECK(Response.StatusCode == HttpResponseCode::NotFound);
			}
		}
	}
}

namespace {

	using TestOpAttachment = std::pair<Oid, CompressedBuffer>;

	// Writes the fields of a test op into Object: a "key" derived from Id and, when there are
	// attachments, a "bulkdata" array with one {id, type, data} object per attachment.
	// When Package is non-null every attachment is also added to it.
	void WriteTestOplogOp(CbObjectWriter& Object, const Oid& Id, std::span<const TestOpAttachment> Attachments, CbPackage* Package)
	{
		Object << "key"sv << OidAsString(Id);

		if (Attachments.empty())
		{
			return;
		}

		Object.BeginArray("bulkdata");
		for (const auto& [AttachmentId, Payload] : Attachments)
		{
			CbAttachment Attach(Payload, Payload.DecodeRawHash());

			Object.BeginObject();
			Object << "id"sv << AttachmentId;
			Object << "type"sv << "Standard"sv;
			Object << "data"sv << Attach;
			Object.EndObject();

			if (Package != nullptr)
			{
				Package->AddAttachment(Attach);
			}

			ZEN_DEBUG("Added attachment {}", Attach.GetHash());
		}
		Object.EndArray();
	}

}  // namespace

// Builds an oplog op for Id as a package: the op object references every attachment in a
// "bulkdata" array and the package carries the attachment payloads themselves.
CbPackage
CreateOplogPackage(const Oid& Id, const std::span<const std::pair<Oid, CompressedBuffer>>& Attachments)
{
	CbPackage	   Package;
	CbObjectWriter Object;
	WriteTestOplogOp(Object, Id, Attachments, &Package);
	Package.SetObject(Object.Save());
	return Package;
}

// Builds the same op object as CreateOplogPackage but returns only the object, without
// bundling the attachment payloads.
CbObject
CreateOplogOp(const Oid& Id, const std::span<const std::pair<Oid, CompressedBuffer>>& Attachments)
{
	CbObjectWriter Object;
	WriteTestOplogOp(Object, Id, Attachments, nullptr);
	return Object.Save();
}

// Stream manipulators that let object/array scopes be opened and closed inline in a
// `Writer << ...` chain.
enum CbWriterMeta
{
	BeginObject,
	EndObject,
	BeginArray,
	EndArray
};

// Applies the scope operation named by Meta to Writer and returns Writer for chaining.
inline CbWriter&
operator<<(CbWriter& Writer, CbWriterMeta Meta)
{
	switch (Meta)
	{
		case BeginObject:
			Writer.BeginObject();
			break;
		case EndObject:
			Writer.EndObject();
			break;
		case BeginArray:
			Writer.BeginArray();
			break;
		case EndArray:
			Writer.EndArray();
			break;
		default:
			ZEN_ASSERT(false);
	}
	return Writer;
}

namespace {

	using TestOpAttachmentMap = std::unordered_map<Oid, std::vector<TestOpAttachment>, Oid::Hasher>;
	using TestOpHashMap		  = std::unordered_map<Oid, uint32_t, Oid::Hasher>;

	// Number of ops the attachment table in MakeTestOpAttachments is laid out for
	constexpr size_t TestOpCount = 24;

	// Returns OpCount freshly generated op ids
	std::vector<Oid> MakeTestOpIds(size_t OpCount)
	{
		std::vector<Oid> OpIds;
		OpIds.reserve(OpCount);
		for (size_t I = 0; I < OpCount; ++I)
		{
			OpIds.emplace_back(Oid::NewOid());
		}
		return OpIds;
	}

	// Assigns each of the TestOpCount ops between zero and four attachments, consuming the
	// size table below strictly in order. Op 0 gets a default-constructed (empty) list; ops
	// 8, 16 and 19 get the result of CreateSemiRandomAttachments for an empty size list.
	TestOpAttachmentMap MakeTestOpAttachments(const std::vector<Oid>& OpIds)
	{
		using namespace utils;

		ZEN_ASSERT(OpIds.size() == TestOpCount);

		TestOpAttachmentMap Attachments;

		std::vector<size_t> AttachmentSizes(
			{7633,	 6825,	5738,  8031, 7225,	 566,  3656, 6006, 24,	  33466, 1093, 4269, 2257, 3685, 13489,
			 97194,	 6151,	5482,  6217, 3511,	 6738, 5061, 7537, 2759,  1916,	 8210, 2235, 224024, 51582, 5251,
			 491,	 2u * 1024u * 1024u + 124u, 74607, 18135, 3767, 154045, 4415, 5007, 8876, 96761, 3359, 8526,
			 4097,	 4855,	48225});
		auto It = AttachmentSizes.begin();

		Attachments[OpIds[0]]  = {};
		Attachments[OpIds[1]]  = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++});
		Attachments[OpIds[2]]  = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++});
		Attachments[OpIds[3]]  = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++});
		Attachments[OpIds[4]]  = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++});
		Attachments[OpIds[5]]  = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++});
		Attachments[OpIds[6]]  = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++});
		Attachments[OpIds[7]]  = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++});
		Attachments[OpIds[8]]  = CreateSemiRandomAttachments(std::initializer_list<size_t>{});
		Attachments[OpIds[9]]  = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++});
		Attachments[OpIds[10]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++});
		Attachments[OpIds[11]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++});
		Attachments[OpIds[12]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++});
		Attachments[OpIds[13]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++});
		Attachments[OpIds[14]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++});
		Attachments[OpIds[15]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++});
		Attachments[OpIds[16]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{});
		Attachments[OpIds[17]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++});
		Attachments[OpIds[18]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++});
		Attachments[OpIds[19]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{});
		Attachments[OpIds[20]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++});
		Attachments[OpIds[21]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++});
		Attachments[OpIds[22]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++});
		Attachments[OpIds[23]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++});

		// Every size in the table must have been handed out exactly once
		ZEN_ASSERT(It == AttachmentSizes.end());

		return Attachments;
	}

	// Note: This is a clone of the function in projectstore.cpp
	// Hashes the op's "key" field and uses the leading bytes of the 128-bit hash as the op id.
	Oid ComputeTestOpKey(const CbObjectView& Op)
	{
		XXH3_128Stream_deprecated KeyHasher;
		Op["key"sv].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); });
		XXH3_128 KeyHash128 = KeyHasher.GetHash();

		Oid KeyHash;
		memcpy(&KeyHash, KeyHash128.Hash, sizeof KeyHash);
		return KeyHash;
	}

	// Records Op in Ops as (key id -> low 32 bits of the XXH3 hash of the serialized op).
	// An id that is already present keeps its first recorded hash.
	void RecordTestOp(const CbObject& Op, TestOpHashMap& Ops)
	{
		const Oid	   Id		  = ComputeTestOpKey(Op);
		IoBuffer	   Buffer	  = Op.GetBuffer().AsIoBuffer();
		const uint32_t OpCoreHash = uint32_t(XXH3_64bits(Buffer.GetData(), Buffer.GetSize()) & 0xffffFFFF);
		Ops.insert({Id, OpCoreHash});
	}

}  // namespace

// Fills an oplog on server 0 and then round-trips it to other servers, either through a
// file store (export from server 0, import into server 1) or server to server ("Zen"),
// checking after each hop that all attachments and all ops arrived intact.
TEST_CASE("project.remote")
{
	using namespace utils;

	ZenServerTestHelper Servers("remote", 3);
	Servers.SpawnServers("--debug");

	const std::vector<Oid> OpIds	   = MakeTestOpIds(TestOpCount);
	TestOpAttachmentMap	   Attachments = MakeTestOpAttachments(OpIds);

	auto MakeProject = [](std::string_view UrlBase, std::string_view ProjectName) {
		CbObjectWriter Project;
		Project.AddString("id"sv, ProjectName);
		Project.AddString("root"sv, ""sv);
		Project.AddString("engine"sv, ""sv);
		Project.AddString("project"sv, ""sv);
		Project.AddString("projectfile"sv, ""sv);
		IoBuffer ProjectPayload = Project.Save().GetBuffer().AsIoBuffer();
		ProjectPayload.SetContentType(HttpContentType::kCbObject);

		HttpClient			 Http{UrlBase};
		HttpClient::Response Response = Http.Post(fmt::format("/prj/{}", ProjectName), ProjectPayload);
		CHECK(Response);
	};

	auto MakeOplog = [](std::string_view UrlBase, std::string_view ProjectName, std::string_view OplogName) {
		HttpClient			 Http{UrlBase};
		HttpClient::Response Response = Http.Post(fmt::format("/prj/{}/oplog/{}", ProjectName, OplogName), IoBuffer{});
		CHECK(Response);
	};

	auto MakeOp = [](std::string_view UrlBase, std::string_view ProjectName, std::string_view OplogName, const CbPackage& OpPackage) {
		zen::BinaryWriter MemOut;
		legacy::SaveCbPackage(OpPackage, MemOut);
		// Body only wraps MemOut's memory; MemOut outlives the (blocking) Post below
		IoBuffer Body{IoBuffer::Wrap, MemOut.GetData(), MemOut.GetSize()};
		Body.SetContentType(HttpContentType::kCbPackage);

		HttpClient			 Http{UrlBase};
		HttpClient::Response Response = Http.Post(fmt::format("/prj/{}/oplog/{}/new", ProjectName, OplogName), Body);
		CHECK(Response);
	};

	// Populate the source oplog and remember what was written
	MakeProject(Servers.GetInstance(0).GetBaseUri(), "proj0");
	MakeOplog(Servers.GetInstance(0).GetBaseUri(), "proj0", "oplog0");

	TestOpHashMap SourceOps;
	for (const Oid& OpId : OpIds)
	{
		CbPackage OpPackage = CreateOplogPackage(OpId, Attachments[OpId]);
		CHECK(OpPackage.GetAttachments().size() == Attachments[OpId].size());
		RecordTestOp(OpPackage.GetObject(), SourceOps);
		MakeOp(Servers.GetInstance(0).GetBaseUri(), "proj0", "oplog0", OpPackage);
	}

	std::vector<IoHash> AttachmentHashes;
	AttachmentHashes.reserve(Attachments.size());
	for (const auto& AttachmentOplog : Attachments)
	{
		for (const auto& Attachment : AttachmentOplog.second)
		{
			AttachmentHashes.emplace_back(Attachment.second.DecodeRawHash());
		}
	}

	// Lets Write fill a compact binary object and returns it as an owned kCbObject payload
	auto MakeCbObjectPayload = [](std::function<void(CbObjectWriter&)> Write) -> IoBuffer {
		CbObjectWriter Writer;
		Write(Writer);
		IoBuffer Result = Writer.Save().GetBuffer().AsIoBuffer();
		Result.MakeOwned();
		Result.SetContentType(HttpContentType::kCbObject);
		return Result;
	};

	// Fetches every source attachment from the given oplog via "getchunks" and checks that
	// all of them come back and that each payload hashes to its advertised hash
	auto ValidateAttachments = [&](int ServerIndex, std::string_view Project, std::string_view Oplog) {
		HttpClient Http{Servers.GetInstance(ServerIndex).GetBaseUri()};

		IoBuffer Payload = MakeCbObjectPayload([&AttachmentHashes](CbObjectWriter& Writer) {
			Writer << "method"sv << "getchunks"sv;
			Writer << "chunks"sv << BeginArray;
			for (const IoHash& Chunk : AttachmentHashes)
			{
				Writer << Chunk;
			}
			Writer << EndArray;	 // "chunks"
		});

		HttpClient::Response Response =
			Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", Project, Oplog), Payload, {{"Accept", "application/x-ue-cbpkg"}});
		CHECK(Response);

		CbPackage ResponsePackage = ParsePackageMessage(Response.ResponsePayload);
		CHECK(ResponsePackage.GetAttachments().size() == AttachmentHashes.size());
		for (auto A : ResponsePackage.GetAttachments())
		{
			CHECK(IoHash::HashBuffer(A.AsCompressedBinary().DecompressToComposite()) == A.GetHash());
		}
	};

	// Reads back all entries of the given oplog and checks they match what was written to
	// the source oplog (same keys, same op content hashes)
	auto ValidateOplog = [&](int ServerIndex, std::string_view Project, std::string_view Oplog) {
		TestOpHashMap TargetOps;

		HttpClient			 Http{Servers.GetInstance(ServerIndex).GetBaseUri()};
		HttpClient::Response Response = Http.Get(fmt::format("/prj/{}/oplog/{}/entries", Project, Oplog));
		CHECK(Response);

		IoBuffer	Payload(Response.ResponsePayload);
		CbObject	OplogResponse = LoadCompactBinaryObject(Payload);
		CbArrayView EntriesArray  = OplogResponse["entries"sv].AsArrayView();

		for (CbFieldView OpEntry : EntriesArray)
		{
			// Re-serialize the entry into its own buffer so it hashes like a standalone op
			CbObjectView Core = OpEntry.AsObjectView();
			BinaryWriter Writer;
			Core.CopyTo(Writer);
			MemoryView OpView = Writer.GetView();
			IoBuffer   OpBuffer(IoBuffer::Wrap, OpView.GetData(), OpView.GetSize());
			CbObject   Op(SharedBuffer(OpBuffer), CbFieldType::HasFieldType);
			RecordTestOp(Op, TargetOps);
		}

		CHECK(SourceOps == TargetOps);
	};

	// Response is the reply to an export/import rpc whose text body is a job id. Polls the
	// job status on Server until it reports "Complete".
	//
	// The status checks use REQUIRE rather than CHECK on purpose: a failed CHECK only
	// records the failure and would leave this loop polling forever when the status
	// request fails or the job reports "Aborted".
	auto HttpWaitForCompletion = [](ZenServerInstance& Server, const HttpClient::Response& Response) {
		REQUIRE(Response);
		const uint64_t JobId = ParseInt<uint64_t>(Response.AsText()).value_or(0);
		REQUIRE(JobId != 0);

		HttpClient Http{Server.GetBaseUri()};
		while (true)
		{
			HttpClient::Response StatusResponse =
				Http.Get(fmt::format("/admin/jobs/{}", JobId), {{"Accept", ToString(ZenContentType::kCbObject)}});
			REQUIRE(StatusResponse);

			CbObject		 ResponseObject = StatusResponse.AsObject();
			std::string_view Status			= ResponseObject["Status"sv].AsString();
			REQUIRE(Status != "Aborted"sv);
			if (Status == "Complete"sv)
			{
				return;
			}
			Sleep(10);
		}
	};

	// Posts an rpc payload to an oplog on the given server and waits for the job it starts
	auto PostRpcAndWait = [&](int ServerIndex, std::string_view Project, std::string_view Oplog, const IoBuffer& Payload) {
		HttpClient			 Http{Servers.GetInstance(ServerIndex).GetBaseUri()};
		HttpClient::Response Response = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", Project, Oplog), Payload);
		HttpWaitForCompletion(Servers.GetInstance(ServerIndex), Response);
	};

	// Exports proj0/oplog0 from server 0 into a temporary directory, imports it into a new
	// oplog on server 1 and validates the copy. ExportFileOption, when not empty, names an
	// extra boolean option set to true in the export's "file" object.
	auto RunFileRoundTrip = [&](std::string_view ExportFileOption) {
		ScopedTemporaryDirectory TempDir;
		const std::string		 StorePath = TempDir.Path().string();

		{
			IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) {
				Writer << "method"sv << "export"sv;
				Writer << "params" << BeginObject;
				{
					Writer << "maxblocksize"sv << 3072u;
					Writer << "maxchunkembedsize"sv << 1296u;
					Writer << "chunkfilesizelimit"sv << 5u * 1024u;
					Writer << "force"sv << false;
					Writer << "file"sv << BeginObject;
					{
						Writer << "path"sv << StorePath;
						Writer << "name"sv << "proj0_oplog0"sv;
						if (!ExportFileOption.empty())
						{
							Writer << ExportFileOption << true;
						}
					}
					Writer << EndObject;  // "file"
				}
				Writer << EndObject;  // "params"
			});

			PostRpcAndWait(0, "proj0", "oplog0", Payload);
		}

		{
			MakeProject(Servers.GetInstance(1).GetBaseUri(), "proj0_copy");
			MakeOplog(Servers.GetInstance(1).GetBaseUri(), "proj0_copy", "oplog0_copy");

			IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) {
				Writer << "method"sv << "import"sv;
				Writer << "params" << BeginObject;
				{
					Writer << "force"sv << false;
					Writer << "file"sv << BeginObject;
					{
						Writer << "path"sv << StorePath;
						Writer << "name"sv << "proj0_oplog0"sv;
					}
					Writer << EndObject;  // "file"
				}
				Writer << EndObject;  // "params"
			});

			PostRpcAndWait(1, "proj0_copy", "oplog0_copy", Payload);
		}

		ValidateAttachments(1, "proj0_copy", "oplog0_copy");
		ValidateOplog(1, "proj0_copy", "oplog0_copy");
	};

	SUBCASE("File") { RunFileRoundTrip(""sv); }

	SUBCASE("File disable blocks") { RunFileRoundTrip("disableblocks"sv); }

	SUBCASE("File force temp blocks") { RunFileRoundTrip("enabletempblocks"sv); }

	SUBCASE("Zen")
	{
		// Server 0 pushes its oplog straight into an oplog on server 1
		{
			std::string ExportTargetUri = Servers.GetInstance(1).GetBaseUri();
			MakeProject(ExportTargetUri, "proj0_copy");
			MakeOplog(ExportTargetUri, "proj0_copy", "oplog0_copy");

			IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) {
				Writer << "method"sv << "export"sv;
				Writer << "params" << BeginObject;
				{
					Writer << "maxblocksize"sv << 3072u;
					Writer << "maxchunkembedsize"sv << 1296u;
					Writer << "chunkfilesizelimit"sv << 5u * 1024u;
					Writer << "force"sv << false;
					Writer << "zen"sv << BeginObject;
					{
						// substr(7) drops the first 7 characters of the base uri
						// (presumably the "http://" scheme prefix - verify against GetBaseUri)
						Writer << "url"sv << ExportTargetUri.substr(7);
						Writer << "project" << "proj0_copy";
						Writer << "oplog" << "oplog0_copy";
					}
					Writer << EndObject;  // "zen"
				}
				Writer << EndObject;  // "params"
			});

			PostRpcAndWait(0, "proj0", "oplog0", Payload);
		}

		ValidateAttachments(1, "proj0_copy", "oplog0_copy");
		ValidateOplog(1, "proj0_copy", "oplog0_copy");

		// Server 2 pulls the copy from server 1 into its own oplog
		{
			std::string ImportSourceUri = Servers.GetInstance(1).GetBaseUri();
			std::string ImportTargetUri = Servers.GetInstance(2).GetBaseUri();
			MakeProject(ImportTargetUri, "proj1");
			MakeOplog(ImportTargetUri, "proj1", "oplog1");

			IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) {
				Writer << "method"sv << "import"sv;
				Writer << "params" << BeginObject;
				{
					Writer << "force"sv << false;
					Writer << "zen"sv << BeginObject;
					{
						Writer << "url"sv << ImportSourceUri.substr(7);
						Writer << "project" << "proj0_copy";
						Writer << "oplog" << "oplog0_copy";
					}
					Writer << EndObject;  // "zen"
				}
				Writer << EndObject;  // "params"
			});

			PostRpcAndWait(2, "proj1", "oplog1", Payload);
		}

		ValidateAttachments(2, "proj1", "oplog1");
		ValidateOplog(2, "proj1", "oplog1");
	}
}

// Appends ops through the "appendops" rpc, uploads whatever attachments the server reports
// as needed through "putchunks", and checks that appending the same ops a second time needs
// no attachments. Runs once with the server's process handle and once without it.
TEST_CASE("project.rpcappendop")
{
	using namespace utils;

	ZenServerTestHelper Servers("remote", 2);
	Servers.SpawnServers("--debug");

	const std::vector<Oid> OpIds	   = MakeTestOpIds(TestOpCount);
	TestOpAttachmentMap	   Attachments = MakeTestOpAttachments(OpIds);

	auto MakeProject = [](HttpClient& Client, std::string_view ProjectName) {
		CbObjectWriter Project;
		Project.AddString("id"sv, ProjectName);
		Project.AddString("root"sv, ""sv);
		Project.AddString("engine"sv, ""sv);
		Project.AddString("project"sv, ""sv);
		Project.AddString("projectfile"sv, ""sv);
		HttpClient::Response Response = Client.Post(fmt::format("/prj/{}", ProjectName), Project.Save());
		CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage(""));
	};

	auto MakeOplog = [](HttpClient& Client, std::string_view ProjectName, std::string_view OplogName) {
		HttpClient::Response Response = Client.Post(fmt::format("/prj/{}/oplog/{}", ProjectName, OplogName));
		CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage(""));
	};

	auto GetOplog = [](HttpClient& Client, std::string_view ProjectName, std::string_view OplogName) {
		HttpClient::Response Response = Client.Get(fmt::format("/prj/{}/oplog/{}", ProjectName, OplogName));
		CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage(""));
		return Response.AsObject();
	};

	// Appends a single op and returns the attachment hashes listed in the response's "need" array
	auto MakeOp = [](HttpClient&		 Client,
					 std::string_view	 ProjectName,
					 std::string_view	 OplogName,
					 const CbObjectView& Op) -> std::vector<IoHash> {
		CbObjectWriter Request;
		Request.AddString("method"sv, "appendops"sv);
		Request.BeginArray("ops"sv);
		{
			Request.AddObject(Op);
		}
		Request.EndArray();	 // "ops"

		HttpClient::Response Response = Client.Post(fmt::format("/prj/{}/oplog/{}/rpc", ProjectName, OplogName), Request.Save());
		CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage(""));

		CbObjectView ResponsePayload = Response.AsPackage().GetObject();
		CbArrayView	 NeedArray		 = ResponsePayload["need"sv].AsArrayView();

		std::vector<IoHash> Needs;
		Needs.reserve(NeedArray.Num());
		for (CbFieldView NeedView : NeedArray)
		{
			Needs.push_back(NeedView.AsHash());
		}
		return Needs;
	};

	// Uploads Attachments with a "putchunks" request. Payloads whose raw size exceeds 16 KiB
	// are first written to a file under TempPath and re-opened from disk, and the package is
	// formatted with kAllowLocalReferences.
	auto SendAttachments = [](HttpClient&					   Client,
							  std::string_view				   ProjectName,
							  std::string_view				   OplogName,
							  std::span<const CompressedBuffer> Attachments,
							  void*							   ServerProcessHandle,
							  const std::filesystem::path&	   TempPath) {
		CompositeBuffer PackageMessage;
		{
			CbPackage	   RequestPackage;
			CbObjectWriter Request;
			Request.AddString("method"sv, "putchunks"sv);
			Request.AddBool("usingtmpfiles"sv, true);
			Request.BeginArray("chunks"sv);
			// Taken by value: large payloads are swapped for their on-disk copy below
			for (CompressedBuffer AttachmentPayload : Attachments)
			{
				if (AttachmentPayload.DecodeRawSize() > 16u * 1024u)
				{
					std::filesystem::path TempAttachmentPath = TempPath / (Oid::NewOid().ToString() + ".tmp");
					WriteFile(TempAttachmentPath, AttachmentPayload.GetCompressed());
					IoBuffer OnDiskAttachment = IoBufferBuilder::MakeFromFile(TempAttachmentPath);
					AttachmentPayload		  = CompressedBuffer::FromCompressedNoValidate(std::move(OnDiskAttachment));
				}

				CbAttachment Attachment(AttachmentPayload, AttachmentPayload.DecodeRawHash());
				Request.AddAttachment(Attachment);
				RequestPackage.AddAttachment(Attachment);
			}
			Request.EndArray();	 // "chunks"
			RequestPackage.SetObject(Request.Save());

			PackageMessage = CompositeBuffer(FormatPackageMessage(RequestPackage, FormatFlags::kAllowLocalReferences, ServerProcessHandle));
		}

		HttpClient::Response Response =
			Client.Post(fmt::format("/prj/{}/oplog/{}/rpc", ProjectName, OplogName), PackageMessage, HttpContentType::kCbPackage);
		CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage(""));
	};

	// Runs the full append/upload/re-append sequence against one server
	auto AppendAllOps = [&](int ServerIndex, void* ServerProcessHandle) {
		HttpClient Client(Servers.GetInstance(ServerIndex).GetBaseUri());

		MakeProject(Client, "proj0");
		MakeOplog(Client, "proj0", "oplog0");
		CbObject			  Oplog	   = GetOplog(Client, "proj0", "oplog0");
		std::filesystem::path TempPath = Oplog["tempdir"sv].AsU8String();

		for (const Oid& OpId : OpIds)
		{
			CbObject			Op				   = CreateOplogOp(OpId, Attachments[OpId]);
			std::vector<IoHash> MissingAttachments = MakeOp(Client, "proj0", "oplog0", Op);
			if (MissingAttachments.empty())
			{
				continue;
			}

			CHECK(MissingAttachments.size() <= Attachments[OpId].size());
			tsl::robin_set<IoHash, IoHash::Hasher> MissingAttachmentSet(MissingAttachments.begin(), MissingAttachments.end());

			// Upload only the attachments the server asked for
			std::vector<CompressedBuffer> PutAttachments;
			for (const auto& Attachment : Attachments[OpId])
			{
				CompressedBuffer Payload		= Attachment.second;
				const IoHash	 AttachmentHash = Payload.DecodeRawHash();
				if (MissingAttachmentSet.find(AttachmentHash) != MissingAttachmentSet.end())
				{
					PutAttachments.push_back(Payload);
				}
			}
			SendAttachments(Client, "proj0", "oplog0", PutAttachments, ServerProcessHandle, TempPath);
		}

		// Do it again, but now we should not need any attachments
		for (const Oid& OpId : OpIds)
		{
			CbObject			Op				   = CreateOplogOp(OpId, Attachments[OpId]);
			std::vector<IoHash> MissingAttachments = MakeOp(Client, "proj0", "oplog0", Op);
			CHECK(MissingAttachments.empty());
		}
	};

	AppendAllOps(0, Servers.GetInstance(0).GetProcessHandle());

	// Force use of path for attachments passed on disk
	AppendAllOps(1, nullptr);
}

}  // namespace zen::tests

#endif