aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore/compactcas.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2024-04-22 20:21:02 +0200
committerGitHub Enterprise <[email protected]>2024-04-22 20:21:02 +0200
commit96f44f2f2d8cbcda254d0b193f5a1aece645daeb (patch)
tree9d1975c4d76d7a577ecfe8e2fe9456738571528b /src/zenstore/compactcas.cpp
parentfix LogRemoteStoreStatsDetails (#53) (diff)
downloadzen-96f44f2f2d8cbcda254d0b193f5a1aece645daeb.tar.xz
zen-96f44f2f2d8cbcda254d0b193f5a1aece645daeb.zip
InsertChunks for CAS store (#55)
- Improvement: Add batching when writing multiple small chunks to block store - decreases I/O load significantly on oplog import
Diffstat (limited to 'src/zenstore/compactcas.cpp')
-rw-r--r--src/zenstore/compactcas.cpp58
1 files changed, 58 insertions, 0 deletions
diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp
index 84905df15..ec2bfbdec 100644
--- a/src/zenstore/compactcas.cpp
+++ b/src/zenstore/compactcas.cpp
@@ -204,6 +204,64 @@ CasContainerStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash)
return InsertChunk(Chunk.Data(), Chunk.Size(), ChunkHash);
}
+std::vector<CasStore::InsertResult>
+CasContainerStrategy::InsertChunks(std::span<IoBuffer> Chunks, std::span<IoHash> ChunkHashes)
+{
+ ZEN_ASSERT(Chunks.size() == ChunkHashes.size());
+ std::vector<CasStore::InsertResult> Result(Chunks.size());
+ std::vector<size_t> NewChunkIndexes;
+ Result.reserve(Chunks.size());
+ {
+ RwLock::SharedLockScope _(m_LocationMapLock);
+ for (size_t ChunkIndex = 0; ChunkIndex < ChunkHashes.size(); ChunkIndex++)
+ {
+ const IoHash& ChunkHash = ChunkHashes[ChunkIndex];
+ bool IsNew = !m_LocationMap.contains(ChunkHash);
+ Result[ChunkIndex] = CasStore::InsertResult{.New = IsNew};
+ if (IsNew)
+ {
+ NewChunkIndexes.push_back(ChunkIndex);
+ }
+ }
+ }
+
+ if (NewChunkIndexes.empty())
+ {
+ return Result;
+ }
+
+ std::vector<IoBuffer> Datas;
+ for (size_t ChunkIndex : NewChunkIndexes)
+ {
+ const IoBuffer& Chunk = Chunks[ChunkIndex];
+#if !ZEN_WITH_TESTS
+ ZEN_ASSERT(Chunk.GetContentType() == ZenContentType::kCompressedBinary);
+#endif
+ Datas.emplace_back(Chunk);
+ }
+
+ size_t ChunkOffset = 0;
+ m_BlockStore.WriteChunks(Datas, m_PayloadAlignment, [&](std::span<BlockStoreLocation> Locations) {
+ std::vector<CasDiskIndexEntry> IndexEntries;
+ for (const BlockStoreLocation& Location : Locations)
+ {
+ size_t ChunkIndex = NewChunkIndexes[ChunkOffset++];
+ IndexEntries.emplace_back(
+ CasDiskIndexEntry{.Key = ChunkHashes[ChunkIndex], .Location = BlockStoreDiskLocation(Location, m_PayloadAlignment)});
+ }
+ m_CasLog.Append(IndexEntries);
+ {
+ RwLock::ExclusiveLockScope _(m_LocationMapLock);
+ for (const CasDiskIndexEntry& DiskIndexEntry : IndexEntries)
+ {
+ m_LocationMap.emplace(DiskIndexEntry.Key, m_Locations.size());
+ m_Locations.push_back(DiskIndexEntry.Location);
+ }
+ }
+ });
+ return Result;
+}
+
IoBuffer
CasContainerStrategy::FindChunk(const IoHash& ChunkHash)
{