aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore/blockstore.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-11-17 16:48:57 +0100
committerGitHub Enterprise <[email protected]>2025-11-17 16:48:57 +0100
commitec4aa58a1bc03351f56325b04933d85dc6101135 (patch)
treee6ed15dfee7316dd52ae989c0d883b419e79c3a7 /src/zenstore/blockstore.cpp
parentInclude version string on the dashboard's start page. (#651) (diff)
downloadzen-ec4aa58a1bc03351f56325b04933d85dc6101135.tar.xz
zen-ec4aa58a1bc03351f56325b04933d85dc6101135.zip
add append-only buffering of BlockStoreFile (#652)
* add append-only buffering of BlockStoreFile replaces use of BasicFileWriter in Compact which bypassed cached position in BlockStore
Diffstat (limited to 'src/zenstore/blockstore.cpp')
-rw-r--r--src/zenstore/blockstore.cpp133
1 files changed, 124 insertions, 9 deletions
diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp
index 86aa234c3..d94643fa7 100644
--- a/src/zenstore/blockstore.cpp
+++ b/src/zenstore/blockstore.cpp
@@ -6,6 +6,7 @@
#include <zencore/except.h>
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
+#include <zencore/memory/memory.h>
#include <zencore/scopeguard.h>
#include <zencore/timer.h>
#include <zencore/trace.h>
@@ -219,6 +220,95 @@ BlockStoreFile::SetMetaData(const IoBuffer& Payload)
return true;
}
+class BlockStoreFileAppender
+{
+public:
+ BlockStoreFileAppender() = delete;
+ BlockStoreFileAppender(const BlockStoreFileAppender&) = delete;
+ BlockStoreFileAppender(BlockStoreFileAppender&&) = delete;
+ BlockStoreFileAppender& operator=(const BlockStoreFileAppender&) = delete;
+
+ BlockStoreFileAppender(BlockStoreFile& BlockFile, uint64_t BufferSize)
+ : m_BlockFile(BlockFile)
+ , m_BufferSize(BufferSize)
+ , m_Buffer(nullptr)
+ {
+ m_Buffer = (uint8_t*)Memory::Alloc(m_BufferSize);
+ }
+ ~BlockStoreFileAppender()
+ {
+ Flush();
+ Memory::Free(m_Buffer);
+ }
+
+ uint64_t Append(const void* Data, uint64_t Size, uint64_t Alignment)
+ {
+ AlignWritePos(Alignment);
+
+ if (Size >= m_BufferSize)
+ {
+ Flush();
+
+ const uint64_t WrittenPos = m_WritePos;
+ m_BlockFile.Write(m_Buffer, Size, m_BufferStart);
+ m_WritePos += Size;
+ m_BufferStart = m_WritePos;
+ return WrittenPos;
+ }
+
+ {
+ const uint64_t PendingSize = m_WritePos - m_BufferStart;
+ if (PendingSize + Size > m_BufferSize)
+ {
+ Flush();
+ }
+ }
+
+ ZEN_ASSERT((m_WritePos - m_BufferStart) + Size <= m_BufferSize);
+
+ const uint64_t WrittenPos = m_WritePos;
+ memmove(&m_Buffer[m_WritePos - m_BufferStart], Data, Size);
+ m_WritePos += Size;
+ return WrittenPos;
+ }
+
+ void Flush()
+ {
+ uint64_t PendingSize = m_WritePos - m_BufferStart;
+ if (PendingSize > 0)
+ {
+ m_BlockFile.Write(m_Buffer, PendingSize, m_BufferStart);
+ m_BufferStart = m_WritePos;
+ }
+ }
+
+private:
+ void AlignWritePos(uint64_t Alignment)
+ {
+ const uint64_t AlignedWritePos = RoundUp(m_WritePos, Alignment);
+ const uint64_t Padding = AlignedWritePos - m_WritePos;
+ ZEN_ASSERT_SLOW(Padding <= m_BufferSize);
+
+ if (Padding > 0)
+ {
+ const uint64_t PendingSize = m_WritePos - m_BufferStart;
+ if (PendingSize + Padding > m_BufferSize)
+ {
+ Flush();
+ }
+ memset(&m_Buffer[m_WritePos - m_BufferStart], 0, Padding);
+ m_WritePos += Padding;
+ ZEN_ASSERT(m_WritePos == AlignedWritePos);
+ }
+ }
+
+ BlockStoreFile& m_BlockFile;
+ const uint64_t m_BufferSize;
+ uint8_t* m_Buffer;
+ uint64_t m_BufferStart = 0;
+ uint64_t m_WritePos = 0;
+};
+
static bool
IsMetaDataValid(const std::filesystem::path& BlockPath, const std::filesystem::path& MetaPath)
{
@@ -1081,10 +1171,12 @@ BlockStore::CompactBlocks(const BlockStoreCompactState& CompactState,
uint64_t AddedSize = 0;
uint64_t RemovedSize = 0;
- Ref<BlockStoreFile> NewBlockFile;
- uint64_t WriteOffset = m_MaxBlockSize + 1u; // Force detect a new block
- std::unique_ptr<BasicFileWriter> TargetFileBuffer;
- auto NewBlockFileGuard = MakeGuard([&]() {
+ Ref<BlockStoreFile> NewBlockFile;
+ uint64_t WriteOffset = m_MaxBlockSize + 1u; // Force detect a new block
+
+ std::unique_ptr<BlockStoreFileAppender> TargetFileBuffer;
+
+ auto NewBlockFileGuard = MakeGuard([&]() {
TargetFileBuffer.reset();
if (NewBlockFile)
{
@@ -1226,6 +1318,13 @@ BlockStore::CompactBlocks(const BlockStoreCompactState& CompactState,
LogPrefix,
GetBlockPath(m_BlocksBasePath, NewBlockIndex).filename(),
NiceBytes(NewBlockSize));
+#ifndef NDEBUG
+ for (const std::pair<size_t, BlockStoreLocation>& MovedChunk : MovedChunks)
+ {
+ ZEN_ASSERT(MovedChunk.second.BlockIndex == NewBlockIndex);
+ ZEN_ASSERT(MovedChunk.second.Offset + MovedChunk.second.Size <= NewBlockSize);
+ }
+#endif // NDEBUG
if (!ReportChanges())
{
@@ -1295,13 +1394,11 @@ BlockStore::CompactBlocks(const BlockStoreCompactState& CompactState,
NewBlockFile->Create(m_MaxBlockSize);
NewBlockIndex = NextBlockIndex;
WriteOffset = 0;
- TargetFileBuffer = std::make_unique<BasicFileWriter>(NewBlockFile->GetBasicFile(), Min(256u * 1024u, m_MaxBlockSize));
+ TargetFileBuffer = std::make_unique<BlockStoreFileAppender>(*NewBlockFile, Min(256u * 1024u, m_MaxBlockSize));
}
const uint64_t OldWriteOffset = WriteOffset;
- WriteOffset = TargetFileBuffer->AlignTo(PayloadAlignment);
-
- TargetFileBuffer->Write(ChunkView.GetData(), ChunkLocation.Size, WriteOffset);
+ WriteOffset = TargetFileBuffer->Append(ChunkView.GetData(), ChunkLocation.Size, PayloadAlignment);
MovedChunks.push_back(
{ChunkIndex, {.BlockIndex = NewBlockIndex, .Offset = gsl::narrow<uint32_t>(WriteOffset), .Size = ChunkLocation.Size}});
WriteOffset += ChunkLocation.Size;
@@ -1321,6 +1418,17 @@ BlockStore::CompactBlocks(const BlockStoreCompactState& CompactState,
{
TargetFileBuffer->Flush();
}
+#ifndef NDEBUG
+ if (NewBlockFile)
+ {
+ const uint64_t NewBlockSize = NewBlockFile->FileSize();
+ for (const std::pair<size_t, BlockStoreLocation>& MovedChunk : MovedChunks)
+ {
+ ZEN_ASSERT(MovedChunk.second.BlockIndex == NewBlockIndex);
+ ZEN_ASSERT(MovedChunk.second.Offset + MovedChunk.second.Size <= NewBlockSize);
+ }
+ }
+#endif // NDEBUG
if (!ReportChanges())
{
@@ -1352,10 +1460,17 @@ BlockStore::CompactBlocks(const BlockStoreCompactState& CompactState,
{
ZEN_ASSERT_SLOW(NewBlockFile->IsOpen());
NewBlockFile->Flush();
- uint64_t NewBlockSize = NewBlockFile->FileSize();
+ const uint64_t NewBlockSize = NewBlockFile->FileSize();
MovedSize += NewBlockSize;
NewBlockFile = nullptr;
ZEN_INFO("{}wrote block {} ({})", LogPrefix, GetBlockPath(m_BlocksBasePath, NewBlockIndex).filename(), NiceBytes(NewBlockSize));
+#ifndef NDEBUG
+ for (const std::pair<size_t, BlockStoreLocation>& MovedChunk : MovedChunks)
+ {
+ ZEN_ASSERT(MovedChunk.second.BlockIndex == NewBlockIndex);
+ ZEN_ASSERT(MovedChunk.second.Offset + MovedChunk.second.Size <= NewBlockSize);
+ }
+#endif // NDEBUG
}
ReportChanges();