diff options
| author | Dan Engelbrecht <[email protected]> | 2024-04-22 20:21:02 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2024-04-22 20:21:02 +0200 |
| commit | 96f44f2f2d8cbcda254d0b193f5a1aece645daeb (patch) | |
| tree | 9d1975c4d76d7a577ecfe8e2fe9456738571528b | |
| parent | fix LogRemoteStoreStatsDetails (#53) (diff) | |
| download | zen-96f44f2f2d8cbcda254d0b193f5a1aece645daeb.tar.xz zen-96f44f2f2d8cbcda254d0b193f5a1aece645daeb.zip | |
InsertChunks for CAS store (#55)
- Improvement: Add batching when writing multiple small chunks to block store - decreases I/O load significantly on oplog import
| -rw-r--r-- | CHANGELOG.md | 1 | ||||
| -rw-r--r-- | src/zenserver-test/zenserver-test.cpp | 4 | ||||
| -rw-r--r-- | src/zenserver/cache/httpstructuredcache.cpp | 77 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.cpp | 59 | ||||
| -rw-r--r-- | src/zenserver/projectstore/remoteprojectstore.cpp | 139 | ||||
| -rw-r--r-- | src/zenserver/projectstore/remoteprojectstore.h | 4 | ||||
| -rw-r--r-- | src/zenstore/blockstore.cpp | 163 | ||||
| -rw-r--r-- | src/zenstore/cache/cacherpc.cpp | 91 | ||||
| -rw-r--r-- | src/zenstore/cas.cpp | 123 | ||||
| -rw-r--r-- | src/zenstore/cas.h | 17 | ||||
| -rw-r--r-- | src/zenstore/cidstore.cpp | 50 | ||||
| -rw-r--r-- | src/zenstore/compactcas.cpp | 58 | ||||
| -rw-r--r-- | src/zenstore/compactcas.h | 21 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/blockstore.h | 3 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/cidstore.h | 21 |
15 files changed, 654 insertions, 177 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index b70485a8c..92849ab44 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,6 +28,7 @@ - Improvement: Add disk buffering in http client (improves download speed for oplog import) - Improvement: Add block hash verification for blocks received at oplog import - Improvement: Offload block decoding and chunk writing form network worker pool threads (improves download speed for oplog import) +- Improvement: Add batching when writing multiple small chunks to block store - decreases I/O load significantly on oplog import ## 5.4.4 - Bugfix: Get raw size for compressed chunks correctly for `/prj/{project}/oplog/{log}/chunkinfos` diff --git a/src/zenserver-test/zenserver-test.cpp b/src/zenserver-test/zenserver-test.cpp index 4d2c507fe..ccea53942 100644 --- a/src/zenserver-test/zenserver-test.cpp +++ b/src/zenserver-test/zenserver-test.cpp @@ -2873,6 +2873,10 @@ TEST_CASE("project.remote") CHECK(IsHttpSuccessCode(Response.status_code)); CbPackage ResponsePackage = ParsePackageMessage(IoBuffer(IoBuffer::Wrap, Response.text.data(), Response.text.size())); CHECK(ResponsePackage.GetAttachments().size() == AttachmentHashes.size()); + for (auto A : ResponsePackage.GetAttachments()) + { + CHECK(IoHash::HashBuffer(A.AsCompressedBinary().DecompressToComposite()) == A.GetHash()); + } }; auto ValidateOplog = [&SourceOps, &AddOp, &Servers, &Session](int ServerIndex, std::string_view Project, std::string_view Oplog) { diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp index f64b9c5a5..8106e9db9 100644 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ b/src/zenserver/cache/httpstructuredcache.cpp @@ -848,17 +848,20 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con CbPackage Package; if (Package.TryLoad(ClientResultValue.Value)) { - CbObject CacheRecord = Package.GetObject(); - AttachmentCount Count; - size_t NumAttachments = Package.GetAttachments().size(); - std::vector<const CbAttachment*> AttachmentsToStoreLocally; - std::vector<IoHash> ReferencedAttachments; - AttachmentsToStoreLocally.reserve(NumAttachments); + CbObject CacheRecord = Package.GetObject(); + AttachmentCount Count; + size_t NumAttachments = Package.GetAttachments().size(); + std::vector<IoHash> ReferencedAttachments; + std::vector<IoBuffer> WriteAttachmentBuffers; + WriteAttachmentBuffers.reserve(NumAttachments); + std::vector<IoHash> WriteRawHashes; + WriteRawHashes.reserve(NumAttachments); CacheRecord.IterateAttachments([this, &Package, &Ref, - &AttachmentsToStoreLocally, + &WriteAttachmentBuffers, + &WriteRawHashes, &ReferencedAttachments, &Count, QueryLocal, @@ -872,7 +875,9 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con { if (StoreLocal) { - AttachmentsToStoreLocally.emplace_back(Attachment); + CompressedBuffer Chunk = Attachment->AsCompressedBinary(); + WriteAttachmentBuffers.push_back(Chunk.GetCompressed().Flatten().AsIoBuffer()); + WriteRawHashes.push_back(Attachment->GetHash()); } Count.Valid++; } @@ -923,18 +928,22 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con m_CacheStore .Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue, ReferencedAttachments); m_CacheStats.WriteCount++; - } - for (const CbAttachment* Attachment : AttachmentsToStoreLocally) - { - ZEN_ASSERT_SLOW(StoreLocal); - CompressedBuffer Chunk = Attachment->AsCompressedBinary(); - CidStore::InsertResult InsertResult = - m_CidStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash()); - if (InsertResult.New) + if (!WriteAttachmentBuffers.empty()) { - Count.New++; + std::vector<CidStore::InsertResult> InsertResults = + m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); + for (const CidStore::InsertResult& Result : InsertResults) + { + if (Result.New) + { + Count.New++; + } + } } + + WriteAttachmentBuffers = {}; + WriteRawHashes = {}; } BinaryWriter MemStream; @@ -1151,23 +1160,27 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con CbObject CacheRecord = Package.GetObject(); - AttachmentCount Count; - size_t NumAttachments = Package.GetAttachments().size(); - std::vector<IoHash> ValidAttachments; - std::vector<IoHash> ReferencedAttachments; - std::vector<const CbAttachment*> AttachmentsToStoreLocally; + AttachmentCount Count; + size_t NumAttachments = Package.GetAttachments().size(); + std::vector<IoHash> ValidAttachments; + std::vector<IoHash> ReferencedAttachments; ValidAttachments.reserve(NumAttachments); - AttachmentsToStoreLocally.reserve(NumAttachments); + std::vector<IoBuffer> WriteAttachmentBuffers; + std::vector<IoHash> WriteRawHashes; + WriteAttachmentBuffers.reserve(NumAttachments); + WriteRawHashes.reserve(NumAttachments); CacheRecord.IterateAttachments( - [this, &Ref, &Package, &AttachmentsToStoreLocally, &ValidAttachments, &ReferencedAttachments, &Count](CbFieldView HashView) { + [this, &Ref, &Package, &WriteAttachmentBuffers, &WriteRawHashes, &ValidAttachments, &ReferencedAttachments, &Count]( + CbFieldView HashView) { const IoHash Hash = HashView.AsHash(); ReferencedAttachments.push_back(Hash); if (const CbAttachment* Attachment = Package.FindAttachment(Hash)) { if (Attachment->IsCompressedBinary()) { - AttachmentsToStoreLocally.emplace_back(Attachment); + WriteAttachmentBuffers.emplace_back(Attachment->AsCompressedBinary().GetCompressed().Flatten().AsIoBuffer()); + WriteRawHashes.push_back(Hash); ValidAttachments.emplace_back(Hash); Count.Valid++; } @@ -1202,14 +1215,18 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue, ReferencedAttachments); m_CacheStats.WriteCount++; - for (const CbAttachment* Attachment : AttachmentsToStoreLocally) + if (!WriteAttachmentBuffers.empty()) { - CompressedBuffer Chunk = Attachment->AsCompressedBinary(); - CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash()); - if (InsertResult.New) + std::vector<CidStore::InsertResult> InsertResults = m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); + for (const CidStore::InsertResult& InsertResult : InsertResults) { - Count.New++; + if (InsertResult.New) + { + Count.New++; + } } + WriteAttachmentBuffers = {}; + WriteRawHashes = {}; } ZEN_DEBUG("PUTCACHERECORD - '{}/{}/{}' {} '{}', attachments '{}/{}/{}' (new/valid/total) in {}", diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index 3a7922aaf..3c281275e 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -1448,19 +1448,36 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbPackage OpPackage) auto Attachments = OpPackage.GetAttachments(); - for (const auto& Attach : Attachments) + if (!Attachments.empty()) { - ZEN_ASSERT(Attach.IsCompressedBinary()); + std::vector<IoBuffer> WriteAttachmentBuffers; + std::vector<IoHash> WriteRawHashes; + std::vector<uint64_t> WriteRawSizes; - CompressedBuffer AttachmentData = Attach.AsCompressedBinary(); - const uint64_t AttachmentSize = AttachmentData.DecodeRawSize(); - CidStore::InsertResult InsertResult = m_CidStore.AddChunk(AttachmentData.GetCompressed().Flatten().AsIoBuffer(), Attach.GetHash()); + WriteAttachmentBuffers.reserve(Attachments.size()); + WriteRawHashes.reserve(Attachments.size()); + WriteRawSizes.reserve(Attachments.size()); - if (InsertResult.New) + for (const auto& Attach : Attachments) { - NewAttachmentBytes += AttachmentSize; + ZEN_ASSERT(Attach.IsCompressedBinary()); + + CompressedBuffer AttachmentData = Attach.AsCompressedBinary(); + const uint64_t AttachmentSize = AttachmentData.DecodeRawSize(); + WriteAttachmentBuffers.push_back(AttachmentData.GetCompressed().Flatten().AsIoBuffer()); + WriteRawHashes.push_back(Attach.GetHash()); + WriteRawSizes.push_back(AttachmentSize); + AttachmentBytes += AttachmentSize; + } + + std::vector<CidStore::InsertResult> InsertResults = m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); + for (size_t Index = 0; Index < InsertResults.size(); Index++) + { + if (InsertResults[Index].New) + { + NewAttachmentBytes += WriteRawSizes[Index]; + } } - AttachmentBytes += AttachmentSize; } ZEN_DEBUG("oplog entry #{} attachments: {} new, {} total", EntryId, NiceBytes(NewAttachmentBytes), NiceBytes(AttachmentBytes)); @@ -3354,12 +3371,25 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq, HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); return true; } + std::span<const CbAttachment> Attachments = Package.GetAttachments(); - for (const CbAttachment& Attachment : Attachments) + if (!Attachments.empty()) { - IoHash RawHash = Attachment.GetHash(); - CompressedBuffer Compressed = Attachment.AsCompressedBinary(); - m_CidStore.AddChunk(Compressed.GetCompressed().Flatten().AsIoBuffer(), RawHash, CidStore::InsertMode::kCopyOnly); + std::vector<IoBuffer> WriteAttachmentBuffers; + std::vector<IoHash> WriteRawHashes; + + WriteAttachmentBuffers.reserve(Attachments.size()); + WriteRawHashes.reserve(Attachments.size()); + + for (const CbAttachment& Attachment : Attachments) + { + IoHash RawHash = Attachment.GetHash(); + CompressedBuffer Compressed = Attachment.AsCompressedBinary(); + WriteAttachmentBuffers.push_back(Compressed.GetCompressed().Flatten().AsIoBuffer()); + WriteRawHashes.push_back(RawHash); + } + + m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes, CidStore::InsertMode::kCopyOnly); } HttpReq.WriteResponse(HttpResponseCode::OK); return true; @@ -4703,9 +4733,8 @@ TEST_CASE("project.store.block") return CompositeBuffer(SharedBuffer(Buffer)); })); } - CompressedBuffer Block = GenerateBlock(std::move(Chunks)); - IoBuffer BlockBuffer = Block.GetCompressed().Flatten().AsIoBuffer(); - CHECK(IterateBlock(Block.DecodeRawHash(), std::move(BlockBuffer), [](CompressedBuffer&&, const IoHash&) {})); + CompressedBuffer Block = GenerateBlock(std::move(Chunks)); + CHECK(IterateBlock(Block.Decompress(), [](CompressedBuffer&&, const IoHash&) {})); } #endif diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp index e9c6964c5..e922fcf1c 100644 --- a/src/zenserver/projectstore/remoteprojectstore.cpp +++ b/src/zenserver/projectstore/remoteprojectstore.cpp @@ -143,20 +143,8 @@ LogRemoteStoreStatsDetails(const RemoteProjectStore::Stats& Stats) } bool -IterateBlock(const IoHash& BlockHash, - IoBuffer&& CompressedBlock, - std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor) +IterateBlock(const SharedBuffer& BlockPayload, std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor) { - IoHash RawHash; - uint64_t RawSize; - IoBuffer BlockPayload = - CompressedBuffer::FromCompressed(SharedBuffer(std::move(CompressedBlock)), RawHash, RawSize).Decompress().AsIoBuffer(); - if (RawHash != BlockHash) - { - ZEN_WARN("Header rawhash for downloaded block {} does not match, got {}", BlockHash, RawHash); - return false; - } - MemoryView BlockView = BlockPayload.GetView(); const uint8_t* ReadPtr = reinterpret_cast<const uint8_t*>(BlockView.GetData()); uint32_t NumberSize; @@ -2126,20 +2114,6 @@ ParseOplogContainer(const CbObject& ContainerObject, Stopwatch Timer; - size_t NeedAttachmentCount = 0; - CbArrayView LargeChunksArray = ContainerObject["chunks"sv].AsArrayView(); - for (CbFieldView LargeChunksField : LargeChunksArray) - { - IoHash AttachmentHash = LargeChunksField.AsBinaryAttachment(); - if (HasAttachment(AttachmentHash)) - { - continue; - } - OnNeedAttachment(AttachmentHash); - NeedAttachmentCount++; - }; - ReportMessage(OptionalContext, fmt::format("Requesting {} of {} large attachments", NeedAttachmentCount, LargeChunksArray.Num())); - size_t NeedBlockCount = 0; CbArrayView BlocksArray = ContainerObject["blocks"sv].AsArrayView(); for (CbFieldView BlockField : BlocksArray) @@ -2185,6 +2159,21 @@ ParseOplogContainer(const CbObject& ContainerObject, } } } + ReportMessage(OptionalContext, fmt::format("Requesting {} of {} attachment blocks", NeedBlockCount, BlocksArray.Num())); + + size_t NeedAttachmentCount = 0; + CbArrayView LargeChunksArray = ContainerObject["chunks"sv].AsArrayView(); + for (CbFieldView LargeChunksField : LargeChunksArray) + { + IoHash AttachmentHash = LargeChunksField.AsBinaryAttachment(); + if (HasAttachment(AttachmentHash)) + { + continue; + } + OnNeedAttachment(AttachmentHash); + NeedAttachmentCount++; + }; + ReportMessage(OptionalContext, fmt::format("Requesting {} of {} large attachments", NeedAttachmentCount, LargeChunksArray.Num())); CbArrayView ChunkedFilesArray = ContainerObject["chunkedfiles"sv].AsArrayView(); for (CbFieldView ChunkedFileField : ChunkedFilesArray) @@ -2215,8 +2204,6 @@ ParseOplogContainer(const CbObject& ContainerObject, Chunked.ChunkHashes.size()); } - ReportMessage(OptionalContext, fmt::format("Requesting {} of {} attachment blocks", NeedBlockCount, BlocksArray.Num())); - MemoryView OpsSection = ContainerObject["ops"sv].AsBinaryView(); IoBuffer OpsBuffer(IoBuffer::Wrap, OpsSection.GetData(), OpsSection.GetSize()); IoBuffer SectionPayload = CompressedBuffer::FromCompressedNoValidate(std::move(OpsBuffer)).Decompress().AsIoBuffer(); @@ -2474,17 +2461,31 @@ LoadOplog(CidStore& ChunkStore, { return; } - for (const auto& It : Chunks) + + if (!Chunks.empty()) { - uint64_t ChunkSize = It.second.GetCompressedSize(); - Info.AttachmentBytesDownloaded.fetch_add(ChunkSize); - CidStore::InsertResult InsertResult = ChunkStore.AddChunk(It.second.GetCompressed().Flatten().AsIoBuffer(), - It.first, - CidStore::InsertMode::kCopyOnly); - if (InsertResult.New) + std::vector<IoBuffer> WriteAttachmentBuffers; + std::vector<IoHash> WriteRawHashes; + WriteAttachmentBuffers.reserve(Chunks.size()); + WriteRawHashes.reserve(Chunks.size()); + + for (const auto& It : Chunks) { - Info.AttachmentBytesStored.fetch_add(ChunkSize); - Info.AttachmentsStored.fetch_add(1); + uint64_t ChunkSize = It.second.GetCompressedSize(); + Info.AttachmentBytesDownloaded.fetch_add(ChunkSize); + WriteAttachmentBuffers.push_back(It.second.GetCompressed().Flatten().AsIoBuffer()); + WriteRawHashes.push_back(It.first); + } + std::vector<CidStore::InsertResult> InsertResults = + ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes, CidStore::InsertMode::kCopyOnly); + + for (size_t Index = 0; Index < InsertResults.size(); Index++) + { + if (InsertResults[Index].New) + { + Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize()); + Info.AttachmentsStored.fetch_add(1); + } } } }); @@ -2558,23 +2559,36 @@ LoadOplog(CidStore& ChunkStore, std::unordered_set<IoHash, IoHash::Hasher> WantedChunks; WantedChunks.reserve(Chunks.size()); WantedChunks.insert(Chunks.begin(), Chunks.end()); - bool StoreChunksOK = - IterateBlock(BlockHash, - IoBuffer(Bytes), - [&ChunkStore, &WantedChunks, &Info](CompressedBuffer&& Chunk, const IoHash& AttachmentRawHash) { - if (WantedChunks.contains(AttachmentRawHash)) - { - uint64_t ChunkSize = Chunk.GetCompressedSize(); - CidStore::InsertResult InsertResult = - ChunkStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), AttachmentRawHash); - if (InsertResult.New) - { - Info.AttachmentBytesStored.fetch_add(ChunkSize); - Info.AttachmentsStored.fetch_add(1); - } - WantedChunks.erase(AttachmentRawHash); - } - }); + std::vector<IoBuffer> WriteAttachmentBuffers; + std::vector<IoHash> WriteRawHashes; + + IoHash RawHash; + uint64_t RawSize; + SharedBuffer BlockPayload = CompressedBuffer::FromCompressed(SharedBuffer(Bytes), RawHash, RawSize).Decompress(); + if (RawHash != BlockHash) + { + ReportMessage(OptionalContext, fmt::format("Block attachment {} has mismatching raw hash ({})", BlockHash, RawHash)); + RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), + fmt::format("Block attachment {} has mismatching raw hash ({})", BlockHash, RawHash), + {}); + } + + bool StoreChunksOK = IterateBlock( + BlockPayload, + [&WantedChunks, &WriteAttachmentBuffers, &WriteRawHashes, &Info](CompressedBuffer&& Chunk, + const IoHash& AttachmentRawHash) { + if (WantedChunks.contains(AttachmentRawHash)) + { + WriteAttachmentBuffers.emplace_back(Chunk.GetCompressed().Flatten().AsIoBuffer()); + IoHash RawHash; + uint64_t RawSize; + ZEN_ASSERT(CompressedBuffer::ValidateCompressedHeader(WriteAttachmentBuffers.back(), RawHash, RawSize)); + ZEN_ASSERT(RawHash == AttachmentRawHash); + WriteRawHashes.emplace_back(AttachmentRawHash); + WantedChunks.erase(AttachmentRawHash); + } + }); + if (!StoreChunksOK) { ReportMessage(OptionalContext, @@ -2587,7 +2601,22 @@ LoadOplog(CidStore& ChunkStore, {}); return; } + ZEN_ASSERT(WantedChunks.empty()); + + if (!WriteAttachmentBuffers.empty()) + { + auto Results = ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); + for (size_t Index = 0; Index < Results.size(); Index++) + { + const auto& Result = Results[Index]; + if (Result.New) + { + Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize()); + Info.AttachmentsStored.fetch_add(1); + } + } + } }); }); }; diff --git a/src/zenserver/projectstore/remoteprojectstore.h b/src/zenserver/projectstore/remoteprojectstore.h index d4ccd8c7b..d6e064bdf 100644 --- a/src/zenserver/projectstore/remoteprojectstore.h +++ b/src/zenserver/projectstore/remoteprojectstore.h @@ -162,8 +162,6 @@ RemoteProjectStore::Result LoadOplog(CidStore& ChunkStore, JobContext* OptionalContext); CompressedBuffer GenerateBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks); -bool IterateBlock(const IoHash& BlockHash, - IoBuffer&& CompressedBlock, - std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor); +bool IterateBlock(const SharedBuffer& BlockPayload, std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor); } // namespace zen diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp index 644ddf7b4..8ef9e842f 100644 --- a/src/zenstore/blockstore.cpp +++ b/src/zenstore/blockstore.cpp @@ -453,6 +453,103 @@ BlockStore::WriteChunk(const void* Data, uint64_t Size, uint32_t Alignment, cons } } +void +BlockStore::WriteChunks(std::span<IoBuffer> Datas, uint32_t Alignment, const WriteChunksCallback& Callback) +{ + ZEN_TRACE_CPU("BlockStore::WriteChunks"); + + ZEN_ASSERT(!Datas.empty()); + ZEN_ASSERT(Alignment > 0u); + + const size_t TotalCount = Datas.size(); + uint64_t TotalSize = 0; + uint64_t LargestSize = 0; + for (const IoBuffer& Data : Datas) + { + uint64_t Size = Data.GetSize(); + ZEN_ASSERT(Size > 0); + ZEN_ASSERT(Size <= m_MaxBlockSize); + TotalSize += Size; + LargestSize = Max(LargestSize, Size); + } + + const uint64_t MinSize = Max(LargestSize, 8u * 1024u * 1024u); + const uint64_t BufferSize = Min(TotalSize, MinSize); + std::vector<uint8_t> Buffer(BufferSize); + + size_t Offset = 0; + while (Offset < TotalCount) + { + size_t Count = 1; + uint32_t RangeSize = gsl::narrow<uint32_t>(Datas[Offset].GetSize()); + + RwLock::ExclusiveLockScope InsertLock(m_InsertLock); + uint32_t WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire); + uint32_t AlignedInsertOffset = RoundUp(m_CurrentInsertOffset, Alignment); + if ((!m_WriteBlock) || ((AlignedInsertOffset + RangeSize) > m_MaxBlockSize)) + { + std::filesystem::path BlockPath; + WriteBlockIndex = GetFreeBlockIndex(WriteBlockIndex, InsertLock, BlockPath); + if (WriteBlockIndex == (uint32_t)m_MaxBlockCount) + { + throw std::runtime_error(fmt::format("unable to allocate a new block in '{}'", m_BlocksBasePath)); + } + Ref<BlockStoreFile> NewBlockFile(new BlockStoreFile(BlockPath)); + NewBlockFile->Create(m_MaxBlockSize); + + m_ChunkBlocks[WriteBlockIndex] = NewBlockFile; + m_WriteBlock = NewBlockFile; + m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release); + m_CurrentInsertOffset = 0; + AlignedInsertOffset = 0; + } + while (Offset + Count < TotalCount) + { + uint32_t NextRangeSize = gsl::narrow<uint32_t>(RoundUp(RangeSize, Alignment) + Datas[Offset + Count].GetSize()); + if ((AlignedInsertOffset + NextRangeSize) > m_MaxBlockSize || (NextRangeSize > BufferSize)) + { + break; + } + Count++; + RangeSize = NextRangeSize; + } + m_CurrentInsertOffset = AlignedInsertOffset + RangeSize; + Ref<BlockStoreFile> WriteBlock = m_WriteBlock; + m_ActiveWriteBlocks.push_back(WriteBlockIndex); + InsertLock.ReleaseNow(); + + { + MutableMemoryView WriteBuffer(Buffer.data(), RangeSize); + for (size_t Index = 0; Index < Count; Index++) + { + MemoryView SourceBuffer = Datas[Index + Offset]; + WriteBuffer.CopyFrom(SourceBuffer); + WriteBuffer.MidInline(RoundUp(SourceBuffer.GetSize(), Alignment)); + } + WriteBlock->Write(Buffer.data(), RangeSize, AlignedInsertOffset); + } + + m_TotalSize.fetch_add(RangeSize, std::memory_order::relaxed); + + uint32_t ChunkOffset = AlignedInsertOffset; + std::vector<BlockStoreLocation> Locations(Count); + for (size_t Index = 0; Index < Count; Index++) + { + uint32_t ChunkSize = gsl::narrow<uint32_t>(Datas[Offset + Index].GetSize()); + Locations[Index] = {.BlockIndex = WriteBlockIndex, .Offset = ChunkOffset, .Size = ChunkSize}; + ChunkOffset += gsl::narrow<uint32_t>(RoundUp(ChunkSize, Alignment)); + } + Callback(Locations); + + { + RwLock::ExclusiveLockScope _(m_InsertLock); + m_ActiveWriteBlocks.erase(std::find(m_ActiveWriteBlocks.begin(), m_ActiveWriteBlocks.end(), WriteBlockIndex)); + } + + Offset += Count; + } +} + BlockStore::ReclaimSnapshotState BlockStore::GetReclaimSnapshotState() { @@ -1543,6 +1640,72 @@ namespace blockstore::impl { } // namespace blockstore::impl +TEST_CASE("blockstore.multichunks") +{ + using namespace blockstore::impl; + + ScopedTemporaryDirectory TempDir; + auto RootDirectory = TempDir.Path(); + + BlockStore Store; + Store.Initialize(RootDirectory, 128, 1024); + + std::vector<IoBuffer> MultiChunkData; + std::string FirstChunkData = "0123456789012345678901234567890123456789012345678901234567890123"; + MultiChunkData.push_back(IoBuffer(IoBuffer::Wrap, FirstChunkData.data(), FirstChunkData.size())); + + std::string SecondChunkData = "12345678901234567890123456789012345678901234567890123456"; + MultiChunkData.push_back(IoBuffer(IoBuffer::Wrap, SecondChunkData.data(), SecondChunkData.size())); + + std::string ThirdChunkData = "789012345678901234567890123456789012345678901234567890"; + MultiChunkData.push_back(IoBuffer(IoBuffer::Wrap, ThirdChunkData.data(), ThirdChunkData.size())); + + BlockStoreLocation Locations[5]; + size_t ChunkOffset = 0; + Store.WriteChunks(MultiChunkData, 4, [&](std::span<BlockStoreLocation> InLocations) { + for (const BlockStoreLocation& Location : InLocations) + { + Locations[ChunkOffset++] = Location; + } + CHECK(ChunkOffset <= MultiChunkData.size()); + }); + CHECK(ChunkOffset == 3); + + CHECK(ReadChunkAsString(Store, Locations[0]) == FirstChunkData); + CHECK(Locations[1].BlockIndex == Locations[0].BlockIndex); + CHECK(ReadChunkAsString(Store, Locations[1]) == SecondChunkData); + CHECK(Locations[2].BlockIndex != Locations[1].BlockIndex); + CHECK(ReadChunkAsString(Store, Locations[2]) == ThirdChunkData); + + MultiChunkData.resize(0); + std::string FourthChunkData = "ABCDEFGHIJABCDEFGHIJ_23"; + MultiChunkData.push_back(IoBuffer(IoBuffer::Wrap, FourthChunkData.data(), FourthChunkData.size())); + + std::string FifthChunkData = + "ABCDEFGHIJABCDEFGHIJABCDEFGHIJABCDEFGHIJABCDEFGHIJABCDEFGHIJABCDEFGHIJABCDEFGHIJABCDEFGHIJABCDEFGHIJABCDEFGHIJABCDEFGHIJ_93"; + MultiChunkData.push_back(IoBuffer(IoBuffer::Wrap, FifthChunkData.data(), FifthChunkData.size())); + + Store.WriteChunks(MultiChunkData, 4, [&](std::span<BlockStoreLocation> InLocations) { + for (const BlockStoreLocation& Location : InLocations) + { + CHECK(ChunkOffset < 5); + Locations[ChunkOffset++] = Location; + } + }); + CHECK(ChunkOffset == 5); + + CHECK(ReadChunkAsString(Store, Locations[0]) == FirstChunkData); + CHECK(Locations[1].BlockIndex == Locations[0].BlockIndex); + CHECK(ReadChunkAsString(Store, Locations[1]) == SecondChunkData); + CHECK(Locations[2].BlockIndex != Locations[1].BlockIndex); + CHECK(ReadChunkAsString(Store, Locations[2]) == ThirdChunkData); + + CHECK(Locations[3].BlockIndex == Locations[2].BlockIndex); + CHECK(ReadChunkAsString(Store, Locations[3]) == FourthChunkData); + CHECK(Locations[4].BlockIndex != Locations[3].BlockIndex); + CHECK(ReadChunkAsString(Store, Locations[4]) == FifthChunkData); +} + TEST_CASE("blockstore.chunks") { using namespace blockstore::impl; diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp index 3cb1b03a5..60b2fa1a1 100644 --- a/src/zenstore/cache/cacherpc.cpp +++ b/src/zenstore/cache/cacherpc.cpp @@ -315,49 +315,58 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag uint64_t RecordObjectSize = Record.GetSize(); uint64_t TransferredSize = RecordObjectSize; - AttachmentCount Count; - size_t NumAttachments = Package->GetAttachments().size(); - std::vector<IoHash> ValidAttachments; - std::vector<IoHash> ReferencedAttachments; - std::vector<const CbAttachment*> AttachmentsToStoreLocally; + AttachmentCount Count; + size_t NumAttachments = Package->GetAttachments().size(); + std::vector<IoHash> ValidAttachments; + std::vector<IoHash> ReferencedAttachments; + std::vector<IoBuffer> WriteAttachmentBuffers; + std::vector<IoHash> WriteRawHashes; ValidAttachments.reserve(NumAttachments); - AttachmentsToStoreLocally.reserve(NumAttachments); + WriteAttachmentBuffers.reserve(NumAttachments); + WriteRawHashes.reserve(NumAttachments); const bool HasUpstream = m_UpstreamCache.IsActive(); Stopwatch Timer; - Request.RecordObject.IterateAttachments( - [this, &Request, Package, &AttachmentsToStoreLocally, &ValidAttachments, &ReferencedAttachments, &Count, &TransferredSize]( - CbFieldView HashView) { - const IoHash ValueHash = HashView.AsHash(); - ReferencedAttachments.push_back(ValueHash); - if (const CbAttachment* Attachment = Package ? Package->FindAttachment(ValueHash) : nullptr) - { - if (Attachment->IsCompressedBinary()) - { - AttachmentsToStoreLocally.emplace_back(Attachment); - ValidAttachments.emplace_back(ValueHash); - Count.Valid++; - } - else - { - ZEN_WARN("PUTCACHERECORD - '{}/{}/{}' '{}' FAILED, attachment '{}' is not compressed", - Request.Namespace, - Request.Key.Bucket, - Request.Key.Hash, - ToString(ZenContentType::kCbPackage), - ValueHash); - Count.Invalid++; - } - } - else if (m_CidStore.ContainsChunk(ValueHash)) + Request.RecordObject.IterateAttachments([this, + &Request, + Package, + &WriteAttachmentBuffers, + &WriteRawHashes, + &ValidAttachments, + &ReferencedAttachments, + &Count, + &TransferredSize](CbFieldView HashView) { + const IoHash ValueHash = HashView.AsHash(); + ReferencedAttachments.push_back(ValueHash); + if (const CbAttachment* Attachment = Package ? Package->FindAttachment(ValueHash) : nullptr) + { + if (Attachment->IsCompressedBinary()) { + WriteAttachmentBuffers.push_back(Attachment->AsCompressedBinary().GetCompressed().Flatten().AsIoBuffer()); + WriteRawHashes.push_back(ValueHash); ValidAttachments.emplace_back(ValueHash); Count.Valid++; } - Count.Total++; - }); + else + { + ZEN_WARN("PUTCACHERECORD - '{}/{}/{}' '{}' FAILED, attachment '{}' is not compressed", + Request.Namespace, + Request.Key.Bucket, + Request.Key.Hash, + ToString(ZenContentType::kCbPackage), + ValueHash); + Count.Invalid++; + } + } + else if (m_CidStore.ContainsChunk(ValueHash)) + { + ValidAttachments.emplace_back(ValueHash); + Count.Valid++; + } + Count.Total++; + }); if (Count.Invalid > 0) { @@ -371,16 +380,20 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag m_CacheStore.Put(Request.Context, Request.Namespace, Request.Key.Bucket, Request.Key.Hash, CacheValue, ReferencedAttachments); m_CacheStats.WriteCount++; - for (const CbAttachment* Attachment : AttachmentsToStoreLocally) + if (!WriteAttachmentBuffers.empty()) { - CompressedBuffer Chunk = Attachment->AsCompressedBinary(); - CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash()); - if (InsertResult.New) + std::vector<CidStore::InsertResult> InsertResults = m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); + for (size_t Index = 0; Index < InsertResults.size(); Index++) { - Count.New++; + if (InsertResults[Index].New) + { + Count.New++; + } + TransferredSize += WriteAttachmentBuffers[Index].GetSize(); } - TransferredSize += Chunk.GetCompressedSize(); } + WriteAttachmentBuffers = {}; + WriteRawHashes = {}; ZEN_DEBUG("PUTCACHERECORD - '{}/{}/{}' {}, attachments '{}/{}/{}' (new/valid/total) in {}", Request.Namespace, diff --git a/src/zenstore/cas.cpp b/src/zenstore/cas.cpp index a1a4a0acc..2fe466028 100644 --- a/src/zenstore/cas.cpp +++ b/src/zenstore/cas.cpp @@ -49,15 +49,18 @@ public: CasImpl(GcManager& Gc); virtual ~CasImpl(); - virtual void Initialize(const CidStoreConfiguration& InConfig) override; - virtual CasStore::InsertResult InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, InsertMode Mode) override; - virtual IoBuffer FindChunk(const IoHash& ChunkHash) override; - virtual bool ContainsChunk(const IoHash& ChunkHash) override; - virtual void FilterChunks(HashKeySet& InOutChunks) override; - virtual void Flush() override; - virtual void ScrubStorage(ScrubContext& Ctx) override; - virtual void GarbageCollect(GcContext& GcCtx) override; - virtual CidStoreSize TotalSize() const override; + virtual void Initialize(const CidStoreConfiguration& InConfig) override; + virtual CasStore::InsertResult InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, InsertMode Mode) override; + virtual std::vector<InsertResult> InsertChunks(std::span<IoBuffer> Data, + std::span<IoHash> ChunkHashes, + InsertMode Mode = InsertMode::kMayBeMovedInPlace) override; + virtual IoBuffer FindChunk(const IoHash& ChunkHash) override; + virtual bool ContainsChunk(const IoHash& ChunkHash) override; + virtual void FilterChunks(HashKeySet& InOutChunks) override; + virtual void Flush() override; + virtual void ScrubStorage(ScrubContext& Ctx) override; + virtual void GarbageCollect(GcContext& GcCtx) override; + virtual CidStoreSize TotalSize() const override; private: CasContainerStrategy m_TinyStrategy; @@ -250,6 +253,108 @@ CasImpl::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, InsertMode Mode) return m_LargeStrategy.InsertChunk(Chunk, ChunkHash, Mode); } +static void +GetCompactCasResults(CasContainerStrategy& Strategy, + std::span<IoBuffer> Data, + std::span<IoHash> ChunkHashes, + std::span<size_t> Indexes, + std::vector<CasStore::InsertResult> Results) +{ + const size_t Count = Indexes.size(); + if (Count == 1) + { + const size_t Index = Indexes[0]; + Results[Index] = Strategy.InsertChunk(Data[Index], ChunkHashes[Index]); + return; + } + std::vector<IoBuffer> Chunks; + std::vector<IoHash> Hashes; + Chunks.reserve(Count); + Hashes.reserve(Count); + for (size_t Index : Indexes) + { + Chunks.push_back(Data[Index]); + Hashes.push_back(ChunkHashes[Index]); + } + + Strategy.InsertChunks(Chunks, Hashes); + + for (size_t Offset = 0; Offset < Count; Offset++) + { + size_t Index = Indexes[Offset]; + Results[Index] = Results[Offset]; + } +}; + +static void +GetFileCasResults(FileCasStrategy& Strategy, + CasStore::InsertMode Mode, + std::span<IoBuffer> Data, + std::span<IoHash> ChunkHashes, + std::span<size_t> Indexes, + std::vector<CasStore::InsertResult> Results) +{ + for (size_t Index : Indexes) + { + Results[Index] = Strategy.InsertChunk(Data[Index], ChunkHashes[Index], Mode); + } +}; + +std::vector<CasStore::InsertResult> +CasImpl::InsertChunks(std::span<IoBuffer> Data, std::span<IoHash> ChunkHashes, CasStore::InsertMode Mode) +{ + ZEN_TRACE_CPU("CAS::InsertChunks"); + ZEN_ASSERT(Data.size() == ChunkHashes.size()); + + if (Data.size() == 1) + { + std::vector<CasStore::InsertResult> Result(1); + Result[0] = InsertChunk(Data[0], ChunkHashes[0], Mode); + return Result; + } + + std::vector<size_t> TinyIndexes; + std::vector<size_t> SmallIndexes; + std::vector<size_t> LargeIndexes; + + for (size_t Index = 0; Index < Data.size(); Index++) + { + const uint64_t ChunkSize = Data[Index].Size(); + if (ChunkSize < m_Config.TinyValueThreshold) + { + ZEN_ASSERT(ChunkSize); + TinyIndexes.push_back(Index); + } + else if (ChunkSize < m_Config.HugeValueThreshold) + { + SmallIndexes.push_back(Index); + } + else + { + LargeIndexes.push_back(Index); + } + } + + std::vector<CasStore::InsertResult> Result(Data.size()); + + if (!TinyIndexes.empty()) + { + GetCompactCasResults(m_TinyStrategy, Data, ChunkHashes, TinyIndexes, Result); + } + + if (!SmallIndexes.empty()) + { + GetCompactCasResults(m_SmallStrategy, Data, ChunkHashes, SmallIndexes, Result); + } + + if (!LargeIndexes.empty()) + { + GetFileCasResults(m_LargeStrategy, Mode, Data, ChunkHashes, LargeIndexes, Result); + } + + return Result; +} + IoBuffer CasImpl::FindChunk(const IoHash& ChunkHash) { diff --git a/src/zenstore/cas.h b/src/zenstore/cas.h index b951d6d59..f93724905 100644 --- a/src/zenstore/cas.h +++ b/src/zenstore/cas.h @@ -40,13 +40,16 @@ public: virtual void Initialize(const CidStoreConfiguration& Config) = 0; virtual InsertResult InsertChunk(IoBuffer Data, const IoHash& ChunkHash, InsertMode Mode = InsertMode::kMayBeMovedInPlace) = 0; - virtual IoBuffer FindChunk(const IoHash& ChunkHash) = 0; - virtual bool ContainsChunk(const IoHash& ChunkHash) = 0; - virtual void FilterChunks(HashKeySet& InOutChunks) = 0; - virtual void Flush() = 0; - virtual void ScrubStorage(ScrubContext& Ctx) = 0; - virtual void GarbageCollect(GcContext& GcCtx) = 0; - virtual CidStoreSize TotalSize() const = 0; + virtual std::vector<InsertResult> InsertChunks(std::span<IoBuffer> Data, + std::span<IoHash> ChunkHashes, + InsertMode Mode = InsertMode::kMayBeMovedInPlace) = 0; + virtual IoBuffer FindChunk(const IoHash& ChunkHash) = 0; + virtual bool ContainsChunk(const IoHash& ChunkHash) = 0; + virtual void FilterChunks(HashKeySet& InOutChunks) = 0; + virtual void Flush() = 0; + virtual void ScrubStorage(ScrubContext& Ctx) = 0; + virtual void GarbageCollect(GcContext& GcCtx) = 0; + virtual CidStoreSize TotalSize() const = 0; protected: CidStoreConfiguration m_Config; diff --git a/src/zenstore/cidstore.cpp b/src/zenstore/cidstore.cpp index 67b7e95ac..68bccd06b 100644 --- a/src/zenstore/cidstore.cpp +++ b/src/zenstore/cidstore.cpp @@ -45,6 +45,50 @@ struct CidStore::Impl return {.New = Result.New}; } + std::vector<CidStore::InsertResult> AddChunks(std::span<IoBuffer> ChunkDatas, std::span<IoHash> RawHashes, CidStore::InsertMode Mode) + { + if (ChunkDatas.size() == 1) + { + std::vector<CidStore::InsertResult> Result(1); + Result[0] = AddChunk(ChunkDatas[0], RawHashes[0], Mode); + return Result; + } + ZEN_ASSERT(ChunkDatas.size() == RawHashes.size()); + std::vector<IoBuffer> Chunks; + Chunks.reserve(ChunkDatas.size()); +#if ZEN_BUILD_DEBUG + size_t Offset = 0; +#endif + uint64_t TotalSize = 0; + for (const IoBuffer& ChunkData : ChunkDatas) + { + TotalSize += ChunkData.GetSize(); +#if ZEN_BUILD_DEBUG + IoHash VerifyRawHash; + uint64_t _; + ZEN_ASSERT(CompressedBuffer::ValidateCompressedHeader(ChunkData, VerifyRawHash, _) && RawHashes[Offset++] == VerifyRawHash); +#endif + Chunks.push_back(ChunkData); + Chunks.back().SetContentType(ZenContentType::kCompressedBinary); + } + + metrics::RequestStats::Scope $(m_AddChunkOps, TotalSize); + + std::vector<CasStore::InsertResult> CasResults = + m_CasStore.InsertChunks(Chunks, RawHashes, static_cast<CasStore::InsertMode>(Mode)); + ZEN_ASSERT(CasResults.size() == ChunkDatas.size()); + std::vector<CidStore::InsertResult> Result; + for (const CasStore::InsertResult& CasResult : CasResults) + { + if (CasResult.New) + { + m_WriteCount++; + } + Result.emplace_back(CidStore::InsertResult{.New = CasResult.New}); + } + return Result; + } + IoBuffer FindChunkByCid(const IoHash& DecompressedId) { metrics::RequestStats::Scope StatsScope(m_FindChunkOps, 0); @@ -145,6 +189,12 @@ CidStore::AddChunk(const IoBuffer& ChunkData, const IoHash& RawHash, InsertMode return m_Impl->AddChunk(ChunkData, RawHash, Mode); } +std::vector<CidStore::InsertResult> +CidStore::AddChunks(std::span<IoBuffer> ChunkDatas, std::span<IoHash> RawHashes, InsertMode Mode) +{ + return m_Impl->AddChunks(ChunkDatas, RawHashes, Mode); +} + IoBuffer CidStore::FindChunkByCid(const IoHash& DecompressedId) { diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp index 84905df15..ec2bfbdec 100644 --- a/src/zenstore/compactcas.cpp +++ b/src/zenstore/compactcas.cpp @@ -204,6 +204,64 @@ CasContainerStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) return InsertChunk(Chunk.Data(), Chunk.Size(), ChunkHash); } +std::vector<CasStore::InsertResult> +CasContainerStrategy::InsertChunks(std::span<IoBuffer> Chunks, std::span<IoHash> ChunkHashes) +{ + ZEN_ASSERT(Chunks.size() == ChunkHashes.size()); + std::vector<CasStore::InsertResult> Result(Chunks.size()); + std::vector<size_t> NewChunkIndexes; + Result.reserve(Chunks.size()); + { + RwLock::SharedLockScope _(m_LocationMapLock); + for (size_t ChunkIndex = 0; ChunkIndex < ChunkHashes.size(); ChunkIndex++) + { + const IoHash& ChunkHash = ChunkHashes[ChunkIndex]; + bool IsNew = !m_LocationMap.contains(ChunkHash); + Result[ChunkIndex] = CasStore::InsertResult{.New = IsNew}; + if (IsNew) + { + NewChunkIndexes.push_back(ChunkIndex); + } + } + } + + if (NewChunkIndexes.empty()) + { + return Result; + } + + std::vector<IoBuffer> Datas; + for (size_t ChunkIndex : NewChunkIndexes) + { + const IoBuffer& Chunk = Chunks[ChunkIndex]; +#if !ZEN_WITH_TESTS + ZEN_ASSERT(Chunk.GetContentType() == ZenContentType::kCompressedBinary); +#endif + Datas.emplace_back(Chunk); + } + + size_t ChunkOffset = 0; + m_BlockStore.WriteChunks(Datas, m_PayloadAlignment, [&](std::span<BlockStoreLocation> Locations) { + std::vector<CasDiskIndexEntry> IndexEntries; + for (const BlockStoreLocation& Location : Locations) + { + size_t ChunkIndex = NewChunkIndexes[ChunkOffset++]; + IndexEntries.emplace_back( + CasDiskIndexEntry{.Key = ChunkHashes[ChunkIndex], .Location = BlockStoreDiskLocation(Location, m_PayloadAlignment)}); + } + m_CasLog.Append(IndexEntries); + { + RwLock::ExclusiveLockScope _(m_LocationMapLock); + for (const CasDiskIndexEntry& DiskIndexEntry : IndexEntries) + { + m_LocationMap.emplace(DiskIndexEntry.Key, m_Locations.size()); + m_Locations.push_back(DiskIndexEntry.Location); + } + } + }); + return Result; +} + IoBuffer CasContainerStrategy::FindChunk(const IoHash& ChunkHash) { diff --git a/src/zenstore/compactcas.h b/src/zenstore/compactcas.h index 932844da7..eb1d36b31 100644 --- a/src/zenstore/compactcas.h +++ b/src/zenstore/compactcas.h @@ -51,16 +51,17 @@ struct CasContainerStrategy final : public GcStorage, public GcReferenceStore CasContainerStrategy(GcManager& Gc); ~CasContainerStrategy(); - CasStore::InsertResult InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash); - IoBuffer FindChunk(const IoHash& ChunkHash); - bool HaveChunk(const IoHash& ChunkHash); - void FilterChunks(HashKeySet& InOutChunks); - void Initialize(const std::filesystem::path& RootDirectory, - const std::string_view ContainerBaseName, - uint32_t MaxBlockSize, - uint32_t Alignment, - bool IsNewStore); - void Flush(); + CasStore::InsertResult InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash); + std::vector<CasStore::InsertResult> InsertChunks(std::span<IoBuffer> Chunks, std::span<IoHash> ChunkHashes); + IoBuffer FindChunk(const IoHash& ChunkHash); + bool HaveChunk(const IoHash& ChunkHash); + void FilterChunks(HashKeySet& InOutChunks); + void Initialize(const std::filesystem::path& RootDirectory, + const std::string_view ContainerBaseName, + uint32_t MaxBlockSize, + uint32_t Alignment, + bool IsNewStore); + void Flush(); // GcStorage diff --git a/src/zenstore/include/zenstore/blockstore.h b/src/zenstore/include/zenstore/blockstore.h index 656040936..c28aa8102 100644 --- a/src/zenstore/include/zenstore/blockstore.h +++ b/src/zenstore/include/zenstore/blockstore.h @@ -161,6 +161,9 @@ public: void WriteChunk(const void* Data, uint64_t Size, uint32_t Alignment, const WriteChunkCallback& Callback); + typedef std::function<void(std::span<BlockStoreLocation> Locations)> WriteChunksCallback; + void WriteChunks(std::span<IoBuffer> Datas, uint32_t Alignment, const WriteChunksCallback& Callback); + IoBuffer TryGetChunk(const BlockStoreLocation& Location) const; void Flush(bool ForceNewBlock); diff --git a/src/zenstore/include/zenstore/cidstore.h b/src/zenstore/include/zenstore/cidstore.h index 4c9f30608..54f562767 100644 --- a/src/zenstore/include/zenstore/cidstore.h +++ b/src/zenstore/include/zenstore/cidstore.h @@ -73,15 +73,18 @@ public: kMayBeMovedInPlace }; - void Initialize(const CidStoreConfiguration& Config); - InsertResult AddChunk(const IoBuffer& ChunkData, const IoHash& RawHash, InsertMode Mode = InsertMode::kMayBeMovedInPlace); - virtual IoBuffer FindChunkByCid(const IoHash& DecompressedId) override; - bool ContainsChunk(const IoHash& DecompressedId); - void FilterChunks(HashKeySet& InOutChunks); - void Flush(); - void ScrubStorage(ScrubContext& Ctx); - CidStoreSize TotalSize() const; - CidStoreStats Stats() const; + void Initialize(const CidStoreConfiguration& Config); + InsertResult AddChunk(const IoBuffer& ChunkData, const IoHash& RawHash, InsertMode Mode = InsertMode::kMayBeMovedInPlace); + std::vector<InsertResult> AddChunks(std::span<IoBuffer> ChunkDatas, + std::span<IoHash> RawHashes, + InsertMode Mode = InsertMode::kMayBeMovedInPlace); + virtual IoBuffer FindChunkByCid(const IoHash& DecompressedId) override; + bool ContainsChunk(const IoHash& DecompressedId); + void FilterChunks(HashKeySet& InOutChunks); + void Flush(); + void ScrubStorage(ScrubContext& Ctx); + CidStoreSize TotalSize() const; + CidStoreStats Stats() const; virtual void ReportMetrics(StatsMetrics& Statsd) override; |