diff options
| author | Dan Engelbrecht <[email protected]> | 2025-11-17 16:48:57 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-11-17 16:48:57 +0100 |
| commit | ec4aa58a1bc03351f56325b04933d85dc6101135 (patch) | |
| tree | e6ed15dfee7316dd52ae989c0d883b419e79c3a7 /src/zenstore/blockstore.cpp | |
| parent | Include version string on the dashboard's start page. (#651) (diff) | |
| download | zen-ec4aa58a1bc03351f56325b04933d85dc6101135.tar.xz zen-ec4aa58a1bc03351f56325b04933d85dc6101135.zip | |
add append-only buffering of BlockStoreFile (#652)
* add append-only buffering of BlockStoreFile
replaces use of BasicFileWriter in Compact which bypassed cached position in BlockStore
Diffstat (limited to 'src/zenstore/blockstore.cpp')
| -rw-r--r-- | src/zenstore/blockstore.cpp | 133 |
1 files changed, 124 insertions, 9 deletions
diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp index 86aa234c3..d94643fa7 100644 --- a/src/zenstore/blockstore.cpp +++ b/src/zenstore/blockstore.cpp @@ -6,6 +6,7 @@ #include <zencore/except.h> #include <zencore/fmtutils.h> #include <zencore/logging.h> +#include <zencore/memory/memory.h> #include <zencore/scopeguard.h> #include <zencore/timer.h> #include <zencore/trace.h> @@ -219,6 +220,95 @@ BlockStoreFile::SetMetaData(const IoBuffer& Payload) return true; } +class BlockStoreFileAppender +{ +public: + BlockStoreFileAppender() = delete; + BlockStoreFileAppender(const BlockStoreFileAppender&) = delete; + BlockStoreFileAppender(BlockStoreFileAppender&&) = delete; + BlockStoreFileAppender& operator=(const BlockStoreFileAppender&) = delete; + + BlockStoreFileAppender(BlockStoreFile& BlockFile, uint64_t BufferSize) + : m_BlockFile(BlockFile) + , m_BufferSize(BufferSize) + , m_Buffer(nullptr) + { + m_Buffer = (uint8_t*)Memory::Alloc(m_BufferSize); + } + ~BlockStoreFileAppender() + { + Flush(); + Memory::Free(m_Buffer); + } + + uint64_t Append(const void* Data, uint64_t Size, uint64_t Alignment) + { + AlignWritePos(Alignment); + + if (Size >= m_BufferSize) + { + Flush(); + + const uint64_t WrittenPos = m_WritePos; + m_BlockFile.Write(m_Buffer, Size, m_BufferStart); + m_WritePos += Size; + m_BufferStart = m_WritePos; + return WrittenPos; + } + + { + const uint64_t PendingSize = m_WritePos - m_BufferStart; + if (PendingSize + Size > m_BufferSize) + { + Flush(); + } + } + + ZEN_ASSERT((m_WritePos - m_BufferStart) + Size <= m_BufferSize); + + const uint64_t WrittenPos = m_WritePos; + memmove(&m_Buffer[m_WritePos - m_BufferStart], Data, Size); + m_WritePos += Size; + return WrittenPos; + } + + void Flush() + { + uint64_t PendingSize = m_WritePos - m_BufferStart; + if (PendingSize > 0) + { + m_BlockFile.Write(m_Buffer, PendingSize, m_BufferStart); + m_BufferStart = m_WritePos; + } + } + +private: + void AlignWritePos(uint64_t Alignment) + { + const uint64_t AlignedWritePos = RoundUp(m_WritePos, Alignment); + const uint64_t Padding = AlignedWritePos - m_WritePos; + ZEN_ASSERT_SLOW(Padding <= m_BufferSize); + + if (Padding > 0) + { + const uint64_t PendingSize = m_WritePos - m_BufferStart; + if (PendingSize + Padding > m_BufferSize) + { + Flush(); + } + memset(&m_Buffer[m_WritePos - m_BufferStart], 0, Padding); + m_WritePos += Padding; + ZEN_ASSERT(m_WritePos == AlignedWritePos); + } + } + + BlockStoreFile& m_BlockFile; + const uint64_t m_BufferSize; + uint8_t* m_Buffer; + uint64_t m_BufferStart = 0; + uint64_t m_WritePos = 0; +}; + static bool IsMetaDataValid(const std::filesystem::path& BlockPath, const std::filesystem::path& MetaPath) { @@ -1081,10 +1171,12 @@ BlockStore::CompactBlocks(const BlockStoreCompactState& CompactState, uint64_t AddedSize = 0; uint64_t RemovedSize = 0; - Ref<BlockStoreFile> NewBlockFile; - uint64_t WriteOffset = m_MaxBlockSize + 1u; // Force detect a new block - std::unique_ptr<BasicFileWriter> TargetFileBuffer; - auto NewBlockFileGuard = MakeGuard([&]() { + Ref<BlockStoreFile> NewBlockFile; + uint64_t WriteOffset = m_MaxBlockSize + 1u; // Force detect a new block + + std::unique_ptr<BlockStoreFileAppender> TargetFileBuffer; + + auto NewBlockFileGuard = MakeGuard([&]() { TargetFileBuffer.reset(); if (NewBlockFile) { @@ -1226,6 +1318,13 @@ BlockStore::CompactBlocks(const BlockStoreCompactState& CompactState, LogPrefix, GetBlockPath(m_BlocksBasePath, NewBlockIndex).filename(), NiceBytes(NewBlockSize)); +#ifndef NDEBUG + for (const std::pair<size_t, BlockStoreLocation>& MovedChunk : MovedChunks) + { + ZEN_ASSERT(MovedChunk.second.BlockIndex == NewBlockIndex); + ZEN_ASSERT(MovedChunk.second.Offset + MovedChunk.second.Size <= NewBlockSize); + } +#endif // NDEBUG if (!ReportChanges()) { @@ -1295,13 +1394,11 @@ BlockStore::CompactBlocks(const BlockStoreCompactState& CompactState, NewBlockFile->Create(m_MaxBlockSize); NewBlockIndex = NextBlockIndex; WriteOffset = 0; - TargetFileBuffer = std::make_unique<BasicFileWriter>(NewBlockFile->GetBasicFile(), Min(256u * 1024u, m_MaxBlockSize)); + TargetFileBuffer = std::make_unique<BlockStoreFileAppender>(*NewBlockFile, Min(256u * 1024u, m_MaxBlockSize)); } const uint64_t OldWriteOffset = WriteOffset; - WriteOffset = TargetFileBuffer->AlignTo(PayloadAlignment); - - TargetFileBuffer->Write(ChunkView.GetData(), ChunkLocation.Size, WriteOffset); + WriteOffset = TargetFileBuffer->Append(ChunkView.GetData(), ChunkLocation.Size, PayloadAlignment); MovedChunks.push_back( {ChunkIndex, {.BlockIndex = NewBlockIndex, .Offset = gsl::narrow<uint32_t>(WriteOffset), .Size = ChunkLocation.Size}}); WriteOffset += ChunkLocation.Size; @@ -1321,6 +1418,17 @@ BlockStore::CompactBlocks(const BlockStoreCompactState& CompactState, { TargetFileBuffer->Flush(); } +#ifndef NDEBUG + if (NewBlockFile) + { + const uint64_t NewBlockSize = NewBlockFile->FileSize(); + for (const std::pair<size_t, BlockStoreLocation>& MovedChunk : MovedChunks) + { + ZEN_ASSERT(MovedChunk.second.BlockIndex == NewBlockIndex); + ZEN_ASSERT(MovedChunk.second.Offset + MovedChunk.second.Size <= NewBlockSize); + } + } +#endif // NDEBUG if (!ReportChanges()) { @@ -1352,10 +1460,17 @@ BlockStore::CompactBlocks(const BlockStoreCompactState& CompactState, { ZEN_ASSERT_SLOW(NewBlockFile->IsOpen()); NewBlockFile->Flush(); - uint64_t NewBlockSize = NewBlockFile->FileSize(); + const uint64_t NewBlockSize = NewBlockFile->FileSize(); MovedSize += NewBlockSize; NewBlockFile = nullptr; ZEN_INFO("{}wrote block {} ({})", LogPrefix, GetBlockPath(m_BlocksBasePath, NewBlockIndex).filename(), NiceBytes(NewBlockSize)); +#ifndef NDEBUG + for (const std::pair<size_t, BlockStoreLocation>& MovedChunk : MovedChunks) + { + ZEN_ASSERT(MovedChunk.second.BlockIndex == NewBlockIndex); + ZEN_ASSERT(MovedChunk.second.Offset + MovedChunk.second.Size <= NewBlockSize); + } +#endif // NDEBUG } ReportChanges(); |