diff options
| author | Dan Engelbrecht <[email protected]> | 2024-04-22 20:21:02 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2024-04-22 20:21:02 +0200 |
| commit | 96f44f2f2d8cbcda254d0b193f5a1aece645daeb (patch) | |
| tree | 9d1975c4d76d7a577ecfe8e2fe9456738571528b /src/zenserver | |
| parent | fix LogRemoteStoreStatsDetails (#53) (diff) | |
| download | zen-96f44f2f2d8cbcda254d0b193f5a1aece645daeb.tar.xz zen-96f44f2f2d8cbcda254d0b193f5a1aece645daeb.zip | |
InsertChunks for CAS store (#55)
- Improvement: Add batching when writing multiple small chunks to block store - decreases I/O load significantly on oplog import
Diffstat (limited to 'src/zenserver')
| -rw-r--r-- | src/zenserver/cache/httpstructuredcache.cpp | 77 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.cpp | 59 | ||||
| -rw-r--r-- | src/zenserver/projectstore/remoteprojectstore.cpp | 139 | ||||
| -rw-r--r-- | src/zenserver/projectstore/remoteprojectstore.h | 4 |
4 files changed, 176 insertions, 103 deletions
diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp index f64b9c5a5..8106e9db9 100644 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ b/src/zenserver/cache/httpstructuredcache.cpp @@ -848,17 +848,20 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con CbPackage Package; if (Package.TryLoad(ClientResultValue.Value)) { - CbObject CacheRecord = Package.GetObject(); - AttachmentCount Count; - size_t NumAttachments = Package.GetAttachments().size(); - std::vector<const CbAttachment*> AttachmentsToStoreLocally; - std::vector<IoHash> ReferencedAttachments; - AttachmentsToStoreLocally.reserve(NumAttachments); + CbObject CacheRecord = Package.GetObject(); + AttachmentCount Count; + size_t NumAttachments = Package.GetAttachments().size(); + std::vector<IoHash> ReferencedAttachments; + std::vector<IoBuffer> WriteAttachmentBuffers; + WriteAttachmentBuffers.reserve(NumAttachments); + std::vector<IoHash> WriteRawHashes; + WriteRawHashes.reserve(NumAttachments); CacheRecord.IterateAttachments([this, &Package, &Ref, - &AttachmentsToStoreLocally, + &WriteAttachmentBuffers, + &WriteRawHashes, &ReferencedAttachments, &Count, QueryLocal, @@ -872,7 +875,9 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con { if (StoreLocal) { - AttachmentsToStoreLocally.emplace_back(Attachment); + CompressedBuffer Chunk = Attachment->AsCompressedBinary(); + WriteAttachmentBuffers.push_back(Chunk.GetCompressed().Flatten().AsIoBuffer()); + WriteRawHashes.push_back(Attachment->GetHash()); } Count.Valid++; } @@ -923,18 +928,22 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con m_CacheStore .Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue, ReferencedAttachments); m_CacheStats.WriteCount++; - } - for (const CbAttachment* Attachment : AttachmentsToStoreLocally) - { - ZEN_ASSERT_SLOW(StoreLocal); - CompressedBuffer Chunk = Attachment->AsCompressedBinary(); - CidStore::InsertResult InsertResult = - m_CidStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash()); - if (InsertResult.New) + if (!WriteAttachmentBuffers.empty()) { - Count.New++; + std::vector<CidStore::InsertResult> InsertResults = + m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); + for (const CidStore::InsertResult& Result : InsertResults) + { + if (Result.New) + { + Count.New++; + } + } } + + WriteAttachmentBuffers = {}; + WriteRawHashes = {}; } BinaryWriter MemStream; @@ -1151,23 +1160,27 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con CbObject CacheRecord = Package.GetObject(); - AttachmentCount Count; - size_t NumAttachments = Package.GetAttachments().size(); - std::vector<IoHash> ValidAttachments; - std::vector<IoHash> ReferencedAttachments; - std::vector<const CbAttachment*> AttachmentsToStoreLocally; + AttachmentCount Count; + size_t NumAttachments = Package.GetAttachments().size(); + std::vector<IoHash> ValidAttachments; + std::vector<IoHash> ReferencedAttachments; ValidAttachments.reserve(NumAttachments); - AttachmentsToStoreLocally.reserve(NumAttachments); + std::vector<IoBuffer> WriteAttachmentBuffers; + std::vector<IoHash> WriteRawHashes; + WriteAttachmentBuffers.reserve(NumAttachments); + WriteRawHashes.reserve(NumAttachments); CacheRecord.IterateAttachments( - [this, &Ref, &Package, &AttachmentsToStoreLocally, &ValidAttachments, &ReferencedAttachments, &Count](CbFieldView HashView) { + [this, &Ref, &Package, &WriteAttachmentBuffers, &WriteRawHashes, &ValidAttachments, &ReferencedAttachments, &Count]( + CbFieldView HashView) { const IoHash Hash = HashView.AsHash(); ReferencedAttachments.push_back(Hash); if (const CbAttachment* Attachment = Package.FindAttachment(Hash)) { if (Attachment->IsCompressedBinary()) { - AttachmentsToStoreLocally.emplace_back(Attachment); + WriteAttachmentBuffers.emplace_back(Attachment->AsCompressedBinary().GetCompressed().Flatten().AsIoBuffer()); + WriteRawHashes.push_back(Hash); ValidAttachments.emplace_back(Hash); Count.Valid++; } @@ -1202,14 +1215,18 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue, ReferencedAttachments); m_CacheStats.WriteCount++; - for (const CbAttachment* Attachment : AttachmentsToStoreLocally) + if (!WriteAttachmentBuffers.empty()) { - CompressedBuffer Chunk = Attachment->AsCompressedBinary(); - CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash()); - if (InsertResult.New) + std::vector<CidStore::InsertResult> InsertResults = m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); + for (const CidStore::InsertResult& InsertResult : InsertResults) { - Count.New++; + if (InsertResult.New) + { + Count.New++; + } } + WriteAttachmentBuffers = {}; + WriteRawHashes = {}; } ZEN_DEBUG("PUTCACHERECORD - '{}/{}/{}' {} '{}', attachments '{}/{}/{}' (new/valid/total) in {}", diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index 3a7922aaf..3c281275e 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -1448,19 +1448,36 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbPackage OpPackage) auto Attachments = OpPackage.GetAttachments(); - for (const auto& Attach : Attachments) + if (!Attachments.empty()) { - ZEN_ASSERT(Attach.IsCompressedBinary()); + std::vector<IoBuffer> WriteAttachmentBuffers; + std::vector<IoHash> WriteRawHashes; + std::vector<uint64_t> WriteRawSizes; - CompressedBuffer AttachmentData = Attach.AsCompressedBinary(); - const uint64_t AttachmentSize = AttachmentData.DecodeRawSize(); - CidStore::InsertResult InsertResult = m_CidStore.AddChunk(AttachmentData.GetCompressed().Flatten().AsIoBuffer(), Attach.GetHash()); + WriteAttachmentBuffers.reserve(Attachments.size()); + WriteRawHashes.reserve(Attachments.size()); + WriteRawSizes.reserve(Attachments.size()); - if (InsertResult.New) + for (const auto& Attach : Attachments) { - NewAttachmentBytes += AttachmentSize; + ZEN_ASSERT(Attach.IsCompressedBinary()); + + CompressedBuffer AttachmentData = Attach.AsCompressedBinary(); + const uint64_t AttachmentSize = AttachmentData.DecodeRawSize(); + WriteAttachmentBuffers.push_back(AttachmentData.GetCompressed().Flatten().AsIoBuffer()); + WriteRawHashes.push_back(Attach.GetHash()); + WriteRawSizes.push_back(AttachmentSize); + AttachmentBytes += AttachmentSize; + } + + std::vector<CidStore::InsertResult> InsertResults = m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); + for (size_t Index = 0; Index < InsertResults.size(); Index++) + { + if (InsertResults[Index].New) + { + NewAttachmentBytes += WriteRawSizes[Index]; + } } - AttachmentBytes += AttachmentSize; } ZEN_DEBUG("oplog entry #{} attachments: {} new, {} total", EntryId, NiceBytes(NewAttachmentBytes), NiceBytes(AttachmentBytes)); @@ -3354,12 +3371,25 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq, HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); return true; } + std::span<const CbAttachment> Attachments = Package.GetAttachments(); - for (const CbAttachment& Attachment : Attachments) + if (!Attachments.empty()) { - IoHash RawHash = Attachment.GetHash(); - CompressedBuffer Compressed = Attachment.AsCompressedBinary(); - m_CidStore.AddChunk(Compressed.GetCompressed().Flatten().AsIoBuffer(), RawHash, CidStore::InsertMode::kCopyOnly); + std::vector<IoBuffer> WriteAttachmentBuffers; + std::vector<IoHash> WriteRawHashes; + + WriteAttachmentBuffers.reserve(Attachments.size()); + WriteRawHashes.reserve(Attachments.size()); + + for (const CbAttachment& Attachment : Attachments) + { + IoHash RawHash = Attachment.GetHash(); + CompressedBuffer Compressed = Attachment.AsCompressedBinary(); + WriteAttachmentBuffers.push_back(Compressed.GetCompressed().Flatten().AsIoBuffer()); + WriteRawHashes.push_back(RawHash); + } + + m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes, CidStore::InsertMode::kCopyOnly); } HttpReq.WriteResponse(HttpResponseCode::OK); return true; @@ -4703,9 +4733,8 @@ TEST_CASE("project.store.block") return CompositeBuffer(SharedBuffer(Buffer)); })); } - CompressedBuffer Block = GenerateBlock(std::move(Chunks)); - IoBuffer BlockBuffer = Block.GetCompressed().Flatten().AsIoBuffer(); - CHECK(IterateBlock(Block.DecodeRawHash(), std::move(BlockBuffer), [](CompressedBuffer&&, const IoHash&) {})); + CompressedBuffer Block = GenerateBlock(std::move(Chunks)); + CHECK(IterateBlock(Block.Decompress(), [](CompressedBuffer&&, const IoHash&) {})); } #endif diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp index e9c6964c5..e922fcf1c 100644 --- a/src/zenserver/projectstore/remoteprojectstore.cpp +++ b/src/zenserver/projectstore/remoteprojectstore.cpp @@ -143,20 +143,8 @@ LogRemoteStoreStatsDetails(const RemoteProjectStore::Stats& Stats) } bool -IterateBlock(const IoHash& BlockHash, - IoBuffer&& CompressedBlock, - std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor) +IterateBlock(const SharedBuffer& BlockPayload, std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor) { - IoHash RawHash; - uint64_t RawSize; - IoBuffer BlockPayload = - CompressedBuffer::FromCompressed(SharedBuffer(std::move(CompressedBlock)), RawHash, RawSize).Decompress().AsIoBuffer(); - if (RawHash != BlockHash) - { - ZEN_WARN("Header rawhash for downloaded block {} does not match, got {}", BlockHash, RawHash); - return false; - } - MemoryView BlockView = BlockPayload.GetView(); const uint8_t* ReadPtr = reinterpret_cast<const uint8_t*>(BlockView.GetData()); uint32_t NumberSize; @@ -2126,20 +2114,6 @@ ParseOplogContainer(const CbObject& ContainerObject, Stopwatch Timer; - size_t NeedAttachmentCount = 0; - CbArrayView LargeChunksArray = ContainerObject["chunks"sv].AsArrayView(); - for (CbFieldView LargeChunksField : LargeChunksArray) - { - IoHash AttachmentHash = LargeChunksField.AsBinaryAttachment(); - if (HasAttachment(AttachmentHash)) - { - continue; - } - OnNeedAttachment(AttachmentHash); - NeedAttachmentCount++; - }; - ReportMessage(OptionalContext, fmt::format("Requesting {} of {} large attachments", NeedAttachmentCount, LargeChunksArray.Num())); - size_t NeedBlockCount = 0; CbArrayView BlocksArray = ContainerObject["blocks"sv].AsArrayView(); for (CbFieldView BlockField : BlocksArray) @@ -2185,6 +2159,21 @@ ParseOplogContainer(const CbObject& ContainerObject, } } } + ReportMessage(OptionalContext, fmt::format("Requesting {} of {} attachment blocks", NeedBlockCount, BlocksArray.Num())); + + size_t NeedAttachmentCount = 0; + CbArrayView LargeChunksArray = ContainerObject["chunks"sv].AsArrayView(); + for (CbFieldView LargeChunksField : LargeChunksArray) + { + IoHash AttachmentHash = LargeChunksField.AsBinaryAttachment(); + if (HasAttachment(AttachmentHash)) + { + continue; + } + OnNeedAttachment(AttachmentHash); + NeedAttachmentCount++; + }; + ReportMessage(OptionalContext, fmt::format("Requesting {} of {} large attachments", NeedAttachmentCount, LargeChunksArray.Num())); CbArrayView ChunkedFilesArray = ContainerObject["chunkedfiles"sv].AsArrayView(); for (CbFieldView ChunkedFileField : ChunkedFilesArray) @@ -2215,8 +2204,6 @@ ParseOplogContainer(const CbObject& ContainerObject, Chunked.ChunkHashes.size()); } - ReportMessage(OptionalContext, fmt::format("Requesting {} of {} attachment blocks", NeedBlockCount, BlocksArray.Num())); - MemoryView OpsSection = ContainerObject["ops"sv].AsBinaryView(); IoBuffer OpsBuffer(IoBuffer::Wrap, OpsSection.GetData(), OpsSection.GetSize()); IoBuffer SectionPayload = CompressedBuffer::FromCompressedNoValidate(std::move(OpsBuffer)).Decompress().AsIoBuffer(); @@ -2474,17 +2461,31 @@ LoadOplog(CidStore& ChunkStore, { return; } - for (const auto& It : Chunks) + + if (!Chunks.empty()) { - uint64_t ChunkSize = It.second.GetCompressedSize(); - Info.AttachmentBytesDownloaded.fetch_add(ChunkSize); - CidStore::InsertResult InsertResult = ChunkStore.AddChunk(It.second.GetCompressed().Flatten().AsIoBuffer(), - It.first, - CidStore::InsertMode::kCopyOnly); - if (InsertResult.New) + std::vector<IoBuffer> WriteAttachmentBuffers; + std::vector<IoHash> WriteRawHashes; + WriteAttachmentBuffers.reserve(Chunks.size()); + WriteRawHashes.reserve(Chunks.size()); + + for (const auto& It : Chunks) { - Info.AttachmentBytesStored.fetch_add(ChunkSize); - Info.AttachmentsStored.fetch_add(1); + uint64_t ChunkSize = It.second.GetCompressedSize(); + Info.AttachmentBytesDownloaded.fetch_add(ChunkSize); + WriteAttachmentBuffers.push_back(It.second.GetCompressed().Flatten().AsIoBuffer()); + WriteRawHashes.push_back(It.first); + } + std::vector<CidStore::InsertResult> InsertResults = + ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes, CidStore::InsertMode::kCopyOnly); + + for (size_t Index = 0; Index < InsertResults.size(); Index++) + { + if (InsertResults[Index].New) + { + Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize()); + Info.AttachmentsStored.fetch_add(1); + } } } }); @@ -2558,23 +2559,36 @@ LoadOplog(CidStore& ChunkStore, std::unordered_set<IoHash, IoHash::Hasher> WantedChunks; WantedChunks.reserve(Chunks.size()); WantedChunks.insert(Chunks.begin(), Chunks.end()); - bool StoreChunksOK = - IterateBlock(BlockHash, - IoBuffer(Bytes), - [&ChunkStore, &WantedChunks, &Info](CompressedBuffer&& Chunk, const IoHash& AttachmentRawHash) { - if (WantedChunks.contains(AttachmentRawHash)) - { - uint64_t ChunkSize = Chunk.GetCompressedSize(); - CidStore::InsertResult InsertResult = - ChunkStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), AttachmentRawHash); - if (InsertResult.New) - { - Info.AttachmentBytesStored.fetch_add(ChunkSize); - Info.AttachmentsStored.fetch_add(1); - } - WantedChunks.erase(AttachmentRawHash); - } - }); + std::vector<IoBuffer> WriteAttachmentBuffers; + std::vector<IoHash> WriteRawHashes; + + IoHash RawHash; + uint64_t RawSize; + SharedBuffer BlockPayload = CompressedBuffer::FromCompressed(SharedBuffer(Bytes), RawHash, RawSize).Decompress(); + if (RawHash != BlockHash) + { + ReportMessage(OptionalContext, fmt::format("Block attachment {} has mismatching raw hash ({})", BlockHash, RawHash)); + RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), + fmt::format("Block attachment {} has mismatching raw hash ({})", BlockHash, RawHash), + {}); + } + + bool StoreChunksOK = IterateBlock( + BlockPayload, + [&WantedChunks, &WriteAttachmentBuffers, &WriteRawHashes, &Info](CompressedBuffer&& Chunk, + const IoHash& AttachmentRawHash) { + if (WantedChunks.contains(AttachmentRawHash)) + { + WriteAttachmentBuffers.emplace_back(Chunk.GetCompressed().Flatten().AsIoBuffer()); + IoHash RawHash; + uint64_t RawSize; + ZEN_ASSERT(CompressedBuffer::ValidateCompressedHeader(WriteAttachmentBuffers.back(), RawHash, RawSize)); + ZEN_ASSERT(RawHash == AttachmentRawHash); + WriteRawHashes.emplace_back(AttachmentRawHash); + WantedChunks.erase(AttachmentRawHash); + } + }); + if (!StoreChunksOK) { ReportMessage(OptionalContext, @@ -2587,7 +2601,22 @@ LoadOplog(CidStore& ChunkStore, {}); return; } + ZEN_ASSERT(WantedChunks.empty()); + + if (!WriteAttachmentBuffers.empty()) + { + auto Results = ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); + for (size_t Index = 0; Index < Results.size(); Index++) + { + const auto& Result = Results[Index]; + if (Result.New) + { + Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize()); + Info.AttachmentsStored.fetch_add(1); + } + } + } }); }); }; diff --git a/src/zenserver/projectstore/remoteprojectstore.h b/src/zenserver/projectstore/remoteprojectstore.h index d4ccd8c7b..d6e064bdf 100644 --- a/src/zenserver/projectstore/remoteprojectstore.h +++ b/src/zenserver/projectstore/remoteprojectstore.h @@ -162,8 +162,6 @@ RemoteProjectStore::Result LoadOplog(CidStore& ChunkStore, JobContext* OptionalContext); CompressedBuffer GenerateBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks); -bool IterateBlock(const IoHash& BlockHash, - IoBuffer&& CompressedBlock, - std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor); +bool IterateBlock(const SharedBuffer& BlockPayload, std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor); } // namespace zen |