aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2024-04-22 20:21:02 +0200
committerGitHub Enterprise <[email protected]>2024-04-22 20:21:02 +0200
commit96f44f2f2d8cbcda254d0b193f5a1aece645daeb (patch)
tree9d1975c4d76d7a577ecfe8e2fe9456738571528b /src/zenserver
parentfix LogRemoteStoreStatsDetails (#53) (diff)
downloadzen-96f44f2f2d8cbcda254d0b193f5a1aece645daeb.tar.xz
zen-96f44f2f2d8cbcda254d0b193f5a1aece645daeb.zip
InsertChunks for CAS store (#55)
- Improvement: Add batching when writing multiple small chunks to block store - decreases I/O load significantly on oplog import
Diffstat (limited to 'src/zenserver')
-rw-r--r--src/zenserver/cache/httpstructuredcache.cpp77
-rw-r--r--src/zenserver/projectstore/projectstore.cpp59
-rw-r--r--src/zenserver/projectstore/remoteprojectstore.cpp139
-rw-r--r--src/zenserver/projectstore/remoteprojectstore.h4
4 files changed, 176 insertions, 103 deletions
diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp
index f64b9c5a5..8106e9db9 100644
--- a/src/zenserver/cache/httpstructuredcache.cpp
+++ b/src/zenserver/cache/httpstructuredcache.cpp
@@ -848,17 +848,20 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con
CbPackage Package;
if (Package.TryLoad(ClientResultValue.Value))
{
- CbObject CacheRecord = Package.GetObject();
- AttachmentCount Count;
- size_t NumAttachments = Package.GetAttachments().size();
- std::vector<const CbAttachment*> AttachmentsToStoreLocally;
- std::vector<IoHash> ReferencedAttachments;
- AttachmentsToStoreLocally.reserve(NumAttachments);
+ CbObject CacheRecord = Package.GetObject();
+ AttachmentCount Count;
+ size_t NumAttachments = Package.GetAttachments().size();
+ std::vector<IoHash> ReferencedAttachments;
+ std::vector<IoBuffer> WriteAttachmentBuffers;
+ WriteAttachmentBuffers.reserve(NumAttachments);
+ std::vector<IoHash> WriteRawHashes;
+ WriteRawHashes.reserve(NumAttachments);
CacheRecord.IterateAttachments([this,
&Package,
&Ref,
- &AttachmentsToStoreLocally,
+ &WriteAttachmentBuffers,
+ &WriteRawHashes,
&ReferencedAttachments,
&Count,
QueryLocal,
@@ -872,7 +875,9 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con
{
if (StoreLocal)
{
- AttachmentsToStoreLocally.emplace_back(Attachment);
+ CompressedBuffer Chunk = Attachment->AsCompressedBinary();
+ WriteAttachmentBuffers.push_back(Chunk.GetCompressed().Flatten().AsIoBuffer());
+ WriteRawHashes.push_back(Attachment->GetHash());
}
Count.Valid++;
}
@@ -923,18 +928,22 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con
m_CacheStore
.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue, ReferencedAttachments);
m_CacheStats.WriteCount++;
- }
- for (const CbAttachment* Attachment : AttachmentsToStoreLocally)
- {
- ZEN_ASSERT_SLOW(StoreLocal);
- CompressedBuffer Chunk = Attachment->AsCompressedBinary();
- CidStore::InsertResult InsertResult =
- m_CidStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash());
- if (InsertResult.New)
+ if (!WriteAttachmentBuffers.empty())
{
- Count.New++;
+ std::vector<CidStore::InsertResult> InsertResults =
+ m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes);
+ for (const CidStore::InsertResult& Result : InsertResults)
+ {
+ if (Result.New)
+ {
+ Count.New++;
+ }
+ }
}
+
+ WriteAttachmentBuffers = {};
+ WriteRawHashes = {};
}
BinaryWriter MemStream;
@@ -1151,23 +1160,27 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con
CbObject CacheRecord = Package.GetObject();
- AttachmentCount Count;
- size_t NumAttachments = Package.GetAttachments().size();
- std::vector<IoHash> ValidAttachments;
- std::vector<IoHash> ReferencedAttachments;
- std::vector<const CbAttachment*> AttachmentsToStoreLocally;
+ AttachmentCount Count;
+ size_t NumAttachments = Package.GetAttachments().size();
+ std::vector<IoHash> ValidAttachments;
+ std::vector<IoHash> ReferencedAttachments;
ValidAttachments.reserve(NumAttachments);
- AttachmentsToStoreLocally.reserve(NumAttachments);
+ std::vector<IoBuffer> WriteAttachmentBuffers;
+ std::vector<IoHash> WriteRawHashes;
+ WriteAttachmentBuffers.reserve(NumAttachments);
+ WriteRawHashes.reserve(NumAttachments);
CacheRecord.IterateAttachments(
- [this, &Ref, &Package, &AttachmentsToStoreLocally, &ValidAttachments, &ReferencedAttachments, &Count](CbFieldView HashView) {
+ [this, &Ref, &Package, &WriteAttachmentBuffers, &WriteRawHashes, &ValidAttachments, &ReferencedAttachments, &Count](
+ CbFieldView HashView) {
const IoHash Hash = HashView.AsHash();
ReferencedAttachments.push_back(Hash);
if (const CbAttachment* Attachment = Package.FindAttachment(Hash))
{
if (Attachment->IsCompressedBinary())
{
- AttachmentsToStoreLocally.emplace_back(Attachment);
+ WriteAttachmentBuffers.emplace_back(Attachment->AsCompressedBinary().GetCompressed().Flatten().AsIoBuffer());
+ WriteRawHashes.push_back(Hash);
ValidAttachments.emplace_back(Hash);
Count.Valid++;
}
@@ -1202,14 +1215,18 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con
m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue, ReferencedAttachments);
m_CacheStats.WriteCount++;
- for (const CbAttachment* Attachment : AttachmentsToStoreLocally)
+ if (!WriteAttachmentBuffers.empty())
{
- CompressedBuffer Chunk = Attachment->AsCompressedBinary();
- CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash());
- if (InsertResult.New)
+ std::vector<CidStore::InsertResult> InsertResults = m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes);
+ for (const CidStore::InsertResult& InsertResult : InsertResults)
{
- Count.New++;
+ if (InsertResult.New)
+ {
+ Count.New++;
+ }
}
+ WriteAttachmentBuffers = {};
+ WriteRawHashes = {};
}
ZEN_DEBUG("PUTCACHERECORD - '{}/{}/{}' {} '{}', attachments '{}/{}/{}' (new/valid/total) in {}",
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index 3a7922aaf..3c281275e 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -1448,19 +1448,36 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbPackage OpPackage)
auto Attachments = OpPackage.GetAttachments();
- for (const auto& Attach : Attachments)
+ if (!Attachments.empty())
{
- ZEN_ASSERT(Attach.IsCompressedBinary());
+ std::vector<IoBuffer> WriteAttachmentBuffers;
+ std::vector<IoHash> WriteRawHashes;
+ std::vector<uint64_t> WriteRawSizes;
- CompressedBuffer AttachmentData = Attach.AsCompressedBinary();
- const uint64_t AttachmentSize = AttachmentData.DecodeRawSize();
- CidStore::InsertResult InsertResult = m_CidStore.AddChunk(AttachmentData.GetCompressed().Flatten().AsIoBuffer(), Attach.GetHash());
+ WriteAttachmentBuffers.reserve(Attachments.size());
+ WriteRawHashes.reserve(Attachments.size());
+ WriteRawSizes.reserve(Attachments.size());
- if (InsertResult.New)
+ for (const auto& Attach : Attachments)
{
- NewAttachmentBytes += AttachmentSize;
+ ZEN_ASSERT(Attach.IsCompressedBinary());
+
+ CompressedBuffer AttachmentData = Attach.AsCompressedBinary();
+ const uint64_t AttachmentSize = AttachmentData.DecodeRawSize();
+ WriteAttachmentBuffers.push_back(AttachmentData.GetCompressed().Flatten().AsIoBuffer());
+ WriteRawHashes.push_back(Attach.GetHash());
+ WriteRawSizes.push_back(AttachmentSize);
+ AttachmentBytes += AttachmentSize;
+ }
+
+ std::vector<CidStore::InsertResult> InsertResults = m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes);
+ for (size_t Index = 0; Index < InsertResults.size(); Index++)
+ {
+ if (InsertResults[Index].New)
+ {
+ NewAttachmentBytes += WriteRawSizes[Index];
+ }
}
- AttachmentBytes += AttachmentSize;
}
ZEN_DEBUG("oplog entry #{} attachments: {} new, {} total", EntryId, NiceBytes(NewAttachmentBytes), NiceBytes(AttachmentBytes));
@@ -3354,12 +3371,25 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq,
HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage);
return true;
}
+
std::span<const CbAttachment> Attachments = Package.GetAttachments();
- for (const CbAttachment& Attachment : Attachments)
+ if (!Attachments.empty())
{
- IoHash RawHash = Attachment.GetHash();
- CompressedBuffer Compressed = Attachment.AsCompressedBinary();
- m_CidStore.AddChunk(Compressed.GetCompressed().Flatten().AsIoBuffer(), RawHash, CidStore::InsertMode::kCopyOnly);
+ std::vector<IoBuffer> WriteAttachmentBuffers;
+ std::vector<IoHash> WriteRawHashes;
+
+ WriteAttachmentBuffers.reserve(Attachments.size());
+ WriteRawHashes.reserve(Attachments.size());
+
+ for (const CbAttachment& Attachment : Attachments)
+ {
+ IoHash RawHash = Attachment.GetHash();
+ CompressedBuffer Compressed = Attachment.AsCompressedBinary();
+ WriteAttachmentBuffers.push_back(Compressed.GetCompressed().Flatten().AsIoBuffer());
+ WriteRawHashes.push_back(RawHash);
+ }
+
+ m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes, CidStore::InsertMode::kCopyOnly);
}
HttpReq.WriteResponse(HttpResponseCode::OK);
return true;
@@ -4703,9 +4733,8 @@ TEST_CASE("project.store.block")
return CompositeBuffer(SharedBuffer(Buffer));
}));
}
- CompressedBuffer Block = GenerateBlock(std::move(Chunks));
- IoBuffer BlockBuffer = Block.GetCompressed().Flatten().AsIoBuffer();
- CHECK(IterateBlock(Block.DecodeRawHash(), std::move(BlockBuffer), [](CompressedBuffer&&, const IoHash&) {}));
+ CompressedBuffer Block = GenerateBlock(std::move(Chunks));
+ CHECK(IterateBlock(Block.Decompress(), [](CompressedBuffer&&, const IoHash&) {}));
}
#endif
diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp
index e9c6964c5..e922fcf1c 100644
--- a/src/zenserver/projectstore/remoteprojectstore.cpp
+++ b/src/zenserver/projectstore/remoteprojectstore.cpp
@@ -143,20 +143,8 @@ LogRemoteStoreStatsDetails(const RemoteProjectStore::Stats& Stats)
}
bool
-IterateBlock(const IoHash& BlockHash,
- IoBuffer&& CompressedBlock,
- std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor)
+IterateBlock(const SharedBuffer& BlockPayload, std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor)
{
- IoHash RawHash;
- uint64_t RawSize;
- IoBuffer BlockPayload =
- CompressedBuffer::FromCompressed(SharedBuffer(std::move(CompressedBlock)), RawHash, RawSize).Decompress().AsIoBuffer();
- if (RawHash != BlockHash)
- {
- ZEN_WARN("Header rawhash for downloaded block {} does not match, got {}", BlockHash, RawHash);
- return false;
- }
-
MemoryView BlockView = BlockPayload.GetView();
const uint8_t* ReadPtr = reinterpret_cast<const uint8_t*>(BlockView.GetData());
uint32_t NumberSize;
@@ -2126,20 +2114,6 @@ ParseOplogContainer(const CbObject& ContainerObject,
Stopwatch Timer;
- size_t NeedAttachmentCount = 0;
- CbArrayView LargeChunksArray = ContainerObject["chunks"sv].AsArrayView();
- for (CbFieldView LargeChunksField : LargeChunksArray)
- {
- IoHash AttachmentHash = LargeChunksField.AsBinaryAttachment();
- if (HasAttachment(AttachmentHash))
- {
- continue;
- }
- OnNeedAttachment(AttachmentHash);
- NeedAttachmentCount++;
- };
- ReportMessage(OptionalContext, fmt::format("Requesting {} of {} large attachments", NeedAttachmentCount, LargeChunksArray.Num()));
-
size_t NeedBlockCount = 0;
CbArrayView BlocksArray = ContainerObject["blocks"sv].AsArrayView();
for (CbFieldView BlockField : BlocksArray)
@@ -2185,6 +2159,21 @@ ParseOplogContainer(const CbObject& ContainerObject,
}
}
}
+ ReportMessage(OptionalContext, fmt::format("Requesting {} of {} attachment blocks", NeedBlockCount, BlocksArray.Num()));
+
+ size_t NeedAttachmentCount = 0;
+ CbArrayView LargeChunksArray = ContainerObject["chunks"sv].AsArrayView();
+ for (CbFieldView LargeChunksField : LargeChunksArray)
+ {
+ IoHash AttachmentHash = LargeChunksField.AsBinaryAttachment();
+ if (HasAttachment(AttachmentHash))
+ {
+ continue;
+ }
+ OnNeedAttachment(AttachmentHash);
+ NeedAttachmentCount++;
+ };
+ ReportMessage(OptionalContext, fmt::format("Requesting {} of {} large attachments", NeedAttachmentCount, LargeChunksArray.Num()));
CbArrayView ChunkedFilesArray = ContainerObject["chunkedfiles"sv].AsArrayView();
for (CbFieldView ChunkedFileField : ChunkedFilesArray)
@@ -2215,8 +2204,6 @@ ParseOplogContainer(const CbObject& ContainerObject,
Chunked.ChunkHashes.size());
}
- ReportMessage(OptionalContext, fmt::format("Requesting {} of {} attachment blocks", NeedBlockCount, BlocksArray.Num()));
-
MemoryView OpsSection = ContainerObject["ops"sv].AsBinaryView();
IoBuffer OpsBuffer(IoBuffer::Wrap, OpsSection.GetData(), OpsSection.GetSize());
IoBuffer SectionPayload = CompressedBuffer::FromCompressedNoValidate(std::move(OpsBuffer)).Decompress().AsIoBuffer();
@@ -2474,17 +2461,31 @@ LoadOplog(CidStore& ChunkStore,
{
return;
}
- for (const auto& It : Chunks)
+
+ if (!Chunks.empty())
{
- uint64_t ChunkSize = It.second.GetCompressedSize();
- Info.AttachmentBytesDownloaded.fetch_add(ChunkSize);
- CidStore::InsertResult InsertResult = ChunkStore.AddChunk(It.second.GetCompressed().Flatten().AsIoBuffer(),
- It.first,
- CidStore::InsertMode::kCopyOnly);
- if (InsertResult.New)
+ std::vector<IoBuffer> WriteAttachmentBuffers;
+ std::vector<IoHash> WriteRawHashes;
+ WriteAttachmentBuffers.reserve(Chunks.size());
+ WriteRawHashes.reserve(Chunks.size());
+
+ for (const auto& It : Chunks)
{
- Info.AttachmentBytesStored.fetch_add(ChunkSize);
- Info.AttachmentsStored.fetch_add(1);
+ uint64_t ChunkSize = It.second.GetCompressedSize();
+ Info.AttachmentBytesDownloaded.fetch_add(ChunkSize);
+ WriteAttachmentBuffers.push_back(It.second.GetCompressed().Flatten().AsIoBuffer());
+ WriteRawHashes.push_back(It.first);
+ }
+ std::vector<CidStore::InsertResult> InsertResults =
+ ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes, CidStore::InsertMode::kCopyOnly);
+
+ for (size_t Index = 0; Index < InsertResults.size(); Index++)
+ {
+ if (InsertResults[Index].New)
+ {
+ Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize());
+ Info.AttachmentsStored.fetch_add(1);
+ }
}
}
});
@@ -2558,23 +2559,36 @@ LoadOplog(CidStore& ChunkStore,
std::unordered_set<IoHash, IoHash::Hasher> WantedChunks;
WantedChunks.reserve(Chunks.size());
WantedChunks.insert(Chunks.begin(), Chunks.end());
- bool StoreChunksOK =
- IterateBlock(BlockHash,
- IoBuffer(Bytes),
- [&ChunkStore, &WantedChunks, &Info](CompressedBuffer&& Chunk, const IoHash& AttachmentRawHash) {
- if (WantedChunks.contains(AttachmentRawHash))
- {
- uint64_t ChunkSize = Chunk.GetCompressedSize();
- CidStore::InsertResult InsertResult =
- ChunkStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), AttachmentRawHash);
- if (InsertResult.New)
- {
- Info.AttachmentBytesStored.fetch_add(ChunkSize);
- Info.AttachmentsStored.fetch_add(1);
- }
- WantedChunks.erase(AttachmentRawHash);
- }
- });
+ std::vector<IoBuffer> WriteAttachmentBuffers;
+ std::vector<IoHash> WriteRawHashes;
+
+ IoHash RawHash;
+ uint64_t RawSize;
+ SharedBuffer BlockPayload = CompressedBuffer::FromCompressed(SharedBuffer(Bytes), RawHash, RawSize).Decompress();
+ if (RawHash != BlockHash)
+ {
+ ReportMessage(OptionalContext, fmt::format("Block attachment {} has mismatching raw hash ({})", BlockHash, RawHash));
+ RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
+ fmt::format("Block attachment {} has mismatching raw hash ({})", BlockHash, RawHash),
+ {});
+ }
+
+ bool StoreChunksOK = IterateBlock(
+ BlockPayload,
+ [&WantedChunks, &WriteAttachmentBuffers, &WriteRawHashes, &Info](CompressedBuffer&& Chunk,
+ const IoHash& AttachmentRawHash) {
+ if (WantedChunks.contains(AttachmentRawHash))
+ {
+ WriteAttachmentBuffers.emplace_back(Chunk.GetCompressed().Flatten().AsIoBuffer());
+ IoHash RawHash;
+ uint64_t RawSize;
+ ZEN_ASSERT(CompressedBuffer::ValidateCompressedHeader(WriteAttachmentBuffers.back(), RawHash, RawSize));
+ ZEN_ASSERT(RawHash == AttachmentRawHash);
+ WriteRawHashes.emplace_back(AttachmentRawHash);
+ WantedChunks.erase(AttachmentRawHash);
+ }
+ });
+
if (!StoreChunksOK)
{
ReportMessage(OptionalContext,
@@ -2587,7 +2601,22 @@ LoadOplog(CidStore& ChunkStore,
{});
return;
}
+
ZEN_ASSERT(WantedChunks.empty());
+
+ if (!WriteAttachmentBuffers.empty())
+ {
+ auto Results = ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes);
+ for (size_t Index = 0; Index < Results.size(); Index++)
+ {
+ const auto& Result = Results[Index];
+ if (Result.New)
+ {
+ Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize());
+ Info.AttachmentsStored.fetch_add(1);
+ }
+ }
+ }
});
});
};
diff --git a/src/zenserver/projectstore/remoteprojectstore.h b/src/zenserver/projectstore/remoteprojectstore.h
index d4ccd8c7b..d6e064bdf 100644
--- a/src/zenserver/projectstore/remoteprojectstore.h
+++ b/src/zenserver/projectstore/remoteprojectstore.h
@@ -162,8 +162,6 @@ RemoteProjectStore::Result LoadOplog(CidStore& ChunkStore,
JobContext* OptionalContext);
CompressedBuffer GenerateBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks);
-bool IterateBlock(const IoHash& BlockHash,
- IoBuffer&& CompressedBlock,
- std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor);
+bool IterateBlock(const SharedBuffer& BlockPayload, std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor);
} // namespace zen