aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2024-11-25 20:29:55 +0100
committerGitHub Enterprise <[email protected]>2024-11-25 20:29:55 +0100
commit8bd1d98042df6c9aaed6df707826c9c2f6904dae (patch)
tree363b20e6d2f21aaefe6fa3507ca55920b33ae96d /src
parent/chunkinfo /files response size and rawsize size consistency (#230) (diff)
downloadzen-8bd1d98042df6c9aaed6df707826c9c2f6904dae.tar.xz
zen-8bd1d98042df6c9aaed6df707826c9c2f6904dae.zip
fix oplog snapshot deadlock (#233)
* store inlined chunk as temp files and store to Cid after oplog iteration is complete and ops updated * make sure we can get to the payload when doing `prep` for new ops
Diffstat (limited to 'src')
-rw-r--r--src/zenserver/projectstore/projectstore.cpp88
1 files changed, 57 insertions, 31 deletions
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index 802f112c4..68ab122ef 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -2506,7 +2506,7 @@ ProjectStore::Oplog::CheckPendingChunkReferences(std::span<const IoHash> ChunkHa
MissingChunks.reserve(ChunkHashes.size());
for (const IoHash& FileHash : ChunkHashes)
{
- if (!m_CidStore.ContainsChunk(FileHash))
+ if (IoBuffer Payload = m_CidStore.FindChunkByCid(FileHash); !Payload)
{
MissingChunks.push_back(FileHash);
}
@@ -4829,16 +4829,21 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq,
uint64_t TotalBytes = 0;
uint64_t TotalFiles = 0;
- std::vector<CbObject> NewOps;
- std::unordered_map<Oid, IoHash, Oid::Hasher> NewChunkMappings;
+ std::vector<CbObject> NewOps;
+ struct AddedChunk
+ {
+ IoBuffer Buffer;
+ uint64_t RawSize = 0;
+ };
+ tsl::robin_map<IoHash, AddedChunk, IoHash::Hasher> AddedChunks;
Oplog->IterateOplog(
[&](CbObjectView Op) {
bool OpRewritten = false;
bool AllOk = true;
- CbWriter Cbo;
- Cbo.BeginArray("files"sv);
+ CbWriter FilesArrayWriter;
+ FilesArrayWriter.BeginArray("files"sv);
for (CbFieldView& Field : Op["files"sv])
{
@@ -4866,26 +4871,38 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq,
}
else
{
- // Read file contents into memory, compress and store in CidStore
-
- Oid ChunkId = View["id"sv].AsObjectId();
- IoBuffer FileIoBuffer = DataFile.ReadAll();
- CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(std::move(FileIoBuffer)));
- const IoHash RawHash = Compressed.DecodeRawHash();
- const uint64_t RawSize = Compressed.DecodeRawSize();
- IoBuffer CompressedBuffer = Compressed.GetCompressed().Flatten().AsIoBuffer();
- Oplog->CaptureAddedAttachments(std::vector<IoHash>{RawHash});
- CidStore::InsertResult Result = m_CidStore.AddChunk(CompressedBuffer, RawHash);
+ // Read file contents into memory, compress and keep in map of chunks to add to Cid store
+ IoBuffer FileIoBuffer = DataFile.ReadAll();
+ CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(std::move(FileIoBuffer)));
+ const uint64_t RawSize = Compressed.DecodeRawSize();
+ const IoHash RawHash = Compressed.DecodeRawHash();
+ if (!AddedChunks.contains(RawHash))
+ {
+ const std::filesystem::path TempChunkPath = Oplog->TempPath() / RawHash.ToHexString();
+ BasicFile ChunkTempFile;
+ ChunkTempFile.Open(TempChunkPath, BasicFile::Mode::kTruncateDelete);
+ ChunkTempFile.Write(Compressed.GetCompressed(), 0, Ec);
+ if (Ec)
+ {
+ Oid ChunkId = View["id"sv].AsObjectId();
+ ZEN_ERROR("unable to write external file as compressed chunk '{}', id {}: {}",
+ FilePath,
+ ChunkId,
+ Ec.message());
+ AllOk = false;
+ }
+ else
+ {
+ IoBuffer ChunkBuffer = IoBufferBuilder::MakeFromFileHandle(ChunkTempFile.Detach());
+ ChunkBuffer.SetDeleteOnClose(true);
+ AddedChunks.insert_or_assign(RawHash,
+ AddedChunk{.Buffer = std::move(ChunkBuffer), .RawSize = RawSize});
+ }
+ }
TotalBytes += RawSize;
++TotalFiles;
- if (Result.New)
- {
- InlinedBytes += RawSize;
- ++InlinedFiles;
- }
-
// Rewrite file array entry with new data reference
CbObjectWriter Writer;
RewriteCbObject(Writer, View, [&](CbObjectWriter&, CbFieldView Field) -> bool {
@@ -4899,17 +4916,15 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq,
Writer.AddBinaryAttachment("data"sv, RawHash);
CbObject RewrittenOp = Writer.Save();
- Cbo.AddObject(std::move(RewrittenOp));
+ FilesArrayWriter.AddObject(std::move(RewrittenOp));
CopyField = false;
-
- NewChunkMappings.insert_or_assign(ChunkId, RawHash);
}
}
}
if (CopyField)
{
- Cbo.AddField(Field);
+ FilesArrayWriter.AddField(Field);
}
else
{
@@ -4919,8 +4934,8 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq,
if (OpRewritten && AllOk)
{
- Cbo.EndArray();
- CbArray FilesArray = Cbo.Save().AsArray();
+ FilesArrayWriter.EndArray();
+ CbArray FilesArray = FilesArrayWriter.Save().AsArray();
CbObject RewrittenOp = RewriteCbObject(Op, [&](CbObjectWriter& NewWriter, CbFieldView Field) -> bool {
if (Field.GetName() == "files"sv)
@@ -4940,13 +4955,9 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq,
},
Oplog::Paging{});
- // Make sure we have references to our attachments
- Oplog->AddChunkMappings(NewChunkMappings);
-
CbObjectWriter ResponseObj;
// Persist rewritten oplog entries
-
if (!NewOps.empty())
{
ResponseObj.BeginArray("rewritten_ops");
@@ -4963,6 +4974,21 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq,
ResponseObj.EndArray();
}
+ // Ops that have moved chunks to a compressed buffer for storage in m_CidStore have been rewritten with references to the new
+ // chunk(s). Make sure we add the chunks to m_CidStore, and do it after we update the oplog so GC doesn't think we have
+ // unreferenced chunks.
+ for (auto It : AddedChunks)
+ {
+ const IoHash& RawHash = It.first;
+ AddedChunk& Chunk = It.second;
+ CidStore::InsertResult Result = m_CidStore.AddChunk(Chunk.Buffer, RawHash);
+ if (Result.New)
+ {
+ InlinedBytes += Chunk.RawSize;
+ ++InlinedFiles;
+ }
+ }
+
ResponseObj << "inlined_bytes" << InlinedBytes << "inlined_files" << InlinedFiles;
ResponseObj << "total_bytes" << TotalBytes << "total_files" << TotalFiles;