From 96f44f2f2d8cbcda254d0b193f5a1aece645daeb Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Mon, 22 Apr 2024 20:21:02 +0200 Subject: InsertChunks for CAS store (#55) - Improvement: Add batching when writing multiple small chunks to block store - decreases I/O load significantly on oplog import --- src/zenstore/compactcas.cpp | 58 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 58 insertions(+) (limited to 'src/zenstore/compactcas.cpp') diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp index 84905df15..ec2bfbdec 100644 --- a/src/zenstore/compactcas.cpp +++ b/src/zenstore/compactcas.cpp @@ -204,6 +204,64 @@ CasContainerStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) return InsertChunk(Chunk.Data(), Chunk.Size(), ChunkHash); } +std::vector +CasContainerStrategy::InsertChunks(std::span Chunks, std::span ChunkHashes) +{ + ZEN_ASSERT(Chunks.size() == ChunkHashes.size()); + std::vector Result(Chunks.size()); + std::vector NewChunkIndexes; + Result.reserve(Chunks.size()); + { + RwLock::SharedLockScope _(m_LocationMapLock); + for (size_t ChunkIndex = 0; ChunkIndex < ChunkHashes.size(); ChunkIndex++) + { + const IoHash& ChunkHash = ChunkHashes[ChunkIndex]; + bool IsNew = !m_LocationMap.contains(ChunkHash); + Result[ChunkIndex] = CasStore::InsertResult{.New = IsNew}; + if (IsNew) + { + NewChunkIndexes.push_back(ChunkIndex); + } + } + } + + if (NewChunkIndexes.empty()) + { + return Result; + } + + std::vector Datas; + for (size_t ChunkIndex : NewChunkIndexes) + { + const IoBuffer& Chunk = Chunks[ChunkIndex]; +#if !ZEN_WITH_TESTS + ZEN_ASSERT(Chunk.GetContentType() == ZenContentType::kCompressedBinary); +#endif + Datas.emplace_back(Chunk); + } + + size_t ChunkOffset = 0; + m_BlockStore.WriteChunks(Datas, m_PayloadAlignment, [&](std::span Locations) { + std::vector IndexEntries; + for (const BlockStoreLocation& Location : Locations) + { + size_t ChunkIndex = NewChunkIndexes[ChunkOffset++]; + IndexEntries.emplace_back( + CasDiskIndexEntry{.Key = ChunkHashes[ChunkIndex], .Location = BlockStoreDiskLocation(Location, m_PayloadAlignment)}); + } + m_CasLog.Append(IndexEntries); + { + RwLock::ExclusiveLockScope _(m_LocationMapLock); + for (const CasDiskIndexEntry& DiskIndexEntry : IndexEntries) + { + m_LocationMap.emplace(DiskIndexEntry.Key, m_Locations.size()); + m_Locations.push_back(DiskIndexEntry.Location); + } + } + }); + return Result; +} + IoBuffer CasContainerStrategy::FindChunk(const IoHash& ChunkHash) { -- cgit v1.2.3