diff options
| author | Dan Engelbrecht <[email protected]> | 2025-09-26 10:26:34 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-09-26 10:26:34 +0200 |
| commit | 347153218dd09e3806e5b27eb51f538768f27035 (patch) | |
| tree | 81ef102cc116ed2c751e1af91d83db32ffa8d369 /src/zenserver-test/zenserver-test.cpp | |
| parent | Merge pull request #509 from ue-foundation/zs/put-overwrite-policy-response (diff) | |
| download | zen-347153218dd09e3806e5b27eb51f538768f27035.tar.xz zen-347153218dd09e3806e5b27eb51f538768f27035.zip | |
new append op rpc method (#511)
- Feature: New `/prj/{project}/{oplog}/rpc` endpoint method `appendops` to send an array of oplog ops and receiving a list of `need` for attachments not present
- Feature: Added `usingtmpfiles` boolean field to `/prj/{project}/{oplog}/rpc` method `putchunks` to be explicit about allowing move of temp attachment files
- Improvement: Added additional validation of compact binary objects when reading from disk/receiving from client
- Improvement: Windows: Added fallback code to use standard `MoveFile` api when rename via `SetFileInformationByHandle` fails in `MoveToFile` (used by filecas)
Diffstat (limited to 'src/zenserver-test/zenserver-test.cpp')
| -rw-r--r-- | src/zenserver-test/zenserver-test.cpp | 270 |
1 files changed, 270 insertions, 0 deletions
diff --git a/src/zenserver-test/zenserver-test.cpp b/src/zenserver-test/zenserver-test.cpp index 0ea953937..8099091bb 100644 --- a/src/zenserver-test/zenserver-test.cpp +++ b/src/zenserver-test/zenserver-test.cpp @@ -3145,6 +3145,31 @@ CreateOplogPackage(const Oid& Id, const std::span<const std::pair<Oid, Compresse return Package; }; +CbObject +CreateOplogOp(const Oid& Id, const std::span<const std::pair<Oid, CompressedBuffer>>& Attachments) +{ + CbObjectWriter Object; + Object << "key"sv << OidAsString(Id); + if (!Attachments.empty()) + { + Object.BeginArray("bulkdata"); + for (const auto& Attachment : Attachments) + { + CbAttachment Attach(Attachment.second, Attachment.second.DecodeRawHash()); + Object.BeginObject(); + Object << "id"sv << Attachment.first; + Object << "type"sv + << "Standard"sv; + Object << "data"sv << Attach; + Object.EndObject(); + + ZEN_DEBUG("Added attachment {}", Attach.GetHash()); + } + Object.EndArray(); + } + return Object.Save(); +}; + cpr::Body AsBody(const IoBuffer& Payload) { @@ -3663,6 +3688,251 @@ TEST_CASE("project.remote") } } +TEST_CASE("project.rpcappendop") +{ + using namespace std::literals; + using namespace utils; + + ZenServerTestHelper Servers("remote", 2); + Servers.SpawnServers("--debug"); + + std::vector<Oid> OpIds; + const size_t OpCount = 24; + OpIds.reserve(OpCount); + for (size_t I = 0; I < OpCount; ++I) + { + OpIds.emplace_back(Oid::NewOid()); + } + + std::unordered_map<Oid, std::vector<std::pair<Oid, CompressedBuffer>>, Oid::Hasher> Attachments; + { + std::vector<std::size_t> AttachmentSizes( + {7633, 6825, 5738, 8031, 7225, 566, 3656, 6006, 24, 33466, 1093, 4269, 2257, 3685, 13489, 97194, + 6151, 5482, 6217, 3511, 6738, 5061, 7537, 2759, 1916, 8210, 2235, 224024, 51582, 5251, 491, 2u * 1024u * 1024u + 124u, + 74607, 18135, 3767, 154045, 4415, 5007, 8876, 96761, 3359, 8526, 4097, 4855, 48225}); + auto It = AttachmentSizes.begin(); + Attachments[OpIds[0]] = {}; + Attachments[OpIds[1]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++}); + Attachments[OpIds[2]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++}); + Attachments[OpIds[3]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++}); + Attachments[OpIds[4]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++}); + Attachments[OpIds[5]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++}); + Attachments[OpIds[6]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++}); + Attachments[OpIds[7]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++}); + Attachments[OpIds[8]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{}); + Attachments[OpIds[9]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++}); + Attachments[OpIds[10]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++}); + Attachments[OpIds[11]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++}); + Attachments[OpIds[12]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++}); + Attachments[OpIds[13]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++}); + Attachments[OpIds[14]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++}); + Attachments[OpIds[15]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++}); + Attachments[OpIds[16]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{}); + Attachments[OpIds[17]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++}); + Attachments[OpIds[18]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++}); + Attachments[OpIds[19]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{}); + Attachments[OpIds[20]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++}); + Attachments[OpIds[21]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++}); + Attachments[OpIds[22]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++}); + Attachments[OpIds[23]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++}); + ZEN_ASSERT(It == AttachmentSizes.end()); + } + + // Note: This is a clone of the function in projectstore.cpp + auto ComputeOpKey = [](const CbObjectView& Op) -> Oid { + using namespace std::literals; + + XXH3_128Stream_deprecated KeyHasher; + Op["key"sv].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); }); + XXH3_128 KeyHash128 = KeyHasher.GetHash(); + + Oid KeyHash; + memcpy(&KeyHash, KeyHash128.Hash, sizeof KeyHash); + + return KeyHash; + }; + + auto AddOp = [ComputeOpKey](const CbObject& Op, std::unordered_map<Oid, uint32_t, Oid::Hasher>& Ops) { + const Oid Id = ComputeOpKey(Op); + IoBuffer Buffer = Op.GetBuffer().AsIoBuffer(); + const uint32_t OpCoreHash = uint32_t(XXH3_64bits(Buffer.GetData(), Buffer.GetSize()) & 0xffffFFFF); + Ops.insert({Id, OpCoreHash}); + }; + + auto MakeProject = [](HttpClient& Client, std::string_view ProjectName) { + CbObjectWriter Project; + Project.AddString("id"sv, ProjectName); + Project.AddString("root"sv, ""sv); + Project.AddString("engine"sv, ""sv); + Project.AddString("project"sv, ""sv); + Project.AddString("projectfile"sv, ""sv); + HttpClient::Response Response = Client.Post(fmt::format("/prj/{}", ProjectName), Project.Save()); + CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage("")); + }; + + auto MakeOplog = [](HttpClient& Client, std::string_view ProjectName, std::string_view OplogName) { + HttpClient::Response Response = Client.Post(fmt::format("/prj/{}/oplog/{}", ProjectName, OplogName)); + CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage("")); + }; + auto GetOplog = [](HttpClient& Client, std::string_view ProjectName, std::string_view OplogName) { + HttpClient::Response Response = Client.Get(fmt::format("/prj/{}/oplog/{}", ProjectName, OplogName)); + CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage("")); + return Response.AsObject(); + }; + + auto MakeOp = + [](HttpClient& Client, std::string_view ProjectName, std::string_view OplogName, const CbObjectView& Op) -> std::vector<IoHash> { + CbObjectWriter Request; + Request.AddString("method"sv, "appendops"sv); + Request.BeginArray("ops"sv); + { + Request.AddObject(Op); + } + Request.EndArray(); // "ops" + HttpClient::Response Response = Client.Post(fmt::format("/prj/{}/oplog/{}/rpc", ProjectName, OplogName), Request.Save()); + CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage("")); + + CbObjectView ResponsePayload = Response.AsPackage().GetObject(); + CbArrayView NeedArray = ResponsePayload["need"sv].AsArrayView(); + std::vector<IoHash> Needs; + Needs.reserve(NeedArray.Num()); + for (CbFieldView NeedView : NeedArray) + { + Needs.push_back(NeedView.AsHash()); + } + return Needs; + }; + + auto SendAttachments = [](HttpClient& Client, + std::string_view ProjectName, + std::string_view OplogName, + std::span<const CompressedBuffer> Attachments, + void* ServerProcessHandle, + const std::filesystem::path& TempPath) { + CompositeBuffer PackageMessage; + { + CbPackage RequestPackage; + CbObjectWriter Request; + Request.AddString("method"sv, "putchunks"sv); + Request.AddBool("usingtmpfiles"sv, true); + Request.BeginArray("chunks"sv); + for (CompressedBuffer AttachmentPayload : Attachments) + { + if (AttachmentPayload.DecodeRawSize() > 16u * 1024u) + { + std::filesystem::path TempAttachmentPath = TempPath / (Oid::NewOid().ToString() + ".tmp"); + WriteFile(TempAttachmentPath, AttachmentPayload.GetCompressed()); + IoBuffer OnDiskAttachment = IoBufferBuilder::MakeFromFile(TempAttachmentPath); + AttachmentPayload = CompressedBuffer::FromCompressedNoValidate(std::move(OnDiskAttachment)); + } + + CbAttachment Attachment(AttachmentPayload, AttachmentPayload.DecodeRawHash()); + + Request.AddAttachment(Attachment); + RequestPackage.AddAttachment(Attachment); + } + Request.EndArray(); // "chunks" + RequestPackage.SetObject(Request.Save()); + + PackageMessage = CompositeBuffer(FormatPackageMessage(RequestPackage, FormatFlags::kAllowLocalReferences, ServerProcessHandle)); + } + + HttpClient::Response Response = + Client.Post(fmt::format("/prj/{}/oplog/{}/rpc", ProjectName, OplogName), PackageMessage, HttpContentType::kCbPackage); + CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage("")); + }; + + { + HttpClient Client(Servers.GetInstance(0).GetBaseUri()); + void* ServerProcessHandle = Servers.GetInstance(0).GetProcessHandle(); + + MakeProject(Client, "proj0"); + MakeOplog(Client, "proj0", "oplog0"); + CbObject Oplog = GetOplog(Client, "proj0", "oplog0"); + std::filesystem::path TempPath = Oplog["tempdir"sv].AsU8String(); + + std::unordered_map<Oid, uint32_t, Oid::Hasher> SourceOps; + for (const Oid& OpId : OpIds) + { + CbObject Op = CreateOplogOp(OpId, Attachments[OpId]); + AddOp(Op, SourceOps); + std::vector<IoHash> MissingAttachments = MakeOp(Client, "proj0", "oplog0", Op); + + if (!MissingAttachments.empty()) + { + CHECK(MissingAttachments.size() <= Attachments[OpId].size()); + tsl::robin_set<IoHash, IoHash::Hasher> MissingAttachmentSet(MissingAttachments.begin(), MissingAttachments.end()); + std::vector<CompressedBuffer> PutAttachments; + for (const auto& Attachment : Attachments[OpId]) + { + CompressedBuffer Payload = Attachment.second; + const IoHash AttachmentHash = Payload.DecodeRawHash(); + if (auto It = MissingAttachmentSet.find(AttachmentHash); It != MissingAttachmentSet.end()) + { + PutAttachments.push_back(Payload); + } + } + SendAttachments(Client, "proj0", "oplog0", PutAttachments, ServerProcessHandle, TempPath); + } + } + + // Do it again, but now we should not need any attachments + + for (const Oid& OpId : OpIds) + { + CbObject Op = CreateOplogOp(OpId, Attachments[OpId]); + AddOp(Op, SourceOps); + std::vector<IoHash> MissingAttachments = MakeOp(Client, "proj0", "oplog0", Op); + CHECK(MissingAttachments.empty()); + } + } + + { + HttpClient Client(Servers.GetInstance(1).GetBaseUri()); + void* ServerProcessHandle = nullptr; // Force use of path for attachments passed on disk + + MakeProject(Client, "proj0"); + MakeOplog(Client, "proj0", "oplog0"); + CbObject Oplog = GetOplog(Client, "proj0", "oplog0"); + std::filesystem::path TempPath = Oplog["tempdir"sv].AsU8String(); + + std::unordered_map<Oid, uint32_t, Oid::Hasher> SourceOps; + for (const Oid& OpId : OpIds) + { + CbObject Op = CreateOplogOp(OpId, Attachments[OpId]); + AddOp(Op, SourceOps); + std::vector<IoHash> MissingAttachments = MakeOp(Client, "proj0", "oplog0", Op); + + if (!MissingAttachments.empty()) + { + CHECK(MissingAttachments.size() <= Attachments[OpId].size()); + tsl::robin_set<IoHash, IoHash::Hasher> MissingAttachmentSet(MissingAttachments.begin(), MissingAttachments.end()); + std::vector<CompressedBuffer> PutAttachments; + for (const auto& Attachment : Attachments[OpId]) + { + CompressedBuffer Payload = Attachment.second; + const IoHash AttachmentHash = Payload.DecodeRawHash(); + if (auto It = MissingAttachmentSet.find(AttachmentHash); It != MissingAttachmentSet.end()) + { + PutAttachments.push_back(Payload); + } + } + SendAttachments(Client, "proj0", "oplog0", PutAttachments, ServerProcessHandle, TempPath); + } + } + + // Do it again, but now we should not need any attachments + + for (const Oid& OpId : OpIds) + { + CbObject Op = CreateOplogOp(OpId, Attachments[OpId]); + AddOp(Op, SourceOps); + std::vector<IoHash> MissingAttachments = MakeOp(Client, "proj0", "oplog0", Op); + CHECK(MissingAttachments.empty()); + } + } +} + std::vector<std::pair<std::filesystem::path, IoBuffer>> GenerateFolderContent(const std::filesystem::path& RootPath) { |