aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenserver')
-rw-r--r--src/zenserver/projectstore/projectstore.cpp602
1 files changed, 375 insertions, 227 deletions
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index 09cbe1aa3..262b35ea2 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -991,11 +991,24 @@ struct ProjectStore::OplogStorage : public RefCounted
}
else
{
- Handler(CbObjectView(OpBuffer.GetData()), LogEntry);
- MaxOpLsn = Max(MaxOpLsn, LogEntry.OpLsn.Number);
- const uint64_t EntryNextOpFileOffset =
- RoundUp((LogEntry.OpCoreAddress.Offset * m_OpsAlign) + LogEntry.OpCoreAddress.Size, m_OpsAlign);
- NextOpFileOffset = Max(NextOpFileOffset, EntryNextOpFileOffset);
+ CbObjectView OpView(OpBuffer.GetData());
+ if (OpView.GetSize() != OpBuffer.GetSize())
+ {
+ ZEN_WARN("oplog '{}/{}': skipping invalid format op - {}. Object payload size {} does not match op data size {}",
+ m_OwnerOplog->GetOuterProjectIdentifier(),
+ m_OwnerOplog->OplogId(),
+ LogEntry.OpKeyHash,
+ OpView.GetSize(),
+ OpBuffer.GetSize());
+ }
+ else
+ {
+ Handler(OpView, LogEntry);
+ MaxOpLsn = Max(MaxOpLsn, LogEntry.OpLsn.Number);
+ const uint64_t EntryNextOpFileOffset =
+ RoundUp((LogEntry.OpCoreAddress.Offset * m_OpsAlign) + LogEntry.OpCoreAddress.Size, m_OpsAlign);
+ NextOpFileOffset = Max(NextOpFileOffset, EntryNextOpFileOffset);
+ }
}
}
}
@@ -1031,12 +1044,38 @@ struct ProjectStore::OplogStorage : public RefCounted
MemoryView OpBufferView = OpBlobsBuffer.MakeView(Entry.Address.Size, OpFileOffset);
if (OpBufferView.GetSize() == Entry.Address.Size)
{
- Handler(Entry.Lsn, CbObjectView(OpBufferView.GetData()));
+ CbObjectView OpView(OpBufferView.GetData());
+ if (OpView.GetSize() != OpBufferView.GetSize())
+ {
+ ZEN_WARN("oplog '{}/{}': skipping invalid format op - {}. Object payload size {} does not match op data size {}",
+ m_OwnerOplog->GetOuterProjectIdentifier(),
+ m_OwnerOplog->OplogId(),
+ Entry.Lsn.Number,
+ OpView.GetSize(),
+ OpBufferView.GetSize());
+ }
+ else
+ {
+ Handler(Entry.Lsn, OpView);
+ }
continue;
}
IoBuffer OpBuffer(Entry.Address.Size);
OpBlobsBuffer.Read((void*)OpBuffer.Data(), Entry.Address.Size, OpFileOffset);
- Handler(Entry.Lsn, CbObjectView(OpBuffer.Data()));
+ CbObjectView OpView(OpBuffer.Data());
+ if (OpView.GetSize() != OpBuffer.GetSize())
+ {
+ ZEN_WARN("oplog '{}/{}': skipping invalid format op - {}. Object payload size {} does not match op data size {}",
+ m_OwnerOplog->GetOuterProjectIdentifier(),
+ m_OwnerOplog->OplogId(),
+ Entry.Lsn.Number,
+ OpView.GetSize(),
+ OpBuffer.GetSize());
+ }
+ else
+ {
+ Handler(Entry.Lsn, OpView);
+ }
}
}
@@ -1272,7 +1311,10 @@ ProjectStore::Oplog::Flush()
{
RwLock::SharedLockScope Lock(m_OplogLock);
- ZEN_ASSERT(m_Storage);
+ if (!m_Storage)
+ {
+ return;
+ }
m_Storage->Flush();
if (!m_MetaValid)
@@ -5871,284 +5913,390 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq,
}
Project->TouchOplog(OplogId);
- if (Method == "import"sv)
- {
- if (!AreDiskWritesAllowed())
- {
- HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage);
- return true;
- }
- std::pair<HttpResponseCode, std::string> Result = Import(*Project.Get(), *Oplog, Cb["params"sv].AsObjectView(), AuthManager);
- if (Result.second.empty())
- {
- HttpReq.WriteResponse(Result.first);
- return Result.first != HttpResponseCode::BadRequest;
- }
- HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second);
- return true;
- }
- else if (Method == "export"sv)
- {
- std::pair<HttpResponseCode, std::string> Result = Export(Project, *Oplog, Cb["params"sv].AsObjectView(), AuthManager);
- HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second);
- return true;
- }
- else if (Method == "getchunks"sv)
- {
- ZEN_TRACE_CPU("Store::Rpc::getchunks");
-
- RpcAcceptOptions AcceptFlags = static_cast<RpcAcceptOptions>(Cb["AcceptFlags"sv].AsUInt16(0u));
- int32_t TargetProcessId = Cb["Pid"sv].AsInt32(0);
+ uint32_t MethodHash = HashStringDjb2(Method);
- CbPackage ResponsePackage;
- std::pair<HttpResponseCode, std::string> Result = GetChunks(ProjectId, OplogId, Cb, ResponsePackage);
- if (Result.first == HttpResponseCode::OK)
- {
- void* TargetProcessHandle = nullptr;
- FormatFlags Flags = FormatFlags::kDefault;
- if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences))
+ switch (MethodHash)
+ {
+ case HashStringDjb2("import"sv):
{
- Flags |= FormatFlags::kAllowLocalReferences;
- if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences))
+ if (!AreDiskWritesAllowed())
{
- Flags |= FormatFlags::kDenyPartialLocalReferences;
+ HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage);
+ return true;
+ }
+ std::pair<HttpResponseCode, std::string> Result =
+ Import(*Project.Get(), *Oplog, Cb["params"sv].AsObjectView(), AuthManager);
+ if (Result.second.empty())
+ {
+ HttpReq.WriteResponse(Result.first);
+ return Result.first != HttpResponseCode::BadRequest;
}
- TargetProcessHandle = m_OpenProcessCache.GetProcessHandle(HttpReq.SessionId(), TargetProcessId);
+ HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second);
+ return true;
}
+ case HashStringDjb2("export"sv):
+ {
+ std::pair<HttpResponseCode, std::string> Result = Export(Project, *Oplog, Cb["params"sv].AsObjectView(), AuthManager);
+ HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second);
+ return true;
+ }
+ case HashStringDjb2("getchunks"sv):
+ {
+ ZEN_TRACE_CPU("Store::Rpc::getchunks");
- CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(ResponsePackage, Flags, TargetProcessHandle);
- HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer);
- }
- else
- {
- HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second);
- }
- return true;
- }
- else if (Method == "putchunks"sv)
- {
- ZEN_TRACE_CPU("Store::Rpc::putchunks");
- if (!AreDiskWritesAllowed())
- {
- HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage);
- return true;
- }
-
- std::span<const CbAttachment> Attachments = Package.GetAttachments();
- if (!Attachments.empty())
- {
- std::vector<IoBuffer> WriteAttachmentBuffers;
- std::vector<IoHash> WriteRawHashes;
+ RpcAcceptOptions AcceptFlags = static_cast<RpcAcceptOptions>(Cb["AcceptFlags"sv].AsUInt16(0u));
+ int32_t TargetProcessId = Cb["Pid"sv].AsInt32(0);
- WriteAttachmentBuffers.reserve(Attachments.size());
- WriteRawHashes.reserve(Attachments.size());
+ CbPackage ResponsePackage;
+ std::pair<HttpResponseCode, std::string> Result = GetChunks(ProjectId, OplogId, Cb, ResponsePackage);
+ if (Result.first == HttpResponseCode::OK)
+ {
+ void* TargetProcessHandle = nullptr;
+ FormatFlags Flags = FormatFlags::kDefault;
+ if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences))
+ {
+ Flags |= FormatFlags::kAllowLocalReferences;
+ if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences))
+ {
+ Flags |= FormatFlags::kDenyPartialLocalReferences;
+ }
+ TargetProcessHandle = m_OpenProcessCache.GetProcessHandle(HttpReq.SessionId(), TargetProcessId);
+ }
- for (const CbAttachment& Attachment : Attachments)
- {
- IoHash RawHash = Attachment.GetHash();
- const CompressedBuffer& Compressed = Attachment.AsCompressedBinary();
- WriteAttachmentBuffers.push_back(Compressed.GetCompressed().Flatten().AsIoBuffer());
- WriteRawHashes.push_back(RawHash);
+ CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(ResponsePackage, Flags, TargetProcessHandle);
+ HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer);
+ }
+ else
+ {
+ HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second);
+ }
+ return true;
}
+ case HashStringDjb2("putchunks"sv):
+ {
+ ZEN_TRACE_CPU("Store::Rpc::putchunks");
+ if (!AreDiskWritesAllowed())
+ {
+ HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage);
+ return true;
+ }
- Oplog->CaptureAddedAttachments(WriteRawHashes);
- m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes, CidStore::InsertMode::kCopyOnly);
- }
- HttpReq.WriteResponse(HttpResponseCode::OK);
- return true;
- }
- else if (Method == "snapshot"sv)
- {
- ZEN_TRACE_CPU("Store::Rpc::snapshot");
- if (!AreDiskWritesAllowed())
- {
- HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage);
- return true;
- }
+ CbObject Object = Package.GetObject();
+ const bool UsingTempFiles = Object["usingtmpfiles"].AsBool(false);
- // Snapshot all referenced files. This brings the content of all
- // files into the CID store
+ std::span<const CbAttachment> Attachments = Package.GetAttachments();
+ if (!Attachments.empty())
+ {
+ std::vector<IoBuffer> WriteAttachmentBuffers;
+ std::vector<IoHash> WriteRawHashes;
- uint32_t OpCount = 0;
- uint64_t InlinedBytes = 0;
- uint64_t InlinedFiles = 0;
- uint64_t TotalBytes = 0;
- uint64_t TotalFiles = 0;
+ WriteAttachmentBuffers.reserve(Attachments.size());
+ WriteRawHashes.reserve(Attachments.size());
- std::vector<CbObject> NewOps;
- struct AddedChunk
- {
- IoBuffer Buffer;
- uint64_t RawSize = 0;
- };
- tsl::robin_map<IoHash, AddedChunk, IoHash::Hasher> AddedChunks;
+ for (const CbAttachment& Attachment : Attachments)
+ {
+ IoHash RawHash = Attachment.GetHash();
+ const CompressedBuffer& Compressed = Attachment.AsCompressedBinary();
+ IoBuffer AttachmentPayload = Compressed.GetCompressed().Flatten().AsIoBuffer();
+ if (UsingTempFiles)
+ {
+ AttachmentPayload.SetDeleteOnClose(true);
+ }
+ WriteAttachmentBuffers.push_back(std::move(AttachmentPayload));
+ WriteRawHashes.push_back(RawHash);
+ }
+
+ Oplog->CaptureAddedAttachments(WriteRawHashes);
+ m_CidStore.AddChunks(WriteAttachmentBuffers,
+ WriteRawHashes,
+ UsingTempFiles ? CidStore::InsertMode::kMayBeMovedInPlace : CidStore::InsertMode::kCopyOnly);
+ }
+ HttpReq.WriteResponse(HttpResponseCode::OK);
+ return true;
+ }
+ case HashStringDjb2("snapshot"sv):
+ {
+ ZEN_TRACE_CPU("Store::Rpc::snapshot");
+ if (!AreDiskWritesAllowed())
+ {
+ HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage);
+ return true;
+ }
- Oplog->IterateOplog(
- [&](CbObjectView Op) {
- bool OpRewritten = false;
- bool AllOk = true;
+ // Snapshot all referenced files. This brings the content of all
+ // files into the CID store
- CbWriter FilesArrayWriter;
- FilesArrayWriter.BeginArray("files"sv);
+ uint32_t OpCount = 0;
+ uint64_t InlinedBytes = 0;
+ uint64_t InlinedFiles = 0;
+ uint64_t TotalBytes = 0;
+ uint64_t TotalFiles = 0;
- for (CbFieldView& Field : Op["files"sv])
+ std::vector<CbObject> NewOps;
+ struct AddedChunk
{
- bool CopyField = true;
+ IoBuffer Buffer;
+ uint64_t RawSize = 0;
+ };
+ tsl::robin_map<IoHash, AddedChunk, IoHash::Hasher> AddedChunks;
- if (CbObjectView View = Field.AsObjectView())
- {
- const IoHash DataHash = View["data"sv].AsHash();
+ Oplog->IterateOplog(
+ [&](CbObjectView Op) {
+ bool OpRewritten = false;
+ bool AllOk = true;
+
+ CbWriter FilesArrayWriter;
+ FilesArrayWriter.BeginArray("files"sv);
- if (DataHash == IoHash::Zero)
+ for (CbFieldView& Field : Op["files"sv])
{
- std::string_view ServerPath = View["serverpath"sv].AsString();
- std::filesystem::path FilePath = Project->RootDir / ServerPath;
- BasicFile DataFile;
- std::error_code Ec;
- DataFile.Open(FilePath, BasicFile::Mode::kRead, Ec);
+ bool CopyField = true;
- if (Ec)
+ if (CbObjectView View = Field.AsObjectView())
{
- // Error...
+ const IoHash DataHash = View["data"sv].AsHash();
- ZEN_ERROR("unable to read data from file '{}': {}", FilePath, Ec.message());
-
- AllOk = false;
- }
- else
- {
- // Read file contents into memory, compress and keep in map of chunks to add to Cid store
- IoBuffer FileIoBuffer = DataFile.ReadAll();
- CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(std::move(FileIoBuffer)));
- const uint64_t RawSize = Compressed.DecodeRawSize();
- const IoHash RawHash = Compressed.DecodeRawHash();
- if (!AddedChunks.contains(RawHash))
+ if (DataHash == IoHash::Zero)
{
- const std::filesystem::path TempChunkPath = Oplog->TempPath() / RawHash.ToHexString();
- BasicFile ChunkTempFile;
- ChunkTempFile.Open(TempChunkPath, BasicFile::Mode::kTruncateDelete);
- ChunkTempFile.Write(Compressed.GetCompressed(), 0, Ec);
+ std::string_view ServerPath = View["serverpath"sv].AsString();
+ std::filesystem::path FilePath = Project->RootDir / ServerPath;
+ BasicFile DataFile;
+ std::error_code Ec;
+ DataFile.Open(FilePath, BasicFile::Mode::kRead, Ec);
+
if (Ec)
{
- Oid ChunkId = View["id"sv].AsObjectId();
- ZEN_ERROR("unable to write external file as compressed chunk '{}', id {}: {}",
- FilePath,
- ChunkId,
- Ec.message());
+ // Error...
+
+ ZEN_ERROR("unable to read data from file '{}': {}", FilePath, Ec.message());
+
AllOk = false;
}
else
{
- void* FileHandle = ChunkTempFile.Detach();
- IoBuffer ChunkBuffer(IoBuffer::File,
- FileHandle,
- 0,
- Compressed.GetCompressed().GetSize(),
- /*IsWholeFile*/ true);
- ChunkBuffer.SetDeleteOnClose(true);
- AddedChunks.insert_or_assign(RawHash,
- AddedChunk{.Buffer = std::move(ChunkBuffer), .RawSize = RawSize});
- }
- }
+ // Read file contents into memory, compress and keep in map of chunks to add to Cid store
+ IoBuffer FileIoBuffer = DataFile.ReadAll();
+ CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(std::move(FileIoBuffer)));
+ const uint64_t RawSize = Compressed.DecodeRawSize();
+ const IoHash RawHash = Compressed.DecodeRawHash();
+ if (!AddedChunks.contains(RawHash))
+ {
+ const std::filesystem::path TempChunkPath = Oplog->TempPath() / RawHash.ToHexString();
+ BasicFile ChunkTempFile;
+ ChunkTempFile.Open(TempChunkPath, BasicFile::Mode::kTruncateDelete);
+ ChunkTempFile.Write(Compressed.GetCompressed(), 0, Ec);
+ if (Ec)
+ {
+ Oid ChunkId = View["id"sv].AsObjectId();
+ ZEN_ERROR("unable to write external file as compressed chunk '{}', id {}: {}",
+ FilePath,
+ ChunkId,
+ Ec.message());
+ AllOk = false;
+ }
+ else
+ {
+ void* FileHandle = ChunkTempFile.Detach();
+ IoBuffer ChunkBuffer(IoBuffer::File,
+ FileHandle,
+ 0,
+ Compressed.GetCompressed().GetSize(),
+ /*IsWholeFile*/ true);
+ ChunkBuffer.SetDeleteOnClose(true);
+ AddedChunks.insert_or_assign(
+ RawHash,
+ AddedChunk{.Buffer = std::move(ChunkBuffer), .RawSize = RawSize});
+ }
+ }
- TotalBytes += RawSize;
- ++TotalFiles;
+ TotalBytes += RawSize;
+ ++TotalFiles;
- // Rewrite file array entry with new data reference
- CbObjectWriter Writer(View.GetSize());
- RewriteCbObject(Writer, View, [&](CbObjectWriter&, CbFieldView Field) -> bool {
- if (Field.GetName() == "data"sv)
- {
- // omit this field as we will write it explicitly ourselves
- return true;
+ // Rewrite file array entry with new data reference
+ CbObjectWriter Writer(View.GetSize());
+ RewriteCbObject(Writer, View, [&](CbObjectWriter&, CbFieldView Field) -> bool {
+ if (Field.GetName() == "data"sv)
+ {
+ // omit this field as we will write it explicitly ourselves
+ return true;
+ }
+ return false;
+ });
+ Writer.AddBinaryAttachment("data"sv, RawHash);
+
+ CbObject RewrittenOp = Writer.Save();
+ FilesArrayWriter.AddObject(std::move(RewrittenOp));
+ CopyField = false;
}
- return false;
- });
- Writer.AddBinaryAttachment("data"sv, RawHash);
+ }
+ }
- CbObject RewrittenOp = Writer.Save();
- FilesArrayWriter.AddObject(std::move(RewrittenOp));
- CopyField = false;
+ if (CopyField)
+ {
+ FilesArrayWriter.AddField(Field);
+ }
+ else
+ {
+ OpRewritten = true;
}
}
- }
- if (CopyField)
+ if (OpRewritten && AllOk)
+ {
+ FilesArrayWriter.EndArray();
+ CbArray FilesArray = FilesArrayWriter.Save().AsArray();
+
+ CbObject RewrittenOp = RewriteCbObject(Op, [&](CbObjectWriter& NewWriter, CbFieldView Field) -> bool {
+ if (Field.GetName() == "files"sv)
+ {
+ NewWriter.AddArray("files"sv, FilesArray);
+
+ return true;
+ }
+
+ return false;
+ });
+
+ NewOps.push_back(std::move(RewrittenOp));
+ }
+
+ OpCount++;
+ },
+ Oplog::Paging{});
+
+ CbObjectWriter ResponseObj;
+
+ // Persist rewritten oplog entries
+ if (!NewOps.empty())
+ {
+ ResponseObj.BeginArray("rewritten_ops");
+
+ for (CbObject& NewOp : NewOps)
{
- FilesArrayWriter.AddField(Field);
+ ProjectStore::LogSequenceNumber NewLsn = Oplog->AppendNewOplogEntry(std::move(NewOp));
+
+ ZEN_DEBUG("appended rewritten op at LSN: {}", NewLsn.Number);
+
+ ResponseObj.AddInteger(NewLsn.Number);
}
- else
+
+ ResponseObj.EndArray();
+ }
+
+ // Ops that have moved chunks to a compressed buffer for storage in m_CidStore have been rewritten with references to the
+ // new chunk(s). Make sure we add the chunks to m_CidStore, and do it after we update the oplog so GC doesn't think we have
+ // unreferenced chunks.
+ for (auto It : AddedChunks)
+ {
+ const IoHash& RawHash = It.first;
+ AddedChunk& Chunk = It.second;
+ CidStore::InsertResult Result = m_CidStore.AddChunk(Chunk.Buffer, RawHash);
+ if (Result.New)
{
- OpRewritten = true;
+ InlinedBytes += Chunk.RawSize;
+ ++InlinedFiles;
}
}
- if (OpRewritten && AllOk)
- {
- FilesArrayWriter.EndArray();
- CbArray FilesArray = FilesArrayWriter.Save().AsArray();
+ ResponseObj << "inlined_bytes" << InlinedBytes << "inlined_files" << InlinedFiles;
+ ResponseObj << "total_bytes" << TotalBytes << "total_files" << TotalFiles;
- CbObject RewrittenOp = RewriteCbObject(Op, [&](CbObjectWriter& NewWriter, CbFieldView Field) -> bool {
- if (Field.GetName() == "files"sv)
- {
- NewWriter.AddArray("files"sv, FilesArray);
+ ZEN_INFO("oplog '{}/{}': rewrote {} oplog entries (out of {})", ProjectId, OplogId, NewOps.size(), OpCount);
- return true;
- }
+ HttpReq.WriteResponse(HttpResponseCode::OK, ResponseObj.Save());
+ return true;
+ }
+ case HashStringDjb2("addopattachments"sv):
+ {
+ ZEN_TRACE_CPU("Store::Rpc::addopattachments");
+ if (!AreDiskWritesAllowed())
+ {
+ HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage);
+ return true;
+ }
+ }
+ case HashStringDjb2("appendops"sv):
+ {
+ ZEN_TRACE_CPU("Store::Rpc::appendops");
+ if (!AreDiskWritesAllowed())
+ {
+ HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage);
+ return true;
+ }
- return false;
- });
+ CbArrayView OpsArray = Cb["ops"sv].AsArrayView();
- NewOps.push_back(std::move(RewrittenOp));
+ size_t OpsBufferSize = 0;
+ for (CbFieldView OpView : OpsArray)
+ {
+ OpsBufferSize += OpView.GetSize();
}
+ UniqueBuffer OpsBuffers = UniqueBuffer::Alloc(OpsBufferSize);
+ MutableMemoryView OpsBuffersMemory = OpsBuffers.GetMutableView();
- OpCount++;
- },
- Oplog::Paging{});
-
- CbObjectWriter ResponseObj;
+ std::vector<CbObjectView> Ops;
+ Ops.reserve(OpsArray.Num());
+ for (CbFieldView OpView : OpsArray)
+ {
+ OpView.CopyTo(OpsBuffersMemory);
+ Ops.push_back(CbObjectView(OpsBuffersMemory.GetData()));
+ OpsBuffersMemory.MidInline(OpView.GetSize());
+ }
- // Persist rewritten oplog entries
- if (!NewOps.empty())
- {
- ResponseObj.BeginArray("rewritten_ops");
+ std::vector<ProjectStore::LogSequenceNumber> LSNs = Oplog->AppendNewOplogEntries(Ops);
+ ZEN_ASSERT(LSNs.size() == Ops.size());
- for (CbObject& NewOp : NewOps)
- {
- ProjectStore::LogSequenceNumber NewLsn = Oplog->AppendNewOplogEntry(std::move(NewOp));
+ std::vector<IoHash> MissingAttachments;
+ for (size_t OpIndex = 0; OpIndex < Ops.size(); OpIndex++)
+ {
+ if (LSNs[OpIndex])
+ {
+ CbObjectView Op = Ops[OpIndex];
+ Op.IterateAttachments([this, &MissingAttachments](CbFieldView AttachmentView) {
+ const IoHash Cid = AttachmentView.AsAttachment();
+ if (!m_CidStore.ContainsChunk(Cid))
+ {
+ MissingAttachments.push_back(Cid);
+ }
+ });
+ }
+ }
- ZEN_DEBUG("appended rewritten op at LSN: {}", NewLsn.Number);
+ CbPackage ResponsePackage;
- ResponseObj.AddInteger(NewLsn.Number);
- }
+ {
+ CbObjectWriter ResponseObj;
+ ResponseObj.BeginArray("written_ops");
- ResponseObj.EndArray();
- }
+ for (ProjectStore::LogSequenceNumber NewLsn : LSNs)
+ {
+ ZEN_DEBUG("appended written op at LSN: {}", NewLsn.Number);
+ ResponseObj.AddInteger(NewLsn.Number);
+ }
+ ResponseObj.EndArray();
- // Ops that have moved chunks to a compressed buffer for storage in m_CidStore have been rewritten with references to the
- // new chunk(s). Make sure we add the chunks to m_CidStore, and do it after we update the oplog so GC doesn't think we have
- // unreferenced chunks.
- for (auto It : AddedChunks)
- {
- const IoHash& RawHash = It.first;
- AddedChunk& Chunk = It.second;
- CidStore::InsertResult Result = m_CidStore.AddChunk(Chunk.Buffer, RawHash);
- if (Result.New)
- {
- InlinedBytes += Chunk.RawSize;
- ++InlinedFiles;
- }
- }
+ if (!MissingAttachments.empty())
+ {
+ ResponseObj.BeginArray("need");
- ResponseObj << "inlined_bytes" << InlinedBytes << "inlined_files" << InlinedFiles;
- ResponseObj << "total_bytes" << TotalBytes << "total_files" << TotalFiles;
+ for (const IoHash& Cid : MissingAttachments)
+ {
+ ResponseObj.AddHash(Cid);
+ }
+ ResponseObj.EndArray();
+ }
+ ResponsePackage.SetObject(ResponseObj.Save());
+ }
- ZEN_INFO("oplog '{}/{}': rewrote {} oplog entries (out of {})", ProjectId, OplogId, NewOps.size(), OpCount);
+ std::vector<IoBuffer> ResponseBuffers = FormatPackageMessage(ResponsePackage);
- HttpReq.WriteResponse(HttpResponseCode::OK, ResponseObj.Save());
- return true;
+ HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, ResponseBuffers);
+ return true;
+ }
+ default:
+ HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, fmt::format("Unknown rpc method '{}'", Method));
+ return false;
}
- HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, fmt::format("Unknown rpc method '{}'", Method));
return true;
}