diff options
| author | Dan Engelbrecht <[email protected]> | 2024-04-22 20:21:02 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2024-04-22 20:21:02 +0200 |
| commit | 96f44f2f2d8cbcda254d0b193f5a1aece645daeb (patch) | |
| tree | 9d1975c4d76d7a577ecfe8e2fe9456738571528b /src/zenstore/cache/cacherpc.cpp | |
| parent | fix LogRemoteStoreStatsDetails (#53) (diff) | |
| download | zen-96f44f2f2d8cbcda254d0b193f5a1aece645daeb.tar.xz zen-96f44f2f2d8cbcda254d0b193f5a1aece645daeb.zip | |
InsertChunks for CAS store (#55)
- Improvement: Add batching when writing multiple small chunks to block store - decreases I/O load significantly on oplog import
Diffstat (limited to 'src/zenstore/cache/cacherpc.cpp')
| -rw-r--r-- | src/zenstore/cache/cacherpc.cpp | 91 |
1 files changed, 52 insertions, 39 deletions
diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp index 3cb1b03a5..60b2fa1a1 100644 --- a/src/zenstore/cache/cacherpc.cpp +++ b/src/zenstore/cache/cacherpc.cpp @@ -315,49 +315,58 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag uint64_t RecordObjectSize = Record.GetSize(); uint64_t TransferredSize = RecordObjectSize; - AttachmentCount Count; - size_t NumAttachments = Package->GetAttachments().size(); - std::vector<IoHash> ValidAttachments; - std::vector<IoHash> ReferencedAttachments; - std::vector<const CbAttachment*> AttachmentsToStoreLocally; + AttachmentCount Count; + size_t NumAttachments = Package->GetAttachments().size(); + std::vector<IoHash> ValidAttachments; + std::vector<IoHash> ReferencedAttachments; + std::vector<IoBuffer> WriteAttachmentBuffers; + std::vector<IoHash> WriteRawHashes; ValidAttachments.reserve(NumAttachments); - AttachmentsToStoreLocally.reserve(NumAttachments); + WriteAttachmentBuffers.reserve(NumAttachments); + WriteRawHashes.reserve(NumAttachments); const bool HasUpstream = m_UpstreamCache.IsActive(); Stopwatch Timer; - Request.RecordObject.IterateAttachments( - [this, &Request, Package, &AttachmentsToStoreLocally, &ValidAttachments, &ReferencedAttachments, &Count, &TransferredSize]( - CbFieldView HashView) { - const IoHash ValueHash = HashView.AsHash(); - ReferencedAttachments.push_back(ValueHash); - if (const CbAttachment* Attachment = Package ? Package->FindAttachment(ValueHash) : nullptr) - { - if (Attachment->IsCompressedBinary()) - { - AttachmentsToStoreLocally.emplace_back(Attachment); - ValidAttachments.emplace_back(ValueHash); - Count.Valid++; - } - else - { - ZEN_WARN("PUTCACHERECORD - '{}/{}/{}' '{}' FAILED, attachment '{}' is not compressed", - Request.Namespace, - Request.Key.Bucket, - Request.Key.Hash, - ToString(ZenContentType::kCbPackage), - ValueHash); - Count.Invalid++; - } - } - else if (m_CidStore.ContainsChunk(ValueHash)) + Request.RecordObject.IterateAttachments([this, + &Request, + Package, + &WriteAttachmentBuffers, + &WriteRawHashes, + &ValidAttachments, + &ReferencedAttachments, + &Count, + &TransferredSize](CbFieldView HashView) { + const IoHash ValueHash = HashView.AsHash(); + ReferencedAttachments.push_back(ValueHash); + if (const CbAttachment* Attachment = Package ? Package->FindAttachment(ValueHash) : nullptr) + { + if (Attachment->IsCompressedBinary()) { + WriteAttachmentBuffers.push_back(Attachment->AsCompressedBinary().GetCompressed().Flatten().AsIoBuffer()); + WriteRawHashes.push_back(ValueHash); ValidAttachments.emplace_back(ValueHash); Count.Valid++; } - Count.Total++; - }); + else + { + ZEN_WARN("PUTCACHERECORD - '{}/{}/{}' '{}' FAILED, attachment '{}' is not compressed", + Request.Namespace, + Request.Key.Bucket, + Request.Key.Hash, + ToString(ZenContentType::kCbPackage), + ValueHash); + Count.Invalid++; + } + } + else if (m_CidStore.ContainsChunk(ValueHash)) + { + ValidAttachments.emplace_back(ValueHash); + Count.Valid++; + } + Count.Total++; + }); if (Count.Invalid > 0) { @@ -371,16 +380,20 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag m_CacheStore.Put(Request.Context, Request.Namespace, Request.Key.Bucket, Request.Key.Hash, CacheValue, ReferencedAttachments); m_CacheStats.WriteCount++; - for (const CbAttachment* Attachment : AttachmentsToStoreLocally) + if (!WriteAttachmentBuffers.empty()) { - CompressedBuffer Chunk = Attachment->AsCompressedBinary(); - CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash()); - if (InsertResult.New) + std::vector<CidStore::InsertResult> InsertResults = m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); + for (size_t Index = 0; Index < InsertResults.size(); Index++) { - Count.New++; + if (InsertResults[Index].New) + { + Count.New++; + } + TransferredSize += WriteAttachmentBuffers[Index].GetSize(); } - TransferredSize += Chunk.GetCompressedSize(); } + WriteAttachmentBuffers = {}; + WriteRawHashes = {}; ZEN_DEBUG("PUTCACHERECORD - '{}/{}/{}' {}, attachments '{}/{}/{}' (new/valid/total) in {}", Request.Namespace, |