diff options
Diffstat (limited to 'src/zenstore/blockstore.cpp')
| -rw-r--r-- | src/zenstore/blockstore.cpp | 818 |
1 files changed, 124 insertions, 694 deletions
diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp index 00a38c3b6..3974fb989 100644 --- a/src/zenstore/blockstore.cpp +++ b/src/zenstore/blockstore.cpp @@ -26,10 +26,20 @@ ZEN_THIRD_PARTY_INCLUDES_END # include <random> #endif +#include <zencore/memory/llm.h> + ////////////////////////////////////////////////////////////////////////// namespace zen { +const FLLMTag& +GetBlocksTag() +{ + static FLLMTag _("blocks"); + + return _; +} + ////////////////////////////////////////////////////////////////////////// BlockStoreFile::BlockStoreFile(const std::filesystem::path& BlockPath) : m_Path(BlockPath) @@ -254,7 +264,6 @@ BlockStoreFile::GetMetaPath() const //////////////////////////////////////////////////////// constexpr uint64_t DefaultIterateSmallChunkWindowSize = 2 * 1024 * 1024; -constexpr uint64_t IterateSmallChunkMaxGapSize = 4 * 1024; BlockStore::BlockStore() { @@ -267,6 +276,7 @@ BlockStore::~BlockStore() void BlockStore::Initialize(const std::filesystem::path& BlocksBasePath, uint64_t MaxBlockSize, uint64_t MaxBlockCount) { + ZEN_MEMSCOPE(GetBlocksTag()); ZEN_TRACE_CPU("BlockStore::Initialize"); ZEN_ASSERT(MaxBlockSize > 0); @@ -331,18 +341,9 @@ BlockStore::Initialize(const std::filesystem::path& BlocksBasePath, uint64_t Max } void -BlockStore::BlockIndexSet::Add(uint32_t BlockIndex) -{ - if (!std::binary_search(begin(BlockIndexes), end(BlockIndexes), BlockIndex)) - { - auto It = std::lower_bound(begin(BlockIndexes), end(BlockIndexes), BlockIndex); - BlockIndexes.insert(It, BlockIndex); - } -} - -void -BlockStore::SyncExistingBlocksOnDisk(const BlockIndexSet& KnownLocations) +BlockStore::SyncExistingBlocksOnDisk(const BlockIndexSet& KnownBlocks) { + ZEN_MEMSCOPE(GetBlocksTag()); ZEN_TRACE_CPU("BlockStore::SyncExistingBlocksOnDisk"); RwLock::ExclusiveLockScope InsertLock(m_InsertLock); @@ -355,7 +356,7 @@ BlockStore::SyncExistingBlocksOnDisk(const BlockIndexSet& KnownLocations) DeleteBlocks.insert(It.first); } - for (const uint32_t BlockIndex : KnownLocations.GetBlockIndices()) + for (const uint32_t BlockIndex : KnownBlocks) { DeleteBlocks.erase(BlockIndex); if (auto It = m_ChunkBlocks.find(BlockIndex); It != m_ChunkBlocks.end() && !It->second.IsNull()) @@ -389,10 +390,16 @@ BlockStore::SyncExistingBlocksOnDisk(const BlockIndexSet& KnownLocations) BlockStore::BlockEntryCountMap BlockStore::GetBlocksToCompact(const BlockUsageMap& BlockUsage, uint32_t BlockUsageThresholdPercent) { + ZEN_MEMSCOPE(GetBlocksTag()); ZEN_TRACE_CPU("BlockStoreFile::GetBlocksToCompact"); BlockEntryCountMap Result; { RwLock::SharedLockScope InsertLock(m_InsertLock); + + const uint64_t SmallBlockLimit = m_MaxBlockSize / 2; + + std::vector<uint32_t> SmallBlockIndexes; + for (const auto& It : m_ChunkBlocks) { uint32_t BlockIndex = It.first; @@ -413,35 +420,53 @@ BlockStore::GetBlocksToCompact(const BlockUsageMap& BlockUsage, uint32_t BlockUs UsedCount = UsageIt->second.EntryCount; } - uint64_t BlockSize = It.second ? It.second->FileSize() : 0u; - if (BlockSize == 0) + uint64_t PhysicalSize = It.second ? It.second->FileSize() : 0u; + if (PhysicalSize == 0) { Result.insert_or_assign(BlockIndex, UsedCount); continue; } + bool IsBelowUnusedLimit = false; + if (BlockUsageThresholdPercent == 100) { - if (UsedSize < BlockSize) + if (UsedSize < PhysicalSize) { - Result.insert_or_assign(BlockIndex, UsedCount); + IsBelowUnusedLimit = true; } } else if (BlockUsageThresholdPercent == 0) { if (UsedSize == 0) { - Result.insert_or_assign(BlockIndex, UsedCount); + IsBelowUnusedLimit = true; } } else { - const uint32_t UsedPercent = UsedSize < BlockSize ? gsl::narrow<uint32_t>((100 * UsedSize) / BlockSize) : 100u; + const uint32_t UsedPercent = UsedSize < PhysicalSize ? gsl::narrow<uint32_t>((100 * UsedSize) / PhysicalSize) : 100u; if (UsedPercent < BlockUsageThresholdPercent) { - Result.insert_or_assign(BlockIndex, UsedCount); + IsBelowUnusedLimit = true; } } + + if (IsBelowUnusedLimit) + { + Result.insert_or_assign(BlockIndex, UsedCount); + } + else if (PhysicalSize < SmallBlockLimit) + { + Result.insert_or_assign(BlockIndex, UsedCount); + SmallBlockIndexes.push_back(BlockIndex); + } + } + + // If we only find one small block to compact, let it be. + if (SmallBlockIndexes.size() == 1 && Result.size() == 1) + { + Result.erase(SmallBlockIndexes[0]); } } return Result; @@ -495,6 +520,7 @@ BlockStore::GetFreeBlockIndex(uint32_t ProbeIndex, RwLock::ExclusiveLockScope&, void BlockStore::WriteChunk(const void* Data, uint64_t Size, uint32_t Alignment, const WriteChunkCallback& Callback) { + ZEN_MEMSCOPE(GetBlocksTag()); ZEN_TRACE_CPU("BlockStore::WriteChunk"); ZEN_ASSERT(Data != nullptr); @@ -539,21 +565,21 @@ BlockStore::WriteChunk(const void* Data, uint64_t Size, uint32_t Alignment, cons Ref<BlockStoreFile> WriteBlock = m_WriteBlock; m_ActiveWriteBlocks.push_back(WriteBlockIndex); InsertLock.ReleaseNow(); + auto _ = MakeGuard([this, WriteBlockIndex]() { + RwLock::ExclusiveLockScope _(m_InsertLock); + m_ActiveWriteBlocks.erase(std::find(m_ActiveWriteBlocks.begin(), m_ActiveWriteBlocks.end(), WriteBlockIndex)); + }); WriteBlock->Write(Data, ChunkSize, AlignedInsertOffset); m_TotalSize.fetch_add(AlignedWriteSize, std::memory_order::relaxed); Callback({.BlockIndex = WriteBlockIndex, .Offset = AlignedInsertOffset, .Size = ChunkSize}); - - { - RwLock::ExclusiveLockScope _(m_InsertLock); - m_ActiveWriteBlocks.erase(std::find(m_ActiveWriteBlocks.begin(), m_ActiveWriteBlocks.end(), WriteBlockIndex)); - } } void BlockStore::WriteChunks(std::span<IoBuffer> Datas, uint32_t Alignment, const WriteChunksCallback& Callback) { + ZEN_MEMSCOPE(GetBlocksTag()); ZEN_TRACE_CPU("BlockStore::WriteChunks"); ZEN_ASSERT(!Datas.empty()); @@ -615,6 +641,10 @@ BlockStore::WriteChunks(std::span<IoBuffer> Datas, uint32_t Alignment, const Wri Ref<BlockStoreFile> WriteBlock = m_WriteBlock; m_ActiveWriteBlocks.push_back(WriteBlockIndex); InsertLock.ReleaseNow(); + auto _ = MakeGuard([this, WriteBlockIndex]() { + RwLock::ExclusiveLockScope _(m_InsertLock); + m_ActiveWriteBlocks.erase(std::find(m_ActiveWriteBlocks.begin(), m_ActiveWriteBlocks.end(), WriteBlockIndex)); + }); { MutableMemoryView WriteBuffer(Buffer.data(), RangeSize); @@ -639,35 +669,10 @@ BlockStore::WriteChunks(std::span<IoBuffer> Datas, uint32_t Alignment, const Wri } Callback(Locations); - { - RwLock::ExclusiveLockScope _(m_InsertLock); - m_ActiveWriteBlocks.erase(std::find(m_ActiveWriteBlocks.begin(), m_ActiveWriteBlocks.end(), WriteBlockIndex)); - } - Offset += Count; } } -BlockStore::ReclaimSnapshotState -BlockStore::GetReclaimSnapshotState() -{ - ReclaimSnapshotState State; - RwLock::SharedLockScope _(m_InsertLock); - for (uint32_t BlockIndex : m_ActiveWriteBlocks) - { - State.m_ActiveWriteBlocks.insert(BlockIndex); - } - if (m_WriteBlock) - { - State.m_ActiveWriteBlocks.insert(m_WriteBlockIndex); - } - for (auto It : m_ChunkBlocks) - { - State.m_BlockIndexes.insert(It.first); - } - return State; -} - IoBuffer BlockStore::TryGetChunk(const BlockStoreLocation& Location) const { @@ -690,6 +695,7 @@ BlockStore::TryGetChunk(const BlockStoreLocation& Location) const void BlockStore::Flush(bool ForceNewBlock) { + ZEN_MEMSCOPE(GetBlocksTag()); ZEN_TRACE_CPU("BlockStore::Flush"); if (ForceNewBlock) @@ -713,429 +719,6 @@ BlockStore::Flush(bool ForceNewBlock) } } -void -BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot, - const std::vector<BlockStoreLocation>& ChunkLocations, - const ChunkIndexArray& KeepChunkIndexes, - uint32_t PayloadAlignment, - bool DryRun, - const ReclaimCallback& ChangeCallback, - const ClaimDiskReserveCallback& DiskReserveCallback) -{ - ZEN_TRACE_CPU("BlockStore::ReclaimSpace"); - - uint64_t WriteBlockTimeUs = 0; - uint64_t WriteBlockLongestTimeUs = 0; - uint64_t ReadBlockTimeUs = 0; - uint64_t ReadBlockLongestTimeUs = 0; - uint64_t TotalChunkCount = ChunkLocations.size(); - uint64_t DeletedSize = 0; - uint64_t OldTotalSize = 0; - uint64_t NewTotalSize = 0; - - uint64_t MovedCount = 0; - uint64_t DeletedCount = 0; - - Stopwatch TotalTimer; - const auto _ = MakeGuard([&] { - ZEN_DEBUG( - "reclaim space for '{}' DONE after {}, write lock: {} ({}), read lock: {} ({}), collected {} bytes, deleted {} and moved " - "{} " - "of {} " - "chunks ({}).", - m_BlocksBasePath, - NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()), - NiceLatencyNs(WriteBlockTimeUs), - NiceLatencyNs(WriteBlockLongestTimeUs), - NiceLatencyNs(ReadBlockTimeUs), - NiceLatencyNs(ReadBlockLongestTimeUs), - NiceBytes(DeletedSize), - DeletedCount, - MovedCount, - TotalChunkCount, - NiceBytes(OldTotalSize)); - }); - - size_t BlockCount = Snapshot.m_BlockIndexes.size(); - if (BlockCount == 0) - { - ZEN_DEBUG("garbage collect for '{}' SKIPPED, no blocks to process", m_BlocksBasePath); - return; - } - - tsl::robin_set<size_t> KeepChunkMap; - KeepChunkMap.reserve(KeepChunkIndexes.size()); - for (size_t KeepChunkIndex : KeepChunkIndexes) - { - KeepChunkMap.insert(KeepChunkIndex); - } - - tsl::robin_map<uint32_t, size_t> BlockIndexToChunkMapIndex; - std::vector<ChunkIndexArray> BlockKeepChunks; - std::vector<ChunkIndexArray> BlockDeleteChunks; - - BlockIndexToChunkMapIndex.reserve(BlockCount); - BlockKeepChunks.reserve(BlockCount); - BlockDeleteChunks.reserve(BlockCount); - size_t GuesstimateCountPerBlock = TotalChunkCount / BlockCount / 2; - - size_t DeleteCount = 0; - for (size_t Index = 0; Index < TotalChunkCount; ++Index) - { - const BlockStoreLocation& Location = ChunkLocations[Index]; - if (!Snapshot.m_BlockIndexes.contains(Location.BlockIndex)) - { - // We did not know about the block when we took the snapshot, don't touch it - continue; - } - OldTotalSize += Location.Size; - auto BlockIndexPtr = BlockIndexToChunkMapIndex.find(Location.BlockIndex); - size_t ChunkMapIndex = 0; - if (BlockIndexPtr == BlockIndexToChunkMapIndex.end()) - { - ChunkMapIndex = BlockKeepChunks.size(); - BlockIndexToChunkMapIndex[Location.BlockIndex] = ChunkMapIndex; - BlockKeepChunks.resize(ChunkMapIndex + 1); - BlockKeepChunks.back().reserve(GuesstimateCountPerBlock); - BlockDeleteChunks.resize(ChunkMapIndex + 1); - BlockDeleteChunks.back().reserve(GuesstimateCountPerBlock); - } - else - { - ChunkMapIndex = BlockIndexPtr->second; - } - - if (KeepChunkMap.contains(Index)) - { - ChunkIndexArray& IndexMap = BlockKeepChunks[ChunkMapIndex]; - IndexMap.push_back(Index); - NewTotalSize += Location.Size; - continue; - } - ChunkIndexArray& IndexMap = BlockDeleteChunks[ChunkMapIndex]; - IndexMap.push_back(Index); - DeleteCount++; - } - - std::vector<uint32_t> BlocksToReWrite; - BlocksToReWrite.reserve(BlockIndexToChunkMapIndex.size()); - for (const auto& Entry : BlockIndexToChunkMapIndex) - { - uint32_t BlockIndex = Entry.first; - size_t ChunkMapIndex = Entry.second; - const ChunkIndexArray& ChunkMap = BlockDeleteChunks[ChunkMapIndex]; - if (ChunkMap.empty()) - { - continue; - } - BlocksToReWrite.push_back(BlockIndex); - } - - { - // Any known block not referenced should be added as well - RwLock::SharedLockScope __(m_InsertLock); - for (std::uint32_t BlockIndex : Snapshot.m_BlockIndexes) - { - if (!m_ChunkBlocks.contains(BlockIndex)) - { - continue; - } - bool WasActiveWriteBlock = Snapshot.m_ActiveWriteBlocks.contains(BlockIndex); - if (WasActiveWriteBlock) - { - continue; - } - if (BlockIndexToChunkMapIndex.contains(BlockIndex)) - { - continue; - } - size_t ChunkMapIndex = ChunkMapIndex = BlockKeepChunks.size(); - BlockIndexToChunkMapIndex[BlockIndex] = ChunkMapIndex; - BlockKeepChunks.resize(ChunkMapIndex + 1); - BlockDeleteChunks.resize(ChunkMapIndex + 1); - BlocksToReWrite.push_back(BlockIndex); - } - } - - if (DryRun) - { - ZEN_DEBUG("garbage collect for '{}' DISABLED, found {} {} chunks of total {} {}", - m_BlocksBasePath, - DeleteCount, - NiceBytes(OldTotalSize - NewTotalSize), - TotalChunkCount, - OldTotalSize); - return; - } - - try - { - ZEN_TRACE_CPU("BlockStore::ReclaimSpace::Compact"); - Ref<BlockStoreFile> NewBlockFile; - auto NewBlockFileGuard = MakeGuard([&]() { - if (NewBlockFile && NewBlockFile->IsOpen()) - { - ZEN_DEBUG("dropping incomplete cas block store file '{}'", NewBlockFile->GetPath()); - m_TotalSize.fetch_sub(NewBlockFile->FileSize(), std::memory_order::relaxed); - ZEN_ASSERT_SLOW(NewBlockFile->MetaSize() == 0); - NewBlockFile->MarkAsDeleteOnClose(); - } - }); - - uint64_t WriteOffset = 0; - uint32_t NewBlockIndex = 0; - for (uint32_t BlockIndex : BlocksToReWrite) - { - bool IsActiveWriteBlock = Snapshot.m_ActiveWriteBlocks.contains(BlockIndex); - - const size_t ChunkMapIndex = BlockIndexToChunkMapIndex[BlockIndex]; - - Ref<BlockStoreFile> OldBlockFile; - if (!IsActiveWriteBlock) - { - RwLock::SharedLockScope _i(m_InsertLock); - Stopwatch Timer; - const auto __ = MakeGuard([&] { - uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); - WriteBlockTimeUs += ElapsedUs; - WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); - }); - if (auto It = m_ChunkBlocks.find(BlockIndex); It != m_ChunkBlocks.end()) - { - OldBlockFile = It->second; - } - } - - ChunkIndexArray& KeepMap = BlockKeepChunks[ChunkMapIndex]; - if (KeepMap.empty()) - { - ZEN_TRACE_CPU("BlockStore::ReclaimSpace::DeleteBlock"); - - const ChunkIndexArray& DeleteMap = BlockDeleteChunks[ChunkMapIndex]; - for (size_t DeleteIndex : DeleteMap) - { - DeletedSize += ChunkLocations[DeleteIndex].Size; - } - ChangeCallback({}, DeleteMap); - DeletedCount += DeleteMap.size(); - if (OldBlockFile) - { - RwLock::ExclusiveLockScope _i(m_InsertLock); - Stopwatch Timer; - const auto __ = MakeGuard([&] { - uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); - ReadBlockTimeUs += ElapsedUs; - ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); - }); - ZEN_DEBUG("marking cas block store file '{}' for delete, block #{}", OldBlockFile->GetPath(), BlockIndex); - ZEN_ASSERT(m_ChunkBlocks[BlockIndex] == OldBlockFile); - m_ChunkBlocks.erase(BlockIndex); - m_TotalSize.fetch_sub(OldBlockFile->TotalSize(), std::memory_order::relaxed); - OldBlockFile->MarkAsDeleteOnClose(); - } - continue; - } - else if (!OldBlockFile && !IsActiveWriteBlock) - { - // If the block file pointed to does not exist, move any keep chunk them to deleted list - ZEN_ERROR("Expected to find block {} in {} - this should never happen, marking {} entries as deleted.", - BlockIndex, - m_BlocksBasePath, - KeepMap.size()); - - BlockDeleteChunks[ChunkMapIndex].insert(BlockDeleteChunks[ChunkMapIndex].end(), KeepMap.begin(), KeepMap.end()); - KeepMap.clear(); - } - else if (OldBlockFile && (OldBlockFile->FileSize() == 0)) - { - // Block created to accommodate missing blocks - ZEN_WARN("Missing block {} in {} - backing data for locations is missing, marking {} entries as deleted.", - BlockIndex, - m_BlocksBasePath, - KeepMap.size()); - - BlockDeleteChunks[ChunkMapIndex].insert(BlockDeleteChunks[ChunkMapIndex].end(), KeepMap.begin(), KeepMap.end()); - KeepMap.clear(); - } - - MovedChunksArray MovedChunks; - if (OldBlockFile) - { - ZEN_TRACE_CPU("BlockStore::ReclaimSpace::MoveBlock"); - - ZEN_INFO("Moving {} chunks from '{}' to new block", KeepMap.size(), GetBlockPath(m_BlocksBasePath, BlockIndex)); - - uint64_t OldBlockSize = OldBlockFile->FileSize(); - std::vector<uint8_t> Chunk; - for (const size_t& ChunkIndex : KeepMap) - { - const BlockStoreLocation ChunkLocation = ChunkLocations[ChunkIndex]; - if (ChunkLocation.Offset + ChunkLocation.Size > OldBlockSize) - { - ZEN_WARN( - "ReclaimSpace skipping chunk outside of block range in '{}', Chunk start {}, Chunk size {} in Block {}, Block " - "size {}", - m_BlocksBasePath, - ChunkLocation.Offset, - ChunkLocation.Size, - OldBlockFile->GetPath(), - OldBlockSize); - continue; - } - Chunk.resize(ChunkLocation.Size); - OldBlockFile->Read(Chunk.data(), ChunkLocation.Size, ChunkLocation.Offset); - - if (!NewBlockFile || (WriteOffset + ChunkLocation.Size > m_MaxBlockSize)) - { - uint32_t NextBlockIndex = m_WriteBlockIndex.load(std::memory_order_relaxed); - - if (NewBlockFile) - { - ZEN_ASSERT_SLOW(NewBlockFile->IsOpen()); - NewBlockFile->Flush(); - NewBlockFile = nullptr; - } - { - ChangeCallback(MovedChunks, {}); - MovedCount += KeepMap.size(); - MovedChunks.clear(); - RwLock::ExclusiveLockScope InsertLock(m_InsertLock); - Stopwatch Timer; - const auto ___ = MakeGuard([&] { - uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); - ReadBlockTimeUs += ElapsedUs; - ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); - }); - - std::filesystem::path NewBlockPath; - NextBlockIndex = GetFreeBlockIndex(NextBlockIndex, InsertLock, NewBlockPath); - if (NextBlockIndex == (uint32_t)m_MaxBlockCount) - { - ZEN_ERROR("unable to allocate a new block in '{}', count limit {} exeeded", - m_BlocksBasePath, - static_cast<uint64_t>(std::numeric_limits<uint32_t>::max()) + 1); - return; - } - - NewBlockFile = new BlockStoreFile(NewBlockPath); - m_ChunkBlocks[NextBlockIndex] = NewBlockFile; - } - - std::error_code Error; - DiskSpace Space = DiskSpaceInfo(m_BlocksBasePath, Error); - if (Error) - { - ZEN_ERROR("get disk space in '{}' FAILED, reason: '{}'", m_BlocksBasePath, Error.message()); - return; - } - if (Space.Free < m_MaxBlockSize) - { - uint64_t ReclaimedSpace = DiskReserveCallback(); - if (Space.Free + ReclaimedSpace < m_MaxBlockSize) - { - ZEN_WARN("garbage collect for '{}' FAILED, required disk space {}, free {}", - m_BlocksBasePath, - m_MaxBlockSize, - NiceBytes(Space.Free + ReclaimedSpace)); - RwLock::ExclusiveLockScope _l(m_InsertLock); - Stopwatch Timer; - const auto __ = MakeGuard([&] { - uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); - ReadBlockTimeUs += ElapsedUs; - ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); - }); - ZEN_ASSERT(m_ChunkBlocks[NextBlockIndex] == NewBlockFile); - ZEN_ASSERT_SLOW(!NewBlockFile->IsOpen()); - m_ChunkBlocks.erase(NextBlockIndex); - return; - } - - ZEN_INFO("using gc reserve for '{}', reclaimed {}, disk free {}", - m_BlocksBasePath, - ReclaimedSpace, - NiceBytes(Space.Free + ReclaimedSpace)); - } - NewBlockFile->Create(m_MaxBlockSize); - NewBlockIndex = NextBlockIndex; - WriteOffset = 0; - } - - NewBlockFile->Write(Chunk.data(), ChunkLocation.Size, WriteOffset); - MovedChunks.push_back( - {ChunkIndex, - {.BlockIndex = NewBlockIndex, .Offset = gsl::narrow<uint32_t>(WriteOffset), .Size = ChunkLocation.Size}}); - uint64_t OldOffset = WriteOffset; - WriteOffset = RoundUp(WriteOffset + ChunkLocation.Size, PayloadAlignment); - m_TotalSize.fetch_add(WriteOffset - OldOffset, std::memory_order::relaxed); - } - Chunk.clear(); - if (NewBlockFile) - { - ZEN_ASSERT_SLOW(NewBlockFile->IsOpen()); - NewBlockFile->Flush(); - } - } - - const ChunkIndexArray& DeleteMap = BlockDeleteChunks[ChunkMapIndex]; - for (size_t DeleteIndex : DeleteMap) - { - DeletedSize += ChunkLocations[DeleteIndex].Size; - } - - ChangeCallback(MovedChunks, DeleteMap); - MovedCount += MovedChunks.size(); - DeletedCount += DeleteMap.size(); - MovedChunks.clear(); - - if (OldBlockFile) - { - RwLock::ExclusiveLockScope __(m_InsertLock); - Stopwatch Timer; - const auto ___ = MakeGuard([&] { - uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); - ReadBlockTimeUs += ElapsedUs; - ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); - }); - - ZEN_DEBUG("marking cas block store file '{}' for delete, block #{}", OldBlockFile->GetPath(), BlockIndex); - ZEN_ASSERT(m_ChunkBlocks[BlockIndex] == OldBlockFile); - m_ChunkBlocks.erase(BlockIndex); - m_TotalSize.fetch_sub(OldBlockFile->TotalSize(), std::memory_order::relaxed); - OldBlockFile->MarkAsDeleteOnClose(); - } - } - if (NewBlockFile) - { - ZEN_ASSERT_SLOW(NewBlockFile->IsOpen()); - NewBlockFile->Flush(); - NewBlockFile = nullptr; - } - } - catch (const std::system_error& SystemError) - { - if (IsOOM(SystemError.code())) - { - ZEN_WARN("reclaiming space for '{}' ran out of memory: '{}'", m_BlocksBasePath, SystemError.what()); - } - else if (IsOOD(SystemError.code())) - { - ZEN_WARN("reclaiming space for '{}' ran out of disk space: '{}'", m_BlocksBasePath, SystemError.what()); - } - else - { - ZEN_ERROR("reclaiming space for '{}' failed with system error exception: '{}'", m_BlocksBasePath, SystemError.what()); - } - } - catch (const std::bad_alloc& BadAlloc) - { - ZEN_WARN("reclaiming space for '{}' ran out of memory: '{}'", m_BlocksBasePath, BadAlloc.what()); - } - catch (const std::exception& ex) - { - ZEN_ERROR("reclaiming space for '{}' failed with: '{}'", m_BlocksBasePath, ex.what()); - } -} - bool BlockStore::IterateBlock(std::span<const BlockStoreLocation> ChunkLocations, std::span<const size_t> InChunkIndexes, @@ -1143,6 +726,7 @@ BlockStore::IterateBlock(std::span<const BlockStoreLocation> ChunkLocations, const IterateChunksLargeSizeCallback& LargeSizeCallback, uint64_t LargeSizeLimit) { + ZEN_MEMSCOPE(GetBlocksTag()); ZEN_TRACE_CPU("BlockStore::IterateBlock"); if (InChunkIndexes.empty()) @@ -1150,26 +734,27 @@ BlockStore::IterateBlock(std::span<const BlockStoreLocation> ChunkLocations, return true; } - uint64_t IterateSmallChunkWindowSize = Max(DefaultIterateSmallChunkWindowSize, LargeSizeLimit); - if (LargeSizeLimit == 0u) + if (LargeSizeLimit == 0) { - LargeSizeLimit = IterateSmallChunkWindowSize; - } - else - { - IterateSmallChunkWindowSize = - Min((LargeSizeLimit + IterateSmallChunkMaxGapSize) * ChunkLocations.size(), IterateSmallChunkWindowSize); + LargeSizeLimit = DefaultIterateSmallChunkWindowSize; } + uint64_t IterateSmallChunkWindowSize = Max(DefaultIterateSmallChunkWindowSize, LargeSizeLimit); + + const uint64_t IterateSmallChunkMaxGapSize = Max(2048u, IterateSmallChunkWindowSize / 512u); + + IterateSmallChunkWindowSize = Min((LargeSizeLimit + IterateSmallChunkMaxGapSize) * ChunkLocations.size(), IterateSmallChunkWindowSize); + uint32_t BlockIndex = ChunkLocations[InChunkIndexes[0]].BlockIndex; std::vector<size_t> ChunkIndexes(InChunkIndexes.begin(), InChunkIndexes.end()); std::sort(ChunkIndexes.begin(), ChunkIndexes.end(), [&](size_t IndexA, size_t IndexB) -> bool { return ChunkLocations[IndexA].Offset < ChunkLocations[IndexB].Offset; }); - auto GetNextRange = [LargeSizeLimit, IterateSmallChunkWindowSize, &ChunkLocations](uint64_t BlockFileSize, - std::span<const size_t> ChunkIndexes, - size_t StartIndexOffset) -> size_t { + auto GetNextRange = [LargeSizeLimit, + IterateSmallChunkWindowSize, + IterateSmallChunkMaxGapSize, + &ChunkLocations](uint64_t BlockFileSize, std::span<const size_t> ChunkIndexes, size_t StartIndexOffset) -> size_t { size_t ChunkCount = 0; size_t StartIndex = ChunkIndexes[StartIndexOffset]; const BlockStoreLocation& StartLocation = ChunkLocations[StartIndex]; @@ -1224,8 +809,8 @@ BlockStore::IterateBlock(std::span<const BlockStoreLocation> ChunkLocations, ZEN_ASSERT(BlockFile); InsertLock.ReleaseNow(); - IoBuffer ReadBuffer{IterateSmallChunkWindowSize}; - void* BufferBase = ReadBuffer.MutableData(); + IoBuffer ReadBuffer; + void* BufferBase = nullptr; size_t LocationIndexOffset = 0; while (LocationIndexOffset < ChunkIndexes.size()) @@ -1235,11 +820,16 @@ BlockStore::IterateBlock(std::span<const BlockStoreLocation> ChunkLocations, const size_t BlockSize = BlockFile->FileSize(); const size_t RangeCount = GetNextRange(BlockSize, ChunkIndexes, LocationIndexOffset); - if (RangeCount > 0) + if (RangeCount > 1) { size_t LastChunkIndex = ChunkIndexes[LocationIndexOffset + RangeCount - 1]; const BlockStoreLocation& LastLocation = ChunkLocations[LastChunkIndex]; uint64_t Size = LastLocation.Offset + LastLocation.Size - FirstLocation.Offset; + if (ReadBuffer.GetSize() < Size) + { + ReadBuffer = IoBuffer(Min(Size * 2, IterateSmallChunkWindowSize)); + BufferBase = ReadBuffer.MutableData(); + } BlockFile->Read(BufferBase, Size, FirstLocation.Offset); for (size_t RangeIndex = 0; RangeIndex < RangeCount; ++RangeIndex) { @@ -1293,6 +883,7 @@ BlockStore::IterateBlock(std::span<const BlockStoreLocation> ChunkLocations, bool BlockStore::IterateChunks(const std::span<const BlockStoreLocation>& ChunkLocations, const IterateChunksCallback& Callback) { + ZEN_MEMSCOPE(GetBlocksTag()); ZEN_TRACE_CPU("BlockStore::IterateChunks"); Stopwatch Timer; @@ -1302,31 +893,39 @@ BlockStore::IterateChunks(const std::span<const BlockStoreLocation>& ChunkLocati ZEN_LOG_SCOPE("iterating chunks from '{}'", m_BlocksBasePath); - tsl::robin_map<uint32_t, size_t> BlockIndexToBlockChunks; - std::vector<std::vector<size_t>> BlocksChunks; - + std::vector<size_t> ChunkOrder(ChunkLocations.size()); for (size_t ChunkIndex = 0; ChunkIndex < ChunkLocations.size(); ++ChunkIndex) { - const BlockStoreLocation& Location = ChunkLocations[ChunkIndex]; - if (auto It = BlockIndexToBlockChunks.find(Location.BlockIndex); It != BlockIndexToBlockChunks.end()) - { - BlocksChunks[It->second].push_back(ChunkIndex); - } - else + ChunkOrder[ChunkIndex] = ChunkIndex; + } + + std::sort(ChunkOrder.begin(), ChunkOrder.end(), [&ChunkLocations](const size_t Lhs, const size_t Rhs) { + return ChunkLocations[Lhs].BlockIndex < ChunkLocations[Rhs].BlockIndex; + }); + size_t RangeStart = 0; + size_t RangeEnd = 0; + const std::span<size_t> ChunkIndexRange(ChunkOrder); + while (RangeStart < ChunkOrder.size()) + { + const size_t ChunkIndex = ChunkOrder[RangeStart]; + const uint32_t BlockIndex = ChunkLocations[ChunkIndex].BlockIndex; + RangeEnd++; + while (RangeEnd < ChunkOrder.size()) { - BlockIndexToBlockChunks.insert(std::make_pair(Location.BlockIndex, BlocksChunks.size())); - BlocksChunks.push_back(std::vector<size_t>({ChunkIndex})); + const size_t NextChunkIndex = ChunkOrder[RangeEnd]; + if (ChunkLocations[NextChunkIndex].BlockIndex != BlockIndex) + { + break; + } + ++RangeEnd; } - } - for (auto& BlockChunks : BlocksChunks) - { - ZEN_ASSERT(!BlockChunks.empty()); - uint32_t BlockIndex = ChunkLocations[BlockChunks[0]].BlockIndex; - if (!Callback(BlockIndex, BlockChunks)) + if (!Callback(BlockIndex, ChunkIndexRange.subspan(RangeStart, RangeEnd - RangeStart))) { return false; } + + RangeStart = RangeEnd; } return true; } @@ -1338,6 +937,7 @@ BlockStore::CompactBlocks(const BlockStoreCompactState& CompactState, const ClaimDiskReserveCallback& DiskReserveCallback, std::string_view LogPrefix) { + ZEN_MEMSCOPE(GetBlocksTag()); ZEN_TRACE_CPU("BlockStore::CompactBlocks"); uint64_t DeletedSize = 0; @@ -1581,7 +1181,7 @@ BlockStore::CompactBlocks(const BlockStoreCompactState& CompactState, KeepChunkIndexes.size(), NiceBytes(MovedFromBlock), GetBlockPath(m_BlocksBasePath, BlockIndex).filename(), - NiceBytes(OldBlockSize - MovedFromBlock)); + OldBlockSize > MovedFromBlock ? NiceBytes(OldBlockSize - MovedFromBlock) : 0); } if (TargetFileBuffer) { @@ -1934,53 +1534,6 @@ TEST_CASE("blockstore.chunks") CHECK(ReadChunkAsString(Store, ThirdChunkLocation) == ThirdChunkData); } -TEST_CASE("blockstore.clean.stray.blocks") -{ - using namespace blockstore::impl; - - ScopedTemporaryDirectory TempDir; - auto RootDirectory = TempDir.Path(); - - BlockStore Store; - Store.Initialize(RootDirectory / "store", 128, 1024); - - std::string FirstChunkData = "This is the data of the first chunk that we will write"; - BlockStoreLocation FirstChunkLocation = WriteStringAsChunk(Store, FirstChunkData, 4); - std::string SecondChunkData = "This is the data for the second chunk that we will write"; - BlockStoreLocation SecondChunkLocation = WriteStringAsChunk(Store, SecondChunkData, 4); - std::string ThirdChunkData = - "This is a much longer string that will not fit in the first block so it should be placed in the second block"; - BlockStoreLocation ThirdChunkLocation = WriteStringAsChunk(Store, ThirdChunkData, 4); - - Store.Close(); - - Store.Initialize(RootDirectory / "store", 128, 1024); - CHECK(GetDirectoryContent(RootDirectory / "store", true, false).size() == 2); - IoBuffer ThirdChunk = Store.TryGetChunk(ThirdChunkLocation); - CHECK(ThirdChunk); - - // Reclaim space should delete unreferenced block - Store.ReclaimSpace(Store.GetReclaimSnapshotState(), {FirstChunkLocation, SecondChunkLocation}, {0, 1}, 4, false); - // Block lives on as long as we reference it via ThirdChunk - CHECK(GetDirectoryContent(RootDirectory / "store", true, false).size() == 2); - ThirdChunk = {}; - CHECK(GetDirectoryContent(RootDirectory / "store", true, false).size() == 1); - ThirdChunk = Store.TryGetChunk(ThirdChunkLocation); - CHECK(!ThirdChunk); - - // Recreate a fake block for a missing chunk location - BlockStore::BlockIndexSet KnownBlocks; - KnownBlocks.Add(FirstChunkLocation.BlockIndex); - KnownBlocks.Add(SecondChunkLocation.BlockIndex); - KnownBlocks.Add(ThirdChunkLocation.BlockIndex); - Store.SyncExistingBlocksOnDisk(KnownBlocks); - - // We create a fake block for the location - we should still not be able to get the chunk - CHECK(GetDirectoryContent(RootDirectory / "store", true, false).size() == 2); - ThirdChunk = Store.TryGetChunk(ThirdChunkLocation); - CHECK(!ThirdChunk); -} - TEST_CASE("blockstore.flush.force.new.block") { using namespace blockstore::impl; @@ -2113,7 +1666,8 @@ TEST_CASE("blockstore.iterate.chunks") break; } return true; - }); + }, + 0); CHECK(Continue); }); return true; @@ -2122,125 +1676,6 @@ TEST_CASE("blockstore.iterate.chunks") WorkLatch.Wait(); } -TEST_CASE("blockstore.reclaim.space") -{ - using namespace blockstore::impl; - - ScopedTemporaryDirectory TempDir; - auto RootDirectory = TempDir.Path(); - - BlockStore Store; - Store.Initialize(RootDirectory / "store", 512, 1024); - - constexpr size_t ChunkCount = 200; - constexpr size_t Alignment = 8; - std::vector<BlockStoreLocation> ChunkLocations; - std::vector<IoHash> ChunkHashes; - ChunkLocations.reserve(ChunkCount); - ChunkHashes.reserve(ChunkCount); - for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) - { - IoBuffer Chunk = CreateRandomBlob(57 + ChunkIndex); - - Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) { ChunkLocations.push_back(L); }); - ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size())); - } - - std::vector<size_t> ChunksToKeep; - ChunksToKeep.reserve(ChunkLocations.size()); - for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) - { - ChunksToKeep.push_back(ChunkIndex); - } - - Store.Flush(/*ForceNewBlock*/ false); - BlockStore::ReclaimSnapshotState State1 = Store.GetReclaimSnapshotState(); - Store.ReclaimSpace(State1, ChunkLocations, ChunksToKeep, Alignment, true); - - // If we keep all the chunks we should not get any callbacks on moved/deleted stuff - Store.ReclaimSpace( - State1, - ChunkLocations, - ChunksToKeep, - Alignment, - false, - [](const BlockStore::MovedChunksArray&, const BlockStore::ChunkIndexArray&) { CHECK(false); }, - []() { - CHECK(false); - return 0; - }); - - size_t DeleteChunkCount = 38; - ChunksToKeep.clear(); - for (size_t ChunkIndex = DeleteChunkCount; ChunkIndex < ChunkCount; ++ChunkIndex) - { - ChunksToKeep.push_back(ChunkIndex); - } - - std::vector<BlockStoreLocation> NewChunkLocations = ChunkLocations; - size_t MovedChunkCount = 0; - size_t DeletedChunkCount = 0; - Store.ReclaimSpace( - State1, - ChunkLocations, - ChunksToKeep, - Alignment, - false, - [&](const BlockStore::MovedChunksArray& MovedChunks, const BlockStore::ChunkIndexArray& DeletedChunks) { - for (const auto& MovedChunk : MovedChunks) - { - CHECK(MovedChunk.first >= DeleteChunkCount); - NewChunkLocations[MovedChunk.first] = MovedChunk.second; - } - MovedChunkCount += MovedChunks.size(); - for (size_t DeletedIndex : DeletedChunks) - { - CHECK(DeletedIndex < DeleteChunkCount); - } - DeletedChunkCount += DeletedChunks.size(); - }, - []() { - CHECK(false); - return 0; - }); - CHECK(MovedChunkCount <= DeleteChunkCount); - CHECK(DeletedChunkCount == DeleteChunkCount); - ChunkLocations = std::vector<BlockStoreLocation>(NewChunkLocations.begin() + DeleteChunkCount, NewChunkLocations.end()); - - for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) - { - IoBuffer ChunkBlock = Store.TryGetChunk(NewChunkLocations[ChunkIndex]); - if (ChunkIndex >= DeleteChunkCount) - { - IoBuffer VerifyChunk = Store.TryGetChunk(NewChunkLocations[ChunkIndex]); - CHECK(VerifyChunk); - IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size()); - CHECK(VerifyHash == ChunkHashes[ChunkIndex]); - } - } - - // We need to take a new state since reclaim space add new block when compacting - BlockStore::ReclaimSnapshotState State2 = Store.GetReclaimSnapshotState(); - NewChunkLocations = ChunkLocations; - MovedChunkCount = 0; - DeletedChunkCount = 0; - Store.ReclaimSpace( - State2, - ChunkLocations, - {}, - Alignment, - false, - [&](const BlockStore::MovedChunksArray& MovedChunks, const BlockStore::ChunkIndexArray& DeletedChunks) { - CHECK(MovedChunks.empty()); - DeletedChunkCount += DeletedChunks.size(); - }, - []() { - CHECK(false); - return 0; - }); - CHECK(DeletedChunkCount == ChunkCount - DeleteChunkCount); -} - TEST_CASE("blockstore.thread.read.write") { using namespace blockstore::impl; @@ -2400,12 +1835,11 @@ TEST_CASE("blockstore.compact.blocks") } SUBCASE("keep current write block") { - uint64_t PreSize = Store.TotalSize(); - BlockStoreCompactState State; - BlockStore::ReclaimSnapshotState SnapshotState = Store.GetReclaimSnapshotState(); + uint64_t PreSize = Store.TotalSize(); + BlockStoreCompactState State; for (const BlockStoreLocation& Location : ChunkLocations) { - if (SnapshotState.m_ActiveWriteBlocks.contains(Location.BlockIndex)) + if (Store.IsWriting(Location.BlockIndex)) { continue; } @@ -2429,9 +1863,8 @@ TEST_CASE("blockstore.compact.blocks") { Store.Flush(true); - uint64_t PreSize = Store.TotalSize(); - BlockStoreCompactState State; - BlockStore::ReclaimSnapshotState SnapshotState = Store.GetReclaimSnapshotState(); + uint64_t PreSize = Store.TotalSize(); + BlockStoreCompactState State; for (const BlockStoreLocation& Location : ChunkLocations) { State.AddKeepLocation(Location); @@ -2451,11 +1884,10 @@ TEST_CASE("blockstore.compact.blocks") } SUBCASE("drop first block") { - uint64_t PreSize = Store.TotalSize(); - BlockStoreCompactState State; - BlockStore::ReclaimSnapshotState SnapshotState = Store.GetReclaimSnapshotState(); + uint64_t PreSize = Store.TotalSize(); + BlockStoreCompactState State; - CHECK(!SnapshotState.m_ActiveWriteBlocks.contains(0)); + CHECK(!Store.IsWriting(0)); State.IncludeBlock(0); uint64_t FirstBlockSize = 0; @@ -2485,11 +1917,10 @@ TEST_CASE("blockstore.compact.blocks") } SUBCASE("compact first block") { - uint64_t PreSize = Store.TotalSize(); - BlockStoreCompactState State; - BlockStore::ReclaimSnapshotState SnapshotState = Store.GetReclaimSnapshotState(); + uint64_t PreSize = Store.TotalSize(); + BlockStoreCompactState State; - CHECK(!SnapshotState.m_ActiveWriteBlocks.contains(0)); + CHECK(!Store.IsWriting(0)); State.IncludeBlock(0); uint64_t SkipChunkCount = 2; @@ -2544,14 +1975,13 @@ TEST_CASE("blockstore.compact.blocks") } SUBCASE("compact every other item") { - uint64_t PreSize = Store.TotalSize(); - BlockStoreCompactState State; - BlockStore::ReclaimSnapshotState SnapshotState = Store.GetReclaimSnapshotState(); - bool SkipFlag = false; + uint64_t PreSize = Store.TotalSize(); + BlockStoreCompactState State; + bool SkipFlag = false; for (const BlockStoreLocation& Location : ChunkLocations) { - if (SnapshotState.m_ActiveWriteBlocks.contains(Location.BlockIndex)) + if (Store.IsWriting(Location.BlockIndex)) { continue; } @@ -2568,7 +1998,7 @@ TEST_CASE("blockstore.compact.blocks") std::vector<BlockStoreLocation> DroppedLocations; for (const BlockStoreLocation& Location : ChunkLocations) { - if (SnapshotState.m_ActiveWriteBlocks.contains(Location.BlockIndex)) + if (Store.IsWriting(Location.BlockIndex)) { continue; } @@ -2605,7 +2035,7 @@ TEST_CASE("blockstore.compact.blocks") for (size_t Index = 0; Index < ChunkLocations.size(); Index++) { const BlockStoreLocation& Location = ChunkLocations[Index]; - if (SkipFlag && !SnapshotState.m_ActiveWriteBlocks.contains(Location.BlockIndex)) + if (SkipFlag && !Store.IsWriting(Location.BlockIndex)) { CHECK(std::find(DroppedLocations.begin(), DroppedLocations.end(), Location) != DroppedLocations.end()); CHECK(!Store.TryGetChunk(Location)); |