// Copyright Epic Games, Inc. All Rights Reserved. #include #include #include #include #include #include #include #include #include #include #include ZEN_THIRD_PARTY_INCLUDES_START #include #include #include ZEN_THIRD_PARTY_INCLUDES_END #if ZEN_WITH_TESTS # include # include # include # include # include #endif #include ////////////////////////////////////////////////////////////////////////// namespace zen { const FLLMTag& GetBlocksTag() { static FLLMTag _("blocks"); return _; } ////////////////////////////////////////////////////////////////////////// BlockStoreFile::BlockStoreFile(const std::filesystem::path& BlockPath) : m_Path(BlockPath) { } BlockStoreFile::~BlockStoreFile() { m_IoBuffer = IoBuffer(); m_File.Detach(); } const std::filesystem::path& BlockStoreFile::GetPath() const { return m_Path; } void BlockStoreFile::Open() { ZEN_TRACE_CPU("BlockStoreFile::Open"); uint32_t RetriesLeft = 3; m_File.Open(m_Path, BasicFile::Mode::kDelete, [&](std::error_code& Ec) { if (RetriesLeft == 0) { return false; } ZEN_WARN("Failed to open cas block '{}', reason: '{}', retries left: {}.", m_Path, Ec.message(), RetriesLeft); Sleep(100 + (3 - RetriesLeft) * 100); // Total 600 ms RetriesLeft--; return true; }); void* FileHandle = m_File.Handle(); m_CachedFileSize = m_File.FileSize(); m_IoBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, m_CachedFileSize, /*IsWholeFile*/ true); } void BlockStoreFile::Create(uint64_t InitialSize) { ZEN_TRACE_CPU("BlockStoreFile::Create"); auto ParentPath = m_Path.parent_path(); if (!IsDir(ParentPath)) { CreateDirectories(ParentPath); } uint32_t RetriesLeft = 3; m_File.Open(m_Path, BasicFile::Mode::kTruncateDelete, [&](std::error_code& Ec) { if (RetriesLeft == 0) { return false; } ZEN_WARN("Failed to create cas block '{}', reason: '{}', retries left: {}.", m_Path, Ec.message(), RetriesLeft); Sleep(100 - (3 - RetriesLeft) * 100); // Total 600 ms RetriesLeft--; return true; }); RemoveMeta(); void* FileHandle = m_File.Handle(); // We map our 
m_IoBuffer beyond the file size as we will grow it over time and want // to be able to create sub-buffers of all the written range later m_IoBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, InitialSize, false); } uint64_t BlockStoreFile::FileSize() const { uint64_t CachedSize = m_CachedFileSize; if (CachedSize == 0) { std::error_code Ec; uint64_t Size = m_File.FileSize(Ec); if (Ec) { ZEN_WARN("Failed to get file size of block {}. Reason: {}", m_Path, Ec.message()); return 0; } uint64_t Expected = 0; if (!m_CachedFileSize.compare_exchange_strong(Expected, Size)) { // Force a new check next time file size is fetched m_CachedFileSize.store(0); } return Size; } return CachedSize; } void BlockStoreFile::MarkAsDeleteOnClose() { m_IoBuffer.SetDeleteOnClose(true); RemoveMeta(); } IoBuffer BlockStoreFile::GetChunk(uint64_t Offset, uint64_t Size) { if (Offset + Size > m_IoBuffer.GetSize()) { return {}; } return IoBuffer(m_IoBuffer, Offset, Size); } void BlockStoreFile::Read(void* Data, uint64_t Size, uint64_t FileOffset) { m_File.Read(Data, Size, FileOffset); } void BlockStoreFile::Write(const void* Data, uint64_t Size, uint64_t FileOffset) { ZEN_TRACE_CPU("BlockStoreFile::Write"); ZEN_ASSERT(Size + FileOffset <= m_IoBuffer.GetSize()); m_File.Write(Data, Size, FileOffset); uint64_t NewSize = FileOffset + Size; uint64_t CurrentSize = m_CachedFileSize.load(); while (NewSize > CurrentSize) { if (m_CachedFileSize.compare_exchange_strong(CurrentSize, NewSize)) { break; } } ZEN_ASSERT(m_CachedFileSize.load() >= NewSize); } void BlockStoreFile::Flush() { ZEN_TRACE_CPU("BlockStoreFile::Flush"); m_File.Flush(); } BasicFile& BlockStoreFile::GetBasicFile() { return m_File; } void BlockStoreFile::StreamByteRange(uint64_t FileOffset, uint64_t Size, std::function&& ChunkFun) { m_File.StreamByteRange(FileOffset, Size, std::move(ChunkFun)); } bool BlockStoreFile::IsOpen() const { return !!m_IoBuffer; } bool BlockStoreFile::SetMetaData(const IoBuffer& Payload) { if (!Payload) { RemoveMeta(); 
return true; } const std::filesystem::path MetaPath = GetMetaPath(); std::error_code Ec; TemporaryFile::SafeWriteFile(MetaPath, Payload.GetView(), Ec); if (Ec) { ZEN_WARN("Unable to set meta data for block '{}' at meta path: '{}'. Reason: '{}'", m_Path, MetaPath, Ec.message()); return false; } return true; } class BlockStoreFileAppender { public: BlockStoreFileAppender() = delete; BlockStoreFileAppender(const BlockStoreFileAppender&) = delete; BlockStoreFileAppender(BlockStoreFileAppender&&) = delete; BlockStoreFileAppender& operator=(const BlockStoreFileAppender&) = delete; BlockStoreFileAppender(BlockStoreFile& BlockFile, uint64_t BufferSize) : m_BlockFile(BlockFile) , m_BufferSize(BufferSize) , m_Buffer(nullptr) { m_Buffer = (uint8_t*)Memory::Alloc(m_BufferSize); } ~BlockStoreFileAppender() { Flush(); Memory::Free(m_Buffer); } uint64_t Append(const void* Data, uint64_t Size, uint64_t Alignment) { AlignWritePos(Alignment); if (Size >= m_BufferSize) { Flush(); m_BlockFile.Write(Data, Size, m_WritePos); const uint64_t WrittenPos = m_WritePos; m_WritePos += Size; m_BufferStart = m_WritePos; return WrittenPos; } { const uint64_t PendingSize = m_WritePos - m_BufferStart; if (PendingSize + Size > m_BufferSize) { Flush(); } } ZEN_ASSERT((m_WritePos - m_BufferStart) + Size <= m_BufferSize); ZEN_ASSERT_SLOW(RoundUp(m_WritePos, Alignment) == m_WritePos); memmove(&m_Buffer[m_WritePos - m_BufferStart], Data, Size); const uint64_t WrittenPos = m_WritePos; m_WritePos += Size; return WrittenPos; } void Flush() { uint64_t PendingSize = m_WritePos - m_BufferStart; if (PendingSize > 0) { m_BlockFile.Write(m_Buffer, PendingSize, m_BufferStart); m_BufferStart = m_WritePos; } } private: void AlignWritePos(uint64_t Alignment) { const uint64_t AlignedWritePos = RoundUp(m_WritePos, Alignment); const uint64_t Padding = AlignedWritePos - m_WritePos; ZEN_ASSERT_SLOW(Padding <= m_BufferSize); if (Padding > 0) { const uint64_t PendingSize = m_WritePos - m_BufferStart; if (PendingSize + 
Padding > m_BufferSize) { Flush(); } memset(&m_Buffer[m_WritePos - m_BufferStart], 0, Padding); m_WritePos += Padding; ZEN_ASSERT(m_WritePos == AlignedWritePos); } } BlockStoreFile& m_BlockFile; const uint64_t m_BufferSize; uint8_t* m_Buffer; uint64_t m_BufferStart = 0; uint64_t m_WritePos = 0; }; static bool IsMetaDataValid(const std::filesystem::path& BlockPath, const std::filesystem::path& MetaPath) { std::error_code Ec; std::filesystem::file_time_type MetaWriteTime = std::filesystem::last_write_time(MetaPath, Ec); if (Ec) { return false; } std::filesystem::file_time_type BlockWriteTime = std::filesystem::last_write_time(BlockPath, Ec); if (Ec) { return false; } if (MetaWriteTime < BlockWriteTime) { RemoveFile(MetaPath, Ec); return false; } return true; } IoBuffer BlockStoreFile::GetMetaData() const { const std::filesystem::path MetaPath = GetMetaPath(); if (IsMetaDataValid(m_Path, MetaPath)) { return IoBufferBuilder::MakeFromFile(MetaPath); } return {}; } uint64_t BlockStoreFile::MetaSize() const { const std::filesystem::path MetaPath = GetMetaPath(); if (IsMetaDataValid(m_Path, MetaPath)) { std::error_code DummyEc; if (uint64_t Size = FileSizeFromPath(MetaPath, DummyEc); !DummyEc) { return Size; } } return 0; } void BlockStoreFile::RemoveMeta() { std::filesystem::path MetaPath = GetMetaPath(); std::error_code DummyEc; RemoveFile(MetaPath, DummyEc); } std::filesystem::path BlockStoreFile::GetMetaPath() const { std::filesystem::path MetaPath(m_Path); return MetaPath.replace_extension(".meta"); } //////////////////////////////////////////////////////// constexpr uint64_t DefaultIterateSmallChunkWindowSize = 2 * 1024 * 1024; BlockStore::BlockStore() { } BlockStore::~BlockStore() { try { Close(); } catch (const std::exception& Ex) { ZEN_ERROR("~BlockStore() failed with: ", Ex.what()); } } void BlockStore::Initialize(const std::filesystem::path& BlocksBasePath, uint64_t MaxBlockSize, uint64_t MaxBlockCount) { ZEN_MEMSCOPE(GetBlocksTag()); 
// (continuation of BlockStore::Initialize, right after its ZEN_MEMSCOPE line)
    ZEN_TRACE_CPU("BlockStore::Initialize");
    ZEN_ASSERT(MaxBlockSize > 0);
    ZEN_ASSERT(MaxBlockCount > 0);
    ZEN_ASSERT(IsPow2(MaxBlockCount));
    tsl::robin_map FoundBlocks;
    m_TotalSize = 0;
    m_BlocksBasePath = BlocksBasePath;
    m_MaxBlockSize = MaxBlockSize;
    m_MaxBlockCount = MaxBlockCount;
    if (IsDir(m_BlocksBasePath)) {
        std::vector EmptyBlockFiles;
        uint32_t NextBlockIndex = 0;
        // Breadth-first scan: sub-directories found along the way are appended and visited later.
        std::vector FoldersToScan;
        FoldersToScan.push_back(m_BlocksBasePath);
        size_t FolderOffset = 0;
        while (FolderOffset < FoldersToScan.size()) {
            for (const std::filesystem::directory_entry& Entry : std::filesystem::directory_iterator(FoldersToScan[FolderOffset])) {
                if (Entry.is_directory()) {
                    FoldersToScan.push_back(Entry.path());
                    continue;
                }
                if (Entry.is_regular_file()) {
                    const std::filesystem::path Path = Entry.path();
                    if (Path.extension() != GetBlockFileExtension()) {
                        continue;
                    }
                    // The file stem is the block index in hexadecimal; anything else is skipped.
                    std::string FileName = PathToUtf8(Path.stem());
                    uint32_t BlockIndex;
                    bool OK = ParseHexNumber(FileName, BlockIndex);
                    if (!OK) {
                        continue;
                    }
                    if (Entry.file_size() == 0) {
                        // Collected now, removed after the scan.
                        EmptyBlockFiles.push_back(Path);
                        continue;
                    }
                    Ref BlockFile{new BlockStoreFile(Path)};
                    BlockFile->Open();
                    m_TotalSize.fetch_add(BlockFile->TotalSize(), std::memory_order::relaxed);
                    m_ChunkBlocks[BlockIndex] = BlockFile;
                    FoundBlocks[BlockIndex] = BlockFile->FileSize();
                    if (BlockIndex >= NextBlockIndex) {
                        // Wraps around using the power-of-two block count as a mask.
                        NextBlockIndex = (BlockIndex + 1) & (m_MaxBlockCount - 1);
                    }
                }
            }
            ++FolderOffset;
        }
        for (const std::filesystem::path& EmptyBlockFile : EmptyBlockFiles) {
            std::error_code Ec;
            RemoveFile(EmptyBlockFile, Ec);
            if (Ec) {
                ZEN_WARN("Unable to remove empty block file {}. Reason: {}", EmptyBlockFile, Ec.message());
            }
        }
        m_WriteBlockIndex.store(NextBlockIndex, std::memory_order_release);
    } else {
        CreateDirectories(m_BlocksBasePath);
    }
}

// Reconciles the loaded blocks with KnownBlocks while holding the insert lock exclusively.
// Loaded blocks that are not in KnownBlocks are marked delete-on-close and dropped from
// the map (their TotalSize is subtracted from m_TotalSize). Returns the indexes in
// KnownBlocks for which no non-null block is loaded.
BlockStore::BlockIndexSet BlockStore::SyncExistingBlocksOnDisk(const BlockIndexSet& KnownBlocks)
{
    ZEN_MEMSCOPE(GetBlocksTag());
    ZEN_TRACE_CPU("BlockStore::SyncExistingBlocksOnDisk");
    RwLock::ExclusiveLockScope InsertLock(m_InsertLock);
    BlockIndexSet MissingBlocks;
    BlockIndexSet DeleteBlocks;
    DeleteBlocks.reserve(m_ChunkBlocks.size());
    // Start with every loaded block as a delete candidate, then strike out the known ones.
    for (auto It : m_ChunkBlocks) {
        DeleteBlocks.insert(It.first);
    }
    for (const uint32_t BlockIndex : KnownBlocks) {
        DeleteBlocks.erase(BlockIndex);
        if (auto It = m_ChunkBlocks.find(BlockIndex); It != m_ChunkBlocks.end() && !It->second.IsNull()) {
            continue;
        } else {
            MissingBlocks.insert(BlockIndex);
        }
    }
    for (std::uint32_t BlockIndex : DeleteBlocks) {
        // NOTE(review): BlockPath is computed but not used in this loop.
        std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, BlockIndex);
        if (m_ChunkBlocks[BlockIndex]) {
            m_TotalSize.fetch_sub(m_ChunkBlocks[BlockIndex]->TotalSize(), std::memory_order::relaxed);
            m_ChunkBlocks[BlockIndex]->MarkAsDeleteOnClose();
        }
        m_ChunkBlocks.erase(BlockIndex);
    }
    return MissingBlocks;
}

// Selects the blocks worth compacting and maps each to its used entry count from BlockUsage.
// The current write block and blocks with writes in flight are never selected.
// BlockUsageThresholdPercent: 100 selects any block with unused space, 0 selects only
// blocks with no used bytes, other values select blocks whose used percentage is below it.
BlockStore::BlockEntryCountMap BlockStore::GetBlocksToCompact(const BlockUsageMap& BlockUsage, uint32_t BlockUsageThresholdPercent)
{
    ZEN_MEMSCOPE(GetBlocksTag());
    ZEN_TRACE_CPU("BlockStoreFile::GetBlocksToCompact");
    BlockEntryCountMap Result;
    {
        RwLock::SharedLockScope InsertLock(m_InsertLock);
        // Blocks smaller than half the maximum block size count as "small".
        const uint64_t SmallBlockLimit = m_MaxBlockSize / 2;
        std::vector SmallBlockIndexes;
        for (const auto& It : m_ChunkBlocks) {
            uint32_t BlockIndex = It.first;
            if ((BlockIndex == m_WriteBlockIndex.load()) && m_WriteBlock) {
                continue;
            }
            if (std::find(m_ActiveWriteBlocks.begin(), m_ActiveWriteBlocks.end(), BlockIndex) != m_ActiveWriteBlocks.end()) {
                continue;
            }
            // Blocks without an entry in BlockUsage are treated as completely unused.
            uint64_t UsedSize = 0;
            uint32_t UsedCount = 0;
            if (auto UsageIt = BlockUsage.find(BlockIndex); UsageIt != BlockUsage.end()) {
                UsedSize = UsageIt->second.DiskUsage;
                UsedCount =
// (continuation of BlockStore::GetBlocksToCompact: right-hand side of the UsedCount assignment)
                    UsageIt->second.EntryCount;
            }
            uint64_t PhysicalSize = It.second ? It.second->FileSize() : 0u;
            if (PhysicalSize == 0) {
                // Null or empty blocks are always candidates.
                Result.insert_or_assign(BlockIndex, UsedCount);
                continue;
            }
            bool IsBelowUnusedLimit = false;
            if (BlockUsageThresholdPercent == 100) {
                if (UsedSize < PhysicalSize) {
                    IsBelowUnusedLimit = true;
                }
            } else if (BlockUsageThresholdPercent == 0) {
                if (UsedSize == 0) {
                    IsBelowUnusedLimit = true;
                }
            } else {
                const uint32_t UsedPercent = UsedSize < PhysicalSize ? gsl::narrow((100 * UsedSize) / PhysicalSize) : 100u;
                if (UsedPercent < BlockUsageThresholdPercent) {
                    IsBelowUnusedLimit = true;
                }
            }
            if (IsBelowUnusedLimit) {
                Result.insert_or_assign(BlockIndex, UsedCount);
            } else if (PhysicalSize < SmallBlockLimit) {
                // Well-used but small block: a candidate for merging with other blocks.
                Result.insert_or_assign(BlockIndex, UsedCount);
                SmallBlockIndexes.push_back(BlockIndex);
            }
        }
        // If we only find one small block to compact, let it be.
        if (SmallBlockIndexes.size() == 1 && Result.size() == 1) {
            Result.erase(SmallBlockIndexes[0]);
        }
    }
    return Result;
}

// Resets the write state and drops all block references while holding the insert lock exclusively.
void BlockStore::Close()
{
    ZEN_TRACE_CPU("BlockStore::Close");
    RwLock::ExclusiveLockScope InsertLock(m_InsertLock);
    m_WriteBlock = nullptr;
    m_CurrentInsertOffset = 0;
    m_WriteBlockIndex = 0;
    m_ChunkBlocks.clear();
    m_BlocksBasePath.clear();
}

// Probes forward from ProbeIndex (wrapping at m_MaxBlockCount) for an index that is
// neither registered in m_ChunkBlocks nor present as a file on disk.
// On success returns the index and stores its path in OutBlockPath. Returns
// m_MaxBlockCount when every index is registered or when the file probe fails.
// The unnamed lock parameter requires the caller to hold the insert lock exclusively.
uint32_t BlockStore::GetFreeBlockIndex(uint32_t ProbeIndex, RwLock::ExclusiveLockScope&, std::filesystem::path& OutBlockPath) const
{
    if (m_ChunkBlocks.size() == m_MaxBlockCount) {
        return (uint32_t)m_MaxBlockCount;
    }
    while (true) {
        if (!m_ChunkBlocks.contains(ProbeIndex)) {
            OutBlockPath = GetBlockPath(m_BlocksBasePath, ProbeIndex);
            std::error_code Ec;
            bool Exists = IsFile(OutBlockPath, Ec);
            if (Ec) {
                ZEN_WARN("Failed to probe existence of file '{}' when trying to allocate a new block. Reason: '{}'", OutBlockPath, Ec.message());
                return (uint32_t)m_MaxBlockCount;
            }
            if (!Exists) {
                return ProbeIndex;
            }
        }
        ProbeIndex = (ProbeIndex + 1) & (m_MaxBlockCount - 1);
    }
    return ProbeIndex;
}

// Records that a write to BlockIndex is in flight. The lock argument only documents
// that the caller holds the insert lock exclusively.
void BlockStore::AddActiveWriteBlock(RwLock::ExclusiveLockScope& Lock, uint32_t BlockIndex)
{
    ZEN_UNUSED(Lock);
    m_ActiveWriteBlocks.push_back(BlockIndex);
}

// Removes one in-flight write entry for BlockIndex, then flushes every block queued in
// m_BlocksToFlush that no longer has writes in flight. The flushing itself happens
// after the insert lock has been released.
void BlockStore::RemoveActiveWriteBlock(uint32_t BlockIndex)
{
    eastl::fixed_vector, 2> FlushBlocks;
    {
        RwLock::ExclusiveLockScope _(m_InsertLock);
        // Erases a single occurrence; the same index may be listed once per concurrent writer.
        m_ActiveWriteBlocks.erase(std::find(m_ActiveWriteBlocks.begin(), m_ActiveWriteBlocks.end(), BlockIndex));
        for (auto It = m_BlocksToFlush.begin(); It != m_BlocksToFlush.end();) {
            const uint32_t FlushBlockIndex = *It;
            if (std::find(m_ActiveWriteBlocks.begin(), m_ActiveWriteBlocks.end(), FlushBlockIndex) == m_ActiveWriteBlocks.end()) {
                FlushBlocks.push_back(m_ChunkBlocks[FlushBlockIndex]);
                ZEN_DEBUG("Flushing block {} at '{}'", FlushBlockIndex, GetBlockPath(m_BlocksBasePath, FlushBlockIndex));
                It = m_BlocksToFlush.erase(It);
            } else {
                It++;
            }
        }
    }
    for (Ref& FlushBlock : FlushBlocks) {
        FlushBlock->Flush();
        FlushBlock = nullptr;
    }
}

// Writes one chunk at the next offset aligned to Alignment in the current write block,
// starting a new block when there is none or the chunk does not fit. Callback receives
// the resulting location. Throws std::runtime_error when no new block can be allocated.
void BlockStore::WriteChunk(const void* Data, uint64_t Size, uint32_t Alignment, const WriteChunkCallback& Callback)
{
    ZEN_MEMSCOPE(GetBlocksTag());
    ZEN_TRACE_CPU("BlockStore::WriteChunk");
    ZEN_ASSERT(Data != nullptr);
    ZEN_ASSERT(Size > 0u);
    ZEN_ASSERT(Size <= m_MaxBlockSize);
    ZEN_ASSERT(Alignment > 0u);
    uint32_t ChunkSize = gsl::narrow(Size);
    RwLock::ExclusiveLockScope InsertLock(m_InsertLock);
    uint32_t WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire);
    bool IsWriting = !!m_WriteBlock;
    uint32_t AlignedInsertOffset = RoundUp(m_CurrentInsertOffset, Alignment);
    if (!IsWriting || (AlignedInsertOffset + ChunkSize) > m_MaxBlockSize) {
        if (m_WriteBlock) {
            // The block being retired is flushed once its in-flight writes are done.
            m_BlocksToFlush.push_back(WriteBlockIndex);
        }
        WriteBlockIndex += IsWriting ?
// (continuation of BlockStore::WriteChunk: remainder of the `WriteBlockIndex += IsWriting ? ...` statement)
                                     1 : 0;
        std::filesystem::path BlockPath;
        WriteBlockIndex = GetFreeBlockIndex(WriteBlockIndex, InsertLock, BlockPath);
        if (WriteBlockIndex == (uint32_t)m_MaxBlockCount) {
            throw std::runtime_error(fmt::format("unable to allocate a new block in '{}'", m_BlocksBasePath));
        }
        Ref NewBlockFile(new BlockStoreFile(BlockPath));
        NewBlockFile->Create(m_MaxBlockSize);
        ZEN_DEBUG("Created block {} at '{}'", WriteBlockIndex, BlockPath);
        m_ChunkBlocks[WriteBlockIndex] = NewBlockFile;
        m_WriteBlock = NewBlockFile;
        m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release);
        m_CurrentInsertOffset = 0;
        AlignedInsertOffset = 0;
    }
    // Alignment padding plus payload; this is what gets added to m_TotalSize.
    uint32_t AlignedWriteSize = AlignedInsertOffset - m_CurrentInsertOffset + ChunkSize;
    m_CurrentInsertOffset = AlignedInsertOffset + ChunkSize;
    Ref WriteBlock = m_WriteBlock;
    AddActiveWriteBlock(InsertLock, WriteBlockIndex);
    // The range is reserved; the write itself runs without the insert lock.
    InsertLock.ReleaseNow();
    auto _ = MakeGuard([this, WriteBlockIndex]() { RemoveActiveWriteBlock(WriteBlockIndex); });
    WriteBlock->Write(Data, ChunkSize, AlignedInsertOffset);
    m_TotalSize.fetch_add(AlignedWriteSize, std::memory_order::relaxed);
    Callback({.BlockIndex = WriteBlockIndex, .Offset = AlignedInsertOffset, .Size = ChunkSize});
}

// Writes several chunks, packing consecutive ones (each aligned to Alignment) into a
// staging buffer so that a run of chunks is written with a single file write.
// Callback is invoked once per written run with the locations of its chunks, in input order.
// Throws std::runtime_error when no new block can be allocated.
void BlockStore::WriteChunks(std::span Datas, uint32_t Alignment, const WriteChunksCallback& Callback)
{
    ZEN_MEMSCOPE(GetBlocksTag());
    ZEN_TRACE_CPU("BlockStore::WriteChunks");
    ZEN_ASSERT(!Datas.empty());
    ZEN_ASSERT(Alignment > 0u);
    const size_t TotalCount = Datas.size();
    uint64_t TotalSize = 0;
    uint64_t LargestSize = 0;
    for (const IoBuffer& Data : Datas) {
        uint64_t Size = Data.GetSize();
        ZEN_ASSERT(Size > 0);
        ZEN_ASSERT(Size <= m_MaxBlockSize);
        TotalSize += Size;
        LargestSize = Max(LargestSize, Size);
    }
    // Staging buffer: at least 512 KiB or the largest chunk, but never more than the total payload.
    const uint64_t MinSize = Max(LargestSize, 512u * 1024u);
    const uint64_t BufferSize = Min(TotalSize, MinSize);
    std::vector Buffer(BufferSize);
    size_t Offset = 0;
    while (Offset < TotalCount) {
        // Count chunks, starting at Datas[Offset], form the current run; RangeSize is its byte size.
        size_t Count = 1;
        uint32_t RangeSize = gsl::narrow(Datas[Offset].GetSize());
        RwLock::ExclusiveLockScope InsertLock(m_InsertLock);
        uint32_t WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire);
        uint32_t AlignedInsertOffset = RoundUp(m_CurrentInsertOffset, Alignment);
        if ((!m_WriteBlock) || ((AlignedInsertOffset + RangeSize) > m_MaxBlockSize)) {
            if (m_WriteBlock) {
                m_BlocksToFlush.push_back(WriteBlockIndex);
            }
            std::filesystem::path BlockPath;
            WriteBlockIndex = GetFreeBlockIndex(WriteBlockIndex, InsertLock, BlockPath);
            if (WriteBlockIndex == (uint32_t)m_MaxBlockCount) {
                throw std::runtime_error(fmt::format("unable to allocate a new block in '{}'", m_BlocksBasePath));
            }
            Ref NewBlockFile(new BlockStoreFile(BlockPath));
            NewBlockFile->Create(m_MaxBlockSize);
            ZEN_DEBUG("Created block {} at '{}'", WriteBlockIndex, BlockPath);
            m_ChunkBlocks[WriteBlockIndex] = NewBlockFile;
            m_WriteBlock = NewBlockFile;
            m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release);
            m_CurrentInsertOffset = 0;
            AlignedInsertOffset = 0;
        }
        // Extend the run while the next chunk still fits in both the block and the staging buffer.
        while (Offset + Count < TotalCount) {
            uint32_t NextRangeSize = gsl::narrow(RoundUp(RangeSize, Alignment) + Datas[Offset + Count].GetSize());
            if ((AlignedInsertOffset + NextRangeSize) > m_MaxBlockSize || (NextRangeSize > BufferSize)) {
                break;
            }
            Count++;
            RangeSize = NextRangeSize;
        }
        m_CurrentInsertOffset = AlignedInsertOffset + RangeSize;
        Ref WriteBlock = m_WriteBlock;
        AddActiveWriteBlock(InsertLock, WriteBlockIndex);
        // The range is reserved; copying and writing run without the insert lock.
        InsertLock.ReleaseNow();
        auto _ = MakeGuard([this, WriteBlockIndex]() { RemoveActiveWriteBlock(WriteBlockIndex); });
        if (Count > 1) {
            if (Buffer.empty()) {
                Buffer.resize(BufferSize);
            }
            MutableMemoryView WriteBuffer(Buffer.data(), RangeSize);
            for (size_t Index = 0; Index < Count; Index++) {
                MemoryView SourceBuffer = Datas[Index + Offset];
                WriteBuffer.CopyFrom(SourceBuffer);
                WriteBuffer.MidInline(RoundUp(SourceBuffer.GetSize(), Alignment));
            }
            WriteBlock->Write(Buffer.data(), RangeSize, AlignedInsertOffset);
            m_TotalSize.fetch_add(RangeSize, std::memory_order::relaxed);
        } else {
            // A single chunk is written straight from its source buffer.
            MemoryView SourceBuffer = Datas[Offset];
            WriteBlock->Write(SourceBuffer.GetData(), SourceBuffer.GetSize(), AlignedInsertOffset);
            m_TotalSize.fetch_add(SourceBuffer.GetSize(), std::memory_order::relaxed);
        }
        uint32_t ChunkOffset = AlignedInsertOffset;
        std::vector Locations(Count);
        for (size_t Index = 0; Index < Count; Index++) {
            uint32_t ChunkSize = gsl::narrow(Datas[Offset + Index].GetSize());
            Locations[Index] = {.BlockIndex = WriteBlockIndex, .Offset = ChunkOffset, .Size = ChunkSize};
            ChunkOffset += gsl::narrow(RoundUp(ChunkSize, Alignment));
        }
        Callback(Locations);
        Offset += Count;
    }
}

// Returns true when the block of Location is loaded and the location lies within the
// block's file size. Logs a warning when the block exists but the range is outside it.
bool BlockStore::HasChunk(const BlockStoreLocation& Location) const
{
    ZEN_TRACE_CPU("BlockStore::HasChunk");
    RwLock::SharedLockScope InsertLock(m_InsertLock);
    if (auto BlockIt = m_ChunkBlocks.find(Location.BlockIndex); BlockIt != m_ChunkBlocks.end()) {
        if (Ref Block = BlockIt->second; Block) {
            // The local reference keeps the block usable after the lock is released.
            InsertLock.ReleaseNow();
            const uint64_t BlockSize = Block->FileSize();
            if (Location.Offset + Location.Size <= BlockSize) {
                return true;
            } else {
                ZEN_WARN("BlockLocation: Block {}, Offset {}, Size {} is outside block size {}", Location.BlockIndex, Location.Offset, Location.Size, BlockSize);
            }
        }
    }
    return false;
}

// Returns a buffer for Location, or an empty IoBuffer when the block is not loaded or
// the returned chunk does not have the requested size.
IoBuffer BlockStore::TryGetChunk(const BlockStoreLocation& Location) const
{
    ZEN_TRACE_CPU("BlockStore::TryGetChunk");
    RwLock::SharedLockScope InsertLock(m_InsertLock);
    if (auto BlockIt = m_ChunkBlocks.find(Location.BlockIndex); BlockIt != m_ChunkBlocks.end()) {
        if (Ref Block = BlockIt->second; Block) {
            InsertLock.ReleaseNow();
            IoBuffer Chunk = Block->GetChunk(Location.Offset, Location.Size);
            if (Chunk.GetSize() == Location.Size) {
                return Chunk;
            }
        }
    }
    return IoBuffer();
}

// Flushes the current write block. With ForceNewBlock set and data already written to
// the current block, the block is also abandoned so the next write starts a new one.
void BlockStore::Flush(bool ForceNewBlock)
{
    ZEN_MEMSCOPE(GetBlocksTag());
    ZEN_TRACE_CPU("BlockStore::Flush");
    if (ForceNewBlock) {
        RwLock::ExclusiveLockScope _(m_InsertLock);
        if (m_CurrentInsertOffset > 0) {
            if (m_WriteBlock) {
                m_WriteBlock->Flush();
            }
            m_WriteBlock = nullptr;
            m_CurrentInsertOffset = 0;
        }
        return;
    }
    RwLock::SharedLockScope _(m_InsertLock);
    if
(m_WriteBlock) { m_WriteBlock->Flush(); } } bool BlockStore::IterateBlock(std::span ChunkLocations, std::span InChunkIndexes, const IterateChunksSmallSizeCallback& SmallSizeCallback, const IterateChunksLargeSizeCallback& LargeSizeCallback, uint64_t LargeSizeLimit) { ZEN_MEMSCOPE(GetBlocksTag()); ZEN_TRACE_CPU("BlockStore::IterateBlock"); if (InChunkIndexes.empty()) { return true; } ZEN_ASSERT(ChunkLocations.size() >= InChunkIndexes.size()); if (LargeSizeLimit == 0) { LargeSizeLimit = DefaultIterateSmallChunkWindowSize; } uint64_t IterateSmallChunkWindowSize = Max(DefaultIterateSmallChunkWindowSize, LargeSizeLimit); const uint64_t IterateSmallChunkMaxGapSize = Max(2048u, IterateSmallChunkWindowSize / 512u); IterateSmallChunkWindowSize = Min((LargeSizeLimit + IterateSmallChunkMaxGapSize) * ChunkLocations.size(), IterateSmallChunkWindowSize); const size_t FirstLocationIndex = InChunkIndexes[0]; ZEN_ASSERT(FirstLocationIndex < ChunkLocations.size()); const uint32_t BlockIndex = ChunkLocations[FirstLocationIndex].BlockIndex; std::vector ChunkIndexes(InChunkIndexes.begin(), InChunkIndexes.end()); std::sort(ChunkIndexes.begin(), ChunkIndexes.end(), [&](size_t IndexA, size_t IndexB) -> bool { return ChunkLocations[IndexA].Offset < ChunkLocations[IndexB].Offset; }); auto GetNextRange = [LargeSizeLimit, IterateSmallChunkWindowSize, IterateSmallChunkMaxGapSize, &ChunkLocations](uint64_t BlockFileSize, std::span ChunkIndexes, size_t StartIndexOffset) -> size_t { size_t ChunkCount = 0; size_t StartIndex = ChunkIndexes[StartIndexOffset]; ZEN_ASSERT(StartIndex < ChunkLocations.size()); const BlockStoreLocation& StartLocation = ChunkLocations[StartIndex]; uint64_t StartOffset = StartLocation.Offset; uint64_t LastEnd = StartOffset + StartLocation.Size; while (StartIndexOffset + ChunkCount < ChunkIndexes.size()) { size_t NextIndex = ChunkIndexes[StartIndexOffset + ChunkCount]; const BlockStoreLocation& Location = ChunkLocations[NextIndex]; ZEN_ASSERT(Location.BlockIndex == 
StartLocation.BlockIndex); if ((Location.Offset + Location.Size) > BlockFileSize) { break; } if (Location.Size > LargeSizeLimit) { break; } if (Location.Offset >= (LastEnd + IterateSmallChunkMaxGapSize)) { break; } if ((Location.Offset + Location.Size) - StartOffset > IterateSmallChunkWindowSize) { break; } LastEnd = Location.Offset + Location.Size; ++ChunkCount; } return ChunkCount; }; RwLock::SharedLockScope InsertLock(m_InsertLock); auto FindBlockIt = m_ChunkBlocks.find(BlockIndex); if (FindBlockIt == m_ChunkBlocks.end()) { InsertLock.ReleaseNow(); ZEN_LOG_SCOPE("block #{} not available", BlockIndex); for (size_t ChunkIndex : ChunkIndexes) { if (!SmallSizeCallback(ChunkIndex, nullptr, 0)) { return false; } } } else { Ref BlockFile = FindBlockIt->second; ZEN_ASSERT(BlockFile); InsertLock.ReleaseNow(); const size_t BlockSize = BlockFile->FileSize(); IoBuffer ReadBuffer; void* BufferBase = nullptr; size_t LocationIndexOffset = 0; while (LocationIndexOffset < ChunkIndexes.size()) { size_t ChunkIndex = ChunkIndexes[LocationIndexOffset]; ZEN_ASSERT(ChunkIndex < ChunkLocations.size()); const BlockStoreLocation& FirstLocation = ChunkLocations[ChunkIndex]; ZEN_ASSERT(FirstLocation.BlockIndex == BlockIndex); const size_t RangeCount = GetNextRange(BlockSize, ChunkIndexes, LocationIndexOffset); if (RangeCount > 1) { size_t LastChunkIndex = ChunkIndexes[LocationIndexOffset + RangeCount - 1]; ZEN_ASSERT(LastChunkIndex < ChunkLocations.size()); const BlockStoreLocation& LastLocation = ChunkLocations[LastChunkIndex]; uint64_t Size = LastLocation.Offset + LastLocation.Size - FirstLocation.Offset; if (ReadBuffer.GetSize() < Size) { ReadBuffer = IoBuffer(Min(Size * 2, IterateSmallChunkWindowSize)); BufferBase = ReadBuffer.MutableData(); } BlockFile->Read(BufferBase, Size, FirstLocation.Offset); for (size_t RangeIndex = 0; RangeIndex < RangeCount; ++RangeIndex) { size_t NextChunkIndex = ChunkIndexes[LocationIndexOffset + RangeIndex]; ZEN_ASSERT(NextChunkIndex < 
ChunkLocations.size()); const BlockStoreLocation& ChunkLocation = ChunkLocations[NextChunkIndex]; if (ChunkLocation.Size == 0 || ((ChunkLocation.Offset + ChunkLocation.Size) > BlockSize)) { ZEN_LOG_SCOPE("chunk [{},{}] out of bounds (block #{} file size = {})", ChunkLocation.Offset, ChunkLocation.Size, BlockIndex, BlockSize); if (!SmallSizeCallback(NextChunkIndex, nullptr, 0)) { return false; } continue; } void* BufferPtr = &((char*)BufferBase)[ChunkLocation.Offset - FirstLocation.Offset]; if (!SmallSizeCallback(NextChunkIndex, BufferPtr, ChunkLocation.Size)) { return false; } } LocationIndexOffset += RangeCount; continue; } if (FirstLocation.Size == 0 || (FirstLocation.Offset + FirstLocation.Size > BlockSize)) { ZEN_LOG_SCOPE("chunk [{},{}] out of bounds (block #{} file size = {})", FirstLocation.Offset, FirstLocation.Size, BlockIndex, BlockSize); SmallSizeCallback(ChunkIndex, nullptr, 0); LocationIndexOffset++; continue; } if (!LargeSizeCallback(ChunkIndex, *BlockFile.Get(), FirstLocation.Offset, FirstLocation.Size)) { return false; } LocationIndexOffset++; } } return true; } bool BlockStore::IterateChunks(const std::span& ChunkLocations, const IterateChunksCallback& Callback) { ZEN_MEMSCOPE(GetBlocksTag()); ZEN_TRACE_CPU("BlockStore::IterateChunks"); Stopwatch Timer; auto _ = MakeGuard([&]() { ZEN_DEBUG("Iterated {} chunks from '{}' in {}", ChunkLocations.size(), m_BlocksBasePath, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); ZEN_LOG_SCOPE("iterating chunks from '{}'", m_BlocksBasePath); std::vector ChunkOrder(ChunkLocations.size()); for (size_t ChunkIndex = 0; ChunkIndex < ChunkLocations.size(); ++ChunkIndex) { ChunkOrder[ChunkIndex] = ChunkIndex; } std::sort(ChunkOrder.begin(), ChunkOrder.end(), [&ChunkLocations](const size_t Lhs, const size_t Rhs) { return ChunkLocations[Lhs].BlockIndex < ChunkLocations[Rhs].BlockIndex; }); size_t RangeStart = 0; size_t RangeEnd = 0; const std::span ChunkIndexRange(ChunkOrder); while (RangeStart < ChunkOrder.size()) { const 
// (continuation of BlockStore::IterateChunks: inside the per-block loop, after `const`)
              size_t ChunkIndex = ChunkOrder[RangeStart];
        const uint32_t BlockIndex = ChunkLocations[ChunkIndex].BlockIndex;
        // Advance RangeEnd past all chunks that share this block index.
        RangeEnd++;
        while (RangeEnd < ChunkOrder.size()) {
            const size_t NextChunkIndex = ChunkOrder[RangeEnd];
            if (ChunkLocations[NextChunkIndex].BlockIndex != BlockIndex) {
                break;
            }
            ++RangeEnd;
        }
        ZEN_LOG_SCOPE("iterating chunks from '{}'", GetBlockPath(m_BlocksBasePath, BlockIndex));
        if (!Callback(BlockIndex, ChunkIndexRange.subspan(RangeStart, RangeEnd - RangeStart))) {
            return false;
        }
        RangeStart = RangeEnd;
    }
    return true;
}

// Rewrites the blocks listed in CompactState: the chunks to keep are copied into newly
// allocated blocks (aligned to PayloadAlignment) and each processed source block is
// marked for deletion.
//
// ChangeCallback receives the moved chunks (chunk index -> new location), the scrubbed
// chunk indexes (chunks that could not be read from their block) and the number of bytes
// freed; returning false stops the compaction.
// DiskReserveCallback is asked to release reserved disk space when the free space is
// below one block; it returns the number of bytes reclaimed.
// LogPrefix is prepended to every log message.
void BlockStore::CompactBlocks(const BlockStoreCompactState& CompactState, uint32_t PayloadAlignment, const CompactCallback& ChangeCallback, const ClaimDiskReserveCallback& DiskReserveCallback, std::string_view LogPrefix)
{
    ZEN_MEMSCOPE(GetBlocksTag());
    ZEN_TRACE_CPU("BlockStore::CompactBlocks");
    uint64_t DeletedSize = 0;
    uint64_t MovedCount = 0;
    uint64_t MovedSize = 0;
    Stopwatch TotalTimer;
    const auto _ = MakeGuard([&] {
        ZEN_DEBUG("{}Compact blocks for '{}' DONE after {}, deleted {} and moved {} chunks ({}) ", LogPrefix, m_BlocksBasePath, NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()), NiceBytes(DeletedSize), MovedCount, NiceBytes(MovedSize));
    });
    uint32_t NewBlockIndex = 0;
    MovedChunksArray MovedChunks;
    ChunkIndexArray ScrubbedChunks;
    // Bytes added to new blocks / removed with old blocks since the last ReportChanges().
    uint64_t AddedSize = 0;
    uint64_t RemovedSize = 0;
    Ref NewBlockFile;
    uint64_t WriteOffset = m_MaxBlockSize + 1u;  // Force detect a new block
    std::unique_ptr TargetFileBuffer;
    // On scope exit, a target block that is still referenced here was not finished:
    // unregister it and, when it was opened, mark it for deletion.
    auto NewBlockFileGuard = MakeGuard([&]() {
        TargetFileBuffer.reset();
        if (NewBlockFile) {
            {
                RwLock::ExclusiveLockScope _l(m_InsertLock);
                if (m_ChunkBlocks[NewBlockIndex] == NewBlockFile) {
                    m_ChunkBlocks.erase(NewBlockIndex);
                }
            }
            if (NewBlockFile->IsOpen()) {
                ZEN_DEBUG("{}Dropping incomplete cas block store file '{}'", LogPrefix, NewBlockFile->GetPath());
                NewBlockFile->MarkAsDeleteOnClose();
            }
        }
    });
    // Hands the accumulated moves/scrubs to ChangeCallback and resets the accumulators.
    // Returns the callback's verdict, or true when there was nothing to report.
    auto ReportChanges = [&]() -> bool {
        bool Continue = true;
        if (!MovedChunks.empty() || !ScrubbedChunks.empty() || RemovedSize > 0) {
            Continue = ChangeCallback(MovedChunks, ScrubbedChunks, RemovedSize > AddedSize ? RemovedSize - AddedSize : 0);
            DeletedSize += RemovedSize;
            m_TotalSize.fetch_add(AddedSize);
            RemovedSize = 0;
            AddedSize = 0;
            MovedCount += MovedChunks.size();
            MovedChunks.clear();
            ScrubbedChunks.clear();
        }
        return Continue;
    };
    std::vector RemovedBlocks;
    CompactState.IterateBlocks([&](uint32_t BlockIndex, const std::vector& KeepChunkIndexes, const std::vector& ChunkLocations) -> bool {
        ZEN_TRACE_CPU("BlockStore::CompactBlock");
        Ref OldBlockFile;
        {
            RwLock::SharedLockScope _(m_InsertLock);
            if ((BlockIndex == m_WriteBlockIndex.load()) && m_WriteBlock) {
                ZEN_ERROR("{}compact Block was requested to rewrite the currently active write block in '{}', Block index {}", LogPrefix, m_BlocksBasePath, BlockIndex);
                return false;
            }
            auto It = m_ChunkBlocks.find(BlockIndex);
            if (It == m_ChunkBlocks.end()) {
                // The source block is gone, so none of its chunks can be kept.
                ZEN_WARN("{}compact Block was requested to rewrite an unknown block in '{}', Block index {}", LogPrefix, m_BlocksBasePath, BlockIndex);
                ScrubbedChunks.insert(ScrubbedChunks.end(), KeepChunkIndexes.begin(), KeepChunkIndexes.end());
                return true;
            }
            if (!It->second) {
                ZEN_WARN("{}compact Block was requested to rewrite a deleted block in '{}', Block index {}", LogPrefix, m_BlocksBasePath, BlockIndex);
                ScrubbedChunks.insert(ScrubbedChunks.end(), KeepChunkIndexes.begin(), KeepChunkIndexes.end());
                return true;
            }
            OldBlockFile = It->second;
        }
        ZEN_ASSERT(OldBlockFile);
        uint64_t OldBlockSize = OldBlockFile->FileSize();
        if (KeepChunkIndexes.empty()) {
            ZEN_INFO("{}dropped all chunks from '{}', freeing {}", LogPrefix, GetBlockPath(m_BlocksBasePath, BlockIndex).filename(), NiceBytes(OldBlockSize));
        } else {
            // Copy the kept chunks in source-offset order.
            std::vector SortedChunkIndexes(KeepChunkIndexes);
            std::sort(SortedChunkIndexes.begin(), SortedChunkIndexes.end(), [&ChunkLocations](size_t Lhs, size_t Rhs) {
                return ChunkLocations[Lhs].Offset < ChunkLocations[Rhs].Offset;
            });
            BasicFileBuffer SourceFileBuffer(OldBlockFile->GetBasicFile(), Min(256u * 1024u, OldBlockSize));
            uint64_t MovedFromBlock = 0;
            std::vector ChunkBuffer;
            for (const size_t& ChunkIndex : SortedChunkIndexes) {
                const BlockStoreLocation ChunkLocation = ChunkLocations[ChunkIndex];
                if (ChunkLocation.Offset + ChunkLocation.Size > OldBlockSize) {
                    ZEN_WARN(
                        "{}compact Block skipping chunk outside of block range in '{}', Chunk start {}, Chunk size {} in Block {}, Block "
                        "size {}",
                        LogPrefix, m_BlocksBasePath, ChunkLocation.Offset, ChunkLocation.Size, OldBlockFile->GetPath(), OldBlockSize);
                    ScrubbedChunks.push_back(ChunkIndex);
                    continue;
                }
                MemoryView ChunkView = SourceFileBuffer.MakeView(ChunkLocation.Size, ChunkLocation.Offset);
                if (ChunkView.GetSize() != ChunkLocation.Size) {
                    // The buffered view did not cover the whole chunk; read it into ChunkBuffer instead.
                    ChunkBuffer.resize(ChunkLocation.Size);
                    SourceFileBuffer.Read(ChunkBuffer.data(), ChunkLocation.Size, ChunkLocation.Offset);
                    ChunkView = MemoryView(ChunkBuffer.data(), ChunkLocation.Size);
                }
                if ((RoundUp(WriteOffset, PayloadAlignment) + ChunkView.GetSize()) > m_MaxBlockSize) {
                    // The chunk does not fit in the current target block (or there is none yet):
                    // finish the current target and start a new one.
                    if (TargetFileBuffer) {
                        TargetFileBuffer->Flush();
                        TargetFileBuffer.reset();
                    }
                    if (NewBlockFile) {
                        ZEN_ASSERT_SLOW(NewBlockFile->IsOpen());
                        NewBlockFile->Flush();
                        uint64_t NewBlockSize = NewBlockFile->FileSize();
                        MovedSize += NewBlockSize;
                        NewBlockFile = nullptr;
                        ZEN_ASSERT(!MovedChunks.empty() || (RemovedSize > 0 || OldBlockSize == 0));  // We should not have a new block if we haven't moved anything
                        ZEN_INFO("{}wrote block {} ({})", LogPrefix, GetBlockPath(m_BlocksBasePath, NewBlockIndex).filename(), NiceBytes(NewBlockSize));
#ifndef NDEBUG
                        for (const std::pair& MovedChunk : MovedChunks) {
                            ZEN_ASSERT(MovedChunk.second.BlockIndex == NewBlockIndex);
                            ZEN_ASSERT(MovedChunk.second.Offset + MovedChunk.second.Size <= NewBlockSize);
                        }
#endif  // NDEBUG
                        if (!ReportChanges()) {
                            return false;
                        }
                    }
                    uint32_t NextBlockIndex = m_WriteBlockIndex.load(std::memory_order_relaxed);
                    {
                        RwLock::ExclusiveLockScope InsertLock(m_InsertLock);
                        std::filesystem::path NewBlockPath;
                        NextBlockIndex = GetFreeBlockIndex(NextBlockIndex, InsertLock, NewBlockPath);
                        if (NextBlockIndex == (uint32_t)m_MaxBlockCount) {
                            ZEN_ERROR("{}unable to allocate a new block in '{}', count limit {} exeeded", LogPrefix, m_BlocksBasePath, static_cast(std::numeric_limits::max()) + 1);
                            return false;
                        }
                        // Register the block before it is created, while the insert lock is held.
                        NewBlockFile = new BlockStoreFile(NewBlockPath);
                        m_ChunkBlocks[NextBlockIndex] = NewBlockFile;
                    }
                    std::error_code Error;
                    DiskSpace Space = DiskSpaceInfo(m_BlocksBasePath, Error);
                    if (Error) {
                        ZEN_ERROR("{}get disk space in '{}' FAILED, reason: '{}'", LogPrefix, m_BlocksBasePath, Error.message());
                        {
                            RwLock::ExclusiveLockScope _l(m_InsertLock);
                            ZEN_ASSERT(m_ChunkBlocks[NextBlockIndex] == NewBlockFile);
                            m_ChunkBlocks.erase(NextBlockIndex);
                        }
                        ZEN_ASSERT_SLOW(!NewBlockFile->IsOpen());
                        NewBlockFile = nullptr;
                        return false;
                    }
                    if (Space.Free < m_MaxBlockSize) {
                        // Not enough room for a full block: ask the caller to release its reserve.
                        uint64_t ReclaimedSpace = DiskReserveCallback();
                        if (Space.Free + ReclaimedSpace < m_MaxBlockSize) {
                            ZEN_WARN("{}garbage collect for '{}' FAILED, required disk space {}, free {}", LogPrefix, m_BlocksBasePath, m_MaxBlockSize, NiceBytes(Space.Free + ReclaimedSpace));
                            {
                                RwLock::ExclusiveLockScope _l(m_InsertLock);
                                ZEN_ASSERT(m_ChunkBlocks[NextBlockIndex] == NewBlockFile);
                                m_ChunkBlocks.erase(NextBlockIndex);
                            }
                            ZEN_ASSERT_SLOW(!NewBlockFile->IsOpen());
                            NewBlockFile = nullptr;
                            return false;
                        }
                        ZEN_INFO("{}using gc reserve for '{}', reclaimed {}, disk free {}", LogPrefix, m_BlocksBasePath, ReclaimedSpace, NiceBytes(Space.Free + ReclaimedSpace));
                    }
                    NewBlockFile->Create(m_MaxBlockSize);
                    NewBlockIndex = NextBlockIndex;
                    WriteOffset = 0;
                    TargetFileBuffer = std::make_unique(*NewBlockFile, Min(256u * 1024u, m_MaxBlockSize));
                }
                const uint64_t OldWriteOffset = WriteOffset;
                // Append() returns the aligned offset at which the chunk was placed.
                WriteOffset = TargetFileBuffer->Append(ChunkView.GetData(), ChunkLocation.Size, PayloadAlignment);
                MovedChunks.push_back(
                    {ChunkIndex, {.BlockIndex = NewBlockIndex, .Offset = gsl::narrow(WriteOffset), .Size = ChunkLocation.Size}});
                WriteOffset += ChunkLocation.Size;
                MovedFromBlock += RoundUp(ChunkLocation.Offset + ChunkLocation.Size, PayloadAlignment) - ChunkLocation.Offset;
                // Alignment padding plus payload written to the target block for this chunk.
                uint64_t WrittenBytes = WriteOffset - OldWriteOffset;
                AddedSize += WrittenBytes;
            }
            ZEN_INFO("{}moved {} chunks ({}) from '{}' to new block, freeing {}", LogPrefix, KeepChunkIndexes.size(), NiceBytes(MovedFromBlock), GetBlockPath(m_BlocksBasePath, BlockIndex).filename(), OldBlockSize > MovedFromBlock ? NiceBytes(OldBlockSize - MovedFromBlock) : 0);
        }
        // Flush the moved data to the target file before the changes are reported and
        // the source block is marked for deletion.
        if (TargetFileBuffer) {
            TargetFileBuffer->Flush();
        }
#ifndef NDEBUG
        if (NewBlockFile) {
            const uint64_t NewBlockSize = NewBlockFile->FileSize();
            for (const std::pair& MovedChunk : MovedChunks) {
                ZEN_ASSERT(MovedChunk.second.BlockIndex == NewBlockIndex);
                ZEN_ASSERT(MovedChunk.second.Offset + MovedChunk.second.Size <= NewBlockSize);
            }
        }
#endif  // NDEBUG
        if (!ReportChanges()) {
            return false;
        }
        {
            RwLock::ExclusiveLockScope InsertLock(m_InsertLock);
            ZEN_DEBUG("{}marking cas block store file '{}' for delete ({})", LogPrefix, OldBlockFile->GetPath().filename(), NiceBytes(OldBlockSize));
            m_TotalSize.fetch_sub(OldBlockSize);
            m_TotalSize.fetch_sub(OldBlockFile->MetaSize());
            OldBlockFile->MarkAsDeleteOnClose();
            m_ChunkBlocks.erase(BlockIndex);
            // Reported to ChangeCallback by the next ReportChanges() call.
            RemovedSize += OldBlockSize;
        }
        return true;
    });
    if (TargetFileBuffer) {
        TargetFileBuffer->Flush();
        TargetFileBuffer.reset();
    }
    if (NewBlockFile) {
        ZEN_ASSERT_SLOW(NewBlockFile->IsOpen());
        NewBlockFile->Flush();
        const uint64_t NewBlockSize = NewBlockFile->FileSize();
        MovedSize += NewBlockSize;
        // Clearing the reference marks the block as finished for NewBlockFileGuard.
        NewBlockFile = nullptr;
        ZEN_INFO("{}wrote block {} ({})", LogPrefix, GetBlockPath(m_BlocksBasePath, NewBlockIndex).filename(), NiceBytes(NewBlockSize));
#ifndef NDEBUG
        for (const std::pair& MovedChunk : MovedChunks) {
            ZEN_ASSERT(MovedChunk.second.BlockIndex == NewBlockIndex);
            ZEN_ASSERT(MovedChunk.second.Offset + MovedChunk.second.Size <= NewBlockSize);
        }
#endif  // NDEBUG
    }
    ReportChanges();
}

// File extension of block files.
const char* BlockStore::GetBlockFileExtension()
{
    return ".ucas";
}

std::filesystem::path BlockStore::GetBlockPath(const std::filesystem::path& BlocksBasePath, const 
uint32_t BlockIndex) { ExtendablePathBuilder<256> Path; char BlockHexString[9]; ToHexNumber(BlockIndex, BlockHexString); Path.Append(BlocksBasePath); Path.AppendSeparator(); Path.AppendAsciiRange(BlockHexString, BlockHexString + 4); Path.AppendSeparator(); Path.Append(BlockHexString); Path.Append(GetBlockFileExtension()); return Path.ToPath(); } bool BlockStore::IsWriting(uint32_t BlockIndex) const { RwLock::SharedLockScope _(m_InsertLock); if (std::find(m_ActiveWriteBlocks.begin(), m_ActiveWriteBlocks.end(), BlockIndex) != m_ActiveWriteBlocks.end()) { return true; } if (BlockIndex == m_WriteBlockIndex.load() && m_WriteBlock) { return true; } return false; } void BlockStore::SetMetaData(uint32_t BlockIndex, const IoBuffer& Payload) { RwLock::ExclusiveLockScope _(m_InsertLock); if (auto It = m_ChunkBlocks.find(BlockIndex); It != m_ChunkBlocks.end() && It->second) { uint64_t OldMetaSize = It->second->MetaSize(); if (It->second->SetMetaData(Payload)) { uint64_t NewMetaSize = It->second->MetaSize(); m_TotalSize += NewMetaSize; m_TotalSize -= OldMetaSize; } } } IoBuffer BlockStore::GetMetaData(uint32_t BlockIndex) const { RwLock::SharedLockScope _(m_InsertLock); if (auto It = m_ChunkBlocks.find(BlockIndex); It != m_ChunkBlocks.end() && It->second) { return It->second->GetMetaData(); } return {}; } #if ZEN_WITH_TESTS TEST_CASE("blockstore.blockstoredisklocation") { BlockStoreLocation Zero = BlockStoreLocation{.BlockIndex = 0, .Offset = 0, .Size = 0}; CHECK(Zero == BlockStoreDiskLocation(Zero, 4).Get(4)); BlockStoreLocation MaxBlockIndex = BlockStoreLocation{.BlockIndex = BlockStoreDiskLocation::MaxBlockIndex, .Offset = 0, .Size = 0}; CHECK(MaxBlockIndex == BlockStoreDiskLocation(MaxBlockIndex, 4).Get(4)); BlockStoreLocation MaxOffset = BlockStoreLocation{.BlockIndex = 0, .Offset = BlockStoreDiskLocation::MaxOffset * 4, .Size = 0}; CHECK(MaxOffset == BlockStoreDiskLocation(MaxOffset, 4).Get(4)); BlockStoreLocation MaxSize = BlockStoreLocation{.BlockIndex = 0, .Offset = 0, 
.Size = std::numeric_limits::max()}; CHECK(MaxSize == BlockStoreDiskLocation(MaxSize, 4).Get(4)); BlockStoreLocation MaxBlockIndexAndOffset = BlockStoreLocation{.BlockIndex = BlockStoreDiskLocation::MaxBlockIndex, .Offset = BlockStoreDiskLocation::MaxOffset * 4, .Size = 0}; CHECK(MaxBlockIndexAndOffset == BlockStoreDiskLocation(MaxBlockIndexAndOffset, 4).Get(4)); BlockStoreLocation MaxAll = BlockStoreLocation{.BlockIndex = BlockStoreDiskLocation::MaxBlockIndex, .Offset = BlockStoreDiskLocation::MaxOffset * 4, .Size = std::numeric_limits::max()}; CHECK(MaxAll == BlockStoreDiskLocation(MaxAll, 4).Get(4)); BlockStoreLocation MaxAll4096 = BlockStoreLocation{.BlockIndex = BlockStoreDiskLocation::MaxBlockIndex, .Offset = BlockStoreDiskLocation::MaxOffset * 4096, .Size = std::numeric_limits::max()}; CHECK(MaxAll4096 == BlockStoreDiskLocation(MaxAll4096, 4096).Get(4096)); BlockStoreLocation Middle = BlockStoreLocation{.BlockIndex = (BlockStoreDiskLocation::MaxBlockIndex) / 2, .Offset = ((BlockStoreDiskLocation::MaxOffset) / 2) * 4, .Size = std::numeric_limits::max() / 2}; CHECK(Middle == BlockStoreDiskLocation(Middle, 4).Get(4)); } TEST_CASE("blockstore.blockfile") { ScopedTemporaryDirectory TempDir; auto RootDirectory = TempDir.Path() / "blocks"; CreateDirectories(RootDirectory); { BlockStoreFile File1(RootDirectory / "1"); File1.Create(16384); CHECK(File1.FileSize() == 0); File1.Write("data", 5, 0); IoBuffer DataChunk = File1.GetChunk(0, 5); File1.Write("boop", 5, 5); IoBuffer BoopChunk = File1.GetChunk(5, 5); const char* Data = static_cast(DataChunk.GetData()); CHECK(std::string(Data) == "data"); const char* Boop = static_cast(BoopChunk.GetData()); CHECK(std::string(Boop) == "boop"); File1.Flush(); CHECK(File1.FileSize() == 10); } { BlockStoreFile File1(RootDirectory / "1"); File1.Open(); char DataRaw[5]; File1.Read(DataRaw, 5, 0); CHECK(std::string(DataRaw) == "data"); IoBuffer DataChunk = File1.GetChunk(0, 5); char BoopRaw[5]; File1.Read(BoopRaw, 5, 5); 
CHECK(std::string(BoopRaw) == "boop");
        // (continuation of the "blockstore.blockfile" test case and its second scope)
        // NOTE(review): template arguments (static_cast, std::vector, std::span) appear to have been
        // lost in this copy of the file; the tokens below are left exactly as found.
        IoBuffer BoopChunk = File1.GetChunk(5, 5);

        const char* Data = static_cast(DataChunk.GetData());
        CHECK(std::string(Data) == "data");
        const char* Boop = static_cast(BoopChunk.GetData());
        CHECK(std::string(Boop) == "boop");
    }

    {
        // Chunks fetched from an opened file are read after the BlockStoreFile itself has been
        // destroyed; the file is expected to still exist at that point.
        IoBuffer DataChunk;
        IoBuffer BoopChunk;

        {
            BlockStoreFile File1(RootDirectory / "1");
            File1.Open();
            DataChunk = File1.GetChunk(0, 5);
            BoopChunk = File1.GetChunk(5, 5);
        }

        CHECK(IsFile(RootDirectory / "1"));

        const char* Data = static_cast(DataChunk.GetData());
        CHECK(std::string(Data) == "data");
        const char* Boop = static_cast(BoopChunk.GetData());
        CHECK(std::string(Boop) == "boop");
    }
    CHECK(IsFile(RootDirectory / "1"));

    {
        // Same as above but with the file marked delete-on-close: the chunks are still read
        // after the BlockStoreFile is gone, and the file is expected to have disappeared once
        // this scope (and with it both chunks) has ended.
        IoBuffer DataChunk;
        IoBuffer BoopChunk;

        {
            BlockStoreFile File1(RootDirectory / "1");
            File1.Open();
            File1.MarkAsDeleteOnClose();
            DataChunk = File1.GetChunk(0, 5);
            BoopChunk = File1.GetChunk(5, 5);
        }

        const char* Data = static_cast(DataChunk.GetData());
        CHECK(std::string(Data) == "data");
        const char* Boop = static_cast(BoopChunk.GetData());
        CHECK(std::string(Boop) == "boop");
    }
    CHECK(!IsFile(RootDirectory / "1"));
}

// Helpers shared by the block store test cases below.
namespace blockstore::impl {
    // Writes String to Store as a single chunk with the given alignment, checks that the
    // reported location size equals the string length, and returns that location.
    BlockStoreLocation WriteStringAsChunk(BlockStore& Store, std::string_view String, uint32_t PayloadAlignment)
    {
        BlockStoreLocation Location;
        Store.WriteChunk(String.data(), String.length(), PayloadAlignment, [&](const BlockStoreLocation& L) { Location = L; });
        CHECK(Location.Size == String.length());
        return Location;
    };

    // Reads the chunk at Location and returns its bytes as a string; returns an empty
    // string when the store yields no chunk for the location.
    std::string ReadChunkAsString(BlockStore& Store, const BlockStoreLocation& Location)
    {
        IoBuffer ChunkData = Store.TryGetChunk(Location);
        if (!ChunkData)
        {
            return "";
        }
        std::string AsString((const char*)ChunkData.Data(), ChunkData.Size());
        return AsString;
    };

    // Recursively lists RootDir and returns the directories (if Directories is set) followed
    // by the files (if Files is set) reported by the GetDirectoryContent() overload that
    // fills a DirectoryContent.
    std::vector GetDirectoryContent(std::filesystem::path RootDir, bool Files, bool Directories)
    {
        DirectoryContent DirectoryContent;
        GetDirectoryContent(RootDir,
                            DirectoryContentFlags::Recursive | (Files ? DirectoryContentFlags::IncludeFiles : DirectoryContentFlags::None) |
                                (Directories ? DirectoryContentFlags::IncludeDirs : DirectoryContentFlags::None),
                            DirectoryContent);
        std::vector Result;
        Result.insert(Result.end(), DirectoryContent.Directories.begin(), DirectoryContent.Directories.end());
        Result.insert(Result.end(), DirectoryContent.Files.begin(), DirectoryContent.Files.end());
        return Result;
    };

}  // namespace blockstore::impl

// Writes several chunks in one WriteChunks() call and checks which chunks share a block.
// NOTE(review): Initialize(RootDirectory, 128, 1024) presumably sets a 128 byte block size
// and a block count limit of 1024 - confirm against BlockStore::Initialize.
TEST_CASE("blockstore.multichunks")
{
    using namespace blockstore::impl;

    ScopedTemporaryDirectory TempDir;
    auto                     RootDirectory = TempDir.Path();

    BlockStore Store;
    Store.Initialize(RootDirectory, 128, 1024);

    std::vector MultiChunkData;
    std::string FirstChunkData = "0123456789012345678901234567890123456789012345678901234567890123";
    MultiChunkData.push_back(IoBuffer(IoBuffer::Wrap, FirstChunkData.data(), FirstChunkData.size()));

    std::string SecondChunkData = "12345678901234567890123456789012345678901234567890123456";
    MultiChunkData.push_back(IoBuffer(IoBuffer::Wrap, SecondChunkData.data(), SecondChunkData.size()));

    std::string ThirdChunkData = "789012345678901234567890123456789012345678901234567890";
    MultiChunkData.push_back(IoBuffer(IoBuffer::Wrap, ThirdChunkData.data(), ThirdChunkData.size()));

    // Locations are collected across both WriteChunks() calls; ChunkOffset is the next free slot
    BlockStoreLocation Locations[5];
    size_t             ChunkOffset = 0;
    Store.WriteChunks(MultiChunkData, 4, [&](std::span InLocations) {
        for (const BlockStoreLocation& Location : InLocations)
        {
            Locations[ChunkOffset++] = Location;
        }
        CHECK(ChunkOffset <= MultiChunkData.size());
    });
    CHECK(ChunkOffset == 3);

    // First two chunks are expected in the same block, the third in a different one
    CHECK(ReadChunkAsString(Store, Locations[0]) == FirstChunkData);
    CHECK(Locations[1].BlockIndex == Locations[0].BlockIndex);
    CHECK(ReadChunkAsString(Store, Locations[1]) == SecondChunkData);
    CHECK(Locations[2].BlockIndex != Locations[1].BlockIndex);
    CHECK(ReadChunkAsString(Store, Locations[2]) == ThirdChunkData);

    MultiChunkData.resize(0);
    std::string FourthChunkData = "ABCDEFGHIJABCDEFGHIJ_23";
    MultiChunkData.push_back(IoBuffer(IoBuffer::Wrap, FourthChunkData.data(), FourthChunkData.size()));

    std::string FifthChunkData =
        "ABCDEFGHIJABCDEFGHIJABCDEFGHIJABCDEFGHIJABCDEFGHIJABCDEFGHIJABCDEFGHIJABCDEFGHIJABCDEFGHIJABCDEFGHIJABCDEFGHIJABCDEFGHIJ_93";
    MultiChunkData.push_back(IoBuffer(IoBuffer::Wrap, FifthChunkData.data(), FifthChunkData.size()));

    Store.WriteChunks(MultiChunkData, 4, [&](std::span InLocations) {
        for (const BlockStoreLocation& Location : InLocations)
        {
            CHECK(ChunkOffset < 5);
            Locations[ChunkOffset++] = Location;
        }
    });
    CHECK(ChunkOffset == 5);

    // Earlier chunks must still read back; the fourth is expected next to the third and the
    // fifth in yet another block
    CHECK(ReadChunkAsString(Store, Locations[0]) == FirstChunkData);
    CHECK(Locations[1].BlockIndex == Locations[0].BlockIndex);
    CHECK(ReadChunkAsString(Store, Locations[1]) == SecondChunkData);
    CHECK(Locations[2].BlockIndex != Locations[1].BlockIndex);
    CHECK(ReadChunkAsString(Store, Locations[2]) == ThirdChunkData);

    CHECK(Locations[3].BlockIndex == Locations[2].BlockIndex);
    CHECK(ReadChunkAsString(Store, Locations[3]) == FourthChunkData);
    CHECK(Locations[4].BlockIndex != Locations[3].BlockIndex);
    CHECK(ReadChunkAsString(Store, Locations[4]) == FifthChunkData);
}

// Writes chunks one at a time and checks that they read back, that a lookup in an empty
// store yields nothing, and that the third chunk lands in a different block than the first.
TEST_CASE("blockstore.chunks")
{
    using namespace blockstore::impl;

    ScopedTemporaryDirectory TempDir;
    auto                     RootDirectory = TempDir.Path();

    BlockStore Store;
    Store.Initialize(RootDirectory, 128, 1024);
    IoBuffer BadChunk = Store.TryGetChunk({.BlockIndex = 0, .Offset = 0, .Size = 512});
    CHECK(!BadChunk);

    std::string        FirstChunkData     = "This is the data of the first chunk that we will write";
    BlockStoreLocation FirstChunkLocation = WriteStringAsChunk(Store, FirstChunkData, 4);

    std::string        SecondChunkData     = "This is the data for the second chunk that we will write";
    BlockStoreLocation SecondChunkLocation = WriteStringAsChunk(Store, SecondChunkData, 4);

    CHECK(ReadChunkAsString(Store, FirstChunkLocation) == FirstChunkData);
    CHECK(ReadChunkAsString(Store, SecondChunkLocation) == SecondChunkData);

    std::string ThirdChunkData =
        "This is a much longer string that will not fit in the first block so it should be placed in the second block";
    BlockStoreLocation ThirdChunkLocation = WriteStringAsChunk(Store, ThirdChunkData, 4);
    CHECK(ThirdChunkLocation.BlockIndex != FirstChunkLocation.BlockIndex);

    CHECK(ReadChunkAsString(Store, FirstChunkLocation) == FirstChunkData);
    CHECK(ReadChunkAsString(Store, SecondChunkLocation) == SecondChunkData);
    CHECK(ReadChunkAsString(Store, ThirdChunkLocation) == ThirdChunkData);
}

// Flushes with ForceNewBlock = true between writes and expects three files below the
// store directory afterwards.
TEST_CASE("blockstore.flush.force.new.block")
{
    using namespace blockstore::impl;

    ScopedTemporaryDirectory TempDir;
    auto                     RootDirectory = TempDir.Path();

    BlockStore Store;
    Store.Initialize(RootDirectory / "store", 128, 1024);

    std::string FirstChunkData = "This is the data of the first chunk that we will write";
    WriteStringAsChunk(Store, FirstChunkData, 4);
    Store.Flush(/*ForceNewBlock*/ true);

    std::string SecondChunkData = "This is the data for the second chunk that we will write";
    WriteStringAsChunk(Store, SecondChunkData, 4);
    Store.Flush(/*ForceNewBlock*/ true);

    std::string ThirdChunkData =
        "This is a much longer string that will not fit in the first block so it should be placed in the second block";
    WriteStringAsChunk(Store, ThirdChunkData, 4);

    CHECK(GetDirectoryContent(RootDirectory / "store", true, false).size() == 3);
}

// Iterates a mix of valid, oversized and invalid locations through IterateChunks() /
// IterateBlock() on a worker pool and checks which callback each location arrives at.
TEST_CASE("blockstore.iterate.chunks")
{
    using namespace blockstore::impl;

    ScopedTemporaryDirectory TempDir;
    auto                     RootDirectory = TempDir.Path();

    BlockStore Store;
    Store.Initialize(RootDirectory / "store", DefaultIterateSmallChunkWindowSize * 2, 1024);
    IoBuffer BadChunk = Store.TryGetChunk({.BlockIndex = 0, .Offset = 0, .Size = 512});
    CHECK(!BadChunk);

    std::string        FirstChunkData     = "This is the data of the first chunk that we will write";
    BlockStoreLocation FirstChunkLocation = WriteStringAsChunk(Store, FirstChunkData, 4);

    std::string        SecondChunkData     = "This is the data for the second chunk that we will write";
    BlockStoreLocation SecondChunkLocation = WriteStringAsChunk(Store, SecondChunkData, 4);
    Store.Flush(/*ForceNewBlock*/ false);

    // A chunk twice the size of the small-chunk window
    std::string VeryLargeChunk(DefaultIterateSmallChunkWindowSize * 2, 'L');
BlockStoreLocation VeryLargeChunkLocation = WriteStringAsChunk(Store, VeryLargeChunk, 4); BlockStoreLocation BadLocationZeroSize = {.BlockIndex = 0, .Offset = 0, .Size = 0}; BlockStoreLocation BadLocationOutOfRange = {.BlockIndex = 0, .Offset = DefaultIterateSmallChunkWindowSize, .Size = DefaultIterateSmallChunkWindowSize * 2}; BlockStoreLocation BadBlockIndex = {.BlockIndex = 0xfffff, .Offset = 1024, .Size = 1024}; WorkerThreadPool WorkerPool(Max(GetHardwareConcurrency() - 1u, 4u)); std::vector Locations{FirstChunkLocation, SecondChunkLocation, VeryLargeChunkLocation, BadLocationZeroSize, BadLocationOutOfRange, BadBlockIndex}; Latch WorkLatch(1); Store.IterateChunks(Locations, [&](uint32_t, std::span ChunkIndexes) -> bool { WorkLatch.AddCount(1); WorkerPool.ScheduleWork( [&, ChunkIndexes = std::vector(ChunkIndexes.begin(), ChunkIndexes.end())]() { auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); bool Continue = Store.IterateBlock( Locations, ChunkIndexes, [&](size_t ChunkIndex, const void* Data, uint64_t Size) -> bool { switch (ChunkIndex) { case 0: CHECK(Data); CHECK(Size == FirstChunkData.size()); CHECK(std::string((const char*)Data, Size) == FirstChunkData); break; case 1: CHECK(Data); CHECK(Size == SecondChunkData.size()); CHECK(std::string((const char*)Data, Size) == SecondChunkData); break; case 2: CHECK(false); break; case 3: CHECK(!Data); break; case 4: CHECK(!Data); break; case 5: CHECK(!Data); break; default: CHECK(false); break; } return true; }, [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) -> bool { switch (ChunkIndex) { case 0: case 1: CHECK(false); break; case 2: { CHECK(Size == VeryLargeChunk.size()); char* Buffer = new char[Size]; size_t HashOffset = 0; File.StreamByteRange(Offset, Size, [&](const void* Data, uint64_t Size) { memcpy(&Buffer[HashOffset], Data, Size); HashOffset += Size; }); CHECK(memcmp(Buffer, VeryLargeChunk.data(), Size) == 0); delete[] Buffer; } break; case 3: CHECK(false); break; 
case 4: CHECK(false); break; case 5: CHECK(false); break; default: CHECK(false); break; } return true; }, 0); CHECK(Continue); }, WorkerThreadPool::EMode::DisableBacklog); return true; }); WorkLatch.CountDown(); WorkLatch.Wait(); } TEST_CASE("blockstore.thread.read.write") { using namespace blockstore::impl; ScopedTemporaryDirectory TempDir; auto RootDirectory = TempDir.Path(); BlockStore Store; Store.Initialize(RootDirectory / "store", 1088, 1024); constexpr size_t ChunkCount = 500; constexpr size_t Alignment = 8; std::vector Chunks; std::vector ChunkHashes; Chunks.reserve(ChunkCount); ChunkHashes.reserve(ChunkCount); for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) { IoBuffer Chunk = CreateRandomBlob(57 + ChunkIndex / 2); Chunks.push_back(Chunk); ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size())); } std::vector ChunkLocations; ChunkLocations.resize(ChunkCount); WorkerThreadPool WorkerPool(Max(GetHardwareConcurrency() - 1u, 8u)); { std::atomic WorkCompleted = 0; Latch L(1); for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) { L.AddCount(1); WorkerPool.ScheduleWork( [&Store, ChunkIndex, &Chunks, &ChunkLocations, &WorkCompleted, &L]() { auto _ = MakeGuard([&L]() { L.CountDown(); }); IoBuffer& Chunk = Chunks[ChunkIndex]; Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) { ChunkLocations[ChunkIndex] = L; }); WorkCompleted.fetch_add(1); }, WorkerThreadPool::EMode::DisableBacklog); } L.CountDown(); L.Wait(); CHECK(WorkCompleted == Chunks.size()); } { std::atomic WorkCompleted = 0; Latch L(1); for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) { L.AddCount(1); WorkerPool.ScheduleWork( [&Store, ChunkIndex, &ChunkLocations, &ChunkHashes, &WorkCompleted, &L]() { auto _ = MakeGuard([&L]() { L.CountDown(); }); IoBuffer VerifyChunk = Store.TryGetChunk(ChunkLocations[ChunkIndex]); CHECK(VerifyChunk); IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), 
VerifyChunk.Size()); CHECK(VerifyHash == ChunkHashes[ChunkIndex]); WorkCompleted.fetch_add(1); }, WorkerThreadPool::EMode::DisableBacklog); } L.CountDown(); L.Wait(); CHECK(WorkCompleted == Chunks.size()); } std::vector SecondChunkLocations; SecondChunkLocations.resize(ChunkCount); { std::atomic WorkCompleted = 0; Latch L(1); for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) { L.AddCount(1); WorkerPool.ScheduleWork( [&Store, ChunkIndex, &Chunks, &SecondChunkLocations, &WorkCompleted, &L]() { auto _ = MakeGuard([&L]() { L.CountDown(); }); IoBuffer& Chunk = Chunks[ChunkIndex]; Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) { SecondChunkLocations[ChunkIndex] = L; }); WorkCompleted.fetch_add(1); }, WorkerThreadPool::EMode::DisableBacklog); L.AddCount(1); WorkerPool.ScheduleWork( [&Store, ChunkIndex, &ChunkLocations, &ChunkHashes, &WorkCompleted, &L]() { auto _ = MakeGuard([&L]() { L.CountDown(); }); IoBuffer VerifyChunk = Store.TryGetChunk(ChunkLocations[ChunkIndex]); CHECK(VerifyChunk); IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size()); CHECK(VerifyHash == ChunkHashes[ChunkIndex]); WorkCompleted.fetch_add(1); }, WorkerThreadPool::EMode::DisableBacklog); } L.CountDown(); L.Wait(); CHECK(WorkCompleted == Chunks.size() * 2); } } TEST_CASE("blockstore.compact.blocks") { using namespace blockstore::impl; ScopedTemporaryDirectory TempDir; auto RootDirectory = TempDir.Path(); BlockStore Store; Store.Initialize(RootDirectory / "store", 1088, 1024); constexpr size_t ChunkCount = 200; constexpr size_t Alignment = 8; std::vector Chunks; std::vector ChunkHashes; Chunks.reserve(ChunkCount); ChunkHashes.reserve(ChunkCount); for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) { IoBuffer Chunk = CreateRandomBlob(57 + ChunkIndex / 2); Chunks.push_back(Chunk); ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size())); } std::vector ChunkLocations; 
ChunkLocations.resize(ChunkCount);
    // (continuation of the "blockstore.compact.blocks" test case)
    // NOTE(review): template arguments on std::vector appear to have been lost in this copy
    // of the file; the tokens below are left exactly as found.

    // Write all chunks sequentially, remembering where each one ended up
    for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex)
    {
        IoBuffer& Chunk = Chunks[ChunkIndex];
        Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) { ChunkLocations[ChunkIndex] = L; });
    }

    // Empty compact state: neither callback may be invoked and the total size must not change
    SUBCASE("touch nothing")
    {
        uint64_t PreSize = Store.TotalSize();
        CHECK(PreSize > 0);
        BlockStoreCompactState State;
        Store.CompactBlocks(
            State,
            Alignment,
            [&](const BlockStore::MovedChunksArray&, const BlockStore::ChunkIndexArray&, uint64_t) {
                CHECK(false);
                return true;
            },
            []() {
                CHECK(false);
                return 0;
            });
        CHECK_EQ(PreSize, Store.TotalSize());
    }

    // Every block included, no keep locations: everything is expected to be removed
    SUBCASE("keep nothing")
    {
        Store.Flush(true);

        uint64_t PreSize = Store.TotalSize();
        CHECK(PreSize > 0);
        BlockStoreCompactState State;
        for (const BlockStoreLocation& Location : ChunkLocations)
        {
            State.IncludeBlock(Location.BlockIndex);
        }
        uint64_t RemovedSize = 0;
        Store.CompactBlocks(
            State,
            Alignment,
            [&](const BlockStore::MovedChunksArray& Moved, const BlockStore::ChunkIndexArray& Scrubbed, uint64_t Removed) {
                RemovedSize += Removed;
                CHECK(Moved.empty());
                CHECK(Scrubbed.empty());
                return true;
            },
            []() { return 0; });
        CHECK_EQ(RemovedSize, PreSize);
        CHECK_EQ(0u, Store.TotalSize());
    }

    // Every block except those reported by IsWriting() is included: what remains is expected
    // to be non-empty and at most 1088 bytes (the value passed to Initialize above)
    SUBCASE("keep current write block")
    {
        uint64_t               PreSize = Store.TotalSize();
        BlockStoreCompactState State;
        for (const BlockStoreLocation& Location : ChunkLocations)
        {
            if (Store.IsWriting(Location.BlockIndex))
            {
                continue;
            }
            State.IncludeBlock(Location.BlockIndex);
        }
        uint64_t RemovedSize = 0;
        Store.CompactBlocks(
            State,
            Alignment,
            [&](const BlockStore::MovedChunksArray& Moved, const BlockStore::ChunkIndexArray& Scrubbed, uint64_t Removed) {
                RemovedSize += Removed;
                CHECK(Moved.empty());
                CHECK(Scrubbed.empty());
                return true;
            },
            []() { return 0; });
        CHECK_EQ(Store.TotalSize() + RemovedSize, PreSize);
        CHECK_LE(Store.TotalSize(), 1088);
        CHECK_GT(Store.TotalSize(), 0);
    }

    // All locations kept: no callback may be invoked and the total size must not change
    SUBCASE("keep everything")
    {
        Store.Flush(true);

        uint64_t               PreSize = Store.TotalSize();
        BlockStoreCompactState State;
        for (const BlockStoreLocation& Location : ChunkLocations)
        {
            State.AddKeepLocation(Location);
        }
        Store.CompactBlocks(
            State,
            Alignment,
            [&](const BlockStore::MovedChunksArray&, const BlockStore::ChunkIndexArray&, uint64_t) {
                CHECK(false);
                return true;
            },
            []() {
                CHECK(false);
                return 0;
            });
        CHECK_EQ(Store.TotalSize(), PreSize);
    }

    // Only block 0 included, nothing kept: the removed size is expected to equal the highest
    // end offset of any chunk located in block 0
    SUBCASE("drop first block")
    {
        uint64_t               PreSize = Store.TotalSize();
        BlockStoreCompactState State;
        CHECK(!Store.IsWriting(0));
        State.IncludeBlock(0);

        uint64_t FirstBlockSize = 0;
        for (const BlockStoreLocation& Location : ChunkLocations)
        {
            if (Location.BlockIndex == 0)
            {
                FirstBlockSize = Max(FirstBlockSize, Location.Offset + Location.Size);
            }
        }

        uint64_t RemovedSize = 0;
        Store.CompactBlocks(
            State,
            Alignment,
            [&](const BlockStore::MovedChunksArray& Moved, const BlockStore::ChunkIndexArray& Scrubbed, uint64_t Removed) {
                CHECK(Moved.empty());
                CHECK(Scrubbed.empty());
                RemovedSize += Removed;
                return true;
            },
            []() {
                CHECK(false);
                return 0;
            });
        CHECK_EQ(FirstBlockSize, RemovedSize);
        CHECK_EQ(Store.TotalSize(), PreSize - FirstBlockSize);
    }

    // Block 0 included; its first two chunks are dropped and the rest of its chunks are kept,
    // so the kept chunks are expected to be moved and to read back from their new locations
    SUBCASE("compact first block")
    {
        uint64_t               PreSize = Store.TotalSize();
        BlockStoreCompactState State;
        CHECK(!Store.IsWriting(0));
        State.IncludeBlock(0);

        uint64_t    SkipChunkCount = 2;
        std::vector DroppedLocations(ChunkLocations.begin(), ChunkLocations.begin() + 2);
        for (auto It = ChunkLocations.begin() + 2; It != ChunkLocations.end(); It++)
        {
            const BlockStoreLocation& Location = *It;
            if (Location.BlockIndex != 0)
            {
                continue;
            }
            State.AddKeepLocation(Location);
        }
        uint64_t RemovedSize = 0;
        Store.CompactBlocks(
            State,
            Alignment,
            [&](const BlockStore::MovedChunksArray& Moved, const BlockStore::ChunkIndexArray& Scrubbed, uint64_t Removed) {
                CHECK(Scrubbed.empty());
                for (const auto& Move : Moved)
                {
                    const BlockStoreLocation& OldLocation = State.GetLocation(Move.first);
                    CHECK(OldLocation.BlockIndex == 0);  // Only move from block 0
                    CHECK(std::find(DroppedLocations.begin(), DroppedLocations.end(), OldLocation) == DroppedLocations.end());
                    // Record the new location so the verification loop below reads the moved chunk
                    auto It = std::find(ChunkLocations.begin(), ChunkLocations.end(), OldLocation);
                    CHECK(It != ChunkLocations.end());
                    (*It) = Move.second;
                }
                RemovedSize += Removed;
                return true;
            },
            []() {
                CHECK(false);
                return 0;
            });

        SkipChunkCount = 2;
        for (size_t Index = 0; Index < ChunkLocations.size(); Index++)
        {
            const BlockStoreLocation& Location = ChunkLocations[Index];
            // Locations still pointing at block 0 were not moved and must no longer resolve.
            // NOTE(review): SkipChunkCount is never decremented here, so the second half of
            // this condition is always true - confirm whether that is intended.
            if (Location.BlockIndex == 0 && SkipChunkCount > 0)
            {
                CHECK(!Store.TryGetChunk(Location));
                continue;
            }
            IoBuffer Buffer = Store.TryGetChunk(Location);
            CHECK(Buffer);
            IoHash RawHash = IoHash::HashBuffer(Buffer.Data(), Buffer.Size());
            CHECK_EQ(ChunkHashes[Index], RawHash);
        }
        CHECK_LT(Store.TotalSize(), PreSize);
    }

    // Among chunks outside the blocks being written, alternate keep/drop; blocks holding a
    // dropped chunk are included for compaction
    SUBCASE("compact every other item")
    {
        uint64_t               PreSize = Store.TotalSize();
        BlockStoreCompactState State;
        bool                   SkipFlag = false;

        // First pass: include the block of every second eligible chunk
        for (const BlockStoreLocation& Location : ChunkLocations)
        {
            if (Store.IsWriting(Location.BlockIndex))
            {
                continue;
            }
            if (SkipFlag)
            {
                State.IncludeBlock(Location.BlockIndex);
                SkipFlag = false;
                continue;
            }
            SkipFlag = true;
        }

        // Second pass, same alternation: remember the dropped locations and keep the others
        SkipFlag = false;
        std::vector DroppedLocations;
        for (const BlockStoreLocation& Location : ChunkLocations)
        {
            if (Store.IsWriting(Location.BlockIndex))
            {
                continue;
            }
            if (SkipFlag)
            {
                DroppedLocations.push_back(Location);
                SkipFlag = false;
                continue;
            }
            State.AddKeepLocation(Location);
            SkipFlag = true;
        }
        uint64_t RemovedSize = 0;
        Store.CompactBlocks(
            State,
            Alignment,
            [&](const BlockStore::MovedChunksArray& Moved, const BlockStore::ChunkIndexArray& Scrubbed, uint64_t Removed) {
                CHECK(Scrubbed.empty());
                for (const auto& Move : Moved)
                {
                    const BlockStoreLocation& OldLocation = State.GetLocation(Move.first);
                    CHECK(std::find(DroppedLocations.begin(), DroppedLocations.end(), OldLocation) == DroppedLocations.end());
                    auto It = std::find(ChunkLocations.begin(), ChunkLocations.end(), OldLocation);
                    CHECK(It != ChunkLocations.end());
                    (*It) = Move.second;
                }
                RemovedSize += Removed;
                return true;
            },
            []() {
                CHECK(false);
                return 0;
            });

        // Verification: dropped chunks must no longer resolve, all others must read back intact
        SkipFlag = false;
        for (size_t Index = 0; Index < ChunkLocations.size(); Index++)
        {
            const BlockStoreLocation& Location = ChunkLocations[Index];
            if (SkipFlag && !Store.IsWriting(Location.BlockIndex))
            {
                CHECK(std::find(DroppedLocations.begin(), DroppedLocations.end(), Location) != DroppedLocations.end());
                CHECK(!Store.TryGetChunk(Location));
                SkipFlag = false;
                continue;
            }
            IoBuffer Buffer = Store.TryGetChunk(Location);
            CHECK(Buffer);
            IoHash RawHash = IoHash::HashBuffer(Buffer.Data(), Buffer.Size());
            CHECK_EQ(ChunkHashes[Index], RawHash);
            SkipFlag = true;
        }
        CHECK_LT(Store.TotalSize(), PreSize);
    }

    // Adds three extra keep locations (two in block 0 with ranges 2000+322 and 10+3220, one in
    // block 999) after all real chunk locations and expects exactly those three indexes to be
    // reported as scrubbed
    SUBCASE("scrub")
    {
        Store.Flush(true);

        BlockStoreCompactState State;
        for (const BlockStoreLocation& Location : ChunkLocations)
        {
            State.IncludeBlock(Location.BlockIndex);
            CHECK(State.AddKeepLocation(Location));
        }
        State.IncludeBlock(0);
        State.IncludeBlock(999);
        std::vector ExpectedScrubbedIndexes;
        ExpectedScrubbedIndexes.push_back(ChunkLocations.size() + 0);
        State.AddKeepLocation(BlockStoreLocation{.BlockIndex = 0, .Offset = 2000, .Size = 322});
        ExpectedScrubbedIndexes.push_back(ChunkLocations.size() + 1);
        State.AddKeepLocation(BlockStoreLocation{.BlockIndex = 0, .Offset = 10, .Size = 3220});
        ExpectedScrubbedIndexes.push_back(ChunkLocations.size() + 2);
        State.AddKeepLocation(BlockStoreLocation{.BlockIndex = 999, .Offset = 2, .Size = 40});

        std::vector ScrubbedIndexes;

        Store.CompactBlocks(
            State,
            Alignment,
            [&](const BlockStore::MovedChunksArray&, const BlockStore::ChunkIndexArray& ScrubbedArray, uint64_t) {
                ScrubbedIndexes.insert(ScrubbedIndexes.end(), ScrubbedArray.begin(), ScrubbedArray.end());
                return true;
            },
            []() {
                CHECK(false);
                return 0;
            });
        // The callback may report indexes in any order; sort before comparing
        std::sort(ScrubbedIndexes.begin(), ScrubbedIndexes.end());
        CHECK_EQ(ExpectedScrubbedIndexes, ScrubbedIndexes);
    }
}

// Appends many random blobs to one block file through BlockStoreFileAppender and verifies
// each blob by hash when read back through the block file.
TEST_CASE("blockstore.BlockStoreFileAppender")
{
    const uint64_t kMaxChunkSize = 32u * 1024u;
    const size_t   kBlobCount    = 753u;
    const size_t   kAlignment    = 32u;

    ScopedTemporaryDirectory Tmp;
    BlockStoreFile           Block(Tmp.Path() / "block.0");
Block.Create(kMaxChunkSize * kBlobCount); std::vector BlobHashes; std::vector> BlobOffsetAndSize; BlobHashes.reserve(kBlobCount); BlobOffsetAndSize.reserve(kBlobCount); { BlockStoreFileAppender Appender(Block, 16u * 1024u); FastRandom Random; for (size_t Index = 0; Index < kBlobCount; Index++) { IoBuffer Blob = CreateRandomBlob(Random, Random.Next() % 32u * 1024u + 64u); IoHash BlobHash = IoHash::HashBuffer(Blob); BlobHashes.push_back(BlobHash); uint64_t Offset = Appender.Append(Blob.GetData(), Blob.GetSize(), kAlignment); BlobOffsetAndSize.push_back({Offset, Blob.GetSize()}); } } for (size_t Index = 0; Index < kBlobCount; Index++) { const IoHash& BlobHash = BlobHashes[Index]; uint64_t Offset = BlobOffsetAndSize[Index].first; uint64_t Size = BlobOffsetAndSize[Index].second; IoBuffer Blob = Block.GetChunk(Offset, Size); IoHash VerifyBlobHash = IoHash::HashBuffer(Blob); CHECK(BlobHash == VerifyBlobHash); BlobOffsetAndSize.push_back({Offset, Blob.GetSize()}); } } #endif void blockstore_forcelink() { } } // namespace zen