diff options
| author | Dan Engelbrecht <[email protected]> | 2024-04-22 20:21:02 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2024-04-22 20:21:02 +0200 |
| commit | 96f44f2f2d8cbcda254d0b193f5a1aece645daeb (patch) | |
| tree | 9d1975c4d76d7a577ecfe8e2fe9456738571528b /src/zenstore/cas.cpp | |
| parent | fix LogRemoteStoreStatsDetails (#53) (diff) | |
| download | zen-96f44f2f2d8cbcda254d0b193f5a1aece645daeb.tar.xz zen-96f44f2f2d8cbcda254d0b193f5a1aece645daeb.zip | |
InsertChunks for CAS store (#55)
- Improvement: Add batching when writing multiple small chunks to block store - decreases I/O load significantly on oplog import
Diffstat (limited to 'src/zenstore/cas.cpp')
| -rw-r--r-- | src/zenstore/cas.cpp | 123 |
1 files changed, 114 insertions, 9 deletions
diff --git a/src/zenstore/cas.cpp b/src/zenstore/cas.cpp index a1a4a0acc..2fe466028 100644 --- a/src/zenstore/cas.cpp +++ b/src/zenstore/cas.cpp @@ -49,15 +49,18 @@ public: CasImpl(GcManager& Gc); virtual ~CasImpl(); - virtual void Initialize(const CidStoreConfiguration& InConfig) override; - virtual CasStore::InsertResult InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, InsertMode Mode) override; - virtual IoBuffer FindChunk(const IoHash& ChunkHash) override; - virtual bool ContainsChunk(const IoHash& ChunkHash) override; - virtual void FilterChunks(HashKeySet& InOutChunks) override; - virtual void Flush() override; - virtual void ScrubStorage(ScrubContext& Ctx) override; - virtual void GarbageCollect(GcContext& GcCtx) override; - virtual CidStoreSize TotalSize() const override; + virtual void Initialize(const CidStoreConfiguration& InConfig) override; + virtual CasStore::InsertResult InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, InsertMode Mode) override; + virtual std::vector<InsertResult> InsertChunks(std::span<IoBuffer> Data, + std::span<IoHash> ChunkHashes, + InsertMode Mode = InsertMode::kMayBeMovedInPlace) override; + virtual IoBuffer FindChunk(const IoHash& ChunkHash) override; + virtual bool ContainsChunk(const IoHash& ChunkHash) override; + virtual void FilterChunks(HashKeySet& InOutChunks) override; + virtual void Flush() override; + virtual void ScrubStorage(ScrubContext& Ctx) override; + virtual void GarbageCollect(GcContext& GcCtx) override; + virtual CidStoreSize TotalSize() const override; private: CasContainerStrategy m_TinyStrategy; @@ -250,6 +253,108 @@ CasImpl::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, InsertMode Mode) return m_LargeStrategy.InsertChunk(Chunk, ChunkHash, Mode); } +static void +GetCompactCasResults(CasContainerStrategy& Strategy, + std::span<IoBuffer> Data, + std::span<IoHash> ChunkHashes, + std::span<size_t> Indexes, + std::vector<CasStore::InsertResult> Results) +{ + const size_t Count = Indexes.size(); + if (Count == 1) + { + const size_t Index = Indexes[0]; + Results[Index] = Strategy.InsertChunk(Data[Index], ChunkHashes[Index]); + return; + } + std::vector<IoBuffer> Chunks; + std::vector<IoHash> Hashes; + Chunks.reserve(Count); + Hashes.reserve(Count); + for (size_t Index : Indexes) + { + Chunks.push_back(Data[Index]); + Hashes.push_back(ChunkHashes[Index]); + } + + Strategy.InsertChunks(Chunks, Hashes); + + for (size_t Offset = 0; Offset < Count; Offset++) + { + size_t Index = Indexes[Offset]; + Results[Index] = Results[Offset]; + } +}; + +static void +GetFileCasResults(FileCasStrategy& Strategy, + CasStore::InsertMode Mode, + std::span<IoBuffer> Data, + std::span<IoHash> ChunkHashes, + std::span<size_t> Indexes, + std::vector<CasStore::InsertResult> Results) +{ + for (size_t Index : Indexes) + { + Results[Index] = Strategy.InsertChunk(Data[Index], ChunkHashes[Index], Mode); + } +}; + +std::vector<CasStore::InsertResult> +CasImpl::InsertChunks(std::span<IoBuffer> Data, std::span<IoHash> ChunkHashes, CasStore::InsertMode Mode) +{ + ZEN_TRACE_CPU("CAS::InsertChunks"); + ZEN_ASSERT(Data.size() == ChunkHashes.size()); + + if (Data.size() == 1) + { + std::vector<CasStore::InsertResult> Result(1); + Result[0] = InsertChunk(Data[0], ChunkHashes[0], Mode); + return Result; + } + + std::vector<size_t> TinyIndexes; + std::vector<size_t> SmallIndexes; + std::vector<size_t> LargeIndexes; + + for (size_t Index = 0; Index < Data.size(); Index++) + { + const uint64_t ChunkSize = Data[Index].Size(); + if (ChunkSize < m_Config.TinyValueThreshold) + { + ZEN_ASSERT(ChunkSize); + TinyIndexes.push_back(Index); + } + else if (ChunkSize < m_Config.HugeValueThreshold) + { + SmallIndexes.push_back(Index); + } + else + { + LargeIndexes.push_back(Index); + } + } + + std::vector<CasStore::InsertResult> Result(Data.size()); + + if (!TinyIndexes.empty()) + { + GetCompactCasResults(m_TinyStrategy, Data, ChunkHashes, TinyIndexes, Result); + } + + if (!SmallIndexes.empty()) + { + GetCompactCasResults(m_SmallStrategy, Data, ChunkHashes, SmallIndexes, Result); + } + + if (!LargeIndexes.empty()) + { + GetFileCasResults(m_LargeStrategy, Mode, Data, ChunkHashes, LargeIndexes, Result); + } + + return Result; +} + IoBuffer CasImpl::FindChunk(const IoHash& ChunkHash) { |