diff options
| author | Dan Engelbrecht <[email protected]> | 2025-10-27 11:17:36 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-10-27 11:17:36 +0100 |
| commit | 91e085f7caa9c384d8dab781e9b29ce0d70f6626 (patch) | |
| tree | c0ced6e0e825b5074d3012b037fc20847e22a513 /src | |
| parent | use already built lookup when verifying folder (#615) (diff) | |
| download | zen-91e085f7caa9c384d8dab781e9b29ce0d70f6626.tar.xz zen-91e085f7caa9c384d8dab781e9b29ce0d70f6626.zip | |
optimize blockstore flush (#614)
* rework block store block flushing to only happen once at end of block write outside of locks
* fix warning at startup if no gc.dlog file exists
Diffstat (limited to 'src')
| -rw-r--r-- | src/zenstore/blockstore.cpp | 83 | ||||
| -rw-r--r-- | src/zenstore/gc.cpp | 2 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/blockstore.h | 16 |
3 files changed, 66 insertions, 35 deletions
diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp index 337a5f8e0..86aa234c3 100644 --- a/src/zenstore/blockstore.cpp +++ b/src/zenstore/blockstore.cpp @@ -177,19 +177,10 @@ BlockStoreFile::Write(const void* Data, uint64_t Size, uint64_t FileOffset) } void -BlockStoreFile::Flush(uint64_t FinalSize) +BlockStoreFile::Flush() { ZEN_TRACE_CPU("BlockStoreFile::Flush"); m_File.Flush(); - if (FinalSize != (uint64_t)-1) - { - ZEN_ASSERT(FinalSize <= m_IoBuffer.GetSize()); - uint64_t ExpectedSize = 0; - while (!m_CachedFileSize.compare_exchange_weak(ExpectedSize, FinalSize)) - { - ZEN_ASSERT(ExpectedSize <= FinalSize); - } - } } BasicFile& @@ -568,6 +559,42 @@ BlockStore::GetFreeBlockIndex(uint32_t ProbeIndex, RwLock::ExclusiveLockScope&, } void +BlockStore::AddActiveWriteBlock(RwLock::ExclusiveLockScope& Lock, uint32_t BlockIndex) +{ + ZEN_UNUSED(Lock); + m_ActiveWriteBlocks.push_back(BlockIndex); +} + +void +BlockStore::RemoveActiveWriteBlock(uint32_t BlockIndex) +{ + eastl::fixed_vector<Ref<BlockStoreFile>, 2> FlushBlocks; + { + RwLock::ExclusiveLockScope _(m_InsertLock); + m_ActiveWriteBlocks.erase(std::find(m_ActiveWriteBlocks.begin(), m_ActiveWriteBlocks.end(), BlockIndex)); + for (auto It = m_BlocksToFlush.begin(); It != m_BlocksToFlush.end();) + { + const uint32_t FlushBlockIndex = *It; + if (std::find(m_ActiveWriteBlocks.begin(), m_ActiveWriteBlocks.end(), FlushBlockIndex) == m_ActiveWriteBlocks.end()) + { + FlushBlocks.push_back(m_ChunkBlocks[FlushBlockIndex]); + ZEN_DEBUG("Flushing block {} at '{}'", FlushBlockIndex, GetBlockPath(m_BlocksBasePath, FlushBlockIndex)); + It = m_BlocksToFlush.erase(It); + } + else + { + It++; + } + } + } + for (Ref<BlockStoreFile>& FlushBlock : FlushBlocks) + { + FlushBlock->Flush(); + FlushBlock = nullptr; + } +} + +void BlockStore::WriteChunk(const void* Data, uint64_t Size, uint32_t Alignment, const WriteChunkCallback& Callback) { ZEN_MEMSCOPE(GetBlocksTag()); @@ -589,10 +616,8 @@ BlockStore::WriteChunk(const void* Data, uint64_t Size, uint32_t Alignment, cons { if (m_WriteBlock) { - m_WriteBlock->Flush(m_CurrentInsertOffset); - m_WriteBlock = nullptr; + m_BlocksToFlush.push_back(WriteBlockIndex); } - WriteBlockIndex += IsWriting ? 1 : 0; std::filesystem::path BlockPath; WriteBlockIndex = GetFreeBlockIndex(WriteBlockIndex, InsertLock, BlockPath); @@ -604,6 +629,8 @@ BlockStore::WriteChunk(const void* Data, uint64_t Size, uint32_t Alignment, cons Ref<BlockStoreFile> NewBlockFile(new BlockStoreFile(BlockPath)); NewBlockFile->Create(m_MaxBlockSize); + ZEN_DEBUG("Created block {} at '{}'", WriteBlockIndex, BlockPath); + m_ChunkBlocks[WriteBlockIndex] = NewBlockFile; m_WriteBlock = NewBlockFile; m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release); @@ -613,12 +640,10 @@ BlockStore::WriteChunk(const void* Data, uint64_t Size, uint32_t Alignment, cons uint32_t AlignedWriteSize = AlignedInsertOffset - m_CurrentInsertOffset + ChunkSize; m_CurrentInsertOffset = AlignedInsertOffset + ChunkSize; Ref<BlockStoreFile> WriteBlock = m_WriteBlock; - m_ActiveWriteBlocks.push_back(WriteBlockIndex); + AddActiveWriteBlock(InsertLock, WriteBlockIndex); InsertLock.ReleaseNow(); - auto _ = MakeGuard([this, WriteBlockIndex]() { - RwLock::ExclusiveLockScope _(m_InsertLock); - m_ActiveWriteBlocks.erase(std::find(m_ActiveWriteBlocks.begin(), m_ActiveWriteBlocks.end(), WriteBlockIndex)); - }); + + auto _ = MakeGuard([this, WriteBlockIndex]() { RemoveActiveWriteBlock(WriteBlockIndex); }); WriteBlock->Write(Data, ChunkSize, AlignedInsertOffset); m_TotalSize.fetch_add(AlignedWriteSize, std::memory_order::relaxed); @@ -662,6 +687,10 @@ BlockStore::WriteChunks(std::span<const IoBuffer> Datas, uint32_t Alignment, con uint32_t AlignedInsertOffset = RoundUp(m_CurrentInsertOffset, Alignment); if ((!m_WriteBlock) || ((AlignedInsertOffset + RangeSize) > m_MaxBlockSize)) { + if (m_WriteBlock) + { + m_BlocksToFlush.push_back(WriteBlockIndex); + } std::filesystem::path BlockPath; WriteBlockIndex = GetFreeBlockIndex(WriteBlockIndex, InsertLock, BlockPath); if (WriteBlockIndex == (uint32_t)m_MaxBlockCount) @@ -671,6 +700,8 @@ BlockStore::WriteChunks(std::span<const IoBuffer> Datas, uint32_t Alignment, con Ref<BlockStoreFile> NewBlockFile(new BlockStoreFile(BlockPath)); NewBlockFile->Create(m_MaxBlockSize); + ZEN_DEBUG("Created block {} at '{}'", WriteBlockIndex, BlockPath); + m_ChunkBlocks[WriteBlockIndex] = NewBlockFile; m_WriteBlock = NewBlockFile; m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release); @@ -689,12 +720,10 @@ BlockStore::WriteChunks(std::span<const IoBuffer> Datas, uint32_t Alignment, con } m_CurrentInsertOffset = AlignedInsertOffset + RangeSize; Ref<BlockStoreFile> WriteBlock = m_WriteBlock; - m_ActiveWriteBlocks.push_back(WriteBlockIndex); + AddActiveWriteBlock(InsertLock, WriteBlockIndex); InsertLock.ReleaseNow(); - auto _ = MakeGuard([this, WriteBlockIndex]() { - RwLock::ExclusiveLockScope _(m_InsertLock); - m_ActiveWriteBlocks.erase(std::find(m_ActiveWriteBlocks.begin(), m_ActiveWriteBlocks.end(), WriteBlockIndex)); - }); + + auto _ = MakeGuard([this, WriteBlockIndex]() { RemoveActiveWriteBlock(WriteBlockIndex); }); { MutableMemoryView WriteBuffer(Buffer.data(), RangeSize); @@ -784,7 +813,7 @@ BlockStore::Flush(bool ForceNewBlock) { if (m_WriteBlock) { - m_WriteBlock->Flush(m_CurrentInsertOffset); + m_WriteBlock->Flush(); } m_WriteBlock = nullptr; m_CurrentInsertOffset = 0; @@ -1185,7 +1214,7 @@ BlockStore::CompactBlocks(const BlockStoreCompactState& CompactState, if (NewBlockFile) { ZEN_ASSERT_SLOW(NewBlockFile->IsOpen()); - NewBlockFile->Flush(WriteOffset); + NewBlockFile->Flush(); uint64_t NewBlockSize = NewBlockFile->FileSize(); MovedSize += NewBlockSize; NewBlockFile = nullptr; @@ -1322,7 +1351,7 @@ BlockStore::CompactBlocks(const BlockStoreCompactState& CompactState, if (NewBlockFile) { ZEN_ASSERT_SLOW(NewBlockFile->IsOpen()); - NewBlockFile->Flush(WriteOffset); + NewBlockFile->Flush(); uint64_t NewBlockSize = NewBlockFile->FileSize(); MovedSize += NewBlockSize; NewBlockFile = nullptr; @@ -1453,8 +1482,6 @@ TEST_CASE("blockstore.blockfile") CHECK(std::string(Boop) == "boop"); File1.Flush(); CHECK(File1.FileSize() == 10); - File1.Flush(10); - CHECK(File1.FileSize() == 10); } { BlockStoreFile File1(RootDirectory / "1"); diff --git a/src/zenstore/gc.cpp b/src/zenstore/gc.cpp index c05cb5269..e58bbb6e2 100644 --- a/src/zenstore/gc.cpp +++ b/src/zenstore/gc.cpp @@ -1782,7 +1782,7 @@ GcScheduler::Initialize(const GcSchedulerConfig& Config) } const std::filesystem::path GcDiskUsageLogPath = m_Config.RootDirectory / "gc.dlog"; - if (!TCasLogFile<DiskUsageWindow::DiskUsageEntry>::IsValid(GcDiskUsageLogPath)) + if (IsFile(GcDiskUsageLogPath) && !TCasLogFile<DiskUsageWindow::DiskUsageEntry>::IsValid(GcDiskUsageLogPath)) { ZEN_WARN("GC disk usage log at '{}' is malformed, restarting log", GcDiskUsageLogPath); RemoveFile(GcDiskUsageLogPath); diff --git a/src/zenstore/include/zenstore/blockstore.h b/src/zenstore/include/zenstore/blockstore.h index 4006f4275..bc9b5da40 100644 --- a/src/zenstore/include/zenstore/blockstore.h +++ b/src/zenstore/include/zenstore/blockstore.h @@ -94,7 +94,7 @@ struct BlockStoreFile : public RefCounted IoBuffer GetChunk(uint64_t Offset, uint64_t Size); void Read(void* Data, uint64_t Size, uint64_t FileOffset); void Write(const void* Data, uint64_t Size, uint64_t FileOffset); - void Flush(uint64_t FinalSize = (uint64_t)-1); + void Flush(); BasicFile& GetBasicFile(); void StreamByteRange(uint64_t FileOffset, uint64_t Size, std::function<void(const void* Data, uint64_t Size)>&& ChunkFun); bool IsOpen() const; @@ -186,17 +186,21 @@ public: IoBuffer GetMetaData(uint32_t BlockIndex) const; private: + void AddActiveWriteBlock(RwLock::ExclusiveLockScope& Lock, uint32_t BlockIndex); + void RemoveActiveWriteBlock(uint32_t BlockIndex); + static const char* GetBlockFileExtension(); static std::filesystem::path GetBlockPath(const std::filesystem::path& BlocksBasePath, const uint32_t BlockIndex); uint32_t GetFreeBlockIndex(uint32_t StartProbeIndex, RwLock::ExclusiveLockScope&, std::filesystem::path& OutBlockPath) const; tsl::robin_map<uint32_t, Ref<BlockStoreFile>> m_ChunkBlocks; - mutable RwLock m_InsertLock; // used to serialize inserts - Ref<BlockStoreFile> m_WriteBlock; - std::uint32_t m_CurrentInsertOffset = 0; - std::atomic_uint32_t m_WriteBlockIndex{}; - std::vector<uint32_t> m_ActiveWriteBlocks; + mutable RwLock m_InsertLock; // used to serialize inserts + Ref<BlockStoreFile> m_WriteBlock; + std::uint32_t m_CurrentInsertOffset = 0; + std::atomic_uint32_t m_WriteBlockIndex{}; + eastl::fixed_vector<uint32_t, 2> m_ActiveWriteBlocks; + eastl::fixed_vector<uint32_t, 2> m_BlocksToFlush; uint64_t m_MaxBlockSize = 1u << 28; uint64_t m_MaxBlockCount = BlockStoreDiskLocation::MaxBlockIndex + 1; |