diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/zencore/compactbinaryutil.cpp | 8 | ||||
| -rw-r--r-- | src/zencore/filesystem.cpp | 21 | ||||
| -rw-r--r-- | src/zencore/include/zencore/process.h | 1 | ||||
| -rw-r--r-- | src/zenserver-test/zenserver-test.cpp | 270 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.cpp | 602 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/zenserverprocess.h | 3 |
6 files changed, 676 insertions, 229 deletions
diff --git a/src/zencore/compactbinaryutil.cpp b/src/zencore/compactbinaryutil.cpp index c8cde21c3..074bdaffd 100644 --- a/src/zencore/compactbinaryutil.cpp +++ b/src/zencore/compactbinaryutil.cpp @@ -14,7 +14,13 @@ ValidateAndReadCompactBinaryObject(const SharedBuffer&& Payload, CbValidateError { if (OutError = ValidateCompactBinary(Payload.GetView(), CbValidateMode::Default); OutError == CbValidateError::None) { - return CbObject(std::move(Payload)); + CbObject Object(std::move(Payload)); + if (Object.GetView().GetSize() != Payload.GetSize()) + { + OutError |= CbValidateError::OutOfBounds; + return {}; + } + return Object; } } return CbObject(); diff --git a/src/zencore/filesystem.cpp b/src/zencore/filesystem.cpp index 5125beeca..8e6f5085f 100644 --- a/src/zencore/filesystem.cpp +++ b/src/zencore/filesystem.cpp @@ -1290,6 +1290,27 @@ MoveToFile(std::filesystem::path Path, IoBuffer Data) zen::CreateDirectories(Path.parent_path()); Success = SetFileInformationByHandle(ChunkFileHandle, FileRenameInfo, RenameInfo, BufferSize); } + if (!Success && (LastError == ERROR_ACCESS_DENIED)) + { + // Fallback to regular rename + std::error_code Ec; + std::filesystem::path SourcePath = PathFromHandle(FileRef.FileHandle, Ec); + if (!Ec) + { + auto NativeSourcePath = SourcePath.native().c_str(); + auto NativeTargetPath = Path.native().c_str(); + Success = ::MoveFile(NativeSourcePath, NativeTargetPath); + if (!Success) + { + LastError = GetLastError(); + if (LastError == ERROR_PATH_NOT_FOUND) + { + zen::CreateDirectories(Path.parent_path()); + Success = ::MoveFile(NativeSourcePath, NativeTargetPath); + } + } + } + } } Memory::Free(RenameInfo); if (!Success) diff --git a/src/zencore/include/zencore/process.h b/src/zencore/include/zencore/process.h index 98d352db6..04b79a1e0 100644 --- a/src/zencore/include/zencore/process.h +++ b/src/zencore/include/zencore/process.h @@ -33,6 +33,7 @@ public: ZENCORE_API bool Terminate(int ExitCode); ZENCORE_API void Reset(); [[nodiscard]] inline int Pid() const { return m_Pid; } + [[nodiscard]] inline void* Handle() const { return m_ProcessHandle; } private: void* m_ProcessHandle = nullptr; diff --git a/src/zenserver-test/zenserver-test.cpp b/src/zenserver-test/zenserver-test.cpp index 0ea953937..8099091bb 100644 --- a/src/zenserver-test/zenserver-test.cpp +++ b/src/zenserver-test/zenserver-test.cpp @@ -3145,6 +3145,31 @@ CreateOplogPackage(const Oid& Id, const std::span<const std::pair<Oid, Compresse return Package; }; +CbObject +CreateOplogOp(const Oid& Id, const std::span<const std::pair<Oid, CompressedBuffer>>& Attachments) +{ + CbObjectWriter Object; + Object << "key"sv << OidAsString(Id); + if (!Attachments.empty()) + { + Object.BeginArray("bulkdata"); + for (const auto& Attachment : Attachments) + { + CbAttachment Attach(Attachment.second, Attachment.second.DecodeRawHash()); + Object.BeginObject(); + Object << "id"sv << Attachment.first; + Object << "type"sv + << "Standard"sv; + Object << "data"sv << Attach; + Object.EndObject(); + + ZEN_DEBUG("Added attachment {}", Attach.GetHash()); + } + Object.EndArray(); + } + return Object.Save(); +}; + cpr::Body AsBody(const IoBuffer& Payload) { @@ -3663,6 +3688,251 @@ TEST_CASE("project.remote") } } +TEST_CASE("project.rpcappendop") +{ + using namespace std::literals; + using namespace utils; + + ZenServerTestHelper Servers("remote", 2); + Servers.SpawnServers("--debug"); + + std::vector<Oid> OpIds; + const size_t OpCount = 24; + OpIds.reserve(OpCount); + for (size_t I = 0; I < OpCount; ++I) + { + OpIds.emplace_back(Oid::NewOid()); + } + + std::unordered_map<Oid, std::vector<std::pair<Oid, CompressedBuffer>>, Oid::Hasher> Attachments; + { + std::vector<std::size_t> AttachmentSizes( + {7633, 6825, 5738, 8031, 7225, 566, 3656, 6006, 24, 33466, 1093, 4269, 2257, 3685, 13489, 97194, + 6151, 5482, 6217, 3511, 6738, 5061, 7537, 2759, 1916, 8210, 2235, 224024, 51582, 5251, 491, 2u * 1024u * 1024u + 124u, + 74607, 18135, 3767, 154045, 4415, 5007, 8876, 96761, 3359, 8526, 4097, 4855, 48225}); + auto It = AttachmentSizes.begin(); + Attachments[OpIds[0]] = {}; + Attachments[OpIds[1]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++}); + Attachments[OpIds[2]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++}); + Attachments[OpIds[3]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++}); + Attachments[OpIds[4]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++}); + Attachments[OpIds[5]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++}); + Attachments[OpIds[6]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++}); + Attachments[OpIds[7]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++}); + Attachments[OpIds[8]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{}); + Attachments[OpIds[9]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++}); + Attachments[OpIds[10]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++}); + Attachments[OpIds[11]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++}); + Attachments[OpIds[12]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++}); + Attachments[OpIds[13]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++}); + Attachments[OpIds[14]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++}); + Attachments[OpIds[15]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++}); + Attachments[OpIds[16]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{}); + Attachments[OpIds[17]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++}); + Attachments[OpIds[18]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++}); + Attachments[OpIds[19]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{}); + Attachments[OpIds[20]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++}); + Attachments[OpIds[21]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++}); + Attachments[OpIds[22]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++}); + Attachments[OpIds[23]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++}); + ZEN_ASSERT(It == AttachmentSizes.end()); + } + + // Note: This is a clone of the function in projectstore.cpp + auto ComputeOpKey = [](const CbObjectView& Op) -> Oid { + using namespace std::literals; + + XXH3_128Stream_deprecated KeyHasher; + Op["key"sv].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); }); + XXH3_128 KeyHash128 = KeyHasher.GetHash(); + + Oid KeyHash; + memcpy(&KeyHash, KeyHash128.Hash, sizeof KeyHash); + + return KeyHash; + }; + + auto AddOp = [ComputeOpKey](const CbObject& Op, std::unordered_map<Oid, uint32_t, Oid::Hasher>& Ops) { + const Oid Id = ComputeOpKey(Op); + IoBuffer Buffer = Op.GetBuffer().AsIoBuffer(); + const uint32_t OpCoreHash = uint32_t(XXH3_64bits(Buffer.GetData(), Buffer.GetSize()) & 0xffffFFFF); + Ops.insert({Id, OpCoreHash}); + }; + + auto MakeProject = [](HttpClient& Client, std::string_view ProjectName) { + CbObjectWriter Project; + Project.AddString("id"sv, ProjectName); + Project.AddString("root"sv, ""sv); + Project.AddString("engine"sv, ""sv); + Project.AddString("project"sv, ""sv); + Project.AddString("projectfile"sv, ""sv); + HttpClient::Response Response = Client.Post(fmt::format("/prj/{}", ProjectName), Project.Save()); + CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage("")); + }; + + auto MakeOplog = [](HttpClient& Client, std::string_view ProjectName, std::string_view OplogName) { + HttpClient::Response Response = Client.Post(fmt::format("/prj/{}/oplog/{}", ProjectName, OplogName)); + CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage("")); + }; + auto GetOplog = [](HttpClient& Client, std::string_view ProjectName, std::string_view OplogName) { + HttpClient::Response Response = Client.Get(fmt::format("/prj/{}/oplog/{}", ProjectName, OplogName)); + CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage("")); + return Response.AsObject(); + }; + + auto MakeOp = + [](HttpClient& Client, std::string_view ProjectName, std::string_view OplogName, const CbObjectView& Op) -> std::vector<IoHash> { + CbObjectWriter Request; + Request.AddString("method"sv, "appendops"sv); + Request.BeginArray("ops"sv); + { + Request.AddObject(Op); + } + Request.EndArray(); // "ops" + HttpClient::Response Response = Client.Post(fmt::format("/prj/{}/oplog/{}/rpc", ProjectName, OplogName), Request.Save()); + CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage("")); + + CbObjectView ResponsePayload = Response.AsPackage().GetObject(); + CbArrayView NeedArray = ResponsePayload["need"sv].AsArrayView(); + std::vector<IoHash> Needs; + Needs.reserve(NeedArray.Num()); + for (CbFieldView NeedView : NeedArray) + { + Needs.push_back(NeedView.AsHash()); + } + return Needs; + }; + + auto SendAttachments = [](HttpClient& Client, + std::string_view ProjectName, + std::string_view OplogName, + std::span<const CompressedBuffer> Attachments, + void* ServerProcessHandle, + const std::filesystem::path& TempPath) { + CompositeBuffer PackageMessage; + { + CbPackage RequestPackage; + CbObjectWriter Request; + Request.AddString("method"sv, "putchunks"sv); + Request.AddBool("usingtmpfiles"sv, true); + Request.BeginArray("chunks"sv); + for (CompressedBuffer AttachmentPayload : Attachments) + { + if (AttachmentPayload.DecodeRawSize() > 16u * 1024u) + { + std::filesystem::path TempAttachmentPath = TempPath / (Oid::NewOid().ToString() + ".tmp"); + WriteFile(TempAttachmentPath, AttachmentPayload.GetCompressed()); + IoBuffer OnDiskAttachment = IoBufferBuilder::MakeFromFile(TempAttachmentPath); + AttachmentPayload = CompressedBuffer::FromCompressedNoValidate(std::move(OnDiskAttachment)); + } + + CbAttachment Attachment(AttachmentPayload, AttachmentPayload.DecodeRawHash()); + + Request.AddAttachment(Attachment); + RequestPackage.AddAttachment(Attachment); + } + Request.EndArray(); // "chunks" + RequestPackage.SetObject(Request.Save()); + + PackageMessage = CompositeBuffer(FormatPackageMessage(RequestPackage, FormatFlags::kAllowLocalReferences, ServerProcessHandle)); + } + + HttpClient::Response Response = + Client.Post(fmt::format("/prj/{}/oplog/{}/rpc", ProjectName, OplogName), PackageMessage, HttpContentType::kCbPackage); + CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage("")); + }; + + { + HttpClient Client(Servers.GetInstance(0).GetBaseUri()); + void* ServerProcessHandle = Servers.GetInstance(0).GetProcessHandle(); + + MakeProject(Client, "proj0"); + MakeOplog(Client, "proj0", "oplog0"); + CbObject Oplog = GetOplog(Client, "proj0", "oplog0"); + std::filesystem::path TempPath = Oplog["tempdir"sv].AsU8String(); + + std::unordered_map<Oid, uint32_t, Oid::Hasher> SourceOps; + for (const Oid& OpId : OpIds) + { + CbObject Op = CreateOplogOp(OpId, Attachments[OpId]); + AddOp(Op, SourceOps); + std::vector<IoHash> MissingAttachments = MakeOp(Client, "proj0", "oplog0", Op); + + if (!MissingAttachments.empty()) + { + CHECK(MissingAttachments.size() <= Attachments[OpId].size()); + tsl::robin_set<IoHash, IoHash::Hasher> MissingAttachmentSet(MissingAttachments.begin(), MissingAttachments.end()); + std::vector<CompressedBuffer> PutAttachments; + for (const auto& Attachment : Attachments[OpId]) + { + CompressedBuffer Payload = Attachment.second; + const IoHash AttachmentHash = Payload.DecodeRawHash(); + if (auto It = MissingAttachmentSet.find(AttachmentHash); It != MissingAttachmentSet.end()) + { + PutAttachments.push_back(Payload); + } + } + SendAttachments(Client, "proj0", "oplog0", PutAttachments, ServerProcessHandle, TempPath); + } + } + + // Do it again, but now we should not need any attachments + + for (const Oid& OpId : OpIds) + { + CbObject Op = CreateOplogOp(OpId, Attachments[OpId]); + AddOp(Op, SourceOps); + std::vector<IoHash> MissingAttachments = MakeOp(Client, "proj0", "oplog0", Op); + CHECK(MissingAttachments.empty()); + } + } + + { + HttpClient Client(Servers.GetInstance(1).GetBaseUri()); + void* ServerProcessHandle = nullptr; // Force use of path for attachments passed on disk + + MakeProject(Client, "proj0"); + MakeOplog(Client, "proj0", "oplog0"); + CbObject Oplog = GetOplog(Client, "proj0", "oplog0"); + std::filesystem::path TempPath = Oplog["tempdir"sv].AsU8String(); + + std::unordered_map<Oid, uint32_t, Oid::Hasher> SourceOps; + for (const Oid& OpId : OpIds) + { + CbObject Op = CreateOplogOp(OpId, Attachments[OpId]); + AddOp(Op, SourceOps); + std::vector<IoHash> MissingAttachments = MakeOp(Client, "proj0", "oplog0", Op); + + if (!MissingAttachments.empty()) + { + CHECK(MissingAttachments.size() <= Attachments[OpId].size()); + tsl::robin_set<IoHash, IoHash::Hasher> MissingAttachmentSet(MissingAttachments.begin(), MissingAttachments.end()); + std::vector<CompressedBuffer> PutAttachments; + for (const auto& Attachment : Attachments[OpId]) + { + CompressedBuffer Payload = Attachment.second; + const IoHash AttachmentHash = Payload.DecodeRawHash(); + if (auto It = MissingAttachmentSet.find(AttachmentHash); It != MissingAttachmentSet.end()) + { + PutAttachments.push_back(Payload); + } + } + SendAttachments(Client, "proj0", "oplog0", PutAttachments, ServerProcessHandle, TempPath); + } + } + + // Do it again, but now we should not need any attachments + + for (const Oid& OpId : OpIds) + { + CbObject Op = CreateOplogOp(OpId, Attachments[OpId]); + AddOp(Op, SourceOps); + std::vector<IoHash> MissingAttachments = MakeOp(Client, "proj0", "oplog0", Op); + CHECK(MissingAttachments.empty()); + } + } +} + std::vector<std::pair<std::filesystem::path, IoBuffer>> GenerateFolderContent(const std::filesystem::path& RootPath) { diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index 09cbe1aa3..262b35ea2 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -991,11 +991,24 @@ struct ProjectStore::OplogStorage : public RefCounted } else { - Handler(CbObjectView(OpBuffer.GetData()), LogEntry); - MaxOpLsn = Max(MaxOpLsn, LogEntry.OpLsn.Number); - const uint64_t EntryNextOpFileOffset = - RoundUp((LogEntry.OpCoreAddress.Offset * m_OpsAlign) + LogEntry.OpCoreAddress.Size, m_OpsAlign); - NextOpFileOffset = Max(NextOpFileOffset, EntryNextOpFileOffset); + CbObjectView OpView(OpBuffer.GetData()); + if (OpView.GetSize() != OpBuffer.GetSize()) + { + ZEN_WARN("oplog '{}/{}': skipping invalid format op - {}. Object payload size {} does not match op data size {}", + m_OwnerOplog->GetOuterProjectIdentifier(), + m_OwnerOplog->OplogId(), + LogEntry.OpKeyHash, + OpView.GetSize(), + OpBuffer.GetSize()); + } + else + { + Handler(OpView, LogEntry); + MaxOpLsn = Max(MaxOpLsn, LogEntry.OpLsn.Number); + const uint64_t EntryNextOpFileOffset = + RoundUp((LogEntry.OpCoreAddress.Offset * m_OpsAlign) + LogEntry.OpCoreAddress.Size, m_OpsAlign); + NextOpFileOffset = Max(NextOpFileOffset, EntryNextOpFileOffset); + } } } } @@ -1031,12 +1044,38 @@ struct ProjectStore::OplogStorage : public RefCounted MemoryView OpBufferView = OpBlobsBuffer.MakeView(Entry.Address.Size, OpFileOffset); if (OpBufferView.GetSize() == Entry.Address.Size) { - Handler(Entry.Lsn, CbObjectView(OpBufferView.GetData())); + CbObjectView OpView(OpBufferView.GetData()); + if (OpView.GetSize() != OpBufferView.GetSize()) + { + ZEN_WARN("oplog '{}/{}': skipping invalid format op - {}. Object payload size {} does not match op data size {}", + m_OwnerOplog->GetOuterProjectIdentifier(), + m_OwnerOplog->OplogId(), + Entry.Lsn.Number, + OpView.GetSize(), + OpBufferView.GetSize()); + } + else + { + Handler(Entry.Lsn, OpView); + } continue; } IoBuffer OpBuffer(Entry.Address.Size); OpBlobsBuffer.Read((void*)OpBuffer.Data(), Entry.Address.Size, OpFileOffset); - Handler(Entry.Lsn, CbObjectView(OpBuffer.Data())); + CbObjectView OpView(OpBuffer.Data()); + if (OpView.GetSize() != OpBuffer.GetSize()) + { + ZEN_WARN("oplog '{}/{}': skipping invalid format op - {}. Object payload size {} does not match op data size {}", + m_OwnerOplog->GetOuterProjectIdentifier(), + m_OwnerOplog->OplogId(), + Entry.Lsn.Number, + OpView.GetSize(), + OpBuffer.GetSize()); + } + else + { + Handler(Entry.Lsn, OpView); + } } } @@ -1272,7 +1311,10 @@ ProjectStore::Oplog::Flush() { RwLock::SharedLockScope Lock(m_OplogLock); - ZEN_ASSERT(m_Storage); + if (!m_Storage) + { + return; + } m_Storage->Flush(); if (!m_MetaValid) @@ -5871,284 +5913,390 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq, } Project->TouchOplog(OplogId); - if (Method == "import"sv) - { - if (!AreDiskWritesAllowed()) - { - HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); - return true; - } - std::pair<HttpResponseCode, std::string> Result = Import(*Project.Get(), *Oplog, Cb["params"sv].AsObjectView(), AuthManager); - if (Result.second.empty()) - { - HttpReq.WriteResponse(Result.first); - return Result.first != HttpResponseCode::BadRequest; - } - HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second); - return true; - } - else if (Method == "export"sv) - { - std::pair<HttpResponseCode, std::string> Result = Export(Project, *Oplog, Cb["params"sv].AsObjectView(), AuthManager); - HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second); - return true; - } - else if (Method == "getchunks"sv) - { - ZEN_TRACE_CPU("Store::Rpc::getchunks"); - - RpcAcceptOptions AcceptFlags = static_cast<RpcAcceptOptions>(Cb["AcceptFlags"sv].AsUInt16(0u)); - int32_t TargetProcessId = Cb["Pid"sv].AsInt32(0); + uint32_t MethodHash = HashStringDjb2(Method); - CbPackage ResponsePackage; - std::pair<HttpResponseCode, std::string> Result = GetChunks(ProjectId, OplogId, Cb, ResponsePackage); - if (Result.first == HttpResponseCode::OK) - { - void* TargetProcessHandle = nullptr; - FormatFlags Flags = FormatFlags::kDefault; - if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences)) + switch (MethodHash) + { + case HashStringDjb2("import"sv): { - Flags |= FormatFlags::kAllowLocalReferences; - if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences)) + if (!AreDiskWritesAllowed()) { - Flags |= FormatFlags::kDenyPartialLocalReferences; + HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); + return true; + } + std::pair<HttpResponseCode, std::string> Result = + Import(*Project.Get(), *Oplog, Cb["params"sv].AsObjectView(), AuthManager); + if (Result.second.empty()) + { + HttpReq.WriteResponse(Result.first); + return Result.first != HttpResponseCode::BadRequest; } - TargetProcessHandle = m_OpenProcessCache.GetProcessHandle(HttpReq.SessionId(), TargetProcessId); + HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second); + return true; } + case HashStringDjb2("export"sv): + { + std::pair<HttpResponseCode, std::string> Result = Export(Project, *Oplog, Cb["params"sv].AsObjectView(), AuthManager); + HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second); + return true; + } + case HashStringDjb2("getchunks"sv): + { + ZEN_TRACE_CPU("Store::Rpc::getchunks"); - CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(ResponsePackage, Flags, TargetProcessHandle); - HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); - } - else - { - HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second); - } - return true; - } - else if (Method == "putchunks"sv) - { - ZEN_TRACE_CPU("Store::Rpc::putchunks"); - if (!AreDiskWritesAllowed()) - { - HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); - return true; - } - - std::span<const CbAttachment> Attachments = Package.GetAttachments(); - if (!Attachments.empty()) - { - std::vector<IoBuffer> WriteAttachmentBuffers; - std::vector<IoHash> WriteRawHashes; + RpcAcceptOptions AcceptFlags = static_cast<RpcAcceptOptions>(Cb["AcceptFlags"sv].AsUInt16(0u)); + int32_t TargetProcessId = Cb["Pid"sv].AsInt32(0); - WriteAttachmentBuffers.reserve(Attachments.size()); - WriteRawHashes.reserve(Attachments.size()); + CbPackage ResponsePackage; + std::pair<HttpResponseCode, std::string> Result = GetChunks(ProjectId, OplogId, Cb, ResponsePackage); + if (Result.first == HttpResponseCode::OK) + { + void* TargetProcessHandle = nullptr; + FormatFlags Flags = FormatFlags::kDefault; + if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences)) + { + Flags |= FormatFlags::kAllowLocalReferences; + if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences)) + { + Flags |= FormatFlags::kDenyPartialLocalReferences; + } + TargetProcessHandle = m_OpenProcessCache.GetProcessHandle(HttpReq.SessionId(), TargetProcessId); + } - for (const CbAttachment& Attachment : Attachments) - { - IoHash RawHash = Attachment.GetHash(); - const CompressedBuffer& Compressed = Attachment.AsCompressedBinary(); - WriteAttachmentBuffers.push_back(Compressed.GetCompressed().Flatten().AsIoBuffer()); - WriteRawHashes.push_back(RawHash); + CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(ResponsePackage, Flags, TargetProcessHandle); + HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); + } + else + { + HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second); + } + return true; } + case HashStringDjb2("putchunks"sv): + { + ZEN_TRACE_CPU("Store::Rpc::putchunks"); + if (!AreDiskWritesAllowed()) + { + HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); + return true; + } - Oplog->CaptureAddedAttachments(WriteRawHashes); - m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes, CidStore::InsertMode::kCopyOnly); - } - HttpReq.WriteResponse(HttpResponseCode::OK); - return true; - } - else if (Method == "snapshot"sv) - { - ZEN_TRACE_CPU("Store::Rpc::snapshot"); - if (!AreDiskWritesAllowed()) - { - HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); - return true; - } + CbObject Object = Package.GetObject(); + const bool UsingTempFiles = Object["usingtmpfiles"].AsBool(false); - // Snapshot all referenced files. This brings the content of all - // files into the CID store + std::span<const CbAttachment> Attachments = Package.GetAttachments(); + if (!Attachments.empty()) + { + std::vector<IoBuffer> WriteAttachmentBuffers; + std::vector<IoHash> WriteRawHashes; - uint32_t OpCount = 0; - uint64_t InlinedBytes = 0; - uint64_t InlinedFiles = 0; - uint64_t TotalBytes = 0; - uint64_t TotalFiles = 0; + WriteAttachmentBuffers.reserve(Attachments.size()); + WriteRawHashes.reserve(Attachments.size()); - std::vector<CbObject> NewOps; - struct AddedChunk - { - IoBuffer Buffer; - uint64_t RawSize = 0; - }; - tsl::robin_map<IoHash, AddedChunk, IoHash::Hasher> AddedChunks; + for (const CbAttachment& Attachment : Attachments) + { + IoHash RawHash = Attachment.GetHash(); + const CompressedBuffer& Compressed = Attachment.AsCompressedBinary(); + IoBuffer AttachmentPayload = Compressed.GetCompressed().Flatten().AsIoBuffer(); + if (UsingTempFiles) + { + AttachmentPayload.SetDeleteOnClose(true); + } + WriteAttachmentBuffers.push_back(std::move(AttachmentPayload)); + WriteRawHashes.push_back(RawHash); + } + + Oplog->CaptureAddedAttachments(WriteRawHashes); + m_CidStore.AddChunks(WriteAttachmentBuffers, + WriteRawHashes, + UsingTempFiles ? CidStore::InsertMode::kMayBeMovedInPlace : CidStore::InsertMode::kCopyOnly); + } + HttpReq.WriteResponse(HttpResponseCode::OK); + return true; + } + case HashStringDjb2("snapshot"sv): + { + ZEN_TRACE_CPU("Store::Rpc::snapshot"); + if (!AreDiskWritesAllowed()) + { + HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); + return true; + } - Oplog->IterateOplog( - [&](CbObjectView Op) { - bool OpRewritten = false; - bool AllOk = true; + // Snapshot all referenced files. This brings the content of all + // files into the CID store - CbWriter FilesArrayWriter; - FilesArrayWriter.BeginArray("files"sv); + uint32_t OpCount = 0; + uint64_t InlinedBytes = 0; + uint64_t InlinedFiles = 0; + uint64_t TotalBytes = 0; + uint64_t TotalFiles = 0; - for (CbFieldView& Field : Op["files"sv]) + std::vector<CbObject> NewOps; + struct AddedChunk { - bool CopyField = true; + IoBuffer Buffer; + uint64_t RawSize = 0; + }; + tsl::robin_map<IoHash, AddedChunk, IoHash::Hasher> AddedChunks; - if (CbObjectView View = Field.AsObjectView()) - { - const IoHash DataHash = View["data"sv].AsHash(); + Oplog->IterateOplog( + [&](CbObjectView Op) { + bool OpRewritten = false; + bool AllOk = true; + + CbWriter FilesArrayWriter; + FilesArrayWriter.BeginArray("files"sv); - if (DataHash == IoHash::Zero) + for (CbFieldView& Field : Op["files"sv]) { - std::string_view ServerPath = View["serverpath"sv].AsString(); - std::filesystem::path FilePath = Project->RootDir / ServerPath; - BasicFile DataFile; - std::error_code Ec; - DataFile.Open(FilePath, BasicFile::Mode::kRead, Ec); + bool CopyField = true; - if (Ec) + if (CbObjectView View = Field.AsObjectView()) { - // Error... + const IoHash DataHash = View["data"sv].AsHash(); - ZEN_ERROR("unable to read data from file '{}': {}", FilePath, Ec.message()); - - AllOk = false; - } - else - { - // Read file contents into memory, compress and keep in map of chunks to add to Cid store - IoBuffer FileIoBuffer = DataFile.ReadAll(); - CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(std::move(FileIoBuffer))); - const uint64_t RawSize = Compressed.DecodeRawSize(); - const IoHash RawHash = Compressed.DecodeRawHash(); - if (!AddedChunks.contains(RawHash)) + if (DataHash == IoHash::Zero) { - const std::filesystem::path TempChunkPath = Oplog->TempPath() / RawHash.ToHexString(); - BasicFile ChunkTempFile; - ChunkTempFile.Open(TempChunkPath, BasicFile::Mode::kTruncateDelete); - ChunkTempFile.Write(Compressed.GetCompressed(), 0, Ec); + std::string_view ServerPath = View["serverpath"sv].AsString(); + std::filesystem::path FilePath = Project->RootDir / ServerPath; + BasicFile DataFile; + std::error_code Ec; + DataFile.Open(FilePath, BasicFile::Mode::kRead, Ec); + if (Ec) { - Oid ChunkId = View["id"sv].AsObjectId(); - ZEN_ERROR("unable to write external file as compressed chunk '{}', id {}: {}", - FilePath, - ChunkId, - Ec.message()); + // Error... + + ZEN_ERROR("unable to read data from file '{}': {}", FilePath, Ec.message()); + AllOk = false; } else { - void* FileHandle = ChunkTempFile.Detach(); - IoBuffer ChunkBuffer(IoBuffer::File, - FileHandle, - 0, - Compressed.GetCompressed().GetSize(), - /*IsWholeFile*/ true); - ChunkBuffer.SetDeleteOnClose(true); - AddedChunks.insert_or_assign(RawHash, - AddedChunk{.Buffer = std::move(ChunkBuffer), .RawSize = RawSize}); - } - } + // Read file contents into memory, compress and keep in map of chunks to add to Cid store + IoBuffer FileIoBuffer = DataFile.ReadAll(); + CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(std::move(FileIoBuffer))); + const uint64_t RawSize = Compressed.DecodeRawSize(); + const IoHash RawHash = Compressed.DecodeRawHash(); + if (!AddedChunks.contains(RawHash)) + { + const std::filesystem::path TempChunkPath = Oplog->TempPath() / RawHash.ToHexString(); + BasicFile ChunkTempFile; + ChunkTempFile.Open(TempChunkPath, BasicFile::Mode::kTruncateDelete); + ChunkTempFile.Write(Compressed.GetCompressed(), 0, Ec); + if (Ec) + { + Oid ChunkId = View["id"sv].AsObjectId(); + ZEN_ERROR("unable to write external file as compressed chunk '{}', id {}: {}", + FilePath, + ChunkId, + Ec.message()); + AllOk = false; + } + else + { + void* FileHandle = ChunkTempFile.Detach(); + IoBuffer ChunkBuffer(IoBuffer::File, + FileHandle, + 0, + Compressed.GetCompressed().GetSize(), + /*IsWholeFile*/ true); + ChunkBuffer.SetDeleteOnClose(true); + AddedChunks.insert_or_assign( + RawHash, + AddedChunk{.Buffer = std::move(ChunkBuffer), .RawSize = RawSize}); + } + } - TotalBytes += RawSize; - ++TotalFiles; + TotalBytes += RawSize; + ++TotalFiles; - // Rewrite file array entry with new data reference - CbObjectWriter Writer(View.GetSize()); - RewriteCbObject(Writer, View, [&](CbObjectWriter&, CbFieldView Field) -> bool { - if (Field.GetName() == "data"sv) - { - // omit this field as we will write it explicitly ourselves - return true; + // Rewrite file array entry with new data reference + CbObjectWriter Writer(View.GetSize()); + RewriteCbObject(Writer, View, [&](CbObjectWriter&, CbFieldView Field) -> bool { + if (Field.GetName() == "data"sv) + { + // omit this field as we will write it explicitly ourselves + return true; + } + return false; + }); + Writer.AddBinaryAttachment("data"sv, RawHash); + + CbObject RewrittenOp = Writer.Save(); + FilesArrayWriter.AddObject(std::move(RewrittenOp)); + CopyField = false; } - return false; - }); - Writer.AddBinaryAttachment("data"sv, RawHash); + } + } - CbObject RewrittenOp = Writer.Save(); - FilesArrayWriter.AddObject(std::move(RewrittenOp)); - CopyField = false; + if (CopyField) + { + FilesArrayWriter.AddField(Field); + } + else + { + OpRewritten = true; } } - } - if (CopyField) + if (OpRewritten && AllOk) + { + FilesArrayWriter.EndArray(); + CbArray FilesArray = FilesArrayWriter.Save().AsArray(); + + CbObject RewrittenOp = RewriteCbObject(Op, [&](CbObjectWriter& NewWriter, CbFieldView Field) -> bool { + if (Field.GetName() == "files"sv) + { + NewWriter.AddArray("files"sv, FilesArray); + + return true; + } + + return false; + }); + + NewOps.push_back(std::move(RewrittenOp)); + } + + OpCount++; + }, + Oplog::Paging{}); + + CbObjectWriter ResponseObj; + + // Persist rewritten oplog entries + if (!NewOps.empty()) + { + ResponseObj.BeginArray("rewritten_ops"); + + for (CbObject& NewOp : NewOps) { - FilesArrayWriter.AddField(Field); + ProjectStore::LogSequenceNumber NewLsn = Oplog->AppendNewOplogEntry(std::move(NewOp)); + + ZEN_DEBUG("appended rewritten op at LSN: {}", NewLsn.Number); + + ResponseObj.AddInteger(NewLsn.Number); } - else + + ResponseObj.EndArray(); + } + + // Ops that have moved chunks to a compressed buffer for storage in m_CidStore have been rewritten with references to the + // new chunk(s). Make sure we add the chunks to m_CidStore, and do it after we update the oplog so GC doesn't think we have + // unreferenced chunks. + for (auto It : AddedChunks) + { + const IoHash& RawHash = It.first; + AddedChunk& Chunk = It.second; + CidStore::InsertResult Result = m_CidStore.AddChunk(Chunk.Buffer, RawHash); + if (Result.New) { - OpRewritten = true; + InlinedBytes += Chunk.RawSize; + ++InlinedFiles; } } - if (OpRewritten && AllOk) - { - FilesArrayWriter.EndArray(); - CbArray FilesArray = FilesArrayWriter.Save().AsArray(); + ResponseObj << "inlined_bytes" << InlinedBytes << "inlined_files" << InlinedFiles; + ResponseObj << "total_bytes" << TotalBytes << "total_files" << TotalFiles; - CbObject RewrittenOp = RewriteCbObject(Op, [&](CbObjectWriter& NewWriter, CbFieldView Field) -> bool { - if (Field.GetName() == "files"sv) - { - NewWriter.AddArray("files"sv, FilesArray); + ZEN_INFO("oplog '{}/{}': rewrote {} oplog entries (out of {})", ProjectId, OplogId, NewOps.size(), OpCount); - return true; - } + HttpReq.WriteResponse(HttpResponseCode::OK, ResponseObj.Save()); + return true; + } + case HashStringDjb2("addopattachments"sv): + { + ZEN_TRACE_CPU("Store::Rpc::addopattachments"); + if (!AreDiskWritesAllowed()) + { + HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); + return true; + } + } + case HashStringDjb2("appendops"sv): + { + ZEN_TRACE_CPU("Store::Rpc::appendops"); + if (!AreDiskWritesAllowed()) + { + HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); + return true; + } - return false; - }); + CbArrayView OpsArray = Cb["ops"sv].AsArrayView(); - NewOps.push_back(std::move(RewrittenOp)); + size_t OpsBufferSize = 0; + for (CbFieldView OpView : OpsArray) + { + OpsBufferSize += OpView.GetSize(); } + UniqueBuffer OpsBuffers = UniqueBuffer::Alloc(OpsBufferSize); + MutableMemoryView OpsBuffersMemory = OpsBuffers.GetMutableView(); - OpCount++; - }, - Oplog::Paging{}); - - CbObjectWriter ResponseObj; + std::vector<CbObjectView> Ops; + Ops.reserve(OpsArray.Num()); + for (CbFieldView OpView : OpsArray) + { + OpView.CopyTo(OpsBuffersMemory); + Ops.push_back(CbObjectView(OpsBuffersMemory.GetData())); + OpsBuffersMemory.MidInline(OpView.GetSize()); + } - // Persist rewritten oplog entries - if (!NewOps.empty()) - { - ResponseObj.BeginArray("rewritten_ops"); + std::vector<ProjectStore::LogSequenceNumber> LSNs = Oplog->AppendNewOplogEntries(Ops); + ZEN_ASSERT(LSNs.size() == Ops.size()); - for (CbObject& NewOp : NewOps) - { - ProjectStore::LogSequenceNumber NewLsn = Oplog->AppendNewOplogEntry(std::move(NewOp)); + std::vector<IoHash> MissingAttachments; + for (size_t OpIndex = 0; OpIndex < Ops.size(); OpIndex++) + { + if (LSNs[OpIndex]) + { + CbObjectView Op = Ops[OpIndex]; + Op.IterateAttachments([this, &MissingAttachments](CbFieldView AttachmentView) { + const IoHash Cid = AttachmentView.AsAttachment(); + if (!m_CidStore.ContainsChunk(Cid)) + { + MissingAttachments.push_back(Cid); + } + }); + } + } - ZEN_DEBUG("appended rewritten op at LSN: {}", NewLsn.Number); + CbPackage ResponsePackage; - ResponseObj.AddInteger(NewLsn.Number); - } + { + CbObjectWriter ResponseObj; + ResponseObj.BeginArray("written_ops"); - ResponseObj.EndArray(); - } + for (ProjectStore::LogSequenceNumber NewLsn : LSNs) + { + ZEN_DEBUG("appended written op at LSN: {}", NewLsn.Number); + ResponseObj.AddInteger(NewLsn.Number); + } + ResponseObj.EndArray(); - // Ops that have moved chunks to a compressed buffer for storage in m_CidStore have been rewritten with references to the - // new chunk(s). Make sure we add the chunks to m_CidStore, and do it after we update the oplog so GC doesn't think we have - // unreferenced chunks. - for (auto It : AddedChunks) - { - const IoHash& RawHash = It.first; - AddedChunk& Chunk = It.second; - CidStore::InsertResult Result = m_CidStore.AddChunk(Chunk.Buffer, RawHash); - if (Result.New) - { - InlinedBytes += Chunk.RawSize; - ++InlinedFiles; - } - } + if (!MissingAttachments.empty()) + { + ResponseObj.BeginArray("need"); - ResponseObj << "inlined_bytes" << InlinedBytes << "inlined_files" << InlinedFiles; - ResponseObj << "total_bytes" << TotalBytes << "total_files" << TotalFiles; + for (const IoHash& Cid : MissingAttachments) + { + ResponseObj.AddHash(Cid); + } + ResponseObj.EndArray(); + } + ResponsePackage.SetObject(ResponseObj.Save()); + } - ZEN_INFO("oplog '{}/{}': rewrote {} oplog entries (out of {})", ProjectId, OplogId, NewOps.size(), OpCount); + std::vector<IoBuffer> ResponseBuffers = FormatPackageMessage(ResponsePackage); - HttpReq.WriteResponse(HttpResponseCode::OK, ResponseObj.Save()); - return true; + HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, ResponseBuffers); + return true; + } + default: + HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, fmt::format("Unknown rpc method '{}'", Method)); + return false; } - HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, fmt::format("Unknown rpc method '{}'", Method)); return true; } diff --git a/src/zenutil/include/zenutil/zenserverprocess.h b/src/zenutil/include/zenutil/zenserverprocess.h index 75009b045..0da63285b 100644 --- a/src/zenutil/include/zenutil/zenserverprocess.h +++ b/src/zenutil/include/zenutil/zenserverprocess.h @@ -74,8 +74,9 @@ struct ZenServerInstance void EnableTermination() { m_Terminate = true; } void DisableShutdownOnDestroy() { m_ShutdownOnDestroy = false; } void Detach(); - inline int GetPid() { return m_Process.Pid(); } + inline int GetPid() const { return m_Process.Pid(); } inline void SetOwnerPid(int Pid) { m_OwnerPid = Pid; } + void* GetProcessHandle() const { return m_Process.Handle(); } bool IsRunning(); bool Terminate(); std::string GetLogOutput() const; |