diff options
| author | Dan Engelbrecht <[email protected]> | 2024-04-22 20:21:02 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2024-04-22 20:21:02 +0200 |
| commit | 96f44f2f2d8cbcda254d0b193f5a1aece645daeb (patch) | |
| tree | 9d1975c4d76d7a577ecfe8e2fe9456738571528b /src/zenstore/blockstore.cpp | |
| parent | fix LogRemoteStoreStatsDetails (#53) (diff) | |
| download | zen-96f44f2f2d8cbcda254d0b193f5a1aece645daeb.tar.xz zen-96f44f2f2d8cbcda254d0b193f5a1aece645daeb.zip | |
InsertChunks for CAS store (#55)
- Improvement: Add batching when writing multiple small chunks to block store - decreases I/O load significantly on oplog import
Diffstat (limited to 'src/zenstore/blockstore.cpp')
| -rw-r--r-- | src/zenstore/blockstore.cpp | 163 |
1 file changed, 163 insertions, 0 deletions
diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp index 644ddf7b4..8ef9e842f 100644 --- a/src/zenstore/blockstore.cpp +++ b/src/zenstore/blockstore.cpp @@ -453,6 +453,103 @@ BlockStore::WriteChunk(const void* Data, uint64_t Size, uint32_t Alignment, cons } } /* WriteChunks: copies a span of chunks into a staging buffer and writes them to block files in batches, one Write call per batch, then passes the resulting locations of each batch to Callback. Asserts that Datas is non-empty, Alignment is non-zero and every chunk is non-empty and no larger than m_MaxBlockSize. Throws std::runtime_error when no free block index is available. */ +void +BlockStore::WriteChunks(std::span<IoBuffer> Datas, uint32_t Alignment, const WriteChunksCallback& Callback) +{ + ZEN_TRACE_CPU("BlockStore::WriteChunks"); + + ZEN_ASSERT(!Datas.empty()); + ZEN_ASSERT(Alignment > 0u); + + const size_t TotalCount = Datas.size(); + uint64_t TotalSize = 0; + uint64_t LargestSize = 0; + for (const IoBuffer& Data : Datas) + { + uint64_t Size = Data.GetSize(); + ZEN_ASSERT(Size > 0); + ZEN_ASSERT(Size <= m_MaxBlockSize); + TotalSize += Size; + LargestSize = Max(LargestSize, Size); + } + /* Staging buffer size is the smaller of the total payload and max(largest chunk, 8 MiB), so a single chunk always fits. */ + const uint64_t MinSize = Max(LargestSize, 8u * 1024u * 1024u); + const uint64_t BufferSize = Min(TotalSize, MinSize); + std::vector<uint8_t> Buffer(BufferSize); + + size_t Offset = 0; + while (Offset < TotalCount) + { + size_t Count = 1; + uint32_t RangeSize = gsl::narrow<uint32_t>(Datas[Offset].GetSize()); + /* The insert lock is held while the target block and the offset range for this batch are reserved; it is released before the file write further down. */ + RwLock::ExclusiveLockScope InsertLock(m_InsertLock); + uint32_t WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire); + uint32_t AlignedInsertOffset = RoundUp(m_CurrentInsertOffset, Alignment); /* Start a new block file when there is no current write block or the first chunk of this batch does not fit in it. */ + if ((!m_WriteBlock) || ((AlignedInsertOffset + RangeSize) > m_MaxBlockSize)) + { + std::filesystem::path BlockPath; + WriteBlockIndex = GetFreeBlockIndex(WriteBlockIndex, InsertLock, BlockPath); + if (WriteBlockIndex == (uint32_t)m_MaxBlockCount) + { + throw std::runtime_error(fmt::format("unable to allocate a new block in '{}'", m_BlocksBasePath)); + } + Ref<BlockStoreFile> NewBlockFile(new BlockStoreFile(BlockPath)); + NewBlockFile->Create(m_MaxBlockSize); + + m_ChunkBlocks[WriteBlockIndex] = NewBlockFile; + m_WriteBlock = NewBlockFile; + m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release); + m_CurrentInsertOffset = 0; + AlignedInsertOffset
= 0; + } /* Extend the batch with the following chunks for as long as the alignment-padded range still fits in both the current block and the staging buffer. */ + while (Offset + Count < TotalCount) + { + uint32_t NextRangeSize = gsl::narrow<uint32_t>(RoundUp(RangeSize, Alignment) + Datas[Offset + Count].GetSize()); + if ((AlignedInsertOffset + NextRangeSize) > m_MaxBlockSize || (NextRangeSize > BufferSize)) + { + break; + } + Count++; + RangeSize = NextRangeSize; + } + m_CurrentInsertOffset = AlignedInsertOffset + RangeSize; + Ref<BlockStoreFile> WriteBlock = m_WriteBlock; /* NOTE(review): the entry pushed here is only erased at the end of this loop iteration; if Write or Callback throws it appears to stay in m_ActiveWriteBlocks - confirm whether a scope guard is needed. */ + m_ActiveWriteBlocks.push_back(WriteBlockIndex); + InsertLock.ReleaseNow(); + /* Copy each chunk of the batch into the staging buffer, stepping by the alignment-rounded chunk size (MidInline presumably advances the view start - confirm), then issue a single Write for the whole range. */ + { + MutableMemoryView WriteBuffer(Buffer.data(), RangeSize); + for (size_t Index = 0; Index < Count; Index++) + { + MemoryView SourceBuffer = Datas[Index + Offset]; + WriteBuffer.CopyFrom(SourceBuffer); + WriteBuffer.MidInline(RoundUp(SourceBuffer.GetSize(), Alignment)); + } + WriteBlock->Write(Buffer.data(), RangeSize, AlignedInsertOffset); + } + + m_TotalSize.fetch_add(RangeSize, std::memory_order::relaxed); + /* Recompute each chunk location with the same aligned stepping used for the copy above and hand the batch to the caller. */ + uint32_t ChunkOffset = AlignedInsertOffset; + std::vector<BlockStoreLocation> Locations(Count); + for (size_t Index = 0; Index < Count; Index++) + { + uint32_t ChunkSize = gsl::narrow<uint32_t>(Datas[Offset + Index].GetSize()); + Locations[Index] = {.BlockIndex = WriteBlockIndex, .Offset = ChunkOffset, .Size = ChunkSize}; + ChunkOffset += gsl::narrow<uint32_t>(RoundUp(ChunkSize, Alignment)); + } + Callback(Locations); + + { + RwLock::ExclusiveLockScope _(m_InsertLock); + m_ActiveWriteBlocks.erase(std::find(m_ActiveWriteBlocks.begin(), m_ActiveWriteBlocks.end(), WriteBlockIndex)); + } + + Offset += Count; + } +} + BlockStore::ReclaimSnapshotState BlockStore::GetReclaimSnapshotState() { @@ -1543,6 +1640,72 @@ namespace blockstore::impl { } // namespace blockstore::impl /* Test blockstore.multichunks: writes two batches through WriteChunks with alignment 4 and checks which chunks share a block index and that every chunk reads back unchanged. Initialize is given 128 and 1024 (presumably max block size and max block count - confirm). */ +TEST_CASE("blockstore.multichunks") +{ + using namespace blockstore::impl; + + ScopedTemporaryDirectory TempDir; + auto RootDirectory = TempDir.Path(); + + BlockStore Store; + Store.Initialize(RootDirectory, 128, 1024); + + std::vector<IoBuffer> MultiChunkData; + std::string FirstChunkData =
"0123456789012345678901234567890123456789012345678901234567890123"; + MultiChunkData.push_back(IoBuffer(IoBuffer::Wrap, FirstChunkData.data(), FirstChunkData.size())); + + std::string SecondChunkData = "12345678901234567890123456789012345678901234567890123456"; + MultiChunkData.push_back(IoBuffer(IoBuffer::Wrap, SecondChunkData.data(), SecondChunkData.size())); + + std::string ThirdChunkData = "789012345678901234567890123456789012345678901234567890"; + MultiChunkData.push_back(IoBuffer(IoBuffer::Wrap, ThirdChunkData.data(), ThirdChunkData.size())); + /* The first three chunks are 64, 56 and 54 bytes long. The callback may run more than once, so locations are appended at a running index. */ + BlockStoreLocation Locations[5]; + size_t ChunkOffset = 0; + Store.WriteChunks(MultiChunkData, 4, [&](std::span<BlockStoreLocation> InLocations) { + for (const BlockStoreLocation& Location : InLocations) + { + Locations[ChunkOffset++] = Location; + } + CHECK(ChunkOffset <= MultiChunkData.size()); + }); + CHECK(ChunkOffset == 3); + + CHECK(ReadChunkAsString(Store, Locations[0]) == FirstChunkData); + CHECK(Locations[1].BlockIndex == Locations[0].BlockIndex); + CHECK(ReadChunkAsString(Store, Locations[1]) == SecondChunkData); + CHECK(Locations[2].BlockIndex != Locations[1].BlockIndex); + CHECK(ReadChunkAsString(Store, Locations[2]) == ThirdChunkData); + /* Second batch: two more chunks written to the same store; their locations fill slots 3 and 4. */ + MultiChunkData.resize(0); + std::string FourthChunkData = "ABCDEFGHIJABCDEFGHIJ_23"; + MultiChunkData.push_back(IoBuffer(IoBuffer::Wrap, FourthChunkData.data(), FourthChunkData.size())); + + std::string FifthChunkData = + "ABCDEFGHIJABCDEFGHIJABCDEFGHIJABCDEFGHIJABCDEFGHIJABCDEFGHIJABCDEFGHIJABCDEFGHIJABCDEFGHIJABCDEFGHIJABCDEFGHIJABCDEFGHIJ_93"; + MultiChunkData.push_back(IoBuffer(IoBuffer::Wrap, FifthChunkData.data(), FifthChunkData.size())); + + Store.WriteChunks(MultiChunkData, 4, [&](std::span<BlockStoreLocation> InLocations) { + for (const BlockStoreLocation& Location : InLocations) + { + CHECK(ChunkOffset < 5); + Locations[ChunkOffset++] = Location; + } + }); + CHECK(ChunkOffset == 5); + /* Re-check the first batch after the second write, then the new chunks. */ + CHECK(ReadChunkAsString(Store, Locations[0]) == FirstChunkData); +
CHECK(Locations[1].BlockIndex == Locations[0].BlockIndex); + CHECK(ReadChunkAsString(Store, Locations[1]) == SecondChunkData); + CHECK(Locations[2].BlockIndex != Locations[1].BlockIndex); + CHECK(ReadChunkAsString(Store, Locations[2]) == ThirdChunkData); + /* The fourth chunk is expected in the same block as the third, the fifth in a different block. */ + CHECK(Locations[3].BlockIndex == Locations[2].BlockIndex); + CHECK(ReadChunkAsString(Store, Locations[3]) == FourthChunkData); + CHECK(Locations[4].BlockIndex != Locations[3].BlockIndex); + CHECK(ReadChunkAsString(Store, Locations[4]) == FifthChunkData); +} + TEST_CASE("blockstore.chunks") { using namespace blockstore::impl; |