// Copyright Epic Games, Inc. All Rights Reserved. #include #include #include #include #include #if ZEN_WITH_TESTS # include # include # include # include # include # include #endif ////////////////////////////////////////////////////////////////////////// namespace zen { ////////////////////////////////////////////////////////////////////////// BlockStoreFile::BlockStoreFile(const std::filesystem::path& BlockPath) : m_Path(BlockPath) { } BlockStoreFile::~BlockStoreFile() { m_IoBuffer = IoBuffer(); m_File.Detach(); } const std::filesystem::path& BlockStoreFile::GetPath() const { return m_Path; } void BlockStoreFile::Open() { m_File.Open(m_Path, BasicFile::Mode::kDelete); void* FileHandle = m_File.Handle(); m_IoBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, m_File.FileSize()); } void BlockStoreFile::Create(uint64_t InitialSize) { auto ParentPath = m_Path.parent_path(); if (!std::filesystem::is_directory(ParentPath)) { CreateDirectories(ParentPath); } m_File.Open(m_Path, BasicFile::Mode::kTruncateDelete); if (InitialSize > 0) { m_File.SetFileSize(InitialSize); } void* FileHandle = m_File.Handle(); m_IoBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, InitialSize); } uint64_t BlockStoreFile::FileSize() { return m_File.FileSize(); } void BlockStoreFile::MarkAsDeleteOnClose() { m_IoBuffer.MarkAsDeleteOnClose(); } IoBuffer BlockStoreFile::GetChunk(uint64_t Offset, uint64_t Size) { return IoBuffer(m_IoBuffer, Offset, Size); } void BlockStoreFile::Read(void* Data, uint64_t Size, uint64_t FileOffset) { m_File.Read(Data, Size, FileOffset); } void BlockStoreFile::Write(const void* Data, uint64_t Size, uint64_t FileOffset) { m_File.Write(Data, Size, FileOffset); } void BlockStoreFile::Truncate(uint64_t Size) { m_File.SetFileSize(Size); } void BlockStoreFile::Flush() { m_File.Flush(); } BasicFile& BlockStoreFile::GetBasicFile() { return m_File; } void BlockStoreFile::StreamByteRange(uint64_t FileOffset, uint64_t Size, std::function&& ChunkFun) { 
m_File.StreamByteRange(FileOffset, Size, std::move(ChunkFun)); } constexpr uint64_t ScrubSmallChunkWindowSize = 4 * 1024 * 1024; void BlockStore::Initialize(const std::filesystem::path& BlocksBasePath, uint64_t MaxBlockSize, uint64_t MaxBlockCount, const std::vector& KnownLocations) { ZEN_ASSERT(MaxBlockSize > 0); ZEN_ASSERT(MaxBlockCount > 0); ZEN_ASSERT(IsPow2(MaxBlockCount)); m_BlocksBasePath = BlocksBasePath; m_MaxBlockSize = MaxBlockSize; m_ChunkBlocks.clear(); std::unordered_set KnownBlocks; for (const auto& Entry : KnownLocations) { KnownBlocks.insert(Entry.BlockIndex); } if (std::filesystem::is_directory(m_BlocksBasePath)) { std::vector FoldersToScan; FoldersToScan.push_back(m_BlocksBasePath); size_t FolderOffset = 0; while (FolderOffset < FoldersToScan.size()) { for (const std::filesystem::directory_entry& Entry : std::filesystem::directory_iterator(FoldersToScan[FolderOffset])) { if (Entry.is_directory()) { FoldersToScan.push_back(Entry.path()); continue; } if (Entry.is_regular_file()) { const std::filesystem::path Path = Entry.path(); if (Path.extension() != GetBlockFileExtension()) { continue; } std::string FileName = PathToUtf8(Path.stem()); uint32_t BlockIndex; bool OK = ParseHexNumber(FileName, BlockIndex); if (!OK) { continue; } if (!KnownBlocks.contains(BlockIndex)) { // Log removing unreferenced block // Clear out unused blocks ZEN_INFO("removing unused block at '{}'", Path); std::error_code Ec; std::filesystem::remove(Path, Ec); if (Ec) { ZEN_WARN("Failed to delete file '{}' reason: '{}'", Path, Ec.message()); } continue; } Ref BlockFile = new BlockStoreFile(Path); BlockFile->Open(); m_ChunkBlocks[BlockIndex] = BlockFile; } } ++FolderOffset; } } else { CreateDirectories(m_BlocksBasePath); } } void BlockStore::Close() { RwLock::ExclusiveLockScope InsertLock(m_InsertLock); m_WriteBlock = nullptr; m_CurrentInsertOffset = 0; m_WriteBlockIndex = 0; m_ChunkBlocks.clear(); m_BlocksBasePath.clear(); } void BlockStore::WriteChunk(const void* Data, 
uint64_t Size, uint64_t Alignment, WriteChunkCallback Callback) { ZEN_ASSERT(Data != nullptr); ZEN_ASSERT(Size > 0u); ZEN_ASSERT(Size <= m_MaxBlockSize); ZEN_ASSERT(Alignment > 0u); RwLock::ExclusiveLockScope InsertLock(m_InsertLock); uint32_t WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire); bool IsWriting = m_WriteBlock != nullptr; if (!IsWriting || (m_CurrentInsertOffset + Size) > m_MaxBlockSize) { if (m_WriteBlock) { m_WriteBlock = nullptr; } { if (m_ChunkBlocks.size() == m_MaxBlockCount) { throw std::runtime_error(fmt::format("unable to allocate a new block in '{}'", m_BlocksBasePath)); } WriteBlockIndex += IsWriting ? 1 : 0; while (m_ChunkBlocks.contains(WriteBlockIndex)) { WriteBlockIndex = (WriteBlockIndex + 1) & (m_MaxBlockCount - 1); } std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, WriteBlockIndex); m_WriteBlock = new BlockStoreFile(BlockPath); m_ChunkBlocks[WriteBlockIndex] = m_WriteBlock; m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release); } m_CurrentInsertOffset = 0; m_WriteBlock->Create(m_MaxBlockSize); } uint64_t InsertOffset = m_CurrentInsertOffset; m_CurrentInsertOffset = RoundUp(InsertOffset + Size, Alignment); Ref WriteBlock = m_WriteBlock; m_ActiveWriteBlocks.push_back(WriteBlockIndex); InsertLock.ReleaseNow(); WriteBlock->Write(Data, Size, InsertOffset); Callback({.BlockIndex = WriteBlockIndex, .Offset = InsertOffset, .Size = Size}); { RwLock::ExclusiveLockScope _(m_InsertLock); m_ActiveWriteBlocks.erase(std::find(m_ActiveWriteBlocks.begin(), m_ActiveWriteBlocks.end(), WriteBlockIndex)); } } BlockStore::ReclaimSnapshotState BlockStore::GetReclaimSnapshotState() { ReclaimSnapshotState State; RwLock::SharedLockScope _(m_InsertLock); for (uint32_t BlockIndex : m_ActiveWriteBlocks) { State.m_ActiveWriteBlocks.insert(BlockIndex); } State.BlockCount = m_ChunkBlocks.size(); return State; } IoBuffer BlockStore::TryGetChunk(const BlockStoreLocation& Location) { RwLock::SharedLockScope 
InsertLock(m_InsertLock); if (auto BlockIt = m_ChunkBlocks.find(Location.BlockIndex); BlockIt != m_ChunkBlocks.end()) { if (const Ref& Block = BlockIt->second; Block) { return Block->GetChunk(Location.Offset, Location.Size); } } return IoBuffer(); } void BlockStore::Flush() { RwLock::ExclusiveLockScope _(m_InsertLock); if (m_CurrentInsertOffset > 0) { uint32_t WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire); WriteBlockIndex = (WriteBlockIndex + 1) & (m_MaxBlockCount - 1); m_WriteBlock = nullptr; m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release); m_CurrentInsertOffset = 0; } } void BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot, const std::vector& ChunkLocations, const ChunkIndexArray& KeepChunkIndexes, uint64_t PayloadAlignment, bool DryRun, const ReclaimCallback& ChangeCallback, const ClaimDiskReserveCallback& DiskReserveCallback) { if (ChunkLocations.empty()) { return; } uint64_t WriteBlockTimeUs = 0; uint64_t WriteBlockLongestTimeUs = 0; uint64_t ReadBlockTimeUs = 0; uint64_t ReadBlockLongestTimeUs = 0; uint64_t TotalChunkCount = ChunkLocations.size(); uint64_t DeletedSize = 0; uint64_t OldTotalSize = 0; uint64_t NewTotalSize = 0; uint64_t MovedCount = 0; uint64_t DeletedCount = 0; Stopwatch TotalTimer; const auto _ = MakeGuard([&] { ZEN_INFO( "reclaim space for '{}' DONE after {}, write lock: {} ({}), read lock: {} ({}), collected {} bytes, deleted #{} and moved " "#{} " "of #{} " "chunks ({}).", m_BlocksBasePath, NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()), NiceLatencyNs(WriteBlockTimeUs), NiceLatencyNs(WriteBlockLongestTimeUs), NiceLatencyNs(ReadBlockTimeUs), NiceLatencyNs(ReadBlockLongestTimeUs), NiceBytes(DeletedSize), DeletedCount, MovedCount, TotalChunkCount, NiceBytes(OldTotalSize)); }); size_t BlockCount = Snapshot.BlockCount; std::unordered_set KeepChunkMap; KeepChunkMap.reserve(KeepChunkIndexes.size()); for (size_t KeepChunkIndex : KeepChunkIndexes) { KeepChunkMap.insert(KeepChunkIndex); } 
std::unordered_map BlockIndexToChunkMapIndex; std::vector BlockKeepChunks; std::vector BlockDeleteChunks; BlockIndexToChunkMapIndex.reserve(BlockCount); BlockKeepChunks.reserve(BlockCount); BlockDeleteChunks.reserve(BlockCount); size_t GuesstimateCountPerBlock = TotalChunkCount / BlockCount / 2; size_t DeleteCount = 0; for (size_t Index = 0; Index < TotalChunkCount; ++Index) { const BlockStoreLocation& Location = ChunkLocations[Index]; OldTotalSize += Location.Size; if (Snapshot.m_ActiveWriteBlocks.contains(Location.BlockIndex)) { continue; } auto BlockIndexPtr = BlockIndexToChunkMapIndex.find(Location.BlockIndex); size_t ChunkMapIndex = 0; if (BlockIndexPtr == BlockIndexToChunkMapIndex.end()) { ChunkMapIndex = BlockKeepChunks.size(); BlockIndexToChunkMapIndex[Location.BlockIndex] = ChunkMapIndex; BlockKeepChunks.resize(ChunkMapIndex + 1); BlockKeepChunks.back().reserve(GuesstimateCountPerBlock); BlockDeleteChunks.resize(ChunkMapIndex + 1); BlockDeleteChunks.back().reserve(GuesstimateCountPerBlock); } else { ChunkMapIndex = BlockIndexPtr->second; } if (KeepChunkMap.contains(Index)) { ChunkIndexArray& IndexMap = BlockKeepChunks[ChunkMapIndex]; IndexMap.push_back(Index); NewTotalSize += Location.Size; continue; } ChunkIndexArray& IndexMap = BlockDeleteChunks[ChunkMapIndex]; IndexMap.push_back(Index); DeleteCount++; } std::unordered_set BlocksToReWrite; BlocksToReWrite.reserve(BlockIndexToChunkMapIndex.size()); for (const auto& Entry : BlockIndexToChunkMapIndex) { uint32_t BlockIndex = Entry.first; size_t ChunkMapIndex = Entry.second; const ChunkIndexArray& ChunkMap = BlockDeleteChunks[ChunkMapIndex]; if (ChunkMap.empty()) { continue; } BlocksToReWrite.insert(BlockIndex); } if (DryRun) { ZEN_INFO("garbage collect for '{}' DISABLED, found #{} {} chunks of total #{} {}", m_BlocksBasePath, DeleteCount, NiceBytes(OldTotalSize - NewTotalSize), TotalChunkCount, OldTotalSize); return; } Ref NewBlockFile; try { uint64_t WriteOffset = 0; uint32_t NewBlockIndex = 0; for 
(uint32_t BlockIndex : BlocksToReWrite) { const size_t ChunkMapIndex = BlockIndexToChunkMapIndex[BlockIndex]; Ref OldBlockFile; { RwLock::SharedLockScope _i(m_InsertLock); Stopwatch Timer; const auto __ = MakeGuard([&] { uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); WriteBlockTimeUs += ElapsedUs; WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); }); OldBlockFile = m_ChunkBlocks[BlockIndex]; ZEN_ASSERT(OldBlockFile); } const ChunkIndexArray& KeepMap = BlockKeepChunks[ChunkMapIndex]; if (KeepMap.empty()) { const ChunkIndexArray& DeleteMap = BlockDeleteChunks[ChunkMapIndex]; for (size_t DeleteIndex : DeleteMap) { DeletedSize += ChunkLocations[DeleteIndex].Size; } ChangeCallback({}, DeleteMap); DeletedCount += DeleteMap.size(); { RwLock::ExclusiveLockScope _i(m_InsertLock); Stopwatch Timer; const auto __ = MakeGuard([&] { uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); ReadBlockTimeUs += ElapsedUs; ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); }); m_ChunkBlocks[BlockIndex] = nullptr; ZEN_DEBUG("marking cas block store file '{}' for delete, block #{}", OldBlockFile->GetPath(), BlockIndex); OldBlockFile->MarkAsDeleteOnClose(); } continue; } MovedChunksArray MovedChunks; std::vector Chunk; for (const size_t& ChunkIndex : KeepMap) { const BlockStoreLocation ChunkLocation = ChunkLocations[ChunkIndex]; Chunk.resize(ChunkLocation.Size); OldBlockFile->Read(Chunk.data(), Chunk.size(), ChunkLocation.Offset); if (!NewBlockFile || (WriteOffset + Chunk.size() > m_MaxBlockSize)) { uint32_t NextBlockIndex = m_WriteBlockIndex.load(std::memory_order_relaxed); if (NewBlockFile) { NewBlockFile->Truncate(WriteOffset); NewBlockFile->Flush(); NewBlockFile = nullptr; } { ChangeCallback(MovedChunks, {}); MovedCount += KeepMap.size(); MovedChunks.clear(); RwLock::ExclusiveLockScope __(m_InsertLock); Stopwatch Timer; const auto ___ = MakeGuard([&] { uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); ReadBlockTimeUs += ElapsedUs; ReadBlockLongestTimeUs = 
std::max(ElapsedUs, ReadBlockLongestTimeUs); }); if (m_ChunkBlocks.size() == m_MaxBlockCount) { ZEN_ERROR("unable to allocate a new block in '{}', count limit {} exeeded", m_BlocksBasePath, static_cast(std::numeric_limits::max()) + 1); return; } while (m_ChunkBlocks.contains(NextBlockIndex)) { NextBlockIndex = (NextBlockIndex + 1) & (m_MaxBlockCount - 1); } std::filesystem::path NewBlockPath = GetBlockPath(m_BlocksBasePath, NextBlockIndex); NewBlockFile = new BlockStoreFile(NewBlockPath); m_ChunkBlocks[NextBlockIndex] = NewBlockFile; } std::error_code Error; DiskSpace Space = DiskSpaceInfo(m_BlocksBasePath, Error); if (Error) { ZEN_ERROR("get disk space in '{}' FAILED, reason: '{}'", m_BlocksBasePath, Error.message()); return; } if (Space.Free < m_MaxBlockSize) { uint64_t ReclaimedSpace = DiskReserveCallback(); if (Space.Free + ReclaimedSpace < m_MaxBlockSize) { ZEN_WARN("garbage collect for '{}' FAILED, required disk space {}, free {}", m_BlocksBasePath, m_MaxBlockSize, NiceBytes(Space.Free + ReclaimedSpace)); RwLock::ExclusiveLockScope _l(m_InsertLock); Stopwatch Timer; const auto __ = MakeGuard([&] { uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); ReadBlockTimeUs += ElapsedUs; ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); }); m_ChunkBlocks.erase(NextBlockIndex); return; } ZEN_INFO("using gc reserve for '{}', reclaimed {}, disk free {}", m_BlocksBasePath, ReclaimedSpace, NiceBytes(Space.Free + ReclaimedSpace)); } NewBlockFile->Create(m_MaxBlockSize); NewBlockIndex = NextBlockIndex; WriteOffset = 0; } NewBlockFile->Write(Chunk.data(), Chunk.size(), WriteOffset); MovedChunks.push_back({ChunkIndex, {.BlockIndex = NewBlockIndex, .Offset = WriteOffset, .Size = Chunk.size()}}); WriteOffset = RoundUp(WriteOffset + Chunk.size(), PayloadAlignment); } Chunk.clear(); if (NewBlockFile) { NewBlockFile->Truncate(WriteOffset); NewBlockFile->Flush(); NewBlockFile = nullptr; } const ChunkIndexArray& DeleteMap = BlockDeleteChunks[ChunkMapIndex]; for (size_t 
DeleteIndex : DeleteMap) { DeletedSize += ChunkLocations[DeleteIndex].Size; } ChangeCallback(MovedChunks, DeleteMap); MovedCount += KeepMap.size(); DeletedCount += DeleteMap.size(); MovedChunks.clear(); { RwLock::ExclusiveLockScope __(m_InsertLock); Stopwatch Timer; const auto ___ = MakeGuard([&] { uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); ReadBlockTimeUs += ElapsedUs; ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); }); m_ChunkBlocks[BlockIndex] = nullptr; ZEN_DEBUG("marking cas block store file '{}' for delete, block #{}", OldBlockFile->GetPath(), BlockIndex); OldBlockFile->MarkAsDeleteOnClose(); } } } catch (std::exception& ex) { ZEN_ERROR("reclaiming space for '{}' failed with: '{}'", m_BlocksBasePath, ex.what()); if (NewBlockFile) { ZEN_DEBUG("dropping incomplete cas block store file '{}'", NewBlockFile->GetPath()); NewBlockFile->MarkAsDeleteOnClose(); } } } void BlockStore::IterateChunks(const std::vector& ChunkLocations, IterateChunksSmallSizeCallback SmallSizeCallback, IterateChunksLargeSizeCallback LargeSizeCallback) { // We do a read sweep through the payloads file and validate // any entries that are contained within each segment, with // the assumption that most entries will be checked in this // pass. An alternative strategy would be to use memory mapping. 
{ ChunkIndexArray BigChunks; IoBuffer ReadBuffer{ScrubSmallChunkWindowSize}; void* BufferBase = ReadBuffer.MutableData(); RwLock::SharedLockScope _(m_InsertLock); for (const auto& Block : m_ChunkBlocks) { uint64_t WindowStart = 0; uint64_t WindowEnd = ScrubSmallChunkWindowSize; uint32_t BlockIndex = Block.first; const Ref& BlockFile = Block.second; const uint64_t FileSize = BlockFile->FileSize(); do { const uint64_t ChunkSize = Min(ScrubSmallChunkWindowSize, FileSize - WindowStart); BlockFile->Read(BufferBase, ChunkSize, WindowStart); // TODO: We could be smarter here if the ChunkLocations were sorted on block index - we could // then only scan a subset of ChunkLocations instead of scanning through them all... for (size_t ChunkIndex = 0; ChunkIndex < ChunkLocations.size(); ++ChunkIndex) { const BlockStoreLocation Location = ChunkLocations[ChunkIndex]; if (BlockIndex != Location.BlockIndex) { continue; } const uint64_t EntryOffset = Location.Offset; if ((EntryOffset >= WindowStart) && (EntryOffset < WindowEnd)) { const uint64_t EntryEnd = EntryOffset + Location.Size; if (EntryEnd >= WindowEnd) { BigChunks.push_back(ChunkIndex); continue; } SmallSizeCallback(ChunkIndex, reinterpret_cast(BufferBase) + Location.Offset - WindowStart, Location.Size); } } WindowStart += ScrubSmallChunkWindowSize; WindowEnd += ScrubSmallChunkWindowSize; } while (WindowStart < FileSize); } // Deal with large chunks and chunks that extend over a ScrubSmallChunkWindowSize border for (size_t ChunkIndex : BigChunks) { const BlockStoreLocation Location = ChunkLocations[ChunkIndex]; const Ref& BlockFile = m_ChunkBlocks[Location.BlockIndex]; LargeSizeCallback(ChunkIndex, BlockFile, Location.Offset, Location.Size); } } } bool BlockStore::Split(const std::vector& ChunkLocations, const std::filesystem::path& SourceBlockFilePath, const std::filesystem::path& BlocksBasePath, uint64_t MaxBlockSize, uint64_t MaxBlockCount, size_t PayloadAlignment, bool CleanSource, const SplitCallback& Callback) { 
std::error_code Error; DiskSpace Space = DiskSpaceInfo(BlocksBasePath.parent_path(), Error); if (Error) { ZEN_ERROR("get disk space in {} FAILED, reason: '{}'", BlocksBasePath, Error.message()); return false; } if (Space.Free < MaxBlockSize) { ZEN_ERROR("legacy store migration from '{}' FAILED, required disk space {}, free {}", BlocksBasePath, MaxBlockSize, NiceBytes(Space.Free)); return false; } size_t TotalSize = 0; for (const BlockStoreLocation& Location : ChunkLocations) { TotalSize += Location.Size; } size_t ChunkCount = ChunkLocations.size(); uint64_t RequiredDiskSpace = TotalSize + ((PayloadAlignment - 1) * ChunkCount); uint64_t MaxRequiredBlockCount = RoundUp(RequiredDiskSpace, MaxBlockSize) / MaxBlockSize; if (MaxRequiredBlockCount > MaxBlockCount) { ZEN_ERROR("legacy store migration from '{}' FAILED, required block count {}, possible {}", BlocksBasePath, MaxRequiredBlockCount, MaxBlockCount); return false; } constexpr const uint64_t DiskReserve = 1ul << 28; if (CleanSource) { if (Space.Free < (MaxBlockSize + DiskReserve)) { ZEN_INFO("legacy store migration from '{}' aborted, not enough disk space available {} ({})", BlocksBasePath, NiceBytes(MaxBlockSize + DiskReserve), NiceBytes(Space.Free)); return false; } } else { if (Space.Free < (RequiredDiskSpace + DiskReserve)) { ZEN_INFO("legacy store migration from '{}' aborted, not enough disk space available {} ({})", BlocksBasePath, NiceBytes(RequiredDiskSpace + DiskReserve), NiceBytes(Space.Free)); return false; } } uint32_t WriteBlockIndex = 0; while (std::filesystem::exists(BlockStore::GetBlockPath(BlocksBasePath, WriteBlockIndex))) { ++WriteBlockIndex; } BasicFile BlockFile; BlockFile.Open(SourceBlockFilePath, CleanSource ? 
BasicFile::Mode::kWrite : BasicFile::Mode::kRead); if (CleanSource && (MaxRequiredBlockCount < 2)) { MovedChunksArray Chunks; Chunks.reserve(ChunkCount); for (size_t Index = 0; Index < ChunkCount; ++Index) { const BlockStoreLocation& ChunkLocation = ChunkLocations[Index]; Chunks.push_back({Index, {.BlockIndex = WriteBlockIndex, .Offset = ChunkLocation.Offset, .Size = ChunkLocation.Size}}); } std::filesystem::path BlockPath = BlockStore::GetBlockPath(BlocksBasePath, WriteBlockIndex); CreateDirectories(BlockPath.parent_path()); BlockFile.Close(); std::filesystem::rename(SourceBlockFilePath, BlockPath); Callback(Chunks); return true; } ChunkIndexArray ChunkIndexes; ChunkIndexes.reserve(ChunkCount); for (size_t Index = 0; Index < ChunkCount; ++Index) { ChunkIndexes.push_back(Index); } std::sort(begin(ChunkIndexes), end(ChunkIndexes), [&](size_t Lhs, size_t Rhs) { const BlockStoreLocation& LhsLocation = ChunkLocations[Lhs]; const BlockStoreLocation& RhsLocation = ChunkLocations[Rhs]; return LhsLocation.Offset < RhsLocation.Offset; }); uint64_t BlockSize = 0; uint64_t BlockOffset = 0; std::vector NewLocations; struct BlockData { MovedChunksArray Chunks; uint64_t BlockOffset; uint64_t BlockSize; uint32_t BlockIndex; }; std::vector BlockRanges; MovedChunksArray Chunks; BlockRanges.reserve(MaxRequiredBlockCount); for (const size_t& ChunkIndex : ChunkIndexes) { const BlockStoreLocation& LegacyChunkLocation = ChunkLocations[ChunkIndex]; uint64_t ChunkOffset = LegacyChunkLocation.Offset; uint64_t ChunkSize = LegacyChunkLocation.Size; uint64_t ChunkEnd = ChunkOffset + ChunkSize; if (BlockSize == 0) { BlockOffset = ChunkOffset; } if ((ChunkEnd - BlockOffset) > MaxBlockSize) { BlockData BlockRange{.BlockOffset = BlockOffset, .BlockSize = BlockSize, .BlockIndex = WriteBlockIndex}; BlockRange.Chunks.swap(Chunks); BlockRanges.push_back(BlockRange); WriteBlockIndex++; while (std::filesystem::exists(BlockStore::GetBlockPath(BlocksBasePath, WriteBlockIndex))) { ++WriteBlockIndex; } 
BlockOffset = ChunkOffset; BlockSize = 0; } BlockSize = RoundUp(BlockSize, PayloadAlignment); BlockStoreLocation ChunkLocation = {.BlockIndex = WriteBlockIndex, .Offset = ChunkOffset - BlockOffset, .Size = ChunkSize}; Chunks.push_back({ChunkIndex, ChunkLocation}); BlockSize = ChunkEnd - BlockOffset; } if (BlockSize > 0) { BlockRanges.push_back( {.Chunks = std::move(Chunks), .BlockOffset = BlockOffset, .BlockSize = BlockSize, .BlockIndex = WriteBlockIndex}); } Stopwatch WriteBlockTimer; std::reverse(BlockRanges.begin(), BlockRanges.end()); std::vector Buffer(1 << 28); for (size_t Idx = 0; Idx < BlockRanges.size(); ++Idx) { const BlockData& BlockRange = BlockRanges[Idx]; if (Idx > 0) { uint64_t Remaining = BlockRange.BlockOffset + BlockRange.BlockSize; uint64_t Completed = BlockOffset + BlockSize - Remaining; uint64_t ETA = (WriteBlockTimer.GetElapsedTimeMs() * Remaining) / Completed; ZEN_INFO("migrating store '{}' {}/{} blocks, remaining {} ({}) ETA: {}", BlocksBasePath, Idx, BlockRanges.size(), NiceBytes(BlockRange.BlockOffset + BlockRange.BlockSize), NiceBytes(BlockOffset + BlockSize), NiceTimeSpanMs(ETA)); } std::filesystem::path BlockPath = BlockStore::GetBlockPath(BlocksBasePath, BlockRange.BlockIndex); BlockStoreFile ChunkBlock(BlockPath); ChunkBlock.Create(BlockRange.BlockSize); uint64_t Offset = 0; while (Offset < BlockRange.BlockSize) { uint64_t Size = BlockRange.BlockSize - Offset; if (Size > Buffer.size()) { Size = Buffer.size(); } BlockFile.Read(Buffer.data(), Size, BlockRange.BlockOffset + Offset); ChunkBlock.Write(Buffer.data(), Size, Offset); Offset += Size; } ChunkBlock.Truncate(Offset); ChunkBlock.Flush(); Callback(BlockRange.Chunks); if (CleanSource) { BlockFile.SetFileSize(BlockRange.BlockOffset); } } BlockFile.Close(); return true; } const char* BlockStore::GetBlockFileExtension() { return ".ucas"; } std::filesystem::path BlockStore::GetBlockPath(const std::filesystem::path& BlocksBasePath, const uint32_t BlockIndex) { ExtendablePathBuilder<256> 
Path;
    // Room for the hex digits of a 32-bit index plus a terminator (presumably what
    // ToHexNumber writes - confirm).
    char BlockHexString[9];
    ToHexNumber(BlockIndex, BlockHexString);

    Path.Append(BlocksBasePath);
    Path.AppendSeparator();
    // Sub-directory named after the first four hex characters of the index.
    Path.AppendAsciiRange(BlockHexString, BlockHexString + 4);
    Path.AppendSeparator();
    Path.Append(BlockHexString);
    Path.Append(GetBlockFileExtension());
    return Path.ToPath();
}

#if ZEN_WITH_TESTS

// Field-wise equality, used by the round-trip checks below.
static bool operator==(const BlockStoreLocation& Lhs, const BlockStoreLocation& Rhs)
{
    return Lhs.BlockIndex == Rhs.BlockIndex && Lhs.Offset == Rhs.Offset && Lhs.Size == Rhs.Size;
}

// Round-trips boundary values of BlockStoreLocation through BlockStoreDiskLocation for
// alignments 4 and 4096.
TEST_CASE("blockstore.blockstoredisklocation")
{
    BlockStoreLocation Zero = BlockStoreLocation{.BlockIndex = 0, .Offset = 0, .Size = 0};
    CHECK(Zero == BlockStoreDiskLocation(Zero, 4).Get(4));

    BlockStoreLocation MaxBlockIndex = BlockStoreLocation{.BlockIndex = BlockStoreDiskLocation::MaxBlockIndex, .Offset = 0, .Size = 0};
    CHECK(MaxBlockIndex == BlockStoreDiskLocation(MaxBlockIndex, 4).Get(4));

    BlockStoreLocation MaxOffset = BlockStoreLocation{.BlockIndex = 0, .Offset = BlockStoreDiskLocation::MaxOffset * 4, .Size = 0};
    CHECK(MaxOffset == BlockStoreDiskLocation(MaxOffset, 4).Get(4));

    BlockStoreLocation MaxSize = BlockStoreLocation{.BlockIndex = 0, .Offset = 0, .Size = std::numeric_limits::max()};
    CHECK(MaxSize == BlockStoreDiskLocation(MaxSize, 4).Get(4));

    BlockStoreLocation MaxBlockIndexAndOffset =
        BlockStoreLocation{.BlockIndex = BlockStoreDiskLocation::MaxBlockIndex, .Offset = BlockStoreDiskLocation::MaxOffset * 4, .Size = 0};
    CHECK(MaxBlockIndexAndOffset == BlockStoreDiskLocation(MaxBlockIndexAndOffset, 4).Get(4));

    BlockStoreLocation MaxAll = BlockStoreLocation{.BlockIndex = BlockStoreDiskLocation::MaxBlockIndex,
                                                   .Offset     = BlockStoreDiskLocation::MaxOffset * 4,
                                                   .Size       = std::numeric_limits::max()};
    CHECK(MaxAll == BlockStoreDiskLocation(MaxAll, 4).Get(4));

    BlockStoreLocation MaxAll4096 = BlockStoreLocation{.BlockIndex = BlockStoreDiskLocation::MaxBlockIndex,
                                                       .Offset     = BlockStoreDiskLocation::MaxOffset * 4096,
                                                       .Size       = std::numeric_limits::max()};
    CHECK(MaxAll4096 == BlockStoreDiskLocation(MaxAll4096, 4096).Get(4096));

    BlockStoreLocation Middle = BlockStoreLocation{.BlockIndex = (BlockStoreDiskLocation::MaxBlockIndex) / 2,
                                                   .Offset     = ((BlockStoreDiskLocation::MaxOffset) / 2) * 4,
                                                   .Size       = std::numeric_limits::max() / 2};
    CHECK(Middle == BlockStoreDiskLocation(Middle, 4).Get(4));
}

// Exercises BlockStoreFile directly: create/write, reopen/read, chunks outliving the file
// object, and delete-on-close.
TEST_CASE("blockstore.blockfile")
{
    ScopedTemporaryDirectory TempDir;
    auto                     RootDirectory = TempDir.Path() / "blocks";
    CreateDirectories(RootDirectory);

    {
        // Create the file and write two 5-byte records (4 characters plus the terminator).
        BlockStoreFile File1(RootDirectory / "1");
        File1.Create(16384);
        CHECK(File1.FileSize() == 16384);
        File1.Write("data", 5, 0);
        IoBuffer DataChunk = File1.GetChunk(0, 5);
        File1.Write("boop", 5, 5);
        IoBuffer    BoopChunk = File1.GetChunk(5, 5);
        const char* Data      = static_cast(DataChunk.GetData());
        CHECK(std::string(Data) == "data");
        const char* Boop = static_cast(BoopChunk.GetData());
        CHECK(std::string(Boop) == "boop");
        File1.Flush();
    }
    {
        // Reopen and read the same records back, both through Read() and through GetChunk().
        BlockStoreFile File1(RootDirectory / "1");
        File1.Open();

        char DataRaw[5];
        File1.Read(DataRaw, 5, 0);
        CHECK(std::string(DataRaw) == "data");
        IoBuffer DataChunk = File1.GetChunk(0, 5);

        char BoopRaw[5];
        File1.Read(BoopRaw, 5, 5);
        CHECK(std::string(BoopRaw) == "boop");

        IoBuffer    BoopChunk = File1.GetChunk(5, 5);
        const char* Data      = static_cast(DataChunk.GetData());
        CHECK(std::string(Data) == "data");
        const char* Boop = static_cast(BoopChunk.GetData());
        CHECK(std::string(Boop) == "boop");
    }

    {
        // Chunks must remain readable after the BlockStoreFile itself is gone.
        IoBuffer DataChunk;
        IoBuffer BoopChunk;

        {
            BlockStoreFile File1(RootDirectory / "1");
            File1.Open();
            DataChunk = File1.GetChunk(0, 5);
            BoopChunk = File1.GetChunk(5, 5);
        }

        CHECK(std::filesystem::exists(RootDirectory / "1"));

        const char* Data = static_cast(DataChunk.GetData());
        CHECK(std::string(Data) == "data");
        const char* Boop = static_cast(BoopChunk.GetData());
        CHECK(std::string(Boop) == "boop");
    }
    CHECK(std::filesystem::exists(RootDirectory / "1"));

    {
        // Same again with delete-on-close: the file is expected to be gone once the last
        // chunk buffer has been released.
        IoBuffer DataChunk;
        IoBuffer BoopChunk;

        {
            BlockStoreFile File1(RootDirectory / "1");
            File1.Open();
            File1.MarkAsDeleteOnClose();
            DataChunk = File1.GetChunk(0, 5);
            BoopChunk = File1.GetChunk(5, 5);
        }

        const char* Data = static_cast(DataChunk.GetData());
        CHECK(std::string(Data) == "data");
        const char* Boop = static_cast(BoopChunk.GetData());
        CHECK(std::string(Boop) == "boop");
    }
    CHECK(!std::filesystem::exists(RootDirectory / "1"));
}

namespace {
    // Writes String as one chunk and returns the location reported by the write callback.
    BlockStoreLocation WriteStringAsChunk(BlockStore& Store, std::string_view String, size_t PayloadAlignment)
    {
        BlockStoreLocation Location;
        Store.WriteChunk(String.data(), String.length(), PayloadAlignment, [&](const BlockStoreLocation& L) { Location = L; });
        CHECK(Location.Size == String.length());
        return Location;
    };

    // Reads the chunk at Location back as a string; returns "" when the chunk is missing.
    std::string ReadChunkAsString(BlockStore& Store, const BlockStoreLocation& Location)
    {
        IoBuffer ChunkData = Store.TryGetChunk(Location);
        if (!ChunkData)
        {
            return "";
        }
        std::string AsString((const char*)ChunkData.Data(), ChunkData.Size());
        return AsString;
    };

    // Recursively lists RootDir; the result holds the directories first, then the files.
    std::vector GetDirectoryContent(std::filesystem::path RootDir, bool Files, bool Directories)
    {
        DirectoryContent DirectoryContent;
        GetDirectoryContent(RootDir,
                            DirectoryContent::RecursiveFlag | (Files ? DirectoryContent::IncludeFilesFlag : 0) |
                                (Directories ? DirectoryContent::IncludeDirsFlag : 0),
                            DirectoryContent);
        std::vector Result;
        Result.insert(Result.end(), DirectoryContent.Directories.begin(), DirectoryContent.Directories.end());
        Result.insert(Result.end(), DirectoryContent.Files.begin(), DirectoryContent.Files.end());
        return Result;
    };

    // Builds a buffer of Size shuffled values (each value is its original index after the
    // stripped static_cast). The generator is seeded from std::random_device, so the
    // content differs between runs.
    static IoBuffer CreateChunk(uint64_t Size)
    {
        static std::random_device rd;
        static std::mt19937       g(rd());

        std::vector Values;
        Values.resize(Size);
        for (size_t Idx = 0; Idx < Size; ++Idx)
        {
            Values[Idx] = static_cast(Idx);
        }
        std::shuffle(Values.begin(), Values.end(), g);

        return IoBufferBuilder::MakeCloneFromMemory(Values.data(), Values.size());
    }
}  // namespace

// Writes three chunks with a 128-byte block size; the third one cannot share a block with
// the first two and must land in another block.
TEST_CASE("blockstore.chunks")
{
    ScopedTemporaryDirectory TempDir;
    auto                     RootDirectory = TempDir.Path();

    BlockStore Store;
    Store.Initialize(RootDirectory, 128, 1024, {});
    IoBuffer BadChunk = Store.TryGetChunk({.BlockIndex = 0, .Offset = 0, .Size = 512});
    CHECK(!BadChunk);

    std::string        FirstChunkData     = "This is the data of the first chunk that we will write";
    BlockStoreLocation FirstChunkLocation = WriteStringAsChunk(Store, FirstChunkData, 4);

    std::string        SecondChunkData     = "This is the data for the second chunk that we will write";
    BlockStoreLocation SecondChunkLocation = WriteStringAsChunk(Store, SecondChunkData, 4);

    CHECK(ReadChunkAsString(Store, FirstChunkLocation) == FirstChunkData);
    CHECK(ReadChunkAsString(Store, SecondChunkLocation) == SecondChunkData);

    std::string ThirdChunkData =
        "This is a much longer string that will not fit in the first block so it should be placed in the second block";
    BlockStoreLocation ThirdChunkLocation = WriteStringAsChunk(Store, ThirdChunkData, 4);
    CHECK(ThirdChunkLocation.BlockIndex != FirstChunkLocation.BlockIndex);

    CHECK(ReadChunkAsString(Store, FirstChunkLocation) == FirstChunkData);
    CHECK(ReadChunkAsString(Store, SecondChunkLocation) == SecondChunkData);
    CHECK(ReadChunkAsString(Store, ThirdChunkLocation) == ThirdChunkData);
}

// Re-initializing with only some locations known must delete the unreferenced block file.
TEST_CASE("blockstore.clean.stray.blocks")
{
    ScopedTemporaryDirectory TempDir;
    auto                     RootDirectory = TempDir.Path();

    BlockStore Store;
    Store.Initialize(RootDirectory / "store", 128, 1024, {});

    std::string        FirstChunkData     = "This is the data of the first chunk that we will write";
    BlockStoreLocation FirstChunkLocation = WriteStringAsChunk(Store, FirstChunkData, 4);

    std::string        SecondChunkData     = "This is the data for the second chunk that we will write";
    BlockStoreLocation SecondChunkLocation = WriteStringAsChunk(Store, SecondChunkData, 4);

    std::string ThirdChunkData =
        "This is a much longer string that will not fit in the first block so it should be placed in the second block";
    WriteStringAsChunk(Store, ThirdChunkData, 4);

    Store.Close();

    // The second block is not referenced by the known locations, so it should be deleted
    Store.Initialize(RootDirectory / "store", 128, 1024, {FirstChunkLocation, SecondChunkLocation});

    CHECK(GetDirectoryContent(RootDirectory / "store", true, false).size() == 1);
}

// Every Flush() retires the current write block, so three writes produce three block files.
TEST_CASE("blockstore.flush.forces.new.block")
{
    ScopedTemporaryDirectory TempDir;
    auto                     RootDirectory = TempDir.Path();

    BlockStore Store;
    Store.Initialize(RootDirectory / "store", 128, 1024, {});

    std::string FirstChunkData = "This is the data of the first chunk that we will write";
    WriteStringAsChunk(Store, FirstChunkData, 4);

    Store.Flush();
    std::string SecondChunkData = "This is the data for the second chunk that we will write";
    WriteStringAsChunk(Store, SecondChunkData, 4);

    Store.Flush();
    std::string ThirdChunkData =
        "This is a much longer string that will not fit in the first block so it should be placed in the second block";
    WriteStringAsChunk(Store, ThirdChunkData, 4);

    CHECK(GetDirectoryContent(RootDirectory / "store", true, false).size() == 3);
}

// Two small chunks and one chunk spanning two scrub windows are passed to IterateChunks.
TEST_CASE("blockstore.iterate.chunks")
{
    ScopedTemporaryDirectory TempDir;
    auto                     RootDirectory = TempDir.Path();

    BlockStore Store;
    Store.Initialize(RootDirectory / "store", ScrubSmallChunkWindowSize * 2, 1024, {});
    IoBuffer BadChunk = Store.TryGetChunk({.BlockIndex = 0, .Offset = 0, .Size =
512}); CHECK(!BadChunk); std::string FirstChunkData = "This is the data of the first chunk that we will write"; BlockStoreLocation FirstChunkLocation = WriteStringAsChunk(Store, FirstChunkData, 4); std::string SecondChunkData = "This is the data for the second chunk that we will write"; BlockStoreLocation SecondChunkLocation = WriteStringAsChunk(Store, SecondChunkData, 4); Store.Flush(); std::string VeryLargeChunk(ScrubSmallChunkWindowSize * 2, 'L'); BlockStoreLocation VeryLargeChunkLocation = WriteStringAsChunk(Store, VeryLargeChunk, 4); Store.IterateChunks( {FirstChunkLocation, SecondChunkLocation, VeryLargeChunkLocation}, [&](size_t ChunkIndex, const void* Data, uint64_t Size) { CHECK(Data); CHECK(Size > 0); std::string AsString((const char*)Data, Size); switch (ChunkIndex) { case 0: CHECK(AsString == FirstChunkData); break; case 1: CHECK(AsString == SecondChunkData); break; default: CHECK(false); break; } }, [&](size_t ChunkIndex, Ref BlockFile, uint64_t Offset, uint64_t Size) { CHECK(BlockFile); CHECK(ChunkIndex == 2); CHECK(Offset == VeryLargeChunkLocation.Offset); CHECK(Size == VeryLargeChunkLocation.Size); size_t StreamOffset = 0; BlockFile->StreamByteRange(Offset, Size, [&](const void* Data, size_t Size) { const char* VeryLargeChunkSection = &(VeryLargeChunk.data()[StreamOffset]); CHECK(memcmp(VeryLargeChunkSection, Data, Size) == 0); }); }); } TEST_CASE("blockstore.reclaim.space") { ScopedTemporaryDirectory TempDir; auto RootDirectory = TempDir.Path(); BlockStore Store; Store.Initialize(RootDirectory / "store", 512, 1024, {}); constexpr size_t ChunkCount = 200; constexpr size_t Alignment = 8; std::vector ChunkLocations; std::vector ChunkHashes; ChunkLocations.reserve(ChunkCount); ChunkHashes.reserve(ChunkCount); for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) { IoBuffer Chunk = CreateChunk(57 + ChunkIndex); Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) { ChunkLocations.push_back(L); }); 
ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size())); } std::vector ChunksToKeep; ChunksToKeep.reserve(ChunkLocations.size()); for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) { ChunksToKeep.push_back(ChunkIndex); } Store.Flush(); BlockStore::ReclaimSnapshotState State1 = Store.GetReclaimSnapshotState(); Store.ReclaimSpace(State1, ChunkLocations, ChunksToKeep, Alignment, true); // If we keep all the chunks we should not get any callbacks on moved/deleted stuff Store.ReclaimSpace( State1, ChunkLocations, ChunksToKeep, Alignment, false, [](const BlockStore::MovedChunksArray&, const BlockStore::ChunkIndexArray&) { CHECK(false); }, []() { CHECK(false); return 0; }); size_t DeleteChunkCount = 38; ChunksToKeep.clear(); for (size_t ChunkIndex = DeleteChunkCount; ChunkIndex < ChunkCount; ++ChunkIndex) { ChunksToKeep.push_back(ChunkIndex); } std::vector NewChunkLocations = ChunkLocations; size_t MovedChunkCount = 0; size_t DeletedChunkCount = 0; Store.ReclaimSpace( State1, ChunkLocations, ChunksToKeep, Alignment, false, [&](const BlockStore::MovedChunksArray& MovedChunks, const BlockStore::ChunkIndexArray& DeletedChunks) { for (const auto& MovedChunk : MovedChunks) { CHECK(MovedChunk.first >= DeleteChunkCount); NewChunkLocations[MovedChunk.first] = MovedChunk.second; } MovedChunkCount += MovedChunks.size(); for (size_t DeletedIndex : DeletedChunks) { CHECK(DeletedIndex < DeleteChunkCount); } DeletedChunkCount += DeletedChunks.size(); }, []() { CHECK(false); return 0; }); CHECK(MovedChunkCount <= DeleteChunkCount); CHECK(DeletedChunkCount == DeleteChunkCount); ChunkLocations = std::vector(NewChunkLocations.begin() + DeleteChunkCount, NewChunkLocations.end()); for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) { IoBuffer ChunkBlock = Store.TryGetChunk(NewChunkLocations[ChunkIndex]); if (ChunkIndex >= DeleteChunkCount) { IoBuffer VerifyChunk = Store.TryGetChunk(NewChunkLocations[ChunkIndex]); CHECK(VerifyChunk); IoHash 
VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size()); CHECK(VerifyHash == ChunkHashes[ChunkIndex]); } } NewChunkLocations = ChunkLocations; MovedChunkCount = 0; DeletedChunkCount = 0; Store.ReclaimSpace( State1, ChunkLocations, {}, Alignment, false, [&](const BlockStore::MovedChunksArray& MovedChunks, const BlockStore::ChunkIndexArray& DeletedChunks) { CHECK(MovedChunks.empty()); DeletedChunkCount += DeletedChunks.size(); }, []() { CHECK(false); return 0; }); CHECK(DeletedChunkCount == ChunkCount - DeleteChunkCount); } TEST_CASE("blockstore.thread.read.write") { ScopedTemporaryDirectory TempDir; auto RootDirectory = TempDir.Path(); BlockStore Store; Store.Initialize(RootDirectory / "store", 1088, 1024, {}); constexpr size_t ChunkCount = 1000; constexpr size_t Alignment = 8; std::vector Chunks; std::vector ChunkHashes; Chunks.reserve(ChunkCount); ChunkHashes.reserve(ChunkCount); for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) { IoBuffer Chunk = CreateChunk(57 + ChunkIndex / 2); Chunks.push_back(Chunk); ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size())); } std::vector ChunkLocations; ChunkLocations.resize(ChunkCount); WorkerThreadPool WorkerPool(8); std::atomic WorkCompleted = 0; for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) { WorkerPool.ScheduleWork([&Store, ChunkIndex, &Chunks, &ChunkLocations, &WorkCompleted]() { IoBuffer& Chunk = Chunks[ChunkIndex]; Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) { ChunkLocations[ChunkIndex] = L; }); WorkCompleted.fetch_add(1); }); } while (WorkCompleted < Chunks.size()) { Sleep(1); } WorkCompleted = 0; for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) { WorkerPool.ScheduleWork([&Store, ChunkIndex, &ChunkLocations, &ChunkHashes, &WorkCompleted]() { IoBuffer VerifyChunk = Store.TryGetChunk(ChunkLocations[ChunkIndex]); CHECK(VerifyChunk); IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), 
VerifyChunk.Size()); CHECK(VerifyHash == ChunkHashes[ChunkIndex]); WorkCompleted.fetch_add(1); }); } while (WorkCompleted < Chunks.size()) { Sleep(1); } std::vector SecondChunkLocations; SecondChunkLocations.resize(ChunkCount); WorkCompleted = 0; for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) { WorkerPool.ScheduleWork([&Store, ChunkIndex, &Chunks, &SecondChunkLocations, &WorkCompleted]() { IoBuffer& Chunk = Chunks[ChunkIndex]; Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) { SecondChunkLocations[ChunkIndex] = L; }); WorkCompleted.fetch_add(1); }); WorkerPool.ScheduleWork([&Store, ChunkIndex, &ChunkLocations, &ChunkHashes, &WorkCompleted]() { IoBuffer VerifyChunk = Store.TryGetChunk(ChunkLocations[ChunkIndex]); CHECK(VerifyChunk); IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size()); CHECK(VerifyHash == ChunkHashes[ChunkIndex]); WorkCompleted.fetch_add(1); }); } while (WorkCompleted < Chunks.size() * 2) { Sleep(1); } } #endif void blockstore_forcelink() { } } // namespace zen