about summary refs log tree commit diff
path: root/src/zenstore/blockstore.cpp
diff options
context:
space:
mode:
author    Dan Engelbrecht <[email protected]>  2024-04-22 20:21:02 +0200
committer GitHub Enterprise <[email protected]>  2024-04-22 20:21:02 +0200
commit    96f44f2f2d8cbcda254d0b193f5a1aece645daeb (patch)
tree      9d1975c4d76d7a577ecfe8e2fe9456738571528b /src/zenstore/blockstore.cpp
parent    fix LogRemoteStoreStatsDetails (#53) (diff)
download  zen-96f44f2f2d8cbcda254d0b193f5a1aece645daeb.tar.xz
          zen-96f44f2f2d8cbcda254d0b193f5a1aece645daeb.zip
InsertChunks for CAS store (#55)
- Improvement: Add batching when writing multiple small chunks to block store - decreases I/O load significantly on oplog import
Diffstat (limited to 'src/zenstore/blockstore.cpp')
-rw-r--r--  src/zenstore/blockstore.cpp  163
1 file changed, 163 insertions, 0 deletions
diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp
index 644ddf7b4..8ef9e842f 100644
--- a/src/zenstore/blockstore.cpp
+++ b/src/zenstore/blockstore.cpp
@@ -453,6 +453,103 @@ BlockStore::WriteChunk(const void* Data, uint64_t Size, uint32_t Alignment, cons
}
}
+// Batched chunk write: packs runs of consecutive chunks into a single aligned
+// region of the current write block and issues one file Write per region
+// instead of one per chunk (added to cut I/O load during oplog import).
+//
+// Datas     - non-empty list of chunk payloads; each chunk must be non-empty
+//             and must fit in a single block (<= m_MaxBlockSize).
+// Alignment - required alignment (> 0) of each chunk's start offset within a
+//             block; padding between packed chunks comes from rounding up.
+// Callback  - invoked once per packed region with the BlockStoreLocation of
+//             every chunk in that region, in input order.
+//
+// Throws std::runtime_error if no free block can be allocated.
+void
+BlockStore::WriteChunks(std::span<IoBuffer> Datas, uint32_t Alignment, const WriteChunksCallback& Callback)
+{
+ ZEN_TRACE_CPU("BlockStore::WriteChunks");
+
+ ZEN_ASSERT(!Datas.empty());
+ ZEN_ASSERT(Alignment > 0u);
+
+ // First pass: total payload size and largest single chunk, used only to
+ // size the staging buffer below.
+ const size_t TotalCount = Datas.size();
+ uint64_t TotalSize = 0;
+ uint64_t LargestSize = 0;
+ for (const IoBuffer& Data : Datas)
+ {
+ uint64_t Size = Data.GetSize();
+ ZEN_ASSERT(Size > 0);
+ ZEN_ASSERT(Size <= m_MaxBlockSize);
+ TotalSize += Size;
+ LargestSize = Max(LargestSize, Size);
+ }
+
+ // Staging buffer: at least the largest chunk, capped at 8 MiB (or at the
+ // total payload when everything fits in less than that).
+ // NOTE(review): TotalSize excludes inter-chunk alignment padding; regions
+ // are therefore additionally capped by BufferSize in the packing loop.
+ const uint64_t MinSize = Max(LargestSize, 8u * 1024u * 1024u);
+ const uint64_t BufferSize = Min(TotalSize, MinSize);
+ std::vector<uint8_t> Buffer(BufferSize);
+
+ // Each iteration packs Datas[Offset .. Offset+Count) into one contiguous
+ // aligned region of a block and writes it with a single Write call.
+ size_t Offset = 0;
+ while (Offset < TotalCount)
+ {
+ size_t Count = 1;
+ uint32_t RangeSize = gsl::narrow<uint32_t>(Datas[Offset].GetSize());
+
+ // Reserve space in the current write block under the insert lock.
+ RwLock::ExclusiveLockScope InsertLock(m_InsertLock);
+ uint32_t WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire);
+ uint32_t AlignedInsertOffset = RoundUp(m_CurrentInsertOffset, Alignment);
+ if ((!m_WriteBlock) || ((AlignedInsertOffset + RangeSize) > m_MaxBlockSize))
+ {
+ // No current block, or it cannot hold even the region's first chunk:
+ // allocate a fresh block and restart the insert offset at zero.
+ std::filesystem::path BlockPath;
+ WriteBlockIndex = GetFreeBlockIndex(WriteBlockIndex, InsertLock, BlockPath);
+ if (WriteBlockIndex == (uint32_t)m_MaxBlockCount)
+ {
+ throw std::runtime_error(fmt::format("unable to allocate a new block in '{}'", m_BlocksBasePath));
+ }
+ Ref<BlockStoreFile> NewBlockFile(new BlockStoreFile(BlockPath));
+ NewBlockFile->Create(m_MaxBlockSize);
+
+ m_ChunkBlocks[WriteBlockIndex] = NewBlockFile;
+ m_WriteBlock = NewBlockFile;
+ m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release);
+ m_CurrentInsertOffset = 0;
+ AlignedInsertOffset = 0;
+ }
+ // Greedily extend the region with following chunks while they still fit
+ // in both the remaining block space and the staging buffer; each appended
+ // chunk starts at the alignment-rounded end of the previous one.
+ while (Offset + Count < TotalCount)
+ {
+ uint32_t NextRangeSize = gsl::narrow<uint32_t>(RoundUp(RangeSize, Alignment) + Datas[Offset + Count].GetSize());
+ if ((AlignedInsertOffset + NextRangeSize) > m_MaxBlockSize || (NextRangeSize > BufferSize))
+ {
+ break;
+ }
+ Count++;
+ RangeSize = NextRangeSize;
+ }
+ // Commit the reservation and record the block as having an in-flight
+ // write (presumably to shield it from reclaim - matched by the erase
+ // below), then drop the lock before doing the actual I/O.
+ m_CurrentInsertOffset = AlignedInsertOffset + RangeSize;
+ Ref<BlockStoreFile> WriteBlock = m_WriteBlock;
+ m_ActiveWriteBlocks.push_back(WriteBlockIndex);
+ InsertLock.ReleaseNow();
+
+ {
+ // Pack the region's chunks, aligned, into the staging buffer and write
+ // them out in one call; MidInline advances the destination view by the
+ // alignment-rounded size of the chunk just copied.
+ MutableMemoryView WriteBuffer(Buffer.data(), RangeSize);
+ for (size_t Index = 0; Index < Count; Index++)
+ {
+ MemoryView SourceBuffer = Datas[Index + Offset];
+ WriteBuffer.CopyFrom(SourceBuffer);
+ WriteBuffer.MidInline(RoundUp(SourceBuffer.GetSize(), Alignment));
+ }
+ WriteBlock->Write(Buffer.data(), RangeSize, AlignedInsertOffset);
+ }
+
+ // NOTE(review): RangeSize includes inter-chunk alignment padding, so the
+ // padding bytes are counted in m_TotalSize - confirm this matches the
+ // single-chunk WriteChunk accounting.
+ m_TotalSize.fetch_add(RangeSize, std::memory_order::relaxed);
+
+ // Report the final location of every chunk in this region to the caller.
+ uint32_t ChunkOffset = AlignedInsertOffset;
+ std::vector<BlockStoreLocation> Locations(Count);
+ for (size_t Index = 0; Index < Count; Index++)
+ {
+ uint32_t ChunkSize = gsl::narrow<uint32_t>(Datas[Offset + Index].GetSize());
+ Locations[Index] = {.BlockIndex = WriteBlockIndex, .Offset = ChunkOffset, .Size = ChunkSize};
+ ChunkOffset += gsl::narrow<uint32_t>(RoundUp(ChunkSize, Alignment));
+ }
+ Callback(Locations);
+
+ {
+ // Write finished - the block no longer has an in-flight write.
+ RwLock::ExclusiveLockScope _(m_InsertLock);
+ m_ActiveWriteBlocks.erase(std::find(m_ActiveWriteBlocks.begin(), m_ActiveWriteBlocks.end(), WriteBlockIndex));
+ }
+
+ Offset += Count;
+ }
+}
+
BlockStore::ReclaimSnapshotState
BlockStore::GetReclaimSnapshotState()
{
@@ -1543,6 +1640,72 @@ namespace blockstore::impl {
} // namespace blockstore::impl
+// Exercises BlockStore::WriteChunks: chunks written in one call are packed
+// into the current block until they no longer fit (a new block is then
+// started), and a later call resumes in the block the previous call left off.
+TEST_CASE("blockstore.multichunks")
+{
+ using namespace blockstore::impl;
+
+ ScopedTemporaryDirectory TempDir;
+ auto RootDirectory = TempDir.Path();
+
+ BlockStore Store;
+ // NOTE(review): arguments look like (root, max block size, max block count)
+ // - a tiny 128-byte block so these small chunks force block roll-over.
+ Store.Initialize(RootDirectory, 128, 1024);
+
+ // Three chunks of 64, 56 and 54 bytes with alignment 4: 64 + 56 = 120 fits
+ // in one 128-byte block; the 54-byte chunk must start a second block.
+ std::vector<IoBuffer> MultiChunkData;
+ std::string FirstChunkData = "0123456789012345678901234567890123456789012345678901234567890123";
+ MultiChunkData.push_back(IoBuffer(IoBuffer::Wrap, FirstChunkData.data(), FirstChunkData.size()));
+
+ std::string SecondChunkData = "12345678901234567890123456789012345678901234567890123456";
+ MultiChunkData.push_back(IoBuffer(IoBuffer::Wrap, SecondChunkData.data(), SecondChunkData.size()));
+
+ std::string ThirdChunkData = "789012345678901234567890123456789012345678901234567890";
+ MultiChunkData.push_back(IoBuffer(IoBuffer::Wrap, ThirdChunkData.data(), ThirdChunkData.size()));
+
+ // The callback may fire more than once (once per packed region), so the
+ // locations are accumulated across invocations.
+ BlockStoreLocation Locations[5];
+ size_t ChunkOffset = 0;
+ Store.WriteChunks(MultiChunkData, 4, [&](std::span<BlockStoreLocation> InLocations) {
+ for (const BlockStoreLocation& Location : InLocations)
+ {
+ Locations[ChunkOffset++] = Location;
+ }
+ CHECK(ChunkOffset <= MultiChunkData.size());
+ });
+ CHECK(ChunkOffset == 3);
+
+ // Chunks 0 and 1 share a block; chunk 2 spilled into a new block.
+ CHECK(ReadChunkAsString(Store, Locations[0]) == FirstChunkData);
+ CHECK(Locations[1].BlockIndex == Locations[0].BlockIndex);
+ CHECK(ReadChunkAsString(Store, Locations[1]) == SecondChunkData);
+ CHECK(Locations[2].BlockIndex != Locations[1].BlockIndex);
+ CHECK(ReadChunkAsString(Store, Locations[2]) == ThirdChunkData);
+
+ // Second call: a 23-byte chunk still fits after chunk 2 in the current
+ // block, while the 123-byte chunk needs a fresh block of its own.
+ MultiChunkData.resize(0);
+ std::string FourthChunkData = "ABCDEFGHIJABCDEFGHIJ_23";
+ MultiChunkData.push_back(IoBuffer(IoBuffer::Wrap, FourthChunkData.data(), FourthChunkData.size()));
+
+ std::string FifthChunkData =
+ "ABCDEFGHIJABCDEFGHIJABCDEFGHIJABCDEFGHIJABCDEFGHIJABCDEFGHIJABCDEFGHIJABCDEFGHIJABCDEFGHIJABCDEFGHIJABCDEFGHIJABCDEFGHIJ_93";
+ MultiChunkData.push_back(IoBuffer(IoBuffer::Wrap, FifthChunkData.data(), FifthChunkData.size()));
+
+ Store.WriteChunks(MultiChunkData, 4, [&](std::span<BlockStoreLocation> InLocations) {
+ for (const BlockStoreLocation& Location : InLocations)
+ {
+ CHECK(ChunkOffset < 5);
+ Locations[ChunkOffset++] = Location;
+ }
+ });
+ CHECK(ChunkOffset == 5);
+
+ // Earlier chunks are still readable after the second batch.
+ CHECK(ReadChunkAsString(Store, Locations[0]) == FirstChunkData);
+ CHECK(Locations[1].BlockIndex == Locations[0].BlockIndex);
+ CHECK(ReadChunkAsString(Store, Locations[1]) == SecondChunkData);
+ CHECK(Locations[2].BlockIndex != Locations[1].BlockIndex);
+ CHECK(ReadChunkAsString(Store, Locations[2]) == ThirdChunkData);
+
+ // Chunk 3 landed in chunk 2's block; chunk 4 got its own block.
+ CHECK(Locations[3].BlockIndex == Locations[2].BlockIndex);
+ CHECK(ReadChunkAsString(Store, Locations[3]) == FourthChunkData);
+ CHECK(Locations[4].BlockIndex != Locations[3].BlockIndex);
+ CHECK(ReadChunkAsString(Store, Locations[4]) == FifthChunkData);
+}
+
TEST_CASE("blockstore.chunks")
{
using namespace blockstore::impl;