aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver-test/projectstore-tests.cpp
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2025-09-29 13:15:16 +0200
committerGitHub Enterprise <[email protected]>2025-09-29 13:15:16 +0200
commitd4c6e547a7081b1562a69dc9839d24cb82681c5d (patch)
tree3ffe43dcf09bb6d01c2fb860bb1f73882f44827d /src/zenserver-test/projectstore-tests.cpp
parentgracefully handle missing chunks when exporting an oplog (#526) (diff)
downloadzen-d4c6e547a7081b1562a69dc9839d24cb82681c5d.tar.xz
zen-d4c6e547a7081b1562a69dc9839d24cb82681c5d.zip
split zenserver-test monolith into multiple source files (#528)
Diffstat (limited to 'src/zenserver-test/projectstore-tests.cpp')
-rw-r--r--src/zenserver-test/projectstore-tests.cpp1055
1 files changed, 1055 insertions, 0 deletions
diff --git a/src/zenserver-test/projectstore-tests.cpp b/src/zenserver-test/projectstore-tests.cpp
new file mode 100644
index 000000000..ce3e9dcd1
--- /dev/null
+++ b/src/zenserver-test/projectstore-tests.cpp
@@ -0,0 +1,1055 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#if ZEN_WITH_TESTS
+# include "zenserver-test.h"
+# include <zencore/testing.h>
+# include <zencore/testutils.h>
+# include <zencore/workthreadpool.h>
+# include <zencore/compactbinarybuilder.h>
+# include <zencore/compactbinarypackage.h>
+# include <zencore/compress.h>
+# include <zencore/filesystem.h>
+# include <zencore/fmtutils.h>
+# include <zencore/stream.h>
+# include <zencore/string.h>
+# include <zencore/xxhash.h>
+# include <zenhttp/packageformat.h>
+# include <zenutil/zenserverprocess.h>
+# include <zenhttp/httpclient.h>
+
+# include <tsl/robin_set.h>
+# include <random>
+
+namespace zen::tests {
+
+using namespace std::literals;
+
+TEST_CASE("project.basic")
+{
+ using namespace std::literals;
+
+ std::filesystem::path TestDir = TestEnv.CreateNewTestDir();
+
+ ZenServerInstance Instance1(TestEnv);
+ Instance1.SetTestDir(TestDir);
+
+ const uint16_t PortNumber = Instance1.SpawnServerAndWaitUntilReady();
+
+ std::mt19937_64 mt;
+
+ zen::StringBuilder<64> BaseUri;
+ BaseUri << fmt::format("http://localhost:{}", PortNumber);
+
+ std::filesystem::path BinPath = zen::GetRunningExecutablePath();
+ std::filesystem::path RootPath = BinPath.parent_path().parent_path();
+ BinPath = BinPath.lexically_relative(RootPath);
+
+ SUBCASE("build store init")
+ {
+ {
+ HttpClient Http{BaseUri};
+
+ {
+ zen::CbObjectWriter Body;
+ Body << "id"
+ << "test";
+ Body << "root" << RootPath.c_str();
+ Body << "project"
+ << "/zooom";
+ Body << "engine"
+ << "/zooom";
+
+ zen::BinaryWriter MemOut;
+ IoBuffer BodyBuf = Body.Save().GetBuffer().AsIoBuffer();
+
+ auto Response = Http.Post("/prj/test"sv, BodyBuf);
+ CHECK(Response.StatusCode == HttpResponseCode::Created);
+ }
+
+ {
+ auto Response = Http.Get("/prj/test"sv);
+ CHECK(Response.StatusCode == HttpResponseCode::OK);
+
+ CbObject ResponseObject = Response.AsObject();
+
+ CHECK(ResponseObject["id"].AsString() == "test"sv);
+ CHECK(ResponseObject["root"].AsString() == PathToUtf8(RootPath.c_str()));
+ }
+ }
+
+ BaseUri << "/prj/test/oplog/foobar";
+
+ {
+ HttpClient Http{BaseUri};
+
+ {
+ auto Response = Http.Post(""sv);
+ CHECK(Response.StatusCode == HttpResponseCode::Created);
+ }
+
+ {
+ auto Response = Http.Get(""sv);
+ CHECK(Response.StatusCode == HttpResponseCode::OK);
+
+ CbObject ResponseObject = Response.AsObject();
+
+ CHECK(ResponseObject["id"].AsString() == "foobar"sv);
+ CHECK(ResponseObject["project"].AsString() == "test"sv);
+ }
+ }
+
+ SUBCASE("build store persistence")
+ {
+ uint8_t AttachData[] = {1, 2, 3};
+
+ zen::CompressedBuffer Attachment = zen::CompressedBuffer::Compress(zen::SharedBuffer::Clone(zen::MemoryView{AttachData, 3}));
+ zen::CbAttachment Attach{Attachment, Attachment.DecodeRawHash()};
+
+ zen::CbObjectWriter OpWriter;
+ OpWriter << "key"
+ << "foo"
+ << "attachment" << Attach;
+
+ const std::string_view ChunkId{
+ "00000000"
+ "00000000"
+ "00010000"};
+ auto FileOid = zen::Oid::FromHexString(ChunkId);
+
+ OpWriter.BeginArray("files");
+ OpWriter.BeginObject();
+ OpWriter << "id" << FileOid;
+ OpWriter << "clientpath"
+ << "/{engine}/client/side/path";
+ OpWriter << "serverpath" << BinPath.c_str();
+ OpWriter.EndObject();
+ OpWriter.EndArray();
+
+ zen::CbObject Op = OpWriter.Save();
+
+ zen::CbPackage OpPackage(Op);
+ OpPackage.AddAttachment(Attach);
+
+ zen::BinaryWriter MemOut;
+ legacy::SaveCbPackage(OpPackage, MemOut);
+
+ HttpClient Http{BaseUri};
+
+ {
+ auto Response = Http.Post("/new", IoBufferBuilder::MakeFromMemory(MemOut.GetView()));
+
+ REQUIRE(Response);
+ CHECK(Response.StatusCode == HttpResponseCode::Created);
+ }
+
+ // Read file data
+
+ {
+ zen::StringBuilder<128> ChunkGetUri;
+ ChunkGetUri << "/" << ChunkId;
+ auto Response = Http.Get(ChunkGetUri);
+
+ REQUIRE(Response);
+ CHECK(Response.StatusCode == HttpResponseCode::OK);
+ }
+
+ {
+ zen::StringBuilder<128> ChunkGetUri;
+ ChunkGetUri << "/" << ChunkId << "?offset=1&size=10";
+ auto Response = Http.Get(ChunkGetUri);
+
+ REQUIRE(Response);
+ CHECK(Response.StatusCode == HttpResponseCode::OK);
+ CHECK(Response.ResponsePayload.GetSize() == 10);
+ }
+
+ ZEN_INFO("+++++++");
+ }
+
+ SUBCASE("snapshot")
+ {
+ zen::CbObjectWriter OpWriter;
+ OpWriter << "key"
+ << "foo";
+
+ const std::string_view ChunkId{
+ "00000000"
+ "00000000"
+ "00010000"};
+ auto FileOid = zen::Oid::FromHexString(ChunkId);
+
+ OpWriter.BeginArray("files");
+ OpWriter.BeginObject();
+ OpWriter << "id" << FileOid;
+ OpWriter << "clientpath"
+ << "/{engine}/client/side/path";
+ OpWriter << "serverpath" << BinPath.c_str();
+ OpWriter.EndObject();
+ OpWriter.EndArray();
+
+ zen::CbObject Op = OpWriter.Save();
+
+ zen::CbPackage OpPackage(Op);
+
+ zen::BinaryWriter MemOut;
+ legacy::SaveCbPackage(OpPackage, MemOut);
+
+ HttpClient Http{BaseUri};
+
+ {
+ auto Response = Http.Post("/new", IoBufferBuilder::MakeFromMemory(MemOut.GetView()));
+
+ REQUIRE(Response);
+ CHECK(Response.StatusCode == HttpResponseCode::Created);
+ }
+
+ // Read file data, it is raw and uncompressed
+ {
+ zen::StringBuilder<128> ChunkGetUri;
+ ChunkGetUri << "/" << ChunkId;
+ auto Response = Http.Get(ChunkGetUri);
+
+ REQUIRE(Response);
+ CHECK(Response.StatusCode == HttpResponseCode::OK);
+
+ IoBuffer Data = Response.ResponsePayload;
+ IoBuffer ReferenceData = IoBufferBuilder::MakeFromFile(RootPath / BinPath);
+ CHECK(ReferenceData.GetSize() == Data.GetSize());
+ CHECK(ReferenceData.GetView().EqualBytes(Data.GetView()));
+ }
+
+ {
+ IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) { Writer.AddString("method"sv, "snapshot"sv); });
+ auto Response = Http.Post("/rpc"sv, Payload, {{"Content-Type", "application/x-ue-cb"}});
+ REQUIRE(Response);
+ CHECK(Response.StatusCode == HttpResponseCode::OK);
+ }
+
+ // Read chunk data, it is now compressed
+ {
+ zen::StringBuilder<128> ChunkGetUri;
+ ChunkGetUri << "/" << ChunkId;
+ auto Response = Http.Get(ChunkGetUri, {{"Accept-Type", "application/x-ue-comp"}});
+
+ REQUIRE(Response);
+ CHECK(Response.StatusCode == HttpResponseCode::OK);
+
+ IoBuffer Data = Response.ResponsePayload;
+ IoHash RawHash;
+ uint64_t RawSize;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Data), RawHash, RawSize);
+ CHECK(Compressed);
+ IoBuffer DataDecompressed = Compressed.Decompress().AsIoBuffer();
+ IoBuffer ReferenceData = IoBufferBuilder::MakeFromFile(RootPath / BinPath);
+ CHECK(RawSize == ReferenceData.GetSize());
+ CHECK(ReferenceData.GetSize() == DataDecompressed.GetSize());
+ CHECK(ReferenceData.GetView().EqualBytes(DataDecompressed.GetView()));
+ }
+
+ ZEN_INFO("+++++++");
+ }
+
+ SUBCASE("test chunk not found error")
+ {
+ HttpClient Http{BaseUri};
+
+ for (size_t I = 0; I < 65; I++)
+ {
+ zen::StringBuilder<128> PostUri;
+ PostUri << "/f77c781846caead318084604/info";
+ auto Response = Http.Get(PostUri);
+
+ REQUIRE(!Response.Error);
+ CHECK(Response.StatusCode == HttpResponseCode::NotFound);
+ }
+ }
+ }
+}
+
+CbPackage
+CreateOplogPackage(const Oid& Id, const std::span<const std::pair<Oid, CompressedBuffer>>& Attachments)
+{
+ CbPackage Package;
+ CbObjectWriter Object;
+ Object << "key"sv << OidAsString(Id);
+ if (!Attachments.empty())
+ {
+ Object.BeginArray("bulkdata");
+ for (const auto& Attachment : Attachments)
+ {
+ CbAttachment Attach(Attachment.second, Attachment.second.DecodeRawHash());
+ Object.BeginObject();
+ Object << "id"sv << Attachment.first;
+ Object << "type"sv
+ << "Standard"sv;
+ Object << "data"sv << Attach;
+ Object.EndObject();
+
+ Package.AddAttachment(Attach);
+ ZEN_DEBUG("Added attachment {}", Attach.GetHash());
+ }
+ Object.EndArray();
+ }
+ Package.SetObject(Object.Save());
+ return Package;
+};
+
+CbObject
+CreateOplogOp(const Oid& Id, const std::span<const std::pair<Oid, CompressedBuffer>>& Attachments)
+{
+ CbObjectWriter Object;
+ Object << "key"sv << OidAsString(Id);
+ if (!Attachments.empty())
+ {
+ Object.BeginArray("bulkdata");
+ for (const auto& Attachment : Attachments)
+ {
+ CbAttachment Attach(Attachment.second, Attachment.second.DecodeRawHash());
+ Object.BeginObject();
+ Object << "id"sv << Attachment.first;
+ Object << "type"sv
+ << "Standard"sv;
+ Object << "data"sv << Attach;
+ Object.EndObject();
+
+ ZEN_DEBUG("Added attachment {}", Attach.GetHash());
+ }
+ Object.EndArray();
+ }
+ return Object.Save();
+};
+
+enum CbWriterMeta
+{
+ BeginObject,
+ EndObject,
+ BeginArray,
+ EndArray
+};
+
+inline CbWriter&
+operator<<(CbWriter& Writer, CbWriterMeta Meta)
+{
+ switch (Meta)
+ {
+ case BeginObject:
+ Writer.BeginObject();
+ break;
+ case EndObject:
+ Writer.EndObject();
+ break;
+ case BeginArray:
+ Writer.BeginArray();
+ break;
+ case EndArray:
+ Writer.EndArray();
+ break;
+ default:
+ ZEN_ASSERT(false);
+ }
+ return Writer;
+}
+
+TEST_CASE("project.remote")
+{
+ using namespace std::literals;
+ using namespace utils;
+
+ ZenServerTestHelper Servers("remote", 3);
+ Servers.SpawnServers("--debug");
+
+ std::vector<Oid> OpIds;
+ const size_t OpCount = 24;
+ OpIds.reserve(OpCount);
+ for (size_t I = 0; I < OpCount; ++I)
+ {
+ OpIds.emplace_back(Oid::NewOid());
+ }
+
+ std::unordered_map<Oid, std::vector<std::pair<Oid, CompressedBuffer>>, Oid::Hasher> Attachments;
+ {
+ std::vector<std::size_t> AttachmentSizes(
+ {7633, 6825, 5738, 8031, 7225, 566, 3656, 6006, 24, 33466, 1093, 4269, 2257, 3685, 13489, 97194,
+ 6151, 5482, 6217, 3511, 6738, 5061, 7537, 2759, 1916, 8210, 2235, 224024, 51582, 5251, 491, 2u * 1024u * 1024u + 124u,
+ 74607, 18135, 3767, 154045, 4415, 5007, 8876, 96761, 3359, 8526, 4097, 4855, 48225});
+ auto It = AttachmentSizes.begin();
+ Attachments[OpIds[0]] = {};
+ Attachments[OpIds[1]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++});
+ Attachments[OpIds[2]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++});
+ Attachments[OpIds[3]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++});
+ Attachments[OpIds[4]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++});
+ Attachments[OpIds[5]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++});
+ Attachments[OpIds[6]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++});
+ Attachments[OpIds[7]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++});
+ Attachments[OpIds[8]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{});
+ Attachments[OpIds[9]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++});
+ Attachments[OpIds[10]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++});
+ Attachments[OpIds[11]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++});
+ Attachments[OpIds[12]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++});
+ Attachments[OpIds[13]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++});
+ Attachments[OpIds[14]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++});
+ Attachments[OpIds[15]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++});
+ Attachments[OpIds[16]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{});
+ Attachments[OpIds[17]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++});
+ Attachments[OpIds[18]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++});
+ Attachments[OpIds[19]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{});
+ Attachments[OpIds[20]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++});
+ Attachments[OpIds[21]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++});
+ Attachments[OpIds[22]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++});
+ Attachments[OpIds[23]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++});
+ ZEN_ASSERT(It == AttachmentSizes.end());
+ }
+
+ // Note: This is a clone of the function in projectstore.cpp
+ auto ComputeOpKey = [](const CbObjectView& Op) -> Oid {
+ using namespace std::literals;
+
+ XXH3_128Stream_deprecated KeyHasher;
+ Op["key"sv].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); });
+ XXH3_128 KeyHash128 = KeyHasher.GetHash();
+
+ Oid KeyHash;
+ memcpy(&KeyHash, KeyHash128.Hash, sizeof KeyHash);
+
+ return KeyHash;
+ };
+
+ auto AddOp = [ComputeOpKey](const CbObject& Op, std::unordered_map<Oid, uint32_t, Oid::Hasher>& Ops) {
+ const Oid Id = ComputeOpKey(Op);
+ IoBuffer Buffer = Op.GetBuffer().AsIoBuffer();
+ const uint32_t OpCoreHash = uint32_t(XXH3_64bits(Buffer.GetData(), Buffer.GetSize()) & 0xffffFFFF);
+ Ops.insert({Id, OpCoreHash});
+ };
+
+ auto MakeProject = [](std::string_view UrlBase, std::string_view ProjectName) {
+ CbObjectWriter Project;
+ Project.AddString("id"sv, ProjectName);
+ Project.AddString("root"sv, ""sv);
+ Project.AddString("engine"sv, ""sv);
+ Project.AddString("project"sv, ""sv);
+ Project.AddString("projectfile"sv, ""sv);
+ IoBuffer ProjectPayload = Project.Save().GetBuffer().AsIoBuffer();
+ ProjectPayload.SetContentType(HttpContentType::kCbObject);
+
+ HttpClient Http{UrlBase};
+ HttpClient::Response Response = Http.Post(fmt::format("/prj/{}", ProjectName), ProjectPayload);
+ CHECK(Response);
+ };
+
+ auto MakeOplog = [](std::string_view UrlBase, std::string_view ProjectName, std::string_view OplogName) {
+ HttpClient Http{UrlBase};
+ HttpClient::Response Response = Http.Post(fmt::format("/prj/{}/oplog/{}", ProjectName, OplogName), IoBuffer{});
+ CHECK(Response);
+ };
+
+ auto MakeOp = [](std::string_view UrlBase, std::string_view ProjectName, std::string_view OplogName, const CbPackage& OpPackage) {
+ zen::BinaryWriter MemOut;
+ legacy::SaveCbPackage(OpPackage, MemOut);
+ IoBuffer Body{IoBuffer::Wrap, MemOut.GetData(), MemOut.GetSize()};
+ Body.SetContentType(HttpContentType::kCbPackage);
+
+ HttpClient Http{UrlBase};
+ HttpClient::Response Response = Http.Post(fmt::format("/prj/{}/oplog/{}/new", ProjectName, OplogName), Body);
+ CHECK(Response);
+ };
+
+ MakeProject(Servers.GetInstance(0).GetBaseUri(), "proj0");
+ MakeOplog(Servers.GetInstance(0).GetBaseUri(), "proj0", "oplog0");
+
+ std::unordered_map<Oid, uint32_t, Oid::Hasher> SourceOps;
+ for (const Oid& OpId : OpIds)
+ {
+ CbPackage OpPackage = CreateOplogPackage(OpId, Attachments[OpId]);
+ CHECK(OpPackage.GetAttachments().size() == Attachments[OpId].size());
+ AddOp(OpPackage.GetObject(), SourceOps);
+ MakeOp(Servers.GetInstance(0).GetBaseUri(), "proj0", "oplog0", OpPackage);
+ }
+
+ std::vector<IoHash> AttachmentHashes;
+ AttachmentHashes.reserve(Attachments.size());
+ for (const auto& AttachmentOplog : Attachments)
+ {
+ for (const auto& Attachment : AttachmentOplog.second)
+ {
+ AttachmentHashes.emplace_back(Attachment.second.DecodeRawHash());
+ }
+ }
+
+ auto MakeCbObjectPayload = [](std::function<void(CbObjectWriter & Writer)> Write) -> IoBuffer {
+ CbObjectWriter Writer;
+ Write(Writer);
+ IoBuffer Result = Writer.Save().GetBuffer().AsIoBuffer();
+ Result.MakeOwned();
+ Result.SetContentType(HttpContentType::kCbObject);
+ return Result;
+ };
+
+ auto ValidateAttachments =
+ [&MakeCbObjectPayload, &AttachmentHashes, &Servers](int ServerIndex, std::string_view Project, std::string_view Oplog) {
+ HttpClient Http{Servers.GetInstance(ServerIndex).GetBaseUri()};
+
+ IoBuffer Payload = MakeCbObjectPayload([&AttachmentHashes](CbObjectWriter& Writer) {
+ Writer << "method"sv
+ << "getchunks"sv;
+ Writer << "chunks"sv << BeginArray;
+ for (const IoHash& Chunk : AttachmentHashes)
+ {
+ Writer << Chunk;
+ }
+ Writer << EndArray; // chunks
+ });
+
+ HttpClient::Response Response =
+ Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", Project, Oplog), Payload, {{"Accept", "application/x-ue-cbpkg"}});
+ CHECK(Response);
+ CbPackage ResponsePackage = ParsePackageMessage(Response.ResponsePayload);
+ CHECK(ResponsePackage.GetAttachments().size() == AttachmentHashes.size());
+ for (auto A : ResponsePackage.GetAttachments())
+ {
+ CHECK(IoHash::HashBuffer(A.AsCompressedBinary().DecompressToComposite()) == A.GetHash());
+ }
+ };
+
+ auto ValidateOplog = [&SourceOps, &AddOp, &Servers](int ServerIndex, std::string_view Project, std::string_view Oplog) {
+ std::unordered_map<Oid, uint32_t, Oid::Hasher> TargetOps;
+ std::vector<CbObject> ResultingOplog;
+
+ HttpClient Http{Servers.GetInstance(ServerIndex).GetBaseUri()};
+ HttpClient::Response Response = Http.Get(fmt::format("/prj/{}/oplog/{}/entries", Project, Oplog));
+ CHECK(Response);
+
+ IoBuffer Payload(Response.ResponsePayload);
+ CbObject OplogResonse = LoadCompactBinaryObject(Payload);
+ CbArrayView EntriesArray = OplogResonse["entries"sv].AsArrayView();
+
+ for (CbFieldView OpEntry : EntriesArray)
+ {
+ CbObjectView Core = OpEntry.AsObjectView();
+ BinaryWriter Writer;
+ Core.CopyTo(Writer);
+ MemoryView OpView = Writer.GetView();
+ IoBuffer OpBuffer(IoBuffer::Wrap, OpView.GetData(), OpView.GetSize());
+ CbObject Op(SharedBuffer(OpBuffer), CbFieldType::HasFieldType);
+ AddOp(Op, TargetOps);
+ }
+ CHECK(SourceOps == TargetOps);
+ };
+
+ auto HttpWaitForCompletion = [](ZenServerInstance& Server, const HttpClient::Response& Response) {
+ REQUIRE(Response);
+ const uint64_t JobId = ParseInt<uint64_t>(Response.AsText()).value_or(0);
+ CHECK(JobId != 0);
+
+ HttpClient Http{Server.GetBaseUri()};
+
+ while (true)
+ {
+ HttpClient::Response StatusResponse =
+ Http.Get(fmt::format("/admin/jobs/{}", JobId), {{"Accept", ToString(ZenContentType::kCbObject)}});
+ CHECK(StatusResponse);
+ CbObject ResponseObject = StatusResponse.AsObject();
+ std::string_view Status = ResponseObject["Status"sv].AsString();
+ CHECK(Status != "Aborted"sv);
+ if (Status == "Complete"sv)
+ {
+ return;
+ }
+ Sleep(10);
+ }
+ };
+
+ SUBCASE("File")
+ {
+ ScopedTemporaryDirectory TempDir;
+ {
+ IoBuffer Payload = MakeCbObjectPayload([&AttachmentHashes, path = TempDir.Path().string()](CbObjectWriter& Writer) {
+ Writer << "method"sv
+ << "export"sv;
+ Writer << "params" << BeginObject;
+ {
+ Writer << "maxblocksize"sv << 3072u;
+ Writer << "maxchunkembedsize"sv << 1296u;
+ Writer << "chunkfilesizelimit"sv << 5u * 1024u;
+ Writer << "force"sv << false;
+ Writer << "file"sv << BeginObject;
+ {
+ Writer << "path"sv << path;
+ Writer << "name"sv
+ << "proj0_oplog0"sv;
+ }
+ Writer << EndObject; // "file"
+ }
+ Writer << EndObject; // "params"
+ });
+
+ HttpClient Http{Servers.GetInstance(0).GetBaseUri()};
+
+ HttpClient::Response Response = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", "proj0", "oplog0"), Payload);
+ HttpWaitForCompletion(Servers.GetInstance(0), Response);
+ }
+ {
+ MakeProject(Servers.GetInstance(1).GetBaseUri(), "proj0_copy");
+ MakeOplog(Servers.GetInstance(1).GetBaseUri(), "proj0_copy", "oplog0_copy");
+
+ IoBuffer Payload = MakeCbObjectPayload([&AttachmentHashes, path = TempDir.Path().string()](CbObjectWriter& Writer) {
+ Writer << "method"sv
+ << "import"sv;
+ Writer << "params" << BeginObject;
+ {
+ Writer << "force"sv << false;
+ Writer << "file"sv << BeginObject;
+ {
+ Writer << "path"sv << path;
+ Writer << "name"sv
+ << "proj0_oplog0"sv;
+ }
+ Writer << EndObject; // "file"
+ }
+ Writer << EndObject; // "params"
+ });
+
+ HttpClient Http{Servers.GetInstance(1).GetBaseUri()};
+
+ HttpClient::Response Response = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", "proj0_copy", "oplog0_copy"), Payload);
+ HttpWaitForCompletion(Servers.GetInstance(1), Response);
+ }
+ ValidateAttachments(1, "proj0_copy", "oplog0_copy");
+ ValidateOplog(1, "proj0_copy", "oplog0_copy");
+ }
+
+ SUBCASE("File disable blocks")
+ {
+ ScopedTemporaryDirectory TempDir;
+ {
+ IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) {
+ Writer << "method"sv
+ << "export"sv;
+ Writer << "params" << BeginObject;
+ {
+ Writer << "maxblocksize"sv << 3072u;
+ Writer << "maxchunkembedsize"sv << 1296u;
+ Writer << "chunkfilesizelimit"sv << 5u * 1024u;
+ Writer << "force"sv << false;
+ Writer << "file"sv << BeginObject;
+ {
+ Writer << "path"sv << TempDir.Path().string();
+ Writer << "name"sv
+ << "proj0_oplog0"sv;
+ Writer << "disableblocks"sv << true;
+ }
+ Writer << EndObject; // "file"
+ }
+ Writer << EndObject; // "params"
+ });
+
+ HttpClient Http{Servers.GetInstance(0).GetBaseUri()};
+
+ HttpClient::Response Response = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", "proj0", "oplog0"), Payload);
+ HttpWaitForCompletion(Servers.GetInstance(0), Response);
+ }
+ {
+ MakeProject(Servers.GetInstance(1).GetBaseUri(), "proj0_copy");
+ MakeOplog(Servers.GetInstance(1).GetBaseUri(), "proj0_copy", "oplog0_copy");
+ IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) {
+ Writer << "method"sv
+ << "import"sv;
+ Writer << "params" << BeginObject;
+ {
+ Writer << "force"sv << false;
+ Writer << "file"sv << BeginObject;
+ {
+ Writer << "path"sv << TempDir.Path().string();
+ Writer << "name"sv
+ << "proj0_oplog0"sv;
+ }
+ Writer << EndObject; // "file"
+ }
+ Writer << EndObject; // "params"
+ });
+
+ HttpClient Http{Servers.GetInstance(1).GetBaseUri()};
+
+ HttpClient::Response Response = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", "proj0_copy", "oplog0_copy"), Payload);
+ HttpWaitForCompletion(Servers.GetInstance(1), Response);
+ }
+ ValidateAttachments(1, "proj0_copy", "oplog0_copy");
+ ValidateOplog(1, "proj0_copy", "oplog0_copy");
+ }
+
+ SUBCASE("File force temp blocks")
+ {
+ ScopedTemporaryDirectory TempDir;
+ {
+ IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) {
+ Writer << "method"sv
+ << "export"sv;
+ Writer << "params" << BeginObject;
+ {
+ Writer << "maxblocksize"sv << 3072u;
+ Writer << "maxchunkembedsize"sv << 1296u;
+ Writer << "chunkfilesizelimit"sv << 5u * 1024u;
+ Writer << "force"sv << false;
+ Writer << "file"sv << BeginObject;
+ {
+ Writer << "path"sv << TempDir.Path().string();
+ Writer << "name"sv
+ << "proj0_oplog0"sv;
+ Writer << "enabletempblocks"sv << true;
+ }
+ Writer << EndObject; // "file"
+ }
+ Writer << EndObject; // "params"
+ });
+
+ HttpClient Http{Servers.GetInstance(0).GetBaseUri()};
+ HttpClient::Response Response = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", "proj0", "oplog0"), Payload);
+ HttpWaitForCompletion(Servers.GetInstance(0), Response);
+ }
+ {
+ MakeProject(Servers.GetInstance(1).GetBaseUri(), "proj0_copy");
+ MakeOplog(Servers.GetInstance(1).GetBaseUri(), "proj0_copy", "oplog0_copy");
+ IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) {
+ Writer << "method"sv
+ << "import"sv;
+ Writer << "params" << BeginObject;
+ {
+ Writer << "force"sv << false;
+ Writer << "file"sv << BeginObject;
+ {
+ Writer << "path"sv << TempDir.Path().string();
+ Writer << "name"sv
+ << "proj0_oplog0"sv;
+ }
+ Writer << EndObject; // "file"
+ }
+ Writer << EndObject; // "params"
+ });
+
+ HttpClient Http{Servers.GetInstance(1).GetBaseUri()};
+ HttpClient::Response Response = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", "proj0_copy", "oplog0_copy"), Payload);
+ HttpWaitForCompletion(Servers.GetInstance(1), Response);
+ }
+ ValidateAttachments(1, "proj0_copy", "oplog0_copy");
+ ValidateOplog(1, "proj0_copy", "oplog0_copy");
+ }
+
+ SUBCASE("Zen")
+ {
+ ScopedTemporaryDirectory TempDir;
+ {
+ std::string ExportSourceUri = Servers.GetInstance(0).GetBaseUri();
+ std::string ExportTargetUri = Servers.GetInstance(1).GetBaseUri();
+ MakeProject(ExportTargetUri, "proj0_copy");
+ MakeOplog(ExportTargetUri, "proj0_copy", "oplog0_copy");
+
+ IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) {
+ Writer << "method"sv
+ << "export"sv;
+ Writer << "params" << BeginObject;
+ {
+ Writer << "maxblocksize"sv << 3072u;
+ Writer << "maxchunkembedsize"sv << 1296u;
+ Writer << "chunkfilesizelimit"sv << 5u * 1024u;
+ Writer << "force"sv << false;
+ Writer << "zen"sv << BeginObject;
+ {
+ Writer << "url"sv << ExportTargetUri.substr(7);
+ Writer << "project"
+ << "proj0_copy";
+ Writer << "oplog"
+ << "oplog0_copy";
+ }
+ Writer << EndObject; // "file"
+ }
+ Writer << EndObject; // "params"
+ });
+
+ HttpClient Http{Servers.GetInstance(0).GetBaseUri()};
+ HttpClient::Response Response = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", "proj0", "oplog0"), Payload);
+ HttpWaitForCompletion(Servers.GetInstance(0), Response);
+ }
+ ValidateAttachments(1, "proj0_copy", "oplog0_copy");
+ ValidateOplog(1, "proj0_copy", "oplog0_copy");
+
+ {
+ std::string ImportSourceUri = Servers.GetInstance(1).GetBaseUri();
+ std::string ImportTargetUri = Servers.GetInstance(2).GetBaseUri();
+ MakeProject(ImportTargetUri, "proj1");
+ MakeOplog(ImportTargetUri, "proj1", "oplog1");
+
+ IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) {
+ Writer << "method"sv
+ << "import"sv;
+ Writer << "params" << BeginObject;
+ {
+ Writer << "force"sv << false;
+ Writer << "zen"sv << BeginObject;
+ {
+ Writer << "url"sv << ImportSourceUri.substr(7);
+ Writer << "project"
+ << "proj0_copy";
+ Writer << "oplog"
+ << "oplog0_copy";
+ }
+ Writer << EndObject; // "file"
+ }
+ Writer << EndObject; // "params"
+ });
+
+ HttpClient Http{Servers.GetInstance(2).GetBaseUri()};
+ HttpClient::Response Response = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", "proj1", "oplog1"), Payload);
+ HttpWaitForCompletion(Servers.GetInstance(2), Response);
+ }
+ ValidateAttachments(2, "proj1", "oplog1");
+ ValidateOplog(2, "proj1", "oplog1");
+ }
+}
+
+TEST_CASE("project.rpcappendop")
+{
+ using namespace std::literals;
+ using namespace utils;
+
+ ZenServerTestHelper Servers("remote", 2);
+ Servers.SpawnServers("--debug");
+
+ std::vector<Oid> OpIds;
+ const size_t OpCount = 24;
+ OpIds.reserve(OpCount);
+ for (size_t I = 0; I < OpCount; ++I)
+ {
+ OpIds.emplace_back(Oid::NewOid());
+ }
+
+ std::unordered_map<Oid, std::vector<std::pair<Oid, CompressedBuffer>>, Oid::Hasher> Attachments;
+ {
+ std::vector<std::size_t> AttachmentSizes(
+ {7633, 6825, 5738, 8031, 7225, 566, 3656, 6006, 24, 33466, 1093, 4269, 2257, 3685, 13489, 97194,
+ 6151, 5482, 6217, 3511, 6738, 5061, 7537, 2759, 1916, 8210, 2235, 224024, 51582, 5251, 491, 2u * 1024u * 1024u + 124u,
+ 74607, 18135, 3767, 154045, 4415, 5007, 8876, 96761, 3359, 8526, 4097, 4855, 48225});
+ auto It = AttachmentSizes.begin();
+ Attachments[OpIds[0]] = {};
+ Attachments[OpIds[1]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++});
+ Attachments[OpIds[2]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++});
+ Attachments[OpIds[3]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++});
+ Attachments[OpIds[4]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++});
+ Attachments[OpIds[5]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++});
+ Attachments[OpIds[6]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++});
+ Attachments[OpIds[7]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++});
+ Attachments[OpIds[8]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{});
+ Attachments[OpIds[9]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++});
+ Attachments[OpIds[10]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++});
+ Attachments[OpIds[11]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++});
+ Attachments[OpIds[12]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++});
+ Attachments[OpIds[13]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++});
+ Attachments[OpIds[14]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++});
+ Attachments[OpIds[15]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++});
+ Attachments[OpIds[16]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{});
+ Attachments[OpIds[17]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++});
+ Attachments[OpIds[18]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++});
+ Attachments[OpIds[19]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{});
+ Attachments[OpIds[20]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++});
+ Attachments[OpIds[21]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++});
+ Attachments[OpIds[22]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++});
+ Attachments[OpIds[23]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++});
+ ZEN_ASSERT(It == AttachmentSizes.end());
+ }
+
+ // Note: This is a clone of the function in projectstore.cpp
+ auto ComputeOpKey = [](const CbObjectView& Op) -> Oid {
+ using namespace std::literals;
+
+ XXH3_128Stream_deprecated KeyHasher;
+ Op["key"sv].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); });
+ XXH3_128 KeyHash128 = KeyHasher.GetHash();
+
+ Oid KeyHash;
+ memcpy(&KeyHash, KeyHash128.Hash, sizeof KeyHash);
+
+ return KeyHash;
+ };
+
+ auto AddOp = [ComputeOpKey](const CbObject& Op, std::unordered_map<Oid, uint32_t, Oid::Hasher>& Ops) {
+ const Oid Id = ComputeOpKey(Op);
+ IoBuffer Buffer = Op.GetBuffer().AsIoBuffer();
+ const uint32_t OpCoreHash = uint32_t(XXH3_64bits(Buffer.GetData(), Buffer.GetSize()) & 0xffffFFFF);
+ Ops.insert({Id, OpCoreHash});
+ };
+
+ auto MakeProject = [](HttpClient& Client, std::string_view ProjectName) {
+ CbObjectWriter Project;
+ Project.AddString("id"sv, ProjectName);
+ Project.AddString("root"sv, ""sv);
+ Project.AddString("engine"sv, ""sv);
+ Project.AddString("project"sv, ""sv);
+ Project.AddString("projectfile"sv, ""sv);
+ HttpClient::Response Response = Client.Post(fmt::format("/prj/{}", ProjectName), Project.Save());
+ CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage(""));
+ };
+
+ auto MakeOplog = [](HttpClient& Client, std::string_view ProjectName, std::string_view OplogName) {
+ HttpClient::Response Response = Client.Post(fmt::format("/prj/{}/oplog/{}", ProjectName, OplogName));
+ CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage(""));
+ };
+ auto GetOplog = [](HttpClient& Client, std::string_view ProjectName, std::string_view OplogName) {
+ HttpClient::Response Response = Client.Get(fmt::format("/prj/{}/oplog/{}", ProjectName, OplogName));
+ CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage(""));
+ return Response.AsObject();
+ };
+
+ auto MakeOp =
+ [](HttpClient& Client, std::string_view ProjectName, std::string_view OplogName, const CbObjectView& Op) -> std::vector<IoHash> {
+ CbObjectWriter Request;
+ Request.AddString("method"sv, "appendops"sv);
+ Request.BeginArray("ops"sv);
+ {
+ Request.AddObject(Op);
+ }
+ Request.EndArray(); // "ops"
+ HttpClient::Response Response = Client.Post(fmt::format("/prj/{}/oplog/{}/rpc", ProjectName, OplogName), Request.Save());
+ CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage(""));
+
+ CbObjectView ResponsePayload = Response.AsPackage().GetObject();
+ CbArrayView NeedArray = ResponsePayload["need"sv].AsArrayView();
+ std::vector<IoHash> Needs;
+ Needs.reserve(NeedArray.Num());
+ for (CbFieldView NeedView : NeedArray)
+ {
+ Needs.push_back(NeedView.AsHash());
+ }
+ return Needs;
+ };
+
+ auto SendAttachments = [](HttpClient& Client,
+ std::string_view ProjectName,
+ std::string_view OplogName,
+ std::span<const CompressedBuffer> Attachments,
+ void* ServerProcessHandle,
+ const std::filesystem::path& TempPath) {
+ CompositeBuffer PackageMessage;
+ {
+ CbPackage RequestPackage;
+ CbObjectWriter Request;
+ Request.AddString("method"sv, "putchunks"sv);
+ Request.AddBool("usingtmpfiles"sv, true);
+ Request.BeginArray("chunks"sv);
+ for (CompressedBuffer AttachmentPayload : Attachments)
+ {
+ if (AttachmentPayload.DecodeRawSize() > 16u * 1024u)
+ {
+ std::filesystem::path TempAttachmentPath = TempPath / (Oid::NewOid().ToString() + ".tmp");
+ WriteFile(TempAttachmentPath, AttachmentPayload.GetCompressed());
+ IoBuffer OnDiskAttachment = IoBufferBuilder::MakeFromFile(TempAttachmentPath);
+ AttachmentPayload = CompressedBuffer::FromCompressedNoValidate(std::move(OnDiskAttachment));
+ }
+
+ CbAttachment Attachment(AttachmentPayload, AttachmentPayload.DecodeRawHash());
+
+ Request.AddAttachment(Attachment);
+ RequestPackage.AddAttachment(Attachment);
+ }
+ Request.EndArray(); // "chunks"
+ RequestPackage.SetObject(Request.Save());
+
+ PackageMessage = CompositeBuffer(FormatPackageMessage(RequestPackage, FormatFlags::kAllowLocalReferences, ServerProcessHandle));
+ }
+
+ HttpClient::Response Response =
+ Client.Post(fmt::format("/prj/{}/oplog/{}/rpc", ProjectName, OplogName), PackageMessage, HttpContentType::kCbPackage);
+ CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage(""));
+ };
+
+ {
+ HttpClient Client(Servers.GetInstance(0).GetBaseUri());
+ void* ServerProcessHandle = Servers.GetInstance(0).GetProcessHandle();
+
+ MakeProject(Client, "proj0");
+ MakeOplog(Client, "proj0", "oplog0");
+ CbObject Oplog = GetOplog(Client, "proj0", "oplog0");
+ std::filesystem::path TempPath = Oplog["tempdir"sv].AsU8String();
+
+ std::unordered_map<Oid, uint32_t, Oid::Hasher> SourceOps;
+ for (const Oid& OpId : OpIds)
+ {
+ CbObject Op = CreateOplogOp(OpId, Attachments[OpId]);
+ AddOp(Op, SourceOps);
+ std::vector<IoHash> MissingAttachments = MakeOp(Client, "proj0", "oplog0", Op);
+
+ if (!MissingAttachments.empty())
+ {
+ CHECK(MissingAttachments.size() <= Attachments[OpId].size());
+ tsl::robin_set<IoHash, IoHash::Hasher> MissingAttachmentSet(MissingAttachments.begin(), MissingAttachments.end());
+ std::vector<CompressedBuffer> PutAttachments;
+ for (const auto& Attachment : Attachments[OpId])
+ {
+ CompressedBuffer Payload = Attachment.second;
+ const IoHash AttachmentHash = Payload.DecodeRawHash();
+ if (auto It = MissingAttachmentSet.find(AttachmentHash); It != MissingAttachmentSet.end())
+ {
+ PutAttachments.push_back(Payload);
+ }
+ }
+ SendAttachments(Client, "proj0", "oplog0", PutAttachments, ServerProcessHandle, TempPath);
+ }
+ }
+
+ // Do it again, but now we should not need any attachments
+
+ for (const Oid& OpId : OpIds)
+ {
+ CbObject Op = CreateOplogOp(OpId, Attachments[OpId]);
+ AddOp(Op, SourceOps);
+ std::vector<IoHash> MissingAttachments = MakeOp(Client, "proj0", "oplog0", Op);
+ CHECK(MissingAttachments.empty());
+ }
+ }
+
+ {
+ HttpClient Client(Servers.GetInstance(1).GetBaseUri());
+ void* ServerProcessHandle = nullptr; // Force use of path for attachments passed on disk
+
+ MakeProject(Client, "proj0");
+ MakeOplog(Client, "proj0", "oplog0");
+ CbObject Oplog = GetOplog(Client, "proj0", "oplog0");
+ std::filesystem::path TempPath = Oplog["tempdir"sv].AsU8String();
+
+ std::unordered_map<Oid, uint32_t, Oid::Hasher> SourceOps;
+ for (const Oid& OpId : OpIds)
+ {
+ CbObject Op = CreateOplogOp(OpId, Attachments[OpId]);
+ AddOp(Op, SourceOps);
+ std::vector<IoHash> MissingAttachments = MakeOp(Client, "proj0", "oplog0", Op);
+
+ if (!MissingAttachments.empty())
+ {
+ CHECK(MissingAttachments.size() <= Attachments[OpId].size());
+ tsl::robin_set<IoHash, IoHash::Hasher> MissingAttachmentSet(MissingAttachments.begin(), MissingAttachments.end());
+ std::vector<CompressedBuffer> PutAttachments;
+ for (const auto& Attachment : Attachments[OpId])
+ {
+ CompressedBuffer Payload = Attachment.second;
+ const IoHash AttachmentHash = Payload.DecodeRawHash();
+ if (auto It = MissingAttachmentSet.find(AttachmentHash); It != MissingAttachmentSet.end())
+ {
+ PutAttachments.push_back(Payload);
+ }
+ }
+ SendAttachments(Client, "proj0", "oplog0", PutAttachments, ServerProcessHandle, TempPath);
+ }
+ }
+
+ // Do it again, but now we should not need any attachments
+
+ for (const Oid& OpId : OpIds)
+ {
+ CbObject Op = CreateOplogOp(OpId, Attachments[OpId]);
+ AddOp(Op, SourceOps);
+ std::vector<IoHash> MissingAttachments = MakeOp(Client, "proj0", "oplog0", Op);
+ CHECK(MissingAttachments.empty());
+ }
+ }
+}
+
+} // namespace zen::tests
+
+#endif