diff options
Diffstat (limited to 'src/zenserver')
| -rw-r--r-- | src/zenserver/projectstore/projectstore.cpp | 602 |
1 files changed, 375 insertions, 227 deletions
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index 09cbe1aa3..262b35ea2 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -991,11 +991,24 @@ struct ProjectStore::OplogStorage : public RefCounted } else { - Handler(CbObjectView(OpBuffer.GetData()), LogEntry); - MaxOpLsn = Max(MaxOpLsn, LogEntry.OpLsn.Number); - const uint64_t EntryNextOpFileOffset = - RoundUp((LogEntry.OpCoreAddress.Offset * m_OpsAlign) + LogEntry.OpCoreAddress.Size, m_OpsAlign); - NextOpFileOffset = Max(NextOpFileOffset, EntryNextOpFileOffset); + CbObjectView OpView(OpBuffer.GetData()); + if (OpView.GetSize() != OpBuffer.GetSize()) + { + ZEN_WARN("oplog '{}/{}': skipping invalid format op - {}. Object payload size {} does not match op data size {}", + m_OwnerOplog->GetOuterProjectIdentifier(), + m_OwnerOplog->OplogId(), + LogEntry.OpKeyHash, + OpView.GetSize(), + OpBuffer.GetSize()); + } + else + { + Handler(OpView, LogEntry); + MaxOpLsn = Max(MaxOpLsn, LogEntry.OpLsn.Number); + const uint64_t EntryNextOpFileOffset = + RoundUp((LogEntry.OpCoreAddress.Offset * m_OpsAlign) + LogEntry.OpCoreAddress.Size, m_OpsAlign); + NextOpFileOffset = Max(NextOpFileOffset, EntryNextOpFileOffset); + } } } } @@ -1031,12 +1044,38 @@ struct ProjectStore::OplogStorage : public RefCounted MemoryView OpBufferView = OpBlobsBuffer.MakeView(Entry.Address.Size, OpFileOffset); if (OpBufferView.GetSize() == Entry.Address.Size) { - Handler(Entry.Lsn, CbObjectView(OpBufferView.GetData())); + CbObjectView OpView(OpBufferView.GetData()); + if (OpView.GetSize() != OpBufferView.GetSize()) + { + ZEN_WARN("oplog '{}/{}': skipping invalid format op - {}. Object payload size {} does not match op data size {}", + m_OwnerOplog->GetOuterProjectIdentifier(), + m_OwnerOplog->OplogId(), + Entry.Lsn.Number, + OpView.GetSize(), + OpBufferView.GetSize()); + } + else + { + Handler(Entry.Lsn, OpView); + } continue; } IoBuffer OpBuffer(Entry.Address.Size); OpBlobsBuffer.Read((void*)OpBuffer.Data(), Entry.Address.Size, OpFileOffset); - Handler(Entry.Lsn, CbObjectView(OpBuffer.Data())); + CbObjectView OpView(OpBuffer.Data()); + if (OpView.GetSize() != OpBuffer.GetSize()) + { + ZEN_WARN("oplog '{}/{}': skipping invalid format op - {}. Object payload size {} does not match op data size {}", + m_OwnerOplog->GetOuterProjectIdentifier(), + m_OwnerOplog->OplogId(), + Entry.Lsn.Number, + OpView.GetSize(), + OpBuffer.GetSize()); + } + else + { + Handler(Entry.Lsn, OpView); + } } } @@ -1272,7 +1311,10 @@ ProjectStore::Oplog::Flush() { RwLock::SharedLockScope Lock(m_OplogLock); - ZEN_ASSERT(m_Storage); + if (!m_Storage) + { + return; + } m_Storage->Flush(); if (!m_MetaValid) @@ -5871,284 +5913,390 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq, } Project->TouchOplog(OplogId); - if (Method == "import"sv) - { - if (!AreDiskWritesAllowed()) - { - HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); - return true; - } - std::pair<HttpResponseCode, std::string> Result = Import(*Project.Get(), *Oplog, Cb["params"sv].AsObjectView(), AuthManager); - if (Result.second.empty()) - { - HttpReq.WriteResponse(Result.first); - return Result.first != HttpResponseCode::BadRequest; - } - HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second); - return true; - } - else if (Method == "export"sv) - { - std::pair<HttpResponseCode, std::string> Result = Export(Project, *Oplog, Cb["params"sv].AsObjectView(), AuthManager); - HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second); - return true; - } - else if (Method == "getchunks"sv) - { - ZEN_TRACE_CPU("Store::Rpc::getchunks"); - - RpcAcceptOptions AcceptFlags = static_cast<RpcAcceptOptions>(Cb["AcceptFlags"sv].AsUInt16(0u)); - int32_t TargetProcessId = Cb["Pid"sv].AsInt32(0); + uint32_t MethodHash = HashStringDjb2(Method); - CbPackage ResponsePackage; - std::pair<HttpResponseCode, std::string> Result = GetChunks(ProjectId, OplogId, Cb, ResponsePackage); - if (Result.first == HttpResponseCode::OK) - { - void* TargetProcessHandle = nullptr; - FormatFlags Flags = FormatFlags::kDefault; - if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences)) + switch (MethodHash) + { + case HashStringDjb2("import"sv): { - Flags |= FormatFlags::kAllowLocalReferences; - if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences)) + if (!AreDiskWritesAllowed()) { - Flags |= FormatFlags::kDenyPartialLocalReferences; + HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); + return true; + } + std::pair<HttpResponseCode, std::string> Result = + Import(*Project.Get(), *Oplog, Cb["params"sv].AsObjectView(), AuthManager); + if (Result.second.empty()) + { + HttpReq.WriteResponse(Result.first); + return Result.first != HttpResponseCode::BadRequest; } - TargetProcessHandle = m_OpenProcessCache.GetProcessHandle(HttpReq.SessionId(), TargetProcessId); + HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second); + return true; } + case HashStringDjb2("export"sv): + { + std::pair<HttpResponseCode, std::string> Result = Export(Project, *Oplog, Cb["params"sv].AsObjectView(), AuthManager); + HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second); + return true; + } + case HashStringDjb2("getchunks"sv): + { + ZEN_TRACE_CPU("Store::Rpc::getchunks"); - CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(ResponsePackage, Flags, TargetProcessHandle); - HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); - } - else - { - HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second); - } - return true; - } - else if (Method == "putchunks"sv) - { - ZEN_TRACE_CPU("Store::Rpc::putchunks"); - if (!AreDiskWritesAllowed()) - { - HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); - return true; - } - - std::span<const CbAttachment> Attachments = Package.GetAttachments(); - if (!Attachments.empty()) - { - std::vector<IoBuffer> WriteAttachmentBuffers; - std::vector<IoHash> WriteRawHashes; + RpcAcceptOptions AcceptFlags = static_cast<RpcAcceptOptions>(Cb["AcceptFlags"sv].AsUInt16(0u)); + int32_t TargetProcessId = Cb["Pid"sv].AsInt32(0); - WriteAttachmentBuffers.reserve(Attachments.size()); - WriteRawHashes.reserve(Attachments.size()); + CbPackage ResponsePackage; + std::pair<HttpResponseCode, std::string> Result = GetChunks(ProjectId, OplogId, Cb, ResponsePackage); + if (Result.first == HttpResponseCode::OK) + { + void* TargetProcessHandle = nullptr; + FormatFlags Flags = FormatFlags::kDefault; + if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences)) + { + Flags |= FormatFlags::kAllowLocalReferences; + if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences)) + { + Flags |= FormatFlags::kDenyPartialLocalReferences; + } + TargetProcessHandle = m_OpenProcessCache.GetProcessHandle(HttpReq.SessionId(), TargetProcessId); + } - for (const CbAttachment& Attachment : Attachments) - { - IoHash RawHash = Attachment.GetHash(); - const CompressedBuffer& Compressed = Attachment.AsCompressedBinary(); - WriteAttachmentBuffers.push_back(Compressed.GetCompressed().Flatten().AsIoBuffer()); - WriteRawHashes.push_back(RawHash); + CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(ResponsePackage, Flags, TargetProcessHandle); + HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); + } + else + { + HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second); + } + return true; } + case HashStringDjb2("putchunks"sv): + { + ZEN_TRACE_CPU("Store::Rpc::putchunks"); + if (!AreDiskWritesAllowed()) + { + HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); + return true; + } - Oplog->CaptureAddedAttachments(WriteRawHashes); - m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes, CidStore::InsertMode::kCopyOnly); - } - HttpReq.WriteResponse(HttpResponseCode::OK); - return true; - } - else if (Method == "snapshot"sv) - { - ZEN_TRACE_CPU("Store::Rpc::snapshot"); - if (!AreDiskWritesAllowed()) - { - HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); - return true; - } + CbObject Object = Package.GetObject(); + const bool UsingTempFiles = Object["usingtmpfiles"].AsBool(false); - // Snapshot all referenced files. This brings the content of all - // files into the CID store + std::span<const CbAttachment> Attachments = Package.GetAttachments(); + if (!Attachments.empty()) + { + std::vector<IoBuffer> WriteAttachmentBuffers; + std::vector<IoHash> WriteRawHashes; - uint32_t OpCount = 0; - uint64_t InlinedBytes = 0; - uint64_t InlinedFiles = 0; - uint64_t TotalBytes = 0; - uint64_t TotalFiles = 0; + WriteAttachmentBuffers.reserve(Attachments.size()); + WriteRawHashes.reserve(Attachments.size()); - std::vector<CbObject> NewOps; - struct AddedChunk - { - IoBuffer Buffer; - uint64_t RawSize = 0; - }; - tsl::robin_map<IoHash, AddedChunk, IoHash::Hasher> AddedChunks; + for (const CbAttachment& Attachment : Attachments) + { + IoHash RawHash = Attachment.GetHash(); + const CompressedBuffer& Compressed = Attachment.AsCompressedBinary(); + IoBuffer AttachmentPayload = Compressed.GetCompressed().Flatten().AsIoBuffer(); + if (UsingTempFiles) + { + AttachmentPayload.SetDeleteOnClose(true); + } + WriteAttachmentBuffers.push_back(std::move(AttachmentPayload)); + WriteRawHashes.push_back(RawHash); + } + + Oplog->CaptureAddedAttachments(WriteRawHashes); + m_CidStore.AddChunks(WriteAttachmentBuffers, + WriteRawHashes, + UsingTempFiles ? CidStore::InsertMode::kMayBeMovedInPlace : CidStore::InsertMode::kCopyOnly); + } + HttpReq.WriteResponse(HttpResponseCode::OK); + return true; + } + case HashStringDjb2("snapshot"sv): + { + ZEN_TRACE_CPU("Store::Rpc::snapshot"); + if (!AreDiskWritesAllowed()) + { + HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); + return true; + } - Oplog->IterateOplog( - [&](CbObjectView Op) { - bool OpRewritten = false; - bool AllOk = true; + // Snapshot all referenced files. This brings the content of all + // files into the CID store - CbWriter FilesArrayWriter; - FilesArrayWriter.BeginArray("files"sv); + uint32_t OpCount = 0; + uint64_t InlinedBytes = 0; + uint64_t InlinedFiles = 0; + uint64_t TotalBytes = 0; + uint64_t TotalFiles = 0; - for (CbFieldView& Field : Op["files"sv]) + std::vector<CbObject> NewOps; + struct AddedChunk { - bool CopyField = true; + IoBuffer Buffer; + uint64_t RawSize = 0; + }; + tsl::robin_map<IoHash, AddedChunk, IoHash::Hasher> AddedChunks; - if (CbObjectView View = Field.AsObjectView()) - { - const IoHash DataHash = View["data"sv].AsHash(); + Oplog->IterateOplog( + [&](CbObjectView Op) { + bool OpRewritten = false; + bool AllOk = true; + + CbWriter FilesArrayWriter; + FilesArrayWriter.BeginArray("files"sv); - if (DataHash == IoHash::Zero) + for (CbFieldView& Field : Op["files"sv]) { - std::string_view ServerPath = View["serverpath"sv].AsString(); - std::filesystem::path FilePath = Project->RootDir / ServerPath; - BasicFile DataFile; - std::error_code Ec; - DataFile.Open(FilePath, BasicFile::Mode::kRead, Ec); + bool CopyField = true; - if (Ec) + if (CbObjectView View = Field.AsObjectView()) { - // Error... + const IoHash DataHash = View["data"sv].AsHash(); - ZEN_ERROR("unable to read data from file '{}': {}", FilePath, Ec.message()); - - AllOk = false; - } - else - { - // Read file contents into memory, compress and keep in map of chunks to add to Cid store - IoBuffer FileIoBuffer = DataFile.ReadAll(); - CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(std::move(FileIoBuffer))); - const uint64_t RawSize = Compressed.DecodeRawSize(); - const IoHash RawHash = Compressed.DecodeRawHash(); - if (!AddedChunks.contains(RawHash)) + if (DataHash == IoHash::Zero) { - const std::filesystem::path TempChunkPath = Oplog->TempPath() / RawHash.ToHexString(); - BasicFile ChunkTempFile; - ChunkTempFile.Open(TempChunkPath, BasicFile::Mode::kTruncateDelete); - ChunkTempFile.Write(Compressed.GetCompressed(), 0, Ec); + std::string_view ServerPath = View["serverpath"sv].AsString(); + std::filesystem::path FilePath = Project->RootDir / ServerPath; + BasicFile DataFile; + std::error_code Ec; + DataFile.Open(FilePath, BasicFile::Mode::kRead, Ec); + if (Ec) { - Oid ChunkId = View["id"sv].AsObjectId(); - ZEN_ERROR("unable to write external file as compressed chunk '{}', id {}: {}", - FilePath, - ChunkId, - Ec.message()); + // Error... + + ZEN_ERROR("unable to read data from file '{}': {}", FilePath, Ec.message()); + AllOk = false; } else { - void* FileHandle = ChunkTempFile.Detach(); - IoBuffer ChunkBuffer(IoBuffer::File, - FileHandle, - 0, - Compressed.GetCompressed().GetSize(), - /*IsWholeFile*/ true); - ChunkBuffer.SetDeleteOnClose(true); - AddedChunks.insert_or_assign(RawHash, - AddedChunk{.Buffer = std::move(ChunkBuffer), .RawSize = RawSize}); - } - } + // Read file contents into memory, compress and keep in map of chunks to add to Cid store + IoBuffer FileIoBuffer = DataFile.ReadAll(); + CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(std::move(FileIoBuffer))); + const uint64_t RawSize = Compressed.DecodeRawSize(); + const IoHash RawHash = Compressed.DecodeRawHash(); + if (!AddedChunks.contains(RawHash)) + { + const std::filesystem::path TempChunkPath = Oplog->TempPath() / RawHash.ToHexString(); + BasicFile ChunkTempFile; + ChunkTempFile.Open(TempChunkPath, BasicFile::Mode::kTruncateDelete); + ChunkTempFile.Write(Compressed.GetCompressed(), 0, Ec); + if (Ec) + { + Oid ChunkId = View["id"sv].AsObjectId(); + ZEN_ERROR("unable to write external file as compressed chunk '{}', id {}: {}", + FilePath, + ChunkId, + Ec.message()); + AllOk = false; + } + else + { + void* FileHandle = ChunkTempFile.Detach(); + IoBuffer ChunkBuffer(IoBuffer::File, + FileHandle, + 0, + Compressed.GetCompressed().GetSize(), + /*IsWholeFile*/ true); + ChunkBuffer.SetDeleteOnClose(true); + AddedChunks.insert_or_assign( + RawHash, + AddedChunk{.Buffer = std::move(ChunkBuffer), .RawSize = RawSize}); + } + } - TotalBytes += RawSize; - ++TotalFiles; + TotalBytes += RawSize; + ++TotalFiles; - // Rewrite file array entry with new data reference - CbObjectWriter Writer(View.GetSize()); - RewriteCbObject(Writer, View, [&](CbObjectWriter&, CbFieldView Field) -> bool { - if (Field.GetName() == "data"sv) - { - // omit this field as we will write it explicitly ourselves - return true; + // Rewrite file array entry with new data reference + CbObjectWriter Writer(View.GetSize()); + RewriteCbObject(Writer, View, [&](CbObjectWriter&, CbFieldView Field) -> bool { + if (Field.GetName() == "data"sv) + { + // omit this field as we will write it explicitly ourselves + return true; + } + return false; + }); + Writer.AddBinaryAttachment("data"sv, RawHash); + + CbObject RewrittenOp = Writer.Save(); + FilesArrayWriter.AddObject(std::move(RewrittenOp)); + CopyField = false; } - return false; - }); - Writer.AddBinaryAttachment("data"sv, RawHash); + } + } - CbObject RewrittenOp = Writer.Save(); - FilesArrayWriter.AddObject(std::move(RewrittenOp)); - CopyField = false; + if (CopyField) + { + FilesArrayWriter.AddField(Field); + } + else + { + OpRewritten = true; } } - } - if (CopyField) + if (OpRewritten && AllOk) + { + FilesArrayWriter.EndArray(); + CbArray FilesArray = FilesArrayWriter.Save().AsArray(); + + CbObject RewrittenOp = RewriteCbObject(Op, [&](CbObjectWriter& NewWriter, CbFieldView Field) -> bool { + if (Field.GetName() == "files"sv) + { + NewWriter.AddArray("files"sv, FilesArray); + + return true; + } + + return false; + }); + + NewOps.push_back(std::move(RewrittenOp)); + } + + OpCount++; + }, + Oplog::Paging{}); + + CbObjectWriter ResponseObj; + + // Persist rewritten oplog entries + if (!NewOps.empty()) + { + ResponseObj.BeginArray("rewritten_ops"); + + for (CbObject& NewOp : NewOps) { - FilesArrayWriter.AddField(Field); + ProjectStore::LogSequenceNumber NewLsn = Oplog->AppendNewOplogEntry(std::move(NewOp)); + + ZEN_DEBUG("appended rewritten op at LSN: {}", NewLsn.Number); + + ResponseObj.AddInteger(NewLsn.Number); } - else + + ResponseObj.EndArray(); + } + + // Ops that have moved chunks to a compressed buffer for storage in m_CidStore have been rewritten with references to the + // new chunk(s). Make sure we add the chunks to m_CidStore, and do it after we update the oplog so GC doesn't think we have + // unreferenced chunks. + for (auto It : AddedChunks) + { + const IoHash& RawHash = It.first; + AddedChunk& Chunk = It.second; + CidStore::InsertResult Result = m_CidStore.AddChunk(Chunk.Buffer, RawHash); + if (Result.New) { - OpRewritten = true; + InlinedBytes += Chunk.RawSize; + ++InlinedFiles; } } - if (OpRewritten && AllOk) - { - FilesArrayWriter.EndArray(); - CbArray FilesArray = FilesArrayWriter.Save().AsArray(); + ResponseObj << "inlined_bytes" << InlinedBytes << "inlined_files" << InlinedFiles; + ResponseObj << "total_bytes" << TotalBytes << "total_files" << TotalFiles; - CbObject RewrittenOp = RewriteCbObject(Op, [&](CbObjectWriter& NewWriter, CbFieldView Field) -> bool { - if (Field.GetName() == "files"sv) - { - NewWriter.AddArray("files"sv, FilesArray); + ZEN_INFO("oplog '{}/{}': rewrote {} oplog entries (out of {})", ProjectId, OplogId, NewOps.size(), OpCount); - return true; - } + HttpReq.WriteResponse(HttpResponseCode::OK, ResponseObj.Save()); + return true; + } + case HashStringDjb2("addopattachments"sv): + { + ZEN_TRACE_CPU("Store::Rpc::addopattachments"); + if (!AreDiskWritesAllowed()) + { + HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); + return true; + } + } + case HashStringDjb2("appendops"sv): + { + ZEN_TRACE_CPU("Store::Rpc::appendops"); + if (!AreDiskWritesAllowed()) + { + HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); + return true; + } - return false; - }); + CbArrayView OpsArray = Cb["ops"sv].AsArrayView(); - NewOps.push_back(std::move(RewrittenOp)); + size_t OpsBufferSize = 0; + for (CbFieldView OpView : OpsArray) + { + OpsBufferSize += OpView.GetSize(); } + UniqueBuffer OpsBuffers = UniqueBuffer::Alloc(OpsBufferSize); + MutableMemoryView OpsBuffersMemory = OpsBuffers.GetMutableView(); - OpCount++; - }, - Oplog::Paging{}); - - CbObjectWriter ResponseObj; + std::vector<CbObjectView> Ops; + Ops.reserve(OpsArray.Num()); + for (CbFieldView OpView : OpsArray) + { + OpView.CopyTo(OpsBuffersMemory); + Ops.push_back(CbObjectView(OpsBuffersMemory.GetData())); + OpsBuffersMemory.MidInline(OpView.GetSize()); + } - // Persist rewritten oplog entries - if (!NewOps.empty()) - { - ResponseObj.BeginArray("rewritten_ops"); + std::vector<ProjectStore::LogSequenceNumber> LSNs = Oplog->AppendNewOplogEntries(Ops); + ZEN_ASSERT(LSNs.size() == Ops.size()); - for (CbObject& NewOp : NewOps) - { - ProjectStore::LogSequenceNumber NewLsn = Oplog->AppendNewOplogEntry(std::move(NewOp)); + std::vector<IoHash> MissingAttachments; + for (size_t OpIndex = 0; OpIndex < Ops.size(); OpIndex++) + { + if (LSNs[OpIndex]) + { + CbObjectView Op = Ops[OpIndex]; + Op.IterateAttachments([this, &MissingAttachments](CbFieldView AttachmentView) { + const IoHash Cid = AttachmentView.AsAttachment(); + if (!m_CidStore.ContainsChunk(Cid)) + { + MissingAttachments.push_back(Cid); + } + }); + } + } - ZEN_DEBUG("appended rewritten op at LSN: {}", NewLsn.Number); + CbPackage ResponsePackage; - ResponseObj.AddInteger(NewLsn.Number); - } + { + CbObjectWriter ResponseObj; + ResponseObj.BeginArray("written_ops"); - ResponseObj.EndArray(); - } + for (ProjectStore::LogSequenceNumber NewLsn : LSNs) + { + ZEN_DEBUG("appended written op at LSN: {}", NewLsn.Number); + ResponseObj.AddInteger(NewLsn.Number); + } + ResponseObj.EndArray(); - // Ops that have moved chunks to a compressed buffer for storage in m_CidStore have been rewritten with references to the - // new chunk(s). Make sure we add the chunks to m_CidStore, and do it after we update the oplog so GC doesn't think we have - // unreferenced chunks. - for (auto It : AddedChunks) - { - const IoHash& RawHash = It.first; - AddedChunk& Chunk = It.second; - CidStore::InsertResult Result = m_CidStore.AddChunk(Chunk.Buffer, RawHash); - if (Result.New) - { - InlinedBytes += Chunk.RawSize; - ++InlinedFiles; - } - } + if (!MissingAttachments.empty()) + { + ResponseObj.BeginArray("need"); - ResponseObj << "inlined_bytes" << InlinedBytes << "inlined_files" << InlinedFiles; - ResponseObj << "total_bytes" << TotalBytes << "total_files" << TotalFiles; + for (const IoHash& Cid : MissingAttachments) + { + ResponseObj.AddHash(Cid); + } + ResponseObj.EndArray(); + } + ResponsePackage.SetObject(ResponseObj.Save()); + } - ZEN_INFO("oplog '{}/{}': rewrote {} oplog entries (out of {})", ProjectId, OplogId, NewOps.size(), OpCount); + std::vector<IoBuffer> ResponseBuffers = FormatPackageMessage(ResponsePackage); - HttpReq.WriteResponse(HttpResponseCode::OK, ResponseObj.Save()); - return true; + HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, ResponseBuffers); + return true; + } + default: + HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, fmt::format("Unknown rpc method '{}'", Method)); + return false; } - HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, fmt::format("Unknown rpc method '{}'", Method)); return true; } |