aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/projectstore
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2024-04-22 20:21:02 +0200
committerGitHub Enterprise <[email protected]>2024-04-22 20:21:02 +0200
commit96f44f2f2d8cbcda254d0b193f5a1aece645daeb (patch)
tree9d1975c4d76d7a577ecfe8e2fe9456738571528b /src/zenserver/projectstore
parentfix LogRemoteStoreStatsDetails (#53) (diff)
downloadzen-96f44f2f2d8cbcda254d0b193f5a1aece645daeb.tar.xz
zen-96f44f2f2d8cbcda254d0b193f5a1aece645daeb.zip
InsertChunks for CAS store (#55)
- Improvement: Add batching when writing multiple small chunks to block store - decreases I/O load significantly on oplog import
Diffstat (limited to 'src/zenserver/projectstore')
-rw-r--r--src/zenserver/projectstore/projectstore.cpp59
-rw-r--r--src/zenserver/projectstore/remoteprojectstore.cpp139
-rw-r--r--src/zenserver/projectstore/remoteprojectstore.h4
3 files changed, 129 insertions, 73 deletions
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index 3a7922aaf..3c281275e 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -1448,19 +1448,36 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbPackage OpPackage)
auto Attachments = OpPackage.GetAttachments();
- for (const auto& Attach : Attachments)
+ if (!Attachments.empty())
{
- ZEN_ASSERT(Attach.IsCompressedBinary());
+ std::vector<IoBuffer> WriteAttachmentBuffers;
+ std::vector<IoHash> WriteRawHashes;
+ std::vector<uint64_t> WriteRawSizes;
- CompressedBuffer AttachmentData = Attach.AsCompressedBinary();
- const uint64_t AttachmentSize = AttachmentData.DecodeRawSize();
- CidStore::InsertResult InsertResult = m_CidStore.AddChunk(AttachmentData.GetCompressed().Flatten().AsIoBuffer(), Attach.GetHash());
+ WriteAttachmentBuffers.reserve(Attachments.size());
+ WriteRawHashes.reserve(Attachments.size());
+ WriteRawSizes.reserve(Attachments.size());
- if (InsertResult.New)
+ for (const auto& Attach : Attachments)
{
- NewAttachmentBytes += AttachmentSize;
+ ZEN_ASSERT(Attach.IsCompressedBinary());
+
+ CompressedBuffer AttachmentData = Attach.AsCompressedBinary();
+ const uint64_t AttachmentSize = AttachmentData.DecodeRawSize();
+ WriteAttachmentBuffers.push_back(AttachmentData.GetCompressed().Flatten().AsIoBuffer());
+ WriteRawHashes.push_back(Attach.GetHash());
+ WriteRawSizes.push_back(AttachmentSize);
+ AttachmentBytes += AttachmentSize;
+ }
+
+ std::vector<CidStore::InsertResult> InsertResults = m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes);
+ for (size_t Index = 0; Index < InsertResults.size(); Index++)
+ {
+ if (InsertResults[Index].New)
+ {
+ NewAttachmentBytes += WriteRawSizes[Index];
+ }
}
- AttachmentBytes += AttachmentSize;
}
ZEN_DEBUG("oplog entry #{} attachments: {} new, {} total", EntryId, NiceBytes(NewAttachmentBytes), NiceBytes(AttachmentBytes));
@@ -3354,12 +3371,25 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq,
HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage);
return true;
}
+
std::span<const CbAttachment> Attachments = Package.GetAttachments();
- for (const CbAttachment& Attachment : Attachments)
+ if (!Attachments.empty())
{
- IoHash RawHash = Attachment.GetHash();
- CompressedBuffer Compressed = Attachment.AsCompressedBinary();
- m_CidStore.AddChunk(Compressed.GetCompressed().Flatten().AsIoBuffer(), RawHash, CidStore::InsertMode::kCopyOnly);
+ std::vector<IoBuffer> WriteAttachmentBuffers;
+ std::vector<IoHash> WriteRawHashes;
+
+ WriteAttachmentBuffers.reserve(Attachments.size());
+ WriteRawHashes.reserve(Attachments.size());
+
+ for (const CbAttachment& Attachment : Attachments)
+ {
+ IoHash RawHash = Attachment.GetHash();
+ CompressedBuffer Compressed = Attachment.AsCompressedBinary();
+ WriteAttachmentBuffers.push_back(Compressed.GetCompressed().Flatten().AsIoBuffer());
+ WriteRawHashes.push_back(RawHash);
+ }
+
+ m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes, CidStore::InsertMode::kCopyOnly);
}
HttpReq.WriteResponse(HttpResponseCode::OK);
return true;
@@ -4703,9 +4733,8 @@ TEST_CASE("project.store.block")
return CompositeBuffer(SharedBuffer(Buffer));
}));
}
- CompressedBuffer Block = GenerateBlock(std::move(Chunks));
- IoBuffer BlockBuffer = Block.GetCompressed().Flatten().AsIoBuffer();
- CHECK(IterateBlock(Block.DecodeRawHash(), std::move(BlockBuffer), [](CompressedBuffer&&, const IoHash&) {}));
+ CompressedBuffer Block = GenerateBlock(std::move(Chunks));
+ CHECK(IterateBlock(Block.Decompress(), [](CompressedBuffer&&, const IoHash&) {}));
}
#endif
diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp
index e9c6964c5..e922fcf1c 100644
--- a/src/zenserver/projectstore/remoteprojectstore.cpp
+++ b/src/zenserver/projectstore/remoteprojectstore.cpp
@@ -143,20 +143,8 @@ LogRemoteStoreStatsDetails(const RemoteProjectStore::Stats& Stats)
}
bool
-IterateBlock(const IoHash& BlockHash,
- IoBuffer&& CompressedBlock,
- std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor)
+IterateBlock(const SharedBuffer& BlockPayload, std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor)
{
- IoHash RawHash;
- uint64_t RawSize;
- IoBuffer BlockPayload =
- CompressedBuffer::FromCompressed(SharedBuffer(std::move(CompressedBlock)), RawHash, RawSize).Decompress().AsIoBuffer();
- if (RawHash != BlockHash)
- {
- ZEN_WARN("Header rawhash for downloaded block {} does not match, got {}", BlockHash, RawHash);
- return false;
- }
-
MemoryView BlockView = BlockPayload.GetView();
const uint8_t* ReadPtr = reinterpret_cast<const uint8_t*>(BlockView.GetData());
uint32_t NumberSize;
@@ -2126,20 +2114,6 @@ ParseOplogContainer(const CbObject& ContainerObject,
Stopwatch Timer;
- size_t NeedAttachmentCount = 0;
- CbArrayView LargeChunksArray = ContainerObject["chunks"sv].AsArrayView();
- for (CbFieldView LargeChunksField : LargeChunksArray)
- {
- IoHash AttachmentHash = LargeChunksField.AsBinaryAttachment();
- if (HasAttachment(AttachmentHash))
- {
- continue;
- }
- OnNeedAttachment(AttachmentHash);
- NeedAttachmentCount++;
- };
- ReportMessage(OptionalContext, fmt::format("Requesting {} of {} large attachments", NeedAttachmentCount, LargeChunksArray.Num()));
-
size_t NeedBlockCount = 0;
CbArrayView BlocksArray = ContainerObject["blocks"sv].AsArrayView();
for (CbFieldView BlockField : BlocksArray)
@@ -2185,6 +2159,21 @@ ParseOplogContainer(const CbObject& ContainerObject,
}
}
}
+ ReportMessage(OptionalContext, fmt::format("Requesting {} of {} attachment blocks", NeedBlockCount, BlocksArray.Num()));
+
+ size_t NeedAttachmentCount = 0;
+ CbArrayView LargeChunksArray = ContainerObject["chunks"sv].AsArrayView();
+ for (CbFieldView LargeChunksField : LargeChunksArray)
+ {
+ IoHash AttachmentHash = LargeChunksField.AsBinaryAttachment();
+ if (HasAttachment(AttachmentHash))
+ {
+ continue;
+ }
+ OnNeedAttachment(AttachmentHash);
+ NeedAttachmentCount++;
+ };
+ ReportMessage(OptionalContext, fmt::format("Requesting {} of {} large attachments", NeedAttachmentCount, LargeChunksArray.Num()));
CbArrayView ChunkedFilesArray = ContainerObject["chunkedfiles"sv].AsArrayView();
for (CbFieldView ChunkedFileField : ChunkedFilesArray)
@@ -2215,8 +2204,6 @@ ParseOplogContainer(const CbObject& ContainerObject,
Chunked.ChunkHashes.size());
}
- ReportMessage(OptionalContext, fmt::format("Requesting {} of {} attachment blocks", NeedBlockCount, BlocksArray.Num()));
-
MemoryView OpsSection = ContainerObject["ops"sv].AsBinaryView();
IoBuffer OpsBuffer(IoBuffer::Wrap, OpsSection.GetData(), OpsSection.GetSize());
IoBuffer SectionPayload = CompressedBuffer::FromCompressedNoValidate(std::move(OpsBuffer)).Decompress().AsIoBuffer();
@@ -2474,17 +2461,31 @@ LoadOplog(CidStore& ChunkStore,
{
return;
}
- for (const auto& It : Chunks)
+
+ if (!Chunks.empty())
{
- uint64_t ChunkSize = It.second.GetCompressedSize();
- Info.AttachmentBytesDownloaded.fetch_add(ChunkSize);
- CidStore::InsertResult InsertResult = ChunkStore.AddChunk(It.second.GetCompressed().Flatten().AsIoBuffer(),
- It.first,
- CidStore::InsertMode::kCopyOnly);
- if (InsertResult.New)
+ std::vector<IoBuffer> WriteAttachmentBuffers;
+ std::vector<IoHash> WriteRawHashes;
+ WriteAttachmentBuffers.reserve(Chunks.size());
+ WriteRawHashes.reserve(Chunks.size());
+
+ for (const auto& It : Chunks)
{
- Info.AttachmentBytesStored.fetch_add(ChunkSize);
- Info.AttachmentsStored.fetch_add(1);
+ uint64_t ChunkSize = It.second.GetCompressedSize();
+ Info.AttachmentBytesDownloaded.fetch_add(ChunkSize);
+ WriteAttachmentBuffers.push_back(It.second.GetCompressed().Flatten().AsIoBuffer());
+ WriteRawHashes.push_back(It.first);
+ }
+ std::vector<CidStore::InsertResult> InsertResults =
+ ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes, CidStore::InsertMode::kCopyOnly);
+
+ for (size_t Index = 0; Index < InsertResults.size(); Index++)
+ {
+ if (InsertResults[Index].New)
+ {
+ Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize());
+ Info.AttachmentsStored.fetch_add(1);
+ }
}
}
});
@@ -2558,23 +2559,36 @@ LoadOplog(CidStore& ChunkStore,
std::unordered_set<IoHash, IoHash::Hasher> WantedChunks;
WantedChunks.reserve(Chunks.size());
WantedChunks.insert(Chunks.begin(), Chunks.end());
- bool StoreChunksOK =
- IterateBlock(BlockHash,
- IoBuffer(Bytes),
- [&ChunkStore, &WantedChunks, &Info](CompressedBuffer&& Chunk, const IoHash& AttachmentRawHash) {
- if (WantedChunks.contains(AttachmentRawHash))
- {
- uint64_t ChunkSize = Chunk.GetCompressedSize();
- CidStore::InsertResult InsertResult =
- ChunkStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), AttachmentRawHash);
- if (InsertResult.New)
- {
- Info.AttachmentBytesStored.fetch_add(ChunkSize);
- Info.AttachmentsStored.fetch_add(1);
- }
- WantedChunks.erase(AttachmentRawHash);
- }
- });
+ std::vector<IoBuffer> WriteAttachmentBuffers;
+ std::vector<IoHash> WriteRawHashes;
+
+ IoHash RawHash;
+ uint64_t RawSize;
+ SharedBuffer BlockPayload = CompressedBuffer::FromCompressed(SharedBuffer(Bytes), RawHash, RawSize).Decompress();
+ if (RawHash != BlockHash)
+ {
+ ReportMessage(OptionalContext, fmt::format("Block attachment {} has mismatching raw hash ({})", BlockHash, RawHash));
+ RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
+ fmt::format("Block attachment {} has mismatching raw hash ({})", BlockHash, RawHash),
+ {});
+ }
+
+ bool StoreChunksOK = IterateBlock(
+ BlockPayload,
+ [&WantedChunks, &WriteAttachmentBuffers, &WriteRawHashes, &Info](CompressedBuffer&& Chunk,
+ const IoHash& AttachmentRawHash) {
+ if (WantedChunks.contains(AttachmentRawHash))
+ {
+ WriteAttachmentBuffers.emplace_back(Chunk.GetCompressed().Flatten().AsIoBuffer());
+ IoHash RawHash;
+ uint64_t RawSize;
+ ZEN_ASSERT(CompressedBuffer::ValidateCompressedHeader(WriteAttachmentBuffers.back(), RawHash, RawSize));
+ ZEN_ASSERT(RawHash == AttachmentRawHash);
+ WriteRawHashes.emplace_back(AttachmentRawHash);
+ WantedChunks.erase(AttachmentRawHash);
+ }
+ });
+
if (!StoreChunksOK)
{
ReportMessage(OptionalContext,
@@ -2587,7 +2601,22 @@ LoadOplog(CidStore& ChunkStore,
{});
return;
}
+
ZEN_ASSERT(WantedChunks.empty());
+
+ if (!WriteAttachmentBuffers.empty())
+ {
+ auto Results = ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes);
+ for (size_t Index = 0; Index < Results.size(); Index++)
+ {
+ const auto& Result = Results[Index];
+ if (Result.New)
+ {
+ Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize());
+ Info.AttachmentsStored.fetch_add(1);
+ }
+ }
+ }
});
});
};
diff --git a/src/zenserver/projectstore/remoteprojectstore.h b/src/zenserver/projectstore/remoteprojectstore.h
index d4ccd8c7b..d6e064bdf 100644
--- a/src/zenserver/projectstore/remoteprojectstore.h
+++ b/src/zenserver/projectstore/remoteprojectstore.h
@@ -162,8 +162,6 @@ RemoteProjectStore::Result LoadOplog(CidStore& ChunkStore,
JobContext* OptionalContext);
CompressedBuffer GenerateBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks);
-bool IterateBlock(const IoHash& BlockHash,
- IoBuffer&& CompressedBlock,
- std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor);
+bool IterateBlock(const SharedBuffer& BlockPayload, std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor);
} // namespace zen