diff options
| author | Dan Engelbrecht <[email protected]> | 2024-11-25 20:29:55 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2024-11-25 20:29:55 +0100 |
| commit | 8bd1d98042df6c9aaed6df707826c9c2f6904dae (patch) | |
| tree | 363b20e6d2f21aaefe6fa3507ca55920b33ae96d /src | |
| parent | /chunkinfo /files response size and rawsize size consistency (#230) (diff) | |
| download | zen-8bd1d98042df6c9aaed6df707826c9c2f6904dae.tar.xz zen-8bd1d98042df6c9aaed6df707826c9c2f6904dae.zip | |
fix oplog snapshot deadlock (#233)
* store inlined chunks as temp files and add them to the Cid store after the oplog iteration is complete and the ops have been updated
* make sure we can get to the payload when doing `prep` for new ops
Diffstat (limited to 'src')
| -rw-r--r-- | src/zenserver/projectstore/projectstore.cpp | 88 |
1 file changed, 57 insertions, 31 deletions
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index 802f112c4..68ab122ef 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -2506,7 +2506,7 @@ ProjectStore::Oplog::CheckPendingChunkReferences(std::span<const IoHash> ChunkHa MissingChunks.reserve(ChunkHashes.size()); for (const IoHash& FileHash : ChunkHashes) { - if (!m_CidStore.ContainsChunk(FileHash)) + if (IoBuffer Payload = m_CidStore.FindChunkByCid(FileHash); !Payload) { MissingChunks.push_back(FileHash); } @@ -4829,16 +4829,21 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq, uint64_t TotalBytes = 0; uint64_t TotalFiles = 0; - std::vector<CbObject> NewOps; - std::unordered_map<Oid, IoHash, Oid::Hasher> NewChunkMappings; + std::vector<CbObject> NewOps; + struct AddedChunk + { + IoBuffer Buffer; + uint64_t RawSize = 0; + }; + tsl::robin_map<IoHash, AddedChunk, IoHash::Hasher> AddedChunks; Oplog->IterateOplog( [&](CbObjectView Op) { bool OpRewritten = false; bool AllOk = true; - CbWriter Cbo; - Cbo.BeginArray("files"sv); + CbWriter FilesArrayWriter; + FilesArrayWriter.BeginArray("files"sv); for (CbFieldView& Field : Op["files"sv]) { @@ -4866,26 +4871,38 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq, } else { - // Read file contents into memory, compress and store in CidStore - - Oid ChunkId = View["id"sv].AsObjectId(); - IoBuffer FileIoBuffer = DataFile.ReadAll(); - CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(std::move(FileIoBuffer))); - const IoHash RawHash = Compressed.DecodeRawHash(); - const uint64_t RawSize = Compressed.DecodeRawSize(); - IoBuffer CompressedBuffer = Compressed.GetCompressed().Flatten().AsIoBuffer(); - Oplog->CaptureAddedAttachments(std::vector<IoHash>{RawHash}); - CidStore::InsertResult Result = m_CidStore.AddChunk(CompressedBuffer, RawHash); + // Read file contents into memory, compress and keep in map of chunks to add to Cid store + IoBuffer 
FileIoBuffer = DataFile.ReadAll(); + CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(std::move(FileIoBuffer))); + const uint64_t RawSize = Compressed.DecodeRawSize(); + const IoHash RawHash = Compressed.DecodeRawHash(); + if (!AddedChunks.contains(RawHash)) + { + const std::filesystem::path TempChunkPath = Oplog->TempPath() / RawHash.ToHexString(); + BasicFile ChunkTempFile; + ChunkTempFile.Open(TempChunkPath, BasicFile::Mode::kTruncateDelete); + ChunkTempFile.Write(Compressed.GetCompressed(), 0, Ec); + if (Ec) + { + Oid ChunkId = View["id"sv].AsObjectId(); + ZEN_ERROR("unable to write external file as compressed chunk '{}', id {}: {}", + FilePath, + ChunkId, + Ec.message()); + AllOk = false; + } + else + { + IoBuffer ChunkBuffer = IoBufferBuilder::MakeFromFileHandle(ChunkTempFile.Detach()); + ChunkBuffer.SetDeleteOnClose(true); + AddedChunks.insert_or_assign(RawHash, + AddedChunk{.Buffer = std::move(ChunkBuffer), .RawSize = RawSize}); + } + } TotalBytes += RawSize; ++TotalFiles; - if (Result.New) - { - InlinedBytes += RawSize; - ++InlinedFiles; - } - // Rewrite file array entry with new data reference CbObjectWriter Writer; RewriteCbObject(Writer, View, [&](CbObjectWriter&, CbFieldView Field) -> bool { @@ -4899,17 +4916,15 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq, Writer.AddBinaryAttachment("data"sv, RawHash); CbObject RewrittenOp = Writer.Save(); - Cbo.AddObject(std::move(RewrittenOp)); + FilesArrayWriter.AddObject(std::move(RewrittenOp)); CopyField = false; - - NewChunkMappings.insert_or_assign(ChunkId, RawHash); } } } if (CopyField) { - Cbo.AddField(Field); + FilesArrayWriter.AddField(Field); } else { @@ -4919,8 +4934,8 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq, if (OpRewritten && AllOk) { - Cbo.EndArray(); - CbArray FilesArray = Cbo.Save().AsArray(); + FilesArrayWriter.EndArray(); + CbArray FilesArray = FilesArrayWriter.Save().AsArray(); CbObject RewrittenOp = RewriteCbObject(Op, [&](CbObjectWriter& NewWriter, CbFieldView 
Field) -> bool { if (Field.GetName() == "files"sv) @@ -4940,13 +4955,9 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq, }, Oplog::Paging{}); - // Make sure we have references to our attachments - Oplog->AddChunkMappings(NewChunkMappings); - CbObjectWriter ResponseObj; // Persist rewritten oplog entries - if (!NewOps.empty()) { ResponseObj.BeginArray("rewritten_ops"); @@ -4963,6 +4974,21 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq, ResponseObj.EndArray(); } + // Ops that have moved chunks to a compressed buffer for storage in m_CidStore have been rewritten with references to the new + // chunk(s). Make sure we add the chunks to m_CidStore, and do it after we update the oplog so GC doesn't think we have + // unreferenced chunks. + for (auto It : AddedChunks) + { + const IoHash& RawHash = It.first; + AddedChunk& Chunk = It.second; + CidStore::InsertResult Result = m_CidStore.AddChunk(Chunk.Buffer, RawHash); + if (Result.New) + { + InlinedBytes += Chunk.RawSize; + ++InlinedFiles; + } + } + ResponseObj << "inlined_bytes" << InlinedBytes << "inlined_files" << InlinedFiles; ResponseObj << "total_bytes" << TotalBytes << "total_files" << TotalFiles; |