aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/zencore/compactbinaryutil.cpp8
-rw-r--r--src/zencore/filesystem.cpp21
-rw-r--r--src/zencore/include/zencore/process.h1
-rw-r--r--src/zenserver-test/zenserver-test.cpp270
-rw-r--r--src/zenserver/projectstore/projectstore.cpp602
-rw-r--r--src/zenutil/include/zenutil/zenserverprocess.h3
6 files changed, 676 insertions, 229 deletions
diff --git a/src/zencore/compactbinaryutil.cpp b/src/zencore/compactbinaryutil.cpp
index c8cde21c3..074bdaffd 100644
--- a/src/zencore/compactbinaryutil.cpp
+++ b/src/zencore/compactbinaryutil.cpp
@@ -14,7 +14,13 @@ ValidateAndReadCompactBinaryObject(const SharedBuffer&& Payload, CbValidateError
{
if (OutError = ValidateCompactBinary(Payload.GetView(), CbValidateMode::Default); OutError == CbValidateError::None)
{
- return CbObject(std::move(Payload));
+ CbObject Object(std::move(Payload));
+ if (Object.GetView().GetSize() != Payload.GetSize())
+ {
+ OutError |= CbValidateError::OutOfBounds;
+ return {};
+ }
+ return Object;
}
}
return CbObject();
diff --git a/src/zencore/filesystem.cpp b/src/zencore/filesystem.cpp
index 5125beeca..8e6f5085f 100644
--- a/src/zencore/filesystem.cpp
+++ b/src/zencore/filesystem.cpp
@@ -1290,6 +1290,27 @@ MoveToFile(std::filesystem::path Path, IoBuffer Data)
zen::CreateDirectories(Path.parent_path());
Success = SetFileInformationByHandle(ChunkFileHandle, FileRenameInfo, RenameInfo, BufferSize);
}
+ if (!Success && (LastError == ERROR_ACCESS_DENIED))
+ {
+ // Fallback to regular rename
+ std::error_code Ec;
+ std::filesystem::path SourcePath = PathFromHandle(FileRef.FileHandle, Ec);
+ if (!Ec)
+ {
+ auto NativeSourcePath = SourcePath.native().c_str();
+ auto NativeTargetPath = Path.native().c_str();
+ Success = ::MoveFile(NativeSourcePath, NativeTargetPath);
+ if (!Success)
+ {
+ LastError = GetLastError();
+ if (LastError == ERROR_PATH_NOT_FOUND)
+ {
+ zen::CreateDirectories(Path.parent_path());
+ Success = ::MoveFile(NativeSourcePath, NativeTargetPath);
+ }
+ }
+ }
+ }
}
Memory::Free(RenameInfo);
if (!Success)
diff --git a/src/zencore/include/zencore/process.h b/src/zencore/include/zencore/process.h
index 98d352db6..04b79a1e0 100644
--- a/src/zencore/include/zencore/process.h
+++ b/src/zencore/include/zencore/process.h
@@ -33,6 +33,7 @@ public:
ZENCORE_API bool Terminate(int ExitCode);
ZENCORE_API void Reset();
[[nodiscard]] inline int Pid() const { return m_Pid; }
+ [[nodiscard]] inline void* Handle() const { return m_ProcessHandle; }
private:
void* m_ProcessHandle = nullptr;
diff --git a/src/zenserver-test/zenserver-test.cpp b/src/zenserver-test/zenserver-test.cpp
index 0ea953937..8099091bb 100644
--- a/src/zenserver-test/zenserver-test.cpp
+++ b/src/zenserver-test/zenserver-test.cpp
@@ -3145,6 +3145,31 @@ CreateOplogPackage(const Oid& Id, const std::span<const std::pair<Oid, Compresse
return Package;
};
+CbObject
+CreateOplogOp(const Oid& Id, const std::span<const std::pair<Oid, CompressedBuffer>>& Attachments)
+{
+ CbObjectWriter Object;
+ Object << "key"sv << OidAsString(Id);
+ if (!Attachments.empty())
+ {
+ Object.BeginArray("bulkdata");
+ for (const auto& Attachment : Attachments)
+ {
+ CbAttachment Attach(Attachment.second, Attachment.second.DecodeRawHash());
+ Object.BeginObject();
+ Object << "id"sv << Attachment.first;
+ Object << "type"sv
+ << "Standard"sv;
+ Object << "data"sv << Attach;
+ Object.EndObject();
+
+ ZEN_DEBUG("Added attachment {}", Attach.GetHash());
+ }
+ Object.EndArray();
+ }
+ return Object.Save();
+};
+
cpr::Body
AsBody(const IoBuffer& Payload)
{
@@ -3663,6 +3688,251 @@ TEST_CASE("project.remote")
}
}
+TEST_CASE("project.rpcappendop")
+{
+ using namespace std::literals;
+ using namespace utils;
+
+ ZenServerTestHelper Servers("remote", 2);
+ Servers.SpawnServers("--debug");
+
+ std::vector<Oid> OpIds;
+ const size_t OpCount = 24;
+ OpIds.reserve(OpCount);
+ for (size_t I = 0; I < OpCount; ++I)
+ {
+ OpIds.emplace_back(Oid::NewOid());
+ }
+
+ std::unordered_map<Oid, std::vector<std::pair<Oid, CompressedBuffer>>, Oid::Hasher> Attachments;
+ {
+ std::vector<std::size_t> AttachmentSizes(
+ {7633, 6825, 5738, 8031, 7225, 566, 3656, 6006, 24, 33466, 1093, 4269, 2257, 3685, 13489, 97194,
+ 6151, 5482, 6217, 3511, 6738, 5061, 7537, 2759, 1916, 8210, 2235, 224024, 51582, 5251, 491, 2u * 1024u * 1024u + 124u,
+ 74607, 18135, 3767, 154045, 4415, 5007, 8876, 96761, 3359, 8526, 4097, 4855, 48225});
+ auto It = AttachmentSizes.begin();
+ Attachments[OpIds[0]] = {};
+ Attachments[OpIds[1]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++});
+ Attachments[OpIds[2]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++});
+ Attachments[OpIds[3]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++});
+ Attachments[OpIds[4]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++});
+ Attachments[OpIds[5]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++});
+ Attachments[OpIds[6]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++});
+ Attachments[OpIds[7]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++});
+ Attachments[OpIds[8]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{});
+ Attachments[OpIds[9]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++});
+ Attachments[OpIds[10]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++});
+ Attachments[OpIds[11]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++});
+ Attachments[OpIds[12]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++});
+ Attachments[OpIds[13]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++});
+ Attachments[OpIds[14]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++});
+ Attachments[OpIds[15]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++});
+ Attachments[OpIds[16]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{});
+ Attachments[OpIds[17]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++});
+ Attachments[OpIds[18]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++});
+ Attachments[OpIds[19]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{});
+ Attachments[OpIds[20]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++});
+ Attachments[OpIds[21]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++});
+ Attachments[OpIds[22]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++});
+ Attachments[OpIds[23]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++});
+ ZEN_ASSERT(It == AttachmentSizes.end());
+ }
+
+ // Note: This is a clone of the function in projectstore.cpp
+ auto ComputeOpKey = [](const CbObjectView& Op) -> Oid {
+ using namespace std::literals;
+
+ XXH3_128Stream_deprecated KeyHasher;
+ Op["key"sv].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); });
+ XXH3_128 KeyHash128 = KeyHasher.GetHash();
+
+ Oid KeyHash;
+ memcpy(&KeyHash, KeyHash128.Hash, sizeof KeyHash);
+
+ return KeyHash;
+ };
+
+ auto AddOp = [ComputeOpKey](const CbObject& Op, std::unordered_map<Oid, uint32_t, Oid::Hasher>& Ops) {
+ const Oid Id = ComputeOpKey(Op);
+ IoBuffer Buffer = Op.GetBuffer().AsIoBuffer();
+ const uint32_t OpCoreHash = uint32_t(XXH3_64bits(Buffer.GetData(), Buffer.GetSize()) & 0xffffFFFF);
+ Ops.insert({Id, OpCoreHash});
+ };
+
+ auto MakeProject = [](HttpClient& Client, std::string_view ProjectName) {
+ CbObjectWriter Project;
+ Project.AddString("id"sv, ProjectName);
+ Project.AddString("root"sv, ""sv);
+ Project.AddString("engine"sv, ""sv);
+ Project.AddString("project"sv, ""sv);
+ Project.AddString("projectfile"sv, ""sv);
+ HttpClient::Response Response = Client.Post(fmt::format("/prj/{}", ProjectName), Project.Save());
+ CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage(""));
+ };
+
+ auto MakeOplog = [](HttpClient& Client, std::string_view ProjectName, std::string_view OplogName) {
+ HttpClient::Response Response = Client.Post(fmt::format("/prj/{}/oplog/{}", ProjectName, OplogName));
+ CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage(""));
+ };
+ auto GetOplog = [](HttpClient& Client, std::string_view ProjectName, std::string_view OplogName) {
+ HttpClient::Response Response = Client.Get(fmt::format("/prj/{}/oplog/{}", ProjectName, OplogName));
+ CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage(""));
+ return Response.AsObject();
+ };
+
+ auto MakeOp =
+ [](HttpClient& Client, std::string_view ProjectName, std::string_view OplogName, const CbObjectView& Op) -> std::vector<IoHash> {
+ CbObjectWriter Request;
+ Request.AddString("method"sv, "appendops"sv);
+ Request.BeginArray("ops"sv);
+ {
+ Request.AddObject(Op);
+ }
+ Request.EndArray(); // "ops"
+ HttpClient::Response Response = Client.Post(fmt::format("/prj/{}/oplog/{}/rpc", ProjectName, OplogName), Request.Save());
+ CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage(""));
+
+ CbObjectView ResponsePayload = Response.AsPackage().GetObject();
+ CbArrayView NeedArray = ResponsePayload["need"sv].AsArrayView();
+ std::vector<IoHash> Needs;
+ Needs.reserve(NeedArray.Num());
+ for (CbFieldView NeedView : NeedArray)
+ {
+ Needs.push_back(NeedView.AsHash());
+ }
+ return Needs;
+ };
+
+ auto SendAttachments = [](HttpClient& Client,
+ std::string_view ProjectName,
+ std::string_view OplogName,
+ std::span<const CompressedBuffer> Attachments,
+ void* ServerProcessHandle,
+ const std::filesystem::path& TempPath) {
+ CompositeBuffer PackageMessage;
+ {
+ CbPackage RequestPackage;
+ CbObjectWriter Request;
+ Request.AddString("method"sv, "putchunks"sv);
+ Request.AddBool("usingtmpfiles"sv, true);
+ Request.BeginArray("chunks"sv);
+ for (CompressedBuffer AttachmentPayload : Attachments)
+ {
+ if (AttachmentPayload.DecodeRawSize() > 16u * 1024u)
+ {
+ std::filesystem::path TempAttachmentPath = TempPath / (Oid::NewOid().ToString() + ".tmp");
+ WriteFile(TempAttachmentPath, AttachmentPayload.GetCompressed());
+ IoBuffer OnDiskAttachment = IoBufferBuilder::MakeFromFile(TempAttachmentPath);
+ AttachmentPayload = CompressedBuffer::FromCompressedNoValidate(std::move(OnDiskAttachment));
+ }
+
+ CbAttachment Attachment(AttachmentPayload, AttachmentPayload.DecodeRawHash());
+
+ Request.AddAttachment(Attachment);
+ RequestPackage.AddAttachment(Attachment);
+ }
+ Request.EndArray(); // "chunks"
+ RequestPackage.SetObject(Request.Save());
+
+ PackageMessage = CompositeBuffer(FormatPackageMessage(RequestPackage, FormatFlags::kAllowLocalReferences, ServerProcessHandle));
+ }
+
+ HttpClient::Response Response =
+ Client.Post(fmt::format("/prj/{}/oplog/{}/rpc", ProjectName, OplogName), PackageMessage, HttpContentType::kCbPackage);
+ CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage(""));
+ };
+
+ {
+ HttpClient Client(Servers.GetInstance(0).GetBaseUri());
+ void* ServerProcessHandle = Servers.GetInstance(0).GetProcessHandle();
+
+ MakeProject(Client, "proj0");
+ MakeOplog(Client, "proj0", "oplog0");
+ CbObject Oplog = GetOplog(Client, "proj0", "oplog0");
+ std::filesystem::path TempPath = Oplog["tempdir"sv].AsU8String();
+
+ std::unordered_map<Oid, uint32_t, Oid::Hasher> SourceOps;
+ for (const Oid& OpId : OpIds)
+ {
+ CbObject Op = CreateOplogOp(OpId, Attachments[OpId]);
+ AddOp(Op, SourceOps);
+ std::vector<IoHash> MissingAttachments = MakeOp(Client, "proj0", "oplog0", Op);
+
+ if (!MissingAttachments.empty())
+ {
+ CHECK(MissingAttachments.size() <= Attachments[OpId].size());
+ tsl::robin_set<IoHash, IoHash::Hasher> MissingAttachmentSet(MissingAttachments.begin(), MissingAttachments.end());
+ std::vector<CompressedBuffer> PutAttachments;
+ for (const auto& Attachment : Attachments[OpId])
+ {
+ CompressedBuffer Payload = Attachment.second;
+ const IoHash AttachmentHash = Payload.DecodeRawHash();
+ if (auto It = MissingAttachmentSet.find(AttachmentHash); It != MissingAttachmentSet.end())
+ {
+ PutAttachments.push_back(Payload);
+ }
+ }
+ SendAttachments(Client, "proj0", "oplog0", PutAttachments, ServerProcessHandle, TempPath);
+ }
+ }
+
+ // Do it again, but now we should not need any attachments
+
+ for (const Oid& OpId : OpIds)
+ {
+ CbObject Op = CreateOplogOp(OpId, Attachments[OpId]);
+ AddOp(Op, SourceOps);
+ std::vector<IoHash> MissingAttachments = MakeOp(Client, "proj0", "oplog0", Op);
+ CHECK(MissingAttachments.empty());
+ }
+ }
+
+ {
+ HttpClient Client(Servers.GetInstance(1).GetBaseUri());
+ void* ServerProcessHandle = nullptr; // Force use of path for attachments passed on disk
+
+ MakeProject(Client, "proj0");
+ MakeOplog(Client, "proj0", "oplog0");
+ CbObject Oplog = GetOplog(Client, "proj0", "oplog0");
+ std::filesystem::path TempPath = Oplog["tempdir"sv].AsU8String();
+
+ std::unordered_map<Oid, uint32_t, Oid::Hasher> SourceOps;
+ for (const Oid& OpId : OpIds)
+ {
+ CbObject Op = CreateOplogOp(OpId, Attachments[OpId]);
+ AddOp(Op, SourceOps);
+ std::vector<IoHash> MissingAttachments = MakeOp(Client, "proj0", "oplog0", Op);
+
+ if (!MissingAttachments.empty())
+ {
+ CHECK(MissingAttachments.size() <= Attachments[OpId].size());
+ tsl::robin_set<IoHash, IoHash::Hasher> MissingAttachmentSet(MissingAttachments.begin(), MissingAttachments.end());
+ std::vector<CompressedBuffer> PutAttachments;
+ for (const auto& Attachment : Attachments[OpId])
+ {
+ CompressedBuffer Payload = Attachment.second;
+ const IoHash AttachmentHash = Payload.DecodeRawHash();
+ if (auto It = MissingAttachmentSet.find(AttachmentHash); It != MissingAttachmentSet.end())
+ {
+ PutAttachments.push_back(Payload);
+ }
+ }
+ SendAttachments(Client, "proj0", "oplog0", PutAttachments, ServerProcessHandle, TempPath);
+ }
+ }
+
+ // Do it again, but now we should not need any attachments
+
+ for (const Oid& OpId : OpIds)
+ {
+ CbObject Op = CreateOplogOp(OpId, Attachments[OpId]);
+ AddOp(Op, SourceOps);
+ std::vector<IoHash> MissingAttachments = MakeOp(Client, "proj0", "oplog0", Op);
+ CHECK(MissingAttachments.empty());
+ }
+ }
+}
+
std::vector<std::pair<std::filesystem::path, IoBuffer>>
GenerateFolderContent(const std::filesystem::path& RootPath)
{
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index 09cbe1aa3..262b35ea2 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -991,11 +991,24 @@ struct ProjectStore::OplogStorage : public RefCounted
}
else
{
- Handler(CbObjectView(OpBuffer.GetData()), LogEntry);
- MaxOpLsn = Max(MaxOpLsn, LogEntry.OpLsn.Number);
- const uint64_t EntryNextOpFileOffset =
- RoundUp((LogEntry.OpCoreAddress.Offset * m_OpsAlign) + LogEntry.OpCoreAddress.Size, m_OpsAlign);
- NextOpFileOffset = Max(NextOpFileOffset, EntryNextOpFileOffset);
+ CbObjectView OpView(OpBuffer.GetData());
+ if (OpView.GetSize() != OpBuffer.GetSize())
+ {
+ ZEN_WARN("oplog '{}/{}': skipping invalid format op - {}. Object payload size {} does not match op data size {}",
+ m_OwnerOplog->GetOuterProjectIdentifier(),
+ m_OwnerOplog->OplogId(),
+ LogEntry.OpKeyHash,
+ OpView.GetSize(),
+ OpBuffer.GetSize());
+ }
+ else
+ {
+ Handler(OpView, LogEntry);
+ MaxOpLsn = Max(MaxOpLsn, LogEntry.OpLsn.Number);
+ const uint64_t EntryNextOpFileOffset =
+ RoundUp((LogEntry.OpCoreAddress.Offset * m_OpsAlign) + LogEntry.OpCoreAddress.Size, m_OpsAlign);
+ NextOpFileOffset = Max(NextOpFileOffset, EntryNextOpFileOffset);
+ }
}
}
}
@@ -1031,12 +1044,38 @@ struct ProjectStore::OplogStorage : public RefCounted
MemoryView OpBufferView = OpBlobsBuffer.MakeView(Entry.Address.Size, OpFileOffset);
if (OpBufferView.GetSize() == Entry.Address.Size)
{
- Handler(Entry.Lsn, CbObjectView(OpBufferView.GetData()));
+ CbObjectView OpView(OpBufferView.GetData());
+ if (OpView.GetSize() != OpBufferView.GetSize())
+ {
+ ZEN_WARN("oplog '{}/{}': skipping invalid format op - {}. Object payload size {} does not match op data size {}",
+ m_OwnerOplog->GetOuterProjectIdentifier(),
+ m_OwnerOplog->OplogId(),
+ Entry.Lsn.Number,
+ OpView.GetSize(),
+ OpBufferView.GetSize());
+ }
+ else
+ {
+ Handler(Entry.Lsn, OpView);
+ }
continue;
}
IoBuffer OpBuffer(Entry.Address.Size);
OpBlobsBuffer.Read((void*)OpBuffer.Data(), Entry.Address.Size, OpFileOffset);
- Handler(Entry.Lsn, CbObjectView(OpBuffer.Data()));
+ CbObjectView OpView(OpBuffer.Data());
+ if (OpView.GetSize() != OpBuffer.GetSize())
+ {
+ ZEN_WARN("oplog '{}/{}': skipping invalid format op - {}. Object payload size {} does not match op data size {}",
+ m_OwnerOplog->GetOuterProjectIdentifier(),
+ m_OwnerOplog->OplogId(),
+ Entry.Lsn.Number,
+ OpView.GetSize(),
+ OpBuffer.GetSize());
+ }
+ else
+ {
+ Handler(Entry.Lsn, OpView);
+ }
}
}
@@ -1272,7 +1311,10 @@ ProjectStore::Oplog::Flush()
{
RwLock::SharedLockScope Lock(m_OplogLock);
- ZEN_ASSERT(m_Storage);
+ if (!m_Storage)
+ {
+ return;
+ }
m_Storage->Flush();
if (!m_MetaValid)
@@ -5871,284 +5913,390 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq,
}
Project->TouchOplog(OplogId);
- if (Method == "import"sv)
- {
- if (!AreDiskWritesAllowed())
- {
- HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage);
- return true;
- }
- std::pair<HttpResponseCode, std::string> Result = Import(*Project.Get(), *Oplog, Cb["params"sv].AsObjectView(), AuthManager);
- if (Result.second.empty())
- {
- HttpReq.WriteResponse(Result.first);
- return Result.first != HttpResponseCode::BadRequest;
- }
- HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second);
- return true;
- }
- else if (Method == "export"sv)
- {
- std::pair<HttpResponseCode, std::string> Result = Export(Project, *Oplog, Cb["params"sv].AsObjectView(), AuthManager);
- HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second);
- return true;
- }
- else if (Method == "getchunks"sv)
- {
- ZEN_TRACE_CPU("Store::Rpc::getchunks");
-
- RpcAcceptOptions AcceptFlags = static_cast<RpcAcceptOptions>(Cb["AcceptFlags"sv].AsUInt16(0u));
- int32_t TargetProcessId = Cb["Pid"sv].AsInt32(0);
+ uint32_t MethodHash = HashStringDjb2(Method);
- CbPackage ResponsePackage;
- std::pair<HttpResponseCode, std::string> Result = GetChunks(ProjectId, OplogId, Cb, ResponsePackage);
- if (Result.first == HttpResponseCode::OK)
- {
- void* TargetProcessHandle = nullptr;
- FormatFlags Flags = FormatFlags::kDefault;
- if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences))
+ switch (MethodHash)
+ {
+ case HashStringDjb2("import"sv):
{
- Flags |= FormatFlags::kAllowLocalReferences;
- if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences))
+ if (!AreDiskWritesAllowed())
{
- Flags |= FormatFlags::kDenyPartialLocalReferences;
+ HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage);
+ return true;
+ }
+ std::pair<HttpResponseCode, std::string> Result =
+ Import(*Project.Get(), *Oplog, Cb["params"sv].AsObjectView(), AuthManager);
+ if (Result.second.empty())
+ {
+ HttpReq.WriteResponse(Result.first);
+ return Result.first != HttpResponseCode::BadRequest;
}
- TargetProcessHandle = m_OpenProcessCache.GetProcessHandle(HttpReq.SessionId(), TargetProcessId);
+ HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second);
+ return true;
}
+ case HashStringDjb2("export"sv):
+ {
+ std::pair<HttpResponseCode, std::string> Result = Export(Project, *Oplog, Cb["params"sv].AsObjectView(), AuthManager);
+ HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second);
+ return true;
+ }
+ case HashStringDjb2("getchunks"sv):
+ {
+ ZEN_TRACE_CPU("Store::Rpc::getchunks");
- CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(ResponsePackage, Flags, TargetProcessHandle);
- HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer);
- }
- else
- {
- HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second);
- }
- return true;
- }
- else if (Method == "putchunks"sv)
- {
- ZEN_TRACE_CPU("Store::Rpc::putchunks");
- if (!AreDiskWritesAllowed())
- {
- HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage);
- return true;
- }
-
- std::span<const CbAttachment> Attachments = Package.GetAttachments();
- if (!Attachments.empty())
- {
- std::vector<IoBuffer> WriteAttachmentBuffers;
- std::vector<IoHash> WriteRawHashes;
+ RpcAcceptOptions AcceptFlags = static_cast<RpcAcceptOptions>(Cb["AcceptFlags"sv].AsUInt16(0u));
+ int32_t TargetProcessId = Cb["Pid"sv].AsInt32(0);
- WriteAttachmentBuffers.reserve(Attachments.size());
- WriteRawHashes.reserve(Attachments.size());
+ CbPackage ResponsePackage;
+ std::pair<HttpResponseCode, std::string> Result = GetChunks(ProjectId, OplogId, Cb, ResponsePackage);
+ if (Result.first == HttpResponseCode::OK)
+ {
+ void* TargetProcessHandle = nullptr;
+ FormatFlags Flags = FormatFlags::kDefault;
+ if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences))
+ {
+ Flags |= FormatFlags::kAllowLocalReferences;
+ if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences))
+ {
+ Flags |= FormatFlags::kDenyPartialLocalReferences;
+ }
+ TargetProcessHandle = m_OpenProcessCache.GetProcessHandle(HttpReq.SessionId(), TargetProcessId);
+ }
- for (const CbAttachment& Attachment : Attachments)
- {
- IoHash RawHash = Attachment.GetHash();
- const CompressedBuffer& Compressed = Attachment.AsCompressedBinary();
- WriteAttachmentBuffers.push_back(Compressed.GetCompressed().Flatten().AsIoBuffer());
- WriteRawHashes.push_back(RawHash);
+ CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(ResponsePackage, Flags, TargetProcessHandle);
+ HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer);
+ }
+ else
+ {
+ HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second);
+ }
+ return true;
}
+ case HashStringDjb2("putchunks"sv):
+ {
+ ZEN_TRACE_CPU("Store::Rpc::putchunks");
+ if (!AreDiskWritesAllowed())
+ {
+ HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage);
+ return true;
+ }
- Oplog->CaptureAddedAttachments(WriteRawHashes);
- m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes, CidStore::InsertMode::kCopyOnly);
- }
- HttpReq.WriteResponse(HttpResponseCode::OK);
- return true;
- }
- else if (Method == "snapshot"sv)
- {
- ZEN_TRACE_CPU("Store::Rpc::snapshot");
- if (!AreDiskWritesAllowed())
- {
- HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage);
- return true;
- }
+ CbObject Object = Package.GetObject();
+ const bool UsingTempFiles = Object["usingtmpfiles"].AsBool(false);
- // Snapshot all referenced files. This brings the content of all
- // files into the CID store
+ std::span<const CbAttachment> Attachments = Package.GetAttachments();
+ if (!Attachments.empty())
+ {
+ std::vector<IoBuffer> WriteAttachmentBuffers;
+ std::vector<IoHash> WriteRawHashes;
- uint32_t OpCount = 0;
- uint64_t InlinedBytes = 0;
- uint64_t InlinedFiles = 0;
- uint64_t TotalBytes = 0;
- uint64_t TotalFiles = 0;
+ WriteAttachmentBuffers.reserve(Attachments.size());
+ WriteRawHashes.reserve(Attachments.size());
- std::vector<CbObject> NewOps;
- struct AddedChunk
- {
- IoBuffer Buffer;
- uint64_t RawSize = 0;
- };
- tsl::robin_map<IoHash, AddedChunk, IoHash::Hasher> AddedChunks;
+ for (const CbAttachment& Attachment : Attachments)
+ {
+ IoHash RawHash = Attachment.GetHash();
+ const CompressedBuffer& Compressed = Attachment.AsCompressedBinary();
+ IoBuffer AttachmentPayload = Compressed.GetCompressed().Flatten().AsIoBuffer();
+ if (UsingTempFiles)
+ {
+ AttachmentPayload.SetDeleteOnClose(true);
+ }
+ WriteAttachmentBuffers.push_back(std::move(AttachmentPayload));
+ WriteRawHashes.push_back(RawHash);
+ }
+
+ Oplog->CaptureAddedAttachments(WriteRawHashes);
+ m_CidStore.AddChunks(WriteAttachmentBuffers,
+ WriteRawHashes,
+ UsingTempFiles ? CidStore::InsertMode::kMayBeMovedInPlace : CidStore::InsertMode::kCopyOnly);
+ }
+ HttpReq.WriteResponse(HttpResponseCode::OK);
+ return true;
+ }
+ case HashStringDjb2("snapshot"sv):
+ {
+ ZEN_TRACE_CPU("Store::Rpc::snapshot");
+ if (!AreDiskWritesAllowed())
+ {
+ HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage);
+ return true;
+ }
- Oplog->IterateOplog(
- [&](CbObjectView Op) {
- bool OpRewritten = false;
- bool AllOk = true;
+ // Snapshot all referenced files. This brings the content of all
+ // files into the CID store
- CbWriter FilesArrayWriter;
- FilesArrayWriter.BeginArray("files"sv);
+ uint32_t OpCount = 0;
+ uint64_t InlinedBytes = 0;
+ uint64_t InlinedFiles = 0;
+ uint64_t TotalBytes = 0;
+ uint64_t TotalFiles = 0;
- for (CbFieldView& Field : Op["files"sv])
+ std::vector<CbObject> NewOps;
+ struct AddedChunk
{
- bool CopyField = true;
+ IoBuffer Buffer;
+ uint64_t RawSize = 0;
+ };
+ tsl::robin_map<IoHash, AddedChunk, IoHash::Hasher> AddedChunks;
- if (CbObjectView View = Field.AsObjectView())
- {
- const IoHash DataHash = View["data"sv].AsHash();
+ Oplog->IterateOplog(
+ [&](CbObjectView Op) {
+ bool OpRewritten = false;
+ bool AllOk = true;
+
+ CbWriter FilesArrayWriter;
+ FilesArrayWriter.BeginArray("files"sv);
- if (DataHash == IoHash::Zero)
+ for (CbFieldView& Field : Op["files"sv])
{
- std::string_view ServerPath = View["serverpath"sv].AsString();
- std::filesystem::path FilePath = Project->RootDir / ServerPath;
- BasicFile DataFile;
- std::error_code Ec;
- DataFile.Open(FilePath, BasicFile::Mode::kRead, Ec);
+ bool CopyField = true;
- if (Ec)
+ if (CbObjectView View = Field.AsObjectView())
{
- // Error...
+ const IoHash DataHash = View["data"sv].AsHash();
- ZEN_ERROR("unable to read data from file '{}': {}", FilePath, Ec.message());
-
- AllOk = false;
- }
- else
- {
- // Read file contents into memory, compress and keep in map of chunks to add to Cid store
- IoBuffer FileIoBuffer = DataFile.ReadAll();
- CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(std::move(FileIoBuffer)));
- const uint64_t RawSize = Compressed.DecodeRawSize();
- const IoHash RawHash = Compressed.DecodeRawHash();
- if (!AddedChunks.contains(RawHash))
+ if (DataHash == IoHash::Zero)
{
- const std::filesystem::path TempChunkPath = Oplog->TempPath() / RawHash.ToHexString();
- BasicFile ChunkTempFile;
- ChunkTempFile.Open(TempChunkPath, BasicFile::Mode::kTruncateDelete);
- ChunkTempFile.Write(Compressed.GetCompressed(), 0, Ec);
+ std::string_view ServerPath = View["serverpath"sv].AsString();
+ std::filesystem::path FilePath = Project->RootDir / ServerPath;
+ BasicFile DataFile;
+ std::error_code Ec;
+ DataFile.Open(FilePath, BasicFile::Mode::kRead, Ec);
+
if (Ec)
{
- Oid ChunkId = View["id"sv].AsObjectId();
- ZEN_ERROR("unable to write external file as compressed chunk '{}', id {}: {}",
- FilePath,
- ChunkId,
- Ec.message());
+ // Error...
+
+ ZEN_ERROR("unable to read data from file '{}': {}", FilePath, Ec.message());
+
AllOk = false;
}
else
{
- void* FileHandle = ChunkTempFile.Detach();
- IoBuffer ChunkBuffer(IoBuffer::File,
- FileHandle,
- 0,
- Compressed.GetCompressed().GetSize(),
- /*IsWholeFile*/ true);
- ChunkBuffer.SetDeleteOnClose(true);
- AddedChunks.insert_or_assign(RawHash,
- AddedChunk{.Buffer = std::move(ChunkBuffer), .RawSize = RawSize});
- }
- }
+ // Read file contents into memory, compress and keep in map of chunks to add to Cid store
+ IoBuffer FileIoBuffer = DataFile.ReadAll();
+ CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(std::move(FileIoBuffer)));
+ const uint64_t RawSize = Compressed.DecodeRawSize();
+ const IoHash RawHash = Compressed.DecodeRawHash();
+ if (!AddedChunks.contains(RawHash))
+ {
+ const std::filesystem::path TempChunkPath = Oplog->TempPath() / RawHash.ToHexString();
+ BasicFile ChunkTempFile;
+ ChunkTempFile.Open(TempChunkPath, BasicFile::Mode::kTruncateDelete);
+ ChunkTempFile.Write(Compressed.GetCompressed(), 0, Ec);
+ if (Ec)
+ {
+ Oid ChunkId = View["id"sv].AsObjectId();
+ ZEN_ERROR("unable to write external file as compressed chunk '{}', id {}: {}",
+ FilePath,
+ ChunkId,
+ Ec.message());
+ AllOk = false;
+ }
+ else
+ {
+ void* FileHandle = ChunkTempFile.Detach();
+ IoBuffer ChunkBuffer(IoBuffer::File,
+ FileHandle,
+ 0,
+ Compressed.GetCompressed().GetSize(),
+ /*IsWholeFile*/ true);
+ ChunkBuffer.SetDeleteOnClose(true);
+ AddedChunks.insert_or_assign(
+ RawHash,
+ AddedChunk{.Buffer = std::move(ChunkBuffer), .RawSize = RawSize});
+ }
+ }
- TotalBytes += RawSize;
- ++TotalFiles;
+ TotalBytes += RawSize;
+ ++TotalFiles;
- // Rewrite file array entry with new data reference
- CbObjectWriter Writer(View.GetSize());
- RewriteCbObject(Writer, View, [&](CbObjectWriter&, CbFieldView Field) -> bool {
- if (Field.GetName() == "data"sv)
- {
- // omit this field as we will write it explicitly ourselves
- return true;
+ // Rewrite file array entry with new data reference
+ CbObjectWriter Writer(View.GetSize());
+ RewriteCbObject(Writer, View, [&](CbObjectWriter&, CbFieldView Field) -> bool {
+ if (Field.GetName() == "data"sv)
+ {
+ // omit this field as we will write it explicitly ourselves
+ return true;
+ }
+ return false;
+ });
+ Writer.AddBinaryAttachment("data"sv, RawHash);
+
+ CbObject RewrittenOp = Writer.Save();
+ FilesArrayWriter.AddObject(std::move(RewrittenOp));
+ CopyField = false;
}
- return false;
- });
- Writer.AddBinaryAttachment("data"sv, RawHash);
+ }
+ }
- CbObject RewrittenOp = Writer.Save();
- FilesArrayWriter.AddObject(std::move(RewrittenOp));
- CopyField = false;
+ if (CopyField)
+ {
+ FilesArrayWriter.AddField(Field);
+ }
+ else
+ {
+ OpRewritten = true;
}
}
- }
- if (CopyField)
+ if (OpRewritten && AllOk)
+ {
+ FilesArrayWriter.EndArray();
+ CbArray FilesArray = FilesArrayWriter.Save().AsArray();
+
+ CbObject RewrittenOp = RewriteCbObject(Op, [&](CbObjectWriter& NewWriter, CbFieldView Field) -> bool {
+ if (Field.GetName() == "files"sv)
+ {
+ NewWriter.AddArray("files"sv, FilesArray);
+
+ return true;
+ }
+
+ return false;
+ });
+
+ NewOps.push_back(std::move(RewrittenOp));
+ }
+
+ OpCount++;
+ },
+ Oplog::Paging{});
+
+ CbObjectWriter ResponseObj;
+
+ // Persist rewritten oplog entries
+ if (!NewOps.empty())
+ {
+ ResponseObj.BeginArray("rewritten_ops");
+
+ for (CbObject& NewOp : NewOps)
{
- FilesArrayWriter.AddField(Field);
+ ProjectStore::LogSequenceNumber NewLsn = Oplog->AppendNewOplogEntry(std::move(NewOp));
+
+ ZEN_DEBUG("appended rewritten op at LSN: {}", NewLsn.Number);
+
+ ResponseObj.AddInteger(NewLsn.Number);
}
- else
+
+ ResponseObj.EndArray();
+ }
+
+ // Ops that have moved chunks to a compressed buffer for storage in m_CidStore have been rewritten with references to the
+ // new chunk(s). Make sure we add the chunks to m_CidStore, and do it after we update the oplog so GC doesn't think we have
+ // unreferenced chunks.
+ for (auto It : AddedChunks)
+ {
+ const IoHash& RawHash = It.first;
+ AddedChunk& Chunk = It.second;
+ CidStore::InsertResult Result = m_CidStore.AddChunk(Chunk.Buffer, RawHash);
+ if (Result.New)
{
- OpRewritten = true;
+ InlinedBytes += Chunk.RawSize;
+ ++InlinedFiles;
}
}
- if (OpRewritten && AllOk)
- {
- FilesArrayWriter.EndArray();
- CbArray FilesArray = FilesArrayWriter.Save().AsArray();
+ ResponseObj << "inlined_bytes" << InlinedBytes << "inlined_files" << InlinedFiles;
+ ResponseObj << "total_bytes" << TotalBytes << "total_files" << TotalFiles;
- CbObject RewrittenOp = RewriteCbObject(Op, [&](CbObjectWriter& NewWriter, CbFieldView Field) -> bool {
- if (Field.GetName() == "files"sv)
- {
- NewWriter.AddArray("files"sv, FilesArray);
+ ZEN_INFO("oplog '{}/{}': rewrote {} oplog entries (out of {})", ProjectId, OplogId, NewOps.size(), OpCount);
- return true;
- }
+ HttpReq.WriteResponse(HttpResponseCode::OK, ResponseObj.Save());
+ return true;
+ }
+ case HashStringDjb2("addopattachments"sv):
+ {
+ ZEN_TRACE_CPU("Store::Rpc::addopattachments");
+ if (!AreDiskWritesAllowed())
+ {
+ HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage);
+ return true;
+ }
+ }
+ case HashStringDjb2("appendops"sv):
+ {
+ ZEN_TRACE_CPU("Store::Rpc::appendops");
+ if (!AreDiskWritesAllowed())
+ {
+ HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage);
+ return true;
+ }
- return false;
- });
+ CbArrayView OpsArray = Cb["ops"sv].AsArrayView();
- NewOps.push_back(std::move(RewrittenOp));
+ size_t OpsBufferSize = 0;
+ for (CbFieldView OpView : OpsArray)
+ {
+ OpsBufferSize += OpView.GetSize();
}
+ UniqueBuffer OpsBuffers = UniqueBuffer::Alloc(OpsBufferSize);
+ MutableMemoryView OpsBuffersMemory = OpsBuffers.GetMutableView();
- OpCount++;
- },
- Oplog::Paging{});
-
- CbObjectWriter ResponseObj;
+ std::vector<CbObjectView> Ops;
+ Ops.reserve(OpsArray.Num());
+ for (CbFieldView OpView : OpsArray)
+ {
+ OpView.CopyTo(OpsBuffersMemory);
+ Ops.push_back(CbObjectView(OpsBuffersMemory.GetData()));
+ OpsBuffersMemory.MidInline(OpView.GetSize());
+ }
- // Persist rewritten oplog entries
- if (!NewOps.empty())
- {
- ResponseObj.BeginArray("rewritten_ops");
+ std::vector<ProjectStore::LogSequenceNumber> LSNs = Oplog->AppendNewOplogEntries(Ops);
+ ZEN_ASSERT(LSNs.size() == Ops.size());
- for (CbObject& NewOp : NewOps)
- {
- ProjectStore::LogSequenceNumber NewLsn = Oplog->AppendNewOplogEntry(std::move(NewOp));
+ std::vector<IoHash> MissingAttachments;
+ for (size_t OpIndex = 0; OpIndex < Ops.size(); OpIndex++)
+ {
+ if (LSNs[OpIndex])
+ {
+ CbObjectView Op = Ops[OpIndex];
+ Op.IterateAttachments([this, &MissingAttachments](CbFieldView AttachmentView) {
+ const IoHash Cid = AttachmentView.AsAttachment();
+ if (!m_CidStore.ContainsChunk(Cid))
+ {
+ MissingAttachments.push_back(Cid);
+ }
+ });
+ }
+ }
- ZEN_DEBUG("appended rewritten op at LSN: {}", NewLsn.Number);
+ CbPackage ResponsePackage;
- ResponseObj.AddInteger(NewLsn.Number);
- }
+ {
+ CbObjectWriter ResponseObj;
+ ResponseObj.BeginArray("written_ops");
- ResponseObj.EndArray();
- }
+ for (ProjectStore::LogSequenceNumber NewLsn : LSNs)
+ {
+ ZEN_DEBUG("appended written op at LSN: {}", NewLsn.Number);
+ ResponseObj.AddInteger(NewLsn.Number);
+ }
+ ResponseObj.EndArray();
- // Ops that have moved chunks to a compressed buffer for storage in m_CidStore have been rewritten with references to the
- // new chunk(s). Make sure we add the chunks to m_CidStore, and do it after we update the oplog so GC doesn't think we have
- // unreferenced chunks.
- for (auto It : AddedChunks)
- {
- const IoHash& RawHash = It.first;
- AddedChunk& Chunk = It.second;
- CidStore::InsertResult Result = m_CidStore.AddChunk(Chunk.Buffer, RawHash);
- if (Result.New)
- {
- InlinedBytes += Chunk.RawSize;
- ++InlinedFiles;
- }
- }
+ if (!MissingAttachments.empty())
+ {
+ ResponseObj.BeginArray("need");
- ResponseObj << "inlined_bytes" << InlinedBytes << "inlined_files" << InlinedFiles;
- ResponseObj << "total_bytes" << TotalBytes << "total_files" << TotalFiles;
+ for (const IoHash& Cid : MissingAttachments)
+ {
+ ResponseObj.AddHash(Cid);
+ }
+ ResponseObj.EndArray();
+ }
+ ResponsePackage.SetObject(ResponseObj.Save());
+ }
- ZEN_INFO("oplog '{}/{}': rewrote {} oplog entries (out of {})", ProjectId, OplogId, NewOps.size(), OpCount);
+ std::vector<IoBuffer> ResponseBuffers = FormatPackageMessage(ResponsePackage);
- HttpReq.WriteResponse(HttpResponseCode::OK, ResponseObj.Save());
- return true;
+ HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, ResponseBuffers);
+ return true;
+ }
+ default:
+ HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, fmt::format("Unknown rpc method '{}'", Method));
+ return false;
}
- HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, fmt::format("Unknown rpc method '{}'", Method));
return true;
}
diff --git a/src/zenutil/include/zenutil/zenserverprocess.h b/src/zenutil/include/zenutil/zenserverprocess.h
index 75009b045..0da63285b 100644
--- a/src/zenutil/include/zenutil/zenserverprocess.h
+++ b/src/zenutil/include/zenutil/zenserverprocess.h
@@ -74,8 +74,9 @@ struct ZenServerInstance
void EnableTermination() { m_Terminate = true; }
void DisableShutdownOnDestroy() { m_ShutdownOnDestroy = false; }
void Detach();
- inline int GetPid() { return m_Process.Pid(); }
+ inline int GetPid() const { return m_Process.Pid(); }
inline void SetOwnerPid(int Pid) { m_OwnerPid = Pid; }
+ void* GetProcessHandle() const { return m_Process.Handle(); }
bool IsRunning();
bool Terminate();
std::string GetLogOutput() const;