diff options
| author | Dan Engelbrecht <[email protected]> | 2024-04-22 20:21:02 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2024-04-22 20:21:02 +0200 |
| commit | 96f44f2f2d8cbcda254d0b193f5a1aece645daeb (patch) | |
| tree | 9d1975c4d76d7a577ecfe8e2fe9456738571528b /src/zenstore/compactcas.cpp | |
| parent | fix LogRemoteStoreStatsDetails (#53) (diff) | |
| download | zen-96f44f2f2d8cbcda254d0b193f5a1aece645daeb.tar.xz zen-96f44f2f2d8cbcda254d0b193f5a1aece645daeb.zip | |
InsertChunks for CAS store (#55)
- Improvement: Add batching when writing multiple small chunks to block store - decreases I/O load significantly on oplog import
Diffstat (limited to 'src/zenstore/compactcas.cpp')
| -rw-r--r-- | src/zenstore/compactcas.cpp | 58 |
1 files changed, 58 insertions, 0 deletions
diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp index 84905df15..ec2bfbdec 100644 --- a/src/zenstore/compactcas.cpp +++ b/src/zenstore/compactcas.cpp @@ -204,6 +204,64 @@ CasContainerStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) return InsertChunk(Chunk.Data(), Chunk.Size(), ChunkHash); } +std::vector<CasStore::InsertResult> +CasContainerStrategy::InsertChunks(std::span<IoBuffer> Chunks, std::span<IoHash> ChunkHashes) +{ + ZEN_ASSERT(Chunks.size() == ChunkHashes.size()); + std::vector<CasStore::InsertResult> Result(Chunks.size()); + std::vector<size_t> NewChunkIndexes; + Result.reserve(Chunks.size()); + { + RwLock::SharedLockScope _(m_LocationMapLock); + for (size_t ChunkIndex = 0; ChunkIndex < ChunkHashes.size(); ChunkIndex++) + { + const IoHash& ChunkHash = ChunkHashes[ChunkIndex]; + bool IsNew = !m_LocationMap.contains(ChunkHash); + Result[ChunkIndex] = CasStore::InsertResult{.New = IsNew}; + if (IsNew) + { + NewChunkIndexes.push_back(ChunkIndex); + } + } + } + + if (NewChunkIndexes.empty()) + { + return Result; + } + + std::vector<IoBuffer> Datas; + for (size_t ChunkIndex : NewChunkIndexes) + { + const IoBuffer& Chunk = Chunks[ChunkIndex]; +#if !ZEN_WITH_TESTS + ZEN_ASSERT(Chunk.GetContentType() == ZenContentType::kCompressedBinary); +#endif + Datas.emplace_back(Chunk); + } + + size_t ChunkOffset = 0; + m_BlockStore.WriteChunks(Datas, m_PayloadAlignment, [&](std::span<BlockStoreLocation> Locations) { + std::vector<CasDiskIndexEntry> IndexEntries; + for (const BlockStoreLocation& Location : Locations) + { + size_t ChunkIndex = NewChunkIndexes[ChunkOffset++]; + IndexEntries.emplace_back( + CasDiskIndexEntry{.Key = ChunkHashes[ChunkIndex], .Location = BlockStoreDiskLocation(Location, m_PayloadAlignment)}); + } + m_CasLog.Append(IndexEntries); + { + RwLock::ExclusiveLockScope _(m_LocationMapLock); + for (const CasDiskIndexEntry& DiskIndexEntry : IndexEntries) + { + m_LocationMap.emplace(DiskIndexEntry.Key, m_Locations.size()); + m_Locations.push_back(DiskIndexEntry.Location); + } + } + }); + return Result; +} + IoBuffer CasContainerStrategy::FindChunk(const IoHash& ChunkHash) { |