// Copyright Epic Games, Inc. All Rights Reserved. #include #include #include #include #include #include #include #include #include ZEN_THIRD_PARTY_INCLUDES_START #include #include #include ZEN_THIRD_PARTY_INCLUDES_END #if ZEN_WITH_TESTS # include # include # include # include # include #endif ////////////////////////////////////////////////////////////////////////// namespace zen { ////////////////////////////////////////////////////////////////////////// BlockStoreFile::BlockStoreFile(const std::filesystem::path& BlockPath) : m_Path(BlockPath) { } BlockStoreFile::~BlockStoreFile() { m_IoBuffer = IoBuffer(); m_File.Detach(); } const std::filesystem::path& BlockStoreFile::GetPath() const { return m_Path; } void BlockStoreFile::Open() { ZEN_TRACE_CPU("BlockStoreFile::Open"); uint32_t RetriesLeft = 3; m_File.Open(m_Path, BasicFile::Mode::kDelete, [&](std::error_code& Ec) { if (RetriesLeft == 0) { return false; } ZEN_WARN("Failed to open cas block '{}', reason: '{}', retries left: {}.", m_Path, Ec.message(), RetriesLeft); Sleep(100 - (3 - RetriesLeft) * 100); // Total 600 ms RetriesLeft--; return true; }); void* FileHandle = m_File.Handle(); m_IoBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, m_File.FileSize(), /*IsWholeFile*/ true); } void BlockStoreFile::Create(uint64_t InitialSize) { ZEN_TRACE_CPU("BlockStoreFile::Create"); auto ParentPath = m_Path.parent_path(); if (!std::filesystem::is_directory(ParentPath)) { CreateDirectories(ParentPath); } uint32_t RetriesLeft = 3; m_File.Open(m_Path, BasicFile::Mode::kTruncateDelete, [&](std::error_code& Ec) { if (RetriesLeft == 0) { return false; } ZEN_WARN("Failed to create cas block '{}', reason: '{}', retries left: {}.", m_Path, Ec.message(), RetriesLeft); Sleep(100 - (3 - RetriesLeft) * 100); // Total 600 ms RetriesLeft--; return true; }); void* FileHandle = m_File.Handle(); // We map our m_IoBuffer beyond the file size as we will grow it over time and want // to be able to create sub-buffers of all the written range 
later m_IoBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, InitialSize, false); } uint64_t BlockStoreFile::FileSize() { return m_File.FileSize(); } void BlockStoreFile::MarkAsDeleteOnClose() { m_IoBuffer.SetDeleteOnClose(true); } IoBuffer BlockStoreFile::GetChunk(uint64_t Offset, uint64_t Size) { if (Offset + Size > m_IoBuffer.GetSize()) { return {}; } return IoBuffer(m_IoBuffer, Offset, Size); } void BlockStoreFile::Read(void* Data, uint64_t Size, uint64_t FileOffset) { m_File.Read(Data, Size, FileOffset); } void BlockStoreFile::Write(const void* Data, uint64_t Size, uint64_t FileOffset) { ZEN_TRACE_CPU("BlockStoreFile::Write"); m_File.Write(Data, Size, FileOffset); } void BlockStoreFile::Flush() { ZEN_TRACE_CPU("BlockStoreFile::Flush"); m_File.Flush(); } BasicFile& BlockStoreFile::GetBasicFile() { return m_File; } void BlockStoreFile::StreamByteRange(uint64_t FileOffset, uint64_t Size, std::function&& ChunkFun) { m_File.StreamByteRange(FileOffset, Size, std::move(ChunkFun)); } bool BlockStoreFile::IsOpen() const { return !!m_IoBuffer; } constexpr uint64_t ScrubSmallChunkWindowSize = 4 * 1024 * 1024; BlockStore::BlockStore() { } BlockStore::~BlockStore() { } void BlockStore::Initialize(const std::filesystem::path& BlocksBasePath, uint64_t MaxBlockSize, uint64_t MaxBlockCount) { ZEN_TRACE_CPU("BlockStore::Initialize"); ZEN_ASSERT(MaxBlockSize > 0); ZEN_ASSERT(MaxBlockCount > 0); ZEN_ASSERT(IsPow2(MaxBlockCount)); std::unordered_map FoundBlocks; m_TotalSize = 0; m_BlocksBasePath = BlocksBasePath; m_MaxBlockSize = MaxBlockSize; m_MaxBlockCount = MaxBlockCount; if (std::filesystem::is_directory(m_BlocksBasePath)) { uint32_t NextBlockIndex = 0; std::vector FoldersToScan; FoldersToScan.push_back(m_BlocksBasePath); size_t FolderOffset = 0; while (FolderOffset < FoldersToScan.size()) { for (const std::filesystem::directory_entry& Entry : std::filesystem::directory_iterator(FoldersToScan[FolderOffset])) { if (Entry.is_directory()) { FoldersToScan.push_back(Entry.path()); 
continue; } if (Entry.is_regular_file()) { const std::filesystem::path Path = Entry.path(); if (Path.extension() != GetBlockFileExtension()) { continue; } std::string FileName = PathToUtf8(Path.stem()); uint32_t BlockIndex; bool OK = ParseHexNumber(FileName, BlockIndex); if (!OK) { continue; } Ref BlockFile{new BlockStoreFile(Path)}; BlockFile->Open(); m_TotalSize.fetch_add(BlockFile->FileSize(), std::memory_order::relaxed); m_ChunkBlocks[BlockIndex] = BlockFile; FoundBlocks[BlockIndex] = BlockFile->FileSize(); if (BlockIndex >= NextBlockIndex) { NextBlockIndex = (BlockIndex + 1) & (m_MaxBlockCount - 1); } } } ++FolderOffset; } m_WriteBlockIndex.store(NextBlockIndex, std::memory_order_release); } else { CreateDirectories(m_BlocksBasePath); } } void BlockStore::BlockIndexSet::Add(uint32_t BlockIndex) { if (!std::binary_search(begin(BlockIndexes), end(BlockIndexes), BlockIndex)) { auto It = std::lower_bound(begin(BlockIndexes), end(BlockIndexes), BlockIndex); BlockIndexes.insert(It, BlockIndex); } } void BlockStore::SyncExistingBlocksOnDisk(const BlockIndexSet& KnownLocations) { ZEN_TRACE_CPU("BlockStore::SyncExistingBlocksOnDisk"); RwLock::ExclusiveLockScope InsertLock(m_InsertLock); tsl::robin_set MissingBlocks; tsl::robin_set DeleteBlocks; DeleteBlocks.reserve(m_ChunkBlocks.size()); for (auto It : m_ChunkBlocks) { DeleteBlocks.insert(It.first); } for (const uint32_t BlockIndex : KnownLocations.GetBlockIndices()) { DeleteBlocks.erase(BlockIndex); if (auto It = m_ChunkBlocks.find(BlockIndex); It != m_ChunkBlocks.end() && !It->second.IsNull()) { continue; } else { MissingBlocks.insert(BlockIndex); } } for (std::uint32_t BlockIndex : MissingBlocks) { std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, BlockIndex); Ref NewBlockFile(new BlockStoreFile(BlockPath)); NewBlockFile->Create(0); m_ChunkBlocks[BlockIndex] = NewBlockFile; } for (std::uint32_t BlockIndex : DeleteBlocks) { std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, BlockIndex); 
if (m_ChunkBlocks[BlockIndex]) { m_TotalSize.fetch_sub(m_ChunkBlocks[BlockIndex]->FileSize(), std::memory_order::relaxed); m_ChunkBlocks[BlockIndex]->MarkAsDeleteOnClose(); } m_ChunkBlocks.erase(BlockIndex); } } BlockStore::BlockEntryCountMap BlockStore::GetBlocksToCompact(const BlockUsageMap& BlockUsage, uint32_t BlockUsageThresholdPercent) { ZEN_TRACE_CPU("BlockStoreFile::GetBlocksToCompact"); BlockEntryCountMap Result; { RwLock::SharedLockScope InsertLock(m_InsertLock); for (const auto& It : m_ChunkBlocks) { uint32_t BlockIndex = It.first; if ((BlockIndex == m_WriteBlockIndex.load()) && m_WriteBlock) { continue; } if (std::find(m_ActiveWriteBlocks.begin(), m_ActiveWriteBlocks.end(), BlockIndex) != m_ActiveWriteBlocks.end()) { continue; } uint64_t UsedSize = 0; uint32_t UsedCount = 0; if (auto UsageIt = BlockUsage.find(BlockIndex); UsageIt != BlockUsage.end()) { UsedSize = UsageIt->second.DiskUsage; UsedCount = UsageIt->second.EntryCount; } uint64_t BlockSize = It.second ? It.second->FileSize() : 0u; if (BlockSize == 0) { Result.insert_or_assign(BlockIndex, UsedCount); continue; } if (BlockUsageThresholdPercent == 100) { if (UsedSize < BlockSize) { Result.insert_or_assign(BlockIndex, UsedCount); } } else if (BlockUsageThresholdPercent == 0) { if (UsedSize == 0) { Result.insert_or_assign(BlockIndex, UsedCount); } } else { const uint32_t UsedPercent = UsedSize < BlockSize ? 
gsl::narrow((100 * UsedSize) / BlockSize) : 100u; if (UsedPercent < BlockUsageThresholdPercent) { Result.insert_or_assign(BlockIndex, UsedCount); } } } } return Result; } void BlockStore::Close() { ZEN_TRACE_CPU("BlockStore::Close"); RwLock::ExclusiveLockScope InsertLock(m_InsertLock); m_WriteBlock = nullptr; m_CurrentInsertOffset = 0; m_WriteBlockIndex = 0; m_ChunkBlocks.clear(); m_BlocksBasePath.clear(); } uint32_t BlockStore::GetFreeBlockIndex(uint32_t ProbeIndex, RwLock::ExclusiveLockScope&, std::filesystem::path& OutBlockPath) const { if (m_ChunkBlocks.size() == m_MaxBlockCount) { return (uint32_t)m_MaxBlockCount; } while (true) { if (!m_ChunkBlocks.contains(ProbeIndex)) { OutBlockPath = GetBlockPath(m_BlocksBasePath, ProbeIndex); std::error_code Ec; bool Exists = std::filesystem::exists(OutBlockPath, Ec); if (Ec) { ZEN_WARN("Failed to probe existence of file '{}' when trying to allocate a new block. Reason: '{}'", OutBlockPath, Ec.message()); return (uint32_t)m_MaxBlockCount; } if (!Exists) { return ProbeIndex; } } ProbeIndex = (ProbeIndex + 1) & (m_MaxBlockCount - 1); } return ProbeIndex; } void BlockStore::WriteChunk(const void* Data, uint64_t Size, uint32_t Alignment, const WriteChunkCallback& Callback) { ZEN_TRACE_CPU("BlockStore::WriteChunk"); ZEN_ASSERT(Data != nullptr); ZEN_ASSERT(Size > 0u); ZEN_ASSERT(Size <= m_MaxBlockSize); ZEN_ASSERT(Alignment > 0u); uint32_t ChunkSize = gsl::narrow(Size); RwLock::ExclusiveLockScope InsertLock(m_InsertLock); uint32_t WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire); bool IsWriting = !!m_WriteBlock; uint32_t AlignedInsertOffset = RoundUp(m_CurrentInsertOffset, Alignment); if (!IsWriting || (AlignedInsertOffset + ChunkSize) > m_MaxBlockSize) { if (m_WriteBlock) { m_WriteBlock->Flush(); m_WriteBlock = nullptr; } WriteBlockIndex += IsWriting ? 
1 : 0; std::filesystem::path BlockPath; WriteBlockIndex = GetFreeBlockIndex(WriteBlockIndex, InsertLock, BlockPath); if (WriteBlockIndex == (uint32_t)m_MaxBlockCount) { throw std::runtime_error(fmt::format("unable to allocate a new block in '{}'", m_BlocksBasePath)); } Ref NewBlockFile(new BlockStoreFile(BlockPath)); NewBlockFile->Create(m_MaxBlockSize); m_ChunkBlocks[WriteBlockIndex] = NewBlockFile; m_WriteBlock = NewBlockFile; m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release); m_CurrentInsertOffset = 0; AlignedInsertOffset = 0; } uint32_t AlignedWriteSize = AlignedInsertOffset - m_CurrentInsertOffset + ChunkSize; m_CurrentInsertOffset = AlignedInsertOffset + ChunkSize; Ref WriteBlock = m_WriteBlock; m_ActiveWriteBlocks.push_back(WriteBlockIndex); InsertLock.ReleaseNow(); WriteBlock->Write(Data, ChunkSize, AlignedInsertOffset); m_TotalSize.fetch_add(AlignedWriteSize, std::memory_order::relaxed); Callback({.BlockIndex = WriteBlockIndex, .Offset = AlignedInsertOffset, .Size = ChunkSize}); { RwLock::ExclusiveLockScope _(m_InsertLock); m_ActiveWriteBlocks.erase(std::find(m_ActiveWriteBlocks.begin(), m_ActiveWriteBlocks.end(), WriteBlockIndex)); } } BlockStore::ReclaimSnapshotState BlockStore::GetReclaimSnapshotState() { ReclaimSnapshotState State; RwLock::SharedLockScope _(m_InsertLock); for (uint32_t BlockIndex : m_ActiveWriteBlocks) { State.m_ActiveWriteBlocks.insert(BlockIndex); } if (m_WriteBlock) { State.m_ActiveWriteBlocks.insert(m_WriteBlockIndex); } for (auto It : m_ChunkBlocks) { State.m_BlockIndexes.insert(It.first); } return State; } IoBuffer BlockStore::TryGetChunk(const BlockStoreLocation& Location) const { ZEN_TRACE_CPU("BlockStore::TryGetChunk"); RwLock::SharedLockScope InsertLock(m_InsertLock); if (auto BlockIt = m_ChunkBlocks.find(Location.BlockIndex); BlockIt != m_ChunkBlocks.end()) { if (const Ref& Block = BlockIt->second; Block) { IoBuffer Chunk = Block->GetChunk(Location.Offset, Location.Size); if (Chunk.GetSize() == 
Location.Size) { return Chunk; } } } return IoBuffer(); } void BlockStore::Flush(bool ForceNewBlock) { ZEN_TRACE_CPU("BlockStore::Flush"); if (ForceNewBlock) { RwLock::ExclusiveLockScope _(m_InsertLock); if (m_CurrentInsertOffset > 0) { if (m_WriteBlock) { m_WriteBlock->Flush(); } m_WriteBlock = nullptr; m_CurrentInsertOffset = 0; } return; } RwLock::SharedLockScope _(m_InsertLock); if (m_WriteBlock) { m_WriteBlock->Flush(); } } void BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot, const std::vector& ChunkLocations, const ChunkIndexArray& KeepChunkIndexes, uint32_t PayloadAlignment, bool DryRun, const ReclaimCallback& ChangeCallback, const ClaimDiskReserveCallback& DiskReserveCallback) { ZEN_TRACE_CPU("BlockStore::ReclaimSpace"); uint64_t WriteBlockTimeUs = 0; uint64_t WriteBlockLongestTimeUs = 0; uint64_t ReadBlockTimeUs = 0; uint64_t ReadBlockLongestTimeUs = 0; uint64_t TotalChunkCount = ChunkLocations.size(); uint64_t DeletedSize = 0; uint64_t OldTotalSize = 0; uint64_t NewTotalSize = 0; uint64_t MovedCount = 0; uint64_t DeletedCount = 0; Stopwatch TotalTimer; const auto _ = MakeGuard([&] { ZEN_DEBUG( "reclaim space for '{}' DONE after {}, write lock: {} ({}), read lock: {} ({}), collected {} bytes, deleted {} and moved " "{} " "of {} " "chunks ({}).", m_BlocksBasePath, NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()), NiceLatencyNs(WriteBlockTimeUs), NiceLatencyNs(WriteBlockLongestTimeUs), NiceLatencyNs(ReadBlockTimeUs), NiceLatencyNs(ReadBlockLongestTimeUs), NiceBytes(DeletedSize), DeletedCount, MovedCount, TotalChunkCount, NiceBytes(OldTotalSize)); }); size_t BlockCount = Snapshot.m_BlockIndexes.size(); if (BlockCount == 0) { ZEN_DEBUG("garbage collect for '{}' SKIPPED, no blocks to process", m_BlocksBasePath); return; } tsl::robin_set KeepChunkMap; KeepChunkMap.reserve(KeepChunkIndexes.size()); for (size_t KeepChunkIndex : KeepChunkIndexes) { KeepChunkMap.insert(KeepChunkIndex); } tsl::robin_map BlockIndexToChunkMapIndex; std::vector 
BlockKeepChunks; std::vector BlockDeleteChunks; BlockIndexToChunkMapIndex.reserve(BlockCount); BlockKeepChunks.reserve(BlockCount); BlockDeleteChunks.reserve(BlockCount); size_t GuesstimateCountPerBlock = TotalChunkCount / BlockCount / 2; size_t DeleteCount = 0; for (size_t Index = 0; Index < TotalChunkCount; ++Index) { const BlockStoreLocation& Location = ChunkLocations[Index]; if (!Snapshot.m_BlockIndexes.contains(Location.BlockIndex)) { // We did not know about the block when we took the snapshot, don't touch it continue; } OldTotalSize += Location.Size; auto BlockIndexPtr = BlockIndexToChunkMapIndex.find(Location.BlockIndex); size_t ChunkMapIndex = 0; if (BlockIndexPtr == BlockIndexToChunkMapIndex.end()) { ChunkMapIndex = BlockKeepChunks.size(); BlockIndexToChunkMapIndex[Location.BlockIndex] = ChunkMapIndex; BlockKeepChunks.resize(ChunkMapIndex + 1); BlockKeepChunks.back().reserve(GuesstimateCountPerBlock); BlockDeleteChunks.resize(ChunkMapIndex + 1); BlockDeleteChunks.back().reserve(GuesstimateCountPerBlock); } else { ChunkMapIndex = BlockIndexPtr->second; } if (KeepChunkMap.contains(Index)) { ChunkIndexArray& IndexMap = BlockKeepChunks[ChunkMapIndex]; IndexMap.push_back(Index); NewTotalSize += Location.Size; continue; } ChunkIndexArray& IndexMap = BlockDeleteChunks[ChunkMapIndex]; IndexMap.push_back(Index); DeleteCount++; } std::vector BlocksToReWrite; BlocksToReWrite.reserve(BlockIndexToChunkMapIndex.size()); for (const auto& Entry : BlockIndexToChunkMapIndex) { uint32_t BlockIndex = Entry.first; size_t ChunkMapIndex = Entry.second; const ChunkIndexArray& ChunkMap = BlockDeleteChunks[ChunkMapIndex]; if (ChunkMap.empty()) { continue; } BlocksToReWrite.push_back(BlockIndex); } { // Any known block not referenced should be added as well RwLock::SharedLockScope __(m_InsertLock); for (std::uint32_t BlockIndex : Snapshot.m_BlockIndexes) { if (!m_ChunkBlocks.contains(BlockIndex)) { continue; } bool WasActiveWriteBlock = 
Snapshot.m_ActiveWriteBlocks.contains(BlockIndex); if (WasActiveWriteBlock) { continue; } if (BlockIndexToChunkMapIndex.contains(BlockIndex)) { continue; } size_t ChunkMapIndex = ChunkMapIndex = BlockKeepChunks.size(); BlockIndexToChunkMapIndex[BlockIndex] = ChunkMapIndex; BlockKeepChunks.resize(ChunkMapIndex + 1); BlockDeleteChunks.resize(ChunkMapIndex + 1); BlocksToReWrite.push_back(BlockIndex); } } if (DryRun) { ZEN_DEBUG("garbage collect for '{}' DISABLED, found {} {} chunks of total {} {}", m_BlocksBasePath, DeleteCount, NiceBytes(OldTotalSize - NewTotalSize), TotalChunkCount, OldTotalSize); return; } try { ZEN_TRACE_CPU("BlockStore::ReclaimSpace::Compact"); Ref NewBlockFile; auto NewBlockFileGuard = MakeGuard([&]() { if (NewBlockFile && NewBlockFile->IsOpen()) { ZEN_DEBUG("dropping incomplete cas block store file '{}'", NewBlockFile->GetPath()); m_TotalSize.fetch_sub(NewBlockFile->FileSize(), std::memory_order::relaxed); NewBlockFile->MarkAsDeleteOnClose(); } }); uint64_t WriteOffset = 0; uint32_t NewBlockIndex = 0; for (uint32_t BlockIndex : BlocksToReWrite) { bool IsActiveWriteBlock = Snapshot.m_ActiveWriteBlocks.contains(BlockIndex); const size_t ChunkMapIndex = BlockIndexToChunkMapIndex[BlockIndex]; Ref OldBlockFile; if (!IsActiveWriteBlock) { RwLock::SharedLockScope _i(m_InsertLock); Stopwatch Timer; const auto __ = MakeGuard([&] { uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); WriteBlockTimeUs += ElapsedUs; WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); }); if (auto It = m_ChunkBlocks.find(BlockIndex); It != m_ChunkBlocks.end()) { OldBlockFile = It->second; } } ChunkIndexArray& KeepMap = BlockKeepChunks[ChunkMapIndex]; if (KeepMap.empty()) { ZEN_TRACE_CPU("BlockStore::ReclaimSpace::DeleteBlock"); const ChunkIndexArray& DeleteMap = BlockDeleteChunks[ChunkMapIndex]; for (size_t DeleteIndex : DeleteMap) { DeletedSize += ChunkLocations[DeleteIndex].Size; } ChangeCallback({}, DeleteMap); DeletedCount += DeleteMap.size(); if 
(OldBlockFile) { RwLock::ExclusiveLockScope _i(m_InsertLock); Stopwatch Timer; const auto __ = MakeGuard([&] { uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); ReadBlockTimeUs += ElapsedUs; ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); }); ZEN_DEBUG("marking cas block store file '{}' for delete, block #{}", OldBlockFile->GetPath(), BlockIndex); ZEN_ASSERT(m_ChunkBlocks[BlockIndex] == OldBlockFile); m_ChunkBlocks.erase(BlockIndex); m_TotalSize.fetch_sub(OldBlockFile->FileSize(), std::memory_order::relaxed); OldBlockFile->MarkAsDeleteOnClose(); } continue; } else if (!OldBlockFile && !IsActiveWriteBlock) { // If the block file pointed to does not exist, move any keep chunk them to deleted list ZEN_ERROR("Expected to find block {} in {} - this should never happen, marking {} entries as deleted.", BlockIndex, m_BlocksBasePath, KeepMap.size()); BlockDeleteChunks[ChunkMapIndex].insert(BlockDeleteChunks[ChunkMapIndex].end(), KeepMap.begin(), KeepMap.end()); KeepMap.clear(); } else if (OldBlockFile && (OldBlockFile->FileSize() == 0)) { // Block created to accommodate missing blocks ZEN_WARN("Missing block {} in {} - backing data for locations is missing, marking {} entries as deleted.", BlockIndex, m_BlocksBasePath, KeepMap.size()); BlockDeleteChunks[ChunkMapIndex].insert(BlockDeleteChunks[ChunkMapIndex].end(), KeepMap.begin(), KeepMap.end()); KeepMap.clear(); } MovedChunksArray MovedChunks; if (OldBlockFile) { ZEN_TRACE_CPU("BlockStore::ReclaimSpace::MoveBlock"); ZEN_INFO("Moving {} chunks from '{}' to new block", KeepMap.size(), GetBlockPath(m_BlocksBasePath, BlockIndex)); uint64_t OldBlockSize = OldBlockFile->FileSize(); std::vector Chunk; for (const size_t& ChunkIndex : KeepMap) { const BlockStoreLocation ChunkLocation = ChunkLocations[ChunkIndex]; if (ChunkLocation.Offset + ChunkLocation.Size > OldBlockSize) { ZEN_WARN( "ReclaimSpace skipping chunk outside of block range in '{}', Chunk start {}, Chunk size {} in Block {}, Block " "size {}", 
m_BlocksBasePath, ChunkLocation.Offset, ChunkLocation.Size, OldBlockFile->GetPath(), OldBlockSize); continue; } Chunk.resize(ChunkLocation.Size); OldBlockFile->Read(Chunk.data(), ChunkLocation.Size, ChunkLocation.Offset); if (!NewBlockFile || (WriteOffset + ChunkLocation.Size > m_MaxBlockSize)) { uint32_t NextBlockIndex = m_WriteBlockIndex.load(std::memory_order_relaxed); if (NewBlockFile) { ZEN_ASSERT_SLOW(NewBlockFile->IsOpen()); NewBlockFile->Flush(); NewBlockFile = nullptr; } { ChangeCallback(MovedChunks, {}); MovedCount += KeepMap.size(); MovedChunks.clear(); RwLock::ExclusiveLockScope InsertLock(m_InsertLock); Stopwatch Timer; const auto ___ = MakeGuard([&] { uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); ReadBlockTimeUs += ElapsedUs; ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); }); std::filesystem::path NewBlockPath; NextBlockIndex = GetFreeBlockIndex(NextBlockIndex, InsertLock, NewBlockPath); if (NextBlockIndex == (uint32_t)m_MaxBlockCount) { ZEN_ERROR("unable to allocate a new block in '{}', count limit {} exeeded", m_BlocksBasePath, static_cast(std::numeric_limits::max()) + 1); return; } NewBlockFile = new BlockStoreFile(NewBlockPath); m_ChunkBlocks[NextBlockIndex] = NewBlockFile; } std::error_code Error; DiskSpace Space = DiskSpaceInfo(m_BlocksBasePath, Error); if (Error) { ZEN_ERROR("get disk space in '{}' FAILED, reason: '{}'", m_BlocksBasePath, Error.message()); return; } if (Space.Free < m_MaxBlockSize) { uint64_t ReclaimedSpace = DiskReserveCallback(); if (Space.Free + ReclaimedSpace < m_MaxBlockSize) { ZEN_WARN("garbage collect for '{}' FAILED, required disk space {}, free {}", m_BlocksBasePath, m_MaxBlockSize, NiceBytes(Space.Free + ReclaimedSpace)); RwLock::ExclusiveLockScope _l(m_InsertLock); Stopwatch Timer; const auto __ = MakeGuard([&] { uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); ReadBlockTimeUs += ElapsedUs; ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); }); 
ZEN_ASSERT(m_ChunkBlocks[NextBlockIndex] == NewBlockFile); ZEN_ASSERT_SLOW(!NewBlockFile->IsOpen()); m_ChunkBlocks.erase(NextBlockIndex); return; } ZEN_INFO("using gc reserve for '{}', reclaimed {}, disk free {}", m_BlocksBasePath, ReclaimedSpace, NiceBytes(Space.Free + ReclaimedSpace)); } NewBlockFile->Create(m_MaxBlockSize); NewBlockIndex = NextBlockIndex; WriteOffset = 0; } NewBlockFile->Write(Chunk.data(), ChunkLocation.Size, WriteOffset); MovedChunks.push_back( {ChunkIndex, {.BlockIndex = NewBlockIndex, .Offset = gsl::narrow(WriteOffset), .Size = ChunkLocation.Size}}); uint64_t OldOffset = WriteOffset; WriteOffset = RoundUp(WriteOffset + ChunkLocation.Size, PayloadAlignment); m_TotalSize.fetch_add(WriteOffset - OldOffset, std::memory_order::relaxed); } Chunk.clear(); if (NewBlockFile) { ZEN_ASSERT_SLOW(NewBlockFile->IsOpen()); NewBlockFile->Flush(); } } const ChunkIndexArray& DeleteMap = BlockDeleteChunks[ChunkMapIndex]; for (size_t DeleteIndex : DeleteMap) { DeletedSize += ChunkLocations[DeleteIndex].Size; } ChangeCallback(MovedChunks, DeleteMap); MovedCount += MovedChunks.size(); DeletedCount += DeleteMap.size(); MovedChunks.clear(); if (OldBlockFile) { RwLock::ExclusiveLockScope __(m_InsertLock); Stopwatch Timer; const auto ___ = MakeGuard([&] { uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); ReadBlockTimeUs += ElapsedUs; ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); }); ZEN_DEBUG("marking cas block store file '{}' for delete, block #{}", OldBlockFile->GetPath(), BlockIndex); ZEN_ASSERT(m_ChunkBlocks[BlockIndex] == OldBlockFile); m_ChunkBlocks.erase(BlockIndex); m_TotalSize.fetch_sub(OldBlockFile->FileSize(), std::memory_order::relaxed); OldBlockFile->MarkAsDeleteOnClose(); } } if (NewBlockFile) { ZEN_ASSERT_SLOW(NewBlockFile->IsOpen()); NewBlockFile->Flush(); NewBlockFile = nullptr; } } catch (const std::system_error& SystemError) { if (IsOOM(SystemError.code())) { ZEN_WARN("reclaiming space for '{}' ran out of memory: '{}'", 
m_BlocksBasePath, SystemError.what()); } else if (IsOOD(SystemError.code())) { ZEN_WARN("reclaiming space for '{}' ran out of disk space: '{}'", m_BlocksBasePath, SystemError.what()); } else { ZEN_ERROR("reclaiming space for '{}' failed with system error exception: '{}'", m_BlocksBasePath, SystemError.what()); } } catch (const std::bad_alloc& BadAlloc) { ZEN_WARN("reclaiming space for '{}' ran out of memory: '{}'", m_BlocksBasePath, BadAlloc.what()); } catch (const std::exception& ex) { ZEN_ERROR("reclaiming space for '{}' failed with: '{}'", m_BlocksBasePath, ex.what()); } } void BlockStore::IterateChunks(const std::vector& ChunkLocations, const IterateChunksSmallSizeCallback& SmallSizeCallback, const IterateChunksLargeSizeCallback& LargeSizeCallback) { ZEN_TRACE_CPU("BlockStore::IterateChunks"); ZEN_LOG_SCOPE("iterating chunks from '{}'", m_BlocksBasePath); std::vector LocationIndexes; LocationIndexes.reserve(ChunkLocations.size()); for (size_t ChunkIndex = 0; ChunkIndex < ChunkLocations.size(); ++ChunkIndex) { LocationIndexes.push_back(ChunkIndex); } std::sort(LocationIndexes.begin(), LocationIndexes.end(), [&](size_t IndexA, size_t IndexB) -> bool { const BlockStoreLocation& LocationA = ChunkLocations[IndexA]; const BlockStoreLocation& LocationB = ChunkLocations[IndexB]; if (LocationA.BlockIndex < LocationB.BlockIndex) { return true; } else if (LocationA.BlockIndex > LocationB.BlockIndex) { return false; } return LocationA.Offset < LocationB.Offset; }); IoBuffer ReadBuffer{ScrubSmallChunkWindowSize}; void* BufferBase = ReadBuffer.MutableData(); RwLock::SharedLockScope _(m_InsertLock); auto GetNextRange = [&](size_t StartIndexOffset) { size_t ChunkCount = 0; size_t StartIndex = LocationIndexes[StartIndexOffset]; const BlockStoreLocation& StartLocation = ChunkLocations[StartIndex]; uint64_t StartOffset = StartLocation.Offset; while (StartIndexOffset + ChunkCount < LocationIndexes.size()) { size_t NextIndex = LocationIndexes[StartIndexOffset + ChunkCount]; const 
BlockStoreLocation& Location = ChunkLocations[NextIndex]; if (Location.BlockIndex != StartLocation.BlockIndex) { break; } if ((Location.Offset + Location.Size) - StartOffset > ScrubSmallChunkWindowSize) { break; } ++ChunkCount; } return ChunkCount; }; size_t LocationIndexOffset = 0; while (LocationIndexOffset < LocationIndexes.size()) { size_t ChunkIndex = LocationIndexes[LocationIndexOffset]; const BlockStoreLocation& FirstLocation = ChunkLocations[ChunkIndex]; const uint32_t BlockIndex = FirstLocation.BlockIndex; auto FindBlockIt = m_ChunkBlocks.find(BlockIndex); if (FindBlockIt == m_ChunkBlocks.end()) { ZEN_LOG_SCOPE("block #{} not available", BlockIndex); while (ChunkLocations[ChunkIndex].BlockIndex == BlockIndex) { SmallSizeCallback(ChunkIndex, nullptr, 0); LocationIndexOffset++; if (LocationIndexOffset == LocationIndexes.size()) { break; } ChunkIndex = LocationIndexes[LocationIndexOffset]; } continue; } const Ref& BlockFile = FindBlockIt->second; ZEN_ASSERT(BlockFile); const size_t BlockSize = BlockFile->FileSize(); const size_t RangeCount = GetNextRange(LocationIndexOffset); if (RangeCount > 0) { size_t LastChunkIndex = LocationIndexes[LocationIndexOffset + RangeCount - 1]; const BlockStoreLocation& LastLocation = ChunkLocations[LastChunkIndex]; uint64_t Size = LastLocation.Offset + LastLocation.Size - FirstLocation.Offset; BlockFile->Read(BufferBase, Size, FirstLocation.Offset); for (size_t RangeIndex = 0; RangeIndex < RangeCount; ++RangeIndex) { size_t NextChunkIndex = LocationIndexes[LocationIndexOffset + RangeIndex]; const BlockStoreLocation& ChunkLocation = ChunkLocations[NextChunkIndex]; if (ChunkLocation.Size == 0 || ((ChunkLocation.Offset + ChunkLocation.Size) > BlockSize)) { ZEN_LOG_SCOPE("chunk [{},{}] out of bounds (block #{} file size = {})", ChunkLocation.Offset, ChunkLocation.Size, BlockIndex, BlockSize); SmallSizeCallback(NextChunkIndex, nullptr, 0); continue; } void* BufferPtr = &((char*)BufferBase)[ChunkLocation.Offset - 
FirstLocation.Offset]; SmallSizeCallback(NextChunkIndex, BufferPtr, ChunkLocation.Size); } LocationIndexOffset += RangeCount; continue; } if (FirstLocation.Size == 0 || (FirstLocation.Offset + FirstLocation.Size > BlockSize)) { ZEN_LOG_SCOPE("chunk [{},{}] out of bounds (block #{} file size = {})", FirstLocation.Offset, FirstLocation.Size, BlockIndex, BlockSize); SmallSizeCallback(ChunkIndex, nullptr, 0); LocationIndexOffset++; continue; } LargeSizeCallback(ChunkIndex, *BlockFile.Get(), FirstLocation.Offset, FirstLocation.Size); LocationIndexOffset++; } } void BlockStore::CompactBlocks(const BlockStoreCompactState& CompactState, uint32_t PayloadAlignment, const CompactCallback& ChangeCallback, const ClaimDiskReserveCallback& DiskReserveCallback, std::string_view LogPrefix) { ZEN_TRACE_CPU("BlockStore::CompactBlocks"); uint64_t DeletedSize = 0; uint64_t MovedCount = 0; uint64_t MovedSize = 0; Stopwatch TotalTimer; const auto _ = MakeGuard([&] { ZEN_DEBUG("{}Compact blocks for '{}' DONE after {}, deleted {} and moved {} chunks ({}) ", LogPrefix, m_BlocksBasePath, NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()), NiceBytes(DeletedSize), MovedCount, NiceBytes(MovedSize)); }); uint64_t WriteOffset = m_MaxBlockSize + 1u; // Force detect a new block uint32_t NewBlockIndex = 0; MovedChunksArray MovedChunks; uint64_t AddedSize = 0; uint64_t RemovedSize = 0; Ref NewBlockFile; std::unique_ptr TargetFileBuffer; auto NewBlockFileGuard = MakeGuard([&]() { TargetFileBuffer.reset(); if (NewBlockFile) { { RwLock::ExclusiveLockScope _l(m_InsertLock); if (m_ChunkBlocks[NewBlockIndex] == NewBlockFile) { m_ChunkBlocks.erase(NewBlockIndex); } } if (NewBlockFile->IsOpen()) { ZEN_DEBUG("{}Dropping incomplete cas block store file '{}'", LogPrefix, NewBlockFile->GetPath()); NewBlockFile->MarkAsDeleteOnClose(); } } }); auto ReportChanges = [&]() -> bool { bool Continue = true; if (!MovedChunks.empty() || RemovedSize > 0) { Continue = ChangeCallback(MovedChunks, RemovedSize > AddedSize ? 
RemovedSize - AddedSize : 0); DeletedSize += RemovedSize; RemovedSize = 0; AddedSize = 0; MovedCount += MovedChunks.size(); MovedChunks.clear(); } return Continue; }; std::vector RemovedBlocks; CompactState.IterateBlocks([&](uint32_t BlockIndex, const std::vector& KeepChunkIndexes, const std::vector& ChunkLocations) -> bool { ZEN_TRACE_CPU("BlockStore::CompactBlock"); Ref OldBlockFile; { RwLock::SharedLockScope _(m_InsertLock); if ((BlockIndex == m_WriteBlockIndex.load()) && m_WriteBlock) { ZEN_ERROR("{}compact Block was requested to rewrite the currently active write block in '{}', Block index {}", LogPrefix, m_BlocksBasePath, BlockIndex); return false; } auto It = m_ChunkBlocks.find(BlockIndex); if (It == m_ChunkBlocks.end()) { ZEN_WARN("{}compact Block was requested to rewrite an unknown block in '{}', Block index {}", LogPrefix, m_BlocksBasePath, BlockIndex); return true; } if (!It->second) { ZEN_WARN("{}compact Block was requested to rewrite a deleted block in '{}', Block index {}", LogPrefix, m_BlocksBasePath, BlockIndex); return true; } OldBlockFile = It->second; } ZEN_ASSERT(OldBlockFile); uint64_t OldBlockSize = OldBlockFile->FileSize(); if (KeepChunkIndexes.empty()) { ZEN_INFO("{}dropped all chunks from '{}', freeing {}", LogPrefix, GetBlockPath(m_BlocksBasePath, BlockIndex).filename(), NiceBytes(OldBlockSize)); } else { std::vector SortedChunkIndexes(KeepChunkIndexes); std::sort(SortedChunkIndexes.begin(), SortedChunkIndexes.end(), [&ChunkLocations](size_t Lhs, size_t Rhs) { return ChunkLocations[Lhs].Offset < ChunkLocations[Rhs].Offset; }); BasicFileBuffer SourceFileBuffer(OldBlockFile->GetBasicFile(), Min(65536u, OldBlockSize)); uint64_t MovedFromBlock = 0; std::vector Chunk; for (const size_t& ChunkIndex : SortedChunkIndexes) { const BlockStoreLocation ChunkLocation = ChunkLocations[ChunkIndex]; if (ChunkLocation.Offset + ChunkLocation.Size > OldBlockSize) { ZEN_WARN( "{}compact Block skipping chunk outside of block range in '{}', Chunk start {}, 
Chunk size {} in Block {}, Block " "size {}", LogPrefix, m_BlocksBasePath, ChunkLocation.Offset, ChunkLocation.Size, OldBlockFile->GetPath(), OldBlockSize); continue; } Chunk.resize(ChunkLocation.Size); SourceFileBuffer.Read(Chunk.data(), Chunk.size(), ChunkLocation.Offset); if ((WriteOffset + Chunk.size()) > m_MaxBlockSize) { TargetFileBuffer.reset(); if (NewBlockFile) { ZEN_ASSERT_SLOW(NewBlockFile->IsOpen()); NewBlockFile->Flush(); uint64_t NewBlockSize = NewBlockFile->FileSize(); MovedSize += NewBlockSize; NewBlockFile = nullptr; ZEN_ASSERT(!MovedChunks.empty() || RemovedSize > 0); // We should not have a new block if we haven't moved anything ZEN_INFO("{}wrote block {} ({})", LogPrefix, GetBlockPath(m_BlocksBasePath, NewBlockIndex).filename(), NiceBytes(NewBlockSize)); if (!ReportChanges()) { return false; } } uint32_t NextBlockIndex = m_WriteBlockIndex.load(std::memory_order_relaxed); { RwLock::ExclusiveLockScope InsertLock(m_InsertLock); std::filesystem::path NewBlockPath; NextBlockIndex = GetFreeBlockIndex(NextBlockIndex, InsertLock, NewBlockPath); if (NextBlockIndex == (uint32_t)m_MaxBlockCount) { ZEN_ERROR("{}unable to allocate a new block in '{}', count limit {} exeeded", LogPrefix, m_BlocksBasePath, static_cast(std::numeric_limits::max()) + 1); return false; } NewBlockFile = new BlockStoreFile(NewBlockPath); m_ChunkBlocks[NextBlockIndex] = NewBlockFile; } std::error_code Error; DiskSpace Space = DiskSpaceInfo(m_BlocksBasePath, Error); if (Error) { ZEN_ERROR("{}get disk space in '{}' FAILED, reason: '{}'", LogPrefix, m_BlocksBasePath, Error.message()); { RwLock::ExclusiveLockScope _l(m_InsertLock); ZEN_ASSERT(m_ChunkBlocks[NextBlockIndex] == NewBlockFile); m_ChunkBlocks.erase(NextBlockIndex); } ZEN_ASSERT_SLOW(!NewBlockFile->IsOpen()); NewBlockFile = nullptr; return false; } if (Space.Free < m_MaxBlockSize) { uint64_t ReclaimedSpace = DiskReserveCallback(); if (Space.Free + ReclaimedSpace < m_MaxBlockSize) { ZEN_WARN("{}garbage collect for '{}' FAILED, 
required disk space {}, free {}", LogPrefix, m_BlocksBasePath, m_MaxBlockSize, NiceBytes(Space.Free + ReclaimedSpace)); { RwLock::ExclusiveLockScope _l(m_InsertLock); ZEN_ASSERT(m_ChunkBlocks[NextBlockIndex] == NewBlockFile); m_ChunkBlocks.erase(NextBlockIndex); } ZEN_ASSERT_SLOW(!NewBlockFile->IsOpen()); NewBlockFile = nullptr; return false; } ZEN_INFO("{}using gc reserve for '{}', reclaimed {}, disk free {}", LogPrefix, m_BlocksBasePath, ReclaimedSpace, NiceBytes(Space.Free + ReclaimedSpace)); } NewBlockFile->Create(m_MaxBlockSize); NewBlockIndex = NextBlockIndex; WriteOffset = 0; TargetFileBuffer = std::make_unique(NewBlockFile->GetBasicFile(), Min(65536u, m_MaxBlockSize)); } TargetFileBuffer->Write(Chunk.data(), ChunkLocation.Size, WriteOffset); MovedChunks.push_back( {ChunkIndex, {.BlockIndex = NewBlockIndex, .Offset = gsl::narrow(WriteOffset), .Size = ChunkLocation.Size}}); uint64_t WriteEndOffset = WriteOffset + ChunkLocation.Size; MovedFromBlock += (WriteEndOffset - WriteOffset); WriteOffset = RoundUp(WriteEndOffset, PayloadAlignment); AddedSize += Chunk.size(); } ZEN_INFO("{}moved {} chunks ({}) from '{}' to new block, freeing {}", LogPrefix, KeepChunkIndexes.size(), NiceBytes(MovedFromBlock), GetBlockPath(m_BlocksBasePath, BlockIndex).filename(), NiceBytes(OldBlockSize - MovedFromBlock)); } if (TargetFileBuffer) { TargetFileBuffer->Flush(); } if (!ReportChanges()) { return false; } { RwLock::ExclusiveLockScope InsertLock(m_InsertLock); ZEN_DEBUG("{}marking cas block store file '{}' for delete ({})", LogPrefix, OldBlockFile->GetPath().filename(), NiceBytes(OldBlockSize)); OldBlockFile->MarkAsDeleteOnClose(); m_ChunkBlocks.erase(BlockIndex); m_TotalSize.fetch_sub(OldBlockSize); RemovedSize += OldBlockSize; } return true; }); if (NewBlockFile) { ZEN_ASSERT_SLOW(NewBlockFile->IsOpen()); NewBlockFile->Flush(); uint64_t NewBlockSize = NewBlockFile->FileSize(); MovedSize += NewBlockSize; NewBlockFile = nullptr; ZEN_INFO("{}wrote block {} ({})", LogPrefix, 
GetBlockPath(m_BlocksBasePath, NewBlockIndex).filename(), NiceBytes(NewBlockSize)); } ReportChanges(); } const char* BlockStore::GetBlockFileExtension() { return ".ucas"; } std::filesystem::path BlockStore::GetBlockPath(const std::filesystem::path& BlocksBasePath, const uint32_t BlockIndex) { ExtendablePathBuilder<256> Path; char BlockHexString[9]; ToHexNumber(BlockIndex, BlockHexString); Path.Append(BlocksBasePath); Path.AppendSeparator(); Path.AppendAsciiRange(BlockHexString, BlockHexString + 4); Path.AppendSeparator(); Path.Append(BlockHexString); Path.Append(GetBlockFileExtension()); return Path.ToPath(); } Ref BlockStore::GetBlockFile(uint32_t BlockIndex) { RwLock::SharedLockScope _(m_InsertLock); if (auto It = m_ChunkBlocks.find(BlockIndex); It != m_ChunkBlocks.end()) { return It->second; } return {}; } #if ZEN_WITH_TESTS TEST_CASE("blockstore.blockstoredisklocation") { BlockStoreLocation Zero = BlockStoreLocation{.BlockIndex = 0, .Offset = 0, .Size = 0}; CHECK(Zero == BlockStoreDiskLocation(Zero, 4).Get(4)); BlockStoreLocation MaxBlockIndex = BlockStoreLocation{.BlockIndex = BlockStoreDiskLocation::MaxBlockIndex, .Offset = 0, .Size = 0}; CHECK(MaxBlockIndex == BlockStoreDiskLocation(MaxBlockIndex, 4).Get(4)); BlockStoreLocation MaxOffset = BlockStoreLocation{.BlockIndex = 0, .Offset = BlockStoreDiskLocation::MaxOffset * 4, .Size = 0}; CHECK(MaxOffset == BlockStoreDiskLocation(MaxOffset, 4).Get(4)); BlockStoreLocation MaxSize = BlockStoreLocation{.BlockIndex = 0, .Offset = 0, .Size = std::numeric_limits::max()}; CHECK(MaxSize == BlockStoreDiskLocation(MaxSize, 4).Get(4)); BlockStoreLocation MaxBlockIndexAndOffset = BlockStoreLocation{.BlockIndex = BlockStoreDiskLocation::MaxBlockIndex, .Offset = BlockStoreDiskLocation::MaxOffset * 4, .Size = 0}; CHECK(MaxBlockIndexAndOffset == BlockStoreDiskLocation(MaxBlockIndexAndOffset, 4).Get(4)); BlockStoreLocation MaxAll = BlockStoreLocation{.BlockIndex = BlockStoreDiskLocation::MaxBlockIndex, .Offset = 
BlockStoreDiskLocation::MaxOffset * 4, .Size = std::numeric_limits::max()};
	CHECK(MaxAll == BlockStoreDiskLocation(MaxAll, 4).Get(4));

	// Same extreme values, but packed/unpacked with an alignment of 4096 instead of 4
	BlockStoreLocation MaxAll4096 = BlockStoreLocation{.BlockIndex = BlockStoreDiskLocation::MaxBlockIndex,
													   .Offset = BlockStoreDiskLocation::MaxOffset * 4096,
													   .Size = std::numeric_limits::max()};
	CHECK(MaxAll4096 == BlockStoreDiskLocation(MaxAll4096, 4096).Get(4096));

	// Mid-range value in every field
	BlockStoreLocation Middle = BlockStoreLocation{.BlockIndex = (BlockStoreDiskLocation::MaxBlockIndex) / 2,
												   .Offset = ((BlockStoreDiskLocation::MaxOffset) / 2) * 4,
												   .Size = std::numeric_limits::max() / 2};
	CHECK(Middle == BlockStoreDiskLocation(Middle, 4).Get(4));
}

// Exercises BlockStoreFile on its own: create + write, reopen + read, and the lifetime of
// the file on disk relative to IoBuffer chunks handed out by GetChunk().
TEST_CASE("blockstore.blockfile")
{
	ScopedTemporaryDirectory TempDir;
	auto RootDirectory = TempDir.Path() / "blocks";
	CreateDirectories(RootDirectory);

	{
		// Create a fresh block file; nothing has been written yet so the file size is 0
		BlockStoreFile File1(RootDirectory / "1");
		File1.Create(16384);
		CHECK(File1.FileSize() == 0);
		// Each write is 5 bytes: the 4 characters plus the literal's terminating NUL, which is
		// what lets the chunk data be compared as a C string below
		File1.Write("data", 5, 0);
		IoBuffer DataChunk = File1.GetChunk(0, 5);
		File1.Write("boop", 5, 5);
		IoBuffer BoopChunk = File1.GetChunk(5, 5);
		const char* Data = static_cast(DataChunk.GetData());
		CHECK(std::string(Data) == "data");
		const char* Boop = static_cast(BoopChunk.GetData());
		CHECK(std::string(Boop) == "boop");
		File1.Flush();
		// Two 5 byte writes back to back
		CHECK(File1.FileSize() == 10);
	}
	{
		// Reopen the existing file and read the same content back, both through Read()
		// into a raw buffer and through GetChunk()
		BlockStoreFile File1(RootDirectory / "1");
		File1.Open();

		char DataRaw[5];
		File1.Read(DataRaw, 5, 0);
		CHECK(std::string(DataRaw) == "data");
		IoBuffer DataChunk = File1.GetChunk(0, 5);

		char BoopRaw[5];
		File1.Read(BoopRaw, 5, 5);
		CHECK(std::string(BoopRaw) == "boop");

		IoBuffer BoopChunk = File1.GetChunk(5, 5);
		const char* Data = static_cast(DataChunk.GetData());
		CHECK(std::string(Data) == "data");
		const char* Boop = static_cast(BoopChunk.GetData());
		CHECK(std::string(Boop) == "boop");
	}

	{
		// Chunks are fetched inside the inner scope and used after the BlockStoreFile that
		// produced them has been destroyed
		IoBuffer DataChunk;
		IoBuffer BoopChunk;

		{
			BlockStoreFile File1(RootDirectory / "1");
			File1.Open();
			DataChunk = File1.GetChunk(0, 5);
			BoopChunk = File1.GetChunk(5, 5);
		}

		// Destroying the BlockStoreFile must not remove the file from disk
		CHECK(std::filesystem::exists(RootDirectory / "1"));

const char* Data = static_cast(DataChunk.GetData());
		CHECK(std::string(Data) == "data");
		const char* Boop = static_cast(BoopChunk.GetData());
		CHECK(std::string(Boop) == "boop");
	}
	// Releasing the chunks of a file that was not marked for delete leaves it on disk
	CHECK(std::filesystem::exists(RootDirectory / "1"));

	{
		IoBuffer DataChunk;
		IoBuffer BoopChunk;

		{
			// Same as above, but this time the file is marked as delete-on-close
			BlockStoreFile File1(RootDirectory / "1");
			File1.Open();
			File1.MarkAsDeleteOnClose();
			DataChunk = File1.GetChunk(0, 5);
			BoopChunk = File1.GetChunk(5, 5);
		}

		// The chunk data is still readable after the BlockStoreFile itself is gone
		const char* Data = static_cast(DataChunk.GetData());
		CHECK(std::string(Data) == "data");
		const char* Boop = static_cast(BoopChunk.GetData());
		CHECK(std::string(Boop) == "boop");
	}
	// Once the last chunks are released the file is expected to have been removed
	CHECK(!std::filesystem::exists(RootDirectory / "1"));
}

// Helpers shared by the BlockStore test cases below
namespace blockstore::impl {
	// Writes the bytes of String to Store with the given alignment and returns the location
	// reported through the WriteChunk callback. Also checks that the reported size matches
	// the string length.
	BlockStoreLocation WriteStringAsChunk(BlockStore& Store, std::string_view String, uint32_t PayloadAlignment)
	{
		BlockStoreLocation Location;
		Store.WriteChunk(String.data(), String.length(), PayloadAlignment, [&](const BlockStoreLocation& L) { Location = L; });
		CHECK(Location.Size == String.length());
		return Location;
	};

	// Fetches the chunk at Location and returns its bytes as a std::string.
	// Returns an empty string when TryGetChunk yields no buffer.
	std::string ReadChunkAsString(BlockStore& Store, const BlockStoreLocation& Location)
	{
		IoBuffer ChunkData = Store.TryGetChunk(Location);
		if (!ChunkData)
		{
			return "";
		}
		std::string AsString((const char*)ChunkData.Data(), ChunkData.Size());
		return AsString;
	};

	// Collects the content of RootDir using the RecursiveFlag, optionally including files
	// and/or directories, by delegating to the DirectoryContent-filling GetDirectoryContent.
	std::vector GetDirectoryContent(std::filesystem::path RootDir, bool Files, bool Directories)
	{
		DirectoryContent DirectoryContent;
		GetDirectoryContent(RootDir,
							DirectoryContent::RecursiveFlag | (Files ? DirectoryContent::IncludeFilesFlag : 0) |
								(Directories ?
DirectoryContent::IncludeDirsFlag : 0),
							DirectoryContent);
		// Flatten the result: all directories first, followed by all files
		std::vector Result;
		Result.insert(Result.end(), DirectoryContent.Directories.begin(), DirectoryContent.Directories.end());
		Result.insert(Result.end(), DirectoryContent.Files.begin(), DirectoryContent.Files.end());
		return Result;
	};

}  // namespace blockstore::impl

// Basic write/read round trip through BlockStore, including spilling over into a second
// block when a chunk no longer fits in the current one.
TEST_CASE("blockstore.chunks")
{
	using namespace blockstore::impl;

	ScopedTemporaryDirectory TempDir;
	auto RootDirectory = TempDir.Path();

	// Max block size 128, max block count 1024
	BlockStore Store;
	Store.Initialize(RootDirectory, 128, 1024);

	// Nothing has been written yet, so no location can resolve to a chunk
	IoBuffer BadChunk = Store.TryGetChunk({.BlockIndex = 0, .Offset = 0, .Size = 512});
	CHECK(!BadChunk);

	std::string FirstChunkData = "This is the data of the first chunk that we will write";
	BlockStoreLocation FirstChunkLocation = WriteStringAsChunk(Store, FirstChunkData, 4);
	std::string SecondChunkData = "This is the data for the second chunk that we will write";
	BlockStoreLocation SecondChunkLocation = WriteStringAsChunk(Store, SecondChunkData, 4);

	CHECK(ReadChunkAsString(Store, FirstChunkLocation) == FirstChunkData);
	CHECK(ReadChunkAsString(Store, SecondChunkLocation) == SecondChunkData);

	std::string ThirdChunkData = "This is a much longer string that will not fit in the first block so it should be placed in the second block";
	BlockStoreLocation ThirdChunkLocation = WriteStringAsChunk(Store, ThirdChunkData, 4);
	CHECK(ThirdChunkLocation.BlockIndex != FirstChunkLocation.BlockIndex);

	// Earlier chunks must remain readable after the store moved on to a new block
	CHECK(ReadChunkAsString(Store, FirstChunkLocation) == FirstChunkData);
	CHECK(ReadChunkAsString(Store, SecondChunkLocation) == SecondChunkData);
	CHECK(ReadChunkAsString(Store, ThirdChunkLocation) == ThirdChunkData);
}

// Verifies that blocks without referenced chunks are removed by ReclaimSpace and how
// SyncExistingBlocksOnDisk treats block indexes that are known but missing on disk.
TEST_CASE("blockstore.clean.stray.blocks")
{
	using namespace blockstore::impl;

	ScopedTemporaryDirectory TempDir;
	auto RootDirectory = TempDir.Path();

	BlockStore Store;
	Store.Initialize(RootDirectory / "store", 128, 1024);

	std::string FirstChunkData = "This is the data of the first chunk that we will write";
	BlockStoreLocation FirstChunkLocation =
WriteStringAsChunk(Store, FirstChunkData, 4); std::string SecondChunkData = "This is the data for the second chunk that we will write"; BlockStoreLocation SecondChunkLocation = WriteStringAsChunk(Store, SecondChunkData, 4); std::string ThirdChunkData = "This is a much longer string that will not fit in the first block so it should be placed in the second block"; BlockStoreLocation ThirdChunkLocation = WriteStringAsChunk(Store, ThirdChunkData, 4); Store.Close(); Store.Initialize(RootDirectory / "store", 128, 1024); CHECK(GetDirectoryContent(RootDirectory / "store", true, false).size() == 2); IoBuffer ThirdChunk = Store.TryGetChunk(ThirdChunkLocation); CHECK(ThirdChunk); // Reclaim space should delete unreferenced block Store.ReclaimSpace(Store.GetReclaimSnapshotState(), {FirstChunkLocation, SecondChunkLocation}, {0, 1}, 4, false); // Block lives on as long as we reference it via ThirdChunk CHECK(GetDirectoryContent(RootDirectory / "store", true, false).size() == 2); ThirdChunk = {}; CHECK(GetDirectoryContent(RootDirectory / "store", true, false).size() == 1); ThirdChunk = Store.TryGetChunk(ThirdChunkLocation); CHECK(!ThirdChunk); // Recreate a fake block for a missing chunk location BlockStore::BlockIndexSet KnownBlocks; KnownBlocks.Add(FirstChunkLocation.BlockIndex); KnownBlocks.Add(SecondChunkLocation.BlockIndex); KnownBlocks.Add(ThirdChunkLocation.BlockIndex); Store.SyncExistingBlocksOnDisk(KnownBlocks); // We create a fake block for the location - we should still not be able to get the chunk CHECK(GetDirectoryContent(RootDirectory / "store", true, false).size() == 2); ThirdChunk = Store.TryGetChunk(ThirdChunkLocation); CHECK(!ThirdChunk); } TEST_CASE("blockstore.flush.force.new.block") { using namespace blockstore::impl; ScopedTemporaryDirectory TempDir; auto RootDirectory = TempDir.Path(); BlockStore Store; Store.Initialize(RootDirectory / "store", 128, 1024); std::string FirstChunkData = "This is the data of the first chunk that we will write"; 
WriteStringAsChunk(Store, FirstChunkData, 4);
	Store.Flush(/*ForceNewBlock*/ true);
	std::string SecondChunkData = "This is the data for the second chunk that we will write";
	WriteStringAsChunk(Store, SecondChunkData, 4);
	Store.Flush(/*ForceNewBlock*/ true);
	std::string ThirdChunkData = "This is a much longer string that will not fit in the first block so it should be placed in the second block";
	WriteStringAsChunk(Store, ThirdChunkData, 4);

	// Three writes, each separated by a forced new block, should give three files on disk
	CHECK(GetDirectoryContent(RootDirectory / "store", true, false).size() == 3);
}

// Exercises BlockStore::IterateChunks with a mix of valid small chunks, one chunk that is
// as large as the whole block and three invalid locations, checking which of the two
// callbacks each chunk index is delivered to.
TEST_CASE("blockstore.iterate.chunks")
{
	using namespace blockstore::impl;

	ScopedTemporaryDirectory TempDir;
	auto RootDirectory = TempDir.Path();

	// Max block size is twice ScrubSmallChunkWindowSize so the very large chunk below fits in one block
	BlockStore Store;
	Store.Initialize(RootDirectory / "store", ScrubSmallChunkWindowSize * 2, 1024);
	IoBuffer BadChunk = Store.TryGetChunk({.BlockIndex = 0, .Offset = 0, .Size = 512});
	CHECK(!BadChunk);

	std::string FirstChunkData = "This is the data of the first chunk that we will write";
	BlockStoreLocation FirstChunkLocation = WriteStringAsChunk(Store, FirstChunkData, 4);

	std::string SecondChunkData = "This is the data for the second chunk that we will write";
	BlockStoreLocation SecondChunkLocation = WriteStringAsChunk(Store, SecondChunkData, 4);
	Store.Flush(/*ForceNewBlock*/ false);

	std::string VeryLargeChunk(ScrubSmallChunkWindowSize * 2, 'L');
	BlockStoreLocation VeryLargeChunkLocation = WriteStringAsChunk(Store, VeryLargeChunk, 4);

	// Locations that must not resolve to any data: zero size, a range extending past the
	// maximum block size, and a block index that was never written
	BlockStoreLocation BadLocationZeroSize = {.BlockIndex = 0, .Offset = 0, .Size = 0};
	BlockStoreLocation BadLocationOutOfRange = {.BlockIndex = 0, .Offset = ScrubSmallChunkWindowSize, .Size = ScrubSmallChunkWindowSize * 2};
	BlockStoreLocation BadBlockIndex = {.BlockIndex = 0xfffff, .Offset = 1024, .Size = 1024};

	Store.IterateChunks(
		{FirstChunkLocation, SecondChunkLocation, VeryLargeChunkLocation, BadLocationZeroSize, BadLocationOutOfRange, BadBlockIndex},
		// First callback: receives the chunk data as a pointer + size. ChunkIndex is the
		// position in the location list above.
		[&](size_t ChunkIndex, const void* Data, uint64_t Size) {
			switch (ChunkIndex)
			{
				case 0:
					CHECK(Data);
					CHECK(Size ==
FirstChunkData.size()); CHECK(std::string((const char*)Data, Size) == FirstChunkData); break; case 1: CHECK(Data); CHECK(Size == SecondChunkData.size()); CHECK(std::string((const char*)Data, Size) == SecondChunkData); break; case 2: CHECK(false); break; case 3: CHECK(!Data); break; case 4: CHECK(!Data); break; case 5: CHECK(!Data); break; default: CHECK(false); break; } }, [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) { switch (ChunkIndex) { case 0: case 1: CHECK(false); break; case 2: { CHECK(Size == VeryLargeChunk.size()); char* Buffer = new char[Size]; size_t HashOffset = 0; File.StreamByteRange(Offset, Size, [&](const void* Data, uint64_t Size) { memcpy(&Buffer[HashOffset], Data, Size); HashOffset += Size; }); CHECK(memcmp(Buffer, VeryLargeChunk.data(), Size) == 0); delete[] Buffer; } break; case 3: CHECK(false); break; case 4: CHECK(false); break; case 5: CHECK(false); break; default: CHECK(false); break; } }); } TEST_CASE("blockstore.reclaim.space") { using namespace blockstore::impl; ScopedTemporaryDirectory TempDir; auto RootDirectory = TempDir.Path(); BlockStore Store; Store.Initialize(RootDirectory / "store", 512, 1024); constexpr size_t ChunkCount = 200; constexpr size_t Alignment = 8; std::vector ChunkLocations; std::vector ChunkHashes; ChunkLocations.reserve(ChunkCount); ChunkHashes.reserve(ChunkCount); for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) { IoBuffer Chunk = CreateRandomBlob(57 + ChunkIndex); Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) { ChunkLocations.push_back(L); }); ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size())); } std::vector ChunksToKeep; ChunksToKeep.reserve(ChunkLocations.size()); for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) { ChunksToKeep.push_back(ChunkIndex); } Store.Flush(/*ForceNewBlock*/ false); BlockStore::ReclaimSnapshotState State1 = Store.GetReclaimSnapshotState(); 
Store.ReclaimSpace(State1, ChunkLocations, ChunksToKeep, Alignment, true); // If we keep all the chunks we should not get any callbacks on moved/deleted stuff Store.ReclaimSpace( State1, ChunkLocations, ChunksToKeep, Alignment, false, [](const BlockStore::MovedChunksArray&, const BlockStore::ChunkIndexArray&) { CHECK(false); }, []() { CHECK(false); return 0; }); size_t DeleteChunkCount = 38; ChunksToKeep.clear(); for (size_t ChunkIndex = DeleteChunkCount; ChunkIndex < ChunkCount; ++ChunkIndex) { ChunksToKeep.push_back(ChunkIndex); } std::vector NewChunkLocations = ChunkLocations; size_t MovedChunkCount = 0; size_t DeletedChunkCount = 0; Store.ReclaimSpace( State1, ChunkLocations, ChunksToKeep, Alignment, false, [&](const BlockStore::MovedChunksArray& MovedChunks, const BlockStore::ChunkIndexArray& DeletedChunks) { for (const auto& MovedChunk : MovedChunks) { CHECK(MovedChunk.first >= DeleteChunkCount); NewChunkLocations[MovedChunk.first] = MovedChunk.second; } MovedChunkCount += MovedChunks.size(); for (size_t DeletedIndex : DeletedChunks) { CHECK(DeletedIndex < DeleteChunkCount); } DeletedChunkCount += DeletedChunks.size(); }, []() { CHECK(false); return 0; }); CHECK(MovedChunkCount <= DeleteChunkCount); CHECK(DeletedChunkCount == DeleteChunkCount); ChunkLocations = std::vector(NewChunkLocations.begin() + DeleteChunkCount, NewChunkLocations.end()); for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) { IoBuffer ChunkBlock = Store.TryGetChunk(NewChunkLocations[ChunkIndex]); if (ChunkIndex >= DeleteChunkCount) { IoBuffer VerifyChunk = Store.TryGetChunk(NewChunkLocations[ChunkIndex]); CHECK(VerifyChunk); IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size()); CHECK(VerifyHash == ChunkHashes[ChunkIndex]); } } // We need to take a new state since reclaim space add new block when compacting BlockStore::ReclaimSnapshotState State2 = Store.GetReclaimSnapshotState(); NewChunkLocations = ChunkLocations; MovedChunkCount = 0; 
DeletedChunkCount = 0;
	// Final pass: keep nothing. No chunk may be moved and every remaining chunk must be
	// reported as deleted.
	Store.ReclaimSpace(
		State2,
		ChunkLocations,
		{},
		Alignment,
		false,
		[&](const BlockStore::MovedChunksArray& MovedChunks, const BlockStore::ChunkIndexArray& DeletedChunks) {
			CHECK(MovedChunks.empty());
			DeletedChunkCount += DeletedChunks.size();
		},
		[]() {
			CHECK(false);
			return 0;
		});
	CHECK(DeletedChunkCount == ChunkCount - DeleteChunkCount);
}

// Writes and reads chunks from work scheduled on a WorkerThreadPool: first writes only,
// then reads only, then writes and reads interleaved.
TEST_CASE("blockstore.thread.read.write")
{
	using namespace blockstore::impl;

	ScopedTemporaryDirectory TempDir;
	auto RootDirectory = TempDir.Path();

	BlockStore Store;
	Store.Initialize(RootDirectory / "store", 1088, 1024);

	constexpr size_t ChunkCount = 1000;
	constexpr size_t Alignment = 8;
	std::vector Chunks;
	std::vector ChunkHashes;
	Chunks.reserve(ChunkCount);
	ChunkHashes.reserve(ChunkCount);
	// Prepare all payloads and their hashes up front so the workers only touch the store
	for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex)
	{
		IoBuffer Chunk = CreateRandomBlob(57 + ChunkIndex / 2);
		Chunks.push_back(Chunk);
		ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size()));
	}

	// Pre-sized so each work item writes only to its own slot
	std::vector ChunkLocations;
	ChunkLocations.resize(ChunkCount);

	// NOTE(review): 8 is presumably the number of worker threads - confirm against WorkerThreadPool
	WorkerThreadPool WorkerPool(8);
	std::atomic WorkCompleted = 0;
	// Phase 1: write every chunk from the pool
	for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex)
	{
		WorkerPool.ScheduleWork([&Store, ChunkIndex, &Chunks, &ChunkLocations, &WorkCompleted]() {
			IoBuffer& Chunk = Chunks[ChunkIndex];
			Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) { ChunkLocations[ChunkIndex] = L; });
			WorkCompleted.fetch_add(1);
		});
	}
	// Poll until every scheduled item has bumped the counter
	while (WorkCompleted < Chunks.size())
	{
		Sleep(1);
	}

	// Phase 2: read every chunk back from the pool and verify its hash
	WorkCompleted = 0;
	for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex)
	{
		WorkerPool.ScheduleWork([&Store, ChunkIndex, &ChunkLocations, &ChunkHashes, &WorkCompleted]() {
			IoBuffer VerifyChunk = Store.TryGetChunk(ChunkLocations[ChunkIndex]);
			CHECK(VerifyChunk);
			IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size());
			CHECK(VerifyHash == ChunkHashes[ChunkIndex]);
			WorkCompleted.fetch_add(1);
		});
	}
	while (WorkCompleted < Chunks.size())
	{
Sleep(1);
	}

	// Phase 3: write every chunk a second time while also reading the first copies back
	std::vector SecondChunkLocations;
	SecondChunkLocations.resize(ChunkCount);
	WorkCompleted = 0;
	for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex)
	{
		WorkerPool.ScheduleWork([&Store, ChunkIndex, &Chunks, &SecondChunkLocations, &WorkCompleted]() {
			IoBuffer& Chunk = Chunks[ChunkIndex];
			Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) {
				SecondChunkLocations[ChunkIndex] = L;
			});
			WorkCompleted.fetch_add(1);
		});
		WorkerPool.ScheduleWork([&Store, ChunkIndex, &ChunkLocations, &ChunkHashes, &WorkCompleted]() {
			IoBuffer VerifyChunk = Store.TryGetChunk(ChunkLocations[ChunkIndex]);
			CHECK(VerifyChunk);
			IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size());
			CHECK(VerifyHash == ChunkHashes[ChunkIndex]);
			WorkCompleted.fetch_add(1);
		});
	}
	// Two work items were scheduled per chunk in this phase
	while (WorkCompleted < Chunks.size() * 2)
	{
		Sleep(1);
	}
}

// Exercises BlockStore::CompactBlocks. The common setup writes ChunkCount chunks; each
// SUBCASE then builds a different BlockStoreCompactState and checks the outcome.
TEST_CASE("blockstore.compact.blocks")
{
	using namespace blockstore::impl;

	ScopedTemporaryDirectory TempDir;
	auto RootDirectory = TempDir.Path();

	BlockStore Store;
	Store.Initialize(RootDirectory / "store", 1088, 1024);

	constexpr size_t ChunkCount = 200;
	constexpr size_t Alignment = 8;
	std::vector Chunks;
	std::vector ChunkHashes;
	Chunks.reserve(ChunkCount);
	ChunkHashes.reserve(ChunkCount);
	for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex)
	{
		IoBuffer Chunk = CreateRandomBlob(57 + ChunkIndex / 2);
		Chunks.push_back(Chunk);
		ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size()));
	}

	std::vector ChunkLocations;
	ChunkLocations.resize(ChunkCount);

	for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex)
	{
		IoBuffer& Chunk = Chunks[ChunkIndex];
		Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) { ChunkLocations[ChunkIndex] = L; });
	}

	// An empty compact state must neither invoke any callback nor change the total size
	SUBCASE("touch nothing")
	{
		uint64_t PreSize = Store.TotalSize();
		CHECK(PreSize > 0);
		BlockStoreCompactState State;
		Store.CompactBlocks(
			State,
			Alignment,
			[&](const BlockStore::MovedChunksArray&, uint64_t) {
CHECK(false);
				return true;
			},
			[]() {
				CHECK(false);
				return 0;
			});
		CHECK_EQ(PreSize, Store.TotalSize());
	}

	// Every block is included and no chunk is kept: everything must be removed
	SUBCASE("keep nothing")
	{
		// Flush with ForceNewBlock so none of the written blocks is the active write block,
		// which CompactBlocks refuses to rewrite
		Store.Flush(true);

		uint64_t PreSize = Store.TotalSize();
		CHECK(PreSize > 0);
		BlockStoreCompactState State;
		for (const BlockStoreLocation& Location : ChunkLocations)
		{
			State.IncludeBlock(Location.BlockIndex);
		}
		uint64_t RemovedSize = 0;
		Store.CompactBlocks(
			State,
			Alignment,
			[&](const BlockStore::MovedChunksArray& Moved, uint64_t Removed) {
				RemovedSize += Removed;
				CHECK(Moved.empty());
				return true;
			},
			[]() { return 0; });
		CHECK_EQ(RemovedSize, PreSize);
		CHECK_EQ(0u, Store.TotalSize());
	}

	// Like "keep nothing" but without flushing: blocks listed as active write blocks in
	// the snapshot are left out of the compact state and must survive
	SUBCASE("keep current write block")
	{
		uint64_t PreSize = Store.TotalSize();
		BlockStoreCompactState State;
		BlockStore::ReclaimSnapshotState SnapshotState = Store.GetReclaimSnapshotState();
		for (const BlockStoreLocation& Location : ChunkLocations)
		{
			if (SnapshotState.m_ActiveWriteBlocks.contains(Location.BlockIndex))
			{
				continue;
			}
			State.IncludeBlock(Location.BlockIndex);
		}
		uint64_t RemovedSize = 0;
		Store.CompactBlocks(
			State,
			Alignment,
			[&](const BlockStore::MovedChunksArray& Moved, uint64_t Removed) {
				RemovedSize += Removed;
				CHECK(Moved.empty());
				return true;
			},
			[]() { return 0; });
		CHECK_EQ(Store.TotalSize() + RemovedSize, PreSize);
		// What remains is at most one block (1088 is the max block size used above) and not empty
		CHECK_LE(Store.TotalSize(), 1088);
		CHECK_GT(Store.TotalSize(), 0);
	}

	// Every chunk is marked as kept: nothing may be moved or removed
	SUBCASE("keep everthing")
	{
		Store.Flush(true);

		uint64_t PreSize = Store.TotalSize();
		BlockStoreCompactState State;
		BlockStore::ReclaimSnapshotState SnapshotState = Store.GetReclaimSnapshotState();
		for (const BlockStoreLocation& Location : ChunkLocations)
		{
			State.AddKeepLocation(Location);
		}
		Store.CompactBlocks(
			State,
			Alignment,
			[&](const BlockStore::MovedChunksArray&, uint64_t) {
				CHECK(false);
				return true;
			},
			[]() {
				CHECK(false);
				return 0;
			});
		CHECK_EQ(Store.TotalSize(), PreSize);
	}

	// Block 0 is included without any kept chunk: it must be dropped as a whole
	SUBCASE("drop first block")
	{
		uint64_t PreSize = Store.TotalSize();
		BlockStoreCompactState State;
		BlockStore::ReclaimSnapshotState SnapshotState =
Store.GetReclaimSnapshotState(); CHECK(!SnapshotState.m_ActiveWriteBlocks.contains(0)); State.IncludeBlock(0); uint64_t FirstBlockSize = 0; for (const BlockStoreLocation& Location : ChunkLocations) { if (Location.BlockIndex == 0) { FirstBlockSize = Max(FirstBlockSize, Location.Offset + Location.Size); } } uint64_t RemovedSize = 0; Store.CompactBlocks( State, Alignment, [&](const BlockStore::MovedChunksArray& Moved, uint64_t Removed) { CHECK(Moved.empty()); RemovedSize += Removed; return true; }, []() { CHECK(false); return 0; }); CHECK_EQ(FirstBlockSize, RemovedSize); CHECK_EQ(Store.TotalSize(), PreSize - FirstBlockSize); } SUBCASE("compact first block") { uint64_t PreSize = Store.TotalSize(); BlockStoreCompactState State; BlockStore::ReclaimSnapshotState SnapshotState = Store.GetReclaimSnapshotState(); CHECK(!SnapshotState.m_ActiveWriteBlocks.contains(0)); State.IncludeBlock(0); uint64_t SkipChunkCount = 2; std::vector DroppedLocations(ChunkLocations.begin(), ChunkLocations.begin() + 2); for (auto It = ChunkLocations.begin() + 2; It != ChunkLocations.end(); It++) { const BlockStoreLocation& Location = *It; if (Location.BlockIndex != 0) { continue; } State.AddKeepLocation(Location); } uint64_t RemovedSize = 0; Store.CompactBlocks( State, Alignment, [&](const BlockStore::MovedChunksArray& Moved, uint64_t Removed) { for (const auto& Move : Moved) { const BlockStoreLocation& OldLocation = State.GetLocation(Move.first); CHECK(OldLocation.BlockIndex == 0); // Only move from block 0 CHECK(std::find(DroppedLocations.begin(), DroppedLocations.end(), OldLocation) == DroppedLocations.end()); auto It = std::find(ChunkLocations.begin(), ChunkLocations.end(), OldLocation); CHECK(It != ChunkLocations.end()); (*It) = Move.second; } RemovedSize += Removed; return true; }, []() { CHECK(false); return 0; }); SkipChunkCount = 2; for (size_t Index = 0; Index < ChunkLocations.size(); Index++) { const BlockStoreLocation& Location = ChunkLocations[Index]; if (Location.BlockIndex == 0 && 
SkipChunkCount > 0) { CHECK(!Store.TryGetChunk(Location)); continue; } IoBuffer Buffer = Store.TryGetChunk(Location); CHECK(Buffer); IoHash RawHash = IoHash::HashBuffer(Buffer.Data(), Buffer.Size()); CHECK_EQ(ChunkHashes[Index], RawHash); } CHECK_LT(Store.TotalSize(), PreSize); } SUBCASE("compact every other item") { uint64_t PreSize = Store.TotalSize(); BlockStoreCompactState State; BlockStore::ReclaimSnapshotState SnapshotState = Store.GetReclaimSnapshotState(); bool SkipFlag = false; for (const BlockStoreLocation& Location : ChunkLocations) { if (SnapshotState.m_ActiveWriteBlocks.contains(Location.BlockIndex)) { continue; } if (SkipFlag) { State.IncludeBlock(Location.BlockIndex); SkipFlag = false; continue; } SkipFlag = true; } SkipFlag = false; std::vector DroppedLocations; for (const BlockStoreLocation& Location : ChunkLocations) { if (SnapshotState.m_ActiveWriteBlocks.contains(Location.BlockIndex)) { continue; } if (SkipFlag) { DroppedLocations.push_back(Location); SkipFlag = false; continue; } State.AddKeepLocation(Location); SkipFlag = true; } uint64_t RemovedSize = 0; Store.CompactBlocks( State, Alignment, [&](const BlockStore::MovedChunksArray& Moved, uint64_t Removed) { for (const auto& Move : Moved) { const BlockStoreLocation& OldLocation = State.GetLocation(Move.first); CHECK(std::find(DroppedLocations.begin(), DroppedLocations.end(), OldLocation) == DroppedLocations.end()); auto It = std::find(ChunkLocations.begin(), ChunkLocations.end(), OldLocation); CHECK(It != ChunkLocations.end()); (*It) = Move.second; } RemovedSize += Removed; return true; }, []() { CHECK(false); return 0; }); SkipFlag = false; for (size_t Index = 0; Index < ChunkLocations.size(); Index++) { const BlockStoreLocation& Location = ChunkLocations[Index]; if (SkipFlag && !SnapshotState.m_ActiveWriteBlocks.contains(Location.BlockIndex)) { CHECK(std::find(DroppedLocations.begin(), DroppedLocations.end(), Location) != DroppedLocations.end()); CHECK(!Store.TryGetChunk(Location)); 
SkipFlag = false; continue; } IoBuffer Buffer = Store.TryGetChunk(Location); CHECK(Buffer); IoHash RawHash = IoHash::HashBuffer(Buffer.Data(), Buffer.Size()); CHECK_EQ(ChunkHashes[Index], RawHash); SkipFlag = true; } CHECK_LT(Store.TotalSize(), PreSize); } } #endif void blockstore_forcelink() { } } // namespace zen