aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore/cas.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2024-04-22 20:21:02 +0200
committerGitHub Enterprise <[email protected]>2024-04-22 20:21:02 +0200
commit96f44f2f2d8cbcda254d0b193f5a1aece645daeb (patch)
tree9d1975c4d76d7a577ecfe8e2fe9456738571528b /src/zenstore/cas.cpp
parentfix LogRemoteStoreStatsDetails (#53) (diff)
downloadzen-96f44f2f2d8cbcda254d0b193f5a1aece645daeb.tar.xz
zen-96f44f2f2d8cbcda254d0b193f5a1aece645daeb.zip
InsertChunks for CAS store (#55)
- Improvement: Add batching when writing multiple small chunks to block store - decreases I/O load significantly on oplog import
Diffstat (limited to 'src/zenstore/cas.cpp')
-rw-r--r--src/zenstore/cas.cpp123
1 files changed, 114 insertions, 9 deletions
diff --git a/src/zenstore/cas.cpp b/src/zenstore/cas.cpp
index a1a4a0acc..2fe466028 100644
--- a/src/zenstore/cas.cpp
+++ b/src/zenstore/cas.cpp
@@ -49,15 +49,18 @@ public:
CasImpl(GcManager& Gc);
virtual ~CasImpl();
- virtual void Initialize(const CidStoreConfiguration& InConfig) override;
- virtual CasStore::InsertResult InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, InsertMode Mode) override;
- virtual IoBuffer FindChunk(const IoHash& ChunkHash) override;
- virtual bool ContainsChunk(const IoHash& ChunkHash) override;
- virtual void FilterChunks(HashKeySet& InOutChunks) override;
- virtual void Flush() override;
- virtual void ScrubStorage(ScrubContext& Ctx) override;
- virtual void GarbageCollect(GcContext& GcCtx) override;
- virtual CidStoreSize TotalSize() const override;
+ virtual void Initialize(const CidStoreConfiguration& InConfig) override;
+ virtual CasStore::InsertResult InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, InsertMode Mode) override;
+ virtual std::vector<InsertResult> InsertChunks(std::span<IoBuffer> Data,
+ std::span<IoHash> ChunkHashes,
+ InsertMode Mode = InsertMode::kMayBeMovedInPlace) override;
+ virtual IoBuffer FindChunk(const IoHash& ChunkHash) override;
+ virtual bool ContainsChunk(const IoHash& ChunkHash) override;
+ virtual void FilterChunks(HashKeySet& InOutChunks) override;
+ virtual void Flush() override;
+ virtual void ScrubStorage(ScrubContext& Ctx) override;
+ virtual void GarbageCollect(GcContext& GcCtx) override;
+ virtual CidStoreSize TotalSize() const override;
private:
CasContainerStrategy m_TinyStrategy;
@@ -250,6 +253,108 @@ CasImpl::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, InsertMode Mode)
return m_LargeStrategy.InsertChunk(Chunk, ChunkHash, Mode);
}
+static void
+GetCompactCasResults(CasContainerStrategy& Strategy,
+ std::span<IoBuffer> Data,
+ std::span<IoHash> ChunkHashes,
+ std::span<size_t> Indexes,
+ std::vector<CasStore::InsertResult> Results)
+{
+ const size_t Count = Indexes.size();
+ if (Count == 1)
+ {
+ const size_t Index = Indexes[0];
+ Results[Index] = Strategy.InsertChunk(Data[Index], ChunkHashes[Index]);
+ return;
+ }
+ std::vector<IoBuffer> Chunks;
+ std::vector<IoHash> Hashes;
+ Chunks.reserve(Count);
+ Hashes.reserve(Count);
+ for (size_t Index : Indexes)
+ {
+ Chunks.push_back(Data[Index]);
+ Hashes.push_back(ChunkHashes[Index]);
+ }
+
+ Strategy.InsertChunks(Chunks, Hashes);
+
+ for (size_t Offset = 0; Offset < Count; Offset++)
+ {
+ size_t Index = Indexes[Offset];
+ Results[Index] = Results[Offset];
+ }
+};
+
+static void
+GetFileCasResults(FileCasStrategy& Strategy,
+ CasStore::InsertMode Mode,
+ std::span<IoBuffer> Data,
+ std::span<IoHash> ChunkHashes,
+ std::span<size_t> Indexes,
+ std::vector<CasStore::InsertResult> Results)
+{
+ for (size_t Index : Indexes)
+ {
+ Results[Index] = Strategy.InsertChunk(Data[Index], ChunkHashes[Index], Mode);
+ }
+};
+
+std::vector<CasStore::InsertResult>
+CasImpl::InsertChunks(std::span<IoBuffer> Data, std::span<IoHash> ChunkHashes, CasStore::InsertMode Mode)
+{
+ ZEN_TRACE_CPU("CAS::InsertChunks");
+ ZEN_ASSERT(Data.size() == ChunkHashes.size());
+
+ if (Data.size() == 1)
+ {
+ std::vector<CasStore::InsertResult> Result(1);
+ Result[0] = InsertChunk(Data[0], ChunkHashes[0], Mode);
+ return Result;
+ }
+
+ std::vector<size_t> TinyIndexes;
+ std::vector<size_t> SmallIndexes;
+ std::vector<size_t> LargeIndexes;
+
+ for (size_t Index = 0; Index < Data.size(); Index++)
+ {
+ const uint64_t ChunkSize = Data[Index].Size();
+ if (ChunkSize < m_Config.TinyValueThreshold)
+ {
+ ZEN_ASSERT(ChunkSize);
+ TinyIndexes.push_back(Index);
+ }
+ else if (ChunkSize < m_Config.HugeValueThreshold)
+ {
+ SmallIndexes.push_back(Index);
+ }
+ else
+ {
+ LargeIndexes.push_back(Index);
+ }
+ }
+
+ std::vector<CasStore::InsertResult> Result(Data.size());
+
+ if (!TinyIndexes.empty())
+ {
+ GetCompactCasResults(m_TinyStrategy, Data, ChunkHashes, TinyIndexes, Result);
+ }
+
+ if (!SmallIndexes.empty())
+ {
+ GetCompactCasResults(m_SmallStrategy, Data, ChunkHashes, SmallIndexes, Result);
+ }
+
+ if (!LargeIndexes.empty())
+ {
+ GetFileCasResults(m_LargeStrategy, Mode, Data, ChunkHashes, LargeIndexes, Result);
+ }
+
+ return Result;
+}
+
IoBuffer
CasImpl::FindChunk(const IoHash& ChunkHash)
{