aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver-test/zenserver-test.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-09-26 10:26:34 +0200
committerGitHub Enterprise <[email protected]>2025-09-26 10:26:34 +0200
commit347153218dd09e3806e5b27eb51f538768f27035 (patch)
tree81ef102cc116ed2c751e1af91d83db32ffa8d369 /src/zenserver-test/zenserver-test.cpp
parentMerge pull request #509 from ue-foundation/zs/put-overwrite-policy-response (diff)
downloadzen-347153218dd09e3806e5b27eb51f538768f27035.tar.xz
zen-347153218dd09e3806e5b27eb51f538768f27035.zip
new append op rpc method (#511)
- Feature: New `/prj/{project}/{oplog}/rpc` endpoint method `appendops` to send an array of oplog ops and receiving a list of `need` for attachments not present - Feature: Added `usingtmpfiles` boolean field to `/prj/{project}/{oplog}/rpc` method `putchunks` to be explicit about allowing move of temp attachment files - Improvement: Added additional validation of compact binary objects when reading from disk/receiving from client - Improvement: Windows: Added fallback code to use standard `MoveFile` api when rename via `SetFileInformationByHandle` fails in `MoveToFile` (used by filecas)
Diffstat (limited to 'src/zenserver-test/zenserver-test.cpp')
-rw-r--r--src/zenserver-test/zenserver-test.cpp270
1 files changed, 270 insertions, 0 deletions
diff --git a/src/zenserver-test/zenserver-test.cpp b/src/zenserver-test/zenserver-test.cpp
index 0ea953937..8099091bb 100644
--- a/src/zenserver-test/zenserver-test.cpp
+++ b/src/zenserver-test/zenserver-test.cpp
@@ -3145,6 +3145,31 @@ CreateOplogPackage(const Oid& Id, const std::span<const std::pair<Oid, Compresse
return Package;
};
+CbObject
+CreateOplogOp(const Oid& Id, const std::span<const std::pair<Oid, CompressedBuffer>>& Attachments)
+{
+ CbObjectWriter Object;
+ Object << "key"sv << OidAsString(Id);
+ if (!Attachments.empty())
+ {
+ Object.BeginArray("bulkdata");
+ for (const auto& Attachment : Attachments)
+ {
+ CbAttachment Attach(Attachment.second, Attachment.second.DecodeRawHash());
+ Object.BeginObject();
+ Object << "id"sv << Attachment.first;
+ Object << "type"sv
+ << "Standard"sv;
+ Object << "data"sv << Attach;
+ Object.EndObject();
+
+ ZEN_DEBUG("Added attachment {}", Attach.GetHash());
+ }
+ Object.EndArray();
+ }
+ return Object.Save();
+};
+
cpr::Body
AsBody(const IoBuffer& Payload)
{
@@ -3663,6 +3688,251 @@ TEST_CASE("project.remote")
}
}
+TEST_CASE("project.rpcappendop")
+{
+ using namespace std::literals;
+ using namespace utils;
+
+ ZenServerTestHelper Servers("remote", 2);
+ Servers.SpawnServers("--debug");
+
+ std::vector<Oid> OpIds;
+ const size_t OpCount = 24;
+ OpIds.reserve(OpCount);
+ for (size_t I = 0; I < OpCount; ++I)
+ {
+ OpIds.emplace_back(Oid::NewOid());
+ }
+
+ std::unordered_map<Oid, std::vector<std::pair<Oid, CompressedBuffer>>, Oid::Hasher> Attachments;
+ {
+ std::vector<std::size_t> AttachmentSizes(
+ {7633, 6825, 5738, 8031, 7225, 566, 3656, 6006, 24, 33466, 1093, 4269, 2257, 3685, 13489, 97194,
+ 6151, 5482, 6217, 3511, 6738, 5061, 7537, 2759, 1916, 8210, 2235, 224024, 51582, 5251, 491, 2u * 1024u * 1024u + 124u,
+ 74607, 18135, 3767, 154045, 4415, 5007, 8876, 96761, 3359, 8526, 4097, 4855, 48225});
+ auto It = AttachmentSizes.begin();
+ Attachments[OpIds[0]] = {};
+ Attachments[OpIds[1]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++});
+ Attachments[OpIds[2]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++});
+ Attachments[OpIds[3]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++});
+ Attachments[OpIds[4]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++});
+ Attachments[OpIds[5]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++});
+ Attachments[OpIds[6]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++});
+ Attachments[OpIds[7]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++});
+ Attachments[OpIds[8]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{});
+ Attachments[OpIds[9]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++});
+ Attachments[OpIds[10]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++});
+ Attachments[OpIds[11]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++});
+ Attachments[OpIds[12]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++});
+ Attachments[OpIds[13]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++});
+ Attachments[OpIds[14]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++});
+ Attachments[OpIds[15]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++});
+ Attachments[OpIds[16]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{});
+ Attachments[OpIds[17]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++});
+ Attachments[OpIds[18]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++});
+ Attachments[OpIds[19]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{});
+ Attachments[OpIds[20]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++});
+ Attachments[OpIds[21]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++});
+ Attachments[OpIds[22]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++});
+ Attachments[OpIds[23]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++});
+ ZEN_ASSERT(It == AttachmentSizes.end());
+ }
+
+ // Note: This is a clone of the function in projectstore.cpp
+ auto ComputeOpKey = [](const CbObjectView& Op) -> Oid {
+ using namespace std::literals;
+
+ XXH3_128Stream_deprecated KeyHasher;
+ Op["key"sv].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); });
+ XXH3_128 KeyHash128 = KeyHasher.GetHash();
+
+ Oid KeyHash;
+ memcpy(&KeyHash, KeyHash128.Hash, sizeof KeyHash);
+
+ return KeyHash;
+ };
+
+ auto AddOp = [ComputeOpKey](const CbObject& Op, std::unordered_map<Oid, uint32_t, Oid::Hasher>& Ops) {
+ const Oid Id = ComputeOpKey(Op);
+ IoBuffer Buffer = Op.GetBuffer().AsIoBuffer();
+ const uint32_t OpCoreHash = uint32_t(XXH3_64bits(Buffer.GetData(), Buffer.GetSize()) & 0xffffFFFF);
+ Ops.insert({Id, OpCoreHash});
+ };
+
+ auto MakeProject = [](HttpClient& Client, std::string_view ProjectName) {
+ CbObjectWriter Project;
+ Project.AddString("id"sv, ProjectName);
+ Project.AddString("root"sv, ""sv);
+ Project.AddString("engine"sv, ""sv);
+ Project.AddString("project"sv, ""sv);
+ Project.AddString("projectfile"sv, ""sv);
+ HttpClient::Response Response = Client.Post(fmt::format("/prj/{}", ProjectName), Project.Save());
+ CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage(""));
+ };
+
+ auto MakeOplog = [](HttpClient& Client, std::string_view ProjectName, std::string_view OplogName) {
+ HttpClient::Response Response = Client.Post(fmt::format("/prj/{}/oplog/{}", ProjectName, OplogName));
+ CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage(""));
+ };
+ auto GetOplog = [](HttpClient& Client, std::string_view ProjectName, std::string_view OplogName) {
+ HttpClient::Response Response = Client.Get(fmt::format("/prj/{}/oplog/{}", ProjectName, OplogName));
+ CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage(""));
+ return Response.AsObject();
+ };
+
+ auto MakeOp =
+ [](HttpClient& Client, std::string_view ProjectName, std::string_view OplogName, const CbObjectView& Op) -> std::vector<IoHash> {
+ CbObjectWriter Request;
+ Request.AddString("method"sv, "appendops"sv);
+ Request.BeginArray("ops"sv);
+ {
+ Request.AddObject(Op);
+ }
+ Request.EndArray(); // "ops"
+ HttpClient::Response Response = Client.Post(fmt::format("/prj/{}/oplog/{}/rpc", ProjectName, OplogName), Request.Save());
+ CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage(""));
+
+ CbObjectView ResponsePayload = Response.AsPackage().GetObject();
+ CbArrayView NeedArray = ResponsePayload["need"sv].AsArrayView();
+ std::vector<IoHash> Needs;
+ Needs.reserve(NeedArray.Num());
+ for (CbFieldView NeedView : NeedArray)
+ {
+ Needs.push_back(NeedView.AsHash());
+ }
+ return Needs;
+ };
+
+ auto SendAttachments = [](HttpClient& Client,
+ std::string_view ProjectName,
+ std::string_view OplogName,
+ std::span<const CompressedBuffer> Attachments,
+ void* ServerProcessHandle,
+ const std::filesystem::path& TempPath) {
+ CompositeBuffer PackageMessage;
+ {
+ CbPackage RequestPackage;
+ CbObjectWriter Request;
+ Request.AddString("method"sv, "putchunks"sv);
+ Request.AddBool("usingtmpfiles"sv, true);
+ Request.BeginArray("chunks"sv);
+ for (CompressedBuffer AttachmentPayload : Attachments)
+ {
+ if (AttachmentPayload.DecodeRawSize() > 16u * 1024u)
+ {
+ std::filesystem::path TempAttachmentPath = TempPath / (Oid::NewOid().ToString() + ".tmp");
+ WriteFile(TempAttachmentPath, AttachmentPayload.GetCompressed());
+ IoBuffer OnDiskAttachment = IoBufferBuilder::MakeFromFile(TempAttachmentPath);
+ AttachmentPayload = CompressedBuffer::FromCompressedNoValidate(std::move(OnDiskAttachment));
+ }
+
+ CbAttachment Attachment(AttachmentPayload, AttachmentPayload.DecodeRawHash());
+
+ Request.AddAttachment(Attachment);
+ RequestPackage.AddAttachment(Attachment);
+ }
+ Request.EndArray(); // "chunks"
+ RequestPackage.SetObject(Request.Save());
+
+ PackageMessage = CompositeBuffer(FormatPackageMessage(RequestPackage, FormatFlags::kAllowLocalReferences, ServerProcessHandle));
+ }
+
+ HttpClient::Response Response =
+ Client.Post(fmt::format("/prj/{}/oplog/{}/rpc", ProjectName, OplogName), PackageMessage, HttpContentType::kCbPackage);
+ CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage(""));
+ };
+
+ {
+ HttpClient Client(Servers.GetInstance(0).GetBaseUri());
+ void* ServerProcessHandle = Servers.GetInstance(0).GetProcessHandle();
+
+ MakeProject(Client, "proj0");
+ MakeOplog(Client, "proj0", "oplog0");
+ CbObject Oplog = GetOplog(Client, "proj0", "oplog0");
+ std::filesystem::path TempPath = Oplog["tempdir"sv].AsU8String();
+
+ std::unordered_map<Oid, uint32_t, Oid::Hasher> SourceOps;
+ for (const Oid& OpId : OpIds)
+ {
+ CbObject Op = CreateOplogOp(OpId, Attachments[OpId]);
+ AddOp(Op, SourceOps);
+ std::vector<IoHash> MissingAttachments = MakeOp(Client, "proj0", "oplog0", Op);
+
+ if (!MissingAttachments.empty())
+ {
+ CHECK(MissingAttachments.size() <= Attachments[OpId].size());
+ tsl::robin_set<IoHash, IoHash::Hasher> MissingAttachmentSet(MissingAttachments.begin(), MissingAttachments.end());
+ std::vector<CompressedBuffer> PutAttachments;
+ for (const auto& Attachment : Attachments[OpId])
+ {
+ CompressedBuffer Payload = Attachment.second;
+ const IoHash AttachmentHash = Payload.DecodeRawHash();
+ if (auto It = MissingAttachmentSet.find(AttachmentHash); It != MissingAttachmentSet.end())
+ {
+ PutAttachments.push_back(Payload);
+ }
+ }
+ SendAttachments(Client, "proj0", "oplog0", PutAttachments, ServerProcessHandle, TempPath);
+ }
+ }
+
+ // Do it again, but now we should not need any attachments
+
+ for (const Oid& OpId : OpIds)
+ {
+ CbObject Op = CreateOplogOp(OpId, Attachments[OpId]);
+ AddOp(Op, SourceOps);
+ std::vector<IoHash> MissingAttachments = MakeOp(Client, "proj0", "oplog0", Op);
+ CHECK(MissingAttachments.empty());
+ }
+ }
+
+ {
+ HttpClient Client(Servers.GetInstance(1).GetBaseUri());
+ void* ServerProcessHandle = nullptr; // Force use of path for attachments passed on disk
+
+ MakeProject(Client, "proj0");
+ MakeOplog(Client, "proj0", "oplog0");
+ CbObject Oplog = GetOplog(Client, "proj0", "oplog0");
+ std::filesystem::path TempPath = Oplog["tempdir"sv].AsU8String();
+
+ std::unordered_map<Oid, uint32_t, Oid::Hasher> SourceOps;
+ for (const Oid& OpId : OpIds)
+ {
+ CbObject Op = CreateOplogOp(OpId, Attachments[OpId]);
+ AddOp(Op, SourceOps);
+ std::vector<IoHash> MissingAttachments = MakeOp(Client, "proj0", "oplog0", Op);
+
+ if (!MissingAttachments.empty())
+ {
+ CHECK(MissingAttachments.size() <= Attachments[OpId].size());
+ tsl::robin_set<IoHash, IoHash::Hasher> MissingAttachmentSet(MissingAttachments.begin(), MissingAttachments.end());
+ std::vector<CompressedBuffer> PutAttachments;
+ for (const auto& Attachment : Attachments[OpId])
+ {
+ CompressedBuffer Payload = Attachment.second;
+ const IoHash AttachmentHash = Payload.DecodeRawHash();
+ if (auto It = MissingAttachmentSet.find(AttachmentHash); It != MissingAttachmentSet.end())
+ {
+ PutAttachments.push_back(Payload);
+ }
+ }
+ SendAttachments(Client, "proj0", "oplog0", PutAttachments, ServerProcessHandle, TempPath);
+ }
+ }
+
+ // Do it again, but now we should not need any attachments
+
+ for (const Oid& OpId : OpIds)
+ {
+ CbObject Op = CreateOplogOp(OpId, Attachments[OpId]);
+ AddOp(Op, SourceOps);
+ std::vector<IoHash> MissingAttachments = MakeOp(Client, "proj0", "oplog0", Op);
+ CHECK(MissingAttachments.empty());
+ }
+ }
+}
+
std::vector<std::pair<std::filesystem::path, IoBuffer>>
GenerateFolderContent(const std::filesystem::path& RootPath)
{