aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/projectstore/remoteprojectstore.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2024-04-22 20:21:02 +0200
committerGitHub Enterprise <[email protected]>2024-04-22 20:21:02 +0200
commit96f44f2f2d8cbcda254d0b193f5a1aece645daeb (patch)
tree9d1975c4d76d7a577ecfe8e2fe9456738571528b /src/zenserver/projectstore/remoteprojectstore.cpp
parentfix LogRemoteStoreStatsDetails (#53) (diff)
downloadzen-96f44f2f2d8cbcda254d0b193f5a1aece645daeb.tar.xz
zen-96f44f2f2d8cbcda254d0b193f5a1aece645daeb.zip
InsertChunks for CAS store (#55)
- Improvement: Add batching when writing multiple small chunks to block store - decreases I/O load significantly on oplog import
Diffstat (limited to 'src/zenserver/projectstore/remoteprojectstore.cpp')
-rw-r--r--src/zenserver/projectstore/remoteprojectstore.cpp139
1 files changed, 84 insertions, 55 deletions
diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp
index e9c6964c5..e922fcf1c 100644
--- a/src/zenserver/projectstore/remoteprojectstore.cpp
+++ b/src/zenserver/projectstore/remoteprojectstore.cpp
@@ -143,20 +143,8 @@ LogRemoteStoreStatsDetails(const RemoteProjectStore::Stats& Stats)
}
bool
-IterateBlock(const IoHash& BlockHash,
- IoBuffer&& CompressedBlock,
- std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor)
+IterateBlock(const SharedBuffer& BlockPayload, std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor)
{
- IoHash RawHash;
- uint64_t RawSize;
- IoBuffer BlockPayload =
- CompressedBuffer::FromCompressed(SharedBuffer(std::move(CompressedBlock)), RawHash, RawSize).Decompress().AsIoBuffer();
- if (RawHash != BlockHash)
- {
- ZEN_WARN("Header rawhash for downloaded block {} does not match, got {}", BlockHash, RawHash);
- return false;
- }
-
MemoryView BlockView = BlockPayload.GetView();
const uint8_t* ReadPtr = reinterpret_cast<const uint8_t*>(BlockView.GetData());
uint32_t NumberSize;
@@ -2126,20 +2114,6 @@ ParseOplogContainer(const CbObject& ContainerObject,
Stopwatch Timer;
- size_t NeedAttachmentCount = 0;
- CbArrayView LargeChunksArray = ContainerObject["chunks"sv].AsArrayView();
- for (CbFieldView LargeChunksField : LargeChunksArray)
- {
- IoHash AttachmentHash = LargeChunksField.AsBinaryAttachment();
- if (HasAttachment(AttachmentHash))
- {
- continue;
- }
- OnNeedAttachment(AttachmentHash);
- NeedAttachmentCount++;
- };
- ReportMessage(OptionalContext, fmt::format("Requesting {} of {} large attachments", NeedAttachmentCount, LargeChunksArray.Num()));
-
size_t NeedBlockCount = 0;
CbArrayView BlocksArray = ContainerObject["blocks"sv].AsArrayView();
for (CbFieldView BlockField : BlocksArray)
@@ -2185,6 +2159,21 @@ ParseOplogContainer(const CbObject& ContainerObject,
}
}
}
+ ReportMessage(OptionalContext, fmt::format("Requesting {} of {} attachment blocks", NeedBlockCount, BlocksArray.Num()));
+
+ size_t NeedAttachmentCount = 0;
+ CbArrayView LargeChunksArray = ContainerObject["chunks"sv].AsArrayView();
+ for (CbFieldView LargeChunksField : LargeChunksArray)
+ {
+ IoHash AttachmentHash = LargeChunksField.AsBinaryAttachment();
+ if (HasAttachment(AttachmentHash))
+ {
+ continue;
+ }
+ OnNeedAttachment(AttachmentHash);
+ NeedAttachmentCount++;
+ };
+ ReportMessage(OptionalContext, fmt::format("Requesting {} of {} large attachments", NeedAttachmentCount, LargeChunksArray.Num()));
CbArrayView ChunkedFilesArray = ContainerObject["chunkedfiles"sv].AsArrayView();
for (CbFieldView ChunkedFileField : ChunkedFilesArray)
@@ -2215,8 +2204,6 @@ ParseOplogContainer(const CbObject& ContainerObject,
Chunked.ChunkHashes.size());
}
- ReportMessage(OptionalContext, fmt::format("Requesting {} of {} attachment blocks", NeedBlockCount, BlocksArray.Num()));
-
MemoryView OpsSection = ContainerObject["ops"sv].AsBinaryView();
IoBuffer OpsBuffer(IoBuffer::Wrap, OpsSection.GetData(), OpsSection.GetSize());
IoBuffer SectionPayload = CompressedBuffer::FromCompressedNoValidate(std::move(OpsBuffer)).Decompress().AsIoBuffer();
@@ -2474,17 +2461,31 @@ LoadOplog(CidStore& ChunkStore,
{
return;
}
- for (const auto& It : Chunks)
+
+ if (!Chunks.empty())
{
- uint64_t ChunkSize = It.second.GetCompressedSize();
- Info.AttachmentBytesDownloaded.fetch_add(ChunkSize);
- CidStore::InsertResult InsertResult = ChunkStore.AddChunk(It.second.GetCompressed().Flatten().AsIoBuffer(),
- It.first,
- CidStore::InsertMode::kCopyOnly);
- if (InsertResult.New)
+ std::vector<IoBuffer> WriteAttachmentBuffers;
+ std::vector<IoHash> WriteRawHashes;
+ WriteAttachmentBuffers.reserve(Chunks.size());
+ WriteRawHashes.reserve(Chunks.size());
+
+ for (const auto& It : Chunks)
{
- Info.AttachmentBytesStored.fetch_add(ChunkSize);
- Info.AttachmentsStored.fetch_add(1);
+ uint64_t ChunkSize = It.second.GetCompressedSize();
+ Info.AttachmentBytesDownloaded.fetch_add(ChunkSize);
+ WriteAttachmentBuffers.push_back(It.second.GetCompressed().Flatten().AsIoBuffer());
+ WriteRawHashes.push_back(It.first);
+ }
+ std::vector<CidStore::InsertResult> InsertResults =
+ ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes, CidStore::InsertMode::kCopyOnly);
+
+ for (size_t Index = 0; Index < InsertResults.size(); Index++)
+ {
+ if (InsertResults[Index].New)
+ {
+ Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize());
+ Info.AttachmentsStored.fetch_add(1);
+ }
}
}
});
@@ -2558,23 +2559,36 @@ LoadOplog(CidStore& ChunkStore,
std::unordered_set<IoHash, IoHash::Hasher> WantedChunks;
WantedChunks.reserve(Chunks.size());
WantedChunks.insert(Chunks.begin(), Chunks.end());
- bool StoreChunksOK =
- IterateBlock(BlockHash,
- IoBuffer(Bytes),
- [&ChunkStore, &WantedChunks, &Info](CompressedBuffer&& Chunk, const IoHash& AttachmentRawHash) {
- if (WantedChunks.contains(AttachmentRawHash))
- {
- uint64_t ChunkSize = Chunk.GetCompressedSize();
- CidStore::InsertResult InsertResult =
- ChunkStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), AttachmentRawHash);
- if (InsertResult.New)
- {
- Info.AttachmentBytesStored.fetch_add(ChunkSize);
- Info.AttachmentsStored.fetch_add(1);
- }
- WantedChunks.erase(AttachmentRawHash);
- }
- });
+ std::vector<IoBuffer> WriteAttachmentBuffers;
+ std::vector<IoHash> WriteRawHashes;
+
+ IoHash RawHash;
+ uint64_t RawSize;
+ SharedBuffer BlockPayload = CompressedBuffer::FromCompressed(SharedBuffer(Bytes), RawHash, RawSize).Decompress();
+ if (RawHash != BlockHash)
+ {
+ ReportMessage(OptionalContext, fmt::format("Block attachment {} has mismatching raw hash ({})", BlockHash, RawHash));
+ RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
+ fmt::format("Block attachment {} has mismatching raw hash ({})", BlockHash, RawHash),
+ {});
+ }
+
+ bool StoreChunksOK = IterateBlock(
+ BlockPayload,
+ [&WantedChunks, &WriteAttachmentBuffers, &WriteRawHashes, &Info](CompressedBuffer&& Chunk,
+ const IoHash& AttachmentRawHash) {
+ if (WantedChunks.contains(AttachmentRawHash))
+ {
+ WriteAttachmentBuffers.emplace_back(Chunk.GetCompressed().Flatten().AsIoBuffer());
+ IoHash RawHash;
+ uint64_t RawSize;
+ ZEN_ASSERT(CompressedBuffer::ValidateCompressedHeader(WriteAttachmentBuffers.back(), RawHash, RawSize));
+ ZEN_ASSERT(RawHash == AttachmentRawHash);
+ WriteRawHashes.emplace_back(AttachmentRawHash);
+ WantedChunks.erase(AttachmentRawHash);
+ }
+ });
+
if (!StoreChunksOK)
{
ReportMessage(OptionalContext,
@@ -2587,7 +2601,22 @@ LoadOplog(CidStore& ChunkStore,
{});
return;
}
+
ZEN_ASSERT(WantedChunks.empty());
+
+ if (!WriteAttachmentBuffers.empty())
+ {
+ auto Results = ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes);
+ for (size_t Index = 0; Index < Results.size(); Index++)
+ {
+ const auto& Result = Results[Index];
+ if (Result.New)
+ {
+ Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize());
+ Info.AttachmentsStored.fetch_add(1);
+ }
+ }
+ }
});
});
};