diff options
| author | Dan Engelbrecht <[email protected]> | 2022-05-03 22:23:26 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2022-05-03 22:23:26 +0200 |
| commit | c5b2435192f382fbaa39a8ff67de16ee3b69b7a6 (patch) | |
| tree | 294bb596d61582744dd7901f6a464c324bdec3d2 /zenstore/blockstore.cpp | |
| parent | Merge pull request #84 from EpicGames/de/cleanup-lock-sharding-in-iobuffer (diff) | |
| parent | macos compilation fix (diff) | |
| download | zen-c5b2435192f382fbaa39a8ff67de16ee3b69b7a6.tar.xz zen-c5b2435192f382fbaa39a8ff67de16ee3b69b7a6.zip | |
Merge pull request #86 from EpicGames/de/block-store-refactor
structured cache with block store
Diffstat (limited to 'zenstore/blockstore.cpp')
| -rw-r--r-- | zenstore/blockstore.cpp | 1207 |
1 files changed, 1205 insertions, 2 deletions
diff --git a/zenstore/blockstore.cpp b/zenstore/blockstore.cpp index 1eb859d5a..0992662c2 100644 --- a/zenstore/blockstore.cpp +++ b/zenstore/blockstore.cpp @@ -1,13 +1,17 @@ // Copyright Epic Games, Inc. All Rights Reserved. -#include "compactcas.h" - #include <zenstore/blockstore.h> +#include <zencore/fmtutils.h> +#include <zencore/logging.h> +#include <zencore/scopeguard.h> +#include <zencore/timer.h> + #if ZEN_WITH_TESTS # include <zencore/compactbinarybuilder.h> # include <zencore/testing.h> # include <zencore/testutils.h> +# include <zencore/workthreadpool.h> # include <algorithm> # include <random> #endif @@ -102,12 +106,814 @@ BlockStoreFile::Flush() m_File.Flush(); } +BasicFile& +BlockStoreFile::GetBasicFile() +{ + return m_File; +} + void BlockStoreFile::StreamByteRange(uint64_t FileOffset, uint64_t Size, std::function<void(const void* Data, uint64_t Size)>&& ChunkFun) { m_File.StreamByteRange(FileOffset, Size, std::move(ChunkFun)); } +constexpr uint64_t ScrubSmallChunkWindowSize = 4 * 1024 * 1024; + +void +BlockStore::Initialize(const std::filesystem::path& BlocksBasePath, + uint64_t MaxBlockSize, + uint64_t MaxBlockCount, + const std::vector<BlockStoreLocation>& KnownLocations) +{ + ZEN_ASSERT(MaxBlockSize > 0); + ZEN_ASSERT(MaxBlockCount > 0); + ZEN_ASSERT(IsPow2(MaxBlockCount)); + + m_BlocksBasePath = BlocksBasePath; + m_MaxBlockSize = MaxBlockSize; + + m_ChunkBlocks.clear(); + + std::unordered_set<uint32_t> KnownBlocks; + for (const auto& Entry : KnownLocations) + { + KnownBlocks.insert(Entry.BlockIndex); + } + + if (std::filesystem::is_directory(m_BlocksBasePath)) + { + std::vector<std::filesystem::path> FoldersToScan; + FoldersToScan.push_back(m_BlocksBasePath); + size_t FolderOffset = 0; + while (FolderOffset < FoldersToScan.size()) + { + for (const std::filesystem::directory_entry& Entry : std::filesystem::directory_iterator(FoldersToScan[FolderOffset])) + { + if (Entry.is_directory()) + { + FoldersToScan.push_back(Entry.path()); + continue; + } + if (Entry.is_regular_file()) + { + const std::filesystem::path Path = Entry.path(); + if (Path.extension() != GetBlockFileExtension()) + { + continue; + } + std::string FileName = Path.stem().string(); + uint32_t BlockIndex; + bool OK = ParseHexNumber(FileName, BlockIndex); + if (!OK) + { + continue; + } + if (!KnownBlocks.contains(BlockIndex)) + { + // Log removing unreferenced block + // Clear out unused blocks + ZEN_INFO("removing unused block at '{}'", Path); + std::error_code Ec; + std::filesystem::remove(Path, Ec); + if (Ec) + { + ZEN_WARN("Failed to delete file '{}' reason: '{}'", Path, Ec.message()); + } + continue; + } + Ref<BlockStoreFile> BlockFile = new BlockStoreFile(Path); + BlockFile->Open(); + m_ChunkBlocks[BlockIndex] = BlockFile; + } + } + ++FolderOffset; + } + } + else + { + CreateDirectories(m_BlocksBasePath); + } +} + +void +BlockStore::Close() +{ + RwLock::ExclusiveLockScope InsertLock(m_InsertLock); + m_WriteBlock = nullptr; + m_CurrentInsertOffset = 0; + m_WriteBlockIndex = 0; + m_ChunkBlocks.clear(); + m_BlocksBasePath.clear(); +} + +BlockStoreLocation +BlockStore::WriteChunk(const void* Data, uint64_t Size, uint64_t Alignment) +{ + ZEN_ASSERT(Data != nullptr); + ZEN_ASSERT(Size > 0u); + ZEN_ASSERT(Size <= m_MaxBlockSize); + ZEN_ASSERT(Alignment > 0u); + + RwLock::ExclusiveLockScope InsertLock(m_InsertLock); + + uint32_t WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire); + bool IsWriting = m_WriteBlock != nullptr; + if (!IsWriting || (m_CurrentInsertOffset + Size) > m_MaxBlockSize) + { + if (m_WriteBlock) + { + m_WriteBlock = nullptr; + } + { + if (m_ChunkBlocks.size() == m_MaxBlockCount) + { + throw std::runtime_error(fmt::format("unable to allocate a new block in '{}'", m_BlocksBasePath)); + } + WriteBlockIndex += IsWriting ? 1 : 0; + while (m_ChunkBlocks.contains(WriteBlockIndex)) + { + WriteBlockIndex = (WriteBlockIndex + 1) & (m_MaxBlockCount - 1); + } + std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, WriteBlockIndex); + m_WriteBlock = new BlockStoreFile(BlockPath); + m_ChunkBlocks[WriteBlockIndex] = m_WriteBlock; + m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release); + } + m_CurrentInsertOffset = 0; + m_WriteBlock->Create(m_MaxBlockSize); + } + uint64_t InsertOffset = m_CurrentInsertOffset; + m_CurrentInsertOffset = RoundUp(InsertOffset + Size, Alignment); + Ref<BlockStoreFile> WriteBlock = m_WriteBlock; + InsertLock.ReleaseNow(); + + WriteBlock->Write(Data, Size, InsertOffset); + + return {.BlockIndex = WriteBlockIndex, .Offset = InsertOffset, .Size = Size}; +} + +BlockStore::ReclaimSnapshotState +BlockStore::GetReclaimSnapshotState() +{ + ReclaimSnapshotState State; + RwLock::ExclusiveLockScope _(m_InsertLock); + State.ExcludeBlockIndex = m_WriteBlock ? m_WriteBlockIndex.load(std::memory_order_acquire) : 0xffffffffu; + State.BlockCount = m_ChunkBlocks.size(); + _.ReleaseNow(); + return State; +} + +Ref<BlockStoreFile> +BlockStore::GetChunkBlock(const BlockStoreLocation& Location) +{ + RwLock::SharedLockScope InsertLock(m_InsertLock); + if (auto BlockIt = m_ChunkBlocks.find(Location.BlockIndex); BlockIt != m_ChunkBlocks.end()) + { + return BlockIt->second; + } + return {}; +} + +void +BlockStore::Flush() +{ + RwLock::ExclusiveLockScope _(m_InsertLock); + if (m_CurrentInsertOffset > 0) + { + uint32_t WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire); + WriteBlockIndex = (WriteBlockIndex + 1) & (m_MaxBlockCount - 1); + m_WriteBlock = nullptr; + m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release); + m_CurrentInsertOffset = 0; + } +} + +void +BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot, + const std::vector<BlockStoreLocation>& ChunkLocations, + const ChunkIndexArray& KeepChunkIndexes, + uint64_t PayloadAlignment, + bool DryRun, + const ReclaimCallback& ChangeCallback, + const ClaimDiskReserveCallback& DiskReserveCallback) +{ + if (ChunkLocations.empty()) + { + return; + } + uint64_t WriteBlockTimeUs = 0; + uint64_t WriteBlockLongestTimeUs = 0; + uint64_t ReadBlockTimeUs = 0; + uint64_t ReadBlockLongestTimeUs = 0; + uint64_t TotalChunkCount = ChunkLocations.size(); + uint64_t DeletedSize = 0; + uint64_t OldTotalSize = 0; + uint64_t NewTotalSize = 0; + + uint64_t MovedCount = 0; + uint64_t DeletedCount = 0; + + Stopwatch TotalTimer; + const auto _ = MakeGuard([&] { + ZEN_INFO( + "reclaim space for '{}' DONE after {}, write lock: {} ({}), read lock: {} ({}), collected {} bytes, deleted #{} and moved " + "#{} " + "of #{} " + "chunks ({}).", + m_BlocksBasePath, + NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()), + NiceLatencyNs(WriteBlockTimeUs), + NiceLatencyNs(WriteBlockLongestTimeUs), + NiceLatencyNs(ReadBlockTimeUs), + NiceLatencyNs(ReadBlockLongestTimeUs), + NiceBytes(DeletedSize), + DeletedCount, + MovedCount, + TotalChunkCount, + NiceBytes(OldTotalSize)); + }); + + size_t BlockCount = Snapshot.BlockCount; + + std::unordered_set<size_t> KeepChunkMap; + KeepChunkMap.reserve(KeepChunkIndexes.size()); + for (size_t KeepChunkIndex : KeepChunkIndexes) + { + KeepChunkMap.insert(KeepChunkIndex); + } + + std::unordered_map<uint32_t, size_t> BlockIndexToChunkMapIndex; + std::vector<ChunkIndexArray> BlockKeepChunks; + std::vector<ChunkIndexArray> BlockDeleteChunks; + + BlockIndexToChunkMapIndex.reserve(BlockCount); + BlockKeepChunks.reserve(BlockCount); + BlockDeleteChunks.reserve(BlockCount); + size_t GuesstimateCountPerBlock = TotalChunkCount / BlockCount / 2; + + size_t DeleteCount = 0; + for (size_t Index = 0; Index < TotalChunkCount; ++Index) + { + const BlockStoreLocation& Location = ChunkLocations[Index]; + OldTotalSize += Location.Size; + if (Location.BlockIndex == Snapshot.ExcludeBlockIndex) + { + continue; + } + + auto BlockIndexPtr = BlockIndexToChunkMapIndex.find(Location.BlockIndex); + size_t ChunkMapIndex = 0; + if (BlockIndexPtr == BlockIndexToChunkMapIndex.end()) + { + ChunkMapIndex = BlockKeepChunks.size(); + BlockIndexToChunkMapIndex[Location.BlockIndex] = ChunkMapIndex; + BlockKeepChunks.resize(ChunkMapIndex + 1); + BlockKeepChunks.back().reserve(GuesstimateCountPerBlock); + BlockDeleteChunks.resize(ChunkMapIndex + 1); + BlockDeleteChunks.back().reserve(GuesstimateCountPerBlock); + } + else + { + ChunkMapIndex = BlockIndexPtr->second; + } + + if (KeepChunkMap.contains(Index)) + { + ChunkIndexArray& IndexMap = BlockKeepChunks[ChunkMapIndex]; + IndexMap.push_back(Index); + NewTotalSize += Location.Size; + continue; + } + ChunkIndexArray& IndexMap = BlockDeleteChunks[ChunkMapIndex]; + IndexMap.push_back(Index); + DeleteCount++; + } + + std::unordered_set<uint32_t> BlocksToReWrite; + BlocksToReWrite.reserve(BlockIndexToChunkMapIndex.size()); + for (const auto& Entry : BlockIndexToChunkMapIndex) + { + uint32_t BlockIndex = Entry.first; + size_t ChunkMapIndex = Entry.second; + const ChunkIndexArray& ChunkMap = BlockDeleteChunks[ChunkMapIndex]; + if (ChunkMap.empty()) + { + continue; + } + BlocksToReWrite.insert(BlockIndex); + } + + if (DryRun) + { + ZEN_INFO("garbage collect for '{}' DISABLED, found #{} {} chunks of total #{} {}", + m_BlocksBasePath, + DeleteCount, + NiceBytes(OldTotalSize - NewTotalSize), + TotalChunkCount, + OldTotalSize); + return; + } + + Ref<BlockStoreFile> NewBlockFile; + try + { + uint64_t WriteOffset = 0; + uint32_t NewBlockIndex = 0; + for (uint32_t BlockIndex : BlocksToReWrite) + { + const size_t ChunkMapIndex = BlockIndexToChunkMapIndex[BlockIndex]; + + Ref<BlockStoreFile> OldBlockFile; + { + RwLock::SharedLockScope _i(m_InsertLock); + Stopwatch Timer; + const auto __ = MakeGuard([&] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + WriteBlockTimeUs += ElapsedUs; + WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); + }); + OldBlockFile = m_ChunkBlocks[BlockIndex]; + ZEN_ASSERT(OldBlockFile); + } + + const ChunkIndexArray& KeepMap = BlockKeepChunks[ChunkMapIndex]; + if (KeepMap.empty()) + { + const ChunkIndexArray& DeleteMap = BlockDeleteChunks[ChunkMapIndex]; + for (size_t DeleteIndex : DeleteMap) + { + DeletedSize += ChunkLocations[DeleteIndex].Size; + } + ChangeCallback({}, DeleteMap); + DeletedCount += DeleteMap.size(); + { + RwLock::ExclusiveLockScope _i(m_InsertLock); + Stopwatch Timer; + const auto __ = MakeGuard([&] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + ReadBlockTimeUs += ElapsedUs; + ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); + }); + m_ChunkBlocks[BlockIndex] = nullptr; + } + ZEN_DEBUG("marking cas block store file '{}' for delete, block #{}", OldBlockFile->GetPath(), BlockIndex); + std::error_code Ec; + OldBlockFile->MarkAsDeleteOnClose(Ec); + if (Ec) + { + ZEN_WARN("Failed to flag file '{}' for deletion: '{}'", OldBlockFile->GetPath(), Ec.message()); + } + continue; + } + + MovedChunksArray MovedChunks; + std::vector<uint8_t> Chunk; + for (const size_t& ChunkIndex : KeepMap) + { + const BlockStoreLocation ChunkLocation = ChunkLocations[ChunkIndex]; + Chunk.resize(ChunkLocation.Size); + OldBlockFile->Read(Chunk.data(), Chunk.size(), ChunkLocation.Offset); + + if (!NewBlockFile || (WriteOffset + Chunk.size() > m_MaxBlockSize)) + { + uint32_t NextBlockIndex = m_WriteBlockIndex.load(std::memory_order_relaxed); + + if (NewBlockFile) + { + NewBlockFile->Truncate(WriteOffset); + NewBlockFile->Flush(); + NewBlockFile = nullptr; + } + { + ChangeCallback(MovedChunks, {}); + MovedCount += KeepMap.size(); + MovedChunks.clear(); + RwLock::ExclusiveLockScope __(m_InsertLock); + Stopwatch Timer; + const auto ___ = MakeGuard([&] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + ReadBlockTimeUs += ElapsedUs; + ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); + }); + if (m_ChunkBlocks.size() == m_MaxBlockCount) + { + ZEN_ERROR("unable to allocate a new block in '{}', count limit {} exeeded", + m_BlocksBasePath, + static_cast<uint64_t>(std::numeric_limits<uint32_t>::max()) + 1); + return; + } + while (m_ChunkBlocks.contains(NextBlockIndex)) + { + NextBlockIndex = (NextBlockIndex + 1) & (m_MaxBlockCount - 1); + } + std::filesystem::path NewBlockPath = GetBlockPath(m_BlocksBasePath, NextBlockIndex); + NewBlockFile = new BlockStoreFile(NewBlockPath); + m_ChunkBlocks[NextBlockIndex] = NewBlockFile; + } + + std::error_code Error; + DiskSpace Space = DiskSpaceInfo(m_BlocksBasePath, Error); + if (Error) + { + ZEN_ERROR("get disk space in '{}' FAILED, reason: '{}'", m_BlocksBasePath, Error.message()); + return; + } + if (Space.Free < m_MaxBlockSize) + { + uint64_t ReclaimedSpace = DiskReserveCallback(); + if (Space.Free + ReclaimedSpace < m_MaxBlockSize) + { + ZEN_WARN("garbage collect for '{}' FAILED, required disk space {}, free {}", + m_BlocksBasePath, + m_MaxBlockSize, + NiceBytes(Space.Free + ReclaimedSpace)); + RwLock::ExclusiveLockScope _l(m_InsertLock); + Stopwatch Timer; + const auto __ = MakeGuard([&] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + ReadBlockTimeUs += ElapsedUs; + ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); + }); + m_ChunkBlocks.erase(NextBlockIndex); + return; + } + + ZEN_INFO("using gc reserve for '{}', reclaimed {}, disk free {}", + m_BlocksBasePath, + ReclaimedSpace, + NiceBytes(Space.Free + ReclaimedSpace)); + } + NewBlockFile->Create(m_MaxBlockSize); + NewBlockIndex = NextBlockIndex; + WriteOffset = 0; + } + + NewBlockFile->Write(Chunk.data(), Chunk.size(), WriteOffset); + MovedChunks.push_back({ChunkIndex, {.BlockIndex = NewBlockIndex, .Offset = WriteOffset, .Size = Chunk.size()}}); + WriteOffset = RoundUp(WriteOffset + Chunk.size(), PayloadAlignment); + } + Chunk.clear(); + if (NewBlockFile) + { + NewBlockFile->Truncate(WriteOffset); + NewBlockFile->Flush(); + NewBlockFile = nullptr; + } + + const ChunkIndexArray& DeleteMap = BlockDeleteChunks[ChunkMapIndex]; + for (size_t DeleteIndex : DeleteMap) + { + DeletedSize += ChunkLocations[DeleteIndex].Size; + } + + ChangeCallback(MovedChunks, DeleteMap); + MovedCount += KeepMap.size(); + DeletedCount += DeleteMap.size(); + MovedChunks.clear(); + { + RwLock::ExclusiveLockScope __(m_InsertLock); + Stopwatch Timer; + const auto ___ = MakeGuard([&] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + ReadBlockTimeUs += ElapsedUs; + ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); + }); + m_ChunkBlocks[BlockIndex] = nullptr; + } + ZEN_DEBUG("marking cas block store file '{}' for delete, block #{}", OldBlockFile->GetPath(), BlockIndex); + std::error_code Ec; + OldBlockFile->MarkAsDeleteOnClose(Ec); + if (Ec) + { + ZEN_WARN("Failed to flag file '{}' for deletion: '{}'", OldBlockFile->GetPath(), Ec.message()); + } + OldBlockFile = nullptr; + } + } + catch (std::exception& ex) + { + ZEN_ERROR("reclaiming space for '{}' failed with: '{}'", m_BlocksBasePath, ex.what()); + if (NewBlockFile) + { + ZEN_DEBUG("dropping incomplete cas block store file '{}'", NewBlockFile->GetPath()); + std::error_code Ec; + NewBlockFile->MarkAsDeleteOnClose(Ec); + if (Ec) + { + ZEN_WARN("Failed to flag file '{}' for deletion: '{}'", NewBlockFile->GetPath(), Ec.message()); + } + } + } +} + +void +BlockStore::IterateChunks(const std::vector<BlockStoreLocation>& ChunkLocations, + IterateChunksSmallSizeCallback SmallSizeCallback, + IterateChunksLargeSizeCallback LargeSizeCallback) +{ + // We do a read sweep through the payloads file and validate + // any entries that are contained within each segment, with + // the assumption that most entries will be checked in this + // pass. An alternative strategy would be to use memory mapping. + + { + ChunkIndexArray BigChunks; + IoBuffer ReadBuffer{ScrubSmallChunkWindowSize}; + void* BufferBase = ReadBuffer.MutableData(); + + RwLock::SharedLockScope _(m_InsertLock); + + for (const auto& Block : m_ChunkBlocks) + { + uint64_t WindowStart = 0; + uint64_t WindowEnd = ScrubSmallChunkWindowSize; + uint32_t BlockIndex = Block.first; + const Ref<BlockStoreFile>& BlockFile = Block.second; + const uint64_t FileSize = BlockFile->FileSize(); + + do + { + const uint64_t ChunkSize = Min(ScrubSmallChunkWindowSize, FileSize - WindowStart); + BlockFile->Read(BufferBase, ChunkSize, WindowStart); + + // TODO: We could be smarter here if the ChunkLocations were sorted on block index - we could + // then only scan a subset of ChunkLocations instead of scanning through them all... + for (size_t ChunkIndex = 0; ChunkIndex < ChunkLocations.size(); ++ChunkIndex) + { + const BlockStoreLocation Location = ChunkLocations[ChunkIndex]; + if (BlockIndex != Location.BlockIndex) + { + continue; + } + + const uint64_t EntryOffset = Location.Offset; + if ((EntryOffset >= WindowStart) && (EntryOffset < WindowEnd)) + { + const uint64_t EntryEnd = EntryOffset + Location.Size; + + if (EntryEnd >= WindowEnd) + { + BigChunks.push_back(ChunkIndex); + + continue; + } + + SmallSizeCallback(ChunkIndex, + reinterpret_cast<uint8_t*>(BufferBase) + Location.Offset - WindowStart, + Location.Size); + } + } + + WindowStart += ScrubSmallChunkWindowSize; + WindowEnd += ScrubSmallChunkWindowSize; + } while (WindowStart < FileSize); + } + + // Deal with large chunks and chunks that extend over a ScrubSmallChunkWindowSize border + for (size_t ChunkIndex : BigChunks) + { + const BlockStoreLocation Location = ChunkLocations[ChunkIndex]; + const Ref<BlockStoreFile>& BlockFile = m_ChunkBlocks[Location.BlockIndex]; + LargeSizeCallback(ChunkIndex, BlockFile, Location.Offset, Location.Size); + } + } +} + +bool +BlockStore::Split(const std::vector<BlockStoreLocation>& ChunkLocations, + const std::filesystem::path& SourceBlockFilePath, + const std::filesystem::path& BlocksBasePath, + uint64_t MaxBlockSize, + uint64_t MaxBlockCount, + size_t PayloadAlignment, + bool CleanSource, + const SplitCallback& Callback) +{ + std::error_code Error; + DiskSpace Space = DiskSpaceInfo(BlocksBasePath.parent_path(), Error); + if (Error) + { + ZEN_ERROR("get disk space in {} FAILED, reason: '{}'", BlocksBasePath, Error.message()); + return false; + } + + if (Space.Free < MaxBlockSize) + { + ZEN_ERROR("legacy store migration from '{}' FAILED, required disk space {}, free {}", + BlocksBasePath, + MaxBlockSize, + NiceBytes(Space.Free)); + return false; + } + + size_t TotalSize = 0; + for (const BlockStoreLocation& Location : ChunkLocations) + { + TotalSize += Location.Size; + } + size_t ChunkCount = ChunkLocations.size(); + uint64_t RequiredDiskSpace = TotalSize + ((PayloadAlignment - 1) * ChunkCount); + uint64_t MaxRequiredBlockCount = RoundUp(RequiredDiskSpace, MaxBlockSize) / MaxBlockSize; + if (MaxRequiredBlockCount > MaxBlockCount) + { + ZEN_ERROR("legacy store migration from '{}' FAILED, required block count {}, possible {}", + BlocksBasePath, + MaxRequiredBlockCount, + MaxBlockCount); + return false; + } + + constexpr const uint64_t DiskReserve = 1ul << 28; + + if (CleanSource) + { + if (Space.Free < (MaxBlockSize + DiskReserve)) + { + ZEN_INFO("legacy store migration from '{}' aborted, not enough disk space available {} ({})", + BlocksBasePath, + NiceBytes(MaxBlockSize + DiskReserve), + NiceBytes(Space.Free)); + return false; + } + } + else + { + if (Space.Free < (RequiredDiskSpace + DiskReserve)) + { + ZEN_INFO("legacy store migration from '{}' aborted, not enough disk space available {} ({})", + BlocksBasePath, + NiceBytes(RequiredDiskSpace + DiskReserve), + NiceBytes(Space.Free)); + return false; + } + } + + uint32_t WriteBlockIndex = 0; + while (std::filesystem::exists(BlockStore::GetBlockPath(BlocksBasePath, WriteBlockIndex))) + { + ++WriteBlockIndex; + } + + BasicFile BlockFile; + BlockFile.Open(SourceBlockFilePath, CleanSource ? BasicFile::Mode::kWrite : BasicFile::Mode::kRead); + + if (CleanSource && (MaxRequiredBlockCount < 2)) + { + MovedChunksArray Chunks; + Chunks.reserve(ChunkCount); + for (size_t Index = 0; Index < ChunkCount; ++Index) + { + const BlockStoreLocation& ChunkLocation = ChunkLocations[Index]; + Chunks.push_back({Index, {.BlockIndex = WriteBlockIndex, .Offset = ChunkLocation.Offset, .Size = ChunkLocation.Size}}); + } + std::filesystem::path BlockPath = BlockStore::GetBlockPath(BlocksBasePath, WriteBlockIndex); + CreateDirectories(BlockPath.parent_path()); + BlockFile.Close(); + std::filesystem::rename(SourceBlockFilePath, BlockPath); + Callback(Chunks); + return true; + } + + ChunkIndexArray ChunkIndexes; + ChunkIndexes.reserve(ChunkCount); + for (size_t Index = 0; Index < ChunkCount; ++Index) + { + ChunkIndexes.push_back(Index); + } + + std::sort(begin(ChunkIndexes), end(ChunkIndexes), [&](size_t Lhs, size_t Rhs) { + const BlockStoreLocation& LhsLocation = ChunkLocations[Lhs]; + const BlockStoreLocation& RhsLocation = ChunkLocations[Rhs]; + return LhsLocation.Offset < RhsLocation.Offset; + }); + + uint64_t BlockSize = 0; + uint64_t BlockOffset = 0; + std::vector<BlockStoreLocation> NewLocations; + struct BlockData + { + MovedChunksArray Chunks; + uint64_t BlockOffset; + uint64_t BlockSize; + uint32_t BlockIndex; + }; + + std::vector<BlockData> BlockRanges; + MovedChunksArray Chunks; + BlockRanges.reserve(MaxRequiredBlockCount); + for (const size_t& ChunkIndex : ChunkIndexes) + { + const BlockStoreLocation& LegacyChunkLocation = ChunkLocations[ChunkIndex]; + + uint64_t ChunkOffset = LegacyChunkLocation.Offset; + uint64_t ChunkSize = LegacyChunkLocation.Size; + uint64_t ChunkEnd = ChunkOffset + ChunkSize; + + if (BlockSize == 0) + { + BlockOffset = ChunkOffset; + } + if ((ChunkEnd - BlockOffset) > MaxBlockSize) + { + BlockData BlockRange{.BlockOffset = BlockOffset, .BlockSize = BlockSize, .BlockIndex = WriteBlockIndex}; + BlockRange.Chunks.swap(Chunks); + BlockRanges.push_back(BlockRange); + + WriteBlockIndex++; + while (std::filesystem::exists(BlockStore::GetBlockPath(BlocksBasePath, WriteBlockIndex))) + { + ++WriteBlockIndex; + } + BlockOffset = ChunkOffset; + BlockSize = 0; + } + BlockSize = RoundUp(BlockSize, PayloadAlignment); + BlockStoreLocation ChunkLocation = {.BlockIndex = WriteBlockIndex, .Offset = ChunkOffset - BlockOffset, .Size = ChunkSize}; + Chunks.push_back({ChunkIndex, ChunkLocation}); + BlockSize = ChunkEnd - BlockOffset; + } + if (BlockSize > 0) + { + BlockRanges.push_back( + {.Chunks = std::move(Chunks), .BlockOffset = BlockOffset, .BlockSize = BlockSize, .BlockIndex = WriteBlockIndex}); + } + + Stopwatch WriteBlockTimer; + + std::reverse(BlockRanges.begin(), BlockRanges.end()); + std::vector<std::uint8_t> Buffer(1 << 28); + for (size_t Idx = 0; Idx < BlockRanges.size(); ++Idx) + { + const BlockData& BlockRange = BlockRanges[Idx]; + if (Idx > 0) + { + uint64_t Remaining = BlockRange.BlockOffset + BlockRange.BlockSize; + uint64_t Completed = BlockOffset + BlockSize - Remaining; + uint64_t ETA = (WriteBlockTimer.GetElapsedTimeMs() * Remaining) / Completed; + + ZEN_INFO("migrating store '{}' {}/{} blocks, remaining {} ({}) ETA: {}", + BlocksBasePath, + Idx, + BlockRanges.size(), + NiceBytes(BlockRange.BlockOffset + BlockRange.BlockSize), + NiceBytes(BlockOffset + BlockSize), + NiceTimeSpanMs(ETA)); + } + + std::filesystem::path BlockPath = BlockStore::GetBlockPath(BlocksBasePath, BlockRange.BlockIndex); + BlockStoreFile ChunkBlock(BlockPath); + ChunkBlock.Create(BlockRange.BlockSize); + uint64_t Offset = 0; + while (Offset < BlockRange.BlockSize) + { + uint64_t Size = BlockRange.BlockSize - Offset; + if (Size > Buffer.size()) + { + Size = Buffer.size(); + } + BlockFile.Read(Buffer.data(), Size, BlockRange.BlockOffset + Offset); + ChunkBlock.Write(Buffer.data(), Size, Offset); + Offset += Size; + } + ChunkBlock.Truncate(Offset); + ChunkBlock.Flush(); + + Callback(BlockRange.Chunks); + + if (CleanSource) + { + BlockFile.SetFileSize(BlockRange.BlockOffset); + } + } + BlockFile.Close(); + + return true; +} + +const char* +BlockStore::GetBlockFileExtension() +{ + return ".ucas"; +} + +std::filesystem::path +BlockStore::GetBlockPath(const std::filesystem::path& BlocksBasePath, const uint32_t BlockIndex) +{ + ExtendablePathBuilder<256> Path; + + char BlockHexString[9]; + ToHexNumber(BlockIndex, BlockHexString); + + Path.Append(BlocksBasePath); + Path.AppendSeparator(); + Path.AppendAsciiRange(BlockHexString, BlockHexString + 4); + Path.AppendSeparator(); + Path.Append(BlockHexString); + Path.Append(GetBlockFileExtension()); + return Path.ToPath(); +} + #if ZEN_WITH_TESTS static bool @@ -232,6 +1038,403 @@ TEST_CASE("blockstore.blockfile") CHECK(!std::filesystem::exists(RootDirectory / "1")); } +namespace { + BlockStoreLocation WriteStringAsChunk(BlockStore& Store, std::string_view String, size_t PayloadAlignment) + { + BlockStoreLocation Location = Store.WriteChunk(String.data(), String.length(), PayloadAlignment); + CHECK(Location.Size == String.length()); + return Location; + }; + + std::string ReadChunkAsString(BlockStore& Store, const BlockStoreLocation& Location) + { + Ref<BlockStoreFile> ChunkBlock(Store.GetChunkBlock(Location)); + if (!ChunkBlock) + { + return ""; + } + IoBuffer ChunkData = ChunkBlock->GetChunk(Location.Offset, Location.Size); + if (!ChunkData) + { + return ""; + } + std::string AsString((const char*)ChunkData.Data(), ChunkData.Size()); + return AsString; + }; + + std::vector<std::filesystem::path> GetDirectoryContent(std::filesystem::path RootDir, bool Files, bool Directories) + { + FileSystemTraversal Traversal; + struct Visitor : public FileSystemTraversal::TreeVisitor + { + virtual void VisitFile(const std::filesystem::path& Parent, const path_view& File, uint64_t) override + { + if (Files) + { + Items.push_back(Parent / File); + } + } + + virtual bool VisitDirectory(const std::filesystem::path& Parent, const path_view& Dir) override + { + if (Directories) + { + Items.push_back(Parent / Dir); + } + return true; + } + + bool Files; + bool Directories; + std::vector<std::filesystem::path> Items; + } Visit; + Visit.Files = Files; + Visit.Directories = Directories; + + Traversal.TraverseFileSystem(RootDir, Visit); + return Visit.Items; + }; + + static IoBuffer CreateChunk(uint64_t Size) + { + static std::random_device rd; + static std::mt19937 g(rd()); + + std::vector<uint8_t> Values; + Values.resize(Size); + for (size_t Idx = 0; Idx < Size; ++Idx) + { + Values[Idx] = static_cast<uint8_t>(Idx); + } + std::shuffle(Values.begin(), Values.end(), g); + + return IoBufferBuilder::MakeCloneFromMemory(Values.data(), Values.size()); + } +} // namespace + +TEST_CASE("blockstore.chunks") +{ + ScopedTemporaryDirectory TempDir; + auto RootDirectory = TempDir.Path(); + + BlockStore Store; + Store.Initialize(RootDirectory, 128, 1024, {}); + Ref<BlockStoreFile> BadChunk = Store.GetChunkBlock({.BlockIndex = 0, .Offset = 0, .Size = 512}); + CHECK(!BadChunk); + + std::string FirstChunkData = "This is the data of the first chunk that we will write"; + BlockStoreLocation FirstChunkLocation = WriteStringAsChunk(Store, FirstChunkData, 4); + std::string SecondChunkData = "This is the data for the second chunk that we will write"; + BlockStoreLocation SecondChunkLocation = WriteStringAsChunk(Store, SecondChunkData, 4); + + CHECK(ReadChunkAsString(Store, FirstChunkLocation) == FirstChunkData); + CHECK(ReadChunkAsString(Store, SecondChunkLocation) == SecondChunkData); + + std::string ThirdChunkData = + "This is a much longer string that will not fit in the first block so it should be placed in the second block"; + BlockStoreLocation ThirdChunkLocation = WriteStringAsChunk(Store, ThirdChunkData, 4); + CHECK(ThirdChunkLocation.BlockIndex != FirstChunkLocation.BlockIndex); + + CHECK(ReadChunkAsString(Store, FirstChunkLocation) == FirstChunkData); + CHECK(ReadChunkAsString(Store, SecondChunkLocation) == SecondChunkData); + CHECK(ReadChunkAsString(Store, ThirdChunkLocation) == ThirdChunkData); +} + +TEST_CASE("blockstore.clean.stray.blocks") +{ + ScopedTemporaryDirectory TempDir; + auto RootDirectory = TempDir.Path(); + + BlockStore Store; + Store.Initialize(RootDirectory / "store", 128, 1024, {}); + + std::string FirstChunkData = "This is the data of the first chunk that we will write"; + BlockStoreLocation FirstChunkLocation = WriteStringAsChunk(Store, FirstChunkData, 4); + std::string SecondChunkData = "This is the data for the second chunk that we will write"; + BlockStoreLocation SecondChunkLocation = WriteStringAsChunk(Store, SecondChunkData, 4); + std::string ThirdChunkData = + "This is a much longer string that will not fit in the first block so it should be placed in the second block"; + WriteStringAsChunk(Store, ThirdChunkData, 4); + + Store.Close(); + + // Not referencing the second block means that we should be deleted + Store.Initialize(RootDirectory / "store", 128, 1024, {FirstChunkLocation, SecondChunkLocation}); + + CHECK(GetDirectoryContent(RootDirectory / "store", true, false).size() == 1); +} + +TEST_CASE("blockstore.flush.forces.new.block") +{ + ScopedTemporaryDirectory TempDir; + auto RootDirectory = TempDir.Path(); + + BlockStore Store; + Store.Initialize(RootDirectory / "store", 128, 1024, {}); + + std::string FirstChunkData = "This is the data of the first chunk that we will write"; + BlockStoreLocation FirstChunkLocation = WriteStringAsChunk(Store, FirstChunkData, 4); + Store.Flush(); + std::string SecondChunkData = "This is the data for the second chunk that we will write"; + BlockStoreLocation SecondChunkLocation = WriteStringAsChunk(Store, SecondChunkData, 4); + Store.Flush(); + std::string ThirdChunkData = + "This is a much longer string that will not fit in the first block so it should be placed in the second block"; + WriteStringAsChunk(Store, ThirdChunkData, 4); + + CHECK(GetDirectoryContent(RootDirectory / "store", true, false).size() == 3); +} + +TEST_CASE("blockstore.iterate.chunks") +{ + ScopedTemporaryDirectory TempDir; + auto RootDirectory = TempDir.Path(); + + BlockStore Store; + Store.Initialize(RootDirectory / "store", ScrubSmallChunkWindowSize * 2, 1024, {}); + Ref<BlockStoreFile> BadChunk = Store.GetChunkBlock({.BlockIndex = 0, .Offset = 0, .Size = 512}); + CHECK(!BadChunk); + + std::string FirstChunkData = "This is the data of the first chunk that we will write"; + BlockStoreLocation FirstChunkLocation = WriteStringAsChunk(Store, FirstChunkData, 4); + + std::string SecondChunkData = "This is the data for the second chunk that we will write"; + BlockStoreLocation SecondChunkLocation = WriteStringAsChunk(Store, SecondChunkData, 4); + Store.Flush(); + + std::string VeryLargeChunk(ScrubSmallChunkWindowSize * 2, 'L'); + BlockStoreLocation VeryLargeChunkLocation = WriteStringAsChunk(Store, VeryLargeChunk, 4); + + Store.IterateChunks( + {FirstChunkLocation, SecondChunkLocation, VeryLargeChunkLocation}, + [&](size_t ChunkIndex, const void* Data, uint64_t Size) { + CHECK(Data); + CHECK(Size > 0); + std::string AsString((const char*)Data, Size); + switch (ChunkIndex) + { + case 0: + CHECK(AsString == FirstChunkData); + break; + case 1: + CHECK(AsString == SecondChunkData); + break; + default: + CHECK(false); + break; + } + }, + [&](size_t ChunkIndex, Ref<BlockStoreFile> BlockFile, uint64_t Offset, uint64_t Size) { + CHECK(BlockFile); + CHECK(ChunkIndex == 2); + CHECK(Offset == VeryLargeChunkLocation.Offset); + CHECK(Size == VeryLargeChunkLocation.Size); + size_t StreamOffset = 0; + BlockFile->StreamByteRange(Offset, Size, [&](const void* Data, size_t Size) { + const char* VeryLargeChunkSection = &(VeryLargeChunk.data()[StreamOffset]); + CHECK(memcmp(VeryLargeChunkSection, Data, Size) == 0); + }); + }); +} + +TEST_CASE("blockstore.reclaim.space") +{ + ScopedTemporaryDirectory TempDir; + auto RootDirectory = TempDir.Path(); + + BlockStore Store; + Store.Initialize(RootDirectory / "store", 512, 1024, {}); + + constexpr size_t ChunkCount = 200; + constexpr size_t Alignment = 8; + std::vector<BlockStoreLocation> ChunkLocations; + std::vector<IoHash> ChunkHashes; + ChunkLocations.reserve(ChunkCount); + ChunkHashes.reserve(ChunkCount); + for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) + { + IoBuffer Chunk = CreateChunk(57 + ChunkIndex); + ChunkLocations.push_back(Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment)); + ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size())); + } + + std::vector<size_t> ChunksToKeep; + ChunksToKeep.reserve(ChunkLocations.size()); + for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) + { + ChunksToKeep.push_back(ChunkIndex); + } + + Store.Flush(); + BlockStore::ReclaimSnapshotState State1 = Store.GetReclaimSnapshotState(); + Store.ReclaimSpace(State1, ChunkLocations, ChunksToKeep, Alignment, true); + + // If we keep all the chunks we should not get any callbacks on moved/deleted stuff + Store.ReclaimSpace( + State1, + ChunkLocations, + ChunksToKeep, + Alignment, + false, + [](const BlockStore::MovedChunksArray&, const BlockStore::ChunkIndexArray&) { CHECK(false); }, + []() { + CHECK(false); + return 0; + }); + + size_t DeleteChunkCount = 38; + ChunksToKeep.clear(); + for (size_t ChunkIndex = DeleteChunkCount; ChunkIndex < ChunkCount; ++ChunkIndex) + { + ChunksToKeep.push_back(ChunkIndex); + } + + std::vector<BlockStoreLocation> NewChunkLocations = ChunkLocations; + size_t MovedChunkCount = 0; + size_t DeletedChunkCount = 0; + Store.ReclaimSpace( + State1, + ChunkLocations, + ChunksToKeep, + Alignment, + false, + [&](const BlockStore::MovedChunksArray& MovedChunks, const BlockStore::ChunkIndexArray& DeletedChunks) { + for (const auto& MovedChunk : MovedChunks) + { + CHECK(MovedChunk.first >= DeleteChunkCount); + NewChunkLocations[MovedChunk.first] = MovedChunk.second; + } + MovedChunkCount += MovedChunks.size(); + for (size_t DeletedIndex : DeletedChunks) + { + CHECK(DeletedIndex < DeleteChunkCount); + } + DeletedChunkCount += DeletedChunks.size(); + }, + []() { + CHECK(false); + return 0; + }); + CHECK(MovedChunkCount <= DeleteChunkCount); + CHECK(DeletedChunkCount == DeleteChunkCount); + ChunkLocations = std::vector<BlockStoreLocation>(NewChunkLocations.begin() + DeleteChunkCount, NewChunkLocations.end()); + + for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) + { + Ref<BlockStoreFile> ChunkBlock = Store.GetChunkBlock(NewChunkLocations[ChunkIndex]); + if (ChunkIndex >= DeleteChunkCount) + { + CHECK(ChunkBlock); + IoBuffer VerifyChunk = ChunkBlock->GetChunk(NewChunkLocations[ChunkIndex].Offset, NewChunkLocations[ChunkIndex].Size); + CHECK(VerifyChunk); + IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size()); + CHECK(VerifyHash == ChunkHashes[ChunkIndex]); + } + } + + NewChunkLocations = ChunkLocations; + MovedChunkCount = 0; + DeletedChunkCount = 0; + Store.ReclaimSpace( + State1, + ChunkLocations, + {}, + Alignment, + false, + [&](const BlockStore::MovedChunksArray& MovedChunks, const BlockStore::ChunkIndexArray& DeletedChunks) { + CHECK(MovedChunks.empty()); + DeletedChunkCount += DeletedChunks.size(); + }, + []() { + CHECK(false); + return 0; + }); + CHECK(DeletedChunkCount == ChunkCount - DeleteChunkCount); +} + +TEST_CASE("blockstore.thread.read.write") +{ + ScopedTemporaryDirectory TempDir; + auto RootDirectory = TempDir.Path(); + + BlockStore Store; + Store.Initialize(RootDirectory / "store", 1088, 1024, {}); + + constexpr size_t ChunkCount = 1000; + constexpr size_t Alignment = 8; + std::vector<IoBuffer> Chunks; + std::vector<IoHash> ChunkHashes; + Chunks.reserve(ChunkCount); + ChunkHashes.reserve(ChunkCount); + for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) + { + IoBuffer Chunk = CreateChunk(57 + ChunkIndex / 2); + Chunks.push_back(Chunk); + ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size())); + } + + std::vector<BlockStoreLocation> ChunkLocations; + ChunkLocations.resize(ChunkCount); + + WorkerThreadPool WorkerPool(8); + std::atomic<size_t> WorkCompleted = 0; + for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) + { + WorkerPool.ScheduleWork([&Store, ChunkIndex, &Chunks, &ChunkLocations, &WorkCompleted]() { + IoBuffer& Chunk = Chunks[ChunkIndex]; + ChunkLocations[ChunkIndex] = Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment); + WorkCompleted.fetch_add(1); + }); + } + while (WorkCompleted < Chunks.size()) + { + Sleep(1); + } + + WorkCompleted = 0; + for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) + { + WorkerPool.ScheduleWork([&Store, ChunkIndex, &ChunkLocations, &ChunkHashes, &WorkCompleted]() { + Ref<BlockStoreFile> ChunkBlock = Store.GetChunkBlock(ChunkLocations[ChunkIndex]); + CHECK(ChunkBlock); + IoBuffer VerifyChunk = ChunkBlock->GetChunk(ChunkLocations[ChunkIndex].Offset, ChunkLocations[ChunkIndex].Size); + CHECK(VerifyChunk); + IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size()); + CHECK(VerifyHash == ChunkHashes[ChunkIndex]); + WorkCompleted.fetch_add(1); + }); + } + while (WorkCompleted < Chunks.size()) + { + Sleep(1); + } + + std::vector<BlockStoreLocation> SecondChunkLocations; + SecondChunkLocations.resize(ChunkCount); + WorkCompleted = 0; + for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) + { + WorkerPool.ScheduleWork([&Store, ChunkIndex, &Chunks, &SecondChunkLocations, &WorkCompleted]() { + IoBuffer& Chunk = Chunks[ChunkIndex]; + SecondChunkLocations[ChunkIndex] = Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment); + WorkCompleted.fetch_add(1); + }); + WorkerPool.ScheduleWork([&Store, ChunkIndex, &ChunkLocations, &ChunkHashes, &WorkCompleted]() { + Ref<BlockStoreFile> ChunkBlock = Store.GetChunkBlock(ChunkLocations[ChunkIndex]); + CHECK(ChunkBlock); + IoBuffer VerifyChunk = ChunkBlock->GetChunk(ChunkLocations[ChunkIndex].Offset, ChunkLocations[ChunkIndex].Size); + CHECK(VerifyChunk); + IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size()); + CHECK(VerifyHash == ChunkHashes[ChunkIndex]); + WorkCompleted.fetch_add(1); + }); + } + while (WorkCompleted < Chunks.size() * 2) + { + Sleep(1); + } +} + #endif void |